This commit is contained in:
2025-04-22 19:02:16 +02:00
parent 81a51a3d0d
commit b67d00bcd7
4 changed files with 47 additions and 24 deletions

View File

@@ -47,6 +47,7 @@ async def main():
twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity,
twitch_activities.on_stream_state_change_activity,
],
workflow_runner=UnsandboxedWorkflowRunner()
)

View File

@@ -1,5 +1,5 @@
from .message_proc import on_message_activity
from .on_state_change import on_stream_state_change_activity
from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
from .redemption_reward import on_redemption_reward_add_activity
from .state_checker import check_streams_states
@@ -9,4 +9,5 @@ __all__ = [
"on_stream_state_change_activity",
"check_streams_states",
"on_redemption_reward_add_activity",
"on_channel_update_activity",
]

View File

@@ -1,9 +1,14 @@
from datetime import datetime, timezone
from temporalio import activity
from pydantic import BaseModel
from applications.twitch_webhook.state import State, EventType
from twitchAPI.helper import first
from applications.twitch_webhook.state import State, EventType, UpdateEvent
from applications.twitch_webhook.watcher import StateWatcher
from applications.twitch_webhook.twitch.authorize import authorize
class OnStreamStateChangeActivity(BaseModel):
@@ -21,3 +26,33 @@ async def on_stream_state_change_activity(
data.event_type,
data.new_state,
)
class OnChannelUpdateActivity(BaseModel):
event: UpdateEvent
event_type: EventType
@activity.defn
async def on_channel_update_activity(
data: OnChannelUpdateActivity
):
twitch = await authorize(data.event.broadcaster_user_login)
stream = await first(twitch.get_streams(
user_id=[data.event.broadcaster_user_id])
)
if stream is None:
return
await on_stream_state_change_activity(
OnStreamStateChangeActivity(
streamer_id=int(data.event.broadcaster_user_id),
event_type=data.event_type,
new_state=State(
title=data.event.title,
category=data.event.category_name,
last_live_at=datetime.now(timezone.utc)
),
)
)

View File

@@ -1,13 +1,10 @@
from datetime import datetime, timezone, timedelta
from datetime import timedelta
from temporalio import workflow
from twitchAPI.helper import first
from applications.twitch_webhook.state import UpdateEvent, EventType, State
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
from applications.twitch_webhook.state import UpdateEvent, EventType
@workflow.defn
@@ -18,23 +15,12 @@ class OnChannelUpdateWorkflow:
event: UpdateEvent,
event_type: EventType,
):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await workflow.start_activity(
on_stream_state_change_activity,
OnStreamStateChangeActivity(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
),
on_channel_update_activity,
OnChannelUpdateActivity(
event=event,
event_type=event_type,
),
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
start_to_close_timeout=timedelta(minutes=1),
)