From 1fdb5bf303d6029ef5d83d9a9c474db12b941546 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 00:22:20 +0300 Subject: [PATCH 01/10] docs: add matrix direct-agent prototype design --- ...08-matrix-direct-agent-prototype-design.md | 243 ++++++++++++++++++ 1 file changed, 243 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-08-matrix-direct-agent-prototype-design.md diff --git a/docs/superpowers/specs/2026-04-08-matrix-direct-agent-prototype-design.md b/docs/superpowers/specs/2026-04-08-matrix-direct-agent-prototype-design.md new file mode 100644 index 0000000..581eb56 --- /dev/null +++ b/docs/superpowers/specs/2026-04-08-matrix-direct-agent-prototype-design.md @@ -0,0 +1,243 @@ +# Matrix Direct-Agent Prototype Design + +## Goal + +Ship a working Matrix prototype that talks to the real Lambda agent instead of `MockPlatformClient`, while preserving the current Matrix adapter logic and keeping the code expandable toward future platform versions. + +## Scope + +This design is for a Matrix-only prototype delivered from this repository on a dedicated branch. It is not a `main` branch rollout and it is not a separate prototype repo. + +The design assumes a minimal companion fork of `platform/agent` may be used, but changes there must stay as small as possible. + +## Constraints + +- Preserve the current Matrix transport logic as much as possible. +- Keep `core/` unaware of platform immaturity. +- Avoid broad changes to platform repos. +- Prefer one narrow patch to `platform/agent` over changes to both `platform/agent` and `platform/agent_api`. +- Keep the backend boundary reusable for future Telegram or other surfaces. +- Do not pretend unsupported platform capabilities are real. + +## Live Platform Findings + +Based on the live repo analysis performed on April 7, 2026: + +- `platform/master` is not yet a usable consumer-facing backend for surfaces. +- `platform/agent` exposes a working WebSocket endpoint for prompt/response exchange. +- `platform/agent_api` documents and implements text-oriented WebSocket messaging, but the bot does not need to depend on that package directly. +- `platform/agent` currently hardcodes a single shared backend memory thread via `thread_id = "default"`, which would cause all chats to share context. + +## Architecture + +The prototype remains in this repo and introduces a new real backend path behind the existing SDK boundary. + +### New files + +- `sdk/real.py` + - Exports `RealPlatformClient` + - Implements the existing `PlatformClient` contract from `sdk/interface.py` + - Composes the lower-level prototype pieces + +- `sdk/agent_session.py` + - Owns direct WebSocket communication with the real agent + - Manages connection lifecycle, request/response handling, and thread identity + +- `sdk/prototype_state.py` + - Owns local prototype-only state + - Stores user mapping, local settings, and lightweight metadata needed until a real control plane exists + +### Responsibility split + +- Matrix adapter remains transport-specific only. +- `core/` continues to depend only on `PlatformClient`. +- `RealPlatformClient` acts as the anti-corruption layer between the current bot contract and the platform’s incomplete shape. +- Local control-plane behavior remains explicit and replaceable later. + +## Message and Identity Model + +Each Matrix chat gets a stable backend session identity. + +### Surface identity + +- Surface: `matrix` +- Surface user id: Matrix MXID, for example `@alice:example.org` +- Surface chat id: logical chat id from `ChatManager`, for example `C1` +- Surface ref: Matrix room id + +### Backend thread identity + +Use a deterministic thread key: + +`matrix:{matrix_user_id}:{chat_id}` + +Example: + +`matrix:@alice:example.org:C1` + +### Mapping rules + +- One Matrix logical chat maps to one backend memory thread. +- `!new` creates a fresh logical chat and therefore a fresh backend thread. +- `!rename` only changes display metadata and does not change backend identity. +- `!archive` stops active use of the thread in the surface, but does not need to delete backend memory in v1. + +## Runtime Flow + +### Normal message flow + +1. Matrix event arrives in an existing room. +2. Existing Matrix routing resolves room to logical `chat_id`. +3. `core/handlers/message.py` calls `platform.send_message(...)`. +4. `RealPlatformClient` derives the backend thread key from `(platform, user_id, chat_id)`. +5. `AgentSessionClient` sends the prompt to the agent WebSocket using that thread key. +6. The reply is converted into the existing `MessageResponse` or `MessageChunk` contract. +7. Matrix sends the final text back to the room. + +### Settings flow + +For v1, settings remain local: + +- `get_settings()` reads from local prototype state +- `update_settings()` writes to local prototype state + +This is intentional. The prototype must not claim settings are backed by the real platform when no such platform API exists yet. + +## Feature Matrix + +### Real in v1 + +- `!start` +- Plain text messaging with the real agent +- Matrix chat lifecycle already implemented in this repo: + - `!new` + - `!chats` + - `!rename` + - `!archive` +- Per-chat conversation memory, provided the agent accepts dynamic thread identity + +### Local in v1 + +- `!settings` +- `!skills` +- `!soul` +- `!safety` +- `!status` +- user registration and local user mapping + +### Deferred + +- Attachments and file upload to the agent +- Voice input to the agent +- Image input to the agent +- Long-running task callbacks and webhook-style async completion +- Real control-plane integration through `platform/master` + +## Minimal Upstream Change + +To avoid shared memory across all conversations, make one narrow change in the forked `platform/agent` repo: + +- stop hardcoding `thread_id = "default"` +- derive thread identity from WebSocket connection context + +### Preferred mechanism + +Read `thread_id` from WebSocket query parameters rather than changing the message payload format. + +Example: + +`ws://host:port/agent_ws/?thread_id=matrix:@alice:example.org:C1` + +This is preferred because: + +- it limits the platform patch to one repo +- it avoids changing both server and SDK protocol shape +- it keeps the client message body text-only +- it makes session identity explicit and easy to reason about + +## Why Not Use `platform/agent_api` Directly + +The bot should not depend on their client package for the prototype. + +Reasons: + +- the bot already has its own internal integration boundary in `sdk/interface.py` +- a tiny local WebSocket client is enough for this protocol +- avoiding a dependency on `platform/agent_api` keeps rebasing simpler +- if upstream stabilizes later, the bot can adopt their SDK without affecting Matrix handlers + +## Repo Strategy + +### This repo + +Owns: + +- Matrix surface logic +- SDK compatibility layer +- local prototype state +- backend selection and wiring + +### Forked `platform/agent` + +Owns only: + +- minimal thread identity patch required for per-chat memory + +### Explicitly not doing + +- no separate prototype repo +- no changes to `platform/master` for v1 +- no unnecessary changes to `platform/agent_api` + +## Migration Path + +This design is intentionally expandable. + +When the platform develops further: + +- `sdk/prototype_state.py` can be replaced or reduced by a real `MasterClient` +- `sdk/agent_session.py` can remain the direct session transport if still relevant +- `RealPlatformClient` can continue to present the stable bot-facing interface +- Telegram or another surface can reuse the same backend components without rethinking the integration model + +## Risks + +### Risk: hidden platform assumptions leak upward + +Mitigation: +- keep all direct-agent logic below `RealPlatformClient` +- avoid changing `core/` contracts for prototype convenience + +### Risk: settings semantics drift from future platform reality + +Mitigation: +- make local settings behavior explicit in code and docs +- keep settings isolated in `sdk/prototype_state.py` + +### Risk: upstream `agent` fork diverges + +Mitigation: +- keep the patch minimal and narrowly scoped to thread identity + +### Risk: thread identity source is unstable + +Mitigation: +- derive thread key from existing stable bot-side identities: platform, surface user id, logical chat id + +## Testing Strategy + +- Unit tests for `sdk/agent_session.py` request/response behavior +- Unit tests for `sdk/prototype_state.py` local settings and user mapping +- Unit tests for `sdk/real.py` contract compliance with `PlatformClient` +- Matrix integration tests confirming: + - existing commands still work + - different logical chats map to different backend thread keys + - rename does not change thread identity + - archive stops reuse from the surface perspective + +## Success Criteria + +- Matrix can talk to the real agent without rewriting the Matrix adapter architecture +- Chats do not share backend memory accidentally +- Unsupported platform capabilities remain local or deferred rather than being faked as “real” +- The backend boundary remains suitable for later Telegram or other surfaces From de20ff638a0951803f9b19fb7d6b80ce137429b1 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:00:02 +0300 Subject: [PATCH 02/10] feat: add direct agent session transport --- sdk/agent_session.py | 88 ++++++++++++++++++++++++++++ tests/platform/test_agent_session.py | 86 +++++++++++++++++++++++++++ 2 files changed, 174 insertions(+) create mode 100644 sdk/agent_session.py create mode 100644 tests/platform/test_agent_session.py diff --git a/sdk/agent_session.py b/sdk/agent_session.py new file mode 100644 index 0000000..6f90e3f --- /dev/null +++ b/sdk/agent_session.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import AsyncIterator +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit + +import aiohttp + +from sdk.interface import MessageChunk, MessageResponse, PlatformError + + +def build_thread_key(platform: str, user_id: str, chat_id: str) -> str: + return f"{platform}:{user_id}:{chat_id}" + + +@dataclass(frozen=True, slots=True) +class AgentSessionConfig: + base_ws_url: str + timeout_seconds: float = 30.0 + + +class AgentSessionClient: + def __init__(self, config: AgentSessionConfig) -> None: + self._config = config + + async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: + response_parts: list[str] = [] + tokens_used = 0 + + async for chunk in self.stream_message(thread_key=thread_key, text=text): + if chunk.delta: + response_parts.append(chunk.delta) + if chunk.finished: + tokens_used = chunk.tokens_used + + return MessageResponse( + message_id=thread_key, + response="".join(response_parts), + tokens_used=tokens_used, + finished=True, + ) + + async def stream_message(self, *, thread_key: str, text: str) -> AsyncIterator[MessageChunk]: + async with aiohttp.ClientSession() as session: + async with session.ws_connect( + self._ws_url(thread_key), + heartbeat=30, + ) as ws: + status = await ws.receive_json(timeout=self._config.timeout_seconds) + if status.get("type") != "STATUS": + raise PlatformError("Agent did not send STATUS", code="AGENT_PROTOCOL_ERROR") + + await ws.send_json({"type": "USER_MESSAGE", "text": text}) + + while True: + payload = await ws.receive_json(timeout=self._config.timeout_seconds) + msg_type = payload.get("type") + + if msg_type == "AGENT_EVENT_TEXT_CHUNK": + yield MessageChunk( + message_id=thread_key, + delta=payload["text"], + finished=False, + ) + elif msg_type == "AGENT_EVENT_END": + yield MessageChunk( + message_id=thread_key, + delta="", + finished=True, + tokens_used=payload.get("tokens_used", 0), + ) + return + elif msg_type == "ERROR": + raise PlatformError( + payload.get("details", "Agent error"), + code=payload.get("code", "AGENT_ERROR"), + ) + else: + raise PlatformError( + f"Unexpected agent message: {payload}", + code="AGENT_PROTOCOL_ERROR", + ) + + def _ws_url(self, thread_key: str) -> str: + parts = urlsplit(self._config.base_ws_url) + query = dict(parse_qsl(parts.query, keep_blank_values=True)) + query["thread_id"] = thread_key + return urlunsplit(parts._replace(query=urlencode(query))) diff --git a/tests/platform/test_agent_session.py b/tests/platform/test_agent_session.py new file mode 100644 index 0000000..a1d9dd6 --- /dev/null +++ b/tests/platform/test_agent_session.py @@ -0,0 +1,86 @@ +import pytest +from aiohttp import web + +from sdk.interface import MessageChunk, MessageResponse +from sdk.agent_session import AgentSessionClient, AgentSessionConfig, build_thread_key + + +def test_build_thread_key_uses_platform_user_and_chat_id(): + assert build_thread_key("matrix", "@alice:example.org", "C1") == "matrix:@alice:example.org:C1" + + +@pytest.mark.asyncio +async def test_stream_message_yields_text_chunks_and_end(aiohttp_server): + async def handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + + assert request.query["thread_id"] == "matrix:@alice:example.org:C1" + + await ws.send_json({"type": "STATUS"}) + + message = await ws.receive_json() + assert message == {"type": "USER_MESSAGE", "text": "hello"} + + await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hel"}) + await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "lo"}) + await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 7}) + await ws.close() + return ws + + app = web.Application() + app.router.add_get("/agent_ws/", handler) + server = await aiohttp_server(app) + + client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/")))) + + chunks = [] + async for chunk in client.stream_message( + thread_key="matrix:@alice:example.org:C1", + text="hello", + ): + chunks.append(chunk) + + assert chunks == [ + MessageChunk(message_id="matrix:@alice:example.org:C1", delta="hel", finished=False, tokens_used=0), + MessageChunk(message_id="matrix:@alice:example.org:C1", delta="lo", finished=False, tokens_used=0), + MessageChunk(message_id="matrix:@alice:example.org:C1", delta="", finished=True, tokens_used=7), + ] + + +@pytest.mark.asyncio +async def test_send_message_collects_streamed_chunks_and_tokens(aiohttp_server): + async def handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + + assert request.query["thread_id"] == "matrix:@alice:example.org:C1" + + await ws.send_json({"type": "STATUS"}) + + message = await ws.receive_json() + assert message == {"type": "USER_MESSAGE", "text": "hello world"} + + await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hello "}) + await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "world"}) + await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 11}) + await ws.close() + return ws + + app = web.Application() + app.router.add_get("/agent_ws/", handler) + server = await aiohttp_server(app) + + client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/")))) + + result = await client.send_message( + thread_key="matrix:@alice:example.org:C1", + text="hello world", + ) + + assert result == MessageResponse( + message_id="matrix:@alice:example.org:C1", + response="hello world", + tokens_used=11, + finished=True, + ) From 2fad1aaa66ff1d8687d94eadc747a5fd83337e7a Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:25:46 +0300 Subject: [PATCH 03/10] feat: add prototype local state store --- sdk/prototype_state.py | 79 ++++++++++++++++++++++++++ tests/platform/test_prototype_state.py | 75 ++++++++++++++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 sdk/prototype_state.py create mode 100644 tests/platform/test_prototype_state.py diff --git a/sdk/prototype_state.py b/sdk/prototype_state.py new file mode 100644 index 0000000..4682bd9 --- /dev/null +++ b/sdk/prototype_state.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any + +from sdk.interface import User, UserSettings + +# Keep the prototype backend self-contained; do not import these from sdk.mock. +DEFAULT_SKILLS: dict[str, bool] = { + "web-search": True, + "fetch-url": True, + "email": False, + "browser": False, + "image-gen": False, + "files": True, +} +DEFAULT_SAFETY: dict[str, bool] = { + "email-send": True, + "file-delete": True, + "social-post": True, +} +DEFAULT_SOUL: dict[str, str] = {"name": "Лямбда", "instructions": ""} +DEFAULT_PLAN: dict[str, Any] = { + "name": "Beta", + "tokens_used": 0, + "tokens_limit": 1000, +} + + +class PrototypeStateStore: + def __init__(self) -> None: + self._users: dict[str, User] = {} + self._settings: dict[str, dict[str, Any]] = {} + + async def get_or_create_user( + self, + *, + external_id: str, + platform: str, + display_name: str | None = None, + ) -> User: + key = f"{platform}:{external_id}" + existing = self._users.get(key) + if existing is not None: + return existing.model_copy(update={"is_new": False}) + + user = User( + user_id=f"usr-{platform}-{external_id}", + external_id=external_id, + platform=platform, + display_name=display_name, + created_at=datetime.now(UTC), + is_new=True, + ) + self._users[key] = user + return user + + async def get_settings(self, user_id: str) -> UserSettings: + stored = self._settings.get(user_id, {}) + return UserSettings( + skills={**DEFAULT_SKILLS, **stored.get("skills", {})}, + connectors=stored.get("connectors", {}), + soul={**DEFAULT_SOUL, **stored.get("soul", {})}, + safety={**DEFAULT_SAFETY, **stored.get("safety", {})}, + plan={**DEFAULT_PLAN, **stored.get("plan", {})}, + ) + + async def update_settings(self, user_id: str, action: Any) -> None: + settings = self._settings.setdefault(user_id, {}) + + if action.action == "toggle_skill": + skills = settings.setdefault("skills", DEFAULT_SKILLS.copy()) + skills[action.payload["skill"]] = action.payload.get("enabled", True) + elif action.action == "set_soul": + soul = settings.setdefault("soul", DEFAULT_SOUL.copy()) + soul[action.payload["field"]] = action.payload["value"] + elif action.action == "set_safety": + safety = settings.setdefault("safety", DEFAULT_SAFETY.copy()) + safety[action.payload["trigger"]] = action.payload.get("enabled", True) diff --git a/tests/platform/test_prototype_state.py b/tests/platform/test_prototype_state.py new file mode 100644 index 0000000..2a49375 --- /dev/null +++ b/tests/platform/test_prototype_state.py @@ -0,0 +1,75 @@ +import pytest + +from core.protocol import SettingsAction +from sdk.interface import UserSettings +from sdk.prototype_state import PrototypeStateStore + + +@pytest.mark.asyncio +async def test_get_or_create_user_is_stable_per_surface_identity(): + store = PrototypeStateStore() + + first = await store.get_or_create_user( + external_id="@alice:example.org", + platform="matrix", + display_name="Alice", + ) + second = await store.get_or_create_user( + external_id="@alice:example.org", + platform="matrix", + ) + + assert first.user_id == "usr-matrix-@alice:example.org" + assert first.is_new is True + assert second.user_id == first.user_id + assert second.is_new is False + assert second.display_name == "Alice" + + +@pytest.mark.asyncio +async def test_settings_defaults_match_existing_mock_shape(): + store = PrototypeStateStore() + + settings = await store.get_settings("usr-matrix-@alice:example.org") + + assert isinstance(settings, UserSettings) + assert settings.skills == { + "web-search": True, + "fetch-url": True, + "email": False, + "browser": False, + "image-gen": False, + "files": True, + } + assert settings.safety == { + "email-send": True, + "file-delete": True, + "social-post": True, + } + assert settings.soul == {"name": "Лямбда", "instructions": ""} + assert settings.plan == {"name": "Beta", "tokens_used": 0, "tokens_limit": 1000} + + +@pytest.mark.asyncio +async def test_update_settings_supports_toggle_skill_and_setters(): + store = PrototypeStateStore() + + await store.update_settings( + "usr-matrix-@alice:example.org", + SettingsAction(action="toggle_skill", payload={"skill": "browser", "enabled": True}), + ) + await store.update_settings( + "usr-matrix-@alice:example.org", + SettingsAction(action="set_soul", payload={"field": "instructions", "value": "Be concise"}), + ) + await store.update_settings( + "usr-matrix-@alice:example.org", + SettingsAction(action="set_safety", payload={"trigger": "social-post", "enabled": False}), + ) + + settings = await store.get_settings("usr-matrix-@alice:example.org") + + assert settings.skills["browser"] is True + assert settings.skills["web-search"] is True + assert settings.soul["instructions"] == "Be concise" + assert settings.safety["social-post"] is False From 083be77404cf507e55e53af4f698e98080887ba7 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:25:52 +0300 Subject: [PATCH 04/10] fix(agent): collision-safe thread keys --- sdk/agent_session.py | 7 ++++++- tests/platform/test_agent_session.py | 9 ++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/sdk/agent_session.py b/sdk/agent_session.py index 6f90e3f..8b7c7b3 100644 --- a/sdk/agent_session.py +++ b/sdk/agent_session.py @@ -10,7 +10,7 @@ from sdk.interface import MessageChunk, MessageResponse, PlatformError def build_thread_key(platform: str, user_id: str, chat_id: str) -> str: - return f"{platform}:{user_id}:{chat_id}" + return f"{len(platform)}:{platform}{len(user_id)}:{user_id}{len(chat_id)}:{chat_id}" @dataclass(frozen=True, slots=True) @@ -75,6 +75,11 @@ class AgentSessionClient: payload.get("details", "Agent error"), code=payload.get("code", "AGENT_ERROR"), ) + elif msg_type == "GRACEFUL_DISCONNECT": + raise PlatformError( + "Agent disconnected gracefully", + code="GRACEFUL_DISCONNECT", + ) else: raise PlatformError( f"Unexpected agent message: {payload}", diff --git a/tests/platform/test_agent_session.py b/tests/platform/test_agent_session.py index a1d9dd6..bd38b27 100644 --- a/tests/platform/test_agent_session.py +++ b/tests/platform/test_agent_session.py @@ -6,7 +6,14 @@ from sdk.agent_session import AgentSessionClient, AgentSessionConfig, build_thre def test_build_thread_key_uses_platform_user_and_chat_id(): - assert build_thread_key("matrix", "@alice:example.org", "C1") == "matrix:@alice:example.org:C1" + assert build_thread_key("matrix", "@alice:example.org", "C1") == "6:matrix18:@alice:example.org2:C1" + + +def test_build_thread_key_does_not_collide_when_user_id_contains_colons(): + left = build_thread_key("matrix", "@alice:example.org", "C1") + right = build_thread_key("matrix", "@alice", "example.org:C1") + + assert left != right @pytest.mark.asyncio From 19c85db89a1ef9e4a2e923c90962641cf08e98f6 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:29:02 +0300 Subject: [PATCH 05/10] Persist canonical prototype user state --- sdk/prototype_state.py | 4 +++- tests/platform/test_prototype_state.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/prototype_state.py b/sdk/prototype_state.py index 4682bd9..78243e4 100644 --- a/sdk/prototype_state.py +++ b/sdk/prototype_state.py @@ -42,7 +42,9 @@ class PrototypeStateStore: key = f"{platform}:{external_id}" existing = self._users.get(key) if existing is not None: - return existing.model_copy(update={"is_new": False}) + stored = existing.model_copy(update={"is_new": False}) + self._users[key] = stored + return stored user = User( user_id=f"usr-{platform}-{external_id}", diff --git a/tests/platform/test_prototype_state.py b/tests/platform/test_prototype_state.py index 2a49375..3c2c25a 100644 --- a/tests/platform/test_prototype_state.py +++ b/tests/platform/test_prototype_state.py @@ -24,6 +24,7 @@ async def test_get_or_create_user_is_stable_per_surface_identity(): assert second.user_id == first.user_id assert second.is_new is False assert second.display_name == "Alice" + assert store._users["matrix:@alice:example.org"].is_new is False @pytest.mark.asyncio From fabedb105b40d56a84501554ded44477e96453f0 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:30:37 +0300 Subject: [PATCH 06/10] Fix prototype state user isolation --- sdk/prototype_state.py | 4 ++-- tests/platform/test_prototype_state.py | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/prototype_state.py b/sdk/prototype_state.py index 78243e4..3423701 100644 --- a/sdk/prototype_state.py +++ b/sdk/prototype_state.py @@ -44,7 +44,7 @@ class PrototypeStateStore: if existing is not None: stored = existing.model_copy(update={"is_new": False}) self._users[key] = stored - return stored + return stored.model_copy() user = User( user_id=f"usr-{platform}-{external_id}", @@ -55,7 +55,7 @@ class PrototypeStateStore: is_new=True, ) self._users[key] = user - return user + return user.model_copy() async def get_settings(self, user_id: str) -> UserSettings: stored = self._settings.get(user_id, {}) diff --git a/tests/platform/test_prototype_state.py b/tests/platform/test_prototype_state.py index 3c2c25a..c1a2d73 100644 --- a/tests/platform/test_prototype_state.py +++ b/tests/platform/test_prototype_state.py @@ -21,9 +21,14 @@ async def test_get_or_create_user_is_stable_per_surface_identity(): assert first.user_id == "usr-matrix-@alice:example.org" assert first.is_new is True + + first.display_name = "Mallory" + first.is_new = False + assert second.user_id == first.user_id assert second.is_new is False assert second.display_name == "Alice" + assert store._users["matrix:@alice:example.org"].display_name == "Alice" assert store._users["matrix:@alice:example.org"].is_new is False From 9784ca678323dfbcdacf9c34995918694eafa37d Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:38:28 +0300 Subject: [PATCH 07/10] feat: add real platform compatibility layer --- sdk/__init__.py | 3 + sdk/real.py | 58 ++++++++++++++++ tests/core/test_integration.py | 64 ++++++++++++++++++ tests/platform/test_real.py | 118 +++++++++++++++++++++++++++++++++ 4 files changed, 243 insertions(+) create mode 100644 sdk/real.py create mode 100644 tests/platform/test_real.py diff --git a/sdk/__init__.py b/sdk/__init__.py index e69de29..36f81c1 100644 --- a/sdk/__init__.py +++ b/sdk/__init__.py @@ -0,0 +1,3 @@ +from sdk.real import RealPlatformClient + +__all__ = ["RealPlatformClient"] diff --git a/sdk/real.py b/sdk/real.py new file mode 100644 index 0000000..cd38cc2 --- /dev/null +++ b/sdk/real.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from typing import AsyncIterator + +from sdk.agent_session import AgentSessionClient, build_thread_key +from sdk.interface import Attachment, MessageChunk, MessageResponse, PlatformClient, User, UserSettings +from sdk.prototype_state import PrototypeStateStore + + +class RealPlatformClient(PlatformClient): + def __init__( + self, + agent_sessions: AgentSessionClient, + prototype_state: PrototypeStateStore, + platform: str, + ) -> None: + self._agent_sessions = agent_sessions + self._prototype_state = prototype_state + self._platform = platform + + async def get_or_create_user( + self, + external_id: str, + platform: str, + display_name: str | None = None, + ) -> User: + return await self._prototype_state.get_or_create_user( + external_id=external_id, + platform=platform, + display_name=display_name, + ) + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> MessageResponse: + thread_key = build_thread_key(self._platform, user_id, chat_id) + return await self._agent_sessions.send_message(thread_key=thread_key, text=text) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> AsyncIterator[MessageChunk]: + thread_key = build_thread_key(self._platform, user_id, chat_id) + async for chunk in self._agent_sessions.stream_message(thread_key=thread_key, text=text): + yield chunk + + async def get_settings(self, user_id: str) -> UserSettings: + return await self._prototype_state.get_settings(user_id) + + async def update_settings(self, user_id: str, action) -> None: + await self._prototype_state.update_settings(user_id, action) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 207a0ba..db2cf8f 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -5,6 +5,10 @@ Smoke test: полный цикл через dispatcher + реальные manag """ import pytest from sdk.mock import MockPlatformClient +from sdk.agent_session import build_thread_key +from sdk.interface import MessageChunk, MessageResponse +from sdk.prototype_state import PrototypeStateStore +from sdk.real import RealPlatformClient from core.store import InMemoryStore from core.chat import ChatManager from core.auth import AuthManager @@ -18,6 +22,30 @@ from core.protocol import ( ) +class FakeAgentSessionClient: + def __init__(self) -> None: + self.send_calls: list[tuple[str, str]] = [] + + async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: + self.send_calls.append((thread_key, text)) + return MessageResponse( + message_id=thread_key, + response=f"[REAL] {text}", + tokens_used=5, + finished=True, + ) + + async def stream_message(self, *, thread_key: str, text: str): + self.send_calls.append((thread_key, text)) + if False: + yield MessageChunk( + message_id=thread_key, + delta=text, + tokens_used=0, + finished=True, + ) + + @pytest.fixture def dispatcher(): platform = MockPlatformClient() @@ -32,6 +60,25 @@ def dispatcher(): return d +@pytest.fixture +def real_dispatcher(): + agent_sessions = FakeAgentSessionClient() + platform = RealPlatformClient( + agent_sessions=agent_sessions, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + store = InMemoryStore() + d = EventDispatcher( + platform=platform, + chat_mgr=ChatManager(platform, store), + auth_mgr=AuthManager(platform, store), + settings_mgr=SettingsManager(platform, store), + ) + register_all(d) + return d, agent_sessions + + async def test_full_flow_start_then_message(dispatcher): start = IncomingCommand(user_id="tg_123", platform="telegram", chat_id="C1", command="start") result = await dispatcher.dispatch(start) @@ -83,3 +130,20 @@ async def test_toggle_skill_callback(dispatcher): ) result = await dispatcher.dispatch(cb) assert any("browser" in r.text for r in result if isinstance(r, OutgoingMessage)) + + +async def test_full_flow_with_real_platform_uses_thread_key(real_dispatcher): + dispatcher, agent_sessions = real_dispatcher + + start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") + result = await dispatcher.dispatch(start) + assert any(isinstance(r, OutgoingMessage) for r in result) + + msg = IncomingMessage(user_id="u1", platform="matrix", chat_id="C1", text="Привет!") + result = await dispatcher.dispatch(msg) + texts = [r.text for r in result if isinstance(r, OutgoingMessage)] + + assert texts == ["[REAL] Привет!"] + assert agent_sessions.send_calls == [ + (build_thread_key("matrix", "u1", "C1"), "Привет!") + ] diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py new file mode 100644 index 0000000..f10e2c0 --- /dev/null +++ b/tests/platform/test_real.py @@ -0,0 +1,118 @@ +import pytest + +from core.protocol import SettingsAction +from sdk.interface import MessageChunk, MessageResponse, UserSettings +from sdk.prototype_state import PrototypeStateStore +from sdk.real import RealPlatformClient + + +class FakeAgentSessionClient: + def __init__(self) -> None: + self.send_calls: list[tuple[str, str]] = [] + self.stream_calls: list[tuple[str, str]] = [] + + async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: + self.send_calls.append((thread_key, text)) + return MessageResponse( + message_id=thread_key, + response=f"echo:{text}", + tokens_used=3, + finished=True, + ) + + async def stream_message(self, *, thread_key: str, text: str): + self.stream_calls.append((thread_key, text)) + yield MessageChunk(message_id=thread_key, delta=text[:2], finished=False) + yield MessageChunk(message_id=thread_key, delta=text[2:], finished=True, tokens_used=3) + + +@pytest.mark.asyncio +async def test_real_platform_client_get_or_create_user_uses_local_state(): + client = RealPlatformClient( + agent_sessions=FakeAgentSessionClient(), + prototype_state=PrototypeStateStore(), + platform="telegram", + ) + + first = await client.get_or_create_user("u1", "telegram", "Alice") + second = await client.get_or_create_user("u1", "telegram") + + assert first.user_id == "usr-telegram-u1" + assert first.is_new is True + assert second.user_id == first.user_id + assert second.is_new is False + assert second.display_name == "Alice" + + +@pytest.mark.asyncio +async def test_real_platform_client_send_message_uses_configured_platform(): + agent_sessions = FakeAgentSessionClient() + client = RealPlatformClient( + agent_sessions=agent_sessions, + prototype_state=PrototypeStateStore(), + platform="telegram", + ) + + result = await client.send_message("usr-telegram-u1", "C1", "hello") + + assert result == MessageResponse( + message_id="8:telegram15:usr-telegram-u12:C1", + response="echo:hello", + tokens_used=3, + finished=True, + ) + assert agent_sessions.send_calls == [ + ("8:telegram15:usr-telegram-u12:C1", "hello") + ] + + +@pytest.mark.asyncio +async def test_real_platform_client_stream_message_uses_configured_platform(): + agent_sessions = FakeAgentSessionClient() + client = RealPlatformClient( + agent_sessions=agent_sessions, + prototype_state=PrototypeStateStore(), + platform="telegram", + ) + + chunks = [] + async for chunk in client.stream_message("usr-telegram-u1", "C1", "hello"): + chunks.append(chunk) + + assert chunks == [ + MessageChunk( + message_id="8:telegram15:usr-telegram-u12:C1", + delta="he", + finished=False, + tokens_used=0, + ), + MessageChunk( + message_id="8:telegram15:usr-telegram-u12:C1", + delta="llo", + finished=True, + tokens_used=3, + ), + ] + assert agent_sessions.stream_calls == [ + ("8:telegram15:usr-telegram-u12:C1", "hello") + ] + + +@pytest.mark.asyncio +async def test_real_platform_client_settings_are_local(): + client = RealPlatformClient( + agent_sessions=FakeAgentSessionClient(), + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + + await client.update_settings( + "usr-matrix-u1", + SettingsAction(action="toggle_skill", payload={"skill": "browser", "enabled": True}), + ) + + settings = await client.get_settings("usr-matrix-u1") + + assert isinstance(settings, UserSettings) + assert settings.skills["browser"] is True + assert settings.skills["web-search"] is True From 94bdb44b935dd824e969cad134970ae6f1f2b92f Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:40:38 +0300 Subject: [PATCH 08/10] feat: wire matrix runtime to real backend --- README.md | 6 ++++-- adapter/matrix/bot.py | 24 ++++++++++++++++++++---- tests/adapter/matrix/test_dispatcher.py | 10 ++++++++++ 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 318a45d..b2f69fb 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ## Статус -Прототип в разработке. SDK платформы ещё не готов — работаем через `MockPlatformClient`. +Прототип в разработке. Matrix-адаптер по умолчанию работает через `MockPlatformClient`, но может переключаться на реальный direct-agent backend через `MATRIX_PLATFORM_BACKEND=real`. | Поверхность | Статус | Описание | |---|---|---| @@ -71,6 +71,8 @@ surfaces-bot/ - **Диалог** — сообщения, вложения, подтверждения `!yes` / `!no` и routing через `EventDispatcher` - **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта - **Текущее ограничение** — encrypted DM пока не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота +- **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` требует `AGENT_WS_URL=ws://host:port/agent_ws/` +- **Ограничения real backend** — пока это текстовый direct-agent прототип без вложений и без асинхронных callbacks; локальные настройки и user-state хранятся в `PrototypeStateStore` --- @@ -89,7 +91,7 @@ class PlatformClient(Protocol): Бот не управляет lifecycle контейнеров — это делает Master (платформа). Бот передаёт `user_id` + `chat_id` + сообщение; платформа сама решает нужно ли поднять контейнер. -Сейчас: `MockPlatformClient` в `sdk/mock.py`. +Сейчас: `MockPlatformClient` в `sdk/mock.py`, а Matrix real backend собирается через `sdk/real.py` при `MATRIX_PLATFORM_BACKEND=real`. Когда SDK готов: добавляем `SdkPlatformClient`, меняем одну строку в DI. Адаптеры и ядро не трогаем. --- diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index 08638cb..a413fad 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -35,7 +35,11 @@ from core.protocol import ( ) from core.settings import SettingsManager from core.store import InMemoryStore, SQLiteStore, StateStore +from sdk.agent_session import AgentSessionClient, AgentSessionConfig +from sdk.interface import PlatformClient from sdk.mock import MockPlatformClient +from sdk.prototype_state import PrototypeStateStore +from sdk.real import RealPlatformClient logger = structlog.get_logger(__name__) @@ -44,7 +48,7 @@ load_dotenv(Path(__file__).resolve().parents[2] / ".env") @dataclass class MatrixRuntime: - platform: MockPlatformClient + platform: PlatformClient store: StateStore chat_mgr: ChatManager auth_mgr: AuthManager @@ -52,7 +56,7 @@ class MatrixRuntime: dispatcher: EventDispatcher -def build_event_dispatcher(platform: MockPlatformClient, store: StateStore) -> EventDispatcher: +def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> EventDispatcher: chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) settings_mgr = SettingsManager(platform, store) @@ -64,12 +68,24 @@ def build_event_dispatcher(platform: MockPlatformClient, store: StateStore) -> E return dispatcher +def _build_platform_from_env() -> PlatformClient: + backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() + if backend == "real": + ws_url = os.environ["AGENT_WS_URL"] + return RealPlatformClient( + agent_sessions=AgentSessionClient(AgentSessionConfig(base_ws_url=ws_url)), + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + return MockPlatformClient() + + def build_runtime( - platform: MockPlatformClient | None = None, + platform: PlatformClient | None = None, store: StateStore | None = None, client: AsyncClient | None = None, ) -> MatrixRuntime: - platform = platform or MockPlatformClient() + platform = platform or _build_platform_from_env() store = store or InMemoryStore() chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index dce9243..7f064f2 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -11,6 +11,7 @@ from adapter.matrix.handlers.auth import handle_invite from adapter.matrix.store import get_room_meta, get_user_meta, set_user_meta from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage from sdk.mock import MockPlatformClient +from sdk.real import RealPlatformClient async def test_matrix_dispatcher_registers_custom_handlers(): @@ -254,3 +255,12 @@ async def test_prepare_live_sync_returns_next_batch_from_bootstrap_sync(): client.sync.assert_awaited_once_with(timeout=0, full_state=True) assert since == "s123" + + +async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monkeypatch): + monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") + monkeypatch.setenv("AGENT_WS_URL", "ws://agent.example/agent_ws/") + + runtime = build_runtime() + + assert isinstance(runtime.platform, RealPlatformClient) From 37643a96952cece5679db7a249226028af145c9c Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 01:43:44 +0300 Subject: [PATCH 09/10] fix prototype backend review issues --- pyproject.toml | 2 + sdk/__init__.py | 10 ++- sdk/agent_session.py | 4 +- sdk/prototype_state.py | 5 +- sdk/real.py | 9 +- tests/platform/test_agent_session.py | 116 +++++++++++++++++++++++-- tests/platform/test_prototype_state.py | 28 ++++-- tests/platform/test_real.py | 36 ++++---- uv.lock | 18 ++++ 9 files changed, 182 insertions(+), 46 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8f4978b..ccc6309 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,12 +15,14 @@ dependencies = [ "structlog>=24.1", "python-dotenv>=1.0", "httpx>=0.27", + "aiohttp>=3.9", ] [project.optional-dependencies] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", + "pytest-aiohttp>=1.0", "pytest-cov>=4.1", "ruff>=0.3", "mypy>=1.8", diff --git a/sdk/__init__.py b/sdk/__init__.py index 36f81c1..f7939f7 100644 --- a/sdk/__init__.py +++ b/sdk/__init__.py @@ -1,3 +1,9 @@ -from sdk.real import RealPlatformClient - __all__ = ["RealPlatformClient"] + + +def __getattr__(name: str): + if name == "RealPlatformClient": + from sdk.real import RealPlatformClient + + return RealPlatformClient + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/sdk/agent_session.py b/sdk/agent_session.py index 8b7c7b3..0f959a1 100644 --- a/sdk/agent_session.py +++ b/sdk/agent_session.py @@ -4,8 +4,6 @@ from dataclasses import dataclass from typing import AsyncIterator from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit -import aiohttp - from sdk.interface import MessageChunk, MessageResponse, PlatformError @@ -41,6 +39,8 @@ class AgentSessionClient: ) async def stream_message(self, *, thread_key: str, text: str) -> AsyncIterator[MessageChunk]: + import aiohttp + async with aiohttp.ClientSession() as session: async with session.ws_connect( self._ws_url(thread_key), diff --git a/sdk/prototype_state.py b/sdk/prototype_state.py index 3423701..ccb75f1 100644 --- a/sdk/prototype_state.py +++ b/sdk/prototype_state.py @@ -34,7 +34,6 @@ class PrototypeStateStore: async def get_or_create_user( self, - *, external_id: str, platform: str, display_name: str | None = None, @@ -54,14 +53,14 @@ class PrototypeStateStore: created_at=datetime.now(UTC), is_new=True, ) - self._users[key] = user + self._users[key] = user.model_copy(update={"is_new": False}) return user.model_copy() async def get_settings(self, user_id: str) -> UserSettings: stored = self._settings.get(user_id, {}) return UserSettings( skills={**DEFAULT_SKILLS, **stored.get("skills", {})}, - connectors=stored.get("connectors", {}), + connectors=dict(stored.get("connectors", {})), soul={**DEFAULT_SOUL, **stored.get("soul", {})}, safety={**DEFAULT_SAFETY, **stored.get("safety", {})}, plan={**DEFAULT_PLAN, **stored.get("plan", {})}, diff --git a/sdk/real.py b/sdk/real.py index cd38cc2..7da48c8 100644 --- a/sdk/real.py +++ b/sdk/real.py @@ -1,18 +1,21 @@ from __future__ import annotations -from typing import AsyncIterator +from typing import TYPE_CHECKING, AsyncIterator -from sdk.agent_session import AgentSessionClient, build_thread_key +from sdk.agent_session import build_thread_key from sdk.interface import Attachment, MessageChunk, MessageResponse, PlatformClient, User, UserSettings from sdk.prototype_state import PrototypeStateStore +if TYPE_CHECKING: + from sdk.agent_session import AgentSessionClient + class RealPlatformClient(PlatformClient): def __init__( self, agent_sessions: AgentSessionClient, prototype_state: PrototypeStateStore, - platform: str, + platform: str = "matrix", ) -> None: self._agent_sessions = agent_sessions self._prototype_state = prototype_state diff --git a/tests/platform/test_agent_session.py b/tests/platform/test_agent_session.py index bd38b27..2d085c3 100644 --- a/tests/platform/test_agent_session.py +++ b/tests/platform/test_agent_session.py @@ -1,9 +1,58 @@ +import sys +from pathlib import Path +from types import ModuleType + import pytest from aiohttp import web from sdk.interface import MessageChunk, MessageResponse from sdk.agent_session import AgentSessionClient, AgentSessionConfig, build_thread_key +AGENT_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent" +AGENT_API_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api" +for path in (AGENT_ROOT, AGENT_API_ROOT): + if str(path) not in sys.path: + sys.path.insert(0, str(path)) + +if "fastapi" not in sys.modules: + fastapi = ModuleType("fastapi") + + class _Router: + def websocket(self, _path: str): + def decorator(fn): + return fn + + return decorator + + class _WebSocketDisconnect(Exception): + pass + + def _depends(value): + return value + + fastapi.APIRouter = _Router + fastapi.WebSocket = object + fastapi.WebSocketDisconnect = _WebSocketDisconnect + fastapi.Depends = _depends + sys.modules["fastapi"] = fastapi + +if "src.agent" not in sys.modules: + agent_module = ModuleType("src.agent") + + class _AgentService: + async def astream(self, text: str, thread_id: str): + yield text + + def _get_agent_service(): + return _AgentService() + + agent_module.AgentService = _AgentService + agent_module.get_agent_service = _get_agent_service + sys.modules["src.agent"] = agent_module + +from lambda_agent_api.client import MsgUserMessage # noqa: E402 +from src.api.external import process_message # noqa: E402 + def test_build_thread_key_uses_platform_user_and_chat_id(): assert build_thread_key("matrix", "@alice:example.org", "C1") == "6:matrix18:@alice:example.org2:C1" @@ -18,11 +67,13 @@ def test_build_thread_key_does_not_collide_when_user_id_contains_colons(): @pytest.mark.asyncio async def test_stream_message_yields_text_chunks_and_end(aiohttp_server): + thread_key = build_thread_key("matrix", "@alice:example.org", "C1") + async def handler(request): ws = web.WebSocketResponse() await ws.prepare(request) - assert request.query["thread_id"] == "matrix:@alice:example.org:C1" + assert request.query["thread_id"] == thread_key await ws.send_json({"type": "STATUS"}) @@ -43,25 +94,27 @@ async def test_stream_message_yields_text_chunks_and_end(aiohttp_server): chunks = [] async for chunk in client.stream_message( - thread_key="matrix:@alice:example.org:C1", + thread_key=thread_key, text="hello", ): chunks.append(chunk) assert chunks == [ - MessageChunk(message_id="matrix:@alice:example.org:C1", delta="hel", finished=False, tokens_used=0), - MessageChunk(message_id="matrix:@alice:example.org:C1", delta="lo", finished=False, tokens_used=0), - MessageChunk(message_id="matrix:@alice:example.org:C1", delta="", finished=True, tokens_used=7), + MessageChunk(message_id=thread_key, delta="hel", finished=False, tokens_used=0), + MessageChunk(message_id=thread_key, delta="lo", finished=False, tokens_used=0), + MessageChunk(message_id=thread_key, delta="", finished=True, tokens_used=7), ] @pytest.mark.asyncio async def test_send_message_collects_streamed_chunks_and_tokens(aiohttp_server): + thread_key = build_thread_key("matrix", "@alice:example.org", "C1") + async def handler(request): ws = web.WebSocketResponse() await ws.prepare(request) - assert request.query["thread_id"] == "matrix:@alice:example.org:C1" + assert request.query["thread_id"] == thread_key await ws.send_json({"type": "STATUS"}) @@ -81,13 +134,60 @@ async def test_send_message_collects_streamed_chunks_and_tokens(aiohttp_server): client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/")))) result = await client.send_message( - thread_key="matrix:@alice:example.org:C1", + thread_key=thread_key, text="hello world", ) assert result == MessageResponse( - message_id="matrix:@alice:example.org:C1", + message_id=thread_key, response="hello world", tokens_used=11, finished=True, ) + + +@pytest.mark.asyncio +async def test_process_message_requires_thread_id_query_param(): + class FakeWebSocket: + query_params = {} + + async def send_text(self, text: str) -> None: + raise AssertionError(f"send_text should not be called: {text}") + + class FakeAgentService: + async def astream(self, text: str, thread_id: str): + yield text + + with pytest.raises(ValueError, match="thread_id query parameter is required"): + await process_message( + FakeWebSocket(), + MsgUserMessage(text="hello"), + FakeAgentService(), + ) + + +@pytest.mark.asyncio +async def test_process_message_passes_thread_id_to_agent_service(): + class FakeWebSocket: + def __init__(self) -> None: + self.query_params = {"thread_id": "6:matrix18:@alice:example.org2:C1"} + self.sent_messages: list[str] = [] + + async def send_text(self, text: str) -> None: + self.sent_messages.append(text) + + class FakeAgentService: + def __init__(self) -> None: + self.calls: list[tuple[str, str]] = [] + + async def astream(self, text: str, thread_id: str): + self.calls.append((text, thread_id)) + yield "hello" + + ws = FakeWebSocket() + agent_service = FakeAgentService() + await process_message(ws, MsgUserMessage(text="hello"), agent_service) + + assert agent_service.calls == [("hello", "6:matrix18:@alice:example.org2:C1")] + assert any("AGENT_EVENT_TEXT_CHUNK" in message for message in ws.sent_messages) + assert any("AGENT_EVENT_END" in message for message in ws.sent_messages) diff --git a/tests/platform/test_prototype_state.py b/tests/platform/test_prototype_state.py index c1a2d73..b5f5dc3 100644 --- a/tests/platform/test_prototype_state.py +++ b/tests/platform/test_prototype_state.py @@ -9,18 +9,12 @@ from sdk.prototype_state import PrototypeStateStore async def test_get_or_create_user_is_stable_per_surface_identity(): store = PrototypeStateStore() - first = await store.get_or_create_user( - external_id="@alice:example.org", - platform="matrix", - display_name="Alice", - ) - second = await store.get_or_create_user( - external_id="@alice:example.org", - platform="matrix", - ) + first = await store.get_or_create_user("@alice:example.org", "matrix", "Alice") + second = await store.get_or_create_user("@alice:example.org", "matrix") assert first.user_id == "usr-matrix-@alice:example.org" assert first.is_new is True + assert store._users["matrix:@alice:example.org"].is_new is False first.display_name = "Mallory" first.is_new = False @@ -56,6 +50,22 @@ async def test_settings_defaults_match_existing_mock_shape(): assert settings.plan == {"name": "Beta", "tokens_used": 0, "tokens_limit": 1000} +@pytest.mark.asyncio +async def test_get_settings_returns_connectors_copy(): + store = PrototypeStateStore() + store._settings["usr-matrix-@alice:example.org"] = { + "connectors": {"github": {"enabled": True}}, + } + + settings = await store.get_settings("usr-matrix-@alice:example.org") + settings.connectors["github"]["enabled"] = False + settings.connectors["slack"] = {"enabled": True} + + assert store._settings["usr-matrix-@alice:example.org"]["connectors"] == { + "github": {"enabled": True}, + } + + @pytest.mark.asyncio async def test_update_settings_supports_toggle_skill_and_setters(): store = PrototypeStateStore() diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py index f10e2c0..7225cfd 100644 --- a/tests/platform/test_real.py +++ b/tests/platform/test_real.py @@ -1,6 +1,7 @@ import pytest from core.protocol import SettingsAction +from sdk.agent_session import build_thread_key from sdk.interface import MessageChunk, MessageResponse, UserSettings from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient @@ -31,13 +32,12 @@ async def test_real_platform_client_get_or_create_user_uses_local_state(): client = RealPlatformClient( agent_sessions=FakeAgentSessionClient(), prototype_state=PrototypeStateStore(), - platform="telegram", ) - first = await client.get_or_create_user("u1", "telegram", "Alice") - second = await client.get_or_create_user("u1", "telegram") + first = await client.get_or_create_user("u1", "matrix", "Alice") + second = await client.get_or_create_user("u1", "matrix") - assert first.user_id == "usr-telegram-u1" + assert first.user_id == "usr-matrix-u1" assert first.is_new is True assert second.user_id == first.user_id assert second.is_new is False @@ -45,57 +45,55 @@ async def test_real_platform_client_get_or_create_user_uses_local_state(): @pytest.mark.asyncio -async def test_real_platform_client_send_message_uses_configured_platform(): +async def test_real_platform_client_send_message_uses_surface_user_thread_identity(): agent_sessions = FakeAgentSessionClient() client = RealPlatformClient( agent_sessions=agent_sessions, prototype_state=PrototypeStateStore(), - platform="telegram", + platform="matrix", ) - result = await client.send_message("usr-telegram-u1", "C1", "hello") + thread_key = build_thread_key("matrix", "@alice:example.org", "C1") + result = await client.send_message("@alice:example.org", "C1", "hello") assert result == MessageResponse( - message_id="8:telegram15:usr-telegram-u12:C1", + message_id=thread_key, response="echo:hello", tokens_used=3, finished=True, ) - assert agent_sessions.send_calls == [ - ("8:telegram15:usr-telegram-u12:C1", "hello") - ] + assert agent_sessions.send_calls == [(thread_key, "hello")] @pytest.mark.asyncio -async def test_real_platform_client_stream_message_uses_configured_platform(): +async def test_real_platform_client_stream_message_uses_surface_user_thread_identity(): agent_sessions = FakeAgentSessionClient() client = RealPlatformClient( agent_sessions=agent_sessions, prototype_state=PrototypeStateStore(), - platform="telegram", + platform="matrix", ) + thread_key = build_thread_key("matrix", "@alice:example.org", "C1") chunks = [] - async for chunk in client.stream_message("usr-telegram-u1", "C1", "hello"): + async for chunk in client.stream_message("@alice:example.org", "C1", "hello"): chunks.append(chunk) assert chunks == [ MessageChunk( - message_id="8:telegram15:usr-telegram-u12:C1", + message_id=thread_key, delta="he", finished=False, tokens_used=0, ), MessageChunk( - message_id="8:telegram15:usr-telegram-u12:C1", + message_id=thread_key, delta="llo", finished=True, tokens_used=3, ), ] - assert agent_sessions.stream_calls == [ - ("8:telegram15:usr-telegram-u12:C1", "hello") - ] + assert agent_sessions.stream_calls == [(thread_key, "hello")] @pytest.mark.asyncio diff --git a/uv.lock b/uv.lock index 0c37403..35c8460 100644 --- a/uv.lock +++ b/uv.lock @@ -1095,6 +1095,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, ] +[[package]] +name = "pytest-aiohttp" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "pytest" }, + { name = "pytest-asyncio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/72/4b/d326890c153f2c4ce1bf45d07683c08c10a1766058a22934620bc6ac6592/pytest_aiohttp-1.1.0.tar.gz", hash = "sha256:147de8cb164f3fc9d7196967f109ab3c0b93ea3463ab50631e56438eab7b5adc", size = 12842, upload-time = "2025-01-23T12:44:04.465Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ba/0f/e6af71c02e0f1098eaf7d2dbf3ffdf0a69fc1e0ef174f96af05cef161f1b/pytest_aiohttp-1.1.0-py3-none-any.whl", hash = "sha256:f39a11693a0dce08dd6c542d241e199dd8047a6e6596b2bcfa60d373f143456d", size = 8932, upload-time = "2025-01-23T12:44:03.27Z" }, +] + [[package]] name = "pytest-asyncio" version = "1.3.0" @@ -1302,6 +1316,7 @@ version = "0.1.0" source = { editable = "." } dependencies = [ { name = "aiogram" }, + { name = "aiohttp" }, { name = "httpx" }, { name = "matrix-nio" }, { name = "pydantic" }, @@ -1313,6 +1328,7 @@ dependencies = [ dev = [ { name = "mypy" }, { name = "pytest" }, + { name = "pytest-aiohttp" }, { name = "pytest-asyncio" }, { name = "pytest-cov" }, { name = "ruff" }, @@ -1321,11 +1337,13 @@ dev = [ [package.metadata] requires-dist = [ { name = "aiogram", specifier = ">=3.4,<4" }, + { name = "aiohttp", specifier = ">=3.9" }, { name = "httpx", specifier = ">=0.27" }, { name = "matrix-nio", specifier = ">=0.21" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.8" }, { name = "pydantic", specifier = ">=2.5" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" }, + { name = "pytest-aiohttp", marker = "extra == 'dev'", specifier = ">=1.0" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.1" }, { name = "python-dotenv", specifier = ">=1.0" }, From 8efc91b02b07ece536628e1c4b0f4ea8fa24a6df Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Wed, 8 Apr 2026 02:18:11 +0300 Subject: [PATCH 10/10] fix(matrix): accept repeat invites before provisioning --- adapter/matrix/handlers/auth.py | 4 ++-- tests/adapter/matrix/test_dispatcher.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/adapter/matrix/handlers/auth.py b/adapter/matrix/handlers/auth.py index 83f1ac6..6882404 100644 --- a/adapter/matrix/handlers/auth.py +++ b/adapter/matrix/handlers/auth.py @@ -20,12 +20,12 @@ async def handle_invite(client: Any, room: Any, event: Any, platform, store, aut matrix_user_id = getattr(event, "sender", "") display_name = getattr(room, "display_name", None) or matrix_user_id + await client.join(room.room_id) + existing = await get_user_meta(store, matrix_user_id) if existing and existing.get("space_id"): return - await client.join(room.room_id) - user = await platform.get_or_create_user( external_id=matrix_user_id, platform="matrix", diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index 7f064f2..ad4746c 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -179,7 +179,9 @@ async def test_invite_event_is_idempotent_per_user(): runtime.chat_mgr, ) + assert client.join.await_count == 2 assert client.room_create.await_count == 2 + client.room_send.assert_awaited_once() async def test_bot_ignores_its_own_messages():