From 078ca87b4f25ae4337a360e062045ebd83b3644d Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Sat, 9 Apr 2022 19:52:28 +0300 Subject: [PATCH] Add update_genres task --- src/app/services.py | 41 +++++++++++++++++++++++++++++++++++++++++ src/core/setup_arq.py | 12 +++++++++--- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/app/services.py b/src/app/services.py index 123d19a..9560b0d 100644 --- a/src/app/services.py +++ b/src/app/services.py @@ -143,11 +143,52 @@ async def update_sequences(ctx) -> bool: return True +async def update_genres(ctx) -> bool: + loop = asyncio.get_event_loop() + + meili = get_meilisearch_client() + index = meili.index("genres") + + postgres = await get_postgres_connection() + + async with postgres.transaction(): + cursor = await postgres.cursor( + "SELECT id, description, meta, " + " array( " + " SELECT DISTINCT lang FROM book_genres " + " LEFT JOIN books ON book = books.id " + " WHERE genres.id = book_genres.genre " + " AND books.is_deleted = 'f' " + " ) as langs, " + " ( " + " SELECT count(*) FROM book_genres " + " LEFT JOIN books ON book = books.id " + " WHERE genres.id = book_genres.genre " + " AND books.is_deleted = 'f' " + " ) as books_count " + "FROM genres;" + ) + + while rows := await cursor.fetch(1024): + await loop.run_in_executor( + thread_pool, index.add_documents, [dict(row) for row in rows] + ) + + index.update_searchable_attributes(["description"]) + index.update_filterable_attributes(["langs"]) + index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"]) + + await postgres.close() + + return True + + async def update(ctx: dict, *args, **kwargs) -> bool: arq_pool: ArqRedis = ctx["arc_pool"] await arq_pool.enqueue_job("update_books") await arq_pool.enqueue_job("update_authors") await arq_pool.enqueue_job("update_sequences") + await arq_pool.enqueue_job("update_genres") return True diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py index 2be2bca..74d2e59 100644 --- a/src/core/setup_arq.py +++ b/src/core/setup_arq.py @@ -1,6 +1,12 @@ from arq.cron import cron -from app.services import update, update_books, update_authors, update_sequences +from app.services import ( + update, + update_books, + update_authors, + update_sequences, + update_genres, +) from core.arq_pool import get_redis_settings, get_arq_pool @@ -9,9 +15,9 @@ async def startup(ctx): class WorkerSettings: - functions = [update, update_books, update_authors, update_sequences] + functions = [update, update_books, update_authors, update_sequences, update_genres] on_startup = startup redis_settings = get_redis_settings() - max_jobs = 3 + max_jobs = 2 job_timeout = 15 * 60 cron_jobs = [cron(update, hour={4}, minute=0)]