Add telegram mini app

This commit is contained in:
2025-03-08 01:32:36 +01:00
parent 148a583ce3
commit 53ec421839
22 changed files with 1226 additions and 247 deletions

36
src/backend/Cargo.toml Normal file
View File

@@ -0,0 +1,36 @@
[package]
name = "backend"
version = "0.1.0"
edition = "2024"
[dependencies]
once_cell = "1.20.3"
eyre = { version = "0.6" }
tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros"] }
futures = "0.3.31"
teloxide = { version = "0.13.0", features = ["macros", "webhooks-axum", "cache-me", "throttle"] }
twitch_api = { version = "0.7.0", features = ["reqwest", "helix", "eventsub", "hmac"] }
twitch_oauth2 = "0.15.1"
axum = { version = "0.8.1", features = ["http2"] }
tower = { version = "0.5.2" }
tower-http = { version = "0.6.2", features = ["fs", "trace"] }
http-body-util = "0.1.2"
retainer = "0.3.0"
reqwest = "0.12.12"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
mongodb = "3.2.1"
url = "2.5.4"
hmac = "0.12.1"
sha2 = "0.10.8"
hex = "0.4.3"
serde = { version = "1.0.218", features = ["derive"] }
serde_json = "1.0.140"

View File

@@ -1,11 +1,15 @@
use once_cell::sync::Lazy;
pub struct Config {
pub bot_token: String,
// Telegram
pub telegram_bot_token: String,
pub telegram_webhook_url: String,
pub telegram_webhook_port: u16,
pub telegram_mini_app_port: u16,
// Twitch
pub twitch_client_id: String,
pub twitch_client_secret: String,
@@ -14,13 +18,14 @@ pub struct Config {
pub twitch_webhook_url: String,
pub twitch_webhook_port: u16,
// Common
pub mongodb_connection_string: String,
}
impl Config {
fn load() -> Self {
Self {
bot_token: std::env::var("BOT_TOKEN").expect("BOT_TOKEN is not set"),
telegram_bot_token: std::env::var("BOT_TOKEN").expect("BOT_TOKEN is not set"),
telegram_webhook_url: std::env::var("TELEGRAM_WEBHOOK_URL")
.expect("TELEGRAM_WEBHOOK_URL is not set"),
@@ -28,6 +33,10 @@ impl Config {
.expect("TELEGRAM_WEBHOOK_PORT is not set")
.parse()
.expect("TELEGRAM_WEBHOOK_PORT is not a valid u16"),
telegram_mini_app_port: std::env::var("TELEGRAM_MINI_APP_PORT")
.expect("TELEGRAM_MINI_APP_PORT is not set")
.parse()
.expect("TELEGRAM_MINI_APP_PORT is not a valid u16"),
twitch_client_id: std::env::var("TWITCH_CLIENT_ID")
.expect("TWITCH_CLIENT_ID is not set"),

View File

@@ -1,7 +1,9 @@
pub mod config;
pub mod repositories;
pub mod subscription_manager;
pub mod telegram_bot;
pub mod twitch_webhook;
pub mod web_app;
use std::sync::Arc;

View File

@@ -0,0 +1 @@
pub mod subscriptions;

View File

@@ -0,0 +1,122 @@
use futures::StreamExt as _;
use mongodb::{
Client, Collection,
bson::{Document, doc, oid::ObjectId},
};
use serde::Serialize;
use crate::config::CONFIG;
pub struct SubscriptionRepository {}
#[derive(Serialize)]
pub struct Subscription {
pub id: ObjectId,
pub streamer: String,
pub telegram_user_id: u64,
}
impl From<Document> for Subscription {
fn from(doc: Document) -> Self {
Self {
id: doc.get_object_id("_id").unwrap(),
streamer: doc.get_str("streamer").unwrap().to_string(),
telegram_user_id: doc.get_i64("telegram_user_id").unwrap() as u64,
}
}
}
impl SubscriptionRepository {
async fn get_collection() -> mongodb::error::Result<Collection<Document>> {
let client = Client::with_uri_str(CONFIG.mongodb_connection_string.clone()).await?;
let database = client.database("telegram-twitch-notifier");
Ok(database.collection("subscriptions"))
}
pub async fn get_by_id(id: ObjectId) -> mongodb::error::Result<Option<Subscription>> {
let collection = Self::get_collection().await?;
let doc = collection.find_one(doc! { "_id": id }).await?;
match doc {
Some(doc) => Ok(Some(Subscription::from(doc))),
None => Ok(None),
}
}
pub async fn get_or_create(
streamer: String,
telegram_user_id: u64,
) -> mongodb::error::Result<Subscription> {
let collection = Self::get_collection().await?;
let existing = collection
.find_one(doc! {
"streamer": streamer.clone(),
"telegram_user_id": telegram_user_id as i64,
})
.await?;
if let Some(v) = existing {
return Ok(Subscription::from(v));
}
let created = collection
.insert_one(doc! {
"streamer": streamer,
"telegram_user_id": telegram_user_id as i64,
})
.await?;
let inserted_id = created.inserted_id.as_object_id().unwrap();
Ok(SubscriptionRepository::get_by_id(inserted_id.clone())
.await?
.unwrap())
}
pub async fn delete(streamer: String, telegram_user_id: u64) -> mongodb::error::Result<()> {
let collection = Self::get_collection().await?;
collection
.delete_one(doc! {
"streamer": streamer,
"telegram_user_id": telegram_user_id as i64,
})
.await?;
Ok(())
}
pub async fn all_by_user(telegram_user_id: u64) -> mongodb::error::Result<Vec<Subscription>> {
let collection = Self::get_collection().await?;
let mut subs = collection
.find(doc! { "telegram_user_id": telegram_user_id as i64 })
.await?;
let mut result = Vec::new();
while let Some(sub) = subs.next().await {
result.push(Subscription::from(sub?));
}
Ok(result)
}
pub async fn all() -> mongodb::error::Result<Vec<Subscription>> {
let collection = Self::get_collection().await?;
let mut subs = collection.find(doc! {}).await?;
let mut result = Vec::new();
while let Some(sub) = subs.next().await {
result.push(Subscription::from(sub?));
}
Ok(result)
}
}

View File

@@ -0,0 +1,60 @@
use std::collections::{HashMap, HashSet};
use tokio::sync::RwLock;
use crate::repositories::subscriptions::SubscriptionRepository;
pub struct SubscriptionManager {
pub subscriptions: RwLock<HashMap<String, HashSet<u64>>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self {
subscriptions: RwLock::new(HashMap::new()),
}
}
pub async fn load(&self) -> mongodb::error::Result<()> {
let subs = SubscriptionRepository::all().await?;
for sub in subs {
self.subscriptions
.write()
.await
.entry(sub.streamer.clone())
.or_insert(HashSet::new())
.insert(sub.telegram_user_id);
}
Ok(())
}
pub async fn subscribe(&self, telegram_user_id: u64, username: String) {
tracing::debug!("Subscribing {} to {}", telegram_user_id, username);
let inserted = self
.subscriptions
.write()
.await
.entry(username.clone())
.or_insert(HashSet::new())
.insert(telegram_user_id);
if !inserted {
return;
}
SubscriptionRepository::get_or_create(username, telegram_user_id)
.await
.expect("Failed to create subscription");
}
pub async fn unsubscribe(&self, telegram_user_id: u64, username: String) {
tracing::debug!("Unsubscribing {} from {}", telegram_user_id, username);
SubscriptionRepository::delete(username, telegram_user_id)
.await
.expect("Failed to delete subscription");
}
}

View File

@@ -127,7 +127,7 @@ pub async fn get_commands() -> Vec<BotCommand> {
}
pub fn get_telegram_bot() -> Bot {
OriginBot::new(CONFIG.bot_token.clone())
OriginBot::new(CONFIG.telegram_bot_token.clone())
.throttle(Limits::default())
.cache_me()
}

View File

@@ -255,7 +255,7 @@ impl TwitchWebhookServer {
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}

View File

@@ -0,0 +1,74 @@
use axum::{
extract::Request,
http::StatusCode,
response::{IntoResponse, Response},
};
use futures::future::BoxFuture;
use tower::{Layer, Service};
use crate::config::CONFIG;
use super::validation::validate;
#[derive(Clone)]
pub struct UserId(pub u64);
#[derive(Clone)]
pub struct AuthLayer;
impl<S> Layer<S> for AuthLayer {
type Service = AuthMiddleware<S>;
fn layer(&self, inner: S) -> Self::Service {
AuthMiddleware { inner }
}
}
#[derive(Clone)]
pub struct AuthMiddleware<S> {
inner: S,
}
impl<S> Service<Request> for AuthMiddleware<S>
where
S: Service<Request, Response = Response> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: Request) -> Self::Future {
let init_data = {
let header = req.headers().get("X-Init-Data");
match header {
Some(header) => {
let header = header.to_str().unwrap();
header
}
None => return Box::pin(async { Ok(StatusCode::UNAUTHORIZED.into_response()) }),
}
};
let user_id = match validate(init_data, &CONFIG.telegram_bot_token) {
Some(user_id) => user_id,
None => return Box::pin(async { Ok(StatusCode::UNAUTHORIZED.into_response()) }),
};
req.extensions_mut().insert(UserId(user_id));
let future = self.inner.call(req);
Box::pin(async move {
let response: Response = future.await?;
Ok(response)
})
}
}

View File

@@ -0,0 +1,33 @@
pub mod auth;
pub mod subscriptions;
pub mod validation;
use std::net::SocketAddr;
use axum::Router;
use subscriptions::get_api_router;
use tokio::net::TcpListener;
use tower_http::services::ServeFile;
use crate::config::CONFIG;
fn get_app() -> Router {
Router::new()
.nest_service("/assets", ServeFile::new("assets"))
.nest("/api", get_api_router())
.fallback_service(ServeFile::new("assets/index.html"))
}
pub async fn start_web_app() -> Result<(), eyre::Report> {
let app = get_app();
let address = SocketAddr::new([0, 0, 0, 0].into(), CONFIG.telegram_mini_app_port);
let _ = axum::serve(
TcpListener::bind(address).await.unwrap(),
app.into_make_service(),
)
.await;
Ok(())
}

View File

@@ -0,0 +1,47 @@
use axum::{
Extension, Json, Router,
extract::Path,
http::StatusCode,
response::IntoResponse,
routing::{delete, get, post},
};
use crate::repositories::subscriptions::SubscriptionRepository;
use super::auth::{AuthLayer, UserId};
async fn get_subscriptions(Extension(UserId(user_id)): Extension<UserId>) -> impl IntoResponse {
let subs = SubscriptionRepository::all_by_user(user_id).await.unwrap();
Json(subs).into_response()
}
async fn create_subscription(
Path(streamer): Path<String>,
Extension(UserId(user_id)): Extension<UserId>,
) -> impl IntoResponse {
let sub = SubscriptionRepository::get_or_create(streamer, user_id)
.await
.unwrap();
Json(sub).into_response()
}
async fn delete_subscription(
Path(streamer): Path<String>,
Extension(UserId(user_id)): Extension<UserId>,
) -> impl IntoResponse {
SubscriptionRepository::delete(streamer, user_id)
.await
.unwrap();
StatusCode::NO_CONTENT
}
pub fn get_api_router() -> Router {
Router::new()
.route("/subscriptions/", get(get_subscriptions))
.route("/subscriptions/:streamer/", post(create_subscription))
.route("/subscriptions/:streamer/", delete(delete_subscription))
.layer(AuthLayer)
}

View File

@@ -0,0 +1,89 @@
use hmac::{Hmac, Mac};
use serde::Deserialize;
use sha2::Sha256;
use url::form_urlencoded;
type HmacSha256 = Hmac<Sha256>;
#[derive(Debug, Clone, Deserialize)]
pub struct User {
pub id: u64,
}
pub fn parse(init_data: &str) -> Option<u64> {
if init_data.is_empty() {
return None;
}
if init_data.contains(';') || !init_data.contains('=') {
return None;
}
let pairs = form_urlencoded::parse(init_data.as_bytes());
for (key, value) in pairs {
if key == "user" {
let user_data = serde_json::from_str::<User>(&value).ok();
return match user_data {
Some(user) => Some(user.id),
None => None,
};
}
}
None
}
fn extract_hash(init_data: &str) -> Option<(String, String)> {
let (base_data, hash) = if let Some(pos) = init_data.find("&hash=") {
let (base, hash_part) = init_data.split_at(pos);
let hash = &hash_part[6..]; // Skip "&hash="
(base.to_string(), hash.to_string())
} else {
return None;
};
if !hash.chars().all(|c| c.is_ascii_hexdigit()) || hash.len() != 64 {
return None;
}
Some((base_data, hash))
}
fn sign(data: &str, token: &str) -> Result<String, ()> {
let secret_key = {
let mut mac = HmacSha256::new_from_slice(token.as_bytes()).unwrap();
mac.update(b"WebAppData");
mac.finalize().into_bytes()
};
let token_bytes = {
let mut mac = HmacSha256::new_from_slice(data.as_bytes()).unwrap();
mac.update(&secret_key);
mac.finalize().into_bytes()
};
Ok(hex::encode(token_bytes))
}
pub fn validate(init_data: &str, token: &str) -> Option<u64> {
if init_data.is_empty() || !init_data.contains('=') {
return None;
}
let (base_data, hash) = match extract_hash(init_data) {
Some(v) => v,
None => return None,
};
let expected_hash = match sign(&base_data, token) {
Ok(v) => v,
Err(_) => return None,
};
if hash != expected_hash {
return None;
}
parse(&base_data)
}

10
src/frontend/Cargo.toml Normal file
View File

@@ -0,0 +1,10 @@
[package]
name = "frontend"
version = "0.1.0"
edition = "2024"
[dependencies]
yew = { git = "https://github.com/yewstack/yew/", features = ["csr"] }
stylist = "0.13"
web-sys = { version = "0.3.77" }
gloo-net = "0.6.0"

6
src/frontend/Trunk.toml Normal file
View File

@@ -0,0 +1,6 @@
[build]
public_url = "/assets/"
[serve]
address = "127.0.0.1"
port = 8000

10
src/frontend/index.html Normal file
View File

@@ -0,0 +1,10 @@
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>Yew App</title>
</head>
<body>
<script src="https://telegram.org/js/telegram-web-app.js?56"></script>
</body>
</html>

53
src/frontend/src/main.rs Normal file
View File

@@ -0,0 +1,53 @@
use stylist::style;
use yew::prelude::*;
#[derive(Clone, PartialEq, Properties)]
struct SubscriptionProps {
username: String,
}
#[function_component]
fn Subscription(props: &SubscriptionProps) -> Html {
html! {
<div>
{ props.username.clone() }
</div>
}
}
#[function_component]
fn Settings() -> Html {
let subscriptions = vec!["kurbezz"];
let header_style = style!(
r#"
font-size: 24px;
"#
)
.expect("Failed to mount style");
html! {
<div>
<h1 class={classes!(header_style.get_class_name().to_string())}>{ "Settings" }</h1>
<div>
{
subscriptions
.iter()
.map(|sub| html! { <Subscription username={*sub} /> })
.collect::<Html>()
}
</div>
</div>
}
}
#[function_component]
fn App() -> Html {
html! {
<Settings />
}
}
fn main() {
yew::Renderer::<App>::new().render();
}

View File

@@ -1,95 +0,0 @@
use std::collections::{HashMap, HashSet};
use futures::StreamExt;
use mongodb::{
Client, Collection,
bson::{Document, doc},
};
use tokio::sync::RwLock;
use crate::config::CONFIG;
pub struct SubscriptionManager {
pub subscriptions: RwLock<HashMap<String, HashSet<u64>>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self {
subscriptions: RwLock::new(HashMap::new()),
}
}
async fn get_collection() -> mongodb::error::Result<Collection<Document>> {
let client = Client::with_uri_str(CONFIG.mongodb_connection_string.clone()).await?;
let database = client.database("telegram-twitch-notifier");
Ok(database.collection("subscriptions"))
}
pub async fn load(&self) -> mongodb::error::Result<()> {
let collection = Self::get_collection().await?;
let mut subs = collection.find(doc! {}).await?;
while let Some(sub) = subs.next().await {
let sub = sub?;
let username = sub.get_str("streamer").unwrap();
let telegram_user_id = sub.get_i64("telegram_user_id").unwrap() as u64;
self.subscribe(telegram_user_id, username.to_string()).await;
}
Ok(())
}
pub async fn subscribe(&self, telegram_user_id: u64, username: String) {
tracing::debug!("Subscribing {} to {}", telegram_user_id, username);
let inserted = self
.subscriptions
.write()
.await
.entry(username.clone())
.or_insert(HashSet::new())
.insert(telegram_user_id);
if !inserted {
return;
}
Self::get_collection()
.await
.unwrap()
.insert_one(doc! {
"streamer": username,
"telegram_user_id": telegram_user_id as i64,
})
.await
.unwrap();
}
pub async fn unsubscribe(&self, telegram_user_id: u64, username: String) {
tracing::debug!("Unsubscribing {} from {}", telegram_user_id, username);
self.subscriptions
.write()
.await
.entry(username.clone())
.and_modify(|set| {
set.remove(&telegram_user_id);
});
Self::get_collection()
.await
.unwrap()
.delete_one(doc! {
"streamer": username,
"telegram_user_id": telegram_user_id as i64,
})
.await
.unwrap();
}
}