Compare commits

...

4 Commits

Author SHA1 Message Date
33fbc4c446 Fix 2025-04-22 13:55:43 +02:00
03039bbd09 Fix 2025-04-22 13:51:21 +02:00
6ea36f165e Fix execution timeout in ScheduleSyncWorkflow 2025-04-22 13:42:06 +02:00
7721e3a840 Fix 2025-04-22 13:03:23 +02:00
6 changed files with 36 additions and 25 deletions

View File

@@ -1,6 +1,7 @@
from .sync import syncronize
from .sync import syncronize, syncronize_all
__all__ = [
"syncronize",
"syncronize_all",
]

View File

@@ -6,13 +6,24 @@ from applications.schedule_sync.synchronizer import syncronize as syncronize_int
@activity.defn
async def syncronize(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e:
activity.logger.error(f"Error during synchronization: {e}")
raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -3,8 +3,7 @@ from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.activities import syncronize
from applications.schedule_sync.activities import syncronize_all
from applications.temporal_worker.queues import MAIN_QUEUE
@@ -18,6 +17,7 @@ class ScheduleSyncWorkflow:
cls.run,
id="ScheduleSyncWorkflow",
task_queue=MAIN_QUEUE,
execution_timeout=timedelta(minutes=1),
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
@@ -27,15 +27,8 @@ class ScheduleSyncWorkflow:
@workflow.run
async def run(self):
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await workflow.start_activity(
syncronize,
streamer.twitch.id,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5),
)
await workflow.execute_activity(
syncronize_all,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5),
)

View File

@@ -42,12 +42,13 @@ async def main():
],
activities=[
schedule_sync_activities.syncronize,
schedule_sync_activities.syncronize_all,
twitch_activities.on_message_activity,
twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity,
],
workflow_runner=UnsandboxedWorkflowRunner(),
workflow_runner=UnsandboxedWorkflowRunner()
)
await worker.run()

View File

@@ -1,12 +1,12 @@
from temporalio import activity
from dataclasses import dataclass
from pydantic import BaseModel
from applications.twitch_webhook.state import State, EventType
from applications.twitch_webhook.watcher import StateWatcher
@dataclass
class OnStreamStateChangeActivity:
class OnStreamStateChangeActivity(BaseModel):
streamer_id: int
event_type: EventType
new_state: State | None = None

View File

@@ -1,5 +1,10 @@
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def get_client() -> Client:
return await Client.connect("temporal:7233", namespace="default")
return await Client.connect(
"temporal:7233",
namespace="default",
data_converter=pydantic_data_converter
)