docs: add thin transport adapter plan
parent 3a3fcdc695
commit 4d917ac794
1 changed file with 540 additions and 0 deletions

# Transport Layer Thin Adapter Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace the current custom transport behavior with a thin upstream-compatible adapter so Matrix uses `platform-agent_api.AgentApi` almost as-is and keeps only integration concerns on the `surfaces` side.

**Architecture:** Keep a tiny `AgentApiWrapper` only as a construction/factory shim (`base_url` normalization + `for_chat(chat_id)`). Move all resilience logic that still belongs to `surfaces` into `RealPlatformClient`: per-chat client caching, per-chat send locks, disconnect-on-failure, and `PlatformError` mapping. Delete all custom stream semantics from the wrapper so bugs in streaming can be attributed cleanly to either upstream or our integration layer.

**Tech Stack:** Python 3.11, `aiohttp`, upstream `lambda_agent_api`, `pytest`, `pytest-asyncio`, `ruff`

---

## File Structure

- Modify: `sdk/agent_api_wrapper.py`
  Purpose: shrink the wrapper to a thin constructor/factory shim with no custom `_listen()` or `send_message()` logic.
- Modify: `sdk/real.py`
  Purpose: keep integration-only behavior: cached per-chat clients, chat locks, attachment forwarding, and failure cleanup.
- Modify: `adapter/matrix/bot.py`
  Purpose: instantiate the wrapper with the modern `base_url` path instead of a raw `url`, matching the pinned upstream API.
- Modify: `tests/platform/test_real.py`
  Purpose: remove tests that encode custom wrapper semantics and replace them with tests that prove only the intended integration guarantees.
- Modify: `README.md`
  Purpose: document that the transport layer now uses the pinned upstream `platform-agent_api` client semantics directly, with only a thin local adapter.

### Task 1: Shrink `AgentApiWrapper` To A Thin Factory Shim

**Files:**
- Modify: `sdk/agent_api_wrapper.py`
- Test: `tests/platform/test_real.py`

- [ ] **Step 1: Replace wrapper-only behavior tests with thin-wrapper tests**

Update `tests/platform/test_real.py` so it no longer asserts any custom post-END drain or wrapper-owned timeout behavior. Replace those tests with the following:

```python
def test_agent_api_wrapper_normalizes_base_url_and_uses_modern_constructor(monkeypatch):
    captured = {}

    def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
        captured["agent_id"] = agent_id
        captured["base_url"] = base_url
        captured["chat_id"] = chat_id

    monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)

    wrapper = AgentApiWrapper(
        agent_id="agent-1",
        base_url="ws://platform-agent:8000/v1/agent_ws/",
        chat_id="41",
    )

    assert wrapper.chat_id == "41"
    assert wrapper._base_url == "ws://platform-agent:8000"
    assert captured == {
        "agent_id": "agent-1",
        "base_url": "ws://platform-agent:8000",
        "chat_id": "41",
    }


def test_agent_api_wrapper_for_chat_reuses_normalized_base_url(monkeypatch):
    init_calls = []

    def fake_init(self, agent_id, base_url=None, chat_id=0, **kwargs):
        self.id = agent_id
        self.chat_id = chat_id
        self.url = base_url
        init_calls.append((agent_id, base_url, chat_id))

    monkeypatch.setattr(agent_api_wrapper_module.AgentApi, "__init__", fake_init)

    root = AgentApiWrapper(
        agent_id="agent-1",
        base_url="http://platform-agent:8000/v1/agent_ws/",
        chat_id="1",
    )

    child = root.for_chat("99")

    assert child is not root
    assert child.chat_id == "99"
    assert child._base_url == "http://platform-agent:8000"
    assert init_calls == [
        ("agent-1", "http://platform-agent:8000", "1"),
        ("agent-1", "http://platform-agent:8000", "99"),
    ]
```

- [ ] **Step 2: Run tests to verify old assumptions fail**

Run:

```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "recovers_late_text_after_first_end or times_out_on_idle_stream or normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'
```

Expected:

- FAIL because the old wrapper-behavior tests still exist
- FAIL or SKIP for the new thin-wrapper tests until code/tests are aligned

- [ ] **Step 3: Replace `sdk/agent_api_wrapper.py` with a thin wrapper**

Rewrite `sdk/agent_api_wrapper.py` to the minimal implementation below:

```python
from __future__ import annotations

import inspect
import re
import sys
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit

_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api"
if str(_api_root) not in sys.path:
    sys.path.insert(0, str(_api_root))

from lambda_agent_api.agent_api import AgentApi  # noqa: E402


class AgentApiWrapper(AgentApi):
    """Thin construction/factory shim over the pinned upstream AgentApi."""

    def __init__(
        self,
        agent_id: str,
        base_url: str,
        *,
        chat_id: int | str = 0,
        **kwargs,
    ) -> None:
        self._base_url = self._normalize_base_url(base_url)
        self._init_kwargs = dict(kwargs)
        self.chat_id = chat_id
        if not self._supports_modern_constructor():
            raise RuntimeError("Pinned platform-agent_api is expected to support base_url + chat_id")

        super().__init__(
            agent_id=agent_id,
            base_url=self._base_url,
            chat_id=chat_id,
            **kwargs,
        )

    @staticmethod
    def _supports_modern_constructor() -> bool:
        try:
            parameters = inspect.signature(AgentApi.__init__).parameters
        except (TypeError, ValueError):
            return False
        return "base_url" in parameters and "chat_id" in parameters

    @staticmethod
    def _normalize_base_url(base_url: str) -> str:
        parsed = urlsplit(base_url)
        path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
        return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))

    def for_chat(self, chat_id: int | str) -> "AgentApiWrapper":
        return type(self)(
            agent_id=self.id,
            base_url=self._base_url,
            chat_id=chat_id,
            **self._init_kwargs,
        )
```

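To sanity-check the normalization rule without importing the upstream package, the regex can be exercised on its own. The sketch below copies the body of `_normalize_base_url` into a free function; the name `normalize_base_url` and the last three sample URLs are illustrative assumptions, not part of the plan.

```python
import re
from urllib.parse import urlsplit, urlunsplit


def normalize_base_url(base_url: str) -> str:
    """Standalone copy of the wrapper's rule (hypothetical helper name)."""
    parsed = urlsplit(base_url)
    # Drop a trailing /agent_ws endpoint, with an optional /v1 prefix and
    # an optional single path segment (for example a chat id) after it.
    path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
    # Query string and fragment are discarded, as in the wrapper.
    return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))


samples = [
    "ws://platform-agent:8000/v1/agent_ws/",  # same input as the Step 1 test
    "http://platform-agent:8000/agent_ws/41",  # illustrative
    "http://platform-agent:8000",  # illustrative, already normalized
    "https://example.test/prefix/v1/agent_ws?token=abc",  # illustrative
]
for url in samples:
    print(url, "->", normalize_base_url(url))
```

Because the pattern is anchored at the end of the path, a deployment prefix such as `/prefix` survives while the endpoint suffix is removed, so an already-normalized URL passes through unchanged.
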
- [ ] **Step 4: Run the wrapper-focused tests**

Run:

```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "normalizes_base_url_and_uses_modern_constructor or for_chat_reuses_normalized_base_url"'
```

Expected:

- PASS

- [ ] **Step 5: Commit**

```bash
git add sdk/agent_api_wrapper.py tests/platform/test_real.py
git commit -m "refactor: shrink agent api wrapper to thin adapter"
```

### Task 2: Simplify `RealPlatformClient` To The Pinned Modern API

**Files:**
- Modify: `sdk/real.py`
- Modify: `adapter/matrix/bot.py`
- Test: `tests/platform/test_real.py`

- [ ] **Step 1: Add failing integration tests for the desired thin-adapter contract**

Extend `tests/platform/test_real.py` with these assertions:

```python
@pytest.mark.asyncio
async def test_real_platform_client_passes_attachments_to_modern_send_message():
    agent_api = FakeAgentApiFactory()
    client = RealPlatformClient(
        agent_api=agent_api,
        prototype_state=PrototypeStateStore(),
        platform="matrix",
    )

    attachment = Attachment(
        type="document",
        filename="report.pdf",
        mime_type="application/pdf",
        workspace_path="surfaces/matrix/u1/r1/inbox/report.pdf",
    )

    result = await client.send_message(
        "@alice:example.org",
        "chat-1",
        "read this",
        attachments=[attachment],
    )

    assert result.response == "read this"
    assert agent_api.instances["chat-1"].calls == [
        ("read this", ["surfaces/matrix/u1/r1/inbox/report.pdf"])
    ]


@pytest.mark.asyncio
async def test_real_platform_client_disconnects_chat_after_agent_exception():
    class ErroringChatAgentApi:
        def __init__(self, chat_id: str) -> None:
            self.chat_id = chat_id
            self.connect_calls = 0
            self.close_calls = 0

        async def connect(self) -> None:
            self.connect_calls += 1

        async def close(self) -> None:
            self.close_calls += 1

        async def send_message(self, text: str, attachments: list[str] | None = None):
            # NOTE: this lookup requires sdk/agent_api_wrapper.py to expose
            # AgentException; the Task 1 listing imports only AgentApi.
            raise agent_api_wrapper_module.AgentException("INTERNAL_ERROR", "boom")
            yield  # unreachable; makes send_message an async generator

    agent_api = FakeAgentApiFactory()
    erroring = ErroringChatAgentApi("chat-1")
    agent_api.for_chat = lambda chat_id: erroring
    client = RealPlatformClient(
        agent_api=agent_api,
        prototype_state=PrototypeStateStore(),
        platform="matrix",
    )

    with pytest.raises(PlatformError, match="boom") as exc_info:
        await client.send_message("@alice:example.org", "chat-1", "hello")

    assert exc_info.value.code == "INTERNAL_ERROR"
    assert erroring.close_calls == 1
    assert "chat-1" not in client._chat_apis
```

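The failing fake above relies on a small Python idiom: a `raise` followed by an unreachable `yield` turns the function into an async generator, so the error surfaces during `async for` rather than at call time. A minimal standalone sketch of that behavior follows; `FakeAgentError`, `failing_stream`, and `consume` are hypothetical names used only for illustration.

```python
import asyncio
import inspect


class FakeAgentError(Exception):
    """Hypothetical stand-in for the upstream agent exception."""


async def failing_stream(text: str):
    raise FakeAgentError("boom")
    yield  # unreachable, but its presence makes this an async generator


async def consume() -> str:
    stream = failing_stream("hello")  # nothing is raised here yet
    try:
        async for _ in stream:
            pass
    except FakeAgentError as exc:
        return f"raised during iteration: {exc}"
    return "no error"


print(inspect.isasyncgenfunction(failing_stream))
print(asyncio.run(consume()))
```

This is why the `try`/`except` in `RealPlatformClient.send_message` has to wrap the `async for` loop itself and not just the call to `chat_api.send_message(...)`.
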
- [ ] **Step 2: Run tests to verify they fail before simplification**

Run:

```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception"'
```

Expected:

- FAIL until `sdk/real.py` and Matrix runtime wiring are aligned with the pinned modern API

- [ ] **Step 3: Simplify `sdk/real.py` and Matrix runtime construction**

Make these exact edits:

```python
# adapter/matrix/bot.py
def _build_platform_from_env() -> PlatformClient:
    backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
    if backend == "real":
        base_url = os.environ["AGENT_BASE_URL"]
        return RealPlatformClient(
            agent_api=AgentApiWrapper(agent_id="matrix-bot", base_url=base_url),
            prototype_state=PrototypeStateStore(),
            platform="matrix",
        )
    return MockPlatformClient()
```

```python
# sdk/real.py
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from pathlib import Path

from sdk.agent_api_wrapper import AgentApiWrapper
from sdk.interface import (
    Attachment,
    MessageChunk,
    MessageResponse,
    PlatformClient,
    PlatformError,
    User,
    UserSettings,
)
from sdk.prototype_state import PrototypeStateStore


class RealPlatformClient(PlatformClient):
    def __init__(
        self,
        agent_api: AgentApiWrapper,
        prototype_state: PrototypeStateStore,
        platform: str = "matrix",
    ) -> None:
        self._agent_api = agent_api
        self._prototype_state = prototype_state
        self._platform = platform
        self._chat_apis: dict[str, AgentApiWrapper] = {}
        self._chat_api_lock = asyncio.Lock()
        self._chat_send_locks: dict[str, asyncio.Lock] = {}

    @property
    def agent_api(self) -> AgentApiWrapper:
        return self._agent_api

    async def _get_chat_api(self, chat_id: str) -> AgentApiWrapper:
        chat_key = str(chat_id)
        chat_api = self._chat_apis.get(chat_key)
        if chat_api is None:
            async with self._chat_api_lock:
                # Re-check under the lock so concurrent callers share one client.
                chat_api = self._chat_apis.get(chat_key)
                if chat_api is None:
                    chat_api = self._agent_api.for_chat(chat_key)
                    await chat_api.connect()
                    self._chat_apis[chat_key] = chat_api
        return chat_api

    def _get_chat_send_lock(self, chat_id: str) -> asyncio.Lock:
        chat_key = str(chat_id)
        lock = self._chat_send_locks.get(chat_key)
        if lock is None:
            lock = asyncio.Lock()
            self._chat_send_locks[chat_key] = lock
        return lock

    async def send_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> MessageResponse:
        response_parts: list[str] = []
        tokens_used = 0
        sent_attachments: list[Attachment] = []
        message_id = user_id

        lock = self._get_chat_send_lock(chat_id)
        async with lock:
            chat_api = await self._get_chat_api(chat_id)
            try:
                async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
                    if hasattr(event, "text"):
                        response_parts.append(event.text)
                    elif event.__class__.__name__ == "MsgEventEnd":
                        tokens_used = getattr(event, "tokens_used", 0)
                    elif "SEND_FILE" in getattr(getattr(event, "type", None), "value", str(getattr(event, "type", ""))):
                        # NOTE: _attachment_from_send_file_event is not defined in
                        # this listing; it has to come from the existing module.
                        attachment = self._attachment_from_send_file_event(event)
                        if attachment is not None:
                            sent_attachments.append(attachment)
            except Exception as exc:
                await self._handle_chat_api_failure(chat_id, exc)

        await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)

        return MessageResponse(
            message_id=message_id,
            response="".join(response_parts),
            tokens_used=tokens_used,
            finished=True,
            attachments=sent_attachments,
        )

    async def stream_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> AsyncIterator[MessageChunk]:
        lock = self._get_chat_send_lock(chat_id)
        async with lock:
            chat_api = await self._get_chat_api(chat_id)
            try:
                async for event in chat_api.send_message(text, attachments=self._attachment_paths(attachments)):
                    if hasattr(event, "text"):
                        yield MessageChunk(
                            message_id=user_id,
                            delta=event.text,
                            finished=False,
                        )
                    elif event.__class__.__name__ == "MsgEventEnd":
                        tokens_used = getattr(event, "tokens_used", 0)
                        await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
                        yield MessageChunk(
                            message_id=user_id,
                            delta="",
                            finished=True,
                            tokens_used=tokens_used,
                        )
            except Exception as exc:
                await self._handle_chat_api_failure(chat_id, exc)

    async def disconnect_chat(self, chat_id: str) -> None:
        chat_key = str(chat_id)
        chat_api = self._chat_apis.pop(chat_key, None)
        self._chat_send_locks.pop(chat_key, None)
        if chat_api is not None:
            await chat_api.close()

    async def close(self) -> None:
        for chat_api in list(self._chat_apis.values()):
            await chat_api.close()
        self._chat_apis.clear()
        self._chat_send_locks.clear()

    async def _handle_chat_api_failure(self, chat_id: str, exc: Exception) -> None:
        # Always raises: drop the cached client first, then map to PlatformError.
        await self.disconnect_chat(chat_id)
        code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR"
        raise PlatformError(str(exc), code=code) from exc

    @staticmethod
    def _attachment_paths(attachments: list[Attachment] | None) -> list[str]:
        if not attachments:
            return []
        return [attachment.workspace_path for attachment in attachments if attachment.workspace_path]
```

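The error-code fallback in `_handle_chat_api_failure` can be checked in isolation. The sketch below is a minimal model under stated assumptions: `PlatformError` and `CodedError` here are hypothetical stand-ins (the real `sdk.interface.PlatformError` and the upstream exception are not reproduced), and `to_platform_error` is an illustrative helper that returns the mapped error instead of raising it.

```python
class PlatformError(Exception):
    """Hypothetical stand-in for sdk.interface.PlatformError."""

    def __init__(self, message: str, code: str) -> None:
        super().__init__(message)
        self.code = code


class CodedError(Exception):
    """Hypothetical upstream-style error that carries a code attribute."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code


def to_platform_error(exc: Exception) -> PlatformError:
    # Same fallback expression as in the plan: reuse the exception's code
    # when it has a truthy one, otherwise treat it as a connection error.
    code = getattr(exc, "code", None) or "PLATFORM_CONNECTION_ERROR"
    return PlatformError(str(exc), code=code)


coded = to_platform_error(CodedError("INTERNAL_ERROR", "boom"))
plain = to_platform_error(ConnectionResetError("socket closed"))
print(coded.code, str(coded))
print(plain.code, str(plain))
```

Using `or` rather than a plain `getattr` default means an exception whose `code` is `None` or an empty string also falls back to `PLATFORM_CONNECTION_ERROR`.
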
- [ ] **Step 4: Run the focused transport tests**

Run:

```bash
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q -k "passes_attachments_to_modern_send_message or disconnects_chat_after_agent_exception or wraps_connection_closed_as_platform_error or reconnects_after_closed_chat_api"'
```

Expected:

- PASS

- [ ] **Step 5: Commit**

```bash
git add adapter/matrix/bot.py sdk/real.py tests/platform/test_real.py
git commit -m "refactor: use upstream transport semantics in real client"
```

### Task 3: Remove Custom Transport Assumptions From Tests And Docs

**Files:**
- Modify: `tests/platform/test_real.py`
- Modify: `README.md`

- [ ] **Step 1: Delete tests that encode wrapper-owned stream semantics**

Remove any tests that assert:

- late text is recovered after the first `END`
- duplicate `END` is repaired inside our wrapper
- wrapper-owned idle timeout semantics

The file should keep only tests for:

- wrapper construction/factory behavior
- per-chat client reuse
- reconnect/disconnect after failure
- attachment forwarding
- per-chat send locking

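The plan lists per-chat send locking among the guarantees to keep but does not show a test for it. One possible way to observe the property is sketched below, as a standalone model rather than a test against `RealPlatformClient`: `ChatLocks` and `run_demo` are hypothetical names, and the registry only mirrors the shape of `_get_chat_send_lock`.

```python
import asyncio


class ChatLocks:
    """Hypothetical miniature of the per-chat lock registry."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = {}

    def get(self, chat_id: str) -> asyncio.Lock:
        key = str(chat_id)
        lock = self._locks.get(key)
        if lock is None:
            lock = asyncio.Lock()
            self._locks[key] = lock
        return lock


async def run_demo() -> tuple[int, int]:
    locks = ChatLocks()
    active = {"chat-1": 0, "chat-2": 0}
    peak_same_chat = 0  # most concurrent sends seen within one chat
    peak_total = 0  # most concurrent sends seen across all chats

    async def send(chat_id: str) -> None:
        nonlocal peak_same_chat, peak_total
        async with locks.get(chat_id):
            active[chat_id] += 1
            peak_same_chat = max(peak_same_chat, active[chat_id])
            peak_total = max(peak_total, sum(active.values()))
            await asyncio.sleep(0.01)  # simulate a streamed reply
            active[chat_id] -= 1

    await asyncio.gather(send("chat-1"), send("chat-1"), send("chat-2"))
    return peak_same_chat, peak_total


print(asyncio.run(run_demo()))
```

A per-chat peak of 1 alongside a total peak of 2 indicates that sends within one chat are serialized while different chats still proceed concurrently.
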
- [ ] **Step 2: Update README transport description**

Add this text to the Matrix runtime/backend section in `README.md`:

```md
Transport layer note:

- `surfaces` now uses the pinned upstream `platform-agent_api.AgentApi` stream semantics directly
- local code keeps only a thin adapter for client construction and per-chat client factories
- request serialization, disconnect-on-failure, and `PlatformError` mapping remain in `sdk/real.py`
- `surfaces` no longer performs local post-END stream reconstruction
```

- [ ] **Step 3: Run the full verification set**

Run:

```bash
uv run ruff check adapter/matrix sdk tests/platform/test_real.py
/bin/zsh -lc 'PYTHONPATH=. uv run pytest tests/adapter/matrix -q'
/bin/zsh -lc 'PYTHONPATH=.:external/platform-agent_api uv run pytest tests/platform/test_real.py -q'
```

Expected:

- `ruff` reports `All checks passed!`
- Matrix adapter tests PASS
- `tests/platform/test_real.py` PASS

- [ ] **Step 4: Commit**

```bash
git add README.md tests/platform/test_real.py
git commit -m "test: remove custom transport semantics assumptions"
```

---

## Self-Review

- Spec coverage:
  - thin adapter target: covered by Task 1
  - integration-only `RealPlatformClient`: covered by Task 2
  - removal of custom stream semantics assumptions: covered by Task 3
  - re-verification after cleanup: covered by Task 3

- Placeholder scan:
  - no `TODO`, `TBD`, or deferred implementation placeholders remain in task steps

- Type consistency:
  - `AgentApiWrapper` remains the construction/factory type used by `RealPlatformClient`
  - failure mapping still terminates in `PlatformError`
  - attachment forwarding consistently uses `attachments: list[str]`