43 Commits

Author SHA1 Message Date
eddad6454d Disable messages proc 2025-04-24 17:17:45 +02:00
8ac15a1687 Fix 2025-04-23 09:48:26 +02:00
36c542b822 Fix 2025-04-22 19:04:16 +02:00
b67d00bcd7 Fix 2025-04-22 19:02:16 +02:00
81a51a3d0d Fix 2025-04-22 18:48:47 +02:00
90756c884c Fix 2025-04-22 18:45:30 +02:00
df501c27d7 Fix 2025-04-22 18:30:14 +02:00
33fbc4c446 Fix 2025-04-22 13:55:43 +02:00
03039bbd09 Fix 2025-04-22 13:51:21 +02:00
6ea36f165e Fix execution timeout in ScheduleSyncWorkflow 2025-04-22 13:42:06 +02:00
7721e3a840 Fix 2025-04-22 13:03:23 +02:00
1c5b7e81e8 Migrate to python 3.12 2025-04-22 01:51:36 +02:00
586359f8ce Fix 2025-04-22 01:32:12 +02:00
12dd0f9af7 Fix 2025-04-22 01:22:37 +02:00
4174d67084 Fix 2025-04-22 01:17:42 +02:00
2d8b767ae3 Revert to python 3.13 2025-04-22 00:56:38 +02:00
a0a53800ab Migrate to python3.11 2025-04-22 00:51:14 +02:00
959ab30265 Revert "Fix"
This reverts commit 560a5ac793.
2025-04-22 00:49:53 +02:00
560a5ac793 Fix 2025-04-22 00:42:21 +02:00
28c64341e1 Fix 2025-04-22 00:18:14 +02:00
2ef3148d46 Fix 2025-04-21 23:58:08 +02:00
cb464ad9f3 Fix 2025-04-21 23:52:10 +02:00
bb67ff7bba Fix 2025-04-21 23:32:55 +02:00
cadd54565a Fix 2025-04-21 23:30:04 +02:00
5bea39cd2c Fix 2025-04-21 23:24:33 +02:00
77fb68a5e3 Fix 2025-04-21 21:54:30 +02:00
fb19f7a125 Fix 2025-04-21 21:03:54 +02:00
acaff281c3 Fix 2025-04-21 20:54:55 +02:00
dc2486f5ff Fix 2025-04-21 20:47:33 +02:00
92be127ced Fix 2025-04-21 20:38:36 +02:00
b18f717fae Add temporal worker 2025-04-21 20:21:58 +02:00
58d3a37985 Fix 2025-04-21 18:55:24 +02:00
92c5398f0d Fix 2025-04-21 18:53:32 +02:00
bef5e907f5 Fix 2025-04-21 18:47:38 +02:00
004a21f6cc Fix 2025-04-21 18:41:40 +02:00
0917e5634e Fix 2025-04-21 18:36:21 +02:00
f9995acf3d Update logging 2025-04-21 18:09:07 +02:00
288e4769bc Refactor 2025-04-21 18:03:24 +02:00
cafa0e3afd Fix 2025-04-21 16:00:43 +02:00
5a769c96a0 Fix 2025-04-21 15:57:27 +02:00
f1d023c9a1 Fix 2025-04-21 15:42:36 +02:00
cfce98a42f Update 2025-04-21 15:40:56 +02:00
abe0cbb173 New structure 2025-04-21 13:51:01 +02:00
75 changed files with 571 additions and 287 deletions

View File

@@ -50,25 +50,25 @@ jobs:
url: ${{ secrets.WEBHOOK_URL }}
-
name: Invoke deployment hook (worker)
name: Invoke deployment hook (twitch_webhook)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_2 }}
-
name: Invoke deployment hook (scheduler)
name: Invoke deployment hook (temporal_worker)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_3 }}
-
name: Invoke deployment hook (stream_notifications)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_4 }}
# -
# name: Invoke deployment hook (stream_notifications)
# uses: joelwmale/webhook-action@master
# with:
# url: ${{ secrets.WEBHOOK_URL_4 }}
-
name: Invoke deployment hook (web_app)
uses: joelwmale/webhook-action@master
with:
url: ${{ secrets.WEBHOOK_URL_5 }}
# -
# name: Invoke deployment hook (web_app)
# uses: joelwmale/webhook-action@master
# with:
# url: ${{ secrets.WEBHOOK_URL_5 }}

2
.gitignore vendored
View File

@@ -3,3 +3,5 @@
.DS_Store
.vscode
*.cpython-*.pyc

View File

@@ -1,19 +1,11 @@
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
COPY ./scripts/*.sh /
RUN chmod +x /*.sh
WORKDIR /app
COPY ./pyproject.toml ./uv.lock ./
COPY ./pyproject.toml ./
COPY ./uv.lock ./
RUN --mount=type=ssh uv venv \
&& uv sync --frozen
RUN uv venv && uv sync --frozen
COPY ./src /app/src
COPY ./src ./src
ENV PATH="/app/.venv/bin:$PATH"
EXPOSE 80
CMD ["uv", "run", "src/main.py"]
WORKDIR /app/src

View File

@@ -20,6 +20,7 @@ dependencies = [
"authx>=1.4.1,<2",
"httpx-oauth>=0.16.1,<0.17",
"uvicorn[standard]>=0.34.0,<0.35",
"temporalio>=1.10.0",
]
[tool.hatch.build.targets.sdist]

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
uv run ./src/main.py $1

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
uv run --directory src taskiq scheduler core.broker:scheduler modules.tasks

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
uv run --directory src uvicorn modules.web_app.app:app --host 0.0.0.0 --port 80

View File

@@ -1,3 +0,0 @@
#! /usr/bin/env sh
uv run --directory src taskiq worker core.broker:broker modules.tasks

View File

@@ -1,31 +1,38 @@
from pydantic import BaseModel
class TwitchConfig(BaseModel):
id: int
name: str
class NotificationsConfig(BaseModel):
start_stream: str
change_category: str | None = None
redemption_reward: str | None = None
class GamesListConfig(BaseModel):
channel_id: int
message_id: int
class DiscordConfig(BaseModel):
guild_id: int
notifications_channel_id: int
games_list: GamesListConfig | None = None
roles: dict[str, int] | None = None
class TelegramConfig(BaseModel):
notifications_channel_id: int
class IntegrationsConfig(BaseModel):
discord: DiscordConfig | None = None
telegram: TelegramConfig | None = None
class StreamerConfig(BaseModel):
twitch: TwitchConfig
notifications: NotificationsConfig

View File

@@ -1,4 +1,4 @@
from domain.streamers import StreamerConfig
from applications.common.domain.streamers import StreamerConfig
from .base import BaseRepository

View File

@@ -1,4 +1,4 @@
from domain.users import CreateUser, User
from applications.common.domain.users import CreateUser, User
from .base import BaseRepository
@@ -18,10 +18,10 @@ class UserRepository(BaseRepository):
)
@classmethod
async def get_or_create_user(cls, newUser: CreateUser) -> User:
async def get_or_create_user(cls, new_user: CreateUser) -> User:
filter_data = {}
for provider, data in newUser.oauths.items():
for provider, data in new_user.oauths.items():
filter_data[f"oauths.{provider}.id"] = data.id
async with cls.connect() as collection:
@@ -29,7 +29,7 @@ class UserRepository(BaseRepository):
filter_data,
{
"$setOnInsert": {
**newUser.model_dump(),
**new_user.model_dump(),
}
},
upsert=True,

View File

@@ -0,0 +1,13 @@
from asyncio import run
from applications.games_list.discord import client, logger
from core.config import config
async def start_discord_sevice():
logger.info("Starting Discord service...")
await client.start(config.DISCORD_BOT_TOKEN)
run(start_discord_sevice())

View File

@@ -5,13 +5,15 @@ from discord.abc import Messageable
from discord import Object
from discord import app_commands
from modules.games_list.games_list import GameList, GameItem
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.games_list.games_list import GameList, GameItem
from core.config import config
from repositories.streamers import StreamerConfigRepository
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
async def get_game_list_channel_to_message_map() -> dict[int, int]:
@@ -240,9 +242,3 @@ async def replace(interaction: discord.Interaction, game: str, new: str):
await game_list.save()
await interaction.response.send_message("Игра заменена!", ephemeral=True)
async def start_discord_sevice():
logger.info("Starting Discord service...")
await client.start(config.DISCORD_BOT_TOKEN)

View File

@@ -0,0 +1,7 @@
from .sync import syncronize, syncronize_all
__all__ = [
"syncronize",
"syncronize_all",
]

View File

@@ -0,0 +1,29 @@
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.synchronizer import syncronize as syncronize_internal
@activity.defn
async def syncronize(twitch_id: int):
try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e:
activity.logger.error(f"Error during synchronization: {e}")
raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from domain.streamers import TwitchConfig
from applications.common.domain.streamers import TwitchConfig
from .twitch_events import get_twitch_events, TwitchEvent
from .discord_events import (

View File

@@ -0,0 +1,6 @@
from .sync import ScheduleSyncWorkflow
__all__ = [
"ScheduleSyncWorkflow"
]

View File

@@ -0,0 +1,34 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.schedule_sync.activities import syncronize_all
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class ScheduleSyncWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"all": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="ScheduleSyncWorkflow",
task_queue=MAIN_QUEUE,
execution_timeout=timedelta(minutes=1),
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
)
)
}
@workflow.run
async def run(self):
await workflow.execute_activity(
syncronize_all,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5),
)

View File

@@ -0,0 +1,58 @@
from asyncio import run
from core.temporal import get_client
from temporalio.client import ScheduleAlreadyRunningError
from temporalio.worker import Worker, UnsandboxedWorkflowRunner
from applications.schedule_sync import activities as schedule_sync_activities
from applications.schedule_sync.workflows import ScheduleSyncWorkflow
from applications.twitch_webhook import activities as twitch_activities
from applications.twitch_webhook import workflows as twitch_workflows
from .queues import MAIN_QUEUE
async def main():
client = await get_client()
for id, schedule in ScheduleSyncWorkflow.get_schedules().items():
try:
await client.create_schedule(f"ScheduleSyncWorkflow-{id}", schedule)
except ScheduleAlreadyRunningError:
pass
for id, schedule in twitch_workflows.StreamsCheckWorkflow.get_schedules().items():
try:
await client.create_schedule(f"StreamsCheckWorkflow-{id}", schedule)
except ScheduleAlreadyRunningError:
pass
worker: Worker = Worker(
client,
task_queue=MAIN_QUEUE,
workflows=[
ScheduleSyncWorkflow,
twitch_workflows.StreamsCheckWorkflow,
twitch_workflows.OnChannelUpdateWorkflow,
twitch_workflows.OnMessageWorkflow,
twitch_workflows.OnRewardRedemptionWorkflow,
twitch_workflows.OnStreamOnlineWorkflow,
],
activities=[
schedule_sync_activities.syncronize,
schedule_sync_activities.syncronize_all,
twitch_activities.on_message_activity,
twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity,
twitch_activities.on_channel_update_activity,
],
workflow_runner=UnsandboxedWorkflowRunner()
)
await worker.run()
run(main())

View File

@@ -0,0 +1 @@
MAIN_QUEUE = "main"

View File

@@ -0,0 +1,10 @@
from asyncio import run
from .twitch.webhook import TwitchService
async def start_twitch_service() -> None:
await TwitchService.start()
run(start_twitch_service())

View File

@@ -0,0 +1,13 @@
from .message_proc import on_message_activity
from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
from .redemption_reward import on_redemption_reward_add_activity
from .state_checker import check_streams_states
__all__ = [
"on_message_activity",
"on_stream_state_change_activity",
"check_streams_states",
"on_redemption_reward_add_activity",
"on_channel_update_activity",
]

View File

@@ -0,0 +1,13 @@
from temporalio import activity
from applications.twitch_webhook.messages_proc import MessageEvent, MessagesProc
@activity.defn
async def on_message_activity(
event: MessageEvent
):
await MessagesProc.on_message(
event.received_as,
event
)

View File

@@ -0,0 +1,58 @@
from datetime import datetime, timezone
from temporalio import activity
from pydantic import BaseModel
from twitchAPI.helper import first
from applications.twitch_webhook.state import State, EventType, UpdateEvent
from applications.twitch_webhook.watcher import StateWatcher
from applications.twitch_webhook.twitch.authorize import authorize
class OnStreamStateChangeActivity(BaseModel):
streamer_id: int
event_type: EventType
new_state: State | None = None
@activity.defn
async def on_stream_state_change_activity(
data: OnStreamStateChangeActivity
):
await StateWatcher.on_stream_state_change(
data.streamer_id,
data.event_type,
data.new_state,
)
class OnChannelUpdateActivity(BaseModel):
event: UpdateEvent
event_type: EventType
@activity.defn
async def on_channel_update_activity(
data: OnChannelUpdateActivity
):
twitch = await authorize(data.event.broadcaster_user_login)
stream = await first(twitch.get_streams(
user_id=[data.event.broadcaster_user_id])
)
if stream is None:
return
await on_stream_state_change_activity(
OnStreamStateChangeActivity(
streamer_id=int(data.event.broadcaster_user_id),
event_type=data.event_type,
new_state=State(
title=data.event.title,
category=data.event.category_name,
last_live_at=datetime.now(timezone.utc)
),
)
)

View File

@@ -0,0 +1,8 @@
from temporalio import activity
from applications.twitch_webhook.reward_redemption import RewardRedemption, on_redemption_reward_add
@activity.defn
async def on_redemption_reward_add_activity(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -0,0 +1,29 @@
from datetime import datetime, timezone
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.state import State, EventType
from applications.twitch_webhook.watcher import StateWatcher
@activity.defn
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)

View File

@@ -39,6 +39,8 @@ class MessageType(StrEnum):
class MessageEvent(BaseModel):
received_as: str
broadcaster_user_id: str
broadcaster_user_name: str
broadcaster_user_login: str
@@ -57,8 +59,10 @@ class MessageEvent(BaseModel):
channel_points_custom_reward_id: str | None
@classmethod
def from_twitch_event(cls, event: ChannelChatMessageEvent):
def from_twitch_event(cls, received_as: str, event: ChannelChatMessageEvent):
return cls(
received_as=received_as,
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_name=event.event.broadcaster_user_name,
broadcaster_user_login=event.event.broadcaster_user_login,
@@ -219,7 +223,6 @@ class MessagesProc:
"да да, иди уже",
reply_parent_message_id=event.message_id
)
return
@classmethod
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
@@ -251,7 +254,7 @@ class MessagesProc:
thread_id=event.message_id
)
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
logger.error(f"Failed to get completion: {e}", exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
@@ -305,6 +308,8 @@ class MessagesProc:
@classmethod
async def on_message(cls, received_as: str, event: MessageEvent):
return
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return

View File

@@ -3,7 +3,7 @@ import logging
from httpx import AsyncClient
from core.config import config
from domain.streamers import StreamerConfig
from applications.common.domain.streamers import StreamerConfig
from .state import State
from .sent_notifications import SentNotification, SentNotificationType, SentResult

View File

@@ -4,7 +4,7 @@ from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from repositories.streamers import StreamerConfigRepository
from applications.common.repositories.streamers import StreamerConfigRepository
from .twitch.authorize import authorize

View File

@@ -1,21 +1,30 @@
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
import logging
from typing import NoReturn, Literal
from typing import Literal
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token
from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message, on_redemption_reward_add_task
from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent
from modules.stream_notifications.reward_redemption import RewardRedemption
from core.temporal import get_client
from applications.common.repositories.streamers import StreamerConfigRepository, StreamerConfig
from applications.twitch_webhook.state import UpdateEvent, EventType
from applications.twitch_webhook.messages_proc import MessageEvent
from applications.twitch_webhook.reward_redemption import RewardRedemption
from applications.twitch_webhook.workflows.on_message import OnMessageWorkflow
from applications.twitch_webhook.workflows.on_reward_redemption import OnRewardRedemptionWorkflow
from applications.twitch_webhook.workflows.on_stream_online import OnStreamOnlineWorkflow
from applications.twitch_webhook.workflows.on_channel_update import OnChannelUpdateWorkflow
from applications.temporal_worker.queues import MAIN_QUEUE
from .authorize import authorize
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class TwitchService:
@@ -28,7 +37,11 @@ class TwitchService:
self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent):
await on_stream_state_change_with_check.kiq(
client = await get_client()
await client.start_workflow(
OnChannelUpdateWorkflow.run,
args=(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
@@ -36,26 +49,48 @@ class TwitchService:
category_name=event.event.category_name
),
EventType.CHANNEL_UPDATE,
),
id=f"on-channel-update-{event.event.broadcaster_user_id}",
task_queue=MAIN_QUEUE
)
async def on_stream_online(self, event: StreamOnlineEvent):
await on_stream_state_change.kiq(
client = await get_client()
await client.start_workflow(
OnStreamOnlineWorkflow.run,
args=(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE,
EventType.STREAM_ONLINE
),
id=f"on-stream-online-{event.event.broadcaster_user_id}",
task_queue=MAIN_QUEUE
)
async def on_channel_points_custom_reward_redemption_add(
self,
event: ChannelPointsCustomRewardRedemptionAddEvent
):
await on_redemption_reward_add_task(
RewardRedemption.from_twitch_event(event)
client = await get_client()
await client.start_workflow(
OnRewardRedemptionWorkflow.run,
RewardRedemption.from_twitch_event(event),
id=f"on-reward-redemption-{event.event.broadcaster_user_id}-{event.event.reward.id}",
task_queue=MAIN_QUEUE
)
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
client = await get_client()
await client.start_workflow(
OnMessageWorkflow.run,
MessageEvent.from_twitch_event(
self.streamer.twitch.name,
MessageEvent.from_twitch_event(event)
event
),
id=f"on-message-{event.event.broadcaster_user_id}-{event.event.message_id}",
task_queue=MAIN_QUEUE
)
async def _clean_subs(self, method: str, streamer: StreamerConfig):
@@ -154,7 +189,7 @@ class TwitchService:
await self.twitch.refresh_used_token()
logger.info("Token refreshed")
async def run(self) -> NoReturn:
async def run(self) -> None:
eventsub = EventSubWebsocket(twitch=self.twitch)
try:
@@ -196,7 +231,3 @@ class TwitchService:
)
logger.info("Twitch service stopped")
async def start_twitch_service() -> NoReturn:
await TwitchService.start()

View File

@@ -3,7 +3,7 @@ from datetime import datetime, timezone, timedelta
from twitchAPI.helper import first
from core.redis import redis_manager
from repositories.streamers import StreamerConfigRepository
from applications.common.repositories.streamers import StreamerConfigRepository
from .state import State, StateManager, EventType
from .sent_notifications import SentNotificationRepository, SentNotificationType

View File

@@ -0,0 +1,14 @@
from .checker import StreamsCheckWorkflow
from .on_channel_update import OnChannelUpdateWorkflow
from .on_message import OnMessageWorkflow
from .on_reward_redemption import OnRewardRedemptionWorkflow
from .on_stream_online import OnStreamOnlineWorkflow
__all__ = [
"StreamsCheckWorkflow",
"OnChannelUpdateWorkflow",
"OnMessageWorkflow",
"OnRewardRedemptionWorkflow",
"OnStreamOnlineWorkflow",
]

View File

@@ -0,0 +1,33 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.state_checker import check_streams_states
@workflow.defn
class StreamsCheckWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"check": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="StreamsCheckWorkflow",
task_queue=MAIN_QUEUE,
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
)
)
}
@workflow.run
async def run(self):
await workflow.start_activity(
check_streams_states,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -0,0 +1,26 @@
from datetime import timedelta
from temporalio import workflow
from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
from applications.twitch_webhook.state import UpdateEvent, EventType
@workflow.defn
class OnChannelUpdateWorkflow:
@workflow.run
async def run(
self,
event: UpdateEvent,
event_type: EventType,
):
await workflow.start_activity(
on_channel_update_activity,
OnChannelUpdateActivity(
event=event,
event_type=event_type,
),
task_queue=MAIN_QUEUE,
start_to_close_timeout=timedelta(minutes=1),
)

View File

@@ -0,0 +1,19 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.messages_proc import MessageEvent
from applications.twitch_webhook.activities.message_proc import on_message_activity
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnMessageWorkflow:
@workflow.run
async def run(self, message: MessageEvent) -> None:
await workflow.start_activity(
on_message_activity,
message,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5)
)

View File

@@ -0,0 +1,19 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.activities.redemption_reward import on_redemption_reward_add_activity
from applications.twitch_webhook.reward_redemption import RewardRedemption
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnRewardRedemptionWorkflow:
@workflow.run
async def run(self, reward: RewardRedemption):
await workflow.start_activity(
on_redemption_reward_add_activity,
reward,
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -0,0 +1,22 @@
from datetime import timedelta
from temporalio import workflow
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
from applications.twitch_webhook.state import EventType
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
class OnStreamOnlineWorkflow:
@workflow.run
async def run(self, broadcaster_user_id: str | int, event_type: EventType):
await workflow.start_activity(
on_stream_state_change_activity,
OnStreamStateChangeActivity(
streamer_id=int(broadcaster_user_id),
event_type=event_type
),
task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1)
)

View File

@@ -1,22 +0,0 @@
from taskiq import TaskiqScheduler
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from core.config import config
broker = ListQueueBroker(url=config.REDIS_URI) \
.with_middlewares(
SimpleRetryMiddleware(default_retry_count=5)
) \
.with_result_backend(RedisAsyncResultBackend(
redis_url=config.REDIS_URI,
result_ex_time=60 * 60 * 24 * 7,
))
scheduler = TaskiqScheduler(
broker=broker,
sources=[LabelScheduleSource(broker)],
)

10
src/core/temporal.py Normal file
View File

@@ -0,0 +1,10 @@
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def get_client() -> Client:
return await Client.connect(
"temporal:7233",
namespace="default",
data_converter=pydantic_data_converter
)

View File

@@ -1,45 +0,0 @@
import logging
import sys
from modules.games_list import start as start_games_list_module
from modules.stream_notifications import start as start_stream_notifications_module
from core.mongo import mongo_manager
from core.redis import redis_manager
from core.broker import broker
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
async def main():
logger.info("Starting services...")
if len(sys.argv) != 2:
raise RuntimeError("Usage: python main.py <module>")
module = sys.argv[1]
await mongo_manager.init()
await redis_manager.init()
if not broker.is_worker_process:
await broker.startup()
if module == "games_list":
await start_games_list_module()
elif module == "stream_notifications":
await start_stream_notifications_module()
else:
raise RuntimeError(f"Unknown module: {module}")
exit(0)
if __name__ == "__main__":
from asyncio import run
run(main())

View File

@@ -1,7 +0,0 @@
from .discord import start_discord_sevice
start = start_discord_sevice
__all__ = ["start"]

View File

@@ -1,24 +0,0 @@
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .synchronizer import syncronize
@broker.task("scheduler_sync.syncronize_task")
async def syncronize_task(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)
@broker.task("scheduler_sync.syncronize_all_task", schedule=[{"cron": "*/5 * * * *"}])
async def syncronize_all_task():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize_task.kiq(streamer.twitch.id)

View File

@@ -1,7 +0,0 @@
from .twitch.webhook import start_twitch_service
start = start_twitch_service
__all__ = ["start"]

View File

@@ -1,96 +0,0 @@
from datetime import datetime, timezone
from twitchAPI.helper import first
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
from .reward_redemption import RewardRedemption, on_redemption_reward_add
@broker.task(
"stream_notifications.twitch.on_stream_state_change_with_check",
retry_on_error=True
)
async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await on_stream_state_change.kiq(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
)
)
@broker.task(
"stream_notifications.twitch.on_stream_state_change",
retry_on_error=True
)
async def on_stream_state_change(
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
@broker.task(
"stream_notifications.check_streams_states",
schedule=[{"cron": "*/2 * * * *"}]
)
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(
received_as: str,
event: MessageEvent
):
await MessagesProc.on_message(received_as, event)
@broker.task(
"stream_notifications.on_redemption_reward_add",
retry_on_error=True
)
async def on_redemption_reward_add_task(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -1,2 +0,0 @@
from modules.scheduler_sync.tasks import * # noqa: F403
from modules.stream_notifications.tasks import * # noqa: F403

43
uv.lock generated
View File

@@ -283,6 +283,7 @@ dependencies = [
{ name = "redis", extra = ["hiredis"] },
{ name = "taskiq" },
{ name = "taskiq-redis" },
{ name = "temporalio" },
{ name = "twitchapi" },
{ name = "uvicorn", extra = ["standard"] },
]
@@ -302,6 +303,7 @@ requires-dist = [
{ name = "redis", extras = ["hiredis"], specifier = ">=5.2.1,<6" },
{ name = "taskiq", specifier = ">=0.11.11,<0.12" },
{ name = "taskiq-redis", specifier = ">=1.0.2,<2" },
{ name = "temporalio", specifier = ">=1.10.0" },
{ name = "twitchapi", specifier = ">=4.4.0,<5" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.34.0,<0.35" },
]
@@ -719,6 +721,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b8/d3/c3cb8f1d6ae3b37f83e1de806713a9b3642c5895f0215a62e1a4bd6e5e34/propcache-0.3.1-py3-none-any.whl", hash = "sha256:9a8ecf38de50a7f518c21568c80f985e776397b902f1ce0b01f799aba1608b40", size = 12376 },
]
[[package]]
name = "protobuf"
version = "6.30.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c8/8c/cf2ac658216eebe49eaedf1e06bc06cbf6a143469236294a1171a51357c3/protobuf-6.30.2.tar.gz", hash = "sha256:35c859ae076d8c56054c25b59e5e59638d86545ed6e2b6efac6be0b6ea3ba048", size = 429315 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/be/85/cd53abe6a6cbf2e0029243d6ae5fb4335da2996f6c177bb2ce685068e43d/protobuf-6.30.2-cp310-abi3-win32.whl", hash = "sha256:b12ef7df7b9329886e66404bef5e9ce6a26b54069d7f7436a0853ccdeb91c103", size = 419148 },
{ url = "https://files.pythonhosted.org/packages/97/e9/7b9f1b259d509aef2b833c29a1f3c39185e2bf21c9c1be1cd11c22cb2149/protobuf-6.30.2-cp310-abi3-win_amd64.whl", hash = "sha256:7653c99774f73fe6b9301b87da52af0e69783a2e371e8b599b3e9cb4da4b12b9", size = 431003 },
{ url = "https://files.pythonhosted.org/packages/8e/66/7f3b121f59097c93267e7f497f10e52ced7161b38295137a12a266b6c149/protobuf-6.30.2-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:0eb523c550a66a09a0c20f86dd554afbf4d32b02af34ae53d93268c1f73bc65b", size = 417579 },
{ url = "https://files.pythonhosted.org/packages/d0/89/bbb1bff09600e662ad5b384420ad92de61cab2ed0f12ace1fd081fd4c295/protobuf-6.30.2-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:50f32cc9fd9cb09c783ebc275611b4f19dfdfb68d1ee55d2f0c7fa040df96815", size = 317319 },
{ url = "https://files.pythonhosted.org/packages/28/50/1925de813499546bc8ab3ae857e3ec84efe7d2f19b34529d0c7c3d02d11d/protobuf-6.30.2-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4f6c687ae8efae6cf6093389a596548214467778146b7245e886f35e1485315d", size = 316212 },
{ url = "https://files.pythonhosted.org/packages/e5/a1/93c2acf4ade3c5b557d02d500b06798f4ed2c176fa03e3c34973ca92df7f/protobuf-6.30.2-py3-none-any.whl", hash = "sha256:ae86b030e69a98e08c77beab574cbcb9fff6d031d57209f574a5aea1445f4b51", size = 167062 },
]
[[package]]
name = "pyasn1"
version = "0.4.8"
@@ -1045,6 +1061,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f8/1a/c4cf9d4f01d996582be78d15626d026e52584846f057e537954cb97b798f/taskiq_redis-1.0.4-py3-none-any.whl", hash = "sha256:ffc151e212cddef7ed73e41aa11b874328433510c685aaf2acee2976b757caf3", size = 19107 },
]
[[package]]
name = "temporalio"
version = "1.10.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "protobuf" },
{ name = "types-protobuf" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a5/8d/745ccb5da079062db8a14227052ac8d0b8a50bb41df4660fad3345a73ecc/temporalio-1.10.0.tar.gz", hash = "sha256:688400e4ca7f6b47c0ade3ebb6549e4d79b0762430ea735a0d8ff83b1ab8b8ba", size = 1418529 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/16/f5/9c75e50db0d54d7960a3571b16cc55083d7bd449d1734cbe5f25779d1469/temporalio-1.10.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:81fd40eeeba0396a7193ab5b45877301234b983aa38e444dfceecad2b3224398", size = 11032281 },
{ url = "https://files.pythonhosted.org/packages/f5/b2/ad8fc89e5f8f59c3128661a47a65687619734af0b87fbe4756460de8e00e/temporalio-1.10.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:2f7bff9ac1fc832655342e9677bbee1b413b97857d1f2265f018ce72fb7758f8", size = 10796640 },
{ url = "https://files.pythonhosted.org/packages/ee/f4/a06053515ecd67e797b7dd0516bdd11c28f952a54449b4c771dcb097de00/temporalio-1.10.0-cp39-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:c3ee8416d1cab04036e03e39a4db4cf4a9a799750b8da20022e0d719da9c9371", size = 11146377 },
{ url = "https://files.pythonhosted.org/packages/f7/b4/875f10f1da52879d44fb126cd1b68186ba7f4d35589dc9f3c9ae743f65c5/temporalio-1.10.0-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:4f5805e0d33ba525ccea01e3cd9d3eca313e0c66c6b86e6cb0f15ef004c0acc0", size = 11303069 },
{ url = "https://files.pythonhosted.org/packages/21/55/a2248f3798498584133be848c0f6072b37995e201a3f93aff413f77f00cc/temporalio-1.10.0-cp39-abi3-win_amd64.whl", hash = "sha256:81cb8bef8aef6d3cc130c7cecf008cf529177a2f9cb206cfba2897db7df9d093", size = 11338952 },
]
[[package]]
name = "twitchapi"
version = "4.4.0"
@@ -1060,6 +1094,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e9/b0/3041e012e59747990ec54d957ddeef1c6ca0a6120e2cecb4d6c07e573caa/twitchAPI-4.4.0-py3-none-any.whl", hash = "sha256:087939a4891a447aee5d1ab564da8ad3a984b580b0708ab9c59afd93b9a3d5da", size = 120579 },
]
[[package]]
name = "types-protobuf"
version = "5.29.1.20250403"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/78/6d/62a2e73b966c77609560800004dd49a926920dd4976a9fdd86cf998e7048/types_protobuf-5.29.1.20250403.tar.gz", hash = "sha256:7ff44f15022119c9d7558ce16e78b2d485bf7040b4fadced4dd069bb5faf77a2", size = 59413 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/69/e3/b74dcc2797b21b39d5a4f08a8b08e20369b4ca250d718df7af41a60dd9f0/types_protobuf-5.29.1.20250403-py3-none-any.whl", hash = "sha256:c71de04106a2d54e5b2173d0a422058fae0ef2d058d70cf369fb797bf61ffa59", size = 73874 },
]
[[package]]
name = "typing-extensions"
version = "4.13.1"