diff --git a/Cargo.lock b/Cargo.lock index f417312..1a41644 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,6 +203,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" name = "book_bot" version = "0.1.0" dependencies = [ + "axum", "base64 0.21.2", "chrono", "ctrlc", diff --git a/Cargo.toml b/Cargo.toml index 301f348..ef7830e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,3 +27,4 @@ dateparser = "0.2.0" sentry = "0.31.3" lazy_static = "1.4.0" moka = { version = "0.11.1", features = ["future"] } +axum = "0.6.18" diff --git a/src/bots/mod.rs b/src/bots/mod.rs index 8a4027b..f8086bc 100644 --- a/src/bots/mod.rs +++ b/src/bots/mod.rs @@ -3,7 +3,7 @@ pub mod bots_manager; use std::error::Error; -use teloxide::{prelude::*, adaptors::{Throttle, CacheMe}}; +use teloxide::prelude::*; pub type BotHandlerInternal = Result<(), Box>; @@ -32,41 +32,6 @@ fn ignore_chat_member_update() -> crate::bots::BotHandler { .endpoint(|| async { Ok(()) }) } -fn get_pending_handler() -> BotHandler { - let handler = |msg: Message, bot: CacheMe>| async move { - let message_text = " - Бот зарегистрирован, но не подтвержден администратором! \ - Подтверждение занимает примерно 12 часов. - "; - - bot.send_message(msg.chat.id, message_text).await?; - Ok(()) - }; - - dptree::entry() - .branch(ignore_channel_messages()) - .branch(ignore_chat_member_update()) - .branch(Update::filter_message().chain(dptree::endpoint(handler))) -} - -fn get_blocked_handler() -> BotHandler { - let handler = |msg: Message, bot: CacheMe>| async move { - let message_text = "Бот заблокирован!"; - - bot.send_message(msg.chat.id, message_text).await?; - Ok(()) - }; - - dptree::entry() - .branch(ignore_channel_messages()) - .branch(ignore_chat_member_update()) - .branch(Update::filter_message().chain(dptree::endpoint(handler))) -} - -pub fn get_bot_handler(status: crate::bots_manager::BotStatus) -> (BotHandler, BotCommands) { - match status { - crate::bots_manager::BotStatus::Pending => (get_pending_handler(), None), - crate::bots_manager::BotStatus::Approved => approved_bot::get_approved_handler(), - crate::bots_manager::BotStatus::Blocked => (get_blocked_handler(), None), - } +pub fn get_bot_handler() -> (BotHandler, BotCommands) { + approved_bot::get_approved_handler() } diff --git a/src/bots_manager/mod.rs b/src/bots_manager/mod.rs index 6c9fb79..0d837cf 100644 --- a/src/bots_manager/mod.rs +++ b/src/bots_manager/mod.rs @@ -1,9 +1,11 @@ pub mod bot_manager_client; use std::collections::HashMap; +use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use axum::Router; use teloxide::adaptors::throttle::Limits; use teloxide::types::BotCommand; use tokio::time::{sleep, Duration}; @@ -16,10 +18,9 @@ use url::Url; use moka::future::Cache; -use crate::config; -pub use self::bot_manager_client::{BotStatus, BotCache, BotData}; use self::bot_manager_client::get_bots; - +pub use self::bot_manager_client::{BotCache, BotData, BotStatus}; +use crate::config; #[derive(Clone)] pub struct AppState { @@ -32,10 +33,15 @@ pub struct BotsManager { app_state: AppState, next_port: u16, bot_port_map: HashMap, - bot_status_and_cache_map: HashMap, bot_shutdown_token_map: HashMap, } +pub enum BotStartResult { + Success, + SuccessWithRouter(Router), + Failed +} + impl BotsManager { pub fn create() -> Self { BotsManager { @@ -51,23 +57,23 @@ impl BotsManager { chat_donation_notifications_cache: Cache::builder() .time_to_live(Duration::from_secs(24 * 60 * 60)) .max_capacity(32768) - .build() + .build(), }, next_port: 8000, bot_port_map: HashMap::new(), - bot_status_and_cache_map: HashMap::new(), bot_shutdown_token_map: HashMap::new(), } } - async fn start_bot(&mut self, bot_data: &BotData) -> bool { + async fn start_bot(&mut self, bot_data: &BotData, is_first_start: bool) -> BotStartResult { let bot = Bot::new(bot_data.token.clone()) .set_api_url(config::CONFIG.telegram_bot_api.clone()) .throttle(Limits::default()) .cache_me(); let token = bot.inner().inner().token(); - let port = self.bot_port_map + let port = self + .bot_port_map .get(&bot_data.id) .unwrap_or_else(|| panic!("Can't get bot port!")); @@ -84,18 +90,7 @@ impl BotsManager { port ); - let listener_result = webhooks::axum(bot.clone(), webhooks::Options::new(addr, url)).await; - - let listener = match listener_result { - Ok(v) => v, - Err(err) => { - log::warn!("{}", err); - - return false; - }, - }; - - let (handler, commands) = crate::bots::get_bot_handler(bot_data.status); + let (handler, commands) = crate::bots::get_bot_handler(); let set_command_result = match commands { Some(v) => bot.set_my_commands::>(v).send().await, @@ -106,85 +101,102 @@ impl BotsManager { Err(err) => log::error!("{:?}", err), } - let mut dispatcher = Dispatcher::builder(bot, handler) + let mut dispatcher = Dispatcher::builder(bot.clone(), handler) .dependencies(dptree::deps![bot_data.cache, self.app_state.clone()]) .build(); let shutdown_token = dispatcher.shutdown_token(); - self.bot_shutdown_token_map - .insert(bot_data.id, shutdown_token); + self.bot_shutdown_token_map + .insert(bot_data.id, shutdown_token); - tokio::spawn(async move { - dispatcher - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; - }); + if is_first_start { + let (listener, router) = match webhooks::axum_to_router(bot.clone(), webhooks::Options::new(addr, url)).await { + Ok(v) => (v.0, v.2), + Err(err) => { + log::warn!("{}", err); - true + return BotStartResult::Failed; + }, + }; + + tokio::spawn(async move { + dispatcher + .dispatch_with_listener( + listener, + LoggingErrorHandler::with_custom_text("An error from the update listener"), + ) + .await; + }); + + BotStartResult::SuccessWithRouter(router) + } else { + let listener = match webhooks::axum(bot.clone(), webhooks::Options::new(addr, url)).await { + Ok(v) => v, + Err(err) => { + log::warn!("{}", err); + + return BotStartResult::Failed; + }, + }; + + tokio::spawn(async move { + dispatcher + .dispatch_with_listener( + listener, + LoggingErrorHandler::with_custom_text("An error from the update listener"), + ) + .await; + }); + + BotStartResult::Success + } } async fn sd_token(token: &ShutdownToken) { for _ in 1..10 { - if let Ok(v) = token.clone().shutdown() { return v.await } + if let Ok(v) = token.clone().shutdown() { + return v.await; + } sleep(Duration::from_millis(100)).await; } } - async fn stop_bot(&mut self, bot_id: u32) { - let shutdown_token = match self.bot_shutdown_token_map.remove(&bot_id) { - Some(v) => v, - None => return, - }; + async fn update_data(&mut self, bots_data: Vec, is_first_start: bool) -> Vec { + let mut routers: Vec = vec![]; - BotsManager::sd_token(&shutdown_token).await; - } - - async fn update_data(&mut self, bots_data: Vec) { for bot_data in bots_data.iter() { - if let std::collections::hash_map::Entry::Vacant(e) = self.bot_port_map.entry(bot_data.id) { + if let std::collections::hash_map::Entry::Vacant(e) = + self.bot_port_map.entry(bot_data.id) + { e.insert(self.next_port); self.next_port += 1; - } - let result = match self.bot_status_and_cache_map.get(&bot_data.id) { - Some(v) => { - let mut update_result = true; - - if *v != (bot_data.status, bot_data.cache) { - self.bot_status_and_cache_map - .insert(bot_data.id, (bot_data.status, bot_data.cache)); - self.stop_bot(bot_data.id).await; - - update_result = self.start_bot(bot_data).await; - } - - update_result + match self.start_bot(bot_data, is_first_start).await { + BotStartResult::Success => (), + BotStartResult::SuccessWithRouter(router) => { + routers.push(router); + }, + BotStartResult::Failed => { + self.bot_shutdown_token_map.remove(&bot_data.id); + }, } - None => { - self.bot_status_and_cache_map - .insert(bot_data.id, (bot_data.status, bot_data.cache)); - - self.start_bot(bot_data).await - } - }; - - if !result { - self.bot_status_and_cache_map.remove(&bot_data.id); - self.bot_shutdown_token_map.remove(&bot_data.id); } } + + routers } - async fn check(&mut self) { + async fn check(&mut self, is_first_start: bool) -> Option> { let bots_data = get_bots().await; match bots_data { - Ok(v) => self.update_data(v).await, - Err(err) => log::info!("{:?}", err), + Ok(v) => Some(self.update_data(v, is_first_start).await), + Err(err) => { + log::info!("{:?}", err); + + None + } } } @@ -194,18 +206,47 @@ impl BotsManager { } } + async fn start_axum_server(&mut self) { + loop { + let routers = match self.check(true).await { + Some(v) => v, + None => continue, + }; + + let mut app = Router::new(); + + for router in routers { + app = app.merge(router); + } + + let addr = SocketAddr::from(([0, 0, 0, 0], self.next_port)); + self.next_port += 1; + + tokio::spawn(async move { + log::info!("Start webserver..."); + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); + log::info!("Webserver shutdown...") + }); + } + } + pub async fn start(running: Arc) { let mut manager = BotsManager::create(); - loop { - manager.check().await; + manager.start_axum_server().await; + loop { if !running.load(Ordering::SeqCst) { manager.stop_all().await; return; } sleep(Duration::from_secs(30)).await; + + manager.check(false).await; } } }