Merge pull request '#7 Добавить основные модели ивентов агента. Добавлены основные инветы' (#8) from feature/stream-events-schema into master

Reviewed-on: #8
This commit is contained in:
Егор Кандрушин 2026-04-08 20:41:11 +00:00
commit cbba336049
4 changed files with 205 additions and 58 deletions

123
README.md
View file

@ -31,6 +31,14 @@ async def main():
async for chunk in response: async for chunk in response:
if isinstance(chunk, MsgEventTextChunk): if isinstance(chunk, MsgEventTextChunk):
print(chunk.text, end="", flush=True) print(chunk.text, end="", flush=True)
elif isinstance(chunk, MsgEventToolCallChunk):
print(f"Tool call started: {chunk.tool_name}")
elif isinstance(chunk, MsgEventToolResult):
print(f"Tool result: {chunk.result}")
elif isinstance(chunk, MsgEventCustomUpdate):
print(f"Progress update: {chunk.payload}")
elif isinstance(chunk, MsgEventEnd):
print(f"Generation ended, tokens used: {chunk.tokens_used}")
finally: finally:
await api.close() await api.close()
@ -39,6 +47,8 @@ async def main():
asyncio.run(main()) asyncio.run(main())
``` ```
> `AgentApi.send_message()` возвращает стриминг-итерируемый объект, который может выдавать не только текстовые чанки, но и события инструментов (`MsgEventToolCallChunk`, `MsgEventToolResult`, `MsgEventCustomUpdate`) и финальный `MsgEventEnd`.
## Предполагаемое использование ## Предполагаемое использование
```python ```python
@ -105,37 +115,97 @@ async def on_telegram_message(from_user: int, text: str):
} }
``` ```
#### AGENT_EVENT #### AGENT_EVENT_TEXT_CHUNK
Базовый класс для ивентов, которые стримит агент во время генерации ответа. Конкретный класс для ивента определяется по `subtype`.
##### TEXT_CHUNK
Чанк текста ответа агента. Чанк текста ответа агента.
```json ```json
{ {
"type": "AGENT_EVENT", "type": "AGENT_EVENT_TEXT_CHUNK",
"subtype": "TEXT_CHUNK", "text": "Фрагмент текста",
"text": "Фрагмент текста" "source": "main"
} }
``` ```
##### END | Поле | Тип | Описание |
|--------|--------|-----------------------------------------------|
| type | string | Всегда `AGENT_EVENT_TEXT_CHUNK` |
| text | string | Фрагмент текста ответа агента |
| source | string | Источник события (по умолчанию "main") |
#### AGENT_EVENT_TOOL_CALL_CHUNK
Агент решил использовать инструмент и генерирует аргументы.
```json
{
"type": "AGENT_EVENT_TOOL_CALL_CHUNK",
"tool_name": "имя_инструмента",
"args_chunk": "{\"key\": \"value\"}",
"source": "main"
}
```
| Поле | Тип | Описание |
|-------------|---------|-----------------------------------------------|
| type | string | Всегда `AGENT_EVENT_TOOL_CALL_CHUNK` |
| tool_name | string | Имя инструмента (может быть null в первом чанке) |
| args_chunk | string | Кусок JSON-аргументов (может быть null) |
| source | string | Источник события (по умолчанию "main") |
#### AGENT_EVENT_TOOL_RESULT
Инструмент отработал и вернул результат.
```json
{
"type": "AGENT_EVENT_TOOL_RESULT",
"tool_name": "имя_инструмента",
"result": "результат выполнения",
"source": "main"
}
```
| Поле | Тип | Описание |
|------------|--------|-----------------------------------------------|
| type | string | Всегда `AGENT_EVENT_TOOL_RESULT` |
| tool_name | string | Имя инструмента |
| result | any | Результат выполнения (строка, объект или массив) |
| source | string | Источник события (по умолчанию "main") |
#### AGENT_EVENT_CUSTOM_UPDATE
Кастомный прогресс (например, скачивание файла) изнутри инструмента.
```json
{
"type": "AGENT_EVENT_CUSTOM_UPDATE",
"payload": {"status": "in_progress", "progress": 50},
"source": "main"
}
```
| Поле | Тип | Описание |
|----------|-----------------|-----------------------------------------------|
| type | string | Всегда `AGENT_EVENT_CUSTOM_UPDATE` |
| payload | object | Любые данные о прогрессе |
| source | string | Источник события (по умолчанию "main") |
#### AGENT_EVENT_END
Агент закончил генерацию ответа. Агент закончил генерацию ответа.
```json ```json
{ {
"type": "AGENT_EVENT", "type": "AGENT_EVENT_END",
"subtype": "END",
"tokens_used": 42 "tokens_used": 42
} }
``` ```
| Поле | Тип | Описание | | Поле | Тип | Описание |
|-------------|--------|-----------------------| |-------------|--------|-----------------------------------------------|
| tokens_used | int | Количество использованных токенов | | type | string | Всегда `AGENT_EVENT_END` |
| tokens_used | int | Количество использованных токенов |
#### ERROR #### ERROR
@ -164,4 +234,29 @@ async def on_telegram_message(from_user: int, text: str):
} }
``` ```
Неопределенная ошибка в работе агента.
```json
{
"type": "ERROR",
"code": "error_code",
"details": "Описание ошибки"
}
```
| Поле | Тип | Описание |
|---------|-------|----------------|
| code | string | Код ошибки |
| details | string | Подробности |
#### GRACEFUL_DISCONNECT
Отправляется перед завершением работы контейнера с агентом. Например, при долгом бездействии. Нужно, чтобы отделять обрыв соединения из-за ошибки с необходимостью повторного подключения. Приход этого сообщения означает, что агент осознанно завершает работу с клиентом по какой-то причине. Для дальнейшего взаимодействия нужно снова обратиться к мастеру.
```json
{
"type": "GRACEFUL_DISCONNECT"
}
```
![Схема взаимодействия](docs/schema.png) ![Схема взаимодействия](docs/schema.png)

View file

@ -226,20 +226,19 @@ class AgentApi:
outgoing_msg = ServerMessage.validate_json( outgoing_msg = ServerMessage.validate_json(
msg.data) msg.data)
if isinstance(outgoing_msg, MsgEventTextChunk): if isinstance(outgoing_msg, (MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventEnd)):
if self._current_queue: if self._current_queue:
await self._current_queue.put(outgoing_msg) await self._current_queue.put(outgoing_msg)
# Если очереди нет (клиент отменил запрос), но токены идут — шлем их в коллбек
elif self.callback: elif self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)
else: else:
logger.warning( logger.warning(
f"[{self.id}] AgentEvent without active request") f"[{self.id}] AgentEvent without active request")
elif isinstance(outgoing_msg, MsgEventEnd):
if self._current_queue:
await self._current_queue.put(outgoing_msg)
elif isinstance(outgoing_msg, MsgError): elif isinstance(outgoing_msg, MsgError):
if self.callback: if self.callback:
self.callback(outgoing_msg) self.callback(outgoing_msg)

View file

@ -1,72 +1,120 @@
from pydantic import BaseModel, Field, TypeAdapter from pydantic import BaseModel, Field, TypeAdapter
from enum import Enum from enum import Enum
from typing import Literal, Annotated, Union from typing import Literal, Annotated, Union, Any, Dict, Optional
__all__ = ['EServerMessage', 'MsgStatus', 'MsgError', 'MsgEventTextChunk', 'MsgEventEnd', 'AgentEventUnion', 'ServerMessage'] __all__ = [
'EServerMessage', 'MsgStatus', 'MsgError', 'MsgGracefulDisconnect',
'MsgEventTextChunk', 'MsgEventToolCallChunk', 'MsgEventToolResult',
'MsgEventCustomUpdate', 'MsgEventEnd',
'AgentEventUnion', 'ServerMessage'
]
class EServerMessage(str, Enum): class EServerMessage(str, Enum):
STATUS = "STATUS" STATUS = "STATUS"
ERROR = "ERROR" ERROR = "ERROR"
GRACEFUL_DISCONNECT = "GRACEFUL_DISCONNECT" GRACEFUL_DISCONNECT = "GRACEFUL_DISCONNECT"
# Ивенты агента
AGENT_EVENT_TEXT_CHUNK = "AGENT_EVENT_TEXT_CHUNK" AGENT_EVENT_TEXT_CHUNK = "AGENT_EVENT_TEXT_CHUNK"
AGENT_EVENT_TOOL_CALL_CHUNK = "AGENT_EVENT_TOOL_CALL_CHUNK" # Новое
AGENT_EVENT_TOOL_RESULT = "AGENT_EVENT_TOOL_RESULT" # Новоеы
AGENT_EVENT_CUSTOM_UPDATE = "AGENT_EVENT_CUSTOM_UPDATE" # Новое
AGENT_EVENT_END = "AGENT_EVENT_END" AGENT_EVENT_END = "AGENT_EVENT_END"
class MsgStatus(BaseModel): class MsgStatus(BaseModel):
""" """Отправляется сервером при открытии соединения с клиентом."""
Отправляется сервером при открытии соединения с клиентом.
Будет дополнен информацией о готовности агента принимать сообщения.
"""
type: Literal[EServerMessage.STATUS] = EServerMessage.STATUS type: Literal[EServerMessage.STATUS] = EServerMessage.STATUS
class MsgEventTextChunk(BaseModel):
"""
Чанк текста ответа агента.
"""
type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = EServerMessage.AGENT_EVENT_TEXT_CHUNK
text: str
class MsgEventEnd(BaseModel):
"""
Агент закончил генерацию ответа.
"""
type: Literal[EServerMessage.AGENT_EVENT_END] = EServerMessage.AGENT_EVENT_END
tokens_used: int
class MsgError(BaseModel): class MsgError(BaseModel):
""" """Неопределенная ошибка в работе агента."""
Неопределенная ошибка в работе агента.
"""
type: Literal[EServerMessage.ERROR] = EServerMessage.ERROR type: Literal[EServerMessage.ERROR] = EServerMessage.ERROR
code: str code: str
details: str details: str
class MsgGracefulDisconnect(BaseModel): class MsgGracefulDisconnect(BaseModel):
""" """Отправляется перед завершением работы контейнера с агентом."""
Отправляется перед завершением работы контейнера с агентом. Например, при долгом бездействии.
Нужно, чтобы отделять обрыв соединения из-за ошибки с необходимостью повторного подключения.
Приход этого сообщения означает, что агент осознанно завершает работу с клиентом по какой-то причине.
Для дальнейшего взаимодействия нужно снова обратиться к мастеру.
"""
type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = EServerMessage.GRACEFUL_DISCONNECT type: Literal[EServerMessage.GRACEFUL_DISCONNECT] = EServerMessage.GRACEFUL_DISCONNECT
AgentEventUnion = Union[MsgEventTextChunk, MsgEventEnd] # ------------------------------------------------------------------
# AGENT EVENTS (События генерации)
# ------------------------------------------------------------------
class MsgEventTextChunk(BaseModel):
"""Чанк текста ответа агента."""
type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK] = EServerMessage.AGENT_EVENT_TEXT_CHUNK
text: str
# Новое: "main" (главный агент) или "tools:..." (субагент, если будем использовать)
source: str = "main" # пока везде будет main
class MsgEventToolCallChunk(BaseModel):
"""Агент решил использовать инструмент и генерирует аргументы."""
type: Literal[EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK] = EServerMessage.AGENT_EVENT_TOOL_CALL_CHUNK
tool_name: Optional[str] = Field(
None, description="Имя инструмента (приходит обычно в первом чанке)")
args_chunk: Optional[str] = Field(
None, description="Кусок JSON-аргументов")
source: str = "main"
class MsgEventToolResult(BaseModel):
"""Инструмент отработал и вернул результат."""
type: Literal[EServerMessage.AGENT_EVENT_TOOL_RESULT] = EServerMessage.AGENT_EVENT_TOOL_RESULT
tool_name: str
result: Any # Может быть строкой, словарем или списком
source: str = "main"
class MsgEventCustomUpdate(BaseModel):
"""Кастомный прогресс (например, скачивание файла) изнутри инструмента."""
type: Literal[EServerMessage.AGENT_EVENT_CUSTOM_UPDATE] = EServerMessage.AGENT_EVENT_CUSTOM_UPDATE
payload: Dict[str, Any] = Field(
..., description="Любые данные о прогрессе (status, progress и т.д.)")
source: str = "main"
class MsgEventEnd(BaseModel):
"""Агент закончил генерацию ответа."""
type: Literal[EServerMessage.AGENT_EVENT_END] = EServerMessage.AGENT_EVENT_END
tokens_used: int
# ------------------------------------------------------------------
# UNIONS & ADAPTERS
# ------------------------------------------------------------------
# Обновлено: добавили новые модели в Union
AgentEventUnion = Union[
MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventEnd
]
# Обновлено: добавили новые модели в Union адаптера
ServerMessage = TypeAdapter(Annotated[ ServerMessage = TypeAdapter(Annotated[
Union[MsgStatus, MsgEventTextChunk, MsgEventEnd, MsgError, MsgGracefulDisconnect], Union[
MsgStatus,
MsgError,
MsgGracefulDisconnect,
MsgEventTextChunk,
MsgEventToolCallChunk,
MsgEventToolResult,
MsgEventCustomUpdate,
MsgEventEnd
],
Field(discriminator="type") Field(discriminator="type")
]) ])
""" """
Объединяет все типы исходящих сообщений в одно для удобной автоматической десериализации.\n Объединяет все типы исходящих сообщений в одно для удобной автоматической десериализации.
Pydantic сам определит нужный тип в зависимости от поля ``type``.\n Pydantic сам определит нужный тип в зависимости от поля `type`.
Использование:\n Использование:
msg = ServerMessage.model_validate_json(json) msg = ServerMessage.validate_json(json_str)
""" """

View file

@ -33,6 +33,9 @@ def test_client_message_invalid(data):
[ [
({"type": "STATUS"}, MsgStatus), ({"type": "STATUS"}, MsgStatus),
({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hi"}, MsgEventTextChunk), ({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hi"}, MsgEventTextChunk),
({"type": "AGENT_EVENT_TOOL_CALL_CHUNK", "tool_name": "search", "args_chunk": "{\"q\": \"hello\"}"}, MsgEventToolCallChunk),
({"type": "AGENT_EVENT_TOOL_RESULT", "tool_name": "search", "result": {"items": [1, 2, 3]}}, MsgEventToolResult),
({"type": "AGENT_EVENT_CUSTOM_UPDATE", "payload": {"status": "in_progress", "progress": 50}}, MsgEventCustomUpdate),
({"type": "AGENT_EVENT_END", "tokens_used": 10}, MsgEventEnd), ({"type": "AGENT_EVENT_END", "tokens_used": 10}, MsgEventEnd),
({"type": "ERROR", "code": "E1", "details": "fail"}, MsgError), ({"type": "ERROR", "code": "E1", "details": "fail"}, MsgError),
({"type": "GRACEFUL_DISCONNECT"}, MsgGracefulDisconnect), ({"type": "GRACEFUL_DISCONNECT"}, MsgGracefulDisconnect),
@ -47,6 +50,8 @@ def test_server_message_valid(data, expected_type):
"data", "data",
[ [
{"type": "AGENT_EVENT_TEXT_CHUNK"}, # нет text {"type": "AGENT_EVENT_TEXT_CHUNK"}, # нет text
{"type": "AGENT_EVENT_TOOL_RESULT", "tool_name": "search"}, # нет result
{"type": "AGENT_EVENT_CUSTOM_UPDATE"}, # нет payload
{"type": "AGENT_EVENT_END"}, # нет tokens_used {"type": "AGENT_EVENT_END"}, # нет tokens_used
{"type": "ERROR", "code": "E1"}, # нет details {"type": "ERROR", "code": "E1"}, # нет details
{"type": "UNKNOWN"}, {"type": "UNKNOWN"},