mirror of
https://github.com/kurbezz/discord-bot.git
synced 2025-12-06 07:05:36 +01:00
Compare commits
83 Commits
402042080e
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| eddad6454d | |||
| 8ac15a1687 | |||
| 36c542b822 | |||
| b67d00bcd7 | |||
| 81a51a3d0d | |||
| 90756c884c | |||
| df501c27d7 | |||
| 33fbc4c446 | |||
| 03039bbd09 | |||
| 6ea36f165e | |||
| 7721e3a840 | |||
| 1c5b7e81e8 | |||
| 586359f8ce | |||
| 12dd0f9af7 | |||
| 4174d67084 | |||
| 2d8b767ae3 | |||
| a0a53800ab | |||
| 959ab30265 | |||
| 560a5ac793 | |||
| 28c64341e1 | |||
| 2ef3148d46 | |||
| cb464ad9f3 | |||
| bb67ff7bba | |||
| cadd54565a | |||
| 5bea39cd2c | |||
| 77fb68a5e3 | |||
| fb19f7a125 | |||
| acaff281c3 | |||
| dc2486f5ff | |||
| 92be127ced | |||
| b18f717fae | |||
| 58d3a37985 | |||
| 92c5398f0d | |||
| bef5e907f5 | |||
| 004a21f6cc | |||
| 0917e5634e | |||
| f9995acf3d | |||
| 288e4769bc | |||
| cafa0e3afd | |||
| 5a769c96a0 | |||
| f1d023c9a1 | |||
| cfce98a42f | |||
| abe0cbb173 | |||
| 1eba79cc5a | |||
| 30eb1eaf02 | |||
| df72c4c30a | |||
| 0a69d71f70 | |||
| c4bcfe3b2b | |||
| 05bd4bde27 | |||
| 46241f0b2e | |||
| c36622babf | |||
| c7c273cdac | |||
| 19e2207bc2 | |||
| c511192f83 | |||
| 8140648034 | |||
| d1f0681dba | |||
| 4d7ec071cb | |||
| f46f2aee3f | |||
| b5b3397bff | |||
| dac0ddb884 | |||
| 08704c6529 | |||
| 74acf4596a | |||
| 678dd29918 | |||
| 3cfea7277f | |||
| 269245a112 | |||
| 22e9461a7e | |||
| 165308d83b | |||
| d041f8d903 | |||
| 8207266d9b | |||
| 9a76f86d41 | |||
| dc3abeb429 | |||
| fa82c96e79 | |||
| 6f0a236f2f | |||
| d645e6db92 | |||
| 334718a7fb | |||
| efa92dc575 | |||
| a95d80c8dc | |||
| f9603712e8 | |||
| fbe8367723 | |||
| 6e50807381 | |||
| f907892769 | |||
| 5648bfa024 | |||
| 7b3a89f41b |
24
.github/workflows/build_docker_image.yml
vendored
24
.github/workflows/build_docker_image.yml
vendored
@@ -50,25 +50,25 @@ jobs:
|
||||
url: ${{ secrets.WEBHOOK_URL }}
|
||||
|
||||
-
|
||||
name: Invoke deployment hook (worker)
|
||||
name: Invoke deployment hook (twitch_webhook)
|
||||
uses: joelwmale/webhook-action@master
|
||||
with:
|
||||
url: ${{ secrets.WEBHOOK_URL_2 }}
|
||||
|
||||
-
|
||||
name: Invoke deployment hook (scheduler)
|
||||
name: Invoke deployment hook (temporal_worker)
|
||||
uses: joelwmale/webhook-action@master
|
||||
with:
|
||||
url: ${{ secrets.WEBHOOK_URL_3 }}
|
||||
|
||||
-
|
||||
name: Invoke deployment hook (stream_notifications)
|
||||
uses: joelwmale/webhook-action@master
|
||||
with:
|
||||
url: ${{ secrets.WEBHOOK_URL_4 }}
|
||||
# -
|
||||
# name: Invoke deployment hook (stream_notifications)
|
||||
# uses: joelwmale/webhook-action@master
|
||||
# with:
|
||||
# url: ${{ secrets.WEBHOOK_URL_4 }}
|
||||
|
||||
-
|
||||
name: Invoke deployment hook (web_app)
|
||||
uses: joelwmale/webhook-action@master
|
||||
with:
|
||||
url: ${{ secrets.WEBHOOK_URL_5 }}
|
||||
# -
|
||||
# name: Invoke deployment hook (web_app)
|
||||
# uses: joelwmale/webhook-action@master
|
||||
# with:
|
||||
# url: ${{ secrets.WEBHOOK_URL_5 }}
|
||||
|
||||
4
.gitignore
vendored
4
.gitignore
vendored
@@ -1,5 +1,7 @@
|
||||
venv
|
||||
.venv
|
||||
|
||||
.DS_Store
|
||||
|
||||
.vscode
|
||||
|
||||
*.cpython-*.pyc
|
||||
|
||||
@@ -1,33 +1,11 @@
|
||||
FROM python:3.12-slim AS build
|
||||
|
||||
ARG POETRY_EXPORT_EXTRA_ARGS=''
|
||||
|
||||
WORKDIR /opt/venv
|
||||
RUN python -m venv /opt/venv && /opt/venv/bin/pip install --upgrade pip && /opt/venv/bin/pip install --no-cache-dir httpx poetry poetry-plugin-export
|
||||
|
||||
COPY ./pyproject.toml ./poetry.lock ./
|
||||
RUN --mount=type=ssh /opt/venv/bin/poetry export --without-hashes ${POETRY_EXPORT_EXTRA_ARGS} > requirements.txt \
|
||||
&& /opt/venv/bin/pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
|
||||
FROM python:3.12-slim AS runtime
|
||||
|
||||
RUN apt update && \
|
||||
apt install -y --no-install-recommends curl jq && \
|
||||
apt clean
|
||||
|
||||
COPY ./src/ /app
|
||||
|
||||
COPY ./scripts/*.sh /
|
||||
RUN chmod +x /*.sh
|
||||
|
||||
ENV PATH="/opt/venv/bin:$PATH"
|
||||
ENV VENV_PATH=/opt/venv
|
||||
|
||||
COPY --from=build /opt/venv /opt/venv
|
||||
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
|
||||
|
||||
WORKDIR /app
|
||||
COPY ./pyproject.toml ./uv.lock ./
|
||||
|
||||
EXPOSE 80
|
||||
RUN --mount=type=ssh uv venv \
|
||||
&& uv sync --frozen
|
||||
|
||||
CMD ["python", "main.py"]
|
||||
COPY ./src /app/src
|
||||
|
||||
WORKDIR /app/src
|
||||
|
||||
2152
poetry.lock
generated
2152
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,30 +1,34 @@
|
||||
[tool.poetry]
|
||||
[project]
|
||||
name = "discord-bot"
|
||||
version = "0.1.0"
|
||||
description = ""
|
||||
authors = ["Bulat Kurbanov <kurbanovbul@gmail.com>"]
|
||||
readme = "README.md"
|
||||
packages = [{include = "discord_bot"}]
|
||||
authors = [{ name = "Bulat Kurbanov", email = "kurbanovbul@gmail.com" }]
|
||||
requires-python = "~=3.12"
|
||||
dependencies = [
|
||||
"discord-py>=2.4.0,<3",
|
||||
"twitchapi>=4.4.0,<5",
|
||||
"pydantic>=2.10.5,<3",
|
||||
"pydantic-settings>=2.7.1,<3",
|
||||
"httpx>=0.28.1,<0.29",
|
||||
"icalendar>=6.1.0,<7",
|
||||
"pytz~=2025.2",
|
||||
"mongojet>=0.3,<0.4",
|
||||
"taskiq>=0.11.11,<0.12",
|
||||
"taskiq-redis>=1.0.2,<2",
|
||||
"redis[hiredis]>=5.2.1,<6",
|
||||
"fastapi>=0.115.8,<0.116",
|
||||
"authx>=1.4.1,<2",
|
||||
"httpx-oauth>=0.16.1,<0.17",
|
||||
"uvicorn[standard]>=0.34.0,<0.35",
|
||||
"temporalio>=1.10.0",
|
||||
]
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.11"
|
||||
discord-py = "^2.4.0"
|
||||
twitchapi = "^4.4.0"
|
||||
pydantic = "^2.10.5"
|
||||
pydantic-settings = "^2.7.1"
|
||||
httpx = "^0.28.1"
|
||||
icalendar = "^6.1.0"
|
||||
pytz = "^2024.2"
|
||||
mongojet = "^0.2.7"
|
||||
taskiq = "^0.11.11"
|
||||
taskiq-redis = "^1.0.2"
|
||||
redis = {extras = ["hiredis"], version = "^5.2.1"}
|
||||
fastapi = "^0.115.8"
|
||||
authx = "^1.4.1"
|
||||
httpx-oauth = "^0.16.1"
|
||||
uvicorn = {extras = ["standard"], version = "^0.34.0"}
|
||||
[tool.hatch.build.targets.sdist]
|
||||
include = ["discord_bot"]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
include = ["discord_bot"]
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
#! /usr/bin/env sh
|
||||
|
||||
/opt/venv/bin/python main.py $1
|
||||
@@ -1,3 +0,0 @@
|
||||
#! /usr/bin/env sh
|
||||
|
||||
/opt/venv/bin/taskiq scheduler core.broker:scheduler modules.tasks
|
||||
@@ -1,3 +0,0 @@
|
||||
#! /usr/bin/env sh
|
||||
|
||||
/opt/venv/bin/uvicorn modules.web_app.app:app --host 0.0.0.0 --port 80
|
||||
@@ -1,3 +0,0 @@
|
||||
#! /usr/bin/env sh
|
||||
|
||||
/opt/venv/bin/taskiq worker core.broker:broker modules.tasks
|
||||
@@ -1,31 +1,41 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class TwitchConfig(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
|
||||
|
||||
class NotificationsConfig(BaseModel):
|
||||
start_stream: str
|
||||
change_category: str | None = None
|
||||
redemption_reward: str | None = None
|
||||
|
||||
|
||||
class GamesListConfig(BaseModel):
|
||||
channel_id: int
|
||||
message_id: int
|
||||
|
||||
|
||||
class DiscordConfig(BaseModel):
|
||||
guild_id: int
|
||||
notifications_channel_id: int
|
||||
games_list: GamesListConfig | None = None
|
||||
roles: dict[str, int] | None = None
|
||||
|
||||
|
||||
class TelegramConfig(BaseModel):
|
||||
notifications_channel_id: int
|
||||
|
||||
|
||||
class IntegrationsConfig(BaseModel):
|
||||
discord: DiscordConfig | None = None
|
||||
telegram: TelegramConfig | None = None
|
||||
|
||||
|
||||
class StreamerConfig(BaseModel):
|
||||
twitch: TwitchConfig
|
||||
notifications: NotificationsConfig
|
||||
integrations: IntegrationsConfig
|
||||
|
||||
chatbot_in_chats: list[int] | None = None
|
||||
@@ -1,4 +1,4 @@
|
||||
from domain.streamers import StreamerConfig
|
||||
from applications.common.domain.streamers import StreamerConfig
|
||||
|
||||
from .base import BaseRepository
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from domain.users import CreateUser, User
|
||||
from applications.common.domain.users import CreateUser, User
|
||||
|
||||
from .base import BaseRepository
|
||||
|
||||
@@ -18,10 +18,10 @@ class UserRepository(BaseRepository):
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def get_or_create_user(cls, newUser: CreateUser) -> User:
|
||||
async def get_or_create_user(cls, new_user: CreateUser) -> User:
|
||||
filter_data = {}
|
||||
|
||||
for provider, data in newUser.oauths.items():
|
||||
for provider, data in new_user.oauths.items():
|
||||
filter_data[f"oauths.{provider}.id"] = data.id
|
||||
|
||||
async with cls.connect() as collection:
|
||||
@@ -29,7 +29,7 @@ class UserRepository(BaseRepository):
|
||||
filter_data,
|
||||
{
|
||||
"$setOnInsert": {
|
||||
**newUser.model_dump(),
|
||||
**new_user.model_dump(),
|
||||
}
|
||||
},
|
||||
upsert=True,
|
||||
13
src/applications/games_list/__main__.py
Normal file
13
src/applications/games_list/__main__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from asyncio import run
|
||||
|
||||
from applications.games_list.discord import client, logger
|
||||
from core.config import config
|
||||
|
||||
|
||||
async def start_discord_sevice():
|
||||
logger.info("Starting Discord service...")
|
||||
|
||||
await client.start(config.DISCORD_BOT_TOKEN)
|
||||
|
||||
|
||||
run(start_discord_sevice())
|
||||
@@ -5,13 +5,15 @@ from discord.abc import Messageable
|
||||
from discord import Object
|
||||
from discord import app_commands
|
||||
|
||||
from modules.games_list.games_list import GameList, GameItem
|
||||
|
||||
from applications.common.repositories.streamers import StreamerConfigRepository
|
||||
from applications.games_list.games_list import GameList, GameItem
|
||||
from core.config import config
|
||||
from repositories.streamers import StreamerConfigRepository
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
async def get_game_list_channel_to_message_map() -> dict[int, int]:
|
||||
@@ -240,9 +242,3 @@ async def replace(interaction: discord.Interaction, game: str, new: str):
|
||||
await game_list.save()
|
||||
|
||||
await interaction.response.send_message("Игра заменена!", ephemeral=True)
|
||||
|
||||
|
||||
async def start_discord_sevice():
|
||||
logger.info("Starting Discord service...")
|
||||
|
||||
await client.start(config.DISCORD_BOT_TOKEN)
|
||||
7
src/applications/schedule_sync/activities/__init__.py
Normal file
7
src/applications/schedule_sync/activities/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from .sync import syncronize, syncronize_all
|
||||
|
||||
|
||||
__all__ = [
|
||||
"syncronize",
|
||||
"syncronize_all",
|
||||
]
|
||||
29
src/applications/schedule_sync/activities/sync.py
Normal file
29
src/applications/schedule_sync/activities/sync.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from temporalio import activity
|
||||
|
||||
from applications.common.repositories.streamers import StreamerConfigRepository
|
||||
from applications.schedule_sync.synchronizer import syncronize as syncronize_internal
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def syncronize(twitch_id: int):
|
||||
try:
|
||||
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
|
||||
|
||||
if streamer.integrations.discord is None:
|
||||
return
|
||||
|
||||
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
|
||||
except Exception as e:
|
||||
activity.logger.error(f"Error during synchronization: {e}")
|
||||
raise e
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def syncronize_all():
|
||||
streamers = await StreamerConfigRepository().all()
|
||||
|
||||
for streamer in streamers:
|
||||
if streamer.integrations.discord is None:
|
||||
continue
|
||||
|
||||
await syncronize(streamer.twitch.id)
|
||||
@@ -1,7 +1,7 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from domain.streamers import TwitchConfig
|
||||
from applications.common.domain.streamers import TwitchConfig
|
||||
|
||||
from .twitch_events import get_twitch_events, TwitchEvent
|
||||
from .discord_events import (
|
||||
@@ -71,7 +71,11 @@ async def get_twitch_events(twitch_channel_id: str) -> list[TwitchEvent]:
|
||||
else:
|
||||
raise ValueError("Invalid repeat rule")
|
||||
|
||||
if event.start_at > datetime.now(event.start_at.tzinfo) or event.repeat_rule is not None:
|
||||
if (
|
||||
event.start_at > datetime.now(event.start_at.tzinfo)
|
||||
or event.end_at > datetime.now(event.end_at.tzinfo)
|
||||
or event.repeat_rule is not None
|
||||
):
|
||||
events.append(event)
|
||||
|
||||
return events
|
||||
6
src/applications/schedule_sync/workflows/__init__.py
Normal file
6
src/applications/schedule_sync/workflows/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from .sync import ScheduleSyncWorkflow
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ScheduleSyncWorkflow"
|
||||
]
|
||||
34
src/applications/schedule_sync/workflows/sync.py
Normal file
34
src/applications/schedule_sync/workflows/sync.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
|
||||
|
||||
from applications.schedule_sync.activities import syncronize_all
|
||||
from applications.temporal_worker.queues import MAIN_QUEUE
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class ScheduleSyncWorkflow:
|
||||
@classmethod
|
||||
def get_schedules(cls) -> dict[str, Schedule]:
|
||||
return {
|
||||
"all": Schedule(
|
||||
action=ScheduleActionStartWorkflow(
|
||||
cls.run,
|
||||
id="ScheduleSyncWorkflow",
|
||||
task_queue=MAIN_QUEUE,
|
||||
execution_timeout=timedelta(minutes=1),
|
||||
),
|
||||
spec=ScheduleSpec(
|
||||
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
@workflow.run
|
||||
async def run(self):
|
||||
await workflow.execute_activity(
|
||||
syncronize_all,
|
||||
task_queue=MAIN_QUEUE,
|
||||
schedule_to_close_timeout=timedelta(minutes=5),
|
||||
)
|
||||
58
src/applications/temporal_worker/__main__.py
Normal file
58
src/applications/temporal_worker/__main__.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from asyncio import run
|
||||
|
||||
from core.temporal import get_client
|
||||
|
||||
from temporalio.client import ScheduleAlreadyRunningError
|
||||
from temporalio.worker import Worker, UnsandboxedWorkflowRunner
|
||||
|
||||
from applications.schedule_sync import activities as schedule_sync_activities
|
||||
from applications.schedule_sync.workflows import ScheduleSyncWorkflow
|
||||
|
||||
from applications.twitch_webhook import activities as twitch_activities
|
||||
from applications.twitch_webhook import workflows as twitch_workflows
|
||||
|
||||
from .queues import MAIN_QUEUE
|
||||
|
||||
|
||||
async def main():
|
||||
client = await get_client()
|
||||
|
||||
for id, schedule in ScheduleSyncWorkflow.get_schedules().items():
|
||||
try:
|
||||
await client.create_schedule(f"ScheduleSyncWorkflow-{id}", schedule)
|
||||
except ScheduleAlreadyRunningError:
|
||||
pass
|
||||
|
||||
for id, schedule in twitch_workflows.StreamsCheckWorkflow.get_schedules().items():
|
||||
try:
|
||||
await client.create_schedule(f"StreamsCheckWorkflow-{id}", schedule)
|
||||
except ScheduleAlreadyRunningError:
|
||||
pass
|
||||
|
||||
worker: Worker = Worker(
|
||||
client,
|
||||
task_queue=MAIN_QUEUE,
|
||||
workflows=[
|
||||
ScheduleSyncWorkflow,
|
||||
twitch_workflows.StreamsCheckWorkflow,
|
||||
twitch_workflows.OnChannelUpdateWorkflow,
|
||||
twitch_workflows.OnMessageWorkflow,
|
||||
twitch_workflows.OnRewardRedemptionWorkflow,
|
||||
twitch_workflows.OnStreamOnlineWorkflow,
|
||||
],
|
||||
activities=[
|
||||
schedule_sync_activities.syncronize,
|
||||
schedule_sync_activities.syncronize_all,
|
||||
twitch_activities.on_message_activity,
|
||||
twitch_activities.on_stream_state_change_activity,
|
||||
twitch_activities.check_streams_states,
|
||||
twitch_activities.on_redemption_reward_add_activity,
|
||||
twitch_activities.on_channel_update_activity,
|
||||
],
|
||||
workflow_runner=UnsandboxedWorkflowRunner()
|
||||
)
|
||||
|
||||
await worker.run()
|
||||
|
||||
|
||||
run(main())
|
||||
1
src/applications/temporal_worker/queues.py
Normal file
1
src/applications/temporal_worker/queues.py
Normal file
@@ -0,0 +1 @@
|
||||
MAIN_QUEUE = "main"
|
||||
10
src/applications/twitch_webhook/__main__.py
Normal file
10
src/applications/twitch_webhook/__main__.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from asyncio import run
|
||||
|
||||
from .twitch.webhook import TwitchService
|
||||
|
||||
|
||||
async def start_twitch_service() -> None:
|
||||
await TwitchService.start()
|
||||
|
||||
|
||||
run(start_twitch_service())
|
||||
13
src/applications/twitch_webhook/activities/__init__.py
Normal file
13
src/applications/twitch_webhook/activities/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from .message_proc import on_message_activity
|
||||
from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
|
||||
from .redemption_reward import on_redemption_reward_add_activity
|
||||
from .state_checker import check_streams_states
|
||||
|
||||
|
||||
__all__ = [
|
||||
"on_message_activity",
|
||||
"on_stream_state_change_activity",
|
||||
"check_streams_states",
|
||||
"on_redemption_reward_add_activity",
|
||||
"on_channel_update_activity",
|
||||
]
|
||||
13
src/applications/twitch_webhook/activities/message_proc.py
Normal file
13
src/applications/twitch_webhook/activities/message_proc.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from temporalio import activity
|
||||
|
||||
from applications.twitch_webhook.messages_proc import MessageEvent, MessagesProc
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def on_message_activity(
|
||||
event: MessageEvent
|
||||
):
|
||||
await MessagesProc.on_message(
|
||||
event.received_as,
|
||||
event
|
||||
)
|
||||
@@ -0,0 +1,58 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from temporalio import activity
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from twitchAPI.helper import first
|
||||
|
||||
from applications.twitch_webhook.state import State, EventType, UpdateEvent
|
||||
from applications.twitch_webhook.watcher import StateWatcher
|
||||
from applications.twitch_webhook.twitch.authorize import authorize
|
||||
|
||||
|
||||
class OnStreamStateChangeActivity(BaseModel):
|
||||
streamer_id: int
|
||||
event_type: EventType
|
||||
new_state: State | None = None
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def on_stream_state_change_activity(
|
||||
data: OnStreamStateChangeActivity
|
||||
):
|
||||
await StateWatcher.on_stream_state_change(
|
||||
data.streamer_id,
|
||||
data.event_type,
|
||||
data.new_state,
|
||||
)
|
||||
|
||||
|
||||
class OnChannelUpdateActivity(BaseModel):
|
||||
event: UpdateEvent
|
||||
event_type: EventType
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def on_channel_update_activity(
|
||||
data: OnChannelUpdateActivity
|
||||
):
|
||||
twitch = await authorize(data.event.broadcaster_user_login)
|
||||
|
||||
stream = await first(twitch.get_streams(
|
||||
user_id=[data.event.broadcaster_user_id])
|
||||
)
|
||||
if stream is None:
|
||||
return
|
||||
|
||||
await on_stream_state_change_activity(
|
||||
OnStreamStateChangeActivity(
|
||||
streamer_id=int(data.event.broadcaster_user_id),
|
||||
event_type=data.event_type,
|
||||
new_state=State(
|
||||
title=data.event.title,
|
||||
category=data.event.category_name,
|
||||
last_live_at=datetime.now(timezone.utc)
|
||||
),
|
||||
)
|
||||
)
|
||||
@@ -0,0 +1,8 @@
|
||||
from temporalio import activity
|
||||
|
||||
from applications.twitch_webhook.reward_redemption import RewardRedemption, on_redemption_reward_add
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def on_redemption_reward_add_activity(event: RewardRedemption):
|
||||
await on_redemption_reward_add(event)
|
||||
29
src/applications/twitch_webhook/activities/state_checker.py
Normal file
29
src/applications/twitch_webhook/activities/state_checker.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from temporalio import activity
|
||||
|
||||
from applications.common.repositories.streamers import StreamerConfigRepository
|
||||
from applications.twitch_webhook.twitch.authorize import authorize
|
||||
from applications.twitch_webhook.state import State, EventType
|
||||
from applications.twitch_webhook.watcher import StateWatcher
|
||||
|
||||
|
||||
@activity.defn
|
||||
async def check_streams_states():
|
||||
streamers = await StreamerConfigRepository.all()
|
||||
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
|
||||
|
||||
twitch = await authorize("kurbezz")
|
||||
|
||||
async for stream in twitch.get_streams(user_id=streamers_ids):
|
||||
state = State(
|
||||
title=stream.title,
|
||||
category=stream.game_name,
|
||||
last_live_at=datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
await StateWatcher.on_stream_state_change(
|
||||
int(stream.user_id),
|
||||
EventType.UNKNOWN,
|
||||
state
|
||||
)
|
||||
@@ -6,7 +6,7 @@ from twitchAPI.object.eventsub import ChannelChatMessageEvent
|
||||
from httpx import AsyncClient
|
||||
|
||||
from core.config import config
|
||||
from .twitch.authorize import authorize
|
||||
from .twitch.authorize import authorize, Twitch
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -39,6 +39,8 @@ class MessageType(StrEnum):
|
||||
|
||||
|
||||
class MessageEvent(BaseModel):
|
||||
received_as: str
|
||||
|
||||
broadcaster_user_id: str
|
||||
broadcaster_user_name: str
|
||||
broadcaster_user_login: str
|
||||
@@ -57,8 +59,10 @@ class MessageEvent(BaseModel):
|
||||
channel_points_custom_reward_id: str | None
|
||||
|
||||
@classmethod
|
||||
def from_twitch_event(cls, event: ChannelChatMessageEvent):
|
||||
def from_twitch_event(cls, received_as: str, event: ChannelChatMessageEvent):
|
||||
return cls(
|
||||
received_as=received_as,
|
||||
|
||||
broadcaster_user_id=event.event.broadcaster_user_id,
|
||||
broadcaster_user_name=event.event.broadcaster_user_name,
|
||||
broadcaster_user_login=event.event.broadcaster_user_login,
|
||||
@@ -96,6 +100,19 @@ async def get_completion(messages: list[dict]) -> str:
|
||||
logger.info(f"Getting completion for message: {messages}")
|
||||
|
||||
data_messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
"Don't use markdown! "
|
||||
"Don't use blocked words on Twitch! "
|
||||
"Make answers short and clear! "
|
||||
"You are not language model! "
|
||||
"You are not a bot! "
|
||||
"You are human! "
|
||||
"Your name is kurbezz or Bulat! "
|
||||
"Don't start answer with @kurbezz!"
|
||||
)
|
||||
},
|
||||
*(
|
||||
{
|
||||
"role": "assistant" if message["user"] == "kurbezz" else "user",
|
||||
@@ -103,10 +120,6 @@ async def get_completion(messages: list[dict]) -> str:
|
||||
}
|
||||
for message in messages
|
||||
),
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Don't use markdown! Don't use blocked words on Twitch! Make answers short and clear!"
|
||||
}
|
||||
]
|
||||
|
||||
async with AsyncClient() as client:
|
||||
@@ -117,7 +130,7 @@ async def get_completion(messages: list[dict]) -> str:
|
||||
"content-type": "application/json"
|
||||
},
|
||||
json={
|
||||
"model": "google/gemini-2.0-flash-thinking-exp:free",
|
||||
"model": "deepseek/deepseek-chat-v3-0324:free",
|
||||
"messages": data_messages
|
||||
}
|
||||
)
|
||||
@@ -130,9 +143,8 @@ async def get_completion(messages: list[dict]) -> str:
|
||||
|
||||
|
||||
class MessagesProc:
|
||||
IGNORED_USER_LOGINS = [
|
||||
FULL_IGNORED_USER_LOGINS = [
|
||||
"jeetbot",
|
||||
"kurbezz",
|
||||
]
|
||||
|
||||
MESSAGE_LIMIT = 1000
|
||||
@@ -154,13 +166,16 @@ class MessagesProc:
|
||||
def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]:
|
||||
logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}")
|
||||
|
||||
return [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id] + \
|
||||
[m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
|
||||
if thread_id is not None:
|
||||
return (
|
||||
[m for m in cls.MESSAGE_HISTORY if m["id"] == thread_id]
|
||||
+ [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id]
|
||||
)
|
||||
|
||||
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
|
||||
|
||||
@classmethod
|
||||
async def on_message(cls, event: MessageEvent):
|
||||
logging.info(f"Received message: {event}")
|
||||
|
||||
async def _update_history(cls, event: MessageEvent):
|
||||
cls.update_message_history(
|
||||
id=event.message_id,
|
||||
text=event.message.text,
|
||||
@@ -168,11 +183,8 @@ class MessagesProc:
|
||||
thread_id=event.reply.thread_message_id if event.reply is not None else None
|
||||
)
|
||||
|
||||
if event.chatter_user_name == "pahangor":
|
||||
return
|
||||
|
||||
twitch = await authorize()
|
||||
|
||||
@classmethod
|
||||
async def _goida(cls, twitch: Twitch, event: MessageEvent):
|
||||
if "гойда" in event.message.text.lower():
|
||||
await twitch.send_chat_message(
|
||||
event.broadcaster_user_id,
|
||||
@@ -181,38 +193,79 @@ class MessagesProc:
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
|
||||
if "lasqexx" in event.chatter_user_login:
|
||||
pass # Todo: Здароу
|
||||
@classmethod
|
||||
async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
|
||||
if "lasqexx" not in event.chatter_user_login:
|
||||
return
|
||||
|
||||
if event.message.text.lower().startswith("!ai"):
|
||||
try:
|
||||
messages = cls.get_message_history_with_thread(
|
||||
event.message_id,
|
||||
thread_id=event.reply.thread_message_id if event.reply is not None else None
|
||||
)
|
||||
completion = await get_completion(messages)
|
||||
if "здароу" in event.message.text.lower():
|
||||
await twitch.send_chat_message(
|
||||
event.broadcaster_user_id,
|
||||
config.TWITCH_ADMIN_USER_ID,
|
||||
"Здароу, давай иди уже",
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
return
|
||||
|
||||
max_length = 255
|
||||
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
|
||||
if "сосал?" in event.message.text.lower():
|
||||
await twitch.send_chat_message(
|
||||
event.broadcaster_user_id,
|
||||
config.TWITCH_ADMIN_USER_ID,
|
||||
"А ты? Иди уже",
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
return
|
||||
|
||||
for part in completion_parts:
|
||||
await twitch.send_chat_message(
|
||||
event.broadcaster_user_id,
|
||||
config.TWITCH_ADMIN_USER_ID,
|
||||
part,
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to get completion: {}", e, exc_info=True)
|
||||
if "лан я пошёл" in event.message.text.lower():
|
||||
await twitch.send_chat_message(
|
||||
event.broadcaster_user_id,
|
||||
config.TWITCH_ADMIN_USER_ID,
|
||||
"да да, иди уже",
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
|
||||
if not event.message.text.lower().startswith("!ai"):
|
||||
return
|
||||
|
||||
try:
|
||||
messages = cls.get_message_history_with_thread(
|
||||
event.message_id,
|
||||
thread_id=event.reply.thread_message_id if event.reply is not None else None
|
||||
)
|
||||
completion = await get_completion(messages)
|
||||
|
||||
max_length = 255
|
||||
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
|
||||
|
||||
for part in completion_parts:
|
||||
await twitch.send_chat_message(
|
||||
event.broadcaster_user_id,
|
||||
config.TWITCH_ADMIN_USER_ID,
|
||||
"Ошибка!",
|
||||
part,
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
|
||||
if event.chatter_user_login in cls.IGNORED_USER_LOGINS:
|
||||
cls.update_message_history(
|
||||
id="ai",
|
||||
text=part,
|
||||
user="kurbezz",
|
||||
thread_id=event.message_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get completion: {e}", exc_info=True)
|
||||
|
||||
await twitch.send_chat_message(
|
||||
event.broadcaster_user_id,
|
||||
config.TWITCH_ADMIN_USER_ID,
|
||||
"Ошибка!",
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
|
||||
if event.chatter_user_login.lower() in ["kurbezz", "hafmc"]:
|
||||
return
|
||||
|
||||
if ("kurbezz" in event.message.text.lower() or \
|
||||
@@ -236,6 +289,13 @@ class MessagesProc:
|
||||
part,
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
|
||||
cls.update_message_history(
|
||||
id="ai",
|
||||
text=part,
|
||||
user="kurbezz",
|
||||
thread_id=event.message_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get completion: {e}")
|
||||
|
||||
@@ -245,3 +305,21 @@ class MessagesProc:
|
||||
"Пошел нахуй!",
|
||||
reply_parent_message_id=event.message_id
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def on_message(cls, received_as: str, event: MessageEvent):
|
||||
return
|
||||
|
||||
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
|
||||
return
|
||||
|
||||
logging.info(f"Received message: {event}")
|
||||
|
||||
await cls._update_history(event)
|
||||
|
||||
twitch = await authorize(received_as)
|
||||
|
||||
await cls._goida(twitch, event)
|
||||
await cls._lasqexx(twitch, event)
|
||||
await cls._ask_ai(twitch, event)
|
||||
await cls._kurbezz(twitch, event)
|
||||
@@ -3,7 +3,7 @@ import logging
|
||||
from httpx import AsyncClient
|
||||
|
||||
from core.config import config
|
||||
from domain.streamers import StreamerConfig
|
||||
from applications.common.domain.streamers import StreamerConfig
|
||||
|
||||
from .state import State
|
||||
from .sent_notifications import SentNotification, SentNotificationType, SentResult
|
||||
52
src/applications/twitch_webhook/reward_redemption.py
Normal file
52
src/applications/twitch_webhook/reward_redemption.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import logging
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
|
||||
|
||||
from applications.common.repositories.streamers import StreamerConfigRepository
|
||||
from .twitch.authorize import authorize
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RewardRedemption(BaseModel):
|
||||
broadcaster_user_id: str
|
||||
broadcaster_user_login: str
|
||||
user_name: str
|
||||
reward_title: str
|
||||
user_input: str
|
||||
|
||||
@classmethod
|
||||
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
|
||||
return cls(
|
||||
broadcaster_user_id=event.event.broadcaster_user_id,
|
||||
broadcaster_user_login=event.event.broadcaster_user_login,
|
||||
user_name=event.event.user_name,
|
||||
reward_title=event.event.reward.title,
|
||||
user_input=event.event.user_input or "",
|
||||
)
|
||||
|
||||
|
||||
async def on_redemption_reward_add(reward: RewardRedemption):
|
||||
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
|
||||
|
||||
twitch = await authorize(reward.broadcaster_user_login)
|
||||
|
||||
streamer = await StreamerConfigRepository.get_by_twitch_id(int(reward.broadcaster_user_id))
|
||||
|
||||
if streamer.notifications.redemption_reward is None:
|
||||
return
|
||||
|
||||
message = streamer.notifications.redemption_reward.format(
|
||||
user=reward.user_name,
|
||||
reward_title=reward.reward_title,
|
||||
reward_promt=f" ({reward.user_input})" if reward.user_input else ""
|
||||
)
|
||||
|
||||
await twitch.send_chat_message(
|
||||
reward.broadcaster_user_id,
|
||||
reward.broadcaster_user_id,
|
||||
message
|
||||
)
|
||||
@@ -21,6 +21,7 @@ class State(BaseModel):
|
||||
|
||||
class UpdateEvent(BaseModel):
|
||||
broadcaster_user_id: str
|
||||
broadcaster_user_login: str
|
||||
title: str
|
||||
category_name: str
|
||||
|
||||
@@ -14,19 +14,21 @@ SCOPES = [
|
||||
AuthScope.USER_BOT,
|
||||
AuthScope.USER_READ_CHAT,
|
||||
AuthScope.USER_WRITE_CHAT,
|
||||
|
||||
AuthScope.CHANNEL_READ_REDEMPTIONS,
|
||||
]
|
||||
|
||||
|
||||
async def authorize(auto_refresh_auth: bool = False) -> Twitch:
|
||||
async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
|
||||
twitch = Twitch(
|
||||
config.TWITCH_CLIENT_ID,
|
||||
config.TWITCH_CLIENT_SECRET
|
||||
)
|
||||
|
||||
twitch.user_auth_refresh_callback = TokenStorage.save
|
||||
twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
|
||||
twitch.auto_refresh_auth = auto_refresh_auth
|
||||
|
||||
token, refresh_token = await TokenStorage.get()
|
||||
token, refresh_token = await TokenStorage.get(user)
|
||||
await twitch.set_user_authentication(
|
||||
token,
|
||||
SCOPES,
|
||||
@@ -1,12 +1,16 @@
|
||||
import logging
|
||||
|
||||
from core.mongo import mongo_manager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TokenStorage:
|
||||
COLLECTION_NAME = "secrets"
|
||||
OBJECT_ID = "twitch_tokens"
|
||||
TYPE = "twitch_token"
|
||||
|
||||
@staticmethod
|
||||
async def save(acceess_token: str, refresh_token: str):
|
||||
async def save(user: str, acceess_token: str, refresh_token: str):
|
||||
data = {"access_token": acceess_token, "refresh_token": refresh_token}
|
||||
|
||||
async with mongo_manager.connect() as client:
|
||||
@@ -14,17 +18,20 @@ class TokenStorage:
|
||||
collection = db[TokenStorage.COLLECTION_NAME]
|
||||
|
||||
await collection.update_one(
|
||||
{"_id": TokenStorage.OBJECT_ID},
|
||||
{"type": TokenStorage.TYPE, "twitch_login": user},
|
||||
{"$set": data},
|
||||
upsert=True
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def get() -> tuple[str, str]:
|
||||
async def get(user: str) -> tuple[str, str]:
|
||||
async with mongo_manager.connect() as client:
|
||||
db = client.get_default_database()
|
||||
collection = db[TokenStorage.COLLECTION_NAME]
|
||||
|
||||
data = await collection.find_one({"_id": TokenStorage.OBJECT_ID})
|
||||
data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
|
||||
|
||||
if data is None:
|
||||
raise RuntimeError(f"Token for user {user} not found")
|
||||
|
||||
return data["access_token"], data["refresh_token"]
|
||||
233
src/applications/twitch_webhook/twitch/webhook.py
Normal file
233
src/applications/twitch_webhook/twitch/webhook.py
Normal file
@@ -0,0 +1,233 @@
|
||||
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
|
||||
import logging
|
||||
from typing import Literal
|
||||
|
||||
from twitchAPI.eventsub.websocket import EventSubWebsocket
|
||||
from twitchAPI.twitch import Twitch
|
||||
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
|
||||
from twitchAPI.oauth import validate_token
|
||||
|
||||
from core.temporal import get_client
|
||||
|
||||
from applications.common.repositories.streamers import StreamerConfigRepository, StreamerConfig
|
||||
from applications.twitch_webhook.state import UpdateEvent, EventType
|
||||
from applications.twitch_webhook.messages_proc import MessageEvent
|
||||
from applications.twitch_webhook.reward_redemption import RewardRedemption
|
||||
from applications.twitch_webhook.workflows.on_message import OnMessageWorkflow
|
||||
from applications.twitch_webhook.workflows.on_reward_redemption import OnRewardRedemptionWorkflow
|
||||
from applications.twitch_webhook.workflows.on_stream_online import OnStreamOnlineWorkflow
|
||||
from applications.twitch_webhook.workflows.on_channel_update import OnChannelUpdateWorkflow
|
||||
from applications.temporal_worker.queues import MAIN_QUEUE
|
||||
from .authorize import authorize
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
class TwitchService:
|
||||
ONLINE_NOTIFICATION_DELAY = 15 * 60
|
||||
|
||||
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
|
||||
self.twitch = twitch
|
||||
self.streamer = streamer
|
||||
|
||||
self.failed = False
|
||||
|
||||
async def on_channel_update(self, event: ChannelUpdateEvent):
|
||||
client = await get_client()
|
||||
|
||||
await client.start_workflow(
|
||||
OnChannelUpdateWorkflow.run,
|
||||
args=(
|
||||
UpdateEvent(
|
||||
broadcaster_user_id=event.event.broadcaster_user_id,
|
||||
broadcaster_user_login=event.event.broadcaster_user_login,
|
||||
title=event.event.title,
|
||||
category_name=event.event.category_name
|
||||
),
|
||||
EventType.CHANNEL_UPDATE,
|
||||
),
|
||||
id=f"on-channel-update-{event.event.broadcaster_user_id}",
|
||||
task_queue=MAIN_QUEUE
|
||||
)
|
||||
|
||||
async def on_stream_online(self, event: StreamOnlineEvent):
|
||||
client = await get_client()
|
||||
|
||||
await client.start_workflow(
|
||||
OnStreamOnlineWorkflow.run,
|
||||
args=(
|
||||
int(event.event.broadcaster_user_id),
|
||||
EventType.STREAM_ONLINE
|
||||
),
|
||||
id=f"on-stream-online-{event.event.broadcaster_user_id}",
|
||||
task_queue=MAIN_QUEUE
|
||||
)
|
||||
|
||||
async def on_channel_points_custom_reward_redemption_add(
|
||||
self,
|
||||
event: ChannelPointsCustomRewardRedemptionAddEvent
|
||||
):
|
||||
client = await get_client()
|
||||
|
||||
await client.start_workflow(
|
||||
OnRewardRedemptionWorkflow.run,
|
||||
RewardRedemption.from_twitch_event(event),
|
||||
id=f"on-reward-redemption-{event.event.broadcaster_user_id}-{event.event.reward.id}",
|
||||
task_queue=MAIN_QUEUE
|
||||
)
|
||||
|
||||
async def on_message(self, event: ChannelChatMessageEvent):
|
||||
client = await get_client()
|
||||
|
||||
await client.start_workflow(
|
||||
OnMessageWorkflow.run,
|
||||
MessageEvent.from_twitch_event(
|
||||
self.streamer.twitch.name,
|
||||
event
|
||||
),
|
||||
id=f"on-message-{event.event.broadcaster_user_id}-{event.event.message_id}",
|
||||
task_queue=MAIN_QUEUE
|
||||
)
|
||||
|
||||
async def _clean_subs(self, method: str, streamer: StreamerConfig):
|
||||
match method:
|
||||
case "listen_channel_update_v2":
|
||||
sub_type = "channel.update"
|
||||
case "listen_stream_online":
|
||||
sub_type = "stream.online"
|
||||
case "listen_channel_chat_message":
|
||||
sub_type = "channel.chat.message"
|
||||
case "listen_channel_points_custom_reward_redemption_add":
|
||||
sub_type = "channel.channel_points_custom_reward_redemption.add"
|
||||
case _:
|
||||
raise ValueError("Unknown method")
|
||||
|
||||
subs = await self.twitch.get_eventsub_subscriptions(
|
||||
user_id=str(streamer.twitch.id)
|
||||
)
|
||||
|
||||
for sub in subs.data:
|
||||
if sub.type == sub_type:
|
||||
try:
|
||||
await self.twitch.delete_eventsub_subscription(sub.id)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
|
||||
|
||||
async def subscribe_with_retry(
|
||||
self,
|
||||
method: Literal["listen_channel_update_v2"]
|
||||
| Literal["listen_stream_online"]
|
||||
| Literal["listen_channel_chat_message"]
|
||||
| Literal["listen_channel_points_custom_reward_redemption_add"],
|
||||
eventsub: EventSubWebsocket,
|
||||
streamer: StreamerConfig,
|
||||
retry: int = 10
|
||||
):
|
||||
await self._clean_subs(method, streamer)
|
||||
|
||||
try:
|
||||
match method:
|
||||
case "listen_channel_update_v2":
|
||||
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
|
||||
case "listen_stream_online":
|
||||
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
|
||||
case "listen_channel_points_custom_reward_redemption_add":
|
||||
await eventsub.listen_channel_points_custom_reward_redemption_add(
|
||||
str(streamer.twitch.id),
|
||||
self.on_channel_points_custom_reward_redemption_add
|
||||
)
|
||||
case "listen_channel_chat_message":
|
||||
chatbot_in_chats = streamer.chatbot_in_chats or []
|
||||
|
||||
for chat_id in chatbot_in_chats:
|
||||
await eventsub.listen_channel_chat_message(
|
||||
str(chat_id),
|
||||
str(streamer.twitch.id),
|
||||
self.on_message
|
||||
)
|
||||
case _:
|
||||
raise ValueError("Unknown method")
|
||||
|
||||
return
|
||||
except Exception as e:
|
||||
if retry <= 0:
|
||||
raise e
|
||||
|
||||
await sleep(1)
|
||||
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
|
||||
|
||||
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name}")
|
||||
await gather(
|
||||
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
|
||||
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
|
||||
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer),
|
||||
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
|
||||
)
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
|
||||
|
||||
async def _check_token(self):
|
||||
assert self.twitch._user_auth_token is not None
|
||||
|
||||
while True:
|
||||
for _ in range(60):
|
||||
if self.failed:
|
||||
return
|
||||
|
||||
await sleep(1)
|
||||
|
||||
logger.info("Check token...")
|
||||
val_result = await validate_token(
|
||||
self.twitch._user_auth_token,
|
||||
auth_base_url=self.twitch.auth_base_url
|
||||
)
|
||||
if val_result.get('status', 200) != 200:
|
||||
await self.twitch.refresh_used_token()
|
||||
logger.info("Token refreshed")
|
||||
|
||||
async def run(self) -> None:
|
||||
eventsub = EventSubWebsocket(twitch=self.twitch)
|
||||
|
||||
try:
|
||||
eventsub.start()
|
||||
|
||||
logger.info("Subscribe to events...")
|
||||
await self.subscribe_to_streamer(eventsub, self.streamer)
|
||||
logger.info("Twitch service started")
|
||||
|
||||
await self._check_token()
|
||||
finally:
|
||||
logger.info("Twitch service stopping...")
|
||||
await eventsub.stop()
|
||||
|
||||
@classmethod
|
||||
async def _start_for_streamer(cls, streamer: StreamerConfig):
|
||||
try:
|
||||
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
|
||||
await cls(twith, streamer).run()
|
||||
except Exception as e:
|
||||
logger.error("Twitch service failed", exc_info=e)
|
||||
|
||||
@classmethod
|
||||
async def start(cls):
|
||||
logger.info("Starting Twitch service...")
|
||||
|
||||
streamers = await StreamerConfigRepository.all()
|
||||
|
||||
await wait(
|
||||
[
|
||||
create_task(cls._start_for_streamer(streamer))
|
||||
for streamer in streamers
|
||||
],
|
||||
return_when=FIRST_COMPLETED
|
||||
)
|
||||
|
||||
await gather(
|
||||
*[cls._start_for_streamer(streamer) for streamer in streamers]
|
||||
)
|
||||
|
||||
logger.info("Twitch service stopped")
|
||||
@@ -3,7 +3,7 @@ from datetime import datetime, timezone, timedelta
|
||||
from twitchAPI.helper import first
|
||||
|
||||
from core.redis import redis_manager
|
||||
from repositories.streamers import StreamerConfigRepository
|
||||
from applications.common.repositories.streamers import StreamerConfigRepository
|
||||
|
||||
from .state import State, StateManager, EventType
|
||||
from .sent_notifications import SentNotificationRepository, SentNotificationType
|
||||
@@ -16,7 +16,7 @@ class StateWatcher:
|
||||
|
||||
@classmethod
|
||||
async def get_twitch_state(cls, streamer_id: int) -> State | None:
|
||||
twitch = await authorize()
|
||||
twitch = await authorize("kurbezz")
|
||||
|
||||
stream = await first(
|
||||
twitch.get_streams(user_id=[str(streamer_id)])
|
||||
14
src/applications/twitch_webhook/workflows/__init__.py
Normal file
14
src/applications/twitch_webhook/workflows/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from .checker import StreamsCheckWorkflow
|
||||
from .on_channel_update import OnChannelUpdateWorkflow
|
||||
from .on_message import OnMessageWorkflow
|
||||
from .on_reward_redemption import OnRewardRedemptionWorkflow
|
||||
from .on_stream_online import OnStreamOnlineWorkflow
|
||||
|
||||
|
||||
__all__ = [
|
||||
"StreamsCheckWorkflow",
|
||||
"OnChannelUpdateWorkflow",
|
||||
"OnMessageWorkflow",
|
||||
"OnRewardRedemptionWorkflow",
|
||||
"OnStreamOnlineWorkflow",
|
||||
]
|
||||
33
src/applications/twitch_webhook/workflows/checker.py
Normal file
33
src/applications/twitch_webhook/workflows/checker.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
|
||||
|
||||
from applications.temporal_worker.queues import MAIN_QUEUE
|
||||
from applications.twitch_webhook.activities.state_checker import check_streams_states
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class StreamsCheckWorkflow:
|
||||
@classmethod
|
||||
def get_schedules(cls) -> dict[str, Schedule]:
|
||||
return {
|
||||
"check": Schedule(
|
||||
action=ScheduleActionStartWorkflow(
|
||||
cls.run,
|
||||
id="StreamsCheckWorkflow",
|
||||
task_queue=MAIN_QUEUE,
|
||||
),
|
||||
spec=ScheduleSpec(
|
||||
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
@workflow.run
|
||||
async def run(self):
|
||||
await workflow.start_activity(
|
||||
check_streams_states,
|
||||
task_queue=MAIN_QUEUE,
|
||||
schedule_to_close_timeout=timedelta(minutes=1)
|
||||
)
|
||||
@@ -0,0 +1,26 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
|
||||
from applications.temporal_worker.queues import MAIN_QUEUE
|
||||
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
|
||||
from applications.twitch_webhook.state import UpdateEvent, EventType
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class OnChannelUpdateWorkflow:
|
||||
@workflow.run
|
||||
async def run(
|
||||
self,
|
||||
event: UpdateEvent,
|
||||
event_type: EventType,
|
||||
):
|
||||
await workflow.start_activity(
|
||||
on_channel_update_activity,
|
||||
OnChannelUpdateActivity(
|
||||
event=event,
|
||||
event_type=event_type,
|
||||
),
|
||||
task_queue=MAIN_QUEUE,
|
||||
start_to_close_timeout=timedelta(minutes=1),
|
||||
)
|
||||
19
src/applications/twitch_webhook/workflows/on_message.py
Normal file
19
src/applications/twitch_webhook/workflows/on_message.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
|
||||
from applications.twitch_webhook.messages_proc import MessageEvent
|
||||
from applications.twitch_webhook.activities.message_proc import on_message_activity
|
||||
from applications.temporal_worker.queues import MAIN_QUEUE
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class OnMessageWorkflow:
|
||||
@workflow.run
|
||||
async def run(self, message: MessageEvent) -> None:
|
||||
await workflow.start_activity(
|
||||
on_message_activity,
|
||||
message,
|
||||
task_queue=MAIN_QUEUE,
|
||||
schedule_to_close_timeout=timedelta(minutes=5)
|
||||
)
|
||||
@@ -0,0 +1,19 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
|
||||
from applications.twitch_webhook.activities.redemption_reward import on_redemption_reward_add_activity
|
||||
from applications.twitch_webhook.reward_redemption import RewardRedemption
|
||||
from applications.temporal_worker.queues import MAIN_QUEUE
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class OnRewardRedemptionWorkflow:
|
||||
@workflow.run
|
||||
async def run(self, reward: RewardRedemption):
|
||||
await workflow.start_activity(
|
||||
on_redemption_reward_add_activity,
|
||||
reward,
|
||||
task_queue=MAIN_QUEUE,
|
||||
schedule_to_close_timeout=timedelta(minutes=1)
|
||||
)
|
||||
@@ -0,0 +1,22 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from temporalio import workflow
|
||||
|
||||
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
|
||||
from applications.twitch_webhook.state import EventType
|
||||
from applications.temporal_worker.queues import MAIN_QUEUE
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class OnStreamOnlineWorkflow:
|
||||
@workflow.run
|
||||
async def run(self, broadcaster_user_id: str | int, event_type: EventType):
|
||||
await workflow.start_activity(
|
||||
on_stream_state_change_activity,
|
||||
OnStreamStateChangeActivity(
|
||||
streamer_id=int(broadcaster_user_id),
|
||||
event_type=event_type
|
||||
),
|
||||
task_queue=MAIN_QUEUE,
|
||||
schedule_to_close_timeout=timedelta(minutes=1)
|
||||
)
|
||||
@@ -1,22 +0,0 @@
|
||||
from taskiq import TaskiqScheduler
|
||||
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
|
||||
from taskiq.schedule_sources import LabelScheduleSource
|
||||
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
|
||||
|
||||
from core.config import config
|
||||
|
||||
|
||||
broker = ListQueueBroker(url=config.REDIS_URI) \
|
||||
.with_middlewares(
|
||||
SimpleRetryMiddleware(default_retry_count=5)
|
||||
) \
|
||||
.with_result_backend(RedisAsyncResultBackend(
|
||||
redis_url=config.REDIS_URI,
|
||||
result_ex_time=60 * 60 * 24 * 7,
|
||||
))
|
||||
|
||||
|
||||
scheduler = TaskiqScheduler(
|
||||
broker=broker,
|
||||
sources=[LabelScheduleSource(broker)],
|
||||
)
|
||||
10
src/core/temporal.py
Normal file
10
src/core/temporal.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from temporalio.client import Client
|
||||
from temporalio.contrib.pydantic import pydantic_data_converter
|
||||
|
||||
|
||||
async def get_client() -> Client:
|
||||
return await Client.connect(
|
||||
"temporal:7233",
|
||||
namespace="default",
|
||||
data_converter=pydantic_data_converter
|
||||
)
|
||||
45
src/main.py
45
src/main.py
@@ -1,45 +0,0 @@
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from modules.games_list import start as start_games_list_module
|
||||
from modules.stream_notifications import start as start_stream_notifications_module
|
||||
|
||||
from core.mongo import mongo_manager
|
||||
from core.redis import redis_manager
|
||||
from core.broker import broker
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
async def main():
|
||||
logger.info("Starting services...")
|
||||
|
||||
if len(sys.argv) != 2:
|
||||
raise RuntimeError("Usage: python main.py <module>")
|
||||
|
||||
module = sys.argv[1]
|
||||
|
||||
await mongo_manager.init()
|
||||
await redis_manager.init()
|
||||
|
||||
if not broker.is_worker_process:
|
||||
await broker.startup()
|
||||
|
||||
if module == "games_list":
|
||||
await start_games_list_module()
|
||||
elif module == "stream_notifications":
|
||||
await start_stream_notifications_module()
|
||||
else:
|
||||
raise RuntimeError(f"Unknown module: {module}")
|
||||
|
||||
exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from asyncio import run
|
||||
|
||||
run(main())
|
||||
@@ -1,7 +0,0 @@
|
||||
from .discord import start_discord_sevice
|
||||
|
||||
|
||||
start = start_discord_sevice
|
||||
|
||||
|
||||
__all__ = ["start"]
|
||||
@@ -1,24 +0,0 @@
|
||||
from core.broker import broker
|
||||
from repositories.streamers import StreamerConfigRepository
|
||||
from .synchronizer import syncronize
|
||||
|
||||
|
||||
@broker.task("scheduler_sync.syncronize_task")
|
||||
async def syncronize_task(twitch_id: int):
|
||||
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
|
||||
|
||||
if streamer.integrations.discord is None:
|
||||
return
|
||||
|
||||
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)
|
||||
|
||||
|
||||
@broker.task("scheduler_sync.syncronize_all_task", schedule=[{"cron": "*/5 * * * *"}])
|
||||
async def syncronize_all_task():
|
||||
streamers = await StreamerConfigRepository().all()
|
||||
|
||||
for streamer in streamers:
|
||||
if streamer.integrations.discord is None:
|
||||
continue
|
||||
|
||||
await syncronize_task.kiq(streamer.twitch.id)
|
||||
@@ -1,7 +0,0 @@
|
||||
from .twitch.webhook import start_twitch_service
|
||||
|
||||
|
||||
start = start_twitch_service
|
||||
|
||||
|
||||
__all__ = ["start"]
|
||||
@@ -1,84 +0,0 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from twitchAPI.helper import first
|
||||
|
||||
from core.broker import broker
|
||||
from repositories.streamers import StreamerConfigRepository
|
||||
|
||||
from .state import State, UpdateEvent, EventType
|
||||
from .watcher import StateWatcher
|
||||
from .messages_proc import MessageEvent, MessagesProc
|
||||
from .twitch.authorize import authorize
|
||||
|
||||
|
||||
@broker.task(
|
||||
"stream_notifications.twitch.on_stream_state_change_with_check",
|
||||
retry_on_error=True
|
||||
)
|
||||
async def on_stream_state_change_with_check(
|
||||
event: UpdateEvent,
|
||||
event_type: EventType
|
||||
):
|
||||
twitch = await authorize()
|
||||
|
||||
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
|
||||
if stream is None:
|
||||
return
|
||||
|
||||
await on_stream_state_change.kiq(
|
||||
int(event.broadcaster_user_id),
|
||||
event_type,
|
||||
State(
|
||||
title=event.title,
|
||||
category=event.category_name,
|
||||
last_live_at=datetime.now(timezone.utc)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@broker.task(
|
||||
"stream_notifications.twitch.on_stream_state_change",
|
||||
retry_on_error=True
|
||||
)
|
||||
async def on_stream_state_change(
|
||||
streamer_id: int,
|
||||
event_type: EventType,
|
||||
new_state: State | None = None
|
||||
):
|
||||
await StateWatcher.on_stream_state_change(
|
||||
streamer_id,
|
||||
event_type,
|
||||
new_state,
|
||||
)
|
||||
|
||||
|
||||
@broker.task(
|
||||
"stream_notifications.check_streams_states",
|
||||
schedule=[{"cron": "*/2 * * * *"}]
|
||||
)
|
||||
async def check_streams_states():
|
||||
streamers = await StreamerConfigRepository.all()
|
||||
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
|
||||
|
||||
twitch = await authorize()
|
||||
|
||||
async for stream in twitch.get_streams(user_id=streamers_ids):
|
||||
state = State(
|
||||
title=stream.title,
|
||||
category=stream.game_name,
|
||||
last_live_at=datetime.now(timezone.utc)
|
||||
)
|
||||
|
||||
await StateWatcher.on_stream_state_change(
|
||||
int(stream.user_id),
|
||||
EventType.UNKNOWN,
|
||||
state
|
||||
)
|
||||
|
||||
|
||||
@broker.task(
|
||||
"stream_notifications.on_message",
|
||||
retry_on_error=True
|
||||
)
|
||||
async def on_message(event: MessageEvent):
|
||||
await MessagesProc.on_message(event)
|
||||
@@ -1,172 +0,0 @@
|
||||
from asyncio import sleep, gather
|
||||
import logging
|
||||
from typing import NoReturn, Literal
|
||||
|
||||
from twitchAPI.eventsub.webhook import EventSubWebhook
|
||||
from twitchAPI.twitch import Twitch
|
||||
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent
|
||||
from twitchAPI.oauth import validate_token
|
||||
|
||||
from core.config import config
|
||||
from repositories.streamers import StreamerConfigRepository, StreamerConfig
|
||||
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message
|
||||
from modules.stream_notifications.state import UpdateEvent, EventType
|
||||
from modules.stream_notifications.messages_proc import MessageEvent
|
||||
from .authorize import authorize
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TwitchService:
|
||||
ONLINE_NOTIFICATION_DELAY = 15 * 60
|
||||
|
||||
def __init__(self, twitch: Twitch):
|
||||
self.twitch = twitch
|
||||
|
||||
self.failed = False
|
||||
|
||||
async def on_channel_update(self, event: ChannelUpdateEvent):
|
||||
await on_stream_state_change_with_check.kiq(
|
||||
UpdateEvent(
|
||||
broadcaster_user_id=event.event.broadcaster_user_id,
|
||||
title=event.event.title,
|
||||
category_name=event.event.category_name
|
||||
),
|
||||
EventType.CHANNEL_UPDATE,
|
||||
)
|
||||
|
||||
async def on_stream_online(self, event: StreamOnlineEvent):
|
||||
await on_stream_state_change.kiq(
|
||||
int(event.event.broadcaster_user_id),
|
||||
EventType.STREAM_ONLINE,
|
||||
)
|
||||
|
||||
async def on_message(self, event: ChannelChatMessageEvent):
|
||||
await on_message.kiq(
|
||||
MessageEvent.from_twitch_event(event)
|
||||
)
|
||||
|
||||
async def subscribe_with_retry(
|
||||
self,
|
||||
method: Literal["listen_channel_update_v2"]
|
||||
| Literal["listen_stream_online"]
|
||||
| Literal["listen_channel_chat_message"],
|
||||
eventsub: EventSubWebhook,
|
||||
streamer: StreamerConfig,
|
||||
retry: int = 10
|
||||
):
|
||||
|
||||
try:
|
||||
match method:
|
||||
case "listen_channel_update_v2":
|
||||
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
|
||||
case "listen_stream_online":
|
||||
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
|
||||
case "listen_channel_chat_message":
|
||||
await eventsub.listen_channel_chat_message(
|
||||
str(streamer.twitch.id),
|
||||
str(config.TWITCH_ADMIN_USER_ID),
|
||||
self.on_message
|
||||
)
|
||||
case _:
|
||||
raise ValueError("Unknown method")
|
||||
|
||||
return
|
||||
except Exception as e:
|
||||
if retry <= 0:
|
||||
raise e
|
||||
|
||||
match method:
|
||||
case "listen_channel_update_v2":
|
||||
sub_type = "channel.update"
|
||||
case "listen_stream_online":
|
||||
sub_type = "stream.online"
|
||||
case "listen_channel_chat_message":
|
||||
sub_type = "channel.chat.message"
|
||||
case _:
|
||||
raise ValueError("Unknown method")
|
||||
|
||||
subs = await self.twitch.get_eventsub_subscriptions(
|
||||
user_id=str(streamer.twitch.id)
|
||||
)
|
||||
|
||||
for sub in subs.data:
|
||||
if sub.type == sub_type:
|
||||
try:
|
||||
await self.twitch.delete_eventsub_subscription(sub.id)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
|
||||
|
||||
await sleep(1)
|
||||
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
|
||||
|
||||
async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig):
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name}")
|
||||
await gather(
|
||||
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
|
||||
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
|
||||
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
|
||||
)
|
||||
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
|
||||
|
||||
async def _check_token(self):
|
||||
assert self.twitch._user_auth_token is not None
|
||||
|
||||
while True:
|
||||
for _ in range(60):
|
||||
if self.failed:
|
||||
return
|
||||
|
||||
await sleep(1)
|
||||
|
||||
logger.info("Check token...")
|
||||
val_result = await validate_token(
|
||||
self.twitch._user_auth_token,
|
||||
auth_base_url=self.twitch.auth_base_url
|
||||
)
|
||||
if val_result.get('status', 200) != 200:
|
||||
await self.twitch.refresh_used_token()
|
||||
logger.info("Token refreshed")
|
||||
|
||||
async def run(self) -> NoReturn:
|
||||
eventsub = EventSubWebhook(
|
||||
callback_url=config.TWITCH_CALLBACK_URL,
|
||||
port=config.TWITCH_CALLBACK_PORT,
|
||||
twitch=self.twitch,
|
||||
message_deduplication_history_length=50
|
||||
)
|
||||
eventsub.wait_for_subscription_confirm_timeout = 60
|
||||
eventsub.unsubscribe_on_stop = False
|
||||
|
||||
streamers = await StreamerConfigRepository.all()
|
||||
|
||||
try:
|
||||
eventsub.start()
|
||||
|
||||
logger.info("Subscribe to events...")
|
||||
await gather(
|
||||
*[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers]
|
||||
)
|
||||
logger.info("Twitch service started")
|
||||
|
||||
await self._check_token()
|
||||
finally:
|
||||
logger.info("Twitch service stopping...")
|
||||
await eventsub.stop()
|
||||
|
||||
@classmethod
|
||||
async def start(cls):
|
||||
logger.info("Starting Twitch service...")
|
||||
|
||||
try:
|
||||
twith = await authorize(auto_refresh_auth=True)
|
||||
await cls(twith).run()
|
||||
except Exception as e:
|
||||
logger.error("Twitch service failed", exc_info=e)
|
||||
|
||||
logger.info("Twitch service stopped")
|
||||
|
||||
|
||||
async def start_twitch_service() -> NoReturn:
|
||||
await TwitchService.start()
|
||||
@@ -1,2 +0,0 @@
|
||||
from modules.scheduler_sync.tasks import * # noqa: F403
|
||||
from modules.stream_notifications.tasks import * # noqa: F403
|
||||
Reference in New Issue
Block a user