diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py index bd6ddee..0fe9584 100644 --- a/src/core/setup_arq.py +++ b/src/core/setup_arq.py @@ -1,3 +1,6 @@ +import asyncio +from typing import Any + import msgpack from app.services.cache_updater import ( @@ -24,6 +27,26 @@ async def shutdown(ctx): await database.disconnect() +def default(obj: Any): + if isinstance(obj, asyncio.TimeoutError): + return msgpack.ExtType(0, "") + raise TypeError("Unknown type: %r" % (obj,)) + + +def ext_hook(code: int, data: str): + if code == 0: + return asyncio.TimeoutError() + return msgpack.ExtType(code, data) + + +def job_serializer(d): + return msgpack.packb(d, default=default, use_bin_type=True) # noqa: E731 + + +def job_deserializer(b): + return msgpack.unpackb(b, ext_hook=ext_hook, raw=False) # noqa: E731 + + class WorkerSettings: functions = [check_books, check_books_page, cache_file_by_book_id] on_startup = startup @@ -32,6 +55,6 @@ class WorkerSettings: max_jobs = 2 max_tries = 1 job_timeout = 10 * 60 - job_serializer = msgpack.packb - job_deserializer = lambda b: msgpack.unpackb(b, raw=False) # noqa: E731 expires_extra_ms = 7 * 24 * 60 * 1000 + job_serializer = job_serializer + job_deserializer = job_deserializer