14 KiB
Telegram Adapter Design
Date: 2026-03-31
Status: Approved — ready for implementation
Scope: adapter/telegram/
Контекст
Telegram-адаптер — поверхность для взаимодействия пользователя с AI-агентом Lambda через Telegram.
Адаптер конвертирует Telegram-события в IncomingEvent (core protocol) и отправляет OutgoingEvent обратно.
Бизнес-логика — в core/, адаптер только переводит форматы и управляет Telegram API.
Чаты: основной режим — Виртуальные чаты в DM
Решение зафиксировано: основной режим — виртуальные чаты прямо в личке с ботом. Forum Topics — опциональный advanced режим (не реализуется в этом прототипе).
Принцип работы
active_chat_id— куда идут входящие сообщения от пользователя в данный момент- Ответы от агента всегда приходят в общий DM-поток с тегом:
[Чат #1] Вот ответ... - Пользователь может переключиться до получения ответа — ответ всё равно придёт, тегирован
UX флоу
/start
→ Приветствие + Чат #1 создан автоматически
→ Пользователь сразу пишет
/new [название]
→ Новый чат создан, переключаемся на него
/chats
→ Инлайн-кнопки: 1. Чат #1 2. Чат #2 3. Исследование рынка
→ Нажимает — переключился
Сообщение в активный чат
→ Typing indicator
→ [Чат #1] Ответ агента
Аутентификация
Флоу (мок)
/start→get_or_create_user(tg_user_id, "telegram", display_name)is_new=True→ создать Чат #1, написать приветствиеis_new=False→ восстановитьactive_chat_idиз БД, написать "С возвращением"
FSM состояния
class AuthState(StatesGroup):
# В моке состояний нет — auth мгновенный
# Зарезервировано для реального SDK (waiting_confirmation и т.п.)
pass
FSM состояния (полная схема)
class ChatState(StatesGroup):
idle = State() # В активном чате, ждём сообщения
waiting_response = State() # Запрос ушёл на платформу, ждём ответа
class SettingsState(StatesGroup):
menu = State() # Главное меню настроек
soul_editing = State() # Редактирует имя/инструкции агента
confirm_action = State() # Подтверждение деструктивного действия
active_chat_id хранится в FSM StateData, не в состоянии.
Структура файлов
adapter/telegram/
bot.py — точка входа: Dispatcher, routers, middleware
states.py — FSM StatesGroup
converter.py — aiogram Message → IncomingEvent и обратно
handlers/
auth.py — /start
chat.py — /new, /chats, /rename, /archive, сообщения в чате
settings.py — /settings и callback_query для настроек
confirm.py — подтверждение действий агента (InlineKeyboard ✅/❌)
keyboards/
chat.py — список чатов, управление чатом
settings.py — меню настроек, скиллы, коннекторы
confirm.py — кнопки подтверждения действия
Converter
Конвертация в обе стороны — adapter/telegram/converter.py.
aiogram → IncomingEvent
def from_message(message: Message) -> IncomingMessage:
return IncomingMessage(
user_id=str(message.from_user.id),
chat_id=active_chat_id, # из FSM StateData
text=message.text or "",
attachments=extract_attachments(message),
platform="telegram",
raw=message.model_dump(),
)
def extract_attachments(message: Message) -> list[Attachment]:
attachments = []
if message.photo:
file = message.photo[-1] # наибольшее разрешение
attachments.append(Attachment(
url=f"tg://file/{file.file_id}", # резолвим через getFile при необходимости
mime_type="image/jpeg",
size=file.file_size,
))
if message.document:
attachments.append(Attachment(
url=f"tg://file/{message.document.file_id}",
mime_type=message.document.mime_type or "application/octet-stream",
size=message.document.file_size,
filename=message.document.file_name,
))
if message.voice:
attachments.append(Attachment(
url=f"tg://file/{message.voice.file_id}",
mime_type="audio/ogg",
size=message.voice.file_size,
))
return attachments
OutgoingEvent → Telegram
async def send_outgoing(bot: Bot, user_id: int, chat_name: str, event: OutgoingEvent) -> None:
prefix = f"[{chat_name}] "
if isinstance(event, OutgoingMessage):
await bot.send_message(user_id, prefix + event.text)
elif isinstance(event, OutgoingUI):
# Кнопки подтверждения действия
keyboard = build_confirm_keyboard(event)
await bot.send_message(user_id, prefix + event.text, reply_markup=keyboard)
Обработчики
auth.py — /start
@router.message(CommandStart())
async def cmd_start(message: Message, state: FSMContext, platform: PlatformClient):
user = await platform.get_or_create_user(
external_id=str(message.from_user.id),
platform="telegram",
display_name=message.from_user.full_name,
)
if user.is_new:
chat_id = create_chat(user.user_id, "Чат #1") # в локальной БД
await state.update_data(active_chat_id=chat_id, active_chat_name="Чат #1")
await state.set_state(ChatState.idle)
await message.answer(
f"Привет, {message.from_user.first_name}! 👋\n"
f"Я создал тебе первый чат. Просто пиши.\n\n"
f"Команды: /new — новый чат, /chats — список"
)
else:
# Восстановить последний активный чат
last_chat = get_last_chat(user.user_id)
await state.update_data(active_chat_id=last_chat.id, active_chat_name=last_chat.name)
await state.set_state(ChatState.idle)
await message.answer(f"С возвращением! Продолжаем [{last_chat.name}]")
chat.py — сообщения
@router.message(ChatState.idle, F.text)
async def handle_message(message: Message, state: FSMContext, platform: PlatformClient):
data = await state.get_data()
chat_id = data["active_chat_id"]
chat_name = data["active_chat_name"]
await state.set_state(ChatState.waiting_response)
await message.bot.send_chat_action(message.chat.id, "typing")
incoming = from_message(message, chat_id)
outgoing_events = await core_handler.handle(incoming, platform)
await state.set_state(ChatState.idle)
for event in outgoing_events:
await send_outgoing(message.bot, message.from_user.id, chat_name, event)
chat.py — управление чатами
@router.message(Command("new"))
async def cmd_new_chat(message: Message, state: FSMContext):
args = message.text.split(maxsplit=1)
name = args[1] if len(args) > 1 else None
data = await state.get_data()
user_id = ... # из платформы
count = count_chats(user_id)
chat_name = name or f"Чат #{count + 1}"
chat_id = create_chat(user_id, chat_name)
await state.update_data(active_chat_id=chat_id, active_chat_name=chat_name)
await message.answer(f"✅ [{chat_name}] создан. Пиши!")
@router.message(Command("chats"))
async def cmd_list_chats(message: Message, state: FSMContext):
chats = get_user_chats(user_id)
data = await state.get_data()
active_id = data.get("active_chat_id")
buttons = []
for chat in chats:
mark = "● " if chat.id == active_id else ""
buttons.append([InlineKeyboardButton(
text=f"{mark}{chat.name}",
callback_data=f"switch:{chat.id}:{chat.name}"
)])
buttons.append([InlineKeyboardButton(text="➕ Новый чат", callback_data="new_chat")])
await message.answer("Твои чаты:", reply_markup=InlineKeyboardMarkup(inline_keyboard=buttons))
@router.callback_query(F.data.startswith("switch:"))
async def switch_chat(callback: CallbackQuery, state: FSMContext):
_, chat_id, chat_name = callback.data.split(":", 2)
await state.update_data(active_chat_id=chat_id, active_chat_name=chat_name)
await callback.message.edit_text(f"✅ Переключился на [{chat_name}]")
await callback.answer()
confirm.py — подтверждение действий агента
# Агент хочет выполнить действие → OutgoingUI приходит из core handler
# Бот показывает кнопки, ждёт ответа
@router.callback_query(F.data.startswith("confirm:"))
async def handle_confirm(callback: CallbackQuery, state: FSMContext, platform: PlatformClient):
_, action_id, decision = callback.data.split(":") # "confirm" / "cancel"
data = await state.get_data()
incoming = IncomingCallback(
user_id=...,
chat_id=data["active_chat_id"],
action="confirm" if decision == "yes" else "cancel",
payload={"action_id": action_id},
platform="telegram",
)
outgoing_events = await core_handler.handle(incoming, platform)
await callback.message.edit_reply_markup(reply_markup=None)
for event in outgoing_events:
await send_outgoing(callback.bot, callback.from_user.id, data["active_chat_name"], event)
await callback.answer()
Настройки
/settings → инлайн-меню. Структура:
⚙️ Настройки
[🧩 Скиллы] [🔗 Коннекторы]
[🧠 Личность] [🔒 Безопасность]
[💳 Подписка]
Скиллы — список с кнопками-переключателями ✅/❌. Нажатие → SettingsAction(toggle_skill).
Личность — свободные поля (имя агента, инструкции). Без пресетов стилей.
FSM: SettingsState.soul_editing → бот задаёт вопросы по одному полю.
Коннекторы — заглушка OAuth ссылки.
Безопасность — переключатели для деструктивных действий.
Подписка — заглушка с токенами.
Хранилище (БД)
Минимальная схема для прототипа:
CREATE TABLE tg_users (
tg_user_id INTEGER PRIMARY KEY,
platform_user_id TEXT NOT NULL, -- из MockPlatformClient
display_name TEXT,
created_at TIMESTAMP
);
CREATE TABLE chats (
chat_id TEXT PRIMARY KEY, -- UUID
tg_user_id INTEGER NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMP,
archived_at TIMESTAMP,
FOREIGN KEY(tg_user_id) REFERENCES tg_users(tg_user_id)
);
StateStore из core/store.py (SQLiteStore) — для FSM и общего состояния.
Typing indicator
Отправлять send_chat_action("typing") перед запросом к платформе.
Если запрос > 5 сек — возобновлять каждые 4 сек (action живёт ~5 сек).
async def with_typing(bot: Bot, chat_id: int, coro):
async def renew():
while True:
await bot.send_chat_action(chat_id, "typing")
await asyncio.sleep(4)
task = asyncio.create_task(renew())
try:
return await coro
finally:
task.cancel()
Обработка ответов при смене чата
Ответ всегда приходит в DM-поток с тегом:
[Чат #1] Вот мой ответ на вопрос про Python...
Пользователь мог переключить active_chat_id пока шёл запрос — это нормально.
chat_name берётся из StateData в момент отправки запроса (до set_state(waiting_response)).
Что НЕ реализуем в прототипе
- Forum Topics режим (researched, отложено)
- Webhook от платформы (platform ещё не готов — используем sync
send_message) /rename,/archiveдля чатов (добавить после основного флоу)- Экспорт истории
Порядок реализации
bot.py— Dispatcher, middleware для platform clientstates.py— FSM классыconverter.py— from_message, extract_attachmentshandlers/auth.py— /starthandlers/chat.py— сообщения + /new + /chatskeyboards/chat.py— список чатовhandlers/settings.py+keyboards/settings.py— меню настроекhandlers/confirm.py+keyboards/confirm.py— подтверждения