Compare commits

..

No commits in common. "483a63b99962ec6668d9ceb125eaf35deaf66cf4" and "aac37db672ed60d614f90e746514e3e42903c214" have entirely different histories.

4 changed files with 73 additions and 144 deletions

View file

@ -8,7 +8,6 @@ WebSocket API SDK для взаимодействия с AI-агентом.
Раньше: `AgentAPI(url="ws://localhost:8000/agent_ws/")`. Сейчас: `AgentAPI(base_url="ws://localhost:8000/")` Раньше: `AgentAPI(url="ws://localhost:8000/agent_ws/")`. Сейчас: `AgentAPI(base_url="ws://localhost:8000/")`
- Добавлен параметр `chat_id` в конструктор `AgentAPI`. Нужен для разделения истории сообщений по чатам/веткам. - Добавлен параметр `chat_id` в конструктор `AgentAPI`. Нужен для разделения истории сообщений по чатам/веткам.
- `AgentAPI.connect()` вызывает `AgentBusyException`, если выбранный чат уже занят другим API клиентом. - `AgentAPI.connect()` вызывает `AgentBusyException`, если выбранный чат уже занят другим API клиентом.
- Добавлен новый тип события `MsgEventSendFile` для отправки файлов пользователю. Поле `path` — путь к файлу относительно `/workspace`.
## Установка ## Установка
В `master` всегда будет актуальная рабочая версия. В `master` всегда будет актуальная рабочая версия.
@ -165,22 +164,6 @@ async def on_telegram_message(from_user: int, text: str):
| payload | object | Любые данные о прогрессе | | payload | object | Любые данные о прогрессе |
| source | string | Источник события (по умолчанию "main") | | source | string | Источник события (по умолчанию "main") |
#### AGENT_EVENT_SEND_FILE
Агент отправляет файл пользователю. Путь к файлу относительно `/workspace`.
```json
{
"type": "AGENT_EVENT_SEND_FILE",
"path": "reports/2024/report.pdf"
}
```
| Поле | Тип | Описание |
|--------|--------|-----------------------------------------------|
| type | string | Всегда `AGENT_EVENT_SEND_FILE` |
| path | string | Путь к файлу относительно `/workspace` |
#### AGENT_EVENT_END #### AGENT_EVENT_END
Агент закончил генерацию ответа. Агент закончил генерацию ответа.

View file

@ -30,8 +30,8 @@ class AgentApi:
agent_id: str, agent_id: str,
base_url: str, base_url: str,
callback: Optional[Callable[[ServerMessage], None]] = None, callback: Optional[Callable[[ServerMessage], None]] = None,
on_disconnect: Optional[Callable[["AgentApi"], None]] = None, on_disconnect: Optional[Callable[['AgentApi'], None]] = None,
chat_id: int = 0, # значение по умолчанию для обратной совместимости # значение по умолчанию для обратной совместимости chat_id: int = 0 # значение по умолчанию для обратной совместимости
): ):
self.id = agent_id # ID агента для словаря self.id = agent_id # ID агента для словаря
self.chat_id = chat_id self.chat_id = chat_id
@ -67,12 +67,10 @@ class AgentApi:
logger.info(f"Agent {self.id} is ready") logger.info(f"Agent {self.id} is ready")
else: else:
raise AgentException( raise AgentException(
"INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}" "INVALID_STATUS", f"Expected SM.Status, got {status_msg.type}")
)
else: else:
raise AgentException( raise AgentException("UNEXPECTED_MSG_TYPE",
"UNEXPECTED_MSG_TYPE", f"Unexpected message type: {msg.type}" f"Unexpected message type: {msg.type}")
)
self._connected = True self._connected = True
self._listen_task = asyncio.create_task(self._listen()) self._listen_task = asyncio.create_task(self._listen())
@ -85,8 +83,7 @@ class AgentApi:
await self._session.close() await self._session.close()
raise AgentException( raise AgentException(
"TIMEOUT", "Agent did not send initial Status message within 5 seconds" "TIMEOUT", "Agent did not send initial Status message within 5 seconds") from e
) from e
except AgentException: except AgentException:
# Перехватываем наши собственные ошибки (INVALID_STATUS, UNEXPECTED_MSG_TYPE), # Перехватываем наши собственные ошибки (INVALID_STATUS, UNEXPECTED_MSG_TYPE),
@ -124,10 +121,8 @@ class AgentApi:
await self._session.close() await self._session.close()
# Можно оставить RuntimeError, а можно тоже завернуть в AgentException # Можно оставить RuntimeError, а можно тоже завернуть в AgentException
raise AgentException( raise AgentException(code="CONNECTION_ERROR",
code="CONNECTION_ERROR", details=f"Failed to connect agent {self.id}: {e}") from e
details=f"Failed to connect agent {self.id}: {e}",
) from e
async def close(self): async def close(self):
"""Явное ручное закрытие соединения.""" """Явное ручное закрытие соединения."""
@ -171,12 +166,12 @@ class AgentApi:
Гарантированно освобождает блокировку. Гарантированно освобождает блокировку.
""" """
if not self._connected or not self._ws: if not self._connected or not self._ws:
raise AgentException( raise AgentException(code="NOT_CONNECTED",
code="NOT_CONNECTED", details="Not connected. Call connect() first." details="Not connected. Call connect() first.")
)
if self._request_lock.locked(): if self._request_lock.locked():
raise AgentBusyException("Agent is currently processing another request") raise AgentBusyException(
"Agent is currently processing another request")
# Блокируем параллельные запросы # Блокируем параллельные запросы
# если идет стриминг ответа, то при попытки отправить новое сообщение будет ошибка - ее рейзим(делаем AgentBusyError) # если идет стриминг ответа, то при попытки отправить новое сообщение будет ошибка - ее рейзим(делаем AgentBusyError)
@ -185,7 +180,10 @@ class AgentApi:
try: try:
self._current_queue = asyncio.Queue() self._current_queue = asyncio.Queue()
message = MsgUserMessage(type=EClientMessage.USER_MESSAGE, text=text) message = MsgUserMessage(
type=EClientMessage.USER_MESSAGE,
text=text
)
await self._ws.send_str(message.model_dump_json()) await self._ws.send_str(message.model_dump_json())
logger.debug(f"[{self.id}] Sent message: {text[:50]}...") logger.debug(f"[{self.id}] Sent message: {text[:50]}...")
@ -228,8 +226,7 @@ class AgentApi:
# Если это исключение, просто логируем, в коллбек его кидать не стоит # Если это исключение, просто логируем, в коллбек его кидать не стоит
if isinstance(orphan_msg, Exception): if isinstance(orphan_msg, Exception):
logger.debug( logger.debug(
f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}" f"[{self.id}] Dropped exception from queue during cleanup: {orphan_msg}")
)
continue continue
# 3. Отправляем "мусорные/осиротевшие" куски в callback # 3. Отправляем "мусорные/осиротевшие" куски в callback
@ -237,8 +234,7 @@ class AgentApi:
self.callback(orphan_msg) self.callback(orphan_msg)
else: else:
logger.debug( logger.debug(
f"[{self.id}] Dropped orphaned message during cleanup" f"[{self.id}] Dropped orphaned message during cleanup")
)
except asyncio.QueueEmpty: except asyncio.QueueEmpty:
break break
@ -255,34 +251,27 @@ class AgentApi:
async for msg in self._ws: async for msg in self._ws:
if msg.type == aiohttp.WSMsgType.TEXT: if msg.type == aiohttp.WSMsgType.TEXT:
try: try:
outgoing_msg = ServerMessage.validate_json(msg.data) outgoing_msg = ServerMessage.validate_json(
msg.data)
if isinstance( if isinstance(outgoing_msg, (MsgEventTextChunk,
outgoing_msg,
(
MsgEventTextChunk,
MsgEventToolCallChunk, MsgEventToolCallChunk,
MsgEventToolResult, MsgEventToolResult,
MsgEventCustomUpdate, MsgEventCustomUpdate,
MsgEventSendFile, MsgEventEnd)):
MsgEventEnd,
),
):
if self._current_queue: if self._current_queue:
await self._current_queue.put(outgoing_msg) await self._current_queue.put(outgoing_msg)
elif self.callback: elif self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)
else: else:
logger.warning( logger.warning(
f"[{self.id}] AgentEvent without active request" f"[{self.id}] AgentEvent without active request")
)
elif isinstance(outgoing_msg, MsgError): elif isinstance(outgoing_msg, MsgError):
if self.callback: if self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)
error = AgentException( error = AgentException(
outgoing_msg.code, outgoing_msg.details outgoing_msg.code, outgoing_msg.details)
)
logger.error(f"[{self.id}] Agent error: {error}") logger.error(f"[{self.id}] Agent error: {error}")
if self._current_queue: if self._current_queue:
await self._current_queue.put(error) await self._current_queue.put(error)
@ -290,7 +279,8 @@ class AgentApi:
elif isinstance(outgoing_msg, MsgGracefulDisconnect): elif isinstance(outgoing_msg, MsgGracefulDisconnect):
if self.callback: if self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)
logger.info(f"[{self.id}] Gracefully disconnecting") logger.info(
f"[{self.id}] Gracefully disconnecting")
break # Выход из цикла приведет к finally -> _cleanup break # Выход из цикла приведет к finally -> _cleanup
else: else:
@ -302,14 +292,17 @@ class AgentApi:
self.callback(outgoing_msg) self.callback(outgoing_msg)
except Exception as e: except Exception as e:
logger.error(f"[{self.id}] Failed to deserialize message: {e}") logger.error(
f"[{self.id}] Failed to deserialize message: {e}")
if self._current_queue: if self._current_queue:
await self._current_queue.put( await self._current_queue.put(
AgentException("PARSE_ERROR", f"Validation failed: {e}") AgentException(
"PARSE_ERROR", f"Validation failed: {e}")
) )
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED): elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED):
logger.error(f"[{self.id}] WebSocket closed/error: {msg.type}") logger.error(
f"[{self.id}] WebSocket closed/error: {msg.type}")
break break
except asyncio.CancelledError: except asyncio.CancelledError:

View file

@ -4,18 +4,10 @@ from typing import Literal, Annotated, Union, Any, Dict, Optional
__all__ = [ __all__ = [
"EServerMessage", 'EServerMessage', 'MsgStatus', 'MsgError', 'MsgGracefulDisconnect',
"MsgStatus", 'MsgEventTextChunk', 'MsgEventToolCallChunk', 'MsgEventToolResult',
"MsgError", 'MsgEventCustomUpdate', 'MsgEventEnd',
"MsgGracefulDisconnect", 'AgentEventUnion', 'ServerMessage'
"MsgEventTextChunk",
"MsgEventToolCallChunk",
"MsgEventToolResult",
"MsgEventCustomUpdate",
"MsgEventSendFile",
"MsgEventEnd",
"AgentEventUnion",
"ServerMessage",
] ]
@ -29,19 +21,16 @@ class EServerMessage(str, Enum):
AGENT_EVENT_TOOL_CALL_CHUNK = "AGENT_EVENT_TOOL_CALL_CHUNK" # Новое AGENT_EVENT_TOOL_CALL_CHUNK = "AGENT_EVENT_TOOL_CALL_CHUNK" # Новое
AGENT_EVENT_TOOL_RESULT = "AGENT_EVENT_TOOL_RESULT" # Новоеы AGENT_EVENT_TOOL_RESULT = "AGENT_EVENT_TOOL_RESULT" # Новоеы
AGENT_EVENT_CUSTOM_UPDATE = "AGENT_EVENT_CUSTOM_UPDATE" # Новое AGENT_EVENT_CUSTOM_UPDATE = "AGENT_EVENT_CUSTOM_UPDATE" # Новое
AGENT_EVENT_SEND_FILE = "AGENT_EVENT_SEND_FILE" # Новое
AGENT_EVENT_END = "AGENT_EVENT_END" AGENT_EVENT_END = "AGENT_EVENT_END"
class MsgStatus(BaseModel): class MsgStatus(BaseModel):
"""Отправляется сервером при открытии соединения с клиентом.""" """Отправляется сервером при открытии соединения с клиентом."""
type: Literal[EServerMessage.STATUS] = EServerMessage.STATUS type: Literal[EServerMessage.STATUS] = EServerMessage.STATUS
class MsgError(BaseModel): class MsgError(BaseModel):
"""Неопределенная ошибка в работе агента.""" """Неопределенная ошибка в работе агента."""
type: Literal[EServerMessage.ERROR] = EServerMessage.ERROR type: Literal[EServerMessage.ERROR] = EServerMessage.ERROR
code: str code: str
details: str details: str
@ -49,23 +38,16 @@ class MsgError(BaseModel):
class MsgGracefulDisconnect(BaseModel): class MsgGracefulDisconnect(BaseModel):
"""Отправляется перед завершением работы контейнера с агентом.""" """Отправляется перед завершением работы контейнера с агентом."""
type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = EServerMessage.GRACEFUL_DISCONNECT
type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = (
EServerMessage.GRACEFUL_DISCONNECT
)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# AGENT EVENTS (События генерации) # AGENT EVENTS (События генерации)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
class MsgEventTextChunk(BaseModel): class MsgEventTextChunk(BaseModel):
"""Чанк текста ответа агента.""" """Чанк текста ответа агента."""
type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = EServerMessage.AGENT_EVENT_TEXT_CHUNK
type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = (
EServerMessage.AGENT_EVENT_TEXT_CHUNK
)
text: str text: str
# Новое: "main" (главный агент) или "tools:..." (субагент, если будем использовать) # Новое: "main" (главный агент) или "tools:..." (субагент, если будем использовать)
source: str = "main" # пока везде будет main source: str = "main" # пока везде будет main
@ -73,23 +55,17 @@ class MsgEventTextChunk(BaseModel):
class MsgEventToolCallChunk(BaseModel): class MsgEventToolCallChunk(BaseModel):
"""Агент решил использовать инструмент и генерирует аргументы.""" """Агент решил использовать инструмент и генерирует аргументы."""
type: Literal[EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK] = EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK
type: Literal[EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK] = (
EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK
)
tool_name: Optional[str] = Field( tool_name: Optional[str] = Field(
None, description="Имя инструмента (приходит обычно в первом чанке)" None, description="Имя инструмента (приходит обычно в первом чанке)")
) args_chunk: Optional[str] = Field(
args_chunk: Optional[str] = Field(None, description="Кусок JSON-аргументов") None, description="Кусок JSON-аргументов")
source: str = "main" source: str = "main"
class MsgEventToolResult(BaseModel): class MsgEventToolResult(BaseModel):
"""Инструмент отработал и вернул результат.""" """Инструмент отработал и вернул результат."""
type: Literal[EServerMessage.AGENT_EVENT_TOOL_RESULT] = EServerMessage.AGENT_EVENT_TOOL_RESULT
type: Literal[EServerMessage.AGENT_EVENT_TOOL_RESULT] = (
EServerMessage.AGENT_EVENT_TOOL_RESULT
)
tool_name: str tool_name: str
result: Any # Может быть строкой, словарем или списком result: Any # Может быть строкой, словарем или списком
source: str = "main" source: str = "main"
@ -97,28 +73,14 @@ class MsgEventToolResult(BaseModel):
class MsgEventCustomUpdate(BaseModel): class MsgEventCustomUpdate(BaseModel):
"""Кастомный прогресс (например, скачивание файла) изнутри инструмента.""" """Кастомный прогресс (например, скачивание файла) изнутри инструмента."""
type: Literal[EServerMessage.AGENT_EVENT_CUSTOM_UPDATE] = EServerMessage.AGENT_EVENT_CUSTOM_UPDATE
type: Literal[EServerMessage.AGENT_EVENT_CUSTOM_UPDATE] = (
EServerMessage.AGENT_EVENT_CUSTOM_UPDATE
)
payload: Dict[str, Any] = Field( payload: Dict[str, Any] = Field(
..., description="Любые данные о прогрессе (status, progress и т.д.)" ..., description="Любые данные о прогрессе (status, progress и т.д.)")
)
source: str = "main" source: str = "main"
class MsgEventSendFile(BaseModel):
"""Агент отправляет файл пользователю."""
type: Literal[EServerMessage.AGENT_EVENT_SEND_FILE] = (
EServerMessage.AGENT_EVENT_SEND_FILE
)
path: str = Field(..., description="Путь к файлу относительно /workspace")
class MsgEventEnd(BaseModel): class MsgEventEnd(BaseModel):
"""Агент закончил генерацию ответа.""" """Агент закончил генерацию ответа."""
type: Literal[EServerMessage.AGENT_EVENT_END] = EServerMessage.AGENT_EVENT_END type: Literal[EServerMessage.AGENT_EVENT_END] = EServerMessage.AGENT_EVENT_END
tokens_used: int tokens_used: int
@ -133,24 +95,25 @@ AgentEventUnion = Union[
MsgEventToolCallChunk, MsgEventToolCallChunk,
MsgEventToolResult, MsgEventToolResult,
MsgEventCustomUpdate, MsgEventCustomUpdate,
MsgEventSendFile, MsgEventEnd
MsgEventEnd,
] ]
# ServerMessage использует AgentEventUnion + остальные типы # Обновлено: добавили новые модели в Union адаптера
ServerMessage = TypeAdapter( ServerMessage = TypeAdapter(Annotated[
Annotated[
Union[ Union[
MsgStatus, MsgStatus,
MsgError, MsgError,
MsgGracefulDisconnect, MsgGracefulDisconnect,
AgentEventUnion, MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventEnd
], ],
Field(discriminator="type"), Field(discriminator="type")
] ])
)
""" """
TypeAdapter для десериализации всех входящих сообщений (от сервера). Объединяет все типы исходящих сообщений в одно для удобной автоматической десериализации.
Pydantic сам определит нужный тип в зависимости от поля `type`. Pydantic сам определит нужный тип в зависимости от поля `type`.
Использование: Использование:
msg = ServerMessage.validate_json(json_str) msg = ServerMessage.validate_json(json_str)

View file

@ -2,12 +2,8 @@ import asyncio
import traceback import traceback
from lambda_agent_api.agent_api import AgentApi, AgentBusyException from lambda_agent_api.agent_api import AgentApi, AgentBusyException
from lambda_agent_api.server import ( from lambda_agent_api.server import MsgEventTextChunk, MsgEventToolCallChunk, MsgEventToolResult
MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventSendFile,
)
async def main(): async def main():
@ -32,18 +28,12 @@ async def main():
print(chunk.text, end="", flush=True) print(chunk.text, end="", flush=True)
case MsgEventToolCallChunk(): case MsgEventToolCallChunk():
if not is_tool: if not is_tool:
print( print(f"\n\n### TOOL CALL: ({chunk.tool_name}) ", end="", flush=True)
f"\n\n### TOOL CALL: ({chunk.tool_name}) ",
end="",
flush=True,
)
is_tool = True is_tool = True
print(chunk.args_chunk, end="", flush=True) print(chunk.args_chunk, end="", flush=True)
case MsgEventToolResult(): case MsgEventToolResult():
is_tool = False is_tool = False
print(f"\nResult: {chunk.result}\n\n", end="", flush=True) print(f"\nResult: {chunk.result}\n\n", end="", flush=True)
case MsgEventSendFile():
print(f"\n[SEND FILE] {chunk.path}\n", end="", flush=True)
print("\n") print("\n")
except KeyboardInterrupt: except KeyboardInterrupt: