Convert to LF
This commit is contained in:
838
app/main.py
838
app/main.py
@@ -1,419 +1,419 @@
|
||||
#!/usr/bin/env python3
|
||||
# pylint: disable=no-member,method-hidden
|
||||
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from aiohttp import web
|
||||
from aiohttp.log import access_logger
|
||||
import ssl
|
||||
import socket
|
||||
import socketio
|
||||
import logging
|
||||
import json
|
||||
import pathlib
|
||||
import re
|
||||
from watchfiles import DefaultFilter, Change, awatch
|
||||
|
||||
from ytdl import DownloadQueueNotifier, DownloadQueue
|
||||
from yt_dlp.version import __version__ as yt_dlp_version
|
||||
|
||||
log = logging.getLogger('main')
|
||||
|
||||
class Config:
|
||||
_DEFAULTS = {
|
||||
'DOWNLOAD_DIR': '.',
|
||||
'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR',
|
||||
'TEMP_DIR': '%%DOWNLOAD_DIR',
|
||||
'DOWNLOAD_DIRS_INDEXABLE': 'false',
|
||||
'CUSTOM_DIRS': 'true',
|
||||
'CREATE_CUSTOM_DIRS': 'true',
|
||||
'CUSTOM_DIRS_EXCLUDE_REGEX': r'(^|/)[.@].*$',
|
||||
'DELETE_FILE_ON_TRASHCAN': 'false',
|
||||
'STATE_DIR': '.',
|
||||
'URL_PREFIX': '',
|
||||
'PUBLIC_HOST_URL': 'download/',
|
||||
'PUBLIC_HOST_AUDIO_URL': 'audio_download/',
|
||||
'OUTPUT_TEMPLATE': '%(title)s.%(ext)s',
|
||||
'OUTPUT_TEMPLATE_CHAPTER': '%(title)s - %(section_number)s %(section_title)s.%(ext)s',
|
||||
'OUTPUT_TEMPLATE_PLAYLIST': '%(playlist_title)s/%(title)s.%(ext)s',
|
||||
'DEFAULT_OPTION_PLAYLIST_STRICT_MODE' : 'false',
|
||||
'DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT' : '0',
|
||||
'YTDL_OPTIONS': '{}',
|
||||
'YTDL_OPTIONS_FILE': '',
|
||||
'ROBOTS_TXT': '',
|
||||
'HOST': '0.0.0.0',
|
||||
'PORT': '8081',
|
||||
'HTTPS': 'false',
|
||||
'CERTFILE': '',
|
||||
'KEYFILE': '',
|
||||
'BASE_DIR': '',
|
||||
'DEFAULT_THEME': 'auto',
|
||||
'DOWNLOAD_MODE': 'limited',
|
||||
'MAX_CONCURRENT_DOWNLOADS': 3,
|
||||
'LOGLEVEL': 'INFO',
|
||||
'ENABLE_ACCESSLOG': 'false',
|
||||
}
|
||||
|
||||
_BOOLEAN = ('DOWNLOAD_DIRS_INDEXABLE', 'CUSTOM_DIRS', 'CREATE_CUSTOM_DIRS', 'DELETE_FILE_ON_TRASHCAN', 'DEFAULT_OPTION_PLAYLIST_STRICT_MODE', 'HTTPS', 'ENABLE_ACCESSLOG')
|
||||
|
||||
def __init__(self):
|
||||
for k, v in self._DEFAULTS.items():
|
||||
setattr(self, k, os.environ.get(k, v))
|
||||
|
||||
for k, v in self.__dict__.items():
|
||||
if isinstance(v, str) and v.startswith('%%'):
|
||||
setattr(self, k, getattr(self, v[2:]))
|
||||
if k in self._BOOLEAN:
|
||||
if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'):
|
||||
log.error(f'Environment variable "{k}" is set to a non-boolean value "{v}"')
|
||||
sys.exit(1)
|
||||
setattr(self, k, v in ('true', 'True', 'on', '1'))
|
||||
|
||||
if not self.URL_PREFIX.endswith('/'):
|
||||
self.URL_PREFIX += '/'
|
||||
|
||||
# Convert relative addresses to absolute addresses to prevent the failure of file address comparison
|
||||
if self.YTDL_OPTIONS_FILE and self.YTDL_OPTIONS_FILE.startswith('.'):
|
||||
self.YTDL_OPTIONS_FILE = str(Path(self.YTDL_OPTIONS_FILE).resolve())
|
||||
|
||||
success,_ = self.load_ytdl_options()
|
||||
if not success:
|
||||
sys.exit(1)
|
||||
|
||||
def load_ytdl_options(self) -> tuple[bool, str]:
|
||||
try:
|
||||
self.YTDL_OPTIONS = json.loads(os.environ.get('YTDL_OPTIONS', '{}'))
|
||||
assert isinstance(self.YTDL_OPTIONS, dict)
|
||||
except (json.decoder.JSONDecodeError, AssertionError):
|
||||
msg = 'Environment variable YTDL_OPTIONS is invalid'
|
||||
log.error(msg)
|
||||
return (False, msg)
|
||||
|
||||
if not self.YTDL_OPTIONS_FILE:
|
||||
return (True, '')
|
||||
|
||||
log.info(f'Loading yt-dlp custom options from "{self.YTDL_OPTIONS_FILE}"')
|
||||
if not os.path.exists(self.YTDL_OPTIONS_FILE):
|
||||
msg = f'File "{self.YTDL_OPTIONS_FILE}" not found'
|
||||
log.error(msg)
|
||||
return (False, msg)
|
||||
try:
|
||||
with open(self.YTDL_OPTIONS_FILE) as json_data:
|
||||
opts = json.load(json_data)
|
||||
assert isinstance(opts, dict)
|
||||
except (json.decoder.JSONDecodeError, AssertionError):
|
||||
msg = 'YTDL_OPTIONS_FILE contents is invalid'
|
||||
log.error(msg)
|
||||
return (False, msg)
|
||||
|
||||
self.YTDL_OPTIONS.update(opts)
|
||||
return (True, '')
|
||||
|
||||
config = Config()
|
||||
|
||||
class ObjectSerializer(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
# First try to use __dict__ for custom objects
|
||||
if hasattr(obj, '__dict__'):
|
||||
return obj.__dict__
|
||||
# Convert iterables (generators, dict_items, etc.) to lists
|
||||
# Exclude strings and bytes which are also iterable
|
||||
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
|
||||
try:
|
||||
return list(obj)
|
||||
except:
|
||||
pass
|
||||
# Fall back to default behavior
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
|
||||
serializer = ObjectSerializer()
|
||||
app = web.Application()
|
||||
sio = socketio.AsyncServer(cors_allowed_origins='*')
|
||||
sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io')
|
||||
routes = web.RouteTableDef()
|
||||
|
||||
class Notifier(DownloadQueueNotifier):
|
||||
async def added(self, dl):
|
||||
log.info(f"Notifier: Download added - {dl.title}")
|
||||
await sio.emit('added', serializer.encode(dl))
|
||||
|
||||
async def updated(self, dl):
|
||||
log.info(f"Notifier: Download updated - {dl.title}")
|
||||
await sio.emit('updated', serializer.encode(dl))
|
||||
|
||||
async def completed(self, dl):
|
||||
log.info(f"Notifier: Download completed - {dl.title}")
|
||||
await sio.emit('completed', serializer.encode(dl))
|
||||
|
||||
async def canceled(self, id):
|
||||
log.info(f"Notifier: Download canceled - {id}")
|
||||
await sio.emit('canceled', serializer.encode(id))
|
||||
|
||||
async def cleared(self, id):
|
||||
log.info(f"Notifier: Download cleared - {id}")
|
||||
await sio.emit('cleared', serializer.encode(id))
|
||||
|
||||
dqueue = DownloadQueue(config, Notifier())
|
||||
app.on_startup.append(lambda app: dqueue.initialize())
|
||||
|
||||
class FileOpsFilter(DefaultFilter):
|
||||
def __call__(self, change_type: int, path: str) -> bool:
|
||||
# Check if this path matches our YTDL_OPTIONS_FILE
|
||||
if path != config.YTDL_OPTIONS_FILE:
|
||||
return False
|
||||
|
||||
# For existing files, use samefile comparison to handle symlinks correctly
|
||||
if os.path.exists(config.YTDL_OPTIONS_FILE):
|
||||
try:
|
||||
if not os.path.samefile(path, config.YTDL_OPTIONS_FILE):
|
||||
return False
|
||||
except (OSError, IOError):
|
||||
# If samefile fails, fall back to string comparison
|
||||
if path != config.YTDL_OPTIONS_FILE:
|
||||
return False
|
||||
|
||||
# Accept all change types for our file: modified, added, deleted
|
||||
return change_type in (Change.modified, Change.added, Change.deleted)
|
||||
|
||||
def get_options_update_time(success=True, msg=''):
|
||||
result = {
|
||||
'success': success,
|
||||
'msg': msg,
|
||||
'update_time': None
|
||||
}
|
||||
|
||||
# Only try to get file modification time if YTDL_OPTIONS_FILE is set and file exists
|
||||
if config.YTDL_OPTIONS_FILE and os.path.exists(config.YTDL_OPTIONS_FILE):
|
||||
try:
|
||||
result['update_time'] = os.path.getmtime(config.YTDL_OPTIONS_FILE)
|
||||
except (OSError, IOError) as e:
|
||||
log.warning(f"Could not get modification time for {config.YTDL_OPTIONS_FILE}: {e}")
|
||||
result['update_time'] = None
|
||||
|
||||
return result
|
||||
|
||||
async def watch_files():
|
||||
async def _watch_files():
|
||||
async for changes in awatch(config.YTDL_OPTIONS_FILE, watch_filter=FileOpsFilter()):
|
||||
success, msg = config.load_ytdl_options()
|
||||
result = get_options_update_time(success, msg)
|
||||
await sio.emit('ytdl_options_changed', serializer.encode(result))
|
||||
|
||||
log.info(f'Starting Watch File: {config.YTDL_OPTIONS_FILE}')
|
||||
asyncio.create_task(_watch_files())
|
||||
|
||||
if config.YTDL_OPTIONS_FILE:
|
||||
app.on_startup.append(lambda app: watch_files())
|
||||
|
||||
@routes.post(config.URL_PREFIX + 'add')
|
||||
async def add(request):
|
||||
log.info("Received request to add download")
|
||||
post = await request.json()
|
||||
log.info(f"Request data: {post}")
|
||||
url = post.get('url')
|
||||
quality = post.get('quality')
|
||||
if not url or not quality:
|
||||
log.error("Bad request: missing 'url' or 'quality'")
|
||||
raise web.HTTPBadRequest()
|
||||
format = post.get('format')
|
||||
folder = post.get('folder')
|
||||
custom_name_prefix = post.get('custom_name_prefix')
|
||||
playlist_strict_mode = post.get('playlist_strict_mode')
|
||||
playlist_item_limit = post.get('playlist_item_limit')
|
||||
auto_start = post.get('auto_start')
|
||||
|
||||
if custom_name_prefix is None:
|
||||
custom_name_prefix = ''
|
||||
if auto_start is None:
|
||||
auto_start = True
|
||||
if playlist_strict_mode is None:
|
||||
playlist_strict_mode = config.DEFAULT_OPTION_PLAYLIST_STRICT_MODE
|
||||
if playlist_item_limit is None:
|
||||
playlist_item_limit = config.DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT
|
||||
|
||||
playlist_item_limit = int(playlist_item_limit)
|
||||
|
||||
status = await dqueue.add(url, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start)
|
||||
return web.Response(text=serializer.encode(status))
|
||||
|
||||
@routes.post(config.URL_PREFIX + 'delete')
|
||||
async def delete(request):
|
||||
post = await request.json()
|
||||
ids = post.get('ids')
|
||||
where = post.get('where')
|
||||
if not ids or where not in ['queue', 'done']:
|
||||
log.error("Bad request: missing 'ids' or incorrect 'where' value")
|
||||
raise web.HTTPBadRequest()
|
||||
status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids))
|
||||
log.info(f"Download delete request processed for ids: {ids}, where: {where}")
|
||||
return web.Response(text=serializer.encode(status))
|
||||
|
||||
@routes.post(config.URL_PREFIX + 'start')
|
||||
async def start(request):
|
||||
post = await request.json()
|
||||
ids = post.get('ids')
|
||||
log.info(f"Received request to start pending downloads for ids: {ids}")
|
||||
status = await dqueue.start_pending(ids)
|
||||
return web.Response(text=serializer.encode(status))
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'history')
|
||||
async def history(request):
|
||||
history = { 'done': [], 'queue': [], 'pending': []}
|
||||
|
||||
for _, v in dqueue.queue.saved_items():
|
||||
history['queue'].append(v)
|
||||
for _, v in dqueue.done.saved_items():
|
||||
history['done'].append(v)
|
||||
for _, v in dqueue.pending.saved_items():
|
||||
history['pending'].append(v)
|
||||
|
||||
log.info("Sending download history")
|
||||
return web.Response(text=serializer.encode(history))
|
||||
|
||||
@sio.event
|
||||
async def connect(sid, environ):
|
||||
log.info(f"Client connected: {sid}")
|
||||
await sio.emit('all', serializer.encode(dqueue.get()), to=sid)
|
||||
await sio.emit('configuration', serializer.encode(config), to=sid)
|
||||
if config.CUSTOM_DIRS:
|
||||
await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid)
|
||||
if config.YTDL_OPTIONS_FILE:
|
||||
await sio.emit('ytdl_options_changed', serializer.encode(get_options_update_time()), to=sid)
|
||||
|
||||
def get_custom_dirs():
|
||||
def recursive_dirs(base):
|
||||
path = pathlib.Path(base)
|
||||
|
||||
# Converts PosixPath object to string, and remove base/ prefix
|
||||
def convert(p):
|
||||
s = str(p)
|
||||
if s.startswith(base):
|
||||
s = s[len(base):]
|
||||
|
||||
if s.startswith('/'):
|
||||
s = s[1:]
|
||||
|
||||
return s
|
||||
|
||||
# Include only directories which do not match the exclude filter
|
||||
def include_dir(d):
|
||||
if len(config.CUSTOM_DIRS_EXCLUDE_REGEX) == 0:
|
||||
return True
|
||||
else:
|
||||
return re.search(config.CUSTOM_DIRS_EXCLUDE_REGEX, d) is None
|
||||
|
||||
# Recursively lists all subdirectories of DOWNLOAD_DIR
|
||||
dirs = list(filter(include_dir, map(convert, path.glob('**/'))))
|
||||
|
||||
return dirs
|
||||
|
||||
download_dir = recursive_dirs(config.DOWNLOAD_DIR)
|
||||
|
||||
audio_download_dir = download_dir
|
||||
if config.DOWNLOAD_DIR != config.AUDIO_DOWNLOAD_DIR:
|
||||
audio_download_dir = recursive_dirs(config.AUDIO_DOWNLOAD_DIR)
|
||||
|
||||
return {
|
||||
"download_dir": download_dir,
|
||||
"audio_download_dir": audio_download_dir
|
||||
}
|
||||
|
||||
@routes.get(config.URL_PREFIX)
|
||||
def index(request):
|
||||
response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/browser/index.html'))
|
||||
if 'metube_theme' not in request.cookies:
|
||||
response.set_cookie('metube_theme', config.DEFAULT_THEME)
|
||||
return response
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'robots.txt')
|
||||
def robots(request):
|
||||
if config.ROBOTS_TXT:
|
||||
response = web.FileResponse(os.path.join(config.BASE_DIR, config.ROBOTS_TXT))
|
||||
else:
|
||||
response = web.Response(
|
||||
text="User-agent: *\nDisallow: /download/\nDisallow: /audio_download/\n"
|
||||
)
|
||||
return response
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'version')
|
||||
def version(request):
|
||||
return web.json_response({
|
||||
"yt-dlp": yt_dlp_version,
|
||||
"version": os.getenv("METUBE_VERSION", "dev")
|
||||
})
|
||||
|
||||
if config.URL_PREFIX != '/':
|
||||
@routes.get('/')
|
||||
def index_redirect_root(request):
|
||||
return web.HTTPFound(config.URL_PREFIX)
|
||||
|
||||
@routes.get(config.URL_PREFIX[:-1])
|
||||
def index_redirect_dir(request):
|
||||
return web.HTTPFound(config.URL_PREFIX)
|
||||
|
||||
routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
|
||||
routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
|
||||
routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube/browser'))
|
||||
try:
|
||||
app.add_routes(routes)
|
||||
except ValueError as e:
|
||||
if 'ui/dist/metube/browser' in str(e):
|
||||
raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e
|
||||
raise e
|
||||
|
||||
# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release
|
||||
# @routes.options(config.URL_PREFIX + 'add')
|
||||
async def add_cors(request):
|
||||
return web.Response(text=serializer.encode({"status": "ok"}))
|
||||
|
||||
app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors)
|
||||
|
||||
async def on_prepare(request, response):
|
||||
if 'Origin' in request.headers:
|
||||
response.headers['Access-Control-Allow-Origin'] = request.headers['Origin']
|
||||
response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
|
||||
|
||||
app.on_response_prepare.append(on_prepare)
|
||||
|
||||
def supports_reuse_port():
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
||||
sock.close()
|
||||
return True
|
||||
except (AttributeError, OSError):
|
||||
return False
|
||||
|
||||
def parseLogLevel(logLevel):
|
||||
match logLevel:
|
||||
case 'DEBUG':
|
||||
return logging.DEBUG
|
||||
case 'INFO':
|
||||
return logging.INFO
|
||||
case 'WARNING':
|
||||
return logging.WARNING
|
||||
case 'ERROR':
|
||||
return logging.ERROR
|
||||
case 'CRITICAL':
|
||||
return logging.CRITICAL
|
||||
case _:
|
||||
return None
|
||||
|
||||
def isAccessLogEnabled():
|
||||
if config.ENABLE_ACCESSLOG:
|
||||
return access_logger
|
||||
else:
|
||||
return None
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=parseLogLevel(config.LOGLEVEL))
|
||||
log.info(f"Listening on {config.HOST}:{config.PORT}")
|
||||
|
||||
if config.HTTPS:
|
||||
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
ssl_context.load_cert_chain(certfile=config.CERTFILE, keyfile=config.KEYFILE)
|
||||
web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=supports_reuse_port(), ssl_context=ssl_context, access_log=isAccessLogEnabled())
|
||||
else:
|
||||
web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=supports_reuse_port(), access_log=isAccessLogEnabled())
|
||||
#!/usr/bin/env python3
|
||||
# pylint: disable=no-member,method-hidden
|
||||
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from aiohttp import web
|
||||
from aiohttp.log import access_logger
|
||||
import ssl
|
||||
import socket
|
||||
import socketio
|
||||
import logging
|
||||
import json
|
||||
import pathlib
|
||||
import re
|
||||
from watchfiles import DefaultFilter, Change, awatch
|
||||
|
||||
from ytdl import DownloadQueueNotifier, DownloadQueue
|
||||
from yt_dlp.version import __version__ as yt_dlp_version
|
||||
|
||||
log = logging.getLogger('main')
|
||||
|
||||
class Config:
|
||||
_DEFAULTS = {
|
||||
'DOWNLOAD_DIR': '.',
|
||||
'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR',
|
||||
'TEMP_DIR': '%%DOWNLOAD_DIR',
|
||||
'DOWNLOAD_DIRS_INDEXABLE': 'false',
|
||||
'CUSTOM_DIRS': 'true',
|
||||
'CREATE_CUSTOM_DIRS': 'true',
|
||||
'CUSTOM_DIRS_EXCLUDE_REGEX': r'(^|/)[.@].*$',
|
||||
'DELETE_FILE_ON_TRASHCAN': 'false',
|
||||
'STATE_DIR': '.',
|
||||
'URL_PREFIX': '',
|
||||
'PUBLIC_HOST_URL': 'download/',
|
||||
'PUBLIC_HOST_AUDIO_URL': 'audio_download/',
|
||||
'OUTPUT_TEMPLATE': '%(title)s.%(ext)s',
|
||||
'OUTPUT_TEMPLATE_CHAPTER': '%(title)s - %(section_number)s %(section_title)s.%(ext)s',
|
||||
'OUTPUT_TEMPLATE_PLAYLIST': '%(playlist_title)s/%(title)s.%(ext)s',
|
||||
'DEFAULT_OPTION_PLAYLIST_STRICT_MODE' : 'false',
|
||||
'DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT' : '0',
|
||||
'YTDL_OPTIONS': '{}',
|
||||
'YTDL_OPTIONS_FILE': '',
|
||||
'ROBOTS_TXT': '',
|
||||
'HOST': '0.0.0.0',
|
||||
'PORT': '8081',
|
||||
'HTTPS': 'false',
|
||||
'CERTFILE': '',
|
||||
'KEYFILE': '',
|
||||
'BASE_DIR': '',
|
||||
'DEFAULT_THEME': 'auto',
|
||||
'DOWNLOAD_MODE': 'limited',
|
||||
'MAX_CONCURRENT_DOWNLOADS': 3,
|
||||
'LOGLEVEL': 'INFO',
|
||||
'ENABLE_ACCESSLOG': 'false',
|
||||
}
|
||||
|
||||
_BOOLEAN = ('DOWNLOAD_DIRS_INDEXABLE', 'CUSTOM_DIRS', 'CREATE_CUSTOM_DIRS', 'DELETE_FILE_ON_TRASHCAN', 'DEFAULT_OPTION_PLAYLIST_STRICT_MODE', 'HTTPS', 'ENABLE_ACCESSLOG')
|
||||
|
||||
def __init__(self):
|
||||
for k, v in self._DEFAULTS.items():
|
||||
setattr(self, k, os.environ.get(k, v))
|
||||
|
||||
for k, v in self.__dict__.items():
|
||||
if isinstance(v, str) and v.startswith('%%'):
|
||||
setattr(self, k, getattr(self, v[2:]))
|
||||
if k in self._BOOLEAN:
|
||||
if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'):
|
||||
log.error(f'Environment variable "{k}" is set to a non-boolean value "{v}"')
|
||||
sys.exit(1)
|
||||
setattr(self, k, v in ('true', 'True', 'on', '1'))
|
||||
|
||||
if not self.URL_PREFIX.endswith('/'):
|
||||
self.URL_PREFIX += '/'
|
||||
|
||||
# Convert relative addresses to absolute addresses to prevent the failure of file address comparison
|
||||
if self.YTDL_OPTIONS_FILE and self.YTDL_OPTIONS_FILE.startswith('.'):
|
||||
self.YTDL_OPTIONS_FILE = str(Path(self.YTDL_OPTIONS_FILE).resolve())
|
||||
|
||||
success,_ = self.load_ytdl_options()
|
||||
if not success:
|
||||
sys.exit(1)
|
||||
|
||||
def load_ytdl_options(self) -> tuple[bool, str]:
|
||||
try:
|
||||
self.YTDL_OPTIONS = json.loads(os.environ.get('YTDL_OPTIONS', '{}'))
|
||||
assert isinstance(self.YTDL_OPTIONS, dict)
|
||||
except (json.decoder.JSONDecodeError, AssertionError):
|
||||
msg = 'Environment variable YTDL_OPTIONS is invalid'
|
||||
log.error(msg)
|
||||
return (False, msg)
|
||||
|
||||
if not self.YTDL_OPTIONS_FILE:
|
||||
return (True, '')
|
||||
|
||||
log.info(f'Loading yt-dlp custom options from "{self.YTDL_OPTIONS_FILE}"')
|
||||
if not os.path.exists(self.YTDL_OPTIONS_FILE):
|
||||
msg = f'File "{self.YTDL_OPTIONS_FILE}" not found'
|
||||
log.error(msg)
|
||||
return (False, msg)
|
||||
try:
|
||||
with open(self.YTDL_OPTIONS_FILE) as json_data:
|
||||
opts = json.load(json_data)
|
||||
assert isinstance(opts, dict)
|
||||
except (json.decoder.JSONDecodeError, AssertionError):
|
||||
msg = 'YTDL_OPTIONS_FILE contents is invalid'
|
||||
log.error(msg)
|
||||
return (False, msg)
|
||||
|
||||
self.YTDL_OPTIONS.update(opts)
|
||||
return (True, '')
|
||||
|
||||
config = Config()
|
||||
|
||||
class ObjectSerializer(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
# First try to use __dict__ for custom objects
|
||||
if hasattr(obj, '__dict__'):
|
||||
return obj.__dict__
|
||||
# Convert iterables (generators, dict_items, etc.) to lists
|
||||
# Exclude strings and bytes which are also iterable
|
||||
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
|
||||
try:
|
||||
return list(obj)
|
||||
except:
|
||||
pass
|
||||
# Fall back to default behavior
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
|
||||
serializer = ObjectSerializer()
|
||||
app = web.Application()
|
||||
sio = socketio.AsyncServer(cors_allowed_origins='*')
|
||||
sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io')
|
||||
routes = web.RouteTableDef()
|
||||
|
||||
class Notifier(DownloadQueueNotifier):
|
||||
async def added(self, dl):
|
||||
log.info(f"Notifier: Download added - {dl.title}")
|
||||
await sio.emit('added', serializer.encode(dl))
|
||||
|
||||
async def updated(self, dl):
|
||||
log.info(f"Notifier: Download updated - {dl.title}")
|
||||
await sio.emit('updated', serializer.encode(dl))
|
||||
|
||||
async def completed(self, dl):
|
||||
log.info(f"Notifier: Download completed - {dl.title}")
|
||||
await sio.emit('completed', serializer.encode(dl))
|
||||
|
||||
async def canceled(self, id):
|
||||
log.info(f"Notifier: Download canceled - {id}")
|
||||
await sio.emit('canceled', serializer.encode(id))
|
||||
|
||||
async def cleared(self, id):
|
||||
log.info(f"Notifier: Download cleared - {id}")
|
||||
await sio.emit('cleared', serializer.encode(id))
|
||||
|
||||
dqueue = DownloadQueue(config, Notifier())
|
||||
app.on_startup.append(lambda app: dqueue.initialize())
|
||||
|
||||
class FileOpsFilter(DefaultFilter):
|
||||
def __call__(self, change_type: int, path: str) -> bool:
|
||||
# Check if this path matches our YTDL_OPTIONS_FILE
|
||||
if path != config.YTDL_OPTIONS_FILE:
|
||||
return False
|
||||
|
||||
# For existing files, use samefile comparison to handle symlinks correctly
|
||||
if os.path.exists(config.YTDL_OPTIONS_FILE):
|
||||
try:
|
||||
if not os.path.samefile(path, config.YTDL_OPTIONS_FILE):
|
||||
return False
|
||||
except (OSError, IOError):
|
||||
# If samefile fails, fall back to string comparison
|
||||
if path != config.YTDL_OPTIONS_FILE:
|
||||
return False
|
||||
|
||||
# Accept all change types for our file: modified, added, deleted
|
||||
return change_type in (Change.modified, Change.added, Change.deleted)
|
||||
|
||||
def get_options_update_time(success=True, msg=''):
|
||||
result = {
|
||||
'success': success,
|
||||
'msg': msg,
|
||||
'update_time': None
|
||||
}
|
||||
|
||||
# Only try to get file modification time if YTDL_OPTIONS_FILE is set and file exists
|
||||
if config.YTDL_OPTIONS_FILE and os.path.exists(config.YTDL_OPTIONS_FILE):
|
||||
try:
|
||||
result['update_time'] = os.path.getmtime(config.YTDL_OPTIONS_FILE)
|
||||
except (OSError, IOError) as e:
|
||||
log.warning(f"Could not get modification time for {config.YTDL_OPTIONS_FILE}: {e}")
|
||||
result['update_time'] = None
|
||||
|
||||
return result
|
||||
|
||||
async def watch_files():
|
||||
async def _watch_files():
|
||||
async for changes in awatch(config.YTDL_OPTIONS_FILE, watch_filter=FileOpsFilter()):
|
||||
success, msg = config.load_ytdl_options()
|
||||
result = get_options_update_time(success, msg)
|
||||
await sio.emit('ytdl_options_changed', serializer.encode(result))
|
||||
|
||||
log.info(f'Starting Watch File: {config.YTDL_OPTIONS_FILE}')
|
||||
asyncio.create_task(_watch_files())
|
||||
|
||||
if config.YTDL_OPTIONS_FILE:
|
||||
app.on_startup.append(lambda app: watch_files())
|
||||
|
||||
@routes.post(config.URL_PREFIX + 'add')
|
||||
async def add(request):
|
||||
log.info("Received request to add download")
|
||||
post = await request.json()
|
||||
log.info(f"Request data: {post}")
|
||||
url = post.get('url')
|
||||
quality = post.get('quality')
|
||||
if not url or not quality:
|
||||
log.error("Bad request: missing 'url' or 'quality'")
|
||||
raise web.HTTPBadRequest()
|
||||
format = post.get('format')
|
||||
folder = post.get('folder')
|
||||
custom_name_prefix = post.get('custom_name_prefix')
|
||||
playlist_strict_mode = post.get('playlist_strict_mode')
|
||||
playlist_item_limit = post.get('playlist_item_limit')
|
||||
auto_start = post.get('auto_start')
|
||||
|
||||
if custom_name_prefix is None:
|
||||
custom_name_prefix = ''
|
||||
if auto_start is None:
|
||||
auto_start = True
|
||||
if playlist_strict_mode is None:
|
||||
playlist_strict_mode = config.DEFAULT_OPTION_PLAYLIST_STRICT_MODE
|
||||
if playlist_item_limit is None:
|
||||
playlist_item_limit = config.DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT
|
||||
|
||||
playlist_item_limit = int(playlist_item_limit)
|
||||
|
||||
status = await dqueue.add(url, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start)
|
||||
return web.Response(text=serializer.encode(status))
|
||||
|
||||
@routes.post(config.URL_PREFIX + 'delete')
|
||||
async def delete(request):
|
||||
post = await request.json()
|
||||
ids = post.get('ids')
|
||||
where = post.get('where')
|
||||
if not ids or where not in ['queue', 'done']:
|
||||
log.error("Bad request: missing 'ids' or incorrect 'where' value")
|
||||
raise web.HTTPBadRequest()
|
||||
status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids))
|
||||
log.info(f"Download delete request processed for ids: {ids}, where: {where}")
|
||||
return web.Response(text=serializer.encode(status))
|
||||
|
||||
@routes.post(config.URL_PREFIX + 'start')
|
||||
async def start(request):
|
||||
post = await request.json()
|
||||
ids = post.get('ids')
|
||||
log.info(f"Received request to start pending downloads for ids: {ids}")
|
||||
status = await dqueue.start_pending(ids)
|
||||
return web.Response(text=serializer.encode(status))
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'history')
|
||||
async def history(request):
|
||||
history = { 'done': [], 'queue': [], 'pending': []}
|
||||
|
||||
for _, v in dqueue.queue.saved_items():
|
||||
history['queue'].append(v)
|
||||
for _, v in dqueue.done.saved_items():
|
||||
history['done'].append(v)
|
||||
for _, v in dqueue.pending.saved_items():
|
||||
history['pending'].append(v)
|
||||
|
||||
log.info("Sending download history")
|
||||
return web.Response(text=serializer.encode(history))
|
||||
|
||||
@sio.event
|
||||
async def connect(sid, environ):
|
||||
log.info(f"Client connected: {sid}")
|
||||
await sio.emit('all', serializer.encode(dqueue.get()), to=sid)
|
||||
await sio.emit('configuration', serializer.encode(config), to=sid)
|
||||
if config.CUSTOM_DIRS:
|
||||
await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid)
|
||||
if config.YTDL_OPTIONS_FILE:
|
||||
await sio.emit('ytdl_options_changed', serializer.encode(get_options_update_time()), to=sid)
|
||||
|
||||
def get_custom_dirs():
|
||||
def recursive_dirs(base):
|
||||
path = pathlib.Path(base)
|
||||
|
||||
# Converts PosixPath object to string, and remove base/ prefix
|
||||
def convert(p):
|
||||
s = str(p)
|
||||
if s.startswith(base):
|
||||
s = s[len(base):]
|
||||
|
||||
if s.startswith('/'):
|
||||
s = s[1:]
|
||||
|
||||
return s
|
||||
|
||||
# Include only directories which do not match the exclude filter
|
||||
def include_dir(d):
|
||||
if len(config.CUSTOM_DIRS_EXCLUDE_REGEX) == 0:
|
||||
return True
|
||||
else:
|
||||
return re.search(config.CUSTOM_DIRS_EXCLUDE_REGEX, d) is None
|
||||
|
||||
# Recursively lists all subdirectories of DOWNLOAD_DIR
|
||||
dirs = list(filter(include_dir, map(convert, path.glob('**/'))))
|
||||
|
||||
return dirs
|
||||
|
||||
download_dir = recursive_dirs(config.DOWNLOAD_DIR)
|
||||
|
||||
audio_download_dir = download_dir
|
||||
if config.DOWNLOAD_DIR != config.AUDIO_DOWNLOAD_DIR:
|
||||
audio_download_dir = recursive_dirs(config.AUDIO_DOWNLOAD_DIR)
|
||||
|
||||
return {
|
||||
"download_dir": download_dir,
|
||||
"audio_download_dir": audio_download_dir
|
||||
}
|
||||
|
||||
@routes.get(config.URL_PREFIX)
|
||||
def index(request):
|
||||
response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/browser/index.html'))
|
||||
if 'metube_theme' not in request.cookies:
|
||||
response.set_cookie('metube_theme', config.DEFAULT_THEME)
|
||||
return response
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'robots.txt')
|
||||
def robots(request):
|
||||
if config.ROBOTS_TXT:
|
||||
response = web.FileResponse(os.path.join(config.BASE_DIR, config.ROBOTS_TXT))
|
||||
else:
|
||||
response = web.Response(
|
||||
text="User-agent: *\nDisallow: /download/\nDisallow: /audio_download/\n"
|
||||
)
|
||||
return response
|
||||
|
||||
@routes.get(config.URL_PREFIX + 'version')
|
||||
def version(request):
|
||||
return web.json_response({
|
||||
"yt-dlp": yt_dlp_version,
|
||||
"version": os.getenv("METUBE_VERSION", "dev")
|
||||
})
|
||||
|
||||
if config.URL_PREFIX != '/':
|
||||
@routes.get('/')
|
||||
def index_redirect_root(request):
|
||||
return web.HTTPFound(config.URL_PREFIX)
|
||||
|
||||
@routes.get(config.URL_PREFIX[:-1])
|
||||
def index_redirect_dir(request):
|
||||
return web.HTTPFound(config.URL_PREFIX)
|
||||
|
||||
routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
|
||||
routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
|
||||
routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube/browser'))
|
||||
try:
|
||||
app.add_routes(routes)
|
||||
except ValueError as e:
|
||||
if 'ui/dist/metube/browser' in str(e):
|
||||
raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e
|
||||
raise e
|
||||
|
||||
# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release
|
||||
# @routes.options(config.URL_PREFIX + 'add')
|
||||
async def add_cors(request):
|
||||
return web.Response(text=serializer.encode({"status": "ok"}))
|
||||
|
||||
app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors)
|
||||
|
||||
async def on_prepare(request, response):
|
||||
if 'Origin' in request.headers:
|
||||
response.headers['Access-Control-Allow-Origin'] = request.headers['Origin']
|
||||
response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
|
||||
|
||||
app.on_response_prepare.append(on_prepare)
|
||||
|
||||
def supports_reuse_port():
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
||||
sock.close()
|
||||
return True
|
||||
except (AttributeError, OSError):
|
||||
return False
|
||||
|
||||
def parseLogLevel(logLevel):
|
||||
match logLevel:
|
||||
case 'DEBUG':
|
||||
return logging.DEBUG
|
||||
case 'INFO':
|
||||
return logging.INFO
|
||||
case 'WARNING':
|
||||
return logging.WARNING
|
||||
case 'ERROR':
|
||||
return logging.ERROR
|
||||
case 'CRITICAL':
|
||||
return logging.CRITICAL
|
||||
case _:
|
||||
return None
|
||||
|
||||
def isAccessLogEnabled():
|
||||
if config.ENABLE_ACCESSLOG:
|
||||
return access_logger
|
||||
else:
|
||||
return None
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=parseLogLevel(config.LOGLEVEL))
|
||||
log.info(f"Listening on {config.HOST}:{config.PORT}")
|
||||
|
||||
if config.HTTPS:
|
||||
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
ssl_context.load_cert_chain(certfile=config.CERTFILE, keyfile=config.KEYFILE)
|
||||
web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=supports_reuse_port(), ssl_context=ssl_context, access_log=isAccessLogEnabled())
|
||||
else:
|
||||
web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=supports_reuse_port(), access_log=isAccessLogEnabled())
|
||||
|
||||
960
app/ytdl.py
960
app/ytdl.py
@@ -1,480 +1,480 @@
|
||||
import os
|
||||
import yt_dlp
|
||||
from collections import OrderedDict
|
||||
import shelve
|
||||
import time
|
||||
import asyncio
|
||||
import multiprocessing
|
||||
import logging
|
||||
import re
|
||||
import types
|
||||
|
||||
import yt_dlp.networking.impersonate
|
||||
from dl_formats import get_format, get_opts, AUDIO_FORMATS
|
||||
from datetime import datetime
|
||||
|
||||
log = logging.getLogger('ytdl')
|
||||
|
||||
def _convert_generators_to_lists(obj):
|
||||
"""Recursively convert generators to lists in a dictionary to make it pickleable."""
|
||||
if isinstance(obj, types.GeneratorType):
|
||||
return list(obj)
|
||||
elif isinstance(obj, dict):
|
||||
return {k: _convert_generators_to_lists(v) for k, v in obj.items()}
|
||||
elif isinstance(obj, (list, tuple)):
|
||||
return type(obj)(_convert_generators_to_lists(item) for item in obj)
|
||||
else:
|
||||
return obj
|
||||
|
||||
class DownloadQueueNotifier:
|
||||
async def added(self, dl):
|
||||
raise NotImplementedError
|
||||
|
||||
async def updated(self, dl):
|
||||
raise NotImplementedError
|
||||
|
||||
async def completed(self, dl):
|
||||
raise NotImplementedError
|
||||
|
||||
async def canceled(self, id):
|
||||
raise NotImplementedError
|
||||
|
||||
async def cleared(self, id):
|
||||
raise NotImplementedError
|
||||
|
||||
class DownloadInfo:
|
||||
def __init__(self, id, title, url, quality, format, folder, custom_name_prefix, error, entry, playlist_item_limit):
|
||||
self.id = id if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{id}'
|
||||
self.title = title if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{title}'
|
||||
self.url = url
|
||||
self.quality = quality
|
||||
self.format = format
|
||||
self.folder = folder
|
||||
self.custom_name_prefix = custom_name_prefix
|
||||
self.msg = self.percent = self.speed = self.eta = None
|
||||
self.status = "pending"
|
||||
self.size = None
|
||||
self.timestamp = time.time_ns()
|
||||
self.error = error
|
||||
# Convert generators to lists to make entry pickleable
|
||||
self.entry = _convert_generators_to_lists(entry) if entry is not None else None
|
||||
self.playlist_item_limit = playlist_item_limit
|
||||
|
||||
class Download:
|
||||
manager = None
|
||||
|
||||
def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info):
|
||||
self.download_dir = download_dir
|
||||
self.temp_dir = temp_dir
|
||||
self.output_template = output_template
|
||||
self.output_template_chapter = output_template_chapter
|
||||
self.format = get_format(format, quality)
|
||||
self.ytdl_opts = get_opts(format, quality, ytdl_opts)
|
||||
if "impersonate" in self.ytdl_opts:
|
||||
self.ytdl_opts["impersonate"] = yt_dlp.networking.impersonate.ImpersonateTarget.from_str(self.ytdl_opts["impersonate"])
|
||||
self.info = info
|
||||
self.canceled = False
|
||||
self.tmpfilename = None
|
||||
self.status_queue = None
|
||||
self.proc = None
|
||||
self.loop = None
|
||||
self.notifier = None
|
||||
|
||||
def _download(self):
|
||||
log.info(f"Starting download for: {self.info.title} ({self.info.url})")
|
||||
try:
|
||||
def put_status(st):
|
||||
self.status_queue.put({k: v for k, v in st.items() if k in (
|
||||
'tmpfilename',
|
||||
'filename',
|
||||
'status',
|
||||
'msg',
|
||||
'total_bytes',
|
||||
'total_bytes_estimate',
|
||||
'downloaded_bytes',
|
||||
'speed',
|
||||
'eta',
|
||||
)})
|
||||
|
||||
def put_status_postprocessor(d):
|
||||
if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished':
|
||||
if '__finaldir' in d['info_dict']:
|
||||
filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath']))
|
||||
else:
|
||||
filename = d['info_dict']['filepath']
|
||||
self.status_queue.put({'status': 'finished', 'filename': filename})
|
||||
|
||||
ret = yt_dlp.YoutubeDL(params={
|
||||
'quiet': True,
|
||||
'no_color': True,
|
||||
'paths': {"home": self.download_dir, "temp": self.temp_dir},
|
||||
'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter },
|
||||
'format': self.format,
|
||||
'socket_timeout': 30,
|
||||
'ignore_no_formats_error': True,
|
||||
'progress_hooks': [put_status],
|
||||
'postprocessor_hooks': [put_status_postprocessor],
|
||||
**self.ytdl_opts,
|
||||
}).download([self.info.url])
|
||||
self.status_queue.put({'status': 'finished' if ret == 0 else 'error'})
|
||||
log.info(f"Finished download for: {self.info.title}")
|
||||
except yt_dlp.utils.YoutubeDLError as exc:
|
||||
log.error(f"Download error for {self.info.title}: {str(exc)}")
|
||||
self.status_queue.put({'status': 'error', 'msg': str(exc)})
|
||||
|
||||
async def start(self, notifier):
|
||||
log.info(f"Preparing download for: {self.info.title}")
|
||||
if Download.manager is None:
|
||||
Download.manager = multiprocessing.Manager()
|
||||
self.status_queue = Download.manager.Queue()
|
||||
self.proc = multiprocessing.Process(target=self._download)
|
||||
self.proc.start()
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.notifier = notifier
|
||||
self.info.status = 'preparing'
|
||||
await self.notifier.updated(self.info)
|
||||
asyncio.create_task(self.update_status())
|
||||
return await self.loop.run_in_executor(None, self.proc.join)
|
||||
|
||||
def cancel(self):
|
||||
log.info(f"Cancelling download: {self.info.title}")
|
||||
if self.running():
|
||||
try:
|
||||
self.proc.kill()
|
||||
except Exception as e:
|
||||
log.error(f"Error killing process for {self.info.title}: {e}")
|
||||
self.canceled = True
|
||||
if self.status_queue is not None:
|
||||
self.status_queue.put(None)
|
||||
|
||||
def close(self):
|
||||
log.info(f"Closing download process for: {self.info.title}")
|
||||
if self.started():
|
||||
self.proc.close()
|
||||
if self.status_queue is not None:
|
||||
self.status_queue.put(None)
|
||||
|
||||
def running(self):
|
||||
try:
|
||||
return self.proc is not None and self.proc.is_alive()
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def started(self):
|
||||
return self.proc is not None
|
||||
|
||||
async def update_status(self):
|
||||
while True:
|
||||
status = await self.loop.run_in_executor(None, self.status_queue.get)
|
||||
if status is None:
|
||||
log.info(f"Status update finished for: {self.info.title}")
|
||||
return
|
||||
if self.canceled:
|
||||
log.info(f"Download {self.info.title} is canceled; stopping status updates.")
|
||||
return
|
||||
self.tmpfilename = status.get('tmpfilename')
|
||||
if 'filename' in status:
|
||||
fileName = status.get('filename')
|
||||
self.info.filename = os.path.relpath(fileName, self.download_dir)
|
||||
self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None
|
||||
if self.info.format == 'thumbnail':
|
||||
self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename)
|
||||
self.info.status = status['status']
|
||||
self.info.msg = status.get('msg')
|
||||
if 'downloaded_bytes' in status:
|
||||
total = status.get('total_bytes') or status.get('total_bytes_estimate')
|
||||
if total:
|
||||
self.info.percent = status['downloaded_bytes'] / total * 100
|
||||
self.info.speed = status.get('speed')
|
||||
self.info.eta = status.get('eta')
|
||||
log.info(f"Updating status for {self.info.title}: {status}")
|
||||
await self.notifier.updated(self.info)
|
||||
|
||||
class PersistentQueue:
|
||||
def __init__(self, path):
|
||||
pdir = os.path.dirname(path)
|
||||
if not os.path.isdir(pdir):
|
||||
os.mkdir(pdir)
|
||||
with shelve.open(path, 'c'):
|
||||
pass
|
||||
self.path = path
|
||||
self.dict = OrderedDict()
|
||||
|
||||
def load(self):
|
||||
for k, v in self.saved_items():
|
||||
self.dict[k] = Download(None, None, None, None, None, None, {}, v)
|
||||
|
||||
def exists(self, key):
|
||||
return key in self.dict
|
||||
|
||||
def get(self, key):
|
||||
return self.dict[key]
|
||||
|
||||
def items(self):
|
||||
return self.dict.items()
|
||||
|
||||
def saved_items(self):
|
||||
with shelve.open(self.path, 'r') as shelf:
|
||||
return sorted(shelf.items(), key=lambda item: item[1].timestamp)
|
||||
|
||||
def put(self, value):
|
||||
key = value.info.url
|
||||
self.dict[key] = value
|
||||
with shelve.open(self.path, 'w') as shelf:
|
||||
shelf[key] = value.info
|
||||
|
||||
def delete(self, key):
|
||||
if key in self.dict:
|
||||
del self.dict[key]
|
||||
with shelve.open(self.path, 'w') as shelf:
|
||||
shelf.pop(key, None)
|
||||
|
||||
def next(self):
|
||||
k, v = next(iter(self.dict.items()))
|
||||
return k, v
|
||||
|
||||
def empty(self):
|
||||
return not bool(self.dict)
|
||||
|
||||
class DownloadQueue:
|
||||
def __init__(self, config, notifier):
|
||||
self.config = config
|
||||
self.notifier = notifier
|
||||
self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
|
||||
self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
|
||||
self.pending = PersistentQueue(self.config.STATE_DIR + '/pending')
|
||||
self.active_downloads = set()
|
||||
self.semaphore = None
|
||||
# For sequential mode, use an asyncio lock to ensure one-at-a-time execution.
|
||||
if self.config.DOWNLOAD_MODE == 'sequential':
|
||||
self.seq_lock = asyncio.Lock()
|
||||
elif self.config.DOWNLOAD_MODE == 'limited':
|
||||
self.semaphore = asyncio.Semaphore(int(self.config.MAX_CONCURRENT_DOWNLOADS))
|
||||
|
||||
self.done.load()
|
||||
|
||||
async def __import_queue(self):
|
||||
for k, v in self.queue.saved_items():
|
||||
await self.__add_download(v, True)
|
||||
|
||||
async def __import_pending(self):
|
||||
for k, v in self.pending.saved_items():
|
||||
await self.__add_download(v, False)
|
||||
|
||||
async def initialize(self):
|
||||
log.info("Initializing DownloadQueue")
|
||||
asyncio.create_task(self.__import_queue())
|
||||
asyncio.create_task(self.__import_pending())
|
||||
|
||||
async def __start_download(self, download):
|
||||
if download.canceled:
|
||||
log.info(f"Download {download.info.title} was canceled, skipping start.")
|
||||
return
|
||||
if self.config.DOWNLOAD_MODE == 'sequential':
|
||||
async with self.seq_lock:
|
||||
log.info("Starting sequential download.")
|
||||
await download.start(self.notifier)
|
||||
self._post_download_cleanup(download)
|
||||
elif self.config.DOWNLOAD_MODE == 'limited' and self.semaphore is not None:
|
||||
await self.__limited_concurrent_download(download)
|
||||
else:
|
||||
await self.__concurrent_download(download)
|
||||
|
||||
async def __concurrent_download(self, download):
|
||||
log.info("Starting concurrent download without limits.")
|
||||
asyncio.create_task(self._run_download(download))
|
||||
|
||||
async def __limited_concurrent_download(self, download):
|
||||
log.info("Starting limited concurrent download.")
|
||||
async with self.semaphore:
|
||||
await self._run_download(download)
|
||||
|
||||
async def _run_download(self, download):
|
||||
if download.canceled:
|
||||
log.info(f"Download {download.info.title} is canceled; skipping start.")
|
||||
return
|
||||
await download.start(self.notifier)
|
||||
self._post_download_cleanup(download)
|
||||
|
||||
def _post_download_cleanup(self, download):
|
||||
if download.info.status != 'finished':
|
||||
if download.tmpfilename and os.path.isfile(download.tmpfilename):
|
||||
try:
|
||||
os.remove(download.tmpfilename)
|
||||
except:
|
||||
pass
|
||||
download.info.status = 'error'
|
||||
download.close()
|
||||
if self.queue.exists(download.info.url):
|
||||
self.queue.delete(download.info.url)
|
||||
if download.canceled:
|
||||
asyncio.create_task(self.notifier.canceled(download.info.url))
|
||||
else:
|
||||
self.done.put(download)
|
||||
asyncio.create_task(self.notifier.completed(download.info))
|
||||
|
||||
def __extract_info(self, url, playlist_strict_mode):
|
||||
return yt_dlp.YoutubeDL(params={
|
||||
'quiet': True,
|
||||
'no_color': True,
|
||||
'extract_flat': True,
|
||||
'ignore_no_formats_error': True,
|
||||
'noplaylist': playlist_strict_mode,
|
||||
'paths': {"home": self.config.DOWNLOAD_DIR, "temp": self.config.TEMP_DIR},
|
||||
**self.config.YTDL_OPTIONS,
|
||||
**({'impersonate': yt_dlp.networking.impersonate.ImpersonateTarget.from_str(self.config.YTDL_OPTIONS['impersonate'])} if 'impersonate' in self.config.YTDL_OPTIONS else {}),
|
||||
}).extract_info(url, download=False)
|
||||
|
||||
def __calc_download_path(self, quality, format, folder):
|
||||
base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR
|
||||
if folder:
|
||||
if not self.config.CUSTOM_DIRS:
|
||||
return None, {'status': 'error', 'msg': f'A folder for the download was specified but CUSTOM_DIRS is not true in the configuration.'}
|
||||
dldirectory = os.path.realpath(os.path.join(base_directory, folder))
|
||||
real_base_directory = os.path.realpath(base_directory)
|
||||
if not dldirectory.startswith(real_base_directory):
|
||||
return None, {'status': 'error', 'msg': f'Folder "{folder}" must resolve inside the base download directory "{real_base_directory}"'}
|
||||
if not os.path.isdir(dldirectory):
|
||||
if not self.config.CREATE_CUSTOM_DIRS:
|
||||
return None, {'status': 'error', 'msg': f'Folder "{folder}" for download does not exist inside base directory "{real_base_directory}", and CREATE_CUSTOM_DIRS is not true in the configuration.'}
|
||||
os.makedirs(dldirectory, exist_ok=True)
|
||||
else:
|
||||
dldirectory = base_directory
|
||||
return dldirectory, None
|
||||
|
||||
async def __add_download(self, dl, auto_start):
|
||||
dldirectory, error_message = self.__calc_download_path(dl.quality, dl.format, dl.folder)
|
||||
if error_message is not None:
|
||||
return error_message
|
||||
output = self.config.OUTPUT_TEMPLATE if len(dl.custom_name_prefix) == 0 else f'{dl.custom_name_prefix}.{self.config.OUTPUT_TEMPLATE}'
|
||||
output_chapter = self.config.OUTPUT_TEMPLATE_CHAPTER
|
||||
entry = getattr(dl, 'entry', None)
|
||||
if entry is not None and 'playlist' in entry and entry['playlist'] is not None:
|
||||
if len(self.config.OUTPUT_TEMPLATE_PLAYLIST):
|
||||
output = self.config.OUTPUT_TEMPLATE_PLAYLIST
|
||||
for property, value in entry.items():
|
||||
if property.startswith("playlist"):
|
||||
output = output.replace(f"%({property})s", str(value))
|
||||
ytdl_options = dict(self.config.YTDL_OPTIONS)
|
||||
playlist_item_limit = getattr(dl, 'playlist_item_limit', 0)
|
||||
if playlist_item_limit > 0:
|
||||
log.info(f'playlist limit is set. Processing only first {playlist_item_limit} entries')
|
||||
ytdl_options['playlistend'] = playlist_item_limit
|
||||
download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, dl.quality, dl.format, ytdl_options, dl)
|
||||
if auto_start is True:
|
||||
self.queue.put(download)
|
||||
asyncio.create_task(self.__start_download(download))
|
||||
else:
|
||||
self.pending.put(download)
|
||||
await self.notifier.added(dl)
|
||||
|
||||
async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already):
|
||||
if not entry:
|
||||
return {'status': 'error', 'msg': "Invalid/empty data was given."}
|
||||
|
||||
error = None
|
||||
if "live_status" in entry and "release_timestamp" in entry and entry.get("live_status") == "is_upcoming":
|
||||
dt_ts = datetime.fromtimestamp(entry.get("release_timestamp")).strftime('%Y-%m-%d %H:%M:%S %z')
|
||||
error = f"Live stream is scheduled to start at {dt_ts}"
|
||||
else:
|
||||
if "msg" in entry:
|
||||
error = entry["msg"]
|
||||
|
||||
etype = entry.get('_type') or 'video'
|
||||
|
||||
if etype.startswith('url'):
|
||||
log.debug('Processing as an url')
|
||||
return await self.add(entry['url'], quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already)
|
||||
elif etype == 'playlist':
|
||||
log.debug('Processing as a playlist')
|
||||
entries = entry['entries']
|
||||
# Convert generator to list if needed (for len() and slicing operations)
|
||||
if isinstance(entries, types.GeneratorType):
|
||||
entries = list(entries)
|
||||
log.info(f'playlist detected with {len(entries)} entries')
|
||||
playlist_index_digits = len(str(len(entries)))
|
||||
results = []
|
||||
if playlist_item_limit > 0:
|
||||
log.info(f'Playlist item limit is set. Processing only first {playlist_item_limit} entries')
|
||||
entries = entries[:playlist_item_limit]
|
||||
for index, etr in enumerate(entries, start=1):
|
||||
etr["_type"] = "video"
|
||||
etr["playlist"] = entry["id"]
|
||||
etr["playlist_index"] = '{{0:0{0:d}d}}'.format(playlist_index_digits).format(index)
|
||||
for property in ("id", "title", "uploader", "uploader_id"):
|
||||
if property in entry:
|
||||
etr[f"playlist_{property}"] = entry[property]
|
||||
results.append(await self.__add_entry(etr, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already))
|
||||
if any(res['status'] == 'error' for res in results):
|
||||
return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
|
||||
return {'status': 'ok'}
|
||||
elif etype == 'video' or (etype.startswith('url') and 'id' in entry and 'title' in entry):
|
||||
log.debug('Processing as a video')
|
||||
key = entry.get('webpage_url') or entry['url']
|
||||
if not self.queue.exists(key):
|
||||
dl = DownloadInfo(entry['id'], entry.get('title') or entry['id'], key, quality, format, folder, custom_name_prefix, error, entry, playlist_item_limit)
|
||||
await self.__add_download(dl, auto_start)
|
||||
return {'status': 'ok'}
|
||||
return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'}
|
||||
|
||||
async def add(self, url, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start=True, already=None):
|
||||
log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=} {playlist_strict_mode=} {playlist_item_limit=} {auto_start=}')
|
||||
already = set() if already is None else already
|
||||
if url in already:
|
||||
log.info('recursion detected, skipping')
|
||||
return {'status': 'ok'}
|
||||
else:
|
||||
already.add(url)
|
||||
try:
|
||||
entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url, playlist_strict_mode)
|
||||
except yt_dlp.utils.YoutubeDLError as exc:
|
||||
return {'status': 'error', 'msg': str(exc)}
|
||||
return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already)
|
||||
|
||||
async def start_pending(self, ids):
|
||||
for id in ids:
|
||||
if not self.pending.exists(id):
|
||||
log.warn(f'requested start for non-existent download {id}')
|
||||
continue
|
||||
dl = self.pending.get(id)
|
||||
self.queue.put(dl)
|
||||
self.pending.delete(id)
|
||||
asyncio.create_task(self.__start_download(dl))
|
||||
return {'status': 'ok'}
|
||||
|
||||
async def cancel(self, ids):
|
||||
for id in ids:
|
||||
if self.pending.exists(id):
|
||||
self.pending.delete(id)
|
||||
await self.notifier.canceled(id)
|
||||
continue
|
||||
if not self.queue.exists(id):
|
||||
log.warn(f'requested cancel for non-existent download {id}')
|
||||
continue
|
||||
if self.queue.get(id).started():
|
||||
self.queue.get(id).cancel()
|
||||
else:
|
||||
self.queue.delete(id)
|
||||
await self.notifier.canceled(id)
|
||||
return {'status': 'ok'}
|
||||
|
||||
async def clear(self, ids):
|
||||
for id in ids:
|
||||
if not self.done.exists(id):
|
||||
log.warn(f'requested delete for non-existent download {id}')
|
||||
continue
|
||||
if self.config.DELETE_FILE_ON_TRASHCAN:
|
||||
dl = self.done.get(id)
|
||||
try:
|
||||
dldirectory, _ = self.__calc_download_path(dl.info.quality, dl.info.format, dl.info.folder)
|
||||
os.remove(os.path.join(dldirectory, dl.info.filename))
|
||||
except Exception as e:
|
||||
log.warn(f'deleting file for download {id} failed with error message {e!r}')
|
||||
self.done.delete(id)
|
||||
await self.notifier.cleared(id)
|
||||
return {'status': 'ok'}
|
||||
|
||||
def get(self):
|
||||
return (list((k, v.info) for k, v in self.queue.items()) +
|
||||
list((k, v.info) for k, v in self.pending.items()),
|
||||
list((k, v.info) for k, v in self.done.items()))
|
||||
import os
|
||||
import yt_dlp
|
||||
from collections import OrderedDict
|
||||
import shelve
|
||||
import time
|
||||
import asyncio
|
||||
import multiprocessing
|
||||
import logging
|
||||
import re
|
||||
import types
|
||||
|
||||
import yt_dlp.networking.impersonate
|
||||
from dl_formats import get_format, get_opts, AUDIO_FORMATS
|
||||
from datetime import datetime
|
||||
|
||||
log = logging.getLogger('ytdl')
|
||||
|
||||
def _convert_generators_to_lists(obj):
|
||||
"""Recursively convert generators to lists in a dictionary to make it pickleable."""
|
||||
if isinstance(obj, types.GeneratorType):
|
||||
return list(obj)
|
||||
elif isinstance(obj, dict):
|
||||
return {k: _convert_generators_to_lists(v) for k, v in obj.items()}
|
||||
elif isinstance(obj, (list, tuple)):
|
||||
return type(obj)(_convert_generators_to_lists(item) for item in obj)
|
||||
else:
|
||||
return obj
|
||||
|
||||
class DownloadQueueNotifier:
|
||||
async def added(self, dl):
|
||||
raise NotImplementedError
|
||||
|
||||
async def updated(self, dl):
|
||||
raise NotImplementedError
|
||||
|
||||
async def completed(self, dl):
|
||||
raise NotImplementedError
|
||||
|
||||
async def canceled(self, id):
|
||||
raise NotImplementedError
|
||||
|
||||
async def cleared(self, id):
|
||||
raise NotImplementedError
|
||||
|
||||
class DownloadInfo:
|
||||
def __init__(self, id, title, url, quality, format, folder, custom_name_prefix, error, entry, playlist_item_limit):
|
||||
self.id = id if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{id}'
|
||||
self.title = title if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{title}'
|
||||
self.url = url
|
||||
self.quality = quality
|
||||
self.format = format
|
||||
self.folder = folder
|
||||
self.custom_name_prefix = custom_name_prefix
|
||||
self.msg = self.percent = self.speed = self.eta = None
|
||||
self.status = "pending"
|
||||
self.size = None
|
||||
self.timestamp = time.time_ns()
|
||||
self.error = error
|
||||
# Convert generators to lists to make entry pickleable
|
||||
self.entry = _convert_generators_to_lists(entry) if entry is not None else None
|
||||
self.playlist_item_limit = playlist_item_limit
|
||||
|
||||
class Download:
|
||||
manager = None
|
||||
|
||||
def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info):
|
||||
self.download_dir = download_dir
|
||||
self.temp_dir = temp_dir
|
||||
self.output_template = output_template
|
||||
self.output_template_chapter = output_template_chapter
|
||||
self.format = get_format(format, quality)
|
||||
self.ytdl_opts = get_opts(format, quality, ytdl_opts)
|
||||
if "impersonate" in self.ytdl_opts:
|
||||
self.ytdl_opts["impersonate"] = yt_dlp.networking.impersonate.ImpersonateTarget.from_str(self.ytdl_opts["impersonate"])
|
||||
self.info = info
|
||||
self.canceled = False
|
||||
self.tmpfilename = None
|
||||
self.status_queue = None
|
||||
self.proc = None
|
||||
self.loop = None
|
||||
self.notifier = None
|
||||
|
||||
def _download(self):
|
||||
log.info(f"Starting download for: {self.info.title} ({self.info.url})")
|
||||
try:
|
||||
def put_status(st):
|
||||
self.status_queue.put({k: v for k, v in st.items() if k in (
|
||||
'tmpfilename',
|
||||
'filename',
|
||||
'status',
|
||||
'msg',
|
||||
'total_bytes',
|
||||
'total_bytes_estimate',
|
||||
'downloaded_bytes',
|
||||
'speed',
|
||||
'eta',
|
||||
)})
|
||||
|
||||
def put_status_postprocessor(d):
|
||||
if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished':
|
||||
if '__finaldir' in d['info_dict']:
|
||||
filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath']))
|
||||
else:
|
||||
filename = d['info_dict']['filepath']
|
||||
self.status_queue.put({'status': 'finished', 'filename': filename})
|
||||
|
||||
ret = yt_dlp.YoutubeDL(params={
|
||||
'quiet': True,
|
||||
'no_color': True,
|
||||
'paths': {"home": self.download_dir, "temp": self.temp_dir},
|
||||
'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter },
|
||||
'format': self.format,
|
||||
'socket_timeout': 30,
|
||||
'ignore_no_formats_error': True,
|
||||
'progress_hooks': [put_status],
|
||||
'postprocessor_hooks': [put_status_postprocessor],
|
||||
**self.ytdl_opts,
|
||||
}).download([self.info.url])
|
||||
self.status_queue.put({'status': 'finished' if ret == 0 else 'error'})
|
||||
log.info(f"Finished download for: {self.info.title}")
|
||||
except yt_dlp.utils.YoutubeDLError as exc:
|
||||
log.error(f"Download error for {self.info.title}: {str(exc)}")
|
||||
self.status_queue.put({'status': 'error', 'msg': str(exc)})
|
||||
|
||||
async def start(self, notifier):
|
||||
log.info(f"Preparing download for: {self.info.title}")
|
||||
if Download.manager is None:
|
||||
Download.manager = multiprocessing.Manager()
|
||||
self.status_queue = Download.manager.Queue()
|
||||
self.proc = multiprocessing.Process(target=self._download)
|
||||
self.proc.start()
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.notifier = notifier
|
||||
self.info.status = 'preparing'
|
||||
await self.notifier.updated(self.info)
|
||||
asyncio.create_task(self.update_status())
|
||||
return await self.loop.run_in_executor(None, self.proc.join)
|
||||
|
||||
def cancel(self):
|
||||
log.info(f"Cancelling download: {self.info.title}")
|
||||
if self.running():
|
||||
try:
|
||||
self.proc.kill()
|
||||
except Exception as e:
|
||||
log.error(f"Error killing process for {self.info.title}: {e}")
|
||||
self.canceled = True
|
||||
if self.status_queue is not None:
|
||||
self.status_queue.put(None)
|
||||
|
||||
def close(self):
|
||||
log.info(f"Closing download process for: {self.info.title}")
|
||||
if self.started():
|
||||
self.proc.close()
|
||||
if self.status_queue is not None:
|
||||
self.status_queue.put(None)
|
||||
|
||||
def running(self):
|
||||
try:
|
||||
return self.proc is not None and self.proc.is_alive()
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def started(self):
|
||||
return self.proc is not None
|
||||
|
||||
async def update_status(self):
|
||||
while True:
|
||||
status = await self.loop.run_in_executor(None, self.status_queue.get)
|
||||
if status is None:
|
||||
log.info(f"Status update finished for: {self.info.title}")
|
||||
return
|
||||
if self.canceled:
|
||||
log.info(f"Download {self.info.title} is canceled; stopping status updates.")
|
||||
return
|
||||
self.tmpfilename = status.get('tmpfilename')
|
||||
if 'filename' in status:
|
||||
fileName = status.get('filename')
|
||||
self.info.filename = os.path.relpath(fileName, self.download_dir)
|
||||
self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None
|
||||
if self.info.format == 'thumbnail':
|
||||
self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename)
|
||||
self.info.status = status['status']
|
||||
self.info.msg = status.get('msg')
|
||||
if 'downloaded_bytes' in status:
|
||||
total = status.get('total_bytes') or status.get('total_bytes_estimate')
|
||||
if total:
|
||||
self.info.percent = status['downloaded_bytes'] / total * 100
|
||||
self.info.speed = status.get('speed')
|
||||
self.info.eta = status.get('eta')
|
||||
log.info(f"Updating status for {self.info.title}: {status}")
|
||||
await self.notifier.updated(self.info)
|
||||
|
||||
class PersistentQueue:
|
||||
def __init__(self, path):
|
||||
pdir = os.path.dirname(path)
|
||||
if not os.path.isdir(pdir):
|
||||
os.mkdir(pdir)
|
||||
with shelve.open(path, 'c'):
|
||||
pass
|
||||
self.path = path
|
||||
self.dict = OrderedDict()
|
||||
|
||||
def load(self):
|
||||
for k, v in self.saved_items():
|
||||
self.dict[k] = Download(None, None, None, None, None, None, {}, v)
|
||||
|
||||
def exists(self, key):
|
||||
return key in self.dict
|
||||
|
||||
def get(self, key):
|
||||
return self.dict[key]
|
||||
|
||||
def items(self):
|
||||
return self.dict.items()
|
||||
|
||||
def saved_items(self):
|
||||
with shelve.open(self.path, 'r') as shelf:
|
||||
return sorted(shelf.items(), key=lambda item: item[1].timestamp)
|
||||
|
||||
def put(self, value):
|
||||
key = value.info.url
|
||||
self.dict[key] = value
|
||||
with shelve.open(self.path, 'w') as shelf:
|
||||
shelf[key] = value.info
|
||||
|
||||
def delete(self, key):
|
||||
if key in self.dict:
|
||||
del self.dict[key]
|
||||
with shelve.open(self.path, 'w') as shelf:
|
||||
shelf.pop(key, None)
|
||||
|
||||
def next(self):
|
||||
k, v = next(iter(self.dict.items()))
|
||||
return k, v
|
||||
|
||||
def empty(self):
|
||||
return not bool(self.dict)
|
||||
|
||||
class DownloadQueue:
|
||||
def __init__(self, config, notifier):
|
||||
self.config = config
|
||||
self.notifier = notifier
|
||||
self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
|
||||
self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
|
||||
self.pending = PersistentQueue(self.config.STATE_DIR + '/pending')
|
||||
self.active_downloads = set()
|
||||
self.semaphore = None
|
||||
# For sequential mode, use an asyncio lock to ensure one-at-a-time execution.
|
||||
if self.config.DOWNLOAD_MODE == 'sequential':
|
||||
self.seq_lock = asyncio.Lock()
|
||||
elif self.config.DOWNLOAD_MODE == 'limited':
|
||||
self.semaphore = asyncio.Semaphore(int(self.config.MAX_CONCURRENT_DOWNLOADS))
|
||||
|
||||
self.done.load()
|
||||
|
||||
async def __import_queue(self):
|
||||
for k, v in self.queue.saved_items():
|
||||
await self.__add_download(v, True)
|
||||
|
||||
async def __import_pending(self):
|
||||
for k, v in self.pending.saved_items():
|
||||
await self.__add_download(v, False)
|
||||
|
||||
async def initialize(self):
|
||||
log.info("Initializing DownloadQueue")
|
||||
asyncio.create_task(self.__import_queue())
|
||||
asyncio.create_task(self.__import_pending())
|
||||
|
||||
async def __start_download(self, download):
|
||||
if download.canceled:
|
||||
log.info(f"Download {download.info.title} was canceled, skipping start.")
|
||||
return
|
||||
if self.config.DOWNLOAD_MODE == 'sequential':
|
||||
async with self.seq_lock:
|
||||
log.info("Starting sequential download.")
|
||||
await download.start(self.notifier)
|
||||
self._post_download_cleanup(download)
|
||||
elif self.config.DOWNLOAD_MODE == 'limited' and self.semaphore is not None:
|
||||
await self.__limited_concurrent_download(download)
|
||||
else:
|
||||
await self.__concurrent_download(download)
|
||||
|
||||
async def __concurrent_download(self, download):
|
||||
log.info("Starting concurrent download without limits.")
|
||||
asyncio.create_task(self._run_download(download))
|
||||
|
||||
async def __limited_concurrent_download(self, download):
|
||||
log.info("Starting limited concurrent download.")
|
||||
async with self.semaphore:
|
||||
await self._run_download(download)
|
||||
|
||||
async def _run_download(self, download):
|
||||
if download.canceled:
|
||||
log.info(f"Download {download.info.title} is canceled; skipping start.")
|
||||
return
|
||||
await download.start(self.notifier)
|
||||
self._post_download_cleanup(download)
|
||||
|
||||
def _post_download_cleanup(self, download):
|
||||
if download.info.status != 'finished':
|
||||
if download.tmpfilename and os.path.isfile(download.tmpfilename):
|
||||
try:
|
||||
os.remove(download.tmpfilename)
|
||||
except:
|
||||
pass
|
||||
download.info.status = 'error'
|
||||
download.close()
|
||||
if self.queue.exists(download.info.url):
|
||||
self.queue.delete(download.info.url)
|
||||
if download.canceled:
|
||||
asyncio.create_task(self.notifier.canceled(download.info.url))
|
||||
else:
|
||||
self.done.put(download)
|
||||
asyncio.create_task(self.notifier.completed(download.info))
|
||||
|
||||
def __extract_info(self, url, playlist_strict_mode):
|
||||
return yt_dlp.YoutubeDL(params={
|
||||
'quiet': True,
|
||||
'no_color': True,
|
||||
'extract_flat': True,
|
||||
'ignore_no_formats_error': True,
|
||||
'noplaylist': playlist_strict_mode,
|
||||
'paths': {"home": self.config.DOWNLOAD_DIR, "temp": self.config.TEMP_DIR},
|
||||
**self.config.YTDL_OPTIONS,
|
||||
**({'impersonate': yt_dlp.networking.impersonate.ImpersonateTarget.from_str(self.config.YTDL_OPTIONS['impersonate'])} if 'impersonate' in self.config.YTDL_OPTIONS else {}),
|
||||
}).extract_info(url, download=False)
|
||||
|
||||
def __calc_download_path(self, quality, format, folder):
|
||||
base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR
|
||||
if folder:
|
||||
if not self.config.CUSTOM_DIRS:
|
||||
return None, {'status': 'error', 'msg': f'A folder for the download was specified but CUSTOM_DIRS is not true in the configuration.'}
|
||||
dldirectory = os.path.realpath(os.path.join(base_directory, folder))
|
||||
real_base_directory = os.path.realpath(base_directory)
|
||||
if not dldirectory.startswith(real_base_directory):
|
||||
return None, {'status': 'error', 'msg': f'Folder "{folder}" must resolve inside the base download directory "{real_base_directory}"'}
|
||||
if not os.path.isdir(dldirectory):
|
||||
if not self.config.CREATE_CUSTOM_DIRS:
|
||||
return None, {'status': 'error', 'msg': f'Folder "{folder}" for download does not exist inside base directory "{real_base_directory}", and CREATE_CUSTOM_DIRS is not true in the configuration.'}
|
||||
os.makedirs(dldirectory, exist_ok=True)
|
||||
else:
|
||||
dldirectory = base_directory
|
||||
return dldirectory, None
|
||||
|
||||
async def __add_download(self, dl, auto_start):
|
||||
dldirectory, error_message = self.__calc_download_path(dl.quality, dl.format, dl.folder)
|
||||
if error_message is not None:
|
||||
return error_message
|
||||
output = self.config.OUTPUT_TEMPLATE if len(dl.custom_name_prefix) == 0 else f'{dl.custom_name_prefix}.{self.config.OUTPUT_TEMPLATE}'
|
||||
output_chapter = self.config.OUTPUT_TEMPLATE_CHAPTER
|
||||
entry = getattr(dl, 'entry', None)
|
||||
if entry is not None and 'playlist' in entry and entry['playlist'] is not None:
|
||||
if len(self.config.OUTPUT_TEMPLATE_PLAYLIST):
|
||||
output = self.config.OUTPUT_TEMPLATE_PLAYLIST
|
||||
for property, value in entry.items():
|
||||
if property.startswith("playlist"):
|
||||
output = output.replace(f"%({property})s", str(value))
|
||||
ytdl_options = dict(self.config.YTDL_OPTIONS)
|
||||
playlist_item_limit = getattr(dl, 'playlist_item_limit', 0)
|
||||
if playlist_item_limit > 0:
|
||||
log.info(f'playlist limit is set. Processing only first {playlist_item_limit} entries')
|
||||
ytdl_options['playlistend'] = playlist_item_limit
|
||||
download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, dl.quality, dl.format, ytdl_options, dl)
|
||||
if auto_start is True:
|
||||
self.queue.put(download)
|
||||
asyncio.create_task(self.__start_download(download))
|
||||
else:
|
||||
self.pending.put(download)
|
||||
await self.notifier.added(dl)
|
||||
|
||||
async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already):
|
||||
if not entry:
|
||||
return {'status': 'error', 'msg': "Invalid/empty data was given."}
|
||||
|
||||
error = None
|
||||
if "live_status" in entry and "release_timestamp" in entry and entry.get("live_status") == "is_upcoming":
|
||||
dt_ts = datetime.fromtimestamp(entry.get("release_timestamp")).strftime('%Y-%m-%d %H:%M:%S %z')
|
||||
error = f"Live stream is scheduled to start at {dt_ts}"
|
||||
else:
|
||||
if "msg" in entry:
|
||||
error = entry["msg"]
|
||||
|
||||
etype = entry.get('_type') or 'video'
|
||||
|
||||
if etype.startswith('url'):
|
||||
log.debug('Processing as an url')
|
||||
return await self.add(entry['url'], quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already)
|
||||
elif etype == 'playlist':
|
||||
log.debug('Processing as a playlist')
|
||||
entries = entry['entries']
|
||||
# Convert generator to list if needed (for len() and slicing operations)
|
||||
if isinstance(entries, types.GeneratorType):
|
||||
entries = list(entries)
|
||||
log.info(f'playlist detected with {len(entries)} entries')
|
||||
playlist_index_digits = len(str(len(entries)))
|
||||
results = []
|
||||
if playlist_item_limit > 0:
|
||||
log.info(f'Playlist item limit is set. Processing only first {playlist_item_limit} entries')
|
||||
entries = entries[:playlist_item_limit]
|
||||
for index, etr in enumerate(entries, start=1):
|
||||
etr["_type"] = "video"
|
||||
etr["playlist"] = entry["id"]
|
||||
etr["playlist_index"] = '{{0:0{0:d}d}}'.format(playlist_index_digits).format(index)
|
||||
for property in ("id", "title", "uploader", "uploader_id"):
|
||||
if property in entry:
|
||||
etr[f"playlist_{property}"] = entry[property]
|
||||
results.append(await self.__add_entry(etr, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already))
|
||||
if any(res['status'] == 'error' for res in results):
|
||||
return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
|
||||
return {'status': 'ok'}
|
||||
elif etype == 'video' or (etype.startswith('url') and 'id' in entry and 'title' in entry):
|
||||
log.debug('Processing as a video')
|
||||
key = entry.get('webpage_url') or entry['url']
|
||||
if not self.queue.exists(key):
|
||||
dl = DownloadInfo(entry['id'], entry.get('title') or entry['id'], key, quality, format, folder, custom_name_prefix, error, entry, playlist_item_limit)
|
||||
await self.__add_download(dl, auto_start)
|
||||
return {'status': 'ok'}
|
||||
return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'}
|
||||
|
||||
async def add(self, url, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start=True, already=None):
|
||||
log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=} {playlist_strict_mode=} {playlist_item_limit=} {auto_start=}')
|
||||
already = set() if already is None else already
|
||||
if url in already:
|
||||
log.info('recursion detected, skipping')
|
||||
return {'status': 'ok'}
|
||||
else:
|
||||
already.add(url)
|
||||
try:
|
||||
entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url, playlist_strict_mode)
|
||||
except yt_dlp.utils.YoutubeDLError as exc:
|
||||
return {'status': 'error', 'msg': str(exc)}
|
||||
return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start, already)
|
||||
|
||||
async def start_pending(self, ids):
|
||||
for id in ids:
|
||||
if not self.pending.exists(id):
|
||||
log.warn(f'requested start for non-existent download {id}')
|
||||
continue
|
||||
dl = self.pending.get(id)
|
||||
self.queue.put(dl)
|
||||
self.pending.delete(id)
|
||||
asyncio.create_task(self.__start_download(dl))
|
||||
return {'status': 'ok'}
|
||||
|
||||
async def cancel(self, ids):
|
||||
for id in ids:
|
||||
if self.pending.exists(id):
|
||||
self.pending.delete(id)
|
||||
await self.notifier.canceled(id)
|
||||
continue
|
||||
if not self.queue.exists(id):
|
||||
log.warn(f'requested cancel for non-existent download {id}')
|
||||
continue
|
||||
if self.queue.get(id).started():
|
||||
self.queue.get(id).cancel()
|
||||
else:
|
||||
self.queue.delete(id)
|
||||
await self.notifier.canceled(id)
|
||||
return {'status': 'ok'}
|
||||
|
||||
async def clear(self, ids):
|
||||
for id in ids:
|
||||
if not self.done.exists(id):
|
||||
log.warn(f'requested delete for non-existent download {id}')
|
||||
continue
|
||||
if self.config.DELETE_FILE_ON_TRASHCAN:
|
||||
dl = self.done.get(id)
|
||||
try:
|
||||
dldirectory, _ = self.__calc_download_path(dl.info.quality, dl.info.format, dl.info.folder)
|
||||
os.remove(os.path.join(dldirectory, dl.info.filename))
|
||||
except Exception as e:
|
||||
log.warn(f'deleting file for download {id} failed with error message {e!r}')
|
||||
self.done.delete(id)
|
||||
await self.notifier.cleared(id)
|
||||
return {'status': 'ok'}
|
||||
|
||||
def get(self):
|
||||
return (list((k, v.info) for k, v in self.queue.items()) +
|
||||
list((k, v.info) for k, v in self.pending.items()),
|
||||
list((k, v.info) for k, v in self.done.items()))
|
||||
|
||||
Reference in New Issue
Block a user