From 0377fc0e537845aaad6f4f38c157c4b03f8206f7 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Thu, 6 Mar 2025 15:02:40 +0100 Subject: [PATCH] Implement notification sending --- src/telegram_bot.rs | 10 +++++++--- src/twitch_webhook.rs | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/telegram_bot.rs b/src/telegram_bot.rs index 3ec0a46..f55bb3d 100644 --- a/src/telegram_bot.rs +++ b/src/telegram_bot.rs @@ -126,10 +126,14 @@ pub async fn get_commands() -> Vec { ] } -pub async fn start_telegram_bot(subscription_manager: Arc) { - let bot = OriginBot::new(CONFIG.bot_token.clone()) +pub fn get_telegram_bot() -> Bot { + OriginBot::new(CONFIG.bot_token.clone()) .throttle(Limits::default()) - .cache_me(); + .cache_me() +} + +pub async fn start_telegram_bot(subscription_manager: Arc) { + let bot = get_telegram_bot(); let handler = get_handler().await; let commands = get_commands().await; diff --git a/src/twitch_webhook.rs b/src/twitch_webhook.rs index 50b9a95..06c2c8e 100644 --- a/src/twitch_webhook.rs +++ b/src/twitch_webhook.rs @@ -10,6 +10,7 @@ use axum::{ use eyre::{Context, ContextCompat}; use futures::TryStreamExt as _; use http_body_util::BodyExt as _; +use teloxide::prelude::Requester as _; use tokio::{net::TcpListener, sync::RwLock}; use tower_http::trace::TraceLayer; use twitch_api::{ @@ -22,7 +23,11 @@ use twitch_api::{ }; use twitch_oauth2::AppAccessToken; -use crate::{config::CONFIG, subscription_manager::SubscriptionManager}; +use crate::{ + config::CONFIG, + subscription_manager::{self, SubscriptionManager}, + telegram_bot::get_telegram_bot, +}; pub async fn eventsub_register( token: Arc>, @@ -90,6 +95,7 @@ pub async fn eventsub_register( pub async fn twitch_eventsub( Extension(cache): Extension>>, + Extension(subscription_manager): Extension>, request: http::Request, ) -> impl IntoResponse { const MAX_ALLOWED_RESPONSE_SIZE: u64 = 64 * 1024; @@ -136,17 +142,30 @@ pub async fn twitch_eventsub( Event::StreamOnlineV1(P { message: M::Notification(StreamOnlineV1Payload { - broadcaster_user_id, - started_at, + broadcaster_user_name, .. }), .. }) => { - todo!( - "StreamOnlineV1: broadcaster_user_id: {}, started_at: {}", - broadcaster_user_id, - started_at - ); + let bot = get_telegram_bot(); + + let subscriptions = subscription_manager.subscriptions.read().await; + let user_ids = match subscriptions.get(&broadcaster_user_name.to_string()) { + Some(v) => v, + None => return (StatusCode::OK, String::default()), + }; + + for user_id in user_ids.iter() { + if let Err(err) = bot + .send_message( + user_id.to_string(), + format!("Streamer {} is now live! ()", broadcaster_user_name,), + ) + .await + { + tracing::error!("Failed to send message to {}: {:?}", user_id, err); + } + } } _ => {} } @@ -184,9 +203,10 @@ impl TwitchWebhookServer { let app = Router::new() .route( "/twitch/eventsub/", - post(move |cache, request| twitch_eventsub(cache, request)), + post(move |cache, subs, request| twitch_eventsub(cache, subs, request)), ) .layer(Extension(retainer)) + .layer(Extension(self.subscription_manager.clone())) .layer(TraceLayer::new_for_http()); let address = SocketAddr::new([0, 0, 0, 0].into(), CONFIG.twitch_webhook_port);