diff --git a/src/modules/stream_notifications/notification.py b/src/modules/stream_notifications/notification.py index eedbf62..08c379c 100644 --- a/src/modules/stream_notifications/notification.py +++ b/src/modules/stream_notifications/notification.py @@ -6,34 +6,72 @@ from core.config import config from domain.streamers import StreamerConfig from .state import State -from .sent_notifications import SentNotificationType +from .sent_notifications import SentNotification, SentNotificationType, SentResult logger = logging.getLogger(__name__) -async def notify_telegram(msg: str, chat_id: str): +async def notify_telegram(msg: str, chat_id: str) -> SentResult: async with AsyncClient() as client: - await client.post( - f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage", - json={ - "chat_id": chat_id, - "text": msg, - } - ) + try: + result = await client.post( + f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage", + json={ + "chat_id": chat_id, + "text": msg, + } + ) + + result.raise_for_status() + except Exception as e: + logger.error("Failed to notify telegram", exc_info=e) + return SentResult(success=False, message_id=None) + + if result.json()["ok"] is False: + return SentResult(success=False, message_id=None) + + return SentResult(success=True, message_id=result.json()["result"]["message_id"]) -async def notify_discord(msg: str, channel_id: str): +async def delete_telegram_message(chat_id: int, message_id: int): async with AsyncClient() as client: - await client.post( + try: + result = await client.post( + f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/deleteMessage", + json={ + "chat_id": chat_id, + "message_id": message_id + } + ) + + result.raise_for_status() + except Exception as e: + logger.error("Failed to delete telegram message", exc_info=e) + return False + + return True + + +async def notify_discord(msg: str, channel_id: str) -> SentResult: + async with AsyncClient() as client: + try: + result = await client.post( f"https://discord.com/api/v10/channels/{channel_id}/messages", - headers={ - "Authorization": f"Bot {config.DISCORD_BOT_TOKEN}" - }, - json={ - "content": msg, - } - ) + headers={ + "Authorization": f"Bot {config.DISCORD_BOT_TOKEN}" + }, + json={ + "content": msg, + } + ) + + result.raise_for_status() + except Exception as e: + logger.error("Failed to notify discord", exc_info=e) + return SentResult(success=False, message_id=None) + + return SentResult(success=True, message_id=result.json()["id"]) def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None: @@ -48,8 +86,8 @@ def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None: return roles.get(category) -async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, bool]: - result: dict[str, bool] = {} +async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, SentResult]: + result: dict[str, SentResult] = {} if notification_type == SentNotificationType.START_STREAM: message_template = streamer_config.notifications.start_stream @@ -69,12 +107,7 @@ async def notify(notification_type: SentNotificationType, streamer_config: Strea role="" ) - try: - await notify_telegram(msg, str(telegram.notifications_channel_id)) - result["telegram"] = True - except Exception as e: - result["telegram"] = False - logger.error("Failed to notify telegram", exc_info=e) + result["telegram"] = await notify_telegram(msg, str(telegram.notifications_channel_id)) if (discord := integrations.discord) is not None: if discord.notifications_channel_id is not None: @@ -92,11 +125,14 @@ async def notify(notification_type: SentNotificationType, streamer_config: Strea role=role ) - try: - await notify_discord(msg, str(discord.notifications_channel_id)) - result["discord"] = True - except Exception as e: - result["discord"] = False - logger.error("Failed to notify discord", exc_info=e) + result["discord"] = await notify_discord(msg, str(discord.notifications_channel_id)) return result + + +async def delete_penultimate_notification(streamer_config: StreamerConfig, sent_notification: SentNotification): + telegram_config = streamer_config.integrations.telegram + telegram_data = sent_notification.sent_result.get("telegram") + + if telegram_data and telegram_data.message_id and telegram_config: + await delete_telegram_message(telegram_config.notifications_channel_id, int(telegram_data.message_id)) diff --git a/src/modules/stream_notifications/sent_notifications.py b/src/modules/stream_notifications/sent_notifications.py index 747cdaa..757cbfd 100644 --- a/src/modules/stream_notifications/sent_notifications.py +++ b/src/modules/stream_notifications/sent_notifications.py @@ -13,11 +13,16 @@ class SentNotificationType(StrEnum): CHANGE_CATEGORY = "change_category" +class SentResult(BaseModel): + success: bool + message_id: str | None + + class SentNotification(BaseModel): notification_type: SentNotificationType twitch_id: int state: State - sent_result: dict[str, bool] + sent_result: dict[str, SentResult] sent_at: datetime @@ -30,7 +35,7 @@ class SentNotificationRepository: twitch_id: int, notification_type: SentNotificationType, state: State, - sent_result: dict[str, bool], + sent_result: dict[str, SentResult], ): async with mongo_manager.connect() as client: db = client.get_default_database() @@ -46,6 +51,24 @@ class SentNotificationRepository: ).model_dump() ) + @classmethod + async def get_penultimate_for_streamer( + cls, twitch_id: int + ) -> SentNotification | None: + async with mongo_manager.connect() as client: + db = client.get_default_database() + collection = db[cls.COLLECTION_NAME] + + doc = await collection.find_one( + {"twitch_id": twitch_id}, + sort={"sent_at": -1}, + skip=1, + ) + if doc is None: + return None + + return SentNotification(**doc) + @classmethod async def get_last_for_streamer( cls, twitch_id: int diff --git a/src/modules/stream_notifications/watcher.py b/src/modules/stream_notifications/watcher.py index da8cdee..8ce1ff6 100644 --- a/src/modules/stream_notifications/watcher.py +++ b/src/modules/stream_notifications/watcher.py @@ -7,7 +7,7 @@ from repositories.streamers import StreamerConfigRepository from .state import State, StateManager from .sent_notifications import SentNotificationRepository, SentNotificationType -from .notification import notify +from .notification import delete_penultimate_notification, notify from .twitch.authorize import authorize @@ -41,6 +41,7 @@ class StateWatcher: streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id) sent_result = await notify(sent_notification_type, streamer, state) + await SentNotificationRepository.add( streamer.twitch.id, sent_notification_type, @@ -48,6 +49,17 @@ class StateWatcher: sent_result=sent_result ) + @classmethod + async def remove_previous_notifications(cls, streamer_id: int): + streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id) + + penultimate_notification = await SentNotificationRepository.get_penultimate_for_streamer(streamer_id) + + if penultimate_notification is None: + return + + await delete_penultimate_notification(streamer, penultimate_notification) + @classmethod async def notify_start_stream( cls, @@ -55,6 +67,7 @@ class StateWatcher: state: State ): await cls.notify_and_save(streamer_id, SentNotificationType.START_STREAM, state) + await cls.remove_previous_notifications(streamer_id) @classmethod async def notify_change_category( @@ -63,6 +76,7 @@ class StateWatcher: state: State ): await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state) + await cls.remove_previous_notifications(streamer_id) @classmethod async def _on_stream_state_change(cls, streamer_id: int, new_state: State | None = None):