From 5bea39cd2cecc39f6292ffa1b98a805edff87d7f Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Mon, 21 Apr 2025 23:24:33 +0200 Subject: [PATCH] Fix --- src/applications/temporal_worker/__main__.py | 6 +- .../twitch_webhook/activities/message_proc.py | 6 +- .../activities/on_state_change.py | 40 ++++------- .../twitch_webhook/messages_proc.py | 9 ++- .../twitch_webhook/twitch/webhook.py | 68 ++++++++++++++----- .../twitch_webhook/workflows/checker.py | 3 +- .../workflows/on_channel_update.py | 37 ++++++++++ .../twitch_webhook/workflows/on_message.py | 16 +++++ .../workflows/on_reward_redemption.py | 16 +++++ .../workflows/on_stream_online.py | 19 ++++++ src/core/temporal.py | 5 ++ 11 files changed, 171 insertions(+), 54 deletions(-) create mode 100644 src/applications/twitch_webhook/workflows/on_channel_update.py create mode 100644 src/applications/twitch_webhook/workflows/on_message.py create mode 100644 src/applications/twitch_webhook/workflows/on_reward_redemption.py create mode 100644 src/applications/twitch_webhook/workflows/on_stream_online.py create mode 100644 src/core/temporal.py diff --git a/src/applications/temporal_worker/__main__.py b/src/applications/temporal_worker/__main__.py index b12e6d0..fa44d21 100644 --- a/src/applications/temporal_worker/__main__.py +++ b/src/applications/temporal_worker/__main__.py @@ -1,6 +1,8 @@ from asyncio import run -from temporalio.client import Client, ScheduleAlreadyRunningError +from core.temporal import get_client + +from temporalio.client import ScheduleAlreadyRunningError from temporalio.worker import Worker, UnsandboxedWorkflowRunner from applications.schedule_sync import activities as schedule_sync_activities @@ -10,7 +12,7 @@ from .queues import MAIN_QUEUE async def main(): - client: Client = await Client.connect("temporal:7233", namespace="default") + client = await get_client() for id, schedule in ScheduleSyncWorkflow.get_schedules().items(): try: diff --git a/src/applications/twitch_webhook/activities/message_proc.py b/src/applications/twitch_webhook/activities/message_proc.py index b267b40..79f835f 100644 --- a/src/applications/twitch_webhook/activities/message_proc.py +++ b/src/applications/twitch_webhook/activities/message_proc.py @@ -5,7 +5,9 @@ from applications.twitch_webhook.messages_proc import MessageEvent, MessagesProc @activity.defn async def on_message_activity( - received_as: str, event: MessageEvent ): - await MessagesProc.on_message(received_as, event) + await MessagesProc.on_message( + event.received_as, + event + ) diff --git a/src/applications/twitch_webhook/activities/on_state_change.py b/src/applications/twitch_webhook/activities/on_state_change.py index 8b230d1..0814d12 100644 --- a/src/applications/twitch_webhook/activities/on_state_change.py +++ b/src/applications/twitch_webhook/activities/on_state_change.py @@ -1,39 +1,23 @@ from temporalio import activity +from dataclasses import dataclass from applications.twitch_webhook.state import State, EventType from applications.twitch_webhook.watcher import StateWatcher +@dataclass +class OnStreamStateChangeActivity: + streamer_id: int + event_type: EventType + new_state: State | None = None + + @activity.defn async def on_stream_state_change_activity( - streamer_id: int, - event_type: EventType, - new_state: State | None = None + data: OnStreamStateChangeActivity ): await StateWatcher.on_stream_state_change( - streamer_id, - event_type, - new_state, + data.streamer_id, + data.event_type, + data.new_state, ) - - -# @activity.defn -# async def on_stream_state_change_with_check( -# event: UpdateEvent, -# event_type: EventType -# ): -# twitch = await authorize(event.broadcaster_user_login) - -# stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) -# if stream is None: -# return - -# await on_stream_state_change.kiq( -# int(event.broadcaster_user_id), -# event_type, -# State( -# title=event.title, -# category=event.category_name, -# last_live_at=datetime.now(timezone.utc) -# ) -# ) diff --git a/src/applications/twitch_webhook/messages_proc.py b/src/applications/twitch_webhook/messages_proc.py index 3b95807..e243805 100644 --- a/src/applications/twitch_webhook/messages_proc.py +++ b/src/applications/twitch_webhook/messages_proc.py @@ -39,6 +39,8 @@ class MessageType(StrEnum): class MessageEvent(BaseModel): + received_as: str + broadcaster_user_id: str broadcaster_user_name: str broadcaster_user_login: str @@ -57,8 +59,10 @@ class MessageEvent(BaseModel): channel_points_custom_reward_id: str | None @classmethod - def from_twitch_event(cls, event: ChannelChatMessageEvent): + def from_twitch_event(cls, received_as: str, event: ChannelChatMessageEvent): return cls( + received_as=received_as, + broadcaster_user_id=event.event.broadcaster_user_id, broadcaster_user_name=event.event.broadcaster_user_name, broadcaster_user_login=event.event.broadcaster_user_login, @@ -219,7 +223,6 @@ class MessagesProc: "да да, иди уже", reply_parent_message_id=event.message_id ) - return @classmethod async def _ask_ai(cls, twitch: Twitch, event: MessageEvent): @@ -251,7 +254,7 @@ class MessagesProc: thread_id=event.message_id ) except Exception as e: - logger.error("Failed to get completion: {}", e, exc_info=True) + logger.error(f"Failed to get completion: {e}", exc_info=True) await twitch.send_chat_message( event.broadcaster_user_id, diff --git a/src/applications/twitch_webhook/twitch/webhook.py b/src/applications/twitch_webhook/twitch/webhook.py index 986b2c1..581bc61 100644 --- a/src/applications/twitch_webhook/twitch/webhook.py +++ b/src/applications/twitch_webhook/twitch/webhook.py @@ -1,17 +1,23 @@ from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task import logging -from typing import NoReturn, Literal +from typing import Literal from twitchAPI.eventsub.websocket import EventSubWebsocket from twitchAPI.twitch import Twitch from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent from twitchAPI.oauth import validate_token +from core.temporal import get_client + from applications.common.repositories.streamers import StreamerConfigRepository, StreamerConfig -from applications.twitch_webhook.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message, on_redemption_reward_add_task from applications.twitch_webhook.state import UpdateEvent, EventType from applications.twitch_webhook.messages_proc import MessageEvent from applications.twitch_webhook.reward_redemption import RewardRedemption +from applications.twitch_webhook.workflows.on_message import OnMessageWorkflow +from applications.twitch_webhook.workflows.on_reward_redemption import OnRewardRedemptionWorkflow +from applications.twitch_webhook.workflows.on_stream_online import OnStreamOnlineWorkflow +from applications.twitch_webhook.workflows.on_channel_update import OnChannelUpdateWorkflow +from applications.temporal_worker.queues import MAIN_QUEUE from .authorize import authorize @@ -31,34 +37,60 @@ class TwitchService: self.failed = False async def on_channel_update(self, event: ChannelUpdateEvent): - await on_stream_state_change_with_check.kiq( - UpdateEvent( - broadcaster_user_id=event.event.broadcaster_user_id, - broadcaster_user_login=event.event.broadcaster_user_login, - title=event.event.title, - category_name=event.event.category_name + client = await get_client() + + await client.start_workflow( + OnChannelUpdateWorkflow.run, + args=( + UpdateEvent( + broadcaster_user_id=event.event.broadcaster_user_id, + broadcaster_user_login=event.event.broadcaster_user_login, + title=event.event.title, + category_name=event.event.category_name + ), + EventType.CHANNEL_UPDATE, ), - EventType.CHANNEL_UPDATE, + id=f"on-channel-update-{event.event.broadcaster_user_id}", + task_queue=MAIN_QUEUE ) async def on_stream_online(self, event: StreamOnlineEvent): - await on_stream_state_change.kiq( - int(event.event.broadcaster_user_id), - EventType.STREAM_ONLINE, + client = await get_client() + + await client.start_workflow( + OnStreamOnlineWorkflow.run, + args=( + int(event.event.broadcaster_user_id), + EventType.STREAM_ONLINE + ), + id=f"on-stream-online-{event.event.broadcaster_user_id}", + task_queue=MAIN_QUEUE ) async def on_channel_points_custom_reward_redemption_add( self, event: ChannelPointsCustomRewardRedemptionAddEvent ): - await on_redemption_reward_add_task( - RewardRedemption.from_twitch_event(event) + client = await get_client() + + await client.start_workflow( + OnRewardRedemptionWorkflow.run, + RewardRedemption.from_twitch_event(event), + id=f"on-reward-redemption-{event.event.broadcaster_user_id}-{event.event.reward.id}", + task_queue=MAIN_QUEUE ) async def on_message(self, event: ChannelChatMessageEvent): - await on_message.kiq( - self.streamer.twitch.name, - MessageEvent.from_twitch_event(event) + client = await get_client() + + await client.start_workflow( + OnMessageWorkflow.run, + MessageEvent.from_twitch_event( + self.streamer.twitch.name, + event + ), + id=f"on-message-{event.event.broadcaster_user_id}-{event.event.message_id}", + task_queue=MAIN_QUEUE ) async def _clean_subs(self, method: str, streamer: StreamerConfig): @@ -157,7 +189,7 @@ class TwitchService: await self.twitch.refresh_used_token() logger.info("Token refreshed") - async def run(self) -> NoReturn: + async def run(self) -> None: eventsub = EventSubWebsocket(twitch=self.twitch) try: diff --git a/src/applications/twitch_webhook/workflows/checker.py b/src/applications/twitch_webhook/workflows/checker.py index aa91971..b0b29e1 100644 --- a/src/applications/twitch_webhook/workflows/checker.py +++ b/src/applications/twitch_webhook/workflows/checker.py @@ -4,6 +4,7 @@ from temporalio import workflow from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec from applications.temporal_worker.queues import MAIN_QUEUE +from applications.twitch_webhook.activities.state_checker import check_streams_states workflow.defn() @@ -25,4 +26,4 @@ class StreamsCheckWorkflow: @workflow.run async def run(self): - pass + await check_streams_states() diff --git a/src/applications/twitch_webhook/workflows/on_channel_update.py b/src/applications/twitch_webhook/workflows/on_channel_update.py new file mode 100644 index 0000000..f51dc55 --- /dev/null +++ b/src/applications/twitch_webhook/workflows/on_channel_update.py @@ -0,0 +1,37 @@ +from datetime import datetime, timezone + +from temporalio import workflow + +from twitchAPI.helper import first + +from applications.twitch_webhook.state import UpdateEvent, EventType, State +from applications.twitch_webhook.twitch.authorize import authorize +from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity + + +@workflow.defn +class OnChannelUpdateWorkflow: + @workflow.run + async def run( + self, + event: UpdateEvent, + event_type: EventType, + ): + twitch = await authorize(event.broadcaster_user_login) + + stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) + if stream is None: + return + + await workflow.start_activity( + on_stream_state_change_activity, + OnStreamStateChangeActivity( + int(event.broadcaster_user_id), + event_type, + State( + title=event.title, + category=event.category_name, + last_live_at=datetime.now(timezone.utc) + ), + ) + ) diff --git a/src/applications/twitch_webhook/workflows/on_message.py b/src/applications/twitch_webhook/workflows/on_message.py new file mode 100644 index 0000000..0f7ade2 --- /dev/null +++ b/src/applications/twitch_webhook/workflows/on_message.py @@ -0,0 +1,16 @@ +from temporalio import workflow + +from applications.twitch_webhook.messages_proc import MessageEvent +from applications.twitch_webhook.activities.message_proc import on_message_activity +from applications.temporal_worker.queues import MAIN_QUEUE + + +@workflow.defn +class OnMessageWorkflow: + @workflow.run + async def run(self, message: MessageEvent) -> None: + await workflow.start_activity( + on_message_activity, + message, + task_queue=MAIN_QUEUE, + ) diff --git a/src/applications/twitch_webhook/workflows/on_reward_redemption.py b/src/applications/twitch_webhook/workflows/on_reward_redemption.py new file mode 100644 index 0000000..f06803b --- /dev/null +++ b/src/applications/twitch_webhook/workflows/on_reward_redemption.py @@ -0,0 +1,16 @@ +from temporalio import workflow + +from applications.twitch_webhook.activities.redemption_reward import on_redemption_reward_add_activity +from applications.twitch_webhook.reward_redemption import RewardRedemption +from applications.temporal_worker.queues import MAIN_QUEUE + + +@workflow.defn +class OnRewardRedemptionWorkflow: + @workflow.run + async def run(self, reward: RewardRedemption): + await workflow.execute_activity( + on_redemption_reward_add_activity, + reward, + task_queue=MAIN_QUEUE + ) diff --git a/src/applications/twitch_webhook/workflows/on_stream_online.py b/src/applications/twitch_webhook/workflows/on_stream_online.py new file mode 100644 index 0000000..6f14fc4 --- /dev/null +++ b/src/applications/twitch_webhook/workflows/on_stream_online.py @@ -0,0 +1,19 @@ +from temporalio import workflow + +from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity +from applications.twitch_webhook.state import EventType +from applications.temporal_worker.queues import MAIN_QUEUE + + +@workflow.defn +class OnStreamOnlineWorkflow: + @workflow.run + async def run(self, broadcaster_user_id: str, event_type: EventType): + await workflow.start_activity( + on_stream_state_change_activity, + OnStreamStateChangeActivity( + streamer_id=int(broadcaster_user_id), + event_type=event_type + ), + task_queue=MAIN_QUEUE + ) diff --git a/src/core/temporal.py b/src/core/temporal.py new file mode 100644 index 0000000..1baf4f9 --- /dev/null +++ b/src/core/temporal.py @@ -0,0 +1,5 @@ +from temporalio.client import Client + + +async def get_client() -> Client: + return await Client.connect("temporal:7233", namespace="default")