From 3a3fcdc6953b169ad4a92a70603b30390dc2e679 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 22 Apr 2026 00:11:20 +0300 Subject: [PATCH 1/4] docs: add thin transport adapter design --- ...-22-transport-layer-thin-adapter-design.md | 318 ++++++++++++++++++ 1 file changed, 318 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-22-transport-layer-thin-adapter-design.md diff --git a/docs/superpowers/specs/2026-04-22-transport-layer-thin-adapter-design.md b/docs/superpowers/specs/2026-04-22-transport-layer-thin-adapter-design.md new file mode 100644 index 0000000..5fab5ef --- /dev/null +++ b/docs/superpowers/specs/2026-04-22-transport-layer-thin-adapter-design.md @@ -0,0 +1,318 @@ +# Transport Layer Thin Adapter Design + +## Цель + +Упростить transport layer между Matrix surface и `platform-agent` до максимально production-like вида: + +- использовать upstream `platform-agent_api.AgentApi` почти как есть +- убрать из surface-side клиента собственную интерпретацию stream semantics +- оставить в нашем коде только integration concerns: + - per-chat lifecycle + - per-chat serialization + - attachment path forwarding + - exception mapping в `PlatformError` + +Это нужно, чтобы: + +- восстановить чёткую границу ответственности между `surfaces` и платформой +- убрать из диагностики ложные факторы, внесённые нашей кастомной обёрткой +- получить честную картину реальных platform bugs до добавления любых policy-надстроек + +## Контекст + +Сейчас transport path состоит из: + +- [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py) +- [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py) + +Изначально `AgentApiWrapper` был создан по разумным причинам: + +- поддержка переходного периода между разными версиями `platform-agent_api` +- унификация `base_url/url` +- создание per-chat client instances через `for_chat()` +- локальный учёт `tokens_used` + +Позже в этот слой были добавлены уже не compatibility-функции, а собственные transport semantics: + +- custom `_listen()` +- custom `send_message()` +- post-END drain window +- custom idle timeout +- event-kind reclassification + +После этого `surfaces` перестал быть тонким клиентом платформы и начал вести себя как альтернативная реализация transport layer. Это делает диагностику platform bugs нечистой. + +## Принципы дизайна + +### 1. Transport должен быть скучным + +Transport layer не должен: + +- спасать поздние chunks +- лечить duplicate `END` +- придумывать собственные правила границы ответа +- по-своему классифицировать stream events сверх upstream client behavior + +Если upstream stream protocol повреждён, мы должны видеть это как platform issue, а не скрывать его кастомной очередью. + +### 2. Policy и transport разделяются + +Transport: + +- говорит с upstream API +- доставляет события +- закрывает соединение + +Policy: + +- решает, что считать recoverable failure +- нужна ли повторная попытка +- как сообщать ошибку пользователю +- нужно ли сбрасывать chat session + +На первом этапе policy не расширяется. Мы сначала приводим transport к тонкому адаптеру, потом заново оцениваем реальные проблемы. + +### 3. Session lifecycle остаётся на нашей стороне + +Даже в thin-adapter модели `surfaces` по-прежнему отвечает за: + +- кеширование client per chat +- один send lock на chat +- сброс мёртвой chat session после failure +- mapping upstream exceptions в `PlatformError` + +Это не transport semantics, а integration lifecycle. + +## Варианты + +### Вариант A. Оставить текущий кастомный wrapper + +Плюсы: + +- уже работает на части сценариев +- содержит built-in mitigations против observed failures + +Минусы: + +- нарушает границу ответственности +- усложняет диагностику +- делает platform bug reports спорными +- содержит symptom-fix логику в transport layer + +Вердикт: не подходит как production-like target. + +### Вариант B. Thin upstream adapter + +Плюсы: + +- чистая архитектура +- честная диагностика upstream проблем +- минимальная собственная магия + +Минусы: + +- локальные mitigations исчезают +- если upstream client несовершенен, это сразу проявится + +Вердикт: правильный первый этап. + +### Вариант C. Thin adapter сейчас, outer policy layer потом + +Плюсы: + +- даёт production-like эволюцию +- не смешивает transport и resilience policy +- позволяет сначала увидеть реальные проблемы, потом адресовать только нужные + +Минусы: + +- требует двух фаз вместо одной + +Вердикт: рекомендуемый путь. + +## Рекомендуемая архитектура + +### Слой 1. Upstream client + +Источник истины: + +- [external/platform-agent_api/lambda_agent_api/agent_api.py](/Users/a/MAI/sem2/lambda/surfaces-bot/external/platform-agent_api/lambda_agent_api/agent_api.py) + +Мы принимаем его stream semantics как authoritative behavior. + +### Слой 2. Thin adapter + +Файл: + +- [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py) + +После cleanup он должен содержать только: + +- создание клиента через modern constructor +- `base_url` normalization, если это действительно нужно для наших env +- `for_chat(chat_id)` как factory convenience +- опционально thin storage для `last_tokens_used`, если это можно сделать без переопределения stream semantics + +Он не должен переопределять: + +- `_listen()` +- `send_message()` +- queue lifecycle +- post-END behavior +- timeout behavior + +### Слой 3. Integration/session layer + +Файл: + +- [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py) + +Ответственность: + +- кешировать chat client instances +- сериализовать sends по chat lock +- вызывать `disconnect_chat(chat_id)` после transport failure +- превращать upstream exceptions в `PlatformError` +- форвардить `attachments` как relative workspace paths +- собирать `MessageResponse` / `MessageChunk` для остального приложения + +Этот слой не должен заниматься: + +- исправлением broken stream boundaries +- custom post-END reconstruction +- поздним дренированием очереди + +## Что удаляем + +Из [sdk/agent_api_wrapper.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/agent_api_wrapper.py): + +- custom `_listen()` +- custom `send_message()` +- `_drain_post_end_events()` +- `_event_kind()` +- `_is_kind()` +- `_is_text_event()` +- `_is_end_event()` +- `_is_send_file_event()` +- `_POST_END_DRAIN_MS` +- `_STREAM_IDLE_TIMEOUT_MS` +- debug logging, завязанное на наш собственный queue lifecycle + +## Что оставляем + +В thin adapter: + +- `__init__()` для modern `base_url/chat_id` +- `_normalize_base_url()` только если нужен стабильный env input +- `for_chat(chat_id)` + +В [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py): + +- `_get_chat_api()` +- `_get_chat_send_lock()` +- `_attachment_paths()` +- `disconnect_chat()` +- `_handle_chat_api_failure()` +- `send_message()` +- `stream_message()` + +## Дополнительное упрощение + +Если после cleanup мы считаем pinned upstream API обязательным, то из [sdk/real.py](/Users/a/MAI/sem2/lambda/surfaces-bot/sdk/real.py) можно убрать legacy-minded probing: + +- `inspect.signature(send_message)` +- conditional fallback на старый `send_message(text)` без `attachments` + +В этом случае `RealPlatformClient` всегда использует современный контракт: + +- `send_message(text, attachments=...)` + +Это ещё сильнее уменьшит ambiguity. + +## Этапы миграции + +### Этап 1. Cleanup до thin adapter + +Делаем: + +- сжимаем `sdk/agent_api_wrapper.py` до thin shim +- переносим всю допустимую resilience logic только в `sdk/real.py` +- удаляем тесты, которые закрепляют наши кастомные transport semantics + +### Этап 2. Повторная верификация + +Заново прогоняем: + +- text-only flow +- staged attachments flow +- large image failure +- duplicate `END` behavior +- behavior after transport disconnect + +На этом этапе мы честно увидим, что реально делает upstream transport. + +### Этап 3. Опциональный outer policy layer + +Только если после Этапа 2 это действительно нужно, добавляем policy поверх transport: + +- request timeout целиком +- retry policy +- circuit-breaker-like behavior + +Но это должно жить не в client wrapper, а выше, в integration layer. + +## Тестовая стратегия + +### Удаляем как нецелевые тесты + +Больше не считаем нормой: + +- post-END drain behavior +- recovery late chunks после `END` +- idle timeout внутри wrapper как часть client contract + +### Оставляем и добавляем + +Нужные guarantees: + +1. создаётся отдельный client per chat +2. один chat сериализуется через lock +3. разные чаты не делят client instance +4. attachment paths уходят в `send_message(..., attachments=...)` +5. transport failure приводит к `disconnect_chat(chat_id)` +6. следующий запрос после failure открывает новую chat session +7. upstream exception превращается в `PlatformError` + +## Риски + +### 1. Может снова проявиться реальный upstream bug + +Это не regression дизайна, а полезный результат cleanup. + +### 2. Может исчезнуть локальная защита от зависших стримов + +Это допустимо на первом этапе. +Если она действительно нужна, она должна вернуться как outer policy, а не как переписанный client transport. + +### 3. Может выясниться, что даже thin wrapper не нужен + +Если modern upstream `AgentApi` уже полностью покрывает наш use case, файл `sdk/agent_api_wrapper.py` можно будет заменить на маленький factory helper или убрать совсем. + +## Критерии успеха + +Результат считается успешным, если: + +- transport layer в `surfaces` перестаёт иметь собственную stream semantics +- platform bug reports снова можно формулировать без сильного caveat про кастомный клиент +- Matrix real backend продолжает работать на text-only и attachments scenarios +- failure handling остаётся контролируемым, но больше не маскирует transport behavior платформы + +## Решение + +Принять путь: + +- `Thin upstream adapter now` +- `Observe real behavior` +- `Add outer policy later only if needed` + +Это наиболее близкий к production best practice вариант для текущего состояния проекта. From 4d917ac7941ad1b0dd50eff025c49e5e5fd5de7c Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 22 Apr 2026 00:17:15 +0300 Subject: [PATCH 2/4] docs: add thin transport adapter plan --- ...2026-04-22-transport-layer-thin-adapter.md | 540 ++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md diff --git a/docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md b/docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md new file mode 100644 index 0000000..b1984ec --- /dev/null +++ b/docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md @@ -0,0 +1,540 @@ +# Transport Layer Thin Adapter Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace the current custom transport behavior with a thin upstream-compatible adapter so Matrix uses `platform-agent_api.AgentApi` almost as-is and keeps only integration concerns on the `surfaces` side. + +**Architecture:** Keep a tiny `AgentApiWrapper` only as a construction/factory shim (`base_url` normalization + `for_chat(chat_id)`). Move all resilience logic that still belongs to `surfaces` into `RealPlatformClient`: per-chat client caching, per-chat send locks, disconnect-on-failure, and `PlatformError` mapping. Delete all custom stream semantics from the wrapper so bugs in streaming can be attributed cleanly to either upstream or our integration layer. + +**Tech Stack:** Python 3.11, `aiohttp`, upstream `lambda_agent_api`, `pytest`, `pytest-asyncio`, `ruff` + +--- + +## File Structure + +- Modify: `sdk/agent_api_wrapper.py` + Purpose: shrink the wrapper to a thin constructor/factory shim with no custom `_listen()` or `send_message()` logic. +- Modify: `sdk/real.py` + Purpose: keep integration-only behavior: cached per-chat clients, chat locks, attachment forwarding, and failure cleanup. +- Modify: `adapter/matrix/bot.py` + Purpose: instantiate the wrapper with the modern `base_url` path instead of a raw `url`, matching the pinned upstream API. +- Modify: `tests/platform/test_real.py` + Purpose: remove tests that encode custom wrapper semantics and replace them with tests that prove only the intended integration guarantees. +- Modify: `README.md` + Purpose: document that the transport layer now uses the pinned upstream `platform-agent_api` client semantics directly, with only a thin local adapter. + +### Task 1: Shrink `AgentApiWrapper` To A Thin Factory Shim + +**Files:** +- Modify: `sdk/agent_api_wrapper.py` +- Test: `tests/platform/test_real.py` + +- [ ] **Step 1: Replace wrapper-only behavior tests with thin-wrapper tests** + +Update `tests/platform/test_real.py` so it no longer asserts any custom post-END drain or wrapper-owned timeout behavior. Replace those tests with the following: + +```python +def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch): + captured = {} + + def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): + captured["agent_id"] = agent_id + captured["base_url"] = base_url + captured["chat_id"] = chat_id + + monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) + + wrapper = AgentApiWrapper( + agent_id="agent-1", + base_url="ws://platform-agent:8000/v1/agent_ws/", + chat_id="41", + ) + + assert wrapper.chat_id == "41" + assert wrapper._base_url == "ws://platform-agent:8000" + assert captured == { + "agent_id": "agent-1", + "base_url": "ws://platform-agent:8000", + "chat_id": "41", + } + + +def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch): + init_calls = [] + + def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): + self.id = agent_id + self.chat_id = chat_id + self.url = base_url + init_calls.append((agent_id, base_url, chat_id)) + + monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) + + root = AgentApiWrapper( + agent_id="agent-1", + base_url="http://platform-agent:8000/v1/agent_ws/", + chat_id="1", + ) + + child = root.for_chat("99") + + assert child is not root + assert child.chat_id == "99" + assert child._base_url == "http://platform-agent:8000" + assert init_calls == [ + ("agent-1", "http://platform-agent:8000", "1"), + ("agent-1", "http://platform-agent:8000", "99"), + ] +``` + +- [ ] **Step 2: Run tests to verify old assumptions fail** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "recovers_late_text_after_first_end or times_out_on_idle_stream or normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"' +``` + +Expected: + +- FAIL because the old wrapper-behavior tests still exist +- FAIL or SKIP for the new thin-wrapper tests until code/tests are aligned + +- [ ] **Step 3: Replace `sdk/agent_api_wrapper.py` with a thin wrapper** + +Rewrite `sdk/agent_api_wrapper.py` to the minimal implementation below: + +```python +from __future__ import annotations + +import inspect +import re +import sys +from pathlib import Path +from urllib.parse import urlsplit, urlunsplit + +_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api" +if str(_api_root) not in sys.path: + sys.path.insert(0, str(_api_root)) + +from lambda_agent_api.agent_api import AgentApi # noqa: E402 + + +class AgentApiWrapper(AgentApi): + """Thin construction/factory shim over the pinned upstream AgentApi.""" + + def __init__( + self, + agent_id: str, + base_url: str, + *, + chat_id: int | str = 0, + **kwargs, + ) -> None: + self._base_url = self._normalize_base_url(base_url) + self._init_kwargs = dict(kwargs) + self.chat_id = chat_id + if not self._supports_modern_constructor(): + raise RuntimeError("Pinned platform-agent_api is expected to support base_url + chat_id") + + super().__init__( + agent_id=agent_id, + base_url=self._base_url, + chat_id=chat_id, + **kwargs, + ) + + @staticmethod + def _supports_modern_constructor() -> bool: + try: + parameters = inspect.signature(AgentApi.__init__).parameters + except (TypeError, ValueError): + return False + return "base_url" in parameters and "chat_id" in parameters + + @staticmethod + def _normalize_base_url(base_url: str) -> str: + parsed = urlsplit(base_url) + path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) + return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) + + def for_chat(self, chat_id: int | str) -> "AgentApiWrapper": + return type(self)( + agent_id=self.id, + base_url=self._base_url, + chat_id=chat_id, + **self._init_kwargs, + ) +``` + +- [ ] **Step 4: Run the wrapper-focused tests** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"' +``` + +Expected: + +- PASS + +- [ ] **Step 5: Commit** + +```bash +git add sdk/agent_api_wrapper.py tests/platform/test_real.py +git commit -m "refactor: shrink agent api wrapper to thin adapter" +``` + +### Task 2: Simplify `RealPlatformClient` To The Pinned Modern API + +**Files:** +- Modify: `sdk/real.py` +- Modify: `adapter/matrix/bot.py` +- Test: `tests/platform/test_real.py` + +- [ ] **Step 1: Add failing integration tests for the desired thin-adapter contract** + +Extend `tests/platform/test_real.py` with these assertions: + +```python +@pytest.mark.asyncio +async def test_real_platform_client_passes_attachments_to_modern_send_message(): + agent_api = FakeAgentApiFactory() + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + + attachment = Attachment( + type="document", + filename="report.pdf", + mime_type="application/pdf", + workspace_path="surfaces/matrix/u1/r1/inbox/report.pdf", + ) + + result = await client.send_message( + "@alice:example.org", + "chat-1", + "read this", + attachments=[attachment], + ) + + assert result.response == "read this" + assert agent_api.instances["chat-1"].calls == [ + ("read this", ["surfaces/matrix/u1/r1/inbox/report.pdf"]) + ] + + +@pytest.mark.asyncio +async def test_real_platform_client_disconnects_chat_after_agent_exception(): + class ErroringChatAgentApi: + def __init__(self, chat_id: str) -> None: + self.chat_id = chat_id + self.connect_calls = 0 + self.close_calls = 0 + + async def connect(self) -> None: + self.connect_calls += 1 + + async def close(self) -> None: + self.close_calls += 1 + + async def send_message(self, text: str, attachments: list[str] | None = None): + raise agent_api_wrapper_module.AgentException("INTERNAL_ERROR", "boom") + yield + + agent_api = FakeAgentApiFactory() + erroring = ErroringChatAgentApi("chat-1") + agent_api.for_chat = lambda chat_id: erroring + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + + with pytest.raises(PlatformError, match="boom") as exc_info: + await client.send_message("@alice:example.org", "chat-1", "hello") + + assert exc_info.value.code == "INTERNAL_ERROR" + assert erroring.close_calls == 1 + assert "chat-1" not in client._chat_apis +``` + +- [ ] **Step 2: Run tests to verify they fail before simplification** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception"' +``` + +Expected: + +- FAIL until `sdk/real.py` and Matrix runtime wiring are aligned with the pinned modern API + +- [ ] **Step 3: Simplify `sdk/real.py` and Matrix runtime construction** + +Make these exact edits: + +```python +# adapter/matrix/bot.py +def _build_platform_from_env() -> PlatformClient: + backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() + if backend == "real": + base_url = os.environ["AGENT_BASE_URL"] + return RealPlatformClient( + agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=base_url), + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + return MockPlatformClient() +``` + +```python +# sdk/real.py +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from pathlib import Path + +from sdk.agent_api_wrapper import AgentApiWrapper +from sdk.interface import ( + Attachment, + MessageChunk, + MessageResponse, + PlatformClient, + PlatformError, + User, + UserSettings, +) +from sdk.prototype_state import PrototypeStateStore + + +class RealPlatformClient(PlatformClient): + def __init__( + self, + agent_api: AgentApiWrapper, + prototype_state: PrototypeStateStore, + platform: str = "matrix", + ) -> None: + self._agent_api = agent_api + self._prototype_state = prototype_state + self._platform = platform + self._chat_apis: dict[str, AgentApiWrapper] = {} + self._chat_api_lock = asyncio.Lock() + self._chat_send_locks: dict[str, asyncio.Lock] = {} + + @property + def agent_api(self) -> AgentApiWrapper: + return self._agent_api + + async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper: + chat_key = str(chat_id) + chat_api = self._chat_apis.get(chat_key) + if chat_api is None: + async with self._chat_api_lock: + chat_api = self._chat_apis.get(chat_key) + if chat_api is None: + chat_api = self._agent_api.for_chat(chat_key) + await chat_api.connect() + self._chat_apis[chat_key] = chat_api + return chat_api + + def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock: + chat_key = str(chat_id) + lock = self._chat_send_locks.get(chat_key) + if lock is None: + lock = asyncio.Lock() + self._chat_send_locks[chat_key] = lock + return lock + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> MessageResponse: + response_parts: list[str] = [] + tokens_used = 0 + sent_attachments: list[Attachment] = [] + message_id = user_id + + lock = self._get_chat_send_lock(chat_id) + async with lock: + chat_api = await self._get_chat_api(chat_id) + try: + async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)): + if hasattr(event, "text"): + response_parts.append(event.text) + elif event.__class__.__name__ == "MsgEventEnd": + tokens_used = getattr(event, "tokens_used", 0) + elif "SEND_FILE" in getattr(getattr(event, "type", None), "value", str(getattr(event, "type", ""))): + attachment = self._attachment_from_send_file_event(event) + if attachment is not None: + sent_attachments.append(attachment) + except Exception as exc: + await self._handle_chat_api_failure(chat_id, exc) + + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + + return MessageResponse( + message_id=message_id, + response="".join(response_parts), + tokens_used=tokens_used, + finished=True, + attachments=sent_attachments, + ) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> AsyncIterator[MessageChunk]: + lock = self._get_chat_send_lock(chat_id) + async with lock: + chat_api = await self._get_chat_api(chat_id) + try: + async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)): + if hasattr(event, "text"): + yield MessageChunk( + message_id=user_id, + delta=event.text, + finished=False, + ) + elif event.__class__.__name__ == "MsgEventEnd": + tokens_used = getattr(event, "tokens_used", 0) + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + yield MessageChunk( + message_id=user_id, + delta="", + finished=True, + tokens_used=tokens_used, + ) + except Exception as exc: + await self._handle_chat_api_failure(chat_id, exc) + + async def disconnect_chat(self, chat_id: str) -> None: + chat_key = str(chat_id) + chat_api = self._chat_apis.pop(chat_key, None) + self._chat_send_locks.pop(chat_key, None) + if chat_api is not None: + await chat_api.close() + + async def close(self) -> None: + for chat_api in list(self._chat_apis.values()): + await chat_api.close() + self._chat_apis.clear() + self._chat_send_locks.clear() + + async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None: + await self.disconnect_chat(chat_id) + code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR" + raise PlatformError(str(exc), code=code) from exc + + @staticmethod + def _attachment_paths(attachments: list[Attachment] | None) -> list[str]: + if not attachments: + return [] + return [attachment.workspace_path for attachment in attachments if attachment.workspace_path] +``` + +- [ ] **Step 4: Run the focused transport tests** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception or wraps_connection_closed_as_platform_error or reconnects_after_closed_chat_api"' +``` + +Expected: + +- PASS + +- [ ] **Step 5: Commit** + +```bash +git add adapter/matrix/bot.py sdk/real.py tests/platform/test_real.py +git commit -m "refactor: use upstream transport semantics in real client" +``` + +### Task 3: Remove Custom Transport Assumptions From Tests And Docs + +**Files:** +- Modify: `tests/platform/test_real.py` +- Modify: `README.md` + +- [ ] **Step 1: Delete tests that encode wrapper-owned stream semantics** + +Remove any tests that assert: + +- late text is recovered after the first `END` +- duplicate `END` is repaired inside our wrapper +- wrapper-owned idle timeout semantics + +The file should keep only tests for: + +- wrapper construction/factory behavior +- per-chat client reuse +- reconnect/disconnect after failure +- attachment forwarding +- per-chat send locking + +- [ ] **Step 2: Update README transport description** + +Add this text to the Matrix runtime/backend section in `README.md`: + +```md +Transport layer note: + +- `surfaces` now uses the pinned upstream `platform-agent_api.AgentApi` stream semantics directly +- local code keeps only a thin adapter for client construction and per-chat client factories +- request serialization, disconnect-on-failure, and `PlatformError` mapping remain in `sdk/real.py` +- `surfaces` no longer performs local post-END stream reconstruction +``` + +- [ ] **Step 3: Run the full verification set** + +Run: + +```bash +uv run ruff check adapter/matrix sdk tests/platform/test_real.py +/bin/zsh -lc 'PYTHONPATH=. uv run pytest tests/adapter/matrix -q' +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q' +``` + +Expected: + +- `ruff` reports `All checks passed!` +- Matrix adapter tests PASS +- `tests/platform/test_real.py` PASS + +- [ ] **Step 4: Commit** + +```bash +git add README.md tests/platform/test_real.py +git commit -m "test: remove custom transport semantics assumptions" +``` + +--- + +## Self-Review + +- Spec coverage: + - thin adapter target: covered by Task 1 + - integration-only `RealPlatformClient`: covered by Task 2 + - removal of custom stream semantics assumptions: covered by Task 3 + - re-verification after cleanup: covered by Task 3 + +- Placeholder scan: + - no `TODO`, `TBD`, or deferred implementation placeholders remain in task steps + +- Type consistency: + - `AgentApiWrapper` remains the construction/factory type used by `RealPlatformClient` + - failure mapping still terminates in `PlatformError` + - attachment forwarding consistently uses `attachments: list[str]` From 569824ead152093890d1da8229d5b6e1870a0de4 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 22 Apr 2026 00:22:20 +0300 Subject: [PATCH 3/4] refactor: shrink agent api wrapper to thin adapter --- sdk/agent_api_wrapper.py | 285 ++------------------------------ tests/platform/test_real.py | 319 ++++-------------------------------- 2 files changed, 47 insertions(+), 557 deletions(-) diff --git a/sdk/agent_api_wrapper.py b/sdk/agent_api_wrapper.py index f29f820..fa69816 100644 --- a/sdk/agent_api_wrapper.py +++ b/sdk/agent_api_wrapper.py @@ -1,67 +1,43 @@ from __future__ import annotations -import asyncio import inspect -import logging -import os import re import sys -from collections.abc import AsyncIterator from pathlib import Path from urllib.parse import urlsplit, urlunsplit -import aiohttp - _api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api" if str(_api_root) not in sys.path: sys.path.insert(0, str(_api_root)) -from lambda_agent_api.agent_api import AgentApi, AgentBusyException, AgentException # noqa: E402 -from lambda_agent_api.client import EClientMessage, MsgUserMessage # noqa: E402 -from lambda_agent_api.server import AgentEventUnion, MsgEventEnd, ServerMessage # noqa: E402 - -logger = logging.getLogger(__name__) -_DEBUG_STREAM = os.environ.get("SURFACES_AGENT_DEBUG_STREAM", "").strip().lower() in { - "1", - "true", - "yes", -} -_POST_END_DRAIN_MS = int(os.environ.get("SURFACES_AGENT_POST_END_DRAIN_MS", "120")) -_STREAM_IDLE_TIMEOUT_MS = int(os.environ.get("SURFACES_AGENT_IDLE_TIMEOUT_MS", "60000")) +from lambda_agent_api.agent_api import AgentApi # noqa: E402 class AgentApiWrapper(AgentApi): - """Capture tokens_used from MsgEventEnd without patching upstream code.""" + """Thin construction/factory shim over the pinned upstream AgentApi.""" def __init__( self, agent_id: str, - base_url: str | None = None, + base_url: str, *, chat_id: int | str = 0, - url: str | None = None, **kwargs, ) -> None: - if base_url is None and url is None: - raise TypeError("AgentApiWrapper requires base_url or url") - - self._base_url = self._normalize_base_url(base_url or url or "") + self._base_url = self._normalize_base_url(base_url) self._init_kwargs = dict(kwargs) self.chat_id = chat_id - if self._supports_modern_constructor(): - super().__init__( - agent_id=agent_id, - base_url=self._base_url, - chat_id=chat_id, - **kwargs, + if not self._supports_modern_constructor(): + raise RuntimeError( + "Pinned platform-agent_api is expected to support base_url + chat_id" ) - else: - super().__init__( - agent_id=agent_id, - url=self._build_ws_url(self._base_url, chat_id), - **kwargs, - ) - self.last_tokens_used = 0 + + super().__init__( + agent_id=agent_id, + base_url=self._base_url, + chat_id=chat_id, + **kwargs, + ) @staticmethod def _supports_modern_constructor() -> bool: @@ -69,247 +45,18 @@ class AgentApiWrapper(AgentApi): parameters = inspect.signature(AgentApi.__init__).parameters except (TypeError, ValueError): return False - return "base_url" in parameters and "chat_id" in parameters @staticmethod def _normalize_base_url(base_url: str) -> str: parsed = urlsplit(base_url) - path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?$", "", parsed.path.rstrip("/")) + path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) - @staticmethod - def _build_ws_url(base_url: str, chat_id: int | str) -> str: - return base_url.rstrip("/") + f"/agent_ws/?thread_id={chat_id}" - - def for_chat(self, chat_id: int | str) -> AgentApiWrapper: + def for_chat(self, chat_id: int | str) -> "AgentApiWrapper": return type(self)( agent_id=self.id, base_url=self._base_url, chat_id=chat_id, **self._init_kwargs, ) - - @staticmethod - def _event_kind(event: object) -> str: - raw_kind = getattr(event, "type", None) - if hasattr(raw_kind, "value"): - raw_kind = raw_kind.value - if raw_kind is None: - raw_kind = event.__class__.__name__ - - kind = str(raw_kind).replace("-", "_") - if "_" in kind: - return kind.upper() - - normalized = [] - for index, char in enumerate(kind): - if index and char.isupper() and not kind[index - 1].isupper(): - normalized.append("_") - normalized.append(char) - return "".join(normalized).upper() - - @classmethod - def _is_kind(cls, event: object, *needles: str) -> bool: - kind = cls._event_kind(event) - return any(needle in kind for needle in needles) - - @classmethod - def _is_text_event(cls, event: object) -> bool: - return hasattr(event, "text") or cls._is_kind(event, "TEXT_CHUNK") - - @classmethod - def _is_end_event(cls, event: object) -> bool: - kind = cls._event_kind(event) - return kind == "END" or kind.endswith("_END") - - @classmethod - def _is_send_file_event(cls, event: object) -> bool: - return "SEND_FILE" in cls._event_kind(event) - - async def _publish_event(self, event: object, *, queue_event: object | None = None) -> None: - if self.callback: - self.callback(event) - if self._current_queue: - await self._current_queue.put(queue_event if queue_event is not None else event) - - async def _publish_error(self, event: object) -> None: - if self.callback: - self.callback(event) - if self._current_queue and hasattr(event, "code") and hasattr(event, "details"): - await self._current_queue.put(AgentException(event.code, event.details)) - - async def _listen(self): - try: - async for msg in self._ws: - if msg.type == aiohttp.WSMsgType.TEXT: - try: - outgoing_msg = ServerMessage.validate_json(msg.data) - - if self._is_text_event(outgoing_msg): - if _DEBUG_STREAM: - logger.warning( - "[%s] text chunk queue=%s text=%r", - self.id, - self._current_queue is not None, - getattr(outgoing_msg, "text", "")[:80], - ) - if self._current_queue: - await self._current_queue.put(outgoing_msg) - elif self.callback: - self.callback(outgoing_msg) - else: - logger.warning("[%s] AgentEvent without active request", self.id) - - elif self._is_end_event(outgoing_msg): - self.last_tokens_used = outgoing_msg.tokens_used - if _DEBUG_STREAM: - logger.warning( - "[%s] end event queue=%s tokens=%s", - self.id, - self._current_queue is not None, - getattr(outgoing_msg, "tokens_used", None), - ) - await self._publish_event(outgoing_msg) - - elif self._is_kind(outgoing_msg, "ERROR"): - error = AgentException(outgoing_msg.code, outgoing_msg.details) - logger.error("[%s] Agent error: %s", self.id, error) - await self._publish_error(outgoing_msg) - - elif self._is_kind(outgoing_msg, "GRACEFUL_DISCONNECT"): - await self._publish_event(outgoing_msg) - logger.info("[%s] Gracefully disconnecting", self.id) - break - - else: - await self._publish_event(outgoing_msg) - - except Exception as exc: - logger.error("[%s] Failed to deserialize message: %s", self.id, exc) - if self._current_queue: - await self._current_queue.put( - AgentException("PARSE_ERROR", f"Validation failed: {exc}") - ) - - elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED): - logger.error("[%s] WebSocket closed/error: %s", self.id, msg.type) - break - - except asyncio.CancelledError: - pass - except Exception as exc: - logger.error("[%s] Error in listen loop: %s", self.id, exc) - finally: - await self._cleanup() - - async def send_message( - self, text: str, attachments: list[str] | None = None - ) -> AsyncIterator[AgentEventUnion]: - if not self._connected or not self._ws: - raise AgentException( - code="NOT_CONNECTED", details="Not connected. Call connect() first." - ) - - if self._request_lock.locked(): - raise AgentBusyException("Agent is currently processing another request") - - await self._request_lock.acquire() - try: - self._current_queue = asyncio.Queue() - - message = MsgUserMessage( - type=EClientMessage.USER_MESSAGE, - text=text, - attachments=attachments or [], - ) - - await self._ws.send_str(message.model_dump_json()) - logger.debug("[%s] Sent message: %s...", self.id, text[:50]) - - while True: - try: - chunk = await asyncio.wait_for( - self._current_queue.get(), - timeout=max(_STREAM_IDLE_TIMEOUT_MS, 0) / 1000, - ) - except TimeoutError as exc: - raise AgentException( - "TIMEOUT", - ( - "Timed out waiting for the next agent stream event " - f"after {max(_STREAM_IDLE_TIMEOUT_MS, 0)}ms" - ), - ) from exc - - if isinstance(chunk, Exception): - raise chunk - - if isinstance(chunk, MsgEventEnd): - self.last_tokens_used = chunk.tokens_used - async for late_chunk in self._drain_post_end_events(): - yield late_chunk - break - - yield chunk - - finally: - if self._current_queue: - orphan_queue = self._current_queue - self._current_queue = None - - while not orphan_queue.empty(): - try: - orphan_msg = orphan_queue.get_nowait() - if isinstance(orphan_msg, Exception): - logger.debug( - "[%s] Dropped exception from queue during cleanup: %s", - self.id, - orphan_msg, - ) - continue - - if self.callback: - self.callback(orphan_msg) - else: - logger.debug("[%s] Dropped orphaned message during cleanup", self.id) - - except asyncio.QueueEmpty: - break - - if self._request_lock.locked(): - self._request_lock.release() - - async def _drain_post_end_events(self) -> AsyncIterator[AgentEventUnion]: - if self._current_queue is None: - return - - timeout_s = max(_POST_END_DRAIN_MS, 0) / 1000 - while True: - try: - chunk = await asyncio.wait_for(self._current_queue.get(), timeout=timeout_s) - except TimeoutError: - break - - if isinstance(chunk, Exception): - logger.warning("[%s] dropping post-END exception: %s", self.id, chunk) - continue - - if isinstance(chunk, MsgEventEnd): - self.last_tokens_used = chunk.tokens_used - if _DEBUG_STREAM: - logger.warning( - "[%s] dropped duplicate END tokens=%s", - self.id, - chunk.tokens_used, - ) - continue - - if _DEBUG_STREAM and self._is_text_event(chunk): - logger.warning( - "[%s] recovered post-END text chunk=%r", - self.id, - getattr(chunk, "text", "")[:80], - ) - - yield chunk diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py index 2291d9d..382b554 100644 --- a/tests/platform/test_real.py +++ b/tests/platform/test_real.py @@ -1,7 +1,6 @@ import asyncio import pytest -from lambda_agent_api.server import MsgEventEnd, MsgEventTextChunk import sdk.agent_api_wrapper as agent_api_wrapper_module from core.protocol import SettingsAction @@ -142,233 +141,61 @@ class TextChunkEvent: self.type = "AGENT_EVENT_TEXT_CHUNK" self.text = text - -class ToolCallChunkEvent: - def __init__(self, payload: str) -> None: - self.type = "AGENT_EVENT_TOOL_CALL_CHUNK" - self.payload = payload - - -class ToolResultEvent: - def __init__(self, payload: str) -> None: - self.type = "AGENT_EVENT_TOOL_RESULT" - self.payload = payload - - -class CustomUpdateEvent: - def __init__(self, payload: str) -> None: - self.type = "AGENT_EVENT_CUSTOM_UPDATE" - self.payload = payload - - -class EndEvent: - def __init__(self, tokens_used: int) -> None: - self.type = "AGENT_EVENT_END" - self.tokens_used = tokens_used - - -class ErrorEvent: - def __init__(self, code: str, details: str) -> None: - self.type = "ERROR" - self.code = code - self.details = details - - -class GracefulDisconnectEvent: - def __init__(self) -> None: - self.type = "GRACEFUL_DISCONNECT" - - -class FakeWSMessage: - def __init__(self, data: str) -> None: - self.type = agent_api_wrapper_module.aiohttp.WSMsgType.TEXT - self.data = data - - -class FakeWebSocket: - def __init__(self, messages: list[FakeWSMessage]) -> None: - self._messages = list(messages) - - def __aiter__(self): - return self - - async def __anext__(self): - if not self._messages: - raise StopAsyncIteration - return self._messages.pop(0) - - -class QueueFeedingWebSocket: - def __init__(self, owner, queued_events: list[object]) -> None: - self.owner = owner - self.queued_events = list(queued_events) - self.sent_payloads: list[str] = [] - - async def send_str(self, payload: str) -> None: - self.sent_payloads.append(payload) - for event in self.queued_events: - await self.owner._current_queue.put(event) - - -class SilentWebSocket: - def __init__(self) -> None: - self.sent_payloads: list[str] = [] - - async def send_str(self, payload: str) -> None: - self.sent_payloads.append(payload) - - class MessageResponseWithAttachments(MessageResponse): attachments: list[Attachment] = [] -def test_agent_api_wrapper_uses_modern_constructor_when_available(monkeypatch): - calls: list[dict[str, object]] = [] +def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch): + captured = {} - def fake_init(self, agent_id, base_url, chat_id, **kwargs): - calls.append( - { - "agent_id": agent_id, - "base_url": base_url, - "chat_id": chat_id, - "kwargs": kwargs, - } - ) - self.id = agent_id - self.url = base_url - self.callback = kwargs.get("callback") - self.on_disconnect = kwargs.get("on_disconnect") + def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): + captured["agent_id"] = agent_id + captured["base_url"] = base_url + captured["chat_id"] = chat_id monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) wrapper = AgentApiWrapper( agent_id="agent-1", - base_url="https://agent.example.com/v1/agent_ws", - chat_id="chat-1", - callback="cb", - on_disconnect="disconnect", - ) - child = wrapper.for_chat("chat-2") - - assert calls == [ - { - "agent_id": "agent-1", - "base_url": "https://agent.example.com", - "chat_id": "chat-1", - "kwargs": {"callback": "cb", "on_disconnect": "disconnect"}, - }, - { - "agent_id": "agent-1", - "base_url": "https://agent.example.com", - "chat_id": "chat-2", - "kwargs": {"callback": "cb", "on_disconnect": "disconnect"}, - }, - ] - assert wrapper._base_url == "https://agent.example.com" - assert wrapper.chat_id == "chat-1" - assert wrapper.last_tokens_used == 0 - assert child.chat_id == "chat-2" - - -def test_agent_api_wrapper_falls_back_to_legacy_url_constructor(monkeypatch): - calls: list[dict[str, object]] = [] - - def fake_init(self, agent_id, url, callback=None, on_disconnect=None): - calls.append( - { - "agent_id": agent_id, - "url": url, - "callback": callback, - "on_disconnect": on_disconnect, - } - ) - self.id = agent_id - self.url = url - self.callback = callback - self.on_disconnect = on_disconnect - - monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) - - wrapper = AgentApiWrapper( - agent_id="agent-2", - url="https://agent.example.com/agent_ws/", - chat_id="chat-9", - callback="cb", + base_url="ws://platform-agent:8000/v1/agent_ws/", + chat_id="41", ) - assert calls == [ - { - "agent_id": "agent-2", - "url": "https://agent.example.com/agent_ws/?thread_id=chat-9", - "callback": "cb", - "on_disconnect": None, - } - ] - assert wrapper._base_url == "https://agent.example.com" - assert wrapper.chat_id == "chat-9" - assert wrapper.last_tokens_used == 0 + assert wrapper.chat_id == "41" + assert wrapper._base_url == "ws://platform-agent:8000" + assert captured == { + "agent_id": "agent-1", + "base_url": "ws://platform-agent:8000", + "chat_id": "41", + } -@pytest.mark.asyncio -async def test_agent_api_wrapper_recovers_late_text_after_first_end(monkeypatch): +def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch): + init_calls = [] + def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): self.id = agent_id + self.chat_id = chat_id self.url = base_url - self.callback = kwargs.get("callback") - self.on_disconnect = kwargs.get("on_disconnect") + init_calls.append((agent_id, base_url, chat_id)) monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) - wrapper = AgentApiWrapper( + root = AgentApiWrapper( agent_id="agent-1", - base_url="https://agent.example.com/v1/agent_ws", - chat_id="chat-1", - ) - wrapper._connected = True - wrapper._request_lock = asyncio.Lock() - wrapper._current_queue = None - wrapper._ws = QueueFeedingWebSocket( - wrapper, - [ - MsgEventTextChunk(text="Иллюстра"), - MsgEventEnd(tokens_used=5), - MsgEventTextChunk(text="ция"), - MsgEventEnd(tokens_used=5), - ], + base_url="http://platform-agent:8000/v1/agent_ws/", + chat_id="1", ) - chunks = [] - async for chunk in wrapper.send_message("hello"): - chunks.append(chunk) + child = root.for_chat("99") - assert [chunk.text for chunk in chunks] == ["Иллюстра", "ция"] - assert wrapper.last_tokens_used == 5 - - -@pytest.mark.asyncio -async def test_agent_api_wrapper_times_out_on_idle_stream(monkeypatch): - def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): - self.id = agent_id - self.url = base_url - self.callback = kwargs.get("callback") - self.on_disconnect = kwargs.get("on_disconnect") - - monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) - monkeypatch.setattr(agent_api_wrapper_module, "_STREAM_IDLE_TIMEOUT_MS", 10) - - wrapper = AgentApiWrapper( - agent_id="agent-1", - base_url="https://agent.example.com/v1/agent_ws", - chat_id="chat-1", - ) - wrapper._connected = True - wrapper._request_lock = asyncio.Lock() - wrapper._current_queue = None - wrapper._ws = SilentWebSocket() - - with pytest.raises(agent_api_wrapper_module.AgentException, match="Timed out waiting"): - async for _ in wrapper.send_message("hello"): - pass + assert child is not root + assert child.chat_id == "99" + assert child._base_url == "http://platform-agent:8000" + assert init_calls == [ + ("agent-1", "http://platform-agent:8000", "1"), + ("agent-1", "http://platform-agent:8000", "99"), + ] @pytest.mark.asyncio @@ -703,87 +530,3 @@ async def test_real_platform_client_settings_are_local(): assert isinstance(settings, UserSettings) assert settings.skills["browser"] is True assert settings.skills["web-search"] is True - - -@pytest.mark.asyncio -async def test_agent_api_wrapper_transparently_surfaces_modern_events(monkeypatch): - callback_events: list[object] = [] - queue: asyncio.Queue = asyncio.Queue() - event_map = { - "text": TextChunkEvent("he"), - "tool_call": ToolCallChunkEvent("call"), - "tool_result": ToolResultEvent("result"), - "custom_update": CustomUpdateEvent("update"), - "send_file": SendFileEvent( - workspace_path="/workspace/report.pdf", - mime_type="application/pdf", - filename="report.pdf", - size=123, - ), - "end": EndEvent(tokens_used=11), - "error": ErrorEvent(code="BOOM", details="bad things"), - "disconnect": GracefulDisconnectEvent(), - } - - def fake_validate_json(data: str): - return event_map[data] - - monkeypatch.setattr( - agent_api_wrapper_module, - "ServerMessage", - type("FakeServerMessage", (), {"validate_json": staticmethod(fake_validate_json)}), - ) - - async def fake_cleanup(self): - return None - - monkeypatch.setattr(agent_api_wrapper_module.AgentApiWrapper, "_cleanup", fake_cleanup) - monkeypatch.setattr( - agent_api_wrapper_module.AgentApi, - "__init__", - lambda self, agent_id, base_url=None, chat_id=0, **kwargs: ( - setattr(self, "id", agent_id) - or setattr(self, "callback", kwargs.get("callback")) - or setattr(self, "on_disconnect", kwargs.get("on_disconnect")) - or setattr(self, "_current_queue", None) - ), - ) - - wrapper = AgentApiWrapper( - agent_id="agent-1", - base_url="https://agent.example.com/v1/agent_ws", - chat_id="chat-1", - callback=callback_events.append, - ) - wrapper._current_queue = queue - wrapper._ws = FakeWebSocket( - [ - FakeWSMessage("text"), - FakeWSMessage("tool_call"), - FakeWSMessage("tool_result"), - FakeWSMessage("custom_update"), - FakeWSMessage("send_file"), - FakeWSMessage("end"), - FakeWSMessage("error"), - FakeWSMessage("disconnect"), - ] - ) - - await wrapper._listen() - - queue_events = [] - while not queue.empty(): - queue_events.append(await queue.get()) - - assert queue_events[0].text == "he" - assert any(isinstance(event, SendFileEvent) for event in queue_events) - assert any(isinstance(event, EndEvent) for event in queue_events) - assert any(isinstance(event, GracefulDisconnectEvent) for event in queue_events) - assert callback_events[0].payload == "call" - assert callback_events[1].payload == "result" - assert callback_events[2].payload == "update" - assert any(isinstance(event, SendFileEvent) for event in callback_events) - assert any(isinstance(event, EndEvent) for event in callback_events) - assert any(isinstance(event, ErrorEvent) for event in callback_events) - assert any(isinstance(event, GracefulDisconnectEvent) for event in callback_events) - assert wrapper.last_tokens_used == 11 From 0c2884c2b1780e5f76ac95ee8ba1ba09adea1d8e Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 22 Apr 2026 01:25:11 +0300 Subject: [PATCH 4/4] refactor: use thin upstream transport adapter --- README.md | 8 +- adapter/matrix/bot.py | 2 +- ...-platform-streaming-final-bug-report-ru.md | 294 ++++++++++++++++++ sdk/agent_api_wrapper.py | 16 +- sdk/real.py | 152 ++------- tests/adapter/matrix/test_dispatcher.py | 6 +- tests/core/test_integration.py | 66 ++-- tests/platform/test_real.py | 131 +++----- 8 files changed, 420 insertions(+), 255 deletions(-) create mode 100644 docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md diff --git a/README.md b/README.md index 88370e9..4c4a480 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ surfaces-bot/ - **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта - **Текущее ограничение** — encrypted DM официально не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота - **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` использует `platform-agent` из compose и WebSocket contract `/v1/agent_ws/{chat_id}/` -- **Ограничения real backend** — локальный runtime использует shared `/workspace`, а файлы передаются как относительные пути в `attachments` +- **Ограничения real backend** — локальный runtime использует shared `/workspace`, файлы передаются как относительные пути в `attachments`, а transport layer со стороны `surfaces` использует pinned upstream `platform-agent_api.AgentApi` почти без локальной stream-логики; текущая реализация рабочая, но после tool/file flow остаётся подтверждённый upstream streaming bug, из-за которого начало ответа может пропадать --- @@ -122,6 +122,8 @@ MATRIX_PASSWORD=... # или MATRIX_ACCESS_TOKEN=... MATRIX_PLATFORM_BACKEND=real # compose runtime: platform-agent service name + shared /workspace +# значение передаётся в thin wrapper как base URL; wrapper сам нормализует его +# до upstream WS route /v1/agent_ws/{chat_id}/ AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/ AGENT_BASE_URL=http://platform-agent:8000 SURFACES_WORKSPACE_DIR=/workspace @@ -245,7 +247,8 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | Функция | Почему не работает | |---|---| | `!load` в другом чате | platform-agent использует `StateBackend` — файлы живут в памяти отдельно для каждого `thread_id`. Файл, сохранённый в чате A, не виден в чате B. Фикс: переключить platform-agent на `FilesystemBackend` с общим хранилищем. | -| Счётчик токенов в `!context` | platform-agent отдаёт `tokens_used=0` хардкодом в `MsgEventEnd`. Наш код перехватывает значение корректно. | +| Стриминг после tool/file flow | В текущем upstream `platform-agent` первый `MsgEventTextChunk` иногда рождается уже обрезанным до попадания в websocket-клиент. Наш transport layer после cleanup максимально близок к upstream и больше не пытается локально “лечить” этот поток. Подробности и raw evidence: `docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md`. | +| Счётчик токенов в `!context` | pinned `platform-agent_api.AgentApi` потребляет `MsgEventEnd` внутри клиента и не публикует `tokens_used` наружу. Сейчас `surfaces` честно показывает `0`, пока upstream не добавит поддержанный способ получить это значение. | | `!reset` | platform-agent не имеет endpoint `/reset`. Задокументировано в ТЗ к платформе. | | Персистентность между рестартами | platform-agent использует `MemorySaver` (in-memory). Все разговоры теряются при рестарте процесса. | | E2EE комнаты | `python-olm` не собирается на macOS/ARM. Ограничение инфраструктуры. | @@ -269,6 +272,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | [`docs/api-contract.md`](docs/api-contract.md) | Контракт к SDK платформы | | [`docs/user-flow.md`](docs/user-flow.md) | FSM и user journey | | [`docs/claude-code-guide.md`](docs/claude-code-guide.md) | Гайд по работе с Claude Code | +| [`docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md`](docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md) | Финальный аудит platform streaming bug после cleanup transport layer | --- diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index 48e70db..e7e68b2 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -110,7 +110,7 @@ def _build_platform_from_env() -> PlatformClient: if backend == "real": ws_url = os.environ["AGENT_WS_URL"] return RealPlatformClient( - agent_api=AgentApiWrapper(agent_id="matrix-bot", url=ws_url), + agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=ws_url), prototype_state=PrototypeStateStore(), platform="matrix", ) diff --git a/docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md b/docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md new file mode 100644 index 0000000..d03adc6 --- /dev/null +++ b/docs/reports/2026-04-22-platform-streaming-final-bug-report-ru.md @@ -0,0 +1,294 @@ +# Финальный баг-репорт: потеря начала ответа и сбои streaming/image path в `platform-agent` + +## Статус + +Это финальный отчёт после полного аудита интеграции `surfaces -> platform-agent_api -> platform-agent`. + +Итог: + +- текущая реализация `surfaces` рабочая, но проблемная из-за upstream-дефектов платформы +- баг с пропадающим началом ответа на текущем состоянии **не локализуется в `surfaces`** +- в воспроизведённом сценарии повреждённый первый текстовый chunk рождается уже внутри `platform-agent` +- помимо этого подтверждены ещё два независимых platform-side дефекта: + - duplicate `END` + - некорректная обработка больших изображений (`data-uri > 10 MB`, `WS 1009`) + +## Версии и состояние кода + +Рантайм воспроизводился на vendored upstream-репозиториях без локальных platform patch’ей: + +- `platform-agent`: `5e7c2df954cc3cd2f5bf8ae688e10a20038dde61` +- `platform-agent_api`: `aa480bbec5bbf8e006284dd03aed1c2754e9bbee` + +Со стороны `surfaces` transport layer был предварительно очищен: + +- убрана локальная stream-semantics из `sdk/agent_api_wrapper.py` +- `sdk/real.py` переведён на pinned upstream `platform-agent_api.AgentApi` +- больше нет локального post-END drain, custom listener и wrapper-owned reclassification events + +Это важно: баг воспроизводился **после** удаления наших транспортных костылей. + +## Контекст интеграции + +- поверхность: Matrix +- транспорт к платформе: WebSocket через upstream `platform-agent_api.AgentApi` +- `chat_id` на платформу: стабильный числовой surrogate id, выдаваемый со стороны `surfaces` +- файловый контракт: shared `/workspace`, вложения передаются как относительные пути в `attachments` + +## Пользовательские симптомы + +Наблюдались несколько классов сбоев: + +1. Начало ответа может пропасть +- ожидалось: `Моя ошибка: ...` +- фактически: `оя ошибка: ...` + +- ожидалось: `На двух изображениях: ...` +- фактически: ` двух изображениях: ...` + +2. После tool/file flow ответы могут вести себя нестабильно +- следующий ответ стартует с середины фразы +- в некоторых сценариях после image/tool path платформа отвечает ошибкой или замолкает + +3. На больших изображениях image path падает совсем +- provider error `Exceeded limit on max bytes per data-uri item : 10485760` +- websocket закрывается с `1009 (message too big)` + +## Что было проверено на стороне `surfaces` + +Ниже перечислено, что именно было перепроверено в нашем коде и почему это не выглядит корнем проблемы. + +### 1. Мы больше не режем и не переклассифицируем stream локально + +В текущем `surfaces`: + +- `sdk/agent_api_wrapper.py` — thin construction/factory shim над upstream `AgentApi` +- `sdk/real.py` — просто итерирует upstream events и склеивает `MsgEventTextChunk.text` +- `adapter/matrix/bot.py` — отправляет `OutgoingMessage.text` в Matrix без `strip/lstrip` + +Наблюдение: + +- в текущем коде не осталось места, где строка могла бы превратиться из `Моя ошибка` в `оя ошибка` через локальный slicing + +### 2. Сборка ответа у нас линейная и тупая + +`sdk/real.py` делает только следующее: + +- если пришёл `MsgEventTextChunk` — добавляет `event.text` в `response_parts` +- если пришёл `MsgEventSendFile` — превращает его в `Attachment` +- не пытается “восстанавливать” поток после `END` + +Следствие: + +- если начало уже отсутствует в `event.text`, мы его не можем ни потерять, ни вернуть + +### 3. Matrix sender не модифицирует текст + +`adapter/matrix/bot.py` передаёт текст дальше как есть. + +Следствие: + +- Matrix renderer не является объяснением пропажи первого куска + +## Что было проверено в `platform-agent_api` + +Upstream client всё ещё имеет спорную queue-архитектуру: + +- одна активная `_current_queue` +- `MsgEventEnd` съедается внутри `send_message()` +- в `finally` очередь отвязывается и дренится orphan messages + +Это архитектурно хрупко и может быть источником других boundary bugs. + +Но в конкретном воспроизведении этот слой не был точкой порчи текста. + +Почему: + +- в raw logs клиент получил **ровно тот же** первый text chunk, который сервер уже отправил +- queue/dequeue не изменили его содержимое + +## Что удалось доказать по raw logs + +Для финальной проверки была временно добавлена точечная диагностика в: + +- `external/platform-agent/src/agent/service.py` +- `external/platform-agent/src/api/external.py` +- `external/platform-agent_api/lambda_agent_api/agent_api.py` + +Эта диагностика **не входила** в рабочую реализацию и использовалась только для локализации бага. + +### Ключевое наблюдение + +На проблемном запросе после tool/file flow сервер сам yield’ил уже обрезанный первый chunk: + +```text +platform-agent-1 | [raw-stream][server-yield] chat=1 event=TEXT text=' двух изображениях:\n\n**Первое изображение' +platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение' path=None +matrix-bot-1 | [raw-stream][client-listen] agent=matrix-bot chat=1 queue_active=True AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение' +matrix-bot-1 | [raw-stream][client-dequeue] agent=matrix-bot chat=1 request=2 AGENT_EVENT_TEXT_CHUNK text=' двух изображениях:\n\n**Первое изображение' +``` + +Это означает: + +- порча произошла **до** websocket-клиента +- `surfaces` transport layer не является источником именно этого дефекта +- `platform-agent_api` не исказил этот конкретный chunk по дороге + +Дополнительно тот же паттерн виден и вне image-сценария: + +```text +platform-agent-1 | [raw-stream][server-yield] chat=1 event=TEXT text='сё работает напрямую' +... +matrix-bot-1 | [raw-stream][client-dequeue] ... text='сё работает напрямую' +``` + +То есть сервер уже выдаёт `сё`, а не `Всё`. + +## Наиболее вероятный root cause + +Главный подозреваемый — `external/platform-agent/src/agent/service.py`. + +Сейчас он делает следующее: + +- читает `self._agent.astream_events(...)` +- обрабатывает только `kind == "on_chat_model_stream"` +- берёт `chunk = event["data"]["chunk"]` +- если `chunk.content`, форвардит `MsgEventTextChunk(text=chunk.content)` + +Проблема в том, что это очень грубое преобразование raw event stream в пользовательский текст. + +### Почему именно это место выглядит корнем + +1. Первый битый chunk уже рождается на server-side +- это подтверждено логами выше + +2. Код берёт только `chunk.content` +- если начало ответа приходит в другой форме, поле или raw event, оно просто теряется + +3. Код не учитывает `ns` / `source` +- после tool/vision flow у Deep Agents / LangChain может быть более сложная структура потока +- текущий adapter flatten’ит её слишком агрессивно + +4. Код никак не валидирует, что наружу уходит именно main assistant output + +Итоговая гипотеза: + +> После tool/file/vision flow `platform-agent` неправильно адаптирует `astream_events()` в `MsgEventTextChunk`. Начало итогового пользовательского ответа может находиться не в том raw event, который сейчас читается через `chunk.content`, либо теряться из-за упрощённой фильтрации потока. + +## Подтверждённый отдельный баг: duplicate `END` + +Это отдельный platform-side дефект. + +Сейчас: + +- `external/platform-agent/src/agent/service.py` уже делает `yield MsgEventEnd(...)` +- `external/platform-agent/src/api/external.py` после завершения цикла дополнительно отправляет ещё один `MsgEventEnd(...)` + +По логам это выглядит так: + +```text +platform-agent-1 | [raw-stream][server-yield] chat=1 event=END +platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_END text=None path=None +platform-agent-1 | [raw-stream][ws-send] chat=1 event=AGENT_EVENT_END duplicate_end=true +matrix-bot-1 | ... AGENT_EVENT_END tokens_used=0 +matrix-bot-1 | ... AGENT_EVENT_END tokens_used=0 +``` + +Независимая оценка: + +- duplicate `END` — реальный баг платформы +- он делает границу ответа менее надёжной +- но в текущем воспроизведении он **не объясняет** сам факт потери первого text chunk + +То есть это важный, но вторичный дефект. + +## Подтверждённый отдельный баг: большие изображения ломают image path + +В отдельном воспроизведении платформа падала на анализе изображений с provider error: + +```text +Exceeded limit on max bytes per data-uri item : 10485760 +``` + +И параллельно websocket рвался с: + +```text +received 1009 (message too big); then sent 1009 (message too big) +``` + +Это означает: + +- image path отправляет в provider oversized `data:` URI +- безопасной предвалидации / деградации нет +- failure scenario сопровождается разрывом websocket-соединения + +Независимая оценка: + +- это отдельный platform-side bug +- он не объясняет потерю первого чанка в текстовом сценарии напрямую +- но подтверждает, что path `tool/file/image -> platform stream` в целом сейчас нестабилен + +## Что мы считаем исключённым + +С достаточной уверенностью можно исключить: + +1. Локальный slicing текста в `surfaces` +2. Локальную “умную” реконструкцию потока, потому что она была удалена +3. Matrix sender как источник потери первого чанка +4. `platform-agent_api` queue/dequeue как primary root cause именно в этом воспроизведении + +## Финальная независимая оценка + +Текущая оценка вероятностей: + +- `75%` — ошибка в `platform-agent`, в адаптере `astream_events() -> MsgEventTextChunk` +- `15%` — provider/model stream приносит начало ответа в другой raw event/field, а `platform-agent` его некорректно интерпретирует +- `10%` — вторичные platform-side boundary defects (`duplicate END`, queue semantics и т.д.) +- `~0-5%` — ошибка в `surfaces` + +Итоговый вывод: + +> На текущем состоянии кода баг с пропадающим началом ответа следует считать platform-side дефектом. Воспроизведение после cleanup transport layer показывает, что первый повреждённый text chunk формируется уже внутри `platform-agent` до отправки в websocket. + +## Что нужно исправить в платформе + +### Обязательно + +1. Убрать duplicate `END` +- один ответ должен завершаться ровно одним `MsgEventEnd` + +2. Перепроверить адаптацию `astream_events()` в `service.py` +- логировать и проанализировать raw `event["event"]` +- проверить `event.get("name")` +- смотреть `event.get("ns")` +- сравнить `chunk.content` с тем, что реально лежит в `chunk.text` / raw chunk repr + +3. Форвардить наружу только финальный main assistant output +- не flatten’ить весь поток без учёта `ns/source` + +### Желательно + +4. Сделать image path устойчивым к oversized payload +- preflight check размера +- resize/compress или controlled error без разрыва WS + +5. Улучшить client/server protocol boundary +- более строгая корреляция запроса и ответа +- более однозначная semantics конца ответа + +## Что мы сделали со своей стороны + +Со стороны `surfaces` уже выполнено следующее: + +- transport layer очищен до thin adapter над upstream `AgentApi` +- локальные stream-workaround’ы удалены +- рабочая интеграция сохранена +- known issue задокументирован + +То есть текущая интеграция не “идеальна”, но её поведение теперь достаточно чистое, чтобы platform bug было можно локализовать без смешения ответственности. + +## Приложение: короткий диагноз + +Если нужна самая короткая формулировка для issue tracker: + +> После cleanup transport layer в `surfaces` и воспроизведения на clean upstream platform repos видно, что `platform-agent` иногда сам порождает первый `MsgEventTextChunk` уже обрезанным, особенно после tool/file flow. Дополнительно подтверждены duplicate `END` и отдельный image-path failure на больших `data:` URI. diff --git a/sdk/agent_api_wrapper.py b/sdk/agent_api_wrapper.py index fa69816..34fee46 100644 --- a/sdk/agent_api_wrapper.py +++ b/sdk/agent_api_wrapper.py @@ -1,6 +1,5 @@ from __future__ import annotations -import inspect import re import sys from pathlib import Path @@ -27,11 +26,6 @@ class AgentApiWrapper(AgentApi): self._base_url = self._normalize_base_url(base_url) self._init_kwargs = dict(kwargs) self.chat_id = chat_id - if not self._supports_modern_constructor(): - raise RuntimeError( - "Pinned platform-agent_api is expected to support base_url + chat_id" - ) - super().__init__( agent_id=agent_id, base_url=self._base_url, @@ -39,21 +33,13 @@ class AgentApiWrapper(AgentApi): **kwargs, ) - @staticmethod - def _supports_modern_constructor() -> bool: - try: - parameters = inspect.signature(AgentApi.__init__).parameters - except (TypeError, ValueError): - return False - return "base_url" in parameters and "chat_id" in parameters - @staticmethod def _normalize_base_url(base_url: str) -> str: parsed = urlsplit(base_url) path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) - def for_chat(self, chat_id: int | str) -> "AgentApiWrapper": + def for_chat(self, chat_id: int | str) -> AgentApiWrapper: return type(self)( agent_id=self.id, base_url=self._base_url, diff --git a/sdk/real.py b/sdk/real.py index 0eac543..2b43056 100644 --- a/sdk/real.py +++ b/sdk/real.py @@ -1,10 +1,11 @@ from __future__ import annotations import asyncio -import inspect from collections.abc import AsyncIterator from pathlib import Path +from lambda_agent_api.server import MsgEventSendFile, MsgEventTextChunk + from sdk.agent_api_wrapper import AgentApiWrapper from sdk.interface import ( Attachment, @@ -40,14 +41,10 @@ class RealPlatformClient(PlatformClient): chat_key = str(chat_id) chat_api = self._chat_apis.get(chat_key) if chat_api is None: - chat_api_factory = getattr(self._agent_api, "for_chat", None) - if not callable(chat_api_factory): - return self._agent_api - async with self._chat_api_lock: chat_api = self._chat_apis.get(chat_key) if chat_api is None: - chat_api = chat_api_factory(chat_key) + chat_api = self._agent_api.for_chat(chat_key) await chat_api.connect() self._chat_apis[chat_key] = chat_api return chat_api @@ -80,48 +77,36 @@ class RealPlatformClient(PlatformClient): attachments: list[Attachment] | None = None, ) -> MessageResponse: response_parts: list[str] = [] - tokens_used = 0 sent_attachments: list[Attachment] = [] message_id = user_id - saw_end_event = False lock = self._get_chat_send_lock(chat_id) async with lock: chat_api = await self._get_chat_api(chat_id) - if hasattr(chat_api, "last_tokens_used"): - chat_api.last_tokens_used = 0 try: async for event in self._stream_agent_events( chat_api, text, attachments=attachments ): message_id = user_id - if self._is_text_event(event): - chunk_text = getattr(event, "text", "") - if chunk_text: - response_parts.append(chunk_text) - elif self._is_end_event(event): - tokens_used = getattr(event, "tokens_used", tokens_used) - saw_end_event = True - elif self._is_send_file_event(event): + if isinstance(event, MsgEventTextChunk) and event.text: + response_parts.append(event.text) + elif isinstance(event, MsgEventSendFile): attachment = self._attachment_from_send_file_event(event) if attachment is not None: sent_attachments.append(attachment) except Exception as exc: await self._handle_chat_api_failure(chat_id, exc) - if not saw_end_event: - tokens_used = getattr(chat_api, "last_tokens_used", tokens_used) - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + await self._prototype_state.set_last_tokens_used(str(chat_id), 0) response_kwargs = { "message_id": message_id, "response": "".join(response_parts), - "tokens_used": tokens_used, + "tokens_used": 0, "finished": True, + "attachments": sent_attachments, } - if self._message_response_accepts_attachments(): - response_kwargs["attachments"] = sent_attachments return MessageResponse(**response_kwargs) async def stream_message( @@ -134,44 +119,27 @@ class RealPlatformClient(PlatformClient): lock = self._get_chat_send_lock(chat_id) async with lock: chat_api = await self._get_chat_api(chat_id) - if hasattr(chat_api, "last_tokens_used"): - chat_api.last_tokens_used = 0 - saw_end_event = False try: async for event in self._stream_agent_events( chat_api, text, attachments=attachments ): - if self._is_text_event(event): + if isinstance(event, MsgEventTextChunk): yield MessageChunk( message_id=user_id, - delta=getattr(event, "text", ""), + delta=event.text, finished=False, ) - elif self._is_end_event(event): - tokens_used = getattr(event, "tokens_used", 0) - saw_end_event = True - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) - yield MessageChunk( - message_id=user_id, - delta="", - finished=True, - tokens_used=tokens_used, - ) - elif self._is_send_file_event(event): - continue - else: + elif isinstance(event, MsgEventSendFile): continue except Exception as exc: await self._handle_chat_api_failure(chat_id, exc) - if not saw_end_event: - tokens_used = getattr(chat_api, "last_tokens_used", 0) - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) - yield MessageChunk( - message_id=user_id, - delta="", - finished=True, - tokens_used=tokens_used, - ) + await self._prototype_state.set_last_tokens_used(str(chat_id), 0) + yield MessageChunk( + message_id=user_id, + delta="", + finished=True, + tokens_used=0, + ) async def get_settings(self, user_id: str) -> UserSettings: return await self._prototype_state.get_settings(user_id) @@ -195,10 +163,6 @@ class RealPlatformClient(PlatformClient): await close() self._chat_apis.clear() self._chat_send_locks.clear() - if not callable(getattr(self._agent_api, "for_chat", None)): - close = getattr(self._agent_api, "close", None) - if callable(close): - await close() async def _stream_agent_events( self, @@ -206,12 +170,8 @@ class RealPlatformClient(PlatformClient): text: str, attachments: list[Attachment] | None = None, ) -> AsyncIterator[object]: - send_message = chat_api.send_message attachment_paths = self._attachment_paths(attachments) - if attachment_paths and self._send_message_accepts_attachments(send_message): - event_stream = send_message(text, attachments=attachment_paths) - else: - event_stream = send_message(text) + event_stream = chat_api.send_message(text, attachments=attachment_paths or None) async for event in event_stream: yield event @@ -231,61 +191,9 @@ class RealPlatformClient(PlatformClient): return paths @staticmethod - def _send_message_accepts_attachments(send_message) -> bool: - try: - parameters = inspect.signature(send_message).parameters - except (TypeError, ValueError): - return False - return "attachments" in parameters or any( - parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters.values() - ) - - @staticmethod - def _event_kind(event: object) -> str: - raw_kind = getattr(event, "type", None) - if hasattr(raw_kind, "value"): - raw_kind = raw_kind.value - if raw_kind is None: - raw_kind = event.__class__.__name__ - - kind = str(raw_kind).replace("-", "_") - if "_" in kind: - return kind.upper() - normalized = [] - for index, char in enumerate(kind): - if index and char.isupper() and not kind[index - 1].isupper(): - normalized.append("_") - normalized.append(char) - return "".join(normalized).upper() - - @classmethod - def _is_text_event(cls, event: object) -> bool: - return hasattr(event, "text") or "TEXT_CHUNK" in cls._event_kind(event) - - @classmethod - def _is_end_event(cls, event: object) -> bool: - kind = cls._event_kind(event) - return kind == "END" or kind.endswith("_END") - - @classmethod - def _is_send_file_event(cls, event: object) -> bool: - kind = cls._event_kind(event) - return "SEND_FILE" in kind - - @staticmethod - def _attachment_from_send_file_event(event: object) -> Attachment | None: - location = None - for attr in ("url", "workspace_path", "path", "file_path", "uri"): - value = getattr(event, attr, None) - if value: - location = str(value) - break - if location is None: - return None - - mime_type = getattr(event, "mime_type", None) or "application/octet-stream" - filename = getattr(event, "filename", None) or Path(location).name or None - size = getattr(event, "size", None) + def _attachment_from_send_file_event(event: MsgEventSendFile) -> Attachment: + location = str(event.path) + filename = Path(location).name or None workspace_path = location if workspace_path.startswith("/workspace/"): workspace_path = workspace_path[len("/workspace/") :] @@ -293,18 +201,8 @@ class RealPlatformClient(PlatformClient): workspace_path = "" return Attachment( url=location, - mime_type=mime_type, - size=size, + mime_type="application/octet-stream", + size=None, filename=filename, workspace_path=workspace_path or None, ) - - @staticmethod - def _message_response_accepts_attachments() -> bool: - fields = getattr(MessageResponse, "model_fields", None) - if isinstance(fields, dict): - return "attachments" in fields - try: - return "attachments" in inspect.signature(MessageResponse).parameters - except (TypeError, ValueError): - return False diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index 07e2bee..01b35da 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -911,9 +911,9 @@ async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monk bot_module = importlib.import_module("adapter.matrix.bot") class FakeAgentApiWrapper: - def __init__(self, agent_id: str, url: str) -> None: + def __init__(self, agent_id: str, base_url: str) -> None: self.agent_id = agent_id - self.url = url + self.base_url = base_url monkeypatch.setattr(bot_module, "AgentApiWrapper", FakeAgentApiWrapper) monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") @@ -922,7 +922,7 @@ async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monk runtime = build_runtime() assert isinstance(runtime.platform, RealPlatformClient) - assert runtime.platform.agent_api.url == "ws://agent.example/agent_ws/" + assert runtime.platform.agent_api.base_url == "ws://agent.example/agent_ws/" async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeypatch): diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index fd7bd2e..5287074 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -4,32 +4,55 @@ Smoke test: полный цикл через dispatcher + реальные manag Имитирует что делает адаптер (Telegram или Matrix) при получении события. """ import pytest -from sdk.mock import MockPlatformClient -from sdk.interface import MessageChunk, MessageResponse -from sdk.prototype_state import PrototypeStateStore -from sdk.real import RealPlatformClient -from core.store import InMemoryStore -from core.chat import ChatManager +from lambda_agent_api.server import MsgEventTextChunk + from core.auth import AuthManager -from core.settings import SettingsManager +from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import ( - IncomingCommand, IncomingMessage, IncomingCallback, - OutgoingMessage, OutgoingUI, - Attachment, SettingsAction, + Attachment, + IncomingCallback, + IncomingCommand, + IncomingMessage, + OutgoingMessage, + OutgoingUI, ) +from core.settings import SettingsManager +from core.store import InMemoryStore +from sdk.mock import MockPlatformClient +from sdk.prototype_state import PrototypeStateStore +from sdk.real import RealPlatformClient class FakeAgentApi: - def __init__(self) -> None: + def __init__(self, chat_id: str) -> None: + self.chat_id = chat_id self.calls: list[tuple[str, list[str]]] = [] - self.last_tokens_used = 0 + self.connect_calls = 0 + self.close_calls = 0 + + async def connect(self) -> None: + self.connect_calls += 1 + + async def close(self) -> None: + self.close_calls += 1 async def send_message(self, text: str, attachments: list[str] | None = None): self.calls.append((text, attachments or [])) - yield type("Chunk", (), {"text": f"[REAL] {text}"})() - self.last_tokens_used = 5 + yield MsgEventTextChunk(text=f"[REAL] {text}") + + +class FakeAgentApiFactory: + def __init__(self) -> None: + self.created_chat_ids: list[str] = [] + self.instances: dict[str, FakeAgentApi] = {} + + def for_chat(self, chat_id: str) -> FakeAgentApi: + chat_api = FakeAgentApi(chat_id) + self.created_chat_ids.append(chat_id) + self.instances[chat_id] = chat_api + return chat_api @pytest.fixture @@ -48,7 +71,7 @@ def dispatcher(): @pytest.fixture def real_dispatcher(): - agent_api = FakeAgentApi() + agent_api = FakeAgentApiFactory() platform = RealPlatformClient( agent_api=agent_api, prototype_state=PrototypeStateStore(), @@ -80,7 +103,13 @@ async def test_new_chat_command(dispatcher): start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") await dispatcher.dispatch(start) - new = IncomingCommand(user_id="u1", platform="matrix", chat_id="C2", command="new", args=["Анализ"]) + new = IncomingCommand( + user_id="u1", + platform="matrix", + chat_id="C2", + command="new", + args=["Анализ"], + ) result = await dispatcher.dispatch(new) assert any("Анализ" in r.text for r in result if isinstance(r, OutgoingMessage)) @@ -130,7 +159,8 @@ async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatche texts = [r.text for r in result if isinstance(r, OutgoingMessage)] assert texts == ["[REAL] Привет!"] - assert agent_api.calls == [("Привет!", [])] + assert agent_api.created_chat_ids == ["C1"] + assert agent_api.instances["C1"].calls == [("Привет!", [])] async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher): @@ -155,6 +185,6 @@ async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_d ) await dispatcher.dispatch(msg) - assert agent_api.calls == [ + assert agent_api.instances["C1"].calls == [ ("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"]) ] diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py index 382b554..38b19e3 100644 --- a/tests/platform/test_real.py +++ b/tests/platform/test_real.py @@ -1,6 +1,8 @@ import asyncio import pytest +from lambda_agent_api.server import MsgEventSendFile, MsgEventTextChunk +from pydantic import Field import sdk.agent_api_wrapper as agent_api_wrapper_module from core.protocol import SettingsAction @@ -10,18 +12,12 @@ from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient -class FakeChunk: - def __init__(self, text: str) -> None: - self.text = text - - class FakeChatAgentApi: def __init__(self, chat_id: str) -> None: self.chat_id = chat_id self.calls: list[str] = [] self.connect_calls = 0 self.close_calls = 0 - self.last_tokens_used = 0 async def connect(self) -> None: self.connect_calls += 1 @@ -29,12 +25,11 @@ class FakeChatAgentApi: async def close(self) -> None: self.close_calls += 1 - async def send_message(self, text: str): + async def send_message(self, text: str, attachments: list[str] | None = None): self.calls.append(text) midpoint = len(text) // 2 - yield FakeChunk(text[:midpoint]) - yield FakeChunk(text[midpoint:]) - self.last_tokens_used = 3 + yield MsgEventTextChunk(text=text[:midpoint]) + yield MsgEventTextChunk(text=text[midpoint:]) class FakeAgentApiFactory: @@ -49,25 +44,12 @@ class FakeAgentApiFactory: return chat_api -class LegacyAgentApi: - def __init__(self) -> None: - self.calls: list[str] = [] - self.last_tokens_used = 0 - - async def send_message(self, text: str): - self.calls.append(text) - yield FakeChunk(text[:2]) - yield FakeChunk(text[2:]) - self.last_tokens_used = 7 - - class BlockingChatAgentApi: def __init__(self, chat_id: str) -> None: self.chat_id = chat_id self.calls: list[str] = [] self.connect_calls = 0 self.close_calls = 0 - self.last_tokens_used = 0 self.active_calls = 0 self.max_active_calls = 0 self.started = asyncio.Event() @@ -79,15 +61,14 @@ class BlockingChatAgentApi: async def close(self) -> None: self.close_calls += 1 - async def send_message(self, text: str): + async def send_message(self, text: str, attachments: list[str] | None = None): self.calls.append(text) self.active_calls += 1 self.max_active_calls = max(self.max_active_calls, self.active_calls) self.started.set() await self.release.wait() self.active_calls -= 1 - yield FakeChunk(text) - self.last_tokens_used = len(text) + yield MsgEventTextChunk(text=text) class AttachmentTrackingChatAgentApi: @@ -96,7 +77,6 @@ class AttachmentTrackingChatAgentApi: self.calls: list[tuple[str, list[str] | None]] = [] self.connect_calls = 0 self.close_calls = 0 - self.last_tokens_used = 0 async def connect(self) -> None: self.connect_calls += 1 @@ -106,8 +86,20 @@ class AttachmentTrackingChatAgentApi: async def send_message(self, text: str, attachments: list[str] | None = None): self.calls.append((text, attachments)) - yield FakeChunk(text) - self.last_tokens_used = 5 + yield MsgEventTextChunk(text=text) + + +class AttachmentTrackingAgentApiFactory: + def __init__(self, chat_api_cls=AttachmentTrackingChatAgentApi) -> None: + self.chat_api_cls = chat_api_cls + self.created_chat_ids: list[str] = [] + self.instances: dict[str, AttachmentTrackingChatAgentApi] = {} + + def for_chat(self, chat_id: str) -> AttachmentTrackingChatAgentApi: + chat_api = self.chat_api_cls(chat_id) + self.created_chat_ids.append(chat_id) + self.instances[chat_id] = chat_api + return chat_api class FlakyChatAgentApi: @@ -127,22 +119,8 @@ class FlakyChatAgentApi: yield -class SendFileEvent: - def __init__(self, *, workspace_path: str, mime_type: str, filename: str, size: int) -> None: - self.type = "AGENT_EVENT_SEND_FILE" - self.workspace_path = workspace_path - self.mime_type = mime_type - self.filename = filename - self.size = size - - -class TextChunkEvent: - def __init__(self, text: str) -> None: - self.type = "AGENT_EVENT_TEXT_CHUNK" - self.text = text - class MessageResponseWithAttachments(MessageResponse): - attachments: list[Attachment] = [] + attachments: list[Attachment] = Field(default_factory=list) def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch): @@ -230,19 +208,19 @@ async def test_real_platform_client_send_message_uses_chat_bound_client(): assert result == MessageResponse( message_id="@alice:example.org", response="hello", - tokens_used=3, + tokens_used=0, finished=True, ) assert agent_api.created_chat_ids == ["chat-7"] assert agent_api.instances["chat-7"].chat_id == "chat-7" assert agent_api.instances["chat-7"].calls == ["hello"] assert agent_api.instances["chat-7"].connect_calls == 1 - assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 3 + assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 0 @pytest.mark.asyncio async def test_real_platform_client_forwards_attachments_to_chat_api(): - agent_api = AttachmentTrackingChatAgentApi("chat-7") + agent_api = AttachmentTrackingAgentApiFactory() client = RealPlatformClient( agent_api=agent_api, prototype_state=PrototypeStateStore(), @@ -262,74 +240,49 @@ async def test_real_platform_client_forwards_attachments_to_chat_api(): attachments=[attachment], ) - assert agent_api.calls == [("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])] + assert agent_api.instances["chat-7"].calls == [ + ("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"]) + ] assert result.response == "hello" - assert result.tokens_used == 5 + assert result.tokens_used == 0 @pytest.mark.asyncio async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch): - agent_api = AttachmentTrackingChatAgentApi("chat-7") + class FileEventAgentApi(AttachmentTrackingChatAgentApi): + async def send_message(self, text: str, attachments: list[str] | None = None): + self.calls.append((text, attachments)) + yield MsgEventTextChunk(text="he") + yield MsgEventSendFile(path="report.pdf") + yield MsgEventTextChunk(text="llo") + + agent_api = AttachmentTrackingAgentApiFactory(chat_api_cls=FileEventAgentApi) client = RealPlatformClient( agent_api=agent_api, prototype_state=PrototypeStateStore(), platform="matrix", ) - class FileEventAgentApi(AttachmentTrackingChatAgentApi): - async def send_message(self, text: str, attachments: list[str] | None = None): - self.calls.append((text, attachments)) - yield TextChunkEvent("he") - yield SendFileEvent( - workspace_path="/workspace/report.pdf", - mime_type="application/pdf", - filename="report.pdf", - size=123, - ) - yield TextChunkEvent("llo") - self.last_tokens_used = 9 - monkeypatch.setattr( "sdk.real.MessageResponse", MessageResponseWithAttachments, ) - client._agent_api = FileEventAgentApi("chat-7") result = await client.send_message("@alice:example.org", "chat-7", "hello") assert result.response == "hello" - assert result.tokens_used == 9 + assert result.tokens_used == 0 assert result.attachments == [ Attachment( - url="/workspace/report.pdf", - mime_type="application/pdf", + url="report.pdf", + mime_type="application/octet-stream", filename="report.pdf", - size=123, + size=None, workspace_path="report.pdf", ) ] -@pytest.mark.asyncio -async def test_real_platform_client_works_with_legacy_agent_api_without_for_chat(): - legacy_api = LegacyAgentApi() - client = RealPlatformClient( - agent_api=legacy_api, - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - - result = await client.send_message("@alice:example.org", "chat-legacy", "hello") - - assert result == MessageResponse( - message_id="@alice:example.org", - response="hello", - tokens_used=7, - finished=True, - ) - assert legacy_api.calls == ["hello"] - - @pytest.mark.asyncio async def test_real_platform_client_reuses_cached_chat_client(): agent_api = FakeAgentApiFactory() @@ -505,7 +458,7 @@ async def test_real_platform_client_stream_message_emits_final_tokens_chunk(): message_id="@alice:example.org", delta="", finished=True, - tokens_used=3, + tokens_used=0, ), ] assert agent_api.created_chat_ids == ["chat-1"]