From 203169a9cc54fd058ce4bd575c3f0139ec0b9572 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Sun, 17 Nov 2024 12:58:08 +0100 Subject: [PATCH] move event sync to taskiq --- src/main.py | 2 -- src/modules/scheduler_sync/__init__.py | 7 ------- src/modules/scheduler_sync/synchronizer.py | 20 ------------------ src/modules/scheduler_sync/tasks.py | 24 ++++++++++++++++++++++ 4 files changed, 24 insertions(+), 29 deletions(-) create mode 100644 src/modules/scheduler_sync/tasks.py diff --git a/src/main.py b/src/main.py index e44aaf8..7482c14 100644 --- a/src/main.py +++ b/src/main.py @@ -2,7 +2,6 @@ from asyncio import wait, create_task import logging from modules.games_list import start as start_games_list_module -from modules.scheduler_sync import start as start_scheduler_sync_module from modules.stream_notifications import start as start_stream_notifications_module from core.mongo import mongo_manager @@ -21,7 +20,6 @@ async def main(): await wait([ create_task(start_games_list_module()), - create_task(start_scheduler_sync_module()), create_task(start_stream_notifications_module()) ], return_when="FIRST_COMPLETED") diff --git a/src/modules/scheduler_sync/__init__.py b/src/modules/scheduler_sync/__init__.py index 620aedc..e69de29 100644 --- a/src/modules/scheduler_sync/__init__.py +++ b/src/modules/scheduler_sync/__init__.py @@ -1,7 +0,0 @@ -from .synchronizer import start_synchronizer - - -start = start_synchronizer - - -__all__ = ["start"] diff --git a/src/modules/scheduler_sync/synchronizer.py b/src/modules/scheduler_sync/synchronizer.py index cf42857..46d54b6 100644 --- a/src/modules/scheduler_sync/synchronizer.py +++ b/src/modules/scheduler_sync/synchronizer.py @@ -1,9 +1,7 @@ -from asyncio import sleep import logging from datetime import datetime from domain.streamers import TwitchConfig -from repositories.streamers import StreamerConfigRepository from .twitch_events import get_twitch_events, TwitchEvent from .discord_events import ( @@ -95,21 +93,3 @@ async def syncronize(twitch: TwitchConfig, discord_guild_id: int): await add_events(discord_guild_id, twitch.name, twitch_events_with_id, discord_events_with_id) await remove_events(discord_guild_id, twitch_events_with_id, discord_events_with_id) await edit_events(discord_guild_id, twitch.name, twitch_events_with_id, discord_events_with_id) - - -async def start_synchronizer(): - logger.info("Starting events syncronizer...") - - while True: - try: - streamers = await StreamerConfigRepository().all() - - for streamer in streamers: - if (integration := streamer.integrations.discord) is None: - continue - - await syncronize(streamer.twitch, integration.guild_id) - except Exception as e: - logging.error(e) - - await sleep(5 * 60) diff --git a/src/modules/scheduler_sync/tasks.py b/src/modules/scheduler_sync/tasks.py new file mode 100644 index 0000000..7de13d4 --- /dev/null +++ b/src/modules/scheduler_sync/tasks.py @@ -0,0 +1,24 @@ +from core.broker import broker +from repositories.streamers import StreamerConfigRepository +from .synchronizer import syncronize + + +@broker.task() +async def syncronize_task(twitch_id: int): + streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id) + + if streamer.integrations.discord is None: + return + + await syncronize(streamer.twitch, streamer.integrations.discord.guild_id) + + +@broker.task(schedule=[{"cron": "*/5 * * * *"}]) +async def syncronize_all_task(): + streamers = await StreamerConfigRepository().all() + + for streamer in streamers: + if streamer.integrations.discord is None: + continue + + await syncronize_task.kiq(streamer.twitch.id)