surfaces/docs/superpowers/plans/2026-04-22-transport-layer-thin-adapter.md

Transport Layer Thin Adapter Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Replace the current custom transport behavior with a thin upstream-compatible adapter so Matrix uses platform-agent_api.AgentApi almost as-is and keeps only integration concerns on the surfaces side.

Architecture: Keep a tiny AgentApiWrapper only as a construction/factory shim (base_url normalization + for_chat(chat_id)). Move all resilience logic that still belongs to surfaces into RealPlatformClient: per-chat client caching, per-chat send locks, disconnect-on-failure, and PlatformError mapping. Delete all custom stream semantics from the wrapper so bugs in streaming can be attributed cleanly to either upstream or our integration layer.
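Below is a minimal sketch of the per-chat client cache described above. It needs only asyncio plus a hypothetical FakeChatApi stand-in. Neither ChatApiCache nor FakeChatApi exists in surfaces; both are illustrative. The sketch shows the double-checked pattern that _get_chat_api uses later in this plan: when several requests for one chat arrive at the same time, only one client is created and connected.

```python
import asyncio


class FakeChatApi:
    """Hypothetical stand-in for a per-chat AgentApiWrapper."""

    def __init__(self, chat_id: str) -> None:
        self.chat_id = chat_id
        self.connect_calls = 0

    async def connect(self) -> None:
        self.connect_calls += 1
        await asyncio.sleep(0)  # yield control, as a real network connect would


class ChatApiCache:
    """Illustrative cache mirroring RealPlatformClient's per-chat client reuse."""

    def __init__(self) -> None:
        self.created: list[FakeChatApi] = []
        self._chat_apis: dict[str, FakeChatApi] = {}
        self._lock = asyncio.Lock()

    async def get(self, chat_id: str) -> FakeChatApi:
        chat_api = self._chat_apis.get(chat_id)
        if chat_api is None:
            async with self._lock:
                # Re-check under the lock: another task may have connected meanwhile.
                chat_api = self._chat_apis.get(chat_id)
                if chat_api is None:
                    chat_api = FakeChatApi(chat_id)
                    self.created.append(chat_api)
                    await chat_api.connect()
                    self._chat_apis[chat_id] = chat_api
        return chat_api


async def main() -> ChatApiCache:
    cache = ChatApiCache()
    # Ten concurrent first requests for one chat, plus one request for another chat.
    await asyncio.gather(*(cache.get("chat-1") for _ in range(10)), cache.get("chat-2"))
    return cache


cache = asyncio.run(main())
print(sorted(api.chat_id for api in cache.created))  # ['chat-1', 'chat-2']
```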

Tech Stack: Python 3.11, aiohttp, upstream lambda_agent_api, pytest, pytest-asyncio, ruff


File Structure

  • Modify: sdk/agent_api_wrapper.py Purpose: shrink the wrapper to a thin constructor/factory shim with no custom _listen() or send_message() logic.
  • Modify: sdk/real.py Purpose: keep integration-only behavior: cached per-chat clients, chat locks, attachment forwarding, and failure cleanup.
  • Modify: adapter/matrix/bot.py Purpose: instantiate the wrapper with the modern base_url keyword argument instead of a raw url, matching the pinned upstream constructor.
  • Modify: tests/platform/test_real.py Purpose: remove tests that encode custom wrapper semantics and replace them with tests that prove only the intended integration guarantees.
  • Modify: README.md Purpose: document that the transport layer now uses the pinned upstream platform-agent_api client semantics directly, with only a thin local adapter.

Task 1: Shrink AgentApiWrapper To A Thin Factory Shim

Files:

  • Modify: sdk/agent_api_wrapper.py

  • Test: tests/platform/test_real.py

  • Step 1: Replace wrapper-only behavior tests with thin-wrapper tests

Update tests/platform/test_real.py so it no longer asserts any custom post-END drain or wrapper-owned timeout behavior. Replace those tests with the following:

def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch):
    captured = {}

    def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
        captured["agent_id"] = agent_id
        captured["base_url"] = base_url
        captured["chat_id"] = chat_id

    monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)

    wrapper = AgentApiWrapper(
        agent_id="agent-1",
        base_url="ws://platform-agent:8000/v1/agent_ws/",
        chat_id="41",
    )

    assert wrapper.chat_id == "41"
    assert wrapper._base_url == "ws://platform-agent:8000"
    assert captured == {
        "agent_id": "agent-1",
        "base_url": "ws://platform-agent:8000",
        "chat_id": "41",
    }


def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch):
    init_calls = []

    def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
        self.id = agent_id
        self.chat_id = chat_id
        self.url = base_url
        init_calls.append((agent_id, base_url, chat_id))

    monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)

    root = AgentApiWrapper(
        agent_id="agent-1",
        base_url="http://platform-agent:8000/v1/agent_ws/",
        chat_id="1",
    )

    child = root.for_chat("99")

    assert child is not root
    assert child.chat_id == "99"
    assert child._base_url == "http://platform-agent:8000"
    assert init_calls == [
        ("agent-1", "http://platform-agent:8000", "1"),
        ("agent-1", "http://platform-agent:8000", "99"),
    ]
  • Step 2: Run tests to verify old assumptions fail

Run:

/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "recovers_late_text_after_first_end or times_out_on_idle_stream or normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'

Expected:

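  • the old late-text and idle-timeout tests (recovers_late_text_after_first_end, times_out_on_idle_stream) are no longer selected, because Step 1 removed them

  • the two new thin-wrapper tests FAIL (or SKIP) until sdk/agent_api_wrapper.py is rewritten in Step 3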

  • Step 3: Replace sdk/agent_api_wrapper.py with a thin wrapper

Rewrite sdk/agent_api_wrapper.py to the minimal implementation below:

from __future__ import annotations

import inspect
import re
import sys
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit

_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api"
if str(_api_root) not in sys.path:
    sys.path.insert(0, str(_api_root))

from lambda_agent_api.agent_api import AgentApi  # noqa: E402


class AgentApiWrapper(AgentApi):
    """Thin construction/factory shim over the pinned upstream AgentApi."""

    def __init__(
        self,
        agent_id: str,
        base_url: str,
        *,
        chat_id: int | str = 0,
        **kwargs,
    ) -> None:
        self._base_url = self._normalize_base_url(base_url)
        self._init_kwargs = dict(kwargs)
        self.chat_id = chat_id
        if not self._supports_modern_constructor():
            raise RuntimeError("Pinned platform-agent_api is expected to support base_url + chat_id")

        super().__init__(
            agent_id=agent_id,
            base_url=self._base_url,
            chat_id=chat_id,
            **kwargs,
        )

    @staticmethod
    def _supports_modern_constructor() -> bool:
        try:
            parameters = inspect.signature(AgentApi.__init__).parameters
        except (TypeError, ValueError):
            return False
        return "base_url" in parameters and "chat_id" in parameters

    @staticmethod
    def _normalize_base_url(base_url: str) -> str:
        parsed = urlsplit(base_url)
        path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
        return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))

    def for_chat(self, chat_id: int | str) -> "AgentApiWrapper":
        return type(self)(
            agent_id=self.id,
            base_url=self._base_url,
            chat_id=chat_id,
            **self._init_kwargs,
        )
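Each of the two static helpers can be run on its own. The sketch below copies their logic into free functions. normalize_base_url and supports_modern_constructor are illustrative names. LegacyApi and ModernApi are hypothetical classes, not upstream types. The sketch shows which URL suffixes are stripped and how the constructor check handles an older url-only signature. It also shows that query strings and fragments are discarded.

```python
import inspect
import re
from urllib.parse import urlsplit, urlunsplit


def normalize_base_url(base_url: str) -> str:
    # Same logic as AgentApiWrapper._normalize_base_url above.
    parsed = urlsplit(base_url)
    path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
    return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))


def supports_modern_constructor(init) -> bool:
    # Same check as AgentApiWrapper._supports_modern_constructor, parameterised for the demo.
    try:
        parameters = inspect.signature(init).parameters
    except (TypeError, ValueError):
        return False
    return "base_url" in parameters and "chat_id" in parameters


class LegacyApi:  # hypothetical url-only constructor
    def __init__(self, agent_id, url):
        pass


class ModernApi:  # hypothetical constructor shaped like the pinned upstream one
    def __init__(self, agent_id, base_url=None, chat_id=0):
        pass


for url in [
    "ws://platform-agent:8000/v1/agent_ws/",  # -> ws://platform-agent:8000
    "ws://platform-agent:8000/agent_ws/agent-1",  # -> ws://platform-agent:8000
    "http://platform-agent:8000/prefix/v1/agent_ws",  # -> http://platform-agent:8000/prefix
    "ws://platform-agent:8000/v1/agent_ws/?token=x",  # -> ws://platform-agent:8000 (query dropped)
]:
    print(url, "->", normalize_base_url(url))

print(supports_modern_constructor(LegacyApi.__init__))  # False
print(supports_modern_constructor(ModernApi.__init__))  # True
```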
  • Step 4: Run the wrapper-focused tests

Run:

/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'

Expected:

  • PASS

  • Step 5: Commit

git add sdk/agent_api_wrapper.py tests/platform/test_real.py
git commit -m "refactor: shrink agent api wrapper to thin adapter"

Task 2: Simplify RealPlatformClient To The Pinned Modern API

Files:

  • Modify: sdk/real.py

  • Modify: adapter/matrix/bot.py

  • Test: tests/platform/test_real.py

  • Step 1: Add failing integration tests for the desired thin-adapter contract

Extend tests/platform/test_real.py with these assertions:

@pytest.mark.asyncio
async def test_real_platform_client_passes_attachments_to_modern_send_message():
    agent_api = FakeAgentApiFactory()
    client = RealPlatformClient(
        agent_api=agent_api,
        prototype_state=PrototypeStateStore(),
        platform="matrix",
    )

    attachment = Attachment(
        type="document",
        filename="report.pdf",
        mime_type="application/pdf",
        workspace_path="surfaces/matrix/u1/r1/inbox/report.pdf",
    )

    result = await client.send_message(
        "@alice:example.org",
        "chat-1",
        "read this",
        attachments=[attachment],
    )

    assert result.response == "read this"
    assert agent_api.instances["chat-1"].calls == [
        ("read this", ["surfaces/matrix/u1/r1/inbox/report.pdf"])
    ]


@pytest.mark.asyncio
async def test_real_platform_client_disconnects_chat_after_agent_exception():
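    class FakeAgentError(Exception):
        # Local stand-in for an upstream error that carries a `code`; the thin
        # wrapper module only imports AgentApi, so it does not re-export AgentException.
        def __init__(self, code: str, message: str) -> None:
            super().__init__(message)
            self.code = code

    class ErroringChatAgentApi:
        def __init__(self, chat_id: str) -> None:
            self.chat_id = chat_id
            self.connect_calls = 0
            self.close_calls = 0

        async def connect(self) -> None:
            self.connect_calls += 1

        async def close(self) -> None:
            self.close_calls += 1

        async def send_message(self, text: str, attachments: list[str] | None = None):
            raise FakeAgentError("INTERNAL_ERROR", "boom")
            yield  # unreachable; makes send_message an async generator like upstream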

    agent_api = FakeAgentApiFactory()
    erroring = ErroringChatAgentApi("chat-1")
    agent_api.for_chat = lambda chat_id: erroring
    client = RealPlatformClient(
        agent_api=agent_api,
        prototype_state=PrototypeStateStore(),
        platform="matrix",
    )

    with pytest.raises(PlatformError, match="boom") as exc_info:
        await client.send_message("@alice:example.org", "chat-1", "hello")

    assert exc_info.value.code == "INTERNAL_ERROR"
    assert erroring.close_calls == 1
    assert "chat-1" not in client._chat_apis
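The fake's send_message raises and then has an unreachable bare yield. That yield is there for a reason: any `yield` turns the function into an async generator. So the error surfaces on the first iteration of `async for`, which is where RealPlatformClient catches it. Without the yield, calling send_message would return a coroutine, and `async for` would fail with TypeError before the error path is exercised. The function names below are illustrative.

```python
import asyncio
import inspect


async def erroring_send_message(text: str):
    raise RuntimeError("boom")
    yield  # unreachable, but its presence makes this an async generator function


async def coroutine_send_message(text: str):
    raise RuntimeError("boom")  # no yield: this is a plain coroutine function


async def iterate(factory) -> str:
    stream = factory("hello")  # for a generator, nothing has run yet
    try:
        async for _ in stream:
            pass
    except RuntimeError as exc:
        return f"raised on first iteration: {exc}"
    except TypeError:
        stream.close()  # close the never-started coroutine to avoid a "never awaited" warning
        return "TypeError: not async-iterable"
    return "no error"


print(inspect.isasyncgenfunction(erroring_send_message))  # True
print(asyncio.run(iterate(erroring_send_message)))  # raised on first iteration: boom
print(asyncio.run(iterate(coroutine_send_message)))  # TypeError: not async-iterable
```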
  • Step 2: Run tests to verify they fail before simplification

Run:

/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception"'

Expected:

  • FAIL until sdk/real.py and Matrix runtime wiring are aligned with the pinned modern API

  • Step 3: Simplify sdk/real.py and Matrix runtime construction

Make these exact edits:

# adapter/matrix/bot.py
def _build_platform_from_env() -> PlatformClient:
    backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
    if backend == "real":
        base_url = os.environ["AGENT_BASE_URL"]
        return RealPlatformClient(
            agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=base_url),
            prototype_state=PrototypeStateStore(),
            platform="matrix",
        )
    return MockPlatformClient()

# sdk/real.py
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from pathlib import Path

from sdk.agent_api_wrapper import AgentApiWrapper
from sdk.interface import (
    Attachment,
    MessageChunk,
    MessageResponse,
    PlatformClient,
    PlatformError,
    User,
    UserSettings,
)
from sdk.prototype_state import PrototypeStateStore


class RealPlatformClient(PlatformClient):
    def __init__(
        self,
        agent_api: AgentApiWrapper,
        prototype_state: PrototypeStateStore,
        platform: str = "matrix",
    ) -> None:
        self._agent_api = agent_api
        self._prototype_state = prototype_state
        self._platform = platform
        self._chat_apis: dict[str, AgentApiWrapper] = {}
        self._chat_api_lock = asyncio.Lock()
        self._chat_send_locks: dict[str, asyncio.Lock] = {}

    @property
    def agent_api(self) -> AgentApiWrapper:
        return self._agent_api

    async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper:
        chat_key = str(chat_id)
        chat_api = self._chat_apis.get(chat_key)
        if chat_api is None:
            async with self._chat_api_lock:
                chat_api = self._chat_apis.get(chat_key)
                if chat_api is None:
                    chat_api = self._agent_api.for_chat(chat_key)
                    await chat_api.connect()
                    self._chat_apis[chat_key] = chat_api
        return chat_api

    def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock:
        chat_key = str(chat_id)
        lock = self._chat_send_locks.get(chat_key)
        if lock is None:
            lock = asyncio.Lock()
            self._chat_send_locks[chat_key] = lock
        return lock

    async def send_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> MessageResponse:
        response_parts: list[str] = []
        tokens_used = 0
        sent_attachments: list[Attachment] = []
        message_id = user_id

        lock = self._get_chat_send_lock(chat_id)
        async with lock:
            chat_api = await self._get_chat_api(chat_id)
            try:
                async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
                    # Upstream events may expose `type` as an enum (use .value) or as a plain string.
                    event_type = getattr(event, "type", "")
                    event_type_name = str(getattr(event_type, "value", event_type))
                    if hasattr(event, "text"):
                        response_parts.append(event.text)
                    elif event.__class__.__name__ == "MsgEventEnd":
                        tokens_used = getattr(event, "tokens_used", 0)
                    elif "SEND_FILE" in event_type_name:
                        attachment = self._attachment_from_send_file_event(event)
                        if attachment is not None:
                            sent_attachments.append(attachment)
            except Exception as exc:
                await self._handle_chat_api_failure(chat_id, exc)

            await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)

        return MessageResponse(
            message_id=message_id,
            response="".join(response_parts),
            tokens_used=tokens_used,
            finished=True,
            attachments=sent_attachments,
        )

    async def stream_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> AsyncIterator[MessageChunk]:
        lock = self._get_chat_send_lock(chat_id)
        async with lock:
            chat_api = await self._get_chat_api(chat_id)
            try:
                async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
                    if hasattr(event, "text"):
                        yield MessageChunk(
                            message_id=user_id,
                            delta=event.text,
                            finished=False,
                        )
                    elif event.__class__.__name__ == "MsgEventEnd":
                        tokens_used = getattr(event, "tokens_used", 0)
                        await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
                        yield MessageChunk(
                            message_id=user_id,
                            delta="",
                            finished=True,
                            tokens_used=tokens_used,
                        )
            except Exception as exc:
                await self._handle_chat_api_failure(chat_id, exc)

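    async def disconnect_chat(self, chat_id: str) -> None:
        chat_key = str(chat_id)
        chat_api = self._chat_apis.pop(chat_key, None)
        # Keep the send lock. The failure path calls this while still inside
        # `async with lock`; dropping the lock here would let a new caller create a
        # fresh lock and run concurrently with tasks already queued on the old one.
        if chat_api is not None:
            await chat_api.close()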

    async def close(self) -> None:
        for chat_api in list(self._chat_apis.values()):
            await chat_api.close()
        self._chat_apis.clear()
        self._chat_send_locks.clear()

    async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None:
        await self.disconnect_chat(chat_id)
        code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR"
        raise PlatformError(str(exc), code=code) from exc

    @staticmethod
    def _attachment_paths(attachments: list[Attachment] | None) -> list[str]:
        if not attachments:
            return []
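        return [attachment.workspace_path for attachment in attachments if attachment.workspace_path]

    # Methods not shown here (including _attachment_from_send_file_event, which
    # send_message calls above) stay as they are today.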
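The failure contract in _handle_chat_api_failure can be checked in isolation: drop the cached client, keep an upstream `code` when the exception has one, fall back to PLATFORM_CONNECTION_ERROR otherwise, and chain the original exception. The sketch below uses hypothetical stand-ins: PlatformError here is not sdk.interface.PlatformError, and CodedError and fail_send are illustrative names. It runs synchronously for brevity.

```python
class PlatformError(Exception):
    """Hypothetical stand-in for sdk.interface.PlatformError."""

    def __init__(self, message: str, code: str) -> None:
        super().__init__(message)
        self.code = code


class CodedError(Exception):
    """Hypothetical upstream-style error carrying a `code` attribute."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code


def to_platform_error(exc: Exception) -> PlatformError:
    # Same rule as _handle_chat_api_failure: keep an upstream code when present,
    # otherwise fall back to a generic connection error code.
    code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR"
    return PlatformError(str(exc), code=code)


def fail_send(chat_apis: dict, chat_id: str, exc: Exception) -> None:
    try:
        raise exc  # stands in for a failure inside chat_api.send_message(...)
    except Exception as err:
        chat_apis.pop(chat_id, None)  # drop the cached client so the next send reconnects
        raise to_platform_error(err) from err


results = []
for failure in (CodedError("INTERNAL_ERROR", "boom"), ConnectionResetError("socket closed")):
    chat_apis = {"chat-1": object()}
    try:
        fail_send(chat_apis, "chat-1", failure)
    except PlatformError as err:
        results.append((err.code, str(err), type(err.__cause__).__name__, "chat-1" in chat_apis))

for row in results:
    print(row)
# ('INTERNAL_ERROR', 'boom', 'CodedError', False)
# ('PLATFORM_CONNECTION_ERROR', 'socket closed', 'ConnectionResetError', False)
```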
  • Step 4: Run the focused transport tests

Run:

/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception or wraps_connection_closed_as_platform_error or reconnects_after_closed_chat_api"'

Expected:

  • PASS

  • Step 5: Commit

git add adapter/matrix/bot.py sdk/real.py tests/platform/test_real.py
git commit -m "refactor: use upstream transport semantics in real client"

Task 3: Remove Custom Transport Assumptions From Tests And Docs

Files:

  • Modify: tests/platform/test_real.py

  • Modify: README.md

  • Step 1: Delete tests that encode wrapper-owned stream semantics

Remove any tests that assert:

  • late text is recovered after the first END
  • duplicate END is repaired inside our wrapper
  • wrapper-owned idle timeout semantics

The file should keep only tests for:

  • wrapper construction/factory behavior

  • per-chat client reuse

  • reconnect/disconnect after failure

  • attachment forwarding

  • per-chat send locking
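One way to express the per-chat send locking guarantee as a test: sends to the same chat never overlap, while sends to different chats still can. This sketch targets a hypothetical LockedSender stand-in, not RealPlatformClient, so it runs on its own. A real test in tests/platform/test_real.py would instead drive RealPlatformClient.send_message with a fake chat API that records the same in-flight counters.

```python
import asyncio


class LockedSender:
    """Hypothetical stand-in that uses per-chat locks the way RealPlatformClient does."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = {}
        self._active: dict[str, int] = {}
        self.max_active: dict[str, int] = {}
        self.max_total = 0

    def _lock_for(self, chat_id: str) -> asyncio.Lock:
        lock = self._locks.get(chat_id)
        if lock is None:
            lock = asyncio.Lock()
            self._locks[chat_id] = lock
        return lock

    async def send(self, chat_id: str) -> None:
        async with self._lock_for(chat_id):
            self._active[chat_id] = self._active.get(chat_id, 0) + 1
            self.max_active[chat_id] = max(self.max_active.get(chat_id, 0), self._active[chat_id])
            self.max_total = max(self.max_total, sum(self._active.values()))
            await asyncio.sleep(0.01)  # simulated time spent streaming a reply
            self._active[chat_id] -= 1


async def main() -> LockedSender:
    sender = LockedSender()
    chats = ["chat-1"] * 3 + ["chat-2"] * 3
    await asyncio.gather(*(sender.send(chat_id) for chat_id in chats))
    return sender


sender = asyncio.run(main())
print(sender.max_active)  # {'chat-1': 1, 'chat-2': 1}
print(sender.max_total)  # 2
```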

  • Step 2: Update README transport description

Add this text to the Matrix runtime/backend section in README.md:

Transport layer note:

- `surfaces` now uses the pinned upstream `platform-agent_api.AgentApi` stream semantics directly
- local code keeps only a thin adapter for client construction and per-chat client factories
- request serialization, disconnect-on-failure, and `PlatformError` mapping remain in `sdk/real.py`
- `surfaces` no longer performs local post-END stream reconstruction
  • Step 3: Run the full verification set

Run:

uv run ruff check adapter/matrix sdk tests/platform/test_real.py
/bin/zsh -lc 'PYTHONPATH=. uv run pytest tests/adapter/matrix -q'
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q'

Expected:

  • ruff reports All checks passed!

  • Matrix adapter tests PASS

  • tests/platform/test_real.py PASS

  • Step 4: Commit

git add README.md tests/platform/test_real.py
git commit -m "test: remove custom transport semantics assumptions"

Self-Review

  • Spec coverage:

    • thin adapter target: covered by Task 1
    • integration-only RealPlatformClient: covered by Task 2
    • removal of custom stream semantics assumptions: covered by Task 3
    • re-verification after cleanup: covered by Task 3
  • Placeholder scan:

    • no TODO, TBD, or deferred implementation placeholders remain in task steps
  • Type consistency:

    • AgentApiWrapper remains the construction/factory type used by RealPlatformClient
    • failure mapping still terminates in PlatformError
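    • attachment forwarding to upstream consistently uses attachments: list[str] (workspace paths built by _attachment_paths), while RealPlatformClient itself still accepts list[Attachment]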