Init code
This commit is contained in:
4
app/__init__.py
Normal file
4
app/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
# from app.main import app
|
||||
|
||||
|
||||
# __all__ = ["app"]
|
||||
0
app/api/v1/router.py
Normal file
0
app/api/v1/router.py
Normal file
25
app/main.py
Normal file
25
app/main.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from app.utils.downloader import HttpStreamingDownloader
|
||||
from app.utils.uploader import DiskChunkUploadBackend
|
||||
from app.utils.youtube import YtDlpInfo
|
||||
|
||||
|
||||
def download(url: str):
|
||||
upload_backend = DiskChunkUploadBackend("trash_holder")
|
||||
downloader = HttpStreamingDownloader(upload_backend)
|
||||
youtube = YtDlpInfo(url)
|
||||
video_url = youtube.get_video_url("480p")
|
||||
video_name = youtube.title + ".mp4"
|
||||
audio_url = youtube.get_audio_url()
|
||||
audio_name = youtube.title + ".mp4a"
|
||||
|
||||
# downloader.download(video_url, video_name)
|
||||
downloader.download(audio_url, audio_name)
|
||||
|
||||
|
||||
def main():
|
||||
url = "https://youtu.be/OSAOh4L41Wg"
|
||||
download(url)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
8
app/schemas.py
Normal file
8
app/schemas.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class DownloadRequest(BaseModel):
|
||||
url: str
|
||||
quality: str
|
||||
codec: str
|
||||
identifier: str
|
||||
0
app/services.py
Normal file
0
app/services.py
Normal file
29
app/utils/downloader.py
Normal file
29
app/utils/downloader.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from tqdm import tqdm
|
||||
import httpx
|
||||
from app.utils.uploader import ChunkUploadBackend
|
||||
|
||||
|
||||
class HttpStreamingDownloader:
|
||||
def __init__(self, backend: ChunkUploadBackend, chunk_size: int = 1024**2):
|
||||
self.backend = backend
|
||||
self.chunk_size = chunk_size
|
||||
|
||||
def download(self, url: str, filename: str, headers: dict = {}):
|
||||
self.backend.start(filename)
|
||||
# http = httpx.Client(http2=True)
|
||||
http = httpx
|
||||
try:
|
||||
print("Try to download")
|
||||
with http.stream("GET", url, timeout=20, headers=headers) as response:
|
||||
response.raise_for_status()
|
||||
total = int(response.headers.get("Content-Length", 0))
|
||||
with tqdm(total=total, unit="B", unit_scale=True, desc=filename) as pbar:
|
||||
for chunk in response.iter_bytes(self.chunk_size):
|
||||
if chunk:
|
||||
self.backend.upload_chunk(chunk)
|
||||
pbar.update(len(chunk)) # обновляем прогресс по размеру чанка
|
||||
|
||||
self.backend.finish()
|
||||
except:
|
||||
self.backend.abort()
|
||||
raise
|
||||
44
app/utils/uploader.py
Normal file
44
app/utils/uploader.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class ChunkUploadBackend(ABC):
|
||||
@abstractmethod
|
||||
def start(self, filename: str) -> None:
|
||||
"""Начало загрузки нового файла"""
|
||||
|
||||
@abstractmethod
|
||||
def upload_chunk(self, chunk: bytes) -> None:
|
||||
"""Загрузка очередного чанка"""
|
||||
|
||||
@abstractmethod
|
||||
def finish(self) -> None:
|
||||
"""Завершение загрузки"""
|
||||
|
||||
@abstractmethod
|
||||
def abort(self) -> None:
|
||||
"""Прерывания загрузки"""
|
||||
|
||||
|
||||
class DiskChunkUploadBackend(ChunkUploadBackend):
|
||||
def __init__(self, base_path: str):
|
||||
import os
|
||||
|
||||
self.os = os
|
||||
|
||||
self.base_path = base_path
|
||||
self.os.makedirs(self.base_path, exist_ok=True)
|
||||
self._file = None
|
||||
|
||||
def start(self, filename):
|
||||
self._file = open(self.os.path.join(self.base_path, filename), "wb")
|
||||
|
||||
def upload_chunk(self, chunk: bytes):
|
||||
self._file.write(chunk)
|
||||
|
||||
def finish(self):
|
||||
self._file.close()
|
||||
|
||||
def abort(self):
|
||||
if self._file:
|
||||
self._file.close()
|
||||
self._file = None
|
||||
60
app/utils/youtube.py
Normal file
60
app/utils/youtube.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class YtDlpInfo:
|
||||
def __init__(self, url: str):
|
||||
self.url = url
|
||||
self.info = None
|
||||
self._extract_info()
|
||||
|
||||
def _extract_info(self):
|
||||
import yt_dlp
|
||||
|
||||
ydl_opts = {
|
||||
"quiet": True,
|
||||
"skip_download": True,
|
||||
"noplaylist": True,
|
||||
"js-runtimes": "deno"
|
||||
}
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
self.info = ydl.extract_info(self.url, download=False)
|
||||
|
||||
@property
|
||||
def title(self) -> str:
|
||||
"""Возвращает title видео"""
|
||||
return self.info.get("title", "unknown")
|
||||
|
||||
def get_video_url(self, resolution: Optional[str] = None) -> Optional[tuple[dict, str]]:
|
||||
"""
|
||||
Возвращает ссылку на видеопоток с указанным разрешением
|
||||
resolution: например '1080p', '720p', '480p'
|
||||
"""
|
||||
formats = self.info.get("formats", [])
|
||||
video_formats = [
|
||||
f for f in formats
|
||||
if f.get("vcodec") != "none" # есть видео
|
||||
and f.get("acodec") == "none" # без аудио
|
||||
]
|
||||
if resolution:
|
||||
# ищем точное соответствие разрешению
|
||||
for f in video_formats:
|
||||
if f.get("format_note") == resolution:
|
||||
return f.get("http_headers", {}), f.get("url")
|
||||
# если разрешение не указано или не найдено — берем лучший
|
||||
if video_formats:
|
||||
# сортируем по height
|
||||
video_formats.sort(key=lambda x: x.get("height") or 0, reverse=True)
|
||||
return video_formats[0].get("http_headers"), video_formats[0].get("url")
|
||||
return None
|
||||
|
||||
def get_audio_url(self) -> Optional[tuple[dict,str]]:
|
||||
"""Возвращает ссылку на аудиопоток"""
|
||||
formats = self.info.get("formats", [])
|
||||
audio_formats = [
|
||||
f for f in formats if f.get("vcodec") == "none" and f.get("acodec") != "none"
|
||||
]
|
||||
if audio_formats:
|
||||
# берем наилучшее качество
|
||||
audio_formats.sort(key=lambda x: x.get("abr") or 0, reverse=True)
|
||||
return audio_formats[0].get("http_headers"), audio_formats[0].get("url")
|
||||
return None
|
||||
Reference in New Issue
Block a user