diff --git a/poetry.lock b/poetry.lock index fd1ab3d..a190340 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2,14 +2,14 @@ [[package]] name = "alembic" -version = "1.10.4" +version = "1.11.1" description = "A database migration tool for SQLAlchemy." category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "alembic-1.10.4-py3-none-any.whl", hash = "sha256:43942c3d4bf2620c466b91c0f4fca136fe51ae972394a0cc8b90810d664e4f5c"}, - {file = "alembic-1.10.4.tar.gz", hash = "sha256:295b54bbb92c4008ab6a7dcd1e227e668416d6f84b98b3c4446a2bc6214a556b"}, + {file = "alembic-1.11.1-py3-none-any.whl", hash = "sha256:dc871798a601fab38332e38d6ddb38d5e734f60034baeb8e2db5b642fccd8ab8"}, + {file = "alembic-1.11.1.tar.gz", hash = "sha256:6a810a6b012c88b33458fceb869aef09ac75d6ace5291915ba7fae44de372c01"}, ] [package.dependencies] @@ -41,26 +41,6 @@ doc = ["packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] test = ["contextlib2", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (<0.15)", "uvloop (>=0.15)"] trio = ["trio (>=0.16,<0.22)"] -[[package]] -name = "arq" -version = "0.25.0" -description = "Job queues in python with asyncio and redis" -category = "main" -optional = false -python-versions = ">=3.7" -files = [ - {file = "arq-0.25.0-py3-none-any.whl", hash = "sha256:db072d0f39c0bc06b436db67ae1f315c81abc1527563b828955670531815290b"}, - {file = "arq-0.25.0.tar.gz", hash = "sha256:d176ebadfba920c039dc578814d19b7814d67fa15f82fdccccaedb4330d65dae"}, -] - -[package.dependencies] -click = ">=8.0" -redis = {version = ">=4.2.0", extras = ["hiredis"]} -typing-extensions = ">=4.1.0" - -[package.extras] -watch = ["watchfiles (>=0.16)"] - [[package]] name = "async-timeout" version = "4.0.2" @@ -214,19 +194,19 @@ files = [ [[package]] name = "fastapi" -version = "0.95.1" +version = "0.95.2" description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "fastapi-0.95.1-py3-none-any.whl", hash = "sha256:a870d443e5405982e1667dfe372663abf10754f246866056336d7f01c21dab07"}, - {file = "fastapi-0.95.1.tar.gz", hash = "sha256:9569f0a381f8a457ec479d90fa01005cfddaae07546eb1f3fa035bc4797ae7d5"}, + {file = "fastapi-0.95.2-py3-none-any.whl", hash = "sha256:d374dbc4ef2ad9b803899bd3360d34c534adc574546e25314ab72c0c4411749f"}, + {file = "fastapi-0.95.2.tar.gz", hash = "sha256:4d9d3e8c71c73f11874bcf5e33626258d143252e329a01002f767306c64fb982"}, ] [package.dependencies] pydantic = ">=1.6.2,<1.7 || >1.7,<1.7.1 || >1.7.1,<1.7.2 || >1.7.2,<1.7.3 || >1.7.3,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0" -starlette = ">=0.26.1,<0.27.0" +starlette = ">=0.27.0,<0.28.0" [package.extras] all = ["email-validator (>=1.1.1)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=2.11.2)", "orjson (>=3.2.1)", "python-multipart (>=0.0.5)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] @@ -513,14 +493,14 @@ test = ["Cython (>=0.29.24,<0.30.0)"] [[package]] name = "httpx" -version = "0.24.0" +version = "0.24.1" description = "The next generation HTTP client." category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "httpx-0.24.0-py3-none-any.whl", hash = "sha256:447556b50c1921c351ea54b4fe79d91b724ed2b027462ab9a329465d147d5a4e"}, - {file = "httpx-0.24.0.tar.gz", hash = "sha256:507d676fc3e26110d41df7d35ebd8b3b8585052450f4097401c9be59d928c63e"}, + {file = "httpx-0.24.1-py3-none-any.whl", hash = "sha256:06781eb9ac53cde990577af654bd990a4949de37a28bdb4a230d434f3a30b9bd"}, + {file = "httpx-0.24.1.tar.gz", hash = "sha256:5853a43053df830c20f8110c5e69fe44d035d850b2dfe795e196f00fdb774bdd"}, ] [package.dependencies] @@ -562,6 +542,26 @@ files = [ {file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"}, ] +[[package]] +name = "importlib-metadata" +version = "6.6.0" +description = "Read metadata from Python packages" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "importlib_metadata-6.6.0-py3-none-any.whl", hash = "sha256:43dd286a2cd8995d5eaef7fee2066340423b818ed3fd70adf0bad5f1fac53fed"}, + {file = "importlib_metadata-6.6.0.tar.gz", hash = "sha256:92501cdf9cc66ebd3e612f1b4f0c0765dfa42f0fa38ffb319b6bd84dd675d705"}, +] + +[package.dependencies] +zipp = ">=0.5" + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +perf = ["ipython"] +testing = ["flake8 (<5)", "flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)"] + [[package]] name = "mako" version = "1.2.4" @@ -953,6 +953,17 @@ files = [ {file = "psycopg2_binary-2.9.6-cp39-cp39-win_amd64.whl", hash = "sha256:f6a88f384335bb27812293fdb11ac6aee2ca3f51d3c7820fe03de0a304ab6249"}, ] +[[package]] +name = "pycron" +version = "3.0.0" +description = "Simple cron-like parser, which determines if current datetime matches conditions." +category = "main" +optional = false +python-versions = ">=3.5" +files = [ + {file = "pycron-3.0.0.tar.gz", hash = "sha256:b916044e3e8253d5409c68df3ac64a3472c4e608dab92f40e8f595e5d3acb3de"}, +] + [[package]] name = "pydantic" version = "1.10.4" @@ -1093,14 +1104,14 @@ ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)" [[package]] name = "sentry-sdk" -version = "1.22.2" +version = "1.23.1" description = "Python client for Sentry (https://sentry.io)" category = "main" optional = false python-versions = "*" files = [ - {file = "sentry-sdk-1.22.2.tar.gz", hash = "sha256:5932c092c6e6035584eb74d77064e4bce3b7935dfc4a331349719a40db265840"}, - {file = "sentry_sdk-1.22.2-py2.py3-none-any.whl", hash = "sha256:cf89a5063ef84278d186aceaed6fb595bfe67d099298e537634a323664265669"}, + {file = "sentry-sdk-1.23.1.tar.gz", hash = "sha256:0300fbe7a07b3865b3885929fb863a68ff01f59e3bcfb4e7953d0bf7fd19c67f"}, + {file = "sentry_sdk-1.23.1-py2.py3-none-any.whl", hash = "sha256:a884e2478e0b055776ea2b9234d5de9339b4bae0b3a5e74ae43d131db8ded27e"}, ] [package.dependencies] @@ -1117,10 +1128,11 @@ chalice = ["chalice (>=1.16.0)"] django = ["django (>=1.8)"] falcon = ["falcon (>=1.4)"] fastapi = ["fastapi (>=0.79.0)"] -flask = ["blinker (>=1.1)", "flask (>=0.11)"] +flask = ["blinker (>=1.1)", "flask (>=0.11)", "markupsafe"] grpcio = ["grpcio (>=1.21.1)"] httpx = ["httpx (>=0.16.0)"] huey = ["huey (>=2)"] +loguru = ["loguru (>=0.5)"] opentelemetry = ["opentelemetry-distro (>=0.35b0)"] pure-eval = ["asttokens", "executing", "pure-eval"] pymongo = ["pymongo (>=3.1)"] @@ -1239,14 +1251,14 @@ sqlcipher = ["sqlcipher3-binary"] [[package]] name = "starlette" -version = "0.26.1" +version = "0.27.0" description = "The little ASGI library that shines." category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "starlette-0.26.1-py3-none-any.whl", hash = "sha256:e87fce5d7cbdde34b76f0ac69013fd9d190d581d80681493016666e6f96c6d5e"}, - {file = "starlette-0.26.1.tar.gz", hash = "sha256:41da799057ea8620e4667a3e69a5b1923ebd32b1819c8fa75634bbe8d8bea9bd"}, + {file = "starlette-0.27.0-py3-none-any.whl", hash = "sha256:918416370e846586541235ccd38a474c08b80443ed31c578a418e2209b3eef91"}, + {file = "starlette-0.27.0.tar.gz", hash = "sha256:6a6b0d042acb8d469a01eba54e9cda6cbd24ac602c4cd016723117d6a7e73b75"}, ] [package.dependencies] @@ -1255,6 +1267,75 @@ anyio = ">=3.4.0,<5" [package.extras] full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart", "pyyaml"] +[[package]] +name = "taskiq" +version = "0.4.3" +description = "Distributed task queue with full async support" +category = "main" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "taskiq-0.4.3-py3-none-any.whl", hash = "sha256:1a9305aae8df5ebd5d2388382da74ac96bb0a9434eeb97ad1dd6e9d9ab9d03aa"}, + {file = "taskiq-0.4.3.tar.gz", hash = "sha256:525644179f587cdacbc356f391639a1d6718ac9625251b36f80d45ee5e64f107"}, +] + +[package.dependencies] +importlib-metadata = "*" +pycron = ">=3.0.0,<4.0.0" +pydantic = ">=1.6.2,<2.0.0" +taskiq_dependencies = ">=1,<2" +typing-extensions = ">=3.10.0.0" + +[package.extras] +metrics = ["prometheus_client (>=0,<1)"] +reload = ["gitignore-parser (>=0,<1)", "watchdog (>=2.1.9,<3.0.0)"] +uv = ["uvloop (>=0.16.0,<1)"] +zmq = ["pyzmq (>=23.2.0,<24.0.0)"] + +[[package]] +name = "taskiq-dependencies" +version = "1.2.3" +description = "FastAPI like dependency injection implementation" +category = "main" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "taskiq_dependencies-1.2.3-py3-none-any.whl", hash = "sha256:504792d7e505f0db7b4a0b57c20d5b6fd6f1598103eb2c5b8cbbb4d07e0c1f29"}, + {file = "taskiq_dependencies-1.2.3.tar.gz", hash = "sha256:22157aa538c3650ebdc8bdbc4618047d9bf7c29ce8697d7151def79e834ac2d1"}, +] + +[[package]] +name = "taskiq-fastapi" +version = "0.1.2" +description = "FastAPI integration for taskiq" +category = "main" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "taskiq_fastapi-0.1.2-py3-none-any.whl", hash = "sha256:dbe4728ff6ea5675c76ed81e17bfe2da249ff1e8387e4915111108782b7a5b36"}, + {file = "taskiq_fastapi-0.1.2.tar.gz", hash = "sha256:7edff4a7e20dcf92fa79703a5c474f63b76863a9eb2e436353acb4a5f45a742e"}, +] + +[package.dependencies] +fastapi = "*" +taskiq = ">=0.3.1,<1" + +[[package]] +name = "taskiq-redis" +version = "0.3.1" +description = "Redis integration for taskiq" +category = "main" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "taskiq_redis-0.3.1-py3-none-any.whl", hash = "sha256:437174b671a992a469fd0866cab2ed6317b9672cbc2a1a804d524f857ade2e6b"}, + {file = "taskiq_redis-0.3.1.tar.gz", hash = "sha256:4d33e9a8aa247cce194f38466d7f3dbe4f5530ca0bbb9a46d9a5bb015a85a3c4"}, +] + +[package.dependencies] +redis = ">=4.2.0,<5.0.0" +taskiq = ">=0,<1" + [[package]] name = "typing-extensions" version = "4.5.0" @@ -1491,7 +1572,23 @@ files = [ {file = "websockets-11.0.2.tar.gz", hash = "sha256:b1a69701eb98ed83dd099de4a686dc892c413d974fa31602bc00aca7cb988ac9"}, ] +[[package]] +name = "zipp" +version = "3.15.0" +description = "Backport of pathlib-compatible object wrapper for zip files" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "zipp-3.15.0-py3-none-any.whl", hash = "sha256:48904fc76a60e542af151aded95726c1a5c34ed43ab4134b597665c86d7ad556"}, + {file = "zipp-3.15.0.tar.gz", hash = "sha256:112929ad649da941c23de50f356a2b5570c954b65150642bccdd66bf194d224b"}, +] + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"] + [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "e63feebc3e80b7d7e6c5cdaaadb0d32b59d61a970dd7698d13fb53ee6a808936" +content-hash = "279cb2082f3a3784c6bc0776b61b2cfba07fd2a3c89651871e45d1b0147e9329" diff --git a/pyproject.toml b/pyproject.toml index b6ff96a..a0c659c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,19 +6,21 @@ authors = ["Kurbanov Bulat "] [tool.poetry.dependencies] python = "^3.11" -fastapi = "^0.95.1" -httpx = "^0.24.0" -alembic = "^1.10.4" +fastapi = "^0.95.2" +httpx = "^0.24.1" +alembic = "^1.11.1" uvicorn = {extras = ["standard"], version = "^0.22.0"} -arq = "^0.25.0" prometheus-fastapi-instrumentator = "^6.0.0" uvloop = "^0.17.0" orjson = "^3.8.12" -sentry-sdk = "^1.22.2" +sentry-sdk = "^1.23.1" ormar = {extras = ["postgresql"], version = "^0.12.1"} pydantic = "^1.10.4" redis = {extras = ["hiredis"], version = "^4.5.5"} msgpack = "^1.0.5" +taskiq = "^0.4.3" +taskiq-redis = "^0.3.1" +taskiq-fastapi = "^0.1.2" [tool.poetry.group.dev.dependencies] pre-commit = "^2.21.0" @@ -55,7 +57,7 @@ exclude = [ ] [tool.ruff.flake8-bugbear] -extend-immutable-calls = ["fastapi.File", "fastapi.Form", "fastapi.Security"] +extend-immutable-calls = ["fastapi.File", "fastapi.Form", "fastapi.Security", "taskiq.TaskiqDepends"] [tool.ruff.mccabe] max-complexity = 15 diff --git a/src/app/depends.py b/src/app/depends.py index 943a882..b29631a 100644 --- a/src/app/depends.py +++ b/src/app/depends.py @@ -1,4 +1,7 @@ -from fastapi import HTTPException, Security, status +from fastapi import HTTPException, Request, Security, status + +from redis.asyncio import ConnectionPool +from taskiq import TaskiqDepends from core.auth import default_security from core.config import env_config @@ -9,3 +12,7 @@ async def check_token(api_key: str = Security(default_security)): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Wrong api key!" ) + + +def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool: + return request.app.state.redis_pool diff --git a/src/app/services/cache_updater.py b/src/app/services/cache_updater.py index 338bbdc..b608afa 100644 --- a/src/app/services/cache_updater.py +++ b/src/app/services/cache_updater.py @@ -1,24 +1,22 @@ import collections -from datetime import timedelta from io import BytesIO import logging -import random from tempfile import SpooledTemporaryFile from typing import Optional, cast from fastapi import UploadFile -from arq.connections import ArqRedis -from arq.worker import Retry import httpx -from redis import asyncio as aioredis -from redis.exceptions import LockError +from redis.asyncio import ConnectionPool, Redis +from taskiq import TaskiqDepends +from app.depends import get_redis_pool from app.models import CachedFile from app.services.caption_getter import get_caption from app.services.downloader import download from app.services.files_client import upload_file from app.services.library_client import Book, get_book, get_books, get_last_book_id +from core.taskiq_worker import broker logger = logging.getLogger("telegram_channel_files_manager") @@ -27,14 +25,17 @@ logger = logging.getLogger("telegram_channel_files_manager") PAGE_SIZE = 100 +class Retry(Exception): + pass + + class FileTypeNotAllowed(Exception): def __init__(self, message: str) -> None: super().__init__(message) -async def check_books_page(ctx: dict, page_number: int) -> None: - arq_pool: ArqRedis = ctx["arq_pool"] - +@broker.task +async def check_books_page(page_number: int) -> bool: page = await get_books(page_number, PAGE_SIZE) object_ids = [book.id for book in page.items] @@ -48,25 +49,21 @@ async def check_books_page(ctx: dict, page_number: int) -> None: for book in page.items: for file_type in book.available_types: if file_type not in cached_files_map[book.id]: - await arq_pool.enqueue_job( - "cache_file_by_book_id", - book.id, - file_type, + await cache_file_by_book_id.kiq( + book_id=book.id, + file_type=file_type, by_request=False, - _job_id=f"cache_file_by_book_id_{book.id}_{file_type}", ) + return True -async def check_books(ctx: dict, *args, **kwargs) -> bool: # NOSONAR - arq_pool: ArqRedis = ctx["arq_pool"] +@broker.task +async def check_books(*args, **kwargs) -> bool: # NOSONAR last_book_id = await get_last_book_id() for page_number in range(0, last_book_id // 100 + 1): - await arq_pool.enqueue_job( - "check_books_page", - page_number, - ) + await check_books_page.kiq(page_number) return True @@ -76,16 +73,13 @@ async def cache_file(book: Book, file_type: str) -> Optional[CachedFile]: object_id=book.id, object_type=file_type ).exists(): return - - retry_exc = Retry(defer=timedelta(minutes=15).seconds * random.random()) - try: data = await download(book.source.id, book.remote_id, file_type) except httpx.HTTPError: - raise retry_exc + data = None if data is None: - raise retry_exc + raise Retry response, client, filename = data caption = get_caption(book) @@ -113,41 +107,31 @@ async def cache_file(book: Book, file_type: str) -> Optional[CachedFile]: ) +@broker.task async def cache_file_by_book_id( - ctx: dict, # NOSONAR book_id: int, file_type: str, by_request: bool = True, + redis_pool: ConnectionPool = TaskiqDepends(get_redis_pool), ) -> Optional[CachedFile]: - r_client: aioredis.Redis = ctx["redis"] - - get_book_retry = 3 if by_request else 1 - book = await get_book(book_id, get_book_retry) + book = await get_book(book_id, 3) if book is None: if by_request: return None - raise Retry(defer=15) + raise Retry if file_type not in book.available_types: return None - lock = r_client.lock( - f"{book_id}_{file_type}", blocking_timeout=5, thread_local=False - ) + async with Redis(connection_pool=redis_pool) as redis_client: + lock = redis_client.lock( + f"{book_id}_{file_type}", blocking_timeout=5, thread_local=False + ) - try: - try: - async with lock: - result = await cache_file(book, file_type) + async with lock: + result = await cache_file(book, file_type) - if by_request: - return result - except LockError: - raise Retry( # noqa: B904 - defer=timedelta(minutes=15).seconds * random.random() - ) - except Retry as e: - if by_request: - return None - raise e + if by_request: + return result + return None diff --git a/src/core/app.py b/src/core/app.py index ea9ca20..2805e1e 100644 --- a/src/core/app.py +++ b/src/core/app.py @@ -1,38 +1,46 @@ +from contextlib import asynccontextmanager + from fastapi import FastAPI from fastapi.responses import ORJSONResponse from prometheus_fastapi_instrumentator import Instrumentator +from redis.asyncio import ConnectionPool from app.views import healthcheck_router, router -from core.arq_pool import get_arq_pool +from core.config import REDIS_URL from core.db import database -from core.redis_client import get_client -import core.sentry # noqa: F401 +from core.taskiq_worker import broker + + +@asynccontextmanager +async def lifespan(app: FastAPI): + database = app.state.database + if not database.is_connected: + await database.connect() + + if not broker.is_worker_process: + await broker.startup() + + yield + + if database.is_connected: + await database.disconnect() + + if not broker.is_worker_process: + await broker.shutdown() + + await app.state.redis_pool.disconnect() def start_app() -> FastAPI: - app = FastAPI(default_response_class=ORJSONResponse) + app = FastAPI(default_response_class=ORJSONResponse, lifespan=lifespan) app.state.database = database + app.state.redis_pool = ConnectionPool.from_url(REDIS_URL) app.include_router(router) app.include_router(healthcheck_router) - @app.on_event("startup") - async def startup() -> None: - database_ = app.state.database - if not database_.is_connected: - await database_.connect() - - app.state.arq_pool = await get_arq_pool() - app.state.redis_client = get_client() - - @app.on_event("shutdown") - async def shutdown() -> None: - database_ = app.state.database - if database_.is_connected: - await database_.disconnect() - Instrumentator( should_ignore_untemplated=True, excluded_handlers=["/docs", "/metrics", "/healthcheck"], diff --git a/src/core/arq_pool.py b/src/core/arq_pool.py deleted file mode 100644 index 13422d5..0000000 --- a/src/core/arq_pool.py +++ /dev/null @@ -1,48 +0,0 @@ -import asyncio -from typing import Any - -from arq.connections import ArqRedis, RedisSettings, create_pool -from arq.worker import JobExecutionFailed -import msgpack - -from core.config import env_config - - -def default(obj: Any): - if isinstance(obj, asyncio.TimeoutError): - return msgpack.ExtType(0, "") - elif isinstance(obj, JobExecutionFailed): - return msgpack.ExtType(1, obj.args[0].encode()) - raise TypeError("Unknown type: %r" % (obj,)) - - -def ext_hook(code: int, data: bytes): - if code == 0: - return asyncio.TimeoutError() - elif code == 1: - return JobExecutionFailed((data.decode())) - return msgpack.ExtType(code, data) - - -def job_serializer(d): - return msgpack.packb(d, default=default, use_bin_type=True) # noqa: E731 - - -def job_deserializer(b): - return msgpack.unpackb(b, ext_hook=ext_hook, raw=False) # noqa: E731 - - -def get_redis_settings() -> RedisSettings: - return RedisSettings( - host=env_config.REDIS_HOST, - port=env_config.REDIS_PORT, - database=env_config.REDIS_DB, - ) - - -async def get_arq_pool() -> ArqRedis: - return await create_pool( - get_redis_settings(), - job_serializer=job_serializer, # type: ignore - job_deserializer=job_deserializer, # noqa: E731 - ) diff --git a/src/core/config.py b/src/core/config.py index 22b5926..927d52d 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -26,4 +26,8 @@ class EnvConfig(BaseSettings): SENTRY_DSN: str -env_config = EnvConfig() +env_config = EnvConfig() # type: ignore + +REDIS_URL = ( + f"redis://{env_config.REDIS_HOST}:{env_config.REDIS_PORT}/{env_config.REDIS_DB}" +) diff --git a/src/core/redis_client.py b/src/core/redis_client.py deleted file mode 100644 index 09e6518..0000000 --- a/src/core/redis_client.py +++ /dev/null @@ -1,9 +0,0 @@ -from redis import asyncio as aioredis - -from core.config import env_config - - -def get_client() -> aioredis.Redis: - return aioredis.Redis( - host=env_config.REDIS_HOST, port=env_config.REDIS_PORT, db=env_config.REDIS_DB - ) diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py deleted file mode 100644 index bfc6012..0000000 --- a/src/core/setup_arq.py +++ /dev/null @@ -1,40 +0,0 @@ -from app.services.cache_updater import ( - cache_file_by_book_id, - check_books, - check_books_page, -) -from core.arq_pool import ( - get_arq_pool, - get_redis_settings, - job_deserializer, - job_serializer, -) -from core.db import database -from core.redis_client import get_client -import core.sentry # noqa: F401 - - -async def startup(ctx): - if not database.is_connected: - await database.connect() - - ctx["arq_pool"] = await get_arq_pool() - ctx["redis"] = get_client() - - -async def shutdown(ctx): - if database.is_connected: - await database.disconnect() - - -class WorkerSettings: - functions = [check_books, check_books_page, cache_file_by_book_id] - on_startup = startup - on_shutdown = shutdown - redis_settings = get_redis_settings() - max_jobs = 2 - max_tries = 2 - job_timeout = 10 * 60 - expires_extra_ms = 7 * 24 * 60 * 1000 - job_serializer = job_serializer - job_deserializer = job_deserializer diff --git a/src/core/taskiq_worker.py b/src/core/taskiq_worker.py new file mode 100644 index 0000000..acab39f --- /dev/null +++ b/src/core/taskiq_worker.py @@ -0,0 +1,17 @@ +from taskiq import SimpleRetryMiddleware +import taskiq_fastapi +from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend + +from core.config import REDIS_URL + + +broker = ( + ListQueueBroker(url=REDIS_URL) + .with_result_backend( + RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=5 * 60) + ) + .with_middlewares(SimpleRetryMiddleware()) +) + + +taskiq_fastapi.init(broker, "main:app") diff --git a/src/main.py b/src/main.py index 0a4385b..5af4648 100644 --- a/src/main.py +++ b/src/main.py @@ -1,4 +1,12 @@ -from core.app import start_app +import sentry_sdk +from core.app import start_app +from core.config import env_config + + +if env_config.SENTRY_DSN: + sentry_sdk.init( + dsn=env_config.SENTRY_DSN, + ) app = start_app()