Add temporal worker

This commit is contained in:
2025-04-21 20:21:58 +02:00
parent 58d3a37985
commit b18f717fae
9 changed files with 150 additions and 46 deletions

View File

@@ -0,0 +1,6 @@
from .sync import ScheduleSyncActivity
__all__ = [
"ScheduleSyncActivity",
]

View File

@@ -0,0 +1,16 @@
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.synchronizer import syncronize
class ScheduleSyncActivity:
@activity.defn
@classmethod
async def syncronize(cls, twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)

View File

@@ -1,24 +0,0 @@
from core.broker import broker
from applications.common.repositories.streamers import StreamerConfigRepository
from .synchronizer import syncronize
@broker.task("scheduler_sync.syncronize_task")
async def syncronize_task(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)
@broker.task("scheduler_sync.syncronize_all_task", schedule=[{"cron": "*/5 * * * *"}])
async def syncronize_all_task():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize_task.kiq(streamer.twitch.id)

View File

@@ -0,0 +1,6 @@
from .sync import ScheduleSyncWorkflow
__all__ = [
"ScheduleSyncWorkflow"
]

View File

@@ -0,0 +1,43 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.common.repositories.streamers import StreamerConfigRepository
with workflow.unsafe.imports_passed_through():
from applications.schedule_sync.activities import ScheduleSyncActivity
TASK_QUEUE = "main"
@workflow.defn
class ScheduleSyncWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"all": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="ScheduleSyncWorkflow",
task_queue=TASK_QUEUE,
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
)
)
}
@workflow.run
async def run(self):
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await workflow.execute_activity_method(
ScheduleSyncActivity.syncronize,
streamer.twitch.id
)

View File

@@ -0,0 +1,35 @@
from asyncio import run
from temporalio.client import Client
from temporalio.worker import Worker
from applications.schedule_sync.activities import ScheduleSyncActivity
from applications.schedule_sync.workflows import ScheduleSyncWorkflow
TASK_QUEUE = "main"
async def main():
client: Client = await Client.connect("temporal:7233", namespace="default")
for id, schedule in ScheduleSyncWorkflow.get_schedules().items():
await client.create_schedule(
f"ScheduleSyncWorkflow-{id}", schedule
)
worker: Worker = Worker(
client,
task_queue=TASK_QUEUE,
workflows=[
ScheduleSyncWorkflow
],
activities=[
ScheduleSyncActivity.syncronize
],
)
await worker.run()
run(main())