diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md index 9f3eba8..e81178c 100644 --- a/.planning/ROADMAP.md +++ b/.planning/ROADMAP.md @@ -58,9 +58,9 @@ Plans: **Plans:** 3 plans Plans: -- [ ] 04-01-PLAN.md — Replace AgentSessionClient with AgentApi; update sdk/real.py, bot.py, broken tests -- [ ] 04-02-PLAN.md — !save, !load, !reset, !context handlers; PrototypeStateStore extensions; numeric interception -- [ ] 04-03-PLAN.md — Dockerfile + docker-compose.yml + .env.example update +- [x] 04-01-PLAN.md — Replace AgentSessionClient with AgentApi; update sdk/real.py, bot.py, broken tests +- [x] 04-02-PLAN.md — !save, !load, !reset, !context handlers; PrototypeStateStore extensions; numeric interception +- [x] 04-03-PLAN.md — Dockerfile + docker-compose.yml + .env.example update --- diff --git a/.planning/STATE.md b/.planning/STATE.md index 45aed52..384ed33 100644 --- a/.planning/STATE.md +++ b/.planning/STATE.md @@ -3,13 +3,13 @@ gsd_state_version: 1.0 milestone: v1.0 milestone_name: — Production-ready surfaces status: Ready to execute -last_updated: "2026-04-17T12:34:33.578Z" +last_updated: "2026-04-17T16:10:00.000Z" progress: total_phases: 5 - completed_phases: 1 + completed_phases: 2 total_plans: 12 - completed_plans: 6 - percent: 50 + completed_plans: 9 + percent: 75 --- # State @@ -19,13 +19,13 @@ progress: See: .planning/PROJECT.md (updated 2026-04-02) **Core value:** Пользователь ведёт диалог с Lambda через любой мессенджер без изменения ядра -**Current focus:** Phase 02 — SDK Integration (blocked on Lambda platform SDK readiness) +**Current focus:** Phase 04 complete — Matrix MVP implementation ready for testing ## Current Phase -**Phase 2** of 3: SDK Integration +**Phase 4** implementation complete: Matrix MVP -Phase 1 is complete. Phase 2 remains blocked until the Lambda platform SDK is available. +Phase 4 is implemented. Next step is manual and automated testing of the Matrix MVP flow before deciding on follow-up work. ## Decisions @@ -43,6 +43,9 @@ Phase 1 is complete. Phase 2 remains blocked until the Lambda platform SDK is av - [Phase 01]: Removed Matrix reaction conversion entirely and kept command callbacks limited to !yes/!no. - [Phase 01]: Kept !settings as a pure snapshot surface while preserving mutable subcommands outside the dashboard. - [Phase 01]: Seeded invite and dispatcher tests with explicit next_chat_index and room ids instead of treating C1 as Matrix transport identity. +- [Phase 04]: Replaced AgentSessionClient with AgentApiWrapper and persistent agent connection lifecycle in Matrix runtime. +- [Phase 04]: Added !save, !load, !reset, and !context commands with pending-state interception and local prototype session metadata. +- [Phase 04]: Added Matrix-only Docker packaging for MVP deployment; platform services remain external to this compose setup. ## Blockers @@ -54,6 +57,7 @@ Phase 1 is complete. Phase 2 remains blocked until the Lambda platform SDK is av - Phase 01.1 inserted after Phase 01: Matrix restart reconciliation and dev reset workflow (URGENT) - Phase 4 added: Matrix MVP: shared agent context and context management command +- New platform signal: upcoming proper `chat_id` support should enable file-level context separation and stronger context management in a future follow-up phase. ## Performance Metrics @@ -65,8 +69,11 @@ Phase 1 is complete. Phase 2 remains blocked until the Lambda platform SDK is av | 01 | 04 | 3 min | 2 | 7 | 2026-04-02T20:03:38Z | | 01 | 05 | 2 min | 2 | 7 | 2026-04-03T09:28:47Z | | 01 | 06 | 4 min | 2 | 7 | 2026-04-03T09:35:39Z | +| 04 | 01 | 1 session | 1 wave | 8 | 2026-04-17 | +| 04 | 02 | 1 session | 2 commits + summary | 8 | 2026-04-17 | +| 04 | 03 | 1 session | 1 commit + summary | 4 | 2026-04-17 | ## Session -- Last session: 2026-04-03T09:35:39Z -- Stopped at: Completed 01-06-PLAN.md +- Last session: 2026-04-17T16:10:00Z +- Stopped at: Phase 4 implementation complete, ready for testing diff --git a/.planning/phases/04-matrix-mvp-shared-agent-context-and-context-management-comma/04-01-SUMMARY.md b/.planning/phases/04-matrix-mvp-shared-agent-context-and-context-management-comma/04-01-SUMMARY.md new file mode 100644 index 0000000..dcd6114 --- /dev/null +++ b/.planning/phases/04-matrix-mvp-shared-agent-context-and-context-management-comma/04-01-SUMMARY.md @@ -0,0 +1,29 @@ +# 04-01 Summary + +## Outcome + +Replaced the Matrix real backend's custom `AgentSessionClient` path with a shared +`AgentApiWrapper` over upstream `lambda_agent_api.AgentApi`. + +## Changes + +- Added `sdk/agent_api_wrapper.py` to capture `MsgEventEnd.tokens_used` without + modifying `external/`. +- Rewrote `sdk/real.py` to use a shared `agent_api`, stream text chunks from + `AgentApi.send_message()`, and emit a final `MessageChunk` with + `last_tokens_used`. +- Updated `adapter/matrix/bot.py` to construct `RealPlatformClient` with + `AgentApiWrapper`, keep `AGENT_WS_URL` unchanged, and manage + `agent_api.connect()` / `agent_api.close()` around `sync_forever()`. +- Stubbed `sdk/agent_session.py` as a compatibility placeholder. +- Updated Matrix/runtime tests away from `thread_key` and per-request websocket + assumptions. + +## Verification + +- `pytest tests/platform/test_real.py -q` +- `pytest tests/adapter/matrix/test_dispatcher.py -q` +- `pytest tests/core/test_integration.py -q` +- `pytest tests/platform/test_agent_session.py -q` + +All listed commands passed locally. diff --git a/sdk/agent_api_wrapper.py b/sdk/agent_api_wrapper.py new file mode 100644 index 0000000..206f6c3 --- /dev/null +++ b/sdk/agent_api_wrapper.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import asyncio +import logging +import sys +from pathlib import Path + +import aiohttp + +_api_root = Path(__file__).resolve().parents[1] / "external" / "platform-agent_api" +if str(_api_root) not in sys.path: + sys.path.insert(0, str(_api_root)) + +from lambda_agent_api.agent_api import AgentApi, AgentException +from lambda_agent_api.server import ( + MsgError, + MsgEventEnd, + MsgEventTextChunk, + MsgGracefulDisconnect, + ServerMessage, +) + +logger = logging.getLogger(__name__) + + +class AgentApiWrapper(AgentApi): + """Capture tokens_used from MsgEventEnd without patching upstream code.""" + + def __init__(self, agent_id: str, url: str, **kwargs) -> None: + super().__init__(agent_id=agent_id, url=url, **kwargs) + self.last_tokens_used = 0 + + async def _listen(self): + try: + async for msg in self._ws: + if msg.type == aiohttp.WSMsgType.TEXT: + try: + outgoing_msg = ServerMessage.validate_json(msg.data) + + if isinstance(outgoing_msg, MsgEventTextChunk): + if self._current_queue: + await self._current_queue.put(outgoing_msg) + elif self.callback: + self.callback(outgoing_msg) + else: + logger.warning("[%s] AgentEvent without active request", self.id) + + elif isinstance(outgoing_msg, MsgEventEnd): + self.last_tokens_used = outgoing_msg.tokens_used + if self._current_queue: + await self._current_queue.put(outgoing_msg) + + elif isinstance(outgoing_msg, MsgError): + if self.callback: + self.callback(outgoing_msg) + error = AgentException(outgoing_msg.code, outgoing_msg.details) + logger.error("[%s] Agent error: %s", self.id, error) + if self._current_queue: + await self._current_queue.put(error) + + elif isinstance(outgoing_msg, MsgGracefulDisconnect): + if self.callback: + self.callback(outgoing_msg) + logger.info("[%s] Gracefully disconnecting", self.id) + break + + else: + logger.warning("[%s] Unknown message type: %s", self.id, outgoing_msg.type) + if self.callback: + self.callback(outgoing_msg) + + except Exception as exc: + logger.error("[%s] Failed to deserialize message: %s", self.id, exc) + if self._current_queue: + await self._current_queue.put( + AgentException("PARSE_ERROR", f"Validation failed: {exc}") + ) + + elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSED): + logger.error("[%s] WebSocket closed/error: %s", self.id, msg.type) + break + + except asyncio.CancelledError: + pass + except Exception as exc: + logger.error("[%s] Error in listen loop: %s", self.id, exc) + finally: + await self._cleanup() diff --git a/sdk/agent_session.py b/sdk/agent_session.py index 0f959a1..63acdd1 100644 --- a/sdk/agent_session.py +++ b/sdk/agent_session.py @@ -1,93 +1 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import AsyncIterator -from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit - -from sdk.interface import MessageChunk, MessageResponse, PlatformError - - -def build_thread_key(platform: str, user_id: str, chat_id: str) -> str: - return f"{len(platform)}:{platform}{len(user_id)}:{user_id}{len(chat_id)}:{chat_id}" - - -@dataclass(frozen=True, slots=True) -class AgentSessionConfig: - base_ws_url: str - timeout_seconds: float = 30.0 - - -class AgentSessionClient: - def __init__(self, config: AgentSessionConfig) -> None: - self._config = config - - async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: - response_parts: list[str] = [] - tokens_used = 0 - - async for chunk in self.stream_message(thread_key=thread_key, text=text): - if chunk.delta: - response_parts.append(chunk.delta) - if chunk.finished: - tokens_used = chunk.tokens_used - - return MessageResponse( - message_id=thread_key, - response="".join(response_parts), - tokens_used=tokens_used, - finished=True, - ) - - async def stream_message(self, *, thread_key: str, text: str) -> AsyncIterator[MessageChunk]: - import aiohttp - - async with aiohttp.ClientSession() as session: - async with session.ws_connect( - self._ws_url(thread_key), - heartbeat=30, - ) as ws: - status = await ws.receive_json(timeout=self._config.timeout_seconds) - if status.get("type") != "STATUS": - raise PlatformError("Agent did not send STATUS", code="AGENT_PROTOCOL_ERROR") - - await ws.send_json({"type": "USER_MESSAGE", "text": text}) - - while True: - payload = await ws.receive_json(timeout=self._config.timeout_seconds) - msg_type = payload.get("type") - - if msg_type == "AGENT_EVENT_TEXT_CHUNK": - yield MessageChunk( - message_id=thread_key, - delta=payload["text"], - finished=False, - ) - elif msg_type == "AGENT_EVENT_END": - yield MessageChunk( - message_id=thread_key, - delta="", - finished=True, - tokens_used=payload.get("tokens_used", 0), - ) - return - elif msg_type == "ERROR": - raise PlatformError( - payload.get("details", "Agent error"), - code=payload.get("code", "AGENT_ERROR"), - ) - elif msg_type == "GRACEFUL_DISCONNECT": - raise PlatformError( - "Agent disconnected gracefully", - code="GRACEFUL_DISCONNECT", - ) - else: - raise PlatformError( - f"Unexpected agent message: {payload}", - code="AGENT_PROTOCOL_ERROR", - ) - - def _ws_url(self, thread_key: str) -> str: - parts = urlsplit(self._config.base_ws_url) - query = dict(parse_qsl(parts.query, keep_blank_values=True)) - query["thread_id"] = thread_key - return urlunsplit(parts._replace(query=urlencode(query))) +"""Compatibility stub: AgentSessionClient was replaced by AgentApiWrapper in Phase 4.""" diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index ad4746c..1f9f4d2 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -1,5 +1,6 @@ from __future__ import annotations +import importlib from types import SimpleNamespace from unittest.mock import AsyncMock @@ -10,6 +11,7 @@ from adapter.matrix.bot import MatrixBot, build_runtime, prepare_live_sync from adapter.matrix.handlers.auth import handle_invite from adapter.matrix.store import get_room_meta, get_user_meta, set_user_meta from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage +from sdk.interface import PlatformError from sdk.mock import MockPlatformClient from sdk.real import RealPlatformClient @@ -199,6 +201,31 @@ async def test_bot_ignores_its_own_messages(): bot._send_all.assert_not_awaited() +async def test_bot_degrades_platform_errors_to_user_reply(): + runtime = build_runtime(platform=MockPlatformClient()) + client = SimpleNamespace( + user_id="@bot:example.org", + room_send=AsyncMock(), + ) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock( + side_effect=PlatformError("Missing Authentication header", code="401") + ) + room = SimpleNamespace(room_id="!dm:example.org") + event = SimpleNamespace(sender="@alice:example.org", body="hello") + + await bot.on_room_message(room, event) + + client.room_send.assert_awaited_once_with( + "!dm:example.org", + "m.room.message", + { + "msgtype": "m.text", + "body": "Сервис временно недоступен. Попробуйте ещё раз позже.", + }, + ) + + async def test_mat11_settings_returns_dashboard(): runtime = build_runtime(platform=MockPlatformClient()) current_chat_id = "C9" @@ -260,9 +287,18 @@ async def test_prepare_live_sync_returns_next_batch_from_bootstrap_sync(): async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monkeypatch): + bot_module = importlib.import_module("adapter.matrix.bot") + + class FakeAgentApiWrapper: + def __init__(self, agent_id: str, url: str) -> None: + self.agent_id = agent_id + self.url = url + + monkeypatch.setattr(bot_module, "AgentApiWrapper", FakeAgentApiWrapper) monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") monkeypatch.setenv("AGENT_WS_URL", "ws://agent.example/agent_ws/") runtime = build_runtime() assert isinstance(runtime.platform, RealPlatformClient) + assert runtime.platform.agent_api.url == "ws://agent.example/agent_ws/" diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index db2cf8f..ab8fc8c 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -5,7 +5,6 @@ Smoke test: полный цикл через dispatcher + реальные manag """ import pytest from sdk.mock import MockPlatformClient -from sdk.agent_session import build_thread_key from sdk.interface import MessageChunk, MessageResponse from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient @@ -22,28 +21,15 @@ from core.protocol import ( ) -class FakeAgentSessionClient: +class FakeAgentApi: def __init__(self) -> None: - self.send_calls: list[tuple[str, str]] = [] + self.calls: list[str] = [] + self.last_tokens_used = 0 - async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: - self.send_calls.append((thread_key, text)) - return MessageResponse( - message_id=thread_key, - response=f"[REAL] {text}", - tokens_used=5, - finished=True, - ) - - async def stream_message(self, *, thread_key: str, text: str): - self.send_calls.append((thread_key, text)) - if False: - yield MessageChunk( - message_id=thread_key, - delta=text, - tokens_used=0, - finished=True, - ) + async def send_message(self, text: str): + self.calls.append(text) + yield type("Chunk", (), {"text": f"[REAL] {text}"})() + self.last_tokens_used = 5 @pytest.fixture @@ -62,9 +48,9 @@ def dispatcher(): @pytest.fixture def real_dispatcher(): - agent_sessions = FakeAgentSessionClient() + agent_api = FakeAgentApi() platform = RealPlatformClient( - agent_sessions=agent_sessions, + agent_api=agent_api, prototype_state=PrototypeStateStore(), platform="matrix", ) @@ -76,7 +62,7 @@ def real_dispatcher(): settings_mgr=SettingsManager(platform, store), ) register_all(d) - return d, agent_sessions + return d, agent_api async def test_full_flow_start_then_message(dispatcher): @@ -132,8 +118,8 @@ async def test_toggle_skill_callback(dispatcher): assert any("browser" in r.text for r in result if isinstance(r, OutgoingMessage)) -async def test_full_flow_with_real_platform_uses_thread_key(real_dispatcher): - dispatcher, agent_sessions = real_dispatcher +async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatcher): + dispatcher, agent_api = real_dispatcher start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") result = await dispatcher.dispatch(start) @@ -144,6 +130,4 @@ async def test_full_flow_with_real_platform_uses_thread_key(real_dispatcher): texts = [r.text for r in result if isinstance(r, OutgoingMessage)] assert texts == ["[REAL] Привет!"] - assert agent_sessions.send_calls == [ - (build_thread_key("matrix", "u1", "C1"), "Привет!") - ] + assert agent_api.calls == ["Привет!"] diff --git a/tests/platform/test_agent_session.py b/tests/platform/test_agent_session.py index 2d085c3..7f419e8 100644 --- a/tests/platform/test_agent_session.py +++ b/tests/platform/test_agent_session.py @@ -1,193 +1,21 @@ +"""Compatibility tests after the Phase 4 migration.""" + import sys from pathlib import Path -from types import ModuleType - -import pytest -from aiohttp import web - -from sdk.interface import MessageChunk, MessageResponse -from sdk.agent_session import AgentSessionClient, AgentSessionConfig, build_thread_key - -AGENT_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent" -AGENT_API_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api" -for path in (AGENT_ROOT, AGENT_API_ROOT): - if str(path) not in sys.path: - sys.path.insert(0, str(path)) - -if "fastapi" not in sys.modules: - fastapi = ModuleType("fastapi") - - class _Router: - def websocket(self, _path: str): - def decorator(fn): - return fn - - return decorator - - class _WebSocketDisconnect(Exception): - pass - - def _depends(value): - return value - - fastapi.APIRouter = _Router - fastapi.WebSocket = object - fastapi.WebSocketDisconnect = _WebSocketDisconnect - fastapi.Depends = _depends - sys.modules["fastapi"] = fastapi - -if "src.agent" not in sys.modules: - agent_module = ModuleType("src.agent") - - class _AgentService: - async def astream(self, text: str, thread_id: str): - yield text - - def _get_agent_service(): - return _AgentService() - - agent_module.AgentService = _AgentService - agent_module.get_agent_service = _get_agent_service - sys.modules["src.agent"] = agent_module - -from lambda_agent_api.client import MsgUserMessage # noqa: E402 -from src.api.external import process_message # noqa: E402 -def test_build_thread_key_uses_platform_user_and_chat_id(): - assert build_thread_key("matrix", "@alice:example.org", "C1") == "6:matrix18:@alice:example.org2:C1" +_api_root = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api" +if str(_api_root) not in sys.path: + sys.path.insert(0, str(_api_root)) -def test_build_thread_key_does_not_collide_when_user_id_contains_colons(): - left = build_thread_key("matrix", "@alice:example.org", "C1") - right = build_thread_key("matrix", "@alice", "example.org:C1") +def test_lambda_agent_api_module_is_importable(): + from lambda_agent_api.agent_api import AgentApi - assert left != right + assert AgentApi is not None -@pytest.mark.asyncio -async def test_stream_message_yields_text_chunks_and_end(aiohttp_server): - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") +def test_agent_session_module_is_intentionally_stubbed(): + contents = Path(__file__).resolve().parents[2] / "sdk" / "agent_session.py" - async def handler(request): - ws = web.WebSocketResponse() - await ws.prepare(request) - - assert request.query["thread_id"] == thread_key - - await ws.send_json({"type": "STATUS"}) - - message = await ws.receive_json() - assert message == {"type": "USER_MESSAGE", "text": "hello"} - - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hel"}) - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "lo"}) - await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 7}) - await ws.close() - return ws - - app = web.Application() - app.router.add_get("/agent_ws/", handler) - server = await aiohttp_server(app) - - client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/")))) - - chunks = [] - async for chunk in client.stream_message( - thread_key=thread_key, - text="hello", - ): - chunks.append(chunk) - - assert chunks == [ - MessageChunk(message_id=thread_key, delta="hel", finished=False, tokens_used=0), - MessageChunk(message_id=thread_key, delta="lo", finished=False, tokens_used=0), - MessageChunk(message_id=thread_key, delta="", finished=True, tokens_used=7), - ] - - -@pytest.mark.asyncio -async def test_send_message_collects_streamed_chunks_and_tokens(aiohttp_server): - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") - - async def handler(request): - ws = web.WebSocketResponse() - await ws.prepare(request) - - assert request.query["thread_id"] == thread_key - - await ws.send_json({"type": "STATUS"}) - - message = await ws.receive_json() - assert message == {"type": "USER_MESSAGE", "text": "hello world"} - - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hello "}) - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "world"}) - await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 11}) - await ws.close() - return ws - - app = web.Application() - app.router.add_get("/agent_ws/", handler) - server = await aiohttp_server(app) - - client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/")))) - - result = await client.send_message( - thread_key=thread_key, - text="hello world", - ) - - assert result == MessageResponse( - message_id=thread_key, - response="hello world", - tokens_used=11, - finished=True, - ) - - -@pytest.mark.asyncio -async def test_process_message_requires_thread_id_query_param(): - class FakeWebSocket: - query_params = {} - - async def send_text(self, text: str) -> None: - raise AssertionError(f"send_text should not be called: {text}") - - class FakeAgentService: - async def astream(self, text: str, thread_id: str): - yield text - - with pytest.raises(ValueError, match="thread_id query parameter is required"): - await process_message( - FakeWebSocket(), - MsgUserMessage(text="hello"), - FakeAgentService(), - ) - - -@pytest.mark.asyncio -async def test_process_message_passes_thread_id_to_agent_service(): - class FakeWebSocket: - def __init__(self) -> None: - self.query_params = {"thread_id": "6:matrix18:@alice:example.org2:C1"} - self.sent_messages: list[str] = [] - - async def send_text(self, text: str) -> None: - self.sent_messages.append(text) - - class FakeAgentService: - def __init__(self) -> None: - self.calls: list[tuple[str, str]] = [] - - async def astream(self, text: str, thread_id: str): - self.calls.append((text, thread_id)) - yield "hello" - - ws = FakeWebSocket() - agent_service = FakeAgentService() - await process_message(ws, MsgUserMessage(text="hello"), agent_service) - - assert agent_service.calls == [("hello", "6:matrix18:@alice:example.org2:C1")] - assert any("AGENT_EVENT_TEXT_CHUNK" in message for message in ws.sent_messages) - assert any("AGENT_EVENT_END" in message for message in ws.sent_messages) + assert "replaced by AgentApiWrapper" in contents.read_text() diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py index 7225cfd..1255888 100644 --- a/tests/platform/test_real.py +++ b/tests/platform/test_real.py @@ -1,36 +1,27 @@ import pytest from core.protocol import SettingsAction -from sdk.agent_session import build_thread_key from sdk.interface import MessageChunk, MessageResponse, UserSettings from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient -class FakeAgentSessionClient: +class FakeAgentApi: def __init__(self) -> None: - self.send_calls: list[tuple[str, str]] = [] - self.stream_calls: list[tuple[str, str]] = [] + self.calls: list[str] = [] + self.last_tokens_used = 0 - async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: - self.send_calls.append((thread_key, text)) - return MessageResponse( - message_id=thread_key, - response=f"echo:{text}", - tokens_used=3, - finished=True, - ) - - async def stream_message(self, *, thread_key: str, text: str): - self.stream_calls.append((thread_key, text)) - yield MessageChunk(message_id=thread_key, delta=text[:2], finished=False) - yield MessageChunk(message_id=thread_key, delta=text[2:], finished=True, tokens_used=3) + async def send_message(self, text: str): + self.calls.append(text) + yield type("Chunk", (), {"text": text[:2]})() + yield type("Chunk", (), {"text": text[2:]})() + self.last_tokens_used = 3 @pytest.mark.asyncio async def test_real_platform_client_get_or_create_user_uses_local_state(): client = RealPlatformClient( - agent_sessions=FakeAgentSessionClient(), + agent_api=FakeAgentApi(), prototype_state=PrototypeStateStore(), ) @@ -45,61 +36,65 @@ async def test_real_platform_client_get_or_create_user_uses_local_state(): @pytest.mark.asyncio -async def test_real_platform_client_send_message_uses_surface_user_thread_identity(): - agent_sessions = FakeAgentSessionClient() +async def test_real_platform_client_send_message_collects_stream_output(): + agent_api = FakeAgentApi() client = RealPlatformClient( - agent_sessions=agent_sessions, + agent_api=agent_api, prototype_state=PrototypeStateStore(), platform="matrix", ) - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") result = await client.send_message("@alice:example.org", "C1", "hello") assert result == MessageResponse( - message_id=thread_key, - response="echo:hello", + message_id="@alice:example.org", + response="hello", tokens_used=3, finished=True, ) - assert agent_sessions.send_calls == [(thread_key, "hello")] + assert agent_api.calls == ["hello"] @pytest.mark.asyncio -async def test_real_platform_client_stream_message_uses_surface_user_thread_identity(): - agent_sessions = FakeAgentSessionClient() +async def test_real_platform_client_stream_message_emits_final_tokens_chunk(): + agent_api = FakeAgentApi() client = RealPlatformClient( - agent_sessions=agent_sessions, + agent_api=agent_api, prototype_state=PrototypeStateStore(), platform="matrix", ) - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") chunks = [] async for chunk in client.stream_message("@alice:example.org", "C1", "hello"): chunks.append(chunk) assert chunks == [ MessageChunk( - message_id=thread_key, + message_id="@alice:example.org", delta="he", finished=False, tokens_used=0, ), MessageChunk( - message_id=thread_key, + message_id="@alice:example.org", delta="llo", + finished=False, + tokens_used=0, + ), + MessageChunk( + message_id="@alice:example.org", + delta="", finished=True, tokens_used=3, ), ] - assert agent_sessions.stream_calls == [(thread_key, "hello")] + assert agent_api.calls == ["hello"] @pytest.mark.asyncio async def test_real_platform_client_settings_are_local(): client = RealPlatformClient( - agent_sessions=FakeAgentSessionClient(), + agent_api=FakeAgentApi(), prototype_state=PrototypeStateStore(), platform="matrix", )