Implement notification sending

This commit is contained in:
2025-03-06 15:02:40 +01:00
parent eccceef663
commit 0377fc0e53
2 changed files with 36 additions and 12 deletions

View File

@@ -126,10 +126,14 @@ pub async fn get_commands() -> Vec<BotCommand> {
] ]
} }
pub async fn start_telegram_bot(subscription_manager: Arc<SubscriptionManager>) { pub fn get_telegram_bot() -> Bot {
let bot = OriginBot::new(CONFIG.bot_token.clone()) OriginBot::new(CONFIG.bot_token.clone())
.throttle(Limits::default()) .throttle(Limits::default())
.cache_me(); .cache_me()
}
pub async fn start_telegram_bot(subscription_manager: Arc<SubscriptionManager>) {
let bot = get_telegram_bot();
let handler = get_handler().await; let handler = get_handler().await;
let commands = get_commands().await; let commands = get_commands().await;

View File

@@ -10,6 +10,7 @@ use axum::{
use eyre::{Context, ContextCompat}; use eyre::{Context, ContextCompat};
use futures::TryStreamExt as _; use futures::TryStreamExt as _;
use http_body_util::BodyExt as _; use http_body_util::BodyExt as _;
use teloxide::prelude::Requester as _;
use tokio::{net::TcpListener, sync::RwLock}; use tokio::{net::TcpListener, sync::RwLock};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use twitch_api::{ use twitch_api::{
@@ -22,7 +23,11 @@ use twitch_api::{
}; };
use twitch_oauth2::AppAccessToken; use twitch_oauth2::AppAccessToken;
use crate::{config::CONFIG, subscription_manager::SubscriptionManager}; use crate::{
config::CONFIG,
subscription_manager::{self, SubscriptionManager},
telegram_bot::get_telegram_bot,
};
pub async fn eventsub_register( pub async fn eventsub_register(
token: Arc<RwLock<AppAccessToken>>, token: Arc<RwLock<AppAccessToken>>,
@@ -90,6 +95,7 @@ pub async fn eventsub_register(
pub async fn twitch_eventsub( pub async fn twitch_eventsub(
Extension(cache): Extension<Arc<retainer::Cache<http::HeaderValue, ()>>>, Extension(cache): Extension<Arc<retainer::Cache<http::HeaderValue, ()>>>,
Extension(subscription_manager): Extension<Arc<SubscriptionManager>>,
request: http::Request<axum::body::Body>, request: http::Request<axum::body::Body>,
) -> impl IntoResponse { ) -> impl IntoResponse {
const MAX_ALLOWED_RESPONSE_SIZE: u64 = 64 * 1024; const MAX_ALLOWED_RESPONSE_SIZE: u64 = 64 * 1024;
@@ -136,17 +142,30 @@ pub async fn twitch_eventsub(
Event::StreamOnlineV1(P { Event::StreamOnlineV1(P {
message: message:
M::Notification(StreamOnlineV1Payload { M::Notification(StreamOnlineV1Payload {
broadcaster_user_id, broadcaster_user_name,
started_at,
.. ..
}), }),
.. ..
}) => { }) => {
todo!( let bot = get_telegram_bot();
"StreamOnlineV1: broadcaster_user_id: {}, started_at: {}",
broadcaster_user_id, let subscriptions = subscription_manager.subscriptions.read().await;
started_at let user_ids = match subscriptions.get(&broadcaster_user_name.to_string()) {
); Some(v) => v,
None => return (StatusCode::OK, String::default()),
};
for user_id in user_ids.iter() {
if let Err(err) = bot
.send_message(
user_id.to_string(),
format!("Streamer {} is now live! ()", broadcaster_user_name,),
)
.await
{
tracing::error!("Failed to send message to {}: {:?}", user_id, err);
}
}
} }
_ => {} _ => {}
} }
@@ -184,9 +203,10 @@ impl TwitchWebhookServer {
let app = Router::new() let app = Router::new()
.route( .route(
"/twitch/eventsub/", "/twitch/eventsub/",
post(move |cache, request| twitch_eventsub(cache, request)), post(move |cache, subs, request| twitch_eventsub(cache, subs, request)),
) )
.layer(Extension(retainer)) .layer(Extension(retainer))
.layer(Extension(self.subscription_manager.clone()))
.layer(TraceLayer::new_for_http()); .layer(TraceLayer::new_for_http());
let address = SocketAddr::new([0, 0, 0, 0].into(), CONFIG.twitch_webhook_port); let address = SocketAddr::new([0, 0, 0, 0].into(), CONFIG.twitch_webhook_port);