surfaces/docs/superpowers/specs/2026-03-31-telegram-adapter-design.md

14 KiB
Raw Permalink Blame History

Telegram Adapter Design

Date: 2026-03-31 Status: Approved — ready for implementation Scope: adapter/telegram/


Контекст

Telegram-адаптер — поверхность для взаимодействия пользователя с AI-агентом Lambda через Telegram. Адаптер конвертирует Telegram-события в IncomingEvent (core protocol) и отправляет OutgoingEvent обратно. Бизнес-логика — в core/, адаптер только переводит форматы и управляет Telegram API.


Чаты: основной режим — Виртуальные чаты в DM

Решение зафиксировано: основной режим — виртуальные чаты прямо в личке с ботом. Forum Topics — опциональный advanced режим (не реализуется в этом прототипе).

Принцип работы

  • active_chat_id — куда идут входящие сообщения от пользователя в данный момент
  • Ответы от агента всегда приходят в общий DM-поток с тегом: [Чат #1] Вот ответ...
  • Пользователь может переключиться до получения ответа — ответ всё равно придёт, тегирован

UX флоу

/start
→ Приветствие + Чат #1 создан автоматически
→ Пользователь сразу пишет

/new [название]
→ Новый чат создан, переключаемся на него

/chats
→ Инлайн-кнопки: 1. Чат #1  2. Чат #2  3. Исследование рынка
→ Нажимает — переключился

Сообщение в активный чат
→ Typing indicator
→ [Чат #1] Ответ агента

Аутентификация

Флоу (мок)

  1. /startget_or_create_user(tg_user_id, "telegram", display_name)
  2. is_new=True → создать Чат #1, написать приветствие
  3. is_new=False → восстановить active_chat_id из БД, написать "С возвращением"

FSM состояния

class AuthState(StatesGroup):
    # В моке состояний нет — auth мгновенный
    # Зарезервировано для реального SDK (waiting_confirmation и т.п.)
    pass

FSM состояния (полная схема)

class ChatState(StatesGroup):
    idle = State()              # В активном чате, ждём сообщения
    waiting_response = State()  # Запрос ушёл на платформу, ждём ответа

class SettingsState(StatesGroup):
    menu = State()              # Главное меню настроек
    soul_editing = State()      # Редактирует имя/инструкции агента
    confirm_action = State()    # Подтверждение деструктивного действия

active_chat_id хранится в FSM StateData, не в состоянии.


Структура файлов

adapter/telegram/
  bot.py                  — точка входа: Dispatcher, routers, middleware
  states.py               — FSM StatesGroup
  converter.py            — aiogram Message → IncomingEvent и обратно

  handlers/
    auth.py               — /start
    chat.py               — /new, /chats, /rename, /archive, сообщения в чате
    settings.py           — /settings и callback_query для настроек
    confirm.py            — подтверждение действий агента (InlineKeyboard ✅/❌)

  keyboards/
    chat.py               — список чатов, управление чатом
    settings.py           — меню настроек, скиллы, коннекторы
    confirm.py            — кнопки подтверждения действия

Converter

Конвертация в обе стороны — adapter/telegram/converter.py.

aiogram → IncomingEvent

def from_message(message: Message) -> IncomingMessage:
    return IncomingMessage(
        user_id=str(message.from_user.id),
        chat_id=active_chat_id,          # из FSM StateData
        text=message.text or "",
        attachments=extract_attachments(message),
        platform="telegram",
        raw=message.model_dump(),
    )

def extract_attachments(message: Message) -> list[Attachment]:
    attachments = []
    if message.photo:
        file = message.photo[-1]          # наибольшее разрешение
        attachments.append(Attachment(
            url=f"tg://file/{file.file_id}",   # резолвим через getFile при необходимости
            mime_type="image/jpeg",
            size=file.file_size,
        ))
    if message.document:
        attachments.append(Attachment(
            url=f"tg://file/{message.document.file_id}",
            mime_type=message.document.mime_type or "application/octet-stream",
            size=message.document.file_size,
            filename=message.document.file_name,
        ))
    if message.voice:
        attachments.append(Attachment(
            url=f"tg://file/{message.voice.file_id}",
            mime_type="audio/ogg",
            size=message.voice.file_size,
        ))
    return attachments

OutgoingEvent → Telegram

async def send_outgoing(bot: Bot, user_id: int, chat_name: str, event: OutgoingEvent) -> None:
    prefix = f"[{chat_name}] "

    if isinstance(event, OutgoingMessage):
        await bot.send_message(user_id, prefix + event.text)

    elif isinstance(event, OutgoingUI):
        # Кнопки подтверждения действия
        keyboard = build_confirm_keyboard(event)
        await bot.send_message(user_id, prefix + event.text, reply_markup=keyboard)

Обработчики

auth.py — /start

@router.message(CommandStart())
async def cmd_start(message: Message, state: FSMContext, platform: PlatformClient):
    user = await platform.get_or_create_user(
        external_id=str(message.from_user.id),
        platform="telegram",
        display_name=message.from_user.full_name,
    )

    if user.is_new:
        chat_id = create_chat(user.user_id, "Чат #1")   # в локальной БД
        await state.update_data(active_chat_id=chat_id, active_chat_name="Чат #1")
        await state.set_state(ChatState.idle)
        await message.answer(
            f"Привет, {message.from_user.first_name}! 👋\n"
            f"Я создал тебе первый чат. Просто пиши.\n\n"
            f"Команды: /new — новый чат, /chats — список"
        )
    else:
        # Восстановить последний активный чат
        last_chat = get_last_chat(user.user_id)
        await state.update_data(active_chat_id=last_chat.id, active_chat_name=last_chat.name)
        await state.set_state(ChatState.idle)
        await message.answer(f"С возвращением! Продолжаем [{last_chat.name}]")

chat.py — сообщения

@router.message(ChatState.idle, F.text)
async def handle_message(message: Message, state: FSMContext, platform: PlatformClient):
    data = await state.get_data()
    chat_id = data["active_chat_id"]
    chat_name = data["active_chat_name"]

    await state.set_state(ChatState.waiting_response)
    await message.bot.send_chat_action(message.chat.id, "typing")

    incoming = from_message(message, chat_id)
    outgoing_events = await core_handler.handle(incoming, platform)

    await state.set_state(ChatState.idle)

    for event in outgoing_events:
        await send_outgoing(message.bot, message.from_user.id, chat_name, event)

chat.py — управление чатами

@router.message(Command("new"))
async def cmd_new_chat(message: Message, state: FSMContext):
    args = message.text.split(maxsplit=1)
    name = args[1] if len(args) > 1 else None

    data = await state.get_data()
    user_id = ...   # из платформы
    count = count_chats(user_id)
    chat_name = name or f"Чат #{count + 1}"

    chat_id = create_chat(user_id, chat_name)
    await state.update_data(active_chat_id=chat_id, active_chat_name=chat_name)
    await message.answer(f"✅ [{chat_name}] создан. Пиши!")


@router.message(Command("chats"))
async def cmd_list_chats(message: Message, state: FSMContext):
    chats = get_user_chats(user_id)
    data = await state.get_data()
    active_id = data.get("active_chat_id")

    buttons = []
    for chat in chats:
        mark = "● " if chat.id == active_id else ""
        buttons.append([InlineKeyboardButton(
            text=f"{mark}{chat.name}",
            callback_data=f"switch:{chat.id}:{chat.name}"
        )])
    buttons.append([InlineKeyboardButton(text=" Новый чат", callback_data="new_chat")])

    await message.answer("Твои чаты:", reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons))


@router.callback_query(F.data.startswith("switch:"))
async def switch_chat(callback: CallbackQuery, state: FSMContext):
    _, chat_id, chat_name = callback.data.split(":", 2)
    await state.update_data(active_chat_id=chat_id, active_chat_name=chat_name)
    await callback.message.edit_text(f"✅ Переключился на [{chat_name}]")
    await callback.answer()

confirm.py — подтверждение действий агента

# Агент хочет выполнить действие → OutgoingUI приходит из core handler
# Бот показывает кнопки, ждёт ответа

@router.callback_query(F.data.startswith("confirm:"))
async def handle_confirm(callback: CallbackQuery, state: FSMContext, platform: PlatformClient):
    _, action_id, decision = callback.data.split(":")  # "confirm" / "cancel"
    data = await state.get_data()

    incoming = IncomingCallback(
        user_id=...,
        chat_id=data["active_chat_id"],
        action="confirm" if decision == "yes" else "cancel",
        payload={"action_id": action_id},
        platform="telegram",
    )
    outgoing_events = await core_handler.handle(incoming, platform)
    await callback.message.edit_reply_markup(reply_markup=None)
    for event in outgoing_events:
        await send_outgoing(callback.bot, callback.from_user.id, data["active_chat_name"], event)
    await callback.answer()

Настройки

/settings → инлайн-меню. Структура:

⚙️ Настройки
[🧩 Скиллы]        [🔗 Коннекторы]
[🧠 Личность]      [🔒 Безопасность]
[💳 Подписка]

Скиллы — список с кнопками-переключателями /. Нажатие → SettingsAction(toggle_skill).

Личность — свободные поля (имя агента, инструкции). Без пресетов стилей. FSM: SettingsState.soul_editing → бот задаёт вопросы по одному полю.

Коннекторы — заглушка OAuth ссылки.

Безопасность — переключатели для деструктивных действий.

Подписка — заглушка с токенами.


Хранилище (БД)

Минимальная схема для прототипа:

CREATE TABLE tg_users (
    tg_user_id   INTEGER PRIMARY KEY,
    platform_user_id TEXT NOT NULL,    -- из MockPlatformClient
    display_name TEXT,
    created_at   TIMESTAMP
);

CREATE TABLE chats (
    chat_id      TEXT PRIMARY KEY,     -- UUID
    tg_user_id   INTEGER NOT NULL,
    name         TEXT NOT NULL,
    created_at   TIMESTAMP,
    archived_at  TIMESTAMP,
    FOREIGN KEY(tg_user_id) REFERENCES tg_users(tg_user_id)
);

StateStore из core/store.py (SQLiteStore) — для FSM и общего состояния.


Typing indicator

Отправлять send_chat_action("typing") перед запросом к платформе. Если запрос > 5 сек — возобновлять каждые 4 сек (action живёт ~5 сек).

async def with_typing(bot: Bot, chat_id: int, coro):
    async def renew():
        while True:
            await bot.send_chat_action(chat_id, "typing")
            await asyncio.sleep(4)
    task = asyncio.create_task(renew())
    try:
        return await coro
    finally:
        task.cancel()

Обработка ответов при смене чата

Ответ всегда приходит в DM-поток с тегом:

[Чат #1] Вот мой ответ на вопрос про Python...

Пользователь мог переключить active_chat_id пока шёл запрос — это нормально. chat_name берётся из StateData в момент отправки запроса (до set_state(waiting_response)).


Что НЕ реализуем в прототипе

  • Forum Topics режим (researched, отложено)
  • Webhook от платформы (platform ещё не готов — используем sync send_message)
  • /rename, /archive для чатов (добавить после основного флоу)
  • Экспорт истории

Порядок реализации

  1. bot.py — Dispatcher, middleware для platform client
  2. states.py — FSM классы
  3. converter.py — from_message, extract_attachments
  4. handlers/auth.py — /start
  5. handlers/chat.py — сообщения + /new + /chats
  6. keyboards/chat.py — список чатов
  7. handlers/settings.py + keyboards/settings.py — меню настроек
  8. handlers/confirm.py + keyboards/confirm.py — подтверждения