Добавил возможность загружать файлы сразу в S3

This commit is contained in:
Viner Abubakirov
2026-02-19 13:52:02 +05:00
parent 1981cb7da3
commit 8ac132e503
6 changed files with 198 additions and 30 deletions

View File

@@ -1,5 +1,9 @@
import os
from abc import ABC, abstractmethod
import boto3
from botocore.client import Config
class ChunkUploadBackend(ABC):
@abstractmethod
@@ -11,7 +15,7 @@ class ChunkUploadBackend(ABC):
"""Загрузка очередного чанка"""
@abstractmethod
def finish(self) -> None:
def finish(self) -> any:
"""Завершение загрузки"""
@abstractmethod
@@ -21,24 +25,96 @@ class ChunkUploadBackend(ABC):
class DiskChunkUploadBackend(ChunkUploadBackend):
def __init__(self, base_path: str):
import os
self.os = os
self.base_path = base_path
self.os.makedirs(self.base_path, exist_ok=True)
os.makedirs(self.base_path, exist_ok=True)
self._file = None
def start(self, filename):
self._file = open(self.os.path.join(self.base_path, filename), "wb")
self._file = open(os.path.join(self.base_path, filename), "wb")
def upload_chunk(self, chunk: bytes):
self._file.write(chunk)
def finish(self):
if self._file is None:
return
self._file.close()
return os.path.join(self.base_path, self._file.name)
def abort(self):
if self._file:
self._file.close()
self._file = None
class S3ChunkUploadBackend(ChunkUploadBackend):
def __init__(self, key_prefix: str = ""):
from dotenv import load_dotenv
load_dotenv(".env")
self.s3 = boto3.client(
service_name="s3",
aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
endpoint_url=os.getenv("S3_ENDPOINT_URL"),
region_name=os.getenv("S3_REGION_NAME"),
use_ssl=True,
config=Config(signature_version=os.getenv("S3_SIGNATURE_VERSION")),
)
self.bucket = os.getenv("S3_BUCKET_NAME")
self.key_prefix = key_prefix
self.upload_id = None
self.parts = []
self.part_number = 1
self.buffer = bytearray()
self.multipart_threshold = 5 * 1024 * 1024 # 5MB
self.key = None
def start(self, filename: str) -> None:
self.key = f"{self.key_prefix}{filename}"
response = self.s3.create_multipart_upload(Bucket=self.bucket, Key=self.key)
self.upload_id = response["UploadId"]
def upload_chunk(self, chunk: bytes) -> None:
self.buffer.extend(chunk)
if len(self.buffer) >= self.multipart_threshold:
self._flush_part()
def _flush_part(self):
response = self.s3.upload_part(
Bucket=self.bucket,
Key=self.key,
PartNumber=self.part_number,
UploadId=self.upload_id,
Body=bytes(self.buffer),
)
self.parts.append({"PartNumber": self.part_number, "ETag": response["ETag"]})
self.part_number += 1
self.buffer.clear()
def finish(self):
if self.buffer:
self._flush_part()
# Сбрасываем счетчик
self.part_number = 1
response = self.s3.complete_multipart_upload(
Bucket=self.bucket,
Key=self.key,
UploadId=self.upload_id,
MultipartUpload={"Parts": self.parts},
)
# Сбрасываем части
self.parts = []
return response
def abort(self) -> None:
if self.upload_id:
self.s3.abort_multipart_upload(
Bucket=self.bucket, Key=self.key, UploadId=self.upload_id
)
self.part_number = 1
self.parts = []

View File

@@ -46,25 +46,38 @@ class YtDlpManager:
"--no-warnings",
"-o",
"-",
self.url
self.url,
]
print("Start processing")
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=0)
print("Write filename to upload backend")
self.backend.start(self.title + ".mp4")
print("Start write chunk to upload backend")
chunk_size = 1024 ** 2
return self._processing(command, self.title + ".mp4")
def download_audio(self):
command = [
"yt-dlp",
"-f",
"bestaudio",
"--no-part",
"--quiet",
"--no-warnings",
"-o",
"-",
self.url,
]
return self._processing(command, self.title + ".m4a")
def _processing(self, command: list[str], filename: str, chunk_size: int = 1024**2):
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=0
)
self.backend.start(filename)
length = 0
while True:
chunk = process.stdout.read(chunk_size)
if not chunk:
break
length += chunk_size
print("Write chunk to backend", length)
self.backend.upload_chunk(chunk)
print("End writing to backend")
ret = process.wait()
print("Check ret status")
if ret != 0:
self.backend.abort()
raise RuntimeError(f"yt-dlp failed, status code: {ret}")
return self.backend.finish()