This commit is contained in:
2024-11-18 00:22:51 +01:00
parent 5d675ecbaf
commit 6cc83f2f45
3 changed files with 23 additions and 8 deletions

View File

@@ -1,8 +1,11 @@
from core.broker import broker from core.broker import broker
from .state import State
from .watcher import StateWatcher from .watcher import StateWatcher
@broker.task("stream_notifications.twitch.on_stream_state_change") @broker.task("stream_notifications.twitch.on_stream_state_change")
async def on_stream_state_change(streamer_id: int): async def on_stream_state_change(
await StateWatcher.on_stream_state_change(streamer_id) streamer_id: int, new_state: State | None = None
):
await StateWatcher.on_stream_state_change(streamer_id, new_state)

View File

@@ -1,17 +1,18 @@
from asyncio import sleep, gather from asyncio import sleep, gather
from datetime import datetime, timezone
import logging import logging
from typing import NoReturn, Literal from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.twitch import Twitch from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent
from twitchAPI.type import EventSubSubscriptionConflict
from core.config import config from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change from modules.stream_notifications.tasks import on_stream_state_change
from .authorize import authorize from .authorize import authorize
from ..state import State
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -24,7 +25,14 @@ class TwitchService:
self.twitch = twitch self.twitch = twitch
async def on_channel_update(self, event: ChannelUpdateEvent): async def on_channel_update(self, event: ChannelUpdateEvent):
await on_stream_state_change.kiq(int(event.event.broadcaster_user_id)) await on_stream_state_change.kiq(
int(event.event.broadcaster_user_id),
State(
title=event.event.title,
category=event.event.category_name,
last_live_at=datetime.now(timezone.utc)
)
)
async def on_stream_online(self, event: StreamOnlineEvent): async def on_stream_online(self, event: StreamOnlineEvent):
await on_stream_state_change.kiq(int(event.event.broadcaster_user_id)) await on_stream_state_change.kiq(int(event.event.broadcaster_user_id))

View File

@@ -65,8 +65,12 @@ class StateWatcher:
await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state) await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state)
@classmethod @classmethod
async def _on_stream_state_change(cls, streamer_id: int): async def _on_stream_state_change(cls, streamer_id: int, new_state: State | None = None):
if new_state is not None:
current_state = new_state
else:
current_state = await cls.get_twitch_state(streamer_id) current_state = await cls.get_twitch_state(streamer_id)
if current_state is None: if current_state is None:
return return
@@ -89,7 +93,7 @@ class StateWatcher:
await StateManager.update(streamer_id, current_state) await StateManager.update(streamer_id, current_state)
@classmethod @classmethod
async def on_stream_state_change(cls, streamer_id: int): async def on_stream_state_change(cls, streamer_id: int, new_state: State | None = None):
async with redis_manager.connect() as redis: async with redis_manager.connect() as redis:
async with redis.lock(f"on_stream_state_change:{streamer_id}"): async with redis.lock(f"on_stream_state_change:{streamer_id}"):
await cls._on_stream_state_change(streamer_id) await cls._on_stream_state_change(streamer_id, new_state)