From b32d5e845ed012635ac53bc52084831d6e48f7f5 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Wed, 14 Aug 2024 00:57:56 +0200 Subject: [PATCH] Update --- src/config.py | 44 +++++-- src/services/discord.py | 56 +++++++-- src/services/notification.py | 32 +++-- src/services/scheduler_sync/discord_events.py | 21 ++-- src/services/scheduler_sync/synchronizer.py | 35 ++++-- src/services/scheduler_sync/twitch_events.py | 6 +- src/services/twitch.py | 113 ++++++++++++------ 7 files changed, 215 insertions(+), 92 deletions(-) diff --git a/src/config.py b/src/config.py index 0477b93..a6059d0 100644 --- a/src/config.py +++ b/src/config.py @@ -1,31 +1,57 @@ +import json + +from pydantic import BaseModel, field_validator from pydantic_settings import BaseSettings +class TwitchConfig(BaseModel): + CHANNEL_ID: str + CHANNEL_NAME: str + + +class DiscordConfig(BaseModel): + GUILD_ID: int + CHANNEL_ID: int + + GAME_LIST_CHANNEL_ID: int + GAME_LIST_MESSAGE_ID: int + + +class StreamerConfig(BaseModel): + TWITCH: TwitchConfig + DISCORD: DiscordConfig | None = None + TELEGRAM_CHANNEL_ID: int | None = None + + START_STREAM_MESSAGE: str | None = None + CHANGE_CATEGORY_MESSAGE: str | None = None + + class Config(BaseSettings): DISCORD_BOT_TOKEN: str DISCORD_BOT_ID: str - - DISCORD_GUILD_ID: int - DISCORD_CHANNEL_ID: int - DISCORD_BOT_ACTIVITY: str - DISCORD_GAME_LIST_CHANNEL_ID: int - DISCORD_GAME_LIST_MESSAGE_ID: int - TELEGRAM_BOT_TOKEN: str - TELEGRAM_CHANNEL_ID: int TWITCH_CLIENT_ID: str TWITCH_CLIENT_SECRET: str - TWITCH_CHANNEL_ID: str TWITCH_ADMIN_USER_ID: str TWITCH_CALLBACK_URL: str TWITCH_CALLBACK_PORT: int = 80 + STREAMERS: list[StreamerConfig] = [] + SECRETS_FILE_PATH: str + @field_validator("STREAMERS", mode="before") + def check_streamers(cls, value): + if isinstance(value, str): + return json.loads(value) + + return value + + config = Config() # type: ignore diff --git a/src/services/discord.py b/src/services/discord.py index 9c8cac1..c5313eb 100644 --- a/src/services/discord.py +++ b/src/services/discord.py @@ -1,4 +1,3 @@ -import asyncio import logging import discord @@ -14,6 +13,21 @@ from config import config logger = logging.getLogger(__name__) +def get_game_list_channel_to_message_map() -> dict[int, int]: + result = {} + + for streamer in config.STREAMERS: + if streamer.DISCORD is None: + continue + + if streamer.DISCORD.GAME_LIST_CHANNEL_ID is None or streamer.DISCORD.GAME_LIST_MESSAGE_ID is None: + continue + + result[streamer.DISCORD.GAME_LIST_CHANNEL_ID] = streamer.DISCORD.GAME_LIST_MESSAGE_ID + + return result + + class DiscordClient(discord.Client): def __init__(self) -> None: intents = discord.Intents.default() @@ -24,8 +38,15 @@ class DiscordClient(discord.Client): self.tree = app_commands.CommandTree(self) async def setup_hook(self): - self.tree.copy_global_to(guild=Object(id=config.DISCORD_GUILD_ID)) - await self.tree.sync(guild=Object(id=config.DISCORD_GUILD_ID)) + for streamer in config.STREAMERS: + if streamer.DISCORD is None: + continue + + if streamer.DISCORD.GAME_LIST_CHANNEL_ID is None or streamer.DISCORD.GAME_LIST_MESSAGE_ID is None: + continue + + self.tree.copy_global_to(guild=Object(id=streamer.DISCORD.GUILD_ID)) + await self.tree.sync(guild=Object(id=streamer.DISCORD.GUILD_ID)) async def on_ready(self): await self.change_presence( @@ -58,15 +79,22 @@ async def add( game: str, date: str | None = None ): - if interaction.channel is None or interaction.channel.id != config.DISCORD_GAME_LIST_CHANNEL_ID: + channel_to_message = get_game_list_channel_to_message_map() + + if interaction.channel is None: await interaction.response.send_message("Команда не доступна в этом канале (#1)", ephemeral=True) return + message_id = channel_to_message.get(interaction.channel.id) + if message_id is None: + await interaction.response.send_message("Команда не доступна в этом канале (#3)", ephemeral=True) + return + if not isinstance(interaction.channel, Messageable): await interaction.response.send_message("Команда не доступна в этом канале (#2)", ephemeral=True) return - game_list_message = await interaction.channel.fetch_message(config.DISCORD_GAME_LIST_MESSAGE_ID) + game_list_message = await interaction.channel.fetch_message(message_id) game_list = GameList.parse(game_list_message.content) game_list.add_game(category, GameItem(name=game, customer=customer, date=date)) @@ -83,7 +111,12 @@ async def game_list_autocomplete( if not isinstance(interaction.channel, Messageable): return [] - game_list_message = await interaction.channel.fetch_message(config.DISCORD_GAME_LIST_MESSAGE_ID) + channel_to_message = get_game_list_channel_to_message_map() + message_id = channel_to_message.get(interaction.channel.id) + if message_id is None: + return [] + + game_list_message = await interaction.channel.fetch_message(message_id) game_list = GameList.parse(game_list_message.content) @@ -94,15 +127,22 @@ async def game_list_autocomplete( @app_commands.describe(game="Игра") @app_commands.autocomplete(game=game_list_autocomplete) async def delete(interaction: discord.Interaction, game: str): - if interaction.channel is None or interaction.channel.id != config.DISCORD_GAME_LIST_CHANNEL_ID: + channel_to_message = get_game_list_channel_to_message_map() + + if interaction.channel is None: await interaction.response.send_message("Команда не доступна в этом канале (#1)", ephemeral=True) return + message_id = channel_to_message.get(interaction.channel.id) + if message_id is None: + await interaction.response.send_message("Команда не доступна в этом канале (#3)", ephemeral=True) + return + if not isinstance(interaction.channel, Messageable): await interaction.response.send_message("Команда не доступна в этом канале (#2)", ephemeral=True) return - game_list_message = await interaction.channel.fetch_message(config.DISCORD_GAME_LIST_MESSAGE_ID) + game_list_message = await interaction.channel.fetch_message(message_id) game_list = GameList.parse(game_list_message.content) game_list.delete_game(game) diff --git a/src/services/notification.py b/src/services/notification.py index f81aaa4..162a94c 100644 --- a/src/services/notification.py +++ b/src/services/notification.py @@ -1,25 +1,28 @@ -from asyncio import gather +import logging from httpx import AsyncClient -from config import config +from config import config, StreamerConfig -async def notify_telegram(msg: str): +logger = logging.getLogger(__name__) + + +async def notify_telegram(msg: str, chat_id: str): async with AsyncClient() as client: await client.post( f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage", json={ - "chat_id": config.TELEGRAM_CHANNEL_ID, + "chat_id": chat_id, "text": msg, } ) -async def notify_discord(msg: str): +async def notify_discord(msg: str, channel_id: str): async with AsyncClient() as client: await client.post( - f"https://discord.com/api/v10/channels/{config.DISCORD_CHANNEL_ID}/messages", + f"https://discord.com/api/v10/channels/{channel_id}/messages", headers={ "Authorization": f"Bot {config.DISCORD_BOT_TOKEN}" }, @@ -29,8 +32,15 @@ async def notify_discord(msg: str): ) -async def notify(msg: str): - await gather( - notify_telegram(msg), - notify_discord(msg) - ) +async def notify(msg: str, streamer_config: StreamerConfig): + if streamer_config.DISCORD is not None: + try: + await notify_discord(msg, str(streamer_config.DISCORD.CHANNEL_ID)) + except Exception as e: + logger.error("Failed to notify discord", exc_info=e) + + if streamer_config.TELEGRAM_CHANNEL_ID is not None: + try: + await notify_telegram(msg, str(streamer_config.TELEGRAM_CHANNEL_ID)) + except Exception as e: + logger.error("Failed to notify telegram", exc_info=e) diff --git a/src/services/scheduler_sync/discord_events.py b/src/services/scheduler_sync/discord_events.py index 4771fb0..3b07d9f 100644 --- a/src/services/scheduler_sync/discord_events.py +++ b/src/services/scheduler_sync/discord_events.py @@ -46,10 +46,10 @@ class DiscordEvent(BaseModel): creator_id: str -async def get_discord_events() -> list[DiscordEvent]: +async def get_discord_events(guild_id: int) -> list[DiscordEvent]: async with AsyncClient() as client: response = await client.get( - f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events", + f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events", headers={"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"} ) @@ -60,10 +60,10 @@ async def get_discord_events() -> list[DiscordEvent]: return [event for event in events if event.creator_id == config.DISCORD_BOT_ID] -async def delete_discord_event(event_id: str): +async def delete_discord_event(guild_id: int, event_id: str): async with AsyncClient() as client: response = await client.delete( - f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events/{event_id}", + f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events/{event_id}", headers={"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"} ) @@ -90,7 +90,7 @@ class CreateDiscordEvent(BaseModel): return value.isoformat() @classmethod - def parse_from_twitch_event(cls, event: TwitchEvent) -> Self: + def parse_from_twitch_event(cls, event: TwitchEvent, channel_name: str) -> Self: if event.categories: name = f"{event.name} | {event.categories}" else: @@ -111,18 +111,17 @@ class CreateDiscordEvent(BaseModel): description=f"{event.description or ''}\n\n\n\n#{event.uid}", privacy_level=2, entity_type=3, - entity_metadata=EntityMetadata(location="https://twitch.tv/hafmc"), + entity_metadata=EntityMetadata(location=f"https://twitch.tv/{channel_name}"), scheduled_start_time=event.start_at, scheduled_end_time=event.end_at, recurrence_rule=recurrence_rule ) - -async def create_discord_event(event: CreateDiscordEvent): +async def create_discord_event(guild_id: int, event: CreateDiscordEvent): async with AsyncClient() as client: response = await client.post( - f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events", + f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events", json=event.model_dump(), headers={ "Authorization": f"Bot {config.DISCORD_BOT_TOKEN}", @@ -148,10 +147,10 @@ class UpdateDiscordEvent(BaseModel): return value.isoformat() -async def edit_discord_event(event_id: str, event: UpdateDiscordEvent): +async def edit_discord_event(guild_id: int, event_id: str, event: UpdateDiscordEvent): async with AsyncClient() as client: response = await client.patch( - f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events/{event_id}", + f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events/{event_id}", json=event.model_dump(), headers={ "Authorization": f"Bot {config.DISCORD_BOT_TOKEN}", diff --git a/src/services/scheduler_sync/synchronizer.py b/src/services/scheduler_sync/synchronizer.py index be2eb31..68e6d24 100644 --- a/src/services/scheduler_sync/synchronizer.py +++ b/src/services/scheduler_sync/synchronizer.py @@ -10,11 +10,15 @@ from services.scheduler_sync.discord_events import ( ) from services.scheduler_sync.comparators import compare +from config import config, TwitchConfig + logger = logging.getLogger(__name__) async def add_events( + guild_id: int, + twitch_channel_name: str, twitch_events: list[tuple[str, TwitchEvent]], discord_events: list[tuple[str, DiscordEvent]] ): @@ -22,11 +26,12 @@ async def add_events( for (uid, event) in twitch_events: if uid not in discord_events_ids: - create_event = CreateDiscordEvent.parse_from_twitch_event(event) - await create_discord_event(create_event) + create_event = CreateDiscordEvent.parse_from_twitch_event(event, twitch_channel_name) + await create_discord_event(guild_id, create_event) async def remove_events( + guild_id: int, twith_events: list[tuple[str, TwitchEvent]], discord_events: list[tuple[str, DiscordEvent]] ): @@ -34,10 +39,12 @@ async def remove_events( for (uid, event) in discord_events: if uid not in twith_events_ids: - await delete_discord_event(uid) + await delete_discord_event(guild_id, uid) async def edit_events( + guild_id: int, + twitch_channel_name: str, twith_events: list[tuple[str, TwitchEvent]], discord_events: list[tuple[str, DiscordEvent]] ): @@ -46,7 +53,7 @@ async def edit_events( if uid != discord_id: continue - create_event = CreateDiscordEvent.parse_from_twitch_event(twitch_event) + create_event = CreateDiscordEvent.parse_from_twitch_event(twitch_event, twitch_channel_name) if compare(create_event, discord_event): continue @@ -67,12 +74,12 @@ async def edit_events( update_event.recurrence_rule.start = update_event.scheduled_start_time - await edit_discord_event(discord_event.id, update_event) + await edit_discord_event(guild_id, discord_event.id, update_event) -async def syncronize(): - twitch_events = await get_twitch_events() - discord_events = await get_discord_events() +async def syncronize(twitch: TwitchConfig, discord_guild_id: int): + twitch_events = await get_twitch_events(twitch.CHANNEL_ID) + discord_events = await get_discord_events(discord_guild_id) twitch_events_with_id = [(event.uid, event) for event in twitch_events] discord_events_with_id = [ @@ -80,15 +87,19 @@ async def syncronize(): for event in discord_events ] - await add_events(twitch_events_with_id, discord_events_with_id) - await remove_events(twitch_events_with_id, discord_events_with_id) - await edit_events(twitch_events_with_id, discord_events_with_id) + await add_events(discord_guild_id, twitch.CHANNEL_NAME, twitch_events_with_id, discord_events_with_id) + await remove_events(discord_guild_id, twitch_events_with_id, discord_events_with_id) + await edit_events(discord_guild_id, twitch.CHANNEL_NAME, twitch_events_with_id, discord_events_with_id) async def start_synchronizer(): while True: try: - await syncronize() + for streamer in config.STREAMERS: + if streamer.DISCORD is None: + continue + + await syncronize(streamer.TWITCH, streamer.DISCORD.GUILD_ID) except Exception as e: logging.error(e) diff --git a/src/services/scheduler_sync/twitch_events.py b/src/services/scheduler_sync/twitch_events.py index e6a1556..6c933da 100644 --- a/src/services/scheduler_sync/twitch_events.py +++ b/src/services/scheduler_sync/twitch_events.py @@ -7,8 +7,6 @@ import icalendar from httpx import AsyncClient from pydantic import BaseModel -from config import config - class Weekday(StrEnum): Mon = "MO" @@ -45,10 +43,10 @@ class TwitchEvent(BaseModel): repeat_rule: Optional[WeeklyRepeatRule] -async def get_twitch_events() -> list[TwitchEvent]: +async def get_twitch_events(twitch_channel_id: str) -> list[TwitchEvent]: async with AsyncClient() as client: response = await client.get( - f"https://api.twitch.tv/helix/schedule/icalendar?broadcaster_id={config.TWITCH_CHANNEL_ID}" + f"https://api.twitch.tv/helix/schedule/icalendar?broadcaster_id={twitch_channel_id}" ) events: list[TwitchEvent] = [] diff --git a/src/services/twitch.py b/src/services/twitch.py index 811f871..f536c87 100644 --- a/src/services/twitch.py +++ b/src/services/twitch.py @@ -7,13 +7,13 @@ from twitchAPI.helper import first from twitchAPI.eventsub.webhook import EventSubWebhook from twitchAPI.twitch import Twitch from twitchAPI.type import AuthScope -from twitchAPI.object.eventsub import StreamOnlineEvent, StreamOfflineEvent, ChannelUpdateEvent +from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent import aiofiles from pydantic import BaseModel -from config import config +from config import config, StreamerConfig from services.notification import notify @@ -26,6 +26,7 @@ class State(BaseModel): last_live_at: datetime + class TokenStorage: lock = Lock() @@ -59,7 +60,7 @@ class TwitchService: def __init__(self, twitch: Twitch): self.twitch = twitch - self.state: State | None = None + self.state: dict[str, State | None] = {} @classmethod async def authorize(cls): @@ -77,30 +78,55 @@ class TwitchService: return twitch - async def notify_online(self): - if self.state is None: + def get_streamer_config(self, streamer_id: str) -> StreamerConfig: + for streamer in config.STREAMERS: + if streamer.TWITCH.CHANNEL_ID == streamer_id: + return streamer + + raise ValueError(f"Streamer with id {streamer_id} not found") + + async def notify_online(self, streamer_id: str): + current_state = self.state.get(streamer_id) + if current_state is None: raise RuntimeError("State is None") - msg = f"HafMC сейчас стримит {self.state.title} ({self.state.category})! \nПрисоединяйся: https://twitch.tv/hafmc" + streamer = self.get_streamer_config(streamer_id) - await notify(msg) + if streamer.START_STREAM_MESSAGE is None: + return - async def notify_change_category(self): - if self.state is None: + msg = streamer.START_STREAM_MESSAGE.format( + title=current_state.title, + category=current_state.category + ) + + await notify(msg, streamer) + + async def notify_change_category(self, streamer_id: str): + current_state = self.state.get(streamer_id) + + if current_state is None: raise RuntimeError("State is None") - if (datetime.now() - self.state.last_live_at).seconds > 60: + if (datetime.now() - current_state.last_live_at).seconds > 60: raise RuntimeError("State is not live") - msg = f"HafMC начал играть в {self.state.category}! \nПрисоединяйся: https://twitch.tv/hafmc" + streamer = self.get_streamer_config(streamer_id) - await notify(msg) + if streamer.CHANGE_CATEGORY_MESSAGE is None: + return - async def get_current_stream(self, retry_count: int = 5, delay: int = 5): + msg = streamer.CHANGE_CATEGORY_MESSAGE.format( + category=current_state.category + ) + + await notify(msg, streamer) + + async def get_current_stream(self, streamer_id: str, retry_count: int = 5, delay: int = 5): remain_retry = retry_count while remain_retry > 0: - stream = await first(self.twitch.get_streams(user_id=[config.TWITCH_CHANNEL_ID])) + stream = await first(self.twitch.get_streams(user_id=[streamer_id])) if stream is not None: return stream @@ -111,24 +137,29 @@ class TwitchService: return None async def on_channel_update(self, event: ChannelUpdateEvent): - stream = await self.get_current_stream() + brodcaster_id = event.event.broadcaster_user_id + + stream = await self.get_current_stream(brodcaster_id) if stream is None: return - if self.state is None: + current_state = self.state.get(brodcaster_id) + if current_state is None: return - changed = self.state.category == event.event.category_name + changed = current_state.category == event.event.category_name - self.state.title = event.event.title - self.state.category = event.event.category_name - self.state.last_live_at = datetime.now() + current_state.title = event.event.title + current_state.category = event.event.category_name + current_state.last_live_at = datetime.now() + + self.state[brodcaster_id] = current_state if changed: - await self.notify_change_category() + await self.notify_change_category(brodcaster_id) - async def _on_stream_online(self): - current_stream = await self.get_current_stream() + async def _on_stream_online(self, streamer_id: str): + current_stream = await self.get_current_stream(streamer_id) if current_stream is None: return @@ -138,13 +169,15 @@ class TwitchService: last_live_at=datetime.now() ) - if self.state is None or (datetime.now() - self.state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY: - await self.notify_online() + current_state = self.state.get(streamer_id) - self.state = state + if current_state is None or (datetime.now() - current_state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY: + await self.notify_online(streamer_id) + + self.state[streamer_id] = state async def on_stream_online(self, event: StreamOnlineEvent): - await self._on_stream_online() + await self._on_stream_online(event.event.broadcaster_user_id) async def run(self): eventsub = EventSubWebhook( @@ -154,13 +187,16 @@ class TwitchService: message_deduplication_history_length=50 ) - current_stream = await self.get_current_stream() - if current_stream: - self.state = State( - title=current_stream.title, - category=current_stream.game_name, - last_live_at=datetime.now() - ) + for streamer in config.STREAMERS: + current_stream = await self.get_current_stream(streamer.TWITCH.CHANNEL_ID) + if current_stream: + self.state[streamer.TWITCH.CHANNEL_ID] = State( + title=current_stream.title, + category=current_stream.game_name, + last_live_at=datetime.now() + ) + else: + self.state[streamer.TWITCH.CHANNEL_ID] = None try: await eventsub.unsubscribe_all() @@ -169,14 +205,17 @@ class TwitchService: logger.info("Subscribe to events...") - await eventsub.listen_channel_update_v2(config.TWITCH_CHANNEL_ID, self.on_channel_update) - await eventsub.listen_stream_online(config.TWITCH_CHANNEL_ID, self.on_stream_online) + for streamer in config.STREAMERS: + await eventsub.listen_channel_update_v2(streamer.TWITCH.CHANNEL_ID, self.on_channel_update) + await eventsub.listen_stream_online(streamer.TWITCH.CHANNEL_ID, self.on_stream_online) logger.info("Twitch service started") while True: await sleep(self.UPDATE_DELAY) - await self._on_stream_online() + + for streamer in config.STREAMERS: + await self._on_stream_online(streamer.TWITCH.CHANNEL_ID) finally: await eventsub.stop() await self.twitch.close()