Compare commits

...

4 Commits

Author SHA1 Message Date
33fbc4c446 Fix 2025-04-22 13:55:43 +02:00
03039bbd09 Fix 2025-04-22 13:51:21 +02:00
6ea36f165e Fix execution timeout in ScheduleSyncWorkflow 2025-04-22 13:42:06 +02:00
7721e3a840 Fix 2025-04-22 13:03:23 +02:00
6 changed files with 36 additions and 25 deletions

View File

@@ -1,6 +1,7 @@
from .sync import syncronize from .sync import syncronize, syncronize_all
__all__ = [ __all__ = [
"syncronize", "syncronize",
"syncronize_all",
] ]

View File

@@ -6,13 +6,24 @@ from applications.schedule_sync.synchronizer import syncronize as syncronize_int
@activity.defn @activity.defn
async def syncronize(twitch_id: int): async def syncronize(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
try: try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id) await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e: except Exception as e:
activity.logger.error(f"Error during synchronization: {e}") activity.logger.error(f"Error during synchronization: {e}")
raise e raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -3,8 +3,7 @@ from datetime import timedelta
from temporalio import workflow from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.common.repositories.streamers import StreamerConfigRepository from applications.schedule_sync.activities import syncronize_all
from applications.schedule_sync.activities import syncronize
from applications.temporal_worker.queues import MAIN_QUEUE from applications.temporal_worker.queues import MAIN_QUEUE
@@ -18,6 +17,7 @@ class ScheduleSyncWorkflow:
cls.run, cls.run,
id="ScheduleSyncWorkflow", id="ScheduleSyncWorkflow",
task_queue=MAIN_QUEUE, task_queue=MAIN_QUEUE,
execution_timeout=timedelta(minutes=1),
), ),
spec=ScheduleSpec( spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))] intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
@@ -27,15 +27,8 @@ class ScheduleSyncWorkflow:
@workflow.run @workflow.run
async def run(self): async def run(self):
streamers = await StreamerConfigRepository().all() await workflow.execute_activity(
syncronize_all,
for streamer in streamers: task_queue=MAIN_QUEUE,
if streamer.integrations.discord is None: schedule_to_close_timeout=timedelta(minutes=5),
continue )
await workflow.start_activity(
syncronize,
streamer.twitch.id,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5),
)

View File

@@ -42,12 +42,13 @@ async def main():
], ],
activities=[ activities=[
schedule_sync_activities.syncronize, schedule_sync_activities.syncronize,
schedule_sync_activities.syncronize_all,
twitch_activities.on_message_activity, twitch_activities.on_message_activity,
twitch_activities.on_stream_state_change_activity, twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states, twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity, twitch_activities.on_redemption_reward_add_activity,
], ],
workflow_runner=UnsandboxedWorkflowRunner(), workflow_runner=UnsandboxedWorkflowRunner()
) )
await worker.run() await worker.run()

View File

@@ -1,12 +1,12 @@
from temporalio import activity from temporalio import activity
from dataclasses import dataclass
from pydantic import BaseModel
from applications.twitch_webhook.state import State, EventType from applications.twitch_webhook.state import State, EventType
from applications.twitch_webhook.watcher import StateWatcher from applications.twitch_webhook.watcher import StateWatcher
@dataclass class OnStreamStateChangeActivity(BaseModel):
class OnStreamStateChangeActivity:
streamer_id: int streamer_id: int
event_type: EventType event_type: EventType
new_state: State | None = None new_state: State | None = None

View File

@@ -1,5 +1,10 @@
from temporalio.client import Client from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def get_client() -> Client: async def get_client() -> Client:
return await Client.connect("temporal:7233", namespace="default") return await Client.connect(
"temporal:7233",
namespace="default",
data_converter=pydantic_data_converter
)