Add simple rust implementation

This commit is contained in:
2022-09-18 20:10:59 +03:00
parent c3d48add7e
commit 95f5a31570
29 changed files with 3400 additions and 1892 deletions

View File

@@ -1,11 +0,0 @@
from fastapi import Security, HTTPException, status
from core.auth import default_security
from core.config import env_config
async def check_token(api_key: str = Security(default_security)):
    """FastAPI dependency: reject any request whose Authorization header
    does not exactly match the configured API_KEY.

    Raises:
        HTTPException: 403 when the key is wrong or missing.
    """
    if api_key != env_config.API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Wrong api key!"
        )

View File

@@ -1,8 +0,0 @@
from enum import Enum
class UpdaterTypes(Enum):
    """Supported dump-source updaters."""

    FL = "fl"


# Updater type -> name of the arq task that performs the update.
UPDATERS: dict[UpdaterTypes, str] = {UpdaterTypes.FL: "run_fl_update"}

View File

@@ -1,865 +0,0 @@
from enum import Enum
import logging
import time
from typing import Optional
import aiomysql
from arq.connections import ArqRedis
from arq.worker import Retry
import asyncpg
import httpx
from app.services.updaters.utils.cmd import run_cmd
from app.services.updaters.utils.tasks import is_jobs_complete
from app.services.updaters.utils.text import remove_wrong_ch, fix_annotation_text
from core.config import env_config
# Module logger: INFO-level progress messages are echoed to the console so
# long-running updates are visible in the worker output.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
logger.addHandler(ch)
class JobId(Enum):
    """Canonical arq job-id strings for every step of the FL update pipeline.

    Import jobs load raw MySQL dumps; update jobs sync PostgreSQL from them.
    The string values are used both when enqueueing (`_job_id`) and when
    checking dependencies (`is_jobs_complete`), so they must stay in sync —
    which they do, since both sides read `JobId.<member>.value`.
    """

    import_libavtor = "import_libavtor"
    import_libbook = "import_libbook"
    import_libavtorname = "import_libavtorname"
    import_libtranslator = "import_libtranslator"
    import_libseqname = "import_libseqname"
    import_libseq = "import_libseq"
    import_libgenre = "import_libgenre"
    # FIX: value was the typo "import_libgenrelsit"; safe to correct because
    # the string is only ever read back through this enum member.
    import_libgenrelist = "import_libgenrelist"
    import_lib_b_annotations = "import_lib_b_annotations"
    import_lib_b_annotations_pics = "import_lib_b_annotations_pics"
    import_lib_a_annotations = "import_lib_a_annotations"
    import_lib_a_annotations_pics = "import_lib_a_annotations_pics"

    update_authors = "update_fl_authors"
    update_books = "update_fl_books"
    update_books_authors = "update_fl_books_authors"
    update_translations = "update_fl_translations"
    update_sequences = "update_fl_sequences"
    update_sequences_info = "update_fl_sequences_info"
    update_book_annotations = "update_fl_book_annotations"
    update_book_annotations_pic = "update_fl_book_annotations_pic"
    update_author_annotations = "update_fl_author_annotations"
    update_author_annotations_pics = "update_fl_author_annotations_pics"
    update_genres = "update_fl_genres"
    update_books_genres = "update_fl_books_genres"

    webhook = "update_fl_webhook"
async def import_fl_dump(ctx: dict, filename: str, *args, **kwargs):
    """Stream a gzipped MySQL dump from the FL mirror straight into MySQL.

    FIX: the dump URL now interpolates the ``filename`` argument — it
    previously contained a literal placeholder, leaving the parameter unused
    and downloading a nonexistent file.

    Raises:
        InterruptedError: with captured (stdout, stderr) when the shell
            pipeline exits non-zero.
    """
    stdout, stderr, return_code = await run_cmd(
        f"wget -O - {env_config.FL_BASE_URL}/sql/{filename}.gz | gunzip | "
        f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} "
        f'-p"{env_config.MYSQL_PASSWORD}" {env_config.MYSQL_DB_NAME}'
    )
    if return_code != 0:
        raise InterruptedError(stdout, stderr)
async def get_db_cons() -> tuple[asyncpg.Connection, aiomysql.Connection]:
    """Open one PostgreSQL (sync target) and one MySQL (imported dump) connection.

    Callers are responsible for closing both connections when done.
    """
    postgres = await asyncpg.connect(
        database=env_config.POSTGRES_DB_NAME,
        host=env_config.POSTGRES_HOST,
        port=env_config.POSTGRES_PORT,
        user=env_config.POSTGRES_USER,
        password=env_config.POSTGRES_PASSWORD,
    )
    mysql = await aiomysql.connect(
        db=env_config.MYSQL_DB_NAME,
        host=env_config.MYSQL_HOST,
        port=env_config.MYSQL_PORT,
        user=env_config.MYSQL_USER,
        password=env_config.MYSQL_PASSWORD,
    )
    assert postgres
    return postgres, mysql
async def get_source(postgres: asyncpg.Connection) -> int:
    """Return the id of the 'flibusta' row in `sources`, creating it if absent."""
    source_row = await postgres.fetchrow(
        "SELECT id FROM sources WHERE name = 'flibusta';"
    )
    if not source_row:
        # First run against a fresh database: seed the source row, then re-read
        # it to obtain the generated id.
        await postgres.execute("INSERT INTO sources (name) VALUES ('flibusta');")
        source_row = await postgres.fetchrow(
            "SELECT id FROM sources WHERE name = 'flibusta';"
        )
    assert source_row
    return source_row["id"]
async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
    """Sync the PostgreSQL `authors` table from the imported MySQL `libavtorname`.

    Retries (via arq) until the `import_libavtorname` job of this run
    (identified by `prefix`) has finished.
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool, [JobId.import_libavtorname.value], prefix=prefix
    )
    if not is_deps_complete:
        # Back off proportionally to how much is still pending.
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    def prepare_author(row: list):
        # -> (source, remote_id, first_name, last_name, middle_name)
        return [
            source,
            row[0],
            remove_wrong_ch(row[1]),
            remove_wrong_ch(row[2]),
            remove_wrong_ch(row[3]),
        ]

    # Upsert helper keyed on (source, remote_id): UPDATE if present, INSERT otherwise.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_author(
            source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar
        ) RETURNS void AS $$
        BEGIN
            IF EXISTS (SELECT * FROM authors WHERE source = source_ AND remote_id = remote_id_) THEN
                UPDATE authors SET first_name = first_name_, last_name = last_name_, middle_name = middle_name_
                WHERE source = source_ AND remote_id = remote_id_;
                RETURN;
            END IF;
            INSERT INTO authors (source, remote_id, first_name, last_name, middle_name)
            VALUES (source_, remote_id_, first_name_, last_name_, middle_name_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    # Stream from MySQL with a server-side cursor and upsert in batches.
    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute(
            "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname;"
        )
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
                (prepare_author(row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
    """Sync the PostgreSQL `books` table from the imported MySQL `libbook`.

    Retries (via arq) until the `import_libbook` job of this run has finished.
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool, [JobId.import_libbook.value], prefix=prefix
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # Dumps occasionally carry malformed language codes; normalise them.
    replace_dict = {"ru-": "ru", "ru~": "ru"}

    def fix_lang(lang: str) -> str:
        lower_lang = lang.lower()
        replaced_lang = replace_dict.get(lower_lang, lower_lang)
        return replaced_lang

    def prepare_book(row: list):
        # -> (source, remote_id, title, lang, file_type, uploaded, is_deleted, pages)
        return [
            source,
            row[0],
            remove_wrong_ch(row[1]),
            fix_lang(row[2]),
            row[3],
            row[4],
            row[5] == "1",
            row[6],
        ]

    # Upsert helper keyed on (source, remote_id).
    # FIX: the UPDATE branch previously set `is_deleted = is_deleted` — a
    # self-assignment that never propagated deletions on re-runs. It now
    # assigns the `is_deleted_` parameter.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_book(
            source_ smallint, remote_id_ int, title_ varchar, lang_ varchar,
            file_type_ varchar, uploaded_ date, is_deleted_ boolean, pages_ int
        ) RETURNS void AS $$
        BEGIN
            IF EXISTS (SELECT * FROM books WHERE source = source_ AND remote_id = remote_id_) THEN
                UPDATE books SET title = title_, lang = lang_, file_type = file_type_,
                    uploaded = uploaded_, is_deleted = is_deleted_, pages = pages_
                WHERE source = source_ AND remote_id = remote_id_;
                RETURN;
            END IF;
            INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted, pages)
            VALUES (source_, remote_id_, title_, lang_, file_type_, uploaded_, is_deleted_, pages_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute(
            "SELECT BookId, Title, Lang, FileType, Time, Deleted, Pages FROM libbook;"
        )
        while rows := await cursor.fetchmany(1024):
            await postgres.executemany(
                "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7, $8);",
                (prepare_book(row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_books_authors(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Sync `book_authors` link rows from the imported MySQL `libavtor`.

    FIX: the catch-all positional parameter was misspelled `*arsg`; renamed to
    `*args` to match every sibling task (internal name only — callers pass
    positionally, so nothing breaks).
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_libavtor.value,
            JobId.update_authors.value,
            JobId.update_books.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # Resolve local ids from remote ids, then insert the link if missing.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$
        DECLARE
            book_id integer := -1;
            author_id integer := -1;
        BEGIN
            SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
            SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_;
            IF EXISTS (SELECT * FROM book_authors WHERE book = book_id AND author = author_id) THEN
                RETURN;
            END IF;
            INSERT INTO book_authors (book, author) VALUES (book_id, author_id);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute("SELECT BookId, AvtorId FROM libavtor;")
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_book_author($1, $2, $3);",
                ((source, *row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_translations(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Sync `translations` link rows from the imported MySQL `libtranslator`.

    Waits for the `libtranslator` import plus the authors and books updates.
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_libtranslator.value,
            JobId.update_authors.value,
            JobId.update_books.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # Resolve local ids from remote ids, then update position or insert.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$
        DECLARE
            book_id integer := -1;
            author_id integer := -1;
        BEGIN
            SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
            SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_;
            IF EXISTS (SELECT * FROM translations WHERE book = book_id AND author = author_id) THEN
                UPDATE translations SET position = position_
                WHERE book = book_id AND author = author_id;
                RETURN;
            END IF;
            INSERT INTO translations (book, author, position) VALUES (book_id, author_id, position_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        # Skip translator rows that reference books absent from the dump.
        await cursor.execute(
            "SELECT BookId, TranslatorId, Pos FROM libtranslator "
            "WHERE BookId IN (SELECT BookId FROM libbook);"
        )
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_translation($1, $2, $3, $4)",
                ((source, *row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
    """Sync the `sequences` table from the imported MySQL `libseqname`."""
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_libseqname.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    def prepare_sequence(row: list):
        # -> (source, remote_id, name)
        return [
            source,
            row[0],
            remove_wrong_ch(row[1]),
        ]

    # Upsert helper keyed on (source, remote_id).
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$
        BEGIN
            IF EXISTS (SELECT * FROM sequences WHERE source = source_ AND remote_id = remote_id_) THEN
                UPDATE sequences SET name = name_ WHERE source = source_ AND remote_id = remote_id_;
                RETURN;
            END IF;
            INSERT INTO sequences (source, remote_id, name) VALUES (source_, remote_id_, name_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute("SELECT SeqId, SeqName FROM libseqname;")
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_sequences($1, $2, cast($3 as varchar));",
                (prepare_sequence(row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_sequences_info(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Sync `book_sequences` link rows (book, sequence, position) from MySQL `libseq`."""
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_libseq.value,
            JobId.update_sequences.value,
            JobId.update_books.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # Resolve local ids from remote ids, then update position or insert.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$
        DECLARE
            book_id integer := -1;
            sequence_id integer := -1;
        BEGIN
            SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
            SELECT id INTO sequence_id FROM sequences WHERE source = source_ AND remote_id = sequence_;
            IF EXISTS (SELECT * FROM book_sequences WHERE book = book_id AND sequence = sequence_id) THEN
                UPDATE book_sequences SET position = position_ WHERE book = book_id AND sequence = sequence_id;
                RETURN;
            END IF;
            INSERT INTO book_sequences (book, sequence, position) VALUES (book_id, sequence_id, position_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        # Only link rows whose book AND sequence both exist in the dump.
        await cursor.execute(
            "SELECT BookId, SeqId, level FROM libseq "
            "WHERE "
            "BookId IN (SELECT BookId FROM libbook) AND "
            "SeqId IN (SELECT SeqId FROM libseqname);"
        )
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_book_sequence($1, $2, $3, $4);",
                ([source, *row] for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_book_annotations(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Sync `book_annotations` (title + cleaned text) from MySQL `libbannotations`."""
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_lib_b_annotations.value,
            JobId.update_books.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # One annotation row per book: update in place or insert.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$
        DECLARE
            book_id integer := -1;
        BEGIN
            SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
            IF EXISTS (SELECT * FROM book_annotations WHERE book = book_id) THEN
                UPDATE book_annotations SET title = title_, text = text_ WHERE book = book_id;
                RETURN;
            END IF;
            INSERT INTO book_annotations (book, title, text) VALUES (book_id, title_, text_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    def fix_annotation(row) -> list:
        # Strip markup/entities from the raw annotation body.
        return [
            source,
            row[0],
            row[1],
            fix_annotation_text(row[2]),
        ]

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute(
            "SELECT BookId, Title, Body FROM libbannotations "
            "WHERE BookId IN (SELECT BookId FROM libbook);"
        )
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
                (fix_annotation(row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_book_annotations_pic(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Attach picture URLs to existing `book_annotations` rows from MySQL `libbpics`."""
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_lib_b_annotations_pics.value,
            JobId.update_book_annotations.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    def fix_link(row):
        # Build an absolute URL to the picture on the FL mirror.
        return [source, row[0], f"{env_config.FL_BASE_URL}/i/{row[1]}"]

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute("SELECT BookId, File FROM libbpics;")
        while rows := await cursor.fetchmany(4096):
            # Join through books to map (source, remote_id) -> local book id.
            await postgres.executemany(
                "UPDATE book_annotations "
                "SET file = cast($3 as varchar) "
                "FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books "
                "WHERE book = books.id;",
                (fix_link(row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_author_annotations(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Sync `author_annotations` (title + cleaned text) from MySQL `libaannotations`."""
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_lib_a_annotations.value,
            JobId.update_authors.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # One annotation row per author: update in place or insert.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$
        DECLARE
            author_id integer := -1;
        BEGIN
            SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_;
            IF EXISTS (SELECT * FROM author_annotations WHERE author = author_id) THEN
                UPDATE author_annotations SET title = title_, text = text_ WHERE author = author_id;
                RETURN;
            END IF;
            INSERT INTO author_annotations (author, title, text) VALUES (author_id, title_, text_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    def fix_annotation(row) -> list:
        # Strip markup/entities from the raw annotation body.
        return [
            source,
            row[0],
            row[1],
            fix_annotation_text(row[2]),
        ]

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute("SELECT AvtorId, Title, Body FROM libaannotations;")
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
                (fix_annotation(row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_author_annotations_pics(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Attach picture URLs to existing `author_annotations` rows from MySQL `libapics`.

    FIX: `is_jobs_complete` returns a (bool, count) tuple, and the old code did
    `if not await is_jobs_complete(...)` — a non-empty tuple is always truthy,
    so the dependency gate could never fire and this task would run before its
    dependencies finished. Unpack and check like every sibling task.
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_lib_a_annotations_pics.value,
            JobId.update_author_annotations.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    def fix_link(row):
        # Build an absolute URL to the picture on the FL mirror.
        return [source, row[0], f"{env_config.FL_BASE_URL}/ia/{row[1]}"]

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute("SELECT AvtorId, File FROM libapics;")
        while rows := await cursor.fetchmany(4096):
            # Join through authors to map (source, remote_id) -> local author id.
            await postgres.executemany(
                "UPDATE author_annotations "
                "SET file = cast($3 as varchar) "
                "FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors "
                "WHERE author = authors.id;",
                (fix_link(row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
    """Sync the `genres` table from the imported MySQL `libgenrelist`.

    FIX: the INSERT branch of the upsert helper previously targeted the
    `authors` table (`INSERT INTO authors (...)`), so new genres were never
    created (and the statement would fail on the mismatched columns). It now
    inserts into `genres`.
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_libgenrelist.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # Upsert helper keyed on (source, remote_id).
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_genre(
            source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar
        ) RETURNS void AS $$
        BEGIN
            IF EXISTS (SELECT * FROM genres WHERE source = source_ AND remote_id = remote_id_) THEN
                UPDATE genres SET code = code_, description = description_, meta = meta_
                WHERE source = source_ AND remote_id = remote_id_;
                RETURN;
            END IF;
            INSERT INTO genres (source, remote_id, code, description, meta)
            VALUES (source_, remote_id_, code_, description_, meta_);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute(
            "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;"
        )
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
                ([source, *row] for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_books_genres(
    ctx: dict, *args, prefix: Optional[str] = None, **kwargs
):
    """Sync `book_genres` link rows from the imported MySQL `libgenre`.

    FIX: the PL/pgSQL helper was named `update_book_sequence`, shadowing the
    (4-argument) helper created by `update_fl_sequences_info` by name. The two
    only coexisted because PostgreSQL overloads on argument types; the helper
    is renamed `update_book_genre` to say what it does (definition and call
    site changed together inside this function, so nothing else is affected).
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool,
        [
            JobId.import_libgenre.value,
            JobId.update_books.value,
            JobId.update_genres.value,
        ],
        prefix=prefix,
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    postgres, mysql = await get_db_cons()
    source = await get_source(postgres)

    # Resolve local ids from remote ids, then insert the link if missing.
    await postgres.execute(
        """
        CREATE OR REPLACE FUNCTION update_book_genre(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$
        DECLARE
            book_id integer := -1;
            genre_id integer := -1;
        BEGIN
            SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
            SELECT id INTO genre_id FROM genres WHERE source = source_ AND remote_id = genre_;
            IF EXISTS (SELECT * FROM book_genres WHERE book = book_id AND genre = genre_id) THEN
                RETURN;
            END IF;
            INSERT INTO book_genres (book, genre) VALUES (book_id, genre_id);
        END;
        $$ LANGUAGE plpgsql;
        """
    )

    async with mysql.cursor(aiomysql.SSCursor) as cursor:
        await cursor.execute("SELECT BookId, GenreId FROM libgenre;")
        while rows := await cursor.fetchmany(4096):
            await postgres.executemany(
                "SELECT update_book_genre($1, $2, $3);",
                ((source, *row) for row in rows),
            )

    await postgres.close()
    mysql.close()
async def update_fl_webhook(
    ctx: dict,
    *args,
    prefix: Optional[str] = None,
    **kwargs,
):
    """Fire all configured webhooks once every other job of this run is done.

    Returns:
        True only when every webhook answered HTTP 200.
    """
    arq_pool: ArqRedis = ctx["arq_pool"]
    is_deps_complete, not_complete_count = await is_jobs_complete(
        arq_pool, [e.value for e in JobId if e != JobId.webhook], prefix=prefix
    )
    if not is_deps_complete:
        raise Retry(defer=45 * not_complete_count)

    all_success = True
    # Reuse one client (and its connection pool) for all webhooks instead of
    # opening and tearing down a fresh client per request.
    async with httpx.AsyncClient() as client:
        for webhook in env_config.WEBHOOKS:
            # `webhook.method` is "get" or "post" (validated by WebhookConfig),
            # dispatched to the matching httpx client method.
            response: httpx.Response = await getattr(client, webhook.method)(
                webhook.url, headers=webhook.headers
            )
            if response.status_code != 200:
                all_success = False
    return all_success
async def run_fl_update(ctx: dict, *args, **kwargs) -> bool:
    """Entry point: enqueue every import job and every update job for one run.

    All jobs of a run share a prefix derived from the current 5-minute window;
    since arq job ids are unique, this also deduplicates runs started within
    the same window, and lets tasks locate their dependencies' job ids.
    """
    # Dump file fetched and imported by each import job.
    IMPORTS = {
        JobId.import_libbook: "lib.libbook.sql",
        JobId.import_libavtor: "lib.libavtor.sql",
        JobId.import_libavtorname: "lib.libavtorname.sql",
        JobId.import_libtranslator: "lib.libtranslator.sql",
        JobId.import_libseqname: "lib.libseqname.sql",
        JobId.import_libseq: "lib.libseq.sql",
        JobId.import_libgenre: "lib.libgenre.sql",
        JobId.import_libgenrelist: "lib.libgenrelist.sql",
        JobId.import_lib_b_annotations: "lib.b.annotations.sql",
        JobId.import_lib_b_annotations_pics: "lib.b.annotations_pics.sql",
        JobId.import_lib_a_annotations: "lib.a.annotations.sql",
        JobId.import_lib_a_annotations_pics: "lib.a.annotations_pics.sql",
    }
    # Update tasks; each one waits on its own dependencies via is_jobs_complete,
    # so enqueue order here does not matter.
    UPDATES = (
        JobId.update_books,
        JobId.update_book_annotations,
        JobId.update_book_annotations_pic,
        JobId.update_books_genres,
        JobId.update_authors,
        JobId.update_author_annotations,
        JobId.update_author_annotations_pics,
        JobId.update_books_authors,
        JobId.update_translations,
        JobId.update_sequences,
        JobId.update_sequences_info,
        JobId.update_genres,
        JobId.webhook,
    )
    arq_pool: ArqRedis = ctx["arq_pool"]
    prefix = str(int(time.time()) // (5 * 60))
    for job_id, filename in IMPORTS.items():
        await arq_pool.enqueue_job(
            "import_fl_dump", filename, _job_id=f"{prefix}_{job_id.value}"
        )
    for job_id in UPDATES:
        await arq_pool.enqueue_job(
            job_id.value, prefix=prefix, _job_id=f"{prefix}_{job_id.value}"
        )
    return True
# Task functions registered with the arq worker (see core/worker WorkerSettings).
__tasks__ = [
    run_fl_update,
    import_fl_dump,
    update_fl_authors,
    update_fl_books,
    update_fl_books_authors,
    update_fl_translations,
    update_fl_sequences,
    update_fl_sequences_info,
    update_fl_book_annotations,
    update_fl_book_annotations_pic,
    update_fl_author_annotations,
    update_fl_author_annotations_pics,
    update_fl_genres,
    update_fl_books_genres,
    update_fl_webhook,
]

View File

@@ -1,11 +0,0 @@
import asyncio
from typing import Optional
async def run_cmd(cmd: str) -> tuple[bytes, bytes, Optional[int]]:
    """Run *cmd* through the shell and return (stdout, stderr, return code)."""
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    captured_out, captured_err = await process.communicate()
    return captured_out, captured_err, process.returncode

View File

@@ -1,29 +0,0 @@
from typing import Optional
from arq.connections import ArqRedis
from arq.jobs import Job, JobStatus
async def is_jobs_complete(
    arq_pool: ArqRedis, job_ids: list[str], prefix: Optional[str] = None
) -> tuple[bool, int]:
    """Check whether every job in *job_ids* has completed.

    Args:
        arq_pool: the shared arq redis pool.
        job_ids: bare job-id strings (JobId values).
        prefix: per-run prefix prepended as "<prefix>_<job_id>" when given.

    Returns:
        (all_complete, not_complete_count) — callers use the count to scale
        their retry back-off.

    FIX: the count previously tallied how many *distinct incomplete status
    kinds* appeared among the jobs (capped at 4), not how many jobs were
    incomplete; it now counts the incomplete jobs themselves, which is what
    the `defer=45 * not_complete_count` call sites expect.
    """
    # Statuses that mean "not finished yet" (not_found covers jobs that have
    # not been enqueued at all).
    not_complete_statuses = {
        JobStatus.not_found.value,
        JobStatus.deferred.value,
        JobStatus.in_progress.value,
        JobStatus.queued.value,
    }
    not_complete_count = 0
    for job_id in job_ids:
        _job_id = f"{prefix}_{job_id}" if prefix else job_id
        status = await Job(
            _job_id, arq_pool, arq_pool.default_queue_name, arq_pool.job_deserializer
        ).status()
        if status.value in not_complete_statuses:
            not_complete_count += 1
    return not_complete_count == 0, not_complete_count

View File

@@ -1,28 +0,0 @@
import re
# Characters the PostgreSQL side cannot tolerate, mapped to their replacements:
# drop semicolons, flatten newlines to spaces, normalise 'ё' to 'е'.
_WRONG_CH_TABLE = str.maketrans({";": "", "\n": " ", "ё": "е"})


def remove_wrong_ch(s: str) -> str:
    """Sanitise a dump string: no ';', no newlines, 'ё' folded to 'е'."""
    return s.translate(_WRONG_CH_TABLE)


def remove_dots(s: str) -> str:
    """Return *s* with every '.' removed."""
    return "".join(ch for ch in s if ch != ".")
# Matches any single HTML/XML-style tag, non-greedily.
tags_regexp = re.compile(r"<.*?>")


def fix_annotation_text(text: str) -> str:
    """Strip markup from a raw annotation body.

    Removes HTML tags, "&nbsp;" entities and the [b]/[/b]/[hr] BB-codes.
    """
    cleaned = tags_regexp.sub("", text)
    for token in ("&nbsp;", "[b]", "[/b]", "[hr]"):
        cleaned = cleaned.replace(token, "")
    return cleaned

View File

@@ -1,22 +0,0 @@
from fastapi import APIRouter, Depends, Request
from arq.connections import ArqRedis
from app.depends import check_token
from app.services.updaters import UpdaterTypes, UPDATERS
# All updater endpoints require the API-key check from app.depends.
router = APIRouter(tags=["updater"], dependencies=[Depends(check_token)])


@router.post("/update/{updater}")
async def update(request: Request, updater: UpdaterTypes):
    """Enqueue the arq task that runs the requested updater."""
    arq_pool: ArqRedis = request.app.state.arq_pool
    await arq_pool.enqueue_job(UPDATERS[updater])
    return "Ok!"


@router.get("/healthcheck")
async def healthcheck():
    """Liveness probe endpoint."""
    return "Ok!"

57
src/config.rs Normal file
View File

@@ -0,0 +1,57 @@
use serde::Deserialize;
use serde_json::Map;
/// HTTP verb for a webhook call; deserialized from lowercase JSON strings.
#[derive(Deserialize, Clone)]
pub enum Method {
    #[serde(rename = "get")]
    Get,
    #[serde(rename = "post")]
    Post
}

/// One webhook fired after an update run, parsed from the WEBHOOKS env JSON.
#[derive(Deserialize, Clone)]
pub struct Webhook {
    pub method: Method,
    pub url: String,
    // JSON object of header name -> value; values are expected to be JSON
    // strings (the sender panics on any other value type).
    pub headers: Map<String, serde_json::Value>,
}

/// Process configuration assembled from environment variables.
pub struct Config {
    pub sentry_dsn: String,
    pub postgres_db_name: String,
    pub postgres_host: String,
    pub postgres_port: u16,
    pub postgres_user: String,
    pub postgres_password: String,
    pub fl_base_url: String,
    pub webhooks: Vec<Webhook>,
}
/// Read a required environment variable, panicking with a clear message when unset.
fn get_env(env: &'static str) -> String {
    match std::env::var(env) {
        Ok(value) => value,
        Err(_) => panic!("Cannot get the {} env variable", env),
    }
}
impl Config {
    /// Build a Config from the environment.
    ///
    /// Panics when any variable is missing, when POSTGRES_PORT is not a
    /// valid u16, or when WEBHOOKS is not valid JSON for `Vec<Webhook>` —
    /// appropriate here, since the process cannot run unconfigured.
    pub fn load() -> Config {
        Config {
            sentry_dsn: get_env("SENTRY_DSN"),
            postgres_db_name: get_env("POSTGRES_DB_NAME"),
            postgres_host: get_env("POSTGRES_HOST"),
            postgres_port: get_env("POSTGRES_PORT").parse().unwrap(),
            postgres_user: get_env("POSTGRES_USER"),
            postgres_password: get_env("POSTGRES_PASSWORD"),
            fl_base_url: get_env("FL_BASE_URL"),
            webhooks: serde_json::from_str(&get_env("WEBHOOKS")).unwrap(),
        }
    }
}

lazy_static! {
    /// Global configuration, loaded lazily on first access.
    pub static ref CONFIG: Config = Config::load();
}

View File

@@ -1,17 +0,0 @@
from fastapi import FastAPI
from app.views import router
from core.arq_pool import get_arq_pool
import core.sentry # noqa: F401
def start_app() -> FastAPI:
    """Build the FastAPI app: attach routes and create the arq pool on startup."""
    app = FastAPI()
    app.include_router(router)

    @app.on_event("startup")
    async def startup() -> None:
        # Create the redis pool once the event loop is running and expose it
        # to request handlers via app.state.
        app.state.arq_pool = await get_arq_pool()

    return app

View File

@@ -1,15 +0,0 @@
from arq.connections import create_pool, RedisSettings, ArqRedis
from core.config import env_config
def get_redis_settings() -> RedisSettings:
    """Redis connection settings for arq, taken from the environment config."""
    return RedisSettings(
        host=env_config.REDIS_HOST,
        port=env_config.REDIS_PORT,
        database=env_config.REDIS_DB,
    )


async def get_arq_pool() -> ArqRedis:
    """Create a new arq redis pool using the configured settings."""
    return await create_pool(get_redis_settings())

View File

@@ -1,4 +0,0 @@
from fastapi.security import APIKeyHeader
default_security = APIKeyHeader(name="Authorization")

View File

@@ -1,38 +0,0 @@
from typing import Union, Literal
from pydantic import BaseModel, BaseSettings
class WebhookConfig(BaseModel):
    """One webhook fired after a successful update run."""

    # HTTP verb; matched against httpx client method names by the webhook task.
    method: Union[Literal["get"], Literal["post"]]
    url: str
    headers: dict[str, str]


class EnvConfig(BaseSettings):
    """Application settings, populated from environment variables by pydantic."""

    API_KEY: str

    POSTGRES_DB_NAME: str
    POSTGRES_HOST: str
    POSTGRES_PORT: int
    POSTGRES_USER: str
    POSTGRES_PASSWORD: str

    MYSQL_DB_NAME: str
    MYSQL_HOST: str
    MYSQL_PORT: int
    MYSQL_USER: str
    MYSQL_PASSWORD: str

    REDIS_HOST: str
    REDIS_PORT: int
    REDIS_DB: int

    FL_BASE_URL: str
    SENTRY_DSN: str

    # Parsed from a JSON list in the WEBHOOKS env variable.
    WEBHOOKS: list[WebhookConfig]


# Singleton settings instance; fails at import time if a variable is missing.
env_config = EnvConfig()

View File

@@ -1,8 +0,0 @@
import sentry_sdk
from core.config import env_config
sentry_sdk.init(
env_config.SENTRY_DSN,
)

View File

@@ -1,32 +0,0 @@
from arq.connections import ArqRedis
from app.services.updaters.fl_updater import __tasks__ as fl_tasks
# from app.services.updaters.fl_updater import run_fl_update
from core.arq_pool import get_redis_settings, get_arq_pool
import core.sentry # noqa: F401
# from arq.cron import cron
async def startup(ctx):
    """arq worker startup hook: create the shared redis pool for tasks."""
    ctx["arq_pool"] = await get_arq_pool()


async def shutdown(ctx):
    """arq worker shutdown hook: close the shared redis pool."""
    arq_pool: ArqRedis = ctx["arq_pool"]
    arq_pool.close()
    await arq_pool.wait_closed()


class WorkerSettings:
    # Declarative worker configuration consumed by the `arq` CLI.
    functions = [*fl_tasks]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = get_redis_settings()
    max_jobs = 2
    # Updater tasks self-retry (arq Retry) while waiting on dependencies,
    # so the try budget must be generous.
    max_tries = 30
    job_timeout = 10 * 60
    # cron_jobs = [cron(run_fl_update, hour={5}, minute=0)]

View File

@@ -1,4 +0,0 @@
from core.app import start_app
app = start_app()

525
src/main.rs Normal file
View File

@@ -0,0 +1,525 @@
#[macro_use]
extern crate lazy_static;
pub mod config;
pub mod types;
pub mod utils;
use std::{
fmt::Debug,
sync::{Arc, Mutex}, str::FromStr
};
use config::Webhook;
use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, Pool, RecyclingMethod, Runtime};
use futures::{io::copy, TryStreamExt};
use reqwest::header::{HeaderMap, HeaderValue, HeaderName};
use tokio::fs::{File, remove_file};
use tokio_cron_scheduler::{JobScheduler, Job, JobSchedulerError};
use tokio_postgres::NoTls;
use async_compression::futures::bufread::GzipDecoder;
use sql_parse::{
parse_statement, InsertReplace, InsertReplaceType, ParseOptions, SQLArguments, SQLDialect,
Statement,
};
use tokio_util::compat::TokioAsyncReadCompatExt;
use types::{
Author, AuthorAnnotation, AuthorAnnotationPic, BookAnnotation, BookAnnotationPic, BookAuthor,
BookGenre, FromVecExpression, Genre, Sequence, SequenceInfo, Translator, Update,
};
use utils::read_lines;
use crate::types::Book;
/// Fetch `<FL_BASE_URL>/sql/<filename_str>.gz` and stream-decompress it into
/// a local file named `filename_str`.
async fn download_file(filename_str: &str) -> Result<(), Box<dyn std::error::Error + Send>> {
    log::info!("Download {filename_str}...");
    let link = format!("{}/sql/{filename_str}.gz", &config::CONFIG.fl_base_url);
    let response = match reqwest::get(link).await {
        Ok(v) => v,
        Err(err) => return Err(Box::new(err)),
    };
    // Promote HTTP error statuses (4xx/5xx) to hard errors.
    let response = match response.error_for_status() {
        Ok(v) => v,
        Err(err) => return Err(Box::new(err)),
    };
    // Best effort: drop a stale file from a previous run (usually absent,
    // hence only a debug-level log on failure).
    match remove_file(filename_str).await {
        Ok(_) => (),
        Err(err) => log::debug!("Can't remove file: {:?}", err),
    };
    let mut file = match File::create(filename_str).await {
        Ok(v) => v.compat(),
        Err(err) => {
            log::error!("Can't create {filename_str}: {:?}", err);
            return Err(Box::new(err))
        },
    };
    // Adapt the reqwest byte stream to futures::AsyncRead and gunzip on the fly.
    let data = response
        .bytes_stream()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
        .into_async_read();
    let decoder = GzipDecoder::new(data);
    match copy(decoder, &mut file).await {
        Ok(_) => (),
        Err(err) => {
            log::error!("Can't write data {filename_str}: {}", err);
            return Err(Box::new(err))
        },
    };
    log::info!("{filename_str} downloaded!");
    Ok(())
}
/// Download `file_name`, parse its SQL INSERT statements, and upsert every
/// row via `T`'s `Update` implementation.
///
/// Blocks (polling once per second) until every dependency slot in `deps`
/// holds `Some(UpdateStatus::Success)`.
/// NOTE(review): if any dependency ends as `Fail`, `some_failed` is set on
/// every pass and this loop never exits — confirm a failed dependency is
/// meant to stall this task forever rather than abort it.
async fn process<T>(
    pool: Pool,
    source_id: i16,
    file_name: &str,
    deps: Vec<Arc<Mutex<Option<UpdateStatus>>>>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Debug + FromVecExpression<T> + Update,
{
    if deps.len() != 0 {
        loop {
            let mut some_failed = false;
            let mut some_none = false;
            for dep in deps.iter() {
                let status = dep.lock().unwrap();
                match &*status {
                    Some(status) => match status {
                        UpdateStatus::Success => (),
                        UpdateStatus::Fail => some_failed = true,
                    },
                    None => some_none = true,
                }
            }
            if !some_failed && !some_none {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    }
    match download_file(file_name).await {
        Ok(_) => (),
        Err(err) => return Err(err),
    };
    // Dumps are MariaDB-dialect SQL; warn on unquoted identifiers so parser
    // surprises surface in the logs.
    let parse_options = ParseOptions::new()
        .dialect(SQLDialect::MariaDB)
        .arguments(SQLArguments::QuestionMark)
        .warn_unquoted_identifiers(true);
    let lines = read_lines(file_name);
    let lines = match lines {
        Ok(v) => v,
        Err(err) => return Err(Box::new(err)),
    };
    // Let the target type prepare (e.g. create its upsert helper) first.
    match T::before_update(&pool.get().await.unwrap()).await {
        Ok(_) => (),
        Err(err) => return Err(err),
    };
    log::info!("Start update {file_name}...");
    for line in lines.into_iter() {
        let line = match line {
            Ok(line) => line,
            Err(err) => return Err(Box::new(err)),
        };
        let mut issues = Vec::new();
        let ast = parse_statement(&line, &mut issues, &parse_options);
        match ast {
            // Only plain INSERT statements carry data rows; DDL and anything
            // else in the dump is ignored.
            Some(Statement::InsertReplace(
                i @ InsertReplace {
                    type_: InsertReplaceType::Insert(_),
                    ..
                },
            )) => {
                for value in i.values.into_iter() {
                    for t_value in value.1.into_iter() {
                        let value = T::from_vec_expression(&t_value);
                        // NOTE(review): a pooled client is checked out per row;
                        // hoisting it out of the inner loop would cut pool
                        // churn — verify `update` doesn't rely on a fresh one.
                        let client = pool.get().await.unwrap();
                        match value.update(&client, source_id).await {
                            Ok(_) => {
                                // log::info!("{:?}", value);
                                ()
                            }
                            Err(err) => {
                                log::error!("Update error: {:?} : {:?}", value, err);
                                return Err(err)
                            },
                        }
                    }
                }
            }
            _ => (),
        }
    }
    log::info!("Updated {file_name}...");
    Ok(())
}
/// Build a deadpool-postgres pool from the application configuration.
///
/// Connections are verified on checkout (`RecyclingMethod::Verified`) so a
/// stale connection is never handed to a caller.
async fn get_postgres_pool() -> Result<Pool, CreatePoolError> {
    let mut config = Config::new();
    config.host = Some(config::CONFIG.postgres_host.clone());
    config.port = Some(config::CONFIG.postgres_port);
    config.dbname = Some(config::CONFIG.postgres_db_name.clone());
    config.user = Some(config::CONFIG.postgres_user.clone());
    config.password = Some(config::CONFIG.postgres_password.clone());
    config.connect_timeout = Some(std::time::Duration::from_secs(5));
    config.manager = Some(ManagerConfig {
        recycling_method: RecyclingMethod::Verified,
    });
    // `create_pool` already returns the Result we need; re-matching it into
    // identical Ok/Err arms was redundant.
    config.create_pool(Some(Runtime::Tokio1), NoTls)
}
/// Look up the id of the 'flibusta' source row.
///
/// Errors (instead of panicking) when the pool checkout or the query fails;
/// `query_one` also errors if the row is missing.
async fn get_source(pool: Pool) -> Result<i16, Box<dyn std::error::Error>> {
    // Propagate pool-checkout failure instead of unwrap-panicking on I/O.
    let client = pool.get().await?;
    let row = client
        .query_one("SELECT id FROM sources WHERE name = 'flibusta';", &[])
        .await?;
    Ok(row.get(0))
}
/// Outcome of one table-update task, shared between tasks as
/// `Arc<Mutex<Option<UpdateStatus>>>` (`None` = still running).
enum UpdateStatus {
    /// Source file fully processed; dependants may start.
    Success,
    /// Update aborted; dependants must not run.
    Fail,
}
async fn send_webhooks() -> Result<(), Box<reqwest::Error>> {
for webhook in config::CONFIG.webhooks.clone().into_iter() {
let Webhook { method, url, headers } = webhook;
let client = reqwest::Client::new();
let builder = match method {
config::Method::Get => {
client.get(url)
},
config::Method::Post => {
client.post(url)
},
};
let t_headers: Vec<(HeaderName, HeaderValue)> = headers.into_iter().map(|(key, val)| {
let value = match val {
serde_json::Value::String(v) => v,
_ => panic!("Header value not string!")
};
(
HeaderName::from_str(key.as_ref()).unwrap(),
HeaderValue::from_str(&value).unwrap()
)
}).collect();
let headers = HeaderMap::from_iter(t_headers.into_iter());
let response = builder.headers(headers).send().await;
let response = match response {
Ok(v) => v,
Err(err) => return Err(Box::new(err)),
};
match response.error_for_status() {
Ok(_) => (),
Err(err) => return Err(Box::new(err)),
};
};
Ok(())
}
/// Run the full update: download and apply every SQL dump concurrently,
/// honouring the dependency graph between tables (books/authors before link
/// tables, annotations before annotation pictures), then fire the webhooks.
///
/// Panics on pool/source lookup failure or a failed sub-task (the caller only
/// logs the result; a panic surfaces through the task/job logs).
async fn update() -> Result<(), Box<dyn std::error::Error>> {
    log::info!("Start update...");
    let pool = match get_postgres_pool().await {
        Ok(pool) => pool,
        Err(err) => panic!("{:?}", err),
    };
    let source_id = match get_source(pool.clone()).await {
        Ok(v) => Arc::new(v),
        Err(err) => panic!("{:?}", err),
    };
    // One shared status cell per producer table; dependant tasks poll these
    // inside `process` before starting their own work.
    let author_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
    let book_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
    let sequence_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
    let book_annotation_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
    let author_annotation_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
    let genre_status: Arc<Mutex<Option<UpdateStatus>>> = Arc::new(Mutex::new(None));
    let pool_clone = pool.clone();
    let author_status_clone = author_status.clone();
    let source_id_clone = source_id.clone();
    let author_process = tokio::spawn(async move {
        match process::<Author>(pool_clone, *source_id_clone, "lib.libavtorname.sql", vec![]).await
        {
            Ok(_) => {
                let mut status = author_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Success);
                Ok(())
            }
            Err(err) => {
                let mut status = author_status_clone.lock().unwrap();
                // BUGFIX: this branch previously recorded `Success`, so tasks
                // depending on authors ran even after the author import failed.
                *status = Some(UpdateStatus::Fail);
                Err(err)
            }
        }
    });
    let pool_clone = pool.clone();
    let book_status_clone = book_status.clone();
    let source_id_clone = source_id.clone();
    let book_process = tokio::spawn(async move {
        match process::<Book>(pool_clone, *source_id_clone, "lib.libbook.sql", vec![]).await {
            Ok(_) => {
                let mut status = book_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Success);
                Ok(())
            }
            Err(err) => {
                let mut status = book_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Fail);
                Err(err)
            }
        }
    });
    // Book/author links need both authors and books in place.
    let pool_clone = pool.clone();
    let deps = vec![author_status.clone(), book_status.clone()];
    let source_id_clone = source_id.clone();
    let book_author_process = tokio::spawn(async move {
        process::<BookAuthor>(pool_clone, *source_id_clone, "lib.libavtor.sql", deps).await
    });
    let pool_clone = pool.clone();
    let deps = vec![author_status.clone(), book_status.clone()];
    let source_id_clone = source_id.clone();
    let translator_process = tokio::spawn(async move {
        process::<Translator>(pool_clone, *source_id_clone, "lib.libtranslator.sql", deps).await
    });
    let pool_clone = pool.clone();
    let sequence_status_clone = sequence_status.clone();
    let source_id_clone = source_id.clone();
    let sequence_process = tokio::spawn(async move {
        match process::<Sequence>(pool_clone, *source_id_clone, "lib.libseqname.sql", vec![]).await
        {
            Ok(_) => {
                let mut status = sequence_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Success);
                Ok(())
            }
            Err(err) => {
                let mut status = sequence_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Fail);
                Err(err)
            }
        }
    });
    let pool_clone = pool.clone();
    let deps = vec![book_status.clone(), sequence_status.clone()];
    let source_id_clone = source_id.clone();
    let sequence_info_process = tokio::spawn(async move {
        process::<SequenceInfo>(pool_clone, *source_id_clone, "lib.libseq.sql", deps).await
    });
    let pool_clone = pool.clone();
    let deps = vec![book_status.clone()];
    let book_annotation_status_clone = book_annotation_status.clone();
    let source_id_clone = source_id.clone();
    let book_annotation_process = tokio::spawn(async move {
        match process::<BookAnnotation>(pool_clone, *source_id_clone, "lib.b.annotations.sql", deps)
            .await
        {
            Ok(_) => {
                let mut status = book_annotation_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Success);
                Ok(())
            }
            Err(err) => {
                let mut status = book_annotation_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Fail);
                Err(err)
            }
        }
    });
    let pool_clone = pool.clone();
    let deps = vec![book_annotation_status.clone()];
    let source_id_clone = source_id.clone();
    let book_annotation_pics_process = tokio::spawn(async move {
        process::<BookAnnotationPic>(
            pool_clone,
            *source_id_clone,
            "lib.b.annotations_pics.sql",
            deps,
        )
        .await
    });
    let pool_clone = pool.clone();
    let deps = vec![author_status.clone()];
    let author_annotation_status_clone = author_annotation_status.clone();
    let source_id_clone = source_id.clone();
    let author_annotation_process = tokio::spawn(async move {
        match process::<AuthorAnnotation>(
            pool_clone,
            *source_id_clone,
            "lib.a.annotations.sql",
            deps,
        )
        .await
        {
            Ok(_) => {
                let mut status = author_annotation_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Success);
                Ok(())
            }
            Err(err) => {
                let mut status = author_annotation_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Fail);
                Err(err)
            }
        }
    });
    let pool_clone = pool.clone();
    let deps = vec![author_annotation_status.clone()];
    let source_id_clone = source_id.clone();
    let author_annotation_pics_process = tokio::spawn(async move {
        process::<AuthorAnnotationPic>(
            pool_clone,
            *source_id_clone,
            "lib.a.annotations_pics.sql",
            deps,
        )
        .await
    });
    let pool_clone = pool.clone();
    let genre_status_clone = genre_status.clone();
    let source_id_clone = source_id.clone();
    let genre_annotation_process = tokio::spawn(async move {
        match process::<Genre>(pool_clone, *source_id_clone, "lib.libgenrelist.sql", vec![]).await {
            Ok(_) => {
                let mut status = genre_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Success);
                Ok(())
            }
            Err(err) => {
                let mut status = genre_status_clone.lock().unwrap();
                *status = Some(UpdateStatus::Fail);
                Err(err)
            }
        }
    });
    let pool_clone = pool.clone();
    let deps = vec![genre_status.clone(), book_status.clone()];
    let source_id_clone = source_id.clone();
    let book_genre_process = tokio::spawn(async move {
        process::<BookGenre>(pool_clone, *source_id_clone, "lib.libgenre.sql", deps).await
    });
    // Join every task; a join error is returned, a task-level error panics
    // (same policy as the pool/source failures above).
    for process in [
        author_process,
        book_process,
        book_author_process,
        translator_process,
        sequence_process,
        sequence_info_process,
        book_annotation_process,
        book_annotation_pics_process,
        author_annotation_process,
        author_annotation_pics_process,
        genre_annotation_process,
        book_genre_process,
    ] {
        let process_result = match process.await {
            Ok(v) => v,
            Err(err) => return Err(Box::new(err)),
        };
        match process_result {
            Ok(_) => (),
            Err(err) => panic!("{:?}", err),
        }
    }
    match send_webhooks().await {
        Ok(_) => {
            log::info!("Webhooks sended!");
        },
        Err(err) => {
            log::info!("Webhooks send failed : {err}");
            return Err(Box::new(err))
        },
    };
    Ok(())
}
/// Entry point: initialise Sentry and logging, then schedule the daily update.
#[tokio::main]
async fn main() -> Result<(), JobSchedulerError> {
    let _guard = sentry::init(config::CONFIG.sentry_dsn.clone());
    env_logger::init();
    let job_scheduler = JobScheduler::new().await.unwrap();
    // BUGFIX: the schedule was "* 0 5 * * *" — with a `*` in the seconds
    // field it fires once per second for the whole of 05:00, launching
    // overlapping updates. "0 0 5 * * *" runs exactly once a day at 05:00:00.
    let update_job = match Job::new_async("0 0 5 * * *", |_uuid, _l| {
        Box::pin(async {
            match update().await {
                Ok(_) => log::info!("Updated"),
                Err(err) => log::info!("Update err: {:?}", err),
            };
        })
    }) {
        Ok(v) => v,
        Err(err) => panic!("{:?}", err),
    };
    job_scheduler.add(update_job).await.unwrap();
    // `start` already yields the Result we need; no re-matching required.
    job_scheduler.start().await
}

864
src/types.rs Normal file
View File

@@ -0,0 +1,864 @@
use std::convert::TryInto;
use async_trait::async_trait;
use chrono::{NaiveDate, NaiveDateTime};
use sql_parse::Expression;
use tokio_postgres::Client;
use crate::utils::{fix_annotation_text, parse_lang, remove_wrong_chars};
/// Construct a `T` from one parenthesised value tuple of a parsed SQL
/// `INSERT` statement. Implementations index into `value` positionally and
/// panic (with the field name) on an unexpected expression kind — a change in
/// the dump format is treated as fatal.
pub trait FromVecExpression<T> {
    fn from_vec_expression(value: &Vec<Expression>) -> T;
}
/// A row type that can be applied to the local Postgres database.
#[async_trait]
pub trait Update {
    /// One-time setup before any rows are applied (typically installs the
    /// plpgsql upsert helper the per-row `update` calls).
    async fn before_update(
        client: &Client,
    ) -> Result<(), Box<tokio_postgres::Error>>;
    /// Apply this row for the given source id.
    async fn update(
        &self,
        client: &Client,
        source_id: i16,
    ) -> Result<(), Box<tokio_postgres::Error>>;
}
/// One row of the `libavtorname` dump: an author record.
#[derive(Debug)]
pub struct Author {
    pub id: u64, // remote author id
    pub last_name: String,
    pub first_name: String,
    pub middle_name: String,
}
impl FromVecExpression<Author> for Author {
    /// Columns: 0 = id, 1 = first name, 2 = middle name, 3 = last name.
    fn from_vec_expression(value: &Vec<Expression>) -> Author {
        let int = |idx: usize, field: &str| match &value[idx] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("{}", field),
        };
        let name = |idx: usize, field: &str| match &value[idx] {
            sql_parse::Expression::String(v) => remove_wrong_chars(&v.value),
            _ => panic!("{}", field),
        };
        Author {
            id: int(0, "Author.id"),
            last_name: name(3, "Author.last_name"),
            first_name: name(1, "Author.first_name"),
            middle_name: name(2, "Author.middle_name"),
        }
    }
}
#[async_trait]
impl Update for Author {
async fn before_update(
client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"
CREATE OR REPLACE FUNCTION update_author(
source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar
) RETURNS void AS $$
BEGIN
IF EXISTS (SELECT * FROM authors WHERE source = source_ AND remote_id = remote_id_) THEN
UPDATE authors SET first_name = first_name_, last_name = last_name_, middle_name = middle_name_
WHERE source = source_ AND remote_id = remote_id_;
RETURN;
END IF;
INSERT INTO authors (source, remote_id, first_name, last_name, middle_name)
VALUES (source_, remote_id_, first_name_, last_name_, middle_name_);
END;
$$ LANGUAGE plpgsql;
"
, &[]).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
&[&source_id, &(self.id as i32), &self.first_name, &self.last_name, &self.middle_name]
).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `libbook` dump: book metadata.
#[derive(Debug)]
pub struct Book {
    pub id: u64, // remote book id
    pub title: String,
    pub lang: String, // normalised via `parse_lang`
    pub file_type: String,
    pub uploaded: NaiveDate,
    pub is_deleted: bool,
    pub pages: u64,
}
impl FromVecExpression<Book> for Book {
    /// Columns used: 0 = id, 2 = upload timestamp, 3 = title, 5 = lang,
    /// 8 = file type, 11 = deleted flag ('1' string), 20 = page count.
    fn from_vec_expression(value: &Vec<Expression>) -> Book {
        Book {
            id: match &value[0] {
                sql_parse::Expression::Integer(v) => v.0,
                _ => panic!("Book.id"),
            },
            title: match &value[3] {
                sql_parse::Expression::String(v) => remove_wrong_chars(&v.value),
                _ => panic!("Book.title"),
            },
            lang: match &value[5] {
                sql_parse::Expression::String(v) => parse_lang(&v.value),
                _ => panic!("Book.lang"),
            },
            file_type: match &value[8] {
                sql_parse::Expression::String(v) => v.value.to_string(),
                _ => panic!("Book.file_type"),
            },
            uploaded: match &value[2] {
                sql_parse::Expression::String(v) => {
                    NaiveDateTime::parse_from_str(&v.value.to_string(), "%Y-%m-%d %H:%M:%S")
                        .unwrap()
                        .date()
                }
                _ => panic!("Book.uploaded"),
            },
            is_deleted: match &value[11] {
                sql_parse::Expression::String(v) => v.value == "1",
                _ => panic!("Book.is_deleted"),
            },
            pages: match &value[20] {
                sql_parse::Expression::Integer(v) => v.0,
                // BUGFIX: this panic message said "Book.id" (copy-paste),
                // which pointed debugging at the wrong column.
                _ => panic!("Book.pages"),
            },
        }
    }
}
#[async_trait]
impl Update for Book {
    /// Install the `update_book` upsert helper in Postgres.
    async fn before_update(
        client: &Client
    ) -> Result<(), Box<tokio_postgres::Error>> {
        match client.execute(
            "
            CREATE OR REPLACE FUNCTION update_book(
                source_ smallint, remote_id_ int, title_ varchar, lang_ varchar,
                file_type_ varchar, uploaded_ date, is_deleted_ boolean, pages_ int
            ) RETURNS void AS $$
            BEGIN
                IF EXISTS (SELECT * FROM books WHERE source = source_ AND remote_id = remote_id_) THEN
                    -- BUGFIX: previously `is_deleted = is_deleted` (a self-assignment),
                    -- so a book's deletion state never changed on re-import.
                    UPDATE books SET title = title_, lang = lang_, file_type = file_type_,
                    uploaded = uploaded_, is_deleted = is_deleted_, pages = pages_
                    WHERE source = source_ AND remote_id = remote_id_;
                    RETURN;
                END IF;
                INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted, pages)
                VALUES (source_, remote_id_, title_, lang_, file_type_, uploaded_, is_deleted_, pages_);
            END;
            $$ LANGUAGE plpgsql;
            "
        , &[]).await {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(err)),
        }
    }

    /// Upsert one book row via `update_book`.
    async fn update(
        &self,
        client: &Client,
        source_id: i16,
    ) -> Result<(), Box<tokio_postgres::Error>> {
        match client.execute(
            "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7, $8);",
            &[&source_id, &(self.id as i32), &self.title, &self.lang, &self.file_type, &self.uploaded, &self.is_deleted, &(self.pages as i32)]
        ).await {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(err)),
        }
    }
}
/// One row of the `libavtor` dump: a book ↔ author link.
#[derive(Debug)]
pub struct BookAuthor {
    pub book_id: u64,   // remote book id
    pub author_id: u64, // remote author id
    // TODO: position
}
impl FromVecExpression<BookAuthor> for BookAuthor {
    /// Columns: 0 = book id, 1 = author id.
    fn from_vec_expression(value: &Vec<Expression>) -> BookAuthor {
        let int = |idx: usize, field: &str| match &value[idx] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("{}", field),
        };
        BookAuthor {
            book_id: int(0, "BookAuthor.book_id"),
            author_id: int(1, "BookAuthor.author_id"),
        }
    }
}
#[async_trait]
impl Update for BookAuthor {
async fn before_update(
client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"
CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$
DECLARE
book_id integer := -1;
author_id integer := -1;
BEGIN
SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_;
IF EXISTS (SELECT * FROM book_authors WHERE book = book_id AND author = author_id) THEN
RETURN;
END IF;
INSERT INTO book_authors (book, author) VALUES (book_id, author_id);
END;
$$ LANGUAGE plpgsql;
"
, &[]).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"SELECT update_book_author($1, $2, $3);",
&[&source_id, &(self.book_id as i32), &(self.author_id as i32)],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `libtranslator` dump: a book ↔ translator link.
#[derive(Debug)]
pub struct Translator {
    pub book_id: u64,   // remote book id
    pub author_id: u64, // remote author id of the translator
    pub position: u64,  // ordering among a book's translators
}
impl FromVecExpression<Translator> for Translator {
    /// Columns: 0 = book id, 1 = translator (author) id, 2 = position.
    fn from_vec_expression(value: &Vec<Expression>) -> Translator {
        let int = |idx: usize, field: &str| match &value[idx] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("{}", field),
        };
        Translator {
            book_id: int(0, "Translator.book_id"),
            author_id: int(1, "Translator.author_id"),
            position: int(2, "Translator.pos"),
        }
    }
}
#[async_trait]
impl Update for Translator {
async fn before_update(
client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"
CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$
DECLARE
book_id integer := -1;
author_id integer := -1;
BEGIN
SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_;
IF EXISTS (SELECT * FROM translations WHERE book = book_id AND author = author_id) THEN
UPDATE translations SET position = position_
WHERE book = book_id AND author = author_id;
RETURN;
END IF;
INSERT INTO translations (book, author, position) VALUES (book_id, author_id, position_);
END;
$$ LANGUAGE plpgsql;
"
, &[]).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"SELECT update_translation($1, $2, $3, $4);",
&[
&source_id,
&(self.book_id as i32),
&(self.author_id as i32),
&(self.position as i16),
],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `libseqname` dump: a book series (sequence) name.
#[derive(Debug)]
pub struct Sequence {
    pub id: u64, // remote sequence id
    pub name: String,
}
impl FromVecExpression<Sequence> for Sequence {
    /// Columns: 0 = id, 1 = name (sanitised).
    fn from_vec_expression(value: &Vec<Expression>) -> Sequence {
        let id = match &value[0] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("Sequence.id"),
        };
        let name = match &value[1] {
            sql_parse::Expression::String(v) => remove_wrong_chars(&v.value),
            _ => panic!("Sequence.name"),
        };
        Sequence { id, name }
    }
}
#[async_trait]
impl Update for Sequence {
async fn before_update(
client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"
CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$
BEGIN
IF EXISTS (SELECT * FROM sequences WHERE source = source_ AND remote_id = remote_id_) THEN
UPDATE sequences SET name = name_ WHERE source = source_ AND remote_id = remote_id_;
RETURN;
END IF;
INSERT INTO sequences (source, remote_id, name) VALUES (source_, remote_id_, name_);
END;
$$ LANGUAGE plpgsql;
"
, &[]).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"SELECT update_sequences($1, $2, cast($3 as varchar));",
&[&source_id, &(self.id as i32), &self.name],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `libseq` dump: a book's membership in a sequence.
#[derive(Debug)]
pub struct SequenceInfo {
    pub book_id: u64,     // remote book id
    pub sequence_id: u64, // remote sequence id
    pub position: i64,    // signed: the dump can contain negative positions
}
impl FromVecExpression<SequenceInfo> for SequenceInfo {
    /// Columns: 0 = book id, 1 = sequence id, 2 = position.
    fn from_vec_expression(value: &Vec<Expression>) -> SequenceInfo {
        let int = |idx: usize, field: &str| match &value[idx] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("{}", field),
        };
        // The position column may be a plain integer or a unary-minus
        // expression (the parser represents `-N` as Unary{Minus, N}).
        let position: i64 = match &value[2] {
            sql_parse::Expression::Integer(v) => v.0.try_into().unwrap(),
            sql_parse::Expression::Unary {
                op,
                op_span: _,
                operand,
            } => match (op, operand.as_ref()) {
                (sql_parse::UnaryOperator::Minus, Expression::Integer(v)) => {
                    let magnitude: i64 = (v.0).try_into().unwrap();
                    -magnitude
                }
                (_, _) => panic!("SequenceInfo.position = {:?}", &value[2]),
            },
            _ => panic!("SequenceInfo.position = {:?}", &value[2]),
        };
        SequenceInfo {
            book_id: int(0, "SequenceInfo.book_id"),
            sequence_id: int(1, "SequenceInfo.sequence_id"),
            position,
        }
    }
}
#[async_trait]
impl Update for SequenceInfo {
async fn before_update(
client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"
CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$
DECLARE
book_id integer := -1;
sequence_id integer := -1;
BEGIN
SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
SELECT id INTO sequence_id FROM sequences WHERE source = source_ AND remote_id = sequence_;
IF EXISTS (SELECT * FROM book_sequences WHERE book = book_id AND sequence = sequence_id) THEN
UPDATE book_sequences SET position = position_ WHERE book = book_id AND sequence = sequence_id;
RETURN;
END IF;
INSERT INTO book_sequences (book, sequence, position) VALUES (book_id, sequence_id, position_);
END;
$$ LANGUAGE plpgsql;
"
, &[]).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"SELECT update_book_sequence($1, $2, $3, $4);",
&[
&source_id,
&(self.book_id as i32),
&(self.sequence_id as i32),
&(self.position as i16),
],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `b.annotations` dump: a book's annotation text.
#[derive(Debug)]
pub struct BookAnnotation {
    pub book_id: u64, // remote book id
    pub title: String,
    pub body: Option<String>, // None when the dump column is NULL
}
impl FromVecExpression<BookAnnotation> for BookAnnotation {
    /// Columns: 0 = book id, 2 = title, 3 = body (nullable).
    fn from_vec_expression(value: &Vec<Expression>) -> BookAnnotation {
        let book_id = match &value[0] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("BookAnnotation.book_id"),
        };
        let title = match &value[2] {
            sql_parse::Expression::String(v) => v.value.to_string(),
            _ => panic!("BookAnnotation.title"),
        };
        // NULL bodies are legal in the dump and map to None.
        let body = match &value[3] {
            sql_parse::Expression::String(v) => Some(fix_annotation_text(&v.value)),
            sql_parse::Expression::Null(_) => None,
            _ => panic!("BookAnnotation.body"),
        };
        BookAnnotation {
            book_id,
            title,
            body,
        }
    }
}
#[async_trait]
impl Update for BookAnnotation {
async fn before_update(
client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"
CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$
DECLARE
book_id integer := -1;
BEGIN
SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
IF EXISTS (SELECT * FROM book_annotations WHERE book = book_id) THEN
UPDATE book_annotations SET title = title_, text = text_ WHERE book = book_id;
RETURN;
END IF;
IF book_id IS NULL THEN
RETURN;
END IF;
INSERT INTO book_annotations (book, title, text) VALUES (book_id, title_, text_);
END;
$$ LANGUAGE plpgsql;
"
, &[]).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
&[&source_id, &(self.book_id as i32), &self.title, &self.body],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `b.annotations_pics` dump: picture file for a book annotation.
#[derive(Debug)]
pub struct BookAnnotationPic {
    pub book_id: u64, // remote book id
    pub file: String, // picture file name
}
impl FromVecExpression<BookAnnotationPic> for BookAnnotationPic {
    /// Columns: 0 = book id, 2 = file name.
    fn from_vec_expression(value: &Vec<Expression>) -> BookAnnotationPic {
        let book_id = match &value[0] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("BookAnnotationPic.book_id"),
        };
        let file = match &value[2] {
            sql_parse::Expression::String(v) => v.value.to_string(),
            _ => panic!("BookAnnotationPic.file"),
        };
        BookAnnotationPic { book_id, file }
    }
}
#[async_trait]
impl Update for BookAnnotationPic {
async fn before_update(
_client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
Ok(())
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"\
UPDATE book_annotations \
SET file = cast($3 as varchar) \
FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books \
WHERE book = books.id;\
",
&[&source_id, &(self.book_id as i32), &self.file],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `a.annotations` dump: an author's annotation text.
#[derive(Debug)]
pub struct AuthorAnnotation {
    pub author_id: u64, // remote author id
    pub title: String,
    pub body: Option<String>, // None when the dump column is NULL
}
impl FromVecExpression<AuthorAnnotation> for AuthorAnnotation {
    /// Columns: 0 = author id, 2 = title, 3 = body (nullable).
    fn from_vec_expression(value: &Vec<Expression>) -> AuthorAnnotation {
        let author_id = match &value[0] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("AuthorAnnotation.author_id"),
        };
        let title = match &value[2] {
            sql_parse::Expression::String(v) => v.value.to_string(),
            _ => panic!("AuthorAnnotation.title"),
        };
        // NULL bodies are legal in the dump and map to None.
        let body = match &value[3] {
            sql_parse::Expression::String(v) => Some(fix_annotation_text(&v.value)),
            sql_parse::Expression::Null(_) => None,
            _ => panic!("AuthorAnnotation.body"),
        };
        AuthorAnnotation {
            author_id,
            title,
            body,
        }
    }
}
#[async_trait]
impl Update for AuthorAnnotation {
async fn before_update(
client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
match client.execute(
"
CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$
DECLARE
author_id integer := -1;
BEGIN
SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_;
IF EXISTS (SELECT * FROM author_annotations WHERE author = author_id) THEN
UPDATE author_annotations SET title = title_, text = text_ WHERE author = author_id;
RETURN;
END IF;
INSERT INTO author_annotations (author, title, text) VALUES (author_id, title_, text_);
END;
$$ LANGUAGE plpgsql;
"
, &[]).await {
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
&[
&source_id,
&(self.author_id as i32),
&self.title,
&self.body,
],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `a.annotations_pics` dump: picture file for an author annotation.
#[derive(Debug)]
pub struct AuthorAnnotationPic {
    pub author_id: u64, // remote author id
    pub file: String,   // picture file name
}
impl FromVecExpression<AuthorAnnotationPic> for AuthorAnnotationPic {
    /// Columns: 0 = author id, 2 = file name.
    fn from_vec_expression(value: &Vec<Expression>) -> AuthorAnnotationPic {
        AuthorAnnotationPic {
            author_id: match &value[0] {
                sql_parse::Expression::Integer(v) => v.0,
                // BUGFIX: this panic message said "AuthorAnnotationPic.book_id"
                // (copy-paste from BookAnnotationPic); the column is author_id.
                _ => panic!("AuthorAnnotationPic.author_id"),
            },
            file: match &value[2] {
                sql_parse::Expression::String(v) => v.value.to_string(),
                _ => panic!("AuthorAnnotationPic.file"),
            },
        }
    }
}
#[async_trait]
impl Update for AuthorAnnotationPic {
async fn before_update(
_client: &Client
) -> Result<(), Box<tokio_postgres::Error>> {
Ok(())
}
async fn update(
&self,
client: &Client,
source_id: i16,
) -> Result<(), Box<tokio_postgres::Error>> {
match client
.execute(
"\
UPDATE author_annotations \
SET file = cast($3 as varchar) \
FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors \
WHERE author = authors.id;",
&[&source_id, &(self.author_id as i32), &self.file],
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
}
/// One row of the `libgenrelist` dump: a genre definition.
#[derive(Debug)]
pub struct Genre {
    pub id: u64, // remote genre id
    pub code: String,
    pub description: String,
    pub meta: String, // genre meta-category
}
impl FromVecExpression<Genre> for Genre {
    /// Columns: 0 = id, 1 = code, 2 = description, 3 = meta.
    fn from_vec_expression(value: &Vec<Expression>) -> Genre {
        let id = match &value[0] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("Genre.id"),
        };
        let code = match &value[1] {
            sql_parse::Expression::String(v) => v.value.to_string(),
            _ => panic!("Genre.code = {:?}", &value[1]),
        };
        let description = match &value[2] {
            sql_parse::Expression::String(v) => v.value.to_string(),
            _ => panic!("Genre.description = {:?}", &value[2]),
        };
        let meta = match &value[3] {
            sql_parse::Expression::String(v) => v.value.to_string(),
            _ => panic!("Genre.meta"),
        };
        Genre {
            id,
            code,
            description,
            meta,
        }
    }
}
#[async_trait]
impl Update for Genre {
    /// Install the helpers needed for genre rows.
    ///
    /// BUGFIX: `update()` below calls `update_genre`, but this method used to
    /// create only the 3-argument `update_book_sequence` (a copy-paste of the
    /// book/genre link helper), so every genre upsert failed at runtime with
    /// "function update_genre(...) does not exist".
    async fn before_update(
        client: &Client
    ) -> Result<(), Box<tokio_postgres::Error>> {
        // Upsert helper for the `genres` table.
        // NOTE(review): column names (code, description, meta) are inferred
        // from `update()`'s argument list — confirm against the schema.
        if let Err(err) = client.execute(
            "
            CREATE OR REPLACE FUNCTION update_genre(
                source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar
            ) RETURNS void AS $$
            BEGIN
                IF EXISTS (SELECT * FROM genres WHERE source = source_ AND remote_id = remote_id_) THEN
                    UPDATE genres SET code = code_, description = description_, meta = meta_
                    WHERE source = source_ AND remote_id = remote_id_;
                    RETURN;
                END IF;
                INSERT INTO genres (source, remote_id, code, description, meta)
                VALUES (source_, remote_id_, code_, description_, meta_);
            END;
            $$ LANGUAGE plpgsql;
            "
        , &[]).await {
            return Err(Box::new(err));
        }
        // Keep installing the book/genre link helper exactly as before:
        // `BookGenre::update` may still rely on this function being present.
        match client.execute(
            "
            CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$
            DECLARE
                book_id integer := -1;
                genre_id integer := -1;
            BEGIN
                SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
                SELECT id INTO genre_id FROM genres WHERE source = source_ AND remote_id = genre_;
                IF EXISTS (SELECT * FROM book_genres WHERE book = book_id AND genre = genre_id) THEN
                    RETURN;
                END IF;
                INSERT INTO book_genres (book, genre) VALUES (book_id, genre_id);
            END;
            $$ LANGUAGE plpgsql;
            "
        , &[]).await {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(err)),
        }
    }

    /// Upsert one genre row via `update_genre`.
    async fn update(
        &self,
        client: &Client,
        source_id: i16,
    ) -> Result<(), Box<tokio_postgres::Error>> {
        match client
            .execute(
                "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
                &[&source_id, &(self.id as i32), &self.code, &self.description, &self.meta]
            ).await
        {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(err)),
        }
    }
}
/// One row of the `libgenre` dump: a book ↔ genre link.
#[derive(Debug)]
pub struct BookGenre {
    pub book_id: u64,  // remote book id
    pub genre_id: u64, // remote genre id
}
impl FromVecExpression<BookGenre> for BookGenre {
    /// Columns: 1 = book id, 2 = genre id (column 0 is the row's own id).
    fn from_vec_expression(value: &Vec<Expression>) -> BookGenre {
        let int = |idx: usize, field: &str| match &value[idx] {
            sql_parse::Expression::Integer(v) => v.0,
            _ => panic!("{}", field),
        };
        BookGenre {
            book_id: int(1, "BookGenre.book_id"),
            genre_id: int(2, "BookGenre.genre_id"),
        }
    }
}
#[async_trait]
impl Update for BookGenre {
    /// Install the book ↔ genre link helper.
    ///
    /// BUGFIX: this used to be a no-op, leaving `update()` dependent on
    /// `Genre::before_update` having created a misleadingly named 3-argument
    /// `update_book_sequence`. Create a properly named, self-contained helper
    /// here instead.
    async fn before_update(
        client: &Client
    ) -> Result<(), Box<tokio_postgres::Error>> {
        match client.execute(
            "
            CREATE OR REPLACE FUNCTION update_book_genre(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$
            DECLARE
                book_id integer := -1;
                genre_id integer := -1;
            BEGIN
                SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_;
                SELECT id INTO genre_id FROM genres WHERE source = source_ AND remote_id = genre_;
                IF EXISTS (SELECT * FROM book_genres WHERE book = book_id AND genre = genre_id) THEN
                    RETURN;
                END IF;
                INSERT INTO book_genres (book, genre) VALUES (book_id, genre_id);
            END;
            $$ LANGUAGE plpgsql;
            "
        , &[]).await {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(err)),
        }
    }

    /// Link one book to one genre (idempotent).
    async fn update(
        &self,
        client: &Client,
        source_id: i16,
    ) -> Result<(), Box<tokio_postgres::Error>> {
        match client
            .execute(
                "SELECT update_book_genre($1, $2, $3);",
                &[&source_id, &(self.book_id as i32), &(self.genre_id as i32)],
            )
            .await
        {
            Ok(_) => Ok(()),
            Err(err) => Err(Box::new(err)),
        }
    }
}

26
src/utils.rs Normal file
View File

@@ -0,0 +1,26 @@
use std::fs::File;
use std::io::{self, BufRead};
use std::path::Path;
/// Open `filename` and return a buffered iterator over its lines.
pub fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
where
    P: AsRef<Path>,
{
    File::open(filename).map(|file| io::BufReader::new(file).lines())
}
/// Sanitise a name string: drop semicolons, flatten newlines to spaces and
/// normalise 'ё' to 'е'.
pub fn remove_wrong_chars(s: &str) -> String {
    s.chars()
        .filter(|&c| c != ';')
        .map(|c| match c {
            '\n' => ' ',
            'ё' => 'е',
            other => other,
        })
        .collect()
}
/// Normalise a language code: drop '-' and '~' and lowercase the rest.
pub fn parse_lang(s: &str) -> String {
    s.chars()
        .filter(|&c| c != '-' && c != '~')
        .collect::<String>()
        .to_lowercase()
}
/// Strip HTML entities and BB-code markers from annotation text.
pub fn fix_annotation_text(text: &str) -> String {
    ["&nbsp;", "[b]", "[/b]", "[hr]"]
        .iter()
        .fold(text.to_string(), |acc, marker| acc.replace(marker, ""))
}