mirror of https://github.com/flibusta-apps/telegram_files_cache_server.git (synced 2025-12-06 14:45:36 +01:00)
Use taskiq
src/app/depends.py
@@ -1,4 +1,7 @@
-from fastapi import HTTPException, Security, status
+from fastapi import HTTPException, Request, Security, status
 
+from redis.asyncio import ConnectionPool
+from taskiq import TaskiqDepends
+
 from core.auth import default_security
 from core.config import env_config
@@ -9,3 +12,7 @@ async def check_token(api_key: str = Security(default_security)):
         raise HTTPException(
             status_code=status.HTTP_403_FORBIDDEN, detail="Wrong api key!"
         )
+
+
+def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
+    return request.app.state.redis_pool
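
The ported dependency works in both processes: under FastAPI the Request is the real request, while under a taskiq worker taskiq_fastapi substitutes a request-like object that still carries app.state. A minimal sketch of a task consuming it (task name and body are hypothetical, not from this commit):

    from redis.asyncio import ConnectionPool, Redis
    from taskiq import TaskiqDepends

    from app.depends import get_redis_pool
    from core.taskiq_worker import broker


    @broker.task
    async def count_hits(  # hypothetical example task
        key: str,
        redis_pool: ConnectionPool = TaskiqDepends(get_redis_pool),
    ) -> int:
        # The pool is the one placed on app.state.redis_pool at startup,
        # so tasks reuse the app's connections instead of opening their own.
        async with Redis(connection_pool=redis_pool) as client:
            return await client.incr(key)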
src/app/services/cache_updater.py
@@ -1,24 +1,22 @@
 import collections
 from datetime import timedelta
 from io import BytesIO
 import logging
 import random
 from tempfile import SpooledTemporaryFile
 from typing import Optional, cast
 
 from fastapi import UploadFile
 
-from arq.connections import ArqRedis
-from arq.worker import Retry
 import httpx
-from redis import asyncio as aioredis
-from redis.exceptions import LockError
+from redis.asyncio import ConnectionPool, Redis
+from taskiq import TaskiqDepends
 
+from app.depends import get_redis_pool
 from app.models import CachedFile
 from app.services.caption_getter import get_caption
 from app.services.downloader import download
 from app.services.files_client import upload_file
 from app.services.library_client import Book, get_book, get_books, get_last_book_id
+from core.taskiq_worker import broker
 
 
 logger = logging.getLogger("telegram_channel_files_manager")
@@ -27,14 +25,17 @@ logger = logging.getLogger("telegram_channel_files_manager")
 PAGE_SIZE = 100
 
 
+class Retry(Exception):
+    pass
+
+
 class FileTypeNotAllowed(Exception):
     def __init__(self, message: str) -> None:
         super().__init__(message)
 
 
-async def check_books_page(ctx: dict, page_number: int) -> None:
-    arq_pool: ArqRedis = ctx["arq_pool"]
-
+@broker.task
+async def check_books_page(page_number: int) -> bool:
     page = await get_books(page_number, PAGE_SIZE)
 
     object_ids = [book.id for book in page.items]
@@ -48,25 +49,21 @@
     for book in page.items:
         for file_type in book.available_types:
             if file_type not in cached_files_map[book.id]:
-                await arq_pool.enqueue_job(
-                    "cache_file_by_book_id",
-                    book.id,
-                    file_type,
+                await cache_file_by_book_id.kiq(
+                    book_id=book.id,
+                    file_type=file_type,
                     by_request=False,
-                    _job_id=f"cache_file_by_book_id_{book.id}_{file_type}",
                 )
 
+    return True
 
-async def check_books(ctx: dict, *args, **kwargs) -> bool:  # NOSONAR
-    arq_pool: ArqRedis = ctx["arq_pool"]
-
+@broker.task
+async def check_books(*args, **kwargs) -> bool:  # NOSONAR
     last_book_id = await get_last_book_id()
 
     for page_number in range(0, last_book_id // 100 + 1):
-        await arq_pool.enqueue_job(
-            "check_books_page",
-            page_number,
-        )
+        await check_books_page.kiq(page_number)
 
     return True
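
.kiq() is taskiq's counterpart to arq's enqueue_job: it serializes the call onto the Redis list queue and returns a task handle, not the result. Note that arq's _job_id argument made enqueues idempotent; ListQueueBroker has no direct equivalent, and duplicate work appears to be bounded instead by the per-book Redis lock in cache_file_by_book_id. A hedged sketch of a caller that waits for a result through the configured result backend (argument values and the timeout are illustrative):

    # Sketch only; assumes the broker from core.taskiq_worker is already started.
    task = await cache_file_by_book_id.kiq(book_id=1, file_type="fb2", by_request=True)
    result = await task.wait_result(timeout=60)  # polls the Redis result backend
    cached_file = result.return_value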
@@ -76,16 +73,13 @@ async def cache_file(book: Book, file_type: str) -> Optional[CachedFile]:
         object_id=book.id, object_type=file_type
     ).exists():
         return
 
-    retry_exc = Retry(defer=timedelta(minutes=15).seconds * random.random())
-
     try:
         data = await download(book.source.id, book.remote_id, file_type)
     except httpx.HTTPError:
-        raise retry_exc
+        data = None
 
     if data is None:
-        raise retry_exc
+        raise Retry
 
     response, client, filename = data
     caption = get_caption(book)
@@ -113,41 +107,31 @@
     )
 
 
+@broker.task
 async def cache_file_by_book_id(
-    ctx: dict,  # NOSONAR
     book_id: int,
     file_type: str,
     by_request: bool = True,
+    redis_pool: ConnectionPool = TaskiqDepends(get_redis_pool),
 ) -> Optional[CachedFile]:
-    r_client: aioredis.Redis = ctx["redis"]
-
-    get_book_retry = 3 if by_request else 1
-    book = await get_book(book_id, get_book_retry)
+    book = await get_book(book_id, 3)
 
     if book is None:
         if by_request:
             return None
-        raise Retry(defer=15)
+        raise Retry
 
     if file_type not in book.available_types:
         return None
 
-    lock = r_client.lock(
-        f"{book_id}_{file_type}", blocking_timeout=5, thread_local=False
-    )
+    async with Redis(connection_pool=redis_pool) as redis_client:
+        lock = redis_client.lock(
+            f"{book_id}_{file_type}", blocking_timeout=5, thread_local=False
+        )
 
-    try:
-        try:
-            async with lock:
-                result = await cache_file(book, file_type)
+        async with lock:
+            result = await cache_file(book, file_type)
 
-            if by_request:
-                return result
-        except LockError:
-            raise Retry(  # noqa: B904
-                defer=timedelta(minutes=15).seconds * random.random()
-            )
-    except Retry as e:
-        if by_request:
-            return None
-        raise e
+        if by_request:
+            return result
+        return None
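
The lock semantics carry over from the arq version: blocking_timeout=5 bounds acquisition to five seconds, and thread_local=False stores the lock token on the lock object rather than in thread-local storage. Failure to acquire raises redis.exceptions.LockError, which now simply propagates as a task error instead of being translated into arq's Retry. A standalone sketch of the same pattern (URL and key are illustrative):

    import asyncio

    from redis.asyncio import ConnectionPool, Redis
    from redis.exceptions import LockError


    async def locked_section() -> None:
        pool = ConnectionPool.from_url("redis://localhost:6379/0")  # illustrative URL
        async with Redis(connection_pool=pool) as client:
            try:
                # Wait up to 5s for the lock, as the task above does.
                async with client.lock("1_fb2", blocking_timeout=5, thread_local=False):
                    ...  # do the guarded work while holding the lock
            except LockError:
                print("lock held elsewhere; fail (and let the worker retry)")


    asyncio.run(locked_section())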
src/core/app.py
@@ -1,38 +1,46 @@
+from contextlib import asynccontextmanager
+
 from fastapi import FastAPI
 from fastapi.responses import ORJSONResponse
 
 from prometheus_fastapi_instrumentator import Instrumentator
+from redis.asyncio import ConnectionPool
 
 from app.views import healthcheck_router, router
-from core.arq_pool import get_arq_pool
+from core.config import REDIS_URL
 from core.db import database
-from core.redis_client import get_client
 import core.sentry  # noqa: F401
+from core.taskiq_worker import broker
 
 
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    database = app.state.database
+    if not database.is_connected:
+        await database.connect()
+
+    if not broker.is_worker_process:
+        await broker.startup()
+
+    yield
+
+    if database.is_connected:
+        await database.disconnect()
+
+    if not broker.is_worker_process:
+        await broker.shutdown()
+
+    await app.state.redis_pool.disconnect()
+
+
 def start_app() -> FastAPI:
-    app = FastAPI(default_response_class=ORJSONResponse)
+    app = FastAPI(default_response_class=ORJSONResponse, lifespan=lifespan)
 
     app.state.database = database
+    app.state.redis_pool = ConnectionPool.from_url(REDIS_URL)
 
     app.include_router(router)
     app.include_router(healthcheck_router)
 
-    @app.on_event("startup")
-    async def startup() -> None:
-        database_ = app.state.database
-        if not database_.is_connected:
-            await database_.connect()
-
-        app.state.arq_pool = await get_arq_pool()
-        app.state.redis_client = get_client()
-
-    @app.on_event("shutdown")
-    async def shutdown() -> None:
-        database_ = app.state.database
-        if database_.is_connected:
-            await database_.disconnect()
-
     Instrumentator(
         should_ignore_untemplated=True,
         excluded_handlers=["/docs", "/metrics", "/healthcheck"],
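
broker.is_worker_process is set by taskiq when code runs under its worker CLI, so this lifespan starts and stops the broker only in the API process; the worker itself would be launched separately, presumably as `taskiq worker core.taskiq_worker:broker` (command assumed from taskiq's CLI, not shown in this commit). The same startup contract applies to any producer outside FastAPI, as in this minimal sketch:

    import asyncio

    from app.services.cache_updater import check_books
    from core.taskiq_worker import broker


    async def main() -> None:
        await broker.startup()   # connect to Redis; the lifespan above does this for the API
        await check_books.kiq()  # enqueue only; a separate worker process executes it
        await broker.shutdown()  # close broker connections


    asyncio.run(main())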
src/core/arq_pool.py (deleted)
@@ -1,48 +0,0 @@
-import asyncio
-from typing import Any
-
-from arq.connections import ArqRedis, RedisSettings, create_pool
-from arq.worker import JobExecutionFailed
-import msgpack
-
-from core.config import env_config
-
-
-def default(obj: Any):
-    if isinstance(obj, asyncio.TimeoutError):
-        return msgpack.ExtType(0, "")
-    elif isinstance(obj, JobExecutionFailed):
-        return msgpack.ExtType(1, obj.args[0].encode())
-    raise TypeError("Unknown type: %r" % (obj,))
-
-
-def ext_hook(code: int, data: bytes):
-    if code == 0:
-        return asyncio.TimeoutError()
-    elif code == 1:
-        return JobExecutionFailed((data.decode()))
-    return msgpack.ExtType(code, data)
-
-
-def job_serializer(d):
-    return msgpack.packb(d, default=default, use_bin_type=True)  # noqa: E731
-
-
-def job_deserializer(b):
-    return msgpack.unpackb(b, ext_hook=ext_hook, raw=False)  # noqa: E731
-
-
-def get_redis_settings() -> RedisSettings:
-    return RedisSettings(
-        host=env_config.REDIS_HOST,
-        port=env_config.REDIS_PORT,
-        database=env_config.REDIS_DB,
-    )
-
-
-async def get_arq_pool() -> ArqRedis:
-    return await create_pool(
-        get_redis_settings(),
-        job_serializer=job_serializer,  # type: ignore
-        job_deserializer=job_deserializer,  # noqa: E731
-    )
src/core/config.py
@@ -26,4 +26,8 @@ class EnvConfig(BaseSettings):
     SENTRY_DSN: str
 
 
-env_config = EnvConfig()
+env_config = EnvConfig()  # type: ignore
+
+REDIS_URL = (
+    f"redis://{env_config.REDIS_HOST}:{env_config.REDIS_PORT}/{env_config.REDIS_DB}"
+)
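
A worked instance of the URL format, with illustrative values (not from this commit's config):

    # REDIS_HOST="redis", REDIS_PORT=6379, REDIS_DB=0 are assumed example values.
    host, port, db = "redis", 6379, 0
    assert f"redis://{host}:{port}/{db}" == "redis://redis:6379/0"
    # One URL now feeds both ListQueueBroker and ConnectionPool.from_url.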
src/core/redis_client.py (deleted)
@@ -1,9 +0,0 @@
-from redis import asyncio as aioredis
-
-from core.config import env_config
-
-
-def get_client() -> aioredis.Redis:
-    return aioredis.Redis(
-        host=env_config.REDIS_HOST, port=env_config.REDIS_PORT, db=env_config.REDIS_DB
-    )
@@ -1,40 +0,0 @@
-from app.services.cache_updater import (
-    cache_file_by_book_id,
-    check_books,
-    check_books_page,
-)
-from core.arq_pool import (
-    get_arq_pool,
-    get_redis_settings,
-    job_deserializer,
-    job_serializer,
-)
-from core.db import database
-from core.redis_client import get_client
-import core.sentry  # noqa: F401
-
-
-async def startup(ctx):
-    if not database.is_connected:
-        await database.connect()
-
-    ctx["arq_pool"] = await get_arq_pool()
-    ctx["redis"] = get_client()
-
-
-async def shutdown(ctx):
-    if database.is_connected:
-        await database.disconnect()
-
-
-class WorkerSettings:
-    functions = [check_books, check_books_page, cache_file_by_book_id]
-    on_startup = startup
-    on_shutdown = shutdown
-    redis_settings = get_redis_settings()
-    max_jobs = 2
-    max_tries = 2
-    job_timeout = 10 * 60
-    expires_extra_ms = 7 * 24 * 60 * 1000
-    job_serializer = job_serializer
-    job_deserializer = job_deserializer
src/core/taskiq_worker.py (new file)
@@ -0,0 +1,17 @@
+from taskiq import SimpleRetryMiddleware
+import taskiq_fastapi
+from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
+
+from core.config import REDIS_URL
+
+
+broker = (
+    ListQueueBroker(url=REDIS_URL)
+    .with_result_backend(
+        RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=5 * 60)
+    )
+    .with_middlewares(SimpleRetryMiddleware())
+)
+
+
+taskiq_fastapi.init(broker, "main:app")
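
taskiq_fastapi.init(broker, "main:app") tells worker processes where to import the ASGI app, so app.state gets populated and Request-style dependencies like get_redis_pool resolve inside tasks; result_ex_time=5 * 60 expires stored task results after five minutes. One caveat, hedged against taskiq's documentation: SimpleRetryMiddleware only re-enqueues tasks that opt in via labels, so with the plain @broker.task used in this commit a bare raise may not actually retry unless tasks adopt something like the following (label names from taskiq's docs, not present in this commit):

    # Assumed opt-in form per taskiq's documented SimpleRetryMiddleware labels.
    @broker.task(retry_on_error=True, max_retries=2)
    async def fragile_task() -> None:  # hypothetical task
        raise RuntimeError("transient failure")  # re-enqueued up to max_retries times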
src/main.py
@@ -1,4 +1,12 @@
-from core.app import start_app
+import sentry_sdk
+
+from core.app import start_app
+from core.config import env_config
 
 
+if env_config.SENTRY_DSN:
+    sentry_sdk.init(
+        dsn=env_config.SENTRY_DSN,
+    )
+
 app = start_app()