diff --git a/README.md b/README.md index dd88b72..496c876 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,9 @@ +from lambda_agent_api import AgentApi + # Lambda Agent API WebSocket API SDK для взаимодействия с AI-агентом. -## Release Notes -# v1.1 -- **CRITICAL**: `AgentAPI` вместо `url` принимает `base_url` и сам дописывает нужный эндпоинт. - Раньше: `AgentAPI(url="ws://localhost:8000/agent_ws/")`. Сейчас: `AgentAPI(base_url="ws://localhost:8000/")` -- Добавлен параметр `chat_id` в конструктор `AgentAPI`. Нужен для разделения истории сообщений по чатам/веткам. -- `AgentAPI.connect()` вызывает `AgentBusyException`, если выбранный чат уже занят другим API клиентом. -- Добавлен новый тип события `MsgEventSendFile` для отправки файлов пользователю. Поле `path` — путь к файлу относительно `/workspace`. -- `send_message(text, attachments)` теперь принимает `attachments` — список путей к файлам относительно `/workspace`. - ## Установка В `master` всегда будет актуальная рабочая версия. ```bash @@ -21,10 +14,42 @@ pip install git+https://git.lambda.coredump.ru/platform/agent_api.git ## Быстрый старт (с использованием AgentApi) -**Рабочий REPL пример: [tests/manual.py](tests/manual.py).** +```python +import asyncio + +from lambda_agent_api.agent_api import AgentApi +from lambda_agent_api.server import MsgEventTextChunk -## Предполагаемое управление подключениями +async def main(): + api = AgentApi("agent-1", "ws://localhost:8000/ws") + + await api.connect() + try: + response = await api.send_message("Привет, агент!") + + async for chunk in response: + if isinstance(chunk, MsgEventTextChunk): + print(chunk.text, end="", flush=True) + elif isinstance(chunk, MsgEventToolCallChunk): + print(f"Tool call started: {chunk.tool_name}") + elif isinstance(chunk, MsgEventToolResult): + print(f"Tool result: {chunk.result}") + elif isinstance(chunk, MsgEventCustomUpdate): + print(f"Progress update: {chunk.payload}") + elif isinstance(chunk, MsgEventEnd): + print(f"Generation ended, tokens used: {chunk.tokens_used}") + + finally: + await api.close() + + +asyncio.run(main()) +``` + +> `AgentApi.send_message()` возвращает стриминг-итерируемый объект, который может выдавать не только текстовые чанки, но и события инструментов (`MsgEventToolCallChunk`, `MsgEventToolResult`, `MsgEventCustomUpdate`) и финальный `MsgEventEnd`. + +## Предполагаемое использование ```python from lambda_agent_api.agent_api import AgentApi @@ -166,22 +191,6 @@ async def on_telegram_message(from_user: int, text: str): | payload | object | Любые данные о прогрессе | | source | string | Источник события (по умолчанию "main") | -#### AGENT_EVENT_SEND_FILE - -Агент отправляет файл пользователю. Путь к файлу относительно `/workspace`. - -```json -{ - "type": "AGENT_EVENT_SEND_FILE", - "path": "reports/2024/report.pdf" -} -``` - -| Поле | Тип | Описание | -|--------|--------|-----------------------------------------------| -| type | string | Всегда `AGENT_EVENT_SEND_FILE` | -| path | string | Путь к файлу относительно `/workspace` | - #### AGENT_EVENT_END Агент закончил генерацию ответа. diff --git a/lambda_agent_api/agent_api.py b/lambda_agent_api/agent_api.py index 0c57c8c..d130bcf 100644 --- a/lambda_agent_api/agent_api.py +++ b/lambda_agent_api/agent_api.py @@ -2,9 +2,6 @@ import logging from typing import Callable, Optional, AsyncIterator import aiohttp import asyncio -from urllib.parse import urljoin - -from aiohttp import WSServerHandshakeError from lambda_agent_api.server import * from lambda_agent_api.client import * @@ -28,14 +25,12 @@ class AgentApi: def __init__( self, agent_id: str, - base_url: str, + url: str, callback: Optional[Callable[[ServerMessage], None]] = None, - on_disconnect: Optional[Callable[["AgentApi"], None]] = None, - chat_id: int = 0, # значение по умолчанию для обратной совместимости # значение по умолчанию для обратной совместимости + on_disconnect: Optional[Callable[['AgentApi'], None]] = None ): self.id = agent_id # ID агента для словаря - self.chat_id = chat_id - self.url = urljoin(base_url, f"v1/agent_ws/{chat_id}/") + self.url = url self.callback = callback self.on_disconnect = on_disconnect @@ -48,11 +43,7 @@ class AgentApi: self._listen_task: asyncio.Task | None = None async def connect(self): - """Явное подключение к агенту. - - :raise AgentBusyException: Чат занят другим клиентом. - :raise AgentException: Непредвиденная ошибка протокола, см. code и details - """ + """Явное подключение к агенту.""" self._session = aiohttp.ClientSession() try: self._ws = await self._session.ws_connect(self.url, heartbeat=30) @@ -67,12 +58,10 @@ class AgentApi: logger.info(f"Agent {self.id} is ready") else: raise AgentException( - "INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}" - ) + "INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}") else: - raise AgentException( - "UNEXPECTED_MSG_TYPE", f"Unexpected message type: {msg.type}" - ) + raise AgentException("UNEXPECTED_MSG_TYPE", + f"Unexpected message type: {msg.type}") self._connected = True self._listen_task = asyncio.create_task(self._listen()) @@ -85,8 +74,7 @@ class AgentApi: await self._session.close() raise AgentException( - "TIMEOUT", "Agent did not send initial Status message within 5 seconds" - ) from e + "TIMEOUT", "Agent did not send initial Status message within 5 seconds") from e except AgentException: # Перехватываем наши собственные ошибки (INVALID_STATUS, UNEXPECTED_MSG_TYPE), @@ -97,25 +85,6 @@ class AgentApi: await self._session.close() raise - except WSServerHandshakeError as e: # если при открытии подключения сервер вернул какую-то ошибку - if self._session and not self._session.closed: - await self._session.close() - - # во-первых, aiohttp зачем-то приводит WS коды ошибок к HTTP. - # во-вторых, делает он это некорректно. Любой неизвестный код WS ошибки становится 403 HTTP - # в-третьих, он не передает оригинальное сообщение об ошибке. - # Т. е. какое бы сообщение сервер не отправлял, тут всегда будет "Invalid response status" - # см. site-packages\aiohttp\client.py, line 1104, in _ws_connect - # итого понять реальную причину ошибки почти невозможно. Нужно менять библиотеку - # сейчас сервер специально кидает только ошибку с WS кодом 1008 Policy Violation, когда внутри ловит ChatBusyError - # поэтому скорее всего, если мы получили WSServerHandshakeError с 403, то это внутренний ChatBusyError - if e.status != 403: - # обрабатываем как обычную ошибку WS по примеру except блока ниже - raise AgentException(code="CONNECTION_ERROR", - details=f"Failed to connect agent {self.id}: {e}") from e - - raise AgentBusyException(f"Chat {self.chat_id} is already in use by other client") - except Exception as e: # Обработка всех остальных ошибок (например, aiohttp.ClientConnectionError) if self._ws and not self._ws.closed: @@ -124,10 +93,8 @@ class AgentApi: await self._session.close() # Можно оставить RuntimeError, а можно тоже завернуть в AgentException - raise AgentException( - code="CONNECTION_ERROR", - details=f"Failed to connect agent {self.id}: {e}", - ) from e + raise AgentException(code="CONNECTION_ERROR", + details=f"Failed to connect agent {self.id}: {e}") from e async def close(self): """Явное ручное закрытие соединения.""" @@ -164,25 +131,19 @@ class AgentApi: except Exception as e: logger.error(f"Error in on_disconnect: {e}") - async def send_message( - self, text: str, attachments: list[str] | None = None - ) -> AsyncIterator[AgentEventUnion]: + async def send_message(self, text: str) -> AsyncIterator[AgentEventUnion]: """ - Нативный асинхронный генератор. + Нативный асинхронный генератор. Не требует отдельного класса ResponseIterator. Гарантированно освобождает блокировку. - - Args: - text: Текст сообщения. - attachments: Список путей к файлам относительно /workspace. """ if not self._connected or not self._ws: - raise AgentException( - code="NOT_CONNECTED", details="Not connected. Call connect() first." - ) + raise AgentException(code="NOT_CONNECTED", + details="Not connected. Call connect() first.") if self._request_lock.locked(): - raise AgentBusyException("Agent is currently processing another request") + raise AgentBusyException( + "Agent is currently processing another request") # Блокируем параллельные запросы # если идет стриминг ответа, то при попытки отправить новое сообщение будет ошибка - ее рейзим(делаем AgentBusyError) @@ -193,8 +154,7 @@ class AgentApi: message = MsgUserMessage( type=EClientMessage.USER_MESSAGE, - text=text, - attachments=attachments or [], + text=text ) await self._ws.send_str(message.model_dump_json()) @@ -238,8 +198,7 @@ class AgentApi: # Если это исключение, просто логируем, в коллбек его кидать не стоит if isinstance(orphan_msg, Exception): logger.debug( - f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}" - ) + f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}") continue # 3. Отправляем "мусорные/осиротевшие" куски в callback @@ -247,8 +206,7 @@ class AgentApi: self.callback(orphan_msg) else: logger.debug( - f"[{self.id}] Dropped orphaned message during cleanup" - ) + f"[{self.id}] Dropped orphaned message during cleanup") except asyncio.QueueEmpty: break @@ -258,41 +216,34 @@ class AgentApi: self._request_lock.release() async def _listen(self): - """ + """" Прослушивание вебсокета. """ try: async for msg in self._ws: if msg.type == aiohttp.WSMsgType.TEXT: try: - outgoing_msg = ServerMessage.validate_json(msg.data) + outgoing_msg = ServerMessage.validate_json( + msg.data) - if isinstance( - outgoing_msg, - ( - MsgEventTextChunk, - MsgEventToolCallChunk, - MsgEventToolResult, - MsgEventCustomUpdate, - MsgEventSendFile, - MsgEventEnd, - ), - ): + if isinstance(outgoing_msg, (MsgEventTextChunk, + MsgEventToolCallChunk, + MsgEventToolResult, + MsgEventCustomUpdate, + MsgEventEnd)): if self._current_queue: await self._current_queue.put(outgoing_msg) elif self.callback: self.callback(outgoing_msg) else: logger.warning( - f"[{self.id}] AgentEvent without active request" - ) + f"[{self.id}] AgentEvent without active request") elif isinstance(outgoing_msg, MsgError): if self.callback: self.callback(outgoing_msg) error = AgentException( - outgoing_msg.code, outgoing_msg.details - ) + outgoing_msg.code, outgoing_msg.details) logger.error(f"[{self.id}] Agent error: {error}") if self._current_queue: await self._current_queue.put(error) @@ -300,7 +251,8 @@ class AgentApi: elif isinstance(outgoing_msg, MsgGracefulDisconnect): if self.callback: self.callback(outgoing_msg) - logger.info(f"[{self.id}] Gracefully disconnecting") + logger.info( + f"[{self.id}] Gracefully disconnecting") break # Выход из цикла приведет к finally -> _cleanup else: @@ -312,14 +264,17 @@ class AgentApi: self.callback(outgoing_msg) except Exception as e: - logger.error(f"[{self.id}] Failed to deserialize message: {e}") + logger.error( + f"[{self.id}] Failed to deserialize message: {e}") if self._current_queue: await self._current_queue.put( - AgentException("PARSE_ERROR", f"Validation failed: {e}") + AgentException( + "PARSE_ERROR", f"Validation failed: {e}") ) elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED): - logger.error(f"[{self.id}] WebSocket closed/error: {msg.type}") + logger.error( + f"[{self.id}] WebSocket closed/error: {msg.type}") break except asyncio.CancelledError: diff --git a/lambda_agent_api/client.py b/lambda_agent_api/client.py index ca41435..df672c0 100644 --- a/lambda_agent_api/client.py +++ b/lambda_agent_api/client.py @@ -19,12 +19,6 @@ class MsgUserMessage(BaseModel): """ Текст сообщения. """ - attachments: list[str] = Field(default_factory=list) - """ - Список вложений (файлов) к сообщению. - Передается путь до файла относительно /workspace . - Файлы уже должны быть загружены в директорию. - """ ClientMessage = TypeAdapter(Annotated[ diff --git a/lambda_agent_api/server.py b/lambda_agent_api/server.py index c0f01df..e5882f7 100644 --- a/lambda_agent_api/server.py +++ b/lambda_agent_api/server.py @@ -4,18 +4,10 @@ from typing import Literal, Annotated, Union, Any, Dict, Optional __all__ = [ - "EServerMessage", - "MsgStatus", - "MsgError", - "MsgGracefulDisconnect", - "MsgEventTextChunk", - "MsgEventToolCallChunk", - "MsgEventToolResult", - "MsgEventCustomUpdate", - "MsgEventSendFile", - "MsgEventEnd", - "AgentEventUnion", - "ServerMessage", + 'EServerMessage', 'MsgStatus', 'MsgError', 'MsgGracefulDisconnect', + 'MsgEventTextChunk', 'MsgEventToolCallChunk', 'MsgEventToolResult', + 'MsgEventCustomUpdate', 'MsgEventEnd', + 'AgentEventUnion', 'ServerMessage' ] @@ -27,21 +19,18 @@ class EServerMessage(str, Enum): # Ивенты агента AGENT_EVENT_TEXT_CHUNK = "AGENT_EVENT_TEXT_CHUNK" AGENT_EVENT_TOOL_CALL_CHUNK = "AGENT_EVENT_TOOL_CALL_CHUNK" # Новое - AGENT_EVENT_TOOL_RESULT = "AGENT_EVENT_TOOL_RESULT" # Новоеы - AGENT_EVENT_CUSTOM_UPDATE = "AGENT_EVENT_CUSTOM_UPDATE" # Новое - AGENT_EVENT_SEND_FILE = "AGENT_EVENT_SEND_FILE" # Новое + AGENT_EVENT_TOOL_RESULT = "AGENT_EVENT_TOOL_RESULT" # Новоеы + AGENT_EVENT_CUSTOM_UPDATE = "AGENT_EVENT_CUSTOM_UPDATE" # Новое AGENT_EVENT_END = "AGENT_EVENT_END" class MsgStatus(BaseModel): """Отправляется сервером при открытии соединения с клиентом.""" - type: Literal[EServerMessage.STATUS] = EServerMessage.STATUS class MsgError(BaseModel): """Неопределенная ошибка в работе агента.""" - type: Literal[EServerMessage.ERROR] = EServerMessage.ERROR code: str details: str @@ -49,23 +38,16 @@ class MsgError(BaseModel): class MsgGracefulDisconnect(BaseModel): """Отправляется перед завершением работы контейнера с агентом.""" - - type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = ( - EServerMessage.GRACEFUL_DISCONNECT - ) + type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = EServerMessage.GRACEFUL_DISCONNECT # ------------------------------------------------------------------ # AGENT EVENTS (События генерации) # ------------------------------------------------------------------ - class MsgEventTextChunk(BaseModel): """Чанк текста ответа агента.""" - - type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = ( - EServerMessage.AGENT_EVENT_TEXT_CHUNK - ) + type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = EServerMessage.AGENT_EVENT_TEXT_CHUNK text: str # Новое: "main" (главный агент) или "tools:..." (субагент, если будем использовать) source: str = "main" # пока везде будет main @@ -73,23 +55,17 @@ class MsgEventTextChunk(BaseModel): class MsgEventToolCallChunk(BaseModel): """Агент решил использовать инструмент и генерирует аргументы.""" - - type: Literal[EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK] = ( - EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK - ) + type: Literal[EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK] = EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK tool_name: Optional[str] = Field( - None, description="Имя инструмента (приходит обычно в первом чанке)" - ) - args_chunk: Optional[str] = Field(None, description="Кусок JSON-аргументов") + None, description="Имя инструмента (приходит обычно в первом чанке)") + args_chunk: Optional[str] = Field( + None, description="Кусок JSON-аргументов") source: str = "main" class MsgEventToolResult(BaseModel): """Инструмент отработал и вернул результат.""" - - type: Literal[EServerMessage.AGENT_EVENT_TOOL_RESULT] = ( - EServerMessage.AGENT_EVENT_TOOL_RESULT - ) + type: Literal[EServerMessage.AGENT_EVENT_TOOL_RESULT] = EServerMessage.AGENT_EVENT_TOOL_RESULT tool_name: str result: Any # Может быть строкой, словарем или списком source: str = "main" @@ -97,28 +73,14 @@ class MsgEventToolResult(BaseModel): class MsgEventCustomUpdate(BaseModel): """Кастомный прогресс (например, скачивание файла) изнутри инструмента.""" - - type: Literal[EServerMessage.AGENT_EVENT_CUSTOM_UPDATE] = ( - EServerMessage.AGENT_EVENT_CUSTOM_UPDATE - ) + type: Literal[EServerMessage.AGENT_EVENT_CUSTOM_UPDATE] = EServerMessage.AGENT_EVENT_CUSTOM_UPDATE payload: Dict[str, Any] = Field( - ..., description="Любые данные о прогрессе (status, progress и т.д.)" - ) + ..., description="Любые данные о прогрессе (status, progress и т.д.)") source: str = "main" -class MsgEventSendFile(BaseModel): - """Агент отправляет файл пользователю.""" - - type: Literal[EServerMessage.AGENT_EVENT_SEND_FILE] = ( - EServerMessage.AGENT_EVENT_SEND_FILE - ) - path: str = Field(..., description="Путь к файлу относительно /workspace") - - class MsgEventEnd(BaseModel): """Агент закончил генерацию ответа.""" - type: Literal[EServerMessage.AGENT_EVENT_END] = EServerMessage.AGENT_EVENT_END tokens_used: int @@ -133,24 +95,25 @@ AgentEventUnion = Union[ MsgEventToolCallChunk, MsgEventToolResult, MsgEventCustomUpdate, - MsgEventSendFile, - MsgEventEnd, + MsgEventEnd ] -# ServerMessage использует AgentEventUnion + остальные типы -ServerMessage = TypeAdapter( - Annotated[ - Union[ - MsgStatus, - MsgError, - MsgGracefulDisconnect, - AgentEventUnion, - ], - Field(discriminator="type"), - ] -) +# Обновлено: добавили новые модели в Union адаптера +ServerMessage = TypeAdapter(Annotated[ + Union[ + MsgStatus, + MsgError, + MsgGracefulDisconnect, + MsgEventTextChunk, + MsgEventToolCallChunk, + MsgEventToolResult, + MsgEventCustomUpdate, + MsgEventEnd + ], + Field(discriminator="type") +]) """ -TypeAdapter для десериализации всех входящих сообщений (от сервера). +Объединяет все типы исходящих сообщений в одно для удобной автоматической десериализации. Pydantic сам определит нужный тип в зависимости от поля `type`. Использование: msg = ServerMessage.validate_json(json_str) diff --git a/tests/manual.py b/tests/manual.py index 0c7c657..a39802b 100644 --- a/tests/manual.py +++ b/tests/manual.py @@ -1,58 +1,33 @@ import asyncio import traceback -from lambda_agent_api.agent_api import AgentApi, AgentBusyException -from lambda_agent_api.server import ( - MsgEventTextChunk, - MsgEventToolCallChunk, - MsgEventToolResult, - MsgEventSendFile, -) +from lambda_agent_api.agent_api import AgentApi +from lambda_agent_api.server import MsgEventTextChunk, MsgEventToolCallChunk, MsgEventToolResult + async def main(): - chat_id = input("Chat id: ") or 0 - api = AgentApi("agent-1", "ws://localhost:8000/", chat_id=chat_id) - - try: - await api.connect() - except AgentBusyException: - print(f"Чат {chat_id} занят другим клиентом") - return + api = AgentApi("agent-1", "ws://localhost:8000/agent_ws/") + await api.connect() while True: try: prompt = await asyncio.get_event_loop().run_in_executor(None, input, ">>> ") - attachments_input = await asyncio.get_event_loop().run_in_executor( - None, input, "Attachments (comma-separated, empty for none): " - ) - attachments = ( - [a.strip() for a in attachments_input.split(",") if a.strip()] - if attachments_input.strip() - else None - ) - print("Agent: ", end="") is_tool = False - async for chunk in api.send_message(prompt, attachments): + async for chunk in api.send_message(prompt): match chunk: case MsgEventTextChunk(): is_tool = False print(chunk.text, end="", flush=True) case MsgEventToolCallChunk(): if not is_tool: - print( - f"\n\n### TOOL CALL: ({chunk.tool_name}) ", - end="", - flush=True, - ) + print(f"\n\n### TOOL CALL: ({chunk.tool_name}) ", end="", flush=True) is_tool = True print(chunk.args_chunk, end="", flush=True) case MsgEventToolResult(): is_tool = False print(f"\nResult: {chunk.result}\n\n", end="", flush=True) - case MsgEventSendFile(): - print(f"\n[SEND FILE] {chunk.path}\n", end="", flush=True) print("\n") except KeyboardInterrupt: @@ -63,4 +38,4 @@ async def main(): await api.close() -asyncio.run(main()) +asyncio.run(main()) \ No newline at end of file