mirror of
https://github.com/kurbezz/discord-bot.git
synced 2025-12-06 15:15:37 +01:00
Refactor stream notifier
This commit is contained in:
33
src/core/redis.py
Normal file
33
src/core/redis.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import contextlib
|
||||
|
||||
from redis.asyncio import from_url
|
||||
|
||||
from core.config import config
|
||||
|
||||
|
||||
def create_redis_pool():
|
||||
return from_url(config.REDIS_URI)
|
||||
|
||||
|
||||
class RedisSessionManager:
|
||||
def __init__(self):
|
||||
self.pool = None
|
||||
|
||||
async def init(self):
|
||||
self.pool = await create_redis_pool()
|
||||
|
||||
async def close(self):
|
||||
if self.pool is not None:
|
||||
await self.pool.close()
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def connect(self):
|
||||
if self.pool is None:
|
||||
await self.init()
|
||||
|
||||
assert self.pool is not None
|
||||
|
||||
yield self.pool
|
||||
|
||||
|
||||
redis_manager = RedisSessionManager()
|
||||
@@ -5,6 +5,7 @@ from modules.games_list import start as start_games_list_module
|
||||
from modules.stream_notifications import start as start_stream_notifications_module
|
||||
|
||||
from core.mongo import mongo_manager
|
||||
from core.redis import redis_manager
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@@ -17,6 +18,7 @@ async def main():
|
||||
logger.info("Starting services...")
|
||||
|
||||
await mongo_manager.init()
|
||||
await redis_manager.init()
|
||||
|
||||
await wait([
|
||||
create_task(start_games_list_module()),
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from .twitch.twitch import start_twitch_service
|
||||
from .twitch.webhook import start_twitch_service
|
||||
|
||||
|
||||
start = start_twitch_service
|
||||
|
||||
@@ -6,7 +6,8 @@ from httpx import AsyncClient
|
||||
from core.config import config
|
||||
from domain.streamers import StreamerConfig
|
||||
|
||||
from .twitch.state import State
|
||||
from .state import State
|
||||
from .sent_notifications import SentNotificationType
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -48,14 +49,16 @@ def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None:
|
||||
return roles.get(category)
|
||||
|
||||
|
||||
async def notify(notification_type: Literal["start"] | Literal["change_category"], streamer_config: StreamerConfig, current_state: State):
|
||||
if notification_type == "start":
|
||||
async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, bool]:
|
||||
result: dict[str, bool] = {}
|
||||
|
||||
if notification_type == SentNotificationType.START_STREAM:
|
||||
message_template = streamer_config.notifications.start_stream
|
||||
else:
|
||||
message_template = streamer_config.notifications.change_category
|
||||
|
||||
if message_template is None:
|
||||
return
|
||||
return result
|
||||
|
||||
integrations = streamer_config.integrations
|
||||
|
||||
@@ -69,7 +72,9 @@ async def notify(notification_type: Literal["start"] | Literal["change_category"
|
||||
|
||||
try:
|
||||
await notify_telegram(msg, str(telegram.notifications_channel_id))
|
||||
result["telegram"] = True
|
||||
except Exception as e:
|
||||
result["telegram"] = False
|
||||
logger.error("Failed to notify telegram", exc_info=e)
|
||||
|
||||
if (discord := integrations.discord) is not None:
|
||||
@@ -90,5 +95,9 @@ async def notify(notification_type: Literal["start"] | Literal["change_category"
|
||||
|
||||
try:
|
||||
await notify_discord(msg, str(discord.notifications_channel_id))
|
||||
result["discord"] = True
|
||||
except Exception as e:
|
||||
result["discord"] = False
|
||||
logger.error("Failed to notify discord", exc_info=e)
|
||||
|
||||
return result
|
||||
|
||||
64
src/modules/stream_notifications/sent_notifications.py
Normal file
64
src/modules/stream_notifications/sent_notifications.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from enum import StrEnum
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from core.mongo import mongo_manager
|
||||
|
||||
from .state import State
|
||||
|
||||
|
||||
class SentNotificationType(StrEnum):
|
||||
START_STREAM = "start_stream"
|
||||
CHANGE_CATEGORY = "change_category"
|
||||
|
||||
|
||||
class SentNotification(BaseModel):
|
||||
notification_type: SentNotificationType
|
||||
twitch_id: int
|
||||
state: State
|
||||
sent_result: dict[str, bool]
|
||||
sent_at: datetime
|
||||
|
||||
|
||||
class SentNotificationRepository:
|
||||
COLLECTION_NAME = "sent_notifications"
|
||||
|
||||
@classmethod
|
||||
async def add(
|
||||
cls,
|
||||
twitch_id: int,
|
||||
notification_type: SentNotificationType,
|
||||
state: State,
|
||||
sent_result: dict[str, bool],
|
||||
):
|
||||
async with mongo_manager.connect() as client:
|
||||
db = client.get_default_database()
|
||||
collection = db[cls.COLLECTION_NAME]
|
||||
|
||||
await collection.insert_one(
|
||||
SentNotification(
|
||||
notification_type=notification_type,
|
||||
twitch_id=twitch_id,
|
||||
state=state,
|
||||
sent_at=datetime.now(timezone.utc),
|
||||
sent_result=sent_result,
|
||||
).model_dump()
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def get_last_for_streamer(
|
||||
cls, twitch_id: int
|
||||
) -> SentNotification | None:
|
||||
async with mongo_manager.connect() as client:
|
||||
db = client.get_default_database()
|
||||
collection = db[cls.COLLECTION_NAME]
|
||||
|
||||
doc = await collection.find_one(
|
||||
{"twitch_id": twitch_id},
|
||||
sort={"sent_at": -1},
|
||||
)
|
||||
if doc is None:
|
||||
return None
|
||||
|
||||
return SentNotification(**doc)
|
||||
40
src/modules/stream_notifications/state.py
Normal file
40
src/modules/stream_notifications/state.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from core.mongo import mongo_manager
|
||||
|
||||
|
||||
class State(BaseModel):
|
||||
title: str
|
||||
category: str
|
||||
|
||||
last_live_at: datetime
|
||||
|
||||
|
||||
class StateManager:
|
||||
COLLECTION_NAME = "stream_twitch_state"
|
||||
|
||||
@classmethod
|
||||
async def get(cls, twitch_id: int) -> State | None:
|
||||
async with mongo_manager.connect() as client:
|
||||
db = client.get_default_database()
|
||||
collection = db[cls.COLLECTION_NAME]
|
||||
|
||||
state = await collection.find_one({"twitch_id": twitch_id})
|
||||
if state is None:
|
||||
return None
|
||||
|
||||
return State(**state)
|
||||
|
||||
@classmethod
|
||||
async def update(cls, twitch_id: int, state: State):
|
||||
async with mongo_manager.connect() as client:
|
||||
db = client.get_default_database()
|
||||
collection = db[cls.COLLECTION_NAME]
|
||||
|
||||
await collection.update_one(
|
||||
{"twitch_id": twitch_id},
|
||||
{"$set": state.model_dump()},
|
||||
upsert=True
|
||||
)
|
||||
8
src/modules/stream_notifications/tasks.py
Normal file
8
src/modules/stream_notifications/tasks.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from core.broker import broker
|
||||
|
||||
from .watcher import StateWatcher
|
||||
|
||||
|
||||
@broker.task("stream_notifications.twitch.on_stream_state_change")
|
||||
async def on_stream_state_change(streamer_id: int):
|
||||
await StateWatcher.on_stream_state_change(streamer_id)
|
||||
34
src/modules/stream_notifications/twitch/authorize.py
Normal file
34
src/modules/stream_notifications/twitch/authorize.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from twitchAPI.twitch import Twitch
|
||||
from twitchAPI.type import AuthScope
|
||||
|
||||
from core.config import config
|
||||
|
||||
from .token_storage import TokenStorage
|
||||
|
||||
|
||||
SCOPES = [
|
||||
AuthScope.CHAT_READ,
|
||||
AuthScope.CHAT_EDIT,
|
||||
]
|
||||
|
||||
|
||||
async def authorize(auto_refresh_auth: bool = False) -> Twitch:
|
||||
twitch = Twitch(
|
||||
config.TWITCH_CLIENT_ID,
|
||||
config.TWITCH_CLIENT_SECRET
|
||||
)
|
||||
|
||||
twitch.user_auth_refresh_callback = TokenStorage.save
|
||||
twitch.auto_refresh_auth = auto_refresh_auth
|
||||
|
||||
token, refresh_token = await TokenStorage.get()
|
||||
await twitch.set_user_authentication(
|
||||
token,
|
||||
SCOPES,
|
||||
refresh_token=refresh_token if auto_refresh_auth else None,
|
||||
validate=True
|
||||
)
|
||||
|
||||
await twitch.authenticate_app(SCOPES)
|
||||
|
||||
return twitch
|
||||
@@ -1,10 +0,0 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class State(BaseModel):
|
||||
title: str
|
||||
category: str
|
||||
|
||||
last_live_at: datetime
|
||||
@@ -1,200 +0,0 @@
|
||||
from asyncio import Lock, sleep
|
||||
from datetime import datetime
|
||||
import logging
|
||||
|
||||
from twitchAPI.helper import first
|
||||
from twitchAPI.eventsub.webhook import EventSubWebhook
|
||||
from twitchAPI.twitch import Twitch
|
||||
from twitchAPI.type import AuthScope
|
||||
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent
|
||||
|
||||
from core.config import config
|
||||
from modules.stream_notifications.notification import notify
|
||||
from repositories.streamers import StreamerConfigRepository
|
||||
|
||||
from .state import State
|
||||
from .token_storage import TokenStorage
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TwitchService:
|
||||
lock = Lock()
|
||||
|
||||
SCOPES = [
|
||||
AuthScope.CHAT_READ,
|
||||
AuthScope.CHAT_EDIT,
|
||||
]
|
||||
|
||||
ONLINE_NOTIFICATION_DELAY = 15 * 60
|
||||
UPDATE_DELAY = 5 * 60
|
||||
|
||||
def __init__(self, twitch: Twitch):
|
||||
self.twitch = twitch
|
||||
|
||||
self.state: dict[int, State | None] = {}
|
||||
|
||||
@classmethod
|
||||
async def authorize(cls):
|
||||
twitch = Twitch(
|
||||
config.TWITCH_CLIENT_ID,
|
||||
config.TWITCH_CLIENT_SECRET
|
||||
)
|
||||
|
||||
twitch.user_auth_refresh_callback = TokenStorage.save
|
||||
|
||||
token, refresh_token = await TokenStorage.get()
|
||||
await twitch.set_user_authentication(token, cls.SCOPES, refresh_token)
|
||||
|
||||
await twitch.authenticate_app(cls.SCOPES)
|
||||
|
||||
return twitch
|
||||
|
||||
async def notify_online(self, streamer_id: int):
|
||||
current_state = self.state.get(streamer_id)
|
||||
if current_state is None:
|
||||
raise RuntimeError("State is None")
|
||||
|
||||
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
|
||||
|
||||
if streamer.notifications.start_stream is None:
|
||||
return
|
||||
|
||||
await notify("start", streamer, current_state)
|
||||
|
||||
async def notify_change_category(self, streamer_id: int):
|
||||
current_state = self.state.get(streamer_id)
|
||||
|
||||
if current_state is None:
|
||||
raise RuntimeError("State is None")
|
||||
|
||||
if (datetime.now() - current_state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY:
|
||||
return
|
||||
|
||||
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
|
||||
|
||||
if streamer.notifications.change_category is None:
|
||||
return
|
||||
|
||||
await notify("change_category", streamer, current_state)
|
||||
|
||||
async def get_current_stream(self, streamer_id: int, retry_count: int = 5, delay: int = 5):
|
||||
remain_retry = retry_count
|
||||
|
||||
while remain_retry > 0:
|
||||
stream = await first(self.twitch.get_streams(user_id=[str(streamer_id)]))
|
||||
|
||||
if stream is not None:
|
||||
return stream
|
||||
|
||||
remain_retry -= 1
|
||||
await sleep(delay)
|
||||
|
||||
return None
|
||||
|
||||
async def on_channel_update(self, event: ChannelUpdateEvent):
|
||||
brodcaster_id = int(event.event.broadcaster_user_id)
|
||||
|
||||
stream = await self.get_current_stream(brodcaster_id)
|
||||
if stream is None:
|
||||
return
|
||||
|
||||
async with self.lock:
|
||||
current_state = self.state.get(brodcaster_id)
|
||||
if current_state is None:
|
||||
return
|
||||
|
||||
changed = current_state.category != event.event.category_name
|
||||
|
||||
current_state.title = event.event.title
|
||||
current_state.category = event.event.category_name
|
||||
current_state.last_live_at = datetime.now()
|
||||
|
||||
self.state[brodcaster_id] = current_state
|
||||
|
||||
if changed:
|
||||
await self.notify_change_category(brodcaster_id)
|
||||
|
||||
async def _on_stream_online(self, streamer_id: int):
|
||||
current_stream = await self.get_current_stream(streamer_id)
|
||||
if current_stream is None:
|
||||
return
|
||||
|
||||
state = State(
|
||||
title=current_stream.title,
|
||||
category=current_stream.game_name,
|
||||
last_live_at=datetime.now()
|
||||
)
|
||||
|
||||
async with self.lock:
|
||||
current_state = self.state.get(streamer_id)
|
||||
|
||||
is_need_notify = current_state is None or (datetime.now() - current_state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY
|
||||
|
||||
self.state[streamer_id] = state
|
||||
|
||||
if is_need_notify:
|
||||
await self.notify_online(streamer_id)
|
||||
|
||||
async def on_stream_online(self, event: StreamOnlineEvent):
|
||||
await self._on_stream_online(int(event.event.broadcaster_user_id))
|
||||
|
||||
async def run(self):
|
||||
eventsub = EventSubWebhook(
|
||||
callback_url=config.TWITCH_CALLBACK_URL,
|
||||
port=config.TWITCH_CALLBACK_PORT,
|
||||
twitch=self.twitch,
|
||||
message_deduplication_history_length=50
|
||||
)
|
||||
|
||||
streamers = await StreamerConfigRepository.all()
|
||||
|
||||
for streamer in streamers:
|
||||
current_stream = await self.get_current_stream(streamer.twitch.id)
|
||||
|
||||
if current_stream:
|
||||
self.state[streamer.twitch.id] = State(
|
||||
title=current_stream.title,
|
||||
category=current_stream.game_name,
|
||||
last_live_at=datetime.now()
|
||||
)
|
||||
else:
|
||||
self.state[streamer.twitch.id] = None
|
||||
|
||||
try:
|
||||
await eventsub.unsubscribe_all()
|
||||
|
||||
eventsub.start()
|
||||
|
||||
logger.info("Subscribe to events...")
|
||||
|
||||
for streamer in streamers:
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name}")
|
||||
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
|
||||
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
|
||||
|
||||
logger.info("Twitch service started")
|
||||
|
||||
while True:
|
||||
await sleep(self.UPDATE_DELAY)
|
||||
|
||||
for streamer in streamers:
|
||||
await self._on_stream_online(streamer.twitch.id)
|
||||
finally:
|
||||
await eventsub.stop()
|
||||
await self.twitch.close()
|
||||
|
||||
raise RuntimeError("Twitch service stopped")
|
||||
|
||||
@classmethod
|
||||
async def start(cls):
|
||||
logger.info("Starting Twitch service...")
|
||||
|
||||
twith = await cls.authorize()
|
||||
await cls(twith).run()
|
||||
|
||||
|
||||
async def start_twitch_service():
|
||||
await TwitchService.start()
|
||||
73
src/modules/stream_notifications/twitch/webhook.py
Normal file
73
src/modules/stream_notifications/twitch/webhook.py
Normal file
@@ -0,0 +1,73 @@
|
||||
from asyncio import sleep
|
||||
import logging
|
||||
from typing import NoReturn
|
||||
|
||||
from twitchAPI.eventsub.webhook import EventSubWebhook
|
||||
from twitchAPI.twitch import Twitch
|
||||
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent
|
||||
|
||||
from core.config import config
|
||||
from repositories.streamers import StreamerConfigRepository
|
||||
from modules.stream_notifications.tasks import on_stream_state_change
|
||||
|
||||
from .authorize import authorize
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TwitchService:
|
||||
ONLINE_NOTIFICATION_DELAY = 15 * 60
|
||||
|
||||
def __init__(self, twitch: Twitch):
|
||||
self.twitch = twitch
|
||||
|
||||
async def on_channel_update(self, event: ChannelUpdateEvent):
|
||||
await on_stream_state_change.kiq(int(event.event.broadcaster_user_id))
|
||||
|
||||
async def on_stream_online(self, event: StreamOnlineEvent):
|
||||
await on_stream_state_change.kiq(int(event.event.broadcaster_user_id))
|
||||
|
||||
async def run(self) -> NoReturn:
|
||||
eventsub = EventSubWebhook(
|
||||
callback_url=config.TWITCH_CALLBACK_URL,
|
||||
port=config.TWITCH_CALLBACK_PORT,
|
||||
twitch=self.twitch,
|
||||
message_deduplication_history_length=50
|
||||
)
|
||||
|
||||
streamers = await StreamerConfigRepository.all()
|
||||
|
||||
try:
|
||||
await eventsub.unsubscribe_all()
|
||||
|
||||
eventsub.start()
|
||||
|
||||
logger.info("Subscribe to events...")
|
||||
|
||||
for streamer in streamers:
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name}")
|
||||
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
|
||||
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
|
||||
|
||||
logger.info("Twitch service started")
|
||||
|
||||
while True:
|
||||
await sleep(0.1)
|
||||
finally:
|
||||
await eventsub.stop()
|
||||
await self.twitch.close()
|
||||
|
||||
raise RuntimeError("Twitch service stopped")
|
||||
|
||||
@classmethod
|
||||
async def start(cls):
|
||||
logger.info("Starting Twitch service...")
|
||||
|
||||
twith = await authorize(auto_refresh_auth=True)
|
||||
await cls(twith).run()
|
||||
|
||||
|
||||
async def start_twitch_service() -> NoReturn:
|
||||
await TwitchService.start()
|
||||
95
src/modules/stream_notifications/watcher.py
Normal file
95
src/modules/stream_notifications/watcher.py
Normal file
@@ -0,0 +1,95 @@
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
from twitchAPI.helper import first
|
||||
|
||||
from core.redis import redis_manager
|
||||
from repositories.streamers import StreamerConfigRepository
|
||||
|
||||
from .state import State, StateManager
|
||||
from .sent_notifications import SentNotificationRepository, SentNotificationType
|
||||
from .notification import notify
|
||||
from .twitch.authorize import authorize
|
||||
|
||||
|
||||
class StateWatcher:
|
||||
START_STREAM_THRESHOLD = timedelta(minutes=15)
|
||||
|
||||
@classmethod
|
||||
async def get_twitch_state(cls, streamer_id: int) -> State | None:
|
||||
twitch = await authorize()
|
||||
|
||||
stream = await first(
|
||||
twitch.get_streams(user_id=[str(streamer_id)])
|
||||
)
|
||||
|
||||
if stream is None:
|
||||
return None
|
||||
|
||||
return State(
|
||||
title=stream.title,
|
||||
category=stream.game_name,
|
||||
last_live_at=datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def notify_and_save(
|
||||
cls,
|
||||
streamer_id: int,
|
||||
sent_notification_type: SentNotificationType,
|
||||
state: State
|
||||
):
|
||||
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
|
||||
|
||||
sent_result = await notify(sent_notification_type, streamer, state)
|
||||
await SentNotificationRepository.add(
|
||||
streamer.twitch.id,
|
||||
sent_notification_type,
|
||||
state,
|
||||
sent_result=sent_result
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def notify_start_stream(
|
||||
cls,
|
||||
streamer_id: int,
|
||||
state: State
|
||||
):
|
||||
await cls.notify_and_save(streamer_id, SentNotificationType.START_STREAM, state)
|
||||
|
||||
@classmethod
|
||||
async def notify_change_category(
|
||||
cls,
|
||||
streamer_id: int,
|
||||
state: State
|
||||
):
|
||||
await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state)
|
||||
|
||||
@classmethod
|
||||
async def _on_stream_state_change(cls, streamer_id: int):
|
||||
current_state = await cls.get_twitch_state(streamer_id)
|
||||
if current_state is None:
|
||||
return
|
||||
|
||||
last_state = await StateManager.get(streamer_id)
|
||||
if last_state is None:
|
||||
await cls.notify_start_stream(streamer_id, current_state)
|
||||
await StateManager.update(streamer_id, current_state)
|
||||
return
|
||||
|
||||
if datetime.now(timezone.utc) - last_state.last_live_at > cls.START_STREAM_THRESHOLD:
|
||||
await cls.notify_start_stream(streamer_id, current_state)
|
||||
await StateManager.update(streamer_id, current_state)
|
||||
return
|
||||
|
||||
if last_state.category != current_state.category:
|
||||
await cls.notify_change_category(streamer_id, current_state)
|
||||
await StateManager.update(streamer_id, current_state)
|
||||
return
|
||||
|
||||
await StateManager.update(streamer_id, current_state)
|
||||
|
||||
@classmethod
|
||||
async def on_stream_state_change(cls, streamer_id: int):
|
||||
async with redis_manager.connect() as redis:
|
||||
async with redis.lock(f"on_stream_state_change:{streamer_id}"):
|
||||
await cls._on_stream_state_change(streamer_id)
|
||||
@@ -1 +1,2 @@
|
||||
from modules.scheduler_sync.tasks import * # noqa: F403
|
||||
from modules.stream_notifications.tasks import * # noqa: F403
|
||||
|
||||
Reference in New Issue
Block a user