This commit is contained in:
2023-08-06 22:22:57 +02:00
parent 06592bcb65
commit f31a4645c9
8 changed files with 330 additions and 236 deletions

View File

@@ -13,12 +13,6 @@ jobs:
name: Checkout name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v3
-
name: Set up QEMU
uses: docker/setup-qemu-action@v2
with:
platforms: arm64
- -
name: Set up Docker Buildx name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2 uses: docker/setup-buildx-action@v2
@@ -44,7 +38,7 @@ jobs:
IMAGE: ${{ steps.repository_name.outputs.lowercase }} IMAGE: ${{ steps.repository_name.outputs.lowercase }}
with: with:
push: true push: true
platforms: linux/amd64,linux/arm64 platforms: linux/amd64
tags: ghcr.io/${{ env.IMAGE }}:latest tags: ghcr.io/${{ env.IMAGE }}:latest
context: . context: .
file: ./docker/production.dockerfile file: ./docker/production.dockerfile

View File

@@ -0,0 +1,63 @@
use std::fmt;
use base64::{engine::general_purpose, Engine};
use reqwest::StatusCode;
use tempfile::SpooledTempFile;
use smartstring::alias::String as SmartString;
use crate::config;
use super::utils::response_to_tempfile;
/// Error produced by `download` when the server answers with a status other than `200 OK`.
#[derive(Debug, Clone)]
struct DownloadError {
    /// Status code of the response that was rejected.
    status_code: StatusCode,
}

impl fmt::Display for DownloadError {
    /// Renders the error as `Status code is <code>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { status_code } = self;
        write!(f, "Status code is {0}", status_code)
    }
}

// No underlying source error to expose, so the trait's default methods are enough.
impl std::error::Error for DownloadError {}
/// Downloads one book in the requested format from the cache service.
///
/// Sends `GET {cache_url}/api/v1/download/{book_id}/{file_type}` with the configured
/// API key in the `Authorization` header, spools the response body into a temporary
/// file and decodes the file name from the base64 `x-filename-b64` response header.
///
/// Returns the temporary file together with the decoded file name.
///
/// # Errors
///
/// Returns an error (instead of panicking) when:
/// * the request fails or the server answers with an error status;
/// * the status is anything other than `200 OK` (`DownloadError`);
/// * the `x-filename-b64` header is missing, is not valid base64, or does not
///   decode to valid UTF-8;
/// * `response_to_tempfile` yields `None`.
pub async fn download(
    book_id: u64,
    file_type: SmartString,
) -> Result<(SpooledTempFile, String), Box<dyn std::error::Error + Send + Sync>> {
    let mut response = reqwest::Client::new()
        .get(format!(
            "{}/api/v1/download/{book_id}/{file_type}",
            &config::CONFIG.cache_url
        ))
        .header("Authorization", &config::CONFIG.cache_api_key)
        .send()
        .await?
        .error_for_status()?;

    // `error_for_status` only rejects 4xx/5xx; any other non-200 answer is rejected here.
    if response.status() != StatusCode::OK {
        return Err(Box::new(DownloadError {
            status_code: response.status(),
        }));
    }

    // The header comes from the network, so every decoding step is fallible and is
    // propagated to the caller rather than unwrapped.
    let encoded_filename = response
        .headers()
        .get("x-filename-b64")
        .ok_or("response has no x-filename-b64 header")?;
    let filename = String::from_utf8(general_purpose::STANDARD.decode(encoded_filename)?)?;

    let (output_file, _size) = response_to_tempfile(&mut response)
        .await
        .ok_or("failed to store the response body in a temporary file")?;

    Ok((output_file, filename))
}

19
src/services/minio.rs Normal file
View File

@@ -0,0 +1,19 @@
use minio_rsc::{provider::StaticProvider, Minio};
use crate::config;
/// Creates a MinIO client from the application configuration.
///
/// Credentials come from `minio_access_key` / `minio_secret_key`, the endpoint from
/// `minio_host`. The builder is called with `secure(false)` (presumably plain HTTP —
/// confirm against the deployment).
///
/// # Panics
///
/// Panics if the builder fails to produce a client.
pub fn get_minio() -> Minio {
    let settings = &config::CONFIG;

    let credentials = StaticProvider::new(
        &settings.minio_access_key,
        &settings.minio_secret_key,
        None,
    );

    let builder = Minio::builder()
        .host(&settings.minio_host)
        .provider(credentials)
        .secure(false);

    builder.build().unwrap()
}

View File

@@ -2,3 +2,4 @@ pub mod task_creator;
pub mod library_client; pub mod library_client;
pub mod utils; pub mod utils;
pub mod downloader; pub mod downloader;
pub mod minio;

View File

@@ -1,37 +1,23 @@
use std::{fmt, io::{Seek, Read}}; use std::io::Seek;
use base64::{engine::general_purpose, Engine}; use minio_rsc::types::args::{ObjectArgs, PresignedArgs};
use bytes::Bytes;
use minio_rsc::{provider::StaticProvider, Minio, types::args::{ObjectArgs, PresignedArgs}, errors::MinioError};
use reqwest::StatusCode;
use smallvec::SmallVec; use smallvec::SmallVec;
use smartstring::alias::String as SmartString; use smartstring::alias::String as SmartString;
use tempfile::SpooledTempFile; use tempfile::SpooledTempFile;
use tracing::log;
use translit::{Transliterator, gost779b_ru, CharsMapping}; use translit::{Transliterator, gost779b_ru, CharsMapping};
use zip::write::FileOptions; use zip::write::FileOptions;
use async_stream::stream;
use crate::{structures::{CreateTask, Task, ObjectType}, config, views::TASK_RESULTS}; use crate::{structures::{CreateTask, Task, ObjectType}, config, views::TASK_RESULTS, services::{downloader::download, utils::{get_stream, get_filename}, minio::get_minio}};
use super::{library_client::{Book, get_sequence_books, get_author_books, get_translator_books, Page, get_sequence, get_author}, utils::response_to_tempfile}; use super::{library_client::{Book, get_sequence_books, get_author_books, get_translator_books, Page, get_sequence, get_author}, utils::get_key};
pub fn get_key(
input_data: CreateTask
) -> String {
let mut data = input_data.clone();
data.allowed_langs.sort();
let data_string = serde_json::to_string(&data).unwrap();
format!("{:x}", md5::compute(data_string))
}
pub async fn get_books<Fut>( pub async fn get_books<Fut>(
object_id: u32, object_id: u32,
allowed_langs: SmallVec<[SmartString; 3]>, allowed_langs: SmallVec<[SmartString; 3]>,
books_getter: fn(id: u32, page: u32, allowed_langs: SmallVec<[SmartString; 3]>) -> Fut books_getter: fn(id: u32, page: u32, allowed_langs: SmallVec<[SmartString; 3]>) -> Fut,
file_format: SmartString
) -> Result<Vec<Book>, Box<dyn std::error::Error + Send + Sync>> ) -> Result<Vec<Book>, Box<dyn std::error::Error + Send + Sync>>
where where
Fut: std::future::Future<Output = Result<Page<Book>, Box<dyn std::error::Error + Send + Sync>>>, Fut: std::future::Future<Output = Result<Page<Book>, Box<dyn std::error::Error + Send + Sync>>>,
@@ -58,106 +44,80 @@ where
current_page += 1; current_page += 1;
}; };
let result = result
.iter()
.filter(|book| book.available_types.contains(&file_format.to_string()))
.map(|b| b.clone())
.collect();
Ok(result) Ok(result)
} }
#[derive(Debug, Clone)]
struct DownloadError { pub async fn set_task_error(key: String, error_message: String) {
status_code: StatusCode, let task = Task {
id: key.clone(),
status: crate::structures::TaskStatus::Failled,
status_description: "Ошибка!".to_string(),
error_message: Some(error_message),
result_filename: None,
result_link: None
};
TASK_RESULTS.insert(key, task.clone()).await;
} }
impl fmt::Display for DownloadError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { pub async fn set_progress_description(key: String, description: String) {
write!(f, "Status code is {0}", self.status_code) let task = Task {
id: key.clone(),
status: crate::structures::TaskStatus::InProgress,
status_description: description,
error_message: None,
result_filename: None,
result_link: None
};
TASK_RESULTS.insert(key, task.clone()).await;
}
pub async fn upload_to_minio(archive: SpooledTempFile, filename: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let minio = get_minio();
let is_bucket_exist = match minio.bucket_exists(&config::CONFIG.minio_bucket).await {
Ok(v) => v,
Err(err) => return Err(Box::new(err)),
};
if !is_bucket_exist {
let _ = minio.make_bucket(&config::CONFIG.minio_bucket, false).await;
} }
}
impl std::error::Error for DownloadError {} let data_stream = get_stream(Box::new(archive));
pub async fn download( if let Err(err) = minio.put_object_stream(
book_id: u64, ObjectArgs::new(&config::CONFIG.minio_bucket, filename.clone()),
file_type: String, Box::pin(data_stream)
) -> Result<Option<(SpooledTempFile, String)>, Box<dyn std::error::Error + Send + Sync>> { ).await {
let mut response = reqwest::Client::new() return Err(Box::new(err));
.get(format!(
"{}/api/v1/download/{book_id}/{file_type}",
&config::CONFIG.cache_url
))
.header("Authorization", &config::CONFIG.cache_api_key)
.send()
.await?
.error_for_status()?;
if response.status() != StatusCode::OK {
return Err(Box::new(DownloadError {
status_code: response.status(),
}));
};
let headers = response.headers();
let base64_encoder = general_purpose::STANDARD;
let filename = std::str::from_utf8(
&base64_encoder
.decode(headers.get("x-filename-b64").unwrap())
.unwrap(),
)
.unwrap()
.to_string();
let output_file = match response_to_tempfile(&mut response).await {
Some(v) => v.0,
None => return Ok(None),
};
Ok(Some((output_file, filename)))
}
fn get_stream(mut temp_file: Box<dyn Read + Send>) -> impl futures_core::Stream<Item = Result<Bytes, MinioError>> {
stream! {
let mut buf = [0; 2048];
loop {
match temp_file.read(&mut buf) {
Ok(count) => {
if count == 0 {
break;
}
yield Ok(Bytes::copy_from_slice(&buf[0..count]))
},
Err(_) => break
}
}
} }
}
let link = match minio.presigned_get_object(
pub async fn create_archive_task(key: String, data: CreateTask) { PresignedArgs::new(&config::CONFIG.minio_bucket, filename)
let books = match data.object_type { ).await {
ObjectType::Sequence => get_books(data.object_id, data.allowed_langs, get_sequence_books).await,
ObjectType::Author => get_books(data.object_id, data.allowed_langs, get_author_books).await,
ObjectType::Translator => get_books(data.object_id, data.allowed_langs, get_translator_books).await,
};
let books = match books {
Ok(v) => v, Ok(v) => v,
Err(err) => { Err(err) => {
return; // log error and task error return Err(Box::new(err));
}, },
}; };
let books: Vec<_> = books Ok(link)
.iter() }
.filter(|book| book.available_types.contains(&data.file_format))
.collect();
if books.is_empty() {
return; // log error and task error
}
pub async fn create_archive(key: String, books: Vec<Book>, file_format: SmartString) -> Result<SpooledTempFile, Box<dyn std::error::Error + Send + Sync>> {
let output_file = tempfile::spooled_tempfile(5 * 1024 * 1024); let output_file = tempfile::spooled_tempfile(5 * 1024 * 1024);
let mut archive = zip::ZipWriter::new(output_file); let mut archive = zip::ZipWriter::new(output_file);
@@ -166,152 +126,96 @@ pub async fn create_archive_task(key: String, data: CreateTask) {
.compression_method(zip::CompressionMethod::Deflated) .compression_method(zip::CompressionMethod::Deflated)
.unix_permissions(0o755); .unix_permissions(0o755);
for book in books { let books_count = books.len();
let (mut tmp_file, filename) = match download(book.id, data.file_format.clone()).await {
Ok(v) => { for (index, book) in books.iter().enumerate() {
match v { let (mut tmp_file, filename) = match download(book.id, file_format.clone()).await {
Some(v) => v, Ok(v) => v,
None => { Err(_) => continue,
return; // log error and task error
},
}
},
Err(err) => {
return; // log error and task error
},
}; };
match archive.start_file(filename, options) { match archive.start_file(filename, options) {
Ok(_) => (), Ok(_) => (),
Err(_) => return, // log error and task error Err(err) => return Err(Box::new(err)),
}; };
match std::io::copy(&mut tmp_file, &mut archive) { match std::io::copy(&mut tmp_file, &mut archive) {
Ok(_) => (), Ok(_) => (),
Err(_) => return, // log error and task error Err(err) => return Err(Box::new(err)),
}; };
set_progress_description(key.clone(), format!("Обработано: {}/{}", index + 1, books_count)).await;
} }
let mut archive_result = match archive.finish() { let mut archive_result = match archive.finish() {
Ok(v) => v, Ok(v) => v,
Err(err) => return, // log error and task error Err(err) => return Err(Box::new(err)),
}; };
archive_result.rewind().unwrap(); archive_result.rewind().unwrap();
let result_filename = match data.object_type { Ok(archive_result)
ObjectType::Sequence => { }
match get_sequence(data.object_id).await {
Ok(v) => v.name,
Err(err) => { pub async fn create_archive_task(key: String, data: CreateTask) {
println!("{}", err); let books = match data.object_type {
return; // log error and task error ObjectType::Sequence => get_books(data.object_id, data.allowed_langs, get_sequence_books, data.file_format.clone()).await,
}, ObjectType::Author => get_books(data.object_id, data.allowed_langs, get_author_books, data.file_format.clone()).await,
} ObjectType::Translator => get_books(data.object_id, data.allowed_langs, get_translator_books, data.file_format.clone()).await,
}, };
ObjectType::Author | ObjectType::Translator => {
match get_author(data.object_id).await { let books = match books {
Ok(v) => { Ok(v) => v,
vec![v.first_name, v.last_name, v.middle_name.unwrap_or("".to_string())] Err(err) => {
.into_iter() set_task_error(key.clone(), "Failed getting books!".to_string()).await;
.filter(|v| !v.is_empty()) log::error!("{}", err);
.collect::<Vec<String>>() return;
.join("_")
},
Err(err) => {
println!("{}", err);
return; // log error and task error
},
}
}, },
}; };
let final_filename = { if books.is_empty() {
let transliterator = Transliterator::new(gost779b_ru()); set_task_error(key.clone(), "No books!".to_string()).await;
return;
let mut filename_without_type = transliterator.convert(&result_filename, false);
"(),….!\"?»«':".get(..).into_iter().for_each(|char| {
filename_without_type = filename_without_type.replace(char, "");
});
let replace_char_map: CharsMapping = [
("", "-"),
("/", "_"),
("", "N"),
(" ", "_"),
("", "-"),
("á", "a"),
(" ", "_"),
("'", ""),
("`", ""),
("[", ""),
("]", ""),
("\"", ""),
].to_vec();
let replace_transliterator = Transliterator::new(replace_char_map);
let normal_filename = replace_transliterator.convert(&filename_without_type, false);
let normal_filename = normal_filename.replace(|c: char| !c.is_ascii(), "");
let right_part = format!(".zip");
let normal_filename_slice = std::cmp::min(64 - right_part.len() - 1, normal_filename.len() - 1);
let left_part = if normal_filename_slice == normal_filename.len() - 1 {
&normal_filename
} else {
normal_filename.get(..normal_filename_slice).unwrap_or_else(|| panic!("Can't slice left part: {:?} {:?}", normal_filename, normal_filename_slice))
};
format!("{left_part}{right_part}")
};
let provider = StaticProvider::new(
&config::CONFIG.minio_access_key,
&config::CONFIG.minio_secret_key,
None
);
let minio = Minio::builder()
.host(&config::CONFIG.minio_host)
.provider(provider)
.secure(false)
.build()
.unwrap();
let is_bucket_exist = match minio.bucket_exists(&config::CONFIG.minio_bucket).await {
Ok(v) => v,
Err(err) => {
println!("{}", err);
return; // log error and task error
}, // log error and task error
};
if !is_bucket_exist {
minio.make_bucket(&config::CONFIG.minio_bucket, false).await;
} }
let data_stream = get_stream(Box::new(archive_result)); let final_filename = match get_filename(data.object_type, data.object_id).await {
if let Err(err) = minio.put_object_stream(
ObjectArgs::new(&config::CONFIG.minio_bucket, final_filename.clone()),
Box::pin(data_stream)
).await {
println!("{}", err);
return; // log error and task error
}
let link = match minio.presigned_get_object(
PresignedArgs::new(&config::CONFIG.minio_bucket, final_filename)
).await {
Ok(v) => v, Ok(v) => v,
Err(err) => { Err(err) => {
println!("{}", err); set_task_error(key.clone(), "Can't get archive name!".to_string()).await;
return; // log error and task error log::error!("{}", err);
}, // log error and task error return;
},
}; };
println!("{}", link); let archive_result = match create_archive(books, data.file_format).await {
Ok(v) => v,
Err(err) => {
set_task_error(key.clone(), "Failed downloading books!".to_string()).await;
log::error!("{}", err);
return;
},
};
let link = match upload_to_minio(archive_result, final_filename.clone()).await {
Ok(v) => v,
Err(err) => {
set_task_error(key.clone(), "Failed uploading archive!".to_string()).await;
log::error!("{}", err);
return;
},
};
let task = Task {
id: key.clone(),
status: crate::structures::TaskStatus::Complete,
status_description: "Архив готов!".to_string(),
error_message: None,
result_filename: Some(final_filename),
result_link: Some(link)
};
TASK_RESULTS.insert(key.clone(), task.clone()).await;
} }
@@ -323,6 +227,8 @@ pub async fn create_task(
let task = Task { let task = Task {
id: key.clone(), id: key.clone(),
status: crate::structures::TaskStatus::InProgress, status: crate::structures::TaskStatus::InProgress,
status_description: "Подготовка".to_string(),
error_message: None,
result_filename: None, result_filename: None,
result_link: None result_link: None
}; };

View File

@@ -1,9 +1,27 @@
use minio_rsc::errors::MinioError;
use reqwest::Response; use reqwest::Response;
use tempfile::SpooledTempFile; use tempfile::SpooledTempFile;
use bytes::Buf; use bytes::{Buf, Bytes};
use async_stream::stream;
use translit::{gost779b_ru, Transliterator, CharsMapping};
use std::io::{Seek, SeekFrom, Write, Read};
use crate::structures::{CreateTask, ObjectType};
use super::library_client::{get_sequence, get_author};
use std::io::{Seek, SeekFrom, Write}; pub fn get_key(
input_data: CreateTask
) -> String {
let mut data = input_data.clone();
data.allowed_langs.sort();
let data_string = serde_json::to_string(&data).unwrap();
format!("{:x}", md5::compute(data_string))
}
pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile, usize)> { pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile, usize)> {
@@ -38,3 +56,93 @@ pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile
Some((tmp_file, data_size)) Some((tmp_file, data_size))
} }
/// Turns a blocking reader into a stream of `Bytes` chunks.
///
/// The reader is drained through a 2048-byte buffer; every successful non-empty read
/// is copied into its own `Bytes` value and yielded as `Ok`. The stream ends when the
/// reader reports zero bytes read.
///
/// NOTE(review): a read error also ends the stream, without yielding an `Err`, so the
/// consumer cannot tell a failed read from end of input.
pub fn get_stream(mut temp_file: Box<dyn Read + Send>) -> impl futures_core::Stream<Item = Result<Bytes, MinioError>> {
    stream! {
        let mut chunk = [0; 2048];

        // Leaving the loop on `Err` mirrors the end-of-input case below.
        while let Ok(read) = temp_file.read(&mut chunk) {
            if read == 0 {
                break;
            }

            yield Ok(Bytes::copy_from_slice(&chunk[..read]));
        }
    }
}
/// Builds the ASCII `.zip` file name for the archive of a sequence, author or translator.
///
/// Steps:
/// 1. Fetch the display name: the sequence name, or the author's non-empty
///    first/last/middle names joined with `_` (translators are looked up as authors).
/// 2. Transliterate it with the GOST 7.79 (system B) Russian table.
/// 3. Remove every punctuation character listed in `STRIPPED_CHARS`.
/// 4. Apply the replacement table, then drop any remaining non-ASCII characters.
/// 5. Keep names of up to 60 bytes whole, cut longer ones to 59 bytes, and append
///    `.zip`, so the result never exceeds 64 bytes.
///
/// If nothing is left after sanitizing, the numeric `object_id` is used as the name
/// so the function does not panic and different objects do not share a name.
///
/// # Errors
///
/// Propagates the error returned by `get_sequence` / `get_author`.
pub async fn get_filename(object_type: ObjectType, object_id: u32) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    // Punctuation that is removed from the name character by character.
    const STRIPPED_CHARS: &str = "(),….!\"?»«':";
    const EXTENSION: &str = ".zip";
    const MAX_FILENAME_LEN: usize = 64;

    let result_filename = match object_type {
        ObjectType::Sequence => get_sequence(object_id).await?.name,
        ObjectType::Author | ObjectType::Translator => {
            let author = get_author(object_id).await?;
            let name_parts = [
                author.first_name,
                author.last_name,
                author.middle_name.unwrap_or_default(),
            ];

            name_parts
                .iter()
                .map(String::as_str)
                .filter(|part| !part.is_empty())
                .collect::<Vec<_>>()
                .join("_")
        },
    };

    let transliterator = Transliterator::new(gost779b_ru());
    let mut filename_without_type = transliterator.convert(&result_filename, false);

    // Strip each listed character individually. Iterating over `str::get(..)` yields the
    // whole literal once, which would only remove that exact character sequence.
    filename_without_type.retain(|c| !STRIPPED_CHARS.contains(c));

    // NOTE(review): the entries with an empty key, and the duplicated " " entry, look
    // like they originally held non-ASCII characters (dashes, a numero sign, a
    // non-breaking space) — verify against the repository before relying on them.
    let replace_char_map: CharsMapping = [
        ("", "-"),
        ("/", "_"),
        ("", "N"),
        (" ", "_"),
        ("", "-"),
        ("á", "a"),
        (" ", "_"),
        ("'", ""),
        ("`", ""),
        ("[", ""),
        ("]", ""),
        ("\"", ""),
    ].to_vec();

    let replace_transliterator = Transliterator::new(replace_char_map);
    let normal_filename = replace_transliterator
        .convert(&filename_without_type, false)
        .replace(|c: char| !c.is_ascii(), "");

    // A name made only of stripped / non-ASCII characters ends up empty here.
    let normal_filename = if normal_filename.is_empty() {
        object_id.to_string()
    } else {
        normal_filename
    };

    // 64 - ".zip".len() - 1 = 59 bytes for a truncated stem.
    let max_stem_len = MAX_FILENAME_LEN - EXTENSION.len() - 1;
    let left_part = if normal_filename.len() <= max_stem_len + 1 {
        normal_filename.as_str()
    } else {
        // The name is pure ASCII at this point, so every byte index is a char boundary.
        normal_filename.get(..max_stem_len).unwrap_or(&normal_filename)
    };

    Ok([left_part, EXTENSION].concat())
}

View File

@@ -8,7 +8,8 @@ use smartstring::alias::String as SmartString;
pub enum TaskStatus { pub enum TaskStatus {
InProgress, InProgress,
Archiving, Archiving,
Complete Complete,
Failled
} }
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
@@ -23,7 +24,7 @@ pub enum ObjectType {
pub struct CreateTask{ pub struct CreateTask{
pub object_id: u32, pub object_id: u32,
pub object_type: ObjectType, pub object_type: ObjectType,
pub file_format: String, pub file_format: SmartString,
pub allowed_langs: SmallVec<[SmartString; 3]> pub allowed_langs: SmallVec<[SmartString; 3]>
} }
@@ -31,6 +32,8 @@ pub struct CreateTask{
pub struct Task { pub struct Task {
pub id: String, pub id: String,
pub status: TaskStatus, pub status: TaskStatus,
pub status_description: String,
pub error_message: Option<String>,
pub result_filename: Option<String>, pub result_filename: Option<String>,
pub result_link: Option<String> pub result_link: Option<String>
} }

View File

@@ -8,7 +8,7 @@ use tower_http::trace::{TraceLayer, self};
use tracing::Level; use tracing::Level;
use crate::{config::CONFIG, structures::{Task, CreateTask}, services::task_creator::{get_key, create_task}}; use crate::{config::CONFIG, structures::{Task, CreateTask}, services::{task_creator::create_task, utils::get_key}};
pub static TASK_RESULTS: Lazy<Cache<String, Task>> = Lazy::new(|| { pub static TASK_RESULTS: Lazy<Cache<String, Task>> = Lazy::new(|| {