This commit is contained in:
2024-08-14 00:57:56 +02:00
parent 0aed4e3553
commit b32d5e845e
7 changed files with 215 additions and 92 deletions

View File

@@ -1,31 +1,57 @@
import json
from pydantic import BaseModel, field_validator
from pydantic_settings import BaseSettings
class TwitchConfig(BaseModel):
CHANNEL_ID: str
CHANNEL_NAME: str
class DiscordConfig(BaseModel):
GUILD_ID: int
CHANNEL_ID: int
GAME_LIST_CHANNEL_ID: int
GAME_LIST_MESSAGE_ID: int
class StreamerConfig(BaseModel):
TWITCH: TwitchConfig
DISCORD: DiscordConfig | None = None
TELEGRAM_CHANNEL_ID: int | None = None
START_STREAM_MESSAGE: str | None = None
CHANGE_CATEGORY_MESSAGE: str | None = None
class Config(BaseSettings):
DISCORD_BOT_TOKEN: str
DISCORD_BOT_ID: str
DISCORD_GUILD_ID: int
DISCORD_CHANNEL_ID: int
DISCORD_BOT_ACTIVITY: str
DISCORD_GAME_LIST_CHANNEL_ID: int
DISCORD_GAME_LIST_MESSAGE_ID: int
TELEGRAM_BOT_TOKEN: str
TELEGRAM_CHANNEL_ID: int
TWITCH_CLIENT_ID: str
TWITCH_CLIENT_SECRET: str
TWITCH_CHANNEL_ID: str
TWITCH_ADMIN_USER_ID: str
TWITCH_CALLBACK_URL: str
TWITCH_CALLBACK_PORT: int = 80
STREAMERS: list[StreamerConfig] = []
SECRETS_FILE_PATH: str
@field_validator("STREAMERS", mode="before")
def check_streamers(cls, value):
if isinstance(value, str):
return json.loads(value)
return value
config = Config() # type: ignore

View File

@@ -1,4 +1,3 @@
import asyncio
import logging
import discord
@@ -14,6 +13,21 @@ from config import config
logger = logging.getLogger(__name__)
def get_game_list_channel_to_message_map() -> dict[int, int]:
result = {}
for streamer in config.STREAMERS:
if streamer.DISCORD is None:
continue
if streamer.DISCORD.GAME_LIST_CHANNEL_ID is None or streamer.DISCORD.GAME_LIST_MESSAGE_ID is None:
continue
result[streamer.DISCORD.GAME_LIST_CHANNEL_ID] = streamer.DISCORD.GAME_LIST_MESSAGE_ID
return result
class DiscordClient(discord.Client):
def __init__(self) -> None:
intents = discord.Intents.default()
@@ -24,8 +38,15 @@ class DiscordClient(discord.Client):
self.tree = app_commands.CommandTree(self)
async def setup_hook(self):
self.tree.copy_global_to(guild=Object(id=config.DISCORD_GUILD_ID))
await self.tree.sync(guild=Object(id=config.DISCORD_GUILD_ID))
for streamer in config.STREAMERS:
if streamer.DISCORD is None:
continue
if streamer.DISCORD.GAME_LIST_CHANNEL_ID is None or streamer.DISCORD.GAME_LIST_MESSAGE_ID is None:
continue
self.tree.copy_global_to(guild=Object(id=streamer.DISCORD.GUILD_ID))
await self.tree.sync(guild=Object(id=streamer.DISCORD.GUILD_ID))
async def on_ready(self):
await self.change_presence(
@@ -58,15 +79,22 @@ async def add(
game: str,
date: str | None = None
):
if interaction.channel is None or interaction.channel.id != config.DISCORD_GAME_LIST_CHANNEL_ID:
channel_to_message = get_game_list_channel_to_message_map()
if interaction.channel is None:
await interaction.response.send_message("Команда не доступна в этом канале (#1)", ephemeral=True)
return
message_id = channel_to_message.get(interaction.channel.id)
if message_id is None:
await interaction.response.send_message("Команда не доступна в этом канале (#3)", ephemeral=True)
return
if not isinstance(interaction.channel, Messageable):
await interaction.response.send_message("Команда не доступна в этом канале (#2)", ephemeral=True)
return
game_list_message = await interaction.channel.fetch_message(config.DISCORD_GAME_LIST_MESSAGE_ID)
game_list_message = await interaction.channel.fetch_message(message_id)
game_list = GameList.parse(game_list_message.content)
game_list.add_game(category, GameItem(name=game, customer=customer, date=date))
@@ -83,7 +111,12 @@ async def game_list_autocomplete(
if not isinstance(interaction.channel, Messageable):
return []
game_list_message = await interaction.channel.fetch_message(config.DISCORD_GAME_LIST_MESSAGE_ID)
channel_to_message = get_game_list_channel_to_message_map()
message_id = channel_to_message.get(interaction.channel.id)
if message_id is None:
return []
game_list_message = await interaction.channel.fetch_message(message_id)
game_list = GameList.parse(game_list_message.content)
@@ -94,15 +127,22 @@ async def game_list_autocomplete(
@app_commands.describe(game="Игра")
@app_commands.autocomplete(game=game_list_autocomplete)
async def delete(interaction: discord.Interaction, game: str):
if interaction.channel is None or interaction.channel.id != config.DISCORD_GAME_LIST_CHANNEL_ID:
channel_to_message = get_game_list_channel_to_message_map()
if interaction.channel is None:
await interaction.response.send_message("Команда не доступна в этом канале (#1)", ephemeral=True)
return
message_id = channel_to_message.get(interaction.channel.id)
if message_id is None:
await interaction.response.send_message("Команда не доступна в этом канале (#3)", ephemeral=True)
return
if not isinstance(interaction.channel, Messageable):
await interaction.response.send_message("Команда не доступна в этом канале (#2)", ephemeral=True)
return
game_list_message = await interaction.channel.fetch_message(config.DISCORD_GAME_LIST_MESSAGE_ID)
game_list_message = await interaction.channel.fetch_message(message_id)
game_list = GameList.parse(game_list_message.content)
game_list.delete_game(game)

View File

@@ -1,25 +1,28 @@
from asyncio import gather
import logging
from httpx import AsyncClient
from config import config
from config import config, StreamerConfig
async def notify_telegram(msg: str):
logger = logging.getLogger(__name__)
async def notify_telegram(msg: str, chat_id: str):
async with AsyncClient() as client:
await client.post(
f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage",
json={
"chat_id": config.TELEGRAM_CHANNEL_ID,
"chat_id": chat_id,
"text": msg,
}
)
async def notify_discord(msg: str):
async def notify_discord(msg: str, channel_id: str):
async with AsyncClient() as client:
await client.post(
f"https://discord.com/api/v10/channels/{config.DISCORD_CHANNEL_ID}/messages",
f"https://discord.com/api/v10/channels/{channel_id}/messages",
headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"
},
@@ -29,8 +32,15 @@ async def notify_discord(msg: str):
)
async def notify(msg: str):
await gather(
notify_telegram(msg),
notify_discord(msg)
)
async def notify(msg: str, streamer_config: StreamerConfig):
if streamer_config.DISCORD is not None:
try:
await notify_discord(msg, str(streamer_config.DISCORD.CHANNEL_ID))
except Exception as e:
logger.error("Failed to notify discord", exc_info=e)
if streamer_config.TELEGRAM_CHANNEL_ID is not None:
try:
await notify_telegram(msg, str(streamer_config.TELEGRAM_CHANNEL_ID))
except Exception as e:
logger.error("Failed to notify telegram", exc_info=e)

View File

@@ -46,10 +46,10 @@ class DiscordEvent(BaseModel):
creator_id: str
async def get_discord_events() -> list[DiscordEvent]:
async def get_discord_events(guild_id: int) -> list[DiscordEvent]:
async with AsyncClient() as client:
response = await client.get(
f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events",
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events",
headers={"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"}
)
@@ -60,10 +60,10 @@ async def get_discord_events() -> list[DiscordEvent]:
return [event for event in events if event.creator_id == config.DISCORD_BOT_ID]
async def delete_discord_event(event_id: str):
async def delete_discord_event(guild_id: int, event_id: str):
async with AsyncClient() as client:
response = await client.delete(
f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events/{event_id}",
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events/{event_id}",
headers={"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"}
)
@@ -90,7 +90,7 @@ class CreateDiscordEvent(BaseModel):
return value.isoformat()
@classmethod
def parse_from_twitch_event(cls, event: TwitchEvent) -> Self:
def parse_from_twitch_event(cls, event: TwitchEvent, channel_name: str) -> Self:
if event.categories:
name = f"{event.name} | {event.categories}"
else:
@@ -111,18 +111,17 @@ class CreateDiscordEvent(BaseModel):
description=f"{event.description or ''}\n\n\n\n#{event.uid}",
privacy_level=2,
entity_type=3,
entity_metadata=EntityMetadata(location="https://twitch.tv/hafmc"),
entity_metadata=EntityMetadata(location=f"https://twitch.tv/{channel_name}"),
scheduled_start_time=event.start_at,
scheduled_end_time=event.end_at,
recurrence_rule=recurrence_rule
)
async def create_discord_event(event: CreateDiscordEvent):
async def create_discord_event(guild_id: int, event: CreateDiscordEvent):
async with AsyncClient() as client:
response = await client.post(
f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events",
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events",
json=event.model_dump(),
headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}",
@@ -148,10 +147,10 @@ class UpdateDiscordEvent(BaseModel):
return value.isoformat()
async def edit_discord_event(event_id: str, event: UpdateDiscordEvent):
async def edit_discord_event(guild_id: int, event_id: str, event: UpdateDiscordEvent):
async with AsyncClient() as client:
response = await client.patch(
f"https://discord.com/api/v10/guilds/{config.DISCORD_GUILD_ID}/scheduled-events/{event_id}",
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events/{event_id}",
json=event.model_dump(),
headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}",

View File

@@ -10,11 +10,15 @@ from services.scheduler_sync.discord_events import (
)
from services.scheduler_sync.comparators import compare
from config import config, TwitchConfig
logger = logging.getLogger(__name__)
async def add_events(
guild_id: int,
twitch_channel_name: str,
twitch_events: list[tuple[str, TwitchEvent]],
discord_events: list[tuple[str, DiscordEvent]]
):
@@ -22,11 +26,12 @@ async def add_events(
for (uid, event) in twitch_events:
if uid not in discord_events_ids:
create_event = CreateDiscordEvent.parse_from_twitch_event(event)
await create_discord_event(create_event)
create_event = CreateDiscordEvent.parse_from_twitch_event(event, twitch_channel_name)
await create_discord_event(guild_id, create_event)
async def remove_events(
guild_id: int,
twith_events: list[tuple[str, TwitchEvent]],
discord_events: list[tuple[str, DiscordEvent]]
):
@@ -34,10 +39,12 @@ async def remove_events(
for (uid, event) in discord_events:
if uid not in twith_events_ids:
await delete_discord_event(uid)
await delete_discord_event(guild_id, uid)
async def edit_events(
guild_id: int,
twitch_channel_name: str,
twith_events: list[tuple[str, TwitchEvent]],
discord_events: list[tuple[str, DiscordEvent]]
):
@@ -46,7 +53,7 @@ async def edit_events(
if uid != discord_id:
continue
create_event = CreateDiscordEvent.parse_from_twitch_event(twitch_event)
create_event = CreateDiscordEvent.parse_from_twitch_event(twitch_event, twitch_channel_name)
if compare(create_event, discord_event):
continue
@@ -67,12 +74,12 @@ async def edit_events(
update_event.recurrence_rule.start = update_event.scheduled_start_time
await edit_discord_event(discord_event.id, update_event)
await edit_discord_event(guild_id, discord_event.id, update_event)
async def syncronize():
twitch_events = await get_twitch_events()
discord_events = await get_discord_events()
async def syncronize(twitch: TwitchConfig, discord_guild_id: int):
twitch_events = await get_twitch_events(twitch.CHANNEL_ID)
discord_events = await get_discord_events(discord_guild_id)
twitch_events_with_id = [(event.uid, event) for event in twitch_events]
discord_events_with_id = [
@@ -80,15 +87,19 @@ async def syncronize():
for event in discord_events
]
await add_events(twitch_events_with_id, discord_events_with_id)
await remove_events(twitch_events_with_id, discord_events_with_id)
await edit_events(twitch_events_with_id, discord_events_with_id)
await add_events(discord_guild_id, twitch.CHANNEL_NAME, twitch_events_with_id, discord_events_with_id)
await remove_events(discord_guild_id, twitch_events_with_id, discord_events_with_id)
await edit_events(discord_guild_id, twitch.CHANNEL_NAME, twitch_events_with_id, discord_events_with_id)
async def start_synchronizer():
while True:
try:
await syncronize()
for streamer in config.STREAMERS:
if streamer.DISCORD is None:
continue
await syncronize(streamer.TWITCH, streamer.DISCORD.GUILD_ID)
except Exception as e:
logging.error(e)

View File

@@ -7,8 +7,6 @@ import icalendar
from httpx import AsyncClient
from pydantic import BaseModel
from config import config
class Weekday(StrEnum):
Mon = "MO"
@@ -45,10 +43,10 @@ class TwitchEvent(BaseModel):
repeat_rule: Optional[WeeklyRepeatRule]
async def get_twitch_events() -> list[TwitchEvent]:
async def get_twitch_events(twitch_channel_id: str) -> list[TwitchEvent]:
async with AsyncClient() as client:
response = await client.get(
f"https://api.twitch.tv/helix/schedule/icalendar?broadcaster_id={config.TWITCH_CHANNEL_ID}"
f"https://api.twitch.tv/helix/schedule/icalendar?broadcaster_id={twitch_channel_id}"
)
events: list[TwitchEvent] = []

View File

@@ -7,13 +7,13 @@ from twitchAPI.helper import first
from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.twitch import Twitch
from twitchAPI.type import AuthScope
from twitchAPI.object.eventsub import StreamOnlineEvent, StreamOfflineEvent, ChannelUpdateEvent
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent
import aiofiles
from pydantic import BaseModel
from config import config
from config import config, StreamerConfig
from services.notification import notify
@@ -26,6 +26,7 @@ class State(BaseModel):
last_live_at: datetime
class TokenStorage:
lock = Lock()
@@ -59,7 +60,7 @@ class TwitchService:
def __init__(self, twitch: Twitch):
self.twitch = twitch
self.state: State | None = None
self.state: dict[str, State | None] = {}
@classmethod
async def authorize(cls):
@@ -77,30 +78,55 @@ class TwitchService:
return twitch
async def notify_online(self):
if self.state is None:
def get_streamer_config(self, streamer_id: str) -> StreamerConfig:
for streamer in config.STREAMERS:
if streamer.TWITCH.CHANNEL_ID == streamer_id:
return streamer
raise ValueError(f"Streamer with id {streamer_id} not found")
async def notify_online(self, streamer_id: str):
current_state = self.state.get(streamer_id)
if current_state is None:
raise RuntimeError("State is None")
msg = f"HafMC сейчас стримит {self.state.title} ({self.state.category})! \nПрисоединяйся: https://twitch.tv/hafmc"
streamer = self.get_streamer_config(streamer_id)
await notify(msg)
if streamer.START_STREAM_MESSAGE is None:
return
async def notify_change_category(self):
if self.state is None:
msg = streamer.START_STREAM_MESSAGE.format(
title=current_state.title,
category=current_state.category
)
await notify(msg, streamer)
async def notify_change_category(self, streamer_id: str):
current_state = self.state.get(streamer_id)
if current_state is None:
raise RuntimeError("State is None")
if (datetime.now() - self.state.last_live_at).seconds > 60:
if (datetime.now() - current_state.last_live_at).seconds > 60:
raise RuntimeError("State is not live")
msg = f"HafMC начал играть в {self.state.category}! \nПрисоединяйся: https://twitch.tv/hafmc"
streamer = self.get_streamer_config(streamer_id)
await notify(msg)
if streamer.CHANGE_CATEGORY_MESSAGE is None:
return
async def get_current_stream(self, retry_count: int = 5, delay: int = 5):
msg = streamer.CHANGE_CATEGORY_MESSAGE.format(
category=current_state.category
)
await notify(msg, streamer)
async def get_current_stream(self, streamer_id: str, retry_count: int = 5, delay: int = 5):
remain_retry = retry_count
while remain_retry > 0:
stream = await first(self.twitch.get_streams(user_id=[config.TWITCH_CHANNEL_ID]))
stream = await first(self.twitch.get_streams(user_id=[streamer_id]))
if stream is not None:
return stream
@@ -111,24 +137,29 @@ class TwitchService:
return None
async def on_channel_update(self, event: ChannelUpdateEvent):
stream = await self.get_current_stream()
brodcaster_id = event.event.broadcaster_user_id
stream = await self.get_current_stream(brodcaster_id)
if stream is None:
return
if self.state is None:
current_state = self.state.get(brodcaster_id)
if current_state is None:
return
changed = self.state.category == event.event.category_name
changed = current_state.category == event.event.category_name
self.state.title = event.event.title
self.state.category = event.event.category_name
self.state.last_live_at = datetime.now()
current_state.title = event.event.title
current_state.category = event.event.category_name
current_state.last_live_at = datetime.now()
self.state[brodcaster_id] = current_state
if changed:
await self.notify_change_category()
await self.notify_change_category(brodcaster_id)
async def _on_stream_online(self):
current_stream = await self.get_current_stream()
async def _on_stream_online(self, streamer_id: str):
current_stream = await self.get_current_stream(streamer_id)
if current_stream is None:
return
@@ -138,13 +169,15 @@ class TwitchService:
last_live_at=datetime.now()
)
if self.state is None or (datetime.now() - self.state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY:
await self.notify_online()
current_state = self.state.get(streamer_id)
self.state = state
if current_state is None or (datetime.now() - current_state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY:
await self.notify_online(streamer_id)
self.state[streamer_id] = state
async def on_stream_online(self, event: StreamOnlineEvent):
await self._on_stream_online()
await self._on_stream_online(event.event.broadcaster_user_id)
async def run(self):
eventsub = EventSubWebhook(
@@ -154,13 +187,16 @@ class TwitchService:
message_deduplication_history_length=50
)
current_stream = await self.get_current_stream()
for streamer in config.STREAMERS:
current_stream = await self.get_current_stream(streamer.TWITCH.CHANNEL_ID)
if current_stream:
self.state = State(
self.state[streamer.TWITCH.CHANNEL_ID] = State(
title=current_stream.title,
category=current_stream.game_name,
last_live_at=datetime.now()
)
else:
self.state[streamer.TWITCH.CHANNEL_ID] = None
try:
await eventsub.unsubscribe_all()
@@ -169,14 +205,17 @@ class TwitchService:
logger.info("Subscribe to events...")
await eventsub.listen_channel_update_v2(config.TWITCH_CHANNEL_ID, self.on_channel_update)
await eventsub.listen_stream_online(config.TWITCH_CHANNEL_ID, self.on_stream_online)
for streamer in config.STREAMERS:
await eventsub.listen_channel_update_v2(streamer.TWITCH.CHANNEL_ID, self.on_channel_update)
await eventsub.listen_stream_online(streamer.TWITCH.CHANNEL_ID, self.on_stream_online)
logger.info("Twitch service started")
while True:
await sleep(self.UPDATE_DELAY)
await self._on_stream_online()
for streamer in config.STREAMERS:
await self._on_stream_online(streamer.TWITCH.CHANNEL_ID)
finally:
await eventsub.stop()
await self.twitch.close()