From bb945649b5935786f28ef9d12a3a0508010b988f Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Tue, 28 Mar 2023 14:22:07 +0200 Subject: [PATCH] Update job result serialization --- src/core/setup_arq.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py index 0fe9584..c9620de 100644 --- a/src/core/setup_arq.py +++ b/src/core/setup_arq.py @@ -1,6 +1,7 @@ import asyncio from typing import Any +from arq.worker import JobExecutionFailed import msgpack from app.services.cache_updater import ( @@ -30,12 +31,16 @@ async def shutdown(ctx): def default(obj: Any): if isinstance(obj, asyncio.TimeoutError): return msgpack.ExtType(0, "") + elif isinstance(obj, JobExecutionFailed): + return msgpack.ExtType(1, obj.args[0].encode()) raise TypeError("Unknown type: %r" % (obj,)) -def ext_hook(code: int, data: str): +def ext_hook(code: int, data: bytes): if code == 0: return asyncio.TimeoutError() + elif code == 1: + return JobExecutionFailed((data.decode())) return msgpack.ExtType(code, data) @@ -53,7 +58,7 @@ class WorkerSettings: on_shutdown = shutdown redis_settings = get_redis_settings() max_jobs = 2 - max_tries = 1 + max_tries = 2 job_timeout = 10 * 60 expires_extra_ms = 7 * 24 * 60 * 1000 job_serializer = job_serializer