Add deleting penultimate notifications

This commit is contained in:
2024-12-29 22:08:57 +01:00
parent 29e4113d8f
commit 13c1b4aad4
3 changed files with 108 additions and 35 deletions

View File

@@ -6,34 +6,72 @@ from core.config import config
from domain.streamers import StreamerConfig from domain.streamers import StreamerConfig
from .state import State from .state import State
from .sent_notifications import SentNotificationType from .sent_notifications import SentNotification, SentNotificationType, SentResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def notify_telegram(msg: str, chat_id: str): async def notify_telegram(msg: str, chat_id: str) -> SentResult:
async with AsyncClient() as client: async with AsyncClient() as client:
await client.post( try:
f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage", result = await client.post(
json={ f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage",
"chat_id": chat_id, json={
"text": msg, "chat_id": chat_id,
} "text": msg,
) }
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to notify telegram", exc_info=e)
return SentResult(success=False, message_id=None)
if result.json()["ok"] is False:
return SentResult(success=False, message_id=None)
return SentResult(success=True, message_id=result.json()["result"]["message_id"])
async def notify_discord(msg: str, channel_id: str): async def delete_telegram_message(chat_id: int, message_id: int):
async with AsyncClient() as client: async with AsyncClient() as client:
await client.post( try:
result = await client.post(
f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/deleteMessage",
json={
"chat_id": chat_id,
"message_id": message_id
}
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to delete telegram message", exc_info=e)
return False
return True
async def notify_discord(msg: str, channel_id: str) -> SentResult:
async with AsyncClient() as client:
try:
result = await client.post(
f"https://discord.com/api/v10/channels/{channel_id}/messages", f"https://discord.com/api/v10/channels/{channel_id}/messages",
headers={ headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}" "Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"
}, },
json={ json={
"content": msg, "content": msg,
} }
) )
result.raise_for_status()
except Exception as e:
logger.error("Failed to notify discord", exc_info=e)
return SentResult(success=False, message_id=None)
return SentResult(success=True, message_id=result.json()["id"])
def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None: def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None:
@@ -48,8 +86,8 @@ def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None:
return roles.get(category) return roles.get(category)
async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, bool]: async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, SentResult]:
result: dict[str, bool] = {} result: dict[str, SentResult] = {}
if notification_type == SentNotificationType.START_STREAM: if notification_type == SentNotificationType.START_STREAM:
message_template = streamer_config.notifications.start_stream message_template = streamer_config.notifications.start_stream
@@ -69,12 +107,7 @@ async def notify(notification_type: SentNotificationType, streamer_config: Strea
role="" role=""
) )
try: result["telegram"] = await notify_telegram(msg, str(telegram.notifications_channel_id))
await notify_telegram(msg, str(telegram.notifications_channel_id))
result["telegram"] = True
except Exception as e:
result["telegram"] = False
logger.error("Failed to notify telegram", exc_info=e)
if (discord := integrations.discord) is not None: if (discord := integrations.discord) is not None:
if discord.notifications_channel_id is not None: if discord.notifications_channel_id is not None:
@@ -92,11 +125,14 @@ async def notify(notification_type: SentNotificationType, streamer_config: Strea
role=role role=role
) )
try: result["discord"] = await notify_discord(msg, str(discord.notifications_channel_id))
await notify_discord(msg, str(discord.notifications_channel_id))
result["discord"] = True
except Exception as e:
result["discord"] = False
logger.error("Failed to notify discord", exc_info=e)
return result return result
async def delete_penultimate_notification(streamer_config: StreamerConfig, sent_notification: SentNotification):
telegram_config = streamer_config.integrations.telegram
telegram_data = sent_notification.sent_result.get("telegram")
if telegram_data and telegram_data.message_id and telegram_config:
await delete_telegram_message(telegram_config.notifications_channel_id, int(telegram_data.message_id))

View File

@@ -13,11 +13,16 @@ class SentNotificationType(StrEnum):
CHANGE_CATEGORY = "change_category" CHANGE_CATEGORY = "change_category"
class SentResult(BaseModel):
success: bool
message_id: str | None
class SentNotification(BaseModel): class SentNotification(BaseModel):
notification_type: SentNotificationType notification_type: SentNotificationType
twitch_id: int twitch_id: int
state: State state: State
sent_result: dict[str, bool] sent_result: dict[str, SentResult]
sent_at: datetime sent_at: datetime
@@ -30,7 +35,7 @@ class SentNotificationRepository:
twitch_id: int, twitch_id: int,
notification_type: SentNotificationType, notification_type: SentNotificationType,
state: State, state: State,
sent_result: dict[str, bool], sent_result: dict[str, SentResult],
): ):
async with mongo_manager.connect() as client: async with mongo_manager.connect() as client:
db = client.get_default_database() db = client.get_default_database()
@@ -46,6 +51,24 @@ class SentNotificationRepository:
).model_dump() ).model_dump()
) )
@classmethod
async def get_penultimate_for_streamer(
cls, twitch_id: int
) -> SentNotification | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
doc = await collection.find_one(
{"twitch_id": twitch_id},
sort={"sent_at": -1},
skip=1,
)
if doc is None:
return None
return SentNotification(**doc)
@classmethod @classmethod
async def get_last_for_streamer( async def get_last_for_streamer(
cls, twitch_id: int cls, twitch_id: int

View File

@@ -7,7 +7,7 @@ from repositories.streamers import StreamerConfigRepository
from .state import State, StateManager from .state import State, StateManager
from .sent_notifications import SentNotificationRepository, SentNotificationType from .sent_notifications import SentNotificationRepository, SentNotificationType
from .notification import notify from .notification import delete_penultimate_notification, notify
from .twitch.authorize import authorize from .twitch.authorize import authorize
@@ -41,6 +41,7 @@ class StateWatcher:
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id) streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
sent_result = await notify(sent_notification_type, streamer, state) sent_result = await notify(sent_notification_type, streamer, state)
await SentNotificationRepository.add( await SentNotificationRepository.add(
streamer.twitch.id, streamer.twitch.id,
sent_notification_type, sent_notification_type,
@@ -48,6 +49,17 @@ class StateWatcher:
sent_result=sent_result sent_result=sent_result
) )
@classmethod
async def remove_previous_notifications(cls, streamer_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
penultimate_notification = await SentNotificationRepository.get_penultimate_for_streamer(streamer_id)
if penultimate_notification is None:
return
await delete_penultimate_notification(streamer, penultimate_notification)
@classmethod @classmethod
async def notify_start_stream( async def notify_start_stream(
cls, cls,
@@ -55,6 +67,7 @@ class StateWatcher:
state: State state: State
): ):
await cls.notify_and_save(streamer_id, SentNotificationType.START_STREAM, state) await cls.notify_and_save(streamer_id, SentNotificationType.START_STREAM, state)
await cls.remove_previous_notifications(streamer_id)
@classmethod @classmethod
async def notify_change_category( async def notify_change_category(
@@ -63,6 +76,7 @@ class StateWatcher:
state: State state: State
): ):
await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state) await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state)
await cls.remove_previous_notifications(streamer_id)
@classmethod @classmethod
async def _on_stream_state_change(cls, streamer_id: int, new_state: State | None = None): async def _on_stream_state_change(cls, streamer_id: int, new_state: State | None = None):