Update stream notifications

This commit is contained in:
2025-01-12 19:39:03 +01:00
parent aebe64829a
commit b015907778
4 changed files with 64 additions and 18 deletions

View File

@@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
from enum import StrEnum
from pydantic import BaseModel from pydantic import BaseModel
@@ -11,6 +12,12 @@ class State(BaseModel):
last_live_at: datetime last_live_at: datetime
def __eq__(self, value: object) -> bool:
if not isinstance(value, State):
return False
return self.title == value.title and self.category == value.category
class UpdateEvent(BaseModel): class UpdateEvent(BaseModel):
broadcaster_user_id: str broadcaster_user_id: str
@@ -18,6 +25,12 @@ class UpdateEvent(BaseModel):
category_name: str category_name: str
class EventType(StrEnum):
STREAM_ONLINE = "stream.online"
CHANNEL_UPDATE = "channel.update"
UNKNOWN = "unknown"
class StateManager: class StateManager:
COLLECTION_NAME = "stream_twitch_state" COLLECTION_NAME = "stream_twitch_state"

View File

@@ -5,7 +5,7 @@ from twitchAPI.helper import first
from core.broker import broker from core.broker import broker
from repositories.streamers import StreamerConfigRepository from repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher from .watcher import StateWatcher
from .twitch.authorize import authorize from .twitch.authorize import authorize
@@ -14,7 +14,10 @@ from .twitch.authorize import authorize
"stream_notifications.twitch.on_stream_state_change_with_check", "stream_notifications.twitch.on_stream_state_change_with_check",
retry_on_error=True retry_on_error=True
) )
async def on_stream_state_change_with_check(event: UpdateEvent): async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize() twitch = await authorize()
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
@@ -23,6 +26,7 @@ async def on_stream_state_change_with_check(event: UpdateEvent):
await on_stream_state_change.kiq( await on_stream_state_change.kiq(
int(event.broadcaster_user_id), int(event.broadcaster_user_id),
event_type,
State( State(
title=event.title, title=event.title,
category=event.category_name, category=event.category_name,
@@ -36,9 +40,15 @@ async def on_stream_state_change_with_check(event: UpdateEvent):
retry_on_error=True retry_on_error=True
) )
async def on_stream_state_change( async def on_stream_state_change(
streamer_id: int, new_state: State | None = None streamer_id: int,
event_type: EventType,
new_state: State | None = None
): ):
await StateWatcher.on_stream_state_change(streamer_id, new_state) await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
@broker.task( @broker.task(
@@ -58,4 +68,8 @@ async def check_streams_states():
last_live_at=datetime.now(timezone.utc) last_live_at=datetime.now(timezone.utc)
) )
await StateWatcher.on_stream_state_change(int(stream.user_id), state) await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)

View File

@@ -10,7 +10,7 @@ from twitchAPI.oauth import validate_token
from core.config import config from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check
from modules.stream_notifications.state import UpdateEvent from modules.stream_notifications.state import UpdateEvent, EventType
from .authorize import authorize from .authorize import authorize
@@ -26,14 +26,20 @@ class TwitchService:
self.failed = False self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent): async def on_channel_update(self, event: ChannelUpdateEvent):
await on_stream_state_change_with_check.kiq(UpdateEvent( await on_stream_state_change_with_check.kiq(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id, broadcaster_user_id=event.event.broadcaster_user_id,
title=event.event.title, title=event.event.title,
category_name=event.event.category_name category_name=event.event.category_name
)) ),
EventType.CHANNEL_UPDATE,
)
async def on_stream_online(self, event: StreamOnlineEvent): async def on_stream_online(self, event: StreamOnlineEvent):
await on_stream_state_change.kiq(int(event.event.broadcaster_user_id)) await on_stream_state_change.kiq(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE,
)
async def subscribe_with_retry( async def subscribe_with_retry(
self, self,

View File

@@ -5,7 +5,7 @@ from twitchAPI.helper import first
from core.redis import redis_manager from core.redis import redis_manager
from repositories.streamers import StreamerConfigRepository from repositories.streamers import StreamerConfigRepository
from .state import State, StateManager from .state import State, StateManager, EventType
from .sent_notifications import SentNotificationRepository, SentNotificationType from .sent_notifications import SentNotificationRepository, SentNotificationType
from .notification import delete_penultimate_notification, notify from .notification import delete_penultimate_notification, notify
from .twitch.authorize import authorize from .twitch.authorize import authorize
@@ -79,7 +79,12 @@ class StateWatcher:
await cls.remove_previous_notifications(streamer_id) await cls.remove_previous_notifications(streamer_id)
@classmethod @classmethod
async def _on_stream_state_change(cls, streamer_id: int, new_state: State | None = None): async def _on_stream_state_change(
cls,
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
if new_state is not None: if new_state is not None:
current_state = new_state current_state = new_state
else: else:
@@ -94,12 +99,15 @@ class StateWatcher:
await StateManager.update(streamer_id, current_state) await StateManager.update(streamer_id, current_state)
return return
if datetime.now(timezone.utc) - last_state.last_live_at > cls.START_STREAM_THRESHOLD: if (
event_type == EventType.STREAM_ONLINE and
datetime.now(timezone.utc) - last_state.last_live_at >= cls.START_STREAM_THRESHOLD
):
await cls.notify_start_stream(streamer_id, current_state) await cls.notify_start_stream(streamer_id, current_state)
await StateManager.update(streamer_id, current_state) await StateManager.update(streamer_id, current_state)
return return
if last_state.category != current_state.category: if last_state != current_state:
await cls.notify_change_category(streamer_id, current_state) await cls.notify_change_category(streamer_id, current_state)
await StateManager.update(streamer_id, current_state) await StateManager.update(streamer_id, current_state)
return return
@@ -107,7 +115,12 @@ class StateWatcher:
await StateManager.update(streamer_id, current_state) await StateManager.update(streamer_id, current_state)
@classmethod @classmethod
async def on_stream_state_change(cls, streamer_id: int, new_state: State | None = None): async def on_stream_state_change(
cls,
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
async with redis_manager.connect() as redis: async with redis_manager.connect() as redis:
async with redis.lock(f"on_stream_state_change:{streamer_id}"): async with redis.lock(f"on_stream_state_change:{streamer_id}"):
await cls._on_stream_state_change(streamer_id, new_state) await cls._on_stream_state_change(streamer_id, event_type, new_state)