Нашел +- быстрый способ скачивания файла с помощью yt-dlp

This commit is contained in:
Viner Abubakirov
2026-02-18 22:57:52 +05:00
parent cfc068e857
commit 1981cb7da3
2 changed files with 49 additions and 64 deletions

View File

@@ -1,6 +1,9 @@
from typing import Optional import subprocess
from typing import Literal
from dataclasses import dataclass from dataclasses import dataclass
from app.utils.uploader import ChunkUploadBackend
@dataclass @dataclass
class MediaContent: class MediaContent:
@@ -9,9 +12,10 @@ class MediaContent:
chunk_size: int chunk_size: int
class YtDlpInfo: class YtDlpManager:
def __init__(self, url: str): def __init__(self, url: str, backend: ChunkUploadBackend):
self.url = url self.url = url
self.backend = backend
self.info = None self.info = None
self._extract_info() self._extract_info()
@@ -32,56 +36,35 @@ class YtDlpInfo:
"""Возвращает title видео""" """Возвращает title видео"""
return self.info.get("title", "unknown") return self.info.get("title", "unknown")
def get_video_url(self, resolution: Optional[str] = None) -> Optional[MediaContent]: def download_video(self, size: Literal[240, 360, 480, 720, 1080, 1440, 2160]):
""" command = [
Возвращает ссылку на видеопоток с указанным разрешением "yt-dlp",
resolution: например '1080p', '720p', '480p' "-f",
""" f"bestvideo[height<={size}]",
formats = self.info.get("formats", []) "--no-part",
video_formats = [ "--quiet",
f "--no-warnings",
for f in formats "-o",
if f.get("vcodec") != "none" # есть видео "-",
and f.get("acodec") == "none" # без аудио self.url
] ]
if resolution: print("Start processing")
# ищем точное соответствие разрешению process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=0)
for f in video_formats: print("Write filename to upload backend")
if f.get("format_note") == resolution: self.backend.start(self.title + ".mp4")
return MediaContent( print("Start write chunk to upload backend")
f.get("url", ""), chunk_size = 1024 ** 2
f.get("http_headers", {}), length = 0
f.get("downloader_options", {}).get("http_chunk_size", 1024**2), while True:
) chunk = process.stdout.read(chunk_size)
# если разрешение не указано или не найдено — берем лучший if not chunk:
if video_formats: break
# сортируем по height length += chunk_size
video_formats.sort(key=lambda x: x.get("height") or 0, reverse=True) print("Write chunk to backend", length)
return MediaContent( self.backend.upload_chunk(chunk)
video_formats[0].get("url", ""), print("End writing to backend")
video_formats[0].get("http_headers", {}), ret = process.wait()
video_formats[0] print("Check ret status")
.get("downloader_options", {}) if ret != 0:
.get("http_chunk_size", 1024**2), self.backend.abort()
) raise RuntimeError(f"yt-dlp failed, status code: {ret}")
return None
def get_audio_url(self) -> Optional[MediaContent]:
"""Возвращает ссылку на аудиопоток"""
formats = self.info.get("formats", [])
audio_formats = [
f
for f in formats
if f.get("vcodec") == "none" and f.get("acodec") != "none"
]
if audio_formats:
# берем наилучшее качество
audio_formats.sort(key=lambda x: x.get("abr") or 0, reverse=True)
return MediaContent(
audio_formats[0].get("url", ""),
audio_formats[0].get("http_headers", {}),
audio_formats[0]
.get("downloader_options", {})
.get("http_chunk_size", 1024**2),
)
return None

20
test.py
View File

@@ -1,19 +1,21 @@
from app.utils.downloader import HttpStreamingDownloader from app.utils.downloader import HttpStreamingDownloader
from app.utils.uploader import DiskChunkUploadBackend from app.utils.uploader import DiskChunkUploadBackend
from app.utils.youtube import YtDlpInfo from app.utils.youtube import YtDlpManager
def download(url: str): def download(url: str):
upload_backend = DiskChunkUploadBackend("trash_holder") upload_backend = DiskChunkUploadBackend("trash_holder")
downloader = HttpStreamingDownloader(upload_backend) youtube = YtDlpManager(url, upload_backend)
youtube = YtDlpInfo(url) youtube.download_video(360)
video = youtube.get_video_url("480p") # downloader = HttpStreamingDownloader(upload_backend)
video_name = youtube.title + ".mp4" # youtube = YtDlpInfo(url)
audio = youtube.get_audio_url() # video = youtube.get_video_url("480p")
audio_name = youtube.title + ".m4a" # video_name = youtube.title + ".mp4"
# audio = youtube.get_audio_url()
# audio_name = youtube.title + ".m4a"
downloader.download(video.url, video_name, video.headers, video.chunk_size) # downloader.download(video.url, video_name, video.headers, video.chunk_size)
downloader.download(audio.url, audio_name, audio.headers, audio.chunk_size) # downloader.download(audio.url, audio_name, audio.headers, audio.chunk_size)
def main(): def main():