This commit is contained in:
2025-02-27 23:20:25 +01:00
commit 6339336135
8 changed files with 3296 additions and 0 deletions

154
src/bot.rs Normal file
View File

@@ -0,0 +1,154 @@
use std::{error::Error, sync::Arc};
use teloxide::{
Bot,
adaptors::throttle::Limits,
dispatching::{HandlerExt, dialogue::GetChatId},
dptree::{self, Handler},
macros::BotCommands,
prelude::{Dispatcher, LoggingErrorHandler, Requester, RequesterExt},
types::{BotCommand, Message},
update_listeners::webhooks,
};
use crate::{config::CONFIG, subscription_manager::SubscriptionManager};
pub type BotHandlerInternal = Result<(), Box<dyn Error + Send + Sync>>;
type BotHandler = Handler<
'static,
dptree::di::DependencyMap,
BotHandlerInternal,
teloxide::dispatching::DpHandlerDescription,
>;
/// These commands are supported:
#[derive(BotCommands, Clone)]
#[command(rename_rule = "lowercase")]
enum Command {
Start,
Help,
Subscribe(String),
Unsubscribe(String),
}
pub async fn help_message_handler(bot: Bot, message: Message) -> BotHandlerInternal {
const HELP_MESSAGE: &str = r#"""
Welcome!
This bot allow you to subscribe to receive start stream notifications.
"""#;
match bot
.send_message(message.chat_id().unwrap(), HELP_MESSAGE)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
pub async fn subscribe_handler(
bot: Bot,
message: Message,
subscription_manager: Arc<SubscriptionManager>,
username: String,
) -> BotHandlerInternal {
let user_id = message.clone().from.unwrap().id;
subscription_manager.subscribe(user_id.0, username).await;
match bot
.send_message(message.chat_id().unwrap(), "Subscribed!")
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
pub async fn unsubscribe_handler(
bot: Bot,
message: Message,
subscription_manager: Arc<SubscriptionManager>,
username: String,
) -> BotHandlerInternal {
let user_id = message.clone().from.unwrap().id;
subscription_manager.unsubscribe(user_id.0, username).await;
match bot
.send_message(message.chat_id().unwrap(), "Unsubscribed!")
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Box::new(err)),
}
}
pub async fn get_handler() -> BotHandler {
dptree::entry().branch(dptree::entry().filter_command::<Command>().endpoint(
|bot: Bot,
message: Message,
command: Command,
subscription_manager: Arc<SubscriptionManager>| async move {
match command {
Command::Start | Command::Help => help_message_handler(bot, message).await,
Command::Subscribe(username) => {
subscribe_handler(bot, message, subscription_manager, username).await
}
Command::Unsubscribe(username) => {
unsubscribe_handler(bot, message, subscription_manager, username).await
}
}
},
))
}
pub async fn get_commands() -> Vec<BotCommand> {
vec![
BotCommand {
command: "start".into(),
description: "Start the bot".into(),
},
BotCommand {
command: "help".into(),
description: "Show help".into(),
},
BotCommand {
command: "subscribe".into(),
description: "Subscribe to the newsletter".into(),
},
BotCommand {
command: "unsubscribe".into(),
description: "Unsubscribe from the newsletter".into(),
},
]
}
pub async fn start_bot(subscription_manager: Arc<SubscriptionManager>) {
let bot = Bot::new(CONFIG.bot_token.clone())
.throttle(Limits::default())
.cache_me();
let handler = get_handler().await;
let commands = get_commands().await;
let _ = bot.set_my_commands(commands).await;
let mut dispatcher = Dispatcher::builder(bot.clone(), handler)
.dependencies(dptree::deps![subscription_manager])
.build();
let addr = ([0, 0, 0, 0], CONFIG.telegram_webhook_port).into();
let url = CONFIG.telegram_webhook_url.parse().unwrap();
let update_listener = webhooks::axum(bot, webhooks::Options::new(addr, url))
.await
.expect("Couldn't setup webhook");
dispatcher
.dispatch_with_listener(
update_listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
}

48
src/config.rs Normal file
View File

@@ -0,0 +1,48 @@
use once_cell::sync::Lazy;
pub struct Config {
pub bot_token: String,
pub telegram_webhook_url: String,
pub telegram_webhook_port: u16,
pub twitch_client_id: String,
pub twitch_client_secret: String,
pub twitch_signing_secret: String,
pub twitch_webhook_url: String,
pub twitch_webhook_port: u16,
}
impl Config {
fn load() -> Self {
Self {
bot_token: std::env::var("BOT_TOKEN").expect("BOT_TOKEN is not set"),
telegram_webhook_url: std::env::var("TELEGRAM_WEBHOOK_URL")
.expect("TELEGRAM_WEBHOOK_URL is not set"),
telegram_webhook_port: std::env::var("TELEGRAM_WEBHOOK_PORT")
.expect("TELEGRAM_WEBHOOK_PORT is not set")
.parse()
.expect("TELEGRAM_WEBHOOK_PORT is not a valid u16"),
twitch_client_id: std::env::var("TWITCH_CLIENT_ID")
.expect("TWITCH_CLIENT_ID is not set"),
twitch_client_secret: std::env::var("TWITCH_CLIENT_SECRET")
.expect("TWITCH_CLIENT_SECRET is not set"),
twitch_signing_secret: std::env::var("TWITCH_SIGNING_SECRET")
.expect("TWITCH_SIGNING_SECRET is not set"),
twitch_webhook_url: std::env::var("TWITCH_WEBHOOK_URL")
.expect("TWITCH_WEBHOOK_URL is not set"),
twitch_webhook_port: std::env::var("TWITCH_WEBHOOK_PORT")
.expect("TWITCH_WEBHOOK_PORT is not set")
.parse()
.expect("TWITCH_WEBHOOK_PORT is not a valid u16"),
}
}
}
pub static CONFIG: Lazy<Config> = Lazy::new(Config::load);

26
src/main.rs Normal file
View File

@@ -0,0 +1,26 @@
pub mod bot;
pub mod config;
pub mod subscription_manager;
pub mod twitch_webhook;
use std::sync::Arc;
use bot::start_bot;
use subscription_manager::SubscriptionManager;
use twitch_webhook::start_twitch_webhook;
#[tokio::main]
async fn main() {
let subscription_manager = Arc::new(SubscriptionManager::new());
subscription_manager.init().await;
let (_, webhook_result) = tokio::join!(
start_bot(subscription_manager.clone()),
start_twitch_webhook(subscription_manager)
);
if let Err(e) = webhook_result {
eprintln!("Error in webhook: {:?}", e);
}
}

View File

@@ -0,0 +1,38 @@
use std::collections::{HashMap, HashSet};
use tokio::sync::RwLock;
pub struct SubscriptionManager {
pub subscriptions: RwLock<HashMap<String, HashSet<u64>>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self {
subscriptions: RwLock::new(HashMap::new()),
}
}
pub async fn init(&self) {
println!("SubscriptionManager initialized");
}
pub async fn subscribe(&self, telegram_user_id: u64, username: String) {
self.subscriptions
.write()
.await
.entry(username)
.or_insert(HashSet::new())
.insert(telegram_user_id);
}
pub async fn unsubscribe(&self, telegram_user_id: u64, username: String) {
self.subscriptions
.write()
.await
.entry(username)
.and_modify(|set| {
set.remove(&telegram_user_id);
});
}
}

267
src/twitch_webhook.rs Normal file
View File

@@ -0,0 +1,267 @@
use std::{net::SocketAddr, sync::Arc};
use axum::{
Extension, Router,
body::HttpBody,
http::{self, StatusCode},
response::IntoResponse,
routing::post,
};
use eyre::{Context, ContextCompat};
use futures::TryStreamExt as _;
use http_body_util::BodyExt as _;
use tokio::{net::TcpListener, sync::RwLock};
use twitch_api::{
HelixClient,
client::ClientDefault,
eventsub::{
Event, EventType, Status,
stream::{StreamOnlineV1, StreamOnlineV1Payload},
},
};
use twitch_oauth2::AppAccessToken;
use crate::{config::CONFIG, subscription_manager::SubscriptionManager};
pub async fn eventsub_register(
token: Arc<RwLock<AppAccessToken>>,
login: String,
webhook_url: String,
) -> Result<(), eyre::Report> {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
let client: HelixClient<'_, reqwest::Client> = HelixClient::new();
let channel_information = client
.get_channel_from_login(&login, &*token.read().await)
.await
.wrap_err("when getting channel")?
.wrap_err("when getting channel")?;
let broadcaster_id = channel_information.broadcaster_id;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(24 * 60 * 60));
loop {
interval.tick().await;
let subs = client
.get_eventsub_subscriptions(Status::Enabled, None, None, &*token.read().await)
.map_ok(|events| {
futures::stream::iter(events.subscriptions.into_iter().map(Ok::<_, eyre::Report>))
})
.try_flatten()
.try_filter(|event| futures::future::ready(event.transport.is_webhook()))
.try_collect::<Vec<_>>()
.await?;
let online_exists = subs.iter().any(|sub| {
sub.transport.as_webhook().unwrap().callback == webhook_url
&& sub.type_ == EventType::StreamOnline
&& sub.version == "1"
&& sub
.condition
.as_object()
.expect("a stream.online did not contain broadcaster")
.get("broadcaster_user_id")
.unwrap()
.as_str()
== Some(broadcaster_id.as_str())
});
let transport = twitch_api::eventsub::Transport::webhook(
webhook_url.clone(),
CONFIG.twitch_signing_secret.clone(),
);
if !online_exists {
client
.create_eventsub_subscription(
StreamOnlineV1::broadcaster_user_id(broadcaster_id.clone()),
transport.clone(),
&*token.read().await,
)
.await
.wrap_err_with(|| "when registering online event")?;
}
}
}
pub async fn twitch_eventsub(
Extension(cache): Extension<Arc<retainer::Cache<http::HeaderValue, ()>>>,
request: http::Request<axum::body::Body>,
) -> impl IntoResponse {
const MAX_ALLOWED_RESPONSE_SIZE: u64 = 64 * 1024;
let (parts, body) = request.into_parts();
let response_content_length = match body.size_hint().upper() {
Some(v) => v,
None => MAX_ALLOWED_RESPONSE_SIZE + 1,
};
let body = if response_content_length < MAX_ALLOWED_RESPONSE_SIZE {
body.collect().await.unwrap().to_bytes().to_vec()
} else {
panic!("too big data given")
};
let request = http::Request::from_parts(parts, &*body);
if !Event::verify_payload(&request, CONFIG.twitch_signing_secret.as_bytes()) {
return (StatusCode::BAD_REQUEST, "Invalid signature".to_string());
}
if let Some(id) = request.headers().get("Twitch-Eventsub-Message-Id") {
if cache.get(id).await.is_none() {
cache.insert(id.clone(), (), 400).await;
} else {
return (StatusCode::OK, "".to_string());
}
}
let event = Event::parse_http(&request).unwrap();
if let Some(ver) = event.get_verification_request() {
return (StatusCode::OK, ver.challenge.clone());
}
if event.is_revocation() {
return (StatusCode::OK, "".to_string());
}
use twitch_api::eventsub::{Message as M, Payload as P};
match event {
Event::StreamOnlineV1(P {
message:
M::Notification(StreamOnlineV1Payload {
broadcaster_user_id,
started_at,
..
}),
..
}) => {
todo!(
"StreamOnlineV1: broadcaster_user_id: {}, started_at: {}",
broadcaster_user_id,
started_at
);
}
_ => {}
}
(StatusCode::OK, String::default())
}
struct TwitchWebhookServer {
subscription_manager: Arc<SubscriptionManager>,
subscribed_to: Arc<RwLock<Vec<String>>>,
app_access_token: Arc<RwLock<AppAccessToken>>,
}
impl TwitchWebhookServer {
pub fn new(
subscription_manager: Arc<SubscriptionManager>,
app_access_token: Arc<RwLock<AppAccessToken>>,
) -> Self {
Self {
subscription_manager,
subscribed_to: Arc::new(RwLock::new(Vec::new())),
app_access_token,
}
}
pub async fn start_webhook_server(&self) {
let retainer = Arc::new(retainer::Cache::<axum::http::HeaderValue, ()>::new());
let ret = retainer.clone();
let _: tokio::task::JoinHandle<Result<(), ()>> = tokio::spawn(async move {
ret.monitor(10, 0.50, tokio::time::Duration::from_secs(86400 / 2))
.await;
Ok(())
});
let app = Router::new()
.route(
"/twitch/eventsub/",
post(move |cache, request| twitch_eventsub(cache, request)),
)
.layer(Extension(retainer));
let address = SocketAddr::new([0, 0, 0, 0].into(), CONFIG.twitch_webhook_port);
let _ = axum::serve(
TcpListener::bind(address).await.unwrap(),
app.into_make_service(),
)
.await;
}
pub async fn subscribe(&self, streamer: String) {
let _ = eventsub_register(
self.app_access_token.clone(),
streamer,
format!("{}/twitch/eventsub/", CONFIG.twitch_webhook_url),
)
.await;
}
pub async fn check_subscriptions(&self) {
loop {
let streamers = self
.subscription_manager
.subscriptions
.read()
.await
.keys()
.cloned()
.collect::<Vec<String>>();
for streamer in streamers {
let is_subscribed = self.subscribed_to.read().await.contains(&streamer);
if !is_subscribed {
self.subscribe(streamer.clone()).await;
self.subscribed_to.write().await.push(streamer);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
}
pub async fn start(&self) -> Result<(), eyre::Report> {
let subscribe_future = self.check_subscriptions();
let webhook_future = self.start_webhook_server();
futures::join!(subscribe_future, webhook_future);
Ok(())
}
}
pub async fn start_twitch_webhook(
subscription_manager: Arc<SubscriptionManager>,
) -> Result<(), eyre::Report> {
let client: HelixClient<_> = twitch_api::HelixClient::with_client(
<reqwest::Client>::default_client_with_name(Some(
"twitch-rs/eventsub"
.parse()
.wrap_err_with(|| "when creating header name")
.unwrap(),
))
.wrap_err_with(|| "when creating client")?,
);
let token = twitch_oauth2::AppAccessToken::get_app_access_token(
&client,
CONFIG.twitch_client_id.clone().into(),
CONFIG.twitch_client_secret.clone().into(),
vec![],
)
.await?;
let token = Arc::new(tokio::sync::RwLock::new(token));
let twitch_webhook_server = TwitchWebhookServer::new(subscription_manager, token);
let _ = twitch_webhook_server.start().await;
Ok(())
}