Optimize user activity update

This commit is contained in:
2023-05-22 01:09:51 +02:00
parent b11766af96
commit b60005329f
5 changed files with 439 additions and 51 deletions

View File

@@ -2,9 +2,10 @@ pub mod modules;
pub mod services;
mod tools;
use moka::future::Cache;
use teloxide::{prelude::*, types::BotCommand, adaptors::{Throttle, CacheMe}};
use crate::bots::approved_bot::services::user_settings::create_or_update_user_settings;
use crate::{bots::approved_bot::services::user_settings::create_or_update_user_settings, bots_manager::AppState};
use self::{
modules::{
@@ -18,9 +19,15 @@ use self::{
use super::{ignore_channel_messages, BotCommands, BotHandler, bots_manager::get_manager_handler, ignore_chat_member_update};
async fn _update_activity(me: teloxide::types::Me, user: teloxide::types::User) -> Option<()> {
async fn _update_activity(me: teloxide::types::Me, user: teloxide::types::User, cache: Cache<UserId, bool>) -> Option<()> {
if cache.contains_key(&user.id) {
return None;
}
tokio::spawn(async move {
if update_user_activity(user.id).await.is_err() {
let mut update_result = update_user_activity(user.id).await;
if update_result.is_err() {
let allowed_langs = get_user_or_default_lang_codes(user.id).await;
if create_or_update_user_settings(
@@ -32,12 +39,13 @@ async fn _update_activity(me: teloxide::types::Me, user: teloxide::types::User)
allowed_langs,
).await.is_ok()
{
#[allow(unused_must_use)]
{
update_user_activity(user.id).await;
}
update_result = update_user_activity(user.id).await;
}
}
if update_result.is_ok() {
cache.insert(user.id, true).await;
}
});
None
@@ -47,15 +55,15 @@ fn update_user_activity_handler() -> BotHandler {
dptree::entry()
.branch(
Update::filter_callback_query().chain(dptree::filter_map_async(
|cq: CallbackQuery, bot: CacheMe<Throttle<Bot>>| async move {
_update_activity(bot.get_me().await.unwrap(), cq.from).await
|cq: CallbackQuery, bot: CacheMe<Throttle<Bot>>, app_state: AppState| async move {
_update_activity(bot.get_me().await.unwrap(), cq.from, app_state.user_activity_cache).await
},
)),
)
.branch(Update::filter_message().chain(dptree::filter_map_async(
|message: Message, bot: CacheMe<Throttle<Bot>>| async move {
|message: Message, bot: CacheMe<Throttle<Bot>>, app_state: AppState| async move {
match message.from() {
Some(user) => _update_activity(bot.get_me().await.unwrap(), user.clone()).await,
Some(user) => _update_activity(bot.get_me().await.unwrap(), user.clone(), app_state.user_activity_cache).await,
None => None,
}
},

View File

@@ -0,0 +1,44 @@
use serde::Deserialize;
use crate::config;
#[derive(Deserialize, Debug, PartialEq, Clone, Copy)]
pub enum BotStatus {
#[serde(rename = "pending")]
Pending,
#[serde(rename = "approved")]
Approved,
#[serde(rename = "blocked")]
Blocked,
}
#[derive(Deserialize, Debug, PartialEq, Clone, Copy)]
pub enum BotCache {
#[serde(rename = "original")]
Original,
#[serde(rename = "no_cache")]
NoCache,
}
#[derive(Deserialize, Debug)]
pub struct BotData {
pub id: u32,
pub token: String,
pub status: BotStatus,
pub cache: BotCache,
}
pub async fn get_bots() -> Result<Vec<BotData>, reqwest::Error> {
let client = reqwest::Client::new();
let response = client
.get(&config::CONFIG.manager_url)
.header("Authorization", &config::CONFIG.manager_api_key)
.send()
.await;
match response {
Ok(v) => v.json::<Vec<BotData>>().await,
Err(err) => Err(err),
}
}

View File

@@ -1,3 +1,5 @@
pub mod bot_manager_client;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -12,51 +14,20 @@ use teloxide::{
};
use url::Url;
use serde::Deserialize;
use moka::future::Cache;
use crate::config;
pub use self::bot_manager_client::{BotStatus, BotCache, BotData};
use self::bot_manager_client::get_bots;
#[derive(Deserialize, Debug, PartialEq, Clone, Copy)]
pub enum BotStatus {
#[serde(rename = "pending")]
Pending,
#[serde(rename = "approved")]
Approved,
#[serde(rename = "blocked")]
Blocked,
}
#[derive(Deserialize, Debug, PartialEq, Clone, Copy)]
pub enum BotCache {
#[serde(rename = "original")]
Original,
#[serde(rename = "no_cache")]
NoCache,
}
#[derive(Deserialize, Debug)]
struct BotData {
id: u32,
token: String,
status: BotStatus,
cache: BotCache,
}
async fn get_bots() -> Result<Vec<BotData>, reqwest::Error> {
let client = reqwest::Client::new();
let response = client
.get(&config::CONFIG.manager_url)
.header("Authorization", &config::CONFIG.manager_api_key)
.send()
.await;
match response {
Ok(v) => v.json::<Vec<BotData>>().await,
Err(err) => Err(err),
}
#[derive(Clone)]
pub struct AppState {
pub user_activity_cache: Cache<UserId, bool>,
}
pub struct BotsManager {
app_state: AppState,
next_port: u16,
bot_port_map: HashMap<u32, u16>,
bot_status_and_cache_map: HashMap<u32, (BotStatus, BotCache)>,
@@ -66,6 +37,12 @@ pub struct BotsManager {
impl BotsManager {
pub fn create() -> Self {
BotsManager {
app_state: AppState {
user_activity_cache: Cache::builder()
.time_to_live(Duration::from_secs(5 * 60))
.max_capacity(4096)
.build()
},
next_port: 8000,
bot_port_map: HashMap::new(),
bot_status_and_cache_map: HashMap::new(),
@@ -120,7 +97,7 @@ impl BotsManager {
}
let mut dispatcher = Dispatcher::builder(bot, handler)
.dependencies(dptree::deps![bot_data.cache])
.dependencies(dptree::deps![bot_data.cache, self.app_state.clone()])
.build();
let shutdown_token = dispatcher.shutdown_token();