This commit is contained in:
2023-06-06 01:02:13 +02:00
parent 40994db39d
commit 08e7a049b4
23 changed files with 1890 additions and 0 deletions

18
src/app/depends.py Normal file
View File

@@ -0,0 +1,18 @@
from fastapi import HTTPException, Request, Security, status
from redis.asyncio import Redis
from taskiq import TaskiqDepends
from core.auth import default_security
from core.config import env_config
async def check_token(api_key: str = Security(default_security)):
if api_key != env_config.API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Wrong api key!"
)
def get_redis(request: Request = TaskiqDepends()) -> Redis:
return request.app.state.redis

10
src/app/serializers.py Normal file
View File

@@ -0,0 +1,10 @@
from pydantic import BaseModel
from app.services.task_creator import ObjectType
class CreateTaskData(BaseModel):
object_id: int
object_type: ObjectType
file_format: str
allowed_langs: list[str]

View File

@@ -0,0 +1,233 @@
import asyncio
from base64 import b64decode
from io import BytesIO
import tempfile
from typing import cast
import uuid
import zipfile
import httpx
from minio import Minio
from redis.asyncio import Redis
from taskiq import TaskiqDepends
from taskiq.task import AsyncTaskiqTask
from transliterate import translit
from app.depends import get_redis
from app.services.library_client import LibraryClient
from app.services.task_manager import ObjectType, TaskManager, TaskStatusEnum
from core.config import env_config
from core.taskiq_broker import broker, result_backend
def get_minio_client():
return Minio(
env_config.MINIO_HOST,
access_key=env_config.MINIO_ACCESS_KEY,
secret_key=env_config.MINIO_SECRET_KEY,
secure=False,
)
async def _download_to_tmpfile(
book_id: int, file_type: str, output: tempfile.SpooledTemporaryFile
) -> tuple[str, int] | None:
async with httpx.AsyncClient() as client:
request = client.build_request(
"get",
f"{env_config.CACHE_URL}/api/v1/download/{book_id}/{file_type}",
headers={"Authorization": env_config.CACHE_API_KEY},
)
response = await client.send(request, stream=True)
if response.status_code != 200:
await response.aclose()
return None
filename = b64decode(response.headers["X-Filename-B64"]).decode()
loop = asyncio.get_running_loop()
async for chunk in response.aiter_bytes(2048):
await loop.run_in_executor(None, output.write, chunk)
await loop.run_in_executor(None, output.flush)
await loop.run_in_executor(None, output.seek, 0, 2)
size = await loop.run_in_executor(None, output.tell)
await loop.run_in_executor(None, output.seek, 0)
return filename, size
async def download_file_to_file(link: str, output: BytesIO) -> bool:
async with httpx.AsyncClient() as client:
request = client.build_request(
"get", link, headers={"Authorization": env_config.CACHE_API_KEY}
)
response = await client.send(request, stream=True)
if response.status_code != 200:
await response.aclose()
return False
loop = asyncio.get_running_loop()
async for chunk in response.aiter_bytes(2048):
await loop.run_in_executor(None, output.write, chunk)
return True
@broker.task()
async def download(task_id: uuid.UUID, book_id: int, file_type: str) -> str | None:
try:
with tempfile.SpooledTemporaryFile() as temp_file:
data = await _download_to_tmpfile(book_id, file_type, temp_file)
if data is None:
return None
filename, size = data
minio_client = get_minio_client()
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
minio_client.put_object,
env_config.MINIO_BUCKET,
filename,
temp_file,
size,
)
return filename
finally:
await check_subtasks.kiq(task_id)
async def _check_subtasks(subtasks: list[str]) -> bool:
"""
Return `true` if all substask `.is_ready()`
"""
internal_subtasks = [
AsyncTaskiqTask(subtask, result_backend) for subtask in subtasks
]
for task in internal_subtasks:
task_is_ready = await task.is_ready()
if not task_is_ready:
return False
return True
@broker.task()
async def check_subtasks(task_id: uuid.UUID, redis: Redis = TaskiqDepends(get_redis)):
task = await TaskManager.get_task(redis, task_id)
if task is None:
return False
is_subtasks_ready = await _check_subtasks(task.subtasks)
if is_subtasks_ready:
await create_archive.kiq(task_id)
@broker.task()
async def create_archive(task_id: uuid.UUID, redis: Redis = TaskiqDepends(get_redis)):
task = await TaskManager.get_task(redis, task_id)
assert task
match task.object_type:
case ObjectType.SEQUENCE:
item = await LibraryClient.get_sequence(task.object_id)
assert item
name = item.name
case ObjectType.AUTHOR:
item = await LibraryClient.get_author(task.object_id)
assert item
names = [item.first_name, item.last_name, item.middle_name]
name = "_".join([i for i in names if i])
# TODO: test with `uk` and `be`
tr_name = translit(name, "ru", reversed=True, strict=True)
archive_filename = f"{item.id}_{tr_name}.zip"
assert item
task.status = TaskStatusEnum.ARCHIVING
await TaskManager.save_task(redis, task)
minio_client = get_minio_client()
loop = asyncio.get_running_loop()
with tempfile.SpooledTemporaryFile() as temp_zipfile:
zip_file = zipfile.ZipFile(
temp_zipfile,
mode="w",
compression=zipfile.ZIP_DEFLATED,
allowZip64=False,
compresslevel=9,
)
for subtask_id in task.subtasks:
subtask = AsyncTaskiqTask(subtask_id, result_backend)
result = await subtask.get_result()
if result.is_err:
continue
filename: str | None = result.return_value
if filename is None:
continue
book_file_link = await loop.run_in_executor(
None,
minio_client.get_presigned_url,
"GET",
env_config.MINIO_BUCKET,
filename,
)
with zip_file.open(filename, "w") as internal_zip_file:
await download_file_to_file(
book_file_link, cast(BytesIO, internal_zip_file)
)
await loop.run_in_executor(
None, minio_client.remove_object, env_config.MINIO_BUCKET, filename
)
zip_file.close()
await loop.run_in_executor(None, temp_zipfile.flush)
await loop.run_in_executor(None, temp_zipfile.seek, 0, 2)
size = await loop.run_in_executor(None, temp_zipfile.tell)
await loop.run_in_executor(None, temp_zipfile.seek, 0)
await loop.run_in_executor(
None,
minio_client.put_object,
env_config.MINIO_BUCKET,
archive_filename,
temp_zipfile,
size,
)
task.result_link = await loop.run_in_executor(
None,
minio_client.get_presigned_url,
"GET",
env_config.MINIO_BUCKET,
archive_filename,
)
await TaskManager.save_task(redis, task)

View File

@@ -0,0 +1,100 @@
from typing import Generic, TypeVar
import httpx
from pydantic import BaseModel
from pydantic.generics import GenericModel
from core.config import env_config
class SequenceBook(BaseModel):
id: int
available_types: list[str]
class AuthorBook(BaseModel):
id: int
available_types: list[str]
Item = TypeVar("Item", bound=BaseModel)
class Page(GenericModel, Generic[Item]):
items: list[Item]
total: int
page: int
size: int
total_pages: int
class Sequence(BaseModel):
id: int
name: str
class Author(BaseModel):
id: int
first_name: str
last_name: str
middle_name: str | None = None
class LibraryClient:
@staticmethod
async def get_sequence_books(
sequence_id: int, allowed_langs: list[str], page: int = 1
) -> Page[SequenceBook] | None:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{env_config.LIBRARY_URL}/api/v1/sequences/{sequence_id}/books",
params={"page": page, "allowed_langs": allowed_langs},
headers={"Authorization": env_config.LIBRARY_API_KEY},
)
if response.status_code != 200:
return None
return Page[SequenceBook].parse_raw(response.text)
@staticmethod
async def get_author_books(
author_id: int, allowed_langs: list[str], page: int = 1
) -> Page[AuthorBook] | None:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{env_config.LIBRARY_URL}/api/v1/authors/{author_id}/books",
params={"page": page, "allowed_langs": allowed_langs},
headers={"Authorization": env_config.LIBRARY_API_KEY},
)
if response.status_code != 200:
return None
return Page[AuthorBook].parse_raw(response.text)
@staticmethod
async def get_sequence(sequence_id: int) -> Sequence | None:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{env_config.LIBRARY_URL}/api/v1/sequences/{sequence_id}",
headers={"Authorization": env_config.LIBRARY_API_KEY},
)
if response.status_code != 200:
return None
return Sequence.parse_raw(response.text)
@staticmethod
async def get_author(author_id: int) -> Author | None:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{env_config.LIBRARY_URL}/api/v1/authors/{author_id}",
headers={"Authorization": env_config.LIBRARY_API_KEY},
)
if response.status_code != 200:
return None
return Author.parse_raw(response.text)

View File

@@ -0,0 +1,98 @@
import uuid
from pydantic import BaseModel
from redis.asyncio import Redis
from app.services.downloader import download
from app.services.library_client import LibraryClient, SequenceBook
from app.services.task_manager import ObjectType, Task, TaskManager
class CreateTaskError(BaseModel):
message: str
class TaskCreator:
@classmethod
async def _get_books(
cls, object_id: int, object_type: ObjectType, allowed_langs: list[str]
) -> list[SequenceBook] | CreateTaskError:
books = []
current_page = 1
pages_count = 1
match object_type:
case ObjectType.SEQUENCE:
books_getter = LibraryClient.get_sequence_books
case ObjectType.AUTHOR:
books_getter = LibraryClient.get_author_books
while current_page <= pages_count:
book_page = await books_getter(object_id, allowed_langs, page=current_page)
if book_page is None:
return CreateTaskError(message="Can't get books!")
books.extend(book_page.items)
current_page += 1
pages_count = book_page.total_pages
if len(books) == 0:
return CreateTaskError(message="No books!")
return books
@classmethod
async def _create_subtasks(
cls,
task_id: uuid.UUID,
object_id: int,
object_type: ObjectType,
file_format: str,
allowed_langs: list[str],
) -> list[str] | CreateTaskError:
books = await cls._get_books(object_id, object_type, allowed_langs)
if isinstance(books, CreateTaskError):
return books
task_ids: list[str] = []
for book in books:
if file_format not in book.available_types:
continue
task = await download.kiq(task_id, book.id, file_format)
task_ids.append(task.task_id)
if len(task_ids) == 0:
return CreateTaskError(message="No books to archive!")
return task_ids
@classmethod
async def create_task(
cls,
redis: Redis,
object_id: int,
object_type: ObjectType,
file_format: str,
allowed_langs: list[str],
) -> Task | CreateTaskError:
task_id = uuid.uuid4()
subtasks = await cls._create_subtasks(
task_id, object_id, object_type, file_format, allowed_langs
)
if isinstance(subtasks, CreateTaskError):
return subtasks
task = Task(
id=task_id, object_id=object_id, object_type=object_type, subtasks=subtasks
)
is_saved = await TaskManager.save_task(redis, task)
if not is_saved:
return CreateTaskError(message="Save task error")
return task

View File

@@ -0,0 +1,56 @@
import enum
import uuid
from pydantic import BaseModel
from redis.asyncio import Redis, RedisError
class TaskStatusEnum(enum.StrEnum):
IN_PROGRESS = "in_progress"
ARCHIVING = "archiving"
COMPLETE = "complete"
class ObjectType(enum.StrEnum):
SEQUENCE = "sequence"
AUTHOR = "author"
class Task(BaseModel):
id: uuid.UUID
object_id: int
object_type: ObjectType
subtasks: list[str]
status: TaskStatusEnum = TaskStatusEnum.IN_PROGRESS
result_link: str | None = None
class TaskManager:
@classmethod
def _get_key(cls, task_id: uuid.UUID) -> str:
return f"at_{task_id}"
@classmethod
async def save_task(cls, redis: Redis, task: Task) -> bool:
key = cls._get_key(task.id)
try:
data = task.json()
await redis.set(key, data, ex=60 * 60)
return True
except RedisError:
return False
@classmethod
async def get_task(cls, redis: Redis, task_id: uuid.UUID) -> Task | None:
key = cls._get_key(task_id)
try:
data = await redis.get(key)
if data is None:
return None
return Task.parse_raw(data)
except RedisError:
return None

44
src/app/views.py Normal file
View File

@@ -0,0 +1,44 @@
from typing import Annotated
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from redis.asyncio import Redis
from app.depends import check_token, get_redis
from app.serializers import CreateTaskData
from app.services.task_creator import CreateTaskError, TaskCreator
from app.services.task_manager import TaskManager
router = APIRouter(prefix="/api", dependencies=[Depends(check_token)])
@router.post("/")
async def create_archive_task(
redis: Annotated[Redis, Depends(get_redis)], data: CreateTaskData
):
task = await TaskCreator.create_task(
redis=redis,
object_id=data.object_id,
object_type=data.object_type,
file_format=data.file_format,
allowed_langs=data.allowed_langs,
)
if isinstance(task, CreateTaskError):
raise HTTPException(status.HTTP_400_BAD_REQUEST, task)
return task
@router.get("/check_archive/{task_id}")
async def check_archive_task_status(
redis: Annotated[Redis, Depends(get_redis)], task_id: uuid.UUID
):
task = await TaskManager.get_task(redis, task_id)
if task is None:
raise HTTPException(status.HTTP_404_NOT_FOUND)
return task

31
src/core/app.py Normal file
View File

@@ -0,0 +1,31 @@
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from redis.asyncio import Redis
from app.views import router
from core.config import REDIS_URL
from core.taskiq_broker import broker
def start_app() -> FastAPI:
app = FastAPI(default_response_class=ORJSONResponse)
redis = Redis.from_url(REDIS_URL)
app.state.redis = redis
app.include_router(router)
@app.on_event("startup")
async def app_startup():
if not broker.is_worker_process:
await broker.startup()
@app.on_event("shutdown")
async def app_shutdown():
if not broker.is_worker_process:
await broker.shutdown()
await redis.close()
return app

4
src/core/auth.py Normal file
View File

@@ -0,0 +1,4 @@
from fastapi.security import APIKeyHeader
default_security = APIKeyHeader(name="Authorization")

30
src/core/config.py Normal file
View File

@@ -0,0 +1,30 @@
from pydantic import BaseSettings
class Config(BaseSettings):
API_KEY: str
REDIS_HOST: str
REDIS_PORT: int
REDIS_DB: int
REDIS_PASSWORD: str | None
MINIO_HOST: str
MINIO_BUCKET: str
MINIO_ACCESS_KEY: str
MINIO_SECRET_KEY: str
LIBRARY_API_KEY: str
LIBRARY_URL: str
CACHE_API_KEY: str
CACHE_URL: str
SENTRY_DSN: str | None
env_config = Config()
REDIS_URL = (
f"redis://{env_config.REDIS_HOST}:{env_config.REDIS_PORT}/{env_config.REDIS_DB}"
)

11
src/core/taskiq_broker.py Normal file
View File

@@ -0,0 +1,11 @@
import taskiq_fastapi
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from core.config import REDIS_URL
result_backend = RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=5 * 60)
broker = ListQueueBroker(url=REDIS_URL).with_result_backend(result_backend)
taskiq_fastapi.init(broker, "main:app")

11
src/main.py Normal file
View File

@@ -0,0 +1,11 @@
import sentry_sdk
from core.app import start_app
from core.config import env_config
if env_config.SENTRY_DSN:
sentry_sdk.init(dsn=env_config.SENTRY_DSN)
app = start_app()