mirror of
https://github.com/flibusta-apps/telegram_files_server.git
synced 2025-12-06 12:35:39 +01:00
Compare commits
34 Commits
rewrite-to
...
2e98c84ba7
| Author | SHA1 | Date | |
|---|---|---|---|
| 2e98c84ba7 | |||
|
|
a18d1f1094 | ||
| 0572e300ef | |||
|
|
cce2586a2e | ||
| d7f702109b | |||
|
|
e168f255fb | ||
| e91318f728 | |||
| b2969f7693 | |||
|
|
15091f633d | ||
| 4bc0e29c66 | |||
| 7672fc3f60 | |||
| 491bb75df2 | |||
| 88f91af907 | |||
| f2b46817d6 | |||
|
|
fc5bf1190f | ||
| a1b52a1d2e | |||
|
|
c33be9463d | ||
| 1104a91570 | |||
| 94193fae41 | |||
| cd0cde70de | |||
| 8ff0a069b1 | |||
| e21273a2b8 | |||
| bd62f1b076 | |||
| b944a9e724 | |||
| 16a1691212 | |||
|
|
b705b0cb30 | ||
|
|
e18d9555a6 | ||
| dbd4b547c6 | |||
| adc47f1b75 | |||
| 0976471562 | |||
| d2fcf96695 | |||
| f9b2e8b0a3 | |||
| 494569d1ac | |||
| 7319312754 |
2
.github/workflows/build_docker_image.yml
vendored
2
.github/workflows/build_docker_image.yml
vendored
@@ -33,7 +33,7 @@ jobs:
|
||||
-
|
||||
name: Build and push
|
||||
id: docker_build
|
||||
uses: docker/build-push-action@v5
|
||||
uses: docker/build-push-action@v6
|
||||
env:
|
||||
IMAGE: ${{ steps.repository_name.outputs.lowercase }}
|
||||
with:
|
||||
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -11,6 +11,8 @@ __pycache__
|
||||
venv
|
||||
|
||||
|
||||
.DS_Store
|
||||
|
||||
# Added by cargo
|
||||
|
||||
/target
|
||||
|
||||
1974
Cargo.lock
generated
1974
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
36
Cargo.toml
36
Cargo.toml
@@ -4,33 +4,33 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
serde = "1.0.200"
|
||||
serde_json = "1.0.116"
|
||||
serde = "1.0.219"
|
||||
serde_json = "1.0.140"
|
||||
|
||||
axum = { version = "0.7.5", features = ["multipart"] }
|
||||
axum_typed_multipart = "0.11.1"
|
||||
axum = { version = "0.8.1", features = ["multipart"] }
|
||||
axum_typed_multipart = "0.15.1"
|
||||
|
||||
tracing = "0.1.40"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter"]}
|
||||
tower-http = { version = "0.5.2", features = ["trace"] }
|
||||
sentry-tracing = "0.32.3"
|
||||
tracing = "0.1.41"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter"]}
|
||||
tower-http = { version = "0.6.2", features = ["trace"] }
|
||||
sentry-tracing = "0.36.0"
|
||||
|
||||
tokio = "1.37.0"
|
||||
tokio-util = { version = "0.7.11", features = [ "full" ] }
|
||||
axum-prometheus = "0.6.1"
|
||||
tokio = { version = "1.44.2", features = [ "full" ] }
|
||||
tokio-util = { version = "0.7.14", features = [ "full" ] }
|
||||
axum-prometheus = "0.8.0"
|
||||
|
||||
futures = "0.3.30"
|
||||
futures = "0.3.31"
|
||||
|
||||
once_cell = "1.19.0"
|
||||
teloxide = "0.12.2"
|
||||
once_cell = "1.21.1"
|
||||
teloxide = { git = "https://github.com/teloxide/teloxide.git" }
|
||||
|
||||
sentry = "0.32.3"
|
||||
sentry = "0.36.0"
|
||||
|
||||
dotenv = "0.15.0"
|
||||
dotenvy = "0.15.7"
|
||||
|
||||
reqwest = { version = "0.11.10", features = [
|
||||
reqwest = { version = "0.12.15", features = [
|
||||
"json",
|
||||
"stream",
|
||||
"multipart",
|
||||
], default-features = false }
|
||||
moka = { version = "0.12.7", features = ["future"] }
|
||||
moka = { version = "0.12.10", features = ["future"] }
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::sync::{
|
||||
use std::{sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
}, time::Duration};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use teloxide::Bot;
|
||||
@@ -23,8 +23,18 @@ impl RoundRobinBot {
|
||||
|
||||
pub fn get_bot(&self) -> Bot {
|
||||
let index = self.current_index.fetch_add(1, Ordering::Relaxed) % self.bot_tokens.len();
|
||||
Bot::new(self.bot_tokens[index].clone())
|
||||
.set_api_url(reqwest::Url::parse(CONFIG.telegram_api_url.as_str()).unwrap())
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.connect_timeout(Duration::from_secs(5))
|
||||
.timeout(Duration::from_secs(5 * 60))
|
||||
.tcp_nodelay(true)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
Bot::with_client(
|
||||
self.bot_tokens[index].clone(),
|
||||
client
|
||||
).set_api_url(reqwest::Url::parse(CONFIG.telegram_api_url.as_str()).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,17 +1,13 @@
|
||||
use std::pin::Pin;
|
||||
use std::error::Error;
|
||||
|
||||
use axum::body::Bytes;
|
||||
use futures::TryStreamExt;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
use teloxide::{
|
||||
net::Download,
|
||||
requests::Requester,
|
||||
types::{ChatId, InputFile, MessageId, Recipient},
|
||||
Bot,
|
||||
};
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||
use tokio::fs::File;
|
||||
use tracing::log;
|
||||
use moka::future::Cache;
|
||||
|
||||
@@ -48,6 +44,7 @@ pub static TEMP_FILES_CACHE: Lazy<Cache<i32, MessageId>> = Lazy::new(|| {
|
||||
.build()
|
||||
});
|
||||
|
||||
|
||||
pub async fn upload_file(
|
||||
file: Bytes,
|
||||
filename: String,
|
||||
@@ -73,7 +70,8 @@ pub async fn upload_file(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn download_file(chat_id: i64, message_id: i32) -> Option<BotDownloader> {
|
||||
|
||||
pub async fn download_file(chat_id: i64, message_id: i32) -> Result<Option<File>, Box<dyn Error>> {
|
||||
let bot = ROUND_ROBIN_BOT.get_bot();
|
||||
|
||||
let forwarded_message = match bot
|
||||
@@ -86,50 +84,71 @@ pub async fn download_file(chat_id: i64, message_id: i32) -> Option<BotDownloade
|
||||
{
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
if let teloxide::RequestError::Api(ref err) = err {
|
||||
if let teloxide::ApiError::MessageToForwardNotFound = err {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
if let teloxide::RequestError::Api(ref err) = err {
|
||||
if let teloxide::ApiError::MessageIdInvalid = err {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
log::error!("Error: {}", err);
|
||||
return None;
|
||||
return Err(Box::new(err));
|
||||
}
|
||||
};
|
||||
|
||||
let file_id = match forwarded_message.document() {
|
||||
Some(v) => v.file.id.clone(),
|
||||
None => {
|
||||
log::error!("Document not found!");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let file_id = forwarded_message.document().unwrap().file.id.clone();
|
||||
|
||||
TEMP_FILES_CACHE.insert(message_id, forwarded_message.id.clone()).await;
|
||||
TEMP_FILES_CACHE.insert(message_id, forwarded_message.id).await;
|
||||
|
||||
let path = match bot.get_file(file_id.clone()).await {
|
||||
Ok(v) => v.path,
|
||||
Err(err) => {
|
||||
log::error!("Error: {}", err);
|
||||
return None;
|
||||
return Err(Box::new(err));
|
||||
}
|
||||
};
|
||||
|
||||
return Some(BotDownloader::new(bot, path));
|
||||
Ok(Some(File::open(path).await?))
|
||||
}
|
||||
|
||||
pub struct BotDownloader {
|
||||
bot: Bot,
|
||||
file_path: String,
|
||||
|
||||
pub async fn clean_files() -> Result<(), Box<dyn Error>> {
|
||||
let bots_folder = "/var/lib/telegram-bot-api/";
|
||||
let documents_folder_name = "documents";
|
||||
|
||||
let mut bots_folder = tokio::fs::read_dir(bots_folder).await.unwrap();
|
||||
|
||||
while let Some(entry) = bots_folder.next_entry().await? {
|
||||
if !entry.metadata().await.unwrap().is_dir() {
|
||||
continue;
|
||||
}
|
||||
|
||||
impl BotDownloader {
|
||||
pub fn new(bot: Bot, file_path: String) -> Self {
|
||||
Self { bot, file_path }
|
||||
let documents_folder_path = entry.path().join(documents_folder_name);
|
||||
if !documents_folder_path.exists() {
|
||||
continue;
|
||||
}
|
||||
|
||||
pub fn get_async_read(self) -> Pin<Box<dyn AsyncRead + Send>> {
|
||||
let stream = self.bot.download_file_stream(&self.file_path);
|
||||
let mut document_folder = match tokio::fs::read_dir(documents_folder_path.clone()).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => panic!("Path: {:?}, Error: {:?}", documents_folder_path, err),
|
||||
};
|
||||
|
||||
Box::pin(
|
||||
stream
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
|
||||
.into_async_read()
|
||||
.compat(),
|
||||
)
|
||||
while let Some(file) = document_folder.next_entry().await? {
|
||||
let metadata = file.metadata().await.unwrap();
|
||||
|
||||
if metadata.created()?.elapsed().unwrap().as_secs() > 3600 {
|
||||
match tokio::fs::remove_file(file.path()).await {
|
||||
Ok(_) => log::info!("File {:?} removed", file.path()),
|
||||
Err(err) => log::error!("Error: {}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use axum_typed_multipart::{TryFromMultipart, TypedMultipart};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tower_http::trace::{self, TraceLayer};
|
||||
use tracing::Level;
|
||||
use tracing::log;
|
||||
|
||||
use crate::config::CONFIG;
|
||||
|
||||
@@ -44,8 +45,8 @@ pub async fn get_router() -> Router {
|
||||
let (prometheus_layer, metric_handle) = PrometheusMetricLayer::pair();
|
||||
|
||||
let app_router = Router::new()
|
||||
.route("/upload/", post(upload))
|
||||
.route("/download_by_message/:chat_id/:message_id", get(download))
|
||||
.route("/api/v1/files/upload/", post(upload))
|
||||
.route("/api/v1/files/download_by_message/{chat_id}/{message_id}", get(download))
|
||||
.layer(DefaultBodyLimit::max(BODY_LIMIT))
|
||||
.layer(middleware::from_fn(auth))
|
||||
.layer(prometheus_layer);
|
||||
@@ -54,8 +55,8 @@ pub async fn get_router() -> Router {
|
||||
Router::new().route("/metrics", get(|| async move { metric_handle.render() }));
|
||||
|
||||
Router::new()
|
||||
.nest("/api/v1/files", app_router)
|
||||
.nest("/", metric_router)
|
||||
.merge(app_router)
|
||||
.merge(metric_router)
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
|
||||
@@ -89,14 +90,20 @@ async fn upload(data: TypedMultipart<UploadFileRequest>) -> impl IntoResponse {
|
||||
}
|
||||
|
||||
async fn download(Path((chat_id, message_id)): Path<(i64, i32)>) -> impl IntoResponse {
|
||||
let downloader = download_file(chat_id, message_id).await;
|
||||
|
||||
let data = match downloader {
|
||||
Some(v) => v.get_async_read(),
|
||||
None => return StatusCode::NOT_FOUND.into_response()
|
||||
let file = match download_file(chat_id, message_id).await {
|
||||
Ok(v) => {
|
||||
match v {
|
||||
Some(v) => v,
|
||||
None => return StatusCode::NO_CONTENT.into_response(),
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
log::error!("{}", err);
|
||||
return StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||
}
|
||||
};
|
||||
|
||||
let reader = ReaderStream::new(data);
|
||||
let reader = ReaderStream::new(file);
|
||||
|
||||
axum::body::Body::from_stream(reader).into_response()
|
||||
}
|
||||
|
||||
40
src/main.rs
40
src/main.rs
@@ -1,17 +1,41 @@
|
||||
mod config;
|
||||
mod core;
|
||||
|
||||
use core::file_utils::clean_files;
|
||||
use std::{net::SocketAddr, str::FromStr};
|
||||
use sentry::{integrations::debug_images::DebugImagesIntegration, types::Dsn, ClientOptions};
|
||||
use sentry_tracing::EventFilter;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
use tracing_subscriber::{filter, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
use crate::core::views::get_router;
|
||||
|
||||
|
||||
async fn start_app() {
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
|
||||
|
||||
let app = get_router().await;
|
||||
|
||||
println!("Start webserver...");
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
println!("Webserver shutdown...");
|
||||
}
|
||||
|
||||
|
||||
async fn cron_jobs() {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5 * 60));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let _ = clean_files().await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
dotenv::dotenv().ok();
|
||||
dotenvy::dotenv().ok();
|
||||
|
||||
let options = ClientOptions {
|
||||
dsn: Some(Dsn::from_str(&config::CONFIG.sentry_dsn).unwrap()),
|
||||
@@ -28,16 +52,10 @@ async fn main() {
|
||||
});
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.with(tracing_subscriber::fmt::layer().with_target(false))
|
||||
.with(filter::LevelFilter::INFO)
|
||||
.with(sentry_layer)
|
||||
.init();
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
|
||||
|
||||
let app = get_router().await;
|
||||
|
||||
println!("Start webserver...");
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
println!("Webserver shutdown...");
|
||||
tokio::join![cron_jobs(), start_app()];
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user