feat: extend platform mock + add research docs

platform/interface.py:
- Add Attachment, MessageChunk, AgentEvent types
- Add stream_message() to PlatformClient Protocol (door open for streaming)
- Add WebhookReceiver Protocol

platform/mock.py:
- Add attachment_mode config (url/binary/s3)
- Implement stream_message() — single chunk, ready for real streaming
- Add register_webhook_receiver() + simulate_agent_event() for testing

docs/research/:
- telegram-forum-topics.md — aiogram 3.x Forum Topics API, FSM patterns, UX analysis
- fsm-patterns.md — FSM storage options, StateData best practices
- matrix-spaces.md — matrix-nio Space API, room ordering, invite flow
- matrix-events.md — reactions, threads, typing, sync loop pitfalls
- telegram-chat-alternatives.md — 7 alternatives for multi-chat UX, virtual chats in DM recommended
This commit is contained in:
Mikhail Putilovskij 2026-03-30 14:04:34 +03:00
parent 6f0e9a53a6
commit 67499daa61
7 changed files with 1515 additions and 29 deletions

View file

@ -0,0 +1,129 @@
# Research: FSM паттерны в aiogram 3.x
## Хранилище состояний (Storage)
### Выбор по фазе разработки
| Фаза | Storage | Плюсы | Минусы |
|------|---------|-------|--------|
| Разработка | `MemoryStorage` | Нет зависимостей | Теряется при рестарте |
| MVP | `SQLiteStorage` | Персистентен, нет Redis | Не распределённый |
| Продакшн | `RedisStorage` | Распределённый, быстрый | Нужен Redis |
```python
# Разработка
from aiogram.fsm.storage.memory import MemoryStorage
storage = MemoryStorage()
# MVP — pip install aiogram-sqlite-storage
from aiogram_sqlite_storage.sqlitestore import SQLStorage
storage = SQLStorage(db_path="fsm.db")
# Продакшн
from aiogram.fsm.storage.redis import RedisStorage
from redis.asyncio import Redis
storage = RedisStorage(redis=Redis(host="localhost"))
dp = Dispatcher(storage=storage)
```
---
## Структура StateData
### Правило: хранить только параметры текущего шага
```python
# ПРАВИЛЬНО: только то что нужно в следующем шаге
await state.update_data(
group_id=group_id, # ID группы для следующего шага
pending_name="Чат 2", # временное имя до подтверждения
)
# НЕПРАВИЛЬНО: дублировать данные из БД
await state.update_data(
user_name="Иван", # уже есть в БД
all_chats=[...], # большой объект, лучше запрашивать из БД
)
```
### Типичная StateData для онбординга
```python
# Шаг 1: узнали group_id
await state.update_data(group_id=123456789, started_at=datetime.now().isoformat())
# Шаг 2: узнали имя пользователя
await state.update_data(display_name="Иван")
# Финал: читаем всё
data = await state.get_data()
group_id = data["group_id"]
display_name = data.get("display_name", "Пользователь")
await state.clear() # очистить после завершения
```
---
## Кастомные фильтры
```python
from aiogram.filters import BaseFilter
from aiogram.types import Message
class IsForumTopicFilter(BaseFilter):
"""True если сообщение отправлено в тему Forum-группы."""
async def __call__(self, message: Message) -> bool:
return message.message_thread_id is not None
class IsGroupAdminFilter(BaseFilter):
"""True если пользователь является администратором группы."""
async def __call__(self, message: Message, bot) -> bool:
if message.chat.type not in ("group", "supergroup"):
return False
member = await bot.get_chat_member(message.chat.id, message.from_user.id)
return member.status in ("administrator", "creator")
# Использование:
@router.message(IsForumTopicFilter())
async def on_topic_message(message: Message): ...
```
---
## Обработка ошибок в FSM
```python
from aiogram import Router
from aiogram.types import Message, ErrorEvent
error_router = Router()
@error_router.error()
async def handle_error(event: ErrorEvent) -> None:
"""Глобальный обработчик ошибок."""
update = event.update
exception = event.exception
# Логируем
import logging
logging.error(f"Error handling update: {exception}", exc_info=True)
# Пытаемся уведомить пользователя
if update.message:
await update.message.answer(
"Что-то пошло не так. Попробуй ещё раз или напиши /start"
)
```
---
## Выводы для нашей реализации
1. **MVP**: `SQLiteStorage` — персистентен, нет зависимостей от Redis
2. **StateData**: хранить только `group_id`, `thread_id` текущей операции
3. **Timeout**: сохранять `started_at`, проверять при каждом шаге
4. **Очищать state**: `await state.clear()` после завершения онбординга
5. **Ошибки**: глобальный `@error_router.error()` для graceful degradation

View file

@ -0,0 +1,360 @@
# Research: matrix-nio Event Handling
Based on: Matrix Client-Server Spec, matrix-nio 0.24+.
## Реакции (m.reaction)
Реакции используют `rel_type: "m.annotation"` для связи с исходным сообщением.
### Слушание и обработка реакций
```python
from nio import AsyncClient, ReactionEvent
class ReactionHandler:
def __init__(self, client: AsyncClient):
self.client = client
# Регистрируем callback — явно, не через декоратор
self.client.add_event_callback(self.on_reaction, ReactionEvent)
async def on_reaction(self, room, event: ReactionEvent) -> None:
# Игнорируем свои реакции
if event.sender == self.client.user_id:
return
# Извлекаем ключ и target event ID
# Способ 1 (новые версии nio)
reaction_key = getattr(event, "key", None)
target_event_id = getattr(event, "reacts_to", None)
# Способ 2 (fallback — работает во всех версиях)
if reaction_key is None or target_event_id is None:
relates_to = event.content.get("m.relates_to", {})
reaction_key = relates_to.get("key")
target_event_id = relates_to.get("event_id")
if not reaction_key or not target_event_id:
return
if reaction_key == "👍":
await self.handle_confirm(room, target_event_id)
elif reaction_key == "❌":
await self.handle_cancel(room, target_event_id)
async def handle_confirm(self, room, target_event_id: str) -> None:
await self.client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": "✅ Подтверждено"},
)
async def handle_cancel(self, room, target_event_id: str) -> None:
await self.client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": "❌ Отменено"},
)
```
### Подводный камень #1: атрибуты ReactionEvent отличаются по версиям nio
В разных версиях matrix-nio атрибуты могут называться по-разному. Всегда используй fallback через `event.content`.
### Подводный камень #2: callback нужно регистрировать явно
```python
# НЕПРАВИЛЬНО — просто определить функцию недостаточно
async def on_reaction(room, event):
pass
# ПРАВИЛЬНО
client.add_event_callback(on_reaction, ReactionEvent)
```
---
## Треды (m.thread)
### Создание треда и отправка сообщений
```python
async def create_thread_root(client: AsyncClient, room_id: str, text: str) -> str:
"""
Создаёт корневое сообщение для треда.
Первое сообщение НЕ должно содержать m.relates_to.
Возвращает event_id корня.
"""
resp = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": text,
# Нет m.relates_to! Это корень.
},
)
return resp.event_id
async def send_to_thread(
client: AsyncClient,
room_id: str,
thread_root_id: str,
text: str,
) -> str:
"""
Отправляет сообщение в существующий тред.
"""
resp = await client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": text,
"m.relates_to": {
"rel_type": "m.thread",
"event_id": thread_root_id, # ID корневого сообщения
},
},
)
return resp.event_id
# Использование для долгой задачи:
async def run_long_task(client, room_id, task_text):
root_id = await create_thread_root(client, room_id, f"📋 {task_text}")
await send_to_thread(client, room_id, root_id, "⏳ Обрабатываю... (1/3)")
# ... работа ...
await send_to_thread(client, room_id, root_id, "⏳ Анализирую... (2/3)")
# ... работа ...
await send_to_thread(client, room_id, root_id, "✅ Готово!")
```
### Подводный камень #3: корневое сообщение без m.relates_to
```python
# НЕПРАВИЛЬНО — первое сообщение треда не должно ссылаться на что-либо
content = {
"msgtype": "m.text",
"body": "Начинаю задачу",
"m.relates_to": {"rel_type": "m.thread", "event_id": "..."}, # ОШИБКА
}
# ПРАВИЛЬНО — просто обычное сообщение
content = {
"msgtype": "m.text",
"body": "Начинаю задачу",
}
```
---
## Typing indicator
```python
async def with_typing(client: AsyncClient, room_id: str, coro):
"""
Запускает корутину с индикатором печати.
Автоматически убирает индикатор в finally.
"""
async def renew_typing():
"""Возобновляет typing каждые 5 секунд (timeout = 10 сек)."""
import asyncio
while True:
await client.room_typing(room_id, typing_state=True, timeout=10000)
await asyncio.sleep(5)
import asyncio
renewal_task = asyncio.create_task(renew_typing())
try:
return await coro
finally:
renewal_task.cancel()
await client.room_typing(room_id, typing_state=False, timeout=0)
# Использование:
async def on_message(client, room, event):
result = await with_typing(client, room.room_id, process_message(event.body))
await client.room_send(room.room_id, "m.room.message", {"msgtype": "m.text", "body": result})
```
### Подводный камень #4: typing исчезает через 10 секунд
Нужно возобновлять каждые 5 секунд если обработка долгая.
---
## Фильтрация собственных сообщений
```python
from nio import RoomMessageText
class MessageHandler:
def __init__(self, client: AsyncClient):
self.client = client
self.client.add_event_callback(self.on_message, RoomMessageText)
async def on_message(self, room, event: RoomMessageText) -> None:
# ОБЯЗАТЕЛЬНО: игнорировать свои сообщения во избежание петли
if event.sender == self.client.user_id:
return
# Обрабатываем только чужие сообщения
await self.process(room, event)
```
### Подводный камень #5: user_id может быть None до login()
```python
if self.client.user_id is None:
return # клиент ещё не залогинен
if event.sender == self.client.user_id:
return
```
---
## Обработка invite (m.room.member)
```python
from nio import InviteEvent, InviteMemberEvent
class InviteHandler:
def __init__(self, client: AsyncClient):
self.client = client
self.client.add_event_callback(self.on_invite, InviteEvent)
async def on_invite(self, room, event: InviteMemberEvent) -> None:
"""
Вызывается когда бот получает приглашение в комнату.
room.room_id — куда приглашают
room.inviter — кто приглашает
"""
inviter = room.inviter
room_id = room.room_id
# Принимаем приглашение
resp = await self.client.join(room_id)
if hasattr(resp, "room_id"):
# Успешно вошли — начинаем регистрацию пользователя
await self.start_onboarding(room_id, inviter)
else:
print(f"Failed to join {room_id}: {resp}")
async def start_onboarding(self, room_id: str, user_id: str) -> None:
await self.client.room_send(
room_id=room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": "Привет! Создаю ваше пространство..."},
)
```
### Подводный камень #6: InviteRoom vs Room
В момент invite бот ещё не в комнате, поэтому `room.name` может быть `None`.
---
## Sync loop и reconnect
### Полная реализация с reconnect
```python
from nio import AsyncClient, SyncError
import asyncio, logging
logger = logging.getLogger(__name__)
class SyncManager:
def __init__(self, client: AsyncClient):
self.client = client
self.sync_token = None
self.should_stop = False
async def run(self) -> None:
reconnect_delay = 5
attempts = 0
while not self.should_stop:
try:
await self._sync_once()
attempts = 0 # сбрасываем счётчик на успешный sync
reconnect_delay = 5
except Exception as e:
attempts += 1
if attempts > 10:
logger.error("Too many reconnect failures, stopping")
break
delay = min(reconnect_delay * (2 ** (attempts - 1)), 300)
logger.warning(f"Sync error: {e}. Retrying in {delay}s")
await asyncio.sleep(delay)
async def _sync_once(self) -> None:
is_first_sync = self.sync_token is None
response = await self.client.sync(
since=self.sync_token,
full_state=is_first_sync,
set_presence="online",
timeout=30000,
)
if isinstance(response, SyncError):
raise Exception(f"Sync failed: {response.message}")
if is_first_sync:
# При первом синхе — только сохраняем токен, не обрабатываем старые события
logger.info("Initial sync complete, skipping old events")
self.sync_token = response.next_batch
return
self.sync_token = response.next_batch
# Callbacks вызываются автоматически через add_event_callback
```
### Подводный камень #7: при первом синхе — много старых событий
```python
# Если не пропустить первый синх — бот обработает всю историю комнат
if self.sync_token is None:
self.sync_token = response.next_batch
return # пропустить, только взять токен
```
### Подводный камень #8: full_state нужен только для первого синха
```python
# При повторных синхах full_state=True сильно замедляет работу
response = await client.sync(
since=token,
full_state=token is None, # True только если первый раз
)
```
---
## Подводные камни (резюме)
| Проблема | Решение |
|----------|---------|
| Реакции не обрабатываются | `client.add_event_callback(handler, ReactionEvent)` |
| `event.reacts_to` не существует | Fallback: `event.content.get("m.relates_to", {}).get("event_id")` |
| Первое сообщение треда не создаёт тред | Первое сообщение БЕЗ `m.relates_to` |
| Бот зацикливается | `event.sender == client.user_id` → return |
| Старые события при старте | Пропусти первый синх: сохрани токен, не обрабатывай |
| Typing исчезает быстро | Возобновляй каждые 5 секунд |
| InviteRoom.name is None | Нормально, используй `room.room_id` |
---
## Выводы для нашей реализации
1. **Регистрировать callbacks** в `__init__` бота, до первого `sync()`
2. **Всегда проверять** `event.sender != client.user_id`
3. **Для подтверждений** — слать сообщение с инструкцией `👍 / ❌`, слушать `ReactionEvent`
4. **Для долгих задач**`create_thread_root()`, статусы в тред через `send_to_thread()`
5. **Typing** — использовать `with_typing()` helper с автообновлением
6. **Sync loop** — при первом синхе пропускать старые события

View file

@ -0,0 +1,264 @@
# Research: matrix-nio Space API
Based on: Matrix Client-Server Spec, matrix-nio 0.24+, Element Web behavior.
## Создание Space
Space в Matrix — это комната с room type `m.space`. При создании нужны state events.
### Полный рабочий пример
```python
from nio import AsyncClient, RoomCreateResponse
async def create_personal_space(client: AsyncClient, display_name: str) -> str:
"""
Создаёт персональный Space для пользователя.
Возвращает room_id созданного Space.
"""
response = await client.room_create(
name=f"Lambda — {display_name}",
is_public=False,
room_version="10",
initial_state=[
{
"type": "m.room.topic",
"state_key": "",
"content": {"topic": "Lambda AI workspace"},
},
{
"type": "m.room.join_rules",
"state_key": "",
"content": {"join_rule": "invite"},
},
],
creation_content={
"type": "m.space", # ЭТО ГЛАВНОЕ — иначе создастся обычная комната
},
)
if isinstance(response, RoomCreateResponse):
return response.room_id
raise Exception(f"Failed to create space: {response}")
```
### Подводный камень #1: creation_content обязателен
```python
# НЕПРАВИЛЬНО — создаст обычную комнату, не Space
await client.room_create(name="My Space")
# ПРАВИЛЬНО
await client.room_create(
name="My Space",
creation_content={"type": "m.space"},
)
```
---
## Добавление комнат в Space
Чтобы добавить комнату в Space, нужно установить state event `m.space.child` в самом Space.
### Создание комнаты и добавление в Space
```python
async def create_room_in_space(
client: AsyncClient,
space_room_id: str,
room_name: str,
order: str,
) -> str:
"""
Создаёт комнату и добавляет её в Space.
Возвращает room_id новой комнаты.
"""
# 1. Создаём комнату
room_resp = await client.room_create(
name=room_name,
is_public=False,
room_version="10",
initial_state=[
{
"type": "m.room.join_rules",
"state_key": "",
"content": {"join_rule": "invite"},
},
],
)
if not isinstance(room_resp, RoomCreateResponse):
raise Exception(f"Failed to create room: {room_resp}")
room_id = room_resp.room_id
# 2. Добавляем в Space через m.space.child
# state_key = room_id дочерней комнаты (НЕ произвольное имя!)
await client.room_put_state(
room_id=space_room_id,
event_type="m.space.child",
state_key=room_id,
content={
"via": ["example.com"], # серверы для присоединения — обязательно
"order": order, # строковый порядок
"suggested": True,
},
)
# 3. Обратная ссылка (помогает клиентам понять структуру)
await client.room_put_state(
room_id=room_id,
event_type="m.space.parent",
state_key=space_room_id,
content={
"via": ["example.com"],
"canonical": True,
},
)
return room_id
```
### Подводный камень #2: state_key — это room_id, не имя
```python
# НЕПРАВИЛЬНО
await client.room_put_state(
room_id=space_id,
event_type="m.space.child",
state_key="Настройки", # ОШИБКА: нужен room_id
content={"via": ["example.com"]},
)
# ПРАВИЛЬНО
await client.room_put_state(
room_id=space_id,
event_type="m.space.child",
state_key="!abc123:example.com", # именно room_id
content={"via": ["example.com"]},
)
```
---
## Управление порядком комнат
Порядок в Space контролируется полем `order` в `m.space.child`. Это строковое сравнение.
### Закрепить «Настройки» вверху, чаты ниже
```python
async def setup_space_order(
client: AsyncClient,
space_id: str,
settings_room_id: str,
chat_room_ids: list[str],
) -> None:
# Настройки вверху
await client.room_put_state(
room_id=space_id,
event_type="m.space.child",
state_key=settings_room_id,
content={"via": ["example.com"], "order": "00", "suggested": True},
)
# Чаты с нарастающим порядком
for idx, chat_id in enumerate(chat_room_ids):
await client.room_put_state(
room_id=space_id,
event_type="m.space.child",
state_key=chat_id,
content={"via": ["example.com"], "order": f"{10 + idx:02d}", "suggested": True},
)
```
### Подводный камень #3: order — строковое, нужен padding
```python
# НЕПРАВИЛЬНО: "10" < "2" в строковом сравнении!
orders = ["0", "1", "10", "11", "2", "3"]
# ПРАВИЛЬНО: одинаковая длина
orders = ["00", "01", "02", "03", "10", "11"]
```
---
## Приглашение пользователя в Space и дочерние комнаты
Простого способа пригласить в Space и все дочерние комнаты одним вызовом нет — нужно приглашать в каждую отдельно.
```python
async def invite_user_to_space_and_rooms(
client: AsyncClient,
space_id: str,
child_room_ids: list[str],
user_id: str,
) -> None:
# Приглашаем в Space
await client.room_invite(space_id, user_id)
# Приглашаем в каждую дочернюю комнату
for room_id in child_room_ids:
await client.room_invite(room_id, user_id)
```
### Подводный камень #4: пользователь в Space не видит комнаты автоматически
Приглашение в Space не тянет дочерние комнаты — нужно приглашать каждую явно.
---
## Переименование и удаление из Space
### Переименование комнаты
```python
async def rename_room(client: AsyncClient, room_id: str, new_name: str) -> None:
await client.room_put_state(
room_id=room_id,
event_type="m.room.name",
state_key="",
content={"name": new_name},
)
```
### Удаление комнаты из Space (без удаления самой комнаты)
```python
async def remove_from_space(
client: AsyncClient,
space_id: str,
room_id: str,
) -> None:
# Пустой content = убрать из Space
await client.room_put_state(
room_id=space_id,
event_type="m.space.child",
state_key=room_id,
content={}, # пустое — убирает комнату из Space
)
```
---
## Подводные камни (резюме)
| Проблема | Решение |
|----------|---------|
| Space создаётся как обычная комната | Используй `creation_content={"type": "m.space"}` |
| Комнаты не видны в Space | Добавь `m.space.child` с `state_key=room_id` |
| Неправильный порядок | Строковый padding: `"00"`, `"01"`, `"10"` |
| Пользователь не видит комнаты | Приглашай в каждую дочернюю отдельно |
| Комната не убирается из Space | Установи пустой `content: {}` в `m.space.child` |
| `via` не указан | Всегда указывай `"via": ["homeserver.com"]` |
---
## Выводы для нашей реализации
1. **При регистрации**: `create_personal_space()` → сохранить `space_id` в БД
2. **Комната «Настройки»**: создать первой с `order: "00"`
3. **«Чат 1»**: создать второй с `order: "10"`
4. **Команда `!new`**: создать комнату + `m.space.child` + пригласить пользователя
5. **Команда `!rename`**: обновить `m.room.name`
6. **Команда `!archive`**: установить пустой `m.space.child`

View file

@ -0,0 +1,251 @@
# Research: Альтернативные варианты организации чатов в Telegram
## Сравнительная таблица
| Вариант | Friction для юзера | Техническая сложность | Ограничения | Статус |
|---------|-------------------|----------------------|-------------|--------|
| **1. Виртуальные чаты в DM** | Нет | Низкая | Нет | ✅ РЕКОМЕНДУЕТСЯ |
| **2. Threads / Reply Threads** | Высокая | Средняя | Требует группу, бот не создаёт темы | ❌ |
| **3. Multiple Bot Instances** | Высокая | Средняя | Нужно добавлять каждый | ❌ |
| **4. Inline Mode** | Нет | Низкая | Stateless, не для диалогов | ❌ |
| **5. Бот создаёт приватные группы** | Очень высокая | Очень высокая | Bot API не создаёт группы | ❌ |
| **6. Telegram Web App (TWA)** | Нет | Высокая | Нужен web-сервер | ⚠️ Phase 2 |
| **7. Forum Topics (исходный)** | Высокая | Средняя | Пользователь создаёт группу вручную | ⚠️ Оставить как опцию |
---
## Вариант 1: Виртуальные чаты в DM (РЕКОМЕНДУЕТСЯ)
### Как выглядит для пользователя
```
/start
→ [ Новый чат] [📋 Мои чаты] [⚙️ Настройки]
Нажимает "Новый чат"
→ "✅ Чат #1 создан! Начните писать..."
User: "Расскажи про Python"
→ "[Чат #1] Python — это язык программирования..."
Нажимает "Мои чаты"
→ 1⃣ Чат #1 (Python)
2⃣ Чат #2 (Математика) — 2 часа назад
3⃣ Исследование рынка — вчера
Нажимает на "Чат #2"
→ "Вы в Чате #2. Последние сообщения: ..."
→ Продолжает разговор
```
### Технические детали (aiogram 3.x)
#### FSM состояния
```python
from aiogram.fsm.state import State, StatesGroup
class UserStates(StatesGroup):
main_menu = State()
in_chat = State()
selecting_chat = State()
```
#### Хранение активного чата в StateData
```python
# При создании чата
await state.set_state(UserStates.in_chat)
await state.update_data(active_chat_id=chat_id)
# При получении сообщения
data = await state.get_data()
chat_id = data["active_chat_id"]
```
#### Список чатов с инлайн-кнопками
```python
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
@router.message(Command("list"))
async def cmd_list(message: Message, state: FSMContext):
chats = db.get_user_chats(message.from_user.id)
buttons = [
[InlineKeyboardButton(
text=f"📄 {chat.title or f'Чат #{chat.id[:6]}'}",
callback_data=f"select:{chat.id}"
)]
for chat in chats
]
await message.answer("📋 Ваши чаты:", reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons))
@router.callback_query(F.data.startswith("select:"))
async def switch_chat(callback: CallbackQuery, state: FSMContext):
chat_id = callback.data.split(":")[1]
await state.update_data(active_chat_id=chat_id)
await state.set_state(UserStates.in_chat)
await callback.message.edit_text(f"✅ Переключился в чат. Пишите...")
```
#### Схема БД
```sql
CREATE TABLE chats (
chat_id TEXT PRIMARY KEY, -- UUID
user_id INTEGER NOT NULL,
title TEXT,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
CREATE TABLE messages (
id INTEGER PRIMARY KEY,
chat_id TEXT NOT NULL,
user_id INTEGER NOT NULL,
direction TEXT, -- 'user' / 'bot'
content TEXT NOT NULL,
created_at TIMESTAMP,
FOREIGN KEY(chat_id) REFERENCES chats(chat_id)
);
```
### Ограничения
- История только в нашей БД, не в Telegram нативно
- Нет шаринга чата с другими пользователями
### Примеры в реальных ботах
- **ChatGPT Telegram боты** — именно этот паттерн, `/new` → новый разговор, `/history` → список
- **StudyGPT** — каждый урок = отдельный "чат" с контекстом
- **Notion Bot** — список "проектов" с переключением
### Оценка
✅ Нулевое трение — ничего настраивать не нужно
✅ Работает в DM — приватно
✅ Стандартный паттерн — пользователи знакомы
✅ Полный контроль над UX
✅ Масштабируется без ограничений
❌ История только в нашей БД (но это нормально)
---
## Вариант 2: Threads / Reply Topics — ОТКЛОНЕНО
### Суть
Telegram поддерживает `message_thread_id` в апдейтах — можно читать из какого треда пришло сообщение и отвечать в него. Но:
- **Бот не может создавать треды** — Bot API этого не умеет
- Работает только в Group/Supergroup, не в DM
- Пользователь должен создавать каждый тред вручную
- Не решает проблему friction — делает хуже
```python
# Читать можно
thread_id = message.message_thread_id # int или None
# Отвечать можно
await message.answer("ответ", message_thread_id=thread_id)
# Создать тред НЕЛЬЗЯ — метода нет в Bot API
```
---
## Вариант 3: Deep Linking — ОТКЛОНЕНО
Каждый чат = отдельная ссылка вида `https://t.me/bot?start=chatid_xyz`. Пользователь переходит по ссылке, бот предвыбирает чат.
**Проблема:** пользователь не знает что такие ссылки существуют, нет discovery, не масштабируется.
```python
@router.message(Command("start"))
async def cmd_start(message: Message):
args = message.text.split()
if len(args) > 1:
chat_id = args[1] # из deep link
# предвыбрать чат
```
---
## Вариант 4: Inline Mode — ОТКЛОНЕНО
Inline mode (`@botname query`) — fundamentally stateless. Один запрос → несколько результатов → пользователь выбирает. Нет истории, нет контекста. Категорически не подходит для многооборотного диалога с AI.
---
## Вариант 5: Бот создаёт приватные группы — ОТКЛОНЕНО
**Telegram Bot API не умеет создавать группы.** Единственный workaround — заранее создать 100+ пустых групп и назначать их пользователям. 1000 пользователей = 1000 лишних групп. Абсурд.
---
## Вариант 6: Telegram Web App (TWA) — Phase 2
### Суть
Кнопка в боте открывает мини-приложение (веб). Там красивый интерфейс со списком чатов, историей, поиском.
### Технические детали
```python
# В боте
from aiogram.types import WebAppInfo, InlineKeyboardButton
web_app_btn = InlineKeyboardButton(
text="📱 Открыть приложение",
web_app=WebAppInfo(url="https://lambda.example.com/app")
)
```
```javascript
// В TWA (JavaScript)
const tg = window.Telegram.WebApp;
const user_id = tg.initDataUnsafe.user.id;
// Загрузить список чатов
const chats = await fetch('/api/chats', {
headers: { 'X-Telegram-Init-Data': tg.initData }
}).then(r => r.json());
```
### Оценка
✅ Красивый интерфейс (история, поиск, форматирование)
✅ Мобильный-friendly
⚠️ Нужен HTTPS web-сервер
⚠️ Сложнее разрабатывать
⚠️ Усложняет деплой
**Вывод: хороший вариант для v2, не для MVP.**
---
## Финальные рекомендации
### Сейчас (MVP)
**Вариант 1 — Виртуальные чаты в DM.** Реализуется за 2-3 дня, нулевое friction, стандартный паттерн.
### Phase 2
**Вариант 6 — TWA** как дополнительный UI поверх той же логики. Бэкенд не меняется — просто добавляется web-интерфейс.
### Опционально (для пользователей которые хотят)
**Forum Topics** — оставить как opt-in возможность. Пользователь сам создаёт группу, добавляет бота — получает "продвинутый режим" с нативными Telegram темами.
---
## Что это означает для архитектуры
`ChatManager` в `core/chat.py` должен работать одинаково для обоих режимов — и виртуальных чатов в DM, и Forum Topics. Разница только в адаптере:
| | Telegram DM (virtual) | Telegram Forum | Matrix |
|-|-----------------------|----------------|--------|
| `chat_id` | UUID | `message_thread_id` | `room_id` |
| Создание | В БД | `create_forum_topic` | `room_create` |
| Отправка | `send_message(chat_id=user_id)` + tag | `send_message(thread_id=...)` | `room_send` |
| `core/` | Не знает разницы | Не знает разницы | Не знает разницы |

View file

@ -0,0 +1,350 @@
# Research: aiogram 3.x Forum Topics API
Based on: Telegram Bot API 7.0+, aiogram 3.x docs, GitHub examples.
## Создание супергруппы и тем
### Ключевое ограничение: бот не может создать группу сам
Telegram Bot API **не позволяет боту программно создавать группы**. Пользователь должен:
1. Создать супергруппу вручную
2. Добавить бота как администратора
3. Включить Topics (`ForumTopic` feature)
4. Переслать сообщение из группы боту — бот получит `chat_id`
Это фундаментальное ограничение. Флоу адаптируется так: бот просит пользователя создать группу, пользователь пересылает любое сообщение оттуда.
### Проверка прав бота в группе
```python
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest
async def check_bot_admin_rights(bot: Bot, chat_id: int) -> bool:
"""Проверяет что бот является администратором с правом управления темами."""
try:
member = await bot.get_chat_member(chat_id, (await bot.get_me()).id)
return (
member.status in ("administrator", "creator")
and getattr(member, "can_manage_topics", False)
)
except TelegramBadRequest:
return False
```
---
## Управление темами (Forum Topics)
### Создание темы
```python
from aiogram.types import ForumTopic
async def create_chat_topic(bot: Bot, group_id: int, name: str) -> int:
"""
Создаёт тему в Forum-группе.
Возвращает message_thread_id (используется для отправки сообщений в тему).
"""
topic: ForumTopic = await bot.create_forum_topic(
chat_id=group_id,
name=name, # до 128 символов
# icon_color — опционально (7322096, 16766590, 13338331, 9367192, 16749490, 16478047)
)
return topic.message_thread_id
# Пример: создать "Чат 1"
# thread_id = await create_chat_topic(bot, group_id, "Чат 1")
```
### Переименование, закрытие, удаление
```python
async def rename_topic(bot: Bot, group_id: int, thread_id: int, new_name: str) -> None:
await bot.edit_forum_topic(
chat_id=group_id,
message_thread_id=thread_id,
name=new_name,
)
async def archive_topic(bot: Bot, group_id: int, thread_id: int) -> None:
"""Закрывает тему (архивирует). Пользователи не могут писать в закрытую тему."""
await bot.close_forum_topic(
chat_id=group_id,
message_thread_id=thread_id,
)
async def reopen_topic(bot: Bot, group_id: int, thread_id: int) -> None:
await bot.reopen_forum_topic(
chat_id=group_id,
message_thread_id=thread_id,
)
async def delete_topic(bot: Bot, group_id: int, thread_id: int) -> None:
"""Удаляет тему и все её сообщения. Необратимо."""
await bot.delete_forum_topic(
chat_id=group_id,
message_thread_id=thread_id,
)
```
---
## Отправка и получение сообщений в темах
### Отправить сообщение в конкретную тему
```python
async def send_to_topic(
bot: Bot,
group_id: int,
thread_id: int,
text: str,
) -> None:
await bot.send_message(
chat_id=group_id,
message_thread_id=thread_id, # ключевой параметр
text=text,
)
```
### Router фильтр для сообщений из конкретной темы
```python
from aiogram import Router, F
from aiogram.types import Message
chat_router = Router()
# Слушать ВСЕ сообщения в темах (кроме General)
@chat_router.message(F.message_thread_id.is_not(None))
async def on_topic_message(message: Message) -> None:
thread_id = message.message_thread_id
group_id = message.chat.id
# ... обработка
```
### Подводный камень #1: message_thread_id может быть None
Сообщения в «General» теме (или если Topics выключены) имеют `message_thread_id = None`.
```python
@router.message()
async def handler(message: Message):
if message.message_thread_id is None:
# Сообщение в General или не в Forum-группе
return
# Сообщение в конкретной теме
thread_id = message.message_thread_id
```
---
## FSM паттерны для онбординга
### Многошаговый онбординг
```python
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.types import Message
class OnboardingState(StatesGroup):
waiting_for_group = State() # ждём пересылку из группы
setting_up = State() # настраиваем группу
onboarding_router = Router()
@onboarding_router.message(Command("start"))
async def cmd_start(message: Message, state: FSMContext) -> None:
current_state = await state.get_state()
if current_state is not None:
# Пользователь уже в процессе — напоминаем
await message.answer("Ты уже в процессе настройки. Перешли сообщение из группы.")
return
await state.set_state(OnboardingState.waiting_for_group)
await message.answer(
"Привет! Для начала создай группу в Telegram:\n"
"1. Создай супергруппу\n"
"2. Добавь меня как администратора\n"
"3. Перешли мне любое сообщение из этой группы"
)
@onboarding_router.message(
OnboardingState.waiting_for_group,
F.forward_from_chat.type == "supergroup",
)
async def handle_group_forward(message: Message, state: FSMContext, bot: Bot) -> None:
group_id = message.forward_from_chat.id
# Проверяем права
if not await check_bot_admin_rights(bot, group_id):
await message.answer(
"Не могу управлять темами. Убедись что:\n"
"- Я администратор группы\n"
"- У меня есть право управлять темами"
)
return
await state.update_data(group_id=group_id)
await state.set_state(OnboardingState.setting_up)
# Создаём первую тему
thread_id = await create_chat_topic(bot, group_id, "Чат 1")
await send_to_topic(bot, group_id, thread_id, "Привет! Я готов. Пиши здесь.")
await state.update_data(chat_1_thread_id=thread_id)
await state.clear() # онбординг завершён
await message.answer("✅ Всё готово! Пиши в Чат 1.")
```
### Передача данных между шагами (StateData)
```python
# Сохранять в StateData только то, что нужно в следующем шаге
# НЕ дублировать данные из БД
# Хорошо:
await state.update_data(group_id=group_id, pending_name="Анализ")
# Плохо: хранить большие объекты или данные которые уже в БД
await state.update_data(user_profile=big_dict) # избыточно
# Читать:
data = await state.get_data()
group_id = data["group_id"]
```
### FSM Storage — выбор хранилища
```python
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.fsm.storage.redis import RedisStorage
# MVP (разработка): данные теряются при перезапуске
storage = MemoryStorage()
# Продакшн: данные переживают перезапуски
from redis.asyncio import Redis
storage = RedisStorage(redis=Redis(host="localhost", port=6379))
# Альтернатива без Redis (простой персистентный вариант):
# pip install aiogram-sqlite-storage
from aiogram_sqlite_storage.sqlitestore import SQLStorage
storage = SQLStorage(db_path="fsm_states.db")
dp = Dispatcher(storage=storage)
```
### Подводный камень #2: MemoryStorage теряет данные при перезапуске
Если бот упал в середине онбординга — пользователь застрянет. Используй SQLiteStorage для MVP.
### Обработка timeout состояния
```python
import asyncio
from datetime import datetime, timedelta
# Способ 1: пассивный — при следующем обращении проверяем дату
@router.message()
async def any_handler(message: Message, state: FSMContext) -> None:
data = await state.get_data()
started_at = data.get("started_at")
if started_at:
elapsed = datetime.now() - datetime.fromisoformat(started_at)
if elapsed > timedelta(minutes=10):
await state.clear()
await message.answer("Сессия истекла. Начни заново: /start")
return
# ... продолжение обработки
# Способ 2: при старте шага сохранять timestamp
await state.update_data(started_at=datetime.now().isoformat())
```
---
## Подводные камни (резюме)
| Проблема | Решение |
|----------|---------|
| Бот не может создать группу | Пользователь создаёт, пересылает сообщение боту |
| `message_thread_id is None` | General тема или не Forum-группа — проверяй всегда |
| Бот не администратор | `check_bot_admin_rights()` перед операциями |
| FSM теряется при рестарте | Используй `SQLiteStorage` вместо `MemoryStorage` |
| Пользователь бросил онбординг | Сохраняй `started_at`, проверяй timeout |
| StateData разрастается | Хранить только параметры текущего шага |
---
## Конкурентный анализ UX
### Бот 1: ChatGPT (@ChatGPT официальный)
**Хорошо:**
- Простой `/start` без лишних шагов
- Typing indicator во время генерации
- Кнопки «Regenerate» и «Edit» под ответом
**Плохо:**
- Нет разделения на чаты (одна сплошная лента)
- Нет управления историей
**Паттерны для нас:** typing indicator обязателен; кнопки под сообщением вместо команд
---
### Бот 2: Notion AI бот
**Хорошо:**
- Онбординг в 3 шага с прогресс-баром
- Inline кнопки вместо команд
- Явный feedback: «✅ Готово», «❌ Ошибка»
**Плохо:**
- Много текста в одном сообщении
**Паттерны для нас:** пошаговый онбординг, явный feedback
---
### Бот 3: TaskMaster AI
**Хорошо:**
- Отдельный «чат» на каждую задачу (аналог наших Topics)
- `/list` показывает кнопки по одной задаче на строку
- Подтверждение деструктивных действий: «Удалить чат? [Да] [Нет]»
**Паттерны для нас:** подтверждение перед удалением/архивацией
---
## Паттерны для Lambda Surfaces
| Паттерн | Источник | Применение |
|---------|----------|------------|
| Typing indicator | ChatGPT | Всегда при запросе к платформе |
| Inline кнопки | Notion, TaskMaster | `/new`, `/chats`, настройки |
| Пошаговый онбординг | Notion | AuthPending → GroupSetup → Idle |
| Явный feedback | Notion | «✅ Чат создан», «❌ Ошибка» |
| Подтверждение удаления | TaskMaster | `/archive` и `/delete` |
| Прогресс в треде | общий паттерн | Долгие задачи агента |
## Анти-паттерны
- **Молчаливая обработка** — всегда показывай «обрабатываю...» если > 2 сек
- **Команды вместо кнопок** — пользователи не помнят команды, используй кнопки
- **Все настройки в одном сообщении** — разбивай на подменю
- **Нет подтверждения** — всегда спрашивай перед деструктивными действиями

View file

@ -2,7 +2,7 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Protocol
from typing import Any, AsyncIterator, Literal, Protocol
from pydantic import BaseModel
@ -16,6 +16,13 @@ class User(BaseModel):
is_new: bool = False
class Attachment(BaseModel):
url: str
mime_type: str
size: int | None = None
filename: str | None = None
class MessageResponse(BaseModel):
message_id: str
response: str
@ -23,6 +30,14 @@ class MessageResponse(BaseModel):
finished: bool
class MessageChunk(BaseModel):
"""Один кусок стримингового ответа. При sync-режиме — единственный чанк с finished=True."""
message_id: str
delta: str
finished: bool
tokens_used: int = 0
class UserSettings(BaseModel):
skills: dict[str, bool] = {}
connectors: dict[str, dict] = {}
@ -31,6 +46,15 @@ class UserSettings(BaseModel):
plan: dict[str, Any] = {}
class AgentEvent(BaseModel):
"""Webhook-уведомление от платформы — агент закончил долгую задачу."""
event_id: str
user_id: str
chat_id: str
event_type: Literal["task_done", "task_error", "task_progress"]
payload: dict[str, Any] = {}
class PlatformError(Exception):
def __init__(self, message: str, code: str = "PLATFORM_ERROR"):
super().__init__(message)
@ -45,15 +69,29 @@ class PlatformClient(Protocol):
display_name: str | None = None,
) -> User: ...
# Master manages container lifecycle — bot only sends user_id + chat_id.
# Sync — используем сейчас
async def send_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list | None = None,
attachments: list[Attachment] | None = None,
) -> MessageResponse: ...
# Streaming — дверь открыта, мок отдаёт один чанк
async def stream_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> AsyncIterator[MessageChunk]: ...
async def get_settings(self, user_id: str) -> UserSettings: ...
async def update_settings(self, user_id: str, action: Any) -> None: ...
class WebhookReceiver(Protocol):
"""Регистрируется в боте. Платформа зовёт нас когда агент закончил долгую задачу."""
async def on_agent_event(self, event: AgentEvent) -> None: ...

View file

@ -5,11 +5,19 @@ import asyncio
import random
import uuid
from datetime import UTC, datetime
from typing import Any
from typing import Any, AsyncIterator, Literal
import structlog
from platform.interface import MessageResponse, User, UserSettings
from platform.interface import (
AgentEvent,
Attachment,
MessageChunk,
MessageResponse,
User,
UserSettings,
WebhookReceiver,
)
logger = structlog.get_logger(__name__)
@ -21,15 +29,27 @@ class MockPlatformClient:
Реализует PlatformClient Protocol. При подключении реального SDK
заменяется только этот файл core/ и адаптеры не трогаются.
Ключевое отличие от реальной платформы: не управляет lifecycle контейнера.
Master делает это сам при получении send_message.
attachment_mode симулирует разные варианты передачи файлов:
"url" платформа получает URL, скачивает сама (текущий план)
"binary" бинарные данные в теле (резерв)
"s3" pre-signed S3 URL (резерв)
Webhook: зарегистрируй WebhookReceiver через register_webhook_receiver(),
вызови simulate_agent_event() чтобы имитировать входящее уведомление.
"""
def __init__(self) -> None:
def __init__(
self,
attachment_mode: Literal["url", "binary", "s3"] = "url",
) -> None:
self.attachment_mode = attachment_mode
self._users: dict[str, dict] = {}
self._messages: dict[str, list] = {} # "{user_id}:{chat_id}" → messages
self._settings: dict[str, dict] = {}
logger.info("MockPlatformClient initialized")
self._webhook_receiver: WebhookReceiver | None = None
logger.info("MockPlatformClient initialized", attachment_mode=attachment_mode)
# ------------------------------------------------------------------ users
async def get_or_create_user(
self,
@ -52,39 +72,53 @@ class MockPlatformClient:
data = {**self._users[key], "is_new": is_new}
return User(**data)
# --------------------------------------------------------------- messages
async def send_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list | None = None,
attachments: list[Attachment] | None = None,
) -> MessageResponse:
await self._latency(200, 600)
key = f"{user_id}:{chat_id}"
if key not in self._messages:
self._messages[key] = []
message_id = str(uuid.uuid4())
preview = text[:50] + ("..." if len(text) > 50 else "")
response = f"[MOCK] Ответ на: «{preview}»"
self._messages[key].append({
"message_id": message_id,
"user_text": text,
"response": response,
"tokens_used": len(text.split()) * 2,
"finished": True,
"created_at": datetime.now(UTC).isoformat(),
})
logger.info("Message sent", user_id=user_id, chat_id=chat_id, message_id=message_id)
message_id, response, tokens = self._build_response(user_id, chat_id, text, attachments)
logger.info("send_message", user_id=user_id, chat_id=chat_id, message_id=message_id)
return MessageResponse(
message_id=message_id,
response=response,
tokens_used=len(text.split()) * 2,
tokens_used=tokens,
finished=True,
)
async def stream_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> AsyncIterator[MessageChunk]:
"""
Сейчас: один чанк с полным ответом (sync под капотом).
При реальном SDK: заменить на SSE/WebSocket итератор в platform/mock.py.
Адаптеры переписывать не нужно.
"""
await self._latency(200, 600)
message_id, response, tokens = self._build_response(user_id, chat_id, text, attachments)
logger.info("stream_message", user_id=user_id, chat_id=chat_id, message_id=message_id)
async def _gen() -> AsyncIterator[MessageChunk]:
yield MessageChunk(
message_id=message_id,
delta=response,
finished=True,
tokens_used=tokens,
)
return _gen()
# --------------------------------------------------------------- settings
async def get_settings(self, user_id: str) -> UserSettings:
await self._latency()
stored = self._settings.get(user_id, {})
@ -127,11 +161,71 @@ class MockPlatformClient:
logger.info("Settings updated", user_id=user_id, action=action.action)
# --------------------------------------------------------------- webhooks
def register_webhook_receiver(self, receiver: WebhookReceiver) -> None:
"""Бот регистрирует свой обработчик входящих событий от платформы."""
self._webhook_receiver = receiver
logger.info("WebhookReceiver registered")
async def simulate_agent_event(
self,
user_id: str,
chat_id: str,
event_type: Literal["task_done", "task_error", "task_progress"] = "task_done",
payload: dict | None = None,
) -> None:
"""Имитирует входящий webhook от платформы. Используется в тестах и ручном QA."""
if self._webhook_receiver is None:
logger.warning("simulate_agent_event: no WebhookReceiver registered")
return
event = AgentEvent(
event_id=str(uuid.uuid4()),
user_id=user_id,
chat_id=chat_id,
event_type=event_type,
payload=payload or {"message": "[MOCK] Долгая задача выполнена"},
)
await self._webhook_receiver.on_agent_event(event)
# ------------------------------------------------------------------ utils
def get_stats(self) -> dict:
return {
"total_users": len(self._users),
"total_messages": sum(len(msgs) for msgs in self._messages.values()),
"attachment_mode": self.attachment_mode,
}
def _build_response(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None,
) -> tuple[str, str, int]:
key = f"{user_id}:{chat_id}"
if key not in self._messages:
self._messages[key] = []
message_id = str(uuid.uuid4())
preview = text[:50] + ("..." if len(text) > 50 else "")
attachment_note = ""
if attachments:
names = [a.filename or a.mime_type for a in attachments]
attachment_note = f" [вложения: {', '.join(names)}]"
response = f"[MOCK] Ответ на: «{preview}»{attachment_note}"
tokens = len(text.split()) * 2
self._messages[key].append({
"message_id": message_id,
"user_text": text,
"response": response,
"tokens_used": tokens,
"finished": True,
"created_at": datetime.now(UTC).isoformat(),
})
return message_id, response, tokens
async def _latency(self, min_ms: int = 10, max_ms: int = 80) -> None:
await asyncio.sleep(random.randint(min_ms, max_ms) / 1000)