diff --git a/Cargo.lock b/Cargo.lock index 199a087..6cb32c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,6 +90,15 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + [[package]] name = "async-native-tls" version = "0.4.0" @@ -1085,6 +1094,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "failure" version = "0.1.8" @@ -1563,6 +1578,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls 0.21.6", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2013,6 +2042,12 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e6bcd6433cff03a4bfc3d9834d504467db1f1cf6d0ea765d37d330249ed629d" +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.5.0" @@ -2231,6 +2266,33 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "minio-rsc" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0958cac6e5ce437d21bf866c2e410f5e88f849150ab2d946702ed994729e2e1f" +dependencies = [ + "async-mutex", + "base64 0.21.2", + "bytes", + "chrono", + "futures", + "futures-core", + "futures-util", + "hex", + "hmac", + "hyper", + "md5 0.7.0", + "once_cell", + "quick-xml", + "regex", + "reqwest", + "serde", + "serde_json", + "sha2 0.10.7", + "urlencoding", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -2295,7 +2357,7 @@ dependencies = [ "percent-encoding", "rand 0.8.5", "rustc_version_runtime", - "rustls", + "rustls 0.20.8", "rustls-pemfile", "serde", "serde_bytes", @@ -2308,7 +2370,7 @@ dependencies = [ "take_mut", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", "tokio-util 0.7.8", "trust-dns-proto", "trust-dns-resolver", @@ -3393,6 +3455,16 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-xml" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81b9228215d82c7b61490fec1de287136b5de6f5700f6e58ea9ad61a7964ca51" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.32" @@ -3626,6 +3698,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -3636,11 +3709,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls 0.21.6", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-native-tls 0.3.1", + "tokio-rustls 0.24.1", "tokio-util 0.7.8", "tower-service", "url", @@ -3648,6 +3724,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots", "winreg 0.10.1", ] @@ -3803,6 +3880,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls" +version = "0.21.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + [[package]] name = "rustls-pemfile" version = "1.0.3" @@ -3812,6 +3901,16 @@ dependencies = [ "base64 0.21.2", ] +[[package]] +name = "rustls-webpki" +version = "0.101.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -4525,15 +4624,18 @@ dependencies = [ "axum", "axum-prometheus", "base64 0.21.2", + "bytes", "chrono", "futures", "futures-core", + "minio-rsc", "once_cell", "prisma-client-rust", "reqwest", "sentry", "serde", "serde_json", + "tempfile", "tokio", "tokio-util 0.7.8", "tower-http", @@ -4748,11 +4850,21 @@ version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls", + "rustls 0.20.8", "tokio", "webpki", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.6", + "tokio", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -5124,6 +5236,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "user-facing-error-macros" version = "0.1.0" @@ -5362,7 +5480,7 @@ checksum = "8f820cd208ce9c6b050812dc2d724ba98c6c1e9db5ce9b3f58d925ae5723a5e6" dependencies = [ "bitflags 1.3.2", "byteorder", - "md5", + "md5 0.6.1", "rand 0.7.3", "winapi", ] diff --git a/Cargo.toml b/Cargo.toml index 9d75400..c047626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,7 @@ base64 = "0.21.2" futures = "0.3.28" futures-core = "0.3.28" async-stream = "0.3.5" + +minio-rsc = "0.1.5" +tempfile = "3.7.0" +bytes = "1.4.0" diff --git a/src/config.rs b/src/config.rs index 6c8d37b..3c74159 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,6 +9,11 @@ pub struct Config { pub postgres_port: u32, pub postgres_db: String, + pub minio_host: String, + pub minio_bucket: String, + pub minio_access_key: String, + pub minio_secret_key: String, + pub downloader_api_key: String, pub downloader_url: String, @@ -38,6 +43,11 @@ impl Config { postgres_port: get_env("POSTGRES_PORT").parse().unwrap(), postgres_db: get_env("POSTGRES_DB"), + minio_host: get_env("MINIO_HOST"), + minio_bucket: get_env("MINIO_BUCKET"), + minio_access_key: get_env("MINIO_ACCESS_KEY"), + minio_secret_key: get_env("MINIO_SECRET_KEY"), + downloader_api_key: get_env("DOWNLOADER_API_KEY"), downloader_url: get_env("DOWNLOADER_URL"), diff --git a/src/services/download_utils.rs b/src/services/download_utils.rs index 9f54ac2..93b06c3 100644 --- a/src/services/download_utils.rs +++ b/src/services/download_utils.rs @@ -1,5 +1,9 @@ +use std::io::{Write, Seek, SeekFrom}; + +use bytes::Buf; use futures::TryStreamExt; use reqwest::Response; +use tempfile::SpooledTempFile; use tokio::io::AsyncRead; use tokio_util::compat::FuturesAsyncReadCompatExt; @@ -17,3 +21,36 @@ pub fn get_response_async_read(it: Response) -> impl AsyncRead { .into_async_read() .compat() } + +pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile, usize)> { + let mut tmp_file = tempfile::spooled_tempfile(5 * 1024 * 1024); + + let mut data_size: usize = 0; + + { + loop { + let chunk = res.chunk().await; + + let result = match chunk { + Ok(v) => v, + Err(_) => return None, + }; + + let data = match result { + Some(v) => v, + None => break, + }; + + data_size += data.len(); + + match tmp_file.write(data.chunk()) { + Ok(_) => (), + Err(_) => return None, + } + } + + tmp_file.seek(SeekFrom::Start(0)).unwrap(); + } + + Some((tmp_file, data_size)) +} diff --git a/src/services/minio.rs b/src/services/minio.rs new file mode 100644 index 0000000..4b04885 --- /dev/null +++ b/src/services/minio.rs @@ -0,0 +1,73 @@ +use std::io::Read; + +use async_stream::stream; +use bytes::Bytes; +use minio_rsc::{provider::StaticProvider, Minio, types::args::{ObjectArgs, PresignedArgs}, errors::MinioError}; +use tempfile::SpooledTempFile; + +use crate::config; + + +pub fn get_minio() -> Minio { + let provider = StaticProvider::new( + &config::CONFIG.minio_access_key, + &config::CONFIG.minio_secret_key, + None + ); + + Minio::builder() + .host(&config::CONFIG.minio_host) + .provider(provider) + .secure(false) + .build() + .unwrap() +} + + +pub fn get_stream(mut temp_file: Box) -> impl futures_core::Stream> { + stream! { + let mut buf = [0; 2048]; + + while let Ok(count) = temp_file.read(&mut buf) { + if count == 0 { + break; + } + + yield Ok(Bytes::copy_from_slice(&buf[0..count])) + } + } +} + + +pub async fn upload_to_minio(archive: SpooledTempFile, filename: String) -> Result> { + let minio = get_minio(); + + let is_bucket_exist = match minio.bucket_exists(&config::CONFIG.minio_bucket).await { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + if !is_bucket_exist { + let _ = minio.make_bucket(&config::CONFIG.minio_bucket, false).await; + } + + let data_stream = get_stream(Box::new(archive)); + + if let Err(err) = minio.put_object_stream( + ObjectArgs::new(&config::CONFIG.minio_bucket, filename.clone()), + Box::pin(data_stream) + ).await { + return Err(Box::new(err)); + } + + let link = match minio.presigned_get_object( + PresignedArgs::new(&config::CONFIG.minio_bucket, filename) + ).await { + Ok(v) => v, + Err(err) => { + return Err(Box::new(err)); + }, + }; + + Ok(link) +} diff --git a/src/services/mod.rs b/src/services/mod.rs index 8bd7656..3a990ef 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -2,13 +2,15 @@ pub mod book_library; pub mod download_utils; pub mod telegram_files; pub mod downloader; +pub mod minio; use chrono::Duration; +use serde::Serialize; use tracing::log; use crate::{prisma::cached_file, views::Database}; -use self::{download_utils::DownloadResult, telegram_files::{download_from_telegram_files, UploadData, upload_to_telegram_files}, downloader::{get_filename, FilenameData, download_from_downloader}, book_library::{get_book, types::BaseBook, get_books}}; +use self::{download_utils::{DownloadResult, response_to_tempfile}, telegram_files::{download_from_telegram_files, UploadData, upload_to_telegram_files}, downloader::{get_filename, FilenameData, download_from_downloader}, book_library::{get_book, types::BaseBook, get_books}, minio::upload_to_minio}; pub async fn get_cached_file_or_cache( @@ -131,6 +133,49 @@ pub async fn download_from_cache( }) } +#[derive(Serialize)] +pub struct FileLinkResult { + pub link: String, + pub filename: String, + pub filename_ascii: String, + pub caption: String +} + +pub async fn get_download_link( + object_id: i32, + object_type: String, + db: Database +) -> Result, Box> { + let cached_file = match get_cached_file_or_cache(object_id, object_type, db.clone()).await { + Some(v) => v, + None => return Ok(None), + }; + + let data = match download_from_cache(cached_file, db).await { + Some(v) => v, + None => return Ok(None), + }; + + let DownloadResult { mut response, filename, filename_ascii, caption } = data; + + let tempfile = match response_to_tempfile(&mut response).await { + Some(v) => v.0, + None => return Ok(None), + }; + + let link = match upload_to_minio(tempfile, filename.clone()).await { + Ok(v) => v, + Err(err) => return Err(err), + }; + + Ok(Some(FileLinkResult { + link, + filename, + filename_ascii, + caption + })) +} + pub async fn get_books_for_update() -> Result, Box> { let mut result: Vec = vec![]; diff --git a/src/views.rs b/src/views.rs index f1f5051..7a1ebf3 100644 --- a/src/views.rs +++ b/src/views.rs @@ -2,11 +2,11 @@ use axum::{Router, response::{Response, IntoResponse, AppendHeaders}, http::{Sta use axum_prometheus::PrometheusMetricLayer; use tokio_util::io::ReaderStream; use tower_http::trace::{TraceLayer, self}; -use tracing::Level; +use tracing::{Level, log}; use std::sync::Arc; use base64::{engine::general_purpose, Engine}; -use crate::{config::CONFIG, db::get_prisma_client, prisma::{PrismaClient, cached_file::{self}}, services::{get_cached_file_or_cache, download_from_cache, download_utils::get_response_async_read, start_update_cache}}; +use crate::{config::CONFIG, db::get_prisma_client, prisma::{PrismaClient, cached_file::{self}}, services::{get_cached_file_or_cache, download_from_cache, download_utils::get_response_async_read, start_update_cache, get_download_link}}; pub type Database = Arc; @@ -75,6 +75,24 @@ async fn download_cached_file( (headers, body).into_response() } +async fn get_link( + Path((object_id, object_type)): Path<(i32, String)>, + Extension(Ext { db, .. }): Extension +) -> impl IntoResponse { + match get_download_link(object_id, object_type.clone(), db.clone()).await { + Ok(data) => { + match data { + Some(data) => Json(data).into_response(), + None => return StatusCode::NO_CONTENT.into_response(), + } + }, + Err(err) => { + log::error!("{:?}", err); + return StatusCode::NO_CONTENT.into_response(); + }, + } +} + async fn delete_cached_file( Path((object_id, object_type)): Path<(i32, String)>, Extension(Ext { db, .. }): Extension @@ -145,6 +163,7 @@ pub async fn get_router() -> Router { let app_router = Router::new() .route("/:object_id/:object_type/", get(get_cached_file)) .route("/download/:object_id/:object_type/", get(download_cached_file)) + .route("/link/:object_id/:object_type/", get(get_link)) .route("/:object_id/:object_type/", delete(delete_cached_file)) .route("/update_cache", post(update_cache))