commit 2c91d5d5ff2d823035cb85f5add04b68e2ba0474 Author: Kurbanov Bulat Date: Thu Nov 18 21:48:59 2021 +0300 Init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6e969e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ + +venv + +.vscode diff --git a/docker/build.dockerfile b/docker/build.dockerfile new file mode 100644 index 0000000..e9ff8d4 --- /dev/null +++ b/docker/build.dockerfile @@ -0,0 +1,32 @@ +FROM python:3.10-slim as build-image + +RUN apt-get update \ + && apt-get install --no-install-recommends -y gcc build-essential python3-dev libpq-dev libffi-dev \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR / +COPY ./requirements.txt ./ + +ENV VENV_PATH=/opt/venv +RUN python -m venv $VENV_PATH \ + && . "${VENV_PATH}/bin/activate" \ + && pip install -r requirements.txt --no-cache-dir + + +FROM python:3.10-slim as runtime-image + +RUN apt-get update \ + && apt-get install --no-install-recommends -y wget python3-dev libpq-dev libffi-dev default-mysql-client-core \ + && rm -rf /var/lib/apt/lists/* + +COPY ./src/ /app/ + +ENV VENV_PATH=/opt/venv +COPY --from=build-image $VENV_PATH $VENV_PATH +ENV PATH="$VENV_PATH/bin:$PATH" + +EXPOSE 8080 + +WORKDIR /app/ + +CMD uvicorn main:app --host="0.0.0.0" --port="8080" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3663d67 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +fastapi +pydantic[dotenv] +asyncpg +aiomysql +uvicorn[standart] +aiologger diff --git a/src/app/depends.py b/src/app/depends.py new file mode 100644 index 0000000..b99768e --- /dev/null +++ b/src/app/depends.py @@ -0,0 +1,9 @@ +from fastapi import Security, HTTPException, status + +from core.auth import default_security +from core.config import env_config + + +async def check_token(api_key: str = Security(default_security)): + if api_key != env_config.API_KEY: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Wrong api key!") diff --git a/src/app/services/updaters/__init__.py b/src/app/services/updaters/__init__.py new file mode 100644 index 0000000..289f7f3 --- /dev/null +++ b/src/app/services/updaters/__init__.py @@ -0,0 +1,13 @@ +from enum import Enum + +from app.services.updaters.base import BaseUpdater +from app.services.updaters.flibusta_updater import FlibustaUpdater + + +class UpdaterTypes(Enum): + FLIBUSTA = 'flibusta' + + +UPDATERS: dict[UpdaterTypes, BaseUpdater] = { + UpdaterTypes.FLIBUSTA: FlibustaUpdater +} diff --git a/src/app/services/updaters/base.py b/src/app/services/updaters/base.py new file mode 100644 index 0000000..57eca09 --- /dev/null +++ b/src/app/services/updaters/base.py @@ -0,0 +1,7 @@ +from typing import Protocol + + +class BaseUpdater(Protocol): + @classmethod + async def update(cls) -> bool: + ... diff --git a/src/app/services/updaters/flibusta_updater.py b/src/app/services/updaters/flibusta_updater.py new file mode 100644 index 0000000..b4bbae9 --- /dev/null +++ b/src/app/services/updaters/flibusta_updater.py @@ -0,0 +1,482 @@ +from typing import Optional +import asyncio +import platform + +import asyncpg +import aiomysql +from aiologger import Logger + +from core.config import env_config +from app.services.updaters.base import BaseUpdater + + +async def run(cmd) -> tuple[bytes, bytes, Optional[int]]: + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + + stdout, stderr = await proc.communicate() + return stdout, stderr, proc.returncode + + +def remove_wrong_ch(s: str): + return s.replace(";", "").replace("\n", " ").replace('ё', 'е') + + +def remove_dots(s: str): + return s.replace('.', '') + + +class FlibustaUpdater(BaseUpdater): + SOURCE: int + + FILES = [ + 'lib.libavtor.sql', + 'lib.libbook.sql', + 'lib.libavtorname.sql', + 'lib.libtranslator.sql', + 'lib.libseqname.sql', + 'lib.libseq.sql', + 'lib.libgenre.sql', + 'lib.libgenrelist.sql', + 'lib.b.annotations.sql', + 'lib.b.annotations_pics.sql', + 'lib.a.annotations.sql', + 'lib.a.annotations_pics.sql', + ] + + postgres_pool: asyncpg.Pool + mysql_pool: aiomysql.Pool + + authors_updated_event: asyncio.Event + books_updated_event: asyncio.Event + sequences_updated_event: asyncio.Event + genres_updated_event: asyncio.Event + + platform: str + logger: Logger + + async def log(self, message): + if 'windows' in self.platform.lower(): + print(message) + else: + await self.logger.info(message) + + async def _drop_tables(self): + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + cursor.execute( + "DROP TABLE IF EXISTS libaannotations;" + "DROP TABLE IF EXISTS libapics;" + "DROP TABLE IF EXISTS libavtor;" + "DROP TABLE IF EXISTS libavtorname;" + "DROP TABLE IF EXISTS libbannotations;" + "DROP TABLE IF EXISTS libbook;" + "DROP TABLE IF EXISTS libbpics;" + "DROP TABLE IF EXISTS libgenre;" + "DROP TABLE IF EXISTS libgenrelist;" + "DROP TABLE IF EXISTS libseq;" + "DROP TABLE IF EXISTS libseqname;" + "DROP TABLE IF EXISTS libtranslator;" + ) + + async def _import_dump(self, filename: str): + result = await run( + f"wget -O - http://flibusta.is/sql/{filename}.gz | gunzip | " + f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} " + f"-p\"{env_config.MYSQL_PASSWORD}\" {env_config.MYSQL_DB_NAME}" + ) + await self.log(f"Imported {filename}: {result}.") + + async def _prepare(self): + posgres_pool = await asyncpg.create_pool( + database=env_config.POSTGRES_DB_NAME, + host=env_config.POSTGRES_HOST, + port=env_config.POSTGRES_PORT, + user=env_config.POSTGRES_USER, + password=env_config.POSTGRES_PASSWORD, + ) + + self.mysql_pool = await aiomysql.create_pool( + db=env_config.MYSQL_DB_NAME, + host=env_config.MYSQL_HOST, + port=env_config.MYSQL_PORT, + user=env_config.MYSQL_USER, + password=env_config.MYSQL_PASSWORD, + ) + + if not posgres_pool: + raise asyncpg.exceptions.PostgresConnectionError() + + self.postgres_pool = posgres_pool + + async def _set_source(self): + await self.log('Set source...') + + source_row = await self.postgres_pool.fetchrow( + "SELECT id FROM sources WHERE name = 'flibusta';" + ) + + if not source_row: + await self.postgres_pool.execute( + "INSERT INTO sources (name) VALUES ('flibusta');" + ) + + source_row = await self.postgres_pool.fetchrow( + "SELECT id FROM sources WHERE name = 'flibusta';" + ) + + self.SOURCE = source_row['id'] + + await self.log("Source has set!") + + async def _update_authors(self): + def prepare_author(row: list): + return [ + self.SOURCE, + row[0], + remove_wrong_ch(row[1]), + remove_wrong_ch(row[2]), + remove_wrong_ch(row[3]) + ] + + await self.log("Update authors...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT AvtorId, FirstName, MiddleName, LastName FROM libavtorname;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO authors (source, remote_id, first_name, last_name, middle_name) " + "VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) " + "ON CONFLICT (source, remote_id) " + "DO UPDATE SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name, middle_name = EXCLUDED.middle_name;", + [prepare_author(row) for row in rows] + ) + + self.authors_updated_event.set() + + await self.log("Authors updated!") + + async def _update_books(self): + def prepare_book(row: list): + return [ + self.SOURCE, + row[0], + remove_wrong_ch(row[1]), + row[2], + row[3], + row[4], + row[5] == '1' + ] + + await self.log("Update books...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted) " + "VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7) " + "ON CONFLICT (source, remote_id) " + "DO UPDATE SET title = EXCLUDED.title, lang = EXCLUDED.lang, file_type = EXCLUDED.file_type, " + "uploaded = EXCLUDED.uploaded, is_deleted = EXCLUDED.is_deleted;", + [prepare_book(row) for row in rows] + ) + + self.books_updated_event.set() + + await self.log("Books updated!") + + async def _update_books_authors(self): + await self.books_updated_event.wait() + await self.authors_updated_event.wait() + + await self.log("Update books_authors...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT BookId, AvtorId FROM libavtor;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO book_authors (book, author) " + "SELECT " + "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " + "(SELECT id FROM authors WHERE source = $1 AND remote_id = $3) " + "ON CONFLICT (book, author) DO NOTHING;", + [(self.SOURCE, *row) for row in rows] + ) + + await self.log("Books_authors updated!") + + async def _update_translations(self): + await self.books_updated_event.wait() + await self.authors_updated_event.wait() + + await self.log("Update translations...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT BookId, TranslatorId, Pos FROM libtranslator " + "WHERE BookId IN (SELECT BookId FROM libbook);" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO translations (book, author, position) " + "SELECT " + "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " + "(SELECT id FROM authors WHERE source = $1 AND remote_id = $3), " + "$4 " + "ON CONFLICT (book, author) " + "DO UPDATE SET position = EXCLUDED.position;", + [(self.SOURCE, *row) for row in rows] + ) + + await self.log("Translations updated!") + + async def _update_sequences(self): + def prepare_sequence(row: list): + return [ + self.SOURCE, + row[0], + remove_wrong_ch(row[1]), + ] + + await self.log("Update sequences...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT SeqId, SeqName FROM libseqname;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO sequences (source, remote_id, name) " + "VALUES ($1, $2, cast($3 as varchar)) " + "ON CONFLICT (source, remote_id) " + "DO UPDATE SET name = EXCLUDED.name;", + [prepare_sequence(row) for row in rows] + ) + + self.sequences_updated_event.set() + + await self.log("Sequences updated!") + + async def _update_sequences_info(self): + await self.sequences_updated_event.wait() + await self.books_updated_event.wait() + + await self.log("Update book_sequences...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT BookId, SeqId, level FROM libseq " + "WHERE " + "BookId IN (SELECT BookId FROM libbook) AND " + "SeqId IN (SELECT SeqId FROM libseqname);" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO book_sequences (book, sequence, position) " + "SELECT " + "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " + "(SELECT id FROM sequences WHERE source = $1 AND remote_id = $3), " + "$4 " + "ON CONFLICT (book, sequence) " + "DO UPDATE SET position = EXCLUDED.position;", + [[self.SOURCE, *row] for row in rows] + ) + + await self.log("Book_sequences updated!") + + async def _update_book_annotations(self): + await self.books_updated_event.wait() + + await self.log("Update book_annotations...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT BookId, Title, Body FROM libbannotations " + "WHERE BookId IN (SELECT BookId FROM libbook);" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO book_annotations (book, title, text) " + "SELECT " + "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " + "$3, $4 " + "ON CONFLICT (book) " + "DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text;", + [[self.SOURCE, *row] for row in rows] + ) + + await self.log("Book_annotations updated!") + + await self._update_book_annotations_pic() + + async def _update_book_annotations_pic(self): + await self.log("Update book_annotations_pic...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT BookId, File FROM libbpics;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "UPDATE book_annotations " + "SET file = cast($3 as varchar) " + "FROM (SELECT id FROM books WHERE source = $1 AND remote_id = $2) as books " + "WHERE book = books.id;", + [[self.SOURCE, *row] for row in rows] + ) + + await self.log("Book_annotation_pics updated!") + + async def _update_author_annotations(self): + await self.authors_updated_event.wait() + + await self.log("Update author_annotations...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT AvtorId, Title, Body FROM libaannotations;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO author_annotations (author, title, text) " + "SELECT " + "(SELECT id FROM authors WHERE source = $1 AND remote_id = $2), " + "$3, $4 " + "ON CONFLICT (author) " + "DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text;", + [[self.SOURCE, *row] for row in rows] + ) + + await self.log("Author_annotation_updated!") + + await self._update_author_annotations_pics() + + async def _update_author_annotations_pics(self): + await self.log("Update author_annotations_pic...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT AvtorId, File FROM libapics;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "UPDATE author_annotations " + "SET file = cast($3 as varchar) " + "FROM (SELECT id FROM authors WHERE source = $1 AND remote_id = $2) as authors " + "WHERE author = authors.id;", + [[self.SOURCE, *row] for row in rows] + ) + + await self.log("Author_annotatioins_pic updated!") + + async def _update_genres(self): + await self.log("Update genres...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO genres (source, remote_id, code, description, meta) " + "VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) " + "ON CONFLICT (source, remote_id) " + "DO UPDATE SET code = EXCLUDED.code, description = EXCLUDED.description, meta = EXCLUDED.meta;", + [[self.SOURCE, *row] for row in rows] + ) + + await self.log("Genres updated!") + + async def _update_books_genres(self): + await self.books_updated_event.wait() + await self.genres_updated_event.wait() + + await self.log("Update book_genres...") + + async with self.mysql_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT BookId, GenreId FROM libgenre;" + ) + + while (rows := await cursor.fetchmany(32)): + await self.postgres_pool.executemany( + "INSERT INTO book_genres (book, genre) " + "SELECT " + "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " + "(SELECT id FROM genres WHERE source = $1 AND remote_id = $3) " + "ON CONFLICT (book, author) DO NOTHING;", + [(self.SOURCE, *row) for row in rows] + ) + + await self.log("Book_genres updated!") + + async def _update(self) -> bool: + self.platform = platform.platform() + self.logger = Logger.with_default_handlers() + + await self._prepare() + + await self._drop_tables() + + await asyncio.gather( + *[self._import_dump(filename) for filename in self.FILES] + ) + + await self._set_source() + + self.authors_updated_event = asyncio.Event() + self.books_updated_event = asyncio.Event() + self.sequences_updated_event = asyncio.Event() + self.genres_updated_event = asyncio.Event() + + await asyncio.gather( + self._update_authors(), + self._update_books(), + self._update_books_authors(), + self._update_translations(), + self._update_sequences(), + self._update_sequences_info(), + self._update_book_annotations(), + self._update_author_annotations(), + self._update_genres(), + self._update_books_genres() + ) + + return True + + @classmethod + async def update(cls) -> bool: + updater = cls() + return await updater._update() diff --git a/src/app/views.py b/src/app/views.py new file mode 100644 index 0000000..4ff99f2 --- /dev/null +++ b/src/app/views.py @@ -0,0 +1,21 @@ +from fastapi import APIRouter, BackgroundTasks, Depends + +from app.services.updaters import UpdaterTypes, UPDATERS +from app.depends import check_token + + +router = APIRouter( + tags=["updater"], + dependencies=[Depends(check_token)] +) + + +@router.post("/update/{updater}") +async def update(updater: UpdaterTypes, background_tasks: BackgroundTasks): + updater_ = UPDATERS[updater] + + background_tasks.add_task( + updater_.update + ) + + return True diff --git a/src/core/app.py b/src/core/app.py new file mode 100644 index 0000000..4f37812 --- /dev/null +++ b/src/core/app.py @@ -0,0 +1,11 @@ +from fastapi import FastAPI + +from app.views import router + + +def start_app() -> FastAPI: + app = FastAPI() + + app.include_router(router) + + return app diff --git a/src/core/auth.py b/src/core/auth.py new file mode 100644 index 0000000..7cc07b5 --- /dev/null +++ b/src/core/auth.py @@ -0,0 +1,4 @@ +from fastapi.security import APIKeyHeader + + +default_security = APIKeyHeader(name="Authorization") diff --git a/src/core/config.py b/src/core/config.py new file mode 100644 index 0000000..d7a2ed1 --- /dev/null +++ b/src/core/config.py @@ -0,0 +1,20 @@ +from pydantic import BaseSettings + + +class EnvConfig(BaseSettings): + API_KEY: str + + POSTGRES_DB_NAME: str + POSTGRES_HOST: str + POSTGRES_PORT: int + POSTGRES_USER: str + POSTGRES_PASSWORD: str + + MYSQL_DB_NAME: str + MYSQL_HOST: str + MYSQL_PORT: int + MYSQL_USER: str + MYSQL_PASSWORD: str + + +env_config = EnvConfig() diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..2739482 --- /dev/null +++ b/src/main.py @@ -0,0 +1,3 @@ +from core.app import start_app + +app = start_app()