Временно переделал downloader и youtube

возможно потребуется перенести yt-dlp в консоль, и работать через stdout
This commit is contained in:
Viner Abubakirov
2026-02-18 20:54:28 +05:00
parent 0cf412ea1e
commit cfc068e857
3 changed files with 55 additions and 21 deletions

View File

@@ -4,24 +4,31 @@ from app.utils.uploader import ChunkUploadBackend
class HttpStreamingDownloader:
def __init__(self, backend: ChunkUploadBackend, chunk_size: int = 1024**2):
def __init__(self, backend: ChunkUploadBackend):
self.backend = backend
self.chunk_size = chunk_size
def download(self, url: str, filename: str, headers: dict = {}):
def download(
self,
url: str,
filename: str,
headers: dict = {},
chunk_size: int = 1024**2,
):
self.backend.start(filename)
# http = httpx.Client(http2=True)
http = httpx
http = httpx.Client(http2=True)
try:
print("Try to download")
with http.stream("GET", url, timeout=20, headers=headers) as response:
response.raise_for_status()
total = int(response.headers.get("Content-Length", 0))
with tqdm(total=total, unit="B", unit_scale=True, desc=filename) as pbar:
for chunk in response.iter_bytes(self.chunk_size):
with tqdm(
total=total, unit="B", unit_scale=True, desc=filename
) as pbar:
for chunk in response.iter_bytes(chunk_size):
if chunk:
self.backend.upload_chunk(chunk)
pbar.update(len(chunk)) # обновляем прогресс по размеру чанка
# обновляем прогресс по размеру чанка
pbar.update(len(chunk))
self.backend.finish()
except:

View File

@@ -1,4 +1,12 @@
from typing import Optional
from dataclasses import dataclass
@dataclass
class MediaContent:
url: str
headers: dict
chunk_size: int
class YtDlpInfo:
@@ -14,7 +22,7 @@ class YtDlpInfo:
"quiet": True,
"skip_download": True,
"noplaylist": True,
"js-runtimes": "deno"
"js-runtimes": "deno",
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
self.info = ydl.extract_info(self.url, download=False)
@@ -24,14 +32,15 @@ class YtDlpInfo:
"""Возвращает title видео"""
return self.info.get("title", "unknown")
def get_video_url(self, resolution: Optional[str] = None) -> Optional[tuple[dict, str]]:
def get_video_url(self, resolution: Optional[str] = None) -> Optional[MediaContent]:
"""
Возвращает ссылку на видеопоток с указанным разрешением
resolution: например '1080p', '720p', '480p'
"""
formats = self.info.get("formats", [])
video_formats = [
f for f in formats
f
for f in formats
if f.get("vcodec") != "none" # есть видео
and f.get("acodec") == "none" # без аудио
]
@@ -39,22 +48,40 @@ class YtDlpInfo:
# ищем точное соответствие разрешению
for f in video_formats:
if f.get("format_note") == resolution:
return f.get("http_headers", {}), f.get("url")
return MediaContent(
f.get("url", ""),
f.get("http_headers", {}),
f.get("downloader_options", {}).get("http_chunk_size", 1024**2),
)
# если разрешение не указано или не найдено — берем лучший
if video_formats:
# сортируем по height
video_formats.sort(key=lambda x: x.get("height") or 0, reverse=True)
return video_formats[0].get("http_headers"), video_formats[0].get("url")
return MediaContent(
video_formats[0].get("url", ""),
video_formats[0].get("http_headers", {}),
video_formats[0]
.get("downloader_options", {})
.get("http_chunk_size", 1024**2),
)
return None
def get_audio_url(self) -> Optional[tuple[dict,str]]:
def get_audio_url(self) -> Optional[MediaContent]:
"""Возвращает ссылку на аудиопоток"""
formats = self.info.get("formats", [])
audio_formats = [
f for f in formats if f.get("vcodec") == "none" and f.get("acodec") != "none"
f
for f in formats
if f.get("vcodec") == "none" and f.get("acodec") != "none"
]
if audio_formats:
# берем наилучшее качество
audio_formats.sort(key=lambda x: x.get("abr") or 0, reverse=True)
return audio_formats[0].get("http_headers"), audio_formats[0].get("url")
return MediaContent(
audio_formats[0].get("url", ""),
audio_formats[0].get("http_headers", {}),
audio_formats[0]
.get("downloader_options", {})
.get("http_chunk_size", 1024**2),
)
return None