This commit is contained in:
2023-06-20 02:41:56 +02:00
parent 97efec18f1
commit 6d049fd1ca
4 changed files with 119 additions and 111 deletions

1
Cargo.lock generated
View File

@@ -203,6 +203,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
name = "book_bot" name = "book_bot"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"axum",
"base64 0.21.2", "base64 0.21.2",
"chrono", "chrono",
"ctrlc", "ctrlc",

View File

@@ -27,3 +27,4 @@ dateparser = "0.2.0"
sentry = "0.31.3" sentry = "0.31.3"
lazy_static = "1.4.0" lazy_static = "1.4.0"
moka = { version = "0.11.1", features = ["future"] } moka = { version = "0.11.1", features = ["future"] }
axum = "0.6.18"

View File

@@ -3,7 +3,7 @@ pub mod bots_manager;
use std::error::Error; use std::error::Error;
use teloxide::{prelude::*, adaptors::{Throttle, CacheMe}}; use teloxide::prelude::*;
pub type BotHandlerInternal = Result<(), Box<dyn Error + Send + Sync>>; pub type BotHandlerInternal = Result<(), Box<dyn Error + Send + Sync>>;
@@ -32,41 +32,6 @@ fn ignore_chat_member_update() -> crate::bots::BotHandler {
.endpoint(|| async { Ok(()) }) .endpoint(|| async { Ok(()) })
} }
fn get_pending_handler() -> BotHandler { pub fn get_bot_handler() -> (BotHandler, BotCommands) {
let handler = |msg: Message, bot: CacheMe<Throttle<Bot>>| async move { approved_bot::get_approved_handler()
let message_text = "
Бот зарегистрирован, но не подтвержден администратором! \
Подтверждение занимает примерно 12 часов.
";
bot.send_message(msg.chat.id, message_text).await?;
Ok(())
};
dptree::entry()
.branch(ignore_channel_messages())
.branch(ignore_chat_member_update())
.branch(Update::filter_message().chain(dptree::endpoint(handler)))
}
fn get_blocked_handler() -> BotHandler {
let handler = |msg: Message, bot: CacheMe<Throttle<Bot>>| async move {
let message_text = "Бот заблокирован!";
bot.send_message(msg.chat.id, message_text).await?;
Ok(())
};
dptree::entry()
.branch(ignore_channel_messages())
.branch(ignore_chat_member_update())
.branch(Update::filter_message().chain(dptree::endpoint(handler)))
}
pub fn get_bot_handler(status: crate::bots_manager::BotStatus) -> (BotHandler, BotCommands) {
match status {
crate::bots_manager::BotStatus::Pending => (get_pending_handler(), None),
crate::bots_manager::BotStatus::Approved => approved_bot::get_approved_handler(),
crate::bots_manager::BotStatus::Blocked => (get_blocked_handler(), None),
}
} }

View File

@@ -1,9 +1,11 @@
pub mod bot_manager_client; pub mod bot_manager_client;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use axum::Router;
use teloxide::adaptors::throttle::Limits; use teloxide::adaptors::throttle::Limits;
use teloxide::types::BotCommand; use teloxide::types::BotCommand;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
@@ -16,10 +18,9 @@ use url::Url;
use moka::future::Cache; use moka::future::Cache;
use crate::config;
pub use self::bot_manager_client::{BotStatus, BotCache, BotData};
use self::bot_manager_client::get_bots; use self::bot_manager_client::get_bots;
pub use self::bot_manager_client::{BotCache, BotData, BotStatus};
use crate::config;
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
@@ -32,10 +33,15 @@ pub struct BotsManager {
app_state: AppState, app_state: AppState,
next_port: u16, next_port: u16,
bot_port_map: HashMap<u32, u16>, bot_port_map: HashMap<u32, u16>,
bot_status_and_cache_map: HashMap<u32, (BotStatus, BotCache)>,
bot_shutdown_token_map: HashMap<u32, ShutdownToken>, bot_shutdown_token_map: HashMap<u32, ShutdownToken>,
} }
pub enum BotStartResult {
Success,
SuccessWithRouter(Router),
Failed
}
impl BotsManager { impl BotsManager {
pub fn create() -> Self { pub fn create() -> Self {
BotsManager { BotsManager {
@@ -51,23 +57,23 @@ impl BotsManager {
chat_donation_notifications_cache: Cache::builder() chat_donation_notifications_cache: Cache::builder()
.time_to_live(Duration::from_secs(24 * 60 * 60)) .time_to_live(Duration::from_secs(24 * 60 * 60))
.max_capacity(32768) .max_capacity(32768)
.build() .build(),
}, },
next_port: 8000, next_port: 8000,
bot_port_map: HashMap::new(), bot_port_map: HashMap::new(),
bot_status_and_cache_map: HashMap::new(),
bot_shutdown_token_map: HashMap::new(), bot_shutdown_token_map: HashMap::new(),
} }
} }
async fn start_bot(&mut self, bot_data: &BotData) -> bool { async fn start_bot(&mut self, bot_data: &BotData, is_first_start: bool) -> BotStartResult {
let bot = Bot::new(bot_data.token.clone()) let bot = Bot::new(bot_data.token.clone())
.set_api_url(config::CONFIG.telegram_bot_api.clone()) .set_api_url(config::CONFIG.telegram_bot_api.clone())
.throttle(Limits::default()) .throttle(Limits::default())
.cache_me(); .cache_me();
let token = bot.inner().inner().token(); let token = bot.inner().inner().token();
let port = self.bot_port_map let port = self
.bot_port_map
.get(&bot_data.id) .get(&bot_data.id)
.unwrap_or_else(|| panic!("Can't get bot port!")); .unwrap_or_else(|| panic!("Can't get bot port!"));
@@ -84,18 +90,7 @@ impl BotsManager {
port port
); );
let listener_result = webhooks::axum(bot.clone(), webhooks::Options::new(addr, url)).await; let (handler, commands) = crate::bots::get_bot_handler();
let listener = match listener_result {
Ok(v) => v,
Err(err) => {
log::warn!("{}", err);
return false;
},
};
let (handler, commands) = crate::bots::get_bot_handler(bot_data.status);
let set_command_result = match commands { let set_command_result = match commands {
Some(v) => bot.set_my_commands::<Vec<BotCommand>>(v).send().await, Some(v) => bot.set_my_commands::<Vec<BotCommand>>(v).send().await,
@@ -106,7 +101,7 @@ impl BotsManager {
Err(err) => log::error!("{:?}", err), Err(err) => log::error!("{:?}", err),
} }
let mut dispatcher = Dispatcher::builder(bot, handler) let mut dispatcher = Dispatcher::builder(bot.clone(), handler)
.dependencies(dptree::deps![bot_data.cache, self.app_state.clone()]) .dependencies(dptree::deps![bot_data.cache, self.app_state.clone()])
.build(); .build();
@@ -114,6 +109,16 @@ impl BotsManager {
self.bot_shutdown_token_map self.bot_shutdown_token_map
.insert(bot_data.id, shutdown_token); .insert(bot_data.id, shutdown_token);
if is_first_start {
let (listener, router) = match webhooks::axum_to_router(bot.clone(), webhooks::Options::new(addr, url)).await {
Ok(v) => (v.0, v.2),
Err(err) => {
log::warn!("{}", err);
return BotStartResult::Failed;
},
};
tokio::spawn(async move { tokio::spawn(async move {
dispatcher dispatcher
.dispatch_with_listener( .dispatch_with_listener(
@@ -123,68 +128,75 @@ impl BotsManager {
.await; .await;
}); });
true BotStartResult::SuccessWithRouter(router)
} else {
let listener = match webhooks::axum(bot.clone(), webhooks::Options::new(addr, url)).await {
Ok(v) => v,
Err(err) => {
log::warn!("{}", err);
return BotStartResult::Failed;
},
};
tokio::spawn(async move {
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
});
BotStartResult::Success
}
} }
async fn sd_token(token: &ShutdownToken) { async fn sd_token(token: &ShutdownToken) {
for _ in 1..10 { for _ in 1..10 {
if let Ok(v) = token.clone().shutdown() { return v.await } if let Ok(v) = token.clone().shutdown() {
return v.await;
}
sleep(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
} }
} }
async fn stop_bot(&mut self, bot_id: u32) { async fn update_data(&mut self, bots_data: Vec<BotData>, is_first_start: bool) -> Vec<Router> {
let shutdown_token = match self.bot_shutdown_token_map.remove(&bot_id) { let mut routers: Vec<Router> = vec![];
Some(v) => v,
None => return,
};
BotsManager::sd_token(&shutdown_token).await;
}
async fn update_data(&mut self, bots_data: Vec<BotData>) {
for bot_data in bots_data.iter() { for bot_data in bots_data.iter() {
if let std::collections::hash_map::Entry::Vacant(e) = self.bot_port_map.entry(bot_data.id) { if let std::collections::hash_map::Entry::Vacant(e) =
self.bot_port_map.entry(bot_data.id)
{
e.insert(self.next_port); e.insert(self.next_port);
self.next_port += 1; self.next_port += 1;
}
let result = match self.bot_status_and_cache_map.get(&bot_data.id) { match self.start_bot(bot_data, is_first_start).await {
Some(v) => { BotStartResult::Success => (),
let mut update_result = true; BotStartResult::SuccessWithRouter(router) => {
routers.push(router);
if *v != (bot_data.status, bot_data.cache) { },
self.bot_status_and_cache_map BotStartResult::Failed => {
.insert(bot_data.id, (bot_data.status, bot_data.cache));
self.stop_bot(bot_data.id).await;
update_result = self.start_bot(bot_data).await;
}
update_result
}
None => {
self.bot_status_and_cache_map
.insert(bot_data.id, (bot_data.status, bot_data.cache));
self.start_bot(bot_data).await
}
};
if !result {
self.bot_status_and_cache_map.remove(&bot_data.id);
self.bot_shutdown_token_map.remove(&bot_data.id); self.bot_shutdown_token_map.remove(&bot_data.id);
},
} }
} }
} }
async fn check(&mut self) { routers
}
async fn check(&mut self, is_first_start: bool) -> Option<Vec<Router>> {
let bots_data = get_bots().await; let bots_data = get_bots().await;
match bots_data { match bots_data {
Ok(v) => self.update_data(v).await, Ok(v) => Some(self.update_data(v, is_first_start).await),
Err(err) => log::info!("{:?}", err), Err(err) => {
log::info!("{:?}", err);
None
}
} }
} }
@@ -194,18 +206,47 @@ impl BotsManager {
} }
} }
async fn start_axum_server(&mut self) {
loop {
let routers = match self.check(true).await {
Some(v) => v,
None => continue,
};
let mut app = Router::new();
for router in routers {
app = app.merge(router);
}
let addr = SocketAddr::from(([0, 0, 0, 0], self.next_port));
self.next_port += 1;
tokio::spawn(async move {
log::info!("Start webserver...");
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
log::info!("Webserver shutdown...")
});
}
}
pub async fn start(running: Arc<AtomicBool>) { pub async fn start(running: Arc<AtomicBool>) {
let mut manager = BotsManager::create(); let mut manager = BotsManager::create();
loop { manager.start_axum_server().await;
manager.check().await;
loop {
if !running.load(Ordering::SeqCst) { if !running.load(Ordering::SeqCst) {
manager.stop_all().await; manager.stop_all().await;
return; return;
} }
sleep(Duration::from_secs(30)).await; sleep(Duration::from_secs(30)).await;
manager.check(false).await;
} }
} }
} }