This commit is contained in:
2022-04-24 21:46:22 +03:00
parent 01d0263156
commit a21788181d
5 changed files with 89 additions and 69 deletions

29
poetry.lock generated
View File

@@ -1,3 +1,11 @@
[[package]]
name = "aiofiles"
version = "0.8.0"
description = "File support for asyncio."
category = "main"
optional = false
python-versions = ">=3.6,<4.0"
[[package]]
name = "anyio"
version = "3.5.0"
@@ -26,6 +34,17 @@ python-versions = ">=3.7"
[package.extras]
tests = ["pytest", "pytest-asyncio", "mypy (>=0.800)"]
[[package]]
name = "asynctempfile"
version = "0.5.0"
description = "Async version of tempfile"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
aiofiles = ">=0.6.0"
[[package]]
name = "certifi"
version = "2021.10.8"
@@ -326,9 +345,13 @@ test = ["aiohttp", "flake8 (>=3.9.2,<3.10.0)", "psutil", "pycodestyle (>=2.7.0,<
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "a4d999cff0a76d5061b5232d43b0f2c97db1489605017522b6a5ec6aefd850a8"
content-hash = "2361449346e203acb78f61fb55439c60af4e847c17045a56cb3101c6119b78a8"
[metadata.files]
aiofiles = [
{file = "aiofiles-0.8.0-py3-none-any.whl", hash = "sha256:7a973fc22b29e9962d0897805ace5856e6a566ab1f0c8e5c91ff6c866519c937"},
{file = "aiofiles-0.8.0.tar.gz", hash = "sha256:8334f23235248a3b2e83b2c3a78a22674f39969b96397126cc93664d9a901e59"},
]
anyio = [
{file = "anyio-3.5.0-py3-none-any.whl", hash = "sha256:b5fa16c5ff93fa1046f2eeb5bbff2dad4d3514d6cda61d02816dba34fa8c3c2e"},
{file = "anyio-3.5.0.tar.gz", hash = "sha256:a0aeffe2fb1fdf374a8e4b471444f0f3ac4fb9f5a5b542b48824475e0042a5a6"},
@@ -337,6 +360,10 @@ asgiref = [
{file = "asgiref-3.5.0-py3-none-any.whl", hash = "sha256:88d59c13d634dcffe0510be048210188edd79aeccb6a6c9028cdad6f31d730a9"},
{file = "asgiref-3.5.0.tar.gz", hash = "sha256:2f8abc20f7248433085eda803936d98992f1343ddb022065779f37c5da0181d0"},
]
asynctempfile = [
{file = "asynctempfile-0.5.0-py3-none-any.whl", hash = "sha256:cec59bdb71c850e3de9bb4415f88998165c364709696240eea9ec5204a7439af"},
{file = "asynctempfile-0.5.0.tar.gz", hash = "sha256:4a647c747357e8827397baadbdfe87f3095d30923fa789e797111eb02160884a"},
]
certifi = [
{file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"},
{file = "certifi-2021.10.8.tar.gz", hash = "sha256:78884e7c1d4b00ce3cea67b44566851c4343c120abd683433ce934a68ea58872"},

View File

@@ -14,6 +14,7 @@ prometheus-fastapi-instrumentator = "^5.7.1"
uvloop = "^0.16.0"
gunicorn = "^20.1.0"
sentry-sdk = "^1.5.10"
asynctempfile = "^0.5.0"
[tool.poetry.dev-dependencies]

View File

@@ -0,0 +1,10 @@
class NotSuccess(Exception):
pass
class ReceivedHTML(Exception):
pass
class ConvertationError(Exception):
pass

View File

@@ -1,30 +1,18 @@
import asyncio
import os
import tempfile
from typing import IO, Optional, AsyncIterator, cast
from fastapi import UploadFile
from typing import Optional, AsyncIterator, cast
import aiofiles
import aiofiles.os
import asynctempfile
import httpx
from app.services.base import BaseDownloader
from app.services.book_library import BookLibraryClient
from app.services.exceptions import NotSuccess, ReceivedHTML, ConvertationError
from app.services.utils import zip, unzip, get_filename, process_pool_executor
from core.config import env_config, SourceConfig
class NotSuccess(Exception):
pass
class ReceivedHTML(Exception):
pass
class ConvertationError(Exception):
pass
class FLDownloader(BaseDownloader):
EXCLUDE_UNZIP = ["html"]
@@ -120,7 +108,7 @@ class FLDownloader(BaseDownloader):
await data[0].aclose()
await data[1].aclose()
except (NotSuccess, ReceivedHTML, ConvertationError):
except (NotSuccess, ReceivedHTML, ConvertationError, FileNotFoundError):
continue
async def _wait_until_some_done(
@@ -145,35 +133,28 @@ class FLDownloader(BaseDownloader):
)
return data
except (NotSuccess, ReceivedHTML, ConvertationError):
except (NotSuccess, ReceivedHTML, ConvertationError, FileNotFoundError):
continue
tasks_ = pending
return None
async def _write_response_content_to_ntf(self, ntf, response: httpx.Response):
temp_file = UploadFile(await self.get_filename(), ntf)
async def _write_response_content_to_ntf(self, temp_file, response: httpx.Response):
async for chunk in response.aiter_bytes(2048):
await temp_file.write(chunk)
temp_file.file.flush()
await temp_file.flush()
await temp_file.seek(0)
return temp_file.file
async def _unzip(self, response: httpx.Response) -> Optional[str]:
async with asynctempfile.NamedTemporaryFile(delete=False) as temp_file:
await self._write_response_content_to_ntf(temp_file, response)
async def _unzip(self, response: httpx.Response):
with tempfile.NamedTemporaryFile() as ntf:
await self._write_response_content_to_ntf(ntf, response)
internal_tempfile_name = await asyncio.get_event_loop().run_in_executor(
process_pool_executor, unzip, ntf.name, "fb2"
return await asyncio.get_event_loop().run_in_executor(
process_pool_executor, unzip, temp_file.name, "fb2"
)
return internal_tempfile_name
async def _download_with_converting(
self,
) -> tuple[httpx.AsyncClient, httpx.Response, bool]:
@@ -191,26 +172,25 @@ class FLDownloader(BaseDownloader):
client, response, is_zip = data
is_temp_file = False
try:
if is_zip:
file_to_convert_name = await self._unzip(response)
filename_to_convert = await self._unzip(response)
else:
file_to_convert = tempfile.NamedTemporaryFile()
await self._write_response_content_to_ntf(file_to_convert, response)
file_to_convert_name = file_to_convert.name
is_temp_file = True
async with asynctempfile.NamedTemporaryFile(delete=False) as temp_file:
await self._write_response_content_to_ntf(temp_file, response)
filename_to_convert = temp_file.name
finally:
await response.aclose()
await client.aclose()
form = {"format": self.file_type}
files = {"file": open(file_to_convert_name, "rb")}
files = {"file": open(filename_to_convert, "rb")}
converter_client = httpx.AsyncClient(timeout=2 * 60)
converter_request = converter_client.build_request(
"POST", env_config.CONVERTER_URL, data=form, files=files
)
try:
converter_response = await converter_client.send(
converter_request, stream=True
@@ -219,19 +199,17 @@ class FLDownloader(BaseDownloader):
await converter_client.aclose()
raise
finally:
if is_temp_file:
await asyncio.get_event_loop().run_in_executor(
process_pool_executor, os.remove, file_to_convert_name
)
if response.status_code != 200:
raise ConvertationError
await aiofiles.os.remove(filename_to_convert)
try:
if response.status_code != 200:
raise ConvertationError
return converter_client, converter_response, False
except asyncio.CancelledError:
await converter_response.aclose()
await converter_client.aclose()
await aiofiles.os.remove(filename_to_convert)
raise
async def _get_content(self) -> Optional[tuple[AsyncIterator[bytes], str]]:
@@ -252,40 +230,32 @@ class FLDownloader(BaseDownloader):
try:
if is_zip and self.file_type.lower() not in self.EXCLUDE_UNZIP:
temp_file_name = await self._unzip(response)
temp_filename = await self._unzip(response)
else:
temp_file = tempfile.NamedTemporaryFile()
await self._write_response_content_to_ntf(temp_file, response)
temp_file_name = temp_file.name
async with asynctempfile.NamedTemporaryFile() as temp_file:
temp_filename = temp_file.name
await self._write_response_content_to_ntf(temp_file, response)
finally:
await response.aclose()
await client.aclose()
is_unziped_temp_file = False
if self.need_zip:
content_filename = await asyncio.get_event_loop().run_in_executor(
process_pool_executor, zip, await self.get_filename(), temp_file_name
process_pool_executor, zip, await self.get_filename(), temp_filename
)
is_unziped_temp_file = True
await aiofiles.os.remove(temp_filename)
else:
content_filename = temp_file_name
content = cast(IO, open(content_filename, "rb"))
content_filename = temp_filename
force_zip = is_zip and self.file_type.lower() in self.EXCLUDE_UNZIP
async def _content_iterator() -> AsyncIterator[bytes]:
t_file = UploadFile(await self.get_final_filename(force_zip), content)
try:
while chunk := await t_file.read(2048):
yield cast(bytes, chunk)
async with aiofiles.open(content_filename) as temp_file:
while chunk := await temp_file.read(2048):
yield cast(bytes, chunk)
finally:
await t_file.close()
if is_unziped_temp_file:
await asyncio.get_event_loop().run_in_executor(
process_pool_executor, os.remove, content_filename
)
await aiofiles.os.remove(content_filename)
return _content_iterator(), await self.get_final_filename(force_zip)

View File

@@ -1,6 +1,8 @@
from concurrent.futures.process import ProcessPoolExecutor
import os
import re
import tempfile
from typing import Optional
import zipfile
import transliterate
@@ -11,7 +13,15 @@ from app.services.book_library import Book, BookAuthor
process_pool_executor = ProcessPoolExecutor(2)
def unzip(temp_zipfile, file_type: str):
def remove_temp_file(filename: str) -> bool:
try:
os.remove(filename)
return True
except OSError:
return False
def unzip(temp_zipfile: str, file_type: str) -> Optional[str]:
result = tempfile.NamedTemporaryFile(delete=False)
zip_file = zipfile.ZipFile(temp_zipfile)
@@ -24,6 +34,9 @@ def unzip(temp_zipfile, file_type: str):
result.seek(0)
return result.name
result.close()
remove_temp_file(result.name)
raise FileNotFoundError
@@ -50,7 +63,6 @@ def zip(
zfile.create_system = 0
zip_file.close()
result.close()
return result.name