7.2 KiB
Lambda Agent API
WebSocket API SDK для взаимодействия с AI-агентом.
Установка
pip install .
Требуется Python 3.14+.
Быстрый старт (с использованием AgentApi)
import asyncio
from agent_api import AgentApi, OM
def my_callback(message):
if isinstance(message, OM.Error):
print(f"\n[Ошибка: {message.code}] {message.details}")
elif isinstance(message, OM.Status):
print("✓ Agent status update")
async def main():
# Автоматическое управление соединением через контекстный менеджер
async with AgentApi("ws://localhost:8000", callback=my_callback) as api:
# Отправляем сообщение и получаем итератор ответа
response = await api.send_message("Привет, агент!")
# Читаем ответ потоком чанков
async for chunk in response:
if isinstance(chunk, OM.EventTextChunk):
print(chunk.text, end="", flush=True)
print(f"\n[Завершено, использовано токенов: {response.tokens}]")
asyncio.run(main())
AgentApi - Асинхронный Python клиент
Новая библиотека AgentApi предоставляет типизированный асинхронный клиент для WebSocket взаимодействия с агентом.
Характеристики
- ✅ Асинхронный клиент на основе
aiohttp - ✅ Контекстный менеджер для управления соединением
- ✅ ResponseIterator для асинхронной итерации по чанкам ответа
- ✅ Callback для обработки событий вне генерации ответа (ошибки, статусы)
- ✅ Типизированные сообщения через Pydantic с дискриминированными объединениями
- ✅ Обработка ошибок с кастомным исключением
AgentException - ✅ Логирование на всех уровнях операций
- ✅ Полная документация всех методов
Использование
from agent_api import AgentApi, OM
# Использование с контекстным менеджером
async with AgentApi("ws://localhost:8000", callback=my_callback) as api:
# Отправка сообщения и получение итератора
response = await api.send_message("Your question here")
# Итерация по чанкам ответа
async for chunk in response:
if isinstance(chunk, OM.EventTextChunk):
print(chunk.text, end="", flush=True)
print(f"\nDone! Tokens: {response.tokens}")
Callback функция для обработки событий вне генерации:
def my_callback(message):
if isinstance(message, OM.Status):
print("Agent status update")
elif isinstance(message, OM.Error):
print(f"Agent error: {message.code} - {message.details}")
elif isinstance(message, OM.GracefulDisconnect):
print("Agent disconnecting gracefully")
Классический подход (низкоуровневый)
import asyncio
import websockets
from models import ServerMessage, OM
async def main():
uri = "ws://localhost:8000/ws"
async with websockets.connect(uri) as ws:
# 1. Ждём STATUS - подтверждение готовности
status = await ws.recv()
print(f"Connected: {status}")
# 2. Отправляем сообщение
await ws.send('{"type": "USER_MESSAGE", "text": "Привет!"}')
# 3. Читаем ответ в виде потока событий
while True:
msg = await ws.recv()
data = ServerMessage.model_validate_json(msg)
match data:
case OM.AgentEvent(subtype=OM.AgentEventType.TEXT_CHUNK):
print(data.text, end="", flush=True)
case OM.EventEnd():
print(f"\n[Завершено, использовано токенов: {data.tokens_used}]")
break
case OM.Error():
print(f"\n[Ошибка: {data.code}] {data.details}")
break
asyncio.run(main())
Протокол
Клиент → Сервер
USER_MESSAGE
Полное сообщение от пользователя.
{
"type": "USER_MESSAGE",
"text": "Текст сообщения"
}
| Поле | Тип | Описание |
|---|---|---|
| type | string | Всегда USER_MESSAGE |
| text | string | Текст сообщения |
Сервер → Клиент
STATUS
Отправляется сервером при открытии соединения с клиентом. Будет дополнен информацией о готовности агента принимать сообщения.
{
"type": "STATUS"
}
AGENT_EVENT
Базовый класс для ивентов, которые стримит агент во время генерации ответа. Конкретный класс для ивента определяется по subtype.
TEXT_CHUNK
Чанк текста ответа агента.
{
"type": "AGENT_EVENT",
"subtype": "TEXT_CHUNK",
"text": "Фрагмент текста"
}
END
Агент закончил генерацию ответа.
{
"type": "AGENT_EVENT",
"subtype": "END",
"tokens_used": 42
}
| Поле | Тип | Описание |
|---|---|---|
| tokens_used | int | Количество использованных токенов |
ERROR
Неопределенная ошибка в работе агента.
{
"type": "ERROR",
"code": "error_code",
"details": "Описание ошибки"
}
| Поле | Тип | Описание |
|---|---|---|
| code | string | Код ошибки |
| details | string | Подробности |
GRACEFUL_DISCONNECT
Отправляется перед завершением работы контейнера с агентом. Например, при долгом бездействии. Нужно, чтобы отделять обрыв соединения из-за ошибки с необходимостью повторного подключения. Приход этого сообщения означает, что агент осознанно завершает работу с клиентом по какой-то причине. Для дальнейшего взаимодействия нужно снова обратиться к мастеру.
{
"type": "GRACEFUL_DISCONNECT"
}
Зависимости
- Python 3.14+
- pydantic >= 2.12.5
