diff --git a/src/app/services/updaters/fl_updater.py b/src/app/services/updaters/fl_updater.py index cf7dd9f..b3e3b3d 100644 --- a/src/app/services/updaters/fl_updater.py +++ b/src/app/services/updaters/fl_updater.py @@ -1,12 +1,16 @@ -import asyncio +from enum import Enum import logging -import re +import time from typing import Optional import aiomysql from arq.connections import ArqRedis +from arq.worker import Retry import asyncpg +from app.services.updaters.utils.cmd import run_cmd +from app.services.updaters.utils.tasks import is_jobs_complete +from app.services.updaters.utils.text import remove_wrong_ch, fix_annotation_text from core.config import env_config @@ -19,758 +23,758 @@ ch.setLevel(logging.INFO) logger.addHandler(ch) -async def run(cmd) -> tuple[bytes, bytes, Optional[int]]: - proc = await asyncio.create_subprocess_shell( - cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE +class JobId(Enum): + import_libavtor = "import_libavtor" + import_libbook = "import_libbook" + import_libavtorname = "import_libavtorname" + import_libtranslator = "import_libtranslator" + import_libseqname = "import_libseqname" + import_libseq = "import_libseq" + import_libgenre = "import_libgenre" + import_libgenrelist = "import_libgenrelsit" + import_lib_b_annotations = "import_lib_b_annotations" + import_lib_b_annotations_pics = "import_lib_b_annotations_pics" + import_lib_a_annotations = "import_lib_a_annotations" + import_lib_a_annotations_pics = "import_lib_a_annotations_pics" + + update_authors = "update_fl_authors" + update_books = "update_fl_books" + update_books_authors = "update_fl_books_authors" + update_translations = "update_fl_translations" + update_sequences = "update_fl_sequences" + update_sequences_info = "update_fl_sequences_info" + update_book_annotations = "update_fl_book_annotations" + update_book_annotations_pic = "update_fl_book_annotations_pic" + update_author_annotations = "update_fl_author_annotations" + update_author_annotations_pics = "update_fl_author_annotations_pics" + update_genres = "update_fl_genres" + update_books_genres = "update_fl_books_genres" + + +async def import_fl_dump(ctx: dict, filename: str, *args, **kwargs): + await run_cmd( + f"wget -O - {env_config.FL_BASE_URL}/sql/{filename}.gz | gunzip | " + f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} " + f'-p"{env_config.MYSQL_PASSWORD}" {env_config.MYSQL_DB_NAME}' ) - stdout, stderr = await proc.communicate() - return stdout, stderr, proc.returncode + +async def get_db_pools() -> tuple[asyncpg.Pool, aiomysql.Pool]: + posgres_pool = await asyncpg.create_pool( + database=env_config.POSTGRES_DB_NAME, + host=env_config.POSTGRES_HOST, + port=env_config.POSTGRES_PORT, + user=env_config.POSTGRES_USER, + password=env_config.POSTGRES_PASSWORD, + ) + + mysql_pool = await aiomysql.create_pool( + db=env_config.MYSQL_DB_NAME, + host=env_config.MYSQL_HOST, + port=env_config.MYSQL_PORT, + user=env_config.MYSQL_USER, + password=env_config.MYSQL_PASSWORD, + ) + + assert posgres_pool + + return posgres_pool, mysql_pool -def remove_wrong_ch(s: str): - return s.replace(";", "").replace("\n", " ").replace("ё", "е") +async def get_source(postgres_pool: asyncpg.Pool) -> int: + source_row = await postgres_pool.fetchrow( + "SELECT id FROM sources WHERE name = 'flibusta';" + ) + if not source_row: + await postgres_pool.execute("INSERT INTO sources (name) VALUES ('flibusta');") -def remove_dots(s: str): - return s.replace(".", "") - - -tags_regexp = re.compile(r"<.*?>") - - -def fix_annotation_text(text: str) -> str: - replace_map = { - " ": "", - "[b]": "", - "[/b]": "", - "[hr]": "", - } - - t = tags_regexp.sub("", text) - - for key in replace_map: - t = t.replace(key, replace_map[key]) - - return t - - -class FlUpdater: - SOURCE: int - - FILES = [ - "lib.libavtor.sql", - "lib.libbook.sql", - "lib.libavtorname.sql", - "lib.libtranslator.sql", - "lib.libseqname.sql", - "lib.libseq.sql", - "lib.libgenre.sql", - "lib.libgenrelist.sql", - "lib.b.annotations.sql", - "lib.b.annotations_pics.sql", - "lib.a.annotations.sql", - "lib.a.annotations_pics.sql", - ] - - postgres_pool: asyncpg.Pool - mysql_pool: aiomysql.Pool - - authors_updated_event: asyncio.Event - books_updated_event: asyncio.Event - sequences_updated_event: asyncio.Event - genres_updated_event: asyncio.Event - - async def _import_dump(self, filename: str): - await run( - f"wget -O - {env_config.FL_BASE_URL}/sql/{filename}.gz | gunzip | " - f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} " - f'-p"{env_config.MYSQL_PASSWORD}" {env_config.MYSQL_DB_NAME}' - ) - logger.info(f"Imported {filename}") - - async def _prepare(self): - posgres_pool = await asyncpg.create_pool( - database=env_config.POSTGRES_DB_NAME, - host=env_config.POSTGRES_HOST, - port=env_config.POSTGRES_PORT, - user=env_config.POSTGRES_USER, - password=env_config.POSTGRES_PASSWORD, - ) - - self.mysql_pool = await aiomysql.create_pool( - db=env_config.MYSQL_DB_NAME, - host=env_config.MYSQL_HOST, - port=env_config.MYSQL_PORT, - user=env_config.MYSQL_USER, - password=env_config.MYSQL_PASSWORD, - ) - - if not posgres_pool: - raise asyncpg.exceptions.PostgresConnectionError() - - self.postgres_pool = posgres_pool - - async def _set_source(self): - logger.info("Set source...") - - source_row = await self.postgres_pool.fetchrow( + source_row = await postgres_pool.fetchrow( "SELECT id FROM sources WHERE name = 'flibusta';" ) - if not source_row: - await self.postgres_pool.execute( - "INSERT INTO sources (name) VALUES ('flibusta');" + return source_row["id"] + + +async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): + arq_pool: ArqRedis = ctx["arq_pool"] + + if not await is_jobs_complete( + arq_pool, [JobId.import_libavtorname.value], prefix=prefix + ): + raise Retry(defer=60) + + postgres_pool, mysql_pool = await get_db_pools() + + source = await get_source(postgres_pool) + + def prepare_author(row: list): + return [ + source, + row[0], + remove_wrong_ch(row[1]), + remove_wrong_ch(row[2]), + remove_wrong_ch(row[3]), + ] + + await postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_author( + source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar + ) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM authors WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE authors SET first_name = first_name_, last_name = last_name_, middle_name = middle_name_ + WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; + + INSERT INTO authors (source, remote_id, first_name, last_name, middle_name) + VALUES (source_, remote_id_, first_name_, last_name_, middle_name_); + END; + $$ LANGUAGE plpgsql; + """ + ) + + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname;" ) - source_row = await self.postgres_pool.fetchrow( - "SELECT id FROM sources WHERE name = 'flibusta';" + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", + (prepare_author(row) for row in rows), + ) + + +async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): + arq_pool: ArqRedis = ctx["arq_pool"] + + if not await is_jobs_complete( + arq_pool, [JobId.import_libbook.value], prefix=prefix + ): + raise Retry(defer=60) + + postgres_pool, mysql_pool = await get_db_pools() + + source = await get_source(postgres_pool) + + replace_dict = {"ru-": "ru", "ru~": "ru"} + + def fix_lang(lang: str) -> str: + lower_lang = lang.lower() + replaced_lang = replace_dict.get(lower_lang, lower_lang) + return replaced_lang + + def prepare_book(row: list): + return [ + source, + row[0], + remove_wrong_ch(row[1]), + fix_lang(row[2]), + row[3], + row[4], + row[5] == "1", + ] + + await postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_book( + source_ smallint, remote_id_ int, title_ varchar, lang_ varchar, + file_type_ varchar, uploaded_ date, is_deleted_ boolean + ) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM books WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE books SET title = title_, lang = lang_, file_type = file_type_, + uploaded = uploaded_, is_deleted = is_deleted + WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; + + INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted) + VALUES (source_, remote_id_, title_, lang_, file_type_, uploaded_, is_deleted_); + END; + $$ LANGUAGE plpgsql; + """ + ) + + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook;" ) - self.SOURCE = source_row["id"] - - logger.info("Source has set!") - - async def _update_authors(self): - def prepare_author(row: list): - return [ - self.SOURCE, - row[0], - remove_wrong_ch(row[1]), - remove_wrong_ch(row[2]), - remove_wrong_ch(row[3]), - ] - - logger.info("Update authors...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_author( - source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar - ) RETURNS void AS $$ - BEGIN - IF EXISTS (SELECT * FROM authors WHERE source = source_ AND remote_id = remote_id_) THEN - UPDATE authors SET first_name = first_name_, last_name = last_name_, middle_name = middle_name_ - WHERE source = source_ AND remote_id = remote_id_; - RETURN; - END IF; - - INSERT INTO authors (source, remote_id, first_name, last_name, middle_name) - VALUES (source_, remote_id_, first_name_, last_name_, middle_name_); - END; - $$ LANGUAGE plpgsql; - """ - ) - - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libavtorname;") - - (rows_count,) = await cursor.fetchone() - - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname ORDER BY AvtorId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) - - rows = await cursor.fetchall() - - await self.postgres_pool.executemany( - "SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", - (prepare_author(row) for row in rows), - ) - - self.authors_updated_event.set() - - logger.info("Authors updated!") - - async def _update_books(self): - replace_dict = {"ru-": "ru", "ru~": "ru"} - - def fix_lang(lang: str) -> str: - lower_lang = lang.lower() - replaced_lang = replace_dict.get(lower_lang, lower_lang) - return replaced_lang - - def prepare_book(row: list): - return [ - self.SOURCE, - row[0], - remove_wrong_ch(row[1]), - fix_lang(row[2]), - row[3], - row[4], - row[5] == "1", - ] - - logger.info("Update books...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_book( - source_ smallint, remote_id_ int, title_ varchar, lang_ varchar, - file_type_ varchar, uploaded_ date, is_deleted_ boolean - ) RETURNS void AS $$ - BEGIN - IF EXISTS (SELECT * FROM books WHERE source = source_ AND remote_id = remote_id_) THEN - UPDATE books SET title = title_, lang = lang_, file_type = file_type_, - uploaded = uploaded_, is_deleted = is_deleted - WHERE source = source_ AND remote_id = remote_id_; - RETURN; - END IF; - - INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted) - VALUES (source_, remote_id_, title_, lang_, file_type_, uploaded_, is_deleted_); - END; - $$ LANGUAGE plpgsql; - """ - ) - - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libbook;") - - (rows_count,) = await cursor.fetchone() - - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook ORDER BY BookId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) - - rows = await cursor.fetchall() - - await self.postgres_pool.executemany( - "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7);", - (prepare_book(row) for row in rows), - ) - - self.books_updated_event.set() - - logger.info("Books updated!") - - async def _update_books_authors(self): - await self.books_updated_event.wait() - await self.authors_updated_event.wait() - - logger.info("Update books_authors...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$ - DECLARE - book_id integer := -1; - author_id integer := -1; - BEGIN - SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; - SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; - - IF EXISTS (SELECT * FROM book_authors WHERE book = book_id AND author = author_id) THEN - RETURN; - END IF; - - INSERT INTO book_authors (book, author) VALUES (book_id, author_id); - END; - $$ LANGUAGE plpgsql; - """ - ) - - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libavtor;") - - (rows_count,) = await cursor.fetchone() - - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT BookId, AvtorId FROM libavtor ORDER BY BookId, AvtorId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) - - rows = await cursor.fetchall() - - await self.postgres_pool.executemany( - "SELECT update_book_author($1, $2, $3);", - ((self.SOURCE, *row) for row in rows), - ) - - logger.info("Books authors updated!") - - async def _update_translations(self): - await self.books_updated_event.wait() - await self.authors_updated_event.wait() - - logger.info("Update translations...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$ - DECLARE - book_id integer := -1; - author_id integer := -1; - BEGIN - SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; - SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; - - IF EXISTS (SELECT * FROM translations WHERE book = book_id AND author = author_id) THEN - UPDATE translations SET position = position_ - WHERE book = book_id AND author = author_id; - RETURN; - END IF; - - INSERT INTO translations (book, author, position) VALUES (book_id, author_id, position_); - END; - $$ LANGUAGE plpgsql; - """ - ) - - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute( - "SELECT COUNT(*) FROM libtranslator " - "WHERE BookId IN (SELECT BookId FROM libbook);" + while rows := await cursor.fetchmany(1024): + await postgres_pool.executemany( + "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7);", + (prepare_book(row) for row in rows), ) - (rows_count,) = await cursor.fetchone() - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT BookId, TranslatorId, Pos FROM libtranslator " - "WHERE BookId IN (SELECT BookId FROM libbook) " - "ORDER BY BookId, TranslatorId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) +async def update_fl_books_authors( + ctx: dict, *arsg, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] - rows = await cursor.fetchall() + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_libavtor.value, + JobId.update_authors.value, + JobId.update_books.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) - await self.postgres_pool.executemany( - "SELECT update_translation($1, $2, $3, $4)", - ((self.SOURCE, *row) for row in rows), - ) + postgres_pool, mysql_pool = await get_db_pools() - logger.info("Translations updated!") + source = await get_source(postgres_pool) - async def _update_sequences(self): - def prepare_sequence(row: list): - return [ - self.SOURCE, - row[0], - remove_wrong_ch(row[1]), - ] - - logger.info("Update sequences...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$ - BEGIN - IF EXISTS (SELECT * FROM sequences WHERE source = source_ AND remote_id = remote_id_) THEN - UPDATE sequences SET name = name_ WHERE source = source_ AND remote_id = remote_id_; - RETURN; - END IF; - - INSERT INTO sequences (source, remote_id, name) VALUES (source_, remote_id_, name_); - END; - $$ LANGUAGE plpgsql; + await postgres_pool.execute( """ - ) + CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$ + DECLARE + book_id integer := -1; + author_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libseqname;") + IF EXISTS (SELECT * FROM book_authors WHERE book = book_id AND author = author_id) THEN + RETURN; + END IF; - (rows_count,) = await cursor.fetchone() + INSERT INTO book_authors (book, author) VALUES (book_id, author_id); + END; + $$ LANGUAGE plpgsql; + """ + ) - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT SeqId, SeqName FROM libseqname ORDER BY SeqId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT BookId, AvtorId FROM libavtor;") - rows = await cursor.fetchall() - - await self.postgres_pool.executemany( - "SELECT update_sequences($1, $2, cast($3 as varchar));", - (prepare_sequence(row) for row in rows), - ) - - self.sequences_updated_event.set() - - logger.info("Sequences updated!") - - async def _update_sequences_info(self): - await self.sequences_updated_event.wait() - await self.books_updated_event.wait() - - logger.info("Update book_sequences...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$ - DECLARE - book_id integer := -1; - sequence_id integer := -1; - BEGIN - SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; - SELECT id INTO sequence_id FROM sequences WHERE source = source_ AND remote_id = sequence_; - - IF EXISTS (SELECT * FROM book_sequences WHERE book = book_id AND sequence = sequence_id) THEN - UPDATE book_sequences SET position = position_ WHERE book = book_id AND sequence = sequence_id; - RETURN; - END IF; - - INSERT INTO book_sequences (book, sequence, position) VALUES (book_id, sequence_id, position_); - END; - $$ LANGUAGE plpgsql; - """ - ) - - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute( - "SELECT COUNT(*) FROM libseq " - "WHERE " - "BookId IN (SELECT BookId FROM libbook) AND " - "SeqId IN (SELECT SeqId FROM libseqname);" + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_book_author($1, $2, $3);", + ((source, *row) for row in rows), ) - (rows_count,) = await cursor.fetchone() - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT BookId, SeqId, level FROM libseq " - "WHERE " - "BookId IN (SELECT BookId FROM libbook) AND " - "SeqId IN (SELECT SeqId FROM libseqname) " - "ORDER BY BookId, SeqId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) +async def update_fl_translations( + ctx: dict, *args, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] - rows = await cursor.fetchall() + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_libtranslator.value, + JobId.update_authors.value, + JobId.update_books.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) - await self.postgres_pool.executemany( - "SELECT update_book_sequence($1, $2, $3, $4);", - ([self.SOURCE, *row] for row in rows), - ) + postgres_pool, mysql_pool = await get_db_pools() - logger.info("Book sequences updated!") + source = await get_source(postgres_pool) - async def _update_book_annotations(self): - await self.books_updated_event.wait() - - logger.info("Update book_annotations...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$ - DECLARE - book_id integer := -1; - BEGIN - SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; - - IF EXISTS (SELECT * FROM book_annotations WHERE book = book_id) THEN - UPDATE book_annotations SET title = title_, text = text_ WHERE book = book_id; - RETURN; - END IF; - - INSERT INTO book_annotations (book, title, text) VALUES (book_id, title_, text_); - END; - $$ LANGUAGE plpgsql; + await postgres_pool.execute( """ - ) + CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$ + DECLARE + book_id integer := -1; + author_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; - def fix_annotation(row) -> list: - return [ - self.SOURCE, - row[0], - row[1], - fix_annotation_text(row[2]), - ] + IF EXISTS (SELECT * FROM translations WHERE book = book_id AND author = author_id) THEN + UPDATE translations SET position = position_ + WHERE book = book_id AND author = author_id; + RETURN; + END IF; - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute( - "SELECT COUNT(*) FROM libbannotations " - "WHERE BookId IN (SELECT BookId FROM libbook);" + INSERT INTO translations (book, author, position) VALUES (book_id, author_id, position_); + END; + $$ LANGUAGE plpgsql; + """ + ) + + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, TranslatorId, Pos FROM libtranslator " + "WHERE BookId IN (SELECT BookId FROM libbook);" + ) + + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_translation($1, $2, $3, $4)", + ((source, *row) for row in rows), ) - (rows_count,) = await cursor.fetchone() - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT BookId, Title, Body FROM libbannotations " - "WHERE BookId IN (SELECT BookId FROM libbook) " - "ORDER BY BookId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) +async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): + arq_pool: ArqRedis = ctx["arq_pool"] - rows = await cursor.fetchall() + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_libseqname.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) - await self.postgres_pool.executemany( - "SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));", - (fix_annotation(row) for row in rows), - ) + postgres_pool, mysql_pool = await get_db_pools() - logger.info("Book_annotations updated!") + source = await get_source(postgres_pool) - await self._update_book_annotations_pic() + def prepare_sequence(row: list): + return [ + source, + row[0], + remove_wrong_ch(row[1]), + ] - async def _update_book_annotations_pic(self): - logger.info("Update book_annotations_pic...") - - def fix_link(row): - return [self.SOURCE, row[0], f"{env_config.FL_BASE_URL}/i/{row[1]}"] - - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libbpics;") - - (rows_count,) = await cursor.fetchone() - - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT BookId, File FROM libbpics ORDER BY BookId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) - - rows = await cursor.fetchall() - - await self.postgres_pool.executemany( - "UPDATE book_annotations " - "SET file = cast($3 as varchar) " - "FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books " - "WHERE book = books.id;", - (fix_link(row) for row in rows), - ) - - logger.info("Book annotation pics updated!") - - async def _update_author_annotations(self): - await self.authors_updated_event.wait() - - logger.info("Update author_annotations...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$ - DECLARE - author_id integer := -1; - BEGIN - SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; - - IF EXISTS (SELECT * FROM author_annotations WHERE author = author_id) THEN - UPDATE author_annotations SET title = title_, text = text_ WHERE author = author_id; - RETURN; - END IF; - - INSERT INTO author_annotations (author, title, text) VALUES (author_id, title_, text_); - END; - $$ LANGUAGE plpgsql; + await postgres_pool.execute( """ - ) + CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM sequences WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE sequences SET name = name_ WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; - def fix_annotation(row) -> list: - return [ - self.SOURCE, - row[0], - row[1], - fix_annotation_text(row[2]), - ] + INSERT INTO sequences (source, remote_id, name) VALUES (source_, remote_id_, name_); + END; + $$ LANGUAGE plpgsql; + """ + ) - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libaannotations;") + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT SeqId, SeqName FROM libseqname;") - (rows_count,) = await cursor.fetchone() + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_sequences($1, $2, cast($3 as varchar));", + (prepare_sequence(row) for row in rows), + ) - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT AvtorId, Title, Body FROM libaannotations ORDER BY AvtorId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) - rows = await cursor.fetchall() +async def update_fl_sequences_info( + ctx: dict, *args, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] - await self.postgres_pool.executemany( - "SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));", - (fix_annotation(row) for row in rows), - ) + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_libseq.value, + JobId.update_sequences.value, + JobId.update_books.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) - logger.info("Author annotation updated!") + postgres_pool, mysql_pool = await get_db_pools() - await self._update_author_annotations_pics() + source = await get_source(postgres_pool) - async def _update_author_annotations_pics(self): - logger.info("Update author_annotations_pic...") - - def fix_link(row): - return [self.SOURCE, row[0], f"{env_config.FL_BASE_URL}/ia/{row[1]}"] - - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libapics;") - - (rows_count,) = await cursor.fetchone() - - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT AvtorId, File FROM libapics ORDER BY AvtorId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) - - rows = await cursor.fetchall() - - await self.postgres_pool.executemany( - "UPDATE author_annotations " - "SET file = cast($3 as varchar) " - "FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors " - "WHERE author = authors.id;", - (fix_link(row) for row in rows), - ) - - logger.info("Author annotatioins pic updated!") - - async def _update_genres(self): - logger.info("Update genres...") - - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_genre( - source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar - ) RETURNS void AS $$ - BEGIN - IF EXISTS (SELECT * FROM genres WHERE source = source_ AND remote_id = remote_id_) THEN - UPDATE genres SET code = code_, description = description_, meta = meta_ - WHERE source = source_ AND remote_id = remote_id_; - RETURN; - END IF; - - INSERT INTO authors (source, remote_id, code, description, meta) - VALUES (source_, remote_id_, code_, description_, meta_); - END; - $$ LANGUAGE plpgsql; + await postgres_pool.execute( """ - ) + CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$ + DECLARE + book_id integer := -1; + sequence_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO sequence_id FROM sequences WHERE source = source_ AND remote_id = sequence_; - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libgenrelist;") + IF EXISTS (SELECT * FROM book_sequences WHERE book = book_id AND sequence = sequence_id) THEN + UPDATE book_sequences SET position = position_ WHERE book = book_id AND sequence = sequence_id; + RETURN; + END IF; - (rows_count,) = await cursor.fetchone() + INSERT INTO book_sequences (book, sequence, position) VALUES (book_id, sequence_id, position_); + END; + $$ LANGUAGE plpgsql; + """ + ) - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist ORDER BY GenreId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, SeqId, level FROM libseq " + "WHERE " + "BookId IN (SELECT BookId FROM libbook) AND " + "SeqId IN (SELECT SeqId FROM libseqname);" + ) - rows = await cursor.fetchall() - await self.postgres_pool.executemany( - "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", - ([self.SOURCE, *row] for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_book_sequence($1, $2, $3, $4);", + ([source, *row] for row in rows), + ) - self.genres_updated_event.set() - logger.info("Genres updated!") +async def update_fl_book_annotations( + ctx: dict, *args, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] - async def _update_books_genres(self): - await self.books_updated_event.wait() - await self.genres_updated_event.wait() + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_lib_b_annotations.value, + JobId.update_books.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) - logger.info("Update book_genres...") + postgres_pool, mysql_pool = await get_db_pools() - await self.postgres_pool.execute( - """ - CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$ - DECLARE - book_id integer := -1; - genre_id integer := -1; - BEGIN - SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; - SELECT id INTO genre_id FROM genres WHERE source = source_ AND remote_id = genre_; + source = await get_source(postgres_pool) - IF EXISTS (SELECT * FROM book_genres WHERE book = book_id AND genre = genre_id) THEN - RETURN; - END IF; - - INSERT INTO book_genres (book, genre) VALUES (book_id, genre_id); - END; - $$ LANGUAGE plpgsql; + await postgres_pool.execute( """ - ) + CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$ + DECLARE + book_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT COUNT(*) FROM libgenre;") + IF EXISTS (SELECT * FROM book_annotations WHERE book = book_id) THEN + UPDATE book_annotations SET title = title_, text = text_ WHERE book = book_id; + RETURN; + END IF; - (rows_count,) = await cursor.fetchone() + INSERT INTO book_annotations (book, title, text) VALUES (book_id, title_, text_); + END; + $$ LANGUAGE plpgsql; + """ + ) - for offset in range(0, rows_count, 4096): - await cursor.execute( - "SELECT BookId, GenreId FROM libgenre ORDER BY BookId, GenreId LIMIT 4096 OFFSET {offset};".format( - offset=offset - ) - ) + def fix_annotation(row) -> list: + return [ + source, + row[0], + row[1], + fix_annotation_text(row[2]), + ] - rows = await cursor.fetchall() + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, Title, Body FROM libbannotations " + "WHERE BookId IN (SELECT BookId FROM libbook);" + ) - await self.postgres_pool.executemany( - "SELECT update_book_sequence($1, $2, $3);", - ((self.SOURCE, *row) for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));", + (fix_annotation(row) for row in rows), + ) - logger.info("Book_genres updated!") - async def _import(self, ctx) -> bool: - await asyncio.gather(*[self._import_dump(filename) for filename in self.FILES]) +async def update_fl_book_annotations_pic( + ctx: dict, *args, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] - arq_pool: ArqRedis = ctx["arq_pool"] - await arq_pool.enqueue_job("run_fl_update2") + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_lib_b_annotations_pics.value, + JobId.update_book_annotations.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) - return True + postgres_pool, mysql_pool = await get_db_pools() - async def _update(self, ctx) -> bool: - await self._prepare() + source = await get_source(postgres_pool) - await self._set_source() + def fix_link(row): + return [source, row[0], f"{env_config.FL_BASE_URL}/i/{row[1]}"] - self.authors_updated_event = asyncio.Event() - self.books_updated_event = asyncio.Event() - self.sequences_updated_event = asyncio.Event() - self.genres_updated_event = asyncio.Event() + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT BookId, File FROM libbpics;") - await asyncio.gather( - self._update_authors(), - self._update_books(), - self._update_books_authors(), - self._update_translations(), - self._update_sequences(), - self._update_sequences_info(), - self._update_book_annotations(), - self._update_author_annotations(), - self._update_genres(), - self._update_books_genres(), - ) + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "UPDATE book_annotations " + "SET file = cast($3 as varchar) " + "FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books " + "WHERE book = books.id;", + (fix_link(row) for row in rows), + ) - return True + +async def update_fl_author_annotations( + ctx: dict, *args, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] + + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_lib_a_annotations.value, + JobId.update_authors.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) + + postgres_pool, mysql_pool = await get_db_pools() + + source = await get_source(postgres_pool) + + await postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$ + DECLARE + author_id integer := -1; + BEGIN + SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; + + IF EXISTS (SELECT * FROM author_annotations WHERE author = author_id) THEN + UPDATE author_annotations SET title = title_, text = text_ WHERE author = author_id; + RETURN; + END IF; + + INSERT INTO author_annotations (author, title, text) VALUES (author_id, title_, text_); + END; + $$ LANGUAGE plpgsql; + """ + ) + + def fix_annotation(row) -> list: + return [ + source, + row[0], + row[1], + fix_annotation_text(row[2]), + ] + + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT AvtorId, Title, Body FROM libaannotations;") + + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));", + (fix_annotation(row) for row in rows), + ) + + +async def update_fl_author_annotations_pics( + ctx: dict, *args, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] + + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_lib_a_annotations_pics.value, + JobId.update_author_annotations.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) + + postgres_pool, mysql_pool = await get_db_pools() + + source = await get_source(postgres_pool) + + def fix_link(row): + return [source, row[0], f"{env_config.FL_BASE_URL}/ia/{row[1]}"] + + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT AvtorId, File FROM libapics;") + + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "UPDATE author_annotations " + "SET file = cast($3 as varchar) " + "FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors " + "WHERE author = authors.id;", + (fix_link(row) for row in rows), + ) + + +async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): + arq_pool: ArqRedis = ctx["arq_pool"] + + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_libgenrelist.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) + + postgres_pool, mysql_pool = await get_db_pools() + + source = await get_source(postgres_pool) + + await postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_genre( + source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar + ) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM genres WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE genres SET code = code_, description = description_, meta = meta_ + WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; + + INSERT INTO authors (source, remote_id, code, description, meta) + VALUES (source_, remote_id_, code_, description_, meta_); + END; + $$ LANGUAGE plpgsql; + """ + ) + + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;" + ) + + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", + ([source, *row] for row in rows), + ) + + +async def update_fl_books_genres( + ctx: dict, *args, prefix: Optional[str] = None, **kwargs +): + arq_pool: ArqRedis = ctx["arq_pool"] + + if not await is_jobs_complete( + arq_pool, + [ + JobId.import_libgenre.value, + JobId.update_books.value, + JobId.update_genres.value, + ], + prefix=prefix, + ): + raise Retry(defer=60) + + postgres_pool, mysql_pool = await get_db_pools() + + source = await get_source(postgres_pool) + + await postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$ + DECLARE + book_id integer := -1; + genre_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO genre_id FROM genres WHERE source = source_ AND remote_id = genre_; + + IF EXISTS (SELECT * FROM book_genres WHERE book = book_id AND genre = genre_id) THEN + RETURN; + END IF; + + INSERT INTO book_genres (book, genre) VALUES (book_id, genre_id); + END; + $$ LANGUAGE plpgsql; + """ + ) + + async with mysql_pool.acquire() as conn: + async with conn.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT BookId, GenreId FROM libgenre;") + + while rows := await cursor.fetchmany(4096): + await postgres_pool.executemany( + "SELECT update_book_sequence($1, $2, $3);", + ((source, *row) for row in rows), + ) async def run_fl_update(ctx: dict, *args, **kwargs) -> bool: - return await FlUpdater()._import(ctx) + prefix = str(int(time.time()) // (5 * 60)) + + IMPORTS = { + JobId.import_libbook: "lib.libbook.sql", + JobId.import_libavtor: "lib.libavtor.sql", + JobId.import_libavtorname: "lib.libavtorname.sql", + JobId.import_libtranslator: "lib.libtranslator.sql", + JobId.import_libseqname: "lib.libseqname.sql", + JobId.import_libseq: "lib.libseq.sql", + JobId.import_libgenre: "lib.libgenre.sql", + JobId.import_libgenrelist: "lib.libgenrelist.sql", + JobId.import_lib_b_annotations: "lib.b.annotations.sql", + JobId.import_lib_b_annotations_pics: "lib.b.annotations_pics.sql", + JobId.import_lib_a_annotations: "lib.a.annotations.sql", + JobId.import_lib_a_annotations_pics: "lib.a.annotations_pics.sql", + } + + UPDATES = ( + JobId.update_books, + JobId.update_book_annotations, + JobId.update_book_annotations_pic, + JobId.update_books_genres, + JobId.update_authors, + JobId.update_author_annotations, + JobId.update_author_annotations_pics, + JobId.update_books_authors, + JobId.update_translations, + JobId.update_sequences, + JobId.update_sequences_info, + JobId.update_genres, + ) + + arq_pool: ArqRedis = ctx["arq_pool"] + + for job_id, filename in IMPORTS.items(): + await arq_pool.enqueue_job( + "import_fl_dump", filename, _job_id=f"{prefix}_{job_id.value}" + ) + + for job_id in UPDATES: + await arq_pool.enqueue_job( + job_id.value, prefix=prefix, _job_id=f"{prefix}_{job_id.value}" + ) + + return True -async def run_fl_update2(ctx) -> bool: - return await FlUpdater()._update(ctx) +__tasks__ = [ + run_fl_update, + import_fl_dump, + update_fl_authors, + update_fl_books, + update_fl_books_authors, + update_fl_translations, + update_fl_sequences, + update_fl_sequences_info, + update_fl_book_annotations, + update_fl_book_annotations_pic, + update_fl_author_annotations, + update_fl_author_annotations_pics, + update_fl_genres, + update_fl_books_genres, +] diff --git a/src/app/services/updaters/utils/__init__.py b/src/app/services/updaters/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/services/updaters/utils/cmd.py b/src/app/services/updaters/utils/cmd.py new file mode 100644 index 0000000..1f3d7d5 --- /dev/null +++ b/src/app/services/updaters/utils/cmd.py @@ -0,0 +1,11 @@ +import asyncio +from typing import Optional + + +async def run_cmd(cmd: str) -> tuple[bytes, bytes, Optional[int]]: + proc = await asyncio.create_subprocess_shell( + cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + + stdout, stderr = await proc.communicate() + return stdout, stderr, proc.returncode diff --git a/src/app/services/updaters/utils/tasks.py b/src/app/services/updaters/utils/tasks.py new file mode 100644 index 0000000..69bdc5c --- /dev/null +++ b/src/app/services/updaters/utils/tasks.py @@ -0,0 +1,29 @@ +from typing import Optional + +from arq.connections import ArqRedis +from arq.jobs import Job, JobStatus + + +async def is_jobs_complete( + arq_pool: ArqRedis, job_ids: list[str], prefix: Optional[str] = None +) -> Optional[bool]: + job_statuses = set() + for job_id in job_ids: + _job_id = f"{prefix}_{job_id}" if prefix else job_id + status = await Job( + _job_id, arq_pool, arq_pool.default_queue_name, arq_pool.job_deserializer + ).status() + job_statuses.add(status.value) + + if JobStatus.not_found.value in job_statuses: + return False + + for status in ( + JobStatus.deferred.value, + JobStatus.in_progress.value, + JobStatus.queued.value, + ): + if status in job_statuses: + return False + + return True diff --git a/src/app/services/updaters/utils/text.py b/src/app/services/updaters/utils/text.py new file mode 100644 index 0000000..3e5860f --- /dev/null +++ b/src/app/services/updaters/utils/text.py @@ -0,0 +1,28 @@ +import re + + +def remove_wrong_ch(s: str) -> str: + return s.replace(";", "").replace("\n", " ").replace("ё", "е") + + +def remove_dots(s: str) -> str: + return s.replace(".", "") + + +tags_regexp = re.compile(r"<.*?>") + + +def fix_annotation_text(text: str) -> str: + replace_map = { + " ": "", + "[b]": "", + "[/b]": "", + "[hr]": "", + } + + t = tags_regexp.sub("", text) + + for key in replace_map: + t = t.replace(key, replace_map[key]) + + return t diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py index 93d668b..25367b0 100644 --- a/src/core/setup_arq.py +++ b/src/core/setup_arq.py @@ -1,6 +1,8 @@ +from arq.connections import ArqRedis from arq.cron import cron -from app.services.updaters.fl_updater import run_fl_update, run_fl_update2 +from app.services.updaters.fl_updater import __tasks__ as fl_tasks +from app.services.updaters.fl_updater import run_fl_update from core.arq_pool import get_redis_settings, get_arq_pool @@ -8,10 +10,19 @@ async def startup(ctx): ctx["arq_pool"] = await get_arq_pool() +async def shutdown(ctx): + arq_pool: ArqRedis = ctx["arq_pool"] + + arq_pool.close() + await arq_pool.wait_closed() + + class WorkerSettings: - functions = [run_fl_update, run_fl_update2] + functions = [*fl_tasks] on_startup = startup + on_shutdown = shutdown redis_settings = get_redis_settings() - max_jobs = 1 - job_timeout = 45 * 60 + max_jobs = 2 + max_tries = 10 + job_timeout = 5 * 60 cron_jobs = [cron(run_fl_update, hour={5}, minute=0)]