diff --git a/src/modules/stream_notifications/state.py b/src/modules/stream_notifications/state.py index c44aa1b..b8ed394 100644 --- a/src/modules/stream_notifications/state.py +++ b/src/modules/stream_notifications/state.py @@ -1,4 +1,5 @@ from datetime import datetime +from enum import StrEnum from pydantic import BaseModel @@ -11,6 +12,12 @@ class State(BaseModel): last_live_at: datetime + def __eq__(self, value: object) -> bool: + if not isinstance(value, State): + return False + + return self.title == value.title and self.category == value.category + class UpdateEvent(BaseModel): broadcaster_user_id: str @@ -18,6 +25,12 @@ class UpdateEvent(BaseModel): category_name: str +class EventType(StrEnum): + STREAM_ONLINE = "stream.online" + CHANNEL_UPDATE = "channel.update" + UNKNOWN = "unknown" + + class StateManager: COLLECTION_NAME = "stream_twitch_state" diff --git a/src/modules/stream_notifications/tasks.py b/src/modules/stream_notifications/tasks.py index 2d69aef..34e0b9e 100644 --- a/src/modules/stream_notifications/tasks.py +++ b/src/modules/stream_notifications/tasks.py @@ -5,7 +5,7 @@ from twitchAPI.helper import first from core.broker import broker from repositories.streamers import StreamerConfigRepository -from .state import State, UpdateEvent +from .state import State, UpdateEvent, EventType from .watcher import StateWatcher from .twitch.authorize import authorize @@ -14,7 +14,10 @@ from .twitch.authorize import authorize "stream_notifications.twitch.on_stream_state_change_with_check", retry_on_error=True ) -async def on_stream_state_change_with_check(event: UpdateEvent): +async def on_stream_state_change_with_check( + event: UpdateEvent, + event_type: EventType +): twitch = await authorize() stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) @@ -23,6 +26,7 @@ async def on_stream_state_change_with_check(event: UpdateEvent): await on_stream_state_change.kiq( int(event.broadcaster_user_id), + event_type, State( title=event.title, category=event.category_name, @@ -36,9 +40,15 @@ async def on_stream_state_change_with_check(event: UpdateEvent): retry_on_error=True ) async def on_stream_state_change( - streamer_id: int, new_state: State | None = None + streamer_id: int, + event_type: EventType, + new_state: State | None = None ): - await StateWatcher.on_stream_state_change(streamer_id, new_state) + await StateWatcher.on_stream_state_change( + streamer_id, + event_type, + new_state, + ) @broker.task( @@ -58,4 +68,8 @@ async def check_streams_states(): last_live_at=datetime.now(timezone.utc) ) - await StateWatcher.on_stream_state_change(int(stream.user_id), state) + await StateWatcher.on_stream_state_change( + int(stream.user_id), + EventType.UNKNOWN, + state + ) diff --git a/src/modules/stream_notifications/twitch/webhook.py b/src/modules/stream_notifications/twitch/webhook.py index b89bda5..1b032c5 100644 --- a/src/modules/stream_notifications/twitch/webhook.py +++ b/src/modules/stream_notifications/twitch/webhook.py @@ -10,7 +10,7 @@ from twitchAPI.oauth import validate_token from core.config import config from repositories.streamers import StreamerConfigRepository, StreamerConfig from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check -from modules.stream_notifications.state import UpdateEvent +from modules.stream_notifications.state import UpdateEvent, EventType from .authorize import authorize @@ -26,14 +26,20 @@ class TwitchService: self.failed = False async def on_channel_update(self, event: ChannelUpdateEvent): - await on_stream_state_change_with_check.kiq(UpdateEvent( - broadcaster_user_id=event.event.broadcaster_user_id, - title=event.event.title, - category_name=event.event.category_name - )) + await on_stream_state_change_with_check.kiq( + UpdateEvent( + broadcaster_user_id=event.event.broadcaster_user_id, + title=event.event.title, + category_name=event.event.category_name + ), + EventType.CHANNEL_UPDATE, + ) async def on_stream_online(self, event: StreamOnlineEvent): - await on_stream_state_change.kiq(int(event.event.broadcaster_user_id)) + await on_stream_state_change.kiq( + int(event.event.broadcaster_user_id), + EventType.STREAM_ONLINE, + ) async def subscribe_with_retry( self, diff --git a/src/modules/stream_notifications/watcher.py b/src/modules/stream_notifications/watcher.py index 8ce1ff6..e665467 100644 --- a/src/modules/stream_notifications/watcher.py +++ b/src/modules/stream_notifications/watcher.py @@ -5,7 +5,7 @@ from twitchAPI.helper import first from core.redis import redis_manager from repositories.streamers import StreamerConfigRepository -from .state import State, StateManager +from .state import State, StateManager, EventType from .sent_notifications import SentNotificationRepository, SentNotificationType from .notification import delete_penultimate_notification, notify from .twitch.authorize import authorize @@ -79,7 +79,12 @@ class StateWatcher: await cls.remove_previous_notifications(streamer_id) @classmethod - async def _on_stream_state_change(cls, streamer_id: int, new_state: State | None = None): + async def _on_stream_state_change( + cls, + streamer_id: int, + event_type: EventType, + new_state: State | None = None + ): if new_state is not None: current_state = new_state else: @@ -94,12 +99,15 @@ class StateWatcher: await StateManager.update(streamer_id, current_state) return - if datetime.now(timezone.utc) - last_state.last_live_at > cls.START_STREAM_THRESHOLD: + if ( + event_type == EventType.STREAM_ONLINE and + datetime.now(timezone.utc) - last_state.last_live_at >= cls.START_STREAM_THRESHOLD + ): await cls.notify_start_stream(streamer_id, current_state) await StateManager.update(streamer_id, current_state) return - if last_state.category != current_state.category: + if last_state != current_state: await cls.notify_change_category(streamer_id, current_state) await StateManager.update(streamer_id, current_state) return @@ -107,7 +115,12 @@ class StateWatcher: await StateManager.update(streamer_id, current_state) @classmethod - async def on_stream_state_change(cls, streamer_id: int, new_state: State | None = None): + async def on_stream_state_change( + cls, + streamer_id: int, + event_type: EventType, + new_state: State | None = None + ): async with redis_manager.connect() as redis: async with redis.lock(f"on_stream_state_change:{streamer_id}"): - await cls._on_stream_state_change(streamer_id, new_state) + await cls._on_stream_state_change(streamer_id, event_type, new_state)