surfaces/.planning/phases/04-matrix-mvp-shared-agent-context-and-context-management-comma/04-01-PLAN.md


phase: 04-matrix-mvp-shared-agent-context-and-context-management-comma
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
  - sdk/agent_session.py
  - sdk/real.py
  - adapter/matrix/bot.py
  - tests/platform/test_agent_session.py
  - tests/platform/test_real.py
  - tests/adapter/matrix/test_dispatcher.py
autonomous: true
requirements:
  - Replace AgentSessionClient with AgentApi
  - Wire AgentApi lifecycle into MatrixBot
must_haves:
  truths:
    - RealPlatformClient uses AgentApi, not AgentSessionClient
    - AgentApi is connected before sync_forever and closed in finally block of main()
    - build_thread_key and AgentSessionClient are gone from sdk/
    - stream_message() yields MessageChunk objects including a final chunk with tokens_used from last_tokens_used
    - AGENT_WS_URL is used unchanged (no thread_id query param)
    - MATRIX_PLATFORM_BACKEND=real still works end-to-end without test crash
    - All existing tests pass after the swap
  artifacts:
    - path: sdk/real.py
      provides: RealPlatformClient wrapping AgentApi
      contains: AgentApi
    - path: adapter/matrix/bot.py
      provides: main() awaits agent_api.connect() and agent_api.close()
      contains: agent_api.connect
    - path: tests/platform/test_real.py
      provides: Updated tests using FakeAgentApi instead of FakeAgentSessionClient
  key_links:
    - from: adapter/matrix/bot.py main()
      to: RealPlatformClient._agent_api
      via: runtime.platform.agent_api property
      pattern: agent_api.connect
    - from: sdk/real.py stream_message()
      to: agent_api.last_tokens_used
      via: attribute read after async-for loop
      pattern: last_tokens_used

Replace the custom per-request AgentSessionClient with the persistent AgentApi from lambda_agent_api. Remove build_thread_key and AgentSessionClient entirely. Wire AgentApi connect/close into bot.py main(). Update all tests that referenced the old client.

Purpose: The existing AgentSessionClient creates a new WebSocket per message and injects thread_id into the URL — both incompatible with origin/main platform-agent. AgentApi maintains a single persistent WS connection managed via connect()/close() and exposes send_message() as an AsyncIterator.
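
To make the contrast with a per-message client concrete, here is a minimal sketch of the persistent-connection lifecycle. `SketchAgent` and `run_session` are hypothetical stand-ins introduced only for illustration; they are not the real `AgentApi` and assume nothing about its internals beyond the connect()/send_message()/close() shape described above.

```python
import asyncio
from typing import AsyncIterator


class SketchAgent:
    """Hypothetical stand-in for a persistent agent client (not the real AgentApi)."""

    def __init__(self) -> None:
        self.connections_opened = 0
        self.connected = False

    async def connect(self) -> None:
        # A real client would open the WebSocket here, exactly once.
        self.connections_opened += 1
        self.connected = True

    async def close(self) -> None:
        self.connected = False

    async def send_message(self, text: str) -> AsyncIterator[str]:
        if not self.connected:
            raise RuntimeError("connect() must be called before send_message()")
        # Reuse the already-open connection; echo the text back word by word.
        for word in text.split():
            yield word


async def run_session(messages: list[str]) -> tuple[list[str], int]:
    agent = SketchAgent()
    await agent.connect()
    replies: list[str] = []
    try:
        for message in messages:
            parts = [part async for part in agent.send_message(message)]
            replies.append(" ".join(parts))
    finally:
        # Closed once, no matter how many messages were sent.
        await agent.close()
    return replies, agent.connections_opened
```

Calling `asyncio.run(run_session([...]))` with several messages reports a single opened connection, which is the property the swap to AgentApi is meant to provide.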

Output: sdk/real.py, sdk/agent_session.py (deleted/emptied), adapter/matrix/bot.py updated, tests green.

<execution_context> @$HOME/.claude/get-shit-done/workflows/execute-plan.md @$HOME/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/phases/04-matrix-mvp-shared-agent-context-and-context-management-comma/04-CONTEXT.md @.planning/phases/04-matrix-mvp-shared-agent-context-and-context-management-comma/04-RESEARCH.md

From external/platform-agent_api/lambda_agent_api/agent_api.py:

class AgentApi:
    def __init__(self, agent_id: str, url: str,
                 callback=None, on_disconnect=None): ...
    async def connect(self) -> None: ...   # opens WS, awaits MsgStatus, starts _listen task
    async def close(self) -> None: ...     # cancels _listen, closes WS+session
    async def send_message(self, text: str) -> AsyncIterator[AgentEventUnion]:
        # yields MsgEventTextChunk only; breaks on MsgEventEnd (does NOT yield it)
        # MsgEventEnd.tokens_used is consumed internally but NOT stored — executor
        # MUST add self.last_tokens_used: int = 0 to AgentApi and set it at the
        # break point, OR store it in a thin wrapper on RealPlatformClient.
        ...
    # AgentEventUnion = Union[MsgEventTextChunk, MsgEventEnd] per server.py

From external/platform-agent_api/lambda_agent_api/server.py:

class MsgEventTextChunk(BaseModel):
    type: Literal[EServerMessage.AGENT_EVENT_TEXT_CHUNK]
    text: str

class MsgEventEnd(BaseModel):
    type: Literal[EServerMessage.AGENT_EVENT_END]
    tokens_used: int

From sdk/interface.py (unchanged):

class MessageChunk(BaseModel):
    message_id: str
    delta: str
    finished: bool
    tokens_used: int = 0

class PlatformClient(Protocol):
    async def send_message(self, user_id, chat_id, text, attachments=None) -> MessageResponse: ...
    async def stream_message(self, user_id, chat_id, text, attachments=None) -> AsyncIterator[MessageChunk]: ...
Task 1: Replace AgentSessionClient with AgentApi in sdk/real.py, delete sdk/agent_session.py, patch tokens_used capture

<read_first>
- sdk/real.py (full file, being replaced)
- sdk/agent_session.py (full file, being deleted)
- external/platform-agent_api/lambda_agent_api/agent_api.py (lines 134-216: send_message generator + finally block)
- sdk/interface.py (MessageChunk, PlatformClient Protocol)
</read_first>

sdk/real.py, sdk/agent_session.py, external/platform-agent_api/lambda_agent_api/agent_api.py

- RealPlatformClient.__init__ accepts agent_api: AgentApi (not AgentSessionClient), prototype_state: PrototypeStateStore, platform: str = "matrix"
- RealPlatformClient exposes agent_api as property self.agent_api so bot.py main() can call connect/close
- stream_message() iterates agent_api.send_message(text), yielding one MessageChunk per MsgEventTextChunk; after the loop it yields a final MessageChunk(finished=True, delta="", tokens_used=agent_api.last_tokens_used)
- send_message() collects all chunks from stream_message() and returns MessageResponse
- No thread_key, no build_thread_key references anywhere in sdk/real.py
- AgentApi.last_tokens_used: int = 0 added as instance attribute in __init__; set inside the send_message() generator at the "if isinstance(chunk, MsgEventEnd): break" line, which changes to "self.last_tokens_used = chunk.tokens_used; break"
- sdk/agent_session.py: delete file contents and replace with single comment "# Deleted in Phase 4 — replaced by AgentApi from lambda_agent_api" (keep file to avoid import errors in test_real.py until tests are updated in Task 2)

  1. Edit external/platform-agent_api/lambda_agent_api/agent_api.py:
     - In __init__: add `self.last_tokens_used: int = 0`
     - In send_message() at line ~172 (`if isinstance(chunk, MsgEventEnd): break`): replace with:

```python
if isinstance(chunk, MsgEventEnd):
    self.last_tokens_used = chunk.tokens_used
    break
```

  2. Rewrite sdk/real.py entirely:
from __future__ import annotations

from typing import TYPE_CHECKING, AsyncIterator

from sdk.interface import Attachment, MessageChunk, MessageResponse, PlatformClient, User, UserSettings
from sdk.prototype_state import PrototypeStateStore

if TYPE_CHECKING:
    from lambda_agent_api.agent_api import AgentApi


class RealPlatformClient(PlatformClient):
    def __init__(
        self,
        agent_api: "AgentApi",
        prototype_state: PrototypeStateStore,
        platform: str = "matrix",
    ) -> None:
        self._agent_api = agent_api
        self._prototype_state = prototype_state
        self._platform = platform

    @property
    def agent_api(self) -> "AgentApi":
        return self._agent_api

    async def get_or_create_user(
        self,
        external_id: str,
        platform: str,
        display_name: str | None = None,
    ) -> User:
        return await self._prototype_state.get_or_create_user(
            external_id=external_id,
            platform=platform,
            display_name=display_name,
        )

    async def send_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> MessageResponse:
        parts: list[str] = []
        tokens_used = 0
        async for chunk in self.stream_message(user_id, chat_id, text, attachments):
            if chunk.delta:
                parts.append(chunk.delta)
            if chunk.finished:
                tokens_used = chunk.tokens_used
        return MessageResponse(
            message_id=user_id,
            response="".join(parts),
            tokens_used=tokens_used,
            finished=True,
        )

    async def stream_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments: list[Attachment] | None = None,
    ) -> AsyncIterator[MessageChunk]:
        # Lazy import: lambda_agent_api may not be on sys.path when this module is imported.
        from lambda_agent_api.server import MsgEventTextChunk

        async for event in self._agent_api.send_message(text):
            if isinstance(event, MsgEventTextChunk):
                yield MessageChunk(
                    message_id=user_id,
                    delta=event.text,
                    finished=False,
                )
        yield MessageChunk(
            message_id=user_id,
            delta="",
            finished=True,
            tokens_used=self._agent_api.last_tokens_used,
        )

    async def get_settings(self, user_id: str) -> UserSettings:
        return await self._prototype_state.get_settings(user_id)

    async def update_settings(self, user_id: str, action) -> None:
        await self._prototype_state.update_settings(user_id, action)
  3. Replace sdk/agent_session.py content with:
# Deleted in Phase 4 — replaced by AgentApi from lambda_agent_api
# File kept as stub to avoid import errors during migration; remove after test_agent_session.py is updated.
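
The token-capture behaviour this task relies on can be sketched in isolation. Everything below is a hypothetical stand-in (`TextChunk`, `End`, `Chunk`, `SketchAgentApi`) rather than the real lambda_agent_api or sdk classes; the sketch assumes only what the plan states: the end event is consumed without being yielded, and its tokens_used is stored on the client before the generator stops.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Union


@dataclass
class TextChunk:  # hypothetical stand-in for MsgEventTextChunk
    text: str


@dataclass
class End:  # hypothetical stand-in for MsgEventEnd
    tokens_used: int


@dataclass
class Chunk:  # hypothetical stand-in for MessageChunk
    delta: str
    finished: bool
    tokens_used: int = 0


class SketchAgentApi:
    """Illustrative only: mimics the patched send_message() described in this task."""

    def __init__(self, events: list[Union[TextChunk, End]]) -> None:
        self._events = events
        self.last_tokens_used: int = 0

    async def send_message(self, text: str) -> AsyncIterator[TextChunk]:
        for event in self._events:
            if isinstance(event, End):
                # Record the count before breaking; End itself is never yielded.
                self.last_tokens_used = event.tokens_used
                break
            yield event


async def stream(api: SketchAgentApi, text: str) -> AsyncIterator[Chunk]:
    async for event in api.send_message(text):
        yield Chunk(delta=event.text, finished=False)
    # The inner generator is exhausted here, so last_tokens_used is already set.
    yield Chunk(delta="", finished=True, tokens_used=api.last_tokens_used)


async def collect(api: SketchAgentApi, text: str) -> list[Chunk]:
    return [chunk async for chunk in stream(api, text)]
```

Reading the attribute only after the async-for loop finishes matters: during iteration it still holds the value from the previous request (or 0).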
cd /Users/a/MAI/sem2/lambda/surfaces-bot && python -c "from sdk.real import RealPlatformClient; print('import ok')"

- sdk/real.py imports AgentApi (not AgentSessionClient), exposes self.agent_api property
- sdk/real.py stream_message yields final chunk with tokens_used from agent_api.last_tokens_used
- agent_api.py __init__ has self.last_tokens_used = 0 and send_message sets it before break
- sdk/agent_session.py contains only a comment stub (no class definitions)
- `python -c "from sdk.real import RealPlatformClient"` exits 0

Task 2: Wire AgentApi lifecycle into bot.py main(); update all broken tests

<read_first>
- adapter/matrix/bot.py (full file: _build_platform_from_env and main() need changes)
- tests/platform/test_agent_session.py (full file: delete or rewrite)
- tests/platform/test_real.py (full file: FakeAgentSessionClient → FakeAgentApi)
- tests/adapter/matrix/test_dispatcher.py (test_build_runtime_uses_real_platform needs update)
</read_first>

adapter/matrix/bot.py, tests/platform/test_agent_session.py, tests/platform/test_real.py, tests/adapter/matrix/test_dispatcher.py

- _build_platform_from_env() returns a RealPlatformClient with an unconnected AgentApi (connect() is NOT called here; it is called in main())
- main() calls await runtime.platform.agent_api.connect() after build_runtime() (only when backend is "real"; mock has no agent_api); wrap in `if hasattr(runtime.platform, "agent_api")` guard
- main() finally block: await agent_api.close() before await client.close()
- AGENT_WS_URL env var is passed unchanged to AgentApi(url=ws_url), with no query param manipulation
- test_agent_session.py: completely rewritten. Remove all build_thread_key tests, AgentSessionClient tests, and process_message tests (those tested our platform-agent patch, which is being discarded); replace with 2 tests: (1) import check for lambda_agent_api module, (2) stub test that documents the deletion
- test_real.py: FakeAgentSessionClient replaced with FakeAgentApi that has send_message(text: str) -> AsyncIterator and last_tokens_used: int = 0; tests updated to construct RealPlatformClient(agent_api=FakeAgentApi(), prototype_state=PrototypeStateStore()); test_send_message no longer checks thread_key in message_id (now uses user_id); test_stream_message checks that the final chunk's tokens_used comes from FakeAgentApi.last_tokens_used
- test_dispatcher.py: test_build_runtime_uses_real_platform_when_matrix_backend_is_real must NOT call agent_api.connect() (build_runtime only constructs, does not connect); update test to mock AgentApi so it does not attempt a real WS connection; assert isinstance(runtime.platform, RealPlatformClient) still passes

  1. Edit adapter/matrix/bot.py:

a. Remove imports: from sdk.agent_session import AgentSessionClient, AgentSessionConfig

b. Do not add a module-level `import sys; sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "external" / "platform-agent_api"))` at the top of the file. Extend sys.path only when the real backend is built (installing the package would be cleaner still) and import AgentApi lazily inside _build_platform_from_env():

def _build_platform_from_env() -> PlatformClient:
    backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
    if backend == "real":
        import sys
        _api_root = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api"
        if str(_api_root) not in sys.path:
            sys.path.insert(0, str(_api_root))
        from lambda_agent_api.agent_api import AgentApi
        ws_url = os.environ["AGENT_WS_URL"]
        agent_api = AgentApi(agent_id="matrix-bot", url=ws_url)
        return RealPlatformClient(
            agent_api=agent_api,
            prototype_state=PrototypeStateStore(),
            platform="matrix",
        )
    return MockPlatformClient()

c. In main(), after runtime = build_runtime(store=SQLiteStore(db_path), client=client), add:

if hasattr(runtime.platform, "agent_api"):
    await runtime.platform.agent_api.connect()

d. In main() finally block, add before await client.close():

if hasattr(runtime.platform, "agent_api"):
    await runtime.platform.agent_api.close()
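
The ordering that steps c and d aim for can be sketched with stand-ins. `SketchAgentApi`, `make_platform`, and `run_bot` are hypothetical names, and the sync loop and Matrix client are reduced to log entries. One assumption worth flagging: this sketch places connect() inside the try block so close() still runs if connect() fails, which the plan itself does not specify.

```python
import asyncio
from types import SimpleNamespace


class SketchAgentApi:
    """Hypothetical stand-in that records lifecycle calls in a shared log."""

    def __init__(self, log: list[str]) -> None:
        self._log = log

    async def connect(self) -> None:
        self._log.append("agent.connect")

    async def close(self) -> None:
        self._log.append("agent.close")


def make_platform(log: list[str], real: bool) -> SimpleNamespace:
    # The "mock" platform deliberately has no agent_api attribute.
    return SimpleNamespace(agent_api=SketchAgentApi(log)) if real else SimpleNamespace()


async def run_bot(platform: object, log: list[str]) -> None:
    """Mirror of the intended main() ordering; sync_forever is simulated."""
    try:
        if hasattr(platform, "agent_api"):
            await platform.agent_api.connect()
        log.append("sync_forever")  # placeholder for the Matrix sync loop
    finally:
        if hasattr(platform, "agent_api"):
            await platform.agent_api.close()
        log.append("client.close")  # placeholder for the Matrix client's close()
```

With the real-style platform the log reads connect, sync, agent close, client close; with the mock-style platform the hasattr guard skips both agent calls.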
  2. Rewrite tests/platform/test_agent_session.py:
"""
test_agent_session.py — stub after Phase 4 migration.

AgentSessionClient and build_thread_key were removed in Phase 4.
The platform client is now AgentApi from lambda_agent_api.
See tests/platform/test_real.py for RealPlatformClient tests.
"""
import sys
from pathlib import Path

_api_root = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api"
if str(_api_root) not in sys.path:
    sys.path.insert(0, str(_api_root))


def test_lambda_agent_api_module_importable():
    from lambda_agent_api.agent_api import AgentApi  # noqa: F401
    from lambda_agent_api.server import MsgEventTextChunk, MsgEventEnd  # noqa: F401
    assert True


def test_agent_session_module_is_stub():
    """Ensure old module no longer exposes AgentSessionClient or build_thread_key."""
    import sdk.agent_session as mod
    assert not hasattr(mod, "AgentSessionClient"), "AgentSessionClient should be removed"
    assert not hasattr(mod, "build_thread_key"), "build_thread_key should be removed"
  3. Rewrite tests/platform/test_real.py:
from __future__ import annotations

import sys
from pathlib import Path
from typing import AsyncIterator

import pytest

from core.protocol import SettingsAction
from sdk.interface import MessageChunk, MessageResponse, UserSettings
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient

_api_root = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api"
if str(_api_root) not in sys.path:
    sys.path.insert(0, str(_api_root))

from lambda_agent_api.server import MsgEventTextChunk, EServerMessage  # noqa: E402


class FakeAgentApi:
    """Minimal fake for AgentApi — no real WebSocket."""
    def __init__(self) -> None:
        self.last_tokens_used: int = 0
        self.send_calls: list[str] = []

    async def send_message(self, text: str) -> AsyncIterator[MsgEventTextChunk]:
        self.send_calls.append(text)
        self.last_tokens_used = 7
        yield MsgEventTextChunk(type=EServerMessage.AGENT_EVENT_TEXT_CHUNK, text=text[:2])
        yield MsgEventTextChunk(type=EServerMessage.AGENT_EVENT_TEXT_CHUNK, text=text[2:])
        # send_message() in real AgentApi breaks on MsgEventEnd without yielding it;
        # FakeAgentApi mirrors this by not yielding MsgEventEnd — last_tokens_used is set directly.


@pytest.mark.asyncio
async def test_real_platform_client_get_or_create_user_uses_local_state():
    client = RealPlatformClient(
        agent_api=FakeAgentApi(),
        prototype_state=PrototypeStateStore(),
    )
    first = await client.get_or_create_user("u1", "matrix", "Alice")
    second = await client.get_or_create_user("u1", "matrix")

    assert first.user_id == "usr-matrix-u1"
    assert first.is_new is True
    assert second.user_id == first.user_id
    assert second.is_new is False
    assert second.display_name == "Alice"


@pytest.mark.asyncio
async def test_real_platform_client_send_message_calls_agent_with_text():
    fake = FakeAgentApi()
    client = RealPlatformClient(agent_api=fake, prototype_state=PrototypeStateStore())

    result = await client.send_message("@alice:example.org", "C1", "hello")

    assert result.response == "hello"
    assert result.tokens_used == 7
    assert fake.send_calls == ["hello"]


@pytest.mark.asyncio
async def test_real_platform_client_stream_message_yields_chunks_and_final_with_tokens():
    fake = FakeAgentApi()
    client = RealPlatformClient(agent_api=fake, prototype_state=PrototypeStateStore())

    chunks = []
    async for chunk in client.stream_message("@alice:example.org", "C1", "hello"):
        chunks.append(chunk)

    assert chunks[-1].finished is True
    assert chunks[-1].tokens_used == 7
    assert "".join(c.delta for c in chunks) == "hello"


@pytest.mark.asyncio
async def test_real_platform_client_settings_are_local():
    client = RealPlatformClient(
        agent_api=FakeAgentApi(),
        prototype_state=PrototypeStateStore(),
    )
    await client.update_settings(
        "usr-matrix-u1",
        SettingsAction(action="toggle_skill", payload={"skill": "browser", "enabled": True}),
    )
    settings = await client.get_settings("usr-matrix-u1")
    assert isinstance(settings, UserSettings)
    assert settings.skills["browser"] is True
  4. Edit tests/adapter/matrix/test_dispatcher.py: update test_build_runtime_uses_real_platform_when_matrix_backend_is_real:
    • Add sys.path setup for lambda_agent_api (same pattern as above)
    • Mock AgentApi so it does not open a real WS:
    async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monkeypatch):
        import sys
        from pathlib import Path
        _api_root = Path(__file__).resolve().parents[3] / "external" / "platform-agent_api"
        if str(_api_root) not in sys.path:
            sys.path.insert(0, str(_api_root))
    
        monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real")
        monkeypatch.setenv("AGENT_WS_URL", "ws://agent.example/agent_ws/")
    
        # Patch AgentApi to avoid real WS connection during build_runtime
        import lambda_agent_api.agent_api as _mod
        class _FakeAgentApi:
            def __init__(self, agent_id, url, **kw):
                self.last_tokens_used = 0
            async def connect(self): pass
            async def close(self): pass
            async def send_message(self, text):
                return
                yield  # unreachable; its presence makes this an empty async generator
        monkeypatch.setattr(_mod, "AgentApi", _FakeAgentApi)
    
        from adapter.matrix.bot import build_runtime
        from sdk.real import RealPlatformClient
        runtime = build_runtime()
        assert isinstance(runtime.platform, RealPlatformClient)
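
The fake's send_message() relies on a small Python idiom: an `async def` whose body contains a `yield`, even an unreachable one, is an async generator function, so it can be iterated with `async for` and simply produces nothing. A minimal sketch, using the hypothetical names `EmptyStreamAgent` and `drain`:

```python
import asyncio
import inspect


class EmptyStreamAgent:
    """Hypothetical fake whose send_message() is a valid but empty async generator."""

    async def send_message(self, text: str):
        return
        yield  # never reached; its presence makes this an async generator function


# True because of the yield statement, even though it can never execute.
IS_ASYNC_GEN = inspect.isasyncgenfunction(EmptyStreamAgent.send_message)


async def drain(agent: EmptyStreamAgent) -> list:
    # Iterating the empty generator finishes immediately with no items.
    return [event async for event in agent.send_message("ignored")]
```

Without the `yield`, send_message() would be a plain coroutine, and `async for` over its result would raise a TypeError.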
    
cd /Users/a/MAI/sem2/lambda/surfaces-bot && python -m pytest tests/platform/test_agent_session.py tests/platform/test_real.py tests/adapter/matrix/test_dispatcher.py -v 2>&1 | tail -20

- All tests in test_agent_session.py, test_real.py, test_dispatcher.py pass
- main() in bot.py has agent_api.connect() call guarded by hasattr check
- main() finally block closes agent_api before matrix client
- grep confirms no "AgentSessionClient" or "build_thread_key" remain in sdk/real.py or adapter/matrix/bot.py

<threat_model>

Trust Boundaries

| Boundary | Description |
|----------|-------------|
| bot → platform-agent WS | Outbound WS to agent service; input is user text |
| env vars → bot config | AGENT_WS_URL, MATRIX_PLATFORM_BACKEND read from environment |

STRIDE Threat Register

| Threat ID | Category | Component | Disposition | Mitigation Plan |
|-----------|----------|-----------|-------------|-----------------|
| T-04-01-01 | Tampering | AgentApi.send_message() text | accept | Single-user prototype; text originates from authenticated Matrix user |
| T-04-01-02 | Denial of Service | AgentBusyException from concurrent sends | mitigate | AgentApi._request_lock already prevents concurrent sends; bot must surface error to user instead of crashing |
| T-04-01-03 | Information Disclosure | AGENT_WS_URL in env | accept | Internal service URL; not exposed to users |
</threat_model>
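
For T-04-01-02, "surface error to user instead of crashing" could look roughly like the sketch below. The `AgentBusyException` class here is a local stand-in, because the real exception's import path is not shown in this plan; `BusyAgent`, `EchoAgent`, `BUSY_REPLY`, and `reply_or_busy_notice` are likewise hypothetical names, and the wording of the notice is only an example.

```python
import asyncio
from typing import AsyncIterator


class AgentBusyException(Exception):
    """Local stand-in; the real exception's import path is not shown in this plan."""


class BusyAgent:
    """Hypothetical agent that always reports it is busy."""

    async def send_message(self, text: str) -> AsyncIterator[str]:
        raise AgentBusyException("another request is in flight")
        yield  # unreachable; marks this function as an async generator


class EchoAgent:
    """Hypothetical agent that replies with the text it was given."""

    async def send_message(self, text: str) -> AsyncIterator[str]:
        yield text


BUSY_REPLY = "The agent is still working on a previous message. Please try again shortly."


async def reply_or_busy_notice(agent, text: str) -> str:
    """Collect the agent's reply, or return a user-facing notice if it is busy."""
    parts: list[str] = []
    try:
        async for part in agent.send_message(text):
            parts.append(part)
    except AgentBusyException:
        # Turn the failure into a message for the room instead of propagating it.
        return BUSY_REPLY
    return "".join(parts)
```

Catching the exception around the whole `async for` covers both cases: a busy signal raised on the first iteration and one raised partway through a stream.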
Run full test suite after both tasks complete:
cd /Users/a/MAI/sem2/lambda/surfaces-bot && python -m pytest tests/ -v 2>&1 | tail -30

Grep checks:

# No old imports should remain
grep -r "AgentSessionClient\|build_thread_key" sdk/ adapter/ tests/ --include="*.py" | grep -v "stub\|Deleted\|removed"

# AgentApi wired in bot.py
grep "agent_api.connect\|agent_api.close" adapter/matrix/bot.py

# last_tokens_used set in agent_api.py
grep "last_tokens_used" external/platform-agent_api/lambda_agent_api/agent_api.py

<success_criteria>

  • pytest tests/platform/ tests/adapter/matrix/test_dispatcher.py -v exits 0 with no failures
  • grep -r "AgentSessionClient" sdk/ adapter/ returns empty (or only the stub comment)
  • grep -r "build_thread_key" sdk/ adapter/ returns empty
  • grep "agent_api.connect" adapter/matrix/bot.py returns a match
  • grep "last_tokens_used" external/platform-agent_api/lambda_agent_api/agent_api.py returns the assignment line

</success_criteria>
After completion, create `.planning/phases/04-matrix-mvp-shared-agent-context-and-context-management-comma/04-01-SUMMARY.md`