From 0cf3117fde29a328a8de9adb830e1d3f9fec0834 Mon Sep 17 00:00:00 2001 From: MrKan Date: Sun, 19 Apr 2026 21:14:19 +0300 Subject: [PATCH] =?UTF-8?q?=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=BA=D0=B0=20=D0=BD=D0=BE=D0=B2=D0=BE=D0=B9=20=D0=BC=D0=BE?= =?UTF-8?q?=D0=B4=D0=B5=D0=BB=D0=B8,=20=D0=B0=D0=BA=D1=82=D1=83=D0=B0?= =?UTF-8?q?=D0=BB=D1=8C=D0=BD=D1=8B=D0=B9=20manual.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lambda_agent_api/agent_api.py | 95 ++++++++++++++++++++--------------- tests/manual.py | 18 +++++-- 2 files changed, 68 insertions(+), 45 deletions(-) diff --git a/lambda_agent_api/agent_api.py b/lambda_agent_api/agent_api.py index e22a0b8..9a82992 100644 --- a/lambda_agent_api/agent_api.py +++ b/lambda_agent_api/agent_api.py @@ -30,8 +30,8 @@ class AgentApi: agent_id: str, base_url: str, callback: Optional[Callable[[ServerMessage], None]] = None, - on_disconnect: Optional[Callable[['AgentApi'], None]] = None, - chat_id: int = 0 # значение по умолчанию для обратной совместимости + on_disconnect: Optional[Callable[["AgentApi"], None]] = None, + chat_id: int = 0, # значение по умолчанию для обратной совместимости ): self.id = agent_id # ID агента для словаря self.chat_id = chat_id @@ -67,10 +67,12 @@ class AgentApi: logger.info(f"Agent {self.id} is ready") else: raise AgentException( - "INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}") + "INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}" + ) else: - raise AgentException("UNEXPECTED_MSG_TYPE", - f"Unexpected message type: {msg.type}") + raise AgentException( + "UNEXPECTED_MSG_TYPE", f"Unexpected message type: {msg.type}" + ) self._connected = True self._listen_task = asyncio.create_task(self._listen()) @@ -83,7 +85,8 @@ class AgentApi: await self._session.close() raise AgentException( - "TIMEOUT", "Agent did not send initial Status message within 5 seconds") from e + "TIMEOUT", "Agent did not send initial Status message within 5 seconds" + ) from e except AgentException: # Перехватываем наши собственные ошибки (INVALID_STATUS, UNEXPECTED_MSG_TYPE), @@ -94,7 +97,9 @@ class AgentApi: await self._session.close() raise - except WSServerHandshakeError as e: # если при открытии подключения сервер вернул какую-то ошибку + except ( + WSServerHandshakeError + ) as e: # если при открытии подключения сервер вернул какую-то ошибку if self._session and not self._session.closed: await self._session.close() @@ -108,10 +113,14 @@ class AgentApi: # поэтому скорее всего, если мы получили WSServerHandshakeError с 403, то это внутренний ChatBusyError if e.status != 403: # обрабатываем как обычную ошибку WS по примеру except блока ниже - raise AgentException(code="CONNECTION_ERROR", - details=f"Failed to connect agent {self.id}: {e}") from e + raise AgentException( + code="CONNECTION_ERROR", + details=f"Failed to connect agent {self.id}: {e}", + ) from e - raise AgentBusyException(f"Chat {self.chat_id} is already in use by other client") + raise AgentBusyException( + f"Chat {self.chat_id} is already in use by other client" + ) except Exception as e: # Обработка всех остальных ошибок (например, aiohttp.ClientConnectionError) @@ -121,8 +130,10 @@ class AgentApi: await self._session.close() # Можно оставить RuntimeError, а можно тоже завернуть в AgentException - raise AgentException(code="CONNECTION_ERROR", - details=f"Failed to connect agent {self.id}: {e}") from e + raise AgentException( + code="CONNECTION_ERROR", + details=f"Failed to connect agent {self.id}: {e}", + ) from e async def close(self): """Явное ручное закрытие соединения.""" @@ -161,17 +172,17 @@ class AgentApi: async def send_message(self, text: str) -> AsyncIterator[AgentEventUnion]: """ - Нативный асинхронный генератор. + Нативный асинхронный генератор. Не требует отдельного класса ResponseIterator. Гарантированно освобождает блокировку. """ if not self._connected or not self._ws: - raise AgentException(code="NOT_CONNECTED", - details="Not connected. Call connect() first.") + raise AgentException( + code="NOT_CONNECTED", details="Not connected. Call connect() first." + ) if self._request_lock.locked(): - raise AgentBusyException( - "Agent is currently processing another request") + raise AgentBusyException("Agent is currently processing another request") # Блокируем параллельные запросы # если идет стриминг ответа, то при попытки отправить новое сообщение будет ошибка - ее рейзим(делаем AgentBusyError) @@ -180,10 +191,7 @@ class AgentApi: try: self._current_queue = asyncio.Queue() - message = MsgUserMessage( - type=EClientMessage.USER_MESSAGE, - text=text - ) + message = MsgUserMessage(type=EClientMessage.USER_MESSAGE, text=text) await self._ws.send_str(message.model_dump_json()) logger.debug(f"[{self.id}] Sent message: {text[:50]}...") @@ -226,7 +234,8 @@ class AgentApi: # Если это исключение, просто логируем, в коллбек его кидать не стоит if isinstance(orphan_msg, Exception): logger.debug( - f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}") + f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}" + ) continue # 3. Отправляем "мусорные/осиротевшие" куски в callback @@ -234,7 +243,8 @@ class AgentApi: self.callback(orphan_msg) else: logger.debug( - f"[{self.id}] Dropped orphaned message during cleanup") + f"[{self.id}] Dropped orphaned message during cleanup" + ) except asyncio.QueueEmpty: break @@ -244,34 +254,41 @@ class AgentApi: self._request_lock.release() async def _listen(self): - """" + """ " Прослушивание вебсокета. """ try: async for msg in self._ws: if msg.type == aiohttp.WSMsgType.TEXT: try: - outgoing_msg = ServerMessage.validate_json( - msg.data) + outgoing_msg = ServerMessage.validate_json(msg.data) - if isinstance(outgoing_msg, (MsgEventTextChunk, - MsgEventToolCallChunk, - MsgEventToolResult, - MsgEventCustomUpdate, - MsgEventEnd)): + if isinstance( + outgoing_msg, + ( + MsgEventTextChunk, + MsgEventToolCallChunk, + MsgEventToolResult, + MsgEventCustomUpdate, + MsgEventSendFile, + MsgEventEnd, + ), + ): if self._current_queue: await self._current_queue.put(outgoing_msg) elif self.callback: self.callback(outgoing_msg) else: logger.warning( - f"[{self.id}] AgentEvent without active request") + f"[{self.id}] AgentEvent without active request" + ) elif isinstance(outgoing_msg, MsgError): if self.callback: self.callback(outgoing_msg) error = AgentException( - outgoing_msg.code, outgoing_msg.details) + outgoing_msg.code, outgoing_msg.details + ) logger.error(f"[{self.id}] Agent error: {error}") if self._current_queue: await self._current_queue.put(error) @@ -279,8 +296,7 @@ class AgentApi: elif isinstance(outgoing_msg, MsgGracefulDisconnect): if self.callback: self.callback(outgoing_msg) - logger.info( - f"[{self.id}] Gracefully disconnecting") + logger.info(f"[{self.id}] Gracefully disconnecting") break # Выход из цикла приведет к finally -> _cleanup else: @@ -292,17 +308,14 @@ class AgentApi: self.callback(outgoing_msg) except Exception as e: - logger.error( - f"[{self.id}] Failed to deserialize message: {e}") + logger.error(f"[{self.id}] Failed to deserialize message: {e}") if self._current_queue: await self._current_queue.put( - AgentException( - "PARSE_ERROR", f"Validation failed: {e}") + AgentException("PARSE_ERROR", f"Validation failed: {e}") ) elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED): - logger.error( - f"[{self.id}] WebSocket closed/error: {msg.type}") + logger.error(f"[{self.id}] WebSocket closed/error: {msg.type}") break except asyncio.CancelledError: diff --git a/tests/manual.py b/tests/manual.py index 10f3005..44adea2 100644 --- a/tests/manual.py +++ b/tests/manual.py @@ -2,8 +2,12 @@ import asyncio import traceback from lambda_agent_api.agent_api import AgentApi, AgentBusyException -from lambda_agent_api.server import MsgEventTextChunk, MsgEventToolCallChunk, MsgEventToolResult - +from lambda_agent_api.server import ( + MsgEventTextChunk, + MsgEventToolCallChunk, + MsgEventToolResult, + MsgEventSendFile, +) async def main(): @@ -28,12 +32,18 @@ async def main(): print(chunk.text, end="", flush=True) case MsgEventToolCallChunk(): if not is_tool: - print(f"\n\n### TOOL CALL: ({chunk.tool_name}) ", end="", flush=True) + print( + f"\n\n### TOOL CALL: ({chunk.tool_name}) ", + end="", + flush=True, + ) is_tool = True print(chunk.args_chunk, end="", flush=True) case MsgEventToolResult(): is_tool = False print(f"\nResult: {chunk.result}\n\n", end="", flush=True) + case MsgEventSendFile(): + print(f"\n[SEND FILE] {chunk.path}\n", end="", flush=True) print("\n") except KeyboardInterrupt: @@ -44,4 +54,4 @@ async def main(): await api.close() -asyncio.run(main()) \ No newline at end of file +asyncio.run(main())