From f5b7eee1a31d526e39b254c45010786eae617e39 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Fri, 18 Mar 2022 21:47:21 +0300 Subject: [PATCH] Fix db connections close --- src/app/services/updaters/fl_updater.py | 338 +++++++++++++----------- 1 file changed, 182 insertions(+), 156 deletions(-) diff --git a/src/app/services/updaters/fl_updater.py b/src/app/services/updaters/fl_updater.py index b3e3b3d..5698cfa 100644 --- a/src/app/services/updaters/fl_updater.py +++ b/src/app/services/updaters/fl_updater.py @@ -59,8 +59,8 @@ async def import_fl_dump(ctx: dict, filename: str, *args, **kwargs): ) -async def get_db_pools() -> tuple[asyncpg.Pool, aiomysql.Pool]: - posgres_pool = await asyncpg.create_pool( +async def get_db_cons() -> tuple[asyncpg.Connection, aiomysql.Connection]: + posgres = await asyncpg.connect( database=env_config.POSTGRES_DB_NAME, host=env_config.POSTGRES_HOST, port=env_config.POSTGRES_PORT, @@ -68,7 +68,7 @@ async def get_db_pools() -> tuple[asyncpg.Pool, aiomysql.Pool]: password=env_config.POSTGRES_PASSWORD, ) - mysql_pool = await aiomysql.create_pool( + mysql = await aiomysql.connect( db=env_config.MYSQL_DB_NAME, host=env_config.MYSQL_HOST, port=env_config.MYSQL_PORT, @@ -76,23 +76,25 @@ async def get_db_pools() -> tuple[asyncpg.Pool, aiomysql.Pool]: password=env_config.MYSQL_PASSWORD, ) - assert posgres_pool + assert posgres - return posgres_pool, mysql_pool + return posgres, mysql -async def get_source(postgres_pool: asyncpg.Pool) -> int: - source_row = await postgres_pool.fetchrow( +async def get_source(postgres: asyncpg.Connection) -> int: + source_row = await postgres.fetchrow( "SELECT id FROM sources WHERE name = 'flibusta';" ) if not source_row: - await postgres_pool.execute("INSERT INTO sources (name) VALUES ('flibusta');") + await postgres.execute("INSERT INTO sources (name) VALUES ('flibusta');") - source_row = await postgres_pool.fetchrow( + source_row = await postgres.fetchrow( "SELECT id FROM sources WHERE name = 'flibusta';" ) + assert source_row + return source_row["id"] @@ -104,9 +106,9 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) def prepare_author(row: list): return [ @@ -117,7 +119,7 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw remove_wrong_ch(row[3]), ] - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_author( source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar @@ -136,17 +138,19 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute( - "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname;" + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname;" + ) + + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", + (prepare_author(row) for row in rows), ) - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", - (prepare_author(row) for row in rows), - ) + await postgres.close() + mysql.close() async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): @@ -157,9 +161,9 @@ async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwar ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) replace_dict = {"ru-": "ru", "ru~": "ru"} @@ -179,7 +183,7 @@ async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwar row[5] == "1", ] - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_book( source_ smallint, remote_id_ int, title_ varchar, lang_ varchar, @@ -200,17 +204,19 @@ async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwar """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute( - "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook;" + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook;" + ) + + while rows := await cursor.fetchmany(1024): + await postgres.executemany( + "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7);", + (prepare_book(row) for row in rows), ) - while rows := await cursor.fetchmany(1024): - await postgres_pool.executemany( - "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7);", - (prepare_book(row) for row in rows), - ) + await postgres.close() + mysql.close() async def update_fl_books_authors( @@ -229,11 +235,11 @@ async def update_fl_books_authors( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$ DECLARE @@ -253,15 +259,17 @@ async def update_fl_books_authors( """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute("SELECT BookId, AvtorId FROM libavtor;") + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT BookId, AvtorId FROM libavtor;") - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_book_author($1, $2, $3);", - ((source, *row) for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_book_author($1, $2, $3);", + ((source, *row) for row in rows), + ) + + await postgres.close() + mysql.close() async def update_fl_translations( @@ -280,11 +288,11 @@ async def update_fl_translations( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$ DECLARE @@ -306,18 +314,20 @@ async def update_fl_translations( """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute( - "SELECT BookId, TranslatorId, Pos FROM libtranslator " - "WHERE BookId IN (SELECT BookId FROM libbook);" + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, TranslatorId, Pos FROM libtranslator " + "WHERE BookId IN (SELECT BookId FROM libbook);" + ) + + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_translation($1, $2, $3, $4)", + ((source, *row) for row in rows), ) - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_translation($1, $2, $3, $4)", - ((source, *row) for row in rows), - ) + await postgres.close() + mysql.close() async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): @@ -332,9 +342,9 @@ async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, ** ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) def prepare_sequence(row: list): return [ @@ -343,7 +353,7 @@ async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, ** remove_wrong_ch(row[1]), ] - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$ BEGIN @@ -358,15 +368,17 @@ async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, ** """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute("SELECT SeqId, SeqName FROM libseqname;") + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT SeqId, SeqName FROM libseqname;") - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_sequences($1, $2, cast($3 as varchar));", - (prepare_sequence(row) for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_sequences($1, $2, cast($3 as varchar));", + (prepare_sequence(row) for row in rows), + ) + + await postgres.close() + mysql.close() async def update_fl_sequences_info( @@ -385,11 +397,11 @@ async def update_fl_sequences_info( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$ DECLARE @@ -410,20 +422,22 @@ async def update_fl_sequences_info( """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute( - "SELECT BookId, SeqId, level FROM libseq " - "WHERE " - "BookId IN (SELECT BookId FROM libbook) AND " - "SeqId IN (SELECT SeqId FROM libseqname);" + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, SeqId, level FROM libseq " + "WHERE " + "BookId IN (SELECT BookId FROM libbook) AND " + "SeqId IN (SELECT SeqId FROM libseqname);" + ) + + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_book_sequence($1, $2, $3, $4);", + ([source, *row] for row in rows), ) - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_book_sequence($1, $2, $3, $4);", - ([source, *row] for row in rows), - ) + await postgres.close() + mysql.close() async def update_fl_book_annotations( @@ -441,11 +455,11 @@ async def update_fl_book_annotations( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$ DECLARE @@ -472,18 +486,20 @@ async def update_fl_book_annotations( fix_annotation_text(row[2]), ] - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute( - "SELECT BookId, Title, Body FROM libbannotations " - "WHERE BookId IN (SELECT BookId FROM libbook);" + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT BookId, Title, Body FROM libbannotations " + "WHERE BookId IN (SELECT BookId FROM libbook);" + ) + + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));", + (fix_annotation(row) for row in rows), ) - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));", - (fix_annotation(row) for row in rows), - ) + await postgres.close() + mysql.close() async def update_fl_book_annotations_pic( @@ -501,25 +517,27 @@ async def update_fl_book_annotations_pic( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) def fix_link(row): return [source, row[0], f"{env_config.FL_BASE_URL}/i/{row[1]}"] - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute("SELECT BookId, File FROM libbpics;") + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT BookId, File FROM libbpics;") - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "UPDATE book_annotations " - "SET file = cast($3 as varchar) " - "FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books " - "WHERE book = books.id;", - (fix_link(row) for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "UPDATE book_annotations " + "SET file = cast($3 as varchar) " + "FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books " + "WHERE book = books.id;", + (fix_link(row) for row in rows), + ) + + await postgres.close() + mysql.close() async def update_fl_author_annotations( @@ -537,11 +555,11 @@ async def update_fl_author_annotations( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$ DECLARE @@ -568,15 +586,17 @@ async def update_fl_author_annotations( fix_annotation_text(row[2]), ] - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute("SELECT AvtorId, Title, Body FROM libaannotations;") + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT AvtorId, Title, Body FROM libaannotations;") - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));", - (fix_annotation(row) for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));", + (fix_annotation(row) for row in rows), + ) + + await postgres.close() + mysql.close() async def update_fl_author_annotations_pics( @@ -594,25 +614,27 @@ async def update_fl_author_annotations_pics( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) def fix_link(row): return [source, row[0], f"{env_config.FL_BASE_URL}/ia/{row[1]}"] - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute("SELECT AvtorId, File FROM libapics;") + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT AvtorId, File FROM libapics;") - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "UPDATE author_annotations " - "SET file = cast($3 as varchar) " - "FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors " - "WHERE author = authors.id;", - (fix_link(row) for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "UPDATE author_annotations " + "SET file = cast($3 as varchar) " + "FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors " + "WHERE author = authors.id;", + (fix_link(row) for row in rows), + ) + + await postgres.close() + mysql.close() async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): @@ -627,11 +649,11 @@ async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwa ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_genre( source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar @@ -650,17 +672,19 @@ async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwa """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute( - "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;" + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute( + "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;" + ) + + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", + ([source, *row] for row in rows), ) - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", - ([source, *row] for row in rows), - ) + await postgres.close() + mysql.close() async def update_fl_books_genres( @@ -679,11 +703,11 @@ async def update_fl_books_genres( ): raise Retry(defer=60) - postgres_pool, mysql_pool = await get_db_pools() + postgres, mysql = await get_db_cons() - source = await get_source(postgres_pool) + source = await get_source(postgres) - await postgres_pool.execute( + await postgres.execute( """ CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$ DECLARE @@ -703,15 +727,17 @@ async def update_fl_books_genres( """ ) - async with mysql_pool.acquire() as conn: - async with conn.cursor(aiomysql.SSCursor) as cursor: - await cursor.execute("SELECT BookId, GenreId FROM libgenre;") + async with mysql.cursor(aiomysql.SSCursor) as cursor: + await cursor.execute("SELECT BookId, GenreId FROM libgenre;") - while rows := await cursor.fetchmany(4096): - await postgres_pool.executemany( - "SELECT update_book_sequence($1, $2, $3);", - ((source, *row) for row in rows), - ) + while rows := await cursor.fetchmany(4096): + await postgres.executemany( + "SELECT update_book_sequence($1, $2, $3);", + ((source, *row) for row in rows), + ) + + await postgres.close() + mysql.close() async def run_fl_update(ctx: dict, *args, **kwargs) -> bool: