import os from abc import ABC, abstractmethod from urllib.parse import quote import boto3 from botocore.client import Config from app.core.config import settings class ChunkUploadBackend(ABC): @abstractmethod def start(self, filename: str) -> None: """Начало загрузки нового файла""" @abstractmethod def upload_chunk(self, chunk: bytes) -> None: """Загрузка очередного чанка""" @abstractmethod def finish(self) -> str: """Завершение загрузки""" @abstractmethod def abort(self) -> None: """Прерывания загрузки""" class UploadBackend(ABC): @abstractmethod def upload(self, name: str, file: bytes | str) -> str: """Загрузка файла""" class DiskChunkUploadBackend(ChunkUploadBackend): def __init__(self, key_prefix: str = ""): self.base_path = str(settings.MEDIA_DIR) os.makedirs(self.base_path, exist_ok=True) self.key_prefix = key_prefix self._file = None def start(self, filename): name = f"{self.key_prefix}{filename}" self._file = open(os.path.join(self.base_path, name), "wb") def upload_chunk(self, chunk: bytes): self._file.write(chunk) def finish(self): if self._file is None: return self._file.close() relative_path = settings.MEDIA_DIR.relative_to(settings.BASE_DIR) return str(relative_path / self._file.name) def abort(self): if self._file: self._file.close() self._file = None class S3ChunkUploadBackend(ChunkUploadBackend): def __init__(self, key_prefix: str = ""): self.s3 = boto3.client( service_name="s3", aws_access_key_id=settings.S3_ACCESS_KEY, aws_secret_access_key=settings.S3_SECRET_KEY, endpoint_url=settings.S3_ENDPOINT_URL, region_name=settings.S3_REGION_NAME, use_ssl=True, config=Config(signature_version=settings.S3_SIGNATURE_VERSION), ) self.bucket = settings.S3_BUCKET_NAME self.key_prefix = key_prefix self.upload_id = None self.parts = [] self.part_number = 1 self.buffer = bytearray() self.multipart_threshold = 5 * 1024 * 1024 # 5MB self.key = None def start(self, filename: str) -> None: self.key = f"{self.key_prefix}{filename}" response = self.s3.create_multipart_upload(Bucket=self.bucket, Key=self.key) self.upload_id = response["UploadId"] def upload_chunk(self, chunk: bytes) -> None: self.buffer.extend(chunk) if len(self.buffer) >= self.multipart_threshold: self._flush_part() def _flush_part(self): response = self.s3.upload_part( Bucket=self.bucket, Key=self.key, PartNumber=self.part_number, UploadId=self.upload_id, Body=bytes(self.buffer), ) self.parts.append({"PartNumber": self.part_number, "ETag": response["ETag"]}) self.part_number += 1 self.buffer.clear() def finish(self): if self.buffer: self._flush_part() # Сбрасываем счетчик self.part_number = 1 response = self.s3.complete_multipart_upload( Bucket=self.bucket, Key=self.key, UploadId=self.upload_id, MultipartUpload={"Parts": self.parts}, ) # Сбрасываем части self.parts = [] return response["Location"] def abort(self) -> None: if self.upload_id: self.s3.abort_multipart_upload( Bucket=self.bucket, Key=self.key, UploadId=self.upload_id ) self.part_number = 1 self.parts = [] class HybridDiskS3UploadBackend(ChunkUploadBackend): def __init__(self, key_prefix: str = ""): self.s3 = boto3.client( service_name="s3", aws_access_key_id=settings.S3_ACCESS_KEY, aws_secret_access_key=settings.S3_SECRET_KEY, endpoint_url=settings.S3_ENDPOINT_URL, region_name=settings.S3_REGION_NAME, use_ssl=True, config=Config(signature_version=settings.S3_SIGNATURE_VERSION), ) self.bucket = settings.S3_BUCKET_NAME self.key_prefix = key_prefix self.filename = None self.disk_backend = DiskChunkUploadBackend() def start(self, filename): self.filename = filename return self.disk_backend.start(filename) def upload_chunk(self, chunk): return self.disk_backend.upload_chunk(chunk) def abort(self): self.filename = None return self.disk_backend.abort() def finish(self): if filepath := self.disk_backend.finish(): response = self.s3.upload_file( Filename=filepath, Bucket=settings.S3_BUCKET_NAME, Key=f"{self.key_prefix}{self.filename}", ) os.remove(filepath) return response["Location"] class S3UploadBackend(UploadBackend): def __init__(self, key_prefix=""): self.s3 = boto3.client( service_name="s3", aws_access_key_id=settings.S3_ACCESS_KEY, aws_secret_access_key=settings.S3_SECRET_KEY, endpoint_url=settings.S3_ENDPOINT_URL, region_name=settings.S3_REGION_NAME, use_ssl=True, config=Config(signature_version=settings.S3_SIGNATURE_VERSION), ) self.bucket = settings.S3_BUCKET_NAME self.key_prefix = key_prefix def upload(self, name: str, file: bytes | str): key = f"{self.key_prefix}{name}" if isinstance(file, str): self.s3.upload_file(Filename=file, Bucket=self.bucket, Key=key) else: self.s3.put_object(Bucket=self.bucket, Key=key, Body=file) encoded_key = quote(key, "") return f"{settings.S3_ENDPOINT_URL}/{self.bucket}/{encoded_key}"