From 29e4113d8f80c548c28a0b3e57b44a5a841d3882 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Sat, 28 Dec 2024 12:42:20 +0100 Subject: [PATCH] Fix --- poetry.lock | 8 +++---- pyproject.toml | 2 +- src/modules/stream_notifications/tasks.py | 24 +++++++++++++++++++ .../stream_notifications/twitch/webhook.py | 24 ++----------------- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/poetry.lock b/poetry.lock index 6547117..efb0ae1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -184,13 +184,13 @@ tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] [[package]] name = "authx" -version = "1.4.0" +version = "1.4.1" description = "Ready to use and customizable Authentications and Oauth2 management for FastAPI" optional = false python-versions = ">=3.9" files = [ - {file = "authx-1.4.0-py3-none-any.whl", hash = "sha256:9a95790e755b8bb037aa79756e178a1b8fd41a563d0b74b87ff023e5bd0fda91"}, - {file = "authx-1.4.0.tar.gz", hash = "sha256:bcd7ec8b604003ff11353785905f59fafe95dafbdb568e3740ed8b7cfa4ca444"}, + {file = "authx-1.4.1-py3-none-any.whl", hash = "sha256:4d42082230125e446fd5240b9531af7726d7d69370bfc5aedd6fafeb9df6ffc9"}, + {file = "authx-1.4.1.tar.gz", hash = "sha256:06ddad3f80ebe32ce7d97adc8915c1f64b389298849f0058a713f3b0c2c1e64a"}, ] [package.dependencies] @@ -1987,4 +1987,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "049480947b7bfd2da1651623fca542de6435159f20f7485947fe11e620193a5d" +content-hash = "f51f1ef0de414e9f58f94c9fdedf996f5606b4c48988ff16777ae0849f11dac3" diff --git a/pyproject.toml b/pyproject.toml index bf95fcc..229e544 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ taskiq = "^0.11.10" taskiq-redis = "^1.0.2" redis = {extras = ["hiredis"], version = "^5.2.1"} fastapi = "^0.115.6" -authx = "^1.4.0" +authx = "^1.4.1" httpx-oauth = "^0.16.1" uvicorn = {extras = ["standard"], version = "^0.34.0"} diff --git a/src/modules/stream_notifications/tasks.py b/src/modules/stream_notifications/tasks.py index f716098..18e6e7b 100644 --- a/src/modules/stream_notifications/tasks.py +++ b/src/modules/stream_notifications/tasks.py @@ -1,5 +1,8 @@ from datetime import datetime, timezone +from twitchAPI.helper import first +from twitchAPI.object.eventsub import ChannelUpdateEvent + from core.broker import broker from repositories.streamers import StreamerConfigRepository @@ -8,6 +11,27 @@ from .watcher import StateWatcher from .twitch.authorize import authorize +@broker.task( + "stream_notifications.twitch.on_stream_state_change_with_check", + retry_on_error=True +) +async def on_stream_state_change_with_check(event: ChannelUpdateEvent): + twitch = await authorize() + + stream = await first(twitch.get_streams(user_id=[event.event.broadcaster_user_id])) + if stream is None: + return + + await on_stream_state_change.kiq( + int(event.event.broadcaster_user_id), + State( + title=event.event.title, + category=event.event.category_name, + last_live_at=datetime.now(timezone.utc) + ) + ) + + @broker.task( "stream_notifications.twitch.on_stream_state_change", retry_on_error=True diff --git a/src/modules/stream_notifications/twitch/webhook.py b/src/modules/stream_notifications/twitch/webhook.py index 54f3eda..e28aeed 100644 --- a/src/modules/stream_notifications/twitch/webhook.py +++ b/src/modules/stream_notifications/twitch/webhook.py @@ -1,20 +1,17 @@ from asyncio import sleep, gather -from datetime import datetime, timezone import logging from typing import NoReturn, Literal from twitchAPI.eventsub.webhook import EventSubWebhook from twitchAPI.twitch import Twitch -from twitchAPI.helper import first from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent from twitchAPI.oauth import validate_token from core.config import config from repositories.streamers import StreamerConfigRepository, StreamerConfig -from modules.stream_notifications.tasks import on_stream_state_change, check_streams_states +from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check from .authorize import authorize -from ..state import State logger = logging.getLogger(__name__) @@ -29,24 +26,7 @@ class TwitchService: self.failed = False async def on_channel_update(self, event: ChannelUpdateEvent): - try: - stream = await first(self.twitch.get_streams(user_id=[event.event.broadcaster_user_id])) - except RuntimeError as e: - await check_streams_states.kiq() - self.failed = True - raise e - - if stream is None: - return - - await on_stream_state_change.kiq( - int(event.event.broadcaster_user_id), - State( - title=event.event.title, - category=event.event.category_name, - last_live_at=datetime.now(timezone.utc) - ) - ) + await on_stream_state_change_with_check.kiq(event) async def on_stream_online(self, event: StreamOnlineEvent): await on_stream_state_change.kiq(int(event.event.broadcaster_user_id))