agent_api/api
2026-03-31 11:02:45 +03:00
..
docs модели передаваемых сообщений 2026-03-25 17:51:32 +03:00
.python-version файлы проекта 2026-03-25 16:31:31 +03:00
__init__.py #5 Переименованы IM->CM(IncomigMessage->ClientMessage),OM->SM(OutgoingMessage->ServerMessage) 2026-03-31 11:02:45 +03:00
agent_api.py #5 Переименованы IM->CM(IncomigMessage->ClientMessage),OM->SM(OutgoingMessage->ServerMessage) 2026-03-31 11:02:45 +03:00
models.py #5 Переименованы IM->CM(IncomigMessage->ClientMessage),OM->SM(OutgoingMessage->ServerMessage) 2026-03-31 11:02:45 +03:00
pyproject.toml #5 Реализовать клиентскую часть канала общения.Добавлен класс AgentApi, реализованный контекстным менеджером. Добавлен init.py для пакета api. В uv добавлена зависимость aiohttp 2026-03-29 18:02:27 +03:00
README.md Исправлен ридми в соответствии с изменениями в agent_api 2026-03-30 10:30:28 +03:00
uv.lock #5 Реализовать клиентскую часть канала общения.Добавлен класс AgentApi, реализованный контекстным менеджером. Добавлен init.py для пакета api. В uv добавлена зависимость aiohttp 2026-03-29 18:02:27 +03:00

Lambda Agent API

WebSocket API SDK для взаимодействия с AI-агентом.

Установка

pip install .

Требуется Python 3.14+.

Быстрый старт (с использованием AgentApi)

import asyncio
from agent_api import AgentApi, OM

def my_callback(message):
    if isinstance(message, OM.Error):
        print(f"\n[Ошибка: {message.code}] {message.details}")
    elif isinstance(message, OM.Status):
        print("✓ Agent status update")

async def main():
    # Автоматическое управление соединением через контекстный менеджер
    async with AgentApi("ws://localhost:8000", callback=my_callback) as api:
        # Отправляем сообщение и получаем итератор ответа
        response = await api.send_message("Привет, агент!")

        # Читаем ответ потоком чанков
        async for chunk in response:
            if isinstance(chunk, OM.EventTextChunk):
                print(chunk.text, end="", flush=True)
        
        print(f"\n[Завершено, использовано токенов: {response.tokens}]")

asyncio.run(main())

AgentApi - Асинхронный Python клиент

Новая библиотека AgentApi предоставляет типизированный асинхронный клиент для WebSocket взаимодействия с агентом.

Характеристики

  • Асинхронный клиент на основе aiohttp
  • Контекстный менеджер для управления соединением
  • ResponseIterator для асинхронной итерации по чанкам ответа
  • Callback для обработки событий вне генерации ответа (ошибки, статусы)
  • Типизированные сообщения через Pydantic с дискриминированными объединениями
  • Обработка ошибок с кастомным исключением AgentException
  • Логирование на всех уровнях операций
  • Полная документация всех методов

Использование

from agent_api import AgentApi, OM

# Использование с контекстным менеджером
async with AgentApi("ws://localhost:8000", callback=my_callback) as api:
    # Отправка сообщения и получение итератора
    response = await api.send_message("Your question here")

    # Итерация по чанкам ответа
    async for chunk in response:
        if isinstance(chunk, OM.EventTextChunk):
            print(chunk.text, end="", flush=True)
    
    print(f"\nDone! Tokens: {response.tokens}")

Callback функция для обработки событий вне генерации:

def my_callback(message):
    if isinstance(message, OM.Status):
        print("Agent status update")
    elif isinstance(message, OM.Error):
        print(f"Agent error: {message.code} - {message.details}")
    elif isinstance(message, OM.GracefulDisconnect):
        print("Agent disconnecting gracefully")

Классический подход (низкоуровневый)

import asyncio
import websockets
from models import ServerMessage, OM

async def main():
    uri = "ws://localhost:8000/ws"

    async with websockets.connect(uri) as ws:
        # 1. Ждём STATUS - подтверждение готовности
        status = await ws.recv()
        print(f"Connected: {status}")

        # 2. Отправляем сообщение
        await ws.send('{"type": "USER_MESSAGE", "text": "Привет!"}')

        # 3. Читаем ответ в виде потока событий
        while True:
            msg = await ws.recv()
            data = ServerMessage.model_validate_json(msg)

            match data:
                case OM.AgentEvent(subtype=OM.AgentEventType.TEXT_CHUNK):
                    print(data.text, end="", flush=True)
                case OM.EventEnd():
                    print(f"\n[Завершено, использовано токенов: {data.tokens_used}]")
                    break
                case OM.Error():
                    print(f"\n[Ошибка: {data.code}] {data.details}")
                    break

asyncio.run(main())

Протокол

Клиент → Сервер

USER_MESSAGE

Полное сообщение от пользователя.

{
  "type": "USER_MESSAGE",
  "text": "Текст сообщения"
}
Поле Тип Описание
type string Всегда USER_MESSAGE
text string Текст сообщения

Сервер → Клиент

STATUS

Отправляется сервером при открытии соединения с клиентом. Будет дополнен информацией о готовности агента принимать сообщения.

{
  "type": "STATUS"
}

AGENT_EVENT

Базовый класс для ивентов, которые стримит агент во время генерации ответа. Конкретный класс для ивента определяется по subtype.

TEXT_CHUNK

Чанк текста ответа агента.

{
  "type": "AGENT_EVENT",
  "subtype": "TEXT_CHUNK",
  "text": "Фрагмент текста"
}
END

Агент закончил генерацию ответа.

{
  "type": "AGENT_EVENT",
  "subtype": "END",
  "tokens_used": 42
}
Поле Тип Описание
tokens_used int Количество использованных токенов

ERROR

Неопределенная ошибка в работе агента.

{
  "type": "ERROR",
  "code": "error_code",
  "details": "Описание ошибки"
}
Поле Тип Описание
code string Код ошибки
details string Подробности

GRACEFUL_DISCONNECT

Отправляется перед завершением работы контейнера с агентом. Например, при долгом бездействии. Нужно, чтобы отделять обрыв соединения из-за ошибки с необходимостью повторного подключения. Приход этого сообщения означает, что агент осознанно завершает работу с клиентом по какой-то причине. Для дальнейшего взаимодействия нужно снова обратиться к мастеру.

{
  "type": "GRACEFUL_DISCONNECT"
}

Схема взаимодействия

Зависимости

  • Python 3.14+
  • pydantic >= 2.12.5