reworked persistent queues
This commit is contained in:
56
app/ytdl.py
56
app/ytdl.py
@@ -135,19 +135,14 @@ class Download:
|
||||
await self.notifier.updated(self.info)
|
||||
|
||||
class PersistentQueue:
|
||||
def __init__(self, filePath, load = False):
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
with shelve.open(path, 'c'):
|
||||
pass
|
||||
self.dict = OrderedDict()
|
||||
self.shelvePath = filePath
|
||||
self.__createShelve()
|
||||
if load:
|
||||
self.__loadShelve()
|
||||
|
||||
def __createShelve(self):
|
||||
shelf = shelve.open(self.shelvePath, 'c')
|
||||
shelf.close()
|
||||
|
||||
def __loadShelve(self):
|
||||
for k, v in self.savedItems():
|
||||
def load(self):
|
||||
for k, v in self.saved_items():
|
||||
self.dict[k] = Download(None, None, None, None, {}, v)
|
||||
|
||||
def exists(self, key):
|
||||
@@ -159,19 +154,19 @@ class PersistentQueue:
|
||||
def items(self):
|
||||
return self.dict.items()
|
||||
|
||||
def savedItems(self):
|
||||
with shelve.open(self.shelvePath, 'r') as shelf:
|
||||
def saved_items(self):
|
||||
with shelve.open(self.path, 'r') as shelf:
|
||||
return sorted(shelf.items(), key=lambda item: item[1].timestamp)
|
||||
|
||||
def put(self, value):
|
||||
key = value.info.id
|
||||
self.dict[key] = value
|
||||
with shelve.open(self.shelvePath, 'w') as shelf:
|
||||
with shelve.open(self.path, 'w') as shelf:
|
||||
shelf[key] = value.info
|
||||
|
||||
def delete(self, key):
|
||||
del self.dict[key]
|
||||
with shelve.open(self.shelvePath, 'w') as shelf:
|
||||
with shelve.open(self.path, 'w') as shelf:
|
||||
shelf.pop(key)
|
||||
|
||||
def next(self):
|
||||
@@ -187,24 +182,17 @@ class DownloadQueue:
|
||||
self.config = config
|
||||
self.notifier = notifier
|
||||
self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
|
||||
self.done = PersistentQueue(self.config.STATE_DIR + '/completed', True)
|
||||
self.initialized = False
|
||||
self.imported = False
|
||||
self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
|
||||
self.done.load()
|
||||
|
||||
async def importQueue(self):
|
||||
if not self.imported:
|
||||
for item in self.queue.savedItems():
|
||||
await self.add(
|
||||
item[1].url,
|
||||
item[1].quality,
|
||||
item[1].format)
|
||||
self.imported = True
|
||||
async def __import_queue(self):
|
||||
for k, v in self.queue.saved_items():
|
||||
await self.add(v.url, v.quality, v.format)
|
||||
|
||||
def __initialize(self):
|
||||
if not self.initialized:
|
||||
self.initialized = True
|
||||
self.event = asyncio.Event()
|
||||
asyncio.create_task(self.__download())
|
||||
async def initialize(self):
|
||||
self.event = asyncio.Event()
|
||||
asyncio.create_task(self.__download())
|
||||
asyncio.create_task(self.__import_queue())
|
||||
|
||||
def __extract_info(self, url):
|
||||
return yt_dlp.YoutubeDL(params={
|
||||
@@ -239,7 +227,6 @@ class DownloadQueue:
|
||||
|
||||
async def add(self, url, quality, format, already=None):
|
||||
log.info(f'adding {url}')
|
||||
self.__initialize()
|
||||
already = set() if already is None else already
|
||||
if url in already:
|
||||
log.info('recursion detected, skipping')
|
||||
@@ -274,9 +261,8 @@ class DownloadQueue:
|
||||
return {'status': 'ok'}
|
||||
|
||||
def get(self):
|
||||
item = (list((k, v) for k, v in self.queue.savedItems()),
|
||||
list((k, v) for k, v in self.done.savedItems()))
|
||||
return item
|
||||
return(list((k, v.info) for k, v in self.queue.items()),
|
||||
list((k, v.info) for k, v in self.done.items()))
|
||||
|
||||
async def __download(self):
|
||||
while True:
|
||||
|
||||
Reference in New Issue
Block a user