@@ -1,50 +1,49 @@
from typing import Optional
import asyncio
import platform
from typing import Optional
import asyncpg
import aiomysql
from aiologger import Logger
import aiomysql
import asyncpg
from core . config import env_config
from app . services . updaters . base import BaseUpdater
from app . services . webhook import WebhookSender
from core . config import env_config
async def run ( cmd ) - > tuple [ bytes , bytes , Optional [ int ] ] :
proc = await asyncio . create_subprocess_shell (
cmd ,
stdout = asyncio . subprocess . PIPE ,
stderr = asyncio . subprocess . PIPE )
cmd , stdout = asyncio . subprocess . PIPE , stderr = asyncio . subprocess . PIPE
)
stdout , stderr = await proc . communicate ( )
return stdout , stderr , proc . returncode
def remove_wrong_ch ( s : str ) :
return s . replace ( " ; " , " " ) . replace ( " \n " , " " ) . replace ( ' ё ' , ' е ' )
return s . replace ( " ; " , " " ) . replace ( " \n " , " " ) . replace ( " ё " , " е " )
def remove_dots ( s : str ) :
return s . replace ( ' . ' , ' ' )
return s . replace ( " . " , " " )
class FlUpdater ( BaseUpdater ) :
SOURCE : int
FILES = [
' lib.libavtor.sql' ,
' lib.libbook.sql' ,
' lib.libavtorname.sql' ,
' lib.libtranslator.sql' ,
' lib.libseqname.sql' ,
' lib.libseq.sql' ,
' lib.libgenre.sql' ,
' lib.libgenrelist.sql' ,
' lib.b.annotations.sql' ,
' lib.b.annotations_pics.sql' ,
' lib.a.annotations.sql' ,
' lib.a.annotations_pics.sql' ,
" lib.libavtor.sql" ,
" lib.libbook.sql" ,
" lib.libavtorname.sql" ,
" lib.libtranslator.sql" ,
" lib.libseqname.sql" ,
" lib.libseq.sql" ,
" lib.libgenre.sql" ,
" lib.libgenrelist.sql" ,
" lib.b.annotations.sql" ,
" lib.b.annotations_pics.sql" ,
" lib.a.annotations.sql" ,
" lib.a.annotations_pics.sql" ,
]
postgres_pool : asyncpg . Pool
@@ -59,7 +58,7 @@ class FlUpdater(BaseUpdater):
logger : Logger
async def log ( self , message ) :
if ' windows' in self . platform . lower ( ) :
if " windows" in self . platform . lower ( ) :
print ( message )
else :
await self . logger . info ( message )
@@ -86,7 +85,7 @@ class FlUpdater(BaseUpdater):
result = await run (
f " wget -O - { env_config . FL_BASE_URL } /sql/ { filename } .gz | gunzip | "
f " mysql -h { env_config . MYSQL_HOST } -u { env_config . MYSQL_USER } "
f " -p\ "{ env_config . MYSQL_PASSWORD } \ " { env_config . MYSQL_DB_NAME } "
f ' -p "{ env_config . MYSQL_PASSWORD } " { env_config . MYSQL_DB_NAME } '
)
await self . log ( f " Imported { filename } : { result } . " )
@@ -113,7 +112,7 @@ class FlUpdater(BaseUpdater):
self . postgres_pool = posgres_pool
async def _set_source ( self ) :
await self . log ( ' Set source...' )
await self . log ( " Set source..." )
source_row = await self . postgres_pool . fetchrow (
" SELECT id FROM sources WHERE name = ' flibusta ' ; "
@@ -128,7 +127,7 @@ class FlUpdater(BaseUpdater):
" SELECT id FROM sources WHERE name = ' flibusta ' ; "
)
self . SOURCE = source_row [ ' id' ]
self . SOURCE = source_row [ " id" ]
await self . log ( " Source has set! " )
@@ -139,7 +138,7 @@ class FlUpdater(BaseUpdater):
row [ 0 ] ,
remove_wrong_ch ( row [ 1 ] ) ,
remove_wrong_ch ( row [ 2 ] ) ,
remove_wrong_ch ( row [ 3 ] )
remove_wrong_ch ( row [ 3 ] ) ,
]
await self . log ( " Update authors... " )
@@ -150,24 +149,21 @@ class FlUpdater(BaseUpdater):
" SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname; "
)
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO authors (source, remote_id, first_name, last_name, middle_name) "
" VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) "
" ON CONFLICT (source, remote_id) "
" DO UPDATE SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name, middle_name = EXCLUDED.middle_name; " ,
[ prepare_author ( row ) for row in rows ]
[ prepare_author ( row ) for row in rows ] ,
)
self . authors_updated_event . set ( )
await self . log ( " Authors updated! " )
async def _update_books ( self ) :
replace_dict = {
" ru- " : " ru " ,
" ru~ " : " ru "
}
replace_dict = { " ru- " : " ru " , " ru~ " : " ru " }
def fix_lang ( lang : str ) - > str :
lower_lang = lang . lower ( )
@@ -182,7 +178,7 @@ class FlUpdater(BaseUpdater):
fix_lang ( row [ 2 ] ) ,
row [ 3 ] ,
row [ 4 ] ,
row [ 5 ] == ' 1 '
row [ 5 ] == " 1 " ,
]
await self . log ( " Update books... " )
@@ -193,16 +189,16 @@ class FlUpdater(BaseUpdater):
" SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook; "
)
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted) "
" VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7) "
" ON CONFLICT (source, remote_id) "
" DO UPDATE SET title = EXCLUDED.title, lang = EXCLUDED.lang, file_type = EXCLUDED.file_type, "
" uploaded = EXCLUDED.uploaded, is_deleted = EXCLUDED.is_deleted; " ,
[ prepare_book ( row ) for row in rows ]
[ prepare_book ( row ) for row in rows ] ,
)
self . books_updated_event . set ( )
await self . log ( " Books updated! " )
@@ -215,18 +211,16 @@ class FlUpdater(BaseUpdater):
async with self . mysql_pool . acquire ( ) as conn :
async with conn . cursor ( ) as cursor :
await cursor . execute (
" SELECT BookId, AvtorId FROM libavtor; "
)
await cursor . execute ( " SELECT BookId, AvtorId FROM libavtor; " )
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO book_authors (book, author) "
" SELECT "
" (SELECT id FROM books WHERE source = $1 AND remote_id = $2), "
" (SELECT id FROM authors WHERE source = $1 AND remote_id = $3) "
" ON CONFLICT (book, author) DO NOTHING; " ,
[ ( self . SOURCE , * row ) for row in rows ]
[ ( self . SOURCE , * row ) for row in rows ] ,
)
await self . log ( " Books_authors updated! " )
@@ -244,7 +238,7 @@ class FlUpdater(BaseUpdater):
" WHERE BookId IN (SELECT BookId FROM libbook); "
)
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO translations (book, author, position) "
" SELECT "
@@ -253,7 +247,7 @@ class FlUpdater(BaseUpdater):
" $4 "
" ON CONFLICT (book, author) "
" DO UPDATE SET position = EXCLUDED.position; " ,
[ ( self . SOURCE , * row ) for row in rows ]
[ ( self . SOURCE , * row ) for row in rows ] ,
)
await self . log ( " Translations updated! " )
@@ -270,17 +264,15 @@ class FlUpdater(BaseUpdater):
async with self . mysql_pool . acquire ( ) as conn :
async with conn . cursor ( ) as cursor :
await cursor . execute (
" SELECT SeqId, SeqName FROM libseqname; "
)
await cursor . execute ( " SELECT SeqId, SeqName FROM libseqname; " )
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO sequences (source, remote_id, name) "
" VALUES ($1, $2, cast($3 as varchar)) "
" ON CONFLICT (source, remote_id) "
" DO UPDATE SET name = EXCLUDED.name; " ,
[ prepare_sequence ( row ) for row in rows ]
[ prepare_sequence ( row ) for row in rows ] ,
)
self . sequences_updated_event . set ( )
@@ -302,7 +294,7 @@ class FlUpdater(BaseUpdater):
" SeqId IN (SELECT SeqId FROM libseqname); "
)
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO book_sequences (book, sequence, position) "
" SELECT "
@@ -311,7 +303,7 @@ class FlUpdater(BaseUpdater):
" $4 "
" ON CONFLICT (book, sequence) "
" DO UPDATE SET position = EXCLUDED.position; " ,
[ [ self . SOURCE , * row ] for row in rows ]
[ [ self . SOURCE , * row ] for row in rows ] ,
)
await self . log ( " Book_sequences updated! " )
@@ -328,7 +320,7 @@ class FlUpdater(BaseUpdater):
" WHERE BookId IN (SELECT BookId FROM libbook); "
)
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO book_annotations (book, title, text) "
" SELECT "
@@ -336,7 +328,7 @@ class FlUpdater(BaseUpdater):
" $3, $4 "
" ON CONFLICT (book) "
" DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text; " ,
[ [ self . SOURCE , * row ] for row in rows ]
[ [ self . SOURCE , * row ] for row in rows ] ,
)
await self . log ( " Book_annotations updated! " )
@@ -348,17 +340,15 @@ class FlUpdater(BaseUpdater):
async with self . mysql_pool . acquire ( ) as conn :
async with conn . cursor ( ) as cursor :
await cursor . execute (
" SELECT BookId, File FROM libbpics; "
)
await cursor . execute ( " SELECT BookId, File FROM libbpics; " )
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" UPDATE book_annotations "
" SET file = cast($3 as varchar) "
" FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books "
" WHERE book = books.id; " ,
[ [ self . SOURCE , * row ] for row in rows ]
[ [ self . SOURCE , * row ] for row in rows ] ,
)
await self . log ( " Book_annotation_pics updated! " )
@@ -374,7 +364,7 @@ class FlUpdater(BaseUpdater):
" SELECT AvtorId, Title, Body FROM libaannotations; "
)
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO author_annotations (author, title, text) "
" SELECT "
@@ -382,7 +372,7 @@ class FlUpdater(BaseUpdater):
" $3, $4 "
" ON CONFLICT (author) "
" DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text; " ,
[ [ self . SOURCE , * row ] for row in rows ]
[ [ self . SOURCE , * row ] for row in rows ] ,
)
await self . log ( " Author_annotation_updated! " )
@@ -394,17 +384,15 @@ class FlUpdater(BaseUpdater):
async with self . mysql_pool . acquire ( ) as conn :
async with conn . cursor ( ) as cursor :
await cursor . execute (
" SELECT AvtorId, File FROM libapics; "
)
await cursor . execute ( " SELECT AvtorId, File FROM libapics; " )
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" UPDATE author_annotations "
" SET file = cast($3 as varchar) "
" FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors "
" WHERE author = authors.id; " ,
[ [ self . SOURCE , * row ] for row in rows ]
[ [ self . SOURCE , * row ] for row in rows ] ,
)
await self . log ( " Author_annotatioins_pic updated! " )
@@ -418,13 +406,13 @@ class FlUpdater(BaseUpdater):
" SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist; "
)
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO genres (source, remote_id, code, description, meta) "
" VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) "
" ON CONFLICT (source, remote_id) "
" DO UPDATE SET code = EXCLUDED.code, description = EXCLUDED.description, meta = EXCLUDED.meta; " ,
[ [ self . SOURCE , * row ] for row in rows ]
[ [ self . SOURCE , * row ] for row in rows ] ,
)
await self . log ( " Genres updated! " )
@@ -437,18 +425,16 @@ class FlUpdater(BaseUpdater):
async with self . mysql_pool . acquire ( ) as conn :
async with conn . cursor ( ) as cursor :
await cursor . execute (
" SELECT BookId, GenreId FROM libgenre; "
)
await cursor . execute ( " SELECT BookId, GenreId FROM libgenre; " )
while ( rows := await cursor . fetchmany ( 32 ) ) :
while rows := await cursor . fetchmany ( 32 ) :
await self . postgres_pool . executemany (
" INSERT INTO book_genres (book, genre) "
" SELECT "
" (SELECT id FROM books WHERE source = $1 AND remote_id = $2), "
" (SELECT id FROM genres WHERE source = $1 AND remote_id = $3) "
" ON CONFLICT (book, author) DO NOTHING; " ,
[ ( self . SOURCE , * row ) for row in rows ]
[ ( self . SOURCE , * row ) for row in rows ] ,
)
await self . log ( " Book_genres updated! " )
@@ -461,9 +447,7 @@ class FlUpdater(BaseUpdater):
await self . _drop_tables ( )
await asyncio . gather (
* [ self . _import_dump ( filename ) for filename in self . FILES ]
)
await asyncio . gather ( * [ self . _import_dump ( filename ) for filename in self . FILES ] )
await self . _set_source ( )
@@ -482,7 +466,7 @@ class FlUpdater(BaseUpdater):
self . _update_book_annotations ( ) ,
self . _update_author_annotations ( ) ,
self . _update_genres ( ) ,
self . _update_books_genres ( )
self . _update_books_genres ( ) ,
)
await WebhookSender . send ( )