Compare commits

...

10 commits

15 changed files with 1008 additions and 8 deletions

View file

@ -4,7 +4,7 @@
## Статус ## Статус
Прототип в разработке. SDK платформы ещё не готов — работаем через `MockPlatformClient`. Прототип в разработке. Matrix-адаптер по умолчанию работает через `MockPlatformClient`, но может переключаться на реальный direct-agent backend через `MATRIX_PLATFORM_BACKEND=real`.
| Поверхность | Статус | Описание | | Поверхность | Статус | Описание |
|---|---|---| |---|---|---|
@ -71,6 +71,8 @@ surfaces-bot/
- **Диалог** — сообщения, вложения, подтверждения `!yes` / `!no` и routing через `EventDispatcher` - **Диалог** — сообщения, вложения, подтверждения `!yes` / `!no` и routing через `EventDispatcher`
- **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта - **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта
- **Текущее ограничение** — encrypted DM пока не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота - **Текущее ограничение** — encrypted DM пока не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота
- **Backend selection**`MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` требует `AGENT_WS_URL=ws://host:port/agent_ws/`
- **Ограничения real backend** — пока это текстовый direct-agent прототип без вложений и без асинхронных callbacks; локальные настройки и user-state хранятся в `PrototypeStateStore`
--- ---
@ -89,7 +91,7 @@ class PlatformClient(Protocol):
Бот не управляет lifecycle контейнеров — это делает Master (платформа). Бот не управляет lifecycle контейнеров — это делает Master (платформа).
Бот передаёт `user_id` + `chat_id` + сообщение; платформа сама решает нужно ли поднять контейнер. Бот передаёт `user_id` + `chat_id` + сообщение; платформа сама решает нужно ли поднять контейнер.
Сейчас: `MockPlatformClient` в `sdk/mock.py`. Сейчас: `MockPlatformClient` в `sdk/mock.py`, а Matrix real backend собирается через `sdk/real.py` при `MATRIX_PLATFORM_BACKEND=real`.
Когда SDK готов: добавляем `SdkPlatformClient`, меняем одну строку в DI. Адаптеры и ядро не трогаем. Когда SDK готов: добавляем `SdkPlatformClient`, меняем одну строку в DI. Адаптеры и ядро не трогаем.
--- ---

View file

@ -35,7 +35,11 @@ from core.protocol import (
) )
from core.settings import SettingsManager from core.settings import SettingsManager
from core.store import InMemoryStore, SQLiteStore, StateStore from core.store import InMemoryStore, SQLiteStore, StateStore
from sdk.agent_session import AgentSessionClient, AgentSessionConfig
from sdk.interface import PlatformClient
from sdk.mock import MockPlatformClient from sdk.mock import MockPlatformClient
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
logger = structlog.get_logger(__name__) logger = structlog.get_logger(__name__)
@ -44,7 +48,7 @@ load_dotenv(Path(__file__).resolve().parents[2] / ".env")
@dataclass @dataclass
class MatrixRuntime: class MatrixRuntime:
platform: MockPlatformClient platform: PlatformClient
store: StateStore store: StateStore
chat_mgr: ChatManager chat_mgr: ChatManager
auth_mgr: AuthManager auth_mgr: AuthManager
@ -52,7 +56,7 @@ class MatrixRuntime:
dispatcher: EventDispatcher dispatcher: EventDispatcher
def build_event_dispatcher(platform: MockPlatformClient, store: StateStore) -> EventDispatcher: def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> EventDispatcher:
chat_mgr = ChatManager(platform, store) chat_mgr = ChatManager(platform, store)
auth_mgr = AuthManager(platform, store) auth_mgr = AuthManager(platform, store)
settings_mgr = SettingsManager(platform, store) settings_mgr = SettingsManager(platform, store)
@ -64,12 +68,24 @@ def build_event_dispatcher(platform: MockPlatformClient, store: StateStore) -> E
return dispatcher return dispatcher
def _build_platform_from_env() -> PlatformClient:
backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
if backend == "real":
ws_url = os.environ["AGENT_WS_URL"]
return RealPlatformClient(
agent_sessions=AgentSessionClient(AgentSessionConfig(base_ws_url=ws_url)),
prototype_state=PrototypeStateStore(),
platform="matrix",
)
return MockPlatformClient()
def build_runtime( def build_runtime(
platform: MockPlatformClient | None = None, platform: PlatformClient | None = None,
store: StateStore | None = None, store: StateStore | None = None,
client: AsyncClient | None = None, client: AsyncClient | None = None,
) -> MatrixRuntime: ) -> MatrixRuntime:
platform = platform or MockPlatformClient() platform = platform or _build_platform_from_env()
store = store or InMemoryStore() store = store or InMemoryStore()
chat_mgr = ChatManager(platform, store) chat_mgr = ChatManager(platform, store)
auth_mgr = AuthManager(platform, store) auth_mgr = AuthManager(platform, store)

View file

@ -20,12 +20,12 @@ async def handle_invite(client: Any, room: Any, event: Any, platform, store, aut
matrix_user_id = getattr(event, "sender", "") matrix_user_id = getattr(event, "sender", "")
display_name = getattr(room, "display_name", None) or matrix_user_id display_name = getattr(room, "display_name", None) or matrix_user_id
await client.join(room.room_id)
existing = await get_user_meta(store, matrix_user_id) existing = await get_user_meta(store, matrix_user_id)
if existing and existing.get("space_id"): if existing and existing.get("space_id"):
return return
await client.join(room.room_id)
user = await platform.get_or_create_user( user = await platform.get_or_create_user(
external_id=matrix_user_id, external_id=matrix_user_id,
platform="matrix", platform="matrix",

View file

@ -0,0 +1,243 @@
# Matrix Direct-Agent Prototype Design
## Goal
Ship a working Matrix prototype that talks to the real Lambda agent instead of `MockPlatformClient`, while preserving the current Matrix adapter logic and keeping the code expandable toward future platform versions.
## Scope
This design is for a Matrix-only prototype delivered from this repository on a dedicated branch. It is not a `main` branch rollout and it is not a separate prototype repo.
The design assumes a minimal companion fork of `platform/agent` may be used, but changes there must stay as small as possible.
## Constraints
- Preserve the current Matrix transport logic as much as possible.
- Keep `core/` unaware of platform immaturity.
- Avoid broad changes to platform repos.
- Prefer one narrow patch to `platform/agent` over changes to both `platform/agent` and `platform/agent_api`.
- Keep the backend boundary reusable for future Telegram or other surfaces.
- Do not pretend unsupported platform capabilities are real.
## Live Platform Findings
Based on the live repo analysis performed on April 7, 2026:
- `platform/master` is not yet a usable consumer-facing backend for surfaces.
- `platform/agent` exposes a working WebSocket endpoint for prompt/response exchange.
- `platform/agent_api` documents and implements text-oriented WebSocket messaging, but the bot does not need to depend on that package directly.
- `platform/agent` currently hardcodes a single shared backend memory thread via `thread_id = "default"`, which would cause all chats to share context.
## Architecture
The prototype remains in this repo and introduces a new real backend path behind the existing SDK boundary.
### New files
- `sdk/real.py`
- Exports `RealPlatformClient`
- Implements the existing `PlatformClient` contract from `sdk/interface.py`
- Composes the lower-level prototype pieces
- `sdk/agent_session.py`
- Owns direct WebSocket communication with the real agent
- Manages connection lifecycle, request/response handling, and thread identity
- `sdk/prototype_state.py`
- Owns local prototype-only state
- Stores user mapping, local settings, and lightweight metadata needed until a real control plane exists
### Responsibility split
- Matrix adapter remains transport-specific only.
- `core/` continues to depend only on `PlatformClient`.
- `RealPlatformClient` acts as the anti-corruption layer between the current bot contract and the platforms incomplete shape.
- Local control-plane behavior remains explicit and replaceable later.
## Message and Identity Model
Each Matrix chat gets a stable backend session identity.
### Surface identity
- Surface: `matrix`
- Surface user id: Matrix MXID, for example `@alice:example.org`
- Surface chat id: logical chat id from `ChatManager`, for example `C1`
- Surface ref: Matrix room id
### Backend thread identity
Use a deterministic thread key:
`matrix:{matrix_user_id}:{chat_id}`
Example:
`matrix:@alice:example.org:C1`
### Mapping rules
- One Matrix logical chat maps to one backend memory thread.
- `!new` creates a fresh logical chat and therefore a fresh backend thread.
- `!rename` only changes display metadata and does not change backend identity.
- `!archive` stops active use of the thread in the surface, but does not need to delete backend memory in v1.
## Runtime Flow
### Normal message flow
1. Matrix event arrives in an existing room.
2. Existing Matrix routing resolves room to logical `chat_id`.
3. `core/handlers/message.py` calls `platform.send_message(...)`.
4. `RealPlatformClient` derives the backend thread key from `(platform, user_id, chat_id)`.
5. `AgentSessionClient` sends the prompt to the agent WebSocket using that thread key.
6. The reply is converted into the existing `MessageResponse` or `MessageChunk` contract.
7. Matrix sends the final text back to the room.
### Settings flow
For v1, settings remain local:
- `get_settings()` reads from local prototype state
- `update_settings()` writes to local prototype state
This is intentional. The prototype must not claim settings are backed by the real platform when no such platform API exists yet.
## Feature Matrix
### Real in v1
- `!start`
- Plain text messaging with the real agent
- Matrix chat lifecycle already implemented in this repo:
- `!new`
- `!chats`
- `!rename`
- `!archive`
- Per-chat conversation memory, provided the agent accepts dynamic thread identity
### Local in v1
- `!settings`
- `!skills`
- `!soul`
- `!safety`
- `!status`
- user registration and local user mapping
### Deferred
- Attachments and file upload to the agent
- Voice input to the agent
- Image input to the agent
- Long-running task callbacks and webhook-style async completion
- Real control-plane integration through `platform/master`
## Minimal Upstream Change
To avoid shared memory across all conversations, make one narrow change in the forked `platform/agent` repo:
- stop hardcoding `thread_id = "default"`
- derive thread identity from WebSocket connection context
### Preferred mechanism
Read `thread_id` from WebSocket query parameters rather than changing the message payload format.
Example:
`ws://host:port/agent_ws/?thread_id=matrix:@alice:example.org:C1`
This is preferred because:
- it limits the platform patch to one repo
- it avoids changing both server and SDK protocol shape
- it keeps the client message body text-only
- it makes session identity explicit and easy to reason about
## Why Not Use `platform/agent_api` Directly
The bot should not depend on their client package for the prototype.
Reasons:
- the bot already has its own internal integration boundary in `sdk/interface.py`
- a tiny local WebSocket client is enough for this protocol
- avoiding a dependency on `platform/agent_api` keeps rebasing simpler
- if upstream stabilizes later, the bot can adopt their SDK without affecting Matrix handlers
## Repo Strategy
### This repo
Owns:
- Matrix surface logic
- SDK compatibility layer
- local prototype state
- backend selection and wiring
### Forked `platform/agent`
Owns only:
- minimal thread identity patch required for per-chat memory
### Explicitly not doing
- no separate prototype repo
- no changes to `platform/master` for v1
- no unnecessary changes to `platform/agent_api`
## Migration Path
This design is intentionally expandable.
When the platform develops further:
- `sdk/prototype_state.py` can be replaced or reduced by a real `MasterClient`
- `sdk/agent_session.py` can remain the direct session transport if still relevant
- `RealPlatformClient` can continue to present the stable bot-facing interface
- Telegram or another surface can reuse the same backend components without rethinking the integration model
## Risks
### Risk: hidden platform assumptions leak upward
Mitigation:
- keep all direct-agent logic below `RealPlatformClient`
- avoid changing `core/` contracts for prototype convenience
### Risk: settings semantics drift from future platform reality
Mitigation:
- make local settings behavior explicit in code and docs
- keep settings isolated in `sdk/prototype_state.py`
### Risk: upstream `agent` fork diverges
Mitigation:
- keep the patch minimal and narrowly scoped to thread identity
### Risk: thread identity source is unstable
Mitigation:
- derive thread key from existing stable bot-side identities: platform, surface user id, logical chat id
## Testing Strategy
- Unit tests for `sdk/agent_session.py` request/response behavior
- Unit tests for `sdk/prototype_state.py` local settings and user mapping
- Unit tests for `sdk/real.py` contract compliance with `PlatformClient`
- Matrix integration tests confirming:
- existing commands still work
- different logical chats map to different backend thread keys
- rename does not change thread identity
- archive stops reuse from the surface perspective
## Success Criteria
- Matrix can talk to the real agent without rewriting the Matrix adapter architecture
- Chats do not share backend memory accidentally
- Unsupported platform capabilities remain local or deferred rather than being faked as “real”
- The backend boundary remains suitable for later Telegram or other surfaces

View file

@ -15,12 +15,14 @@ dependencies = [
"structlog>=24.1", "structlog>=24.1",
"python-dotenv>=1.0", "python-dotenv>=1.0",
"httpx>=0.27", "httpx>=0.27",
"aiohttp>=3.9",
] ]
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
"pytest>=8.0", "pytest>=8.0",
"pytest-asyncio>=0.23", "pytest-asyncio>=0.23",
"pytest-aiohttp>=1.0",
"pytest-cov>=4.1", "pytest-cov>=4.1",
"ruff>=0.3", "ruff>=0.3",
"mypy>=1.8", "mypy>=1.8",

View file

@ -0,0 +1,9 @@
__all__ = ["RealPlatformClient"]
def __getattr__(name: str):
if name == "RealPlatformClient":
from sdk.real import RealPlatformClient
return RealPlatformClient
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

93
sdk/agent_session.py Normal file
View file

@ -0,0 +1,93 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import AsyncIterator
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from sdk.interface import MessageChunk, MessageResponse, PlatformError
def build_thread_key(platform: str, user_id: str, chat_id: str) -> str:
return f"{len(platform)}:{platform}{len(user_id)}:{user_id}{len(chat_id)}:{chat_id}"
@dataclass(frozen=True, slots=True)
class AgentSessionConfig:
base_ws_url: str
timeout_seconds: float = 30.0
class AgentSessionClient:
def __init__(self, config: AgentSessionConfig) -> None:
self._config = config
async def send_message(self, *, thread_key: str, text: str) -> MessageResponse:
response_parts: list[str] = []
tokens_used = 0
async for chunk in self.stream_message(thread_key=thread_key, text=text):
if chunk.delta:
response_parts.append(chunk.delta)
if chunk.finished:
tokens_used = chunk.tokens_used
return MessageResponse(
message_id=thread_key,
response="".join(response_parts),
tokens_used=tokens_used,
finished=True,
)
async def stream_message(self, *, thread_key: str, text: str) -> AsyncIterator[MessageChunk]:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
self._ws_url(thread_key),
heartbeat=30,
) as ws:
status = await ws.receive_json(timeout=self._config.timeout_seconds)
if status.get("type") != "STATUS":
raise PlatformError("Agent did not send STATUS", code="AGENT_PROTOCOL_ERROR")
await ws.send_json({"type": "USER_MESSAGE", "text": text})
while True:
payload = await ws.receive_json(timeout=self._config.timeout_seconds)
msg_type = payload.get("type")
if msg_type == "AGENT_EVENT_TEXT_CHUNK":
yield MessageChunk(
message_id=thread_key,
delta=payload["text"],
finished=False,
)
elif msg_type == "AGENT_EVENT_END":
yield MessageChunk(
message_id=thread_key,
delta="",
finished=True,
tokens_used=payload.get("tokens_used", 0),
)
return
elif msg_type == "ERROR":
raise PlatformError(
payload.get("details", "Agent error"),
code=payload.get("code", "AGENT_ERROR"),
)
elif msg_type == "GRACEFUL_DISCONNECT":
raise PlatformError(
"Agent disconnected gracefully",
code="GRACEFUL_DISCONNECT",
)
else:
raise PlatformError(
f"Unexpected agent message: {payload}",
code="AGENT_PROTOCOL_ERROR",
)
def _ws_url(self, thread_key: str) -> str:
parts = urlsplit(self._config.base_ws_url)
query = dict(parse_qsl(parts.query, keep_blank_values=True))
query["thread_id"] = thread_key
return urlunsplit(parts._replace(query=urlencode(query)))

80
sdk/prototype_state.py Normal file
View file

@ -0,0 +1,80 @@
from __future__ import annotations
from datetime import UTC, datetime
from typing import Any
from sdk.interface import User, UserSettings
# Keep the prototype backend self-contained; do not import these from sdk.mock.
DEFAULT_SKILLS: dict[str, bool] = {
"web-search": True,
"fetch-url": True,
"email": False,
"browser": False,
"image-gen": False,
"files": True,
}
DEFAULT_SAFETY: dict[str, bool] = {
"email-send": True,
"file-delete": True,
"social-post": True,
}
DEFAULT_SOUL: dict[str, str] = {"name": "Лямбда", "instructions": ""}
DEFAULT_PLAN: dict[str, Any] = {
"name": "Beta",
"tokens_used": 0,
"tokens_limit": 1000,
}
class PrototypeStateStore:
def __init__(self) -> None:
self._users: dict[str, User] = {}
self._settings: dict[str, dict[str, Any]] = {}
async def get_or_create_user(
self,
external_id: str,
platform: str,
display_name: str | None = None,
) -> User:
key = f"{platform}:{external_id}"
existing = self._users.get(key)
if existing is not None:
stored = existing.model_copy(update={"is_new": False})
self._users[key] = stored
return stored.model_copy()
user = User(
user_id=f"usr-{platform}-{external_id}",
external_id=external_id,
platform=platform,
display_name=display_name,
created_at=datetime.now(UTC),
is_new=True,
)
self._users[key] = user.model_copy(update={"is_new": False})
return user.model_copy()
async def get_settings(self, user_id: str) -> UserSettings:
stored = self._settings.get(user_id, {})
return UserSettings(
skills={**DEFAULT_SKILLS, **stored.get("skills", {})},
connectors=dict(stored.get("connectors", {})),
soul={**DEFAULT_SOUL, **stored.get("soul", {})},
safety={**DEFAULT_SAFETY, **stored.get("safety", {})},
plan={**DEFAULT_PLAN, **stored.get("plan", {})},
)
async def update_settings(self, user_id: str, action: Any) -> None:
settings = self._settings.setdefault(user_id, {})
if action.action == "toggle_skill":
skills = settings.setdefault("skills", DEFAULT_SKILLS.copy())
skills[action.payload["skill"]] = action.payload.get("enabled", True)
elif action.action == "set_soul":
soul = settings.setdefault("soul", DEFAULT_SOUL.copy())
soul[action.payload["field"]] = action.payload["value"]
elif action.action == "set_safety":
safety = settings.setdefault("safety", DEFAULT_SAFETY.copy())
safety[action.payload["trigger"]] = action.payload.get("enabled", True)

61
sdk/real.py Normal file
View file

@ -0,0 +1,61 @@
from __future__ import annotations
from typing import TYPE_CHECKING, AsyncIterator
from sdk.agent_session import build_thread_key
from sdk.interface import Attachment, MessageChunk, MessageResponse, PlatformClient, User, UserSettings
from sdk.prototype_state import PrototypeStateStore
if TYPE_CHECKING:
from sdk.agent_session import AgentSessionClient
class RealPlatformClient(PlatformClient):
def __init__(
self,
agent_sessions: AgentSessionClient,
prototype_state: PrototypeStateStore,
platform: str = "matrix",
) -> None:
self._agent_sessions = agent_sessions
self._prototype_state = prototype_state
self._platform = platform
async def get_or_create_user(
self,
external_id: str,
platform: str,
display_name: str | None = None,
) -> User:
return await self._prototype_state.get_or_create_user(
external_id=external_id,
platform=platform,
display_name=display_name,
)
async def send_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> MessageResponse:
thread_key = build_thread_key(self._platform, user_id, chat_id)
return await self._agent_sessions.send_message(thread_key=thread_key, text=text)
async def stream_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> AsyncIterator[MessageChunk]:
thread_key = build_thread_key(self._platform, user_id, chat_id)
async for chunk in self._agent_sessions.stream_message(thread_key=thread_key, text=text):
yield chunk
async def get_settings(self, user_id: str) -> UserSettings:
return await self._prototype_state.get_settings(user_id)
async def update_settings(self, user_id: str, action) -> None:
await self._prototype_state.update_settings(user_id, action)

View file

@ -11,6 +11,7 @@ from adapter.matrix.handlers.auth import handle_invite
from adapter.matrix.store import get_room_meta, get_user_meta, set_user_meta from adapter.matrix.store import get_room_meta, get_user_meta, set_user_meta
from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage
from sdk.mock import MockPlatformClient from sdk.mock import MockPlatformClient
from sdk.real import RealPlatformClient
async def test_matrix_dispatcher_registers_custom_handlers(): async def test_matrix_dispatcher_registers_custom_handlers():
@ -178,7 +179,9 @@ async def test_invite_event_is_idempotent_per_user():
runtime.chat_mgr, runtime.chat_mgr,
) )
assert client.join.await_count == 2
assert client.room_create.await_count == 2 assert client.room_create.await_count == 2
client.room_send.assert_awaited_once()
async def test_bot_ignores_its_own_messages(): async def test_bot_ignores_its_own_messages():
@ -254,3 +257,12 @@ async def test_prepare_live_sync_returns_next_batch_from_bootstrap_sync():
client.sync.assert_awaited_once_with(timeout=0, full_state=True) client.sync.assert_awaited_once_with(timeout=0, full_state=True)
assert since == "s123" assert since == "s123"
async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monkeypatch):
monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real")
monkeypatch.setenv("AGENT_WS_URL", "ws://agent.example/agent_ws/")
runtime = build_runtime()
assert isinstance(runtime.platform, RealPlatformClient)

View file

@ -5,6 +5,10 @@ Smoke test: полный цикл через dispatcher + реальные manag
""" """
import pytest import pytest
from sdk.mock import MockPlatformClient from sdk.mock import MockPlatformClient
from sdk.agent_session import build_thread_key
from sdk.interface import MessageChunk, MessageResponse
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
from core.store import InMemoryStore from core.store import InMemoryStore
from core.chat import ChatManager from core.chat import ChatManager
from core.auth import AuthManager from core.auth import AuthManager
@ -18,6 +22,30 @@ from core.protocol import (
) )
class FakeAgentSessionClient:
def __init__(self) -> None:
self.send_calls: list[tuple[str, str]] = []
async def send_message(self, *, thread_key: str, text: str) -> MessageResponse:
self.send_calls.append((thread_key, text))
return MessageResponse(
message_id=thread_key,
response=f"[REAL] {text}",
tokens_used=5,
finished=True,
)
async def stream_message(self, *, thread_key: str, text: str):
self.send_calls.append((thread_key, text))
if False:
yield MessageChunk(
message_id=thread_key,
delta=text,
tokens_used=0,
finished=True,
)
@pytest.fixture @pytest.fixture
def dispatcher(): def dispatcher():
platform = MockPlatformClient() platform = MockPlatformClient()
@ -32,6 +60,25 @@ def dispatcher():
return d return d
@pytest.fixture
def real_dispatcher():
agent_sessions = FakeAgentSessionClient()
platform = RealPlatformClient(
agent_sessions=agent_sessions,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
store = InMemoryStore()
d = EventDispatcher(
platform=platform,
chat_mgr=ChatManager(platform, store),
auth_mgr=AuthManager(platform, store),
settings_mgr=SettingsManager(platform, store),
)
register_all(d)
return d, agent_sessions
async def test_full_flow_start_then_message(dispatcher): async def test_full_flow_start_then_message(dispatcher):
start = IncomingCommand(user_id="tg_123", platform="telegram", chat_id="C1", command="start") start = IncomingCommand(user_id="tg_123", platform="telegram", chat_id="C1", command="start")
result = await dispatcher.dispatch(start) result = await dispatcher.dispatch(start)
@ -83,3 +130,20 @@ async def test_toggle_skill_callback(dispatcher):
) )
result = await dispatcher.dispatch(cb) result = await dispatcher.dispatch(cb)
assert any("browser" in r.text for r in result if isinstance(r, OutgoingMessage)) assert any("browser" in r.text for r in result if isinstance(r, OutgoingMessage))
async def test_full_flow_with_real_platform_uses_thread_key(real_dispatcher):
dispatcher, agent_sessions = real_dispatcher
start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start")
result = await dispatcher.dispatch(start)
assert any(isinstance(r, OutgoingMessage) for r in result)
msg = IncomingMessage(user_id="u1", platform="matrix", chat_id="C1", text="Привет!")
result = await dispatcher.dispatch(msg)
texts = [r.text for r in result if isinstance(r, OutgoingMessage)]
assert texts == ["[REAL] Привет!"]
assert agent_sessions.send_calls == [
(build_thread_key("matrix", "u1", "C1"), "Привет!")
]

View file

@ -0,0 +1,193 @@
import sys
from pathlib import Path
from types import ModuleType
import pytest
from aiohttp import web
from sdk.interface import MessageChunk, MessageResponse
from sdk.agent_session import AgentSessionClient, AgentSessionConfig, build_thread_key
AGENT_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent"
AGENT_API_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api"
for path in (AGENT_ROOT, AGENT_API_ROOT):
if str(path) not in sys.path:
sys.path.insert(0, str(path))
if "fastapi" not in sys.modules:
fastapi = ModuleType("fastapi")
class _Router:
def websocket(self, _path: str):
def decorator(fn):
return fn
return decorator
class _WebSocketDisconnect(Exception):
pass
def _depends(value):
return value
fastapi.APIRouter = _Router
fastapi.WebSocket = object
fastapi.WebSocketDisconnect = _WebSocketDisconnect
fastapi.Depends = _depends
sys.modules["fastapi"] = fastapi
if "src.agent" not in sys.modules:
agent_module = ModuleType("src.agent")
class _AgentService:
async def astream(self, text: str, thread_id: str):
yield text
def _get_agent_service():
return _AgentService()
agent_module.AgentService = _AgentService
agent_module.get_agent_service = _get_agent_service
sys.modules["src.agent"] = agent_module
from lambda_agent_api.client import MsgUserMessage # noqa: E402
from src.api.external import process_message # noqa: E402
def test_build_thread_key_uses_platform_user_and_chat_id():
assert build_thread_key("matrix", "@alice:example.org", "C1") == "6:matrix18:@alice:example.org2:C1"
def test_build_thread_key_does_not_collide_when_user_id_contains_colons():
left = build_thread_key("matrix", "@alice:example.org", "C1")
right = build_thread_key("matrix", "@alice", "example.org:C1")
assert left != right
@pytest.mark.asyncio
async def test_stream_message_yields_text_chunks_and_end(aiohttp_server):
thread_key = build_thread_key("matrix", "@alice:example.org", "C1")
async def handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
assert request.query["thread_id"] == thread_key
await ws.send_json({"type": "STATUS"})
message = await ws.receive_json()
assert message == {"type": "USER_MESSAGE", "text": "hello"}
await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hel"})
await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "lo"})
await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 7})
await ws.close()
return ws
app = web.Application()
app.router.add_get("/agent_ws/", handler)
server = await aiohttp_server(app)
client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/"))))
chunks = []
async for chunk in client.stream_message(
thread_key=thread_key,
text="hello",
):
chunks.append(chunk)
assert chunks == [
MessageChunk(message_id=thread_key, delta="hel", finished=False, tokens_used=0),
MessageChunk(message_id=thread_key, delta="lo", finished=False, tokens_used=0),
MessageChunk(message_id=thread_key, delta="", finished=True, tokens_used=7),
]
@pytest.mark.asyncio
async def test_send_message_collects_streamed_chunks_and_tokens(aiohttp_server):
thread_key = build_thread_key("matrix", "@alice:example.org", "C1")
async def handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
assert request.query["thread_id"] == thread_key
await ws.send_json({"type": "STATUS"})
message = await ws.receive_json()
assert message == {"type": "USER_MESSAGE", "text": "hello world"}
await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hello "})
await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "world"})
await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 11})
await ws.close()
return ws
app = web.Application()
app.router.add_get("/agent_ws/", handler)
server = await aiohttp_server(app)
client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/"))))
result = await client.send_message(
thread_key=thread_key,
text="hello world",
)
assert result == MessageResponse(
message_id=thread_key,
response="hello world",
tokens_used=11,
finished=True,
)
@pytest.mark.asyncio
async def test_process_message_requires_thread_id_query_param():
class FakeWebSocket:
query_params = {}
async def send_text(self, text: str) -> None:
raise AssertionError(f"send_text should not be called: {text}")
class FakeAgentService:
async def astream(self, text: str, thread_id: str):
yield text
with pytest.raises(ValueError, match="thread_id query parameter is required"):
await process_message(
FakeWebSocket(),
MsgUserMessage(text="hello"),
FakeAgentService(),
)
@pytest.mark.asyncio
async def test_process_message_passes_thread_id_to_agent_service():
class FakeWebSocket:
def __init__(self) -> None:
self.query_params = {"thread_id": "6:matrix18:@alice:example.org2:C1"}
self.sent_messages: list[str] = []
async def send_text(self, text: str) -> None:
self.sent_messages.append(text)
class FakeAgentService:
def __init__(self) -> None:
self.calls: list[tuple[str, str]] = []
async def astream(self, text: str, thread_id: str):
self.calls.append((text, thread_id))
yield "hello"
ws = FakeWebSocket()
agent_service = FakeAgentService()
await process_message(ws, MsgUserMessage(text="hello"), agent_service)
assert agent_service.calls == [("hello", "6:matrix18:@alice:example.org2:C1")]
assert any("AGENT_EVENT_TEXT_CHUNK" in message for message in ws.sent_messages)
assert any("AGENT_EVENT_END" in message for message in ws.sent_messages)

View file

@ -0,0 +1,91 @@
import pytest
from core.protocol import SettingsAction
from sdk.interface import UserSettings
from sdk.prototype_state import PrototypeStateStore
@pytest.mark.asyncio
async def test_get_or_create_user_is_stable_per_surface_identity():
store = PrototypeStateStore()
first = await store.get_or_create_user("@alice:example.org", "matrix", "Alice")
second = await store.get_or_create_user("@alice:example.org", "matrix")
assert first.user_id == "usr-matrix-@alice:example.org"
assert first.is_new is True
assert store._users["matrix:@alice:example.org"].is_new is False
first.display_name = "Mallory"
first.is_new = False
assert second.user_id == first.user_id
assert second.is_new is False
assert second.display_name == "Alice"
assert store._users["matrix:@alice:example.org"].display_name == "Alice"
assert store._users["matrix:@alice:example.org"].is_new is False
@pytest.mark.asyncio
async def test_settings_defaults_match_existing_mock_shape():
store = PrototypeStateStore()
settings = await store.get_settings("usr-matrix-@alice:example.org")
assert isinstance(settings, UserSettings)
assert settings.skills == {
"web-search": True,
"fetch-url": True,
"email": False,
"browser": False,
"image-gen": False,
"files": True,
}
assert settings.safety == {
"email-send": True,
"file-delete": True,
"social-post": True,
}
assert settings.soul == {"name": "Лямбда", "instructions": ""}
assert settings.plan == {"name": "Beta", "tokens_used": 0, "tokens_limit": 1000}
@pytest.mark.asyncio
async def test_get_settings_returns_connectors_copy():
store = PrototypeStateStore()
store._settings["usr-matrix-@alice:example.org"] = {
"connectors": {"github": {"enabled": True}},
}
settings = await store.get_settings("usr-matrix-@alice:example.org")
settings.connectors["github"]["enabled"] = False
settings.connectors["slack"] = {"enabled": True}
assert store._settings["usr-matrix-@alice:example.org"]["connectors"] == {
"github": {"enabled": True},
}
@pytest.mark.asyncio
async def test_update_settings_supports_toggle_skill_and_setters():
store = PrototypeStateStore()
await store.update_settings(
"usr-matrix-@alice:example.org",
SettingsAction(action="toggle_skill", payload={"skill": "browser", "enabled": True}),
)
await store.update_settings(
"usr-matrix-@alice:example.org",
SettingsAction(action="set_soul", payload={"field": "instructions", "value": "Be concise"}),
)
await store.update_settings(
"usr-matrix-@alice:example.org",
SettingsAction(action="set_safety", payload={"trigger": "social-post", "enabled": False}),
)
settings = await store.get_settings("usr-matrix-@alice:example.org")
assert settings.skills["browser"] is True
assert settings.skills["web-search"] is True
assert settings.soul["instructions"] == "Be concise"
assert settings.safety["social-post"] is False

116
tests/platform/test_real.py Normal file
View file

@ -0,0 +1,116 @@
import pytest
from core.protocol import SettingsAction
from sdk.agent_session import build_thread_key
from sdk.interface import MessageChunk, MessageResponse, UserSettings
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
class FakeAgentSessionClient:
def __init__(self) -> None:
self.send_calls: list[tuple[str, str]] = []
self.stream_calls: list[tuple[str, str]] = []
async def send_message(self, *, thread_key: str, text: str) -> MessageResponse:
self.send_calls.append((thread_key, text))
return MessageResponse(
message_id=thread_key,
response=f"echo:{text}",
tokens_used=3,
finished=True,
)
async def stream_message(self, *, thread_key: str, text: str):
self.stream_calls.append((thread_key, text))
yield MessageChunk(message_id=thread_key, delta=text[:2], finished=False)
yield MessageChunk(message_id=thread_key, delta=text[2:], finished=True, tokens_used=3)
@pytest.mark.asyncio
async def test_real_platform_client_get_or_create_user_uses_local_state():
client = RealPlatformClient(
agent_sessions=FakeAgentSessionClient(),
prototype_state=PrototypeStateStore(),
)
first = await client.get_or_create_user("u1", "matrix", "Alice")
second = await client.get_or_create_user("u1", "matrix")
assert first.user_id == "usr-matrix-u1"
assert first.is_new is True
assert second.user_id == first.user_id
assert second.is_new is False
assert second.display_name == "Alice"
@pytest.mark.asyncio
async def test_real_platform_client_send_message_uses_surface_user_thread_identity():
agent_sessions = FakeAgentSessionClient()
client = RealPlatformClient(
agent_sessions=agent_sessions,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
thread_key = build_thread_key("matrix", "@alice:example.org", "C1")
result = await client.send_message("@alice:example.org", "C1", "hello")
assert result == MessageResponse(
message_id=thread_key,
response="echo:hello",
tokens_used=3,
finished=True,
)
assert agent_sessions.send_calls == [(thread_key, "hello")]
@pytest.mark.asyncio
async def test_real_platform_client_stream_message_uses_surface_user_thread_identity():
agent_sessions = FakeAgentSessionClient()
client = RealPlatformClient(
agent_sessions=agent_sessions,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
thread_key = build_thread_key("matrix", "@alice:example.org", "C1")
chunks = []
async for chunk in client.stream_message("@alice:example.org", "C1", "hello"):
chunks.append(chunk)
assert chunks == [
MessageChunk(
message_id=thread_key,
delta="he",
finished=False,
tokens_used=0,
),
MessageChunk(
message_id=thread_key,
delta="llo",
finished=True,
tokens_used=3,
),
]
assert agent_sessions.stream_calls == [(thread_key, "hello")]
@pytest.mark.asyncio
async def test_real_platform_client_settings_are_local():
client = RealPlatformClient(
agent_sessions=FakeAgentSessionClient(),
prototype_state=PrototypeStateStore(),
platform="matrix",
)
await client.update_settings(
"usr-matrix-u1",
SettingsAction(action="toggle_skill", payload={"skill": "browser", "enabled": True}),
)
settings = await client.get_settings("usr-matrix-u1")
assert isinstance(settings, UserSettings)
assert settings.skills["browser"] is True
assert settings.skills["web-search"] is True

18
uv.lock generated
View file

@ -1095,6 +1095,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
] ]
[[package]]
name = "pytest-aiohttp"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
]
sdist = { url = "https://files.pythonhosted.org/packages/72/4b/d326890c153f2c4ce1bf45d07683c08c10a1766058a22934620bc6ac6592/pytest_aiohttp-1.1.0.tar.gz", hash = "sha256:147de8cb164f3fc9d7196967f109ab3c0b93ea3463ab50631e56438eab7b5adc", size = 12842, upload-time = "2025-01-23T12:44:04.465Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ba/0f/e6af71c02e0f1098eaf7d2dbf3ffdf0a69fc1e0ef174f96af05cef161f1b/pytest_aiohttp-1.1.0-py3-none-any.whl", hash = "sha256:f39a11693a0dce08dd6c542d241e199dd8047a6e6596b2bcfa60d373f143456d", size = 8932, upload-time = "2025-01-23T12:44:03.27Z" },
]
[[package]] [[package]]
name = "pytest-asyncio" name = "pytest-asyncio"
version = "1.3.0" version = "1.3.0"
@ -1302,6 +1316,7 @@ version = "0.1.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "aiogram" }, { name = "aiogram" },
{ name = "aiohttp" },
{ name = "httpx" }, { name = "httpx" },
{ name = "matrix-nio" }, { name = "matrix-nio" },
{ name = "pydantic" }, { name = "pydantic" },
@ -1313,6 +1328,7 @@ dependencies = [
dev = [ dev = [
{ name = "mypy" }, { name = "mypy" },
{ name = "pytest" }, { name = "pytest" },
{ name = "pytest-aiohttp" },
{ name = "pytest-asyncio" }, { name = "pytest-asyncio" },
{ name = "pytest-cov" }, { name = "pytest-cov" },
{ name = "ruff" }, { name = "ruff" },
@ -1321,11 +1337,13 @@ dev = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "aiogram", specifier = ">=3.4,<4" }, { name = "aiogram", specifier = ">=3.4,<4" },
{ name = "aiohttp", specifier = ">=3.9" },
{ name = "httpx", specifier = ">=0.27" }, { name = "httpx", specifier = ">=0.27" },
{ name = "matrix-nio", specifier = ">=0.21" }, { name = "matrix-nio", specifier = ">=0.21" },
{ name = "mypy", marker = "extra == 'dev'", specifier = ">=1.8" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.8" },
{ name = "pydantic", specifier = ">=2.5" }, { name = "pydantic", specifier = ">=2.5" },
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" },
{ name = "pytest-aiohttp", marker = "extra == 'dev'", specifier = ">=1.0" },
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" },
{ name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.1" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.1" },
{ name = "python-dotenv", specifier = ">=1.0" }, { name = "python-dotenv", specifier = ">=1.0" },