This commit is contained in:
2023-09-23 00:57:35 +02:00
parent 5fb4f6b7a7
commit 3fdad853f0
6 changed files with 149 additions and 283 deletions

View File

@@ -1,6 +1,6 @@
use std::io::Seek;
use minio_rsc::types::args::{ObjectArgs, PresignedArgs};
use minio_rsc::client::PresignedArgs;
use smallvec::SmallVec;
use smartstring::alias::String as SmartString;
use tempfile::SpooledTempFile;
@@ -59,7 +59,8 @@ pub async fn set_task_error(key: String, error_message: String) {
status_description: "Ошибка!".to_string(),
error_message: Some(error_message),
result_filename: None,
result_link: None
result_link: None,
content_size: None
};
TASK_RESULTS.insert(key, task.clone()).await;
@@ -73,7 +74,8 @@ pub async fn set_progress_description(key: String, description: String) {
status_description: description,
error_message: None,
result_filename: None,
result_link: None
result_link: None,
content_size: None
};
TASK_RESULTS.insert(key, task.clone()).await;
@@ -96,7 +98,8 @@ pub async fn upload_to_minio(archive: SpooledTempFile, filename: String) -> Resu
let data_stream = get_stream(Box::new(archive));
if let Err(err) = minio.put_object_stream(
ObjectArgs::new(&config::CONFIG.minio_bucket, filename.clone()),
&config::CONFIG.minio_bucket,
filename.clone(),
Box::pin(data_stream),
None
).await {
@@ -116,7 +119,7 @@ pub async fn upload_to_minio(archive: SpooledTempFile, filename: String) -> Resu
}
pub async fn create_archive(key: String, books: Vec<Book>, file_format: SmartString) -> Result<SpooledTempFile, Box<dyn std::error::Error + Send + Sync>> {
pub async fn create_archive(key: String, books: Vec<Book>, file_format: SmartString) -> Result<(SpooledTempFile, u64), Box<dyn std::error::Error + Send + Sync>> {
let output_file = tempfile::spooled_tempfile(5 * 1024 * 1024);
let mut archive = zip::ZipWriter::new(output_file);
@@ -126,6 +129,7 @@ pub async fn create_archive(key: String, books: Vec<Book>, file_format: SmartStr
.unix_permissions(0o755);
let books_count = books.len();
let mut bytes_count: u64 = 0;
for (index, book) in books.iter().enumerate() {
let (mut tmp_file, filename) = match download(book.id, file_format.clone()).await {
@@ -139,7 +143,7 @@ pub async fn create_archive(key: String, books: Vec<Book>, file_format: SmartStr
};
match std::io::copy(&mut tmp_file, &mut archive) {
Ok(_) => (),
Ok(file_bytes_count) => bytes_count += file_bytes_count,
Err(err) => return Err(Box::new(err)),
};
@@ -153,7 +157,7 @@ pub async fn create_archive(key: String, books: Vec<Book>, file_format: SmartStr
archive_result.rewind().unwrap();
Ok(archive_result)
Ok((archive_result, bytes_count))
}
@@ -191,7 +195,7 @@ pub async fn create_archive_task(key: String, data: CreateTask) {
set_progress_description(key.clone(), "Сборка архива...".to_string()).await;
let archive_result = match create_archive(key.clone(), books, data.file_format).await {
let (archive_result, content_size) = match create_archive(key.clone(), books, data.file_format).await {
Ok(v) => v,
Err(err) => {
set_task_error(key.clone(), "Failed downloading books!".to_string()).await;
@@ -217,7 +221,8 @@ pub async fn create_archive_task(key: String, data: CreateTask) {
status_description: "Архив готов! Ожидайте файл".to_string(),
error_message: None,
result_filename: Some(final_filename),
result_link: Some(link)
result_link: Some(link),
content_size: Some(content_size)
};
TASK_RESULTS.insert(key.clone(), task.clone()).await;
@@ -235,7 +240,8 @@ pub async fn create_task(
status_description: "Подготовка".to_string(),
error_message: None,
result_filename: None,
result_link: None
result_link: None,
content_size: None
};
TASK_RESULTS.insert(key.clone(), task.clone()).await;

View File

@@ -1,4 +1,4 @@
use minio_rsc::errors::MinioError;
use minio_rsc::error::Error;
use reqwest::Response;
use tempfile::SpooledTempFile;
use bytes::{Buf, Bytes};
@@ -59,7 +59,7 @@ pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile
}
pub fn get_stream(mut temp_file: Box<dyn Read + Send>) -> impl futures_core::Stream<Item = Result<Bytes, MinioError>> {
pub fn get_stream(mut temp_file: Box<dyn Read + Send>) -> impl futures_core::Stream<Item = Result<Bytes, Error>> {
stream! {
let mut buf = [0; 2048];

View File

@@ -35,5 +35,6 @@ pub struct Task {
pub status_description: String,
pub error_message: Option<String>,
pub result_filename: Option<String>,
pub result_link: Option<String>
pub result_link: Option<String>,
pub content_size: Option<u64>
}

View File

@@ -1,6 +1,6 @@
use std::time::Duration;
use axum::{Router, routing::{get, post}, middleware::{self, Next}, http::{Request, StatusCode, self}, response::{Response, IntoResponse}, extract::{Path}, Json};
use axum::{Router, routing::{get, post}, middleware::{self, Next}, http::{Request, StatusCode, self}, response::{Response, IntoResponse}, extract::Path, Json};
use axum_prometheus::PrometheusMetricLayer;
use moka::future::Cache;
use once_cell::sync::Lazy;
@@ -24,7 +24,7 @@ async fn create_archive_task(
) -> impl IntoResponse {
let key = get_key(data.clone());
let result = match TASK_RESULTS.get(&key) {
let result = match TASK_RESULTS.get(&key).await {
Some(result) => {
if result.status == TaskStatus::Failed {
create_task(data).await
@@ -42,7 +42,7 @@ async fn create_archive_task(
async fn check_archive_task_status(
Path(task_id): Path<String>
) -> impl IntoResponse {
match TASK_RESULTS.get(&task_id) {
match TASK_RESULTS.get(&task_id).await {
Some(result) => Json::<Task>(result).into_response(),
None => StatusCode::NOT_FOUND.into_response(),
}