This commit is contained in:
2025-04-21 18:03:24 +02:00
parent cafa0e3afd
commit 288e4769bc
19 changed files with 27 additions and 28 deletions

View File

@@ -0,0 +1,10 @@
from asyncio import run
from .twitch.webhook import TwitchService
async def start_twitch_service() -> None:
await TwitchService.start()
run(start_twitch_service())

View File

@@ -0,0 +1,320 @@
from enum import StrEnum
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient
from core.config import config
from .twitch.authorize import authorize, Twitch
logger = logging.getLogger(__name__)
class ChatMessage(BaseModel):
text: str
class ChatMessageReplyMetadata(BaseModel):
parent_message_id: str
parent_message_body: str
parent_user_id: str
parent_user_name: str
parent_user_login: str
thread_message_id: str
thread_user_id: str
thread_user_name: str
thread_user_login: str
class MessageType(StrEnum):
TEXT = "text"
CHANNEL_POINTS_HIGHLIGHTED = "channel_points_highlighted"
CHANNEL_POINTS_SUB_ONLY = "channel_points_sub_only"
USER_INTRO = "user_intro"
class MessageEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_name: str
broadcaster_user_login: str
chatter_user_id: str
chatter_user_name: str
chatter_user_login: str
message_id: str
message: ChatMessage
message_type: MessageType
color: str
reply: ChatMessageReplyMetadata | None
channel_points_custom_reward_id: str | None
@classmethod
def from_twitch_event(cls, event: ChannelChatMessageEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_name=event.event.broadcaster_user_name,
broadcaster_user_login=event.event.broadcaster_user_login,
chatter_user_id=event.event.chatter_user_id,
chatter_user_name=event.event.chatter_user_name,
chatter_user_login=event.event.chatter_user_login,
message_id=event.event.message_id,
message=ChatMessage(text=event.event.message.text),
message_type=MessageType(event.event.message_type),
color=event.event.color,
reply=ChatMessageReplyMetadata(
parent_message_id=event.event.reply.parent_message_id,
parent_message_body=event.event.reply.parent_message_body,
parent_user_id=event.event.reply.parent_user_id,
parent_user_name=event.event.reply.parent_user_name,
parent_user_login=event.event.reply.parent_user_login,
thread_message_id=event.event.reply.thread_message_id,
thread_user_id=event.event.reply.thread_user_id,
thread_user_name=event.event.reply.thread_user_name,
thread_user_login=event.event.reply.thread_user_login
) if event.event.reply else None,
channel_points_custom_reward_id=event.event.channel_points_custom_reward_id
)
async def get_completion(messages: list[dict]) -> str:
logger.info(f"Getting completion for message: {messages}")
data_messages = [
{
"role": "system",
"content": (
"Don't use markdown! "
"Don't use blocked words on Twitch! "
"Make answers short and clear! "
"You are not language model! "
"You are not a bot! "
"You are human! "
"Your name is kurbezz or Bulat! "
"Don't start answer with @kurbezz!"
)
},
*(
{
"role": "assistant" if message["user"] == "kurbezz" else "user",
"content": message["text"]
}
for message in messages
),
]
async with AsyncClient() as client:
response = await client.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {config.OPENAI_API_KEY}",
"content-type": "application/json"
},
json={
"model": "deepseek/deepseek-chat-v3-0324:free",
"messages": data_messages
}
)
data = response.json()
logger.info(f"Got completion: {data}")
return data["choices"][0]["message"]["content"]
class MessagesProc:
FULL_IGNORED_USER_LOGINS = [
"jeetbot",
]
MESSAGE_LIMIT = 1000
MESSAGE_HISTORY = []
@classmethod
def update_message_history(cls, id: str, text: str, user: str, thread_id: str | None = None):
cls.MESSAGE_HISTORY.append({
"id": id,
"text": text,
"user": user,
"thread_id": thread_id
})
if len(cls.MESSAGE_HISTORY) > cls.MESSAGE_LIMIT:
cls.MESSAGE_HISTORY = cls.MESSAGE_HISTORY[-cls.MESSAGE_LIMIT:]
@classmethod
def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]:
logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}")
if thread_id is not None:
return (
[m for m in cls.MESSAGE_HISTORY if m["id"] == thread_id]
+ [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id]
)
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod
async def _update_history(cls, event: MessageEvent):
cls.update_message_history(
id=event.message_id,
text=event.message.text,
user=event.chatter_user_login,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
@classmethod
async def _goida(cls, twitch: Twitch, event: MessageEvent):
if "гойда" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"ГООООООООООООООООООООООООООООООООООООООООООООООЙДА!",
reply_parent_message_id=event.message_id
)
@classmethod
async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
if "lasqexx" not in event.chatter_user_login:
return
if "здароу" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Здароу, давай иди уже",
reply_parent_message_id=event.message_id
)
return
if "сосал?" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"А ты? Иди уже",
reply_parent_message_id=event.message_id
)
return
if "лан я пошёл" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"да да, иди уже",
reply_parent_message_id=event.message_id
)
return
@classmethod
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
if not event.message.text.lower().startswith("!ai"):
return
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Ошибка!",
reply_parent_message_id=event.message_id
)
@classmethod
async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
if event.chatter_user_login.lower() in ["kurbezz", "hafmc"]:
return
if ("kurbezz" in event.message.text.lower() or \
"курбез" in event.message.text.lower() or \
"булат" in event.message.text.lower()):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error(f"Failed to get completion: {e}")
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Пошел нахуй!",
reply_parent_message_id=event.message_id
)
@classmethod
async def on_message(cls, received_as: str, event: MessageEvent):
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return
logging.info(f"Received message: {event}")
await cls._update_history(event)
twitch = await authorize(received_as)
await cls._goida(twitch, event)
await cls._lasqexx(twitch, event)
await cls._ask_ai(twitch, event)
await cls._kurbezz(twitch, event)

View File

@@ -0,0 +1,138 @@
import logging
from httpx import AsyncClient
from core.config import config
from applications.common.domain.streamers import StreamerConfig
from .state import State
from .sent_notifications import SentNotification, SentNotificationType, SentResult
logger = logging.getLogger(__name__)
async def notify_telegram(msg: str, chat_id: str) -> SentResult:
async with AsyncClient() as client:
try:
result = await client.post(
f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage",
json={
"chat_id": chat_id,
"text": msg,
}
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to notify telegram", exc_info=e)
return SentResult(success=False, message_id=None)
if result.json()["ok"] is False:
return SentResult(success=False, message_id=None)
return SentResult(success=True, message_id=str(result.json()["result"]["message_id"]))
async def delete_telegram_message(chat_id: int, message_id: int):
async with AsyncClient() as client:
try:
result = await client.post(
f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/deleteMessage",
json={
"chat_id": chat_id,
"message_id": message_id
}
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to delete telegram message", exc_info=e)
return False
return True
async def notify_discord(msg: str, channel_id: str) -> SentResult:
async with AsyncClient() as client:
try:
result = await client.post(
f"https://discord.com/api/v10/channels/{channel_id}/messages",
headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"
},
json={
"content": msg,
}
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to notify discord", exc_info=e)
return SentResult(success=False, message_id=None)
return SentResult(success=True, message_id=result.json()["id"])
def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None:
discord_integration = streamer_config.integrations.discord
if discord_integration is None:
return None
roles= discord_integration.roles
if roles is None:
return None
return roles.get(category)
async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, SentResult]:
result: dict[str, SentResult] = {}
if notification_type == SentNotificationType.START_STREAM:
message_template = streamer_config.notifications.start_stream
else:
message_template = streamer_config.notifications.change_category
if message_template is None:
return result
integrations = streamer_config.integrations
if (telegram := integrations.telegram) is not None:
if telegram.notifications_channel_id is not None:
msg = message_template.format(
title=current_state.title,
category=current_state.category,
role=""
)
result["telegram"] = await notify_telegram(msg, str(telegram.notifications_channel_id))
if (discord := integrations.discord) is not None:
if discord.notifications_channel_id is not None:
# TODO: Get roles from discord api
role_id = get_role_id(streamer_config, current_state.category)
if role_id is not None:
role = f"<@&{role_id}>"
else:
role = ""
msg = message_template.format(
title=current_state.title,
category=current_state.category,
role=role
)
result["discord"] = await notify_discord(msg, str(discord.notifications_channel_id))
return result
async def delete_penultimate_notification(streamer_config: StreamerConfig, sent_notification: SentNotification):
telegram_config = streamer_config.integrations.telegram
telegram_data = sent_notification.sent_result.get("telegram")
if telegram_data and telegram_data.message_id and telegram_config:
await delete_telegram_message(telegram_config.notifications_channel_id, int(telegram_data.message_id))

View File

@@ -0,0 +1,52 @@
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from applications.common.repositories.streamers import StreamerConfigRepository
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class RewardRedemption(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
user_name: str
reward_title: str
user_input: str
@classmethod
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
user_name=event.event.user_name,
reward_title=event.event.reward.title,
user_input=event.event.user_input or "",
)
async def on_redemption_reward_add(reward: RewardRedemption):
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
twitch = await authorize(reward.broadcaster_user_login)
streamer = await StreamerConfigRepository.get_by_twitch_id(int(reward.broadcaster_user_id))
if streamer.notifications.redemption_reward is None:
return
message = streamer.notifications.redemption_reward.format(
user=reward.user_name,
reward_title=reward.reward_title,
reward_promt=f" ({reward.user_input})" if reward.user_input else ""
)
await twitch.send_chat_message(
reward.broadcaster_user_id,
reward.broadcaster_user_id,
message
)

View File

@@ -0,0 +1,87 @@
from enum import StrEnum
from datetime import datetime, timezone
from pydantic import BaseModel
from core.mongo import mongo_manager
from .state import State
class SentNotificationType(StrEnum):
START_STREAM = "start_stream"
CHANGE_CATEGORY = "change_category"
class SentResult(BaseModel):
success: bool
message_id: str | None
class SentNotification(BaseModel):
notification_type: SentNotificationType
twitch_id: int
state: State
sent_result: dict[str, SentResult]
sent_at: datetime
class SentNotificationRepository:
COLLECTION_NAME = "sent_notifications"
@classmethod
async def add(
cls,
twitch_id: int,
notification_type: SentNotificationType,
state: State,
sent_result: dict[str, SentResult],
):
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
await collection.insert_one(
SentNotification(
notification_type=notification_type,
twitch_id=twitch_id,
state=state,
sent_at=datetime.now(timezone.utc),
sent_result=sent_result,
).model_dump()
)
@classmethod
async def get_penultimate_for_streamer(
cls, twitch_id: int
) -> SentNotification | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
doc = await collection.find_one(
{"twitch_id": twitch_id},
sort={"sent_at": -1},
skip=1,
)
if doc is None:
return None
return SentNotification(**doc)
@classmethod
async def get_last_for_streamer(
cls, twitch_id: int
) -> SentNotification | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
doc = await collection.find_one(
{"twitch_id": twitch_id},
sort={"sent_at": -1},
)
if doc is None:
return None
return SentNotification(**doc)

View File

@@ -0,0 +1,60 @@
from datetime import datetime
from enum import StrEnum
from pydantic import BaseModel
from core.mongo import mongo_manager
class State(BaseModel):
title: str
category: str
last_live_at: datetime
def __eq__(self, value: object) -> bool:
if not isinstance(value, State):
return False
return self.title == value.title and self.category == value.category
class UpdateEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
title: str
category_name: str
class EventType(StrEnum):
STREAM_ONLINE = "stream.online"
CHANNEL_UPDATE = "channel.update"
UNKNOWN = "unknown"
class StateManager:
COLLECTION_NAME = "stream_twitch_state"
@classmethod
async def get(cls, twitch_id: int) -> State | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
state = await collection.find_one({"twitch_id": twitch_id})
if state is None:
return None
return State(**state)
@classmethod
async def update(cls, twitch_id: int, state: State):
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
await collection.update_one(
{"twitch_id": twitch_id},
{"$set": state.model_dump()},
upsert=True
)

View File

@@ -0,0 +1,96 @@
from datetime import datetime, timezone
from twitchAPI.helper import first
from core.broker import broker
from applications.common.repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
from .reward_redemption import RewardRedemption, on_redemption_reward_add
@broker.task(
"stream_notifications.twitch.on_stream_state_change_with_check",
retry_on_error=True
)
async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await on_stream_state_change.kiq(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
)
)
@broker.task(
"stream_notifications.twitch.on_stream_state_change",
retry_on_error=True
)
async def on_stream_state_change(
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
@broker.task(
"stream_notifications.check_streams_states",
schedule=[{"cron": "*/2 * * * *"}]
)
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(
received_as: str,
event: MessageEvent
):
await MessagesProc.on_message(received_as, event)
@broker.task(
"stream_notifications.on_redemption_reward_add",
retry_on_error=True
)
async def on_redemption_reward_add_task(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -0,0 +1,40 @@
from twitchAPI.twitch import Twitch
from twitchAPI.type import AuthScope
from core.config import config
from .token_storage import TokenStorage
SCOPES = [
AuthScope.CHAT_READ,
AuthScope.CHANNEL_BOT,
AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT,
AuthScope.CHANNEL_READ_REDEMPTIONS,
]
async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
twitch = Twitch(
config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET
)
twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
twitch.auto_refresh_auth = auto_refresh_auth
token, refresh_token = await TokenStorage.get(user)
await twitch.set_user_authentication(
token,
SCOPES,
refresh_token=refresh_token if auto_refresh_auth else None
)
await twitch.authenticate_app(SCOPES)
return twitch

View File

@@ -0,0 +1,37 @@
import logging
from core.mongo import mongo_manager
logger = logging.getLogger(__name__)
class TokenStorage:
COLLECTION_NAME = "secrets"
TYPE = "twitch_token"
@staticmethod
async def save(user: str, acceess_token: str, refresh_token: str):
data = {"access_token": acceess_token, "refresh_token": refresh_token}
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME]
await collection.update_one(
{"type": TokenStorage.TYPE, "twitch_login": user},
{"$set": data},
upsert=True
)
@staticmethod
async def get(user: str) -> tuple[str, str]:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME]
data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
if data is None:
raise RuntimeError(f"Token for user {user} not found")
return data["access_token"], data["refresh_token"]

View File

@@ -0,0 +1,198 @@
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
import logging
from typing import NoReturn, Literal
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token
from applications.common.repositories.streamers import StreamerConfigRepository, StreamerConfig
from applications.twitch_webhook.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message, on_redemption_reward_add_task
from applications.twitch_webhook.state import UpdateEvent, EventType
from applications.twitch_webhook.messages_proc import MessageEvent
from applications.twitch_webhook.reward_redemption import RewardRedemption
from .authorize import authorize
logger = logging.getLogger(__name__)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
self.twitch = twitch
self.streamer = streamer
self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent):
await on_stream_state_change_with_check.kiq(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
title=event.event.title,
category_name=event.event.category_name
),
EventType.CHANNEL_UPDATE,
)
async def on_stream_online(self, event: StreamOnlineEvent):
await on_stream_state_change.kiq(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE,
)
async def on_channel_points_custom_reward_redemption_add(
self,
event: ChannelPointsCustomRewardRedemptionAddEvent
):
await on_redemption_reward_add_task(
RewardRedemption.from_twitch_event(event)
)
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
self.streamer.twitch.name,
MessageEvent.from_twitch_event(event)
)
async def _clean_subs(self, method: str, streamer: StreamerConfig):
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
case "listen_stream_online":
sub_type = "stream.online"
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case "listen_channel_points_custom_reward_redemption_add":
sub_type = "channel.channel_points_custom_reward_redemption.add"
case _:
raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions(
user_id=str(streamer.twitch.id)
)
for sub in subs.data:
if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"]
| Literal["listen_channel_points_custom_reward_redemption_add"],
eventsub: EventSubWebsocket,
streamer: StreamerConfig,
retry: int = 10
):
await self._clean_subs(method, streamer)
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_points_custom_reward_redemption_add":
await eventsub.listen_channel_points_custom_reward_redemption_add(
str(streamer.twitch.id),
self.on_channel_points_custom_reward_redemption_add
)
case "listen_channel_chat_message":
chatbot_in_chats = streamer.chatbot_in_chats or []
for chat_id in chatbot_in_chats:
await eventsub.listen_channel_chat_message(
str(chat_id),
str(streamer.twitch.id),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
async def _check_token(self):
assert self.twitch._user_auth_token is not None
while True:
for _ in range(60):
if self.failed:
return
await sleep(1)
logger.info("Check token...")
val_result = await validate_token(
self.twitch._user_auth_token,
auth_base_url=self.twitch.auth_base_url
)
if val_result.get('status', 200) != 200:
await self.twitch.refresh_used_token()
logger.info("Token refreshed")
async def run(self) -> NoReturn:
eventsub = EventSubWebsocket(twitch=self.twitch)
try:
eventsub.start()
logger.info("Subscribe to events...")
await self.subscribe_to_streamer(eventsub, self.streamer)
logger.info("Twitch service started")
await self._check_token()
finally:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def _start_for_streamer(cls, streamer: StreamerConfig):
try:
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
await cls(twith, streamer).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
streamers = await StreamerConfigRepository.all()
await wait(
[
create_task(cls._start_for_streamer(streamer))
for streamer in streamers
],
return_when=FIRST_COMPLETED
)
await gather(
*[cls._start_for_streamer(streamer) for streamer in streamers]
)
logger.info("Twitch service stopped")

View File

@@ -0,0 +1,126 @@
from datetime import datetime, timezone, timedelta
from twitchAPI.helper import first
from core.redis import redis_manager
from applications.common.repositories.streamers import StreamerConfigRepository
from .state import State, StateManager, EventType
from .sent_notifications import SentNotificationRepository, SentNotificationType
from .notification import delete_penultimate_notification, notify
from .twitch.authorize import authorize
class StateWatcher:
START_STREAM_THRESHOLD = timedelta(minutes=15)
@classmethod
async def get_twitch_state(cls, streamer_id: int) -> State | None:
twitch = await authorize("kurbezz")
stream = await first(
twitch.get_streams(user_id=[str(streamer_id)])
)
if stream is None:
return None
return State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
@classmethod
async def notify_and_save(
cls,
streamer_id: int,
sent_notification_type: SentNotificationType,
state: State
):
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
sent_result = await notify(sent_notification_type, streamer, state)
await SentNotificationRepository.add(
streamer.twitch.id,
sent_notification_type,
state,
sent_result=sent_result
)
@classmethod
async def remove_previous_notifications(cls, streamer_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
penultimate_notification = await SentNotificationRepository.get_penultimate_for_streamer(streamer_id)
if penultimate_notification is None:
return
await delete_penultimate_notification(streamer, penultimate_notification)
@classmethod
async def notify_start_stream(
cls,
streamer_id: int,
state: State
):
await cls.notify_and_save(streamer_id, SentNotificationType.START_STREAM, state)
await cls.remove_previous_notifications(streamer_id)
@classmethod
async def notify_change_category(
cls,
streamer_id: int,
state: State
):
await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state)
await cls.remove_previous_notifications(streamer_id)
@classmethod
async def _on_stream_state_change(
cls,
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
if new_state is not None:
current_state = new_state
else:
current_state = await cls.get_twitch_state(streamer_id)
if current_state is None:
return
last_state = await StateManager.get(streamer_id)
if last_state is None:
await cls.notify_start_stream(streamer_id, current_state)
await StateManager.update(streamer_id, current_state)
return
if (
event_type == EventType.STREAM_ONLINE and
datetime.now(timezone.utc) - last_state.last_live_at >= cls.START_STREAM_THRESHOLD
):
await cls.notify_start_stream(streamer_id, current_state)
await StateManager.update(streamer_id, current_state)
return
if last_state != current_state:
await cls.notify_change_category(streamer_id, current_state)
await StateManager.update(streamer_id, current_state)
return
await StateManager.update(streamer_id, current_state)
@classmethod
async def on_stream_state_change(
cls,
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
async with redis_manager.connect() as redis:
async with redis.lock(f"on_stream_state_change:{streamer_id}"):
await cls._on_stream_state_change(streamer_id, event_type, new_state)