import subprocess
from dataclasses import dataclass
from typing import Literal

from app.utils.uploader import ChunkUploadBackend

# Heights accepted by ``YtDlpManager.download_video``.
RESOLUTIONS = [240, 360, 480, 720, 1080, 1440, 2160]


@dataclass
class MediaContent:
    """Plain record with a URL, request headers and a chunk size.

    NOTE(review): not referenced anywhere else in the visible part of this
    file; presumably consumed by other modules -- confirm before removing.
    """

    url: str
    headers: dict
    chunk_size: int


class YtDlpManager:
    """Stream media fetched by the ``yt-dlp`` CLI into a chunk-upload backend.

    Metadata for ``url`` is extracted eagerly in the constructor, so
    creating an instance already invokes ``yt_dlp``.
    """

    def __init__(self, url: str, backend: ChunkUploadBackend) -> None:
        """Store ``url`` and ``backend`` and immediately extract metadata."""
        self.url = url
        self.backend = backend
        self.info = None
        self._extract_info()

    def _extract_info(self) -> None:
        """Populate ``self.info`` with yt-dlp metadata without downloading."""
        # Imported lazily so that importing this module does not itself
        # import yt_dlp.
        import yt_dlp

        ydl_opts = {
            "quiet": True,
            "skip_download": True,
            "noplaylist": True,
            # NOTE(review): the other keys here use underscores or no
            # separator; confirm that yt-dlp's Python API recognises this
            # hyphenated key (it looks like the CLI spelling of the option).
            "js-runtimes": "deno",
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            self.info = ydl.extract_info(self.url, download=False)

    @property
    def title(self) -> str:
        """Return the video title, or ``"unknown"`` if metadata has none."""
        return self.info.get("title", "unknown")

    def download_video(self, size: int):
        """Stream the video limited to height ``size`` into the backend.

        Args:
            size: Maximum video height; must be one of ``RESOLUTIONS``.

        Returns:
            Whatever ``backend.finish()`` returns.

        Raises:
            RuntimeError: If ``size`` is unsupported or yt-dlp exits with a
                non-zero status.
        """
        if size not in RESOLUTIONS:
            raise RuntimeError("Unsupported quality")

        command = [
            "yt-dlp",
            "-f",
            f"bestvideo[height<={size}]",
            "--no-part",
            "--quiet",
            "--no-warnings",
            "-o",
            "-",  # write the media to stdout so it can be streamed
            self.url,
        ]
        # The ".mp4" suffix is applied unconditionally; the container that
        # yt-dlp actually selects is not checked here.
        return self._processing(command, self.title + ".mp4")

    def download_audio(self):
        """Stream the ``bestaudio`` format into the backend.

        Returns:
            Whatever ``backend.finish()`` returns.

        Raises:
            RuntimeError: If yt-dlp exits with a non-zero status.
        """
        command = [
            "yt-dlp",
            "-f",
            "bestaudio",
            "--no-part",
            "--quiet",
            "--no-warnings",
            "-o",
            "-",  # write the media to stdout so it can be streamed
            self.url,
        ]
        # The ".m4a" suffix is applied unconditionally; the container that
        # yt-dlp actually selects is not checked here.
        return self._processing(command, self.title + ".m4a")

    def _processing(self, command: list[str], filename: str, chunk_size: int = 1024**2):
        """Run ``command`` and forward its stdout to the backend in chunks.

        Args:
            command: Argument vector of the process to spawn.
            filename: Name passed to ``backend.start``.
            chunk_size: Maximum number of bytes requested per pipe read.

        Returns:
            Whatever ``backend.finish()`` returns.

        Raises:
            RuntimeError: If the process exits with a non-zero status (the
                upload is aborted first).
            Exception: Anything raised by the backend while starting or
                uploading is re-raised after the child process has been
                killed and, if the upload had started, aborted.
        """
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            bufsize=0,
        )

        upload_started = False
        try:
            self.backend.start(filename)
            upload_started = True

            # With bufsize=0 the pipe is unbuffered, so a read may return
            # fewer than chunk_size bytes; an empty result means EOF.
            while chunk := process.stdout.read(chunk_size):
                self.backend.upload_chunk(chunk)

            ret = process.wait()
        except BaseException:
            # Do not leave the child running or a half-finished upload
            # behind when the backend (or an interrupt) breaks the loop.
            process.kill()
            process.wait()
            if upload_started:
                self.backend.abort()
            raise
        finally:
            process.stdout.close()

        if ret != 0:
            self.backend.abort()
            raise RuntimeError(f"yt-dlp failed, status code: {ret}")

        return self.backend.finish()