Mirror of https://github.com/flibusta-apps/book_library_server.git (synced 2025-12-06 15:15:36 +01:00)

Use redis list for base search service
@@ -4,14 +4,11 @@ from concurrent.futures import ThreadPoolExecutor
from random import choice
from typing import Optional, Generic, TypeVar, TypedDict, Union

from fastapi import BackgroundTasks

import aioredis
from databases import Database
from fastapi_pagination.api import resolve_params
from fastapi_pagination.bases import AbstractParams, RawParams
import meilisearch
import orjson
from ormar import Model, QuerySet
from sqlalchemy import Table
@@ -78,15 +75,21 @@ class BaseSearchService(Generic[MODEL, QUERY], abc.ABC):
cls,
query: QUERY,
redis: aioredis.Redis,
) -> Optional[list[int]]:
params: RawParams,
) -> Optional[tuple[int, list[int]]]:
try:
key = cls.get_cache_key(query)
data = await redis.get(key)
active_key = f"{key}_active"

if data is None:
if not await redis.exists(active_key):
return None

return orjson.loads(data)
objects_count, objects = await asyncio.gather(
redis.llen(key),
redis.lrange(key, params.offset, params.offset + params.limit),
)

return objects_count, [int(item.decode()) for item in objects]
except aioredis.RedisError as e:
print(e)
return None
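The hunk above is the read path: instead of GET plus orjson.loads on a single serialized blob, the service now checks a "{key}_active" marker and reads one page straight from a Redis list with LLEN and LRANGE. A minimal, self-contained sketch of that pattern follows; it uses redis.asyncio (the maintained successor of the aioredis package imported above), and the key name and fetch_page helper are illustrative, not taken from the repository.

import asyncio
from typing import Optional

import redis.asyncio as redis  # aioredis 2.x exposes essentially the same API


async def fetch_page(
    r: redis.Redis, key: str, offset: int, limit: int
) -> Optional[tuple[int, list[int]]]:
    """Return (total_count, page_of_ids) from a cached Redis list, or None on a miss."""
    # The "<key>_active" marker carries the TTL; the list is only trusted
    # while the marker still exists.
    if not await r.exists(f"{key}_active"):
        return None

    # LLEN and LRANGE are independent, so they can be issued concurrently.
    # Note that LRANGE treats the stop index as inclusive, so a page of
    # `limit` items ends at offset + limit - 1.
    total, items = await asyncio.gather(
        r.llen(key),
        r.lrange(key, offset, offset + limit - 1),
    )
    return total, [int(item) for item in items]


async def main() -> None:
    r = redis.from_url("redis://localhost", decode_responses=True)
    print(await fetch_page(r, "search:books:example", offset=0, limit=20))
    await r.close()


if __name__ == "__main__":
    asyncio.run(main())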
@@ -97,34 +100,48 @@ class BaseSearchService(Generic[MODEL, QUERY], abc.ABC):
query: QUERY,
object_ids: list[int],
redis: aioredis.Redis,
):
) -> bool:
try:
key = cls.get_cache_key(query)
await redis.set(key, orjson.dumps(object_ids), ex=cls.CACHE_TTL)
active_key = f"{key}_active"

p = redis.pipeline()

await p.delete(key)
await p.set(active_key, 1, ex=cls.CACHE_TTL)
await p.rpush(key, *object_ids)

await p.execute()

return True
except aioredis.RedisError as e:
print(e)
return False

@classmethod
async def _get_objects(cls, query: QUERY, redis: aioredis.Redis) -> list[int]:
cached_object_ids = await cls.get_cached_ids(query, redis)
async def get_object_ids(
cls, query: QUERY, redis: aioredis.Redis
) -> tuple[int, list[int]]:
params = cls.get_raw_params()

if cached_object_ids is None:
object_ids = await cls._get_object_ids(query)
if (
cached_object_ids := await cls.get_cached_ids(query, redis, params)
) is not None:
return cached_object_ids

object_ids = await cls._get_object_ids(query)
limited_object_ids = object_ids[params.offset : params.offset + params.limit]

if len(object_ids) != 0:
await cls.cache_object_ids(query, object_ids, redis)
else:
object_ids = cached_object_ids

return object_ids
return len(object_ids), limited_object_ids

@classmethod
async def get_limited_objects(
cls, query: QUERY, redis: aioredis.Redis
) -> tuple[int, list[MODEL]]:
object_ids = await cls._get_objects(query, redis)

params = cls.get_raw_params()

limited_object_ids = object_ids[params.offset : params.offset + params.limit]
count, object_ids = await cls.get_object_ids(query, redis)

queryset: QuerySet[MODEL] = cls.model.objects
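The write path above replaces a SET of an orjson-dumped list with a pipeline that deletes the stale list, sets the "{key}_active" marker with the TTL, and RPUSHes every ID; note that only the marker gets an expiry in this hunk. A rough sketch of that rebuild step, under the same assumptions as the previous example (redis.asyncio, invented key name and TTL):

import asyncio

import redis.asyncio as redis  # aioredis 2.x exposes essentially the same API
from redis.exceptions import RedisError

CACHE_TTL = 6 * 60 * 60  # illustrative value, not the repository's setting


async def cache_ids(r: redis.Redis, key: str, object_ids: list[int]) -> bool:
    """Rebuild the cached ID list for `key` in one pipeline round trip."""
    if not object_ids:
        return False  # RPUSH with no values is an error in Redis

    try:
        pipe = r.pipeline()
        # Buffered pipeline calls only queue commands; nothing is sent to the
        # server until execute().
        pipe.delete(key)
        pipe.set(f"{key}_active", 1, ex=CACHE_TTL)
        pipe.rpush(key, *object_ids)
        await pipe.execute()
        return True
    except RedisError as exc:
        print(exc)
        return False


async def main() -> None:
    r = redis.from_url("redis://localhost", decode_responses=True)
    print(await cache_ids(r, "search:books:example", [3, 1, 4, 1, 5, 9]))
    await r.close()


if __name__ == "__main__":
    asyncio.run(main())

In this sketch, redis-py's pipeline defaults to transaction=True, so the queued commands run inside MULTI/EXEC and readers never observe the list between the DELETE and the RPUSH.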
@@ -134,10 +151,8 @@ class BaseSearchService(Generic[MODEL, QUERY], abc.ABC):
if cls.SELECT_RELATED:
queryset = queryset.select_related(cls.SELECT_RELATED)

db_objects = await queryset.filter(id__in=limited_object_ids).all()
return len(object_ids), sorted(
db_objects, key=lambda o: limited_object_ids.index(o.id)
)
db_objects = await queryset.filter(id__in=object_ids).all()
return count, sorted(db_objects, key=lambda o: object_ids.index(o.id))

@classmethod
async def get(cls, query: QUERY, redis: aioredis.Redis) -> Page[MODEL]:
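Since filter(id__in=...) returns rows in whatever order the database chooses, the hunk re-sorts them by each ID's position in the cached page. The list.index call inside the sort key is quadratic in the page size; a small sketch of an equivalent reordering helper built on a one-pass position map (not code from the repository):

from dataclasses import dataclass
from typing import Sequence, TypeVar

T = TypeVar("T")


def order_by_ids(rows: Sequence[T], ids: Sequence[int]) -> list[T]:
    """Reorder `rows` to match `ids` using an O(n) position map instead of list.index."""
    position = {obj_id: index for index, obj_id in enumerate(ids)}
    return sorted(rows, key=lambda row: position[row.id])


@dataclass
class Row:  # stand-in for an ormar model instance
    id: int
    title: str


if __name__ == "__main__":
    ids = [9, 3, 7]
    rows = [Row(3, "b"), Row(7, "c"), Row(9, "a")]  # the database returns arbitrary order
    print(order_by_ids(rows, ids))  # rows come back in the order 9, 3, 7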
@@ -309,9 +324,14 @@ class GetRandomService(Generic[MODEL]):
key = cls.get_cache_key(allowed_langs)
active_key = f"{key}_active"

await redis.set(active_key, 1, ex=cls.CACHE_TTL)
await redis.delete(key)
await redis.sadd(key, *object_ids)
p = redis.pipeline()

await p.set(active_key, 1, ex=cls.CACHE_TTL)
await p.delete(key)
await p.sadd(key, *object_ids)

await p.execute()

return True
except aioredis.RedisError as e:
print(e)
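GetRandomService applies the same marker-plus-pipeline rebuild, but keeps its candidate IDs in a Redis set (SADD) rather than a list, since it only ever needs one random member instead of an ordered page. The diff does not show the read side; the sketch below assumes it draws with SRANDMEMBER, which is an assumption about the surrounding code, not something this commit shows.

import asyncio
from typing import Optional

import redis.asyncio as redis  # aioredis 2.x exposes essentially the same API


async def pick_random_id(r: redis.Redis, key: str) -> Optional[int]:
    """Hypothetical read side: draw one ID from the cached set if it is still active."""
    if not await r.exists(f"{key}_active"):
        return None

    # SRANDMEMBER returns a uniformly random member without removing it.
    member = await r.srandmember(key)
    return int(member) if member is not None else None


async def main() -> None:
    r = redis.from_url("redis://localhost", decode_responses=True)
    print(await pick_random_id(r, "random:books:en"))
    await r.close()


if __name__ == "__main__":
    asyncio.run(main())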
@@ -322,7 +342,6 @@ class GetRandomService(Generic[MODEL]):
cls,
allowed_langs: frozenset[str],
redis: aioredis.Redis,
background_tasks: BackgroundTasks,
) -> int:
cached_object_id = await cls._get_random_object_from_cache(allowed_langs, redis)
@@ -331,7 +350,7 @@ class GetRandomService(Generic[MODEL]):

object_ids = await cls._get_objects_from_db(allowed_langs)

background_tasks.add_task(cls._cache_object_ids, allowed_langs, redis)
await cls._cache_object_ids(object_ids, allowed_langs, redis)

return choice(object_ids)