11 Commits

Author SHA1 Message Date
eddad6454d Disable messages proc 2025-04-24 17:17:45 +02:00
8ac15a1687 Fix 2025-04-23 09:48:26 +02:00
36c542b822 Fix 2025-04-22 19:04:16 +02:00
b67d00bcd7 Fix 2025-04-22 19:02:16 +02:00
81a51a3d0d Fix 2025-04-22 18:48:47 +02:00
90756c884c Fix 2025-04-22 18:45:30 +02:00
df501c27d7 Fix 2025-04-22 18:30:14 +02:00
33fbc4c446 Fix 2025-04-22 13:55:43 +02:00
03039bbd09 Fix 2025-04-22 13:51:21 +02:00
6ea36f165e Fix execution timeout in ScheduleSyncWorkflow 2025-04-22 13:42:06 +02:00
7721e3a840 Fix 2025-04-22 13:03:23 +02:00
11 changed files with 87 additions and 51 deletions

View File

@@ -1,6 +1,7 @@
from .sync import syncronize
from .sync import syncronize, syncronize_all
__all__ = [
"syncronize",
"syncronize_all",
]

View File

@@ -6,13 +6,24 @@ from applications.schedule_sync.synchronizer import syncronize as syncronize_int
@activity.defn
async def syncronize(twitch_id: int):
try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
try:
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e:
activity.logger.error(f"Error during synchronization: {e}")
raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -3,8 +3,7 @@ from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.activities import syncronize
from applications.schedule_sync.activities import syncronize_all
from applications.temporal_worker.queues import MAIN_QUEUE
@@ -18,6 +17,7 @@ class ScheduleSyncWorkflow:
cls.run,
id="ScheduleSyncWorkflow",
task_queue=MAIN_QUEUE,
execution_timeout=timedelta(minutes=1),
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
@@ -27,15 +27,8 @@ class ScheduleSyncWorkflow:
@workflow.run
async def run(self):
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await workflow.start_activity(
syncronize,
streamer.twitch.id,
await workflow.execute_activity(
syncronize_all,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5),
)

View File

@@ -42,12 +42,14 @@ async def main():
],
activities=[
schedule_sync_activities.syncronize,
schedule_sync_activities.syncronize_all,
twitch_activities.on_message_activity,
twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity,
twitch_activities.on_channel_update_activity,
],
workflow_runner=UnsandboxedWorkflowRunner(),
workflow_runner=UnsandboxedWorkflowRunner()
)
await worker.run()

View File

@@ -1,5 +1,5 @@
from .message_proc import on_message_activity
from .on_state_change import on_stream_state_change_activity
from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
from .redemption_reward import on_redemption_reward_add_activity
from .state_checker import check_streams_states
@@ -9,4 +9,5 @@ __all__ = [
"on_stream_state_change_activity",
"check_streams_states",
"on_redemption_reward_add_activity",
"on_channel_update_activity",
]

View File

@@ -1,12 +1,17 @@
from datetime import datetime, timezone
from temporalio import activity
from dataclasses import dataclass
from applications.twitch_webhook.state import State, EventType
from pydantic import BaseModel
from twitchAPI.helper import first
from applications.twitch_webhook.state import State, EventType, UpdateEvent
from applications.twitch_webhook.watcher import StateWatcher
from applications.twitch_webhook.twitch.authorize import authorize
@dataclass
class OnStreamStateChangeActivity:
class OnStreamStateChangeActivity(BaseModel):
streamer_id: int
event_type: EventType
new_state: State | None = None
@@ -21,3 +26,33 @@ async def on_stream_state_change_activity(
data.event_type,
data.new_state,
)
class OnChannelUpdateActivity(BaseModel):
event: UpdateEvent
event_type: EventType
@activity.defn
async def on_channel_update_activity(
data: OnChannelUpdateActivity
):
twitch = await authorize(data.event.broadcaster_user_login)
stream = await first(twitch.get_streams(
user_id=[data.event.broadcaster_user_id])
)
if stream is None:
return
await on_stream_state_change_activity(
OnStreamStateChangeActivity(
streamer_id=int(data.event.broadcaster_user_id),
event_type=data.event_type,
new_state=State(
title=data.event.title,
category=data.event.category_name,
last_live_at=datetime.now(timezone.utc)
),
)
)

View File

@@ -308,6 +308,8 @@ class MessagesProc:
@classmethod
async def on_message(cls, received_as: str, event: MessageEvent):
return
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return

View File

@@ -1,13 +1,10 @@
from datetime import datetime, timezone, timedelta
from datetime import timedelta
from temporalio import workflow
from twitchAPI.helper import first
from applications.twitch_webhook.state import UpdateEvent, EventType, State
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
from applications.twitch_webhook.state import UpdateEvent, EventType
@workflow.defn
@@ -18,23 +15,12 @@ class OnChannelUpdateWorkflow:
event: UpdateEvent,
event_type: EventType,
):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await workflow.start_activity(
on_stream_state_change_activity,
OnStreamStateChangeActivity(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
),
on_channel_update_activity,
OnChannelUpdateActivity(
event=event,
event_type=event_type,
),
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
start_to_close_timeout=timedelta(minutes=1),
)

View File

@@ -15,5 +15,5 @@ class OnMessageWorkflow:
on_message_activity,
message,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
schedule_to_close_timeout=timedelta(minutes=5)
)

View File

@@ -10,7 +10,7 @@ from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnStreamOnlineWorkflow:
@workflow.run
async def run(self, broadcaster_user_id: str, event_type: EventType):
async def run(self, broadcaster_user_id: str | int, event_type: EventType):
await workflow.start_activity(
on_stream_state_change_activity,
OnStreamStateChangeActivity(

View File

@@ -1,5 +1,10 @@
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def get_client() -> Client:
return await Client.connect("temporal:7233", namespace="default")
return await Client.connect(
"temporal:7233",
namespace="default",
data_converter=pydantic_data_converter
)