Fix db connections close

This commit is contained in:
2022-03-18 21:47:21 +03:00
parent 4570250fd1
commit f5b7eee1a3

View File

@@ -59,8 +59,8 @@ async def import_fl_dump(ctx: dict, filename: str, *args, **kwargs):
)
async def get_db_pools() -> tuple[asyncpg.Pool, aiomysql.Pool]:
posgres_pool = await asyncpg.create_pool(
async def get_db_cons() -> tuple[asyncpg.Connection, aiomysql.Connection]:
posgres = await asyncpg.connect(
database=env_config.POSTGRES_DB_NAME,
host=env_config.POSTGRES_HOST,
port=env_config.POSTGRES_PORT,
@@ -68,7 +68,7 @@ async def get_db_pools() -> tuple[asyncpg.Pool, aiomysql.Pool]:
password=env_config.POSTGRES_PASSWORD,
)
mysql_pool = await aiomysql.create_pool(
mysql = await aiomysql.connect(
db=env_config.MYSQL_DB_NAME,
host=env_config.MYSQL_HOST,
port=env_config.MYSQL_PORT,
@@ -76,23 +76,25 @@ async def get_db_pools() -> tuple[asyncpg.Pool, aiomysql.Pool]:
password=env_config.MYSQL_PASSWORD,
)
assert posgres_pool
assert posgres
return posgres_pool, mysql_pool
return posgres, mysql
async def get_source(postgres_pool: asyncpg.Pool) -> int:
source_row = await postgres_pool.fetchrow(
async def get_source(postgres: asyncpg.Connection) -> int:
source_row = await postgres.fetchrow(
"SELECT id FROM sources WHERE name = 'flibusta';"
)
if not source_row:
await postgres_pool.execute("INSERT INTO sources (name) VALUES ('flibusta');")
await postgres.execute("INSERT INTO sources (name) VALUES ('flibusta');")
source_row = await postgres_pool.fetchrow(
source_row = await postgres.fetchrow(
"SELECT id FROM sources WHERE name = 'flibusta';"
)
assert source_row
return source_row["id"]
@@ -104,9 +106,9 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
def prepare_author(row: list):
return [
@@ -117,7 +119,7 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw
remove_wrong_ch(row[3]),
]
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_author(
source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar
@@ -136,18 +138,20 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute(
"SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname;"
)
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
(prepare_author(row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
arq_pool: ArqRedis = ctx["arq_pool"]
@@ -157,9 +161,9 @@ async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwar
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
replace_dict = {"ru-": "ru", "ru~": "ru"}
@@ -179,7 +183,7 @@ async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwar
row[5] == "1",
]
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_book(
source_ smallint, remote_id_ int, title_ varchar, lang_ varchar,
@@ -200,18 +204,20 @@ async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwar
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute(
"SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook;"
)
while rows := await cursor.fetchmany(1024):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7);",
(prepare_book(row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_books_authors(
ctx: dict, *arsg, prefix: Optional[str] = None, **kwargs
@@ -229,11 +235,11 @@ async def update_fl_books_authors(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$
DECLARE
@@ -253,16 +259,18 @@ async def update_fl_books_authors(
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute("SELECT BookId, AvtorId FROM libavtor;")
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_book_author($1, $2, $3);",
((source, *row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_translations(
ctx: dict, *args, prefix: Optional[str] = None, **kwargs
@@ -280,11 +288,11 @@ async def update_fl_translations(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$
DECLARE
@@ -306,19 +314,21 @@ async def update_fl_translations(
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute(
"SELECT BookId, TranslatorId, Pos FROM libtranslator "
"WHERE BookId IN (SELECT BookId FROM libbook);"
)
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_translation($1, $2, $3, $4)",
((source, *row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
arq_pool: ArqRedis = ctx["arq_pool"]
@@ -332,9 +342,9 @@ async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
def prepare_sequence(row: list):
return [
@@ -343,7 +353,7 @@ async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **
remove_wrong_ch(row[1]),
]
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$
BEGIN
@@ -358,16 +368,18 @@ async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute("SELECT SeqId, SeqName FROM libseqname;")
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_sequences($1, $2, cast($3 as varchar));",
(prepare_sequence(row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_sequences_info(
ctx: dict, *args, prefix: Optional[str] = None, **kwargs
@@ -385,11 +397,11 @@ async def update_fl_sequences_info(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$
DECLARE
@@ -410,8 +422,7 @@ async def update_fl_sequences_info(
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute(
"SELECT BookId, SeqId, level FROM libseq "
"WHERE "
@@ -420,11 +431,14 @@ async def update_fl_sequences_info(
)
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_book_sequence($1, $2, $3, $4);",
([source, *row] for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_book_annotations(
ctx: dict, *args, prefix: Optional[str] = None, **kwargs
@@ -441,11 +455,11 @@ async def update_fl_book_annotations(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$
DECLARE
@@ -472,19 +486,21 @@ async def update_fl_book_annotations(
fix_annotation_text(row[2]),
]
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute(
"SELECT BookId, Title, Body FROM libbannotations "
"WHERE BookId IN (SELECT BookId FROM libbook);"
)
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
(fix_annotation(row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_book_annotations_pic(
ctx: dict, *args, prefix: Optional[str] = None, **kwargs
@@ -501,19 +517,18 @@ async def update_fl_book_annotations_pic(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
def fix_link(row):
return [source, row[0], f"{env_config.FL_BASE_URL}/i/{row[1]}"]
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute("SELECT BookId, File FROM libbpics;")
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"UPDATE book_annotations "
"SET file = cast($3 as varchar) "
"FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books "
@@ -521,6 +536,9 @@ async def update_fl_book_annotations_pic(
(fix_link(row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_author_annotations(
ctx: dict, *args, prefix: Optional[str] = None, **kwargs
@@ -537,11 +555,11 @@ async def update_fl_author_annotations(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$
DECLARE
@@ -568,16 +586,18 @@ async def update_fl_author_annotations(
fix_annotation_text(row[2]),
]
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute("SELECT AvtorId, Title, Body FROM libaannotations;")
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
(fix_annotation(row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_author_annotations_pics(
ctx: dict, *args, prefix: Optional[str] = None, **kwargs
@@ -594,19 +614,18 @@ async def update_fl_author_annotations_pics(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
def fix_link(row):
return [source, row[0], f"{env_config.FL_BASE_URL}/ia/{row[1]}"]
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute("SELECT AvtorId, File FROM libapics;")
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"UPDATE author_annotations "
"SET file = cast($3 as varchar) "
"FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors "
@@ -614,6 +633,9 @@ async def update_fl_author_annotations_pics(
(fix_link(row) for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
arq_pool: ArqRedis = ctx["arq_pool"]
@@ -627,11 +649,11 @@ async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwa
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_genre(
source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar
@@ -650,18 +672,20 @@ async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwa
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute(
"SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;"
)
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
([source, *row] for row in rows),
)
await postgres.close()
mysql.close()
async def update_fl_books_genres(
ctx: dict, *args, prefix: Optional[str] = None, **kwargs
@@ -679,11 +703,11 @@ async def update_fl_books_genres(
):
raise Retry(defer=60)
postgres_pool, mysql_pool = await get_db_pools()
postgres, mysql = await get_db_cons()
source = await get_source(postgres_pool)
source = await get_source(postgres)
await postgres_pool.execute(
await postgres.execute(
"""
CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$
DECLARE
@@ -703,16 +727,18 @@ async def update_fl_books_genres(
"""
)
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.SSCursor) as cursor:
async with mysql.cursor(aiomysql.SSCursor) as cursor:
await cursor.execute("SELECT BookId, GenreId FROM libgenre;")
while rows := await cursor.fetchmany(4096):
await postgres_pool.executemany(
await postgres.executemany(
"SELECT update_book_sequence($1, $2, $3);",
((source, *row) for row in rows),
)
await postgres.close()
mysql.close()
async def run_fl_update(ctx: dict, *args, **kwargs) -> bool:
prefix = str(int(time.time()) // (5 * 60))