This commit is contained in:
2025-03-18 18:56:27 +01:00
parent b5b3397bff
commit f46f2aee3f
8 changed files with 38 additions and 37 deletions

View File

@@ -323,7 +323,7 @@ class MessagesProc:
await cls._update_history(event)
twitch = await authorize()
twitch = await authorize(event.broadcaster_user_login)
await cls._goida(twitch, event)
await cls._lasqexx(twitch, event)

View File

@@ -4,7 +4,6 @@ from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from core.config import config
from .twitch.authorize import authorize
@@ -13,6 +12,7 @@ logger = logging.getLogger(__name__)
class RewardRedemption(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
user_name: str
reward_title: str
reward_prompt: str
@@ -21,6 +21,7 @@ class RewardRedemption(BaseModel):
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
user_name=event.event.user_name,
reward_title=event.event.reward.title,
reward_prompt=event.event.reward.prompt or "",
@@ -30,10 +31,10 @@ class RewardRedemption(BaseModel):
async def on_redemption_reward_add(reward: RewardRedemption):
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
twitch = await authorize()
twitch = await authorize(reward.broadcaster_user_login)
await twitch.send_chat_message(
reward.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
reward.broadcaster_user_id,
f"🎉 {reward.user_name} just redeemed {reward.reward_title}! 🎉"
)

View File

@@ -21,6 +21,7 @@ class State(BaseModel):
class UpdateEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
title: str
category_name: str

View File

@@ -20,7 +20,7 @@ async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize()
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
@@ -61,7 +61,7 @@ async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize()
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(

View File

@@ -19,16 +19,16 @@ SCOPES = [
]
async def authorize(auto_refresh_auth: bool = False) -> Twitch:
async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
twitch = Twitch(
config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET
)
twitch.user_auth_refresh_callback = TokenStorage.save
twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
twitch.auto_refresh_auth = auto_refresh_auth
token, refresh_token = await TokenStorage.get()
token, refresh_token = await TokenStorage.get(user)
await twitch.set_user_authentication(
token,
SCOPES,

View File

@@ -3,10 +3,10 @@ from core.mongo import mongo_manager
class TokenStorage:
COLLECTION_NAME = "secrets"
OBJECT_ID = "twitch_tokens"
TYPE = "twitch_token"
@staticmethod
async def save(acceess_token: str, refresh_token: str):
async def save(user: str, acceess_token: str, refresh_token: str):
data = {"access_token": acceess_token, "refresh_token": refresh_token}
async with mongo_manager.connect() as client:
@@ -14,17 +14,17 @@ class TokenStorage:
collection = db[TokenStorage.COLLECTION_NAME]
await collection.update_one(
{"_id": TokenStorage.OBJECT_ID},
{"type": TokenStorage.TYPE, "user": user},
{"$set": data},
upsert=True
)
@staticmethod
async def get() -> tuple[str, str]:
async def get(user: str) -> tuple[str, str]:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME]
data = await collection.find_one({"_id": TokenStorage.OBJECT_ID})
data = await collection.find_one({"type": TokenStorage.TYPE, "user": user})
return data["access_token"], data["refresh_token"]

View File

@@ -2,7 +2,7 @@ from asyncio import sleep, gather
import logging
from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token
@@ -22,8 +22,9 @@ logger = logging.getLogger(__name__)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch):
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
self.twitch = twitch
self.streamer = streamer
self.failed = False
@@ -31,6 +32,7 @@ class TwitchService:
await on_stream_state_change_with_check.kiq(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
title=event.event.title,
category_name=event.event.category_name
),
@@ -86,7 +88,7 @@ class TwitchService:
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"]
| Literal["listen_channel_points_custom_reward_redemption_add"],
eventsub: EventSubWebhook,
eventsub: EventSubWebsocket,
streamer: StreamerConfig,
retry: int = 10
):
@@ -120,7 +122,7 @@ class TwitchService:
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig):
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
@@ -150,24 +152,13 @@ class TwitchService:
logger.info("Token refreshed")
async def run(self) -> NoReturn:
eventsub = EventSubWebhook(
callback_url=config.TWITCH_CALLBACK_URL,
port=config.TWITCH_CALLBACK_PORT,
twitch=self.twitch,
message_deduplication_history_length=50
)
eventsub.wait_for_subscription_confirm_timeout = 60
eventsub.unsubscribe_on_stop = False
streamers = await StreamerConfigRepository.all()
eventsub = EventSubWebsocket(twitch=self.twitch)
try:
eventsub.start()
logger.info("Subscribe to events...")
await gather(
*[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers]
)
await self.subscribe_to_streamer(eventsub, self.streamer)
logger.info("Twitch service started")
await self._check_token()
@@ -175,15 +166,23 @@ class TwitchService:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def _start_for_streamer(cls, streamer: StreamerConfig):
try:
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
await cls(twith, streamer).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
try:
twith = await authorize(auto_refresh_auth=True)
await cls(twith).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
streamers = await StreamerConfigRepository.all()
await gather(
*[cls._start_for_streamer(streamer) for streamer in streamers]
)
logger.info("Twitch service stopped")

View File

@@ -16,7 +16,7 @@ class StateWatcher:
@classmethod
async def get_twitch_state(cls, streamer_id: int) -> State | None:
twitch = await authorize()
twitch = await authorize("kurbezz")
stream = await first(
twitch.get_streams(user_id=[str(streamer_id)])