Fix job serializer/deserializer

This commit is contained in:
2023-03-28 15:39:18 +02:00
parent bb945649b5
commit 04533ab759
6 changed files with 65 additions and 44 deletions

View File

@@ -1,9 +1,37 @@
import asyncio
from typing import Any
from arq.connections import ArqRedis, RedisSettings, create_pool
from arq.worker import JobExecutionFailed
import msgpack
from core.config import env_config
def default(obj: Any):
if isinstance(obj, asyncio.TimeoutError):
return msgpack.ExtType(0, "")
elif isinstance(obj, JobExecutionFailed):
return msgpack.ExtType(1, obj.args[0].encode())
raise TypeError("Unknown type: %r" % (obj,))
def ext_hook(code: int, data: bytes):
if code == 0:
return asyncio.TimeoutError()
elif code == 1:
return JobExecutionFailed((data.decode()))
return msgpack.ExtType(code, data)
def job_serializer(d):
return msgpack.packb(d, default=default, use_bin_type=True) # noqa: E731
def job_deserializer(b):
return msgpack.unpackb(b, ext_hook=ext_hook, raw=False) # noqa: E731
def get_redis_settings() -> RedisSettings:
return RedisSettings(
host=env_config.REDIS_HOST,
@@ -15,6 +43,6 @@ def get_redis_settings() -> RedisSettings:
async def get_arq_pool() -> ArqRedis:
return await create_pool(
get_redis_settings(),
job_serializer=msgpack.packb, # type: ignore
job_deserializer=lambda b: msgpack.unpackb(b, raw=False), # noqa: E731
job_serializer=job_serializer, # type: ignore
job_deserializer=job_deserializer, # noqa: E731
)

View File

@@ -1,15 +1,14 @@
import asyncio
from typing import Any
from arq.worker import JobExecutionFailed
import msgpack
from app.services.cache_updater import (
cache_file_by_book_id,
check_books,
check_books_page,
)
from core.arq_pool import get_arq_pool, get_redis_settings
from core.arq_pool import (
get_arq_pool,
get_redis_settings,
job_deserializer,
job_serializer,
)
from core.db import database
from core.redis_client import get_client
import core.sentry # noqa: F401
@@ -19,7 +18,7 @@ async def startup(ctx):
if not database.is_connected:
await database.connect()
ctx["arc_pool"] = await get_arq_pool()
ctx["arq_pool"] = await get_arq_pool()
ctx["redis"] = get_client()
@@ -28,30 +27,6 @@ async def shutdown(ctx):
await database.disconnect()
def default(obj: Any):
if isinstance(obj, asyncio.TimeoutError):
return msgpack.ExtType(0, "")
elif isinstance(obj, JobExecutionFailed):
return msgpack.ExtType(1, obj.args[0].encode())
raise TypeError("Unknown type: %r" % (obj,))
def ext_hook(code: int, data: bytes):
if code == 0:
return asyncio.TimeoutError()
elif code == 1:
return JobExecutionFailed((data.decode()))
return msgpack.ExtType(code, data)
def job_serializer(d):
return msgpack.packb(d, default=default, use_bin_type=True) # noqa: E731
def job_deserializer(b):
return msgpack.unpackb(b, ext_hook=ext_hook, raw=False) # noqa: E731
class WorkerSettings:
functions = [check_books, check_books_page, cache_file_by_book_id]
on_startup = startup