Add twitch integration

This commit is contained in:
2024-08-05 23:23:58 +02:00
parent 4b1024eb9a
commit 8a31222f25
14 changed files with 1681 additions and 17 deletions

58
Cargo.lock generated
View File

@@ -32,6 +32,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anyhow"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.7.4" version = "0.7.4"
@@ -308,11 +314,17 @@ dependencies = [
name = "discord-bot" name = "discord-bot"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"async-trait",
"chrono", "chrono",
"futures",
"once_cell", "once_cell",
"reqwest 0.12.5", "reqwest 0.12.5",
"serde",
"serde_json",
"serenity", "serenity",
"tokio", "tokio",
"tokio-tungstenite 0.23.1",
] ]
[[package]] [[package]]
@@ -403,6 +415,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"futures-executor",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
@@ -425,6 +438,17 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.30" version = "0.3.30"
@@ -1510,7 +1534,7 @@ dependencies = [
"static_assertions", "static_assertions",
"time", "time",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite 0.21.0",
"tracing", "tracing",
"typemap_rev", "typemap_rev",
"typesize", "typesize",
@@ -1808,10 +1832,22 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
"tokio", "tokio",
"tokio-rustls 0.25.0", "tokio-rustls 0.25.0",
"tungstenite", "tungstenite 0.21.0",
"webpki-roots 0.26.3", "webpki-roots 0.26.3",
] ]
[[package]]
name = "tokio-tungstenite"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.23.0",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.11" version = "0.7.11"
@@ -1917,6 +1953,24 @@ dependencies = [
"utf-8", "utf-8",
] ]
[[package]]
name = "tungstenite"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.1.0",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"utf-8",
]
[[package]] [[package]]
name = "typemap_rev" name = "typemap_rev"
version = "0.3.0" version = "0.3.0"

View File

@@ -11,3 +11,9 @@ tokio = { version = "1.35.1", features = ["rt-multi-thread", "macros", "time"] }
serenity = { version = "0.12.1", features = ["collector"] } serenity = { version = "0.12.1", features = ["collector"] }
reqwest = { version = "0.12.5", features = ["json"] } reqwest = { version = "0.12.5", features = ["json"] }
chrono = "0.4.38" chrono = "0.4.38"
futures = "0.3.30"
serde = "1.0.204"
serde_json = "1.0.122"
anyhow = "1.0.86"
async-trait = "0.1.81"
tokio-tungstenite = "0.23.1"

View File

@@ -19,6 +19,10 @@ pub struct Config {
pub telegram_bot_token: String, pub telegram_bot_token: String,
pub telegram_channel_id: i128, pub telegram_channel_id: i128,
pub twitch_client_id: String,
pub twitch_client_secret: String,
pub twitch_channel_id: String,
} }
@@ -33,6 +37,9 @@ impl Config {
discord_game_list_message_id: get_env("DISCORD_GAME_LIST_MESSAGE_ID").parse().unwrap(), discord_game_list_message_id: get_env("DISCORD_GAME_LIST_MESSAGE_ID").parse().unwrap(),
telegram_bot_token: get_env("TELEGRAM_BOT_TOKEN"), telegram_bot_token: get_env("TELEGRAM_BOT_TOKEN"),
telegram_channel_id: get_env("TELEGRAM_CHANNEL_ID").parse().unwrap(), telegram_channel_id: get_env("TELEGRAM_CHANNEL_ID").parse().unwrap(),
twitch_client_id: get_env("TWITCH_CLIENT_ID"),
twitch_client_secret: get_env("TWITCH_CLIENT_SECRET"),
twitch_channel_id: get_env("TWITCH_CHANNEL_ID"),
} }
} }
} }

View File

@@ -6,7 +6,6 @@ use serenity::model::channel::Message;
use chrono::offset::FixedOffset; use chrono::offset::FixedOffset;
use crate::config; use crate::config;
use crate::notifiers::telegram::send_to_telegram;
use crate::utils::{add_game, delete_game, format_games_list, parse_games_list}; use crate::utils::{add_game, delete_game, format_games_list, parse_games_list};
pub mod commands; pub mod commands;
@@ -105,15 +104,8 @@ impl EventHandler for Handler {
} }
} }
async fn message(&self, _ctx: Context, msg: Message) { async fn message(&self, _ctx: Context, _msg: Message) {
if msg.guild_id != Some(config::CONFIG.discord_guild_id.into()) {
return;
}
if msg.channel_id == config::CONFIG.discord_channel_id {
send_to_telegram(&msg.content).await;
return;
}
} }
async fn ready(&self, ctx: Context, _ready: serenity::model::gateway::Ready) { async fn ready(&self, ctx: Context, _ready: serenity::model::gateway::Ready) {

View File

@@ -1,20 +1,28 @@
use futures::StreamExt;
use serenity::all::ActivityData; use serenity::all::ActivityData;
use serenity::prelude::*; use serenity::prelude::*;
use twitch_handler::{auth::{self}, helix};
use tokio::join;
pub mod config; pub mod config;
pub mod handler; pub mod discord_handler;
pub mod twitch_handler;
pub mod utils; pub mod utils;
pub mod notifiers; pub mod notifiers;
async fn start_discord_bot() { async fn start_discord_bot() {
println!("Starting Discord bot...");
let intents = GatewayIntents::GUILD_MESSAGES let intents = GatewayIntents::GUILD_MESSAGES
| GatewayIntents::DIRECT_MESSAGES | GatewayIntents::DIRECT_MESSAGES
| GatewayIntents::MESSAGE_CONTENT; | GatewayIntents::MESSAGE_CONTENT;
let mut client = let mut client =
Client::builder(&config::CONFIG.discord_bot_token, intents) Client::builder(&config::CONFIG.discord_bot_token, intents)
.event_handler(handler::Handler) .event_handler(discord_handler::Handler)
.status(serenity::all::OnlineStatus::Online) .status(serenity::all::OnlineStatus::Online)
.activity(ActivityData::playing(&config::CONFIG.discord_bot_activity)) .activity(ActivityData::playing(&config::CONFIG.discord_bot_activity))
.await .await
@@ -25,8 +33,30 @@ async fn start_discord_bot() {
} }
} }
async fn start_twitch_bot() {
println!("Starting Twitch bot...");
let token_storage = auth::VoidStorage {};
let mut client = helix::Client::from_get_app_token(
config::CONFIG.twitch_client_id.clone(),
config::CONFIG.twitch_client_secret.clone(),
token_storage,
).await.unwrap();
let mut t = client.connect_eventsub(vec![
("stream.online".to_string(), "1".to_string()),
("stream.offline".to_string(), "1".to_string()),
("channel.update".to_string(), "2".to_string())
], config::CONFIG.twitch_channel_id.clone()).await.unwrap();
while let Some(event) = t.next().await {
println!("{:?}", event);
}
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
start_discord_bot().await; join!(start_discord_bot(), start_twitch_bot());
} }

View File

@@ -1,3 +1,17 @@
pub async fn send_to_discord(_msg: &str) { use reqwest::Url;
todo!(); use serenity::json::json;
use crate::config;
pub async fn send_to_discord(msg: &str) {
let base_url = format!("https://discord.com/api/v10/channels/{}/messages", config::CONFIG.discord_channel_id);
let url = Url::parse(&base_url.as_ref()).unwrap();
reqwest::Client::new().post(url)
.header("Authorization", format!("Bot {}", config::CONFIG.discord_bot_token))
.json(&json!({
"content": msg
})).send().await.expect("Error sending message to Discord");
} }

View File

@@ -14,5 +14,6 @@ pub async fn send_to_telegram(msg: &str) {
] ]
).unwrap(); ).unwrap();
reqwest::get(url).await.expect("Error sending message to Telegram"); reqwest::Client::new().post(url)
.send().await.expect("Error sending message to Telegram");
} }

186
src/twitch_handler/auth.rs Normal file
View File

@@ -0,0 +1,186 @@
use helix::Client;
use helix::User;
use anyhow::Result;
use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use serde::Deserialize;
use serde::Serialize;
use super::helix;
#[async_trait]
pub trait TokenStorage {
async fn save(&mut self, token: &Token) -> Result<()>;
}
#[derive(Debug, Default, Clone, PartialEq)]
pub enum TokenType {
#[default]
UserAccessToken,
AppAccessToken,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Token {
#[serde(skip)]
pub token_type: TokenType,
#[serde(default)]
pub refresh_token: String,
pub access_token: String,
pub expires_in: i64,
#[serde(default = "Utc::now")]
pub created_at: DateTime<Utc>,
#[serde(skip)]
pub user: Option<User>,
}
#[derive(Debug, Clone)]
pub struct VoidStorage {}
#[async_trait]
impl TokenStorage for VoidStorage {
async fn save(&mut self, _token: &Token) -> Result<()> {
Ok(())
}
}
#[derive(Deserialize)]
struct ValidateToken {
pub expires_in: i64,
}
impl<T: TokenStorage> Client<T> {
pub async fn validate_token(&mut self) -> Result<()> {
let token = match self
.get::<ValidateToken>("https://id.twitch.tv/oauth2/validate".to_string())
.await
{
Ok(r) => r,
Err(..) => {
self.refresh_token().await?;
return Ok(());
}
};
if token.expires_in < 3600 {
self.refresh_token().await?;
}
Ok(())
}
pub async fn refresh_token(&mut self) -> Result<()> {
if self.token.token_type == TokenType::AppAccessToken {
self.get_app_token().await?;
return Ok(());
}
let res = self
.http_request::<()>(
reqwest::Method::POST,
"https://id.twitch.tv/oauth2/token".to_string(),
None,
Some(format!(
"client_id={0}&client_secret={1}&grant_type=refresh_token&refresh_token={2}",
self.client_id, self.client_secret, self.token.refresh_token
)),
)
.await?;
self.token = res.json::<Token>().await?;
self.token_storage.save(&self.token).await?;
Ok(())
}
pub fn from_token_no_validation(
client_id: String,
client_secret: String,
token_storage: T,
token: Token,
) -> Client<T> {
Client {
client_id: client_id,
client_secret: client_secret,
token: token,
http_client: reqwest::Client::new(),
token_storage: token_storage,
}
}
pub async fn from_token(
client_id: String,
client_secret: String,
token_storage: T,
token: Token,
) -> Result<Client<T>> {
let mut client =
Self::from_token_no_validation(client_id, client_secret, token_storage, token);
client.token.user = Some(client.get_user().await?);
Ok(client)
}
async fn get_app_token(&mut self) -> Result<()> {
let token = self
.http_client
.post("https://id.twitch.tv/oauth2/token")
.body(format!(
"client_id={0}&client_secret={1}&grant_type=client_credentials",
self.client_id, self.client_secret
))
.send()
.await?
.json::<Token>()
.await?;
self.token = token;
self.token.token_type = TokenType::AppAccessToken;
self.token_storage.save(&self.token).await?;
Ok(())
}
pub async fn from_get_app_token(
client_id: String,
client_secret: String,
token_storage: T,
) -> Result<Client<T>> {
let http_client = reqwest::Client::new();
let mut client = Client {
client_id: client_id,
client_secret: client_secret,
http_client: http_client,
token_storage: token_storage,
token: Token::default(),
};
client.get_app_token().await?;
Ok(client)
}
pub async fn from_authorization(
client_id: String,
client_secret: String,
token_storage: T,
code: String,
redirect_uri: String,
) -> Result<Client<T>> {
let http_client = reqwest::Client::new();
let token = http_client.post("https://id.twitch.tv/oauth2/token")
.body(format!("client_id={client_id}&client_secret={client_secret}&code={code}&grant_type=authorization_code&redirect_uri={redirect_uri}"))
.send()
.await?
.json::<Token>()
.await?;
let mut client = Client {
client_id: client_id,
client_secret: client_secret,
token: token,
http_client: http_client,
token_storage: token_storage,
};
client.token.user = Some(client.get_user().await?);
client.token_storage.save(&client.token).await?;
Ok(client)
}
}

View File

@@ -0,0 +1,296 @@
use anyhow::bail;
use anyhow::Result;
use futures::Future;
use futures::Sink;
use futures::StreamExt;
use serde::Deserialize;
use futures::Stream;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use super::auth;
use super::helix;
#[derive(Debug, Deserialize)]
pub struct MessageMetadata {
pub message_id: String,
pub message_timestamp: String,
pub message_type: String,
pub subscription_type: Option<String>,
pub subscription_version: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct Message {
pub metadata: MessageMetadata,
pub payload: serde_json::Value,
}
#[derive(Debug, Deserialize)]
pub struct SessionWelcomeSession {
pub id: String,
pub connected_at: String,
pub status: String,
pub reconnect_url: Option<String>,
pub keepalive_timeout_seconds: i64,
}
#[derive(Debug, Deserialize)]
pub struct SessionWelcome {
pub session: SessionWelcomeSession,
}
#[derive(Debug, Deserialize)]
pub struct Notification {
pub subscription: serde_json::Value,
pub event: serde_json::Value,
}
#[derive(Debug, Deserialize)]
pub struct ChannelUpdate {
pub broadcaster_user_id: String,
pub broadcaster_user_login: String,
pub broadcaster_user_name: String,
pub title: String,
pub language: String,
pub category_id: String,
pub category_name: String,
pub content_classification_labels: Vec<String>,
}
#[derive(Debug, Deserialize)]
pub struct CustomRewardRedemptionAddReward {
pub id: String,
}
#[derive(Debug, Deserialize)]
pub struct CustomRewardRedemptionAdd {
pub id: String,
pub user_login: String,
pub user_input: String,
pub reward: CustomRewardRedemptionAddReward,
}
#[derive(Debug, Deserialize)]
pub struct StreamOnline {
pub id: String,
pub broadcaster_user_id: String,
pub broadcaster_user_login: String,
pub broadcaster_user_name: String,
pub r#type: String,
pub started_at: String,
}
#[derive(Debug, Deserialize)]
pub struct StreamOffline {
pub broadcaster_user_id: String,
pub broadcaster_user_login: String,
pub broadcaster_user_name: String,
}
#[derive(Debug, Deserialize)]
pub enum NotificationType {
ChannelUpdate(ChannelUpdate),
CustomRewardRedemptionAdd(CustomRewardRedemptionAdd),
StreamOnline(StreamOnline),
StreamOffline(StreamOffline),
}
pub struct Client {
inner_stream: Pin<
Box<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
>,
>,
ping_sleep: Pin<Box<tokio::time::Sleep>>,
}
impl Stream for Client {
type Item = NotificationType;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let mut inner_stream = this.inner_stream.as_mut();
match this.ping_sleep.as_mut().poll(cx) {
Poll::Pending => {}
Poll::Ready(..) => {
this.ping_sleep
.as_mut()
.reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(30));
match inner_stream.as_mut().start_send(
tokio_tungstenite::tungstenite::protocol::Message::Ping(vec![]),
) {
Err(..) => return Poll::Ready(None),
_ => {}
};
}
};
loop {
match inner_stream.as_mut().poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(v) => match v {
Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Ping(..))) => {
match inner_stream.as_mut().start_send(
tokio_tungstenite::tungstenite::protocol::Message::Pong(vec![]),
) {
Ok(()) => continue,
Err(..) => break,
};
}
Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => {
let message: Message = match serde_json::from_str(&text) {
Ok(v) => v,
Err(..) => break,
};
match message.metadata.message_type.as_str() {
"notification" => {
let subtype = match &message.metadata.subscription_type {
Some(v) => v,
None => break,
};
let notification: Notification =
match serde_json::from_value(message.payload.clone()) {
Ok(v) => v,
Err(..) => break,
};
match subtype.as_str() {
"channel.update" => {
let event: ChannelUpdate =
match serde_json::from_value(notification.event) {
Ok(v) => v,
Err(..) => break,
};
return Poll::Ready(Some(NotificationType::ChannelUpdate(
event,
)));
}
"channel.channel_points_custom_reward_redemption.add" => {
let event: CustomRewardRedemptionAdd =
match serde_json::from_value(notification.event) {
Ok(v) => v,
Err(..) => break,
};
return Poll::Ready(Some(
NotificationType::CustomRewardRedemptionAdd(event),
));
}
"stream.online" => {
let event: StreamOnline =
match serde_json::from_value(notification.event) {
Ok(v) => v,
Err(..) => break,
};
return Poll::Ready(Some(NotificationType::StreamOnline(
event,
)));
}
"stream.offline" => {
let event: StreamOffline =
match serde_json::from_value(notification.event) {
Ok(v) => v,
Err(..) => break,
};
return Poll::Ready(Some(NotificationType::StreamOffline(
event,
)));
}
_ => return Poll::Pending,
}
}
_ => continue,
}
}
Some(..) => continue,
None => break,
},
}
}
Poll::Ready(None)
}
}
impl<T: auth::TokenStorage> helix::Client<T> {
pub async fn connect_eventsub(&mut self, topics: Vec<(String, String)>, broadcaster_id: String) -> Result<Client> {
let (mut ws_stream, _) =
match tokio_tungstenite::connect_async("wss://eventsub.wss.twitch.tv/ws").await {
Ok(v) => v,
Err(e) => return Err(e.into()),
};
let welcome = loop {
let msg = ws_stream.next().await;
match msg {
Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => {
let message: Message = match serde_json::from_str(&text) {
Ok(v) => v,
Err(e) => return Err(e.into()),
};
if message.metadata.message_type.as_str() != "session_welcome" {
bail!("No session welcome");
}
let welcome: SessionWelcome =
match serde_json::from_value(message.payload.clone()) {
Ok(v) => v,
Err(e) => return Err(e.into()),
};
break welcome;
}
Some(Err(e)) => return Err(e.into()),
Some(..) => {}
None => bail!("WebSocket dropped"),
}
};
for (subtype, version) in topics.into_iter() {
match self
.create_eventsub_subscription(&helix::EventSubCreate {
r#type: subtype,
version: version,
condition: helix::EventSubCondition {
broadcaster_id: Some(broadcaster_id.clone()),
broadcaster_user_id: Some(broadcaster_id.clone()),
moderator_user_id: Some(broadcaster_id.clone()),
user_id: Some(broadcaster_id.clone()),
..Default::default()
},
transport: helix::EventSubTransport {
method: "websocket".to_string(),
session_id: Some(welcome.session.id.clone()),
..Default::default()
},
})
.await
{
Ok(..) => {}
Err(..) => {
bail!("create_eventsub_subscription failed")
}
};
}
Ok(Client {
inner_stream: Pin::new(Box::new(ws_stream)),
ping_sleep: Box::pin(tokio::time::sleep(tokio::time::Duration::from_secs(30))),
})
}
}

1075
src/twitch_handler/helix.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,3 @@
pub mod eventsub;
pub mod helix;
pub mod auth;