mirror of
https://github.com/flibusta-apps/telegram_files_cache_server.git
synced 2025-12-06 14:45:36 +01:00
Add get link handler
This commit is contained in:
@@ -9,6 +9,11 @@ pub struct Config {
|
||||
pub postgres_port: u32,
|
||||
pub postgres_db: String,
|
||||
|
||||
pub minio_host: String,
|
||||
pub minio_bucket: String,
|
||||
pub minio_access_key: String,
|
||||
pub minio_secret_key: String,
|
||||
|
||||
pub downloader_api_key: String,
|
||||
pub downloader_url: String,
|
||||
|
||||
@@ -38,6 +43,11 @@ impl Config {
|
||||
postgres_port: get_env("POSTGRES_PORT").parse().unwrap(),
|
||||
postgres_db: get_env("POSTGRES_DB"),
|
||||
|
||||
minio_host: get_env("MINIO_HOST"),
|
||||
minio_bucket: get_env("MINIO_BUCKET"),
|
||||
minio_access_key: get_env("MINIO_ACCESS_KEY"),
|
||||
minio_secret_key: get_env("MINIO_SECRET_KEY"),
|
||||
|
||||
downloader_api_key: get_env("DOWNLOADER_API_KEY"),
|
||||
downloader_url: get_env("DOWNLOADER_URL"),
|
||||
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
use std::io::{Write, Seek, SeekFrom};
|
||||
|
||||
use bytes::Buf;
|
||||
use futures::TryStreamExt;
|
||||
use reqwest::Response;
|
||||
use tempfile::SpooledTempFile;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||
|
||||
@@ -17,3 +21,36 @@ pub fn get_response_async_read(it: Response) -> impl AsyncRead {
|
||||
.into_async_read()
|
||||
.compat()
|
||||
}
|
||||
|
||||
pub async fn response_to_tempfile(res: &mut Response) -> Option<(SpooledTempFile, usize)> {
|
||||
let mut tmp_file = tempfile::spooled_tempfile(5 * 1024 * 1024);
|
||||
|
||||
let mut data_size: usize = 0;
|
||||
|
||||
{
|
||||
loop {
|
||||
let chunk = res.chunk().await;
|
||||
|
||||
let result = match chunk {
|
||||
Ok(v) => v,
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
let data = match result {
|
||||
Some(v) => v,
|
||||
None => break,
|
||||
};
|
||||
|
||||
data_size += data.len();
|
||||
|
||||
match tmp_file.write(data.chunk()) {
|
||||
Ok(_) => (),
|
||||
Err(_) => return None,
|
||||
}
|
||||
}
|
||||
|
||||
tmp_file.seek(SeekFrom::Start(0)).unwrap();
|
||||
}
|
||||
|
||||
Some((tmp_file, data_size))
|
||||
}
|
||||
|
||||
73
src/services/minio.rs
Normal file
73
src/services/minio.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use std::io::Read;
|
||||
|
||||
use async_stream::stream;
|
||||
use bytes::Bytes;
|
||||
use minio_rsc::{provider::StaticProvider, Minio, types::args::{ObjectArgs, PresignedArgs}, errors::MinioError};
|
||||
use tempfile::SpooledTempFile;
|
||||
|
||||
use crate::config;
|
||||
|
||||
|
||||
pub fn get_minio() -> Minio {
|
||||
let provider = StaticProvider::new(
|
||||
&config::CONFIG.minio_access_key,
|
||||
&config::CONFIG.minio_secret_key,
|
||||
None
|
||||
);
|
||||
|
||||
Minio::builder()
|
||||
.host(&config::CONFIG.minio_host)
|
||||
.provider(provider)
|
||||
.secure(false)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
|
||||
pub fn get_stream(mut temp_file: Box<dyn Read + Send>) -> impl futures_core::Stream<Item = Result<Bytes, MinioError>> {
|
||||
stream! {
|
||||
let mut buf = [0; 2048];
|
||||
|
||||
while let Ok(count) = temp_file.read(&mut buf) {
|
||||
if count == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
yield Ok(Bytes::copy_from_slice(&buf[0..count]))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub async fn upload_to_minio(archive: SpooledTempFile, filename: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let minio = get_minio();
|
||||
|
||||
let is_bucket_exist = match minio.bucket_exists(&config::CONFIG.minio_bucket).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => return Err(Box::new(err)),
|
||||
};
|
||||
|
||||
if !is_bucket_exist {
|
||||
let _ = minio.make_bucket(&config::CONFIG.minio_bucket, false).await;
|
||||
}
|
||||
|
||||
let data_stream = get_stream(Box::new(archive));
|
||||
|
||||
if let Err(err) = minio.put_object_stream(
|
||||
ObjectArgs::new(&config::CONFIG.minio_bucket, filename.clone()),
|
||||
Box::pin(data_stream)
|
||||
).await {
|
||||
return Err(Box::new(err));
|
||||
}
|
||||
|
||||
let link = match minio.presigned_get_object(
|
||||
PresignedArgs::new(&config::CONFIG.minio_bucket, filename)
|
||||
).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
return Err(Box::new(err));
|
||||
},
|
||||
};
|
||||
|
||||
Ok(link)
|
||||
}
|
||||
@@ -2,13 +2,15 @@ pub mod book_library;
|
||||
pub mod download_utils;
|
||||
pub mod telegram_files;
|
||||
pub mod downloader;
|
||||
pub mod minio;
|
||||
|
||||
use chrono::Duration;
|
||||
use serde::Serialize;
|
||||
use tracing::log;
|
||||
|
||||
use crate::{prisma::cached_file, views::Database};
|
||||
|
||||
use self::{download_utils::DownloadResult, telegram_files::{download_from_telegram_files, UploadData, upload_to_telegram_files}, downloader::{get_filename, FilenameData, download_from_downloader}, book_library::{get_book, types::BaseBook, get_books}};
|
||||
use self::{download_utils::{DownloadResult, response_to_tempfile}, telegram_files::{download_from_telegram_files, UploadData, upload_to_telegram_files}, downloader::{get_filename, FilenameData, download_from_downloader}, book_library::{get_book, types::BaseBook, get_books}, minio::upload_to_minio};
|
||||
|
||||
|
||||
pub async fn get_cached_file_or_cache(
|
||||
@@ -131,6 +133,49 @@ pub async fn download_from_cache(
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FileLinkResult {
|
||||
pub link: String,
|
||||
pub filename: String,
|
||||
pub filename_ascii: String,
|
||||
pub caption: String
|
||||
}
|
||||
|
||||
pub async fn get_download_link(
|
||||
object_id: i32,
|
||||
object_type: String,
|
||||
db: Database
|
||||
) -> Result<Option<FileLinkResult>, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let cached_file = match get_cached_file_or_cache(object_id, object_type, db.clone()).await {
|
||||
Some(v) => v,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let data = match download_from_cache(cached_file, db).await {
|
||||
Some(v) => v,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let DownloadResult { mut response, filename, filename_ascii, caption } = data;
|
||||
|
||||
let tempfile = match response_to_tempfile(&mut response).await {
|
||||
Some(v) => v.0,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let link = match upload_to_minio(tempfile, filename.clone()).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
Ok(Some(FileLinkResult {
|
||||
link,
|
||||
filename,
|
||||
filename_ascii,
|
||||
caption
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn get_books_for_update() -> Result<Vec<BaseBook>, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut result: Vec<BaseBook> = vec![];
|
||||
|
||||
|
||||
23
src/views.rs
23
src/views.rs
@@ -2,11 +2,11 @@ use axum::{Router, response::{Response, IntoResponse, AppendHeaders}, http::{Sta
|
||||
use axum_prometheus::PrometheusMetricLayer;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tower_http::trace::{TraceLayer, self};
|
||||
use tracing::Level;
|
||||
use tracing::{Level, log};
|
||||
use std::sync::Arc;
|
||||
use base64::{engine::general_purpose, Engine};
|
||||
|
||||
use crate::{config::CONFIG, db::get_prisma_client, prisma::{PrismaClient, cached_file::{self}}, services::{get_cached_file_or_cache, download_from_cache, download_utils::get_response_async_read, start_update_cache}};
|
||||
use crate::{config::CONFIG, db::get_prisma_client, prisma::{PrismaClient, cached_file::{self}}, services::{get_cached_file_or_cache, download_from_cache, download_utils::get_response_async_read, start_update_cache, get_download_link}};
|
||||
|
||||
|
||||
pub type Database = Arc<PrismaClient>;
|
||||
@@ -75,6 +75,24 @@ async fn download_cached_file(
|
||||
(headers, body).into_response()
|
||||
}
|
||||
|
||||
async fn get_link(
|
||||
Path((object_id, object_type)): Path<(i32, String)>,
|
||||
Extension(Ext { db, .. }): Extension<Ext>
|
||||
) -> impl IntoResponse {
|
||||
match get_download_link(object_id, object_type.clone(), db.clone()).await {
|
||||
Ok(data) => {
|
||||
match data {
|
||||
Some(data) => Json(data).into_response(),
|
||||
None => return StatusCode::NO_CONTENT.into_response(),
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
log::error!("{:?}", err);
|
||||
return StatusCode::NO_CONTENT.into_response();
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_cached_file(
|
||||
Path((object_id, object_type)): Path<(i32, String)>,
|
||||
Extension(Ext { db, .. }): Extension<Ext>
|
||||
@@ -145,6 +163,7 @@ pub async fn get_router() -> Router {
|
||||
let app_router = Router::new()
|
||||
.route("/:object_id/:object_type/", get(get_cached_file))
|
||||
.route("/download/:object_id/:object_type/", get(download_cached_file))
|
||||
.route("/link/:object_id/:object_type/", get(get_link))
|
||||
.route("/:object_id/:object_type/", delete(delete_cached_file))
|
||||
.route("/update_cache", post(update_cache))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user