Update bots manager

This commit is contained in:
2023-07-21 01:07:08 +02:00
parent 1b4a5a6b2e
commit 507ad1f91f
4 changed files with 222 additions and 153 deletions

23
Cargo.lock generated
View File

@@ -225,7 +225,10 @@ dependencies = [
"teloxide", "teloxide",
"textwrap", "textwrap",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
"tower",
"tower-http 0.4.3",
"url", "url",
] ]
@@ -1986,7 +1989,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"tower", "tower",
"tower-http", "tower-http 0.3.5",
"url", "url",
] ]
@@ -2242,6 +2245,24 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "tower-http"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55ae70283aba8d2a8b411c695c437fe25b8b5e44e23e780662002fc72fb47a82"
dependencies = [
"bitflags 2.3.3",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "tower-layer" name = "tower-layer"
version = "0.3.2" version = "0.3.2"

View File

@@ -30,3 +30,6 @@ moka = { version = "0.11.1", features = ["future"] }
axum = "0.6.18" axum = "0.6.18"
smallvec = { version = "1.10.0", features = ["serde"] } smallvec = { version = "1.10.0", features = ["serde"] }
smartstring = { version = "1.0.1", features = ["serde"] } smartstring = { version = "1.0.1", features = ["serde"] }
tokio-stream = "0.1.14"
tower = "0.4.13"
tower-http = "0.4.3"

View File

@@ -1,23 +1,28 @@
pub mod bot_manager_client; pub mod bot_manager_client;
use axum::extract::State;
use axum::response::IntoResponse;
use axum::routing::post;
use reqwest::StatusCode;
use smartstring::alias::String as SmartString; use smartstring::alias::String as SmartString;
use teloxide::stop::{mk_stop_token, StopToken, StopFlag};
use teloxide::update_listeners::{StatefulListener, UpdateListener};
use tokio::sync::mpsc::{UnboundedSender, self};
use tokio_stream::wrappers::UnboundedReceiverStream;
use url::Url;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::{Arc, RwLock};
use axum::Router;
use smallvec::SmallVec; use smallvec::SmallVec;
use teloxide::adaptors::throttle::Limits; use teloxide::adaptors::throttle::Limits;
use teloxide::types::BotCommand; use teloxide::types::{BotCommand, UpdateKind};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use teloxide::{ use teloxide::prelude::*;
dispatching::{update_listeners::webhooks, ShutdownToken},
prelude::*,
};
use url::Url;
use moka::future::Cache; use moka::future::Cache;
@@ -25,6 +30,15 @@ use self::bot_manager_client::get_bots;
pub use self::bot_manager_client::{BotCache, BotData}; pub use self::bot_manager_client::{BotCache, BotData};
use crate::config; use crate::config;
type UpdateSender = mpsc::UnboundedSender<Result<Update, std::convert::Infallible>>;
fn tuple_first_mut<A, B>(tuple: &mut (A, B)) -> &mut A {
&mut tuple.0
}
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
pub user_activity_cache: Cache<UserId, ()>, pub user_activity_cache: Cache<UserId, ()>,
@@ -32,17 +46,19 @@ pub struct AppState {
pub chat_donation_notifications_cache: Cache<ChatId, ()>, pub chat_donation_notifications_cache: Cache<ChatId, ()>,
} }
pub struct BotsManager {
app_state: AppState, #[derive(Default, Clone)]
next_port: u16, struct ServerState {
bot_port_map: HashMap<u32, u16>, routers: Arc<RwLock<HashMap<String, UnboundedSender<Result<Update, std::convert::Infallible>>>>>,
bot_shutdown_token_map: HashMap<u32, ShutdownToken>,
} }
pub enum BotStartResult { pub struct BotsManager {
Success, app_state: AppState,
SuccessWithRouter(Router),
Failed port: u16,
stop_data: (StopToken, StopFlag),
state: ServerState
} }
impl BotsManager { impl BotsManager {
@@ -62,35 +78,41 @@ impl BotsManager {
.max_capacity(2048) .max_capacity(2048)
.build(), .build(),
}, },
next_port: 8000,
bot_port_map: HashMap::new(), port: 8000,
bot_shutdown_token_map: HashMap::new(), stop_data: mk_stop_token(),
state: ServerState {
routers: Arc::new(RwLock::new(HashMap::new()))
}
} }
} }
async fn start_bot(&mut self, bot_data: &BotData, is_first_start: bool) -> BotStartResult { fn get_listener(&self) -> (UnboundedSender<Result<Update, std::convert::Infallible>>, impl UpdateListener<Err = Infallible>) {
let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel();
let stream = UnboundedReceiverStream::new(rx);
let listener = StatefulListener::new(
(stream, self.stop_data.0.clone()),
tuple_first_mut,
|state: &mut (_, StopToken)| {
state.1.clone()
},
);
return (tx, listener);
}
async fn start_bot(&mut self, bot_data: &BotData) -> bool {
let bot = Bot::new(bot_data.token.clone()) let bot = Bot::new(bot_data.token.clone())
.set_api_url(config::CONFIG.telegram_bot_api.clone()) .set_api_url(config::CONFIG.telegram_bot_api.clone())
.throttle(Limits::default()) .throttle(Limits::default())
.cache_me(); .cache_me();
let token = bot.inner().inner().token(); let token = bot.inner().inner().token();
let port = self
.bot_port_map
.get(&bot_data.id)
.unwrap_or_else(|| panic!("Can't get bot port!"));
let addr = ([0, 0, 0, 0], *port).into(); log::info!("Start bot(id={})", bot_data.id);
let host = format!("{}:{}", &config::CONFIG.webhook_base_url, port);
let url = Url::parse(&format!("{host}/{token}"))
.unwrap_or_else(|_| panic!("Can't parse webhook url!"));
log::info!(
"Start bot(id={}) port {}",
bot_data.id,
port
);
let (handler, commands) = crate::bots::get_bot_handler(); let (handler, commands) = crate::bots::get_bot_handler();
@@ -107,137 +129,164 @@ impl BotsManager {
.dependencies(dptree::deps![bot_data.cache, self.app_state.clone()]) .dependencies(dptree::deps![bot_data.cache, self.app_state.clone()])
.build(); .build();
let shutdown_token = dispatcher.shutdown_token(); let (tx, listener) = self.get_listener();
self.bot_shutdown_token_map
.insert(bot_data.id, shutdown_token);
if is_first_start { let mut routers = self.state.routers.write().unwrap();
let (listener, router) = match webhooks::axum_to_router(bot.clone(), webhooks::Options::new(addr, url)).await { routers.insert(token.to_string(), tx);
Ok(v) => (v.0, v.2),
Err(err) => {
log::warn!("{}", err);
return BotStartResult::Failed; let host = format!("{}:{}", &config::CONFIG.webhook_base_url, self.port);
}, let url = Url::parse(&format!("{host}/{token}"))
}; .unwrap_or_else(|_| panic!("Can't parse webhook url!"));
tokio::spawn(async move { match bot.set_webhook(url.clone()).await {
dispatcher Ok(_) => (),
.dispatch_with_listener( Err(_) => return false,
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
});
BotStartResult::SuccessWithRouter(router)
} else {
let listener = match webhooks::axum(bot.clone(), webhooks::Options::new(addr, url)).await {
Ok(v) => v,
Err(err) => {
log::warn!("{}", err);
return BotStartResult::Failed;
},
};
tokio::spawn(async move {
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
});
BotStartResult::Success
}
}
async fn sd_token(token: &ShutdownToken) {
for _ in 1..10 {
if let Ok(v) = token.clone().shutdown() {
return v.await;
}
sleep(Duration::from_millis(100)).await;
}
}
async fn update_data(&mut self, bots_data: Vec<BotData>, is_first_start: bool) -> Vec<Router> {
let mut routers: Vec<Router> = vec![];
for bot_data in bots_data.iter() {
if let std::collections::hash_map::Entry::Vacant(e) =
self.bot_port_map.entry(bot_data.id)
{
e.insert(self.next_port);
if !is_first_start {
self.next_port += 1;
}
match self.start_bot(bot_data, is_first_start).await {
BotStartResult::Success => (),
BotStartResult::SuccessWithRouter(router) => {
routers.push(router);
},
BotStartResult::Failed => {
self.bot_shutdown_token_map.remove(&bot_data.id);
},
}
}
} }
routers tokio::spawn(async move {
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
});
return true;
} }
async fn check(&mut self, is_first_start: bool) -> Option<Vec<Router>> { async fn check(&mut self){
let bots_data = get_bots().await; let bots_data = get_bots().await;
match bots_data { match bots_data {
Ok(v) => Some(self.update_data(v, is_first_start).await), Ok(v) => {
for bot_data in v.iter() {
let need_start = {
let routers = self.state.routers.read().unwrap();
!routers.contains_key(&bot_data.token)
};
if need_start {
self.start_bot(bot_data).await;
}
}
},
Err(err) => { Err(err) => {
log::info!("{:?}", err); log::info!("{:?}", err);
None
} }
} }
} }
async fn stop_all(&mut self) { // async fn start_axum_server(&mut self) {
for token in self.bot_shutdown_token_map.values() { // loop {
BotsManager::sd_token(token).await; // let routers = match self.check(true).await {
} // Some(v) => v,
} // None => continue,
// };
// let mut app = Router::new();
// for router in routers {
// app = app.merge(router);
// }
// let addr = SocketAddr::from(([0, 0, 0, 0], self.next_port));
// self.next_port += 1;
// tokio::spawn(async move {
// log::info!("Start webserver...");
// axum::Server::bind(&addr)
// .serve(app.into_make_service())
// .await
// .unwrap();
// log::info!("Webserver shutdown...")
// });
// return;
// }
// }
async fn start_axum_server(&mut self) { async fn start_axum_server(&mut self) {
loop { async fn telegram_request(
let routers = match self.check(true).await { State(ServerState { routers }): State<ServerState>,
Some(v) => v, // secret_header: XTelegramBotApiSecretToken,
None => continue, input: String,
) -> impl IntoResponse {
// // FIXME: use constant time comparison here
// if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) {
// return StatusCode::UNAUTHORIZED;
// }
let t1 = routers.read().unwrap();
let tx = t1.get("t");
let tx = match tx {
Some(tx) => {
tx
// match tx.get() {
// None => return StatusCode::SERVICE_UNAVAILABLE,
// // Do not process updates after `.stop()` is called even if the server is still
// // running (useful for when you need to stop the bot but can't stop the server).
// // TODO
// // _ if flag.is_stopped() => {
// // tx.close();
// // return StatusCode::SERVICE_UNAVAILABLE;
// // }
// Some(tx) => tx,
// };
},
None => return StatusCode::NOT_FOUND,
}; };
let mut app = Router::new(); match serde_json::from_str::<Update>(&input) {
Ok(mut update) => {
// See HACK comment in
// `teloxide_core::net::request::process_response::{closure#0}`
if let UpdateKind::Error(value) = &mut update.kind {
*value = serde_json::from_str(&input).unwrap_or_default();
}
for router in routers { tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook")
app = app.merge(router); }
} Err(error) => {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide/issues.",
error,
input
);
}
};
let addr = SocketAddr::from(([0, 0, 0, 0], self.next_port)); StatusCode::OK
self.next_port += 1;
tokio::spawn(async move {
log::info!("Start webserver...");
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
log::info!("Webserver shutdown...")
});
return;
} }
let stop_flag = self.stop_data.1.clone();
let state = self.state.clone();
tokio::spawn(async move {
log::info!("Start webserver...");
let addr = SocketAddr::from(([0, 0, 0, 0], 8000));
let router = axum::Router::new()
.route("/:token/", post(telegram_request))
// .layer(TraceLayer::new_for_http())
.with_state(state);
axum::Server::bind(&addr)
.serve(router.into_make_service())
.with_graceful_shutdown(stop_flag)
.await
// .map_err(|err| {
// stop_token.stop();
// err
// })
.expect("Axum server error");
log::info!("Webserver shutdown...");
});
} }
pub async fn start(running: Arc<AtomicBool>) { pub async fn start(running: Arc<AtomicBool>) {
@@ -247,13 +296,13 @@ impl BotsManager {
loop { loop {
if !running.load(Ordering::SeqCst) { if !running.load(Ordering::SeqCst) {
manager.stop_all().await; manager.stop_data.0.stop();
return; return;
} }
sleep(Duration::from_secs(30)).await; manager.check().await;
manager.check(false).await; sleep(Duration::from_secs(30)).await;
} }
} }
} }

View File

@@ -21,9 +21,5 @@ async fn main() {
}) })
.expect("Error setting Ctrl-C handler"); .expect("Error setting Ctrl-C handler");
tokio::spawn(async move { bots_manager::BotsManager::start(running).await;
bots_manager::BotsManager::start(running).await;
})
.await
.unwrap();
} }