19 Commits

Author SHA1 Message Date
eddad6454d Disable messages proc 2025-04-24 17:17:45 +02:00
8ac15a1687 Fix 2025-04-23 09:48:26 +02:00
36c542b822 Fix 2025-04-22 19:04:16 +02:00
b67d00bcd7 Fix 2025-04-22 19:02:16 +02:00
81a51a3d0d Fix 2025-04-22 18:48:47 +02:00
90756c884c Fix 2025-04-22 18:45:30 +02:00
df501c27d7 Fix 2025-04-22 18:30:14 +02:00
33fbc4c446 Fix 2025-04-22 13:55:43 +02:00
03039bbd09 Fix 2025-04-22 13:51:21 +02:00
6ea36f165e Fix execution timeout in ScheduleSyncWorkflow 2025-04-22 13:42:06 +02:00
7721e3a840 Fix 2025-04-22 13:03:23 +02:00
1c5b7e81e8 Migrate to python 3.12 2025-04-22 01:51:36 +02:00
586359f8ce Fix 2025-04-22 01:32:12 +02:00
12dd0f9af7 Fix 2025-04-22 01:22:37 +02:00
4174d67084 Fix 2025-04-22 01:17:42 +02:00
2d8b767ae3 Revert to python 3.13 2025-04-22 00:56:38 +02:00
a0a53800ab Migrate to python3.11 2025-04-22 00:51:14 +02:00
959ab30265 Revert "Fix"
This reverts commit 560a5ac793.
2025-04-22 00:49:53 +02:00
560a5ac793 Fix 2025-04-22 00:42:21 +02:00
13 changed files with 94 additions and 50 deletions

View File

@@ -1,4 +1,4 @@
FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
WORKDIR /app WORKDIR /app
COPY ./pyproject.toml ./uv.lock ./ COPY ./pyproject.toml ./uv.lock ./

View File

@@ -1,6 +1,7 @@
from .sync import syncronize from .sync import syncronize, syncronize_all
__all__ = [ __all__ = [
"syncronize", "syncronize",
"syncronize_all",
] ]

View File

@@ -6,9 +6,24 @@ from applications.schedule_sync.synchronizer import syncronize as syncronize_int
@activity.defn @activity.defn
async def syncronize(twitch_id: int): async def syncronize(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id) try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None: if streamer.integrations.discord is None:
return return
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id) await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e:
activity.logger.error(f"Error during synchronization: {e}")
raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -3,8 +3,7 @@ from datetime import timedelta
from temporalio import workflow from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.common.repositories.streamers import StreamerConfigRepository from applications.schedule_sync.activities import syncronize_all
from applications.schedule_sync.activities import syncronize
from applications.temporal_worker.queues import MAIN_QUEUE from applications.temporal_worker.queues import MAIN_QUEUE
@@ -18,6 +17,7 @@ class ScheduleSyncWorkflow:
cls.run, cls.run,
id="ScheduleSyncWorkflow", id="ScheduleSyncWorkflow",
task_queue=MAIN_QUEUE, task_queue=MAIN_QUEUE,
execution_timeout=timedelta(minutes=1),
), ),
spec=ScheduleSpec( spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))] intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
@@ -27,14 +27,8 @@ class ScheduleSyncWorkflow:
@workflow.run @workflow.run
async def run(self): async def run(self):
streamers = await StreamerConfigRepository().all() await workflow.execute_activity(
syncronize_all,
for streamer in streamers: task_queue=MAIN_QUEUE,
if streamer.integrations.discord is None: schedule_to_close_timeout=timedelta(minutes=5),
continue )
await workflow.start_activity(
syncronize,
streamer.twitch.id,
schedule_to_close_timeout=timedelta(minutes=5),
)

View File

@@ -42,12 +42,14 @@ async def main():
], ],
activities=[ activities=[
schedule_sync_activities.syncronize, schedule_sync_activities.syncronize,
schedule_sync_activities.syncronize_all,
twitch_activities.on_message_activity, twitch_activities.on_message_activity,
twitch_activities.on_stream_state_change_activity, twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states, twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity, twitch_activities.on_redemption_reward_add_activity,
twitch_activities.on_channel_update_activity,
], ],
workflow_runner=UnsandboxedWorkflowRunner(), workflow_runner=UnsandboxedWorkflowRunner()
) )
await worker.run() await worker.run()

View File

@@ -1,5 +1,5 @@
from .message_proc import on_message_activity from .message_proc import on_message_activity
from .on_state_change import on_stream_state_change_activity from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
from .redemption_reward import on_redemption_reward_add_activity from .redemption_reward import on_redemption_reward_add_activity
from .state_checker import check_streams_states from .state_checker import check_streams_states
@@ -9,4 +9,5 @@ __all__ = [
"on_stream_state_change_activity", "on_stream_state_change_activity",
"check_streams_states", "check_streams_states",
"on_redemption_reward_add_activity", "on_redemption_reward_add_activity",
"on_channel_update_activity",
] ]

View File

@@ -1,12 +1,17 @@
from datetime import datetime, timezone
from temporalio import activity from temporalio import activity
from dataclasses import dataclass
from applications.twitch_webhook.state import State, EventType from pydantic import BaseModel
from twitchAPI.helper import first
from applications.twitch_webhook.state import State, EventType, UpdateEvent
from applications.twitch_webhook.watcher import StateWatcher from applications.twitch_webhook.watcher import StateWatcher
from applications.twitch_webhook.twitch.authorize import authorize
@dataclass class OnStreamStateChangeActivity(BaseModel):
class OnStreamStateChangeActivity:
streamer_id: int streamer_id: int
event_type: EventType event_type: EventType
new_state: State | None = None new_state: State | None = None
@@ -21,3 +26,33 @@ async def on_stream_state_change_activity(
data.event_type, data.event_type,
data.new_state, data.new_state,
) )
class OnChannelUpdateActivity(BaseModel):
event: UpdateEvent
event_type: EventType
@activity.defn
async def on_channel_update_activity(
data: OnChannelUpdateActivity
):
twitch = await authorize(data.event.broadcaster_user_login)
stream = await first(twitch.get_streams(
user_id=[data.event.broadcaster_user_id])
)
if stream is None:
return
await on_stream_state_change_activity(
OnStreamStateChangeActivity(
streamer_id=int(data.event.broadcaster_user_id),
event_type=data.event_type,
new_state=State(
title=data.event.title,
category=data.event.category_name,
last_live_at=datetime.now(timezone.utc)
),
)
)

View File

@@ -308,6 +308,8 @@ class MessagesProc:
@classmethod @classmethod
async def on_message(cls, received_as: str, event: MessageEvent): async def on_message(cls, received_as: str, event: MessageEvent):
return
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS: if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return return

View File

@@ -28,5 +28,6 @@ class StreamsCheckWorkflow:
async def run(self): async def run(self):
await workflow.start_activity( await workflow.start_activity(
check_streams_states, check_streams_states,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1) schedule_to_close_timeout=timedelta(minutes=1)
) )

View File

@@ -1,12 +1,10 @@
from datetime import datetime, timezone, timedelta from datetime import timedelta
from temporalio import workflow from temporalio import workflow
from twitchAPI.helper import first from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
from applications.twitch_webhook.state import UpdateEvent, EventType, State from applications.twitch_webhook.state import UpdateEvent, EventType
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
@workflow.defn @workflow.defn
@@ -17,22 +15,12 @@ class OnChannelUpdateWorkflow:
event: UpdateEvent, event: UpdateEvent,
event_type: EventType, event_type: EventType,
): ):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await workflow.start_activity( await workflow.start_activity(
on_stream_state_change_activity, on_channel_update_activity,
OnStreamStateChangeActivity( OnChannelUpdateActivity(
int(event.broadcaster_user_id), event=event,
event_type, event_type=event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
),
), ),
schedule_to_close_timeout=timedelta(minutes=1) task_queue=MAIN_QUEUE,
start_to_close_timeout=timedelta(minutes=1),
) )

View File

@@ -15,5 +15,5 @@ class OnMessageWorkflow:
on_message_activity, on_message_activity,
message, message,
task_queue=MAIN_QUEUE, task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1) schedule_to_close_timeout=timedelta(minutes=5)
) )

View File

@@ -10,7 +10,7 @@ from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn @workflow.defn
class OnStreamOnlineWorkflow: class OnStreamOnlineWorkflow:
@workflow.run @workflow.run
async def run(self, broadcaster_user_id: str, event_type: EventType): async def run(self, broadcaster_user_id: str | int, event_type: EventType):
await workflow.start_activity( await workflow.start_activity(
on_stream_state_change_activity, on_stream_state_change_activity,
OnStreamStateChangeActivity( OnStreamStateChangeActivity(

View File

@@ -1,5 +1,10 @@
from temporalio.client import Client from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def get_client() -> Client: async def get_client() -> Client:
return await Client.connect("temporal:7233", namespace="default") return await Client.connect(
"temporal:7233",
namespace="default",
data_converter=pydantic_data_converter
)