83 Commits

Author SHA1 Message Date
eddad6454d Disable messages proc 2025-04-24 17:17:45 +02:00
8ac15a1687 Fix 2025-04-23 09:48:26 +02:00
36c542b822 Fix 2025-04-22 19:04:16 +02:00
b67d00bcd7 Fix 2025-04-22 19:02:16 +02:00
81a51a3d0d Fix 2025-04-22 18:48:47 +02:00
90756c884c Fix 2025-04-22 18:45:30 +02:00
df501c27d7 Fix 2025-04-22 18:30:14 +02:00
33fbc4c446 Fix 2025-04-22 13:55:43 +02:00
03039bbd09 Fix 2025-04-22 13:51:21 +02:00
6ea36f165e Fix execution timeout in ScheduleSyncWorkflow 2025-04-22 13:42:06 +02:00
7721e3a840 Fix 2025-04-22 13:03:23 +02:00
1c5b7e81e8 Migrate to python 3.12 2025-04-22 01:51:36 +02:00
586359f8ce Fix 2025-04-22 01:32:12 +02:00
12dd0f9af7 Fix 2025-04-22 01:22:37 +02:00
4174d67084 Fix 2025-04-22 01:17:42 +02:00
2d8b767ae3 Revert to python 3.13 2025-04-22 00:56:38 +02:00
a0a53800ab Migrate to python3.11 2025-04-22 00:51:14 +02:00
959ab30265 Revert "Fix"
This reverts commit 560a5ac793.
2025-04-22 00:49:53 +02:00
560a5ac793 Fix 2025-04-22 00:42:21 +02:00
28c64341e1 Fix 2025-04-22 00:18:14 +02:00
2ef3148d46 Fix 2025-04-21 23:58:08 +02:00
cb464ad9f3 Fix 2025-04-21 23:52:10 +02:00
bb67ff7bba Fix 2025-04-21 23:32:55 +02:00
cadd54565a Fix 2025-04-21 23:30:04 +02:00
5bea39cd2c Fix 2025-04-21 23:24:33 +02:00
77fb68a5e3 Fix 2025-04-21 21:54:30 +02:00
fb19f7a125 Fix 2025-04-21 21:03:54 +02:00
acaff281c3 Fix 2025-04-21 20:54:55 +02:00
dc2486f5ff Fix 2025-04-21 20:47:33 +02:00
92be127ced Fix 2025-04-21 20:38:36 +02:00
b18f717fae Add temporal worker 2025-04-21 20:21:58 +02:00
58d3a37985 Fix 2025-04-21 18:55:24 +02:00
92c5398f0d Fix 2025-04-21 18:53:32 +02:00
bef5e907f5 Fix 2025-04-21 18:47:38 +02:00
004a21f6cc Fix 2025-04-21 18:41:40 +02:00
0917e5634e Fix 2025-04-21 18:36:21 +02:00
f9995acf3d Update logging 2025-04-21 18:09:07 +02:00
288e4769bc Refactor 2025-04-21 18:03:24 +02:00
cafa0e3afd Fix 2025-04-21 16:00:43 +02:00
5a769c96a0 Fix 2025-04-21 15:57:27 +02:00
f1d023c9a1 Fix 2025-04-21 15:42:36 +02:00
cfce98a42f Update 2025-04-21 15:40:56 +02:00
abe0cbb173 New structure 2025-04-21 13:51:01 +02:00
1eba79cc5a Update ai model 2025-04-18 20:40:45 +02:00
30eb1eaf02 Update deps 2025-04-05 23:06:51 +02:00
df72c4c30a Fix 2025-03-19 15:31:26 +01:00
0a69d71f70 Fix 2025-03-19 15:22:45 +01:00
c4bcfe3b2b Clean 2025-03-18 20:29:42 +01:00
05bd4bde27 Update ai 2025-03-18 20:29:22 +01:00
46241f0b2e Fix 2025-03-18 20:22:21 +01:00
c36622babf Add chatbot_in_chats 2025-03-18 20:22:00 +01:00
c7c273cdac Update 2025-03-18 20:14:12 +01:00
19e2207bc2 Fix 2025-03-18 19:29:08 +01:00
c511192f83 Fix 2025-03-18 19:23:54 +01:00
8140648034 Fix 2025-03-18 19:03:16 +01:00
d1f0681dba Fix 2025-03-18 19:02:57 +01:00
4d7ec071cb Fix 2025-03-18 18:58:46 +01:00
f46f2aee3f Fix 2025-03-18 18:56:27 +01:00
b5b3397bff clean subs before start 2025-03-18 18:10:05 +01:00
dac0ddb884 Update 2025-03-18 17:57:22 +01:00
08704c6529 Add reward redemption 2025-03-18 17:51:59 +01:00
74acf4596a Fix twitch events filter 2025-03-12 19:29:09 +01:00
678dd29918 Fix 2025-02-19 17:48:05 +01:00
3cfea7277f Fix 2025-02-19 17:44:30 +01:00
269245a112 Fix 2025-02-19 17:29:57 +01:00
22e9461a7e Fix 2025-02-19 17:18:03 +01:00
165308d83b Fix 2025-02-19 17:11:32 +01:00
d041f8d903 Fix 2025-02-19 17:04:16 +01:00
8207266d9b Update 2025-02-19 16:58:57 +01:00
9a76f86d41 Update 2025-02-19 16:55:45 +01:00
dc3abeb429 Fix 2025-02-19 16:41:05 +01:00
fa82c96e79 Fix 2025-02-19 16:26:23 +01:00
6f0a236f2f Fix 2025-02-19 16:19:54 +01:00
d645e6db92 Fix scripts 2025-02-19 16:16:39 +01:00
334718a7fb Fix 2025-02-19 16:12:55 +01:00
efa92dc575 Fix 2025-02-19 16:08:05 +01:00
a95d80c8dc Update start scripts 2025-02-19 16:05:32 +01:00
f9603712e8 Fix 2025-02-19 16:01:23 +01:00
fbe8367723 Fix 2025-02-19 15:59:39 +01:00
6e50807381 Update 2025-02-19 15:58:45 +01:00
f907892769 Move to uv 2025-02-19 15:55:49 +01:00
5648bfa024 Fix 2025-02-19 15:29:18 +01:00
7b3a89f41b Fix 2025-02-19 15:28:40 +01:00
77 changed files with 2222 additions and 2660 deletions

View File

@@ -50,25 +50,25 @@ jobs:
url: ${{ secrets.WEBHOOK_URL }}
-
name: Invoke deployment hook (worker)
name: Invoke deployment hook (twitch_webhook)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_2 }}
-
name: Invoke deployment hook (scheduler)
name: Invoke deployment hook (temporal_worker)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_3 }}
-
name: Invoke deployment hook (stream_notifications)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_4 }}
# -
# name: Invoke deployment hook (stream_notifications)
# uses: joelwmale/webhook-action@master
# with:
# url: ${{ secrets.WEBHOOK_URL_4 }}
-
name: Invoke deployment hook (web_app)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_5 }}
# -
# name: Invoke deployment hook (web_app)
# uses: joelwmale/webhook-action@master
# with:
# url: ${{ secrets.WEBHOOK_URL_5 }}

4
.gitignore vendored
View File

@@ -1,5 +1,7 @@
venv
.venv
.DS_Store
.vscode
*.cpython-*.pyc

View File

@@ -1,33 +1,11 @@
FROM python:3.12-slim AS build
ARG POETRY_EXPORT_EXTRA_ARGS=''
WORKDIR /opt/venv
RUN python -m venv /opt/venv && /opt/venv/bin/pip install --upgrade pip && /opt/venv/bin/pip install --no-cache-dir httpx poetry poetry-plugin-export
COPY ./pyproject.toml ./poetry.lock ./
RUN --mount=type=ssh /opt/venv/bin/poetry export --without-hashes ${POETRY_EXPORT_EXTRA_ARGS} > requirements.txt \
&& /opt/venv/bin/pip install --no-cache-dir -r requirements.txt
FROM python:3.12-slim AS runtime
RUN apt update && \
apt install -y --no-install-recommends curl jq && \
apt clean
COPY ./src/ /app
COPY ./scripts/*.sh /
RUN chmod +x /*.sh
ENV PATH="/opt/venv/bin:$PATH"
ENV VENV_PATH=/opt/venv
COPY --from=build /opt/venv /opt/venv
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
WORKDIR /app
COPY ./pyproject.toml ./uv.lock ./
EXPOSE 80
RUN --mount=type=ssh uv venv \
&& uv sync --frozen
CMD ["python", "main.py"]
COPY ./src /app/src
WORKDIR /app/src

2152
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,30 +1,34 @@
[tool.poetry]
[project]
name = "discord-bot"
version = "0.1.0"
description = ""
authors = ["Bulat Kurbanov <kurbanovbul@gmail.com>"]
readme = "README.md"
packages = [{include = "discord_bot"}]
authors = [{ name = "Bulat Kurbanov", email = "kurbanovbul@gmail.com" }]
requires-python = "~=3.12"
dependencies = [
"discord-py>=2.4.0,<3",
"twitchapi>=4.4.0,<5",
"pydantic>=2.10.5,<3",
"pydantic-settings>=2.7.1,<3",
"httpx>=0.28.1,<0.29",
"icalendar>=6.1.0,<7",
"pytz~=2025.2",
"mongojet>=0.3,<0.4",
"taskiq>=0.11.11,<0.12",
"taskiq-redis>=1.0.2,<2",
"redis[hiredis]>=5.2.1,<6",
"fastapi>=0.115.8,<0.116",
"authx>=1.4.1,<2",
"httpx-oauth>=0.16.1,<0.17",
"uvicorn[standard]>=0.34.0,<0.35",
"temporalio>=1.10.0",
]
[tool.poetry.dependencies]
python = "^3.11"
discord-py = "^2.4.0"
twitchapi = "^4.4.0"
pydantic = "^2.10.5"
pydantic-settings = "^2.7.1"
httpx = "^0.28.1"
icalendar = "^6.1.0"
pytz = "^2024.2"
mongojet = "^0.2.7"
taskiq = "^0.11.11"
taskiq-redis = "^1.0.2"
redis = {extras = ["hiredis"], version = "^5.2.1"}
fastapi = "^0.115.8"
authx = "^1.4.1"
httpx-oauth = "^0.16.1"
uvicorn = {extras = ["standard"], version = "^0.34.0"}
[tool.hatch.build.targets.sdist]
include = ["discord_bot"]
[tool.hatch.build.targets.wheel]
include = ["discord_bot"]
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/python main.py $1

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/taskiq scheduler core.broker:scheduler modules.tasks

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/uvicorn modules.web_app.app:app --host 0.0.0.0 --port 80

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/taskiq worker core.broker:broker modules.tasks

View File

@@ -1,31 +1,41 @@
from pydantic import BaseModel
class TwitchConfig(BaseModel):
id: int
name: str
class NotificationsConfig(BaseModel):
start_stream: str
change_category: str | None = None
redemption_reward: str | None = None
class GamesListConfig(BaseModel):
channel_id: int
message_id: int
class DiscordConfig(BaseModel):
guild_id: int
notifications_channel_id: int
games_list: GamesListConfig | None = None
roles: dict[str, int] | None = None
class TelegramConfig(BaseModel):
notifications_channel_id: int
class IntegrationsConfig(BaseModel):
discord: DiscordConfig | None = None
telegram: TelegramConfig | None = None
class StreamerConfig(BaseModel):
twitch: TwitchConfig
notifications: NotificationsConfig
integrations: IntegrationsConfig
chatbot_in_chats: list[int] | None = None

View File

@@ -1,4 +1,4 @@
from domain.streamers import StreamerConfig
from applications.common.domain.streamers import StreamerConfig
from .base import BaseRepository

View File

@@ -1,4 +1,4 @@
from domain.users import CreateUser, User
from applications.common.domain.users import CreateUser, User
from .base import BaseRepository
@@ -18,10 +18,10 @@ class UserRepository(BaseRepository):
)
@classmethod
async def get_or_create_user(cls, newUser: CreateUser) -> User:
async def get_or_create_user(cls, new_user: CreateUser) -> User:
filter_data = {}
for provider, data in newUser.oauths.items():
for provider, data in new_user.oauths.items():
filter_data[f"oauths.{provider}.id"] = data.id
async with cls.connect() as collection:
@@ -29,7 +29,7 @@ class UserRepository(BaseRepository):
filter_data,
{
"$setOnInsert": {
**newUser.model_dump(),
**new_user.model_dump(),
}
},
upsert=True,

View File

@@ -0,0 +1,13 @@
from asyncio import run
from applications.games_list.discord import client, logger
from core.config import config
async def start_discord_sevice():
logger.info("Starting Discord service...")
await client.start(config.DISCORD_BOT_TOKEN)
run(start_discord_sevice())

View File

@@ -5,13 +5,15 @@ from discord.abc import Messageable
from discord import Object
from discord import app_commands
from modules.games_list.games_list import GameList, GameItem
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.games_list.games_list import GameList, GameItem
from core.config import config
from repositories.streamers import StreamerConfigRepository
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
async def get_game_list_channel_to_message_map() -> dict[int, int]:
@@ -240,9 +242,3 @@ async def replace(interaction: discord.Interaction, game: str, new: str):
await game_list.save()
await interaction.response.send_message("Игра заменена!", ephemeral=True)
async def start_discord_sevice():
logger.info("Starting Discord service...")
await client.start(config.DISCORD_BOT_TOKEN)

View File

@@ -0,0 +1,7 @@
from .sync import syncronize, syncronize_all
__all__ = [
"syncronize",
"syncronize_all",
]

View File

@@ -0,0 +1,29 @@
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.synchronizer import syncronize as syncronize_internal
@activity.defn
async def syncronize(twitch_id: int):
try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e:
activity.logger.error(f"Error during synchronization: {e}")
raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from domain.streamers import TwitchConfig
from applications.common.domain.streamers import TwitchConfig
from .twitch_events import get_twitch_events, TwitchEvent
from .discord_events import (

View File

@@ -71,7 +71,11 @@ async def get_twitch_events(twitch_channel_id: str) -> list[TwitchEvent]:
else:
raise ValueError("Invalid repeat rule")
if event.start_at > datetime.now(event.start_at.tzinfo) or event.repeat_rule is not None:
if (
event.start_at > datetime.now(event.start_at.tzinfo)
or event.end_at > datetime.now(event.end_at.tzinfo)
or event.repeat_rule is not None
):
events.append(event)
return events

View File

@@ -0,0 +1,6 @@
from .sync import ScheduleSyncWorkflow
__all__ = [
"ScheduleSyncWorkflow"
]

View File

@@ -0,0 +1,34 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.schedule_sync.activities import syncronize_all
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class ScheduleSyncWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"all": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="ScheduleSyncWorkflow",
task_queue=MAIN_QUEUE,
execution_timeout=timedelta(minutes=1),
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
)
)
}
@workflow.run
async def run(self):
await workflow.execute_activity(
syncronize_all,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5),
)

View File

@@ -0,0 +1,58 @@
from asyncio import run
from core.temporal import get_client
from temporalio.client import ScheduleAlreadyRunningError
from temporalio.worker import Worker, UnsandboxedWorkflowRunner
from applications.schedule_sync import activities as schedule_sync_activities
from applications.schedule_sync.workflows import ScheduleSyncWorkflow
from applications.twitch_webhook import activities as twitch_activities
from applications.twitch_webhook import workflows as twitch_workflows
from .queues import MAIN_QUEUE
async def main():
client = await get_client()
for id, schedule in ScheduleSyncWorkflow.get_schedules().items():
try:
await client.create_schedule(f"ScheduleSyncWorkflow-{id}", schedule)
except ScheduleAlreadyRunningError:
pass
for id, schedule in twitch_workflows.StreamsCheckWorkflow.get_schedules().items():
try:
await client.create_schedule(f"StreamsCheckWorkflow-{id}", schedule)
except ScheduleAlreadyRunningError:
pass
worker: Worker = Worker(
client,
task_queue=MAIN_QUEUE,
workflows=[
ScheduleSyncWorkflow,
twitch_workflows.StreamsCheckWorkflow,
twitch_workflows.OnChannelUpdateWorkflow,
twitch_workflows.OnMessageWorkflow,
twitch_workflows.OnRewardRedemptionWorkflow,
twitch_workflows.OnStreamOnlineWorkflow,
],
activities=[
schedule_sync_activities.syncronize,
schedule_sync_activities.syncronize_all,
twitch_activities.on_message_activity,
twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity,
twitch_activities.on_channel_update_activity,
],
workflow_runner=UnsandboxedWorkflowRunner()
)
await worker.run()
run(main())

View File

@@ -0,0 +1 @@
MAIN_QUEUE = "main"

View File

@@ -0,0 +1,10 @@
from asyncio import run
from .twitch.webhook import TwitchService
async def start_twitch_service() -> None:
await TwitchService.start()
run(start_twitch_service())

View File

@@ -0,0 +1,13 @@
from .message_proc import on_message_activity
from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
from .redemption_reward import on_redemption_reward_add_activity
from .state_checker import check_streams_states
__all__ = [
"on_message_activity",
"on_stream_state_change_activity",
"check_streams_states",
"on_redemption_reward_add_activity",
"on_channel_update_activity",
]

View File

@@ -0,0 +1,13 @@
from temporalio import activity
from applications.twitch_webhook.messages_proc import MessageEvent, MessagesProc
@activity.defn
async def on_message_activity(
event: MessageEvent
):
await MessagesProc.on_message(
event.received_as,
event
)

View File

@@ -0,0 +1,58 @@
from datetime import datetime, timezone
from temporalio import activity
from pydantic import BaseModel
from twitchAPI.helper import first
from applications.twitch_webhook.state import State, EventType, UpdateEvent
from applications.twitch_webhook.watcher import StateWatcher
from applications.twitch_webhook.twitch.authorize import authorize
class OnStreamStateChangeActivity(BaseModel):
streamer_id: int
event_type: EventType
new_state: State | None = None
@activity.defn
async def on_stream_state_change_activity(
data: OnStreamStateChangeActivity
):
await StateWatcher.on_stream_state_change(
data.streamer_id,
data.event_type,
data.new_state,
)
class OnChannelUpdateActivity(BaseModel):
event: UpdateEvent
event_type: EventType
@activity.defn
async def on_channel_update_activity(
data: OnChannelUpdateActivity
):
twitch = await authorize(data.event.broadcaster_user_login)
stream = await first(twitch.get_streams(
user_id=[data.event.broadcaster_user_id])
)
if stream is None:
return
await on_stream_state_change_activity(
OnStreamStateChangeActivity(
streamer_id=int(data.event.broadcaster_user_id),
event_type=data.event_type,
new_state=State(
title=data.event.title,
category=data.event.category_name,
last_live_at=datetime.now(timezone.utc)
),
)
)

View File

@@ -0,0 +1,8 @@
from temporalio import activity
from applications.twitch_webhook.reward_redemption import RewardRedemption, on_redemption_reward_add
@activity.defn
async def on_redemption_reward_add_activity(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -0,0 +1,29 @@
from datetime import datetime, timezone
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.state import State, EventType
from applications.twitch_webhook.watcher import StateWatcher
@activity.defn
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)

View File

@@ -6,7 +6,7 @@ from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient
from core.config import config
from .twitch.authorize import authorize
from .twitch.authorize import authorize, Twitch
logger = logging.getLogger(__name__)
@@ -39,6 +39,8 @@ class MessageType(StrEnum):
class MessageEvent(BaseModel):
received_as: str
broadcaster_user_id: str
broadcaster_user_name: str
broadcaster_user_login: str
@@ -57,8 +59,10 @@ class MessageEvent(BaseModel):
channel_points_custom_reward_id: str | None
@classmethod
def from_twitch_event(cls, event: ChannelChatMessageEvent):
def from_twitch_event(cls, received_as: str, event: ChannelChatMessageEvent):
return cls(
received_as=received_as,
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_name=event.event.broadcaster_user_name,
broadcaster_user_login=event.event.broadcaster_user_login,
@@ -96,6 +100,19 @@ async def get_completion(messages: list[dict]) -> str:
logger.info(f"Getting completion for message: {messages}")
data_messages = [
{
"role": "system",
"content": (
"Don't use markdown! "
"Don't use blocked words on Twitch! "
"Make answers short and clear! "
"You are not language model! "
"You are not a bot! "
"You are human! "
"Your name is kurbezz or Bulat! "
"Don't start answer with @kurbezz!"
)
},
*(
{
"role": "assistant" if message["user"] == "kurbezz" else "user",
@@ -103,10 +120,6 @@ async def get_completion(messages: list[dict]) -> str:
}
for message in messages
),
{
"role": "system",
"content": "Don't use markdown! Don't use blocked words on Twitch! Make answers short and clear!"
}
]
async with AsyncClient() as client:
@@ -117,7 +130,7 @@ async def get_completion(messages: list[dict]) -> str:
"content-type": "application/json"
},
json={
"model": "google/gemini-2.0-flash-thinking-exp:free",
"model": "deepseek/deepseek-chat-v3-0324:free",
"messages": data_messages
}
)
@@ -130,9 +143,8 @@ async def get_completion(messages: list[dict]) -> str:
class MessagesProc:
IGNORED_USER_LOGINS = [
FULL_IGNORED_USER_LOGINS = [
"jeetbot",
"kurbezz",
]
MESSAGE_LIMIT = 1000
@@ -154,13 +166,16 @@ class MessagesProc:
def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]:
logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}")
return [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id] + \
[m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
if thread_id is not None:
return (
[m for m in cls.MESSAGE_HISTORY if m["id"] == thread_id]
+ [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id]
)
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod
async def on_message(cls, event: MessageEvent):
logging.info(f"Received message: {event}")
async def _update_history(cls, event: MessageEvent):
cls.update_message_history(
id=event.message_id,
text=event.message.text,
@@ -168,11 +183,8 @@ class MessagesProc:
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
if event.chatter_user_name == "pahangor":
return
twitch = await authorize()
@classmethod
async def _goida(cls, twitch: Twitch, event: MessageEvent):
if "гойда" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
@@ -181,10 +193,42 @@ class MessagesProc:
reply_parent_message_id=event.message_id
)
if "lasqexx" in event.chatter_user_login:
pass # Todo: Здароу
@classmethod
async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
if "lasqexx" not in event.chatter_user_login:
return
if "здароу" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Здароу, давай иди уже",
reply_parent_message_id=event.message_id
)
return
if "сосал?" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"А ты? Иди уже",
reply_parent_message_id=event.message_id
)
return
if "лан я пошёл" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"да да, иди уже",
reply_parent_message_id=event.message_id
)
@classmethod
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
if not event.message.text.lower().startswith("!ai"):
return
if event.message.text.lower().startswith("!ai"):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
@@ -202,8 +246,15 @@ class MessagesProc:
part,
reply_parent_message_id=event.message_id
)
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
logger.error(f"Failed to get completion: {e}", exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
@@ -212,7 +263,9 @@ class MessagesProc:
reply_parent_message_id=event.message_id
)
if event.chatter_user_login in cls.IGNORED_USER_LOGINS:
@classmethod
async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
if event.chatter_user_login.lower() in ["kurbezz", "hafmc"]:
return
if ("kurbezz" in event.message.text.lower() or \
@@ -236,6 +289,13 @@ class MessagesProc:
part,
reply_parent_message_id=event.message_id
)
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error(f"Failed to get completion: {e}")
@@ -245,3 +305,21 @@ class MessagesProc:
"Пошел нахуй!",
reply_parent_message_id=event.message_id
)
@classmethod
async def on_message(cls, received_as: str, event: MessageEvent):
return
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return
logging.info(f"Received message: {event}")
await cls._update_history(event)
twitch = await authorize(received_as)
await cls._goida(twitch, event)
await cls._lasqexx(twitch, event)
await cls._ask_ai(twitch, event)
await cls._kurbezz(twitch, event)

View File

@@ -3,7 +3,7 @@ import logging
from httpx import AsyncClient
from core.config import config
from domain.streamers import StreamerConfig
from applications.common.domain.streamers import StreamerConfig
from .state import State
from .sent_notifications import SentNotification, SentNotificationType, SentResult

View File

@@ -0,0 +1,52 @@
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from applications.common.repositories.streamers import StreamerConfigRepository
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class RewardRedemption(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
user_name: str
reward_title: str
user_input: str
@classmethod
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
user_name=event.event.user_name,
reward_title=event.event.reward.title,
user_input=event.event.user_input or "",
)
async def on_redemption_reward_add(reward: RewardRedemption):
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
twitch = await authorize(reward.broadcaster_user_login)
streamer = await StreamerConfigRepository.get_by_twitch_id(int(reward.broadcaster_user_id))
if streamer.notifications.redemption_reward is None:
return
message = streamer.notifications.redemption_reward.format(
user=reward.user_name,
reward_title=reward.reward_title,
reward_promt=f" ({reward.user_input})" if reward.user_input else ""
)
await twitch.send_chat_message(
reward.broadcaster_user_id,
reward.broadcaster_user_id,
message
)

View File

@@ -21,6 +21,7 @@ class State(BaseModel):
class UpdateEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
title: str
category_name: str

View File

@@ -14,19 +14,21 @@ SCOPES = [
AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT,
AuthScope.CHANNEL_READ_REDEMPTIONS,
]
async def authorize(auto_refresh_auth: bool = False) -> Twitch:
async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
twitch = Twitch(
config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET
)
twitch.user_auth_refresh_callback = TokenStorage.save
twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
twitch.auto_refresh_auth = auto_refresh_auth
token, refresh_token = await TokenStorage.get()
token, refresh_token = await TokenStorage.get(user)
await twitch.set_user_authentication(
token,
SCOPES,

View File

@@ -1,12 +1,16 @@
import logging
from core.mongo import mongo_manager
logger = logging.getLogger(__name__)
class TokenStorage:
COLLECTION_NAME = "secrets"
OBJECT_ID = "twitch_tokens"
TYPE = "twitch_token"
@staticmethod
async def save(acceess_token: str, refresh_token: str):
async def save(user: str, acceess_token: str, refresh_token: str):
data = {"access_token": acceess_token, "refresh_token": refresh_token}
async with mongo_manager.connect() as client:
@@ -14,17 +18,20 @@ class TokenStorage:
collection = db[TokenStorage.COLLECTION_NAME]
await collection.update_one(
{"_id": TokenStorage.OBJECT_ID},
{"type": TokenStorage.TYPE, "twitch_login": user},
{"$set": data},
upsert=True
)
@staticmethod
async def get() -> tuple[str, str]:
async def get(user: str) -> tuple[str, str]:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME]
data = await collection.find_one({"_id": TokenStorage.OBJECT_ID})
data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
if data is None:
raise RuntimeError(f"Token for user {user} not found")
return data["access_token"], data["refresh_token"]

View File

@@ -0,0 +1,233 @@
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
import logging
from typing import Literal
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token
from core.temporal import get_client
from applications.common.repositories.streamers import StreamerConfigRepository, StreamerConfig
from applications.twitch_webhook.state import UpdateEvent, EventType
from applications.twitch_webhook.messages_proc import MessageEvent
from applications.twitch_webhook.reward_redemption import RewardRedemption
from applications.twitch_webhook.workflows.on_message import OnMessageWorkflow
from applications.twitch_webhook.workflows.on_reward_redemption import OnRewardRedemptionWorkflow
from applications.twitch_webhook.workflows.on_stream_online import OnStreamOnlineWorkflow
from applications.twitch_webhook.workflows.on_channel_update import OnChannelUpdateWorkflow
from applications.temporal_worker.queues import MAIN_QUEUE
from .authorize import authorize
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
self.twitch = twitch
self.streamer = streamer
self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent):
client = await get_client()
await client.start_workflow(
OnChannelUpdateWorkflow.run,
args=(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
title=event.event.title,
category_name=event.event.category_name
),
EventType.CHANNEL_UPDATE,
),
id=f"on-channel-update-{event.event.broadcaster_user_id}",
task_queue=MAIN_QUEUE
)
async def on_stream_online(self, event: StreamOnlineEvent):
client = await get_client()
await client.start_workflow(
OnStreamOnlineWorkflow.run,
args=(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE
),
id=f"on-stream-online-{event.event.broadcaster_user_id}",
task_queue=MAIN_QUEUE
)
async def on_channel_points_custom_reward_redemption_add(
self,
event: ChannelPointsCustomRewardRedemptionAddEvent
):
client = await get_client()
await client.start_workflow(
OnRewardRedemptionWorkflow.run,
RewardRedemption.from_twitch_event(event),
id=f"on-reward-redemption-{event.event.broadcaster_user_id}-{event.event.reward.id}",
task_queue=MAIN_QUEUE
)
async def on_message(self, event: ChannelChatMessageEvent):
client = await get_client()
await client.start_workflow(
OnMessageWorkflow.run,
MessageEvent.from_twitch_event(
self.streamer.twitch.name,
event
),
id=f"on-message-{event.event.broadcaster_user_id}-{event.event.message_id}",
task_queue=MAIN_QUEUE
)
async def _clean_subs(self, method: str, streamer: StreamerConfig):
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
case "listen_stream_online":
sub_type = "stream.online"
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case "listen_channel_points_custom_reward_redemption_add":
sub_type = "channel.channel_points_custom_reward_redemption.add"
case _:
raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions(
user_id=str(streamer.twitch.id)
)
for sub in subs.data:
if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"]
| Literal["listen_channel_points_custom_reward_redemption_add"],
eventsub: EventSubWebsocket,
streamer: StreamerConfig,
retry: int = 10
):
await self._clean_subs(method, streamer)
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_points_custom_reward_redemption_add":
await eventsub.listen_channel_points_custom_reward_redemption_add(
str(streamer.twitch.id),
self.on_channel_points_custom_reward_redemption_add
)
case "listen_channel_chat_message":
chatbot_in_chats = streamer.chatbot_in_chats or []
for chat_id in chatbot_in_chats:
await eventsub.listen_channel_chat_message(
str(chat_id),
str(streamer.twitch.id),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
async def _check_token(self):
assert self.twitch._user_auth_token is not None
while True:
for _ in range(60):
if self.failed:
return
await sleep(1)
logger.info("Check token...")
val_result = await validate_token(
self.twitch._user_auth_token,
auth_base_url=self.twitch.auth_base_url
)
if val_result.get('status', 200) != 200:
await self.twitch.refresh_used_token()
logger.info("Token refreshed")
async def run(self) -> None:
eventsub = EventSubWebsocket(twitch=self.twitch)
try:
eventsub.start()
logger.info("Subscribe to events...")
await self.subscribe_to_streamer(eventsub, self.streamer)
logger.info("Twitch service started")
await self._check_token()
finally:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def _start_for_streamer(cls, streamer: StreamerConfig):
try:
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
await cls(twith, streamer).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
streamers = await StreamerConfigRepository.all()
await wait(
[
create_task(cls._start_for_streamer(streamer))
for streamer in streamers
],
return_when=FIRST_COMPLETED
)
await gather(
*[cls._start_for_streamer(streamer) for streamer in streamers]
)
logger.info("Twitch service stopped")

View File

@@ -3,7 +3,7 @@ from datetime import datetime, timezone, timedelta
from twitchAPI.helper import first
from core.redis import redis_manager
from repositories.streamers import StreamerConfigRepository
from applications.common.repositories.streamers import StreamerConfigRepository
from .state import State, StateManager, EventType
from .sent_notifications import SentNotificationRepository, SentNotificationType
@@ -16,7 +16,7 @@ class StateWatcher:
@classmethod
async def get_twitch_state(cls, streamer_id: int) -> State | None:
twitch = await authorize()
twitch = await authorize("kurbezz")
stream = await first(
twitch.get_streams(user_id=[str(streamer_id)])

View File

@@ -0,0 +1,14 @@
from .checker import StreamsCheckWorkflow
from .on_channel_update import OnChannelUpdateWorkflow
from .on_message import OnMessageWorkflow
from .on_reward_redemption import OnRewardRedemptionWorkflow
from .on_stream_online import OnStreamOnlineWorkflow
__all__ = [
"StreamsCheckWorkflow",
"OnChannelUpdateWorkflow",
"OnMessageWorkflow",
"OnRewardRedemptionWorkflow",
"OnStreamOnlineWorkflow",
]

View File

@@ -0,0 +1,33 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.state_checker import check_streams_states
@workflow.defn
class StreamsCheckWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"check": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="StreamsCheckWorkflow",
task_queue=MAIN_QUEUE,
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
)
)
}
@workflow.run
async def run(self):
await workflow.start_activity(
check_streams_states,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -0,0 +1,26 @@
from datetime import timedelta
from temporalio import workflow
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
from applications.twitch_webhook.state import UpdateEvent, EventType
@workflow.defn
class OnChannelUpdateWorkflow:
@workflow.run
async def run(
self,
event: UpdateEvent,
event_type: EventType,
):
await workflow.start_activity(
on_channel_update_activity,
OnChannelUpdateActivity(
event=event,
event_type=event_type,
),
task_queue=MAIN_QUEUE,
start_to_close_timeout=timedelta(minutes=1),
)

View File

@@ -0,0 +1,19 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.messages_proc import MessageEvent
from applications.twitch_webhook.activities.message_proc import on_message_activity
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnMessageWorkflow:
@workflow.run
async def run(self, message: MessageEvent) -> None:
await workflow.start_activity(
on_message_activity,
message,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5)
)

View File

@@ -0,0 +1,19 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.activities.redemption_reward import on_redemption_reward_add_activity
from applications.twitch_webhook.reward_redemption import RewardRedemption
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnRewardRedemptionWorkflow:
@workflow.run
async def run(self, reward: RewardRedemption):
await workflow.start_activity(
on_redemption_reward_add_activity,
reward,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -0,0 +1,22 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
from applications.twitch_webhook.state import EventType
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnStreamOnlineWorkflow:
@workflow.run
async def run(self, broadcaster_user_id: str | int, event_type: EventType):
await workflow.start_activity(
on_stream_state_change_activity,
OnStreamStateChangeActivity(
streamer_id=int(broadcaster_user_id),
event_type=event_type
),
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -1,22 +0,0 @@
from taskiq import TaskiqScheduler
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from core.config import config
broker = ListQueueBroker(url=config.REDIS_URI) \
.with_middlewares(
SimpleRetryMiddleware(default_retry_count=5)
) \
.with_result_backend(RedisAsyncResultBackend(
redis_url=config.REDIS_URI,
result_ex_time=60 * 60 * 24 * 7,
))
scheduler = TaskiqScheduler(
broker=broker,
sources=[LabelScheduleSource(broker)],
)

10
src/core/temporal.py Normal file
View File

@@ -0,0 +1,10 @@
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def get_client() -> Client:
return await Client.connect(
"temporal:7233",
namespace="default",
data_converter=pydantic_data_converter
)

View File

@@ -1,45 +0,0 @@
import logging
import sys
from modules.games_list import start as start_games_list_module
from modules.stream_notifications import start as start_stream_notifications_module
from core.mongo import mongo_manager
from core.redis import redis_manager
from core.broker import broker
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
async def main():
logger.info("Starting services...")
if len(sys.argv) != 2:
raise RuntimeError("Usage: python main.py <module>")
module = sys.argv[1]
await mongo_manager.init()
await redis_manager.init()
if not broker.is_worker_process:
await broker.startup()
if module == "games_list":
await start_games_list_module()
elif module == "stream_notifications":
await start_stream_notifications_module()
else:
raise RuntimeError(f"Unknown module: {module}")
exit(0)
if __name__ == "__main__":
from asyncio import run
run(main())

View File

@@ -1,7 +0,0 @@
from .discord import start_discord_sevice
start = start_discord_sevice
__all__ = ["start"]

View File

@@ -1,24 +0,0 @@
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .synchronizer import syncronize
@broker.task("scheduler_sync.syncronize_task")
async def syncronize_task(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)
@broker.task("scheduler_sync.syncronize_all_task", schedule=[{"cron": "*/5 * * * *"}])
async def syncronize_all_task():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize_task.kiq(streamer.twitch.id)

View File

@@ -1,7 +0,0 @@
from .twitch.webhook import start_twitch_service
start = start_twitch_service
__all__ = ["start"]

View File

@@ -1,84 +0,0 @@
from datetime import datetime, timezone
from twitchAPI.helper import first
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
@broker.task(
"stream_notifications.twitch.on_stream_state_change_with_check",
retry_on_error=True
)
async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize()
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await on_stream_state_change.kiq(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
)
)
@broker.task(
"stream_notifications.twitch.on_stream_state_change",
retry_on_error=True
)
async def on_stream_state_change(
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
@broker.task(
"stream_notifications.check_streams_states",
schedule=[{"cron": "*/2 * * * *"}]
)
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize()
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(event: MessageEvent):
await MessagesProc.on_message(event)

View File

@@ -1,172 +0,0 @@
from asyncio import sleep, gather
import logging
from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent
from twitchAPI.oauth import validate_token
from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message
from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent
from .authorize import authorize
logger = logging.getLogger(__name__)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch):
self.twitch = twitch
self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent):
await on_stream_state_change_with_check.kiq(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
title=event.event.title,
category_name=event.event.category_name
),
EventType.CHANNEL_UPDATE,
)
async def on_stream_online(self, event: StreamOnlineEvent):
await on_stream_state_change.kiq(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE,
)
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
MessageEvent.from_twitch_event(event)
)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"],
eventsub: EventSubWebhook,
streamer: StreamerConfig,
retry: int = 10
):
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
case "listen_stream_online":
sub_type = "stream.online"
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case _:
raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions(
user_id=str(streamer.twitch.id)
)
for sub in subs.data:
if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
async def _check_token(self):
assert self.twitch._user_auth_token is not None
while True:
for _ in range(60):
if self.failed:
return
await sleep(1)
logger.info("Check token...")
val_result = await validate_token(
self.twitch._user_auth_token,
auth_base_url=self.twitch.auth_base_url
)
if val_result.get('status', 200) != 200:
await self.twitch.refresh_used_token()
logger.info("Token refreshed")
async def run(self) -> NoReturn:
eventsub = EventSubWebhook(
callback_url=config.TWITCH_CALLBACK_URL,
port=config.TWITCH_CALLBACK_PORT,
twitch=self.twitch,
message_deduplication_history_length=50
)
eventsub.wait_for_subscription_confirm_timeout = 60
eventsub.unsubscribe_on_stop = False
streamers = await StreamerConfigRepository.all()
try:
eventsub.start()
logger.info("Subscribe to events...")
await gather(
*[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers]
)
logger.info("Twitch service started")
await self._check_token()
finally:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
try:
twith = await authorize(auto_refresh_auth=True)
await cls(twith).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
logger.info("Twitch service stopped")
async def start_twitch_service() -> NoReturn:
await TwitchService.start()

View File

@@ -1,2 +0,0 @@
from modules.scheduler_sync.tasks import * # noqa: F403
from modules.stream_notifications.tasks import * # noqa: F403

1300
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff