diff --git a/pyproject.toml b/pyproject.toml index d3c9e02..ec82973 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "authx>=1.4.1,<2", "httpx-oauth>=0.16.1,<0.17", "uvicorn[standard]>=0.34.0,<0.35", + "temporalio>=1.10.0", ] [tool.hatch.build.targets.sdist] diff --git a/src/applications/schedule_sync/activities/__init__.py b/src/applications/schedule_sync/activities/__init__.py new file mode 100644 index 0000000..e8df4a7 --- /dev/null +++ b/src/applications/schedule_sync/activities/__init__.py @@ -0,0 +1,6 @@ +from .sync import ScheduleSyncActivity + + +__all__ = [ + "ScheduleSyncActivity", +] diff --git a/src/applications/schedule_sync/activities/sync.py b/src/applications/schedule_sync/activities/sync.py new file mode 100644 index 0000000..0b80552 --- /dev/null +++ b/src/applications/schedule_sync/activities/sync.py @@ -0,0 +1,16 @@ +from temporalio import activity + +from applications.common.repositories.streamers import StreamerConfigRepository +from applications.schedule_sync.synchronizer import syncronize + + +class ScheduleSyncActivity: + @activity.defn + @classmethod + async def syncronize(cls, twitch_id: int): + streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id) + + if streamer.integrations.discord is None: + return + + await syncronize(streamer.twitch, streamer.integrations.discord.guild_id) diff --git a/src/applications/schedule_sync/tasks.py b/src/applications/schedule_sync/tasks.py deleted file mode 100644 index e8a8b85..0000000 --- a/src/applications/schedule_sync/tasks.py +++ /dev/null @@ -1,24 +0,0 @@ -from core.broker import broker -from applications.common.repositories.streamers import StreamerConfigRepository -from .synchronizer import syncronize - - -@broker.task("scheduler_sync.syncronize_task") -async def syncronize_task(twitch_id: int): - streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id) - - if streamer.integrations.discord is None: - return - - await syncronize(streamer.twitch, streamer.integrations.discord.guild_id) - - -@broker.task("scheduler_sync.syncronize_all_task", schedule=[{"cron": "*/5 * * * *"}]) -async def syncronize_all_task(): - streamers = await StreamerConfigRepository().all() - - for streamer in streamers: - if streamer.integrations.discord is None: - continue - - await syncronize_task.kiq(streamer.twitch.id) diff --git a/src/applications/schedule_sync/workflows/__init__.py b/src/applications/schedule_sync/workflows/__init__.py new file mode 100644 index 0000000..a019a40 --- /dev/null +++ b/src/applications/schedule_sync/workflows/__init__.py @@ -0,0 +1,6 @@ +from .sync import ScheduleSyncWorkflow + + +__all__ = [ + "ScheduleSyncWorkflow" +] diff --git a/src/applications/schedule_sync/workflows/sync.py b/src/applications/schedule_sync/workflows/sync.py new file mode 100644 index 0000000..38d42e4 --- /dev/null +++ b/src/applications/schedule_sync/workflows/sync.py @@ -0,0 +1,43 @@ +from datetime import timedelta + +from temporalio import workflow +from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec + +from applications.common.repositories.streamers import StreamerConfigRepository + +with workflow.unsafe.imports_passed_through(): + from applications.schedule_sync.activities import ScheduleSyncActivity + + +TASK_QUEUE = "main" + + +@workflow.defn +class ScheduleSyncWorkflow: + @classmethod + def get_schedules(cls) -> dict[str, Schedule]: + return { + "all": Schedule( + action=ScheduleActionStartWorkflow( + cls.run, + id="ScheduleSyncWorkflow", + task_queue=TASK_QUEUE, + ), + spec=ScheduleSpec( + intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))] + ) + ) + } + + @workflow.run + async def run(self): + streamers = await StreamerConfigRepository().all() + + for streamer in streamers: + if streamer.integrations.discord is None: + continue + + await workflow.execute_activity_method( + ScheduleSyncActivity.syncronize, + streamer.twitch.id + ) diff --git a/src/applications/temporal_worker/__main__.py b/src/applications/temporal_worker/__main__.py new file mode 100644 index 0000000..926e3ee --- /dev/null +++ b/src/applications/temporal_worker/__main__.py @@ -0,0 +1,35 @@ +from asyncio import run + +from temporalio.client import Client +from temporalio.worker import Worker + +from applications.schedule_sync.activities import ScheduleSyncActivity +from applications.schedule_sync.workflows import ScheduleSyncWorkflow + + +TASK_QUEUE = "main" + + +async def main(): + client: Client = await Client.connect("temporal:7233", namespace="default") + + for id, schedule in ScheduleSyncWorkflow.get_schedules().items(): + await client.create_schedule( + f"ScheduleSyncWorkflow-{id}", schedule + ) + + worker: Worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[ + ScheduleSyncWorkflow + ], + activities=[ + ScheduleSyncActivity.syncronize + ], + ) + + await worker.run() + + +run(main()) diff --git a/src/core/broker.py b/src/core/broker.py deleted file mode 100644 index 39b084f..0000000 --- a/src/core/broker.py +++ /dev/null @@ -1,22 +0,0 @@ -from taskiq import TaskiqScheduler -from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware -from taskiq.schedule_sources import LabelScheduleSource -from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend - -from core.config import config - - -broker = ListQueueBroker(url=config.REDIS_URI) \ - .with_middlewares( - SimpleRetryMiddleware(default_retry_count=5) - ) \ - .with_result_backend(RedisAsyncResultBackend( - redis_url=config.REDIS_URI, - result_ex_time=60 * 60 * 24 * 7, - )) - - -scheduler = TaskiqScheduler( - broker=broker, - sources=[LabelScheduleSource(broker)], -) diff --git a/uv.lock b/uv.lock index d9b1a6e..be12747 100644 --- a/uv.lock +++ b/uv.lock @@ -283,6 +283,7 @@ dependencies = [ { name = "redis", extra = ["hiredis"] }, { name = "taskiq" }, { name = "taskiq-redis" }, + { name = "temporalio" }, { name = "twitchapi" }, { name = "uvicorn", extra = ["standard"] }, ] @@ -302,6 +303,7 @@ requires-dist = [ { name = "redis", extras = ["hiredis"], specifier = ">=5.2.1,<6" }, { name = "taskiq", specifier = ">=0.11.11,<0.12" }, { name = "taskiq-redis", specifier = ">=1.0.2,<2" }, + { name = "temporalio", specifier = ">=1.10.0" }, { name = "twitchapi", specifier = ">=4.4.0,<5" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0,<0.35" }, ] @@ -719,6 +721,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b8/d3/c3cb8f1d6ae3b37f83e1de806713a9b3642c5895f0215a62e1a4bd6e5e34/propcache-0.3.1-py3-none-any.whl", hash = "sha256:9a8ecf38de50a7f518c21568c80f985e776397b902f1ce0b01f799aba1608b40", size = 12376 }, ] +[[package]] +name = "protobuf" +version = "6.30.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/c8/8c/cf2ac658216eebe49eaedf1e06bc06cbf6a143469236294a1171a51357c3/protobuf-6.30.2.tar.gz", hash = "sha256:35c859ae076d8c56054c25b59e5e59638d86545ed6e2b6efac6be0b6ea3ba048", size = 429315 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/be/85/cd53abe6a6cbf2e0029243d6ae5fb4335da2996f6c177bb2ce685068e43d/protobuf-6.30.2-cp310-abi3-win32.whl", hash = "sha256:b12ef7df7b9329886e66404bef5e9ce6a26b54069d7f7436a0853ccdeb91c103", size = 419148 }, + { url = "https://files.pythonhosted.org/packages/97/e9/7b9f1b259d509aef2b833c29a1f3c39185e2bf21c9c1be1cd11c22cb2149/protobuf-6.30.2-cp310-abi3-win_amd64.whl", hash = "sha256:7653c99774f73fe6b9301b87da52af0e69783a2e371e8b599b3e9cb4da4b12b9", size = 431003 }, + { url = "https://files.pythonhosted.org/packages/8e/66/7f3b121f59097c93267e7f497f10e52ced7161b38295137a12a266b6c149/protobuf-6.30.2-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:0eb523c550a66a09a0c20f86dd554afbf4d32b02af34ae53d93268c1f73bc65b", size = 417579 }, + { url = "https://files.pythonhosted.org/packages/d0/89/bbb1bff09600e662ad5b384420ad92de61cab2ed0f12ace1fd081fd4c295/protobuf-6.30.2-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:50f32cc9fd9cb09c783ebc275611b4f19dfdfb68d1ee55d2f0c7fa040df96815", size = 317319 }, + { url = "https://files.pythonhosted.org/packages/28/50/1925de813499546bc8ab3ae857e3ec84efe7d2f19b34529d0c7c3d02d11d/protobuf-6.30.2-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4f6c687ae8efae6cf6093389a596548214467778146b7245e886f35e1485315d", size = 316212 }, + { url = "https://files.pythonhosted.org/packages/e5/a1/93c2acf4ade3c5b557d02d500b06798f4ed2c176fa03e3c34973ca92df7f/protobuf-6.30.2-py3-none-any.whl", hash = "sha256:ae86b030e69a98e08c77beab574cbcb9fff6d031d57209f574a5aea1445f4b51", size = 167062 }, +] + [[package]] name = "pyasn1" version = "0.4.8" @@ -1045,6 +1061,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/1a/c4cf9d4f01d996582be78d15626d026e52584846f057e537954cb97b798f/taskiq_redis-1.0.4-py3-none-any.whl", hash = "sha256:ffc151e212cddef7ed73e41aa11b874328433510c685aaf2acee2976b757caf3", size = 19107 }, ] +[[package]] +name = "temporalio" +version = "1.10.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, + { name = "types-protobuf" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a5/8d/745ccb5da079062db8a14227052ac8d0b8a50bb41df4660fad3345a73ecc/temporalio-1.10.0.tar.gz", hash = "sha256:688400e4ca7f6b47c0ade3ebb6549e4d79b0762430ea735a0d8ff83b1ab8b8ba", size = 1418529 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/16/f5/9c75e50db0d54d7960a3571b16cc55083d7bd449d1734cbe5f25779d1469/temporalio-1.10.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:81fd40eeeba0396a7193ab5b45877301234b983aa38e444dfceecad2b3224398", size = 11032281 }, + { url = "https://files.pythonhosted.org/packages/f5/b2/ad8fc89e5f8f59c3128661a47a65687619734af0b87fbe4756460de8e00e/temporalio-1.10.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:2f7bff9ac1fc832655342e9677bbee1b413b97857d1f2265f018ce72fb7758f8", size = 10796640 }, + { url = "https://files.pythonhosted.org/packages/ee/f4/a06053515ecd67e797b7dd0516bdd11c28f952a54449b4c771dcb097de00/temporalio-1.10.0-cp39-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:c3ee8416d1cab04036e03e39a4db4cf4a9a799750b8da20022e0d719da9c9371", size = 11146377 }, + { url = "https://files.pythonhosted.org/packages/f7/b4/875f10f1da52879d44fb126cd1b68186ba7f4d35589dc9f3c9ae743f65c5/temporalio-1.10.0-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:4f5805e0d33ba525ccea01e3cd9d3eca313e0c66c6b86e6cb0f15ef004c0acc0", size = 11303069 }, + { url = "https://files.pythonhosted.org/packages/21/55/a2248f3798498584133be848c0f6072b37995e201a3f93aff413f77f00cc/temporalio-1.10.0-cp39-abi3-win_amd64.whl", hash = "sha256:81cb8bef8aef6d3cc130c7cecf008cf529177a2f9cb206cfba2897db7df9d093", size = 11338952 }, +] + [[package]] name = "twitchapi" version = "4.4.0" @@ -1060,6 +1094,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e9/b0/3041e012e59747990ec54d957ddeef1c6ca0a6120e2cecb4d6c07e573caa/twitchAPI-4.4.0-py3-none-any.whl", hash = "sha256:087939a4891a447aee5d1ab564da8ad3a984b580b0708ab9c59afd93b9a3d5da", size = 120579 }, ] +[[package]] +name = "types-protobuf" +version = "5.29.1.20250403" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/78/6d/62a2e73b966c77609560800004dd49a926920dd4976a9fdd86cf998e7048/types_protobuf-5.29.1.20250403.tar.gz", hash = "sha256:7ff44f15022119c9d7558ce16e78b2d485bf7040b4fadced4dd069bb5faf77a2", size = 59413 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/69/e3/b74dcc2797b21b39d5a4f08a8b08e20369b4ca250d718df7af41a60dd9f0/types_protobuf-5.29.1.20250403-py3-none-any.whl", hash = "sha256:c71de04106a2d54e5b2173d0a422058fae0ef2d058d70cf369fb797bf61ffa59", size = 73874 }, +] + [[package]] name = "typing-extensions" version = "4.13.1"