Refactor bots manager

This commit is contained in:
2022-03-01 22:27:26 +03:00
parent e35b874fb7
commit 1edc63614f
5 changed files with 326 additions and 325 deletions

View File

@@ -2,7 +2,7 @@ import { Context } from 'telegraf';
import { clearBookCache, getBookCache, downloadFromCache } from '../services/book_cache'; import { clearBookCache, getBookCache, downloadFromCache } from '../services/book_cache';
import { getBookCacheBuffer } from '../services/book_cache_buffer'; import { getBookCacheBuffer } from '../services/book_cache_buffer';
import { BotState, Cache } from '@/bots/manager'; import { BotState, Cache } from '@/bots/manager/types';
async function _sendFile(ctx: Context, state: BotState, chatId: number, id: number, format: string) { async function _sendFile(ctx: Context, state: BotState, chatId: number, id: number, format: string) {

View File

@@ -5,14 +5,13 @@ import { Server } from 'http';
import * as dockerIpTools from "docker-ip-get"; import * as dockerIpTools from "docker-ip-get";
import got from 'got';
import { Telegraf } from 'telegraf'; import { Telegraf } from 'telegraf';
import env from '@/config'; import env from '@/config';
import getBot, { BotStatuses } from './factory/index'; import getBot from '../factory/index';
import UsersCounter from '@/analytics/users_counter'; import UsersCounter from '@/analytics/users_counter';
import { WebhookInfo } from 'telegraf/typings/core/types/typegram'; import { makeSyncRequest } from "./utils";
import { BotState } from "./types";
Sentry.init({ Sentry.init({
@@ -20,44 +19,16 @@ Sentry.init({
}); });
export enum Cache {
ORIGINAL = "original",
BUFFER = "buffer",
NO_CACHE = "no_cache"
}
export interface BotState {
id: number;
token: string;
status: BotStatuses;
cache: Cache;
created_time: string;
}
async function _makeSyncRequest(): Promise<BotState[] | null> {
try {
const response = await got<BotState[]>(env.MANAGER_URL, {
headers: {
'Authorization': env.MANAGER_API_KEY
},
responseType: 'json',
});
return response.body;
} catch (err) {
return null;
}
}
export default class BotsManager { export default class BotsManager {
static server: Server | null = null;
// Bots
static bots: {[key: number]: Telegraf} = {}; static bots: {[key: number]: Telegraf} = {};
static botsStates: {[key: number]: BotState} = {}; static botsStates: {[key: number]: BotState} = {};
static botsPeddingUpdateCount: {[key: number]: number} = {};
// Intervals
static syncInterval: NodeJS.Timer | null = null; static syncInterval: NodeJS.Timer | null = null;
static checkStateInterval: NodeJS.Timer | null = null;
static server: Server | null = null;
static async start() { static async start() {
this.launch(); this.launch();
@@ -67,56 +38,25 @@ export default class BotsManager {
if (this.syncInterval === null) { if (this.syncInterval === null) {
this.syncInterval = setInterval(() => this.sync(), 30_000); this.syncInterval = setInterval(() => this.sync(), 30_000);
} }
if (this.checkStateInterval === null) {
this.checkStateInterval = setInterval(() => this.checkBotStatus(), 60_000);
}
}
static async checkBotStatus() {
Object.values(this.bots).forEach((bot) => this._checkBotState(bot));
}
static async _checkBotState(bot: Telegraf) {
let webhookInfo: WebhookInfo | null = null;
try {
webhookInfo = await bot.telegram.getWebhookInfo();
} catch (e) {}
if (webhookInfo === null) return;
if (webhookInfo.pending_update_count <= 5 || webhookInfo.url === undefined) return;
try {
await bot.telegram.setWebhook(webhookInfo.url);
} catch (e) {}
} }
static async sync() { static async sync() {
const botsData = await _makeSyncRequest(); const botsData = await makeSyncRequest();
if (botsData !== null) { if (botsData !== null) {
await Promise.all(botsData.map((state) => this.updateBotState(state))); await Promise.all(botsData.map((state) => this._updateBotState(state)));
}
} }
static async setWebhook(bot: Telegraf, state: BotState): Promise<boolean> { if (botsData !== null) {
const dockerIps = (await dockerIpTools.getContainerIp()).split(" "); await Promise.all(
Object.values(this.botsStates).map(
for (const dockerIp of dockerIps) { (value: BotState) => this._checkPendingUpdates(this.bots[value.id], value)
try { )
await bot.telegram.setWebhook(
`${env.WEBHOOK_BASE_URL}:${env.WEBHOOK_PORT}/${state.id}/${bot.telegram.token}`, {
ip_address: dockerIp,
}
); );
return true;
} catch (e) {}
} }
return false;
} }
static async updateBotState(state: BotState) { static async _updateBotState(state: BotState) {
const isExists = this.bots[state.id] !== undefined; const isExists = this.bots[state.id] !== undefined;
if (isExists && if (isExists &&
@@ -143,19 +83,48 @@ export default class BotsManager {
return; return;
} }
if (!(await this.setWebhook(bot, state))) return; if (!(await this._setWebhook(bot, state))) return;
this.bots[state.id] = bot; this.bots[state.id] = bot;
this.botsStates[state.id] = state; this.botsStates[state.id] = state;
} }
static async _checkPendingUpdates(bot: Telegraf, state: BotState) {
try {
const webhookInfo = await bot.telegram.getWebhookInfo();
const previousPendingUpdateCount = this.botsPeddingUpdateCount[state.id] || 0;
if (previousPendingUpdateCount !== 0 && webhookInfo.pending_update_count !== 0) {
this._setWebhook(bot, state);
}
this.botsPeddingUpdateCount[state.id] = webhookInfo.pending_update_count;
} catch (e) {}
}
static async _setWebhook(bot: Telegraf, state: BotState): Promise<boolean> {
const dockerIps = (await dockerIpTools.getContainerIp()).split(" ");
for (const dockerIp of dockerIps) {
try {
await bot.telegram.setWebhook(
`${env.WEBHOOK_BASE_URL}:${env.WEBHOOK_PORT}/${state.id}/${bot.telegram.token}`, {
ip_address: dockerIp,
}
);
return true;
} catch (e) {}
}
return false;
}
static async handleUpdate(req: Request, res: Response, next: NextFunction) { static async handleUpdate(req: Request, res: Response, next: NextFunction) {
const botIdStr = req.url.split("/")[1]; const botIdStr = req.url.split("/")[1];
const bot = this.bots[parseInt(botIdStr)]; const bot = this.bots[parseInt(botIdStr)];
await bot.webhookCallback(`/${botIdStr}/${bot.telegram.token}`)(req, res); await bot.webhookCallback(`/${botIdStr}/${bot.telegram.token}`)(req, res);
} }
static async launch() { private static async launch() {
const application = express(); const application = express();
application.get("/healthcheck", (req, res) => { application.get("/healthcheck", (req, res) => {

16
src/bots/manager/types.ts Normal file
View File

@@ -0,0 +1,16 @@
import { BotStatuses } from '../factory/index';
export enum Cache {
ORIGINAL = "original",
BUFFER = "buffer",
NO_CACHE = "no_cache"
}
export interface BotState {
id: number;
token: string;
status: BotStatuses;
cache: Cache;
created_time: string;
}

21
src/bots/manager/utils.ts Normal file
View File

@@ -0,0 +1,21 @@
import got from 'got';
import env from '@/config';
import { BotState } from "./types";
export async function makeSyncRequest(): Promise<BotState[] | null> {
try {
const response = await got<BotState[]>(env.MANAGER_URL, {
headers: {
'Authorization': env.MANAGER_API_KEY
},
responseType: 'json',
});
return response.body;
} catch (err) {
return null;
}
}

475
yarn.lock

File diff suppressed because it is too large Load Diff