обработка новой модели, актуальный manual.py

This commit is contained in:
Егор Кандрушин 2026-04-19 21:14:19 +03:00
parent 4c3e7253c7
commit 0cf3117fde
2 changed files with 68 additions and 45 deletions

View file

@ -30,8 +30,8 @@ class AgentApi:
agent_id: str, agent_id: str,
base_url: str, base_url: str,
callback: Optional[Callable[[ServerMessage], None]] = None, callback: Optional[Callable[[ServerMessage], None]] = None,
on_disconnect: Optional[Callable[['AgentApi'], None]] = None, on_disconnect: Optional[Callable[["AgentApi"], None]] = None,
chat_id: int = 0 # значение по умолчанию для обратной совместимости chat_id: int = 0, # значение по умолчанию для обратной совместимости
): ):
self.id = agent_id # ID агента для словаря self.id = agent_id # ID агента для словаря
self.chat_id = chat_id self.chat_id = chat_id
@ -67,10 +67,12 @@ class AgentApi:
logger.info(f"Agent {self.id} is ready") logger.info(f"Agent {self.id} is ready")
else: else:
raise AgentException( raise AgentException(
"INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}") "INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}"
)
else: else:
raise AgentException("UNEXPECTED_MSG_TYPE", raise AgentException(
f"Unexpected message type: {msg.type}") "UNEXPECTED_MSG_TYPE", f"Unexpected message type: {msg.type}"
)
self._connected = True self._connected = True
self._listen_task = asyncio.create_task(self._listen()) self._listen_task = asyncio.create_task(self._listen())
@ -83,7 +85,8 @@ class AgentApi:
await self._session.close() await self._session.close()
raise AgentException( raise AgentException(
"TIMEOUT", "Agent did not send initial Status message within 5 seconds") from e "TIMEOUT", "Agent did not send initial Status message within 5 seconds"
) from e
except AgentException: except AgentException:
# Перехватываем наши собственные ошибки (INVALID_STATUS, UNEXPECTED_MSG_TYPE), # Перехватываем наши собственные ошибки (INVALID_STATUS, UNEXPECTED_MSG_TYPE),
@ -94,7 +97,9 @@ class AgentApi:
await self._session.close() await self._session.close()
raise raise
except WSServerHandshakeError as e: # если при открытии подключения сервер вернул какую-то ошибку except (
WSServerHandshakeError
) as e: # если при открытии подключения сервер вернул какую-то ошибку
if self._session and not self._session.closed: if self._session and not self._session.closed:
await self._session.close() await self._session.close()
@ -108,10 +113,14 @@ class AgentApi:
# поэтому скорее всего, если мы получили WSServerHandshakeError с 403, то это внутренний ChatBusyError # поэтому скорее всего, если мы получили WSServerHandshakeError с 403, то это внутренний ChatBusyError
if e.status != 403: if e.status != 403:
# обрабатываем как обычную ошибку WS по примеру except блока ниже # обрабатываем как обычную ошибку WS по примеру except блока ниже
raise AgentException(code="CONNECTION_ERROR", raise AgentException(
details=f"Failed to connect agent {self.id}: {e}") from e code="CONNECTION_ERROR",
details=f"Failed to connect agent {self.id}: {e}",
) from e
raise AgentBusyException(f"Chat {self.chat_id} is already in use by other client") raise AgentBusyException(
f"Chat {self.chat_id} is already in use by other client"
)
except Exception as e: except Exception as e:
# Обработка всех остальных ошибок (например, aiohttp.ClientConnectionError) # Обработка всех остальных ошибок (например, aiohttp.ClientConnectionError)
@ -121,8 +130,10 @@ class AgentApi:
await self._session.close() await self._session.close()
# Можно оставить RuntimeError, а можно тоже завернуть в AgentException # Можно оставить RuntimeError, а можно тоже завернуть в AgentException
raise AgentException(code="CONNECTION_ERROR", raise AgentException(
details=f"Failed to connect agent {self.id}: {e}") from e code="CONNECTION_ERROR",
details=f"Failed to connect agent {self.id}: {e}",
) from e
async def close(self): async def close(self):
"""Явное ручное закрытие соединения.""" """Явное ручное закрытие соединения."""
@ -161,17 +172,17 @@ class AgentApi:
async def send_message(self, text: str) -> AsyncIterator[AgentEventUnion]: async def send_message(self, text: str) -> AsyncIterator[AgentEventUnion]:
""" """
Нативный асинхронный генератор. Нативный асинхронный генератор.
Не требует отдельного класса ResponseIterator. Не требует отдельного класса ResponseIterator.
Гарантированно освобождает блокировку. Гарантированно освобождает блокировку.
""" """
if not self._connected or not self._ws: if not self._connected or not self._ws:
raise AgentException(code="NOT_CONNECTED", raise AgentException(
details="Not connected. Call connect() first.") code="NOT_CONNECTED", details="Not connected. Call connect() first."
)
if self._request_lock.locked(): if self._request_lock.locked():
raise AgentBusyException( raise AgentBusyException("Agent is currently processing another request")
"Agent is currently processing another request")
# Блокируем параллельные запросы # Блокируем параллельные запросы
# если идет стриминг ответа, то при попытки отправить новое сообщение будет ошибка - ее рейзим(делаем AgentBusyError) # если идет стриминг ответа, то при попытки отправить новое сообщение будет ошибка - ее рейзим(делаем AgentBusyError)
@ -180,10 +191,7 @@ class AgentApi:
try: try:
self._current_queue = asyncio.Queue() self._current_queue = asyncio.Queue()
message = MsgUserMessage( message = MsgUserMessage(type=EClientMessage.USER_MESSAGE, text=text)
type=EClientMessage.USER_MESSAGE,
text=text
)
await self._ws.send_str(message.model_dump_json()) await self._ws.send_str(message.model_dump_json())
logger.debug(f"[{self.id}] Sent message: {text[:50]}...") logger.debug(f"[{self.id}] Sent message: {text[:50]}...")
@ -226,7 +234,8 @@ class AgentApi:
# Если это исключение, просто логируем, в коллбек его кидать не стоит # Если это исключение, просто логируем, в коллбек его кидать не стоит
if isinstance(orphan_msg, Exception): if isinstance(orphan_msg, Exception):
logger.debug( logger.debug(
f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}") f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}"
)
continue continue
# 3. Отправляем "мусорные/осиротевшие" куски в callback # 3. Отправляем "мусорные/осиротевшие" куски в callback
@ -234,7 +243,8 @@ class AgentApi:
self.callback(orphan_msg) self.callback(orphan_msg)
else: else:
logger.debug( logger.debug(
f"[{self.id}] Dropped orphaned message during cleanup") f"[{self.id}] Dropped orphaned message during cleanup"
)
except asyncio.QueueEmpty: except asyncio.QueueEmpty:
break break
@ -244,34 +254,41 @@ class AgentApi:
self._request_lock.release() self._request_lock.release()
async def _listen(self): async def _listen(self):
"""" """ "
Прослушивание вебсокета. Прослушивание вебсокета.
""" """
try: try:
async for msg in self._ws: async for msg in self._ws:
if msg.type == aiohttp.WSMsgType.TEXT: if msg.type == aiohttp.WSMsgType.TEXT:
try: try:
outgoing_msg = ServerMessage.validate_json( outgoing_msg = ServerMessage.validate_json(msg.data)
msg.data)
if isinstance(outgoing_msg, (MsgEventTextChunk, if isinstance(
MsgEventToolCallChunk, outgoing_msg,
MsgEventToolResult, (
MsgEventCustomUpdate, MsgEventTextChunk,
MsgEventEnd)): MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventSendFile,
MsgEventEnd,
),
):
if self._current_queue: if self._current_queue:
await self._current_queue.put(outgoing_msg) await self._current_queue.put(outgoing_msg)
elif self.callback: elif self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)
else: else:
logger.warning( logger.warning(
f"[{self.id}] AgentEvent without active request") f"[{self.id}] AgentEvent without active request"
)
elif isinstance(outgoing_msg, MsgError): elif isinstance(outgoing_msg, MsgError):
if self.callback: if self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)
error = AgentException( error = AgentException(
outgoing_msg.code, outgoing_msg.details) outgoing_msg.code, outgoing_msg.details
)
logger.error(f"[{self.id}] Agent error: {error}") logger.error(f"[{self.id}] Agent error: {error}")
if self._current_queue: if self._current_queue:
await self._current_queue.put(error) await self._current_queue.put(error)
@ -279,8 +296,7 @@ class AgentApi:
elif isinstance(outgoing_msg, MsgGracefulDisconnect): elif isinstance(outgoing_msg, MsgGracefulDisconnect):
if self.callback: if self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)
logger.info( logger.info(f"[{self.id}] Gracefully disconnecting")
f"[{self.id}] Gracefully disconnecting")
break # Выход из цикла приведет к finally -> _cleanup break # Выход из цикла приведет к finally -> _cleanup
else: else:
@ -292,17 +308,14 @@ class AgentApi:
self.callback(outgoing_msg) self.callback(outgoing_msg)
except Exception as e: except Exception as e:
logger.error( logger.error(f"[{self.id}] Failed to deserialize message: {e}")
f"[{self.id}] Failed to deserialize message: {e}")
if self._current_queue: if self._current_queue:
await self._current_queue.put( await self._current_queue.put(
AgentException( AgentException("PARSE_ERROR", f"Validation failed: {e}")
"PARSE_ERROR", f"Validation failed: {e}")
) )
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED): elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED):
logger.error( logger.error(f"[{self.id}] WebSocket closed/error: {msg.type}")
f"[{self.id}] WebSocket closed/error: {msg.type}")
break break
except asyncio.CancelledError: except asyncio.CancelledError:

View file

@ -2,8 +2,12 @@ import asyncio
import traceback import traceback
from lambda_agent_api.agent_api import AgentApi, AgentBusyException from lambda_agent_api.agent_api import AgentApi, AgentBusyException
from lambda_agent_api.server import MsgEventTextChunk, MsgEventToolCallChunk, MsgEventToolResult from lambda_agent_api.server import (
MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventSendFile,
)
async def main(): async def main():
@ -28,12 +32,18 @@ async def main():
print(chunk.text, end="", flush=True) print(chunk.text, end="", flush=True)
case MsgEventToolCallChunk(): case MsgEventToolCallChunk():
if not is_tool: if not is_tool:
print(f"\n\n### TOOL CALL: ({chunk.tool_name}) ", end="", flush=True) print(
f"\n\n### TOOL CALL: ({chunk.tool_name}) ",
end="",
flush=True,
)
is_tool = True is_tool = True
print(chunk.args_chunk, end="", flush=True) print(chunk.args_chunk, end="", flush=True)
case MsgEventToolResult(): case MsgEventToolResult():
is_tool = False is_tool = False
print(f"\nResult: {chunk.result}\n\n", end="", flush=True) print(f"\nResult: {chunk.result}\n\n", end="", flush=True)
case MsgEventSendFile():
print(f"\n[SEND FILE] {chunk.path}\n", end="", flush=True)
print("\n") print("\n")
except KeyboardInterrupt: except KeyboardInterrupt:
@ -44,4 +54,4 @@ async def main():
await api.close() await api.close()
asyncio.run(main()) asyncio.run(main())