Compare commits

...

4 commits

10 changed files with 1319 additions and 806 deletions

View file

@ -70,7 +70,7 @@ surfaces-bot/
- **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта - **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта
- **Текущее ограничение** — encrypted DM официально не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота - **Текущее ограничение** — encrypted DM официально не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота
- **Backend selection**`MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` использует `platform-agent` из compose и WebSocket contract `/v1/agent_ws/{chat_id}/` - **Backend selection**`MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` использует `platform-agent` из compose и WebSocket contract `/v1/agent_ws/{chat_id}/`
- **Ограничения real backend** — локальный runtime использует shared `/workspace`, а файлы передаются как относительные пути в `attachments` - **Ограничения real backend** — локальный runtime использует shared `/workspace`, файлы передаются как относительные пути в `attachments`, а transport layer со стороны `surfaces` использует pinned upstream `platform-agent_api.AgentApi` почти без локальной stream-логики; текущая реализация рабочая, но после tool/file flow остаётся подтверждённый upstream streaming bug, из-за которого начало ответа может пропадать
--- ---
@ -122,6 +122,8 @@ MATRIX_PASSWORD=... # или MATRIX_ACCESS_TOKEN=...
MATRIX_PLATFORM_BACKEND=real MATRIX_PLATFORM_BACKEND=real
# compose runtime: platform-agent service name + shared /workspace # compose runtime: platform-agent service name + shared /workspace
# значение передаётся в thin wrapper как base URL; wrapper сам нормализует его
# до upstream WS route /v1/agent_ws/{chat_id}/
AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/ AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/
AGENT_BASE_URL=http://platform-agent:8000 AGENT_BASE_URL=http://platform-agent:8000
SURFACES_WORKSPACE_DIR=/workspace SURFACES_WORKSPACE_DIR=/workspace
@ -245,7 +247,8 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot
| Функция | Почему не работает | | Функция | Почему не работает |
|---|---| |---|---|
| `!load` в другом чате | platform-agent использует `StateBackend` — файлы живут в памяти отдельно для каждого `thread_id`. Файл, сохранённый в чате A, не виден в чате B. Фикс: переключить platform-agent на `FilesystemBackend` с общим хранилищем. | | `!load` в другом чате | platform-agent использует `StateBackend` — файлы живут в памяти отдельно для каждого `thread_id`. Файл, сохранённый в чате A, не виден в чате B. Фикс: переключить platform-agent на `FilesystemBackend` с общим хранилищем. |
| Счётчик токенов в `!context` | platform-agent отдаёт `tokens_used=0` хардкодом в `MsgEventEnd`. Наш код перехватывает значение корректно. | | Стриминг после tool/file flow | В текущем upstream `platform-agent` первый `MsgEventTextChunk` иногда рождается уже обрезанным до попадания в websocket-клиент. Наш transport layer после cleanup максимально близок к upstream и больше не пытается локально “лечить” этот поток. Подробности и raw evidence: `docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md`. |
| Счётчик токенов в `!context` | pinned `platform-agent_api.AgentApi` потребляет `MsgEventEnd` внутри клиента и не публикует `tokens_used` наружу. Сейчас `surfaces` честно показывает `0`, пока upstream не добавит поддержанный способ получить это значение. |
| `!reset` | platform-agent не имеет endpoint `/reset`. Задокументировано в ТЗ к платформе. | | `!reset` | platform-agent не имеет endpoint `/reset`. Задокументировано в ТЗ к платформе. |
| Персистентность между рестартами | platform-agent использует `MemorySaver` (in-memory). Все разговоры теряются при рестарте процесса. | | Персистентность между рестартами | platform-agent использует `MemorySaver` (in-memory). Все разговоры теряются при рестарте процесса. |
| E2EE комнаты | `python-olm` не собирается на macOS/ARM. Ограничение инфраструктуры. | | E2EE комнаты | `python-olm` не собирается на macOS/ARM. Ограничение инфраструктуры. |
@ -269,6 +272,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot
| [`docs/api-contract.md`](docs/api-contract.md) | Контракт к SDK платформы | | [`docs/api-contract.md`](docs/api-contract.md) | Контракт к SDK платформы |
| [`docs/user-flow.md`](docs/user-flow.md) | FSM и user journey | | [`docs/user-flow.md`](docs/user-flow.md) | FSM и user journey |
| [`docs/claude-code-guide.md`](docs/claude-code-guide.md) | Гайд по работе с Claude Code | | [`docs/claude-code-guide.md`](docs/claude-code-guide.md) | Гайд по работе с Claude Code |
| [`docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md`](docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md) | Финальный аудит platform streaming bug после cleanup transport layer |
--- ---

View file

@ -110,7 +110,7 @@ def _build_platform_from_env() -> PlatformClient:
if backend == "real": if backend == "real":
ws_url = os.environ["AGENT_WS_URL"] ws_url = os.environ["AGENT_WS_URL"]
return RealPlatformClient( return RealPlatformClient(
agent_api=AgentApiWrapper(agent_id="matrix-bot", url=ws_url), agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=ws_url),
prototype_state=PrototypeStateStore(), prototype_state=PrototypeStateStore(),
platform="matrix", platform="matrix",
) )

View file

@ -0,0 +1,294 @@
# Финальный баг-репорт: потеря начала ответа и сбои streaming/image path в `platform-agent`
## Статус
Это финальный отчёт после полного аудита интеграции `surfaces -> platform-agent_api -> platform-agent`.
Итог:
- текущая реализация `surfaces` рабочая, но проблемная из-за upstream-дефектов платформы
- баг с пропадающим началом ответа на текущем состоянии **не локализуется в `surfaces`**
- в воспроизведённом сценарии повреждённый первый текстовый chunk рождается уже внутри `platform-agent`
- помимо этого подтверждены ещё два независимых platform-side дефекта:
- duplicate `END`
- некорректная обработка больших изображений (`data-uri > 10 MB`, `WS 1009`)
## Версии и состояние кода
Рантайм воспроизводился на vendored upstream-репозиториях без локальных platform patchей:
- `platform-agent`: `5e7c2df954cc3cd2f5bf8ae688e10a20038dde61`
- `platform-agent_api`: `aa480bbec5bbf8e006284dd03aed1c2754e9bbee`
Со стороны `surfaces` transport layer был предварительно очищен:
- убрана локальная stream-semantics из `sdk/agent_api_wrapper.py`
- `sdk/real.py` переведён на pinned upstream `platform-agent_api.AgentApi`
- больше нет локального post-END drain, custom listener и wrapper-owned reclassification events
Это важно: баг воспроизводился **после** удаления наших транспортных костылей.
## Контекст интеграции
- поверхность: Matrix
- транспорт к платформе: WebSocket через upstream `platform-agent_api.AgentApi`
- `chat_id` на платформу: стабильный числовой surrogate id, выдаваемый со стороны `surfaces`
- файловый контракт: shared `/workspace`, вложения передаются как относительные пути в `attachments`
## Пользовательские симптомы
Наблюдались несколько классов сбоев:
1. Начало ответа может пропасть
- ожидалось: `Моя ошибка: ...`
- фактически: `оя ошибка: ...`
- ожидалось: `На двух изображениях: ...`
- фактически: ` двух изображениях: ...`
2. После tool/file flow ответы могут вести себя нестабильно
- следующий ответ стартует с середины фразы
- в некоторых сценариях после image/tool path платформа отвечает ошибкой или замолкает
3. На больших изображениях image path падает совсем
- provider error `Exceeded limit on max bytes per data-uri item : 10485760`
- websocket закрывается с `1009 (message too big)`
## Что было проверено на стороне `surfaces`
Ниже перечислено, что именно было перепроверено в нашем коде и почему это не выглядит корнем проблемы.
### 1. Мы больше не режем и не переклассифицируем stream локально
В текущем `surfaces`:
- `sdk/agent_api_wrapper.py` — thin construction/factory shim над upstream `AgentApi`
- `sdk/real.py` — просто итерирует upstream events и склеивает `MsgEventTextChunk.text`
- `adapter/matrix/bot.py` — отправляет `OutgoingMessage.text` в Matrix без `strip/lstrip`
Наблюдение:
- в текущем коде не осталось места, где строка могла бы превратиться из `Моя ошибка` в `оя ошибка` через локальный slicing
### 2. Сборка ответа у нас линейная и тупая
`sdk/real.py` делает только следующее:
- если пришёл `MsgEventTextChunk` — добавляет `event.text` в `response_parts`
- если пришёл `MsgEventSendFile` — превращает его в `Attachment`
- не пытается “восстанавливать” поток после `END`
Следствие:
- если начало уже отсутствует в `event.text`, мы его не можем ни потерять, ни вернуть
### 3. Matrix sender не модифицирует текст
`adapter/matrix/bot.py` передаёт текст дальше как есть.
Следствие:
- Matrix renderer не является объяснением пропажи первого куска
## Что было проверено в `platform-agent_api`
Upstream client всё ещё имеет спорную queue-архитектуру:
- одна активная `_current_queue`
- `MsgEventEnd` съедается внутри `send_message()`
- в `finally` очередь отвязывается и дренится orphan messages
Это архитектурно хрупко и может быть источником других boundary bugs.
Но в конкретном воспроизведении этот слой не был точкой порчи текста.
Почему:
- в raw logs клиент получил **ровно тот же** первый text chunk, который сервер уже отправил
- queue/dequeue не изменили его содержимое
## Что удалось доказать по raw logs
Для финальной проверки была временно добавлена точечная диагностика в:
- `external/platform-agent/src/agent/service.py`
- `external/platform-agent/src/api/external.py`
- `external/platform-agent_api/lambda_agent_api/agent_api.py`
Эта диагностика **не входила** в рабочую реализацию и использовалась только для локализации бага.
### Ключевое наблюдение
На проблемном запросе после tool/file flow сервер сам yieldил уже обрезанный первый chunk:
```text
platform-agent-1 | [raw-stream][server-yield] chat=1 event=TEXT text=' двух изображениях:\n\n**Первое изображение'
platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение' path=None
matrix-bot-1 | [raw-stream][client-listen] agent=matrix-bot chat=1 queue_active=True AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение'
matrix-bot-1 | [raw-stream][client-dequeue] agent=matrix-bot chat=1 request=2 AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение'
```
Это означает:
- порча произошла **до** websocket-клиента
- `surfaces` transport layer не является источником именно этого дефекта
- `platform-agent_api` не исказил этот конкретный chunk по дороге
Дополнительно тот же паттерн виден и вне image-сценария:
```text
platform-agent-1 | [raw-stream][server-yield] chat=1 event=TEXT text='сё работает напрямую'
...
matrix-bot-1 | [raw-stream][client-dequeue] ... text='сё работает напрямую'
```
То есть сервер уже выдаёт `сё`, а не `Всё`.
## Наиболее вероятный root cause
Главный подозреваемый — `external/platform-agent/src/agent/service.py`.
Сейчас он делает следующее:
- читает `self._agent.astream_events(...)`
- обрабатывает только `kind == "on_chat_model_stream"`
- берёт `chunk = event["data"]["chunk"]`
- если `chunk.content`, форвардит `MsgEventTextChunk(text=chunk.content)`
Проблема в том, что это очень грубое преобразование raw event stream в пользовательский текст.
### Почему именно это место выглядит корнем
1. Первый битый chunk уже рождается на server-side
- это подтверждено логами выше
2. Код берёт только `chunk.content`
- если начало ответа приходит в другой форме, поле или raw event, оно просто теряется
3. Код не учитывает `ns` / `source`
- после tool/vision flow у Deep Agents / LangChain может быть более сложная структура потока
- текущий adapter flattenит её слишком агрессивно
4. Код никак не валидирует, что наружу уходит именно main assistant output
Итоговая гипотеза:
> После tool/file/vision flow `platform-agent` неправильно адаптирует `astream_events()` в `MsgEventTextChunk`. Начало итогового пользовательского ответа может находиться не в том raw event, который сейчас читается через `chunk.content`, либо теряться из-за упрощённой фильтрации потока.
## Подтверждённый отдельный баг: duplicate `END`
Это отдельный platform-side дефект.
Сейчас:
- `external/platform-agent/src/agent/service.py` уже делает `yield MsgEventEnd(...)`
- `external/platform-agent/src/api/external.py` после завершения цикла дополнительно отправляет ещё один `MsgEventEnd(...)`
По логам это выглядит так:
```text
platform-agent-1 | [raw-stream][server-yield] chat=1 event=END
platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_END text=None path=None
platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_END duplicate_end=true
matrix-bot-1 | ... AGENT_EVENT_END tokens_used=0
matrix-bot-1 | ... AGENT_EVENT_END tokens_used=0
```
Независимая оценка:
- duplicate `END` — реальный баг платформы
- он делает границу ответа менее надёжной
- но в текущем воспроизведении он **не объясняет** сам факт потери первого text chunk
То есть это важный, но вторичный дефект.
## Подтверждённый отдельный баг: большие изображения ломают image path
В отдельном воспроизведении платформа падала на анализе изображений с provider error:
```text
Exceeded limit on max bytes per data-uri item : 10485760
```
И параллельно websocket рвался с:
```text
received 1009 (message too big); then sent 1009 (message too big)
```
Это означает:
- image path отправляет в provider oversized `data:` URI
- безопасной предвалидации / деградации нет
- failure scenario сопровождается разрывом websocket-соединения
Независимая оценка:
- это отдельный platform-side bug
- он не объясняет потерю первого чанка в текстовом сценарии напрямую
- но подтверждает, что path `tool/file/image -> platform stream` в целом сейчас нестабилен
## Что мы считаем исключённым
С достаточной уверенностью можно исключить:
1. Локальный slicing текста в `surfaces`
2. Локальную “умную” реконструкцию потока, потому что она была удалена
3. Matrix sender как источник потери первого чанка
4. `platform-agent_api` queue/dequeue как primary root cause именно в этом воспроизведении
## Финальная независимая оценка
Текущая оценка вероятностей:
- `75%` — ошибка в `platform-agent`, в адаптере `astream_events() -> MsgEventTextChunk`
- `15%` — provider/model stream приносит начало ответа в другой raw event/field, а `platform-agent` его некорректно интерпретирует
- `10%` — вторичные platform-side boundary defects (`duplicate END`, queue semantics и т.д.)
- `~0-5%` — ошибка в `surfaces`
Итоговый вывод:
> На текущем состоянии кода баг с пропадающим началом ответа следует считать platform-side дефектом. Воспроизведение после cleanup transport layer показывает, что первый повреждённый text chunk формируется уже внутри `platform-agent` до отправки в websocket.
## Что нужно исправить в платформе
### Обязательно
1. Убрать duplicate `END`
- один ответ должен завершаться ровно одним `MsgEventEnd`
2. Перепроверить адаптацию `astream_events()` в `service.py`
- логировать и проанализировать raw `event["event"]`
- проверить `event.get("name")`
- смотреть `event.get("ns")`
- сравнить `chunk.content` с тем, что реально лежит в `chunk.text` / raw chunk repr
3. Форвардить наружу только финальный main assistant output
- не flattenить весь поток без учёта `ns/source`
### Желательно
4. Сделать image path устойчивым к oversized payload
- preflight check размера
- resize/compress или controlled error без разрыва WS
5. Улучшить client/server protocol boundary
- более строгая корреляция запроса и ответа
- более однозначная semantics конца ответа
## Что мы сделали со своей стороны
Со стороны `surfaces` уже выполнено следующее:
- transport layer очищен до thin adapter над upstream `AgentApi`
- локальные stream-workaroundы удалены
- рабочая интеграция сохранена
- known issue задокументирован
То есть текущая интеграция не “идеальна”, но её поведение теперь достаточно чистое, чтобы platform bug было можно локализовать без смешения ответственности.
## Приложение: короткий диагноз
Если нужна самая короткая формулировка для issue tracker:
> После cleanup transport layer в `surfaces` и воспроизведения на clean upstream platform repos видно, что `platform-agent` иногда сам порождает первый `MsgEventTextChunk` уже обрезанным, особенно после tool/file flow. Дополнительно подтверждены duplicate `END` и отдельный image-path failure на больших `data:` URI.

View file

@ -0,0 +1,540 @@
# Transport Layer Thin Adapter Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace the current custom transport behavior with a thin upstream-compatible adapter so Matrix uses `platform-agent_api.AgentApi` almost as-is and keeps only integration concerns on the `surfaces` side.
**Architecture:** Keep a tiny `AgentApiWrapper` only as a construction/factory shim (`base_url` normalization + `for_chat(chat_id)`). Move all resilience logic that still belongs to `surfaces` into `RealPlatformClient`: per-chat client caching, per-chat send locks, disconnect-on-failure, and `PlatformError` mapping. Delete all custom stream semantics from the wrapper so bugs in streaming can be attributed cleanly to either upstream or our integration layer.
**Tech Stack:** Python 3.11, `aiohttp`, upstream `lambda_agent_api`, `pytest`, `pytest-asyncio`, `ruff`
---
## File Structure
- Modify: `sdk/agent_api_wrapper.py`
Purpose: shrink the wrapper to a thin constructor/factory shim with no custom `_listen()` or `send_message()` logic.
- Modify: `sdk/real.py`
Purpose: keep integration-only behavior: cached per-chat clients, chat locks, attachment forwarding, and failure cleanup.
- Modify: `adapter/matrix/bot.py`
Purpose: instantiate the wrapper with the modern `base_url` path instead of a raw `url`, matching the pinned upstream API.
- Modify: `tests/platform/test_real.py`
Purpose: remove tests that encode custom wrapper semantics and replace them with tests that prove only the intended integration guarantees.
- Modify: `README.md`
Purpose: document that the transport layer now uses the pinned upstream `platform-agent_api` client semantics directly, with only a thin local adapter.
### Task 1: Shrink `AgentApiWrapper` To A Thin Factory Shim
**Files:**
- Modify: `sdk/agent_api_wrapper.py`
- Test: `tests/platform/test_real.py`
- [ ] **Step 1: Replace wrapper-only behavior tests with thin-wrapper tests**
Update `tests/platform/test_real.py` so it no longer asserts any custom post-END drain or wrapper-owned timeout behavior. Replace those tests with the following:
```python
def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch):
captured = {}
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
captured["agent_id"] = agent_id
captured["base_url"] = base_url
captured["chat_id"] = chat_id
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
wrapper = AgentApiWrapper(
agent_id="agent-1",
base_url="ws://platform-agent:8000/v1/agent_ws/",
chat_id="41",
)
assert wrapper.chat_id == "41"
assert wrapper._base_url == "ws://platform-agent:8000"
assert captured == {
"agent_id": "agent-1",
"base_url": "ws://platform-agent:8000",
"chat_id": "41",
}
def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch):
init_calls = []
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
self.id = agent_id
self.chat_id = chat_id
self.url = base_url
init_calls.append((agent_id, base_url, chat_id))
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
root = AgentApiWrapper(
agent_id="agent-1",
base_url="http://platform-agent:8000/v1/agent_ws/",
chat_id="1",
)
child = root.for_chat("99")
assert child is not root
assert child.chat_id == "99"
assert child._base_url == "http://platform-agent:8000"
assert init_calls == [
("agent-1", "http://platform-agent:8000", "1"),
("agent-1", "http://platform-agent:8000", "99"),
]
```
- [ ] **Step 2: Run tests to verify old assumptions fail**
Run:
```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "recovers_late_text_after_first_end or times_out_on_idle_stream or normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'
```
Expected:
- FAIL because the old wrapper-behavior tests still exist
- FAIL or SKIP for the new thin-wrapper tests until code/tests are aligned
- [ ] **Step 3: Replace `sdk/agent_api_wrapper.py` with a thin wrapper**
Rewrite `sdk/agent_api_wrapper.py` to the minimal implementation below:
```python
from __future__ import annotations
import inspect
import re
import sys
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api"
if str(_api_root) not in sys.path:
sys.path.insert(0, str(_api_root))
from lambda_agent_api.agent_api import AgentApi # noqa: E402
class AgentApiWrapper(AgentApi):
"""Thin construction/factory shim over the pinned upstream AgentApi."""
def __init__(
self,
agent_id: str,
base_url: str,
*,
chat_id: int | str = 0,
**kwargs,
) -> None:
self._base_url = self._normalize_base_url(base_url)
self._init_kwargs = dict(kwargs)
self.chat_id = chat_id
if not self._supports_modern_constructor():
raise RuntimeError("Pinned platform-agent_api is expected to support base_url + chat_id")
super().__init__(
agent_id=agent_id,
base_url=self._base_url,
chat_id=chat_id,
**kwargs,
)
@staticmethod
def _supports_modern_constructor() -> bool:
try:
parameters = inspect.signature(AgentApi.__init__).parameters
except (TypeError, ValueError):
return False
return "base_url" in parameters and "chat_id" in parameters
@staticmethod
def _normalize_base_url(base_url: str) -> str:
parsed = urlsplit(base_url)
path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))
def for_chat(self, chat_id: int | str) -> "AgentApiWrapper":
return type(self)(
agent_id=self.id,
base_url=self._base_url,
chat_id=chat_id,
**self._init_kwargs,
)
```
- [ ] **Step 4: Run the wrapper-focused tests**
Run:
```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'
```
Expected:
- PASS
- [ ] **Step 5: Commit**
```bash
git add sdk/agent_api_wrapper.py tests/platform/test_real.py
git commit -m "refactor: shrink agent api wrapper to thin adapter"
```
### Task 2: Simplify `RealPlatformClient` To The Pinned Modern API
**Files:**
- Modify: `sdk/real.py`
- Modify: `adapter/matrix/bot.py`
- Test: `tests/platform/test_real.py`
- [ ] **Step 1: Add failing integration tests for the desired thin-adapter contract**
Extend `tests/platform/test_real.py` with these assertions:
```python
@pytest.mark.asyncio
async def test_real_platform_client_passes_attachments_to_modern_send_message():
agent_api = FakeAgentApiFactory()
client = RealPlatformClient(
agent_api=agent_api,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
attachment = Attachment(
type="document",
filename="report.pdf",
mime_type="application/pdf",
workspace_path="surfaces/matrix/u1/r1/inbox/report.pdf",
)
result = await client.send_message(
"@alice:example.org",
"chat-1",
"read this",
attachments=[attachment],
)
assert result.response == "read this"
assert agent_api.instances["chat-1"].calls == [
("read this", ["surfaces/matrix/u1/r1/inbox/report.pdf"])
]
@pytest.mark.asyncio
async def test_real_platform_client_disconnects_chat_after_agent_exception():
class ErroringChatAgentApi:
def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id
self.connect_calls = 0
self.close_calls = 0
async def connect(self) -> None:
self.connect_calls += 1
async def close(self) -> None:
self.close_calls += 1
async def send_message(self, text: str, attachments: list[str] | None = None):
raise agent_api_wrapper_module.AgentException("INTERNAL_ERROR", "boom")
yield
agent_api = FakeAgentApiFactory()
erroring = ErroringChatAgentApi("chat-1")
agent_api.for_chat = lambda chat_id: erroring
client = RealPlatformClient(
agent_api=agent_api,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
with pytest.raises(PlatformError, match="boom") as exc_info:
await client.send_message("@alice:example.org", "chat-1", "hello")
assert exc_info.value.code == "INTERNAL_ERROR"
assert erroring.close_calls == 1
assert "chat-1" not in client._chat_apis
```
- [ ] **Step 2: Run tests to verify they fail before simplification**
Run:
```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception"'
```
Expected:
- FAIL until `sdk/real.py` and Matrix runtime wiring are aligned with the pinned modern API
- [ ] **Step 3: Simplify `sdk/real.py` and Matrix runtime construction**
Make these exact edits:
```python
# adapter/matrix/bot.py
def _build_platform_from_env() -> PlatformClient:
backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
if backend == "real":
base_url = os.environ["AGENT_BASE_URL"]
return RealPlatformClient(
agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=base_url),
prototype_state=PrototypeStateStore(),
platform="matrix",
)
return MockPlatformClient()
```
```python
# sdk/real.py
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from pathlib import Path
from sdk.agent_api_wrapper import AgentApiWrapper
from sdk.interface import (
Attachment,
MessageChunk,
MessageResponse,
PlatformClient,
PlatformError,
User,
UserSettings,
)
from sdk.prototype_state import PrototypeStateStore
class RealPlatformClient(PlatformClient):
def __init__(
self,
agent_api: AgentApiWrapper,
prototype_state: PrototypeStateStore,
platform: str = "matrix",
) -> None:
self._agent_api = agent_api
self._prototype_state = prototype_state
self._platform = platform
self._chat_apis: dict[str, AgentApiWrapper] = {}
self._chat_api_lock = asyncio.Lock()
self._chat_send_locks: dict[str, asyncio.Lock] = {}
@property
def agent_api(self) -> AgentApiWrapper:
return self._agent_api
async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper:
chat_key = str(chat_id)
chat_api = self._chat_apis.get(chat_key)
if chat_api is None:
async with self._chat_api_lock:
chat_api = self._chat_apis.get(chat_key)
if chat_api is None:
chat_api = self._agent_api.for_chat(chat_key)
await chat_api.connect()
self._chat_apis[chat_key] = chat_api
return chat_api
def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock:
chat_key = str(chat_id)
lock = self._chat_send_locks.get(chat_key)
if lock is None:
lock = asyncio.Lock()
self._chat_send_locks[chat_key] = lock
return lock
async def send_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> MessageResponse:
response_parts: list[str] = []
tokens_used = 0
sent_attachments: list[Attachment] = []
message_id = user_id
lock = self._get_chat_send_lock(chat_id)
async with lock:
chat_api = await self._get_chat_api(chat_id)
try:
async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
if hasattr(event, "text"):
response_parts.append(event.text)
elif event.__class__.__name__ == "MsgEventEnd":
tokens_used = getattr(event, "tokens_used", 0)
elif "SEND_FILE" in getattr(getattr(event, "type", None), "value", str(getattr(event, "type", ""))):
attachment = self._attachment_from_send_file_event(event)
if attachment is not None:
sent_attachments.append(attachment)
except Exception as exc:
await self._handle_chat_api_failure(chat_id, exc)
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
return MessageResponse(
message_id=message_id,
response="".join(response_parts),
tokens_used=tokens_used,
finished=True,
attachments=sent_attachments,
)
async def stream_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> AsyncIterator[MessageChunk]:
lock = self._get_chat_send_lock(chat_id)
async with lock:
chat_api = await self._get_chat_api(chat_id)
try:
async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
if hasattr(event, "text"):
yield MessageChunk(
message_id=user_id,
delta=event.text,
finished=False,
)
elif event.__class__.__name__ == "MsgEventEnd":
tokens_used = getattr(event, "tokens_used", 0)
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
yield MessageChunk(
message_id=user_id,
delta="",
finished=True,
tokens_used=tokens_used,
)
except Exception as exc:
await self._handle_chat_api_failure(chat_id, exc)
async def disconnect_chat(self, chat_id: str) -> None:
chat_key = str(chat_id)
chat_api = self._chat_apis.pop(chat_key, None)
self._chat_send_locks.pop(chat_key, None)
if chat_api is not None:
await chat_api.close()
async def close(self) -> None:
for chat_api in list(self._chat_apis.values()):
await chat_api.close()
self._chat_apis.clear()
self._chat_send_locks.clear()
async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None:
await self.disconnect_chat(chat_id)
code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR"
raise PlatformError(str(exc), code=code) from exc
@staticmethod
def _attachment_paths(attachments: list[Attachment] | None) -> list[str]:
if not attachments:
return []
return [attachment.workspace_path for attachment in attachments if attachment.workspace_path]
```
- [ ] **Step 4: Run the focused transport tests**
Run:
```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception or wraps_connection_closed_as_platform_error or reconnects_after_closed_chat_api"'
```
Expected:
- PASS
- [ ] **Step 5: Commit**
```bash
git add adapter/matrix/bot.py sdk/real.py tests/platform/test_real.py
git commit -m "refactor: use upstream transport semantics in real client"
```
### Task 3: Remove Custom Transport Assumptions From Tests And Docs
**Files:**
- Modify: `tests/platform/test_real.py`
- Modify: `README.md`
- [ ] **Step 1: Delete tests that encode wrapper-owned stream semantics**
Remove any tests that assert:
- late text is recovered after the first `END`
- duplicate `END` is repaired inside our wrapper
- wrapper-owned idle timeout semantics
The file should keep only tests for:
- wrapper construction/factory behavior
- per-chat client reuse
- reconnect/disconnect after failure
- attachment forwarding
- per-chat send locking
- [ ] **Step 2: Update README transport description**
Add this text to the Matrix runtime/backend section in `README.md`:
```md
Transport layer note:
- `surfaces` now uses the pinned upstream `platform-agent_api.AgentApi` stream semantics directly
- local code keeps only a thin adapter for client construction and per-chat client factories
- request serialization, disconnect-on-failure, and `PlatformError` mapping remain in `sdk/real.py`
- `surfaces` no longer performs local post-END stream reconstruction
```
- [ ] **Step 3: Run the full verification set**
Run:
```bash
uv run ruff check adapter/matrix sdk tests/platform/test_real.py
/bin/zsh -lc 'PYTHONPATH=. uv run pytest tests/adapter/matrix -q'
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q'
```
Expected:
- `ruff` reports `All checks passed!`
- Matrix adapter tests PASS
- `tests/platform/test_real.py` PASS
- [ ] **Step 4: Commit**
```bash
git add README.md tests/platform/test_real.py
git commit -m "test: remove custom transport semantics assumptions"
```
---
## Self-Review
- Spec coverage:
- thin adapter target: covered by Task 1
- integration-only `RealPlatformClient`: covered by Task 2
- removal of custom stream semantics assumptions: covered by Task 3
- re-verification after cleanup: covered by Task 3
- Placeholder scan:
- no `TODO`, `TBD`, or deferred implementation placeholders remain in task steps
- Type consistency:
- `AgentApiWrapper` remains the construction/factory type used by `RealPlatformClient`
- failure mapping still terminates in `PlatformError`
- attachment forwarding consistently uses `attachments: list[str]`

View file

@ -0,0 +1,318 @@
# Transport Layer Thin Adapter Design
## Цель
Упростить transport layer между Matrix surface и `platform-agent` до максимально production-like вида:
- использовать upstream `platform-agent_api.AgentApi` почти как есть
- убрать из surface-side клиента собственную интерпретацию stream semantics
- оставить в нашем коде только integration concerns:
- per-chat lifecycle
- per-chat serialization
- attachment path forwarding
- exception mapping в `PlatformError`
Это нужно, чтобы:
- восстановить чёткую границу ответственности между `surfaces` и платформой
- убрать из диагностики ложные факторы, внесённые нашей кастомной обёрткой
- получить честную картину реальных platform bugs до добавления любых policy-надстроек
## Контекст
Сейчас transport path состоит из:
- [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py)
- [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py)
Изначально `AgentApiWrapper` был создан по разумным причинам:
- поддержка переходного периода между разными версиями `platform-agent_api`
- унификация `base_url/url`
- создание per-chat client instances через `for_chat()`
- локальный учёт `tokens_used`
Позже в этот слой были добавлены уже не compatibility-функции, а собственные transport semantics:
- custom `_listen()`
- custom `send_message()`
- post-END drain window
- custom idle timeout
- event-kind reclassification
После этого `surfaces` перестал быть тонким клиентом платформы и начал вести себя как альтернативная реализация transport layer. Это делает диагностику platform bugs нечистой.
## Принципы дизайна
### 1. Transport должен быть скучным
Transport layer не должен:
- спасать поздние chunks
- лечить duplicate `END`
- придумывать собственные правила границы ответа
- по-своему классифицировать stream events сверх upstream client behavior
Если upstream stream protocol повреждён, мы должны видеть это как platform issue, а не скрывать его кастомной очередью.
### 2. Policy и transport разделяются
Transport:
- говорит с upstream API
- доставляет события
- закрывает соединение
Policy:
- решает, что считать recoverable failure
- нужна ли повторная попытка
- как сообщать ошибку пользователю
- нужно ли сбрасывать chat session
На первом этапе policy не расширяется. Мы сначала приводим transport к тонкому адаптеру, потом заново оцениваем реальные проблемы.
### 3. Session lifecycle остаётся на нашей стороне
Даже в thin-adapter модели `surfaces` по-прежнему отвечает за:
- кеширование client per chat
- один send lock на chat
- сброс мёртвой chat session после failure
- mapping upstream exceptions в `PlatformError`
Это не transport semantics, а integration lifecycle.
## Варианты
### Вариант A. Оставить текущий кастомный wrapper
Плюсы:
- уже работает на части сценариев
- содержит built-in mitigations против observed failures
Минусы:
- нарушает границу ответственности
- усложняет диагностику
- делает platform bug reports спорными
- содержит symptom-fix логику в transport layer
Вердикт: не подходит как production-like target.
### Вариант B. Thin upstream adapter
Плюсы:
- чистая архитектура
- честная диагностика upstream проблем
- минимальная собственная магия
Минусы:
- локальные mitigations исчезают
- если upstream client несовершенен, это сразу проявится
Вердикт: правильный первый этап.
### Вариант C. Thin adapter сейчас, outer policy layer потом
Плюсы:
- даёт production-like эволюцию
- не смешивает transport и resilience policy
- позволяет сначала увидеть реальные проблемы, потом адресовать только нужные
Минусы:
- требует двух фаз вместо одной
Вердикт: рекомендуемый путь.
## Рекомендуемая архитектура
### Слой 1. Upstream client
Источник истины:
- [external/platform-agent_api/lambda_agent_api/agent_api.py](/Users/a/MAI/sem2/lambda/surfaces-bot/external/platform-agent_api/lambda_agent_api/agent_api.py)
Мы принимаем его stream semantics как authoritative behavior.
### Слой 2. Thin adapter
Файл:
- [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py)
После cleanup он должен содержать только:
- создание клиента через modern constructor
- `base_url` normalization, если это действительно нужно для наших env
- `for_chat(chat_id)` как factory convenience
- опционально thin storage для `last_tokens_used`, если это можно сделать без переопределения stream semantics
Он не должен переопределять:
- `_listen()`
- `send_message()`
- queue lifecycle
- post-END behavior
- timeout behavior
### Слой 3. Integration/session layer
Файл:
- [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py)
Ответственность:
- кешировать chat client instances
- сериализовать sends по chat lock
- вызывать `disconnect_chat(chat_id)` после transport failure
- превращать upstream exceptions в `PlatformError`
- форвардить `attachments` как relative workspace paths
- собирать `MessageResponse` / `MessageChunk` для остального приложения
Этот слой не должен заниматься:
- исправлением broken stream boundaries
- custom post-END reconstruction
- поздним дренированием очереди
## Что удаляем
Из [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py):
- custom `_listen()`
- custom `send_message()`
- `_drain_post_end_events()`
- `_event_kind()`
- `_is_kind()`
- `_is_text_event()`
- `_is_end_event()`
- `_is_send_file_event()`
- `_POST_END_DRAIN_MS`
- `_STREAM_IDLE_TIMEOUT_MS`
- debug logging, завязанное на наш собственный queue lifecycle
## Что оставляем
В thin adapter:
- `__init__()` для modern `base_url/chat_id`
- `_normalize_base_url()` только если нужен стабильный env input
- `for_chat(chat_id)`
В [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py):
- `_get_chat_api()`
- `_get_chat_send_lock()`
- `_attachment_paths()`
- `disconnect_chat()`
- `_handle_chat_api_failure()`
- `send_message()`
- `stream_message()`
## Дополнительное упрощение
Если после cleanup мы считаем pinned upstream API обязательным, то из [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py) можно убрать legacy-minded probing:
- `inspect.signature(send_message)`
- conditional fallback на старый `send_message(text)` без `attachments`
В этом случае `RealPlatformClient` всегда использует современный контракт:
- `send_message(text, attachments=...)`
Это ещё сильнее уменьшит ambiguity.
## Этапы миграции
### Этап 1. Cleanup до thin adapter
Делаем:
- сжимаем `sdk/agent_api_wrapper.py` до thin shim
- переносим всю допустимую resilience logic только в `sdk/real.py`
- удаляем тесты, которые закрепляют наши кастомные transport semantics
### Этап 2. Повторная верификация
Заново прогоняем:
- text-only flow
- staged attachments flow
- large image failure
- duplicate `END` behavior
- behavior after transport disconnect
На этом этапе мы честно увидим, что реально делает upstream transport.
### Этап 3. Опциональный outer policy layer
Только если после Этапа 2 это действительно нужно, добавляем policy поверх transport:
- request timeout целиком
- retry policy
- circuit-breaker-like behavior
Но это должно жить не в client wrapper, а выше, в integration layer.
## Тестовая стратегия
### Удаляем как нецелевые тесты
Больше не считаем нормой:
- post-END drain behavior
- recovery late chunks после `END`
- idle timeout внутри wrapper как часть client contract
### Оставляем и добавляем
Нужные guarantees:
1. создаётся отдельный client per chat
2. один chat сериализуется через lock
3. разные чаты не делят client instance
4. attachment paths уходят в `send_message(..., attachments=...)`
5. transport failure приводит к `disconnect_chat(chat_id)`
6. следующий запрос после failure открывает новую chat session
7. upstream exception превращается в `PlatformError`
## Риски
### 1. Может снова проявиться реальный upstream bug
Это не regression дизайна, а полезный результат cleanup.
### 2. Может исчезнуть локальная защита от зависших стримов
Это допустимо на первом этапе.
Если она действительно нужна, она должна вернуться как outer policy, а не как переписанный client transport.
### 3. Может выясниться, что даже thin wrapper не нужен
Если modern upstream `AgentApi` уже полностью покрывает наш use case, файл `sdk/agent_api_wrapper.py` можно будет заменить на маленький factory helper или убрать совсем.
## Критерии успеха
Результат считается успешным, если:
- transport layer в `surfaces` перестаёт иметь собственную stream semantics
- platform bug reports снова можно формулировать без сильного caveat про кастомный клиент
- Matrix real backend продолжает работать на text-only и attachments scenarios
- failure handling остаётся контролируемым, но больше не маскирует transport behavior платформы
## Решение
Принять путь:
- `Thin upstream adapter now`
- `Observe real behavior`
- `Add outer policy later only if needed`
Это наиболее близкий к production best practice вариант для текущего состояния проекта.

View file

@ -1,87 +1,44 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import inspect
import logging
import os
import re import re
import sys import sys
from collections.abc import AsyncIterator
from pathlib import Path from pathlib import Path
from urllib.parse import urlsplit, urlunsplit from urllib.parse import urlsplit, urlunsplit
import aiohttp
_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api" _api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api"
if str(_api_root) not in sys.path: if str(_api_root) not in sys.path:
sys.path.insert(0, str(_api_root)) sys.path.insert(0, str(_api_root))
from lambda_agent_api.agent_api import AgentApi, AgentBusyException, AgentException # noqa: E402 from lambda_agent_api.agent_api import AgentApi # noqa: E402
from lambda_agent_api.client import EClientMessage, MsgUserMessage # noqa: E402
from lambda_agent_api.server import AgentEventUnion, MsgEventEnd, ServerMessage # noqa: E402
logger = logging.getLogger(__name__)
_DEBUG_STREAM = os.environ.get("SURFACES_AGENT_DEBUG_STREAM", "").strip().lower() in {
"1",
"true",
"yes",
}
_POST_END_DRAIN_MS = int(os.environ.get("SURFACES_AGENT_POST_END_DRAIN_MS", "120"))
_STREAM_IDLE_TIMEOUT_MS = int(os.environ.get("SURFACES_AGENT_IDLE_TIMEOUT_MS", "60000"))
class AgentApiWrapper(AgentApi): class AgentApiWrapper(AgentApi):
"""Capture tokens_used from MsgEventEnd without patching upstream code.""" """Thin construction/factory shim over the pinned upstream AgentApi."""
def __init__( def __init__(
self, self,
agent_id: str, agent_id: str,
base_url: str | None = None, base_url: str,
*, *,
chat_id: int | str = 0, chat_id: int | str = 0,
url: str | None = None,
**kwargs, **kwargs,
) -> None: ) -> None:
if base_url is None and url is None: self._base_url = self._normalize_base_url(base_url)
raise TypeError("AgentApiWrapper requires base_url or url")
self._base_url = self._normalize_base_url(base_url or url or "")
self._init_kwargs = dict(kwargs) self._init_kwargs = dict(kwargs)
self.chat_id = chat_id self.chat_id = chat_id
if self._supports_modern_constructor():
super().__init__( super().__init__(
agent_id=agent_id, agent_id=agent_id,
base_url=self._base_url, base_url=self._base_url,
chat_id=chat_id, chat_id=chat_id,
**kwargs, **kwargs,
) )
else:
super().__init__(
agent_id=agent_id,
url=self._build_ws_url(self._base_url, chat_id),
**kwargs,
)
self.last_tokens_used = 0
@staticmethod
def _supports_modern_constructor() -> bool:
try:
parameters = inspect.signature(AgentApi.__init__).parameters
except (TypeError, ValueError):
return False
return "base_url" in parameters and "chat_id" in parameters
@staticmethod @staticmethod
def _normalize_base_url(base_url: str) -> str: def _normalize_base_url(base_url: str) -> str:
parsed = urlsplit(base_url) parsed = urlsplit(base_url)
path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?$", "", parsed.path.rstrip("/")) path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))
@staticmethod
def _build_ws_url(base_url: str, chat_id: int | str) -> str:
return base_url.rstrip("/") + f"/agent_ws/?thread_id={chat_id}"
def for_chat(self, chat_id: int | str) -> AgentApiWrapper: def for_chat(self, chat_id: int | str) -> AgentApiWrapper:
return type(self)( return type(self)(
agent_id=self.id, agent_id=self.id,
@ -89,227 +46,3 @@ class AgentApiWrapper(AgentApi):
chat_id=chat_id, chat_id=chat_id,
**self._init_kwargs, **self._init_kwargs,
) )
@staticmethod
def _event_kind(event: object) -> str:
raw_kind = getattr(event, "type", None)
if hasattr(raw_kind, "value"):
raw_kind = raw_kind.value
if raw_kind is None:
raw_kind = event.__class__.__name__
kind = str(raw_kind).replace("-", "_")
if "_" in kind:
return kind.upper()
normalized = []
for index, char in enumerate(kind):
if index and char.isupper() and not kind[index - 1].isupper():
normalized.append("_")
normalized.append(char)
return "".join(normalized).upper()
@classmethod
def _is_kind(cls, event: object, *needles: str) -> bool:
kind = cls._event_kind(event)
return any(needle in kind for needle in needles)
@classmethod
def _is_text_event(cls, event: object) -> bool:
return hasattr(event, "text") or cls._is_kind(event, "TEXT_CHUNK")
@classmethod
def _is_end_event(cls, event: object) -> bool:
kind = cls._event_kind(event)
return kind == "END" or kind.endswith("_END")
@classmethod
def _is_send_file_event(cls, event: object) -> bool:
return "SEND_FILE" in cls._event_kind(event)
async def _publish_event(self, event: object, *, queue_event: object | None = None) -> None:
if self.callback:
self.callback(event)
if self._current_queue:
await self._current_queue.put(queue_event if queue_event is not None else event)
async def _publish_error(self, event: object) -> None:
if self.callback:
self.callback(event)
if self._current_queue and hasattr(event, "code") and hasattr(event, "details"):
await self._current_queue.put(AgentException(event.code, event.details))
async def _listen(self):
try:
async for msg in self._ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
outgoing_msg = ServerMessage.validate_json(msg.data)
if self._is_text_event(outgoing_msg):
if _DEBUG_STREAM:
logger.warning(
"[%s] text chunk queue=%s text=%r",
self.id,
self._current_queue is not None,
getattr(outgoing_msg, "text", "")[:80],
)
if self._current_queue:
await self._current_queue.put(outgoing_msg)
elif self.callback:
self.callback(outgoing_msg)
else:
logger.warning("[%s] AgentEvent without active request", self.id)
elif self._is_end_event(outgoing_msg):
self.last_tokens_used = outgoing_msg.tokens_used
if _DEBUG_STREAM:
logger.warning(
"[%s] end event queue=%s tokens=%s",
self.id,
self._current_queue is not None,
getattr(outgoing_msg, "tokens_used", None),
)
await self._publish_event(outgoing_msg)
elif self._is_kind(outgoing_msg, "ERROR"):
error = AgentException(outgoing_msg.code, outgoing_msg.details)
logger.error("[%s] Agent error: %s", self.id, error)
await self._publish_error(outgoing_msg)
elif self._is_kind(outgoing_msg, "GRACEFUL_DISCONNECT"):
await self._publish_event(outgoing_msg)
logger.info("[%s] Gracefully disconnecting", self.id)
break
else:
await self._publish_event(outgoing_msg)
except Exception as exc:
logger.error("[%s] Failed to deserialize message: %s", self.id, exc)
if self._current_queue:
await self._current_queue.put(
AgentException("PARSE_ERROR", f"Validation failed: {exc}")
)
elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED):
logger.error("[%s] WebSocket closed/error: %s", self.id, msg.type)
break
except asyncio.CancelledError:
pass
except Exception as exc:
logger.error("[%s] Error in listen loop: %s", self.id, exc)
finally:
await self._cleanup()
async def send_message(
self, text: str, attachments: list[str] | None = None
) -> AsyncIterator[AgentEventUnion]:
if not self._connected or not self._ws:
raise AgentException(
code="NOT_CONNECTED", details="Not connected. Call connect() first."
)
if self._request_lock.locked():
raise AgentBusyException("Agent is currently processing another request")
await self._request_lock.acquire()
try:
self._current_queue = asyncio.Queue()
message = MsgUserMessage(
type=EClientMessage.USER_MESSAGE,
text=text,
attachments=attachments or [],
)
await self._ws.send_str(message.model_dump_json())
logger.debug("[%s] Sent message: %s...", self.id, text[:50])
while True:
try:
chunk = await asyncio.wait_for(
self._current_queue.get(),
timeout=max(_STREAM_IDLE_TIMEOUT_MS, 0) / 1000,
)
except TimeoutError as exc:
raise AgentException(
"TIMEOUT",
(
"Timed out waiting for the next agent stream event "
f"after {max(_STREAM_IDLE_TIMEOUT_MS, 0)}ms"
),
) from exc
if isinstance(chunk, Exception):
raise chunk
if isinstance(chunk, MsgEventEnd):
self.last_tokens_used = chunk.tokens_used
async for late_chunk in self._drain_post_end_events():
yield late_chunk
break
yield chunk
finally:
if self._current_queue:
orphan_queue = self._current_queue
self._current_queue = None
while not orphan_queue.empty():
try:
orphan_msg = orphan_queue.get_nowait()
if isinstance(orphan_msg, Exception):
logger.debug(
"[%s] Dropped exception from queue during cleanup: %s",
self.id,
orphan_msg,
)
continue
if self.callback:
self.callback(orphan_msg)
else:
logger.debug("[%s] Dropped orphaned message during cleanup", self.id)
except asyncio.QueueEmpty:
break
if self._request_lock.locked():
self._request_lock.release()
async def _drain_post_end_events(self) -> AsyncIterator[AgentEventUnion]:
if self._current_queue is None:
return
timeout_s = max(_POST_END_DRAIN_MS, 0) / 1000
while True:
try:
chunk = await asyncio.wait_for(self._current_queue.get(), timeout=timeout_s)
except TimeoutError:
break
if isinstance(chunk, Exception):
logger.warning("[%s] dropping post-END exception: %s", self.id, chunk)
continue
if isinstance(chunk, MsgEventEnd):
self.last_tokens_used = chunk.tokens_used
if _DEBUG_STREAM:
logger.warning(
"[%s] dropped duplicate END tokens=%s",
self.id,
chunk.tokens_used,
)
continue
if _DEBUG_STREAM and self._is_text_event(chunk):
logger.warning(
"[%s] recovered post-END text chunk=%r",
self.id,
getattr(chunk, "text", "")[:80],
)
yield chunk

View file

@ -1,10 +1,11 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import inspect
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from pathlib import Path from pathlib import Path
from lambda_agent_api.server import MsgEventSendFile, MsgEventTextChunk
from sdk.agent_api_wrapper import AgentApiWrapper from sdk.agent_api_wrapper import AgentApiWrapper
from sdk.interface import ( from sdk.interface import (
Attachment, Attachment,
@ -40,14 +41,10 @@ class RealPlatformClient(PlatformClient):
chat_key = str(chat_id) chat_key = str(chat_id)
chat_api = self._chat_apis.get(chat_key) chat_api = self._chat_apis.get(chat_key)
if chat_api is None: if chat_api is None:
chat_api_factory = getattr(self._agent_api, "for_chat", None)
if not callable(chat_api_factory):
return self._agent_api
async with self._chat_api_lock: async with self._chat_api_lock:
chat_api = self._chat_apis.get(chat_key) chat_api = self._chat_apis.get(chat_key)
if chat_api is None: if chat_api is None:
chat_api = chat_api_factory(chat_key) chat_api = self._agent_api.for_chat(chat_key)
await chat_api.connect() await chat_api.connect()
self._chat_apis[chat_key] = chat_api self._chat_apis[chat_key] = chat_api
return chat_api return chat_api
@ -80,48 +77,36 @@ class RealPlatformClient(PlatformClient):
attachments: list[Attachment] | None = None, attachments: list[Attachment] | None = None,
) -> MessageResponse: ) -> MessageResponse:
response_parts: list[str] = [] response_parts: list[str] = []
tokens_used = 0
sent_attachments: list[Attachment] = [] sent_attachments: list[Attachment] = []
message_id = user_id message_id = user_id
saw_end_event = False
lock = self._get_chat_send_lock(chat_id) lock = self._get_chat_send_lock(chat_id)
async with lock: async with lock:
chat_api = await self._get_chat_api(chat_id) chat_api = await self._get_chat_api(chat_id)
if hasattr(chat_api, "last_tokens_used"):
chat_api.last_tokens_used = 0
try: try:
async for event in self._stream_agent_events( async for event in self._stream_agent_events(
chat_api, text, attachments=attachments chat_api, text, attachments=attachments
): ):
message_id = user_id message_id = user_id
if self._is_text_event(event): if isinstance(event, MsgEventTextChunk) and event.text:
chunk_text = getattr(event, "text", "") response_parts.append(event.text)
if chunk_text: elif isinstance(event, MsgEventSendFile):
response_parts.append(chunk_text)
elif self._is_end_event(event):
tokens_used = getattr(event, "tokens_used", tokens_used)
saw_end_event = True
elif self._is_send_file_event(event):
attachment = self._attachment_from_send_file_event(event) attachment = self._attachment_from_send_file_event(event)
if attachment is not None: if attachment is not None:
sent_attachments.append(attachment) sent_attachments.append(attachment)
except Exception as exc: except Exception as exc:
await self._handle_chat_api_failure(chat_id, exc) await self._handle_chat_api_failure(chat_id, exc)
if not saw_end_event: await self._prototype_state.set_last_tokens_used(str(chat_id), 0)
tokens_used = getattr(chat_api, "last_tokens_used", tokens_used)
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
response_kwargs = { response_kwargs = {
"message_id": message_id, "message_id": message_id,
"response": "".join(response_parts), "response": "".join(response_parts),
"tokens_used": tokens_used, "tokens_used": 0,
"finished": True, "finished": True,
"attachments": sent_attachments,
} }
if self._message_response_accepts_attachments():
response_kwargs["attachments"] = sent_attachments
return MessageResponse(**response_kwargs) return MessageResponse(**response_kwargs)
async def stream_message( async def stream_message(
@ -134,43 +119,26 @@ class RealPlatformClient(PlatformClient):
lock = self._get_chat_send_lock(chat_id) lock = self._get_chat_send_lock(chat_id)
async with lock: async with lock:
chat_api = await self._get_chat_api(chat_id) chat_api = await self._get_chat_api(chat_id)
if hasattr(chat_api, "last_tokens_used"):
chat_api.last_tokens_used = 0
saw_end_event = False
try: try:
async for event in self._stream_agent_events( async for event in self._stream_agent_events(
chat_api, text, attachments=attachments chat_api, text, attachments=attachments
): ):
if self._is_text_event(event): if isinstance(event, MsgEventTextChunk):
yield MessageChunk( yield MessageChunk(
message_id=user_id, message_id=user_id,
delta=getattr(event, "text", ""), delta=event.text,
finished=False, finished=False,
) )
elif self._is_end_event(event): elif isinstance(event, MsgEventSendFile):
tokens_used = getattr(event, "tokens_used", 0)
saw_end_event = True
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
yield MessageChunk(
message_id=user_id,
delta="",
finished=True,
tokens_used=tokens_used,
)
elif self._is_send_file_event(event):
continue
else:
continue continue
except Exception as exc: except Exception as exc:
await self._handle_chat_api_failure(chat_id, exc) await self._handle_chat_api_failure(chat_id, exc)
if not saw_end_event: await self._prototype_state.set_last_tokens_used(str(chat_id), 0)
tokens_used = getattr(chat_api, "last_tokens_used", 0)
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
yield MessageChunk( yield MessageChunk(
message_id=user_id, message_id=user_id,
delta="", delta="",
finished=True, finished=True,
tokens_used=tokens_used, tokens_used=0,
) )
async def get_settings(self, user_id: str) -> UserSettings: async def get_settings(self, user_id: str) -> UserSettings:
@ -195,10 +163,6 @@ class RealPlatformClient(PlatformClient):
await close() await close()
self._chat_apis.clear() self._chat_apis.clear()
self._chat_send_locks.clear() self._chat_send_locks.clear()
if not callable(getattr(self._agent_api, "for_chat", None)):
close = getattr(self._agent_api, "close", None)
if callable(close):
await close()
async def _stream_agent_events( async def _stream_agent_events(
self, self,
@ -206,12 +170,8 @@ class RealPlatformClient(PlatformClient):
text: str, text: str,
attachments: list[Attachment] | None = None, attachments: list[Attachment] | None = None,
) -> AsyncIterator[object]: ) -> AsyncIterator[object]:
send_message = chat_api.send_message
attachment_paths = self._attachment_paths(attachments) attachment_paths = self._attachment_paths(attachments)
if attachment_paths and self._send_message_accepts_attachments(send_message): event_stream = chat_api.send_message(text, attachments=attachment_paths or None)
event_stream = send_message(text, attachments=attachment_paths)
else:
event_stream = send_message(text)
async for event in event_stream: async for event in event_stream:
yield event yield event
@ -231,61 +191,9 @@ class RealPlatformClient(PlatformClient):
return paths return paths
@staticmethod @staticmethod
def _send_message_accepts_attachments(send_message) -> bool: def _attachment_from_send_file_event(event: MsgEventSendFile) -> Attachment:
try: location = str(event.path)
parameters = inspect.signature(send_message).parameters filename = Path(location).name or None
except (TypeError, ValueError):
return False
return "attachments" in parameters or any(
parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters.values()
)
@staticmethod
def _event_kind(event: object) -> str:
raw_kind = getattr(event, "type", None)
if hasattr(raw_kind, "value"):
raw_kind = raw_kind.value
if raw_kind is None:
raw_kind = event.__class__.__name__
kind = str(raw_kind).replace("-", "_")
if "_" in kind:
return kind.upper()
normalized = []
for index, char in enumerate(kind):
if index and char.isupper() and not kind[index - 1].isupper():
normalized.append("_")
normalized.append(char)
return "".join(normalized).upper()
@classmethod
def _is_text_event(cls, event: object) -> bool:
return hasattr(event, "text") or "TEXT_CHUNK" in cls._event_kind(event)
@classmethod
def _is_end_event(cls, event: object) -> bool:
kind = cls._event_kind(event)
return kind == "END" or kind.endswith("_END")
@classmethod
def _is_send_file_event(cls, event: object) -> bool:
kind = cls._event_kind(event)
return "SEND_FILE" in kind
@staticmethod
def _attachment_from_send_file_event(event: object) -> Attachment | None:
location = None
for attr in ("url", "workspace_path", "path", "file_path", "uri"):
value = getattr(event, attr, None)
if value:
location = str(value)
break
if location is None:
return None
mime_type = getattr(event, "mime_type", None) or "application/octet-stream"
filename = getattr(event, "filename", None) or Path(location).name or None
size = getattr(event, "size", None)
workspace_path = location workspace_path = location
if workspace_path.startswith("/workspace/"): if workspace_path.startswith("/workspace/"):
workspace_path = workspace_path[len("/workspace/") :] workspace_path = workspace_path[len("/workspace/") :]
@ -293,18 +201,8 @@ class RealPlatformClient(PlatformClient):
workspace_path = "" workspace_path = ""
return Attachment( return Attachment(
url=location, url=location,
mime_type=mime_type, mime_type="application/octet-stream",
size=size, size=None,
filename=filename, filename=filename,
workspace_path=workspace_path or None, workspace_path=workspace_path or None,
) )
@staticmethod
def _message_response_accepts_attachments() -> bool:
fields = getattr(MessageResponse, "model_fields", None)
if isinstance(fields, dict):
return "attachments" in fields
try:
return "attachments" in inspect.signature(MessageResponse).parameters
except (TypeError, ValueError):
return False

View file

@ -911,9 +911,9 @@ async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monk
bot_module = importlib.import_module("adapter.matrix.bot") bot_module = importlib.import_module("adapter.matrix.bot")
class FakeAgentApiWrapper: class FakeAgentApiWrapper:
def __init__(self, agent_id: str, url: str) -> None: def __init__(self, agent_id: str, base_url: str) -> None:
self.agent_id = agent_id self.agent_id = agent_id
self.url = url self.base_url = base_url
monkeypatch.setattr(bot_module, "AgentApiWrapper", FakeAgentApiWrapper) monkeypatch.setattr(bot_module, "AgentApiWrapper", FakeAgentApiWrapper)
monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real")
@ -922,7 +922,7 @@ async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monk
runtime = build_runtime() runtime = build_runtime()
assert isinstance(runtime.platform, RealPlatformClient) assert isinstance(runtime.platform, RealPlatformClient)
assert runtime.platform.agent_api.url == "ws://agent.example/agent_ws/" assert runtime.platform.agent_api.base_url == "ws://agent.example/agent_ws/"
async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeypatch): async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeypatch):

View file

@ -4,32 +4,55 @@ Smoke test: полный цикл через dispatcher + реальные manag
Имитирует что делает адаптер (Telegram или Matrix) при получении события. Имитирует что делает адаптер (Telegram или Matrix) при получении события.
""" """
import pytest import pytest
from sdk.mock import MockPlatformClient from lambda_agent_api.server import MsgEventTextChunk
from sdk.interface import MessageChunk, MessageResponse
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
from core.store import InMemoryStore
from core.chat import ChatManager
from core.auth import AuthManager from core.auth import AuthManager
from core.settings import SettingsManager from core.chat import ChatManager
from core.handler import EventDispatcher from core.handler import EventDispatcher
from core.handlers import register_all from core.handlers import register_all
from core.protocol import ( from core.protocol import (
IncomingCommand, IncomingMessage, IncomingCallback, Attachment,
OutgoingMessage, OutgoingUI, IncomingCallback,
Attachment, SettingsAction, IncomingCommand,
IncomingMessage,
OutgoingMessage,
OutgoingUI,
) )
from core.settings import SettingsManager
from core.store import InMemoryStore
from sdk.mock import MockPlatformClient
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
class FakeAgentApi: class FakeAgentApi:
def __init__(self) -> None: def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id
self.calls: list[tuple[str, list[str]]] = [] self.calls: list[tuple[str, list[str]]] = []
self.last_tokens_used = 0 self.connect_calls = 0
self.close_calls = 0
async def connect(self) -> None:
self.connect_calls += 1
async def close(self) -> None:
self.close_calls += 1
async def send_message(self, text: str, attachments: list[str] | None = None): async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append((text, attachments or [])) self.calls.append((text, attachments or []))
yield type("Chunk", (), {"text": f"[REAL] {text}"})() yield MsgEventTextChunk(text=f"[REAL] {text}")
self.last_tokens_used = 5
class FakeAgentApiFactory:
def __init__(self) -> None:
self.created_chat_ids: list[str] = []
self.instances: dict[str, FakeAgentApi] = {}
def for_chat(self, chat_id: str) -> FakeAgentApi:
chat_api = FakeAgentApi(chat_id)
self.created_chat_ids.append(chat_id)
self.instances[chat_id] = chat_api
return chat_api
@pytest.fixture @pytest.fixture
@ -48,7 +71,7 @@ def dispatcher():
@pytest.fixture @pytest.fixture
def real_dispatcher(): def real_dispatcher():
agent_api = FakeAgentApi() agent_api = FakeAgentApiFactory()
platform = RealPlatformClient( platform = RealPlatformClient(
agent_api=agent_api, agent_api=agent_api,
prototype_state=PrototypeStateStore(), prototype_state=PrototypeStateStore(),
@ -80,7 +103,13 @@ async def test_new_chat_command(dispatcher):
start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start")
await dispatcher.dispatch(start) await dispatcher.dispatch(start)
new = IncomingCommand(user_id="u1", platform="matrix", chat_id="C2", command="new", args=["Анализ"]) new = IncomingCommand(
user_id="u1",
platform="matrix",
chat_id="C2",
command="new",
args=["Анализ"],
)
result = await dispatcher.dispatch(new) result = await dispatcher.dispatch(new)
assert any("Анализ" in r.text for r in result if isinstance(r, OutgoingMessage)) assert any("Анализ" in r.text for r in result if isinstance(r, OutgoingMessage))
@ -130,7 +159,8 @@ async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatche
texts = [r.text for r in result if isinstance(r, OutgoingMessage)] texts = [r.text for r in result if isinstance(r, OutgoingMessage)]
assert texts == ["[REAL] Привет!"] assert texts == ["[REAL] Привет!"]
assert agent_api.calls == [("Привет!", [])] assert agent_api.created_chat_ids == ["C1"]
assert agent_api.instances["C1"].calls == [("Привет!", [])]
async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher): async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher):
@ -155,6 +185,6 @@ async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_d
) )
await dispatcher.dispatch(msg) await dispatcher.dispatch(msg)
assert agent_api.calls == [ assert agent_api.instances["C1"].calls == [
("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"]) ("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"])
] ]

View file

@ -1,7 +1,8 @@
import asyncio import asyncio
import pytest import pytest
from lambda_agent_api.server import MsgEventEnd, MsgEventTextChunk from lambda_agent_api.server import MsgEventSendFile, MsgEventTextChunk
from pydantic import Field
import sdk.agent_api_wrapper as agent_api_wrapper_module import sdk.agent_api_wrapper as agent_api_wrapper_module
from core.protocol import SettingsAction from core.protocol import SettingsAction
@ -11,18 +12,12 @@ from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient from sdk.real import RealPlatformClient
class FakeChunk:
def __init__(self, text: str) -> None:
self.text = text
class FakeChatAgentApi: class FakeChatAgentApi:
def __init__(self, chat_id: str) -> None: def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id self.chat_id = chat_id
self.calls: list[str] = [] self.calls: list[str] = []
self.connect_calls = 0 self.connect_calls = 0
self.close_calls = 0 self.close_calls = 0
self.last_tokens_used = 0
async def connect(self) -> None: async def connect(self) -> None:
self.connect_calls += 1 self.connect_calls += 1
@ -30,12 +25,11 @@ class FakeChatAgentApi:
async def close(self) -> None: async def close(self) -> None:
self.close_calls += 1 self.close_calls += 1
async def send_message(self, text: str): async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append(text) self.calls.append(text)
midpoint = len(text) // 2 midpoint = len(text) // 2
yield FakeChunk(text[:midpoint]) yield MsgEventTextChunk(text=text[:midpoint])
yield FakeChunk(text[midpoint:]) yield MsgEventTextChunk(text=text[midpoint:])
self.last_tokens_used = 3
class FakeAgentApiFactory: class FakeAgentApiFactory:
@ -50,25 +44,12 @@ class FakeAgentApiFactory:
return chat_api return chat_api
class LegacyAgentApi:
def __init__(self) -> None:
self.calls: list[str] = []
self.last_tokens_used = 0
async def send_message(self, text: str):
self.calls.append(text)
yield FakeChunk(text[:2])
yield FakeChunk(text[2:])
self.last_tokens_used = 7
class BlockingChatAgentApi: class BlockingChatAgentApi:
def __init__(self, chat_id: str) -> None: def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id self.chat_id = chat_id
self.calls: list[str] = [] self.calls: list[str] = []
self.connect_calls = 0 self.connect_calls = 0
self.close_calls = 0 self.close_calls = 0
self.last_tokens_used = 0
self.active_calls = 0 self.active_calls = 0
self.max_active_calls = 0 self.max_active_calls = 0
self.started = asyncio.Event() self.started = asyncio.Event()
@ -80,15 +61,14 @@ class BlockingChatAgentApi:
async def close(self) -> None: async def close(self) -> None:
self.close_calls += 1 self.close_calls += 1
async def send_message(self, text: str): async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append(text) self.calls.append(text)
self.active_calls += 1 self.active_calls += 1
self.max_active_calls = max(self.max_active_calls, self.active_calls) self.max_active_calls = max(self.max_active_calls, self.active_calls)
self.started.set() self.started.set()
await self.release.wait() await self.release.wait()
self.active_calls -= 1 self.active_calls -= 1
yield FakeChunk(text) yield MsgEventTextChunk(text=text)
self.last_tokens_used = len(text)
class AttachmentTrackingChatAgentApi: class AttachmentTrackingChatAgentApi:
@ -97,7 +77,6 @@ class AttachmentTrackingChatAgentApi:
self.calls: list[tuple[str, list[str] | None]] = [] self.calls: list[tuple[str, list[str] | None]] = []
self.connect_calls = 0 self.connect_calls = 0
self.close_calls = 0 self.close_calls = 0
self.last_tokens_used = 0
async def connect(self) -> None: async def connect(self) -> None:
self.connect_calls += 1 self.connect_calls += 1
@ -107,8 +86,20 @@ class AttachmentTrackingChatAgentApi:
async def send_message(self, text: str, attachments: list[str] | None = None): async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append((text, attachments)) self.calls.append((text, attachments))
yield FakeChunk(text) yield MsgEventTextChunk(text=text)
self.last_tokens_used = 5
class AttachmentTrackingAgentApiFactory:
def __init__(self, chat_api_cls=AttachmentTrackingChatAgentApi) -> None:
self.chat_api_cls = chat_api_cls
self.created_chat_ids: list[str] = []
self.instances: dict[str, AttachmentTrackingChatAgentApi] = {}
def for_chat(self, chat_id: str) -> AttachmentTrackingChatAgentApi:
chat_api = self.chat_api_cls(chat_id)
self.created_chat_ids.append(chat_id)
self.instances[chat_id] = chat_api
return chat_api
class FlakyChatAgentApi: class FlakyChatAgentApi:
@ -128,247 +119,61 @@ class FlakyChatAgentApi:
yield yield
class SendFileEvent:
def __init__(self, *, workspace_path: str, mime_type: str, filename: str, size: int) -> None:
self.type = "AGENT_EVENT_SEND_FILE"
self.workspace_path = workspace_path
self.mime_type = mime_type
self.filename = filename
self.size = size
class TextChunkEvent:
def __init__(self, text: str) -> None:
self.type = "AGENT_EVENT_TEXT_CHUNK"
self.text = text
class ToolCallChunkEvent:
def __init__(self, payload: str) -> None:
self.type = "AGENT_EVENT_TOOL_CALL_CHUNK"
self.payload = payload
class ToolResultEvent:
def __init__(self, payload: str) -> None:
self.type = "AGENT_EVENT_TOOL_RESULT"
self.payload = payload
class CustomUpdateEvent:
def __init__(self, payload: str) -> None:
self.type = "AGENT_EVENT_CUSTOM_UPDATE"
self.payload = payload
class EndEvent:
def __init__(self, tokens_used: int) -> None:
self.type = "AGENT_EVENT_END"
self.tokens_used = tokens_used
class ErrorEvent:
def __init__(self, code: str, details: str) -> None:
self.type = "ERROR"
self.code = code
self.details = details
class GracefulDisconnectEvent:
def __init__(self) -> None:
self.type = "GRACEFUL_DISCONNECT"
class FakeWSMessage:
def __init__(self, data: str) -> None:
self.type = agent_api_wrapper_module.aiohttp.WSMsgType.TEXT
self.data = data
class FakeWebSocket:
def __init__(self, messages: list[FakeWSMessage]) -> None:
self._messages = list(messages)
def __aiter__(self):
return self
async def __anext__(self):
if not self._messages:
raise StopAsyncIteration
return self._messages.pop(0)
class QueueFeedingWebSocket:
def __init__(self, owner, queued_events: list[object]) -> None:
self.owner = owner
self.queued_events = list(queued_events)
self.sent_payloads: list[str] = []
async def send_str(self, payload: str) -> None:
self.sent_payloads.append(payload)
for event in self.queued_events:
await self.owner._current_queue.put(event)
class SilentWebSocket:
def __init__(self) -> None:
self.sent_payloads: list[str] = []
async def send_str(self, payload: str) -> None:
self.sent_payloads.append(payload)
class MessageResponseWithAttachments(MessageResponse): class MessageResponseWithAttachments(MessageResponse):
attachments: list[Attachment] = [] attachments: list[Attachment] = Field(default_factory=list)
def test_agent_api_wrapper_uses_modern_constructor_when_available(monkeypatch): def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch):
calls: list[dict[str, object]] = [] captured = {}
def fake_init(self, agent_id, base_url, chat_id, **kwargs): def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
calls.append( captured["agent_id"] = agent_id
{ captured["base_url"] = base_url
"agent_id": agent_id, captured["chat_id"] = chat_id
"base_url": base_url,
"chat_id": chat_id,
"kwargs": kwargs,
}
)
self.id = agent_id
self.url = base_url
self.callback = kwargs.get("callback")
self.on_disconnect = kwargs.get("on_disconnect")
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
wrapper = AgentApiWrapper( wrapper = AgentApiWrapper(
agent_id="agent-1", agent_id="agent-1",
base_url="https://agent.example.com/v1/agent_ws", base_url="ws://platform-agent:8000/v1/agent_ws/",
chat_id="chat-1", chat_id="41",
callback="cb",
on_disconnect="disconnect",
) )
child = wrapper.for_chat("chat-2")
assert calls == [ assert wrapper.chat_id == "41"
{ assert wrapper._base_url == "ws://platform-agent:8000"
assert captured == {
"agent_id": "agent-1", "agent_id": "agent-1",
"base_url": "https://agent.example.com", "base_url": "ws://platform-agent:8000",
"chat_id": "chat-1", "chat_id": "41",
"kwargs": {"callback": "cb", "on_disconnect": "disconnect"},
},
{
"agent_id": "agent-1",
"base_url": "https://agent.example.com",
"chat_id": "chat-2",
"kwargs": {"callback": "cb", "on_disconnect": "disconnect"},
},
]
assert wrapper._base_url == "https://agent.example.com"
assert wrapper.chat_id == "chat-1"
assert wrapper.last_tokens_used == 0
assert child.chat_id == "chat-2"
def test_agent_api_wrapper_falls_back_to_legacy_url_constructor(monkeypatch):
calls: list[dict[str, object]] = []
def fake_init(self, agent_id, url, callback=None, on_disconnect=None):
calls.append(
{
"agent_id": agent_id,
"url": url,
"callback": callback,
"on_disconnect": on_disconnect,
} }
)
self.id = agent_id
self.url = url
self.callback = callback
self.on_disconnect = on_disconnect
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
wrapper = AgentApiWrapper(
agent_id="agent-2",
url="https://agent.example.com/agent_ws/",
chat_id="chat-9",
callback="cb",
)
assert calls == [
{
"agent_id": "agent-2",
"url": "https://agent.example.com/agent_ws/?thread_id=chat-9",
"callback": "cb",
"on_disconnect": None,
}
]
assert wrapper._base_url == "https://agent.example.com"
assert wrapper.chat_id == "chat-9"
assert wrapper.last_tokens_used == 0
@pytest.mark.asyncio def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch):
async def test_agent_api_wrapper_recovers_late_text_after_first_end(monkeypatch): init_calls = []
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
self.id = agent_id self.id = agent_id
self.chat_id = chat_id
self.url = base_url self.url = base_url
self.callback = kwargs.get("callback") init_calls.append((agent_id, base_url, chat_id))
self.on_disconnect = kwargs.get("on_disconnect")
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
wrapper = AgentApiWrapper( root = AgentApiWrapper(
agent_id="agent-1", agent_id="agent-1",
base_url="https://agent.example.com/v1/agent_ws", base_url="http://platform-agent:8000/v1/agent_ws/",
chat_id="chat-1", chat_id="1",
)
wrapper._connected = True
wrapper._request_lock = asyncio.Lock()
wrapper._current_queue = None
wrapper._ws = QueueFeedingWebSocket(
wrapper,
[
MsgEventTextChunk(text="Иллюстра"),
MsgEventEnd(tokens_used=5),
MsgEventTextChunk(text="ция"),
MsgEventEnd(tokens_used=5),
],
) )
chunks = [] child = root.for_chat("99")
async for chunk in wrapper.send_message("hello"):
chunks.append(chunk)
assert [chunk.text for chunk in chunks] == ["Иллюстра", "ция"] assert child is not root
assert wrapper.last_tokens_used == 5 assert child.chat_id == "99"
assert child._base_url == "http://platform-agent:8000"
assert init_calls == [
@pytest.mark.asyncio ("agent-1", "http://platform-agent:8000", "1"),
async def test_agent_api_wrapper_times_out_on_idle_stream(monkeypatch): ("agent-1", "http://platform-agent:8000", "99"),
def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): ]
self.id = agent_id
self.url = base_url
self.callback = kwargs.get("callback")
self.on_disconnect = kwargs.get("on_disconnect")
monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)
monkeypatch.setattr(agent_api_wrapper_module, "_STREAM_IDLE_TIMEOUT_MS", 10)
wrapper = AgentApiWrapper(
agent_id="agent-1",
base_url="https://agent.example.com/v1/agent_ws",
chat_id="chat-1",
)
wrapper._connected = True
wrapper._request_lock = asyncio.Lock()
wrapper._current_queue = None
wrapper._ws = SilentWebSocket()
with pytest.raises(agent_api_wrapper_module.AgentException, match="Timed out waiting"):
async for _ in wrapper.send_message("hello"):
pass
@pytest.mark.asyncio @pytest.mark.asyncio
@ -403,19 +208,19 @@ async def test_real_platform_client_send_message_uses_chat_bound_client():
assert result == MessageResponse( assert result == MessageResponse(
message_id="@alice:example.org", message_id="@alice:example.org",
response="hello", response="hello",
tokens_used=3, tokens_used=0,
finished=True, finished=True,
) )
assert agent_api.created_chat_ids == ["chat-7"] assert agent_api.created_chat_ids == ["chat-7"]
assert agent_api.instances["chat-7"].chat_id == "chat-7" assert agent_api.instances["chat-7"].chat_id == "chat-7"
assert agent_api.instances["chat-7"].calls == ["hello"] assert agent_api.instances["chat-7"].calls == ["hello"]
assert agent_api.instances["chat-7"].connect_calls == 1 assert agent_api.instances["chat-7"].connect_calls == 1
assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 3 assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_real_platform_client_forwards_attachments_to_chat_api(): async def test_real_platform_client_forwards_attachments_to_chat_api():
agent_api = AttachmentTrackingChatAgentApi("chat-7") agent_api = AttachmentTrackingAgentApiFactory()
client = RealPlatformClient( client = RealPlatformClient(
agent_api=agent_api, agent_api=agent_api,
prototype_state=PrototypeStateStore(), prototype_state=PrototypeStateStore(),
@ -435,74 +240,49 @@ async def test_real_platform_client_forwards_attachments_to_chat_api():
attachments=[attachment], attachments=[attachment],
) )
assert agent_api.calls == [("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])] assert agent_api.instances["chat-7"].calls == [
("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])
]
assert result.response == "hello" assert result.response == "hello"
assert result.tokens_used == 5 assert result.tokens_used == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch): async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch):
agent_api = AttachmentTrackingChatAgentApi("chat-7") class FileEventAgentApi(AttachmentTrackingChatAgentApi):
async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append((text, attachments))
yield MsgEventTextChunk(text="he")
yield MsgEventSendFile(path="report.pdf")
yield MsgEventTextChunk(text="llo")
agent_api = AttachmentTrackingAgentApiFactory(chat_api_cls=FileEventAgentApi)
client = RealPlatformClient( client = RealPlatformClient(
agent_api=agent_api, agent_api=agent_api,
prototype_state=PrototypeStateStore(), prototype_state=PrototypeStateStore(),
platform="matrix", platform="matrix",
) )
class FileEventAgentApi(AttachmentTrackingChatAgentApi):
async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append((text, attachments))
yield TextChunkEvent("he")
yield SendFileEvent(
workspace_path="/workspace/report.pdf",
mime_type="application/pdf",
filename="report.pdf",
size=123,
)
yield TextChunkEvent("llo")
self.last_tokens_used = 9
monkeypatch.setattr( monkeypatch.setattr(
"sdk.real.MessageResponse", "sdk.real.MessageResponse",
MessageResponseWithAttachments, MessageResponseWithAttachments,
) )
client._agent_api = FileEventAgentApi("chat-7")
result = await client.send_message("@alice:example.org", "chat-7", "hello") result = await client.send_message("@alice:example.org", "chat-7", "hello")
assert result.response == "hello" assert result.response == "hello"
assert result.tokens_used == 9 assert result.tokens_used == 0
assert result.attachments == [ assert result.attachments == [
Attachment( Attachment(
url="/workspace/report.pdf", url="report.pdf",
mime_type="application/pdf", mime_type="application/octet-stream",
filename="report.pdf", filename="report.pdf",
size=123, size=None,
workspace_path="report.pdf", workspace_path="report.pdf",
) )
] ]
@pytest.mark.asyncio
async def test_real_platform_client_works_with_legacy_agent_api_without_for_chat():
legacy_api = LegacyAgentApi()
client = RealPlatformClient(
agent_api=legacy_api,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
result = await client.send_message("@alice:example.org", "chat-legacy", "hello")
assert result == MessageResponse(
message_id="@alice:example.org",
response="hello",
tokens_used=7,
finished=True,
)
assert legacy_api.calls == ["hello"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_real_platform_client_reuses_cached_chat_client(): async def test_real_platform_client_reuses_cached_chat_client():
agent_api = FakeAgentApiFactory() agent_api = FakeAgentApiFactory()
@ -678,7 +458,7 @@ async def test_real_platform_client_stream_message_emits_final_tokens_chunk():
message_id="@alice:example.org", message_id="@alice:example.org",
delta="", delta="",
finished=True, finished=True,
tokens_used=3, tokens_used=0,
), ),
] ]
assert agent_api.created_chat_ids == ["chat-1"] assert agent_api.created_chat_ids == ["chat-1"]
@ -703,87 +483,3 @@ async def test_real_platform_client_settings_are_local():
assert isinstance(settings, UserSettings) assert isinstance(settings, UserSettings)
assert settings.skills["browser"] is True assert settings.skills["browser"] is True
assert settings.skills["web-search"] is True assert settings.skills["web-search"] is True
@pytest.mark.asyncio
async def test_agent_api_wrapper_transparently_surfaces_modern_events(monkeypatch):
callback_events: list[object] = []
queue: asyncio.Queue = asyncio.Queue()
event_map = {
"text": TextChunkEvent("he"),
"tool_call": ToolCallChunkEvent("call"),
"tool_result": ToolResultEvent("result"),
"custom_update": CustomUpdateEvent("update"),
"send_file": SendFileEvent(
workspace_path="/workspace/report.pdf",
mime_type="application/pdf",
filename="report.pdf",
size=123,
),
"end": EndEvent(tokens_used=11),
"error": ErrorEvent(code="BOOM", details="bad things"),
"disconnect": GracefulDisconnectEvent(),
}
def fake_validate_json(data: str):
return event_map[data]
monkeypatch.setattr(
agent_api_wrapper_module,
"ServerMessage",
type("FakeServerMessage", (), {"validate_json": staticmethod(fake_validate_json)}),
)
async def fake_cleanup(self):
return None
monkeypatch.setattr(agent_api_wrapper_module.AgentApiWrapper, "_cleanup", fake_cleanup)
monkeypatch.setattr(
agent_api_wrapper_module.AgentApi,
"__init__",
lambda self, agent_id, base_url=None, chat_id=0, **kwargs: (
setattr(self, "id", agent_id)
or setattr(self, "callback", kwargs.get("callback"))
or setattr(self, "on_disconnect", kwargs.get("on_disconnect"))
or setattr(self, "_current_queue", None)
),
)
wrapper = AgentApiWrapper(
agent_id="agent-1",
base_url="https://agent.example.com/v1/agent_ws",
chat_id="chat-1",
callback=callback_events.append,
)
wrapper._current_queue = queue
wrapper._ws = FakeWebSocket(
[
FakeWSMessage("text"),
FakeWSMessage("tool_call"),
FakeWSMessage("tool_result"),
FakeWSMessage("custom_update"),
FakeWSMessage("send_file"),
FakeWSMessage("end"),
FakeWSMessage("error"),
FakeWSMessage("disconnect"),
]
)
await wrapper._listen()
queue_events = []
while not queue.empty():
queue_events.append(await queue.get())
assert queue_events[0].text == "he"
assert any(isinstance(event, SendFileEvent) for event in queue_events)
assert any(isinstance(event, EndEvent) for event in queue_events)
assert any(isinstance(event, GracefulDisconnectEvent) for event in queue_events)
assert callback_events[0].payload == "call"
assert callback_events[1].payload == "result"
assert callback_events[2].payload == "update"
assert any(isinstance(event, SendFileEvent) for event in callback_events)
assert any(isinstance(event, EndEvent) for event in callback_events)
assert any(isinstance(event, ErrorEvent) for event in callback_events)
assert any(isinstance(event, GracefulDisconnectEvent) for event in callback_events)
assert wrapper.last_tokens_used == 11