From 4d917ac7941ad1b0dd50eff025c49e5e5fd5de7c Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 22 Apr 2026 00:17:15 +0300 Subject: [PATCH] docs: add thin transport adapter plan --- ...2026-04-22-transport-layer-thin-adapter.md | 540 ++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md diff --git a/docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md b/docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md new file mode 100644 index 0000000..b1984ec --- /dev/null +++ b/docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md @@ -0,0 +1,540 @@ +# Transport Layer Thin Adapter Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace the current custom transport behavior with a thin upstream-compatible adapter so Matrix uses `platform-agent_api.AgentApi` almost as-is and keeps only integration concerns on the `surfaces` side. + +**Architecture:** Keep a tiny `AgentApiWrapper` only as a construction/factory shim (`base_url` normalization + `for_chat(chat_id)`). Move all resilience logic that still belongs to `surfaces` into `RealPlatformClient`: per-chat client caching, per-chat send locks, disconnect-on-failure, and `PlatformError` mapping. Delete all custom stream semantics from the wrapper so bugs in streaming can be attributed cleanly to either upstream or our integration layer. + +**Tech Stack:** Python 3.11, `aiohttp`, upstream `lambda_agent_api`, `pytest`, `pytest-asyncio`, `ruff` + +--- + +## File Structure + +- Modify: `sdk/agent_api_wrapper.py` + Purpose: shrink the wrapper to a thin constructor/factory shim with no custom `_listen()` or `send_message()` logic. 
+- Modify: `sdk/real.py` + Purpose: keep integration-only behavior: cached per-chat clients, chat locks, attachment forwarding, and failure cleanup. +- Modify: `adapter/matrix/bot.py` + Purpose: instantiate the wrapper with the modern `base_url` path instead of a raw `url`, matching the pinned upstream API. +- Modify: `tests/platform/test_real.py` + Purpose: remove tests that encode custom wrapper semantics and replace them with tests that prove only the intended integration guarantees. +- Modify: `README.md` + Purpose: document that the transport layer now uses the pinned upstream `platform-agent_api` client semantics directly, with only a thin local adapter. + +### Task 1: Shrink `AgentApiWrapper` To A Thin Factory Shim + +**Files:** +- Modify: `sdk/agent_api_wrapper.py` +- Test: `tests/platform/test_real.py` + +- [ ] **Step 1: Replace wrapper-only behavior tests with thin-wrapper tests** + +Update `tests/platform/test_real.py` so it no longer asserts any custom post-END drain or wrapper-owned timeout behavior. 
Replace those tests with the following: + +```python +def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch): + captured = {} + + def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): + captured["agent_id"] = agent_id + captured["base_url"] = base_url + captured["chat_id"] = chat_id + + monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) + + wrapper = AgentApiWrapper( + agent_id="agent-1", + base_url="ws://platform-agent:8000/v1/agent_ws/", + chat_id="41", + ) + + assert wrapper.chat_id == "41" + assert wrapper._base_url == "ws://platform-agent:8000" + assert captured == { + "agent_id": "agent-1", + "base_url": "ws://platform-agent:8000", + "chat_id": "41", + } + + +def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch): + init_calls = [] + + def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): + self.id = agent_id + self.chat_id = chat_id + self.url = base_url + init_calls.append((agent_id, base_url, chat_id)) + + monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) + + root = AgentApiWrapper( + agent_id="agent-1", + base_url="http://platform-agent:8000/v1/agent_ws/", + chat_id="1", + ) + + child = root.for_chat("99") + + assert child is not root + assert child.chat_id == "99" + assert child._base_url == "http://platform-agent:8000" + assert init_calls == [ + ("agent-1", "http://platform-agent:8000", "1"), + ("agent-1", "http://platform-agent:8000", "99"), + ] +``` + +- [ ] **Step 2: Run tests to verify old assumptions fail** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "recovers_late_text_after_first_end or times_out_on_idle_stream or normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"' +``` + +Expected: + +- FAIL because the old wrapper-behavior tests still exist +- FAIL or SKIP for the new thin-wrapper tests until 
code/tests are aligned + +- [ ] **Step 3: Replace `sdk/agent_api_wrapper.py` with a thin wrapper** + +Rewrite `sdk/agent_api_wrapper.py` to the minimal implementation below: + +```python +from __future__ import annotations + +import inspect +import re +import sys +from pathlib import Path +from urllib.parse import urlsplit, urlunsplit + +_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api" +if str(_api_root) not in sys.path: + sys.path.insert(0, str(_api_root)) + +from lambda_agent_api.agent_api import AgentApi # noqa: E402 + + +class AgentApiWrapper(AgentApi): + """Thin construction/factory shim over the pinned upstream AgentApi.""" + + def __init__( + self, + agent_id: str, + base_url: str, + *, + chat_id: int | str = 0, + **kwargs, + ) -> None: + self._base_url = self._normalize_base_url(base_url) + self._init_kwargs = dict(kwargs) + self.chat_id = chat_id + if not self._supports_modern_constructor(): + raise RuntimeError("Pinned platform-agent_api is expected to support base_url + chat_id") + + super().__init__( + agent_id=agent_id, + base_url=self._base_url, + chat_id=chat_id, + **kwargs, + ) + + @staticmethod + def _supports_modern_constructor() -> bool: + try: + parameters = inspect.signature(AgentApi.__init__).parameters + except (TypeError, ValueError): + return False + return "base_url" in parameters and "chat_id" in parameters + + @staticmethod + def _normalize_base_url(base_url: str) -> str: + parsed = urlsplit(base_url) + path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) + return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) + + def for_chat(self, chat_id: int | str) -> "AgentApiWrapper": + return type(self)( + agent_id=self.id, + base_url=self._base_url, + chat_id=chat_id, + **self._init_kwargs, + ) +``` + +- [ ] **Step 4: Run the wrapper-focused tests** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k 
"normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"' +``` + +Expected: + +- PASS + +- [ ] **Step 5: Commit** + +```bash +git add sdk/agent_api_wrapper.py tests/platform/test_real.py +git commit -m "refactor: shrink agent api wrapper to thin adapter" +``` + +### Task 2: Simplify `RealPlatformClient` To The Pinned Modern API + +**Files:** +- Modify: `sdk/real.py` +- Modify: `adapter/matrix/bot.py` +- Test: `tests/platform/test_real.py` + +- [ ] **Step 1: Add failing integration tests for the desired thin-adapter contract** + +Extend `tests/platform/test_real.py` with these assertions: + +```python +@pytest.mark.asyncio +async def test_real_platform_client_passes_attachments_to_modern_send_message(): + agent_api = FakeAgentApiFactory() + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + + attachment = Attachment( + type="document", + filename="report.pdf", + mime_type="application/pdf", + workspace_path="surfaces/matrix/u1/r1/inbox/report.pdf", + ) + + result = await client.send_message( + "@alice:example.org", + "chat-1", + "read this", + attachments=[attachment], + ) + + assert result.response == "read this" + assert agent_api.instances["chat-1"].calls == [ + ("read this", ["surfaces/matrix/u1/r1/inbox/report.pdf"]) + ] + + +@pytest.mark.asyncio +async def test_real_platform_client_disconnects_chat_after_agent_exception(): + class ErroringChatAgentApi: + def __init__(self, chat_id: str) -> None: + self.chat_id = chat_id + self.connect_calls = 0 + self.close_calls = 0 + + async def connect(self) -> None: + self.connect_calls += 1 + + async def close(self) -> None: + self.close_calls += 1 + + async def send_message(self, text: str, attachments: list[str] | None = None): + raise agent_api_wrapper_module.AgentException("INTERNAL_ERROR", "boom") + yield + + agent_api = FakeAgentApiFactory() + erroring = ErroringChatAgentApi("chat-1") + agent_api.for_chat = lambda 
chat_id: erroring + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + + with pytest.raises(PlatformError, match="boom") as exc_info: + await client.send_message("@alice:example.org", "chat-1", "hello") + + assert exc_info.value.code == "INTERNAL_ERROR" + assert erroring.close_calls == 1 + assert "chat-1" not in client._chat_apis +``` + +- [ ] **Step 2: Run tests to verify they fail before simplification** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception"' +``` + +Expected: + +- FAIL until `sdk/real.py` and Matrix runtime wiring are aligned with the pinned modern API + +- [ ] **Step 3: Simplify `sdk/real.py` and Matrix runtime construction** + +Make these exact edits (the `sdk/real.py` listing covers only the transport-related members; keep the rest of the existing class, including the `_attachment_from_send_file_event` helper it calls, unchanged): + +```python +# adapter/matrix/bot.py +def _build_platform_from_env() -> PlatformClient: + backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() + if backend == "real": + base_url = os.environ["AGENT_BASE_URL"] + return RealPlatformClient( + agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=base_url), + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + return MockPlatformClient() +``` + +```python +# sdk/real.py +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from pathlib import Path + +from sdk.agent_api_wrapper import AgentApiWrapper +from sdk.interface import ( + Attachment, + MessageChunk, + MessageResponse, + PlatformClient, + PlatformError, + User, + UserSettings, +) +from sdk.prototype_state import PrototypeStateStore + + +class RealPlatformClient(PlatformClient): + def __init__( + self, + agent_api: AgentApiWrapper, + prototype_state: PrototypeStateStore, + platform: str = "matrix", + ) -> None: + self._agent_api = agent_api + self._prototype_state = prototype_state + 
self._platform = platform + self._chat_apis: dict[str, AgentApiWrapper] = {} + self._chat_api_lock = asyncio.Lock() + self._chat_send_locks: dict[str, asyncio.Lock] = {} + + @property + def agent_api(self) -> AgentApiWrapper: + return self._agent_api + + async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper: + chat_key = str(chat_id) + chat_api = self._chat_apis.get(chat_key) + if chat_api is None: + async with self._chat_api_lock: + chat_api = self._chat_apis.get(chat_key) + if chat_api is None: + chat_api = self._agent_api.for_chat(chat_key) + await chat_api.connect() + self._chat_apis[chat_key] = chat_api + return chat_api + + def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock: + chat_key = str(chat_id) + lock = self._chat_send_locks.get(chat_key) + if lock is None: + lock = asyncio.Lock() + self._chat_send_locks[chat_key] = lock + return lock + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> MessageResponse: + response_parts: list[str] = [] + tokens_used = 0 + sent_attachments: list[Attachment] = [] + message_id = user_id + + lock = self._get_chat_send_lock(chat_id) + async with lock: + chat_api = await self._get_chat_api(chat_id) + try: + async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)): + if hasattr(event, "text"): + response_parts.append(event.text) + elif event.__class__.__name__ == "MsgEventEnd": + tokens_used = getattr(event, "tokens_used", 0) + elif "SEND_FILE" in getattr(getattr(event, "type", None), "value", str(getattr(event, "type", ""))): + attachment = self._attachment_from_send_file_event(event) + if attachment is not None: + sent_attachments.append(attachment) + except Exception as exc: + await self._handle_chat_api_failure(chat_id, exc) + + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + + return MessageResponse( + message_id=message_id, + 
response="".join(response_parts), + tokens_used=tokens_used, + finished=True, + attachments=sent_attachments, + ) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> AsyncIterator[MessageChunk]: + lock = self._get_chat_send_lock(chat_id) + async with lock: + chat_api = await self._get_chat_api(chat_id) + try: + async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)): + if hasattr(event, "text"): + yield MessageChunk( + message_id=user_id, + delta=event.text, + finished=False, + ) + elif event.__class__.__name__ == "MsgEventEnd": + tokens_used = getattr(event, "tokens_used", 0) + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + yield MessageChunk( + message_id=user_id, + delta="", + finished=True, + tokens_used=tokens_used, + ) + except Exception as exc: + await self._handle_chat_api_failure(chat_id, exc) + + async def disconnect_chat(self, chat_id: str) -> None: + chat_key = str(chat_id) + chat_api = self._chat_apis.pop(chat_key, None) + self._chat_send_locks.pop(chat_key, None) + if chat_api is not None: + await chat_api.close() + + async def close(self) -> None: + for chat_api in list(self._chat_apis.values()): + await chat_api.close() + self._chat_apis.clear() + self._chat_send_locks.clear() + + async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None: + await self.disconnect_chat(chat_id) + code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR" + raise PlatformError(str(exc), code=code) from exc + + @staticmethod + def _attachment_paths(attachments: list[Attachment] | None) -> list[str]: + if not attachments: + return [] + return [attachment.workspace_path for attachment in attachments if attachment.workspace_path] +``` + +- [ ] **Step 4: Run the focused transport tests** + +Run: + +```bash +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest 
tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception or wraps_connection_closed_as_platform_error or reconnects_after_closed_chat_api"' +``` + +Expected: + +- PASS + +- [ ] **Step 5: Commit** + +```bash +git add adapter/matrix/bot.py sdk/real.py tests/platform/test_real.py +git commit -m "refactor: use upstream transport semantics in real client" +``` + +### Task 3: Remove Custom Transport Assumptions From Tests And Docs + +**Files:** +- Modify: `tests/platform/test_real.py` +- Modify: `README.md` + +- [ ] **Step 1: Delete tests that encode wrapper-owned stream semantics** + +Remove any tests that assert: + +- late text is recovered after the first `END` +- duplicate `END` is repaired inside our wrapper +- wrapper-owned idle timeout semantics + +The file should keep only tests for: + +- wrapper construction/factory behavior +- per-chat client reuse +- reconnect/disconnect after failure +- attachment forwarding +- per-chat send locking + +- [ ] **Step 2: Update README transport description** + +Add this text to the Matrix runtime/backend section in `README.md`: + +```md +Transport layer note: + +- `surfaces` now uses the pinned upstream `platform-agent_api.AgentApi` stream semantics directly +- local code keeps only a thin adapter for client construction and per-chat client factories +- request serialization, disconnect-on-failure, and `PlatformError` mapping remain in `sdk/real.py` +- `surfaces` no longer performs local post-END stream reconstruction +``` + +- [ ] **Step 3: Run the full verification set** + +Run: + +```bash +uv run ruff check adapter/matrix sdk tests/platform/test_real.py +/bin/zsh -lc 'PYTHONPATH=. 
uv run pytest tests/adapter/matrix -q' +/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q' +``` + +Expected: + +- `ruff` reports `All checks passed!` +- Matrix adapter tests PASS +- `tests/platform/test_real.py` PASS + +- [ ] **Step 4: Commit** + +```bash +git add README.md tests/platform/test_real.py +git commit -m "test: remove custom transport semantics assumptions" +``` + +--- + +## Self-Review + +- Spec coverage: + - thin adapter target: covered by Task 1 + - integration-only `RealPlatformClient`: covered by Task 2 + - removal of custom stream semantics assumptions: covered by Task 3 + - re-verification after cleanup: covered by Task 3 + +- Placeholder scan: + - no `TODO`, `TBD`, or deferred implementation placeholders remain in task steps + +- Type consistency: + - `AgentApiWrapper` remains the construction/factory type used by `RealPlatformClient` + - failure mapping still terminates in `PlatformError` + - attachment forwarding consistently uses `attachments: list[str]`