Compare commits

..

58 Commits

Author SHA1 Message Date
402042080e Fix 2025-02-19 00:43:38 +01:00
f6c7128cdc Fix 2025-02-19 00:39:14 +01:00
67a18ba3dd Update 2025-02-19 00:30:56 +01:00
b0f668e97f Add debug 2025-02-19 00:26:18 +01:00
4de3336230 Fix 2025-02-19 00:20:34 +01:00
4aff92a68d Fix 2025-02-19 00:15:36 +01:00
44441b615c Update 2025-02-19 00:14:36 +01:00
4d19ae568f Fix 2025-02-19 00:09:09 +01:00
ed3fdbcc05 Fix 2025-02-18 23:57:14 +01:00
bf43ffa8a3 Fix 2025-02-18 23:52:36 +01:00
a6a10d9110 Update 2025-02-18 23:49:16 +01:00
e86fa6635a Fix 2025-02-18 23:43:58 +01:00
60a3d903ae Update 2025-02-18 23:43:29 +01:00
aec3939816 Update 2025-02-18 23:42:57 +01:00
f9c1f7e77b Fix 2025-02-18 23:30:26 +01:00
8c1e6adc32 Fix 2025-02-18 23:21:01 +01:00
dd6be7abde Fix 2025-02-18 23:18:56 +01:00
462bd0d7dd Ignore pahangor 2025-02-18 23:16:41 +01:00
3eb49f8f4e Update 2025-02-18 23:09:32 +01:00
1f94675639 Update 2025-02-18 23:03:16 +01:00
e33d53d554 Update 2025-02-18 23:01:00 +01:00
ca63648374 Update 2025-02-18 22:49:57 +01:00
b96f651b40 Fix 2025-02-18 22:40:43 +01:00
f998a61e54 Fix 2025-02-18 22:39:51 +01:00
469275540f Fix 2025-02-18 22:34:35 +01:00
027cddd886 Fix 2025-02-18 22:33:26 +01:00
ca7f382d11 Update 2025-02-18 22:18:54 +01:00
1ea24db686 Update 2025-02-18 22:18:14 +01:00
ae5efdae73 Fix 2025-02-18 22:17:47 +01:00
5679405c7e Fix 2025-02-18 22:12:41 +01:00
99be4cbab2 Fix 2025-02-18 22:11:35 +01:00
a1d7833c1d Fix 2025-02-18 22:10:08 +01:00
ef9c88b86b Update 2025-02-18 22:09:05 +01:00
c21909138a Fix 2025-02-18 22:02:58 +01:00
d53892f9ea Fix 2025-02-18 22:01:17 +01:00
62956dc5f0 Use gemini 2025-02-18 21:58:15 +01:00
8ccd3debce Fix 2025-02-18 21:47:11 +01:00
c536bd45d7 Fix 2025-02-18 21:43:32 +01:00
fe5a39a40c Fix 2025-02-18 21:41:54 +01:00
9fc794d3ed Fix 2025-02-18 21:36:57 +01:00
20ba243272 Fix 2025-02-18 21:35:59 +01:00
5d33bd7ea9 Fix 2025-02-18 21:32:27 +01:00
c85d86ec92 Fix 2025-02-18 21:31:22 +01:00
8b58ac480a Fix 2025-02-18 21:24:23 +01:00
41ffb15e15 Update 2025-02-18 21:22:46 +01:00
20b34a0e69 Fix 2025-02-18 20:54:14 +01:00
4eac5ecd11 Fix 2025-02-18 20:51:11 +01:00
113b84c837 Update 2025-02-18 20:48:57 +01:00
4ff7d1a1d7 Fix 2025-02-18 20:45:24 +01:00
f8f923bfb4 Update 2025-02-18 20:40:03 +01:00
2e41a08bd9 Update 2025-02-18 20:33:04 +01:00
2ef0cc06c2 Fix 2025-02-18 20:25:39 +01:00
1c57392a44 Add debug 2025-02-18 20:20:39 +01:00
11d1142346 Fix channel_points_custom_reward_id 2025-02-18 20:15:40 +01:00
bb2ed88736 Update 2025-02-18 20:11:49 +01:00
3cd6b9ebe3 Test message processing 2025-02-18 20:05:51 +01:00
cc45a437d9 Add debug 2025-02-18 19:26:51 +01:00
c2c8889590 Add listen_channel_chat_message 2025-02-18 19:01:28 +01:00
7 changed files with 1318 additions and 871 deletions

1862
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,11 +15,11 @@ pydantic-settings = "^2.7.1"
httpx = "^0.28.1" httpx = "^0.28.1"
icalendar = "^6.1.0" icalendar = "^6.1.0"
pytz = "^2024.2" pytz = "^2024.2"
mongojet = "^0.2.5" mongojet = "^0.2.7"
taskiq = "^0.11.10" taskiq = "^0.11.11"
taskiq-redis = "^1.0.2" taskiq-redis = "^1.0.2"
redis = {extras = ["hiredis"], version = "^5.2.1"} redis = {extras = ["hiredis"], version = "^5.2.1"}
fastapi = "^0.115.6" fastapi = "^0.115.8"
authx = "^1.4.1" authx = "^1.4.1"
httpx-oauth = "^0.16.1" httpx-oauth = "^0.16.1"
uvicorn = {extras = ["standard"], version = "^0.34.0"} uvicorn = {extras = ["standard"], version = "^0.34.0"}

View File

@@ -33,6 +33,8 @@ class Config(BaseModel):
SECRET_KEY: str SECRET_KEY: str
OPENAI_API_KEY: str
def get_config() -> Config: def get_config() -> Config:
settings = Settings() # type: ignore settings = Settings() # type: ignore

View File

@@ -0,0 +1,247 @@
from enum import StrEnum
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient
from core.config import config
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class ChatMessage(BaseModel):
text: str
class ChatMessageReplyMetadata(BaseModel):
parent_message_id: str
parent_message_body: str
parent_user_id: str
parent_user_name: str
parent_user_login: str
thread_message_id: str
thread_user_id: str
thread_user_name: str
thread_user_login: str
class MessageType(StrEnum):
TEXT = "text"
CHANNEL_POINTS_HIGHLIGHTED = "channel_points_highlighted"
CHANNEL_POINTS_SUB_ONLY = "channel_points_sub_only"
USER_INTRO = "user_intro"
class MessageEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_name: str
broadcaster_user_login: str
chatter_user_id: str
chatter_user_name: str
chatter_user_login: str
message_id: str
message: ChatMessage
message_type: MessageType
color: str
reply: ChatMessageReplyMetadata | None
channel_points_custom_reward_id: str | None
@classmethod
def from_twitch_event(cls, event: ChannelChatMessageEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_name=event.event.broadcaster_user_name,
broadcaster_user_login=event.event.broadcaster_user_login,
chatter_user_id=event.event.chatter_user_id,
chatter_user_name=event.event.chatter_user_name,
chatter_user_login=event.event.chatter_user_login,
message_id=event.event.message_id,
message=ChatMessage(text=event.event.message.text),
message_type=MessageType(event.event.message_type),
color=event.event.color,
reply=ChatMessageReplyMetadata(
parent_message_id=event.event.reply.parent_message_id,
parent_message_body=event.event.reply.parent_message_body,
parent_user_id=event.event.reply.parent_user_id,
parent_user_name=event.event.reply.parent_user_name,
parent_user_login=event.event.reply.parent_user_login,
thread_message_id=event.event.reply.thread_message_id,
thread_user_id=event.event.reply.thread_user_id,
thread_user_name=event.event.reply.thread_user_name,
thread_user_login=event.event.reply.thread_user_login
) if event.event.reply else None,
channel_points_custom_reward_id=event.event.channel_points_custom_reward_id
)
async def get_completion(messages: list[dict]) -> str:
logger.info(f"Getting completion for message: {messages}")
data_messages = [
*(
{
"role": "assistant" if message["user"] == "kurbezz" else "user",
"content": message["text"]
}
for message in messages
),
{
"role": "system",
"content": "Don't use markdown! Don't use blocked words on Twitch! Make answers short and clear!"
}
]
async with AsyncClient() as client:
response = await client.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {config.OPENAI_API_KEY}",
"content-type": "application/json"
},
json={
"model": "google/gemini-2.0-flash-thinking-exp:free",
"messages": data_messages
}
)
data = response.json()
logger.info(f"Got completion: {data}")
return data["choices"][0]["message"]["content"]
class MessagesProc:
IGNORED_USER_LOGINS = [
"jeetbot",
"kurbezz",
]
MESSAGE_LIMIT = 1000
MESSAGE_HISTORY = []
@classmethod
def update_message_history(cls, id: str, text: str, user: str, thread_id: str | None = None):
cls.MESSAGE_HISTORY.append({
"id": id,
"text": text,
"user": user,
"thread_id": thread_id
})
if len(cls.MESSAGE_HISTORY) > cls.MESSAGE_LIMIT:
cls.MESSAGE_HISTORY = cls.MESSAGE_HISTORY[-cls.MESSAGE_LIMIT:]
@classmethod
def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]:
logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}")
return [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id] + \
[m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod
async def on_message(cls, event: MessageEvent):
logging.info(f"Received message: {event}")
cls.update_message_history(
id=event.message_id,
text=event.message.text,
user=event.chatter_user_login,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
if event.chatter_user_name == "pahangor":
return
twitch = await authorize()
if "гойда" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"ГООООООООООООООООООООООООООООООООООООООООООООООЙДА!",
reply_parent_message_id=event.message_id
)
if "lasqexx" in event.chatter_user_login:
pass # Todo: Здароу
if event.message.text.lower().startswith("!ai"):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Ошибка!",
reply_parent_message_id=event.message_id
)
if event.chatter_user_login in cls.IGNORED_USER_LOGINS:
return
if ("kurbezz" in event.message.text.lower() or \
"курбез" in event.message.text.lower() or \
"булат" in event.message.text.lower()):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
except Exception as e:
logger.error(f"Failed to get completion: {e}")
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Пошел нахуй!",
reply_parent_message_id=event.message_id
)

View File

@@ -7,6 +7,7 @@ from repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize from .twitch.authorize import authorize
@@ -73,3 +74,11 @@ async def check_streams_states():
EventType.UNKNOWN, EventType.UNKNOWN,
state state
) )
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(event: MessageEvent):
await MessagesProc.on_message(event)

View File

@@ -8,7 +8,12 @@ from .token_storage import TokenStorage
SCOPES = [ SCOPES = [
AuthScope.CHAT_READ, AuthScope.CHAT_READ,
AuthScope.CHAT_EDIT,
AuthScope.CHANNEL_BOT,
AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT,
] ]

View File

@@ -4,13 +4,14 @@ from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.twitch import Twitch from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent
from twitchAPI.oauth import validate_token from twitchAPI.oauth import validate_token
from core.config import config from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message
from modules.stream_notifications.state import UpdateEvent, EventType from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent
from .authorize import authorize from .authorize import authorize
@@ -41,20 +42,34 @@ class TwitchService:
EventType.STREAM_ONLINE, EventType.STREAM_ONLINE,
) )
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
MessageEvent.from_twitch_event(event)
)
async def subscribe_with_retry( async def subscribe_with_retry(
self, self,
method: Literal["listen_channel_update_v2"] | Literal["listen_stream_online"], method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"],
eventsub: EventSubWebhook, eventsub: EventSubWebhook,
streamer: StreamerConfig, streamer: StreamerConfig,
retry: int = 10 retry: int = 10
): ):
try: try:
if method == "listen_channel_update_v2": match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update) await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
elif method == "listen_stream_online": case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online) await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
else: case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case _:
raise ValueError("Unknown method") raise ValueError("Unknown method")
return return
@@ -62,11 +77,14 @@ class TwitchService:
if retry <= 0: if retry <= 0:
raise e raise e
if method == "listen_channel_update_v2": match method:
case "listen_channel_update_v2":
sub_type = "channel.update" sub_type = "channel.update"
elif method == "listen_stream_online": case "listen_stream_online":
sub_type = "stream.online" sub_type = "stream.online"
else: case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case _:
raise ValueError("Unknown method") raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions( subs = await self.twitch.get_eventsub_subscriptions(
@@ -75,7 +93,10 @@ class TwitchService:
for sub in subs.data: for sub in subs.data:
if sub.type == sub_type: if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id) await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
await sleep(1) await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1) await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
@@ -84,7 +105,8 @@ class TwitchService:
logger.info(f"Subscribe to events for {streamer.twitch.name}") logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather( await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer), self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer) self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
) )
logger.info(f"Subscribe to events for {streamer.twitch.name} done") logger.info(f"Subscribe to events for {streamer.twitch.name} done")