diff --git a/Cargo.lock b/Cargo.lock index 3a790e9..52a84e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" + [[package]] name = "arrayvec" version = "0.7.4" @@ -308,11 +314,17 @@ dependencies = [ name = "discord-bot" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "chrono", + "futures", "once_cell", "reqwest 0.12.5", + "serde", + "serde_json", "serenity", "tokio", + "tokio-tungstenite 0.23.1", ] [[package]] @@ -403,6 +415,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -425,6 +438,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -1510,7 +1534,7 @@ dependencies = [ "static_assertions", "time", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", "tracing", "typemap_rev", "typesize", @@ -1808,10 +1832,22 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls 0.25.0", - "tungstenite", + "tungstenite 0.21.0", "webpki-roots 0.26.3", ] +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.23.0", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -1917,6 +1953,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typemap_rev" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 5442374..41cb93f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,9 @@ tokio = { version = "1.35.1", features = ["rt-multi-thread", "macros", "time"] } serenity = { version = "0.12.1", features = ["collector"] } reqwest = { version = "0.12.5", features = ["json"] } chrono = "0.4.38" +futures = "0.3.30" +serde = "1.0.204" +serde_json = "1.0.122" +anyhow = "1.0.86" +async-trait = "0.1.81" +tokio-tungstenite = "0.23.1" diff --git a/src/config.rs b/src/config.rs index 34e3110..0acbac2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -19,6 +19,10 @@ pub struct Config { pub telegram_bot_token: String, pub telegram_channel_id: i128, + + pub twitch_client_id: String, + pub twitch_client_secret: String, + pub twitch_channel_id: String, } @@ -33,6 +37,9 @@ impl Config { discord_game_list_message_id: get_env("DISCORD_GAME_LIST_MESSAGE_ID").parse().unwrap(), telegram_bot_token: get_env("TELEGRAM_BOT_TOKEN"), telegram_channel_id: get_env("TELEGRAM_CHANNEL_ID").parse().unwrap(), + twitch_client_id: get_env("TWITCH_CLIENT_ID"), + twitch_client_secret: get_env("TWITCH_CLIENT_SECRET"), + twitch_channel_id: get_env("TWITCH_CHANNEL_ID"), } } } diff --git a/src/handler/commands/add_game.rs b/src/discord_handler/commands/add_game.rs similarity index 100% rename from src/handler/commands/add_game.rs rename to src/discord_handler/commands/add_game.rs diff --git a/src/handler/commands/delete_game.rs b/src/discord_handler/commands/delete_game.rs similarity index 100% rename from src/handler/commands/delete_game.rs rename to src/discord_handler/commands/delete_game.rs diff --git a/src/handler/commands/mod.rs b/src/discord_handler/commands/mod.rs similarity index 100% rename from src/handler/commands/mod.rs rename to src/discord_handler/commands/mod.rs diff --git a/src/handler/mod.rs b/src/discord_handler/mod.rs similarity index 93% rename from src/handler/mod.rs rename to src/discord_handler/mod.rs index f0b53e2..c0266f2 100644 --- a/src/handler/mod.rs +++ b/src/discord_handler/mod.rs @@ -6,7 +6,6 @@ use serenity::model::channel::Message; use chrono::offset::FixedOffset; use crate::config; -use crate::notifiers::telegram::send_to_telegram; use crate::utils::{add_game, delete_game, format_games_list, parse_games_list}; pub mod commands; @@ -105,15 +104,8 @@ impl EventHandler for Handler { } } - async fn message(&self, _ctx: Context, msg: Message) { - if msg.guild_id != Some(config::CONFIG.discord_guild_id.into()) { - return; - } + async fn message(&self, _ctx: Context, _msg: Message) { - if msg.channel_id == config::CONFIG.discord_channel_id { - send_to_telegram(&msg.content).await; - return; - } } async fn ready(&self, ctx: Context, _ready: serenity::model::gateway::Ready) { diff --git a/src/main.rs b/src/main.rs index 384bc10..de4755c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,28 @@ +use futures::StreamExt; use serenity::all::ActivityData; use serenity::prelude::*; +use twitch_handler::{auth::{self}, helix}; + +use tokio::join; + pub mod config; -pub mod handler; +pub mod discord_handler; +pub mod twitch_handler; pub mod utils; pub mod notifiers; async fn start_discord_bot() { + println!("Starting Discord bot..."); + let intents = GatewayIntents::GUILD_MESSAGES | GatewayIntents::DIRECT_MESSAGES | GatewayIntents::MESSAGE_CONTENT; let mut client = Client::builder(&config::CONFIG.discord_bot_token, intents) - .event_handler(handler::Handler) + .event_handler(discord_handler::Handler) .status(serenity::all::OnlineStatus::Online) .activity(ActivityData::playing(&config::CONFIG.discord_bot_activity)) .await @@ -25,8 +33,30 @@ async fn start_discord_bot() { } } +async fn start_twitch_bot() { + println!("Starting Twitch bot..."); + + let token_storage = auth::VoidStorage {}; + + let mut client = helix::Client::from_get_app_token( + config::CONFIG.twitch_client_id.clone(), + config::CONFIG.twitch_client_secret.clone(), + token_storage, + ).await.unwrap(); + + let mut t = client.connect_eventsub(vec![ + ("stream.online".to_string(), "1".to_string()), + ("stream.offline".to_string(), "1".to_string()), + ("channel.update".to_string(), "2".to_string()) + ], config::CONFIG.twitch_channel_id.clone()).await.unwrap(); + + while let Some(event) = t.next().await { + println!("{:?}", event); + } +} + #[tokio::main] async fn main() { - start_discord_bot().await; + join!(start_discord_bot(), start_twitch_bot()); } diff --git a/src/notifiers/discord.rs b/src/notifiers/discord.rs index d608ca7..4cb4986 100644 --- a/src/notifiers/discord.rs +++ b/src/notifiers/discord.rs @@ -1,3 +1,17 @@ -pub async fn send_to_discord(_msg: &str) { - todo!(); +use reqwest::Url; +use serenity::json::json; + +use crate::config; + + +pub async fn send_to_discord(msg: &str) { + let base_url = format!("https://discord.com/api/v10/channels/{}/messages", config::CONFIG.discord_channel_id); + + let url = Url::parse(&base_url.as_ref()).unwrap(); + + reqwest::Client::new().post(url) + .header("Authorization", format!("Bot {}", config::CONFIG.discord_bot_token)) + .json(&json!({ + "content": msg + })).send().await.expect("Error sending message to Discord"); } diff --git a/src/notifiers/telegram.rs b/src/notifiers/telegram.rs index 06029f3..f76ccbb 100644 --- a/src/notifiers/telegram.rs +++ b/src/notifiers/telegram.rs @@ -14,5 +14,6 @@ pub async fn send_to_telegram(msg: &str) { ] ).unwrap(); - reqwest::get(url).await.expect("Error sending message to Telegram"); + reqwest::Client::new().post(url) + .send().await.expect("Error sending message to Telegram"); } diff --git a/src/twitch_handler/auth.rs b/src/twitch_handler/auth.rs new file mode 100644 index 0000000..c59c7cf --- /dev/null +++ b/src/twitch_handler/auth.rs @@ -0,0 +1,186 @@ +use helix::Client; +use helix::User; + +use anyhow::Result; +use async_trait::async_trait; +use chrono::DateTime; +use chrono::Utc; +use serde::Deserialize; +use serde::Serialize; + +use super::helix; + +#[async_trait] +pub trait TokenStorage { + async fn save(&mut self, token: &Token) -> Result<()>; +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub enum TokenType { + #[default] + UserAccessToken, + AppAccessToken, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct Token { + #[serde(skip)] + pub token_type: TokenType, + #[serde(default)] + pub refresh_token: String, + pub access_token: String, + pub expires_in: i64, + #[serde(default = "Utc::now")] + pub created_at: DateTime, + #[serde(skip)] + pub user: Option, +} + +#[derive(Debug, Clone)] +pub struct VoidStorage {} +#[async_trait] +impl TokenStorage for VoidStorage { + async fn save(&mut self, _token: &Token) -> Result<()> { + Ok(()) + } +} + +#[derive(Deserialize)] +struct ValidateToken { + pub expires_in: i64, +} + +impl Client { + pub async fn validate_token(&mut self) -> Result<()> { + let token = match self + .get::("https://id.twitch.tv/oauth2/validate".to_string()) + .await + { + Ok(r) => r, + Err(..) => { + self.refresh_token().await?; + return Ok(()); + } + }; + + if token.expires_in < 3600 { + self.refresh_token().await?; + } + + Ok(()) + } + + pub async fn refresh_token(&mut self) -> Result<()> { + if self.token.token_type == TokenType::AppAccessToken { + self.get_app_token().await?; + return Ok(()); + } + + let res = self + .http_request::<()>( + reqwest::Method::POST, + "https://id.twitch.tv/oauth2/token".to_string(), + None, + Some(format!( + "client_id={0}&client_secret={1}&grant_type=refresh_token&refresh_token={2}", + self.client_id, self.client_secret, self.token.refresh_token + )), + ) + .await?; + + self.token = res.json::().await?; + self.token_storage.save(&self.token).await?; + + Ok(()) + } + + pub fn from_token_no_validation( + client_id: String, + client_secret: String, + token_storage: T, + token: Token, + ) -> Client { + Client { + client_id: client_id, + client_secret: client_secret, + token: token, + http_client: reqwest::Client::new(), + token_storage: token_storage, + } + } + + pub async fn from_token( + client_id: String, + client_secret: String, + token_storage: T, + token: Token, + ) -> Result> { + let mut client = + Self::from_token_no_validation(client_id, client_secret, token_storage, token); + client.token.user = Some(client.get_user().await?); + Ok(client) + } + + async fn get_app_token(&mut self) -> Result<()> { + let token = self + .http_client + .post("https://id.twitch.tv/oauth2/token") + .body(format!( + "client_id={0}&client_secret={1}&grant_type=client_credentials", + self.client_id, self.client_secret + )) + .send() + .await? + .json::() + .await?; + + self.token = token; + self.token.token_type = TokenType::AppAccessToken; + self.token_storage.save(&self.token).await?; + + Ok(()) + } + + pub async fn from_get_app_token( + client_id: String, + client_secret: String, + token_storage: T, + ) -> Result> { + let http_client = reqwest::Client::new(); + let mut client = Client { + client_id: client_id, + client_secret: client_secret, + http_client: http_client, + token_storage: token_storage, + token: Token::default(), + }; + client.get_app_token().await?; + Ok(client) + } + + pub async fn from_authorization( + client_id: String, + client_secret: String, + token_storage: T, + code: String, + redirect_uri: String, + ) -> Result> { + let http_client = reqwest::Client::new(); + let token = http_client.post("https://id.twitch.tv/oauth2/token") + .body(format!("client_id={client_id}&client_secret={client_secret}&code={code}&grant_type=authorization_code&redirect_uri={redirect_uri}")) + .send() + .await? + .json::() + .await?; + let mut client = Client { + client_id: client_id, + client_secret: client_secret, + token: token, + http_client: http_client, + token_storage: token_storage, + }; + client.token.user = Some(client.get_user().await?); + client.token_storage.save(&client.token).await?; + Ok(client) + } +} diff --git a/src/twitch_handler/eventsub.rs b/src/twitch_handler/eventsub.rs new file mode 100644 index 0000000..1ec4df8 --- /dev/null +++ b/src/twitch_handler/eventsub.rs @@ -0,0 +1,296 @@ +use anyhow::bail; +use anyhow::Result; +use futures::Future; +use futures::Sink; +use futures::StreamExt; + +use serde::Deserialize; + +use futures::Stream; + +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use super::auth; +use super::helix; + +#[derive(Debug, Deserialize)] +pub struct MessageMetadata { + pub message_id: String, + pub message_timestamp: String, + pub message_type: String, + pub subscription_type: Option, + pub subscription_version: Option, +} + +#[derive(Debug, Deserialize)] +pub struct Message { + pub metadata: MessageMetadata, + pub payload: serde_json::Value, +} + +#[derive(Debug, Deserialize)] +pub struct SessionWelcomeSession { + pub id: String, + pub connected_at: String, + pub status: String, + pub reconnect_url: Option, + pub keepalive_timeout_seconds: i64, +} + +#[derive(Debug, Deserialize)] +pub struct SessionWelcome { + pub session: SessionWelcomeSession, +} + +#[derive(Debug, Deserialize)] +pub struct Notification { + pub subscription: serde_json::Value, + pub event: serde_json::Value, +} + +#[derive(Debug, Deserialize)] +pub struct ChannelUpdate { + pub broadcaster_user_id: String, + pub broadcaster_user_login: String, + pub broadcaster_user_name: String, + pub title: String, + pub language: String, + pub category_id: String, + pub category_name: String, + pub content_classification_labels: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct CustomRewardRedemptionAddReward { + pub id: String, +} + +#[derive(Debug, Deserialize)] +pub struct CustomRewardRedemptionAdd { + pub id: String, + pub user_login: String, + pub user_input: String, + pub reward: CustomRewardRedemptionAddReward, +} + +#[derive(Debug, Deserialize)] +pub struct StreamOnline { + pub id: String, + pub broadcaster_user_id: String, + pub broadcaster_user_login: String, + pub broadcaster_user_name: String, + pub r#type: String, + pub started_at: String, +} + +#[derive(Debug, Deserialize)] +pub struct StreamOffline { + pub broadcaster_user_id: String, + pub broadcaster_user_login: String, + pub broadcaster_user_name: String, +} + +#[derive(Debug, Deserialize)] +pub enum NotificationType { + ChannelUpdate(ChannelUpdate), + CustomRewardRedemptionAdd(CustomRewardRedemptionAdd), + StreamOnline(StreamOnline), + StreamOffline(StreamOffline), +} + +pub struct Client { + inner_stream: Pin< + Box< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, + >, + ping_sleep: Pin>, +} + +impl Stream for Client { + type Item = NotificationType; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let mut inner_stream = this.inner_stream.as_mut(); + + match this.ping_sleep.as_mut().poll(cx) { + Poll::Pending => {} + Poll::Ready(..) => { + this.ping_sleep + .as_mut() + .reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(30)); + + match inner_stream.as_mut().start_send( + tokio_tungstenite::tungstenite::protocol::Message::Ping(vec![]), + ) { + Err(..) => return Poll::Ready(None), + _ => {} + }; + } + }; + + loop { + match inner_stream.as_mut().poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(v) => match v { + Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Ping(..))) => { + match inner_stream.as_mut().start_send( + tokio_tungstenite::tungstenite::protocol::Message::Pong(vec![]), + ) { + Ok(()) => continue, + Err(..) => break, + }; + } + Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => { + let message: Message = match serde_json::from_str(&text) { + Ok(v) => v, + Err(..) => break, + }; + + match message.metadata.message_type.as_str() { + "notification" => { + let subtype = match &message.metadata.subscription_type { + Some(v) => v, + None => break, + }; + + let notification: Notification = + match serde_json::from_value(message.payload.clone()) { + Ok(v) => v, + Err(..) => break, + }; + + match subtype.as_str() { + "channel.update" => { + let event: ChannelUpdate = + match serde_json::from_value(notification.event) { + Ok(v) => v, + Err(..) => break, + }; + + return Poll::Ready(Some(NotificationType::ChannelUpdate( + event, + ))); + } + "channel.channel_points_custom_reward_redemption.add" => { + let event: CustomRewardRedemptionAdd = + match serde_json::from_value(notification.event) { + Ok(v) => v, + Err(..) => break, + }; + + return Poll::Ready(Some( + NotificationType::CustomRewardRedemptionAdd(event), + )); + } + "stream.online" => { + let event: StreamOnline = + match serde_json::from_value(notification.event) { + Ok(v) => v, + Err(..) => break, + }; + + return Poll::Ready(Some(NotificationType::StreamOnline( + event, + ))); + } + "stream.offline" => { + let event: StreamOffline = + match serde_json::from_value(notification.event) { + Ok(v) => v, + Err(..) => break, + }; + + return Poll::Ready(Some(NotificationType::StreamOffline( + event, + ))); + } + _ => return Poll::Pending, + } + } + _ => continue, + } + } + Some(..) => continue, + None => break, + }, + } + } + + Poll::Ready(None) + } +} + +impl helix::Client { + pub async fn connect_eventsub(&mut self, topics: Vec<(String, String)>, broadcaster_id: String) -> Result { + let (mut ws_stream, _) = + match tokio_tungstenite::connect_async("wss://eventsub.wss.twitch.tv/ws").await { + Ok(v) => v, + Err(e) => return Err(e.into()), + }; + + let welcome = loop { + let msg = ws_stream.next().await; + match msg { + Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => { + let message: Message = match serde_json::from_str(&text) { + Ok(v) => v, + Err(e) => return Err(e.into()), + }; + + if message.metadata.message_type.as_str() != "session_welcome" { + bail!("No session welcome"); + } + + let welcome: SessionWelcome = + match serde_json::from_value(message.payload.clone()) { + Ok(v) => v, + Err(e) => return Err(e.into()), + }; + + break welcome; + } + Some(Err(e)) => return Err(e.into()), + Some(..) => {} + None => bail!("WebSocket dropped"), + } + }; + + for (subtype, version) in topics.into_iter() { + match self + .create_eventsub_subscription(&helix::EventSubCreate { + r#type: subtype, + version: version, + condition: helix::EventSubCondition { + broadcaster_id: Some(broadcaster_id.clone()), + broadcaster_user_id: Some(broadcaster_id.clone()), + moderator_user_id: Some(broadcaster_id.clone()), + user_id: Some(broadcaster_id.clone()), + ..Default::default() + }, + transport: helix::EventSubTransport { + method: "websocket".to_string(), + session_id: Some(welcome.session.id.clone()), + ..Default::default() + }, + }) + .await + { + Ok(..) => {} + Err(..) => { + bail!("create_eventsub_subscription failed") + } + }; + } + + Ok(Client { + inner_stream: Pin::new(Box::new(ws_stream)), + ping_sleep: Box::pin(tokio::time::sleep(tokio::time::Duration::from_secs(30))), + }) + } +} diff --git a/src/twitch_handler/helix.rs b/src/twitch_handler/helix.rs new file mode 100644 index 0000000..1154824 --- /dev/null +++ b/src/twitch_handler/helix.rs @@ -0,0 +1,1075 @@ +use auth::{Token, TokenStorage, TokenType}; + +use anyhow::bail; +use anyhow::Result; + +use reqwest::Client as HttpClient; +use reqwest::{Method, Response}; +use serde::{Deserialize, Serialize}; + +use super::auth; + +#[derive(Serialize, Deserialize)] +pub struct TwitchData { + pub data: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Pagination { + pub cursor: Option, +} + +#[derive(Debug, Clone)] +pub struct Client { + pub client_id: String, + pub client_secret: String, + pub token: Token, + pub http_client: HttpClient, + pub token_storage: T, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct User { + pub id: String, + pub login: String, + pub display_name: String, + pub r#type: String, + pub broadcaster_type: String, + pub description: String, + pub profile_image_url: String, + pub offline_image_url: String, + pub view_count: i64, + pub email: Option, + pub created_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RewardImage { + pub url_1x: String, + pub url_2x: String, + pub url_4x: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RewardMaxPerStream { + pub is_enabled: bool, + pub max_per_stream: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RewardMaxPerUserPerStream { + pub is_enabled: bool, + pub max_per_user_per_stream: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RewardGlobalCooldown { + pub is_enabled: bool, + pub global_cooldown_seconds: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Reward { + pub broadcaster_id: String, + pub broadcaster_login: String, + pub broadcaster_name: String, + pub id: String, + pub title: String, + pub prompt: String, + pub cost: i64, + pub image: Option, + pub default_image: RewardImage, + pub background_color: String, + pub is_enabled: bool, + pub is_user_input_required: bool, + pub max_per_stream_setting: RewardMaxPerStream, + pub max_per_user_per_stream_setting: RewardMaxPerUserPerStream, + pub global_cooldown_setting: RewardGlobalCooldown, + pub is_paused: bool, + pub is_in_stock: bool, + pub should_redemptions_skip_request_queue: bool, + pub redemptions_redeemed_current_stream: Option, + pub cooldown_expires_at: Option, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct RewardCreate { + pub title: String, + pub cost: i64, + pub prompt: Option, + pub is_enabled: Option, + pub background_color: Option, + pub is_user_input_required: Option, + pub is_max_per_stream_enabled: Option, + pub max_per_stream: Option, + pub is_max_per_user_per_stream_enabled: Option, + pub max_per_user_per_stream: Option, + pub is_global_cooldown_enabled: Option, + pub global_cooldown_seconds: Option, + pub should_redemptions_skip_request_queue: Option, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct RewardUpdate { + pub title: Option, + pub cost: Option, + pub prompt: Option, + pub is_enabled: Option, + pub background_color: Option, + pub is_user_input_required: Option, + pub is_max_per_stream_enabled: Option, + pub max_per_stream: Option, + pub is_max_per_user_per_stream_enabled: Option, + pub max_per_user_per_stream: Option, + pub is_global_cooldown_enabled: Option, + pub global_cooldown_seconds: Option, + pub is_paused: Option, + pub should_redemptions_skip_request_queue: Option, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct RedemptionStatus { + pub status: String, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct EventSubTransport { + pub method: String, + pub callback: Option, + pub secret: Option, + pub session_id: Option, + pub connected_at: Option, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct EventSubCondition { + pub broadcaster_id: Option, + pub broadcaster_user_id: Option, + pub moderator_user_id: Option, + pub user_id: Option, + pub from_broadcaster_user_id: Option, + pub to_broadcaster_user_id: Option, + pub reward_id: Option, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct EventSub { + pub id: String, + pub status: String, + pub r#type: String, + pub version: String, + pub condition: EventSubCondition, + pub created_at: String, + pub transport: EventSubTransport, + pub cost: i64, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct EventSubCreate { + pub r#type: String, + pub version: String, + pub condition: EventSubCondition, + pub transport: EventSubTransport, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct BanUser { + pub user_id: String, + pub duration: i64, + pub reason: Option, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct BanUserObj { + pub data: BanUser, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct BannedUser { + pub broadcaster_id: String, + pub moderator_id: String, + pub user_id: String, + pub created_at: String, + pub end_time: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChannelInformation { + pub broadcaster_id: String, + pub broadcaster_login: String, + pub broadcaster_name: String, + pub broadcaster_language: String, + pub game_name: String, + pub game_id: String, + pub title: String, + pub delay: i64, + pub tags: Vec, + pub content_classification_labels: Vec, + pub is_branded_content: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PredictionTopPredictor { + pub user_id: String, + pub user_name: String, + pub user_login: String, + pub channel_points_used: i64, + pub channel_points_won: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PredictionOutcome { + pub id: String, + pub title: String, + pub users: i64, + pub channel_points: i64, + pub top_predictors: Option>, + pub color: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Prediction { + pub id: String, + pub broadcaster_id: String, + pub broadcaster_name: String, + pub broadcaster_login: String, + pub title: String, + pub winning_outcome_id: Option, + pub outcomes: Vec, + pub prediction_window: i64, + pub status: String, + pub created_at: String, + pub ended_at: Option, + pub locked_at: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PredictionOutcomeCreate { + pub title: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PredictionCreate { + pub broadcaster_id: String, + pub title: String, + pub outcomes: Vec, + pub prediction_window: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PredictionEnd { + pub broadcaster_id: String, + pub id: String, + pub status: String, + pub winning_outcome_id: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommercialStart { + pub broadcaster_id: String, + pub length: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Commercial { + pub length: i64, + pub message: String, + pub retry_after: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Announcement { + pub message: String, + pub color: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Whisper { + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Stream { + pub id: String, + pub user_id: String, + pub user_login: String, + pub user_name: String, + pub game_id: String, + pub game_name: String, + pub r#type: String, + pub title: String, + pub tags: Vec, + pub viewer_count: i64, + pub started_at: String, + pub language: String, + pub thumbnail_url: String, + pub is_mature: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChannelFollowers { + pub followed_at: String, + pub user_id: String, + pub user_login: String, + pub user_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChannelFollowersData { + pub data: Vec, + pub pagination: Pagination, + pub total: i64, +} + +pub enum VideoId { + Id(String), + UserId(String), + GameId(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VideoMutedSegment { + pub duration: i64, + pub offset: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Video { + pub id: String, + pub stream_id: String, + pub user_id: String, + pub user_login: String, + pub user_name: String, + pub title: String, + pub description: String, + pub created_at: String, + pub published_at: String, + pub url: String, + pub thumbnail_url: String, + pub viewable: String, + pub view_count: i64, + pub language: String, + pub r#type: String, + pub duration: String, + pub muted_segments: Option>, +} + +impl Client { + pub async fn http_request( + &mut self, + method: Method, + uri: String, + data_json: Option, + data_form: Option, + ) -> Result { + let mut req = self.http_client.request(method, uri); + + req = match data_json { + Some(data_json) => req.json(&data_json), + None => match data_form { + Some(data_form) => req.body(data_form), + None => req, + }, + }; + + let req = req + .timeout(core::time::Duration::from_secs(5)) + .header( + "Authorization", + format!("Bearer {0}", self.token.access_token), + ) + .header("Client-Id", self.client_id.clone()); + + Ok(req.send().await?) + } + + pub async fn request( + &mut self, + method: Method, + uri: String, + data_json: Option, + data_form: Option, + ) -> Result { + let mut res = self + .http_request( + method.clone(), + uri.clone(), + data_json.clone(), + data_form.clone(), + ) + .await?; + + if res.status() == reqwest::StatusCode::UNAUTHORIZED { + //Token invalid, get new? If fail, or fail again, return error. + self.refresh_token().await?; + res = self.http_request(method, uri, data_json, data_form).await?; + } + + Ok(res) + } + + pub async fn request_result< + T1: for<'de> serde::Deserialize<'de>, + T2: serde::Serialize + std::clone::Clone, + >( + &mut self, + method: Method, + uri: String, + data_json: Option, + data_form: Option, + ) -> Result { + let res = self + .request::(method, uri, data_json, data_form) + .await?; + Ok(res.json::().await?) + } + + pub async fn get serde::Deserialize<'de>>(&mut self, uri: String) -> Result { + return self + .request_result::(Method::GET, uri, None, None) + .await; + } + + pub async fn post_empty(&mut self, uri: String) -> Result<()> { + match self.request::(Method::POST, uri, None, None).await { + Ok(..) => Ok(()), + Err(e) => Err(e), + } + } + + pub async fn post_form serde::Deserialize<'de>>( + &mut self, + uri: String, + data: String, + ) -> Result { + return self + .request_result::(Method::POST, uri, None, Some(data)) + .await; + } + + pub async fn post_json< + T1: for<'de> serde::Deserialize<'de>, + T2: serde::Serialize + std::clone::Clone, + >( + &mut self, + uri: String, + data: T2, + ) -> Result { + return self + .request_result::(Method::POST, uri, Some(data), None) + .await; + } + + pub async fn post_json_empty( + &mut self, + uri: String, + data: T1, + ) -> Result<()> { + match self + .request::(Method::POST, uri, Some(data), None) + .await + { + Ok(..) => Ok(()), + Err(e) => Err(e), + } + } + + pub async fn patch_json< + T1: for<'de> serde::Deserialize<'de>, + T2: serde::Serialize + std::clone::Clone, + >( + &mut self, + uri: String, + data: T2, + ) -> Result { + return self + .request_result::(Method::PATCH, uri, Some(data), None) + .await; + } + + pub async fn delete(&mut self, uri: String) -> Result<()> { + self.request::(Method::DELETE, uri, None, None) + .await?; + Ok(()) + } + + pub async fn get_token_user(&mut self) -> Result { + match &self.token.user { + Some(v) => Ok(v.clone()), + None => { + if self.token.token_type == TokenType::UserAccessToken && self.token.user.is_none() + { + let user = self.get_user().await?; + self.token.user = Some(user.clone()); + return Ok(user); + } + + bail!("No User"); + } + } + } + + pub async fn get_token_user_id(&mut self) -> Result { + match &self.token.user { + Some(v) => Ok(v.id.clone()), + None => { + let user = self.get_token_user().await?; + return Ok(user.id.clone()); + } + } + } + + pub async fn get_token_user_login(&mut self) -> Result { + match &self.token.user { + Some(v) => Ok(v.id.clone()), + None => { + let user = self.get_token_user().await?; + return Ok(user.login.clone()); + } + } + } + + pub async fn get_users_by_ids(&mut self, user_ids: Vec) -> Result> { + Ok(self + .get::>(format!( + "https://api.twitch.tv/helix/users?id={0}", + user_ids.join("&id=") + )) + .await? + .data) + } + + pub async fn get_users_by_logins(&mut self, user_logins: Vec) -> Result> { + Ok(self + .get::>(format!( + "https://api.twitch.tv/helix/users?login={0}", + user_logins.join("&login=") + )) + .await? + .data) + } + + pub async fn get_user_by_id(&mut self, user_id: String) -> Result { + match self.get_users_by_ids(vec![user_id]).await?.first() { + Some(user) => Ok(user.clone()), + None => bail!("No User found"), + } + } + + pub async fn get_user_by_login(&mut self, user_login: String) -> Result { + match self.get_users_by_logins(vec![user_login]).await?.first() { + Some(user) => Ok(user.clone()), + None => bail!("No User found"), + } + } + + pub async fn get_user(&mut self) -> Result { + match self + .get::>("https://api.twitch.tv/helix/users".to_string()) + .await? + .data + .first() + { + Some(user) => Ok(user.clone()), + None => bail!("No User found"), + } + } + + pub async fn create_custom_reward(&mut self, reward: &RewardCreate) -> Result { + let broadcaster_id = self.get_token_user_id().await?; + match self + .post_json::, _>(format!("https://api.twitch.tv/helix/channel_points/custom_rewards?broadcaster_id={broadcaster_id}"), reward) + .await? + .data + .first() + { + Some(reward) => Ok(reward.clone()), + None => bail!("No User found"), + } + } + + pub async fn update_custom_reward( + &mut self, + id: String, + reward: &RewardUpdate, + ) -> Result { + let broadcaster_id = self.get_token_user_id().await?; + match self + .patch_json::, _>(format!("https://api.twitch.tv/helix/channel_points/custom_rewards?broadcaster_id={broadcaster_id}&id={id}"), reward) + .await? + .data + .first() + { + Some(reward) => Ok(reward.clone()), + None => bail!("No User found"), + } + } + + pub async fn get_custom_rewards(&mut self, ids: Vec) -> Result> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .get::>(format!( + "https://api.twitch.tv/helix/channel_points/custom_rewards?broadcaster_id={broadcaster_id}{0}", + if ids.len() > 0 { format!("&id={0}", ids.join("&id=") ) } else { "".to_string() } + )) + .await? + .data) + } + + pub async fn get_custom_reward(&mut self, id: String) -> Result { + match self.get_custom_rewards(vec![id]).await?.first() { + Some(reward) => Ok(reward.clone()), + None => bail!("No Reward found"), + } + } + + pub async fn delete_custom_reward(&mut self, id: String) -> Result<()> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .delete(format!( + "https://api.twitch.tv/helix/channel_points/custom_rewards?broadcaster_id={broadcaster_id}&id={id}" + )) + .await?) + } + + pub async fn update_redemptions_status( + &mut self, + id: &String, + redemptions: Vec, + status: &RedemptionStatus, + ) -> Result> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .patch_json::, _>(format!( + "https://api.twitch.tv/helix/channel_points/custom_rewards/redemptions?broadcaster_id={broadcaster_id}&reward_id={id}{0}", + format!("&id={0}", redemptions.join("&id=") ) + ), status) + .await? + .data) + } + + pub async fn update_redemption_status( + &mut self, + id: &String, + redemption: &String, + status: &RedemptionStatus, + ) -> Result { + match self + .update_redemptions_status(id, vec![redemption.clone()], status) + .await? + .first() + { + Some(status) => Ok(status.clone()), + None => bail!("No Redemption found"), + } + } + + pub async fn create_eventsub_subscription( + &mut self, + eventsub: &EventSubCreate, + ) -> Result { + match self + .post_json::, _>( + format!("https://api.twitch.tv/helix/eventsub/subscriptions"), + eventsub, + ) + .await? + .data + .first() + { + Some(eventsub) => Ok(eventsub.clone()), + None => bail!("No EventSub found"), + } + } + + pub async fn delete_eventsub_subscription(&mut self, id: String) -> Result<()> { + Ok(self + .delete(format!( + "https://api.twitch.tv/helix/eventsub/subscriptions?id={id}" + )) + .await?) + } + + pub async fn add_channel_moderator(&mut self, id: String) -> Result<()> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .post_empty(format!( + "https://api.twitch.tv/helix/moderation/moderators?broadcaster_id={broadcaster_id}&user_id={id}" + )) + .await?) + } + + pub async fn remove_channel_moderator(&mut self, id: String) -> Result<()> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .delete(format!( + "https://api.twitch.tv/helix/moderation/moderators?broadcaster_id={broadcaster_id}&user_id={id}" + )) + .await?) + } + + pub async fn ban_user( + &mut self, + broadcaster_id: String, + banuser: &BanUser, + ) -> Result { + let moderator_id = self.get_token_user_id().await?; + match self + .post_json::, _>( + format!("https://api.twitch.tv/helix/moderation/bans?moderator_id={moderator_id}&broadcaster_id={broadcaster_id}"), + BanUserObj { + data: banuser.clone() + }, + ) + .await? + .data + .first() + { + Some(banneduser) => Ok(banneduser.clone()), + None => bail!("Ban User failed"), + } + } + + pub async fn unban_user(&mut self, broadcaster_id: String, user_id: String) -> Result<()> { + let moderator_id = self.get_token_user_id().await?; + Ok(self + .delete(format!( + "https://api.twitch.tv/helix/moderation/bans?moderator_id={moderator_id}&broadcaster_id={broadcaster_id}&user_id={user_id}" + )) + .await?) + } + + pub async fn shoutout( + &mut self, + from_broadcaster_id: String, + to_broadcaster_id: String, + ) -> Result<()> { + let moderator_id = self.get_token_user_id().await?; + Ok(self + .post_empty(format!( + "https://api.twitch.tv/helix/chat/shoutouts?from_broadcaster_id={from_broadcaster_id}&to_broadcaster_id={to_broadcaster_id}&moderator_id={moderator_id}" + )) + .await?) + } + + pub async fn get_channel_information( + &mut self, + broadcaster_ids: Vec, + ) -> Result> { + Ok(self + .get::>(format!( + "https://api.twitch.tv/helix/channels?{0}", + if broadcaster_ids.len() > 0 { + format!( + "broadcaster_id={0}", + broadcaster_ids.join("&broadcaster_id=") + ) + } else { + "".to_string() + } + )) + .await? + .data) + } + + pub async fn whisper(&mut self, to_user_id: String, message: String) -> Result<()> { + let from_user_id = self.get_token_user_id().await?; + Ok(self + .post_json_empty( + format!("https://api.twitch.tv/helix/whispers?from_user_id={from_user_id}&to_user_id={to_user_id}"), + Whisper { + message: message + }, + ) + .await?) + } + + pub async fn get_predictions( + &mut self, + id: Option, + first: Option, + after: Option, + ) -> Result> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .get::>(format!( + "https://api.twitch.tv/helix/predictions?broadcaster_id={broadcaster_id}{0}{1}{2}", + if let Some(id) = id { + format!("&id={id}") + } else { + "".to_string() + }, + if let Some(first) = first { + format!("&first={first}") + } else { + "".to_string() + }, + if let Some(after) = after { + format!("&after={after}") + } else { + "".to_string() + }, + )) + .await? + .data) + } + + pub async fn create_prediction( + &mut self, + title: String, + outcomes: Vec, + prediction_window: i64, + ) -> Result { + let broadcaster_id = self.get_token_user_id().await?; + match self + .post_json::, _>( + "https://api.twitch.tv/helix/predictions".to_string(), + PredictionCreate { + broadcaster_id: broadcaster_id, + title: title, + outcomes: outcomes + .into_iter() + .map(|o| PredictionOutcomeCreate { title: o }) + .collect(), + prediction_window: prediction_window, + }, + ) + .await? + .data + .first() + { + Some(prediction) => Ok(prediction.clone()), + None => bail!("Create Prediction failed"), + } + } + + pub async fn end_prediction( + &mut self, + id: String, + status: String, + winning_outcome_id: Option, + ) -> Result { + let broadcaster_id = self.get_token_user_id().await?; + match self + .patch_json::, _>( + "https://api.twitch.tv/helix/predictions".to_string(), + PredictionEnd { + broadcaster_id: broadcaster_id, + id: id, + status: status, + winning_outcome_id: winning_outcome_id, + }, + ) + .await? + .data + .first() + { + Some(prediction) => Ok(prediction.clone()), + None => bail!("End Prediction failed"), + } + } + + pub async fn send_chat_announcement( + &mut self, + broadcaster_id: String, + message: String, + color: Option, + ) -> Result<()> { + let moderator_id = self.get_token_user_id().await?; + Ok(self + .post_json_empty( + format!("https://api.twitch.tv/helix/chat/announcements?broadcaster_id={broadcaster_id}&moderator_id={moderator_id}"), + Announcement { + message: message, + color: color, + } + ) + .await?) + } + + pub async fn start_commercial(&mut self, length: i64) -> Result { + let broadcaster_id = self.get_token_user_id().await?; + match self + .post_json::, _>( + "https://api.twitch.tv/helix/channels/commercial".to_string(), + CommercialStart { + broadcaster_id: broadcaster_id, + length: length, + }, + ) + .await? + .data + .first() + { + Some(commercial) => Ok(commercial.clone()), + None => bail!("Start Commercial failed"), + } + } + + pub async fn get_streams( + &mut self, + user_ids: Option>, + user_logins: Option>, + game_ids: Option>, + r#type: Option, + languages: Option>, + first: Option, + before: Option, + after: Option, + ) -> Result> { + Ok(self + .get::>(format!( + "https://api.twitch.tv/helix/streams?{0}{1}{2}{3}{4}{5}{6}{7}", + if let Some(user_ids) = user_ids { + format!("&user_id={}", user_ids.join("&user_id=")) + } else { + "".to_string() + }, + if let Some(user_logins) = user_logins { + format!("&user_login={}", user_logins.join("&user_login=")) + } else { + "".to_string() + }, + if let Some(game_ids) = game_ids { + format!("&game_id={}", game_ids.join("&game_id=")) + } else { + "".to_string() + }, + if let Some(type_) = r#type { + format!("&type={type_}") + } else { + "".to_string() + }, + if let Some(languages) = languages { + format!("&language={}", languages.join("&language=")) + } else { + "".to_string() + }, + if let Some(first) = first { + format!("&first={first}") + } else { + "".to_string() + }, + if let Some(before) = before { + format!("&before={before}") + } else { + "".to_string() + }, + if let Some(after) = after { + format!("&after={after}") + } else { + "".to_string() + }, + )) + .await? + .data) + } + + pub async fn get_stream(&mut self) -> Result { + let broadcaster_id = self.get_token_user_id().await?; + match self + .get_streams( + vec![broadcaster_id].into(), + None, + None, + None, + None, + None, + None, + None, + ) + .await? + .first() + { + Some(stream) => Ok(stream.clone()), + None => bail!("No stream found"), + } + } + + pub async fn add_channel_vip(&mut self, id: String) -> Result<()> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .post_empty(format!( + "https://api.twitch.tv/helix/channels/vips?broadcaster_id={broadcaster_id}&user_id={id}" + )) + .await?) + } + + pub async fn remove_channel_vip(&mut self, id: String) -> Result<()> { + let broadcaster_id = self.get_token_user_id().await?; + Ok(self + .delete(format!( + "https://api.twitch.tv/helix/channels/vips?broadcaster_id={broadcaster_id}&user_id={id}" + )) + .await?) + } + + pub async fn get_channel_followers_total(&mut self, broadcaster_id: String) -> Result { + Ok(self + .get::(format!( + "https://api.twitch.tv/helix/channels/followers?broadcaster_id={0}", + broadcaster_id + )) + .await? + .total) + } + + pub async fn get_videos( + &mut self, + id: VideoId, + language: Option, + period: Option, + sort: Option, + r#type: Option, + first: Option, + after: Option, + before: Option, + ) -> Result> { + Ok(self + .get::>(format!( + "https://api.twitch.tv/helix/videos?{}{}{}{}{}{}{}{}", + match id { + VideoId::Id(value) => format!("id={}", value), + VideoId::UserId(value) => format!("user_id={}", value), + VideoId::GameId(value) => format!("game_id={}", value), + }, + if let Some(value) = language { + format!("&language={}", value) + } else { + "".to_string() + }, + if let Some(value) = period { + format!("&period={}", value) + } else { + "".to_string() + }, + if let Some(value) = sort { + format!("&sort={}", value) + } else { + "".to_string() + }, + if let Some(value) = r#type { + format!("&type={}", value) + } else { + "".to_string() + }, + if let Some(value) = first { + format!("&first={}", value) + } else { + "".to_string() + }, + if let Some(value) = after { + format!("&after={}", value) + } else { + "".to_string() + }, + if let Some(value) = before { + format!("&before={}", value) + } else { + "".to_string() + } + )) + .await? + .data) + } +} diff --git a/src/twitch_handler/mod.rs b/src/twitch_handler/mod.rs new file mode 100644 index 0000000..5be11e1 --- /dev/null +++ b/src/twitch_handler/mod.rs @@ -0,0 +1,3 @@ +pub mod eventsub; +pub mod helix; +pub mod auth;