surfaces/docs/research/matrix-events.md
Mikhail Putilovskij 67499daa61 feat: extend platform mock + add research docs
platform/interface.py:
- Add Attachment, MessageChunk, AgentEvent types
- Add stream_message() to PlatformClient Protocol (door open for streaming)
- Add WebhookReceiver Protocol

platform/mock.py:
- Add attachment_mode config (url/binary/s3)
- Implement stream_message() — single chunk, ready for real streaming
- Add register_webhook_receiver() + simulate_agent_event() for testing

docs/research/:
- telegram-forum-topics.md — aiogram 3.x Forum Topics API, FSM patterns, UX analysis
- fsm-patterns.md — FSM storage options, StateData best practices
- matrix-spaces.md — matrix-nio Space API, room ordering, invite flow
- matrix-events.md — reactions, threads, typing, sync loop pitfalls
- telegram-chat-alternatives.md — 7 alternatives for multi-chat UX, virtual chats in DM recommended
2026-03-30 14:04:34 +03:00

13 KiB
Raw Permalink Blame History

Research: matrix-nio Event Handling

Based on: Matrix Client-Server Spec, matrix-nio 0.24+.

Реакции (m.reaction)

Реакции используют rel_type: "m.annotation" для связи с исходным сообщением.

Слушание и обработка реакций

from nio import AsyncClient, ReactionEvent

class ReactionHandler:
    def __init__(self, client: AsyncClient):
        self.client = client
        # Регистрируем callback — явно, не через декоратор
        self.client.add_event_callback(self.on_reaction, ReactionEvent)

    async def on_reaction(self, room, event: ReactionEvent) -> None:
        # Игнорируем свои реакции
        if event.sender == self.client.user_id:
            return

        # Извлекаем ключ и target event ID
        # Способ 1 (новые версии nio)
        reaction_key = getattr(event, "key", None)
        target_event_id = getattr(event, "reacts_to", None)

        # Способ 2 (fallback — работает во всех версиях)
        if reaction_key is None or target_event_id is None:
            relates_to = event.content.get("m.relates_to", {})
            reaction_key = relates_to.get("key")
            target_event_id = relates_to.get("event_id")

        if not reaction_key or not target_event_id:
            return

        if reaction_key == "👍":
            await self.handle_confirm(room, target_event_id)
        elif reaction_key == "❌":
            await self.handle_cancel(room, target_event_id)

    async def handle_confirm(self, room, target_event_id: str) -> None:
        await self.client.room_send(
            room_id=room.room_id,
            message_type="m.room.message",
            content={"msgtype": "m.text", "body": "✅ Подтверждено"},
        )

    async def handle_cancel(self, room, target_event_id: str) -> None:
        await self.client.room_send(
            room_id=room.room_id,
            message_type="m.room.message",
            content={"msgtype": "m.text", "body": "❌ Отменено"},
        )

Подводный камень #1: атрибуты ReactionEvent отличаются по версиям nio

В разных версиях matrix-nio атрибуты могут называться по-разному. Всегда используй fallback через event.content.

Подводный камень #2: callback нужно регистрировать явно

# НЕПРАВИЛЬНО — просто определить функцию недостаточно
async def on_reaction(room, event):
    pass

# ПРАВИЛЬНО
client.add_event_callback(on_reaction, ReactionEvent)

Треды (m.thread)

Создание треда и отправка сообщений

async def create_thread_root(client: AsyncClient, room_id: str, text: str) -> str:
    """
    Создаёт корневое сообщение для треда.
    Первое сообщение НЕ должно содержать m.relates_to.
    Возвращает event_id корня.
    """
    resp = await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content={
            "msgtype": "m.text",
            "body": text,
            # Нет m.relates_to! Это корень.
        },
    )
    return resp.event_id


async def send_to_thread(
    client: AsyncClient,
    room_id: str,
    thread_root_id: str,
    text: str,
) -> str:
    """
    Отправляет сообщение в существующий тред.
    """
    resp = await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content={
            "msgtype": "m.text",
            "body": text,
            "m.relates_to": {
                "rel_type": "m.thread",
                "event_id": thread_root_id,  # ID корневого сообщения
            },
        },
    )
    return resp.event_id


# Использование для долгой задачи:
async def run_long_task(client, room_id, task_text):
    root_id = await create_thread_root(client, room_id, f"📋 {task_text}")

    await send_to_thread(client, room_id, root_id, "⏳ Обрабатываю... (1/3)")
    # ... работа ...
    await send_to_thread(client, room_id, root_id, "⏳ Анализирую... (2/3)")
    # ... работа ...
    await send_to_thread(client, room_id, root_id, "✅ Готово!")

Подводный камень #3: корневое сообщение без m.relates_to

# НЕПРАВИЛЬНО — первое сообщение треда не должно ссылаться на что-либо
content = {
    "msgtype": "m.text",
    "body": "Начинаю задачу",
    "m.relates_to": {"rel_type": "m.thread", "event_id": "..."},  # ОШИБКА
}

# ПРАВИЛЬНО — просто обычное сообщение
content = {
    "msgtype": "m.text",
    "body": "Начинаю задачу",
}

Typing indicator

async def with_typing(client: AsyncClient, room_id: str, coro):
    """
    Запускает корутину с индикатором печати.
    Автоматически убирает индикатор в finally.
    """
    async def renew_typing():
        """Возобновляет typing каждые 5 секунд (timeout = 10 сек)."""
        import asyncio
        while True:
            await client.room_typing(room_id, typing_state=True, timeout=10000)
            await asyncio.sleep(5)

    import asyncio
    renewal_task = asyncio.create_task(renew_typing())
    try:
        return await coro
    finally:
        renewal_task.cancel()
        await client.room_typing(room_id, typing_state=False, timeout=0)


# Использование:
async def on_message(client, room, event):
    result = await with_typing(client, room.room_id, process_message(event.body))
    await client.room_send(room.room_id, "m.room.message", {"msgtype": "m.text", "body": result})

Подводный камень #4: typing исчезает через 10 секунд

Нужно возобновлять каждые 5 секунд если обработка долгая.


Фильтрация собственных сообщений

from nio import RoomMessageText

class MessageHandler:
    def __init__(self, client: AsyncClient):
        self.client = client
        self.client.add_event_callback(self.on_message, RoomMessageText)

    async def on_message(self, room, event: RoomMessageText) -> None:
        # ОБЯЗАТЕЛЬНО: игнорировать свои сообщения во избежание петли
        if event.sender == self.client.user_id:
            return

        # Обрабатываем только чужие сообщения
        await self.process(room, event)

Подводный камень #5: user_id может быть None до login()

if self.client.user_id is None:
    return  # клиент ещё не залогинен

if event.sender == self.client.user_id:
    return

Обработка invite (m.room.member)

from nio import InviteEvent, InviteMemberEvent

class InviteHandler:
    def __init__(self, client: AsyncClient):
        self.client = client
        self.client.add_event_callback(self.on_invite, InviteEvent)

    async def on_invite(self, room, event: InviteMemberEvent) -> None:
        """
        Вызывается когда бот получает приглашение в комнату.
        room.room_id — куда приглашают
        room.inviter — кто приглашает
        """
        inviter = room.inviter
        room_id = room.room_id

        # Принимаем приглашение
        resp = await self.client.join(room_id)
        if hasattr(resp, "room_id"):
            # Успешно вошли — начинаем регистрацию пользователя
            await self.start_onboarding(room_id, inviter)
        else:
            print(f"Failed to join {room_id}: {resp}")

    async def start_onboarding(self, room_id: str, user_id: str) -> None:
        await self.client.room_send(
            room_id=room_id,
            message_type="m.room.message",
            content={"msgtype": "m.text", "body": "Привет! Создаю ваше пространство..."},
        )

Подводный камень #6: InviteRoom vs Room

В момент invite бот ещё не в комнате, поэтому room.name может быть None.


Sync loop и reconnect

Полная реализация с reconnect

from nio import AsyncClient, SyncError
import asyncio, logging

logger = logging.getLogger(__name__)

class SyncManager:
    def __init__(self, client: AsyncClient):
        self.client = client
        self.sync_token = None
        self.should_stop = False

    async def run(self) -> None:
        reconnect_delay = 5
        attempts = 0

        while not self.should_stop:
            try:
                await self._sync_once()
                attempts = 0  # сбрасываем счётчик на успешный sync
                reconnect_delay = 5
            except Exception as e:
                attempts += 1
                if attempts > 10:
                    logger.error("Too many reconnect failures, stopping")
                    break
                delay = min(reconnect_delay * (2 ** (attempts - 1)), 300)
                logger.warning(f"Sync error: {e}. Retrying in {delay}s")
                await asyncio.sleep(delay)

    async def _sync_once(self) -> None:
        is_first_sync = self.sync_token is None

        response = await self.client.sync(
            since=self.sync_token,
            full_state=is_first_sync,
            set_presence="online",
            timeout=30000,
        )

        if isinstance(response, SyncError):
            raise Exception(f"Sync failed: {response.message}")

        if is_first_sync:
            # При первом синхе — только сохраняем токен, не обрабатываем старые события
            logger.info("Initial sync complete, skipping old events")
            self.sync_token = response.next_batch
            return

        self.sync_token = response.next_batch
        # Callbacks вызываются автоматически через add_event_callback

Подводный камень #7: при первом синхе — много старых событий

# Если не пропустить первый синх — бот обработает всю историю комнат
if self.sync_token is None:
    self.sync_token = response.next_batch
    return  # пропустить, только взять токен

Подводный камень #8: full_state нужен только для первого синха

# При повторных синхах full_state=True сильно замедляет работу
response = await client.sync(
    since=token,
    full_state=token is None,  # True только если первый раз
)

Подводные камни (резюме)

Проблема Решение
Реакции не обрабатываются client.add_event_callback(handler, ReactionEvent)
event.reacts_to не существует Fallback: event.content.get("m.relates_to", {}).get("event_id")
Первое сообщение треда не создаёт тред Первое сообщение БЕЗ m.relates_to
Бот зацикливается event.sender == client.user_id → return
Старые события при старте Пропусти первый синх: сохрани токен, не обрабатывай
Typing исчезает быстро Возобновляй каждые 5 секунд
InviteRoom.name is None Нормально, используй room.room_id

Выводы для нашей реализации

  1. Регистрировать callbacks в __init__ бота, до первого sync()
  2. Всегда проверять event.sender != client.user_id
  3. Для подтверждений — слать сообщение с инструкцией 👍 / ❌, слушать ReactionEvent
  4. Для долгих задачcreate_thread_root(), статусы в тред через send_to_thread()
  5. Typing — использовать with_typing() helper с автообновлением
  6. Sync loop — при первом синхе пропускать старые события