From f82607ccef5bcc9144740af74c347d99d22fc373 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Thu, 3 Mar 2022 18:01:03 +0300 Subject: [PATCH] Refactoring for reduce RAM usage --- poetry.lock | 47 ++++--- src/app/services/base.py | 4 +- src/app/services/fl_downloader.py | 226 ++++++++++++++++++++++-------- src/app/services/utils.py | 47 +++++-- src/app/views.py | 7 +- 5 files changed, 227 insertions(+), 104 deletions(-) diff --git a/poetry.lock b/poetry.lock index 0ae2db5..f46a4ea 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,6 +1,6 @@ [[package]] name = "anyio" -version = "3.4.0" +version = "3.5.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" category = "main" optional = false @@ -11,17 +11,17 @@ idna = ">=2.8" sniffio = ">=1.1" [package.extras] -doc = ["sphinx-rtd-theme", "sphinx-autodoc-typehints (>=1.2.0)"] +doc = ["packaging", "sphinx-rtd-theme", "sphinx-autodoc-typehints (>=1.2.0)"] test = ["coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "pytest (>=6.0)", "pytest-mock (>=3.6.1)", "trustme", "contextlib2", "uvloop (<0.15)", "mock (>=4)", "uvloop (>=0.15)"] trio = ["trio (>=0.16)"] [[package]] name = "asgiref" -version = "3.4.1" +version = "3.5.0" description = "ASGI specs, helper code, and adapters" category = "main" optional = false -python-versions = ">=3.6" +python-versions = ">=3.7" [package.extras] tests = ["pytest", "pytest-asyncio", "mypy (>=0.800)"] @@ -36,7 +36,7 @@ python-versions = "*" [[package]] name = "charset-normalizer" -version = "2.0.9" +version = "2.0.12" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." category = "main" optional = false @@ -47,7 +47,7 @@ unicode_backport = ["unicodedata2"] [[package]] name = "click" -version = "8.0.3" +version = "8.0.4" description = "Composable command line interface toolkit" category = "main" optional = false @@ -92,7 +92,7 @@ python-versions = ">=3.6" [[package]] name = "httpcore" -version = "0.14.3" +version = "0.14.7" description = "A minimal low-level HTTP client." category = "main" optional = false @@ -106,10 +106,11 @@ sniffio = ">=1.0.0,<2.0.0" [package.extras] http2 = ["h2 (>=3,<5)"] +socks = ["socksio (>=1.0.0,<2.0.0)"] [[package]] name = "httpx" -version = "0.21.1" +version = "0.21.3" description = "The next generation HTTP client." category = "main" optional = false @@ -230,7 +231,7 @@ six = ">=1.1.0" [[package]] name = "typing-extensions" -version = "4.0.1" +version = "4.1.1" description = "Backported and Experimental Type Hints for Python 3.6+" category = "main" optional = false @@ -259,24 +260,24 @@ content-hash = "64dbb0d31d03be39512a3def521deb69d4ffbb6fc31c7a66e8d2e7b7f4888611 [metadata.files] anyio = [ - {file = "anyio-3.4.0-py3-none-any.whl", hash = "sha256:2855a9423524abcdd652d942f8932fda1735210f77a6b392eafd9ff34d3fe020"}, - {file = "anyio-3.4.0.tar.gz", hash = "sha256:24adc69309fb5779bc1e06158e143e0b6d2c56b302a3ac3de3083c705a6ed39d"}, + {file = "anyio-3.5.0-py3-none-any.whl", hash = "sha256:b5fa16c5ff93fa1046f2eeb5bbff2dad4d3514d6cda61d02816dba34fa8c3c2e"}, + {file = "anyio-3.5.0.tar.gz", hash = "sha256:a0aeffe2fb1fdf374a8e4b471444f0f3ac4fb9f5a5b542b48824475e0042a5a6"}, ] asgiref = [ - {file = "asgiref-3.4.1-py3-none-any.whl", hash = "sha256:ffc141aa908e6f175673e7b1b3b7af4fdb0ecb738fc5c8b88f69f055c2415214"}, - {file = "asgiref-3.4.1.tar.gz", hash = "sha256:4ef1ab46b484e3c706329cedeff284a5d40824200638503f5768edb6de7d58e9"}, + {file = "asgiref-3.5.0-py3-none-any.whl", hash = "sha256:88d59c13d634dcffe0510be048210188edd79aeccb6a6c9028cdad6f31d730a9"}, + {file = "asgiref-3.5.0.tar.gz", hash = "sha256:2f8abc20f7248433085eda803936d98992f1343ddb022065779f37c5da0181d0"}, ] certifi = [ {file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"}, {file = "certifi-2021.10.8.tar.gz", hash = "sha256:78884e7c1d4b00ce3cea67b44566851c4343c120abd683433ce934a68ea58872"}, ] charset-normalizer = [ - {file = "charset-normalizer-2.0.9.tar.gz", hash = "sha256:b0b883e8e874edfdece9c28f314e3dd5badf067342e42fb162203335ae61aa2c"}, - {file = "charset_normalizer-2.0.9-py3-none-any.whl", hash = "sha256:1eecaa09422db5be9e29d7fc65664e6c33bd06f9ced7838578ba40d58bdf3721"}, + {file = "charset-normalizer-2.0.12.tar.gz", hash = "sha256:2857e29ff0d34db842cd7ca3230549d1a697f96ee6d3fb071cfa6c7393832597"}, + {file = "charset_normalizer-2.0.12-py3-none-any.whl", hash = "sha256:6881edbebdb17b39b4eaaa821b438bf6eddffb4468cf344f09f89def34a8b1df"}, ] click = [ - {file = "click-8.0.3-py3-none-any.whl", hash = "sha256:353f466495adaeb40b6b5f592f9f91cb22372351c84caeb068132442a4518ef3"}, - {file = "click-8.0.3.tar.gz", hash = "sha256:410e932b050f5eed773c4cda94de75971c89cdb3155a72a0831139a79e5ecb5b"}, + {file = "click-8.0.4-py3-none-any.whl", hash = "sha256:6a7a62563bbfabfda3a38f3023a1db4a35978c0abd76f6c9605ecd6554d6d9b1"}, + {file = "click-8.0.4.tar.gz", hash = "sha256:8458d7b1287c5fb128c90e23381cf99dcde74beaf6c7ff6384ce84d6fe090adb"}, ] colorama = [ {file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"}, @@ -291,12 +292,12 @@ h11 = [ {file = "h11-0.12.0.tar.gz", hash = "sha256:47222cb6067e4a307d535814917cd98fd0a57b6788ce715755fa2b6c28b56042"}, ] httpcore = [ - {file = "httpcore-0.14.3-py3-none-any.whl", hash = "sha256:9a98d2416b78976fc5396ff1f6b26ae9885efbb3105d24eed490f20ab4c95ec1"}, - {file = "httpcore-0.14.3.tar.gz", hash = "sha256:d10162a63265a0228d5807964bd964478cbdb5178f9a2eedfebb2faba27eef5d"}, + {file = "httpcore-0.14.7-py3-none-any.whl", hash = "sha256:47d772f754359e56dd9d892d9593b6f9870a37aeb8ba51e9a88b09b3d68cfade"}, + {file = "httpcore-0.14.7.tar.gz", hash = "sha256:7503ec1c0f559066e7e39bc4003fd2ce023d01cf51793e3c173b864eb456ead1"}, ] httpx = [ - {file = "httpx-0.21.1-py3-none-any.whl", hash = "sha256:208e5ef2ad4d105213463cfd541898ed9d11851b346473539a8425e644bb7c66"}, - {file = "httpx-0.21.1.tar.gz", hash = "sha256:02af20df486b78892a614a7ccd4e4e86a5409ec4981ab0e422c579a887acad83"}, + {file = "httpx-0.21.3-py3-none-any.whl", hash = "sha256:df9a0fd43fa79dbab411d83eb1ea6f7a525c96ad92e60c2d7f40388971b25777"}, + {file = "httpx-0.21.3.tar.gz", hash = "sha256:7a3eb67ef0b8abbd6d9402248ef2f84a76080fa1c839f8662e6eb385640e445a"}, ] idna = [ {file = "idna-3.3-py3-none-any.whl", hash = "sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff"}, @@ -368,8 +369,8 @@ transliterate = [ {file = "transliterate-1.10.2.tar.gz", hash = "sha256:bc608e0d48e687db9c2b1d7ea7c381afe0d1849cad216087d8e03d8d06a57c85"}, ] typing-extensions = [ - {file = "typing_extensions-4.0.1-py3-none-any.whl", hash = "sha256:7f001e5ac290a0c0401508864c7ec868be4e701886d5b573a9528ed3973d9d3b"}, - {file = "typing_extensions-4.0.1.tar.gz", hash = "sha256:4ca091dea149f945ec56afb48dae714f21e8692ef22a395223bcd328961b6a0e"}, + {file = "typing_extensions-4.1.1-py3-none-any.whl", hash = "sha256:21c85e0fe4b9a155d0799430b0ad741cdce7e359660ccbd8b530613e8df88ce2"}, + {file = "typing_extensions-4.1.1.tar.gz", hash = "sha256:1a9462dcc3347a79b1f1c0271fbe79e844580bb598bafa1ed208b94da3cdcd42"}, ] uvicorn = [ {file = "uvicorn-0.16.0-py3-none-any.whl", hash = "sha256:d8c839231f270adaa6d338d525e2652a0b4a5f4c2430b5c4ef6ae4d11776b0d2"}, diff --git a/src/app/services/base.py b/src/app/services/base.py index a500092..83ecae4 100644 --- a/src/app/services/base.py +++ b/src/app/services/base.py @@ -1,9 +1,9 @@ -from typing import Protocol, Optional +from typing import Protocol, Optional, AsyncIterator class BaseDownloader(Protocol): @classmethod async def download( cls, remote_id: int, file_type: str, source_id: int - ) -> Optional[tuple[bytes, str]]: + ) -> Optional[tuple[AsyncIterator[bytes], str]]: ... diff --git a/src/app/services/fl_downloader.py b/src/app/services/fl_downloader.py index aa0ff49..9a5b109 100644 --- a/src/app/services/fl_downloader.py +++ b/src/app/services/fl_downloader.py @@ -1,10 +1,14 @@ import asyncio -from typing import Optional, cast +import os +import tempfile +from typing import IO, Optional, AsyncIterator, cast + +from fastapi import UploadFile import httpx from app.services.base import BaseDownloader -from app.services.book_library import BookLibraryClient, Book +from app.services.book_library import BookLibraryClient from app.services.utils import zip, unzip, get_filename, process_pool_executor from core.config import env_config, SourceConfig @@ -27,7 +31,8 @@ class FLDownloader(BaseDownloader): self.original_file_type = file_type self.source_id = source_id - self.book: Optional[Book] = None + self.get_book_data_task = asyncio.create_task(self._get_book_data()) + self.get_content_task = asyncio.create_task(self._get_content()) @property def file_type(self): @@ -41,10 +46,11 @@ class FLDownloader(BaseDownloader): if not self.get_book_data_task.done(): await asyncio.wait_for(self.get_book_data_task, None) - if self.book is None: + book = self.get_book_data_task.result() + if book is None: raise ValueError("Book is None!") - return get_filename(self.book_id, self.book, self.file_type) + return get_filename(self.book_id, book, self.file_type) async def get_final_filename(self) -> str: if self.need_zip: @@ -53,8 +59,8 @@ class FLDownloader(BaseDownloader): return await self.get_filename() async def _download_from_source( - self, source_config: SourceConfig, file_type: str = None - ) -> tuple[bytes, bool]: + self, source_config: SourceConfig, file_type: Optional[str] = None + ) -> tuple[httpx.AsyncClient, httpx.Response, bool]: basic_url: str = source_config.URL proxy: Optional[str] = source_config.PROXY @@ -65,28 +71,59 @@ class FLDownloader(BaseDownloader): else: url = basic_url + f"/b/{self.book_id}/download" - httpx_proxy = None - if proxy is not None: - httpx_proxy = httpx.Proxy(url=proxy) + client_kwargs = {"timeout": 10 * 60, "follow_redirects": True} - async with httpx.AsyncClient(proxies=httpx_proxy) as client: - response = await client.get(url, follow_redirects=True, timeout=10 * 60) + if proxy is not None: + client = httpx.AsyncClient(proxies=httpx.Proxy(url=proxy), **client_kwargs) + else: + client = httpx.AsyncClient(**client_kwargs) + + request = client.build_request( + "GET", + url, + ) + try: + response = await client.send(request, stream=True) + except asyncio.CancelledError: + await client.aclose() + raise + + try: content_type = response.headers.get("Content-Type") if response.status_code != 200: + await response.aclose() + await client.aclose() raise NotSuccess(f"Status code is {response.status_code}!") if "text/html" in content_type: + await response.aclose() + await client.aclose() raise ReceivedHTML() - if "application/zip" in content_type: - return response.content, True + return client, response, "application/zip" in content_type + except asyncio.CancelledError: + await client.aclose() + await client.aclose() + raise - return response.content, False + @classmethod + async def _close_other_done( + cls, + done_tasks: set[asyncio.Task[tuple[httpx.AsyncClient, httpx.Response, bool]]], + ): + for task in done_tasks: + try: + data = task.result() + + await data[0].aclose() + await data[1].aclose() + except (NotSuccess, ReceivedHTML, ConvertationError): + continue async def _wait_until_some_done( - self, tasks: set[asyncio.Task] - ) -> Optional[tuple[bytes, bool]]: + self, tasks: set[asyncio.Task[tuple[httpx.AsyncClient, httpx.Response, bool]]] + ) -> Optional[tuple[httpx.AsyncClient, httpx.Response, bool]]: tasks_ = tasks while tasks_: @@ -96,11 +133,15 @@ class FLDownloader(BaseDownloader): for task in done: try: - data = cast(tuple[bytes, bool], task.result()) + data = task.result() for p_task in pending: p_task.cancel() + await self._close_other_done( + {ttask for ttask in done if ttask != task} + ) + return data except (NotSuccess, ReceivedHTML, ConvertationError): continue @@ -109,7 +150,31 @@ class FLDownloader(BaseDownloader): return None - async def _download_with_converting(self) -> tuple[bytes, bool]: + async def _write_response_content_to_ntf(self, ntf, response: httpx.Response): + temp_file = UploadFile(await self.get_filename(), ntf) + + async for chunk in response.aiter_bytes(2048): + await temp_file.write(chunk) + + temp_file.file.flush() + + await temp_file.seek(0) + + return temp_file.file + + async def _unzip(self, response: httpx.Response): + with tempfile.NamedTemporaryFile() as ntf: + await self._write_response_content_to_ntf(ntf, response) + + internal_tempfile_name = await asyncio.get_event_loop().run_in_executor( + process_pool_executor, unzip, ntf.name, "fb2" + ) + + return internal_tempfile_name + + async def _download_with_converting( + self, + ) -> tuple[httpx.AsyncClient, httpx.Response, bool]: tasks = set() for source in env_config.FL_SOURCES: @@ -122,76 +187,115 @@ class FLDownloader(BaseDownloader): if data is None: raise ValueError - content, is_zip = data + client, response, is_zip = data - if is_zip: - content = await asyncio.get_event_loop().run_in_executor( - process_pool_executor, unzip, content, "fb2" - ) + is_temp_file = False + try: + if is_zip: + file_to_convert_name = await self._unzip(response) + else: + file_to_convert = tempfile.NamedTemporaryFile() + await self._write_response_content_to_ntf(file_to_convert, response) + file_to_convert_name = file_to_convert.name + is_temp_file = True + finally: + await response.aclose() + await client.aclose() - async with httpx.AsyncClient() as client: - form = {"format": self.file_type} - files = {"file": content} - response = await client.post( - env_config.CONVERTER_URL, data=form, files=files, timeout=2 * 60 - ) + form = {"format": self.file_type} + files = {"file": open(file_to_convert_name, "rb")} - if response.status_code != 200: - raise ConvertationError - - return content, False - - async def _get_book_data(self): - self.book = await BookLibraryClient.get_remote_book( - self.source_id, self.book_id + converter_client = httpx.AsyncClient(timeout=2 * 60) + converter_request = converter_client.build_request( + "POST", env_config.CONVERTER_URL, data=form, files=files ) + try: + converter_response = await converter_client.send( + converter_request, stream=True + ) + except asyncio.CancelledError: + await converter_client.aclose() + raise + finally: + if is_temp_file: + await asyncio.get_event_loop().run_in_executor( + process_pool_executor, os.remove, file_to_convert_name + ) - async def _get_content(self) -> Optional[tuple[bytes, str]]: + if response.status_code != 200: + raise ConvertationError + + try: + return converter_client, converter_response, False + except asyncio.CancelledError: + await converter_response.aclose() + await converter_client.aclose() + raise + + async def _get_content(self) -> Optional[tuple[AsyncIterator[bytes], str]]: tasks = set() - if self.file_type in ["epub", "mobi"]: - tasks.add(asyncio.create_task(self._download_with_converting())) - for source in env_config.FL_SOURCES: tasks.add(asyncio.create_task(self._download_from_source(source))) + if self.file_type in ["epub", "mobi"]: + tasks.add(asyncio.create_task(self._download_with_converting())) + data = await self._wait_until_some_done(tasks) if data is None: return None - content, is_zip = data + client, response, is_zip = data - if content is None or is_zip is None: - return None + try: + if is_zip: + temp_file_name = await self._unzip(response) + else: - if is_zip: - content = await asyncio.get_event_loop().run_in_executor( - process_pool_executor, unzip, content, self.file_type - ) + temp_file = tempfile.NamedTemporaryFile() + await self._write_response_content_to_ntf(temp_file, response) + temp_file_name = temp_file.name + finally: + await response.aclose() + await client.aclose() + is_unziped_temp_file = False if self.need_zip: - content = await asyncio.get_event_loop().run_in_executor( - process_pool_executor, zip, await self.get_filename(), content + content_filename = await asyncio.get_event_loop().run_in_executor( + process_pool_executor, zip, await self.get_filename(), temp_file_name ) + is_unziped_temp_file = True + else: + content_filename = temp_file_name - return content, await self.get_final_filename() + content = cast(IO, open(content_filename, "rb")) - async def _download(self): - self.get_book_data_task = asyncio.create_task(self._get_book_data()) + async def _content_iterator() -> AsyncIterator[bytes]: + t_file = UploadFile(await self.get_filename(), content) + try: + while chunk := await t_file.read(2048): + yield cast(bytes, chunk) + finally: + await t_file.close() + if is_unziped_temp_file: + await asyncio.get_event_loop().run_in_executor( + process_pool_executor, os.remove, content_filename + ) - tasks = [ - asyncio.create_task(self._get_content()), - self.get_book_data_task, - ] + return _content_iterator(), await self.get_final_filename() - await asyncio.wait(tasks) + async def _get_book_data(self): + return await BookLibraryClient.get_remote_book(self.source_id, self.book_id) - return tasks[0].result() + async def _download(self) -> Optional[tuple[AsyncIterator[bytes], str]]: + await asyncio.wait([self.get_book_data_task, self.get_content_task]) + + return self.get_content_task.result() @classmethod async def download( cls, remote_id: int, file_type: str, source_id: int - ) -> Optional[tuple[bytes, str]]: + ) -> Optional[tuple[AsyncIterator[bytes], str]]: downloader = cls(remote_id, file_type, source_id) return await downloader._download() diff --git a/src/app/services/utils.py b/src/app/services/utils.py index dbbcbe2..050a13c 100644 --- a/src/app/services/utils.py +++ b/src/app/services/utils.py @@ -1,5 +1,6 @@ from concurrent.futures.process import ProcessPoolExecutor -import io +import re +import tempfile import zipfile import transliterate @@ -10,33 +11,49 @@ from app.services.book_library import Book, BookAuthor process_pool_executor = ProcessPoolExecutor(2) -def unzip(file_bytes: bytes, file_type: str): - zip_file = zipfile.ZipFile(io.BytesIO(file_bytes)) +def unzip(temp_zipfile, file_type: str): + result = tempfile.NamedTemporaryFile(delete=False) + + zip_file = zipfile.ZipFile(temp_zipfile) for name in zip_file.namelist(): # type: str - if file_type in name.lower(): - return zip_file.read(name) + if file_type.lower() in name.lower(): + with zip_file.open(name, "r") as internal_file: + while chunk := internal_file.read(2048): + result.write(chunk) + + result.seek(0) + return result.name + raise FileNotFoundError -def zip(filename, content): - buffer = io.BytesIO() +def zip( + filename: str, + content_filename: str, +) -> str: + result = tempfile.NamedTemporaryFile(delete=False) + zip_file = zipfile.ZipFile( - file=buffer, + file=result, mode="w", compression=zipfile.ZIP_DEFLATED, allowZip64=False, compresslevel=9, ) - zip_file.writestr(filename, content) + + with open(content_filename, "rb") as content: + with zip_file.open(filename, "w") as internal_file: + while chunk := content.read(2048): + internal_file.write(chunk) for zfile in zip_file.filelist: zfile.create_system = 0 zip_file.close() - buffer.seek(0) + result.close() - return buffer.read() + return result.name def get_short_name(author: BookAuthor) -> str: @@ -71,8 +88,7 @@ def get_filename(book_id: int, book: Book, file_type: str) -> str: filename = "".join(filename_parts) - if book.lang in ["ru"]: - filename = transliterate.translit(filename, "ru", reversed=True) + filename = transliterate.translit(filename, reversed=True) for c in "(),….’!\"?»«':": filename = filename.replace(c, "") @@ -85,9 +101,12 @@ def get_filename(book_id: int, book: Book, file_type: str) -> str: ("–", "-"), ("á", "a"), (" ", "_"), + ("'", ""), ): filename = filename.replace(c, r) + filename = re.sub(r"[^\x00-\x7f]", r"", filename) + right_part = f".{book_id}.{file_type_}" - return filename[: 64 - len(right_part)] + right_part + return filename[: 64 - len(right_part) - 1] + right_part diff --git a/src/app/views.py b/src/app/views.py index c3e2acf..a239cb9 100644 --- a/src/app/views.py +++ b/src/app/views.py @@ -1,4 +1,5 @@ from fastapi import APIRouter, Depends, Response, status +from fastapi.responses import StreamingResponse from app.depends import check_token from app.services.book_library import BookLibraryClient @@ -23,7 +24,7 @@ async def download(source_id: int, remote_id: int, file_type: str): content, filename = result - return Response( + return StreamingResponse( content, headers={"Content-Disposition": f"attachment; filename={filename}"} ) @@ -35,9 +36,7 @@ async def get_filename(book_id: int, file_type: str): return _get_filename(book.remote_id, book, file_type) -healthcheck_router = APIRouter( - tags=["healthcheck"] -) +healthcheck_router = APIRouter(tags=["healthcheck"]) @healthcheck_router.get("/healthcheck")