platform/interface.py: - Add Attachment, MessageChunk, AgentEvent types - Add stream_message() to PlatformClient Protocol (door open for streaming) - Add WebhookReceiver Protocol platform/mock.py: - Add attachment_mode config (url/binary/s3) - Implement stream_message() — single chunk, ready for real streaming - Add register_webhook_receiver() + simulate_agent_event() for testing docs/research/: - telegram-forum-topics.md — aiogram 3.x Forum Topics API, FSM patterns, UX analysis - fsm-patterns.md — FSM storage options, StateData best practices - matrix-spaces.md — matrix-nio Space API, room ordering, invite flow - matrix-events.md — reactions, threads, typing, sync loop pitfalls - telegram-chat-alternatives.md — 7 alternatives for multi-chat UX, virtual chats in DM recommended
4.4 KiB
4.4 KiB
Research: FSM паттерны в aiogram 3.x
Хранилище состояний (Storage)
Выбор по фазе разработки
| Фаза | Storage | Плюсы | Минусы |
|---|---|---|---|
| Разработка | MemoryStorage |
Нет зависимостей | Теряется при рестарте |
| MVP | SQLiteStorage |
Персистентен, нет Redis | Не распределённый |
| Продакшн | RedisStorage |
Распределённый, быстрый | Нужен Redis |
# Разработка
from aiogram.fsm.storage.memory import MemoryStorage
storage = MemoryStorage()
# MVP — pip install aiogram-sqlite-storage
from aiogram_sqlite_storage.sqlitestore import SQLStorage
storage = SQLStorage(db_path="fsm.db")
# Продакшн
from aiogram.fsm.storage.redis import RedisStorage
from redis.asyncio import Redis
storage = RedisStorage(redis=Redis(host="localhost"))
dp = Dispatcher(storage=storage)
Структура StateData
Правило: хранить только параметры текущего шага
# ПРАВИЛЬНО: только то что нужно в следующем шаге
await state.update_data(
group_id=group_id, # ID группы для следующего шага
pending_name="Чат 2", # временное имя до подтверждения
)
# НЕПРАВИЛЬНО: дублировать данные из БД
await state.update_data(
user_name="Иван", # уже есть в БД
all_chats=[...], # большой объект, лучше запрашивать из БД
)
Типичная StateData для онбординга
# Шаг 1: узнали group_id
await state.update_data(group_id=123456789, started_at=datetime.now().isoformat())
# Шаг 2: узнали имя пользователя
await state.update_data(display_name="Иван")
# Финал: читаем всё
data = await state.get_data()
group_id = data["group_id"]
display_name = data.get("display_name", "Пользователь")
await state.clear() # очистить после завершения
Кастомные фильтры
from aiogram.filters import BaseFilter
from aiogram.types import Message
class IsForumTopicFilter(BaseFilter):
"""True если сообщение отправлено в тему Forum-группы."""
async def __call__(self, message: Message) -> bool:
return message.message_thread_id is not None
class IsGroupAdminFilter(BaseFilter):
"""True если пользователь является администратором группы."""
async def __call__(self, message: Message, bot) -> bool:
if message.chat.type not in ("group", "supergroup"):
return False
member = await bot.get_chat_member(message.chat.id, message.from_user.id)
return member.status in ("administrator", "creator")
# Использование:
@router.message(IsForumTopicFilter())
async def on_topic_message(message: Message): ...
Обработка ошибок в FSM
from aiogram import Router
from aiogram.types import Message, ErrorEvent
error_router = Router()
@error_router.error()
async def handle_error(event: ErrorEvent) -> None:
"""Глобальный обработчик ошибок."""
update = event.update
exception = event.exception
# Логируем
import logging
logging.error(f"Error handling update: {exception}", exc_info=True)
# Пытаемся уведомить пользователя
if update.message:
await update.message.answer(
"Что-то пошло не так. Попробуй ещё раз или напиши /start"
)
Выводы для нашей реализации
- MVP:
SQLiteStorage— персистентен, нет зависимостей от Redis - StateData: хранить только
group_id,thread_idтекущей операции - Timeout: сохранять
started_at, проверять при каждом шаге - Очищать state:
await state.clear()после завершения онбординга - Ошибки: глобальный
@error_router.error()для graceful degradation