import collections
from datetime import date, timedelta
from io import BytesIO
from tempfile import SpooledTemporaryFile
from typing import Optional, cast

import httpx
from fastapi import UploadFile
from redis.asyncio import ConnectionPool, Redis
from taskiq import TaskiqDepends

from app.depends import get_redis_pool
from app.models import CachedFile
from app.services.caption_getter import get_caption
from app.services.downloader import download
from app.services.files_client import upload_file
from app.services.library_client import Book, get_book, get_books
from core.taskiq_worker import broker

PAGE_SIZE = 100


class Retry(Exception):
    pass


class FileTypeNotAllowed(Exception):
    def __init__(self, message: str) -> None:
        super().__init__(message)


@broker.task
async def check_books_page(
    page_number: int, uploaded_gte: date, uploaded_lte: date
) -> bool:
    page = await get_books(
        page_number, PAGE_SIZE, uploaded_gte=uploaded_gte, uploaded_lte=uploaded_lte
    )

    object_ids = [book.id for book in page.items]

    cached_files = await CachedFile.objects.filter(object_id__in=object_ids).all()

    # Map object_id -> set of already cached file types so they can be skipped below.
    cached_files_map = collections.defaultdict(set)
    for cached_file in cached_files:
        cached_files_map[cached_file.object_id].add(cached_file.object_type)

    # Enqueue a caching task for every (book, file type) pair that is not cached yet.
    for book in page.items:
        for file_type in book.available_types:
            if file_type not in cached_files_map[book.id]:
                await cache_file_by_book_id.kiq(
                    book_id=book.id,
                    file_type=file_type,
                    by_request=False,
                )

    return True


@broker.task
async def check_books(*args, **kwargs) -> bool:
    # Look at books uploaded within a one-day window around today.
    uploaded_lte = date.today() + timedelta(days=1)
    uploaded_gte = date.today() - timedelta(days=1)

    books_page = await get_books(
        1, PAGE_SIZE, uploaded_gte=uploaded_gte, uploaded_lte=uploaded_lte
    )

    # Fan out one check_books_page task per page of results.
    for page_number in range(1, books_page.pages + 1):
        await check_books_page.kiq(
            page_number, uploaded_gte=uploaded_gte, uploaded_lte=uploaded_lte
        )

    return True


async def cache_file(book: Book, file_type: str) -> Optional[CachedFile]:
    # Already cached — nothing to do.
    if await CachedFile.objects.filter(
        object_id=book.id, object_type=file_type
    ).exists():
        return None

    try:
        data = await download(book.source.id, book.remote_id, file_type)
    except httpx.HTTPError:
        data = None

    if data is None:
        raise Retry

    response, client, filename = data

    caption = get_caption(book)

    # Stream the downloaded file into a temporary UploadFile before uploading it.
    temp_file = UploadFile(BytesIO(), filename=filename)

    async for chunk in response.aiter_bytes(2048):
        await temp_file.write(chunk)

    await temp_file.seek(0)

    await response.aclose()
    await client.aclose()

    upload_data = await upload_file(
        cast(SpooledTemporaryFile, temp_file.file), filename, caption
    )

    if upload_data is None:
        return None

    cached_file, created = await CachedFile.objects.get_or_create(
        {
            "message_id": upload_data.data["message_id"],
            "chat_id": upload_data.data["chat_id"],
        },
        object_id=book.id,
        object_type=file_type,
    )

    if created:
        return cached_file

    # A record already existed: refresh it with the new message and chat ids.
    cached_file.message_id = upload_data.data["message_id"]
    cached_file.chat_id = upload_data.data["chat_id"]

    return await cached_file.update(["message_id", "chat_id"])


@broker.task(retry_on_error=True)
async def cache_file_by_book_id(
    book_id: int,
    file_type: str,
    by_request: bool = True,
    redis_pool: ConnectionPool = TaskiqDepends(get_redis_pool),
) -> Optional[CachedFile]:
    book = await get_book(book_id, 3)

    if book is None:
        if by_request:
            return None
        raise Retry

    if file_type not in book.available_types:
        return None

    async with Redis(connection_pool=redis_pool) as redis_client:
        lock = redis_client.lock(
            f"{book_id}_{file_type}", blocking_timeout=5, thread_local=False
        )

        # Another worker is already caching this file; retry later for background tasks.
        if await lock.locked() and not by_request:
            raise Retry

        try:
            result = await cache_file(book, file_type)
        except Retry as e:
            if by_request:
                return None
            raise e

        if by_request:
            return result

        return None