Compare commits
4 commits
7a2ad86b88
...
0c2884c2b1
| Author | SHA1 | Date | |
|---|---|---|---|
| 0c2884c2b1 | |||
| 569824ead1 | |||
| 4d917ac794 | |||
| 3a3fcdc695 |
10 changed files with 1319 additions and 806 deletions
|
|
@ -70,7 +70,7 @@ surfaces-bot/
|
|||
- **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта
|
||||
- **Текущее ограничение** — encrypted DM официально не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота
|
||||
- **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` использует `platform-agent` из compose и WebSocket contract `/v1/agent_ws/{chat_id}/`
|
||||
- **Ограничения real backend** — локальный runtime использует shared `/workspace`, а файлы передаются как относительные пути в `attachments`
|
||||
- **Ограничения real backend** — локальный runtime использует shared `/workspace`, файлы передаются как относительные пути в `attachments`, а transport layer со стороны `surfaces` использует pinned upstream `platform-agent_api.AgentApi` почти без локальной stream-логики; текущая реализация рабочая, но после tool/file flow остаётся подтверждённый upstream streaming bug, из-за которого начало ответа может пропадать
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -122,6 +122,8 @@ MATRIX_PASSWORD=... # или MATRIX_ACCESS_TOKEN=...
|
|||
MATRIX_PLATFORM_BACKEND=real
|
||||
|
||||
# compose runtime: platform-agent service name + shared /workspace
|
||||
# значение передаётся в thin wrapper как base URL; wrapper сам нормализует его
|
||||
# до upstream WS route /v1/agent_ws/{chat_id}/
|
||||
AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/
|
||||
AGENT_BASE_URL=http://platform-agent:8000
|
||||
SURFACES_WORKSPACE_DIR=/workspace
|
||||
|
|
@ -245,7 +247,8 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot
|
|||
| Функция | Почему не работает |
|
||||
|---|---|
|
||||
| `!load` в другом чате | platform-agent использует `StateBackend` — файлы живут в памяти отдельно для каждого `thread_id`. Файл, сохранённый в чате A, не виден в чате B. Фикс: переключить platform-agent на `FilesystemBackend` с общим хранилищем. |
|
||||
| Счётчик токенов в `!context` | platform-agent отдаёт `tokens_used=0` хардкодом в `MsgEventEnd`. Наш код перехватывает значение корректно. |
|
||||
| Стриминг после tool/file flow | В текущем upstream `platform-agent` первый `MsgEventTextChunk` иногда рождается уже обрезанным до попадания в websocket-клиент. Наш transport layer после cleanup максимально близок к upstream и больше не пытается локально “лечить” этот поток. Подробности и raw evidence: `docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md`. |
|
||||
| Счётчик токенов в `!context` | pinned `platform-agent_api.AgentApi` потребляет `MsgEventEnd` внутри клиента и не публикует `tokens_used` наружу. Сейчас `surfaces` честно показывает `0`, пока upstream не добавит поддержанный способ получить это значение. |
|
||||
| `!reset` | platform-agent не имеет endpoint `/reset`. Задокументировано в ТЗ к платформе. |
|
||||
| Персистентность между рестартами | platform-agent использует `MemorySaver` (in-memory). Все разговоры теряются при рестарте процесса. |
|
||||
| E2EE комнаты | `python-olm` не собирается на macOS/ARM. Ограничение инфраструктуры. |
|
||||
|
|
@ -269,6 +272,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot
|
|||
| [`docs/api-contract.md`](docs/api-contract.md) | Контракт к SDK платформы |
|
||||
| [`docs/user-flow.md`](docs/user-flow.md) | FSM и user journey |
|
||||
| [`docs/claude-code-guide.md`](docs/claude-code-guide.md) | Гайд по работе с Claude Code |
|
||||
| [`docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md`](docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md) | Финальный аудит platform streaming bug после cleanup transport layer |
|
||||
|
||||
---
|
||||
|
||||
|
|
|
|||
|
|
@ -110,7 +110,7 @@ def _build_platform_from_env() -> PlatformClient:
|
|||
if backend == "real":
|
||||
ws_url = os.environ["AGENT_WS_URL"]
|
||||
return RealPlatformClient(
|
||||
agent_api=AgentApiWrapper(agent_id="matrix-bot", url=ws_url),
|
||||
agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=ws_url),
|
||||
prototype_state=PrototypeStateStore(),
|
||||
platform="matrix",
|
||||
)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,294 @@
|
|||
# Финальный баг-репорт: потеря начала ответа и сбои streaming/image path в `platform-agent`
|
||||
|
||||
## Статус
|
||||
|
||||
Это финальный отчёт после полного аудита интеграции `surfaces -> platform-agent_api -> platform-agent`.
|
||||
|
||||
Итог:
|
||||
|
||||
- текущая реализация `surfaces` рабочая, но проблемная из-за upstream-дефектов платформы
|
||||
- баг с пропадающим началом ответа на текущем состоянии **не локализуется в `surfaces`**
|
||||
- в воспроизведённом сценарии повреждённый первый текстовый chunk рождается уже внутри `platform-agent`
|
||||
- помимо этого подтверждены ещё два независимых platform-side дефекта:
|
||||
- duplicate `END`
|
||||
- некорректная обработка больших изображений (`data-uri > 10 MB`, `WS 1009`)
|
||||
|
||||
## Версии и состояние кода
|
||||
|
||||
Рантайм воспроизводился на vendored upstream-репозиториях без локальных platform patch’ей:
|
||||
|
||||
- `platform-agent`: `5e7c2df954cc3cd2f5bf8ae688e10a20038dde61`
|
||||
- `platform-agent_api`: `aa480bbec5bbf8e006284dd03aed1c2754e9bbee`
|
||||
|
||||
Со стороны `surfaces` transport layer был предварительно очищен:
|
||||
|
||||
- убрана локальная stream-semantics из `sdk/agent_api_wrapper.py`
|
||||
- `sdk/real.py` переведён на pinned upstream `platform-agent_api.AgentApi`
|
||||
- больше нет локального post-END drain, custom listener и wrapper-owned reclassification events
|
||||
|
||||
Это важно: баг воспроизводился **после** удаления наших транспортных костылей.
|
||||
|
||||
## Контекст интеграции
|
||||
|
||||
- поверхность: Matrix
|
||||
- транспорт к платформе: WebSocket через upstream `platform-agent_api.AgentApi`
|
||||
- `chat_id` на платформу: стабильный числовой surrogate id, выдаваемый со стороны `surfaces`
|
||||
- файловый контракт: shared `/workspace`, вложения передаются как относительные пути в `attachments`
|
||||
|
||||
## Пользовательские симптомы
|
||||
|
||||
Наблюдались несколько классов сбоев:
|
||||
|
||||
1. Начало ответа может пропасть
|
||||
- ожидалось: `Моя ошибка: ...`
|
||||
- фактически: `оя ошибка: ...`
|
||||
|
||||
- ожидалось: `На двух изображениях: ...`
|
||||
- фактически: ` двух изображениях: ...`
|
||||
|
||||
2. После tool/file flow ответы могут вести себя нестабильно
|
||||
- следующий ответ стартует с середины фразы
|
||||
- в некоторых сценариях после image/tool path платформа отвечает ошибкой или замолкает
|
||||
|
||||
3. На больших изображениях image path падает совсем
|
||||
- provider error `Exceeded limit on max bytes per data-uri item : 10485760`
|
||||
- websocket закрывается с `1009 (message too big)`
|
||||
|
||||
## Что было проверено на стороне `surfaces`
|
||||
|
||||
Ниже перечислено, что именно было перепроверено в нашем коде и почему это не выглядит корнем проблемы.
|
||||
|
||||
### 1. Мы больше не режем и не переклассифицируем stream локально
|
||||
|
||||
В текущем `surfaces`:
|
||||
|
||||
- `sdk/agent_api_wrapper.py` — thin construction/factory shim над upstream `AgentApi`
|
||||
- `sdk/real.py` — просто итерирует upstream events и склеивает `MsgEventTextChunk.text`
|
||||
- `adapter/matrix/bot.py` — отправляет `OutgoingMessage.text` в Matrix без `strip/lstrip`
|
||||
|
||||
Наблюдение:
|
||||
|
||||
- в текущем коде не осталось места, где строка могла бы превратиться из `Моя ошибка` в `оя ошибка` через локальный slicing
|
||||
|
||||
### 2. Сборка ответа у нас линейная и тупая
|
||||
|
||||
`sdk/real.py` делает только следующее:
|
||||
|
||||
- если пришёл `MsgEventTextChunk` — добавляет `event.text` в `response_parts`
|
||||
- если пришёл `MsgEventSendFile` — превращает его в `Attachment`
|
||||
- не пытается “восстанавливать” поток после `END`
|
||||
|
||||
Следствие:
|
||||
|
||||
- если начало уже отсутствует в `event.text`, мы его не можем ни потерять, ни вернуть
|
||||
|
||||
### 3. Matrix sender не модифицирует текст
|
||||
|
||||
`adapter/matrix/bot.py` передаёт текст дальше как есть.
|
||||
|
||||
Следствие:
|
||||
|
||||
- Matrix renderer не является объяснением пропажи первого куска
|
||||
|
||||
## Что было проверено в `platform-agent_api`
|
||||
|
||||
Upstream client всё ещё имеет спорную queue-архитектуру:
|
||||
|
||||
- одна активная `_current_queue`
|
||||
- `MsgEventEnd` съедается внутри `send_message()`
|
||||
- в `finally` очередь отвязывается и дренится orphan messages
|
||||
|
||||
Это архитектурно хрупко и может быть источником других boundary bugs.
|
||||
|
||||
Но в конкретном воспроизведении этот слой не был точкой порчи текста.
|
||||
|
||||
Почему:
|
||||
|
||||
- в raw logs клиент получил **ровно тот же** первый text chunk, который сервер уже отправил
|
||||
- queue/dequeue не изменили его содержимое
|
||||
|
||||
## Что удалось доказать по raw logs
|
||||
|
||||
Для финальной проверки была временно добавлена точечная диагностика в:
|
||||
|
||||
- `external/platform-agent/src/agent/service.py`
|
||||
- `external/platform-agent/src/api/external.py`
|
||||
- `external/platform-agent_api/lambda_agent_api/agent_api.py`
|
||||
|
||||
Эта диагностика **не входила** в рабочую реализацию и использовалась только для локализации бага.
|
||||
|
||||
### Ключевое наблюдение
|
||||
|
||||
На проблемном запросе после tool/file flow сервер сам yield’ил уже обрезанный первый chunk:
|
||||
|
||||
```text
|
||||
platform-agent-1 | [raw-stream][server-yield] chat=1 event=TEXT text=' двух изображениях:\n\n**Первое изображение'
|
||||
platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение' path=None
|
||||
matrix-bot-1 | [raw-stream][client-listen] agent=matrix-bot chat=1 queue_active=True AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение'
|
||||
matrix-bot-1 | [raw-stream][client-dequeue] agent=matrix-bot chat=1 request=2 AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение'
|
||||
```
|
||||
|
||||
Это означает:
|
||||
|
||||
- порча произошла **до** websocket-клиента
|
||||
- `surfaces` transport layer не является источником именно этого дефекта
|
||||
- `platform-agent_api` не исказил этот конкретный chunk по дороге
|
||||
|
||||
Дополнительно тот же паттерн виден и вне image-сценария:
|
||||
|
||||
```text
|
||||
platform-agent-1 | [raw-stream][server-yield] chat=1 event=TEXT text='сё работает напрямую'
|
||||
...
|
||||
matrix-bot-1 | [raw-stream][client-dequeue] ... text='сё работает напрямую'
|
||||
```
|
||||
|
||||
То есть сервер уже выдаёт `сё`, а не `Всё`.
|
||||
|
||||
## Наиболее вероятный root cause
|
||||
|
||||
Главный подозреваемый — `external/platform-agent/src/agent/service.py`.
|
||||
|
||||
Сейчас он делает следующее:
|
||||
|
||||
- читает `self._agent.astream_events(...)`
|
||||
- обрабатывает только `kind == "on_chat_model_stream"`
|
||||
- берёт `chunk = event["data"]["chunk"]`
|
||||
- если `chunk.content`, форвардит `MsgEventTextChunk(text=chunk.content)`
|
||||
|
||||
Проблема в том, что это очень грубое преобразование raw event stream в пользовательский текст.
|
||||
|
||||
### Почему именно это место выглядит корнем
|
||||
|
||||
1. Первый битый chunk уже рождается на server-side
|
||||
- это подтверждено логами выше
|
||||
|
||||
2. Код берёт только `chunk.content`
|
||||
- если начало ответа приходит в другой форме, поле или raw event, оно просто теряется
|
||||
|
||||
3. Код не учитывает `ns` / `source`
|
||||
- после tool/vision flow у Deep Agents / LangChain может быть более сложная структура потока
|
||||
- текущий adapter flatten’ит её слишком агрессивно
|
||||
|
||||
4. Код никак не валидирует, что наружу уходит именно main assistant output
|
||||
|
||||
Итоговая гипотеза:
|
||||
|
||||
> После tool/file/vision flow `platform-agent` неправильно адаптирует `astream_events()` в `MsgEventTextChunk`. Начало итогового пользовательского ответа может находиться не в том raw event, который сейчас читается через `chunk.content`, либо теряться из-за упрощённой фильтрации потока.
|
||||
|
||||
## Подтверждённый отдельный баг: duplicate `END`
|
||||
|
||||
Это отдельный platform-side дефект.
|
||||
|
||||
Сейчас:
|
||||
|
||||
- `external/platform-agent/src/agent/service.py` уже делает `yield MsgEventEnd(...)`
|
||||
- `external/platform-agent/src/api/external.py` после завершения цикла дополнительно отправляет ещё один `MsgEventEnd(...)`
|
||||
|
||||
По логам это выглядит так:
|
||||
|
||||
```text
|
||||
platform-agent-1 | [raw-stream][server-yield] chat=1 event=END
|
||||
platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_END text=None path=None
|
||||
platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_END duplicate_end=true
|
||||
matrix-bot-1 | ... AGENT_EVENT_END tokens_used=0
|
||||
matrix-bot-1 | ... AGENT_EVENT_END tokens_used=0
|
||||
```
|
||||
|
||||
Независимая оценка:
|
||||
|
||||
- duplicate `END` — реальный баг платформы
|
||||
- он делает границу ответа менее надёжной
|
||||
- но в текущем воспроизведении он **не объясняет** сам факт потери первого text chunk
|
||||
|
||||
То есть это важный, но вторичный дефект.
|
||||
|
||||
## Подтверждённый отдельный баг: большие изображения ломают image path
|
||||
|
||||
В отдельном воспроизведении платформа падала на анализе изображений с provider error:
|
||||
|
||||
```text
|
||||
Exceeded limit on max bytes per data-uri item : 10485760
|
||||
```
|
||||
|
||||
И параллельно websocket рвался с:
|
||||
|
||||
```text
|
||||
received 1009 (message too big); then sent 1009 (message too big)
|
||||
```
|
||||
|
||||
Это означает:
|
||||
|
||||
- image path отправляет в provider oversized `data:` URI
|
||||
- безопасной предвалидации / деградации нет
|
||||
- failure scenario сопровождается разрывом websocket-соединения
|
||||
|
||||
Независимая оценка:
|
||||
|
||||
- это отдельный platform-side bug
|
||||
- он не объясняет потерю первого чанка в текстовом сценарии напрямую
|
||||
- но подтверждает, что path `tool/file/image -> platform stream` в целом сейчас нестабилен
|
||||
|
||||
## Что мы считаем исключённым
|
||||
|
||||
С достаточной уверенностью можно исключить:
|
||||
|
||||
1. Локальный slicing текста в `surfaces`
|
||||
2. Локальную “умную” реконструкцию потока, потому что она была удалена
|
||||
3. Matrix sender как источник потери первого чанка
|
||||
4. `platform-agent_api` queue/dequeue как primary root cause именно в этом воспроизведении
|
||||
|
||||
## Финальная независимая оценка
|
||||
|
||||
Текущая оценка вероятностей:
|
||||
|
||||
- `75%` — ошибка в `platform-agent`, в адаптере `astream_events() -> MsgEventTextChunk`
|
||||
- `15%` — provider/model stream приносит начало ответа в другой raw event/field, а `platform-agent` его некорректно интерпретирует
|
||||
- `10%` — вторичные platform-side boundary defects (`duplicate END`, queue semantics и т.д.)
|
||||
- `~0-5%` — ошибка в `surfaces`
|
||||
|
||||
Итоговый вывод:
|
||||
|
||||
> На текущем состоянии кода баг с пропадающим началом ответа следует считать platform-side дефектом. Воспроизведение после cleanup transport layer показывает, что первый повреждённый text chunk формируется уже внутри `platform-agent` до отправки в websocket.
|
||||
|
||||
## Что нужно исправить в платформе
|
||||
|
||||
### Обязательно
|
||||
|
||||
1. Убрать duplicate `END`
|
||||
- один ответ должен завершаться ровно одним `MsgEventEnd`
|
||||
|
||||
2. Перепроверить адаптацию `astream_events()` в `service.py`
|
||||
- логировать и проанализировать raw `event["event"]`
|
||||
- проверить `event.get("name")`
|
||||
- смотреть `event.get("ns")`
|
||||
- сравнить `chunk.content` с тем, что реально лежит в `chunk.text` / raw chunk repr
|
||||
|
||||
3. Форвардить наружу только финальный main assistant output
|
||||
- не flatten’ить весь поток без учёта `ns/source`
|
||||
|
||||
### Желательно
|
||||
|
||||
4. Сделать image path устойчивым к oversized payload
|
||||
- preflight check размера
|
||||
- resize/compress или controlled error без разрыва WS
|
||||
|
||||
5. Улучшить client/server protocol boundary
|
||||
- более строгая корреляция запроса и ответа
|
||||
- более однозначная semantics конца ответа
|
||||
|
||||
## Что мы сделали со своей стороны
|
||||
|
||||
Со стороны `surfaces` уже выполнено следующее:
|
||||
|
||||
- transport layer очищен до thin adapter над upstream `AgentApi`
|
||||
- локальные stream-workaround’ы удалены
|
||||
- рабочая интеграция сохранена
|
||||
- known issue задокументирован
|
||||
|
||||
То есть текущая интеграция не “идеальна”, но её поведение теперь достаточно чистое, чтобы platform bug было можно локализовать без смешения ответственности.
|
||||
|
||||
## Приложение: короткий диагноз
|
||||
|
||||
Если нужна самая короткая формулировка для issue tracker:
|
||||
|
||||
> После cleanup transport layer в `surfaces` и воспроизведения на clean upstream platform repos видно, что `platform-agent` иногда сам порождает первый `MsgEventTextChunk` уже обрезанным, особенно после tool/file flow. Дополнительно подтверждены duplicate `END` и отдельный image-path failure на больших `data:` URI.
|
||||
|
|
@ -0,0 +1,540 @@
|
|||
# Transport Layer Thin Adapter Implementation Plan
|
||||
|
||||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||
|
||||
**Goal:** Replace the current custom transport behavior with a thin upstream-compatible adapter so Matrix uses `platform-agent_api.AgentApi` almost as-is and keeps only integration concerns on the `surfaces` side.
|
||||
|
||||
**Architecture:** Keep a tiny `AgentApiWrapper` only as a construction/factory shim (`base_url` normalization + `for_chat(chat_id)`). Move all resilience logic that still belongs to `surfaces` into `RealPlatformClient`: per-chat client caching, per-chat send locks, disconnect-on-failure, and `PlatformError` mapping. Delete all custom stream semantics from the wrapper so bugs in streaming can be attributed cleanly to either upstream or our integration layer.
|
||||
|
||||
**Tech Stack:** Python 3.11, `aiohttp`, upstream `lambda_agent_api`, `pytest`, `pytest-asyncio`, `ruff`
|
||||
|
||||
---
|
||||
|
||||
## File Structure
|
||||
|
||||
- Modify: `sdk/agent_api_wrapper.py`
|
||||
Purpose: shrink the wrapper to a thin constructor/factory shim with no custom `_listen()` or `send_message()` logic.
|
||||
- Modify: `sdk/real.py`
|
||||
Purpose: keep integration-only behavior: cached per-chat clients, chat locks, attachment forwarding, and failure cleanup.
|
||||
- Modify: `adapter/matrix/bot.py`
|
||||
Purpose: instantiate the wrapper with the modern `base_url` path instead of a raw `url`, matching the pinned upstream API.
|
||||
- Modify: `tests/platform/test_real.py`
|
||||
Purpose: remove tests that encode custom wrapper semantics and replace them with tests that prove only the intended integration guarantees.
|
||||
- Modify: `README.md`
|
||||
Purpose: document that the transport layer now uses the pinned upstream `platform-agent_api` client semantics directly, with only a thin local adapter.
|
||||
|
||||
### Task 1: Shrink `AgentApiWrapper` To A Thin Factory Shim
|
||||
|
||||
**Files:**
|
||||
- Modify: `sdk/agent_api_wrapper.py`
|
||||
- Test: `tests/platform/test_real.py`
|
||||
|
||||
- [ ] **Step 1: Replace wrapper-only behavior tests with thin-wrapper tests**
|
||||
|
||||
Update `tests/platform/test_real.py` so it no longer asserts any custom post-END drain or wrapper-owned timeout behavior. Replace those tests with the following:
|
||||
|
||||
```python
|
||||
def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch):
|
||||
captured = {}
|
||||
|
||||
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
|
||||
captured["agent_id"] = agent_id
|
||||
captured["base_url"] = base_url
|
||||
captured["chat_id"] = chat_id
|
||||
|
||||
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
|
||||
|
||||
wrapper = AgentApiWrapper(
|
||||
agent_id="agent-1",
|
||||
base_url="ws://platform-agent:8000/v1/agent_ws/",
|
||||
chat_id="41",
|
||||
)
|
||||
|
||||
assert wrapper.chat_id == "41"
|
||||
assert wrapper._base_url == "ws://platform-agent:8000"
|
||||
assert captured == {
|
||||
"agent_id": "agent-1",
|
||||
"base_url": "ws://platform-agent:8000",
|
||||
"chat_id": "41",
|
||||
}
|
||||
|
||||
|
||||
def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch):
|
||||
init_calls = []
|
||||
|
||||
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
|
||||
self.id = agent_id
|
||||
self.chat_id = chat_id
|
||||
self.url = base_url
|
||||
init_calls.append((agent_id, base_url, chat_id))
|
||||
|
||||
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
|
||||
|
||||
root = AgentApiWrapper(
|
||||
agent_id="agent-1",
|
||||
base_url="http://platform-agent:8000/v1/agent_ws/",
|
||||
chat_id="1",
|
||||
)
|
||||
|
||||
child = root.for_chat("99")
|
||||
|
||||
assert child is not root
|
||||
assert child.chat_id == "99"
|
||||
assert child._base_url == "http://platform-agent:8000"
|
||||
assert init_calls == [
|
||||
("agent-1", "http://platform-agent:8000", "1"),
|
||||
("agent-1", "http://platform-agent:8000", "99"),
|
||||
]
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run tests to verify old assumptions fail**
|
||||
|
||||
Run:
|
||||
|
||||
```bash
|
||||
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "recovers_late_text_after_first_end or times_out_on_idle_stream or normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'
|
||||
```
|
||||
|
||||
Expected:
|
||||
|
||||
- FAIL because the old wrapper-behavior tests still exist
|
||||
- FAIL or SKIP for the new thin-wrapper tests until code/tests are aligned
|
||||
|
||||
- [ ] **Step 3: Replace `sdk/agent_api_wrapper.py` with a thin wrapper**
|
||||
|
||||
Rewrite `sdk/agent_api_wrapper.py` to the minimal implementation below:
|
||||
|
||||
```python
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlsplit, urlunsplit
|
||||
|
||||
_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api"
|
||||
if str(_api_root) not in sys.path:
|
||||
sys.path.insert(0, str(_api_root))
|
||||
|
||||
from lambda_agent_api.agent_api import AgentApi # noqa: E402
|
||||
|
||||
|
||||
class AgentApiWrapper(AgentApi):
|
||||
"""Thin construction/factory shim over the pinned upstream AgentApi."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent_id: str,
|
||||
base_url: str,
|
||||
*,
|
||||
chat_id: int | str = 0,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
self._base_url = self._normalize_base_url(base_url)
|
||||
self._init_kwargs = dict(kwargs)
|
||||
self.chat_id = chat_id
|
||||
if not self._supports_modern_constructor():
|
||||
raise RuntimeError("Pinned platform-agent_api is expected to support base_url + chat_id")
|
||||
|
||||
super().__init__(
|
||||
agent_id=agent_id,
|
||||
base_url=self._base_url,
|
||||
chat_id=chat_id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _supports_modern_constructor() -> bool:
|
||||
try:
|
||||
parameters = inspect.signature(AgentApi.__init__).parameters
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
return "base_url" in parameters and "chat_id" in parameters
|
||||
|
||||
@staticmethod
|
||||
def _normalize_base_url(base_url: str) -> str:
|
||||
parsed = urlsplit(base_url)
|
||||
path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
|
||||
return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))
|
||||
|
||||
def for_chat(self, chat_id: int | str) -> "AgentApiWrapper":
|
||||
return type(self)(
|
||||
agent_id=self.id,
|
||||
base_url=self._base_url,
|
||||
chat_id=chat_id,
|
||||
**self._init_kwargs,
|
||||
)
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Run the wrapper-focused tests**
|
||||
|
||||
Run:
|
||||
|
||||
```bash
|
||||
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'
|
||||
```
|
||||
|
||||
Expected:
|
||||
|
||||
- PASS
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add sdk/agent_api_wrapper.py tests/platform/test_real.py
|
||||
git commit -m "refactor: shrink agent api wrapper to thin adapter"
|
||||
```
|
||||
|
||||
### Task 2: Simplify `RealPlatformClient` To The Pinned Modern API
|
||||
|
||||
**Files:**
|
||||
- Modify: `sdk/real.py`
|
||||
- Modify: `adapter/matrix/bot.py`
|
||||
- Test: `tests/platform/test_real.py`
|
||||
|
||||
- [ ] **Step 1: Add failing integration tests for the desired thin-adapter contract**
|
||||
|
||||
Extend `tests/platform/test_real.py` with these assertions:
|
||||
|
||||
```python
|
||||
@pytest.mark.asyncio
|
||||
async def test_real_platform_client_passes_attachments_to_modern_send_message():
|
||||
agent_api = FakeAgentApiFactory()
|
||||
client = RealPlatformClient(
|
||||
agent_api=agent_api,
|
||||
prototype_state=PrototypeStateStore(),
|
||||
platform="matrix",
|
||||
)
|
||||
|
||||
attachment = Attachment(
|
||||
type="document",
|
||||
filename="report.pdf",
|
||||
mime_type="application/pdf",
|
||||
workspace_path="surfaces/matrix/u1/r1/inbox/report.pdf",
|
||||
)
|
||||
|
||||
result = await client.send_message(
|
||||
"@alice:example.org",
|
||||
"chat-1",
|
||||
"read this",
|
||||
attachments=[attachment],
|
||||
)
|
||||
|
||||
assert result.response == "read this"
|
||||
assert agent_api.instances["chat-1"].calls == [
|
||||
("read this", ["surfaces/matrix/u1/r1/inbox/report.pdf"])
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_real_platform_client_disconnects_chat_after_agent_exception():
|
||||
class ErroringChatAgentApi:
|
||||
def __init__(self, chat_id: str) -> None:
|
||||
self.chat_id = chat_id
|
||||
self.connect_calls = 0
|
||||
self.close_calls = 0
|
||||
|
||||
async def connect(self) -> None:
|
||||
self.connect_calls += 1
|
||||
|
||||
async def close(self) -> None:
|
||||
self.close_calls += 1
|
||||
|
||||
async def send_message(self, text: str, attachments: list[str] | None = None):
|
||||
raise agent_api_wrapper_module.AgentException("INTERNAL_ERROR", "boom")
|
||||
yield
|
||||
|
||||
agent_api = FakeAgentApiFactory()
|
||||
erroring = ErroringChatAgentApi("chat-1")
|
||||
agent_api.for_chat = lambda chat_id: erroring
|
||||
client = RealPlatformClient(
|
||||
agent_api=agent_api,
|
||||
prototype_state=PrototypeStateStore(),
|
||||
platform="matrix",
|
||||
)
|
||||
|
||||
with pytest.raises(PlatformError, match="boom") as exc_info:
|
||||
await client.send_message("@alice:example.org", "chat-1", "hello")
|
||||
|
||||
assert exc_info.value.code == "INTERNAL_ERROR"
|
||||
assert erroring.close_calls == 1
|
||||
assert "chat-1" not in client._chat_apis
|
||||
```
|
||||
|
||||
- [ ] **Step 2: Run tests to verify they fail before simplification**
|
||||
|
||||
Run:
|
||||
|
||||
```bash
|
||||
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception"'
|
||||
```
|
||||
|
||||
Expected:
|
||||
|
||||
- FAIL until `sdk/real.py` and Matrix runtime wiring are aligned with the pinned modern API
|
||||
|
||||
- [ ] **Step 3: Simplify `sdk/real.py` and Matrix runtime construction**
|
||||
|
||||
Make these exact edits:
|
||||
|
||||
```python
|
||||
# adapter/matrix/bot.py
|
||||
def _build_platform_from_env() -> PlatformClient:
|
||||
backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
|
||||
if backend == "real":
|
||||
base_url = os.environ["AGENT_BASE_URL"]
|
||||
return RealPlatformClient(
|
||||
agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=base_url),
|
||||
prototype_state=PrototypeStateStore(),
|
||||
platform="matrix",
|
||||
)
|
||||
return MockPlatformClient()
|
||||
```
|
||||
|
||||
```python
|
||||
# sdk/real.py
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import AsyncIterator
|
||||
from pathlib import Path
|
||||
|
||||
from sdk.agent_api_wrapper import AgentApiWrapper
|
||||
from sdk.interface import (
|
||||
Attachment,
|
||||
MessageChunk,
|
||||
MessageResponse,
|
||||
PlatformClient,
|
||||
PlatformError,
|
||||
User,
|
||||
UserSettings,
|
||||
)
|
||||
from sdk.prototype_state import PrototypeStateStore
|
||||
|
||||
|
||||
class RealPlatformClient(PlatformClient):
|
||||
def __init__(
|
||||
self,
|
||||
agent_api: AgentApiWrapper,
|
||||
prototype_state: PrototypeStateStore,
|
||||
platform: str = "matrix",
|
||||
) -> None:
|
||||
self._agent_api = agent_api
|
||||
self._prototype_state = prototype_state
|
||||
self._platform = platform
|
||||
self._chat_apis: dict[str, AgentApiWrapper] = {}
|
||||
self._chat_api_lock = asyncio.Lock()
|
||||
self._chat_send_locks: dict[str, asyncio.Lock] = {}
|
||||
|
||||
@property
|
||||
def agent_api(self) -> AgentApiWrapper:
|
||||
return self._agent_api
|
||||
|
||||
async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper:
|
||||
chat_key = str(chat_id)
|
||||
chat_api = self._chat_apis.get(chat_key)
|
||||
if chat_api is None:
|
||||
async with self._chat_api_lock:
|
||||
chat_api = self._chat_apis.get(chat_key)
|
||||
if chat_api is None:
|
||||
chat_api = self._agent_api.for_chat(chat_key)
|
||||
await chat_api.connect()
|
||||
self._chat_apis[chat_key] = chat_api
|
||||
return chat_api
|
||||
|
||||
def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock:
|
||||
chat_key = str(chat_id)
|
||||
lock = self._chat_send_locks.get(chat_key)
|
||||
if lock is None:
|
||||
lock = asyncio.Lock()
|
||||
self._chat_send_locks[chat_key] = lock
|
||||
return lock
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
user_id: str,
|
||||
chat_id: str,
|
||||
text: str,
|
||||
attachments: list[Attachment] | None = None,
|
||||
) -> MessageResponse:
|
||||
response_parts: list[str] = []
|
||||
tokens_used = 0
|
||||
sent_attachments: list[Attachment] = []
|
||||
message_id = user_id
|
||||
|
||||
lock = self._get_chat_send_lock(chat_id)
|
||||
async with lock:
|
||||
chat_api = await self._get_chat_api(chat_id)
|
||||
try:
|
||||
async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
|
||||
if hasattr(event, "text"):
|
||||
response_parts.append(event.text)
|
||||
elif event.__class__.__name__ == "MsgEventEnd":
|
||||
tokens_used = getattr(event, "tokens_used", 0)
|
||||
elif "SEND_FILE" in getattr(getattr(event, "type", None), "value", str(getattr(event, "type", ""))):
|
||||
attachment = self._attachment_from_send_file_event(event)
|
||||
if attachment is not None:
|
||||
sent_attachments.append(attachment)
|
||||
except Exception as exc:
|
||||
await self._handle_chat_api_failure(chat_id, exc)
|
||||
|
||||
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
|
||||
|
||||
return MessageResponse(
|
||||
message_id=message_id,
|
||||
response="".join(response_parts),
|
||||
tokens_used=tokens_used,
|
||||
finished=True,
|
||||
attachments=sent_attachments,
|
||||
)
|
||||
|
||||
async def stream_message(
|
||||
self,
|
||||
user_id: str,
|
||||
chat_id: str,
|
||||
text: str,
|
||||
attachments: list[Attachment] | None = None,
|
||||
) -> AsyncIterator[MessageChunk]:
|
||||
lock = self._get_chat_send_lock(chat_id)
|
||||
async with lock:
|
||||
chat_api = await self._get_chat_api(chat_id)
|
||||
try:
|
||||
async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
|
||||
if hasattr(event, "text"):
|
||||
yield MessageChunk(
|
||||
message_id=user_id,
|
||||
delta=event.text,
|
||||
finished=False,
|
||||
)
|
||||
elif event.__class__.__name__ == "MsgEventEnd":
|
||||
tokens_used = getattr(event, "tokens_used", 0)
|
||||
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
|
||||
yield MessageChunk(
|
||||
message_id=user_id,
|
||||
delta="",
|
||||
finished=True,
|
||||
tokens_used=tokens_used,
|
||||
)
|
||||
except Exception as exc:
|
||||
await self._handle_chat_api_failure(chat_id, exc)
|
||||
|
||||
async def disconnect_chat(self, chat_id: str) -> None:
|
||||
chat_key = str(chat_id)
|
||||
chat_api = self._chat_apis.pop(chat_key, None)
|
||||
self._chat_send_locks.pop(chat_key, None)
|
||||
if chat_api is not None:
|
||||
await chat_api.close()
|
||||
|
||||
async def close(self) -> None:
|
||||
for chat_api in list(self._chat_apis.values()):
|
||||
await chat_api.close()
|
||||
self._chat_apis.clear()
|
||||
self._chat_send_locks.clear()
|
||||
|
||||
async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None:
|
||||
await self.disconnect_chat(chat_id)
|
||||
code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR"
|
||||
raise PlatformError(str(exc), code=code) from exc
|
||||
|
||||
@staticmethod
|
||||
def _attachment_paths(attachments: list[Attachment] | None) -> list[str]:
|
||||
if not attachments:
|
||||
return []
|
||||
return [attachment.workspace_path for attachment in attachments if attachment.workspace_path]
|
||||
```
|
||||
|
||||
- [ ] **Step 4: Run the focused transport tests**
|
||||
|
||||
Run:
|
||||
|
||||
```bash
|
||||
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception or wraps_connection_closed_as_platform_error or reconnects_after_closed_chat_api"'
|
||||
```
|
||||
|
||||
Expected:
|
||||
|
||||
- PASS
|
||||
|
||||
- [ ] **Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add adapter/matrix/bot.py sdk/real.py tests/platform/test_real.py
|
||||
git commit -m "refactor: use upstream transport semantics in real client"
|
||||
```
|
||||
|
||||
### Task 3: Remove Custom Transport Assumptions From Tests And Docs
|
||||
|
||||
**Files:**
|
||||
- Modify: `tests/platform/test_real.py`
|
||||
- Modify: `README.md`
|
||||
|
||||
- [ ] **Step 1: Delete tests that encode wrapper-owned stream semantics**
|
||||
|
||||
Remove any tests that assert:
|
||||
|
||||
- late text is recovered after the first `END`
|
||||
- duplicate `END` is repaired inside our wrapper
|
||||
- wrapper-owned idle timeout semantics
|
||||
|
||||
The file should keep only tests for:
|
||||
|
||||
- wrapper construction/factory behavior
|
||||
- per-chat client reuse
|
||||
- reconnect/disconnect after failure
|
||||
- attachment forwarding
|
||||
- per-chat send locking
|
||||
|
||||
- [ ] **Step 2: Update README transport description**
|
||||
|
||||
Add this text to the Matrix runtime/backend section in `README.md`:
|
||||
|
||||
```md
|
||||
Transport layer note:
|
||||
|
||||
- `surfaces` now uses the pinned upstream `platform-agent_api.AgentApi` stream semantics directly
|
||||
- local code keeps only a thin adapter for client construction and per-chat client factories
|
||||
- request serialization, disconnect-on-failure, and `PlatformError` mapping remain in `sdk/real.py`
|
||||
- `surfaces` no longer performs local post-END stream reconstruction
|
||||
```
|
||||
|
||||
- [ ] **Step 3: Run the full verification set**
|
||||
|
||||
Run:
|
||||
|
||||
```bash
|
||||
uv run ruff check adapter/matrix sdk tests/platform/test_real.py
|
||||
/bin/zsh -lc 'PYTHONPATH=. uv run pytest tests/adapter/matrix -q'
|
||||
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q'
|
||||
```
|
||||
|
||||
Expected:
|
||||
|
||||
- `ruff` reports `All checks passed!`
|
||||
- Matrix adapter tests PASS
|
||||
- `tests/platform/test_real.py` PASS
|
||||
|
||||
- [ ] **Step 4: Commit**
|
||||
|
||||
```bash
|
||||
git add README.md tests/platform/test_real.py
|
||||
git commit -m "test: remove custom transport semantics assumptions"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Self-Review
|
||||
|
||||
- Spec coverage:
|
||||
- thin adapter target: covered by Task 1
|
||||
- integration-only `RealPlatformClient`: covered by Task 2
|
||||
- removal of custom stream semantics assumptions: covered by Task 3
|
||||
- re-verification after cleanup: covered by Task 3
|
||||
|
||||
- Placeholder scan:
|
||||
- no `TODO`, `TBD`, or deferred implementation placeholders remain in task steps
|
||||
|
||||
- Type consistency:
|
||||
- `AgentApiWrapper` remains the construction/factory type used by `RealPlatformClient`
|
||||
- failure mapping still terminates in `PlatformError`
|
||||
- attachment forwarding consistently uses `attachments: list[str]`
|
||||
|
|
@ -0,0 +1,318 @@
|
|||
# Transport Layer Thin Adapter Design
|
||||
|
||||
## Цель
|
||||
|
||||
Упростить transport layer между Matrix surface и `platform-agent` до максимально production-like вида:
|
||||
|
||||
- использовать upstream `platform-agent_api.AgentApi` почти как есть
|
||||
- убрать из surface-side клиента собственную интерпретацию stream semantics
|
||||
- оставить в нашем коде только integration concerns:
|
||||
- per-chat lifecycle
|
||||
- per-chat serialization
|
||||
- attachment path forwarding
|
||||
- exception mapping в `PlatformError`
|
||||
|
||||
Это нужно, чтобы:
|
||||
|
||||
- восстановить чёткую границу ответственности между `surfaces` и платформой
|
||||
- убрать из диагностики ложные факторы, внесённые нашей кастомной обёрткой
|
||||
- получить честную картину реальных platform bugs до добавления любых policy-надстроек
|
||||
|
||||
## Контекст
|
||||
|
||||
Сейчас transport path состоит из:
|
||||
|
||||
- [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py)
|
||||
- [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py)
|
||||
|
||||
Изначально `AgentApiWrapper` был создан по разумным причинам:
|
||||
|
||||
- поддержка переходного периода между разными версиями `platform-agent_api`
|
||||
- унификация `base_url/url`
|
||||
- создание per-chat client instances через `for_chat()`
|
||||
- локальный учёт `tokens_used`
|
||||
|
||||
Позже в этот слой были добавлены уже не compatibility-функции, а собственные transport semantics:
|
||||
|
||||
- custom `_listen()`
|
||||
- custom `send_message()`
|
||||
- post-END drain window
|
||||
- custom idle timeout
|
||||
- event-kind reclassification
|
||||
|
||||
После этого `surfaces` перестал быть тонким клиентом платформы и начал вести себя как альтернативная реализация transport layer. Это делает диагностику platform bugs нечистой.
|
||||
|
||||
## Принципы дизайна
|
||||
|
||||
### 1. Transport должен быть скучным
|
||||
|
||||
Transport layer не должен:
|
||||
|
||||
- спасать поздние chunks
|
||||
- лечить duplicate `END`
|
||||
- придумывать собственные правила границы ответа
|
||||
- по-своему классифицировать stream events сверх upstream client behavior
|
||||
|
||||
Если upstream stream protocol повреждён, мы должны видеть это как platform issue, а не скрывать его кастомной очередью.
|
||||
|
||||
### 2. Policy и transport разделяются
|
||||
|
||||
Transport:
|
||||
|
||||
- говорит с upstream API
|
||||
- доставляет события
|
||||
- закрывает соединение
|
||||
|
||||
Policy:
|
||||
|
||||
- решает, что считать recoverable failure
|
||||
- нужна ли повторная попытка
|
||||
- как сообщать ошибку пользователю
|
||||
- нужно ли сбрасывать chat session
|
||||
|
||||
На первом этапе policy не расширяется. Мы сначала приводим transport к тонкому адаптеру, потом заново оцениваем реальные проблемы.
|
||||
|
||||
### 3. Session lifecycle остаётся на нашей стороне
|
||||
|
||||
Даже в thin-adapter модели `surfaces` по-прежнему отвечает за:
|
||||
|
||||
- кеширование client per chat
|
||||
- один send lock на chat
|
||||
- сброс мёртвой chat session после failure
|
||||
- mapping upstream exceptions в `PlatformError`
|
||||
|
||||
Это не transport semantics, а integration lifecycle.
|
||||
|
||||
## Варианты
|
||||
|
||||
### Вариант A. Оставить текущий кастомный wrapper
|
||||
|
||||
Плюсы:
|
||||
|
||||
- уже работает на части сценариев
|
||||
- содержит built-in mitigations против observed failures
|
||||
|
||||
Минусы:
|
||||
|
||||
- нарушает границу ответственности
|
||||
- усложняет диагностику
|
||||
- делает platform bug reports спорными
|
||||
- содержит symptom-fix логику в transport layer
|
||||
|
||||
Вердикт: не подходит как production-like target.
|
||||
|
||||
### Вариант B. Thin upstream adapter
|
||||
|
||||
Плюсы:
|
||||
|
||||
- чистая архитектура
|
||||
- честная диагностика upstream проблем
|
||||
- минимальная собственная магия
|
||||
|
||||
Минусы:
|
||||
|
||||
- локальные mitigations исчезают
|
||||
- если upstream client несовершенен, это сразу проявится
|
||||
|
||||
Вердикт: правильный первый этап.
|
||||
|
||||
### Вариант C. Thin adapter сейчас, outer policy layer потом
|
||||
|
||||
Плюсы:
|
||||
|
||||
- даёт production-like эволюцию
|
||||
- не смешивает transport и resilience policy
|
||||
- позволяет сначала увидеть реальные проблемы, потом адресовать только нужные
|
||||
|
||||
Минусы:
|
||||
|
||||
- требует двух фаз вместо одной
|
||||
|
||||
Вердикт: рекомендуемый путь.
|
||||
|
||||
## Рекомендуемая архитектура
|
||||
|
||||
### Слой 1. Upstream client
|
||||
|
||||
Источник истины:
|
||||
|
||||
- [external/platform-agent_api/lambda_agent_api/agent_api.py](/Users/a/MAI/sem2/lambda/surfaces-bot/external/platform-agent_api/lambda_agent_api/agent_api.py)
|
||||
|
||||
Мы принимаем его stream semantics как authoritative behavior.
|
||||
|
||||
### Слой 2. Thin adapter
|
||||
|
||||
Файл:
|
||||
|
||||
- [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py)
|
||||
|
||||
После cleanup он должен содержать только:
|
||||
|
||||
- создание клиента через modern constructor
|
||||
- `base_url` normalization, если это действительно нужно для наших env
|
||||
- `for_chat(chat_id)` как factory convenience
|
||||
- опционально thin storage для `last_tokens_used`, если это можно сделать без переопределения stream semantics
|
||||
|
||||
Он не должен переопределять:
|
||||
|
||||
- `_listen()`
|
||||
- `send_message()`
|
||||
- queue lifecycle
|
||||
- post-END behavior
|
||||
- timeout behavior
|
||||
|
||||
### Слой 3. Integration/session layer
|
||||
|
||||
Файл:
|
||||
|
||||
- [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py)
|
||||
|
||||
Ответственность:
|
||||
|
||||
- кешировать chat client instances
|
||||
- сериализовать sends по chat lock
|
||||
- вызывать `disconnect_chat(chat_id)` после transport failure
|
||||
- превращать upstream exceptions в `PlatformError`
|
||||
- форвардить `attachments` как relative workspace paths
|
||||
- собирать `MessageResponse` / `MessageChunk` для остального приложения
|
||||
|
||||
Этот слой не должен заниматься:
|
||||
|
||||
- исправлением broken stream boundaries
|
||||
- custom post-END reconstruction
|
||||
- поздним дренированием очереди
|
||||
|
||||
## Что удаляем
|
||||
|
||||
Из [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py):
|
||||
|
||||
- custom `_listen()`
|
||||
- custom `send_message()`
|
||||
- `_drain_post_end_events()`
|
||||
- `_event_kind()`
|
||||
- `_is_kind()`
|
||||
- `_is_text_event()`
|
||||
- `_is_end_event()`
|
||||
- `_is_send_file_event()`
|
||||
- `_POST_END_DRAIN_MS`
|
||||
- `_STREAM_IDLE_TIMEOUT_MS`
|
||||
- debug logging, завязанное на наш собственный queue lifecycle
|
||||
|
||||
## Что оставляем
|
||||
|
||||
В thin adapter:
|
||||
|
||||
- `__init__()` для modern `base_url/chat_id`
|
||||
- `_normalize_base_url()` только если нужен стабильный env input
|
||||
- `for_chat(chat_id)`
|
||||
|
||||
В [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py):
|
||||
|
||||
- `_get_chat_api()`
|
||||
- `_get_chat_send_lock()`
|
||||
- `_attachment_paths()`
|
||||
- `disconnect_chat()`
|
||||
- `_handle_chat_api_failure()`
|
||||
- `send_message()`
|
||||
- `stream_message()`
|
||||
|
||||
## Дополнительное упрощение
|
||||
|
||||
Если после cleanup мы считаем pinned upstream API обязательным, то из [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py) можно убрать legacy-minded probing:
|
||||
|
||||
- `inspect.signature(send_message)`
|
||||
- conditional fallback на старый `send_message(text)` без `attachments`
|
||||
|
||||
В этом случае `RealPlatformClient` всегда использует современный контракт:
|
||||
|
||||
- `send_message(text, attachments=...)`
|
||||
|
||||
Это ещё сильнее уменьшит ambiguity.
|
||||
|
||||
## Этапы миграции
|
||||
|
||||
### Этап 1. Cleanup до thin adapter
|
||||
|
||||
Делаем:
|
||||
|
||||
- сжимаем `sdk/agent_api_wrapper.py` до thin shim
|
||||
- переносим всю допустимую resilience logic только в `sdk/real.py`
|
||||
- удаляем тесты, которые закрепляют наши кастомные transport semantics
|
||||
|
||||
### Этап 2. Повторная верификация
|
||||
|
||||
Заново прогоняем:
|
||||
|
||||
- text-only flow
|
||||
- staged attachments flow
|
||||
- large image failure
|
||||
- duplicate `END` behavior
|
||||
- behavior after transport disconnect
|
||||
|
||||
На этом этапе мы честно увидим, что реально делает upstream transport.
|
||||
|
||||
### Этап 3. Опциональный outer policy layer
|
||||
|
||||
Только если после Этапа 2 это действительно нужно, добавляем policy поверх transport:
|
||||
|
||||
- request timeout целиком
|
||||
- retry policy
|
||||
- circuit-breaker-like behavior
|
||||
|
||||
Но это должно жить не в client wrapper, а выше, в integration layer.
|
||||
|
||||
## Тестовая стратегия
|
||||
|
||||
### Удаляем как нецелевые тесты
|
||||
|
||||
Больше не считаем нормой:
|
||||
|
||||
- post-END drain behavior
|
||||
- recovery late chunks после `END`
|
||||
- idle timeout внутри wrapper как часть client contract
|
||||
|
||||
### Оставляем и добавляем
|
||||
|
||||
Нужные guarantees:
|
||||
|
||||
1. создаётся отдельный client per chat
|
||||
2. один chat сериализуется через lock
|
||||
3. разные чаты не делят client instance
|
||||
4. attachment paths уходят в `send_message(..., attachments=...)`
|
||||
5. transport failure приводит к `disconnect_chat(chat_id)`
|
||||
6. следующий запрос после failure открывает новую chat session
|
||||
7. upstream exception превращается в `PlatformError`
|
||||
|
||||
## Риски
|
||||
|
||||
### 1. Может снова проявиться реальный upstream bug
|
||||
|
||||
Это не regression дизайна, а полезный результат cleanup.
|
||||
|
||||
### 2. Может исчезнуть локальная защита от зависших стримов
|
||||
|
||||
Это допустимо на первом этапе.
|
||||
Если она действительно нужна, она должна вернуться как outer policy, а не как переписанный client transport.
|
||||
|
||||
### 3. Может выясниться, что даже thin wrapper не нужен
|
||||
|
||||
Если modern upstream `AgentApi` уже полностью покрывает наш use case, файл `sdk/agent_api_wrapper.py` можно будет заменить на маленький factory helper или убрать совсем.
|
||||
|
||||
## Критерии успеха
|
||||
|
||||
Результат считается успешным, если:
|
||||
|
||||
- transport layer в `surfaces` перестаёт иметь собственную stream semantics
|
||||
- platform bug reports снова можно формулировать без сильного caveat про кастомный клиент
|
||||
- Matrix real backend продолжает работать на text-only и attachments scenarios
|
||||
- failure handling остаётся контролируемым, но больше не маскирует transport behavior платформы
|
||||
|
||||
## Решение
|
||||
|
||||
Принять путь:
|
||||
|
||||
- `Thin upstream adapter now`
|
||||
- `Observe real behavior`
|
||||
- `Add outer policy later only if needed`
|
||||
|
||||
Это наиболее близкий к production best practice вариант для текущего состояния проекта.
|
||||
|
|
@ -1,87 +1,44 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections.abc import AsyncIterator
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlsplit, urlunsplit
|
||||
|
||||
import aiohttp
|
||||
|
||||
_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api"
|
||||
if str(_api_root) not in sys.path:
|
||||
sys.path.insert(0, str(_api_root))
|
||||
|
||||
from lambda_agent_api.agent_api import AgentApi, AgentBusyException, AgentException # noqa: E402
|
||||
from lambda_agent_api.client import EClientMessage, MsgUserMessage # noqa: E402
|
||||
from lambda_agent_api.server import AgentEventUnion, MsgEventEnd, ServerMessage # noqa: E402
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
_DEBUG_STREAM = os.environ.get("SURFACES_AGENT_DEBUG_STREAM", "").strip().lower() in {
|
||||
"1",
|
||||
"true",
|
||||
"yes",
|
||||
}
|
||||
_POST_END_DRAIN_MS = int(os.environ.get("SURFACES_AGENT_POST_END_DRAIN_MS", "120"))
|
||||
_STREAM_IDLE_TIMEOUT_MS = int(os.environ.get("SURFACES_AGENT_IDLE_TIMEOUT_MS", "60000"))
|
||||
from lambda_agent_api.agent_api import AgentApi # noqa: E402
|
||||
|
||||
|
||||
class AgentApiWrapper(AgentApi):
|
||||
"""Capture tokens_used from MsgEventEnd without patching upstream code."""
|
||||
"""Thin construction/factory shim over the pinned upstream AgentApi."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent_id: str,
|
||||
base_url: str | None = None,
|
||||
base_url: str,
|
||||
*,
|
||||
chat_id: int | str = 0,
|
||||
url: str | None = None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
if base_url is None and url is None:
|
||||
raise TypeError("AgentApiWrapper requires base_url or url")
|
||||
|
||||
self._base_url = self._normalize_base_url(base_url or url or "")
|
||||
self._base_url = self._normalize_base_url(base_url)
|
||||
self._init_kwargs = dict(kwargs)
|
||||
self.chat_id = chat_id
|
||||
if self._supports_modern_constructor():
|
||||
super().__init__(
|
||||
agent_id=agent_id,
|
||||
base_url=self._base_url,
|
||||
chat_id=chat_id,
|
||||
**kwargs,
|
||||
)
|
||||
else:
|
||||
super().__init__(
|
||||
agent_id=agent_id,
|
||||
url=self._build_ws_url(self._base_url, chat_id),
|
||||
**kwargs,
|
||||
)
|
||||
self.last_tokens_used = 0
|
||||
|
||||
@staticmethod
|
||||
def _supports_modern_constructor() -> bool:
|
||||
try:
|
||||
parameters = inspect.signature(AgentApi.__init__).parameters
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
||||
return "base_url" in parameters and "chat_id" in parameters
|
||||
super().__init__(
|
||||
agent_id=agent_id,
|
||||
base_url=self._base_url,
|
||||
chat_id=chat_id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_base_url(base_url: str) -> str:
|
||||
parsed = urlsplit(base_url)
|
||||
path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?$", "", parsed.path.rstrip("/"))
|
||||
path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
|
||||
return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))
|
||||
|
||||
@staticmethod
|
||||
def _build_ws_url(base_url: str, chat_id: int | str) -> str:
|
||||
return base_url.rstrip("/") + f"/agent_ws/?thread_id={chat_id}"
|
||||
|
||||
def for_chat(self, chat_id: int | str) -> AgentApiWrapper:
|
||||
return type(self)(
|
||||
agent_id=self.id,
|
||||
|
|
@ -89,227 +46,3 @@ class AgentApiWrapper(AgentApi):
|
|||
chat_id=chat_id,
|
||||
**self._init_kwargs,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _event_kind(event: object) -> str:
|
||||
raw_kind = getattr(event, "type", None)
|
||||
if hasattr(raw_kind, "value"):
|
||||
raw_kind = raw_kind.value
|
||||
if raw_kind is None:
|
||||
raw_kind = event.__class__.__name__
|
||||
|
||||
kind = str(raw_kind).replace("-", "_")
|
||||
if "_" in kind:
|
||||
return kind.upper()
|
||||
|
||||
normalized = []
|
||||
for index, char in enumerate(kind):
|
||||
if index and char.isupper() and not kind[index - 1].isupper():
|
||||
normalized.append("_")
|
||||
normalized.append(char)
|
||||
return "".join(normalized).upper()
|
||||
|
||||
@classmethod
|
||||
def _is_kind(cls, event: object, *needles: str) -> bool:
|
||||
kind = cls._event_kind(event)
|
||||
return any(needle in kind for needle in needles)
|
||||
|
||||
@classmethod
|
||||
def _is_text_event(cls, event: object) -> bool:
|
||||
return hasattr(event, "text") or cls._is_kind(event, "TEXT_CHUNK")
|
||||
|
||||
@classmethod
|
||||
def _is_end_event(cls, event: object) -> bool:
|
||||
kind = cls._event_kind(event)
|
||||
return kind == "END" or kind.endswith("_END")
|
||||
|
||||
@classmethod
|
||||
def _is_send_file_event(cls, event: object) -> bool:
|
||||
return "SEND_FILE" in cls._event_kind(event)
|
||||
|
||||
async def _publish_event(self, event: object, *, queue_event: object | None = None) -> None:
|
||||
if self.callback:
|
||||
self.callback(event)
|
||||
if self._current_queue:
|
||||
await self._current_queue.put(queue_event if queue_event is not None else event)
|
||||
|
||||
async def _publish_error(self, event: object) -> None:
|
||||
if self.callback:
|
||||
self.callback(event)
|
||||
if self._current_queue and hasattr(event, "code") and hasattr(event, "details"):
|
||||
await self._current_queue.put(AgentException(event.code, event.details))
|
||||
|
||||
async def _listen(self):
|
||||
try:
|
||||
async for msg in self._ws:
|
||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||
try:
|
||||
outgoing_msg = ServerMessage.validate_json(msg.data)
|
||||
|
||||
if self._is_text_event(outgoing_msg):
|
||||
if _DEBUG_STREAM:
|
||||
logger.warning(
|
||||
"[%s] text chunk queue=%s text=%r",
|
||||
self.id,
|
||||
self._current_queue is not None,
|
||||
getattr(outgoing_msg, "text", "")[:80],
|
||||
)
|
||||
if self._current_queue:
|
||||
await self._current_queue.put(outgoing_msg)
|
||||
elif self.callback:
|
||||
self.callback(outgoing_msg)
|
||||
else:
|
||||
logger.warning("[%s] AgentEvent without active request", self.id)
|
||||
|
||||
elif self._is_end_event(outgoing_msg):
|
||||
self.last_tokens_used = outgoing_msg.tokens_used
|
||||
if _DEBUG_STREAM:
|
||||
logger.warning(
|
||||
"[%s] end event queue=%s tokens=%s",
|
||||
self.id,
|
||||
self._current_queue is not None,
|
||||
getattr(outgoing_msg, "tokens_used", None),
|
||||
)
|
||||
await self._publish_event(outgoing_msg)
|
||||
|
||||
elif self._is_kind(outgoing_msg, "ERROR"):
|
||||
error = AgentException(outgoing_msg.code, outgoing_msg.details)
|
||||
logger.error("[%s] Agent error: %s", self.id, error)
|
||||
await self._publish_error(outgoing_msg)
|
||||
|
||||
elif self._is_kind(outgoing_msg, "GRACEFUL_DISCONNECT"):
|
||||
await self._publish_event(outgoing_msg)
|
||||
logger.info("[%s] Gracefully disconnecting", self.id)
|
||||
break
|
||||
|
||||
else:
|
||||
await self._publish_event(outgoing_msg)
|
||||
|
||||
except Exception as exc:
|
||||
logger.error("[%s] Failed to deserialize message: %s", self.id, exc)
|
||||
if self._current_queue:
|
||||
await self._current_queue.put(
|
||||
AgentException("PARSE_ERROR", f"Validation failed: {exc}")
|
||||
)
|
||||
|
||||
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED):
|
||||
logger.error("[%s] WebSocket closed/error: %s", self.id, msg.type)
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as exc:
|
||||
logger.error("[%s] Error in listen loop: %s", self.id, exc)
|
||||
finally:
|
||||
await self._cleanup()
|
||||
|
||||
async def send_message(
|
||||
self, text: str, attachments: list[str] | None = None
|
||||
) -> AsyncIterator[AgentEventUnion]:
|
||||
if not self._connected or not self._ws:
|
||||
raise AgentException(
|
||||
code="NOT_CONNECTED", details="Not connected. Call connect() first."
|
||||
)
|
||||
|
||||
if self._request_lock.locked():
|
||||
raise AgentBusyException("Agent is currently processing another request")
|
||||
|
||||
await self._request_lock.acquire()
|
||||
try:
|
||||
self._current_queue = asyncio.Queue()
|
||||
|
||||
message = MsgUserMessage(
|
||||
type=EClientMessage.USER_MESSAGE,
|
||||
text=text,
|
||||
attachments=attachments or [],
|
||||
)
|
||||
|
||||
await self._ws.send_str(message.model_dump_json())
|
||||
logger.debug("[%s] Sent message: %s...", self.id, text[:50])
|
||||
|
||||
while True:
|
||||
try:
|
||||
chunk = await asyncio.wait_for(
|
||||
self._current_queue.get(),
|
||||
timeout=max(_STREAM_IDLE_TIMEOUT_MS, 0) / 1000,
|
||||
)
|
||||
except TimeoutError as exc:
|
||||
raise AgentException(
|
||||
"TIMEOUT",
|
||||
(
|
||||
"Timed out waiting for the next agent stream event "
|
||||
f"after {max(_STREAM_IDLE_TIMEOUT_MS, 0)}ms"
|
||||
),
|
||||
) from exc
|
||||
|
||||
if isinstance(chunk, Exception):
|
||||
raise chunk
|
||||
|
||||
if isinstance(chunk, MsgEventEnd):
|
||||
self.last_tokens_used = chunk.tokens_used
|
||||
async for late_chunk in self._drain_post_end_events():
|
||||
yield late_chunk
|
||||
break
|
||||
|
||||
yield chunk
|
||||
|
||||
finally:
|
||||
if self._current_queue:
|
||||
orphan_queue = self._current_queue
|
||||
self._current_queue = None
|
||||
|
||||
while not orphan_queue.empty():
|
||||
try:
|
||||
orphan_msg = orphan_queue.get_nowait()
|
||||
if isinstance(orphan_msg, Exception):
|
||||
logger.debug(
|
||||
"[%s] Dropped exception from queue during cleanup: %s",
|
||||
self.id,
|
||||
orphan_msg,
|
||||
)
|
||||
continue
|
||||
|
||||
if self.callback:
|
||||
self.callback(orphan_msg)
|
||||
else:
|
||||
logger.debug("[%s] Dropped orphaned message during cleanup", self.id)
|
||||
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
|
||||
if self._request_lock.locked():
|
||||
self._request_lock.release()
|
||||
|
||||
async def _drain_post_end_events(self) -> AsyncIterator[AgentEventUnion]:
|
||||
if self._current_queue is None:
|
||||
return
|
||||
|
||||
timeout_s = max(_POST_END_DRAIN_MS, 0) / 1000
|
||||
while True:
|
||||
try:
|
||||
chunk = await asyncio.wait_for(self._current_queue.get(), timeout=timeout_s)
|
||||
except TimeoutError:
|
||||
break
|
||||
|
||||
if isinstance(chunk, Exception):
|
||||
logger.warning("[%s] dropping post-END exception: %s", self.id, chunk)
|
||||
continue
|
||||
|
||||
if isinstance(chunk, MsgEventEnd):
|
||||
self.last_tokens_used = chunk.tokens_used
|
||||
if _DEBUG_STREAM:
|
||||
logger.warning(
|
||||
"[%s] dropped duplicate END tokens=%s",
|
||||
self.id,
|
||||
chunk.tokens_used,
|
||||
)
|
||||
continue
|
||||
|
||||
if _DEBUG_STREAM and self._is_text_event(chunk):
|
||||
logger.warning(
|
||||
"[%s] recovered post-END text chunk=%r",
|
||||
self.id,
|
||||
getattr(chunk, "text", "")[:80],
|
||||
)
|
||||
|
||||
yield chunk
|
||||
|
|
|
|||
152
sdk/real.py
152
sdk/real.py
|
|
@ -1,10 +1,11 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
from collections.abc import AsyncIterator
|
||||
from pathlib import Path
|
||||
|
||||
from lambda_agent_api.server import MsgEventSendFile, MsgEventTextChunk
|
||||
|
||||
from sdk.agent_api_wrapper import AgentApiWrapper
|
||||
from sdk.interface import (
|
||||
Attachment,
|
||||
|
|
@ -40,14 +41,10 @@ class RealPlatformClient(PlatformClient):
|
|||
chat_key = str(chat_id)
|
||||
chat_api = self._chat_apis.get(chat_key)
|
||||
if chat_api is None:
|
||||
chat_api_factory = getattr(self._agent_api, "for_chat", None)
|
||||
if not callable(chat_api_factory):
|
||||
return self._agent_api
|
||||
|
||||
async with self._chat_api_lock:
|
||||
chat_api = self._chat_apis.get(chat_key)
|
||||
if chat_api is None:
|
||||
chat_api = chat_api_factory(chat_key)
|
||||
chat_api = self._agent_api.for_chat(chat_key)
|
||||
await chat_api.connect()
|
||||
self._chat_apis[chat_key] = chat_api
|
||||
return chat_api
|
||||
|
|
@ -80,48 +77,36 @@ class RealPlatformClient(PlatformClient):
|
|||
attachments: list[Attachment] | None = None,
|
||||
) -> MessageResponse:
|
||||
response_parts: list[str] = []
|
||||
tokens_used = 0
|
||||
sent_attachments: list[Attachment] = []
|
||||
message_id = user_id
|
||||
saw_end_event = False
|
||||
|
||||
lock = self._get_chat_send_lock(chat_id)
|
||||
async with lock:
|
||||
chat_api = await self._get_chat_api(chat_id)
|
||||
if hasattr(chat_api, "last_tokens_used"):
|
||||
chat_api.last_tokens_used = 0
|
||||
|
||||
try:
|
||||
async for event in self._stream_agent_events(
|
||||
chat_api, text, attachments=attachments
|
||||
):
|
||||
message_id = user_id
|
||||
if self._is_text_event(event):
|
||||
chunk_text = getattr(event, "text", "")
|
||||
if chunk_text:
|
||||
response_parts.append(chunk_text)
|
||||
elif self._is_end_event(event):
|
||||
tokens_used = getattr(event, "tokens_used", tokens_used)
|
||||
saw_end_event = True
|
||||
elif self._is_send_file_event(event):
|
||||
if isinstance(event, MsgEventTextChunk) and event.text:
|
||||
response_parts.append(event.text)
|
||||
elif isinstance(event, MsgEventSendFile):
|
||||
attachment = self._attachment_from_send_file_event(event)
|
||||
if attachment is not None:
|
||||
sent_attachments.append(attachment)
|
||||
except Exception as exc:
|
||||
await self._handle_chat_api_failure(chat_id, exc)
|
||||
|
||||
if not saw_end_event:
|
||||
tokens_used = getattr(chat_api, "last_tokens_used", tokens_used)
|
||||
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
|
||||
await self._prototype_state.set_last_tokens_used(str(chat_id), 0)
|
||||
|
||||
response_kwargs = {
|
||||
"message_id": message_id,
|
||||
"response": "".join(response_parts),
|
||||
"tokens_used": tokens_used,
|
||||
"tokens_used": 0,
|
||||
"finished": True,
|
||||
"attachments": sent_attachments,
|
||||
}
|
||||
if self._message_response_accepts_attachments():
|
||||
response_kwargs["attachments"] = sent_attachments
|
||||
return MessageResponse(**response_kwargs)
|
||||
|
||||
async def stream_message(
|
||||
|
|
@ -134,44 +119,27 @@ class RealPlatformClient(PlatformClient):
|
|||
lock = self._get_chat_send_lock(chat_id)
|
||||
async with lock:
|
||||
chat_api = await self._get_chat_api(chat_id)
|
||||
if hasattr(chat_api, "last_tokens_used"):
|
||||
chat_api.last_tokens_used = 0
|
||||
saw_end_event = False
|
||||
try:
|
||||
async for event in self._stream_agent_events(
|
||||
chat_api, text, attachments=attachments
|
||||
):
|
||||
if self._is_text_event(event):
|
||||
if isinstance(event, MsgEventTextChunk):
|
||||
yield MessageChunk(
|
||||
message_id=user_id,
|
||||
delta=getattr(event, "text", ""),
|
||||
delta=event.text,
|
||||
finished=False,
|
||||
)
|
||||
elif self._is_end_event(event):
|
||||
tokens_used = getattr(event, "tokens_used", 0)
|
||||
saw_end_event = True
|
||||
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
|
||||
yield MessageChunk(
|
||||
message_id=user_id,
|
||||
delta="",
|
||||
finished=True,
|
||||
tokens_used=tokens_used,
|
||||
)
|
||||
elif self._is_send_file_event(event):
|
||||
continue
|
||||
else:
|
||||
elif isinstance(event, MsgEventSendFile):
|
||||
continue
|
||||
except Exception as exc:
|
||||
await self._handle_chat_api_failure(chat_id, exc)
|
||||
if not saw_end_event:
|
||||
tokens_used = getattr(chat_api, "last_tokens_used", 0)
|
||||
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
|
||||
yield MessageChunk(
|
||||
message_id=user_id,
|
||||
delta="",
|
||||
finished=True,
|
||||
tokens_used=tokens_used,
|
||||
)
|
||||
await self._prototype_state.set_last_tokens_used(str(chat_id), 0)
|
||||
yield MessageChunk(
|
||||
message_id=user_id,
|
||||
delta="",
|
||||
finished=True,
|
||||
tokens_used=0,
|
||||
)
|
||||
|
||||
async def get_settings(self, user_id: str) -> UserSettings:
|
||||
return await self._prototype_state.get_settings(user_id)
|
||||
|
|
@ -195,10 +163,6 @@ class RealPlatformClient(PlatformClient):
|
|||
await close()
|
||||
self._chat_apis.clear()
|
||||
self._chat_send_locks.clear()
|
||||
if not callable(getattr(self._agent_api, "for_chat", None)):
|
||||
close = getattr(self._agent_api, "close", None)
|
||||
if callable(close):
|
||||
await close()
|
||||
|
||||
async def _stream_agent_events(
|
||||
self,
|
||||
|
|
@ -206,12 +170,8 @@ class RealPlatformClient(PlatformClient):
|
|||
text: str,
|
||||
attachments: list[Attachment] | None = None,
|
||||
) -> AsyncIterator[object]:
|
||||
send_message = chat_api.send_message
|
||||
attachment_paths = self._attachment_paths(attachments)
|
||||
if attachment_paths and self._send_message_accepts_attachments(send_message):
|
||||
event_stream = send_message(text, attachments=attachment_paths)
|
||||
else:
|
||||
event_stream = send_message(text)
|
||||
event_stream = chat_api.send_message(text, attachments=attachment_paths or None)
|
||||
async for event in event_stream:
|
||||
yield event
|
||||
|
||||
|
|
@ -231,61 +191,9 @@ class RealPlatformClient(PlatformClient):
|
|||
return paths
|
||||
|
||||
@staticmethod
|
||||
def _send_message_accepts_attachments(send_message) -> bool:
|
||||
try:
|
||||
parameters = inspect.signature(send_message).parameters
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
return "attachments" in parameters or any(
|
||||
parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters.values()
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _event_kind(event: object) -> str:
|
||||
raw_kind = getattr(event, "type", None)
|
||||
if hasattr(raw_kind, "value"):
|
||||
raw_kind = raw_kind.value
|
||||
if raw_kind is None:
|
||||
raw_kind = event.__class__.__name__
|
||||
|
||||
kind = str(raw_kind).replace("-", "_")
|
||||
if "_" in kind:
|
||||
return kind.upper()
|
||||
normalized = []
|
||||
for index, char in enumerate(kind):
|
||||
if index and char.isupper() and not kind[index - 1].isupper():
|
||||
normalized.append("_")
|
||||
normalized.append(char)
|
||||
return "".join(normalized).upper()
|
||||
|
||||
@classmethod
|
||||
def _is_text_event(cls, event: object) -> bool:
|
||||
return hasattr(event, "text") or "TEXT_CHUNK" in cls._event_kind(event)
|
||||
|
||||
@classmethod
|
||||
def _is_end_event(cls, event: object) -> bool:
|
||||
kind = cls._event_kind(event)
|
||||
return kind == "END" or kind.endswith("_END")
|
||||
|
||||
@classmethod
|
||||
def _is_send_file_event(cls, event: object) -> bool:
|
||||
kind = cls._event_kind(event)
|
||||
return "SEND_FILE" in kind
|
||||
|
||||
@staticmethod
|
||||
def _attachment_from_send_file_event(event: object) -> Attachment | None:
|
||||
location = None
|
||||
for attr in ("url", "workspace_path", "path", "file_path", "uri"):
|
||||
value = getattr(event, attr, None)
|
||||
if value:
|
||||
location = str(value)
|
||||
break
|
||||
if location is None:
|
||||
return None
|
||||
|
||||
mime_type = getattr(event, "mime_type", None) or "application/octet-stream"
|
||||
filename = getattr(event, "filename", None) or Path(location).name or None
|
||||
size = getattr(event, "size", None)
|
||||
def _attachment_from_send_file_event(event: MsgEventSendFile) -> Attachment:
|
||||
location = str(event.path)
|
||||
filename = Path(location).name or None
|
||||
workspace_path = location
|
||||
if workspace_path.startswith("/workspace/"):
|
||||
workspace_path = workspace_path[len("/workspace/") :]
|
||||
|
|
@ -293,18 +201,8 @@ class RealPlatformClient(PlatformClient):
|
|||
workspace_path = ""
|
||||
return Attachment(
|
||||
url=location,
|
||||
mime_type=mime_type,
|
||||
size=size,
|
||||
mime_type="application/octet-stream",
|
||||
size=None,
|
||||
filename=filename,
|
||||
workspace_path=workspace_path or None,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _message_response_accepts_attachments() -> bool:
|
||||
fields = getattr(MessageResponse, "model_fields", None)
|
||||
if isinstance(fields, dict):
|
||||
return "attachments" in fields
|
||||
try:
|
||||
return "attachments" in inspect.signature(MessageResponse).parameters
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -911,9 +911,9 @@ async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monk
|
|||
bot_module = importlib.import_module("adapter.matrix.bot")
|
||||
|
||||
class FakeAgentApiWrapper:
|
||||
def __init__(self, agent_id: str, url: str) -> None:
|
||||
def __init__(self, agent_id: str, base_url: str) -> None:
|
||||
self.agent_id = agent_id
|
||||
self.url = url
|
||||
self.base_url = base_url
|
||||
|
||||
monkeypatch.setattr(bot_module, "AgentApiWrapper", FakeAgentApiWrapper)
|
||||
monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real")
|
||||
|
|
@ -922,7 +922,7 @@ async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monk
|
|||
runtime = build_runtime()
|
||||
|
||||
assert isinstance(runtime.platform, RealPlatformClient)
|
||||
assert runtime.platform.agent_api.url == "ws://agent.example/agent_ws/"
|
||||
assert runtime.platform.agent_api.base_url == "ws://agent.example/agent_ws/"
|
||||
|
||||
|
||||
async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeypatch):
|
||||
|
|
|
|||
|
|
@ -4,32 +4,55 @@ Smoke test: полный цикл через dispatcher + реальные manag
|
|||
Имитирует что делает адаптер (Telegram или Matrix) при получении события.
|
||||
"""
|
||||
import pytest
|
||||
from sdk.mock import MockPlatformClient
|
||||
from sdk.interface import MessageChunk, MessageResponse
|
||||
from sdk.prototype_state import PrototypeStateStore
|
||||
from sdk.real import RealPlatformClient
|
||||
from core.store import InMemoryStore
|
||||
from core.chat import ChatManager
|
||||
from lambda_agent_api.server import MsgEventTextChunk
|
||||
|
||||
from core.auth import AuthManager
|
||||
from core.settings import SettingsManager
|
||||
from core.chat import ChatManager
|
||||
from core.handler import EventDispatcher
|
||||
from core.handlers import register_all
|
||||
from core.protocol import (
|
||||
IncomingCommand, IncomingMessage, IncomingCallback,
|
||||
OutgoingMessage, OutgoingUI,
|
||||
Attachment, SettingsAction,
|
||||
Attachment,
|
||||
IncomingCallback,
|
||||
IncomingCommand,
|
||||
IncomingMessage,
|
||||
OutgoingMessage,
|
||||
OutgoingUI,
|
||||
)
|
||||
from core.settings import SettingsManager
|
||||
from core.store import InMemoryStore
|
||||
from sdk.mock import MockPlatformClient
|
||||
from sdk.prototype_state import PrototypeStateStore
|
||||
from sdk.real import RealPlatformClient
|
||||
|
||||
|
||||
class FakeAgentApi:
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, chat_id: str) -> None:
|
||||
self.chat_id = chat_id
|
||||
self.calls: list[tuple[str, list[str]]] = []
|
||||
self.last_tokens_used = 0
|
||||
self.connect_calls = 0
|
||||
self.close_calls = 0
|
||||
|
||||
async def connect(self) -> None:
|
||||
self.connect_calls += 1
|
||||
|
||||
async def close(self) -> None:
|
||||
self.close_calls += 1
|
||||
|
||||
async def send_message(self, text: str, attachments: list[str] | None = None):
|
||||
self.calls.append((text, attachments or []))
|
||||
yield type("Chunk", (), {"text": f"[REAL] {text}"})()
|
||||
self.last_tokens_used = 5
|
||||
yield MsgEventTextChunk(text=f"[REAL] {text}")
|
||||
|
||||
|
||||
class FakeAgentApiFactory:
|
||||
def __init__(self) -> None:
|
||||
self.created_chat_ids: list[str] = []
|
||||
self.instances: dict[str, FakeAgentApi] = {}
|
||||
|
||||
def for_chat(self, chat_id: str) -> FakeAgentApi:
|
||||
chat_api = FakeAgentApi(chat_id)
|
||||
self.created_chat_ids.append(chat_id)
|
||||
self.instances[chat_id] = chat_api
|
||||
return chat_api
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
@ -48,7 +71,7 @@ def dispatcher():
|
|||
|
||||
@pytest.fixture
|
||||
def real_dispatcher():
|
||||
agent_api = FakeAgentApi()
|
||||
agent_api = FakeAgentApiFactory()
|
||||
platform = RealPlatformClient(
|
||||
agent_api=agent_api,
|
||||
prototype_state=PrototypeStateStore(),
|
||||
|
|
@ -80,7 +103,13 @@ async def test_new_chat_command(dispatcher):
|
|||
start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start")
|
||||
await dispatcher.dispatch(start)
|
||||
|
||||
new = IncomingCommand(user_id="u1", platform="matrix", chat_id="C2", command="new", args=["Анализ"])
|
||||
new = IncomingCommand(
|
||||
user_id="u1",
|
||||
platform="matrix",
|
||||
chat_id="C2",
|
||||
command="new",
|
||||
args=["Анализ"],
|
||||
)
|
||||
result = await dispatcher.dispatch(new)
|
||||
assert any("Анализ" in r.text for r in result if isinstance(r, OutgoingMessage))
|
||||
|
||||
|
|
@ -130,7 +159,8 @@ async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatche
|
|||
texts = [r.text for r in result if isinstance(r, OutgoingMessage)]
|
||||
|
||||
assert texts == ["[REAL] Привет!"]
|
||||
assert agent_api.calls == [("Привет!", [])]
|
||||
assert agent_api.created_chat_ids == ["C1"]
|
||||
assert agent_api.instances["C1"].calls == [("Привет!", [])]
|
||||
|
||||
|
||||
async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher):
|
||||
|
|
@ -155,6 +185,6 @@ async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_d
|
|||
)
|
||||
await dispatcher.dispatch(msg)
|
||||
|
||||
assert agent_api.calls == [
|
||||
assert agent_api.instances["C1"].calls == [
|
||||
("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"])
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
import asyncio
|
||||
|
||||
import pytest
|
||||
from lambda_agent_api.server import MsgEventEnd, MsgEventTextChunk
|
||||
from lambda_agent_api.server import MsgEventSendFile, MsgEventTextChunk
|
||||
from pydantic import Field
|
||||
|
||||
import sdk.agent_api_wrapper as agent_api_wrapper_module
|
||||
from core.protocol import SettingsAction
|
||||
|
|
@ -11,18 +12,12 @@ from sdk.prototype_state import PrototypeStateStore
|
|||
from sdk.real import RealPlatformClient
|
||||
|
||||
|
||||
class FakeChunk:
|
||||
def __init__(self, text: str) -> None:
|
||||
self.text = text
|
||||
|
||||
|
||||
class FakeChatAgentApi:
|
||||
def __init__(self, chat_id: str) -> None:
|
||||
self.chat_id = chat_id
|
||||
self.calls: list[str] = []
|
||||
self.connect_calls = 0
|
||||
self.close_calls = 0
|
||||
self.last_tokens_used = 0
|
||||
|
||||
async def connect(self) -> None:
|
||||
self.connect_calls += 1
|
||||
|
|
@ -30,12 +25,11 @@ class FakeChatAgentApi:
|
|||
async def close(self) -> None:
|
||||
self.close_calls += 1
|
||||
|
||||
async def send_message(self, text: str):
|
||||
async def send_message(self, text: str, attachments: list[str] | None = None):
|
||||
self.calls.append(text)
|
||||
midpoint = len(text) // 2
|
||||
yield FakeChunk(text[:midpoint])
|
||||
yield FakeChunk(text[midpoint:])
|
||||
self.last_tokens_used = 3
|
||||
yield MsgEventTextChunk(text=text[:midpoint])
|
||||
yield MsgEventTextChunk(text=text[midpoint:])
|
||||
|
||||
|
||||
class FakeAgentApiFactory:
|
||||
|
|
@ -50,25 +44,12 @@ class FakeAgentApiFactory:
|
|||
return chat_api
|
||||
|
||||
|
||||
class LegacyAgentApi:
|
||||
def __init__(self) -> None:
|
||||
self.calls: list[str] = []
|
||||
self.last_tokens_used = 0
|
||||
|
||||
async def send_message(self, text: str):
|
||||
self.calls.append(text)
|
||||
yield FakeChunk(text[:2])
|
||||
yield FakeChunk(text[2:])
|
||||
self.last_tokens_used = 7
|
||||
|
||||
|
||||
class BlockingChatAgentApi:
|
||||
def __init__(self, chat_id: str) -> None:
|
||||
self.chat_id = chat_id
|
||||
self.calls: list[str] = []
|
||||
self.connect_calls = 0
|
||||
self.close_calls = 0
|
||||
self.last_tokens_used = 0
|
||||
self.active_calls = 0
|
||||
self.max_active_calls = 0
|
||||
self.started = asyncio.Event()
|
||||
|
|
@ -80,15 +61,14 @@ class BlockingChatAgentApi:
|
|||
async def close(self) -> None:
|
||||
self.close_calls += 1
|
||||
|
||||
async def send_message(self, text: str):
|
||||
async def send_message(self, text: str, attachments: list[str] | None = None):
|
||||
self.calls.append(text)
|
||||
self.active_calls += 1
|
||||
self.max_active_calls = max(self.max_active_calls, self.active_calls)
|
||||
self.started.set()
|
||||
await self.release.wait()
|
||||
self.active_calls -= 1
|
||||
yield FakeChunk(text)
|
||||
self.last_tokens_used = len(text)
|
||||
yield MsgEventTextChunk(text=text)
|
||||
|
||||
|
||||
class AttachmentTrackingChatAgentApi:
|
||||
|
|
@ -97,7 +77,6 @@ class AttachmentTrackingChatAgentApi:
|
|||
self.calls: list[tuple[str, list[str] | None]] = []
|
||||
self.connect_calls = 0
|
||||
self.close_calls = 0
|
||||
self.last_tokens_used = 0
|
||||
|
||||
async def connect(self) -> None:
|
||||
self.connect_calls += 1
|
||||
|
|
@ -107,8 +86,20 @@ class AttachmentTrackingChatAgentApi:
|
|||
|
||||
async def send_message(self, text: str, attachments: list[str] | None = None):
|
||||
self.calls.append((text, attachments))
|
||||
yield FakeChunk(text)
|
||||
self.last_tokens_used = 5
|
||||
yield MsgEventTextChunk(text=text)
|
||||
|
||||
|
||||
class AttachmentTrackingAgentApiFactory:
|
||||
def __init__(self, chat_api_cls=AttachmentTrackingChatAgentApi) -> None:
|
||||
self.chat_api_cls = chat_api_cls
|
||||
self.created_chat_ids: list[str] = []
|
||||
self.instances: dict[str, AttachmentTrackingChatAgentApi] = {}
|
||||
|
||||
def for_chat(self, chat_id: str) -> AttachmentTrackingChatAgentApi:
|
||||
chat_api = self.chat_api_cls(chat_id)
|
||||
self.created_chat_ids.append(chat_id)
|
||||
self.instances[chat_id] = chat_api
|
||||
return chat_api
|
||||
|
||||
|
||||
class FlakyChatAgentApi:
|
||||
|
|
@ -128,247 +119,61 @@ class FlakyChatAgentApi:
|
|||
yield
|
||||
|
||||
|
||||
class SendFileEvent:
|
||||
def __init__(self, *, workspace_path: str, mime_type: str, filename: str, size: int) -> None:
|
||||
self.type = "AGENT_EVENT_SEND_FILE"
|
||||
self.workspace_path = workspace_path
|
||||
self.mime_type = mime_type
|
||||
self.filename = filename
|
||||
self.size = size
|
||||
|
||||
|
||||
class TextChunkEvent:
|
||||
def __init__(self, text: str) -> None:
|
||||
self.type = "AGENT_EVENT_TEXT_CHUNK"
|
||||
self.text = text
|
||||
|
||||
|
||||
class ToolCallChunkEvent:
|
||||
def __init__(self, payload: str) -> None:
|
||||
self.type = "AGENT_EVENT_TOOL_CALL_CHUNK"
|
||||
self.payload = payload
|
||||
|
||||
|
||||
class ToolResultEvent:
|
||||
def __init__(self, payload: str) -> None:
|
||||
self.type = "AGENT_EVENT_TOOL_RESULT"
|
||||
self.payload = payload
|
||||
|
||||
|
||||
class CustomUpdateEvent:
|
||||
def __init__(self, payload: str) -> None:
|
||||
self.type = "AGENT_EVENT_CUSTOM_UPDATE"
|
||||
self.payload = payload
|
||||
|
||||
|
||||
class EndEvent:
|
||||
def __init__(self, tokens_used: int) -> None:
|
||||
self.type = "AGENT_EVENT_END"
|
||||
self.tokens_used = tokens_used
|
||||
|
||||
|
||||
class ErrorEvent:
|
||||
def __init__(self, code: str, details: str) -> None:
|
||||
self.type = "ERROR"
|
||||
self.code = code
|
||||
self.details = details
|
||||
|
||||
|
||||
class GracefulDisconnectEvent:
|
||||
def __init__(self) -> None:
|
||||
self.type = "GRACEFUL_DISCONNECT"
|
||||
|
||||
|
||||
class FakeWSMessage:
|
||||
def __init__(self, data: str) -> None:
|
||||
self.type = agent_api_wrapper_module.aiohttp.WSMsgType.TEXT
|
||||
self.data = data
|
||||
|
||||
|
||||
class FakeWebSocket:
|
||||
def __init__(self, messages: list[FakeWSMessage]) -> None:
|
||||
self._messages = list(messages)
|
||||
|
||||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
async def __anext__(self):
|
||||
if not self._messages:
|
||||
raise StopAsyncIteration
|
||||
return self._messages.pop(0)
|
||||
|
||||
|
||||
class QueueFeedingWebSocket:
|
||||
def __init__(self, owner, queued_events: list[object]) -> None:
|
||||
self.owner = owner
|
||||
self.queued_events = list(queued_events)
|
||||
self.sent_payloads: list[str] = []
|
||||
|
||||
async def send_str(self, payload: str) -> None:
|
||||
self.sent_payloads.append(payload)
|
||||
for event in self.queued_events:
|
||||
await self.owner._current_queue.put(event)
|
||||
|
||||
|
||||
class SilentWebSocket:
|
||||
def __init__(self) -> None:
|
||||
self.sent_payloads: list[str] = []
|
||||
|
||||
async def send_str(self, payload: str) -> None:
|
||||
self.sent_payloads.append(payload)
|
||||
|
||||
|
||||
class MessageResponseWithAttachments(MessageResponse):
|
||||
attachments: list[Attachment] = []
|
||||
attachments: list[Attachment] = Field(default_factory=list)
|
||||
|
||||
|
||||
def test_agent_api_wrapper_uses_modern_constructor_when_available(monkeypatch):
|
||||
calls: list[dict[str, object]] = []
|
||||
def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch):
|
||||
captured = {}
|
||||
|
||||
def fake_init(self, agent_id, base_url, chat_id, **kwargs):
|
||||
calls.append(
|
||||
{
|
||||
"agent_id": agent_id,
|
||||
"base_url": base_url,
|
||||
"chat_id": chat_id,
|
||||
"kwargs": kwargs,
|
||||
}
|
||||
)
|
||||
self.id = agent_id
|
||||
self.url = base_url
|
||||
self.callback = kwargs.get("callback")
|
||||
self.on_disconnect = kwargs.get("on_disconnect")
|
||||
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
|
||||
captured["agent_id"] = agent_id
|
||||
captured["base_url"] = base_url
|
||||
captured["chat_id"] = chat_id
|
||||
|
||||
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
|
||||
|
||||
wrapper = AgentApiWrapper(
|
||||
agent_id="agent-1",
|
||||
base_url="https://agent.example.com/v1/agent_ws",
|
||||
chat_id="chat-1",
|
||||
callback="cb",
|
||||
on_disconnect="disconnect",
|
||||
)
|
||||
child = wrapper.for_chat("chat-2")
|
||||
|
||||
assert calls == [
|
||||
{
|
||||
"agent_id": "agent-1",
|
||||
"base_url": "https://agent.example.com",
|
||||
"chat_id": "chat-1",
|
||||
"kwargs": {"callback": "cb", "on_disconnect": "disconnect"},
|
||||
},
|
||||
{
|
||||
"agent_id": "agent-1",
|
||||
"base_url": "https://agent.example.com",
|
||||
"chat_id": "chat-2",
|
||||
"kwargs": {"callback": "cb", "on_disconnect": "disconnect"},
|
||||
},
|
||||
]
|
||||
assert wrapper._base_url == "https://agent.example.com"
|
||||
assert wrapper.chat_id == "chat-1"
|
||||
assert wrapper.last_tokens_used == 0
|
||||
assert child.chat_id == "chat-2"
|
||||
|
||||
|
||||
def test_agent_api_wrapper_falls_back_to_legacy_url_constructor(monkeypatch):
|
||||
calls: list[dict[str, object]] = []
|
||||
|
||||
def fake_init(self, agent_id, url, callback=None, on_disconnect=None):
|
||||
calls.append(
|
||||
{
|
||||
"agent_id": agent_id,
|
||||
"url": url,
|
||||
"callback": callback,
|
||||
"on_disconnect": on_disconnect,
|
||||
}
|
||||
)
|
||||
self.id = agent_id
|
||||
self.url = url
|
||||
self.callback = callback
|
||||
self.on_disconnect = on_disconnect
|
||||
|
||||
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
|
||||
|
||||
wrapper = AgentApiWrapper(
|
||||
agent_id="agent-2",
|
||||
url="https://agent.example.com/agent_ws/",
|
||||
chat_id="chat-9",
|
||||
callback="cb",
|
||||
base_url="ws://platform-agent:8000/v1/agent_ws/",
|
||||
chat_id="41",
|
||||
)
|
||||
|
||||
assert calls == [
|
||||
{
|
||||
"agent_id": "agent-2",
|
||||
"url": "https://agent.example.com/agent_ws/?thread_id=chat-9",
|
||||
"callback": "cb",
|
||||
"on_disconnect": None,
|
||||
}
|
||||
]
|
||||
assert wrapper._base_url == "https://agent.example.com"
|
||||
assert wrapper.chat_id == "chat-9"
|
||||
assert wrapper.last_tokens_used == 0
|
||||
assert wrapper.chat_id == "41"
|
||||
assert wrapper._base_url == "ws://platform-agent:8000"
|
||||
assert captured == {
|
||||
"agent_id": "agent-1",
|
||||
"base_url": "ws://platform-agent:8000",
|
||||
"chat_id": "41",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_api_wrapper_recovers_late_text_after_first_end(monkeypatch):
|
||||
def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch):
|
||||
init_calls = []
|
||||
|
||||
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
|
||||
self.id = agent_id
|
||||
self.chat_id = chat_id
|
||||
self.url = base_url
|
||||
self.callback = kwargs.get("callback")
|
||||
self.on_disconnect = kwargs.get("on_disconnect")
|
||||
init_calls.append((agent_id, base_url, chat_id))
|
||||
|
||||
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
|
||||
|
||||
wrapper = AgentApiWrapper(
|
||||
root = AgentApiWrapper(
|
||||
agent_id="agent-1",
|
||||
base_url="https://agent.example.com/v1/agent_ws",
|
||||
chat_id="chat-1",
|
||||
)
|
||||
wrapper._connected = True
|
||||
wrapper._request_lock = asyncio.Lock()
|
||||
wrapper._current_queue = None
|
||||
wrapper._ws = QueueFeedingWebSocket(
|
||||
wrapper,
|
||||
[
|
||||
MsgEventTextChunk(text="Иллюстра"),
|
||||
MsgEventEnd(tokens_used=5),
|
||||
MsgEventTextChunk(text="ция"),
|
||||
MsgEventEnd(tokens_used=5),
|
||||
],
|
||||
base_url="http://platform-agent:8000/v1/agent_ws/",
|
||||
chat_id="1",
|
||||
)
|
||||
|
||||
chunks = []
|
||||
async for chunk in wrapper.send_message("hello"):
|
||||
chunks.append(chunk)
|
||||
child = root.for_chat("99")
|
||||
|
||||
assert [chunk.text for chunk in chunks] == ["Иллюстра", "ция"]
|
||||
assert wrapper.last_tokens_used == 5
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_api_wrapper_times_out_on_idle_stream(monkeypatch):
|
||||
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
|
||||
self.id = agent_id
|
||||
self.url = base_url
|
||||
self.callback = kwargs.get("callback")
|
||||
self.on_disconnect = kwargs.get("on_disconnect")
|
||||
|
||||
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
|
||||
monkeypatch.setattr(agent_api_wrapper_module, "_STREAM_IDLE_TIMEOUT_MS", 10)
|
||||
|
||||
wrapper = AgentApiWrapper(
|
||||
agent_id="agent-1",
|
||||
base_url="https://agent.example.com/v1/agent_ws",
|
||||
chat_id="chat-1",
|
||||
)
|
||||
wrapper._connected = True
|
||||
wrapper._request_lock = asyncio.Lock()
|
||||
wrapper._current_queue = None
|
||||
wrapper._ws = SilentWebSocket()
|
||||
|
||||
with pytest.raises(agent_api_wrapper_module.AgentException, match="Timed out waiting"):
|
||||
async for _ in wrapper.send_message("hello"):
|
||||
pass
|
||||
assert child is not root
|
||||
assert child.chat_id == "99"
|
||||
assert child._base_url == "http://platform-agent:8000"
|
||||
assert init_calls == [
|
||||
("agent-1", "http://platform-agent:8000", "1"),
|
||||
("agent-1", "http://platform-agent:8000", "99"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
@ -403,19 +208,19 @@ async def test_real_platform_client_send_message_uses_chat_bound_client():
|
|||
assert result == MessageResponse(
|
||||
message_id="@alice:example.org",
|
||||
response="hello",
|
||||
tokens_used=3,
|
||||
tokens_used=0,
|
||||
finished=True,
|
||||
)
|
||||
assert agent_api.created_chat_ids == ["chat-7"]
|
||||
assert agent_api.instances["chat-7"].chat_id == "chat-7"
|
||||
assert agent_api.instances["chat-7"].calls == ["hello"]
|
||||
assert agent_api.instances["chat-7"].connect_calls == 1
|
||||
assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 3
|
||||
assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_real_platform_client_forwards_attachments_to_chat_api():
|
||||
agent_api = AttachmentTrackingChatAgentApi("chat-7")
|
||||
agent_api = AttachmentTrackingAgentApiFactory()
|
||||
client = RealPlatformClient(
|
||||
agent_api=agent_api,
|
||||
prototype_state=PrototypeStateStore(),
|
||||
|
|
@ -435,74 +240,49 @@ async def test_real_platform_client_forwards_attachments_to_chat_api():
|
|||
attachments=[attachment],
|
||||
)
|
||||
|
||||
assert agent_api.calls == [("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])]
|
||||
assert agent_api.instances["chat-7"].calls == [
|
||||
("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])
|
||||
]
|
||||
assert result.response == "hello"
|
||||
assert result.tokens_used == 5
|
||||
assert result.tokens_used == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch):
|
||||
agent_api = AttachmentTrackingChatAgentApi("chat-7")
|
||||
class FileEventAgentApi(AttachmentTrackingChatAgentApi):
|
||||
async def send_message(self, text: str, attachments: list[str] | None = None):
|
||||
self.calls.append((text, attachments))
|
||||
yield MsgEventTextChunk(text="he")
|
||||
yield MsgEventSendFile(path="report.pdf")
|
||||
yield MsgEventTextChunk(text="llo")
|
||||
|
||||
agent_api = AttachmentTrackingAgentApiFactory(chat_api_cls=FileEventAgentApi)
|
||||
client = RealPlatformClient(
|
||||
agent_api=agent_api,
|
||||
prototype_state=PrototypeStateStore(),
|
||||
platform="matrix",
|
||||
)
|
||||
|
||||
class FileEventAgentApi(AttachmentTrackingChatAgentApi):
|
||||
async def send_message(self, text: str, attachments: list[str] | None = None):
|
||||
self.calls.append((text, attachments))
|
||||
yield TextChunkEvent("he")
|
||||
yield SendFileEvent(
|
||||
workspace_path="/workspace/report.pdf",
|
||||
mime_type="application/pdf",
|
||||
filename="report.pdf",
|
||||
size=123,
|
||||
)
|
||||
yield TextChunkEvent("llo")
|
||||
self.last_tokens_used = 9
|
||||
|
||||
monkeypatch.setattr(
|
||||
"sdk.real.MessageResponse",
|
||||
MessageResponseWithAttachments,
|
||||
)
|
||||
client._agent_api = FileEventAgentApi("chat-7")
|
||||
|
||||
result = await client.send_message("@alice:example.org", "chat-7", "hello")
|
||||
|
||||
assert result.response == "hello"
|
||||
assert result.tokens_used == 9
|
||||
assert result.tokens_used == 0
|
||||
assert result.attachments == [
|
||||
Attachment(
|
||||
url="/workspace/report.pdf",
|
||||
mime_type="application/pdf",
|
||||
url="report.pdf",
|
||||
mime_type="application/octet-stream",
|
||||
filename="report.pdf",
|
||||
size=123,
|
||||
size=None,
|
||||
workspace_path="report.pdf",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_real_platform_client_works_with_legacy_agent_api_without_for_chat():
|
||||
legacy_api = LegacyAgentApi()
|
||||
client = RealPlatformClient(
|
||||
agent_api=legacy_api,
|
||||
prototype_state=PrototypeStateStore(),
|
||||
platform="matrix",
|
||||
)
|
||||
|
||||
result = await client.send_message("@alice:example.org", "chat-legacy", "hello")
|
||||
|
||||
assert result == MessageResponse(
|
||||
message_id="@alice:example.org",
|
||||
response="hello",
|
||||
tokens_used=7,
|
||||
finished=True,
|
||||
)
|
||||
assert legacy_api.calls == ["hello"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_real_platform_client_reuses_cached_chat_client():
|
||||
agent_api = FakeAgentApiFactory()
|
||||
|
|
@ -678,7 +458,7 @@ async def test_real_platform_client_stream_message_emits_final_tokens_chunk():
|
|||
message_id="@alice:example.org",
|
||||
delta="",
|
||||
finished=True,
|
||||
tokens_used=3,
|
||||
tokens_used=0,
|
||||
),
|
||||
]
|
||||
assert agent_api.created_chat_ids == ["chat-1"]
|
||||
|
|
@ -703,87 +483,3 @@ async def test_real_platform_client_settings_are_local():
|
|||
assert isinstance(settings, UserSettings)
|
||||
assert settings.skills["browser"] is True
|
||||
assert settings.skills["web-search"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_api_wrapper_transparently_surfaces_modern_events(monkeypatch):
|
||||
callback_events: list[object] = []
|
||||
queue: asyncio.Queue = asyncio.Queue()
|
||||
event_map = {
|
||||
"text": TextChunkEvent("he"),
|
||||
"tool_call": ToolCallChunkEvent("call"),
|
||||
"tool_result": ToolResultEvent("result"),
|
||||
"custom_update": CustomUpdateEvent("update"),
|
||||
"send_file": SendFileEvent(
|
||||
workspace_path="/workspace/report.pdf",
|
||||
mime_type="application/pdf",
|
||||
filename="report.pdf",
|
||||
size=123,
|
||||
),
|
||||
"end": EndEvent(tokens_used=11),
|
||||
"error": ErrorEvent(code="BOOM", details="bad things"),
|
||||
"disconnect": GracefulDisconnectEvent(),
|
||||
}
|
||||
|
||||
def fake_validate_json(data: str):
|
||||
return event_map[data]
|
||||
|
||||
monkeypatch.setattr(
|
||||
agent_api_wrapper_module,
|
||||
"ServerMessage",
|
||||
type("FakeServerMessage", (), {"validate_json": staticmethod(fake_validate_json)}),
|
||||
)
|
||||
|
||||
async def fake_cleanup(self):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(agent_api_wrapper_module.AgentApiWrapper, "_cleanup", fake_cleanup)
|
||||
monkeypatch.setattr(
|
||||
agent_api_wrapper_module.AgentApi,
|
||||
"__init__",
|
||||
lambda self, agent_id, base_url=None, chat_id=0, **kwargs: (
|
||||
setattr(self, "id", agent_id)
|
||||
or setattr(self, "callback", kwargs.get("callback"))
|
||||
or setattr(self, "on_disconnect", kwargs.get("on_disconnect"))
|
||||
or setattr(self, "_current_queue", None)
|
||||
),
|
||||
)
|
||||
|
||||
wrapper = AgentApiWrapper(
|
||||
agent_id="agent-1",
|
||||
base_url="https://agent.example.com/v1/agent_ws",
|
||||
chat_id="chat-1",
|
||||
callback=callback_events.append,
|
||||
)
|
||||
wrapper._current_queue = queue
|
||||
wrapper._ws = FakeWebSocket(
|
||||
[
|
||||
FakeWSMessage("text"),
|
||||
FakeWSMessage("tool_call"),
|
||||
FakeWSMessage("tool_result"),
|
||||
FakeWSMessage("custom_update"),
|
||||
FakeWSMessage("send_file"),
|
||||
FakeWSMessage("end"),
|
||||
FakeWSMessage("error"),
|
||||
FakeWSMessage("disconnect"),
|
||||
]
|
||||
)
|
||||
|
||||
await wrapper._listen()
|
||||
|
||||
queue_events = []
|
||||
while not queue.empty():
|
||||
queue_events.append(await queue.get())
|
||||
|
||||
assert queue_events[0].text == "he"
|
||||
assert any(isinstance(event, SendFileEvent) for event in queue_events)
|
||||
assert any(isinstance(event, EndEvent) for event in queue_events)
|
||||
assert any(isinstance(event, GracefulDisconnectEvent) for event in queue_events)
|
||||
assert callback_events[0].payload == "call"
|
||||
assert callback_events[1].payload == "result"
|
||||
assert callback_events[2].payload == "update"
|
||||
assert any(isinstance(event, SendFileEvent) for event in callback_events)
|
||||
assert any(isinstance(event, EndEvent) for event in callback_events)
|
||||
assert any(isinstance(event, ErrorEvent) for event in callback_events)
|
||||
assert any(isinstance(event, GracefulDisconnectEvent) for event in callback_events)
|
||||
assert wrapper.last_tokens_used == 11
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue