surfaces/docs/research/fsm-patterns.md
Mikhail Putilovskij 67499daa61 feat: extend platform mock + add research docs
platform/interface.py:
- Add Attachment, MessageChunk, AgentEvent types
- Add stream_message() to PlatformClient Protocol (door open for streaming)
- Add WebhookReceiver Protocol

platform/mock.py:
- Add attachment_mode config (url/binary/s3)
- Implement stream_message() — single chunk, ready for real streaming
- Add register_webhook_receiver() + simulate_agent_event() for testing

docs/research/:
- telegram-forum-topics.md — aiogram 3.x Forum Topics API, FSM patterns, UX analysis
- fsm-patterns.md — FSM storage options, StateData best practices
- matrix-spaces.md — matrix-nio Space API, room ordering, invite flow
- matrix-events.md — reactions, threads, typing, sync loop pitfalls
- telegram-chat-alternatives.md — 7 alternatives for multi-chat UX, virtual chats in DM recommended
2026-03-30 14:04:34 +03:00

4.4 KiB
Raw Permalink Blame History

Research: FSM паттерны в aiogram 3.x

Хранилище состояний (Storage)

Выбор по фазе разработки

Фаза Storage Плюсы Минусы
Разработка MemoryStorage Нет зависимостей Теряется при рестарте
MVP SQLiteStorage Персистентен, нет Redis Не распределённый
Продакшн RedisStorage Распределённый, быстрый Нужен Redis
# Разработка
from aiogram.fsm.storage.memory import MemoryStorage
storage = MemoryStorage()

# MVP — pip install aiogram-sqlite-storage
from aiogram_sqlite_storage.sqlitestore import SQLStorage
storage = SQLStorage(db_path="fsm.db")

# Продакшн
from aiogram.fsm.storage.redis import RedisStorage
from redis.asyncio import Redis
storage = RedisStorage(redis=Redis(host="localhost"))

dp = Dispatcher(storage=storage)

Структура StateData

Правило: хранить только параметры текущего шага

# ПРАВИЛЬНО: только то что нужно в следующем шаге
await state.update_data(
    group_id=group_id,      # ID группы для следующего шага
    pending_name="Чат 2",   # временное имя до подтверждения
)

# НЕПРАВИЛЬНО: дублировать данные из БД
await state.update_data(
    user_name="Иван",       # уже есть в БД
    all_chats=[...],        # большой объект, лучше запрашивать из БД
)

Типичная StateData для онбординга

# Шаг 1: узнали group_id
await state.update_data(group_id=123456789, started_at=datetime.now().isoformat())

# Шаг 2: узнали имя пользователя
await state.update_data(display_name="Иван")

# Финал: читаем всё
data = await state.get_data()
group_id = data["group_id"]
display_name = data.get("display_name", "Пользователь")
await state.clear()  # очистить после завершения

Кастомные фильтры

from aiogram.filters import BaseFilter
from aiogram.types import Message

class IsForumTopicFilter(BaseFilter):
    """True если сообщение отправлено в тему Forum-группы."""
    async def __call__(self, message: Message) -> bool:
        return message.message_thread_id is not None


class IsGroupAdminFilter(BaseFilter):
    """True если пользователь является администратором группы."""
    async def __call__(self, message: Message, bot) -> bool:
        if message.chat.type not in ("group", "supergroup"):
            return False
        member = await bot.get_chat_member(message.chat.id, message.from_user.id)
        return member.status in ("administrator", "creator")


# Использование:
@router.message(IsForumTopicFilter())
async def on_topic_message(message: Message): ...

Обработка ошибок в FSM

from aiogram import Router
from aiogram.types import Message, ErrorEvent

error_router = Router()

@error_router.error()
async def handle_error(event: ErrorEvent) -> None:
    """Глобальный обработчик ошибок."""
    update = event.update
    exception = event.exception

    # Логируем
    import logging
    logging.error(f"Error handling update: {exception}", exc_info=True)

    # Пытаемся уведомить пользователя
    if update.message:
        await update.message.answer(
            "Что-то пошло не так. Попробуй ещё раз или напиши /start"
        )

Выводы для нашей реализации

  1. MVP: SQLiteStorage — персистентен, нет зависимостей от Redis
  2. StateData: хранить только group_id, thread_id текущей операции
  3. Timeout: сохранять started_at, проверять при каждом шаге
  4. Очищать state: await state.clear() после завершения онбординга
  5. Ошибки: глобальный @error_router.error() для graceful degradation