# Transport Layer Thin Adapter Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Replace the current custom transport behavior with a thin upstream-compatible adapter so Matrix uses `platform-agent_api.AgentApi` almost as-is and keeps only integration concerns on the `surfaces` side. **Architecture:** Keep a tiny `AgentApiWrapper` only as a construction/factory shim (`base_url` normalization + `for_chat(chat_id)`). Move all resilience logic that still belongs to `surfaces` into `RealPlatformClient`: per-chat client caching, per-chat send locks, disconnect-on-failure, and `PlatformError` mapping. Delete all custom stream semantics from the wrapper so bugs in streaming can be attributed cleanly to either upstream or our integration layer. **Tech Stack:** Python 3.11, `aiohttp`, upstream `lambda_agent_api`, `pytest`, `pytest-asyncio`, `ruff` --- ## File Structure - Modify: `sdk/agent_api_wrapper.py` Purpose: shrink the wrapper to a thin constructor/factory shim with no custom `_listen()` or `send_message()` logic. - Modify: `sdk/real.py` Purpose: keep integration-only behavior: cached per-chat clients, chat locks, attachment forwarding, and failure cleanup. - Modify: `adapter/matrix/bot.py` Purpose: instantiate the wrapper with the modern `base_url` path instead of a raw `url`, matching the pinned upstream API. - Modify: `tests/platform/test_real.py` Purpose: remove tests that encode custom wrapper semantics and replace them with tests that prove only the intended integration guarantees. - Modify: `README.md` Purpose: document that the transport layer now uses the pinned upstream `platform-agent_api` client semantics directly, with only a thin local adapter. ### Task 1: Shrink `AgentApiWrapper` To A Thin Factory Shim **Files:** - Modify: `sdk/agent_api_wrapper.py` - Test: `tests/platform/test_real.py` - [ ] **Step 1: Replace wrapper-only behavior tests with thin-wrapper tests** Update `tests/platform/test_real.py` so it no longer asserts any custom post-END drain or wrapper-owned timeout behavior. Replace those tests with the following: ```python def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch): captured = {} def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): captured["agent_id"] = agent_id captured["base_url"] = base_url captured["chat_id"] = chat_id monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) wrapper = AgentApiWrapper( agent_id="agent-1", base_url="ws://platform-agent:8000/v1/agent_ws/", chat_id="41", ) assert wrapper.chat_id == "41" assert wrapper._base_url == "ws://platform-agent:8000" assert captured == { "agent_id": "agent-1", "base_url": "ws://platform-agent:8000", "chat_id": "41", } def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch): init_calls = [] def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs): self.id = agent_id self.chat_id = chat_id self.url = base_url init_calls.append((agent_id, base_url, chat_id)) monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init) root = AgentApiWrapper( agent_id="agent-1", base_url="http://platform-agent:8000/v1/agent_ws/", chat_id="1", ) child = root.for_chat("99") assert child is not root assert child.chat_id == "99" assert child._base_url == "http://platform-agent:8000" assert init_calls == [ ("agent-1", "http://platform-agent:8000", "1"), ("agent-1", "http://platform-agent:8000", "99"), ] ``` - [ ] **Step 2: Run tests to verify old assumptions fail** Run: ```bash /bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "recovers_late_text_after_first_end or times_out_on_idle_stream or normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"' ``` Expected: - FAIL because the old wrapper-behavior tests still exist - FAIL or SKIP for the new thin-wrapper tests until code/tests are aligned - [ ] **Step 3: Replace `sdk/agent_api_wrapper.py` with a thin wrapper** Rewrite `sdk/agent_api_wrapper.py` to the minimal implementation below: ```python from __future__ import annotations import inspect import re import sys from pathlib import Path from urllib.parse import urlsplit, urlunsplit _api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api" if str(_api_root) not in sys.path: sys.path.insert(0, str(_api_root)) from lambda_agent_api.agent_api import AgentApi # noqa: E402 class AgentApiWrapper(AgentApi): """Thin construction/factory shim over the pinned upstream AgentApi.""" def __init__( self, agent_id: str, base_url: str, *, chat_id: int | str = 0, **kwargs, ) -> None: self._base_url = self._normalize_base_url(base_url) self._init_kwargs = dict(kwargs) self.chat_id = chat_id if not self._supports_modern_constructor(): raise RuntimeError("Pinned platform-agent_api is expected to support base_url + chat_id") super().__init__( agent_id=agent_id, base_url=self._base_url, chat_id=chat_id, **kwargs, ) @staticmethod def _supports_modern_constructor() -> bool: try: parameters = inspect.signature(AgentApi.__init__).parameters except (TypeError, ValueError): return False return "base_url" in parameters and "chat_id" in parameters @staticmethod def _normalize_base_url(base_url: str) -> str: parsed = urlsplit(base_url) path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) def for_chat(self, chat_id: int | str) -> "AgentApiWrapper": return type(self)( agent_id=self.id, base_url=self._base_url, chat_id=chat_id, **self._init_kwargs, ) ``` - [ ] **Step 4: Run the wrapper-focused tests** Run: ```bash /bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"' ``` Expected: - PASS - [ ] **Step 5: Commit** ```bash git add sdk/agent_api_wrapper.py tests/platform/test_real.py git commit -m "refactor: shrink agent api wrapper to thin adapter" ``` ### Task 2: Simplify `RealPlatformClient` To The Pinned Modern API **Files:** - Modify: `sdk/real.py` - Modify: `adapter/matrix/bot.py` - Test: `tests/platform/test_real.py` - [ ] **Step 1: Add failing integration tests for the desired thin-adapter contract** Extend `tests/platform/test_real.py` with these assertions: ```python @pytest.mark.asyncio async def test_real_platform_client_passes_attachments_to_modern_send_message(): agent_api = FakeAgentApiFactory() client = RealPlatformClient( agent_api=agent_api, prototype_state=PrototypeStateStore(), platform="matrix", ) attachment = Attachment( type="document", filename="report.pdf", mime_type="application/pdf", workspace_path="surfaces/matrix/u1/r1/inbox/report.pdf", ) result = await client.send_message( "@alice:example.org", "chat-1", "read this", attachments=[attachment], ) assert result.response == "read this" assert agent_api.instances["chat-1"].calls == [ ("read this", ["surfaces/matrix/u1/r1/inbox/report.pdf"]) ] @pytest.mark.asyncio async def test_real_platform_client_disconnects_chat_after_agent_exception(): class ErroringChatAgentApi: def __init__(self, chat_id: str) -> None: self.chat_id = chat_id self.connect_calls = 0 self.close_calls = 0 async def connect(self) -> None: self.connect_calls += 1 async def close(self) -> None: self.close_calls += 1 async def send_message(self, text: str, attachments: list[str] | None = None): raise agent_api_wrapper_module.AgentException("INTERNAL_ERROR", "boom") yield agent_api = FakeAgentApiFactory() erroring = ErroringChatAgentApi("chat-1") agent_api.for_chat = lambda chat_id: erroring client = RealPlatformClient( agent_api=agent_api, prototype_state=PrototypeStateStore(), platform="matrix", ) with pytest.raises(PlatformError, match="boom") as exc_info: await client.send_message("@alice:example.org", "chat-1", "hello") assert exc_info.value.code == "INTERNAL_ERROR" assert erroring.close_calls == 1 assert "chat-1" not in client._chat_apis ``` - [ ] **Step 2: Run tests to verify they fail before simplification** Run: ```bash /bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception"' ``` Expected: - FAIL until `sdk/real.py` and Matrix runtime wiring are aligned with the pinned modern API - [ ] **Step 3: Simplify `sdk/real.py` and Matrix runtime construction** Make these exact edits: ```python # adapter/matrix/bot.py def _build_platform_from_env() -> PlatformClient: backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() if backend == "real": base_url = os.environ["AGENT_BASE_URL"] return RealPlatformClient( agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=base_url), prototype_state=PrototypeStateStore(), platform="matrix", ) return MockPlatformClient() ``` ```python # sdk/real.py from __future__ import annotations import asyncio from collections.abc import AsyncIterator from pathlib import Path from sdk.agent_api_wrapper import AgentApiWrapper from sdk.interface import ( Attachment, MessageChunk, MessageResponse, PlatformClient, PlatformError, User, UserSettings, ) from sdk.prototype_state import PrototypeStateStore class RealPlatformClient(PlatformClient): def __init__( self, agent_api: AgentApiWrapper, prototype_state: PrototypeStateStore, platform: str = "matrix", ) -> None: self._agent_api = agent_api self._prototype_state = prototype_state self._platform = platform self._chat_apis: dict[str, AgentApiWrapper] = {} self._chat_api_lock = asyncio.Lock() self._chat_send_locks: dict[str, asyncio.Lock] = {} @property def agent_api(self) -> AgentApiWrapper: return self._agent_api async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper: chat_key = str(chat_id) chat_api = self._chat_apis.get(chat_key) if chat_api is None: async with self._chat_api_lock: chat_api = self._chat_apis.get(chat_key) if chat_api is None: chat_api = self._agent_api.for_chat(chat_key) await chat_api.connect() self._chat_apis[chat_key] = chat_api return chat_api def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock: chat_key = str(chat_id) lock = self._chat_send_locks.get(chat_key) if lock is None: lock = asyncio.Lock() self._chat_send_locks[chat_key] = lock return lock async def send_message( self, user_id: str, chat_id: str, text: str, attachments: list[Attachment] | None = None, ) -> MessageResponse: response_parts: list[str] = [] tokens_used = 0 sent_attachments: list[Attachment] = [] message_id = user_id lock = self._get_chat_send_lock(chat_id) async with lock: chat_api = await self._get_chat_api(chat_id) try: async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)): if hasattr(event, "text"): response_parts.append(event.text) elif event.__class__.__name__ == "MsgEventEnd": tokens_used = getattr(event, "tokens_used", 0) elif "SEND_FILE" in getattr(getattr(event, "type", None), "value", str(getattr(event, "type", ""))): attachment = self._attachment_from_send_file_event(event) if attachment is not None: sent_attachments.append(attachment) except Exception as exc: await self._handle_chat_api_failure(chat_id, exc) await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) return MessageResponse( message_id=message_id, response="".join(response_parts), tokens_used=tokens_used, finished=True, attachments=sent_attachments, ) async def stream_message( self, user_id: str, chat_id: str, text: str, attachments: list[Attachment] | None = None, ) -> AsyncIterator[MessageChunk]: lock = self._get_chat_send_lock(chat_id) async with lock: chat_api = await self._get_chat_api(chat_id) try: async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)): if hasattr(event, "text"): yield MessageChunk( message_id=user_id, delta=event.text, finished=False, ) elif event.__class__.__name__ == "MsgEventEnd": tokens_used = getattr(event, "tokens_used", 0) await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) yield MessageChunk( message_id=user_id, delta="", finished=True, tokens_used=tokens_used, ) except Exception as exc: await self._handle_chat_api_failure(chat_id, exc) async def disconnect_chat(self, chat_id: str) -> None: chat_key = str(chat_id) chat_api = self._chat_apis.pop(chat_key, None) self._chat_send_locks.pop(chat_key, None) if chat_api is not None: await chat_api.close() async def close(self) -> None: for chat_api in list(self._chat_apis.values()): await chat_api.close() self._chat_apis.clear() self._chat_send_locks.clear() async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None: await self.disconnect_chat(chat_id) code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR" raise PlatformError(str(exc), code=code) from exc @staticmethod def _attachment_paths(attachments: list[Attachment] | None) -> list[str]: if not attachments: return [] return [attachment.workspace_path for attachment in attachments if attachment.workspace_path] ``` - [ ] **Step 4: Run the focused transport tests** Run: ```bash /bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception or wraps_connection_closed_as_platform_error or reconnects_after_closed_chat_api"' ``` Expected: - PASS - [ ] **Step 5: Commit** ```bash git add adapter/matrix/bot.py sdk/real.py tests/platform/test_real.py git commit -m "refactor: use upstream transport semantics in real client" ``` ### Task 3: Remove Custom Transport Assumptions From Tests And Docs **Files:** - Modify: `tests/platform/test_real.py` - Modify: `README.md` - [ ] **Step 1: Delete tests that encode wrapper-owned stream semantics** Remove any tests that assert: - late text is recovered after the first `END` - duplicate `END` is repaired inside our wrapper - wrapper-owned idle timeout semantics The file should keep only tests for: - wrapper construction/factory behavior - per-chat client reuse - reconnect/disconnect after failure - attachment forwarding - per-chat send locking - [ ] **Step 2: Update README transport description** Add this text to the Matrix runtime/backend section in `README.md`: ```md Transport layer note: - `surfaces` now uses the pinned upstream `platform-agent_api.AgentApi` stream semantics directly - local code keeps only a thin adapter for client construction and per-chat client factories - request serialization, disconnect-on-failure, and `PlatformError` mapping remain in `sdk/real.py` - `surfaces` no longer performs local post-END stream reconstruction ``` - [ ] **Step 3: Run the full verification set** Run: ```bash uv run ruff check adapter/matrix sdk tests/platform/test_real.py /bin/zsh -lc 'PYTHONPATH=. uv run pytest tests/adapter/matrix -q' /bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q' ``` Expected: - `ruff` reports `All checks passed!` - Matrix adapter tests PASS - `tests/platform/test_real.py` PASS - [ ] **Step 4: Commit** ```bash git add README.md tests/platform/test_real.py git commit -m "test: remove custom transport semantics assumptions" ``` --- ## Self-Review - Spec coverage: - thin adapter target: covered by Task 1 - integration-only `RealPlatformClient`: covered by Task 2 - removal of custom stream semantics assumptions: covered by Task 3 - re-verification after cleanup: covered by Task 3 - Placeholder scan: - no `TODO`, `TBD`, or deferred implementation placeholders remain in task steps - Type consistency: - `AgentApiWrapper` remains the construction/factory type used by `RealPlatformClient` - failure mapping still terminates in `PlatformError` - attachment forwarding consistently uses `attachments: list[str]`