From cfc068e8573366baaca9fa949c66cafa090e381a Mon Sep 17 00:00:00 2001 From: Viner Abubakirov Date: Wed, 18 Feb 2026 20:54:28 +0500 Subject: [PATCH] =?UTF-8?q?=D0=92=D1=80=D0=B5=D0=BC=D0=B5=D0=BD=D0=BD?= =?UTF-8?q?=D0=BE=20=D0=BF=D0=B5=D1=80=D0=B5=D0=B4=D0=B5=D0=BB=D0=B0=D0=BB?= =?UTF-8?q?=20downloader=20=D0=B8=20youtube?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit возможно потребуется перенести yt-dlp в консоль, и работать через stdout --- app/utils/downloader.py | 23 ++++++++++++++-------- app/utils/youtube.py | 43 +++++++++++++++++++++++++++++++++-------- test.py | 10 +++++----- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/app/utils/downloader.py b/app/utils/downloader.py index caa6fef..5f73c13 100644 --- a/app/utils/downloader.py +++ b/app/utils/downloader.py @@ -4,24 +4,31 @@ from app.utils.uploader import ChunkUploadBackend class HttpStreamingDownloader: - def __init__(self, backend: ChunkUploadBackend, chunk_size: int = 1024**2): + def __init__(self, backend: ChunkUploadBackend): self.backend = backend - self.chunk_size = chunk_size - def download(self, url: str, filename: str, headers: dict = {}): + def download( + self, + url: str, + filename: str, + headers: dict = {}, + chunk_size: int = 1024**2, + ): self.backend.start(filename) - # http = httpx.Client(http2=True) - http = httpx + http = httpx.Client(http2=True) try: print("Try to download") with http.stream("GET", url, timeout=20, headers=headers) as response: response.raise_for_status() total = int(response.headers.get("Content-Length", 0)) - with tqdm(total=total, unit="B", unit_scale=True, desc=filename) as pbar: - for chunk in response.iter_bytes(self.chunk_size): + with tqdm( + total=total, unit="B", unit_scale=True, desc=filename + ) as pbar: + for chunk in response.iter_bytes(chunk_size): if chunk: self.backend.upload_chunk(chunk) - pbar.update(len(chunk)) # обновляем прогресс по размеру чанка + # обновляем прогресс по размеру чанка + pbar.update(len(chunk)) self.backend.finish() except: diff --git a/app/utils/youtube.py b/app/utils/youtube.py index 6483b5c..3d97236 100644 --- a/app/utils/youtube.py +++ b/app/utils/youtube.py @@ -1,4 +1,12 @@ from typing import Optional +from dataclasses import dataclass + + +@dataclass +class MediaContent: + url: str + headers: dict + chunk_size: int class YtDlpInfo: @@ -14,7 +22,7 @@ class YtDlpInfo: "quiet": True, "skip_download": True, "noplaylist": True, - "js-runtimes": "deno" + "js-runtimes": "deno", } with yt_dlp.YoutubeDL(ydl_opts) as ydl: self.info = ydl.extract_info(self.url, download=False) @@ -24,14 +32,15 @@ class YtDlpInfo: """Возвращает title видео""" return self.info.get("title", "unknown") - def get_video_url(self, resolution: Optional[str] = None) -> Optional[tuple[dict, str]]: + def get_video_url(self, resolution: Optional[str] = None) -> Optional[MediaContent]: """ Возвращает ссылку на видеопоток с указанным разрешением resolution: например '1080p', '720p', '480p' """ formats = self.info.get("formats", []) video_formats = [ - f for f in formats + f + for f in formats if f.get("vcodec") != "none" # есть видео and f.get("acodec") == "none" # без аудио ] @@ -39,22 +48,40 @@ class YtDlpInfo: # ищем точное соответствие разрешению for f in video_formats: if f.get("format_note") == resolution: - return f.get("http_headers", {}), f.get("url") + return MediaContent( + f.get("url", ""), + f.get("http_headers", {}), + f.get("downloader_options", {}).get("http_chunk_size", 1024**2), + ) # если разрешение не указано или не найдено — берем лучший if video_formats: # сортируем по height video_formats.sort(key=lambda x: x.get("height") or 0, reverse=True) - return video_formats[0].get("http_headers"), video_formats[0].get("url") + return MediaContent( + video_formats[0].get("url", ""), + video_formats[0].get("http_headers", {}), + video_formats[0] + .get("downloader_options", {}) + .get("http_chunk_size", 1024**2), + ) return None - def get_audio_url(self) -> Optional[tuple[dict,str]]: + def get_audio_url(self) -> Optional[MediaContent]: """Возвращает ссылку на аудиопоток""" formats = self.info.get("formats", []) audio_formats = [ - f for f in formats if f.get("vcodec") == "none" and f.get("acodec") != "none" + f + for f in formats + if f.get("vcodec") == "none" and f.get("acodec") != "none" ] if audio_formats: # берем наилучшее качество audio_formats.sort(key=lambda x: x.get("abr") or 0, reverse=True) - return audio_formats[0].get("http_headers"), audio_formats[0].get("url") + return MediaContent( + audio_formats[0].get("url", ""), + audio_formats[0].get("http_headers", {}), + audio_formats[0] + .get("downloader_options", {}) + .get("http_chunk_size", 1024**2), + ) return None diff --git a/test.py b/test.py index 149159f..8ffca01 100644 --- a/test.py +++ b/test.py @@ -7,13 +7,13 @@ def download(url: str): upload_backend = DiskChunkUploadBackend("trash_holder") downloader = HttpStreamingDownloader(upload_backend) youtube = YtDlpInfo(url) - video_headers, video_url = youtube.get_video_url("480p") + video = youtube.get_video_url("480p") video_name = youtube.title + ".mp4" - audio_headers, audio_url = youtube.get_audio_url() + audio = youtube.get_audio_url() audio_name = youtube.title + ".m4a" - - downloader.download(video_url, video_name, video_headers) - downloader.download(audio_url, audio_name, audio_headers) + + downloader.download(video.url, video_name, video.headers, video.chunk_size) + downloader.download(audio.url, audio_name, audio.headers, audio.chunk_size) def main():