surfaces/docs/research/fsm-patterns.md
Mikhail Putilovskij 67499daa61 feat: extend platform mock + add research docs
platform/interface.py:
- Add Attachment, MessageChunk, AgentEvent types
- Add stream_message() to PlatformClient Protocol (door open for streaming)
- Add WebhookReceiver Protocol

platform/mock.py:
- Add attachment_mode config (url/binary/s3)
- Implement stream_message() — single chunk, ready for real streaming
- Add register_webhook_receiver() + simulate_agent_event() for testing

docs/research/:
- telegram-forum-topics.md — aiogram 3.x Forum Topics API, FSM patterns, UX analysis
- fsm-patterns.md — FSM storage options, StateData best practices
- matrix-spaces.md — matrix-nio Space API, room ordering, invite flow
- matrix-events.md — reactions, threads, typing, sync loop pitfalls
- telegram-chat-alternatives.md — 7 alternatives for multi-chat UX, virtual chats in DM recommended
2026-03-30 14:04:34 +03:00

129 lines
4.4 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Research: FSM паттерны в aiogram 3.x
## Хранилище состояний (Storage)
### Выбор по фазе разработки
| Фаза | Storage | Плюсы | Минусы |
|------|---------|-------|--------|
| Разработка | `MemoryStorage` | Нет зависимостей | Теряется при рестарте |
| MVP | `SQLiteStorage` | Персистентен, нет Redis | Не распределённый |
| Продакшн | `RedisStorage` | Распределённый, быстрый | Нужен Redis |
```python
# Разработка
from aiogram.fsm.storage.memory import MemoryStorage
storage = MemoryStorage()
# MVP — pip install aiogram-sqlite-storage
from aiogram_sqlite_storage.sqlitestore import SQLStorage
storage = SQLStorage(db_path="fsm.db")
# Продакшн
from aiogram.fsm.storage.redis import RedisStorage
from redis.asyncio import Redis
storage = RedisStorage(redis=Redis(host="localhost"))
dp = Dispatcher(storage=storage)
```
---
## Структура StateData
### Правило: хранить только параметры текущего шага
```python
# ПРАВИЛЬНО: только то что нужно в следующем шаге
await state.update_data(
group_id=group_id, # ID группы для следующего шага
pending_name="Чат 2", # временное имя до подтверждения
)
# НЕПРАВИЛЬНО: дублировать данные из БД
await state.update_data(
user_name="Иван", # уже есть в БД
all_chats=[...], # большой объект, лучше запрашивать из БД
)
```
### Типичная StateData для онбординга
```python
# Шаг 1: узнали group_id
await state.update_data(group_id=123456789, started_at=datetime.now().isoformat())
# Шаг 2: узнали имя пользователя
await state.update_data(display_name="Иван")
# Финал: читаем всё
data = await state.get_data()
group_id = data["group_id"]
display_name = data.get("display_name", "Пользователь")
await state.clear() # очистить после завершения
```
---
## Кастомные фильтры
```python
from aiogram.filters import BaseFilter
from aiogram.types import Message
class IsForumTopicFilter(BaseFilter):
"""True если сообщение отправлено в тему Forum-группы."""
async def __call__(self, message: Message) -> bool:
return message.message_thread_id is not None
class IsGroupAdminFilter(BaseFilter):
"""True если пользователь является администратором группы."""
async def __call__(self, message: Message, bot) -> bool:
if message.chat.type not in ("group", "supergroup"):
return False
member = await bot.get_chat_member(message.chat.id, message.from_user.id)
return member.status in ("administrator", "creator")
# Использование:
@router.message(IsForumTopicFilter())
async def on_topic_message(message: Message): ...
```
---
## Обработка ошибок в FSM
```python
from aiogram import Router
from aiogram.types import Message, ErrorEvent
error_router = Router()
@error_router.error()
async def handle_error(event: ErrorEvent) -> None:
"""Глобальный обработчик ошибок."""
update = event.update
exception = event.exception
# Логируем
import logging
logging.error(f"Error handling update: {exception}", exc_info=True)
# Пытаемся уведомить пользователя
if update.message:
await update.message.answer(
"Что-то пошло не так. Попробуй ещё раз или напиши /start"
)
```
---
## Выводы для нашей реализации
1. **MVP**: `SQLiteStorage` — персистентен, нет зависимостей от Redis
2. **StateData**: хранить только `group_id`, `thread_id` текущей операции
3. **Timeout**: сохранять `started_at`, проверять при каждом шаге
4. **Очищать state**: `await state.clear()` после завершения онбординга
5. **Ошибки**: глобальный `@error_router.error()` для graceful degradation