From 507ad1f91f10b648d1bf222cab3d2f3b35c47091 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Fri, 21 Jul 2023 01:07:08 +0200 Subject: [PATCH] Update bots manager --- Cargo.lock | 23 ++- Cargo.toml | 3 + src/bots_manager/mod.rs | 343 +++++++++++++++++++++++----------------- src/main.rs | 6 +- 4 files changed, 222 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5a38b2..6927109 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,7 +225,10 @@ dependencies = [ "teloxide", "textwrap", "tokio", + "tokio-stream", "tokio-util", + "tower", + "tower-http 0.4.3", "url", ] @@ -1986,7 +1989,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tower", - "tower-http", + "tower-http 0.3.5", "url", ] @@ -2242,6 +2245,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ae70283aba8d2a8b411c695c437fe25b8b5e44e23e780662002fc72fb47a82" +dependencies = [ + "bitflags 2.3.3", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.2" diff --git a/Cargo.toml b/Cargo.toml index 27904bf..eaf8026 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,3 +30,6 @@ moka = { version = "0.11.1", features = ["future"] } axum = "0.6.18" smallvec = { version = "1.10.0", features = ["serde"] } smartstring = { version = "1.0.1", features = ["serde"] } +tokio-stream = "0.1.14" +tower = "0.4.13" +tower-http = "0.4.3" diff --git a/src/bots_manager/mod.rs b/src/bots_manager/mod.rs index 6a21251..ad955d0 100644 --- a/src/bots_manager/mod.rs +++ b/src/bots_manager/mod.rs @@ -1,23 +1,28 @@ pub mod bot_manager_client; +use axum::extract::State; +use axum::response::IntoResponse; +use axum::routing::post; +use reqwest::StatusCode; use smartstring::alias::String as SmartString; +use teloxide::stop::{mk_stop_token, StopToken, StopFlag}; +use teloxide::update_listeners::{StatefulListener, UpdateListener}; +use tokio::sync::mpsc::{UnboundedSender, self}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use url::Url; use std::collections::HashMap; +use std::convert::Infallible; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; -use axum::Router; use smallvec::SmallVec; use teloxide::adaptors::throttle::Limits; -use teloxide::types::BotCommand; +use teloxide::types::{BotCommand, UpdateKind}; use tokio::time::{sleep, Duration}; -use teloxide::{ - dispatching::{update_listeners::webhooks, ShutdownToken}, - prelude::*, -}; -use url::Url; +use teloxide::prelude::*; use moka::future::Cache; @@ -25,6 +30,15 @@ use self::bot_manager_client::get_bots; pub use self::bot_manager_client::{BotCache, BotData}; use crate::config; + +type UpdateSender = mpsc::UnboundedSender>; + + +fn tuple_first_mut(tuple: &mut (A, B)) -> &mut A { + &mut tuple.0 +} + + #[derive(Clone)] pub struct AppState { pub user_activity_cache: Cache, @@ -32,17 +46,19 @@ pub struct AppState { pub chat_donation_notifications_cache: Cache, } -pub struct BotsManager { - app_state: AppState, - next_port: u16, - bot_port_map: HashMap, - bot_shutdown_token_map: HashMap, + +#[derive(Default, Clone)] +struct ServerState { + routers: Arc>>>>, } -pub enum BotStartResult { - Success, - SuccessWithRouter(Router), - Failed +pub struct BotsManager { + app_state: AppState, + + port: u16, + stop_data: (StopToken, StopFlag), + + state: ServerState } impl BotsManager { @@ -62,35 +78,41 @@ impl BotsManager { .max_capacity(2048) .build(), }, - next_port: 8000, - bot_port_map: HashMap::new(), - bot_shutdown_token_map: HashMap::new(), + + port: 8000, + stop_data: mk_stop_token(), + + state: ServerState { + routers: Arc::new(RwLock::new(HashMap::new())) + } } } - async fn start_bot(&mut self, bot_data: &BotData, is_first_start: bool) -> BotStartResult { + fn get_listener(&self) -> (UnboundedSender>, impl UpdateListener) { + let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel(); + + let stream = UnboundedReceiverStream::new(rx); + + let listener = StatefulListener::new( + (stream, self.stop_data.0.clone()), + tuple_first_mut, + |state: &mut (_, StopToken)| { + state.1.clone() + }, + ); + + return (tx, listener); + } + + async fn start_bot(&mut self, bot_data: &BotData) -> bool { let bot = Bot::new(bot_data.token.clone()) .set_api_url(config::CONFIG.telegram_bot_api.clone()) .throttle(Limits::default()) .cache_me(); let token = bot.inner().inner().token(); - let port = self - .bot_port_map - .get(&bot_data.id) - .unwrap_or_else(|| panic!("Can't get bot port!")); - let addr = ([0, 0, 0, 0], *port).into(); - - let host = format!("{}:{}", &config::CONFIG.webhook_base_url, port); - let url = Url::parse(&format!("{host}/{token}")) - .unwrap_or_else(|_| panic!("Can't parse webhook url!")); - - log::info!( - "Start bot(id={}) port {}", - bot_data.id, - port - ); + log::info!("Start bot(id={})", bot_data.id); let (handler, commands) = crate::bots::get_bot_handler(); @@ -107,137 +129,164 @@ impl BotsManager { .dependencies(dptree::deps![bot_data.cache, self.app_state.clone()]) .build(); - let shutdown_token = dispatcher.shutdown_token(); - self.bot_shutdown_token_map - .insert(bot_data.id, shutdown_token); + let (tx, listener) = self.get_listener(); - if is_first_start { - let (listener, router) = match webhooks::axum_to_router(bot.clone(), webhooks::Options::new(addr, url)).await { - Ok(v) => (v.0, v.2), - Err(err) => { - log::warn!("{}", err); + let mut routers = self.state.routers.write().unwrap(); + routers.insert(token.to_string(), tx); - return BotStartResult::Failed; - }, - }; + let host = format!("{}:{}", &config::CONFIG.webhook_base_url, self.port); + let url = Url::parse(&format!("{host}/{token}")) + .unwrap_or_else(|_| panic!("Can't parse webhook url!")); - tokio::spawn(async move { - dispatcher - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; - }); - - BotStartResult::SuccessWithRouter(router) - } else { - let listener = match webhooks::axum(bot.clone(), webhooks::Options::new(addr, url)).await { - Ok(v) => v, - Err(err) => { - log::warn!("{}", err); - - return BotStartResult::Failed; - }, - }; - - tokio::spawn(async move { - dispatcher - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; - }); - - BotStartResult::Success - } - } - - async fn sd_token(token: &ShutdownToken) { - for _ in 1..10 { - if let Ok(v) = token.clone().shutdown() { - return v.await; - } - - sleep(Duration::from_millis(100)).await; - } - } - - async fn update_data(&mut self, bots_data: Vec, is_first_start: bool) -> Vec { - let mut routers: Vec = vec![]; - - for bot_data in bots_data.iter() { - if let std::collections::hash_map::Entry::Vacant(e) = - self.bot_port_map.entry(bot_data.id) - { - e.insert(self.next_port); - - if !is_first_start { - self.next_port += 1; - } - - match self.start_bot(bot_data, is_first_start).await { - BotStartResult::Success => (), - BotStartResult::SuccessWithRouter(router) => { - routers.push(router); - }, - BotStartResult::Failed => { - self.bot_shutdown_token_map.remove(&bot_data.id); - }, - } - } + match bot.set_webhook(url.clone()).await { + Ok(_) => (), + Err(_) => return false, } - routers + tokio::spawn(async move { + dispatcher + .dispatch_with_listener( + listener, + LoggingErrorHandler::with_custom_text("An error from the update listener"), + ) + .await; + }); + + return true; } - async fn check(&mut self, is_first_start: bool) -> Option> { + async fn check(&mut self){ let bots_data = get_bots().await; match bots_data { - Ok(v) => Some(self.update_data(v, is_first_start).await), + Ok(v) => { + for bot_data in v.iter() { + let need_start = { + let routers = self.state.routers.read().unwrap(); + !routers.contains_key(&bot_data.token) + }; + + if need_start { + self.start_bot(bot_data).await; + } + } + }, Err(err) => { log::info!("{:?}", err); - - None } } } - async fn stop_all(&mut self) { - for token in self.bot_shutdown_token_map.values() { - BotsManager::sd_token(token).await; - } - } + // async fn start_axum_server(&mut self) { + // loop { + // let routers = match self.check(true).await { + // Some(v) => v, + // None => continue, + // }; + + // let mut app = Router::new(); + + // for router in routers { + // app = app.merge(router); + // } + + // let addr = SocketAddr::from(([0, 0, 0, 0], self.next_port)); + // self.next_port += 1; + + // tokio::spawn(async move { + // log::info!("Start webserver..."); + // axum::Server::bind(&addr) + // .serve(app.into_make_service()) + // .await + // .unwrap(); + // log::info!("Webserver shutdown...") + // }); + + // return; + // } + // } async fn start_axum_server(&mut self) { - loop { - let routers = match self.check(true).await { - Some(v) => v, - None => continue, + async fn telegram_request( + State(ServerState { routers }): State, + // secret_header: XTelegramBotApiSecretToken, + input: String, + ) -> impl IntoResponse { + // // FIXME: use constant time comparison here + // if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) { + // return StatusCode::UNAUTHORIZED; + // } + + let t1 = routers.read().unwrap(); + let tx = t1.get("t"); + + let tx = match tx { + Some(tx) => { + tx + // match tx.get() { + // None => return StatusCode::SERVICE_UNAVAILABLE, + // // Do not process updates after `.stop()` is called even if the server is still + // // running (useful for when you need to stop the bot but can't stop the server). + // // TODO + // // _ if flag.is_stopped() => { + // // tx.close(); + // // return StatusCode::SERVICE_UNAVAILABLE; + // // } + // Some(tx) => tx, + // }; + }, + None => return StatusCode::NOT_FOUND, }; - let mut app = Router::new(); + match serde_json::from_str::(&input) { + Ok(mut update) => { + // See HACK comment in + // `teloxide_core::net::request::process_response::{closure#0}` + if let UpdateKind::Error(value) = &mut update.kind { + *value = serde_json::from_str(&input).unwrap_or_default(); + } - for router in routers { - app = app.merge(router); - } + tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") + } + Err(error) => { + log::error!( + "Cannot parse an update.\nError: {:?}\nValue: {}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide/issues.", + error, + input + ); + } + }; - let addr = SocketAddr::from(([0, 0, 0, 0], self.next_port)); - self.next_port += 1; - - tokio::spawn(async move { - log::info!("Start webserver..."); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); - log::info!("Webserver shutdown...") - }); - - return; + StatusCode::OK } + + let stop_flag = self.stop_data.1.clone(); + let state = self.state.clone(); + + tokio::spawn(async move { + log::info!("Start webserver..."); + + let addr = SocketAddr::from(([0, 0, 0, 0], 8000)); + + let router = axum::Router::new() + .route("/:token/", post(telegram_request)) + // .layer(TraceLayer::new_for_http()) + .with_state(state); + + axum::Server::bind(&addr) + .serve(router.into_make_service()) + .with_graceful_shutdown(stop_flag) + .await + // .map_err(|err| { + // stop_token.stop(); + // err + // }) + .expect("Axum server error"); + + log::info!("Webserver shutdown..."); + }); } pub async fn start(running: Arc) { @@ -247,13 +296,13 @@ impl BotsManager { loop { if !running.load(Ordering::SeqCst) { - manager.stop_all().await; + manager.stop_data.0.stop(); return; } - sleep(Duration::from_secs(30)).await; + manager.check().await; - manager.check(false).await; + sleep(Duration::from_secs(30)).await; } } } diff --git a/src/main.rs b/src/main.rs index d1c982f..e99a2d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,9 +21,5 @@ async fn main() { }) .expect("Error setting Ctrl-C handler"); - tokio::spawn(async move { - bots_manager::BotsManager::start(running).await; - }) - .await - .unwrap(); + bots_manager::BotsManager::start(running).await; }