This commit is contained in:
2024-01-08 00:39:00 +01:00
parent e3038ca6c5
commit 8431b27ce0
6 changed files with 255 additions and 249 deletions

View File

@@ -0,0 +1,88 @@
use axum::extract::Path;
use axum::response::IntoResponse;
use axum::routing::post;
use reqwest::StatusCode;
use std::net::SocketAddr;
use teloxide::types::{Update, UpdateKind};
use tower_http::trace::TraceLayer;
use tracing::log;
use crate::bots_manager::{internal::start_bot, BOTS_DATA, BOTS_ROUTES, SERVER_PORT};
pub async fn start_axum_server() {
async fn telegram_request(Path(token): Path<String>, input: String) -> impl IntoResponse {
let (_, r_tx) = match BOTS_ROUTES.get(&token).await {
Some(tx) => tx,
None => {
let bot_data = BOTS_DATA.get(&token).await;
if bot_data.is_none() {
return StatusCode::NOT_FOUND;
}
let start_result = start_bot(&bot_data.unwrap(), SERVER_PORT).await;
if !start_result {
return StatusCode::SERVICE_UNAVAILABLE;
}
BOTS_ROUTES.get(&token).await.unwrap()
}
};
let tx = match r_tx.get() {
Some(v) => v,
None => {
BOTS_ROUTES.remove(&token).await;
return StatusCode::SERVICE_UNAVAILABLE;
}
};
match serde_json::from_str::<Update>(&input) {
Ok(mut update) => {
if let UpdateKind::Error(value) = &mut update.kind {
*value = serde_json::from_str(&input).unwrap_or_default();
}
if let Err(err) = tx.send(Ok(update)) {
log::error!("{:?}", err);
BOTS_ROUTES.remove(&token).await;
return StatusCode::SERVICE_UNAVAILABLE;
}
}
Err(error) => {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide/issues.",
error,
input
);
}
};
StatusCode::OK
}
let router = axum::Router::new()
.route("/:token/", post(telegram_request))
.layer(TraceLayer::new_for_http());
tokio::spawn(async move {
log::info!("Start webserver...");
let addr = SocketAddr::from(([0, 0, 0, 0], SERVER_PORT));
axum::Server::bind(&addr)
.serve(router.into_make_service())
.await
.expect("Axum server error");
log::info!("Webserver shutdown...");
});
}

View File

@@ -10,7 +10,7 @@ pub enum BotCache {
NoCache, NoCache,
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug, Clone)]
pub struct BotData { pub struct BotData {
pub id: u32, pub id: u32,
pub token: String, pub token: String,

View File

@@ -0,0 +1,29 @@
use tokio::sync::mpsc;
pub struct ClosableSender<T> {
origin: std::sync::Arc<std::sync::RwLock<Option<mpsc::UnboundedSender<T>>>>,
}
impl<T> Clone for ClosableSender<T> {
fn clone(&self) -> Self {
Self {
origin: self.origin.clone(),
}
}
}
impl<T> ClosableSender<T> {
pub fn new(sender: mpsc::UnboundedSender<T>) -> Self {
Self {
origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))),
}
}
pub fn get(&self) -> Option<mpsc::UnboundedSender<T>> {
self.origin.read().unwrap().clone()
}
pub fn close(&mut self) {
self.origin.write().unwrap().take();
}
}

View File

@@ -0,0 +1,98 @@
use teloxide::adaptors::throttle::Limits;
use teloxide::dispatching::Dispatcher;
use teloxide::error_handlers::LoggingErrorHandler;
use teloxide::requests::{Request, Requester, RequesterExt};
use teloxide::stop::StopToken;
use teloxide::stop::{mk_stop_token, StopFlag};
use teloxide::types::{BotCommand, Update};
use teloxide::update_listeners::{StatefulListener, UpdateListener};
use teloxide::{dptree, Bot};
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::log;
use url::Url;
use std::convert::Infallible;
use crate::bots_manager::BOTS_ROUTES;
use crate::config;
use super::closable_sender::ClosableSender;
use super::utils::tuple_first_mut;
use super::BotData;
type UpdateSender = mpsc::UnboundedSender<Result<Update, std::convert::Infallible>>;
pub fn get_listener() -> (
StopToken,
StopFlag,
UnboundedSender<Result<Update, std::convert::Infallible>>,
impl UpdateListener<Err = Infallible>,
) {
let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel();
let (stop_token, stop_flag) = mk_stop_token();
let stream = UnboundedReceiverStream::new(rx);
let listener = StatefulListener::new(
(stream, stop_token.clone()),
tuple_first_mut,
|state: &mut (_, StopToken)| state.1.clone(),
);
(stop_token, stop_flag, tx, listener)
}
pub async fn start_bot(bot_data: &BotData, port: u16) -> bool {
let bot = Bot::new(bot_data.token.clone())
.set_api_url(config::CONFIG.telegram_bot_api.clone())
.throttle(Limits::default())
.cache_me();
let token = bot.inner().inner().token();
log::info!("Start bot(id={})", bot_data.id);
let (handler, commands) = crate::bots::get_bot_handler();
let set_command_result = match commands {
Some(v) => bot.set_my_commands::<Vec<BotCommand>>(v).send().await,
None => bot.delete_my_commands().send().await,
};
match set_command_result {
Ok(_) => (),
Err(err) => log::error!("{:?}", err),
}
let mut dispatcher = Dispatcher::builder(bot.clone(), handler)
.dependencies(dptree::deps![bot_data.cache])
.build();
let (stop_token, _stop_flag, tx, listener) = get_listener();
let host = format!("{}:{}", &config::CONFIG.webhook_base_url, port);
let url = Url::parse(&format!("{host}/{token}/"))
.unwrap_or_else(|_| panic!("Can't parse webhook url!"));
if bot.set_webhook(url.clone()).await.is_err() {
return false;
}
tokio::spawn(async move {
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
});
BOTS_ROUTES
.insert(token.to_string(), (stop_token, ClosableSender::new(tx)))
.await;
true
}

View File

@@ -1,45 +1,28 @@
pub mod axum_server;
pub mod bot_manager_client; pub mod bot_manager_client;
pub mod closable_sender;
pub mod internal;
pub mod utils;
use axum::extract::{Path, State};
use axum::response::IntoResponse;
use axum::routing::post;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest::StatusCode;
use smartstring::alias::String as SmartString; use smartstring::alias::String as SmartString;
use teloxide::stop::{mk_stop_token, StopFlag, StopToken}; use teloxide::stop::StopToken;
use teloxide::update_listeners::{StatefulListener, UpdateListener};
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::log; use tracing::log;
use url::Url;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock;
use smallvec::SmallVec; use smallvec::SmallVec;
use teloxide::adaptors::throttle::Limits;
use teloxide::types::{BotCommand, UpdateKind};
use tokio::time::{self, sleep, Duration}; use tokio::time::{self, sleep, Duration};
use tower_http::trace::TraceLayer;
use teloxide::prelude::*; use teloxide::prelude::*;
use moka::future::Cache; use moka::future::Cache;
use self::axum_server::start_axum_server;
use self::bot_manager_client::get_bots; use self::bot_manager_client::get_bots;
pub use self::bot_manager_client::{BotCache, BotData}; pub use self::bot_manager_client::{BotCache, BotData};
use crate::config; use self::closable_sender::ClosableSender;
type UpdateSender = mpsc::UnboundedSender<Result<Update, std::convert::Infallible>>;
fn tuple_first_mut<A, B>(tuple: &mut (A, B)) -> &mut A {
&mut tuple.0
}
pub static USER_ACTIVITY_CACHE: Lazy<Cache<UserId, ()>> = Lazy::new(|| { pub static USER_ACTIVITY_CACHE: Lazy<Cache<UserId, ()>> = Lazy::new(|| {
Cache::builder() Cache::builder()
@@ -62,158 +45,40 @@ pub static CHAT_DONATION_NOTIFICATIONS_CACHE: Lazy<Cache<ChatId, ()>> = Lazy::ne
.build() .build()
}); });
type Routes = Arc< pub static SERVER_PORT: u16 = 8000;
RwLock<
HashMap<
String,
(
StopToken,
ClosableSender<Result<Update, std::convert::Infallible>>,
),
>,
>,
>;
struct ClosableSender<T> { type StopTokenWithSender = (
origin: std::sync::Arc<std::sync::RwLock<Option<mpsc::UnboundedSender<T>>>>, StopToken,
} ClosableSender<Result<Update, std::convert::Infallible>>,
);
impl<T> Clone for ClosableSender<T> { pub static BOTS_ROUTES: Lazy<Cache<String, StopTokenWithSender>> = Lazy::new(|| {
fn clone(&self) -> Self { Cache::builder()
Self { .time_to_idle(Duration::from_secs(60 * 60))
origin: self.origin.clone(), .max_capacity(100)
} .eviction_listener(|_token, value: StopTokenWithSender, _cause| {
} let (stop_token, mut sender) = value;
}
impl<T> ClosableSender<T> { stop_token.stop();
fn new(sender: mpsc::UnboundedSender<T>) -> Self { sender.close();
Self { })
origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))), .build()
} });
}
fn get(&self) -> Option<mpsc::UnboundedSender<T>> { pub static BOTS_DATA: Lazy<Cache<String, BotData>> = Lazy::new(|| Cache::builder().build());
self.origin.read().unwrap().clone()
}
fn close(&mut self) { pub struct BotsManager;
self.origin.write().unwrap().take();
}
}
#[derive(Default, Clone)]
struct ServerState {
routers: Routes,
}
pub struct BotsManager {
port: u16,
state: ServerState,
}
impl BotsManager { impl BotsManager {
pub fn create() -> Self { async fn check() {
BotsManager {
port: 8000,
state: ServerState {
routers: Arc::new(RwLock::new(HashMap::new())),
},
}
}
fn get_listener(
&self,
) -> (
StopToken,
StopFlag,
UnboundedSender<Result<Update, std::convert::Infallible>>,
impl UpdateListener<Err = Infallible>,
) {
let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel();
let (stop_token, stop_flag) = mk_stop_token();
let stream = UnboundedReceiverStream::new(rx);
let listener = StatefulListener::new(
(stream, stop_token.clone()),
tuple_first_mut,
|state: &mut (_, StopToken)| state.1.clone(),
);
(stop_token, stop_flag, tx, listener)
}
async fn start_bot(&mut self, bot_data: &BotData) -> bool {
let bot = Bot::new(bot_data.token.clone())
.set_api_url(config::CONFIG.telegram_bot_api.clone())
.throttle(Limits::default())
.cache_me();
let token = bot.inner().inner().token();
log::info!("Start bot(id={})", bot_data.id);
let (handler, commands) = crate::bots::get_bot_handler();
let set_command_result = match commands {
Some(v) => bot.set_my_commands::<Vec<BotCommand>>(v).send().await,
None => bot.delete_my_commands().send().await,
};
match set_command_result {
Ok(_) => (),
Err(err) => log::error!("{:?}", err),
}
let mut dispatcher = Dispatcher::builder(bot.clone(), handler)
.dependencies(dptree::deps![bot_data.cache])
.build();
let (stop_token, _stop_flag, tx, listener) = self.get_listener();
{
let mut routers = self.state.routers.write().await;
routers.insert(token.to_string(), (stop_token, ClosableSender::new(tx)));
}
let host = format!("{}:{}", &config::CONFIG.webhook_base_url, self.port);
let url = Url::parse(&format!("{host}/{token}/"))
.unwrap_or_else(|_| panic!("Can't parse webhook url!"));
match bot.set_webhook(url.clone()).await {
Ok(_) => (),
Err(_) => return false,
}
tokio::spawn(async move {
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
});
true
}
async fn check(&mut self) {
let bots_data = get_bots().await; let bots_data = get_bots().await;
match bots_data { match bots_data {
Ok(v) => { Ok(v) => {
for bot_data in v.iter() { for bot_data in v.iter() {
let need_start = { BOTS_DATA
let routers = self.state.routers.read().await; .insert(bot_data.token.clone(), bot_data.clone())
!routers.contains_key(&bot_data.token) .await;
};
if need_start {
self.start_bot(bot_data).await;
}
} }
} }
Err(err) => { Err(err) => {
@@ -222,106 +87,29 @@ impl BotsManager {
} }
} }
async fn start_axum_server(&mut self) { pub async fn stop_all() {
async fn telegram_request( for (_, (stop_token, _)) in BOTS_ROUTES.iter() {
State(ServerState { routers }): State<ServerState>,
Path(token): Path<String>,
input: String,
) -> impl IntoResponse {
let routes = routers.read().await;
let tx = routes.get(&token);
let (stop_token, r_tx) = match tx {
Some(tx) => tx,
None => return StatusCode::NOT_FOUND,
};
let tx = match r_tx.get() {
Some(v) => v,
None => {
stop_token.stop();
if let Some((_, mut sender)) = routers.write().await.remove(&token) {
sender.close();
};
return StatusCode::SERVICE_UNAVAILABLE;
}
};
match serde_json::from_str::<Update>(&input) {
Ok(mut update) => {
if let UpdateKind::Error(value) = &mut update.kind {
*value = serde_json::from_str(&input).unwrap_or_default();
}
if let Err(err) = tx.send(Ok(update)) {
log::error!("{:?}", err);
stop_token.stop();
if let Some((_, mut sender)) = routers.write().await.remove(&token) {
sender.close();
};
return StatusCode::SERVICE_UNAVAILABLE;
}
}
Err(error) => {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide/issues.",
error,
input
);
}
};
StatusCode::OK
}
let port = self.port;
let router = axum::Router::new()
.route("/:token/", post(telegram_request))
.layer(TraceLayer::new_for_http())
.with_state(self.state.clone());
tokio::spawn(async move {
log::info!("Start webserver...");
let addr = SocketAddr::from(([0, 0, 0, 0], port));
axum::Server::bind(&addr)
.serve(router.into_make_service())
.await
.expect("Axum server error");
log::info!("Webserver shutdown...");
});
}
pub async fn stop_all(self) {
let routers = self.state.routers.read().await;
for (stop_token, _) in routers.values() {
stop_token.stop(); stop_token.stop();
} }
BOTS_ROUTES.invalidate_all();
sleep(Duration::from_secs(5)).await; sleep(Duration::from_secs(5)).await;
} }
pub async fn start(running: Arc<AtomicBool>) { pub async fn start(running: Arc<AtomicBool>) {
let mut manager = BotsManager::create(); start_axum_server().await;
manager.start_axum_server().await;
let mut interval = time::interval(Duration::from_secs(5)); let mut interval = time::interval(Duration::from_secs(5));
loop { loop {
manager.check().await; BotsManager::check().await;
for _i in 0..30 { for _i in 0..30 {
interval.tick().await; interval.tick().await;
if !running.load(Ordering::SeqCst) { if !running.load(Ordering::SeqCst) {
manager.stop_all().await; BotsManager::stop_all().await;
return; return;
}; };
} }

View File

@@ -0,0 +1,3 @@
pub fn tuple_first_mut<A, B>(tuple: &mut (A, B)) -> &mut A {
&mut tuple.0
}