diff --git a/src/bots_manager/axum_server.rs b/src/bots_manager/axum_server.rs new file mode 100644 index 0000000..5bd63ad --- /dev/null +++ b/src/bots_manager/axum_server.rs @@ -0,0 +1,88 @@ +use axum::extract::Path; +use axum::response::IntoResponse; +use axum::routing::post; + +use reqwest::StatusCode; + +use std::net::SocketAddr; + +use teloxide::types::{Update, UpdateKind}; + +use tower_http::trace::TraceLayer; + +use tracing::log; + +use crate::bots_manager::{internal::start_bot, BOTS_DATA, BOTS_ROUTES, SERVER_PORT}; + +pub async fn start_axum_server() { + async fn telegram_request(Path(token): Path, input: String) -> impl IntoResponse { + let (_, r_tx) = match BOTS_ROUTES.get(&token).await { + Some(tx) => tx, + None => { + let bot_data = BOTS_DATA.get(&token).await; + + if bot_data.is_none() { + return StatusCode::NOT_FOUND; + } + + let start_result = start_bot(&bot_data.unwrap(), SERVER_PORT).await; + + if !start_result { + return StatusCode::SERVICE_UNAVAILABLE; + } + + BOTS_ROUTES.get(&token).await.unwrap() + } + }; + + let tx = match r_tx.get() { + Some(v) => v, + None => { + BOTS_ROUTES.remove(&token).await; + return StatusCode::SERVICE_UNAVAILABLE; + } + }; + + match serde_json::from_str::(&input) { + Ok(mut update) => { + if let UpdateKind::Error(value) = &mut update.kind { + *value = serde_json::from_str(&input).unwrap_or_default(); + } + + if let Err(err) = tx.send(Ok(update)) { + log::error!("{:?}", err); + BOTS_ROUTES.remove(&token).await; + return StatusCode::SERVICE_UNAVAILABLE; + } + } + Err(error) => { + log::error!( + "Cannot parse an update.\nError: {:?}\nValue: {}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide/issues.", + error, + input + ); + } + }; + + StatusCode::OK + } + + let router = axum::Router::new() + .route("/:token/", post(telegram_request)) + .layer(TraceLayer::new_for_http()); + + tokio::spawn(async move { + log::info!("Start webserver..."); + + let addr = SocketAddr::from(([0, 0, 0, 0], SERVER_PORT)); + + axum::Server::bind(&addr) + .serve(router.into_make_service()) + .await + .expect("Axum server error"); + + log::info!("Webserver shutdown..."); + }); +} diff --git a/src/bots_manager/bot_manager_client.rs b/src/bots_manager/bot_manager_client.rs index 85f2c17..a78384e 100644 --- a/src/bots_manager/bot_manager_client.rs +++ b/src/bots_manager/bot_manager_client.rs @@ -10,7 +10,7 @@ pub enum BotCache { NoCache, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct BotData { pub id: u32, pub token: String, diff --git a/src/bots_manager/closable_sender.rs b/src/bots_manager/closable_sender.rs new file mode 100644 index 0000000..59c9e9e --- /dev/null +++ b/src/bots_manager/closable_sender.rs @@ -0,0 +1,29 @@ +use tokio::sync::mpsc; + +pub struct ClosableSender { + origin: std::sync::Arc>>>, +} + +impl Clone for ClosableSender { + fn clone(&self) -> Self { + Self { + origin: self.origin.clone(), + } + } +} + +impl ClosableSender { + pub fn new(sender: mpsc::UnboundedSender) -> Self { + Self { + origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))), + } + } + + pub fn get(&self) -> Option> { + self.origin.read().unwrap().clone() + } + + pub fn close(&mut self) { + self.origin.write().unwrap().take(); + } +} diff --git a/src/bots_manager/internal.rs b/src/bots_manager/internal.rs new file mode 100644 index 0000000..8bcbd8c --- /dev/null +++ b/src/bots_manager/internal.rs @@ -0,0 +1,98 @@ +use teloxide::adaptors::throttle::Limits; +use teloxide::dispatching::Dispatcher; +use teloxide::error_handlers::LoggingErrorHandler; +use teloxide::requests::{Request, Requester, RequesterExt}; +use teloxide::stop::StopToken; +use teloxide::stop::{mk_stop_token, StopFlag}; +use teloxide::types::{BotCommand, Update}; +use teloxide::update_listeners::{StatefulListener, UpdateListener}; +use teloxide::{dptree, Bot}; + +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use tracing::log; +use url::Url; + +use std::convert::Infallible; + +use crate::bots_manager::BOTS_ROUTES; +use crate::config; + +use super::closable_sender::ClosableSender; +use super::utils::tuple_first_mut; +use super::BotData; + +type UpdateSender = mpsc::UnboundedSender>; + +pub fn get_listener() -> ( + StopToken, + StopFlag, + UnboundedSender>, + impl UpdateListener, +) { + let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel(); + + let (stop_token, stop_flag) = mk_stop_token(); + + let stream = UnboundedReceiverStream::new(rx); + + let listener = StatefulListener::new( + (stream, stop_token.clone()), + tuple_first_mut, + |state: &mut (_, StopToken)| state.1.clone(), + ); + + (stop_token, stop_flag, tx, listener) +} + +pub async fn start_bot(bot_data: &BotData, port: u16) -> bool { + let bot = Bot::new(bot_data.token.clone()) + .set_api_url(config::CONFIG.telegram_bot_api.clone()) + .throttle(Limits::default()) + .cache_me(); + + let token = bot.inner().inner().token(); + + log::info!("Start bot(id={})", bot_data.id); + + let (handler, commands) = crate::bots::get_bot_handler(); + + let set_command_result = match commands { + Some(v) => bot.set_my_commands::>(v).send().await, + None => bot.delete_my_commands().send().await, + }; + match set_command_result { + Ok(_) => (), + Err(err) => log::error!("{:?}", err), + } + + let mut dispatcher = Dispatcher::builder(bot.clone(), handler) + .dependencies(dptree::deps![bot_data.cache]) + .build(); + + let (stop_token, _stop_flag, tx, listener) = get_listener(); + + let host = format!("{}:{}", &config::CONFIG.webhook_base_url, port); + let url = Url::parse(&format!("{host}/{token}/")) + .unwrap_or_else(|_| panic!("Can't parse webhook url!")); + + if bot.set_webhook(url.clone()).await.is_err() { + return false; + } + + tokio::spawn(async move { + dispatcher + .dispatch_with_listener( + listener, + LoggingErrorHandler::with_custom_text("An error from the update listener"), + ) + .await; + }); + + BOTS_ROUTES + .insert(token.to_string(), (stop_token, ClosableSender::new(tx))) + .await; + + true +} diff --git a/src/bots_manager/mod.rs b/src/bots_manager/mod.rs index 8296b01..7de07c6 100644 --- a/src/bots_manager/mod.rs +++ b/src/bots_manager/mod.rs @@ -1,45 +1,28 @@ +pub mod axum_server; pub mod bot_manager_client; +pub mod closable_sender; +pub mod internal; +pub mod utils; -use axum::extract::{Path, State}; -use axum::response::IntoResponse; -use axum::routing::post; use once_cell::sync::Lazy; -use reqwest::StatusCode; use smartstring::alias::String as SmartString; -use teloxide::stop::{mk_stop_token, StopFlag, StopToken}; -use teloxide::update_listeners::{StatefulListener, UpdateListener}; -use tokio::sync::mpsc::{self, UnboundedSender}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use teloxide::stop::StopToken; use tracing::log; -use url::Url; -use std::collections::HashMap; -use std::convert::Infallible; -use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use tokio::sync::RwLock; - use smallvec::SmallVec; -use teloxide::adaptors::throttle::Limits; -use teloxide::types::{BotCommand, UpdateKind}; use tokio::time::{self, sleep, Duration}; -use tower_http::trace::TraceLayer; use teloxide::prelude::*; use moka::future::Cache; +use self::axum_server::start_axum_server; use self::bot_manager_client::get_bots; pub use self::bot_manager_client::{BotCache, BotData}; -use crate::config; - -type UpdateSender = mpsc::UnboundedSender>; - -fn tuple_first_mut(tuple: &mut (A, B)) -> &mut A { - &mut tuple.0 -} +use self::closable_sender::ClosableSender; pub static USER_ACTIVITY_CACHE: Lazy> = Lazy::new(|| { Cache::builder() @@ -62,158 +45,40 @@ pub static CHAT_DONATION_NOTIFICATIONS_CACHE: Lazy> = Lazy::ne .build() }); -type Routes = Arc< - RwLock< - HashMap< - String, - ( - StopToken, - ClosableSender>, - ), - >, - >, ->; +pub static SERVER_PORT: u16 = 8000; -struct ClosableSender { - origin: std::sync::Arc>>>, -} +type StopTokenWithSender = ( + StopToken, + ClosableSender>, +); -impl Clone for ClosableSender { - fn clone(&self) -> Self { - Self { - origin: self.origin.clone(), - } - } -} +pub static BOTS_ROUTES: Lazy> = Lazy::new(|| { + Cache::builder() + .time_to_idle(Duration::from_secs(60 * 60)) + .max_capacity(100) + .eviction_listener(|_token, value: StopTokenWithSender, _cause| { + let (stop_token, mut sender) = value; -impl ClosableSender { - fn new(sender: mpsc::UnboundedSender) -> Self { - Self { - origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))), - } - } + stop_token.stop(); + sender.close(); + }) + .build() +}); - fn get(&self) -> Option> { - self.origin.read().unwrap().clone() - } +pub static BOTS_DATA: Lazy> = Lazy::new(|| Cache::builder().build()); - fn close(&mut self) { - self.origin.write().unwrap().take(); - } -} - -#[derive(Default, Clone)] -struct ServerState { - routers: Routes, -} - -pub struct BotsManager { - port: u16, - - state: ServerState, -} +pub struct BotsManager; impl BotsManager { - pub fn create() -> Self { - BotsManager { - port: 8000, - - state: ServerState { - routers: Arc::new(RwLock::new(HashMap::new())), - }, - } - } - - fn get_listener( - &self, - ) -> ( - StopToken, - StopFlag, - UnboundedSender>, - impl UpdateListener, - ) { - let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel(); - - let (stop_token, stop_flag) = mk_stop_token(); - - let stream = UnboundedReceiverStream::new(rx); - - let listener = StatefulListener::new( - (stream, stop_token.clone()), - tuple_first_mut, - |state: &mut (_, StopToken)| state.1.clone(), - ); - - (stop_token, stop_flag, tx, listener) - } - - async fn start_bot(&mut self, bot_data: &BotData) -> bool { - let bot = Bot::new(bot_data.token.clone()) - .set_api_url(config::CONFIG.telegram_bot_api.clone()) - .throttle(Limits::default()) - .cache_me(); - - let token = bot.inner().inner().token(); - - log::info!("Start bot(id={})", bot_data.id); - - let (handler, commands) = crate::bots::get_bot_handler(); - - let set_command_result = match commands { - Some(v) => bot.set_my_commands::>(v).send().await, - None => bot.delete_my_commands().send().await, - }; - match set_command_result { - Ok(_) => (), - Err(err) => log::error!("{:?}", err), - } - - let mut dispatcher = Dispatcher::builder(bot.clone(), handler) - .dependencies(dptree::deps![bot_data.cache]) - .build(); - - let (stop_token, _stop_flag, tx, listener) = self.get_listener(); - - { - let mut routers = self.state.routers.write().await; - routers.insert(token.to_string(), (stop_token, ClosableSender::new(tx))); - } - - let host = format!("{}:{}", &config::CONFIG.webhook_base_url, self.port); - let url = Url::parse(&format!("{host}/{token}/")) - .unwrap_or_else(|_| panic!("Can't parse webhook url!")); - - match bot.set_webhook(url.clone()).await { - Ok(_) => (), - Err(_) => return false, - } - - tokio::spawn(async move { - dispatcher - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; - }); - - true - } - - async fn check(&mut self) { + async fn check() { let bots_data = get_bots().await; match bots_data { Ok(v) => { for bot_data in v.iter() { - let need_start = { - let routers = self.state.routers.read().await; - !routers.contains_key(&bot_data.token) - }; - - if need_start { - self.start_bot(bot_data).await; - } + BOTS_DATA + .insert(bot_data.token.clone(), bot_data.clone()) + .await; } } Err(err) => { @@ -222,106 +87,29 @@ impl BotsManager { } } - async fn start_axum_server(&mut self) { - async fn telegram_request( - State(ServerState { routers }): State, - Path(token): Path, - input: String, - ) -> impl IntoResponse { - let routes = routers.read().await; - let tx = routes.get(&token); - - let (stop_token, r_tx) = match tx { - Some(tx) => tx, - None => return StatusCode::NOT_FOUND, - }; - - let tx = match r_tx.get() { - Some(v) => v, - None => { - stop_token.stop(); - if let Some((_, mut sender)) = routers.write().await.remove(&token) { - sender.close(); - }; - return StatusCode::SERVICE_UNAVAILABLE; - } - }; - - match serde_json::from_str::(&input) { - Ok(mut update) => { - if let UpdateKind::Error(value) = &mut update.kind { - *value = serde_json::from_str(&input).unwrap_or_default(); - } - - if let Err(err) = tx.send(Ok(update)) { - log::error!("{:?}", err); - stop_token.stop(); - if let Some((_, mut sender)) = routers.write().await.remove(&token) { - sender.close(); - }; - return StatusCode::SERVICE_UNAVAILABLE; - } - } - Err(error) => { - log::error!( - "Cannot parse an update.\nError: {:?}\nValue: {}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide/issues.", - error, - input - ); - } - }; - - StatusCode::OK - } - - let port = self.port; - - let router = axum::Router::new() - .route("/:token/", post(telegram_request)) - .layer(TraceLayer::new_for_http()) - .with_state(self.state.clone()); - - tokio::spawn(async move { - log::info!("Start webserver..."); - - let addr = SocketAddr::from(([0, 0, 0, 0], port)); - - axum::Server::bind(&addr) - .serve(router.into_make_service()) - .await - .expect("Axum server error"); - - log::info!("Webserver shutdown..."); - }); - } - - pub async fn stop_all(self) { - let routers = self.state.routers.read().await; - - for (stop_token, _) in routers.values() { + pub async fn stop_all() { + for (_, (stop_token, _)) in BOTS_ROUTES.iter() { stop_token.stop(); } + BOTS_ROUTES.invalidate_all(); + sleep(Duration::from_secs(5)).await; } pub async fn start(running: Arc) { - let mut manager = BotsManager::create(); - - manager.start_axum_server().await; + start_axum_server().await; let mut interval = time::interval(Duration::from_secs(5)); loop { - manager.check().await; + BotsManager::check().await; for _i in 0..30 { interval.tick().await; if !running.load(Ordering::SeqCst) { - manager.stop_all().await; + BotsManager::stop_all().await; return; }; } diff --git a/src/bots_manager/utils.rs b/src/bots_manager/utils.rs new file mode 100644 index 0000000..505df3d --- /dev/null +++ b/src/bots_manager/utils.rs @@ -0,0 +1,3 @@ +pub fn tuple_first_mut(tuple: &mut (A, B)) -> &mut A { + &mut tuple.0 +}