83 Commits

Author SHA1 Message Date
eddad6454d Disable messages proc 2025-04-24 17:17:45 +02:00
8ac15a1687 Fix 2025-04-23 09:48:26 +02:00
36c542b822 Fix 2025-04-22 19:04:16 +02:00
b67d00bcd7 Fix 2025-04-22 19:02:16 +02:00
81a51a3d0d Fix 2025-04-22 18:48:47 +02:00
90756c884c Fix 2025-04-22 18:45:30 +02:00
df501c27d7 Fix 2025-04-22 18:30:14 +02:00
33fbc4c446 Fix 2025-04-22 13:55:43 +02:00
03039bbd09 Fix 2025-04-22 13:51:21 +02:00
6ea36f165e Fix execution timeout in ScheduleSyncWorkflow 2025-04-22 13:42:06 +02:00
7721e3a840 Fix 2025-04-22 13:03:23 +02:00
1c5b7e81e8 Migrate to python 3.12 2025-04-22 01:51:36 +02:00
586359f8ce Fix 2025-04-22 01:32:12 +02:00
12dd0f9af7 Fix 2025-04-22 01:22:37 +02:00
4174d67084 Fix 2025-04-22 01:17:42 +02:00
2d8b767ae3 Revert to python 3.13 2025-04-22 00:56:38 +02:00
a0a53800ab Migrate to python3.11 2025-04-22 00:51:14 +02:00
959ab30265 Revert "Fix"
This reverts commit 560a5ac793.
2025-04-22 00:49:53 +02:00
560a5ac793 Fix 2025-04-22 00:42:21 +02:00
28c64341e1 Fix 2025-04-22 00:18:14 +02:00
2ef3148d46 Fix 2025-04-21 23:58:08 +02:00
cb464ad9f3 Fix 2025-04-21 23:52:10 +02:00
bb67ff7bba Fix 2025-04-21 23:32:55 +02:00
cadd54565a Fix 2025-04-21 23:30:04 +02:00
5bea39cd2c Fix 2025-04-21 23:24:33 +02:00
77fb68a5e3 Fix 2025-04-21 21:54:30 +02:00
fb19f7a125 Fix 2025-04-21 21:03:54 +02:00
acaff281c3 Fix 2025-04-21 20:54:55 +02:00
dc2486f5ff Fix 2025-04-21 20:47:33 +02:00
92be127ced Fix 2025-04-21 20:38:36 +02:00
b18f717fae Add temporal worker 2025-04-21 20:21:58 +02:00
58d3a37985 Fix 2025-04-21 18:55:24 +02:00
92c5398f0d Fix 2025-04-21 18:53:32 +02:00
bef5e907f5 Fix 2025-04-21 18:47:38 +02:00
004a21f6cc Fix 2025-04-21 18:41:40 +02:00
0917e5634e Fix 2025-04-21 18:36:21 +02:00
f9995acf3d Update logging 2025-04-21 18:09:07 +02:00
288e4769bc Refactor 2025-04-21 18:03:24 +02:00
cafa0e3afd Fix 2025-04-21 16:00:43 +02:00
5a769c96a0 Fix 2025-04-21 15:57:27 +02:00
f1d023c9a1 Fix 2025-04-21 15:42:36 +02:00
cfce98a42f Update 2025-04-21 15:40:56 +02:00
abe0cbb173 New structure 2025-04-21 13:51:01 +02:00
1eba79cc5a Update ai model 2025-04-18 20:40:45 +02:00
30eb1eaf02 Update deps 2025-04-05 23:06:51 +02:00
df72c4c30a Fix 2025-03-19 15:31:26 +01:00
0a69d71f70 Fix 2025-03-19 15:22:45 +01:00
c4bcfe3b2b Clean 2025-03-18 20:29:42 +01:00
05bd4bde27 Update ai 2025-03-18 20:29:22 +01:00
46241f0b2e Fix 2025-03-18 20:22:21 +01:00
c36622babf Add chatbot_in_chats 2025-03-18 20:22:00 +01:00
c7c273cdac Update 2025-03-18 20:14:12 +01:00
19e2207bc2 Fix 2025-03-18 19:29:08 +01:00
c511192f83 Fix 2025-03-18 19:23:54 +01:00
8140648034 Fix 2025-03-18 19:03:16 +01:00
d1f0681dba Fix 2025-03-18 19:02:57 +01:00
4d7ec071cb Fix 2025-03-18 18:58:46 +01:00
f46f2aee3f Fix 2025-03-18 18:56:27 +01:00
b5b3397bff clean subs before start 2025-03-18 18:10:05 +01:00
dac0ddb884 Update 2025-03-18 17:57:22 +01:00
08704c6529 Add reward redemption 2025-03-18 17:51:59 +01:00
74acf4596a Fix twitch events filter 2025-03-12 19:29:09 +01:00
678dd29918 Fix 2025-02-19 17:48:05 +01:00
3cfea7277f Fix 2025-02-19 17:44:30 +01:00
269245a112 Fix 2025-02-19 17:29:57 +01:00
22e9461a7e Fix 2025-02-19 17:18:03 +01:00
165308d83b Fix 2025-02-19 17:11:32 +01:00
d041f8d903 Fix 2025-02-19 17:04:16 +01:00
8207266d9b Update 2025-02-19 16:58:57 +01:00
9a76f86d41 Update 2025-02-19 16:55:45 +01:00
dc3abeb429 Fix 2025-02-19 16:41:05 +01:00
fa82c96e79 Fix 2025-02-19 16:26:23 +01:00
6f0a236f2f Fix 2025-02-19 16:19:54 +01:00
d645e6db92 Fix scripts 2025-02-19 16:16:39 +01:00
334718a7fb Fix 2025-02-19 16:12:55 +01:00
efa92dc575 Fix 2025-02-19 16:08:05 +01:00
a95d80c8dc Update start scripts 2025-02-19 16:05:32 +01:00
f9603712e8 Fix 2025-02-19 16:01:23 +01:00
fbe8367723 Fix 2025-02-19 15:59:39 +01:00
6e50807381 Update 2025-02-19 15:58:45 +01:00
f907892769 Move to uv 2025-02-19 15:55:49 +01:00
5648bfa024 Fix 2025-02-19 15:29:18 +01:00
7b3a89f41b Fix 2025-02-19 15:28:40 +01:00
77 changed files with 2222 additions and 2660 deletions

View File

@@ -50,25 +50,25 @@ jobs:
url: ${{ secrets.WEBHOOK_URL }} url: ${{ secrets.WEBHOOK_URL }}
- -
name: Invoke deployment hook (worker) name: Invoke deployment hook (twitch_webhook)
uses: joelwmale/webhook-action@master uses: joelwmale/webhook-action@master
with: with:
url: ${{ secrets.WEBHOOK_URL_2 }} url: ${{ secrets.WEBHOOK_URL_2 }}
- -
name: Invoke deployment hook (scheduler) name: Invoke deployment hook (temporal_worker)
uses: joelwmale/webhook-action@master uses: joelwmale/webhook-action@master
with: with:
url: ${{ secrets.WEBHOOK_URL_3 }} url: ${{ secrets.WEBHOOK_URL_3 }}
- # -
name: Invoke deployment hook (stream_notifications) # name: Invoke deployment hook (stream_notifications)
uses: joelwmale/webhook-action@master # uses: joelwmale/webhook-action@master
with: # with:
url: ${{ secrets.WEBHOOK_URL_4 }} # url: ${{ secrets.WEBHOOK_URL_4 }}
- # -
name: Invoke deployment hook (web_app) # name: Invoke deployment hook (web_app)
uses: joelwmale/webhook-action@master # uses: joelwmale/webhook-action@master
with: # with:
url: ${{ secrets.WEBHOOK_URL_5 }} # url: ${{ secrets.WEBHOOK_URL_5 }}

4
.gitignore vendored
View File

@@ -1,5 +1,7 @@
venv .venv
.DS_Store .DS_Store
.vscode .vscode
*.cpython-*.pyc

View File

@@ -1,33 +1,11 @@
FROM python:3.12-slim AS build FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
ARG POETRY_EXPORT_EXTRA_ARGS=''
WORKDIR /opt/venv
RUN python -m venv /opt/venv && /opt/venv/bin/pip install --upgrade pip && /opt/venv/bin/pip install --no-cache-dir httpx poetry poetry-plugin-export
COPY ./pyproject.toml ./poetry.lock ./
RUN --mount=type=ssh /opt/venv/bin/poetry export --without-hashes ${POETRY_EXPORT_EXTRA_ARGS} > requirements.txt \
&& /opt/venv/bin/pip install --no-cache-dir -r requirements.txt
FROM python:3.12-slim AS runtime
RUN apt update && \
apt install -y --no-install-recommends curl jq && \
apt clean
COPY ./src/ /app
COPY ./scripts/*.sh /
RUN chmod +x /*.sh
ENV PATH="/opt/venv/bin:$PATH"
ENV VENV_PATH=/opt/venv
COPY --from=build /opt/venv /opt/venv
WORKDIR /app WORKDIR /app
COPY ./pyproject.toml ./uv.lock ./
EXPOSE 80 RUN --mount=type=ssh uv venv \
&& uv sync --frozen
CMD ["python", "main.py"] COPY ./src /app/src
WORKDIR /app/src

2152
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,30 +1,34 @@
[tool.poetry] [project]
name = "discord-bot" name = "discord-bot"
version = "0.1.0" version = "0.1.0"
description = "" description = ""
authors = ["Bulat Kurbanov <kurbanovbul@gmail.com>"] authors = [{ name = "Bulat Kurbanov", email = "kurbanovbul@gmail.com" }]
readme = "README.md" requires-python = "~=3.12"
packages = [{include = "discord_bot"}] dependencies = [
"discord-py>=2.4.0,<3",
"twitchapi>=4.4.0,<5",
"pydantic>=2.10.5,<3",
"pydantic-settings>=2.7.1,<3",
"httpx>=0.28.1,<0.29",
"icalendar>=6.1.0,<7",
"pytz~=2025.2",
"mongojet>=0.3,<0.4",
"taskiq>=0.11.11,<0.12",
"taskiq-redis>=1.0.2,<2",
"redis[hiredis]>=5.2.1,<6",
"fastapi>=0.115.8,<0.116",
"authx>=1.4.1,<2",
"httpx-oauth>=0.16.1,<0.17",
"uvicorn[standard]>=0.34.0,<0.35",
"temporalio>=1.10.0",
]
[tool.poetry.dependencies] [tool.hatch.build.targets.sdist]
python = "^3.11" include = ["discord_bot"]
discord-py = "^2.4.0"
twitchapi = "^4.4.0"
pydantic = "^2.10.5"
pydantic-settings = "^2.7.1"
httpx = "^0.28.1"
icalendar = "^6.1.0"
pytz = "^2024.2"
mongojet = "^0.2.7"
taskiq = "^0.11.11"
taskiq-redis = "^1.0.2"
redis = {extras = ["hiredis"], version = "^5.2.1"}
fastapi = "^0.115.8"
authx = "^1.4.1"
httpx-oauth = "^0.16.1"
uvicorn = {extras = ["standard"], version = "^0.34.0"}
[tool.hatch.build.targets.wheel]
include = ["discord_bot"]
[build-system] [build-system]
requires = ["poetry-core"] requires = ["hatchling"]
build-backend = "poetry.core.masonry.api" build-backend = "hatchling.build"

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/python main.py $1

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/taskiq scheduler core.broker:scheduler modules.tasks

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/uvicorn modules.web_app.app:app --host 0.0.0.0 --port 80

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
/opt/venv/bin/taskiq worker core.broker:broker modules.tasks

View File

@@ -1,31 +1,41 @@
from pydantic import BaseModel from pydantic import BaseModel
class TwitchConfig(BaseModel): class TwitchConfig(BaseModel):
id: int id: int
name: str name: str
class NotificationsConfig(BaseModel): class NotificationsConfig(BaseModel):
start_stream: str start_stream: str
change_category: str | None = None change_category: str | None = None
redemption_reward: str | None = None
class GamesListConfig(BaseModel): class GamesListConfig(BaseModel):
channel_id: int channel_id: int
message_id: int message_id: int
class DiscordConfig(BaseModel): class DiscordConfig(BaseModel):
guild_id: int guild_id: int
notifications_channel_id: int notifications_channel_id: int
games_list: GamesListConfig | None = None games_list: GamesListConfig | None = None
roles: dict[str, int] | None = None roles: dict[str, int] | None = None
class TelegramConfig(BaseModel): class TelegramConfig(BaseModel):
notifications_channel_id: int notifications_channel_id: int
class IntegrationsConfig(BaseModel): class IntegrationsConfig(BaseModel):
discord: DiscordConfig | None = None discord: DiscordConfig | None = None
telegram: TelegramConfig | None = None telegram: TelegramConfig | None = None
class StreamerConfig(BaseModel): class StreamerConfig(BaseModel):
twitch: TwitchConfig twitch: TwitchConfig
notifications: NotificationsConfig notifications: NotificationsConfig
integrations: IntegrationsConfig integrations: IntegrationsConfig
chatbot_in_chats: list[int] | None = None

View File

@@ -1,4 +1,4 @@
from domain.streamers import StreamerConfig from applications.common.domain.streamers import StreamerConfig
from .base import BaseRepository from .base import BaseRepository

View File

@@ -1,4 +1,4 @@
from domain.users import CreateUser, User from applications.common.domain.users import CreateUser, User
from .base import BaseRepository from .base import BaseRepository
@@ -18,10 +18,10 @@ class UserRepository(BaseRepository):
) )
@classmethod @classmethod
async def get_or_create_user(cls, newUser: CreateUser) -> User: async def get_or_create_user(cls, new_user: CreateUser) -> User:
filter_data = {} filter_data = {}
for provider, data in newUser.oauths.items(): for provider, data in new_user.oauths.items():
filter_data[f"oauths.{provider}.id"] = data.id filter_data[f"oauths.{provider}.id"] = data.id
async with cls.connect() as collection: async with cls.connect() as collection:
@@ -29,7 +29,7 @@ class UserRepository(BaseRepository):
filter_data, filter_data,
{ {
"$setOnInsert": { "$setOnInsert": {
**newUser.model_dump(), **new_user.model_dump(),
} }
}, },
upsert=True, upsert=True,

View File

@@ -0,0 +1,13 @@
from asyncio import run
from applications.games_list.discord import client, logger
from core.config import config
async def start_discord_sevice():
logger.info("Starting Discord service...")
await client.start(config.DISCORD_BOT_TOKEN)
run(start_discord_sevice())

View File

@@ -5,13 +5,15 @@ from discord.abc import Messageable
from discord import Object from discord import Object
from discord import app_commands from discord import app_commands
from modules.games_list.games_list import GameList, GameItem from applications.common.repositories.streamers import StreamerConfigRepository
from applications.games_list.games_list import GameList, GameItem
from core.config import config from core.config import config
from repositories.streamers import StreamerConfigRepository
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
async def get_game_list_channel_to_message_map() -> dict[int, int]: async def get_game_list_channel_to_message_map() -> dict[int, int]:
@@ -240,9 +242,3 @@ async def replace(interaction: discord.Interaction, game: str, new: str):
await game_list.save() await game_list.save()
await interaction.response.send_message("Игра заменена!", ephemeral=True) await interaction.response.send_message("Игра заменена!", ephemeral=True)
async def start_discord_sevice():
logger.info("Starting Discord service...")
await client.start(config.DISCORD_BOT_TOKEN)

View File

@@ -0,0 +1,7 @@
from .sync import syncronize, syncronize_all
__all__ = [
"syncronize",
"syncronize_all",
]

View File

@@ -0,0 +1,29 @@
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.synchronizer import syncronize as syncronize_internal
@activity.defn
async def syncronize(twitch_id: int):
try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e:
activity.logger.error(f"Error during synchronization: {e}")
raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -1,7 +1,7 @@
import logging import logging
from datetime import datetime from datetime import datetime
from domain.streamers import TwitchConfig from applications.common.domain.streamers import TwitchConfig
from .twitch_events import get_twitch_events, TwitchEvent from .twitch_events import get_twitch_events, TwitchEvent
from .discord_events import ( from .discord_events import (

View File

@@ -71,7 +71,11 @@ async def get_twitch_events(twitch_channel_id: str) -> list[TwitchEvent]:
else: else:
raise ValueError("Invalid repeat rule") raise ValueError("Invalid repeat rule")
if event.start_at > datetime.now(event.start_at.tzinfo) or event.repeat_rule is not None: if (
event.start_at > datetime.now(event.start_at.tzinfo)
or event.end_at > datetime.now(event.end_at.tzinfo)
or event.repeat_rule is not None
):
events.append(event) events.append(event)
return events return events

View File

@@ -0,0 +1,6 @@
from .sync import ScheduleSyncWorkflow
__all__ = [
"ScheduleSyncWorkflow"
]

View File

@@ -0,0 +1,34 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.schedule_sync.activities import syncronize_all
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class ScheduleSyncWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"all": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="ScheduleSyncWorkflow",
task_queue=MAIN_QUEUE,
execution_timeout=timedelta(minutes=1),
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
)
)
}
@workflow.run
async def run(self):
await workflow.execute_activity(
syncronize_all,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5),
)

View File

@@ -0,0 +1,58 @@
from asyncio import run
from core.temporal import get_client
from temporalio.client import ScheduleAlreadyRunningError
from temporalio.worker import Worker, UnsandboxedWorkflowRunner
from applications.schedule_sync import activities as schedule_sync_activities
from applications.schedule_sync.workflows import ScheduleSyncWorkflow
from applications.twitch_webhook import activities as twitch_activities
from applications.twitch_webhook import workflows as twitch_workflows
from .queues import MAIN_QUEUE
async def main():
client = await get_client()
for id, schedule in ScheduleSyncWorkflow.get_schedules().items():
try:
await client.create_schedule(f"ScheduleSyncWorkflow-{id}", schedule)
except ScheduleAlreadyRunningError:
pass
for id, schedule in twitch_workflows.StreamsCheckWorkflow.get_schedules().items():
try:
await client.create_schedule(f"StreamsCheckWorkflow-{id}", schedule)
except ScheduleAlreadyRunningError:
pass
worker: Worker = Worker(
client,
task_queue=MAIN_QUEUE,
workflows=[
ScheduleSyncWorkflow,
twitch_workflows.StreamsCheckWorkflow,
twitch_workflows.OnChannelUpdateWorkflow,
twitch_workflows.OnMessageWorkflow,
twitch_workflows.OnRewardRedemptionWorkflow,
twitch_workflows.OnStreamOnlineWorkflow,
],
activities=[
schedule_sync_activities.syncronize,
schedule_sync_activities.syncronize_all,
twitch_activities.on_message_activity,
twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity,
twitch_activities.on_channel_update_activity,
],
workflow_runner=UnsandboxedWorkflowRunner()
)
await worker.run()
run(main())

View File

@@ -0,0 +1 @@
MAIN_QUEUE = "main"

View File

@@ -0,0 +1,10 @@
from asyncio import run
from .twitch.webhook import TwitchService
async def start_twitch_service() -> None:
await TwitchService.start()
run(start_twitch_service())

View File

@@ -0,0 +1,13 @@
from .message_proc import on_message_activity
from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
from .redemption_reward import on_redemption_reward_add_activity
from .state_checker import check_streams_states
__all__ = [
"on_message_activity",
"on_stream_state_change_activity",
"check_streams_states",
"on_redemption_reward_add_activity",
"on_channel_update_activity",
]

View File

@@ -0,0 +1,13 @@
from temporalio import activity
from applications.twitch_webhook.messages_proc import MessageEvent, MessagesProc
@activity.defn
async def on_message_activity(
event: MessageEvent
):
await MessagesProc.on_message(
event.received_as,
event
)

View File

@@ -0,0 +1,58 @@
from datetime import datetime, timezone
from temporalio import activity
from pydantic import BaseModel
from twitchAPI.helper import first
from applications.twitch_webhook.state import State, EventType, UpdateEvent
from applications.twitch_webhook.watcher import StateWatcher
from applications.twitch_webhook.twitch.authorize import authorize
class OnStreamStateChangeActivity(BaseModel):
streamer_id: int
event_type: EventType
new_state: State | None = None
@activity.defn
async def on_stream_state_change_activity(
data: OnStreamStateChangeActivity
):
await StateWatcher.on_stream_state_change(
data.streamer_id,
data.event_type,
data.new_state,
)
class OnChannelUpdateActivity(BaseModel):
event: UpdateEvent
event_type: EventType
@activity.defn
async def on_channel_update_activity(
data: OnChannelUpdateActivity
):
twitch = await authorize(data.event.broadcaster_user_login)
stream = await first(twitch.get_streams(
user_id=[data.event.broadcaster_user_id])
)
if stream is None:
return
await on_stream_state_change_activity(
OnStreamStateChangeActivity(
streamer_id=int(data.event.broadcaster_user_id),
event_type=data.event_type,
new_state=State(
title=data.event.title,
category=data.event.category_name,
last_live_at=datetime.now(timezone.utc)
),
)
)

View File

@@ -0,0 +1,8 @@
from temporalio import activity
from applications.twitch_webhook.reward_redemption import RewardRedemption, on_redemption_reward_add
@activity.defn
async def on_redemption_reward_add_activity(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -0,0 +1,29 @@
from datetime import datetime, timezone
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.state import State, EventType
from applications.twitch_webhook.watcher import StateWatcher
@activity.defn
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)

View File

@@ -6,7 +6,7 @@ from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient from httpx import AsyncClient
from core.config import config from core.config import config
from .twitch.authorize import authorize from .twitch.authorize import authorize, Twitch
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -39,6 +39,8 @@ class MessageType(StrEnum):
class MessageEvent(BaseModel): class MessageEvent(BaseModel):
received_as: str
broadcaster_user_id: str broadcaster_user_id: str
broadcaster_user_name: str broadcaster_user_name: str
broadcaster_user_login: str broadcaster_user_login: str
@@ -57,8 +59,10 @@ class MessageEvent(BaseModel):
channel_points_custom_reward_id: str | None channel_points_custom_reward_id: str | None
@classmethod @classmethod
def from_twitch_event(cls, event: ChannelChatMessageEvent): def from_twitch_event(cls, received_as: str, event: ChannelChatMessageEvent):
return cls( return cls(
received_as=received_as,
broadcaster_user_id=event.event.broadcaster_user_id, broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_name=event.event.broadcaster_user_name, broadcaster_user_name=event.event.broadcaster_user_name,
broadcaster_user_login=event.event.broadcaster_user_login, broadcaster_user_login=event.event.broadcaster_user_login,
@@ -96,6 +100,19 @@ async def get_completion(messages: list[dict]) -> str:
logger.info(f"Getting completion for message: {messages}") logger.info(f"Getting completion for message: {messages}")
data_messages = [ data_messages = [
{
"role": "system",
"content": (
"Don't use markdown! "
"Don't use blocked words on Twitch! "
"Make answers short and clear! "
"You are not language model! "
"You are not a bot! "
"You are human! "
"Your name is kurbezz or Bulat! "
"Don't start answer with @kurbezz!"
)
},
*( *(
{ {
"role": "assistant" if message["user"] == "kurbezz" else "user", "role": "assistant" if message["user"] == "kurbezz" else "user",
@@ -103,10 +120,6 @@ async def get_completion(messages: list[dict]) -> str:
} }
for message in messages for message in messages
), ),
{
"role": "system",
"content": "Don't use markdown! Don't use blocked words on Twitch! Make answers short and clear!"
}
] ]
async with AsyncClient() as client: async with AsyncClient() as client:
@@ -117,7 +130,7 @@ async def get_completion(messages: list[dict]) -> str:
"content-type": "application/json" "content-type": "application/json"
}, },
json={ json={
"model": "google/gemini-2.0-flash-thinking-exp:free", "model": "deepseek/deepseek-chat-v3-0324:free",
"messages": data_messages "messages": data_messages
} }
) )
@@ -130,9 +143,8 @@ async def get_completion(messages: list[dict]) -> str:
class MessagesProc: class MessagesProc:
IGNORED_USER_LOGINS = [ FULL_IGNORED_USER_LOGINS = [
"jeetbot", "jeetbot",
"kurbezz",
] ]
MESSAGE_LIMIT = 1000 MESSAGE_LIMIT = 1000
@@ -154,13 +166,16 @@ class MessagesProc:
def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]: def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]:
logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}") logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}")
return [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id] + \ if thread_id is not None:
[m for m in cls.MESSAGE_HISTORY if m["id"] == message_id] return (
[m for m in cls.MESSAGE_HISTORY if m["id"] == thread_id]
+ [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id]
)
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod @classmethod
async def on_message(cls, event: MessageEvent): async def _update_history(cls, event: MessageEvent):
logging.info(f"Received message: {event}")
cls.update_message_history( cls.update_message_history(
id=event.message_id, id=event.message_id,
text=event.message.text, text=event.message.text,
@@ -168,11 +183,8 @@ class MessagesProc:
thread_id=event.reply.thread_message_id if event.reply is not None else None thread_id=event.reply.thread_message_id if event.reply is not None else None
) )
if event.chatter_user_name == "pahangor": @classmethod
return async def _goida(cls, twitch: Twitch, event: MessageEvent):
twitch = await authorize()
if "гойда" in event.message.text.lower(): if "гойда" in event.message.text.lower():
await twitch.send_chat_message( await twitch.send_chat_message(
event.broadcaster_user_id, event.broadcaster_user_id,
@@ -181,38 +193,79 @@ class MessagesProc:
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
if "lasqexx" in event.chatter_user_login: @classmethod
pass # Todo: Здароу async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
if "lasqexx" not in event.chatter_user_login:
return
if event.message.text.lower().startswith("!ai"): if "здароу" in event.message.text.lower():
try: await twitch.send_chat_message(
messages = cls.get_message_history_with_thread( event.broadcaster_user_id,
event.message_id, config.TWITCH_ADMIN_USER_ID,
thread_id=event.reply.thread_message_id if event.reply is not None else None "Здароу, давай иди уже",
) reply_parent_message_id=event.message_id
completion = await get_completion(messages) )
return
max_length = 255 if "сосал?" in event.message.text.lower():
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)] await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"А ты? Иди уже",
reply_parent_message_id=event.message_id
)
return
for part in completion_parts: if "лан я пошёл" in event.message.text.lower():
await twitch.send_chat_message( await twitch.send_chat_message(
event.broadcaster_user_id, event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID, config.TWITCH_ADMIN_USER_ID,
part, "да да, иди уже",
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
@classmethod
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
if not event.message.text.lower().startswith("!ai"):
return
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message( await twitch.send_chat_message(
event.broadcaster_user_id, event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID, config.TWITCH_ADMIN_USER_ID,
"Ошибка!", part,
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
if event.chatter_user_login in cls.IGNORED_USER_LOGINS: cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error(f"Failed to get completion: {e}", exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Ошибка!",
reply_parent_message_id=event.message_id
)
@classmethod
async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
if event.chatter_user_login.lower() in ["kurbezz", "hafmc"]:
return return
if ("kurbezz" in event.message.text.lower() or \ if ("kurbezz" in event.message.text.lower() or \
@@ -236,6 +289,13 @@ class MessagesProc:
part, part,
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e: except Exception as e:
logger.error(f"Failed to get completion: {e}") logger.error(f"Failed to get completion: {e}")
@@ -245,3 +305,21 @@ class MessagesProc:
"Пошел нахуй!", "Пошел нахуй!",
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
@classmethod
async def on_message(cls, received_as: str, event: MessageEvent):
return
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return
logging.info(f"Received message: {event}")
await cls._update_history(event)
twitch = await authorize(received_as)
await cls._goida(twitch, event)
await cls._lasqexx(twitch, event)
await cls._ask_ai(twitch, event)
await cls._kurbezz(twitch, event)

View File

@@ -3,7 +3,7 @@ import logging
from httpx import AsyncClient from httpx import AsyncClient
from core.config import config from core.config import config
from domain.streamers import StreamerConfig from applications.common.domain.streamers import StreamerConfig
from .state import State from .state import State
from .sent_notifications import SentNotification, SentNotificationType, SentResult from .sent_notifications import SentNotification, SentNotificationType, SentResult

View File

@@ -0,0 +1,52 @@
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from applications.common.repositories.streamers import StreamerConfigRepository
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class RewardRedemption(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
user_name: str
reward_title: str
user_input: str
@classmethod
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
user_name=event.event.user_name,
reward_title=event.event.reward.title,
user_input=event.event.user_input or "",
)
async def on_redemption_reward_add(reward: RewardRedemption):
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
twitch = await authorize(reward.broadcaster_user_login)
streamer = await StreamerConfigRepository.get_by_twitch_id(int(reward.broadcaster_user_id))
if streamer.notifications.redemption_reward is None:
return
message = streamer.notifications.redemption_reward.format(
user=reward.user_name,
reward_title=reward.reward_title,
reward_promt=f" ({reward.user_input})" if reward.user_input else ""
)
await twitch.send_chat_message(
reward.broadcaster_user_id,
reward.broadcaster_user_id,
message
)

View File

@@ -21,6 +21,7 @@ class State(BaseModel):
class UpdateEvent(BaseModel): class UpdateEvent(BaseModel):
broadcaster_user_id: str broadcaster_user_id: str
broadcaster_user_login: str
title: str title: str
category_name: str category_name: str

View File

@@ -14,19 +14,21 @@ SCOPES = [
AuthScope.USER_BOT, AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT, AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT, AuthScope.USER_WRITE_CHAT,
AuthScope.CHANNEL_READ_REDEMPTIONS,
] ]
async def authorize(auto_refresh_auth: bool = False) -> Twitch: async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
twitch = Twitch( twitch = Twitch(
config.TWITCH_CLIENT_ID, config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET config.TWITCH_CLIENT_SECRET
) )
twitch.user_auth_refresh_callback = TokenStorage.save twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
twitch.auto_refresh_auth = auto_refresh_auth twitch.auto_refresh_auth = auto_refresh_auth
token, refresh_token = await TokenStorage.get() token, refresh_token = await TokenStorage.get(user)
await twitch.set_user_authentication( await twitch.set_user_authentication(
token, token,
SCOPES, SCOPES,

View File

@@ -1,12 +1,16 @@
import logging
from core.mongo import mongo_manager from core.mongo import mongo_manager
logger = logging.getLogger(__name__)
class TokenStorage: class TokenStorage:
COLLECTION_NAME = "secrets" COLLECTION_NAME = "secrets"
OBJECT_ID = "twitch_tokens" TYPE = "twitch_token"
@staticmethod @staticmethod
async def save(acceess_token: str, refresh_token: str): async def save(user: str, acceess_token: str, refresh_token: str):
data = {"access_token": acceess_token, "refresh_token": refresh_token} data = {"access_token": acceess_token, "refresh_token": refresh_token}
async with mongo_manager.connect() as client: async with mongo_manager.connect() as client:
@@ -14,17 +18,20 @@ class TokenStorage:
collection = db[TokenStorage.COLLECTION_NAME] collection = db[TokenStorage.COLLECTION_NAME]
await collection.update_one( await collection.update_one(
{"_id": TokenStorage.OBJECT_ID}, {"type": TokenStorage.TYPE, "twitch_login": user},
{"$set": data}, {"$set": data},
upsert=True upsert=True
) )
@staticmethod @staticmethod
async def get() -> tuple[str, str]: async def get(user: str) -> tuple[str, str]:
async with mongo_manager.connect() as client: async with mongo_manager.connect() as client:
db = client.get_default_database() db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME] collection = db[TokenStorage.COLLECTION_NAME]
data = await collection.find_one({"_id": TokenStorage.OBJECT_ID}) data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
if data is None:
raise RuntimeError(f"Token for user {user} not found")
return data["access_token"], data["refresh_token"] return data["access_token"], data["refresh_token"]

View File

@@ -0,0 +1,233 @@
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
import logging
from typing import Literal
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token
from core.temporal import get_client
from applications.common.repositories.streamers import StreamerConfigRepository, StreamerConfig
from applications.twitch_webhook.state import UpdateEvent, EventType
from applications.twitch_webhook.messages_proc import MessageEvent
from applications.twitch_webhook.reward_redemption import RewardRedemption
from applications.twitch_webhook.workflows.on_message import OnMessageWorkflow
from applications.twitch_webhook.workflows.on_reward_redemption import OnRewardRedemptionWorkflow
from applications.twitch_webhook.workflows.on_stream_online import OnStreamOnlineWorkflow
from applications.twitch_webhook.workflows.on_channel_update import OnChannelUpdateWorkflow
from applications.temporal_worker.queues import MAIN_QUEUE
from .authorize import authorize
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
self.twitch = twitch
self.streamer = streamer
self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent):
client = await get_client()
await client.start_workflow(
OnChannelUpdateWorkflow.run,
args=(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
title=event.event.title,
category_name=event.event.category_name
),
EventType.CHANNEL_UPDATE,
),
id=f"on-channel-update-{event.event.broadcaster_user_id}",
task_queue=MAIN_QUEUE
)
async def on_stream_online(self, event: StreamOnlineEvent):
client = await get_client()
await client.start_workflow(
OnStreamOnlineWorkflow.run,
args=(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE
),
id=f"on-stream-online-{event.event.broadcaster_user_id}",
task_queue=MAIN_QUEUE
)
async def on_channel_points_custom_reward_redemption_add(
self,
event: ChannelPointsCustomRewardRedemptionAddEvent
):
client = await get_client()
await client.start_workflow(
OnRewardRedemptionWorkflow.run,
RewardRedemption.from_twitch_event(event),
id=f"on-reward-redemption-{event.event.broadcaster_user_id}-{event.event.reward.id}",
task_queue=MAIN_QUEUE
)
async def on_message(self, event: ChannelChatMessageEvent):
client = await get_client()
await client.start_workflow(
OnMessageWorkflow.run,
MessageEvent.from_twitch_event(
self.streamer.twitch.name,
event
),
id=f"on-message-{event.event.broadcaster_user_id}-{event.event.message_id}",
task_queue=MAIN_QUEUE
)
async def _clean_subs(self, method: str, streamer: StreamerConfig):
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
case "listen_stream_online":
sub_type = "stream.online"
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case "listen_channel_points_custom_reward_redemption_add":
sub_type = "channel.channel_points_custom_reward_redemption.add"
case _:
raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions(
user_id=str(streamer.twitch.id)
)
for sub in subs.data:
if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"]
| Literal["listen_channel_points_custom_reward_redemption_add"],
eventsub: EventSubWebsocket,
streamer: StreamerConfig,
retry: int = 10
):
await self._clean_subs(method, streamer)
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_points_custom_reward_redemption_add":
await eventsub.listen_channel_points_custom_reward_redemption_add(
str(streamer.twitch.id),
self.on_channel_points_custom_reward_redemption_add
)
case "listen_channel_chat_message":
chatbot_in_chats = streamer.chatbot_in_chats or []
for chat_id in chatbot_in_chats:
await eventsub.listen_channel_chat_message(
str(chat_id),
str(streamer.twitch.id),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
async def _check_token(self):
assert self.twitch._user_auth_token is not None
while True:
for _ in range(60):
if self.failed:
return
await sleep(1)
logger.info("Check token...")
val_result = await validate_token(
self.twitch._user_auth_token,
auth_base_url=self.twitch.auth_base_url
)
if val_result.get('status', 200) != 200:
await self.twitch.refresh_used_token()
logger.info("Token refreshed")
async def run(self) -> None:
eventsub = EventSubWebsocket(twitch=self.twitch)
try:
eventsub.start()
logger.info("Subscribe to events...")
await self.subscribe_to_streamer(eventsub, self.streamer)
logger.info("Twitch service started")
await self._check_token()
finally:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def _start_for_streamer(cls, streamer: StreamerConfig):
try:
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
await cls(twith, streamer).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
streamers = await StreamerConfigRepository.all()
await wait(
[
create_task(cls._start_for_streamer(streamer))
for streamer in streamers
],
return_when=FIRST_COMPLETED
)
await gather(
*[cls._start_for_streamer(streamer) for streamer in streamers]
)
logger.info("Twitch service stopped")

View File

@@ -3,7 +3,7 @@ from datetime import datetime, timezone, timedelta
from twitchAPI.helper import first from twitchAPI.helper import first
from core.redis import redis_manager from core.redis import redis_manager
from repositories.streamers import StreamerConfigRepository from applications.common.repositories.streamers import StreamerConfigRepository
from .state import State, StateManager, EventType from .state import State, StateManager, EventType
from .sent_notifications import SentNotificationRepository, SentNotificationType from .sent_notifications import SentNotificationRepository, SentNotificationType
@@ -16,7 +16,7 @@ class StateWatcher:
@classmethod @classmethod
async def get_twitch_state(cls, streamer_id: int) -> State | None: async def get_twitch_state(cls, streamer_id: int) -> State | None:
twitch = await authorize() twitch = await authorize("kurbezz")
stream = await first( stream = await first(
twitch.get_streams(user_id=[str(streamer_id)]) twitch.get_streams(user_id=[str(streamer_id)])

View File

@@ -0,0 +1,14 @@
from .checker import StreamsCheckWorkflow
from .on_channel_update import OnChannelUpdateWorkflow
from .on_message import OnMessageWorkflow
from .on_reward_redemption import OnRewardRedemptionWorkflow
from .on_stream_online import OnStreamOnlineWorkflow
__all__ = [
"StreamsCheckWorkflow",
"OnChannelUpdateWorkflow",
"OnMessageWorkflow",
"OnRewardRedemptionWorkflow",
"OnStreamOnlineWorkflow",
]

View File

@@ -0,0 +1,33 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.state_checker import check_streams_states
@workflow.defn
class StreamsCheckWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"check": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="StreamsCheckWorkflow",
task_queue=MAIN_QUEUE,
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
)
)
}
@workflow.run
async def run(self):
await workflow.start_activity(
check_streams_states,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -0,0 +1,26 @@
from datetime import timedelta
from temporalio import workflow
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
from applications.twitch_webhook.state import UpdateEvent, EventType
@workflow.defn
class OnChannelUpdateWorkflow:
@workflow.run
async def run(
self,
event: UpdateEvent,
event_type: EventType,
):
await workflow.start_activity(
on_channel_update_activity,
OnChannelUpdateActivity(
event=event,
event_type=event_type,
),
task_queue=MAIN_QUEUE,
start_to_close_timeout=timedelta(minutes=1),
)

View File

@@ -0,0 +1,19 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.messages_proc import MessageEvent
from applications.twitch_webhook.activities.message_proc import on_message_activity
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnMessageWorkflow:
@workflow.run
async def run(self, message: MessageEvent) -> None:
await workflow.start_activity(
on_message_activity,
message,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5)
)

View File

@@ -0,0 +1,19 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.activities.redemption_reward import on_redemption_reward_add_activity
from applications.twitch_webhook.reward_redemption import RewardRedemption
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnRewardRedemptionWorkflow:
@workflow.run
async def run(self, reward: RewardRedemption):
await workflow.start_activity(
on_redemption_reward_add_activity,
reward,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -0,0 +1,22 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
from applications.twitch_webhook.state import EventType
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnStreamOnlineWorkflow:
@workflow.run
async def run(self, broadcaster_user_id: str | int, event_type: EventType):
await workflow.start_activity(
on_stream_state_change_activity,
OnStreamStateChangeActivity(
streamer_id=int(broadcaster_user_id),
event_type=event_type
),
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -1,22 +0,0 @@
from taskiq import TaskiqScheduler
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from core.config import config
broker = ListQueueBroker(url=config.REDIS_URI) \
.with_middlewares(
SimpleRetryMiddleware(default_retry_count=5)
) \
.with_result_backend(RedisAsyncResultBackend(
redis_url=config.REDIS_URI,
result_ex_time=60 * 60 * 24 * 7,
))
scheduler = TaskiqScheduler(
broker=broker,
sources=[LabelScheduleSource(broker)],
)

10
src/core/temporal.py Normal file
View File

@@ -0,0 +1,10 @@
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def get_client() -> Client:
return await Client.connect(
"temporal:7233",
namespace="default",
data_converter=pydantic_data_converter
)

View File

@@ -1,45 +0,0 @@
import logging
import sys
from modules.games_list import start as start_games_list_module
from modules.stream_notifications import start as start_stream_notifications_module
from core.mongo import mongo_manager
from core.redis import redis_manager
from core.broker import broker
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
async def main():
logger.info("Starting services...")
if len(sys.argv) != 2:
raise RuntimeError("Usage: python main.py <module>")
module = sys.argv[1]
await mongo_manager.init()
await redis_manager.init()
if not broker.is_worker_process:
await broker.startup()
if module == "games_list":
await start_games_list_module()
elif module == "stream_notifications":
await start_stream_notifications_module()
else:
raise RuntimeError(f"Unknown module: {module}")
exit(0)
if __name__ == "__main__":
from asyncio import run
run(main())

View File

@@ -1,7 +0,0 @@
from .discord import start_discord_sevice
start = start_discord_sevice
__all__ = ["start"]

View File

@@ -1,24 +0,0 @@
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .synchronizer import syncronize
@broker.task("scheduler_sync.syncronize_task")
async def syncronize_task(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)
@broker.task("scheduler_sync.syncronize_all_task", schedule=[{"cron": "*/5 * * * *"}])
async def syncronize_all_task():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize_task.kiq(streamer.twitch.id)

View File

@@ -1,7 +0,0 @@
from .twitch.webhook import start_twitch_service
start = start_twitch_service
__all__ = ["start"]

View File

@@ -1,84 +0,0 @@
from datetime import datetime, timezone
from twitchAPI.helper import first
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
@broker.task(
"stream_notifications.twitch.on_stream_state_change_with_check",
retry_on_error=True
)
async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize()
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await on_stream_state_change.kiq(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
)
)
@broker.task(
"stream_notifications.twitch.on_stream_state_change",
retry_on_error=True
)
async def on_stream_state_change(
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
@broker.task(
"stream_notifications.check_streams_states",
schedule=[{"cron": "*/2 * * * *"}]
)
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize()
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(event: MessageEvent):
await MessagesProc.on_message(event)

View File

@@ -1,172 +0,0 @@
from asyncio import sleep, gather
import logging
from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent
from twitchAPI.oauth import validate_token
from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message
from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent
from .authorize import authorize
logger = logging.getLogger(__name__)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch):
self.twitch = twitch
self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent):
await on_stream_state_change_with_check.kiq(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
title=event.event.title,
category_name=event.event.category_name
),
EventType.CHANNEL_UPDATE,
)
async def on_stream_online(self, event: StreamOnlineEvent):
await on_stream_state_change.kiq(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE,
)
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
MessageEvent.from_twitch_event(event)
)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"],
eventsub: EventSubWebhook,
streamer: StreamerConfig,
retry: int = 10
):
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
case "listen_stream_online":
sub_type = "stream.online"
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case _:
raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions(
user_id=str(streamer.twitch.id)
)
for sub in subs.data:
if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
async def _check_token(self):
assert self.twitch._user_auth_token is not None
while True:
for _ in range(60):
if self.failed:
return
await sleep(1)
logger.info("Check token...")
val_result = await validate_token(
self.twitch._user_auth_token,
auth_base_url=self.twitch.auth_base_url
)
if val_result.get('status', 200) != 200:
await self.twitch.refresh_used_token()
logger.info("Token refreshed")
async def run(self) -> NoReturn:
eventsub = EventSubWebhook(
callback_url=config.TWITCH_CALLBACK_URL,
port=config.TWITCH_CALLBACK_PORT,
twitch=self.twitch,
message_deduplication_history_length=50
)
eventsub.wait_for_subscription_confirm_timeout = 60
eventsub.unsubscribe_on_stop = False
streamers = await StreamerConfigRepository.all()
try:
eventsub.start()
logger.info("Subscribe to events...")
await gather(
*[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers]
)
logger.info("Twitch service started")
await self._check_token()
finally:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
try:
twith = await authorize(auto_refresh_auth=True)
await cls(twith).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
logger.info("Twitch service stopped")
async def start_twitch_service() -> NoReturn:
await TwitchService.start()

View File

@@ -1,2 +0,0 @@
from modules.scheduler_sync.tasks import * # noqa: F403
from modules.stream_notifications.tasks import * # noqa: F403

1300
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff