Compare commits

..

18 commits

Author SHA1 Message Date
8a4f4db6d3 фикс некорректной склейки url при налиции суффикса в base_url 2026-04-23 22:42:19 +03:00
aa480bbec5 Merge pull request '#15 Файловые вложения к сообщениям от юзера' (#16) from #15-attachments into master
Reviewed-on: #16
2026-04-20 10:22:38 +00:00
972fc23bfc актуальный README и menual тест 2026-04-20 13:21:26 +03:00
bd2923ffca поле attachments в модели MsgUserMessage и соответствующие параметры в send_message 2026-04-20 13:20:36 +03:00
483a63b999 Merge branch '#13-file-transfer'
# Conflicts:
#	README.md
#	lambda_agent_api/agent_api.py
#	tests/manual.py
2026-04-19 22:08:48 +03:00
700f7fa0e7 актуальный README 2026-04-19 21:59:37 +03:00
0cf3117fde обработка новой модели, актуальный manual.py 2026-04-19 21:14:19 +03:00
4c3e7253c7 AgentAPI принимает base_url (по идее ws://host:port) и сам дописывает нужный эндпоинт 2026-04-19 21:09:26 +03:00
8256453bbf актуализация документации и manual теста 2026-04-19 21:09:26 +03:00
1b8efdb4a4 корректная обработка ошибки, когда чат занят другим клиентом 2026-04-19 21:09:25 +03:00
9ac4ef1ba1 поддержка chat_id 2026-04-19 21:09:25 +03:00
146dcdf21d модель MsgEventSendFile для получения файлов от агента 2026-04-19 21:00:34 +03:00
aac37db672 Merge pull request '#11 Разделение истории сообщений через chat_id' (#12) from #11-chat-id into master
Reviewed-on: #12
2026-04-19 14:41:48 +00:00
0c9906ecb4 AgentAPI принимает base_url (по идее ws://host:port) и сам дописывает нужный эндпоинт 2026-04-19 15:49:33 +03:00
234050df9f актуализация документации и manual теста 2026-04-19 15:12:07 +03:00
ee98eb09d9 корректная обработка ошибки, когда чат занят другим клиентом 2026-04-19 14:58:49 +03:00
649834beae поддержка chat_id 2026-04-19 11:58:45 +03:00
bb20a84741 Merge pull request '#9 Обработка tool call и новых моделей на клиенте' (#10) from #9-clientside-tool-call into master
Reviewed-on: #10
2026-04-14 23:30:38 +00:00
5 changed files with 216 additions and 112 deletions

View file

@ -1,9 +1,16 @@
from lambda_agent_api import AgentApi
# Lambda Agent API
WebSocket API SDK для взаимодействия с AI-агентом.
## Release Notes
# v1.1
- **CRITICAL**: `AgentAPI` вместо `url` принимает `base_url` и сам дописывает нужный эндпоинт.
Раньше: `AgentAPI(url="ws://localhost:8000/agent_ws/")`. Сейчас: `AgentAPI(base_url="ws://localhost:8000/")`
- Добавлен параметр `chat_id` в конструктор `AgentAPI`. Нужен для разделения истории сообщений по чатам/веткам.
- `AgentAPI.connect()` вызывает `AgentBusyException`, если выбранный чат уже занят другим API клиентом.
- Добавлен новый тип события `MsgEventSendFile` для отправки файлов пользователю. Поле `path` — путь к файлу относительно `/workspace`.
- `send_message(text, attachments)` теперь принимает `attachments` — список путей к файлам относительно `/workspace`.
## Установка
В `master` всегда будет актуальная рабочая версия.
```bash
@ -14,42 +21,10 @@ pip install git+https://git.lambda.coredump.ru/platform/agent_api.git
## Быстрый старт (с использованием AgentApi)
```python
import asyncio
from lambda_agent_api.agent_api import AgentApi
from lambda_agent_api.server import MsgEventTextChunk
**Рабочий REPL пример: [tests/manual.py](tests/manual.py).**
async def main():
api = AgentApi("agent-1", "ws://localhost:8000/ws")
await api.connect()
try:
response = await api.send_message("Привет, агент!")
async for chunk in response:
if isinstance(chunk, MsgEventTextChunk):
print(chunk.text, end="", flush=True)
elif isinstance(chunk, MsgEventToolCallChunk):
print(f"Tool call started: {chunk.tool_name}")
elif isinstance(chunk, MsgEventToolResult):
print(f"Tool result: {chunk.result}")
elif isinstance(chunk, MsgEventCustomUpdate):
print(f"Progress update: {chunk.payload}")
elif isinstance(chunk, MsgEventEnd):
print(f"Generation ended, tokens used: {chunk.tokens_used}")
finally:
await api.close()
asyncio.run(main())
```
> `AgentApi.send_message()` возвращает стриминг-итерируемый объект, который может выдавать не только текстовые чанки, но и события инструментов (`MsgEventToolCallChunk`, `MsgEventToolResult`, `MsgEventCustomUpdate`) и финальный `MsgEventEnd`.
## Предполагаемое использование
## Предполагаемое управление подключениями
```python
from lambda_agent_api.agent_api import AgentApi
@ -191,6 +166,22 @@ async def on_telegram_message(from_user: int, text: str):
| payload | object | Любые данные о прогрессе |
| source | string | Источник события (по умолчанию "main") |
#### AGENT_EVENT_SEND_FILE
Агент отправляет файл пользователю. Путь к файлу относительно `/workspace`.
```json
{
"type": "AGENT_EVENT_SEND_FILE",
"path": "reports/2024/report.pdf"
}
```
| Поле | Тип | Описание |
|--------|--------|-----------------------------------------------|
| type | string | Всегда `AGENT_EVENT_SEND_FILE` |
| path | string | Путь к файлу относительно `/workspace` |
#### AGENT_EVENT_END
Агент закончил генерацию ответа.

View file

@ -2,6 +2,9 @@ import logging
from typing import Callable, Optional, AsyncIterator
import aiohttp
import asyncio
from urllib.parse import urljoin
from aiohttp import WSServerHandshakeError
from lambda_agent_api.server import *
from lambda_agent_api.client import *
@ -25,12 +28,14 @@ class AgentApi:
def __init__(
self,
agent_id: str,
url: str,
base_url: str,
callback: Optional[Callable[[ServerMessage], None]] = None,
on_disconnect: Optional[Callable[['AgentApi'], None]] = None
on_disconnect: Optional[Callable[["AgentApi"], None]] = None,
chat_id: int = 0, # значение по умолчанию для обратной совместимости # значение по умолчанию для обратной совместимости
):
self.id = agent_id # ID агента для словаря
self.url = url
self.chat_id = chat_id
self.url = urljoin(base_url, f"v1/agent_ws/{chat_id}/")
self.callback = callback
self.on_disconnect = on_disconnect
@ -43,7 +48,11 @@ class AgentApi:
self._listen_task: asyncio.Task | None = None
async def connect(self):
"""Явное подключение к агенту."""
"""Явное подключение к агенту.
:raise AgentBusyException: Чат занят другим клиентом.
:raise AgentException: Непредвиденная ошибка протокола, см. code и details
"""
self._session = aiohttp.ClientSession()
try:
self._ws = await self._session.ws_connect(self.url, heartbeat=30)
@ -58,10 +67,12 @@ class AgentApi:
logger.info(f"Agent {self.id} is ready")
else:
raise AgentException(
"INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}")
"INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}"
)
else:
raise AgentException("UNEXPECTED_MSG_TYPE",
f"Unexpected message type: {msg.type}")
raise AgentException(
"UNEXPECTED_MSG_TYPE", f"Unexpected message type: {msg.type}"
)
self._connected = True
self._listen_task = asyncio.create_task(self._listen())
@ -74,7 +85,8 @@ class AgentApi:
await self._session.close()
raise AgentException(
"TIMEOUT", "Agent did not send initial Status message within 5 seconds") from e
"TIMEOUT", "Agent did not send initial Status message within 5 seconds"
) from e
except AgentException:
# Перехватываем наши собственные ошибки (INVALID_STATUS, UNEXPECTED_MSG_TYPE),
@ -85,6 +97,25 @@ class AgentApi:
await self._session.close()
raise
except WSServerHandshakeError as e: # если при открытии подключения сервер вернул какую-то ошибку
if self._session and not self._session.closed:
await self._session.close()
# во-первых, aiohttp зачем-то приводит WS коды ошибок к HTTP.
# во-вторых, делает он это некорректно. Любой неизвестный код WS ошибки становится 403 HTTP
# в-третьих, он не передает оригинальное сообщение об ошибке.
# Т. е. какое бы сообщение сервер не отправлял, тут всегда будет "Invalid response status"
# см. site-packages\aiohttp\client.py, line 1104, in _ws_connect
# итого понять реальную причину ошибки почти невозможно. Нужно менять библиотеку
# сейчас сервер специально кидает только ошибку с WS кодом 1008 Policy Violation, когда внутри ловит ChatBusyError
# поэтому скорее всего, если мы получили WSServerHandshakeError с 403, то это внутренний ChatBusyError
if e.status != 403:
# обрабатываем как обычную ошибку WS по примеру except блока ниже
raise AgentException(code="CONNECTION_ERROR",
details=f"Failed to connect agent {self.id}: {e}") from e
raise AgentBusyException(f"Chat {self.chat_id} is already in use by other client")
except Exception as e:
# Обработка всех остальных ошибок (например, aiohttp.ClientConnectionError)
if self._ws and not self._ws.closed:
@ -93,8 +124,10 @@ class AgentApi:
await self._session.close()
# Можно оставить RuntimeError, а можно тоже завернуть в AgentException
raise AgentException(code="CONNECTION_ERROR",
details=f"Failed to connect agent {self.id}: {e}") from e
raise AgentException(
code="CONNECTION_ERROR",
details=f"Failed to connect agent {self.id}: {e}",
) from e
async def close(self):
"""Явное ручное закрытие соединения."""
@ -131,19 +164,25 @@ class AgentApi:
except Exception as e:
logger.error(f"Error in on_disconnect: {e}")
async def send_message(self, text: str) -> AsyncIterator[AgentEventUnion]:
async def send_message(
self, text: str, attachments: list[str] | None = None
) -> AsyncIterator[AgentEventUnion]:
"""
Нативный асинхронный генератор.
Не требует отдельного класса ResponseIterator.
Гарантированно освобождает блокировку.
Args:
text: Текст сообщения.
attachments: Список путей к файлам относительно /workspace.
"""
if not self._connected or not self._ws:
raise AgentException(code="NOT_CONNECTED",
details="Not connected. Call connect() first.")
raise AgentException(
code="NOT_CONNECTED", details="Not connected. Call connect() first."
)
if self._request_lock.locked():
raise AgentBusyException(
"Agent is currently processing another request")
raise AgentBusyException("Agent is currently processing another request")
# Блокируем параллельные запросы
# если идет стриминг ответа, то при попытки отправить новое сообщение будет ошибка - ее рейзим(делаем AgentBusyError)
@ -154,7 +193,8 @@ class AgentApi:
message = MsgUserMessage(
type=EClientMessage.USER_MESSAGE,
text=text
text=text,
attachments=attachments or [],
)
await self._ws.send_str(message.model_dump_json())
@ -198,7 +238,8 @@ class AgentApi:
# Если это исключение, просто логируем, в коллбек его кидать не стоит
if isinstance(orphan_msg, Exception):
logger.debug(
f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}")
f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}"
)
continue
# 3. Отправляем "мусорные/осиротевшие" куски в callback
@ -206,7 +247,8 @@ class AgentApi:
self.callback(orphan_msg)
else:
logger.debug(
f"[{self.id}] Dropped orphaned message during cleanup")
f"[{self.id}] Dropped orphaned message during cleanup"
)
except asyncio.QueueEmpty:
break
@ -216,34 +258,41 @@ class AgentApi:
self._request_lock.release()
async def _listen(self):
""""
"""
Прослушивание вебсокета.
"""
try:
async for msg in self._ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
outgoing_msg = ServerMessage.validate_json(
msg.data)
outgoing_msg = ServerMessage.validate_json(msg.data)
if isinstance(outgoing_msg, (MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventEnd)):
if isinstance(
outgoing_msg,
(
MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventSendFile,
MsgEventEnd,
),
):
if self._current_queue:
await self._current_queue.put(outgoing_msg)
elif self.callback:
self.callback(outgoing_msg)
else:
logger.warning(
f"[{self.id}] AgentEvent without active request")
f"[{self.id}] AgentEvent without active request"
)
elif isinstance(outgoing_msg, MsgError):
if self.callback:
self.callback(outgoing_msg)
error = AgentException(
outgoing_msg.code, outgoing_msg.details)
outgoing_msg.code, outgoing_msg.details
)
logger.error(f"[{self.id}] Agent error: {error}")
if self._current_queue:
await self._current_queue.put(error)
@ -251,8 +300,7 @@ class AgentApi:
elif isinstance(outgoing_msg, MsgGracefulDisconnect):
if self.callback:
self.callback(outgoing_msg)
logger.info(
f"[{self.id}] Gracefully disconnecting")
logger.info(f"[{self.id}] Gracefully disconnecting")
break # Выход из цикла приведет к finally -> _cleanup
else:
@ -264,17 +312,14 @@ class AgentApi:
self.callback(outgoing_msg)
except Exception as e:
logger.error(
f"[{self.id}] Failed to deserialize message: {e}")
logger.error(f"[{self.id}] Failed to deserialize message: {e}")
if self._current_queue:
await self._current_queue.put(
AgentException(
"PARSE_ERROR", f"Validation failed: {e}")
AgentException("PARSE_ERROR", f"Validation failed: {e}")
)
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED):
logger.error(
f"[{self.id}] WebSocket closed/error: {msg.type}")
logger.error(f"[{self.id}] WebSocket closed/error: {msg.type}")
break
except asyncio.CancelledError:

View file

@ -19,6 +19,12 @@ class MsgUserMessage(BaseModel):
"""
Текст сообщения.
"""
attachments: list[str] = Field(default_factory=list)
"""
Список вложений (файлов) к сообщению.
Передается путь до файла относительно /workspace .
Файлы уже должны быть загружены в директорию.
"""
ClientMessage = TypeAdapter(Annotated[

View file

@ -4,10 +4,18 @@ from typing import Literal, Annotated, Union, Any, Dict, Optional
__all__ = [
'EServerMessage', 'MsgStatus', 'MsgError', 'MsgGracefulDisconnect',
'MsgEventTextChunk', 'MsgEventToolCallChunk', 'MsgEventToolResult',
'MsgEventCustomUpdate', 'MsgEventEnd',
'AgentEventUnion', 'ServerMessage'
"EServerMessage",
"MsgStatus",
"MsgError",
"MsgGracefulDisconnect",
"MsgEventTextChunk",
"MsgEventToolCallChunk",
"MsgEventToolResult",
"MsgEventCustomUpdate",
"MsgEventSendFile",
"MsgEventEnd",
"AgentEventUnion",
"ServerMessage",
]
@ -19,18 +27,21 @@ class EServerMessage(str, Enum):
# Ивенты агента
AGENT_EVENT_TEXT_CHUNK = "AGENT_EVENT_TEXT_CHUNK"
AGENT_EVENT_TOOL_CALL_CHUNK = "AGENT_EVENT_TOOL_CALL_CHUNK" # Новое
AGENT_EVENT_TOOL_RESULT = "AGENT_EVENT_TOOL_RESULT" # Новоеы
AGENT_EVENT_CUSTOM_UPDATE = "AGENT_EVENT_CUSTOM_UPDATE" # Новое
AGENT_EVENT_TOOL_RESULT = "AGENT_EVENT_TOOL_RESULT" # Новоеы
AGENT_EVENT_CUSTOM_UPDATE = "AGENT_EVENT_CUSTOM_UPDATE" # Новое
AGENT_EVENT_SEND_FILE = "AGENT_EVENT_SEND_FILE" # Новое
AGENT_EVENT_END = "AGENT_EVENT_END"
class MsgStatus(BaseModel):
"""Отправляется сервером при открытии соединения с клиентом."""
type: Literal[EServerMessage.STATUS] = EServerMessage.STATUS
class MsgError(BaseModel):
"""Неопределенная ошибка в работе агента."""
type: Literal[EServerMessage.ERROR] = EServerMessage.ERROR
code: str
details: str
@ -38,16 +49,23 @@ class MsgError(BaseModel):
class MsgGracefulDisconnect(BaseModel):
"""Отправляется перед завершением работы контейнера с агентом."""
type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = EServerMessage.GRACEFUL_DISCONNECT
type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = (
EServerMessage.GRACEFUL_DISCONNECT
)
# ------------------------------------------------------------------
# AGENT EVENTS (События генерации)
# ------------------------------------------------------------------
class MsgEventTextChunk(BaseModel):
"""Чанк текста ответа агента."""
type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = EServerMessage.AGENT_EVENT_TEXT_CHUNK
type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = (
EServerMessage.AGENT_EVENT_TEXT_CHUNK
)
text: str
# Новое: "main" (главный агент) или "tools:..." (субагент, если будем использовать)
source: str = "main" # пока везде будет main
@ -55,17 +73,23 @@ class MsgEventTextChunk(BaseModel):
class MsgEventToolCallChunk(BaseModel):
"""Агент решил использовать инструмент и генерирует аргументы."""
type: Literal[EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK] = EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK
type: Literal[EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK] = (
EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK
)
tool_name: Optional[str] = Field(
None, description="Имя инструмента (приходит обычно в первом чанке)")
args_chunk: Optional[str] = Field(
None, description="Кусок JSON-аргументов")
None, description="Имя инструмента (приходит обычно в первом чанке)"
)
args_chunk: Optional[str] = Field(None, description="Кусок JSON-аргументов")
source: str = "main"
class MsgEventToolResult(BaseModel):
"""Инструмент отработал и вернул результат."""
type: Literal[EServerMessage.AGENT_EVENT_TOOL_RESULT] = EServerMessage.AGENT_EVENT_TOOL_RESULT
type: Literal[EServerMessage.AGENT_EVENT_TOOL_RESULT] = (
EServerMessage.AGENT_EVENT_TOOL_RESULT
)
tool_name: str
result: Any # Может быть строкой, словарем или списком
source: str = "main"
@ -73,14 +97,28 @@ class MsgEventToolResult(BaseModel):
class MsgEventCustomUpdate(BaseModel):
"""Кастомный прогресс (например, скачивание файла) изнутри инструмента."""
type: Literal[EServerMessage.AGENT_EVENT_CUSTOM_UPDATE] = EServerMessage.AGENT_EVENT_CUSTOM_UPDATE
type: Literal[EServerMessage.AGENT_EVENT_CUSTOM_UPDATE] = (
EServerMessage.AGENT_EVENT_CUSTOM_UPDATE
)
payload: Dict[str, Any] = Field(
..., description="Любые данные о прогрессе (status, progress и т.д.)")
..., description="Любые данные о прогрессе (status, progress и т.д.)"
)
source: str = "main"
class MsgEventSendFile(BaseModel):
"""Агент отправляет файл пользователю."""
type: Literal[EServerMessage.AGENT_EVENT_SEND_FILE] = (
EServerMessage.AGENT_EVENT_SEND_FILE
)
path: str = Field(..., description="Путь к файлу относительно /workspace")
class MsgEventEnd(BaseModel):
"""Агент закончил генерацию ответа."""
type: Literal[EServerMessage.AGENT_EVENT_END] = EServerMessage.AGENT_EVENT_END
tokens_used: int
@ -95,25 +133,24 @@ AgentEventUnion = Union[
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventEnd
MsgEventSendFile,
MsgEventEnd,
]
# Обновлено: добавили новые модели в Union адаптера
ServerMessage = TypeAdapter(Annotated[
Union[
MsgStatus,
MsgError,
MsgGracefulDisconnect,
MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventEnd
],
Field(discriminator="type")
])
# ServerMessage использует AgentEventUnion + остальные типы
ServerMessage = TypeAdapter(
Annotated[
Union[
MsgStatus,
MsgError,
MsgGracefulDisconnect,
AgentEventUnion,
],
Field(discriminator="type"),
]
)
"""
Объединяет все типы исходящих сообщений в одно для удобной автоматической десериализации.
TypeAdapter для десериализации всех входящих сообщений (от сервера).
Pydantic сам определит нужный тип в зависимости от поля `type`.
Использование:
msg = ServerMessage.validate_json(json_str)

View file

@ -1,33 +1,58 @@
import asyncio
import traceback
from lambda_agent_api.agent_api import AgentApi
from lambda_agent_api.server import MsgEventTextChunk, MsgEventToolCallChunk, MsgEventToolResult
from lambda_agent_api.agent_api import AgentApi, AgentBusyException
from lambda_agent_api.server import (
MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventSendFile,
)
async def main():
api = AgentApi("agent-1", "ws://localhost:8000/agent_ws/")
chat_id = input("Chat id: ") or 0
api = AgentApi("agent-1", "ws://localhost:8000/", chat_id=chat_id)
try:
await api.connect()
except AgentBusyException:
print(f"Чат {chat_id} занят другим клиентом")
return
await api.connect()
while True:
try:
prompt = await asyncio.get_event_loop().run_in_executor(None, input, ">>> ")
attachments_input = await asyncio.get_event_loop().run_in_executor(
None, input, "Attachments (comma-separated, empty for none): "
)
attachments = (
[a.strip() for a in attachments_input.split(",") if a.strip()]
if attachments_input.strip()
else None
)
print("Agent: ", end="")
is_tool = False
async for chunk in api.send_message(prompt):
async for chunk in api.send_message(prompt, attachments):
match chunk:
case MsgEventTextChunk():
is_tool = False
print(chunk.text, end="", flush=True)
case MsgEventToolCallChunk():
if not is_tool:
print(f"\n\n### TOOL CALL: ({chunk.tool_name}) ", end="", flush=True)
print(
f"\n\n### TOOL CALL: ({chunk.tool_name}) ",
end="",
flush=True,
)
is_tool = True
print(chunk.args_chunk, end="", flush=True)
case MsgEventToolResult():
is_tool = False
print(f"\nResult: {chunk.result}\n\n", end="", flush=True)
case MsgEventSendFile():
print(f"\n[SEND FILE] {chunk.path}\n", end="", flush=True)
print("\n")
except KeyboardInterrupt: