Files
discord-bot/src/services/twitch.py
2024-08-10 04:26:52 +02:00

203 lines
6.0 KiB
Python

from asyncio import Lock, sleep
from datetime import datetime
import json
import logging
from twitchAPI.helper import first
from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.twitch import Twitch
from twitchAPI.type import AuthScope
from twitchAPI.object.eventsub import ChannelChatMessageEvent, StreamOnlineEvent, StreamOfflineEvent, ChannelUpdateEvent
import aiofiles
from pydantic import BaseModel
from config import config
from services.notification import notify
logger = logging.getLogger(__name__)
class State(BaseModel):
title: str
category: str
is_live: bool
last_live_at: datetime
class TokenStorage:
lock = Lock()
@staticmethod
async def save(acceess_token: str, refresh_token: str):
data = json.dumps({"access_token": acceess_token, "refresh_token": refresh_token})
async with TokenStorage.lock:
async with aiofiles.open(config.SECRETS_FILE_PATH, "w") as f:
await f.write(data)
@staticmethod
async def get() -> tuple[str, str]:
async with TokenStorage.lock:
async with aiofiles.open(config.SECRETS_FILE_PATH, "r") as f:
data_str = await f.read()
data = json.loads(data_str)
return data["access_token"], data["refresh_token"]
class TwitchService:
SCOPES = [
AuthScope.CHAT_READ,
AuthScope.CHAT_EDIT,
AuthScope.USER_BOT,
AuthScope.CHANNEL_BOT
]
ONLINE_NOTIFICATION_DELAY = 5 * 60
def __init__(self, twitch: Twitch):
self.twitch = twitch
self.state: State | None = None
@classmethod
async def authorize(cls):
twitch = Twitch(
config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET
)
twitch.user_auth_refresh_callback = TokenStorage.save
token, refresh_token = await TokenStorage.get()
await twitch.set_user_authentication(token, cls.SCOPES, refresh_token)
await twitch.authenticate_app(cls.SCOPES)
return twitch
async def notify_online(self):
if self.state is None:
raise RuntimeError("State is None")
msg = f"HafMC сейчас стримит {self.state.title} ({self.state.category})! \nПрисоединяйся: https://twitch.tv/hafmc"
await notify(msg)
async def notify_change_category(self):
if self.state is None:
raise RuntimeError("State is None")
msg = f"HafMC начал играть в {self.state.category}! \nПрисоединяйся: https://twitch.tv/hafmc"
await notify(msg)
async def get_current_stream(self, retry_count: int = 5, delay: int = 5):
remain_retry = retry_count
while remain_retry > 0:
stream = await first(self.twitch.get_streams(user_id=[config.TWITCH_CHANNEL_ID]))
if stream is not None:
return stream
remain_retry -= 1
await sleep(delay)
return None
async def on_channel_chat_message(self, event: ChannelChatMessageEvent):
if self.state is None or (datetime.now() - self.state.last_live_at).seconds <= self.ONLINE_NOTIFICATION_DELAY:
return
current_stream = await self.get_current_stream()
if current_stream is None:
return
self.state.last_live_at = datetime.now()
async def on_channel_update(self, event: ChannelUpdateEvent):
if self.state is None:
return
if self.state.category == event.event.category_name:
return
self.state.title = event.event.title
self.state.category = event.event.category_name
self.state.last_live_at = datetime.now()
await self.notify_change_category()
async def on_stream_online(self, event: StreamOnlineEvent):
current_stream = await self.get_current_stream()
if current_stream is None:
raise RuntimeError("Stream not found")
state = State(
title=current_stream.title,
category=current_stream.game_name,
is_live=True,
last_live_at=datetime.now()
)
if self.state is None:
self.state = state
await self.notify_online()
if (datetime.now() - self.state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY:
self.state = state
await self.notify_online()
async def on_stream_offline(self, event: StreamOfflineEvent):
if self.state:
self.state.is_live = False
self.last_live_at = datetime.now()
async def run(self):
eventsub = EventSubWebhook(
callback_url=config.TWITCH_CALLBACK_URL,
port=config.TWITCH_CALLBACK_PORT,
twitch=self.twitch,
message_deduplication_history_length=50
)
current_stream = await self.get_current_stream()
if current_stream:
self.state = State(
title=current_stream.title,
category=current_stream.game_name,
is_live=current_stream.type == "live",
last_live_at=datetime.now()
)
try:
await eventsub.unsubscribe_all()
eventsub.start()
await eventsub.listen_channel_chat_message(config.TWITCH_CHANNEL_ID, config.TWITCH_ADMIN_USER_ID, self.on_channel_chat_message)
await eventsub.listen_channel_update_v2(config.TWITCH_CHANNEL_ID, self.on_channel_update)
await eventsub.listen_stream_online(config.TWITCH_CHANNEL_ID, self.on_stream_online)
await eventsub.listen_stream_offline(config.TWITCH_CHANNEL_ID, self.on_stream_offline)
while True:
await sleep(1)
finally:
await eventsub.stop()
await self.twitch.close()
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
twith = await cls.authorize()
await cls(twith).run()
async def start_twitch_service():
await TwitchService.start()