diff --git a/README.md b/README.md index b2f69fb..318a45d 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ## Статус -Прототип в разработке. Matrix-адаптер по умолчанию работает через `MockPlatformClient`, но может переключаться на реальный direct-agent backend через `MATRIX_PLATFORM_BACKEND=real`. +Прототип в разработке. SDK платформы ещё не готов — работаем через `MockPlatformClient`. | Поверхность | Статус | Описание | |---|---|---| @@ -71,8 +71,6 @@ surfaces-bot/ - **Диалог** — сообщения, вложения, подтверждения `!yes` / `!no` и routing через `EventDispatcher` - **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта - **Текущее ограничение** — encrypted DM пока не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота -- **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` требует `AGENT_WS_URL=ws://host:port/agent_ws/` -- **Ограничения real backend** — пока это текстовый direct-agent прототип без вложений и без асинхронных callbacks; локальные настройки и user-state хранятся в `PrototypeStateStore` --- @@ -91,7 +89,7 @@ class PlatformClient(Protocol): Бот не управляет lifecycle контейнеров — это делает Master (платформа). Бот передаёт `user_id` + `chat_id` + сообщение; платформа сама решает нужно ли поднять контейнер. -Сейчас: `MockPlatformClient` в `sdk/mock.py`, а Matrix real backend собирается через `sdk/real.py` при `MATRIX_PLATFORM_BACKEND=real`. +Сейчас: `MockPlatformClient` в `sdk/mock.py`. Когда SDK готов: добавляем `SdkPlatformClient`, меняем одну строку в DI. Адаптеры и ядро не трогаем. --- diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index a413fad..08638cb 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -35,11 +35,7 @@ from core.protocol import ( ) from core.settings import SettingsManager from core.store import InMemoryStore, SQLiteStore, StateStore -from sdk.agent_session import AgentSessionClient, AgentSessionConfig -from sdk.interface import PlatformClient from sdk.mock import MockPlatformClient -from sdk.prototype_state import PrototypeStateStore -from sdk.real import RealPlatformClient logger = structlog.get_logger(__name__) @@ -48,7 +44,7 @@ load_dotenv(Path(__file__).resolve().parents[2] / ".env") @dataclass class MatrixRuntime: - platform: PlatformClient + platform: MockPlatformClient store: StateStore chat_mgr: ChatManager auth_mgr: AuthManager @@ -56,7 +52,7 @@ class MatrixRuntime: dispatcher: EventDispatcher -def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> EventDispatcher: +def build_event_dispatcher(platform: MockPlatformClient, store: StateStore) -> EventDispatcher: chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) settings_mgr = SettingsManager(platform, store) @@ -68,24 +64,12 @@ def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> Event return dispatcher -def _build_platform_from_env() -> PlatformClient: - backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() - if backend == "real": - ws_url = os.environ["AGENT_WS_URL"] - return RealPlatformClient( - agent_sessions=AgentSessionClient(AgentSessionConfig(base_ws_url=ws_url)), - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - return MockPlatformClient() - - def build_runtime( - platform: PlatformClient | None = None, + platform: MockPlatformClient | None = None, store: StateStore | None = None, client: AsyncClient | None = None, ) -> MatrixRuntime: - platform = platform or _build_platform_from_env() + platform = platform or MockPlatformClient() store = store or InMemoryStore() chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) diff --git a/adapter/matrix/handlers/auth.py b/adapter/matrix/handlers/auth.py index 6882404..83f1ac6 100644 --- a/adapter/matrix/handlers/auth.py +++ b/adapter/matrix/handlers/auth.py @@ -20,12 +20,12 @@ async def handle_invite(client: Any, room: Any, event: Any, platform, store, aut matrix_user_id = getattr(event, "sender", "") display_name = getattr(room, "display_name", None) or matrix_user_id - await client.join(room.room_id) - existing = await get_user_meta(store, matrix_user_id) if existing and existing.get("space_id"): return + await client.join(room.room_id) + user = await platform.get_or_create_user( external_id=matrix_user_id, platform="matrix", diff --git a/docs/superpowers/specs/2026-04-08-matrix-direct-agent-prototype-design.md b/docs/superpowers/specs/2026-04-08-matrix-direct-agent-prototype-design.md deleted file mode 100644 index 581eb56..0000000 --- a/docs/superpowers/specs/2026-04-08-matrix-direct-agent-prototype-design.md +++ /dev/null @@ -1,243 +0,0 @@ -# Matrix Direct-Agent Prototype Design - -## Goal - -Ship a working Matrix prototype that talks to the real Lambda agent instead of `MockPlatformClient`, while preserving the current Matrix adapter logic and keeping the code expandable toward future platform versions. - -## Scope - -This design is for a Matrix-only prototype delivered from this repository on a dedicated branch. It is not a `main` branch rollout and it is not a separate prototype repo. - -The design assumes a minimal companion fork of `platform/agent` may be used, but changes there must stay as small as possible. - -## Constraints - -- Preserve the current Matrix transport logic as much as possible. -- Keep `core/` unaware of platform immaturity. -- Avoid broad changes to platform repos. -- Prefer one narrow patch to `platform/agent` over changes to both `platform/agent` and `platform/agent_api`. -- Keep the backend boundary reusable for future Telegram or other surfaces. -- Do not pretend unsupported platform capabilities are real. - -## Live Platform Findings - -Based on the live repo analysis performed on April 7, 2026: - -- `platform/master` is not yet a usable consumer-facing backend for surfaces. -- `platform/agent` exposes a working WebSocket endpoint for prompt/response exchange. -- `platform/agent_api` documents and implements text-oriented WebSocket messaging, but the bot does not need to depend on that package directly. -- `platform/agent` currently hardcodes a single shared backend memory thread via `thread_id = "default"`, which would cause all chats to share context. - -## Architecture - -The prototype remains in this repo and introduces a new real backend path behind the existing SDK boundary. - -### New files - -- `sdk/real.py` - - Exports `RealPlatformClient` - - Implements the existing `PlatformClient` contract from `sdk/interface.py` - - Composes the lower-level prototype pieces - -- `sdk/agent_session.py` - - Owns direct WebSocket communication with the real agent - - Manages connection lifecycle, request/response handling, and thread identity - -- `sdk/prototype_state.py` - - Owns local prototype-only state - - Stores user mapping, local settings, and lightweight metadata needed until a real control plane exists - -### Responsibility split - -- Matrix adapter remains transport-specific only. -- `core/` continues to depend only on `PlatformClient`. -- `RealPlatformClient` acts as the anti-corruption layer between the current bot contract and the platform’s incomplete shape. -- Local control-plane behavior remains explicit and replaceable later. - -## Message and Identity Model - -Each Matrix chat gets a stable backend session identity. - -### Surface identity - -- Surface: `matrix` -- Surface user id: Matrix MXID, for example `@alice:example.org` -- Surface chat id: logical chat id from `ChatManager`, for example `C1` -- Surface ref: Matrix room id - -### Backend thread identity - -Use a deterministic thread key: - -`matrix:{matrix_user_id}:{chat_id}` - -Example: - -`matrix:@alice:example.org:C1` - -### Mapping rules - -- One Matrix logical chat maps to one backend memory thread. -- `!new` creates a fresh logical chat and therefore a fresh backend thread. -- `!rename` only changes display metadata and does not change backend identity. -- `!archive` stops active use of the thread in the surface, but does not need to delete backend memory in v1. - -## Runtime Flow - -### Normal message flow - -1. Matrix event arrives in an existing room. -2. Existing Matrix routing resolves room to logical `chat_id`. -3. `core/handlers/message.py` calls `platform.send_message(...)`. -4. `RealPlatformClient` derives the backend thread key from `(platform, user_id, chat_id)`. -5. `AgentSessionClient` sends the prompt to the agent WebSocket using that thread key. -6. The reply is converted into the existing `MessageResponse` or `MessageChunk` contract. -7. Matrix sends the final text back to the room. - -### Settings flow - -For v1, settings remain local: - -- `get_settings()` reads from local prototype state -- `update_settings()` writes to local prototype state - -This is intentional. The prototype must not claim settings are backed by the real platform when no such platform API exists yet. - -## Feature Matrix - -### Real in v1 - -- `!start` -- Plain text messaging with the real agent -- Matrix chat lifecycle already implemented in this repo: - - `!new` - - `!chats` - - `!rename` - - `!archive` -- Per-chat conversation memory, provided the agent accepts dynamic thread identity - -### Local in v1 - -- `!settings` -- `!skills` -- `!soul` -- `!safety` -- `!status` -- user registration and local user mapping - -### Deferred - -- Attachments and file upload to the agent -- Voice input to the agent -- Image input to the agent -- Long-running task callbacks and webhook-style async completion -- Real control-plane integration through `platform/master` - -## Minimal Upstream Change - -To avoid shared memory across all conversations, make one narrow change in the forked `platform/agent` repo: - -- stop hardcoding `thread_id = "default"` -- derive thread identity from WebSocket connection context - -### Preferred mechanism - -Read `thread_id` from WebSocket query parameters rather than changing the message payload format. - -Example: - -`ws://host:port/agent_ws/?thread_id=matrix:@alice:example.org:C1` - -This is preferred because: - -- it limits the platform patch to one repo -- it avoids changing both server and SDK protocol shape -- it keeps the client message body text-only -- it makes session identity explicit and easy to reason about - -## Why Not Use `platform/agent_api` Directly - -The bot should not depend on their client package for the prototype. - -Reasons: - -- the bot already has its own internal integration boundary in `sdk/interface.py` -- a tiny local WebSocket client is enough for this protocol -- avoiding a dependency on `platform/agent_api` keeps rebasing simpler -- if upstream stabilizes later, the bot can adopt their SDK without affecting Matrix handlers - -## Repo Strategy - -### This repo - -Owns: - -- Matrix surface logic -- SDK compatibility layer -- local prototype state -- backend selection and wiring - -### Forked `platform/agent` - -Owns only: - -- minimal thread identity patch required for per-chat memory - -### Explicitly not doing - -- no separate prototype repo -- no changes to `platform/master` for v1 -- no unnecessary changes to `platform/agent_api` - -## Migration Path - -This design is intentionally expandable. - -When the platform develops further: - -- `sdk/prototype_state.py` can be replaced or reduced by a real `MasterClient` -- `sdk/agent_session.py` can remain the direct session transport if still relevant -- `RealPlatformClient` can continue to present the stable bot-facing interface -- Telegram or another surface can reuse the same backend components without rethinking the integration model - -## Risks - -### Risk: hidden platform assumptions leak upward - -Mitigation: -- keep all direct-agent logic below `RealPlatformClient` -- avoid changing `core/` contracts for prototype convenience - -### Risk: settings semantics drift from future platform reality - -Mitigation: -- make local settings behavior explicit in code and docs -- keep settings isolated in `sdk/prototype_state.py` - -### Risk: upstream `agent` fork diverges - -Mitigation: -- keep the patch minimal and narrowly scoped to thread identity - -### Risk: thread identity source is unstable - -Mitigation: -- derive thread key from existing stable bot-side identities: platform, surface user id, logical chat id - -## Testing Strategy - -- Unit tests for `sdk/agent_session.py` request/response behavior -- Unit tests for `sdk/prototype_state.py` local settings and user mapping -- Unit tests for `sdk/real.py` contract compliance with `PlatformClient` -- Matrix integration tests confirming: - - existing commands still work - - different logical chats map to different backend thread keys - - rename does not change thread identity - - archive stops reuse from the surface perspective - -## Success Criteria - -- Matrix can talk to the real agent without rewriting the Matrix adapter architecture -- Chats do not share backend memory accidentally -- Unsupported platform capabilities remain local or deferred rather than being faked as “real” -- The backend boundary remains suitable for later Telegram or other surfaces diff --git a/pyproject.toml b/pyproject.toml index ccc6309..8f4978b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,14 +15,12 @@ dependencies = [ "structlog>=24.1", "python-dotenv>=1.0", "httpx>=0.27", - "aiohttp>=3.9", ] [project.optional-dependencies] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", - "pytest-aiohttp>=1.0", "pytest-cov>=4.1", "ruff>=0.3", "mypy>=1.8", diff --git a/sdk/__init__.py b/sdk/__init__.py index f7939f7..e69de29 100644 --- a/sdk/__init__.py +++ b/sdk/__init__.py @@ -1,9 +0,0 @@ -__all__ = ["RealPlatformClient"] - - -def __getattr__(name: str): - if name == "RealPlatformClient": - from sdk.real import RealPlatformClient - - return RealPlatformClient - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/sdk/agent_session.py b/sdk/agent_session.py deleted file mode 100644 index 0f959a1..0000000 --- a/sdk/agent_session.py +++ /dev/null @@ -1,93 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import AsyncIterator -from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit - -from sdk.interface import MessageChunk, MessageResponse, PlatformError - - -def build_thread_key(platform: str, user_id: str, chat_id: str) -> str: - return f"{len(platform)}:{platform}{len(user_id)}:{user_id}{len(chat_id)}:{chat_id}" - - -@dataclass(frozen=True, slots=True) -class AgentSessionConfig: - base_ws_url: str - timeout_seconds: float = 30.0 - - -class AgentSessionClient: - def __init__(self, config: AgentSessionConfig) -> None: - self._config = config - - async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: - response_parts: list[str] = [] - tokens_used = 0 - - async for chunk in self.stream_message(thread_key=thread_key, text=text): - if chunk.delta: - response_parts.append(chunk.delta) - if chunk.finished: - tokens_used = chunk.tokens_used - - return MessageResponse( - message_id=thread_key, - response="".join(response_parts), - tokens_used=tokens_used, - finished=True, - ) - - async def stream_message(self, *, thread_key: str, text: str) -> AsyncIterator[MessageChunk]: - import aiohttp - - async with aiohttp.ClientSession() as session: - async with session.ws_connect( - self._ws_url(thread_key), - heartbeat=30, - ) as ws: - status = await ws.receive_json(timeout=self._config.timeout_seconds) - if status.get("type") != "STATUS": - raise PlatformError("Agent did not send STATUS", code="AGENT_PROTOCOL_ERROR") - - await ws.send_json({"type": "USER_MESSAGE", "text": text}) - - while True: - payload = await ws.receive_json(timeout=self._config.timeout_seconds) - msg_type = payload.get("type") - - if msg_type == "AGENT_EVENT_TEXT_CHUNK": - yield MessageChunk( - message_id=thread_key, - delta=payload["text"], - finished=False, - ) - elif msg_type == "AGENT_EVENT_END": - yield MessageChunk( - message_id=thread_key, - delta="", - finished=True, - tokens_used=payload.get("tokens_used", 0), - ) - return - elif msg_type == "ERROR": - raise PlatformError( - payload.get("details", "Agent error"), - code=payload.get("code", "AGENT_ERROR"), - ) - elif msg_type == "GRACEFUL_DISCONNECT": - raise PlatformError( - "Agent disconnected gracefully", - code="GRACEFUL_DISCONNECT", - ) - else: - raise PlatformError( - f"Unexpected agent message: {payload}", - code="AGENT_PROTOCOL_ERROR", - ) - - def _ws_url(self, thread_key: str) -> str: - parts = urlsplit(self._config.base_ws_url) - query = dict(parse_qsl(parts.query, keep_blank_values=True)) - query["thread_id"] = thread_key - return urlunsplit(parts._replace(query=urlencode(query))) diff --git a/sdk/prototype_state.py b/sdk/prototype_state.py deleted file mode 100644 index ccb75f1..0000000 --- a/sdk/prototype_state.py +++ /dev/null @@ -1,80 +0,0 @@ -from __future__ import annotations - -from datetime import UTC, datetime -from typing import Any - -from sdk.interface import User, UserSettings - -# Keep the prototype backend self-contained; do not import these from sdk.mock. -DEFAULT_SKILLS: dict[str, bool] = { - "web-search": True, - "fetch-url": True, - "email": False, - "browser": False, - "image-gen": False, - "files": True, -} -DEFAULT_SAFETY: dict[str, bool] = { - "email-send": True, - "file-delete": True, - "social-post": True, -} -DEFAULT_SOUL: dict[str, str] = {"name": "Лямбда", "instructions": ""} -DEFAULT_PLAN: dict[str, Any] = { - "name": "Beta", - "tokens_used": 0, - "tokens_limit": 1000, -} - - -class PrototypeStateStore: - def __init__(self) -> None: - self._users: dict[str, User] = {} - self._settings: dict[str, dict[str, Any]] = {} - - async def get_or_create_user( - self, - external_id: str, - platform: str, - display_name: str | None = None, - ) -> User: - key = f"{platform}:{external_id}" - existing = self._users.get(key) - if existing is not None: - stored = existing.model_copy(update={"is_new": False}) - self._users[key] = stored - return stored.model_copy() - - user = User( - user_id=f"usr-{platform}-{external_id}", - external_id=external_id, - platform=platform, - display_name=display_name, - created_at=datetime.now(UTC), - is_new=True, - ) - self._users[key] = user.model_copy(update={"is_new": False}) - return user.model_copy() - - async def get_settings(self, user_id: str) -> UserSettings: - stored = self._settings.get(user_id, {}) - return UserSettings( - skills={**DEFAULT_SKILLS, **stored.get("skills", {})}, - connectors=dict(stored.get("connectors", {})), - soul={**DEFAULT_SOUL, **stored.get("soul", {})}, - safety={**DEFAULT_SAFETY, **stored.get("safety", {})}, - plan={**DEFAULT_PLAN, **stored.get("plan", {})}, - ) - - async def update_settings(self, user_id: str, action: Any) -> None: - settings = self._settings.setdefault(user_id, {}) - - if action.action == "toggle_skill": - skills = settings.setdefault("skills", DEFAULT_SKILLS.copy()) - skills[action.payload["skill"]] = action.payload.get("enabled", True) - elif action.action == "set_soul": - soul = settings.setdefault("soul", DEFAULT_SOUL.copy()) - soul[action.payload["field"]] = action.payload["value"] - elif action.action == "set_safety": - safety = settings.setdefault("safety", DEFAULT_SAFETY.copy()) - safety[action.payload["trigger"]] = action.payload.get("enabled", True) diff --git a/sdk/real.py b/sdk/real.py deleted file mode 100644 index 7da48c8..0000000 --- a/sdk/real.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, AsyncIterator - -from sdk.agent_session import build_thread_key -from sdk.interface import Attachment, MessageChunk, MessageResponse, PlatformClient, User, UserSettings -from sdk.prototype_state import PrototypeStateStore - -if TYPE_CHECKING: - from sdk.agent_session import AgentSessionClient - - -class RealPlatformClient(PlatformClient): - def __init__( - self, - agent_sessions: AgentSessionClient, - prototype_state: PrototypeStateStore, - platform: str = "matrix", - ) -> None: - self._agent_sessions = agent_sessions - self._prototype_state = prototype_state - self._platform = platform - - async def get_or_create_user( - self, - external_id: str, - platform: str, - display_name: str | None = None, - ) -> User: - return await self._prototype_state.get_or_create_user( - external_id=external_id, - platform=platform, - display_name=display_name, - ) - - async def send_message( - self, - user_id: str, - chat_id: str, - text: str, - attachments: list[Attachment] | None = None, - ) -> MessageResponse: - thread_key = build_thread_key(self._platform, user_id, chat_id) - return await self._agent_sessions.send_message(thread_key=thread_key, text=text) - - async def stream_message( - self, - user_id: str, - chat_id: str, - text: str, - attachments: list[Attachment] | None = None, - ) -> AsyncIterator[MessageChunk]: - thread_key = build_thread_key(self._platform, user_id, chat_id) - async for chunk in self._agent_sessions.stream_message(thread_key=thread_key, text=text): - yield chunk - - async def get_settings(self, user_id: str) -> UserSettings: - return await self._prototype_state.get_settings(user_id) - - async def update_settings(self, user_id: str, action) -> None: - await self._prototype_state.update_settings(user_id, action) diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index ad4746c..dce9243 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -11,7 +11,6 @@ from adapter.matrix.handlers.auth import handle_invite from adapter.matrix.store import get_room_meta, get_user_meta, set_user_meta from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage from sdk.mock import MockPlatformClient -from sdk.real import RealPlatformClient async def test_matrix_dispatcher_registers_custom_handlers(): @@ -179,9 +178,7 @@ async def test_invite_event_is_idempotent_per_user(): runtime.chat_mgr, ) - assert client.join.await_count == 2 assert client.room_create.await_count == 2 - client.room_send.assert_awaited_once() async def test_bot_ignores_its_own_messages(): @@ -257,12 +254,3 @@ async def test_prepare_live_sync_returns_next_batch_from_bootstrap_sync(): client.sync.assert_awaited_once_with(timeout=0, full_state=True) assert since == "s123" - - -async def test_build_runtime_uses_real_platform_when_matrix_backend_is_real(monkeypatch): - monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") - monkeypatch.setenv("AGENT_WS_URL", "ws://agent.example/agent_ws/") - - runtime = build_runtime() - - assert isinstance(runtime.platform, RealPlatformClient) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index db2cf8f..207a0ba 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -5,10 +5,6 @@ Smoke test: полный цикл через dispatcher + реальные manag """ import pytest from sdk.mock import MockPlatformClient -from sdk.agent_session import build_thread_key -from sdk.interface import MessageChunk, MessageResponse -from sdk.prototype_state import PrototypeStateStore -from sdk.real import RealPlatformClient from core.store import InMemoryStore from core.chat import ChatManager from core.auth import AuthManager @@ -22,30 +18,6 @@ from core.protocol import ( ) -class FakeAgentSessionClient: - def __init__(self) -> None: - self.send_calls: list[tuple[str, str]] = [] - - async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: - self.send_calls.append((thread_key, text)) - return MessageResponse( - message_id=thread_key, - response=f"[REAL] {text}", - tokens_used=5, - finished=True, - ) - - async def stream_message(self, *, thread_key: str, text: str): - self.send_calls.append((thread_key, text)) - if False: - yield MessageChunk( - message_id=thread_key, - delta=text, - tokens_used=0, - finished=True, - ) - - @pytest.fixture def dispatcher(): platform = MockPlatformClient() @@ -60,25 +32,6 @@ def dispatcher(): return d -@pytest.fixture -def real_dispatcher(): - agent_sessions = FakeAgentSessionClient() - platform = RealPlatformClient( - agent_sessions=agent_sessions, - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - store = InMemoryStore() - d = EventDispatcher( - platform=platform, - chat_mgr=ChatManager(platform, store), - auth_mgr=AuthManager(platform, store), - settings_mgr=SettingsManager(platform, store), - ) - register_all(d) - return d, agent_sessions - - async def test_full_flow_start_then_message(dispatcher): start = IncomingCommand(user_id="tg_123", platform="telegram", chat_id="C1", command="start") result = await dispatcher.dispatch(start) @@ -130,20 +83,3 @@ async def test_toggle_skill_callback(dispatcher): ) result = await dispatcher.dispatch(cb) assert any("browser" in r.text for r in result if isinstance(r, OutgoingMessage)) - - -async def test_full_flow_with_real_platform_uses_thread_key(real_dispatcher): - dispatcher, agent_sessions = real_dispatcher - - start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") - result = await dispatcher.dispatch(start) - assert any(isinstance(r, OutgoingMessage) for r in result) - - msg = IncomingMessage(user_id="u1", platform="matrix", chat_id="C1", text="Привет!") - result = await dispatcher.dispatch(msg) - texts = [r.text for r in result if isinstance(r, OutgoingMessage)] - - assert texts == ["[REAL] Привет!"] - assert agent_sessions.send_calls == [ - (build_thread_key("matrix", "u1", "C1"), "Привет!") - ] diff --git a/tests/platform/test_agent_session.py b/tests/platform/test_agent_session.py deleted file mode 100644 index 2d085c3..0000000 --- a/tests/platform/test_agent_session.py +++ /dev/null @@ -1,193 +0,0 @@ -import sys -from pathlib import Path -from types import ModuleType - -import pytest -from aiohttp import web - -from sdk.interface import MessageChunk, MessageResponse -from sdk.agent_session import AgentSessionClient, AgentSessionConfig, build_thread_key - -AGENT_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent" -AGENT_API_ROOT = Path(__file__).resolve().parents[2] / "external" / "platform-agent_api" -for path in (AGENT_ROOT, AGENT_API_ROOT): - if str(path) not in sys.path: - sys.path.insert(0, str(path)) - -if "fastapi" not in sys.modules: - fastapi = ModuleType("fastapi") - - class _Router: - def websocket(self, _path: str): - def decorator(fn): - return fn - - return decorator - - class _WebSocketDisconnect(Exception): - pass - - def _depends(value): - return value - - fastapi.APIRouter = _Router - fastapi.WebSocket = object - fastapi.WebSocketDisconnect = _WebSocketDisconnect - fastapi.Depends = _depends - sys.modules["fastapi"] = fastapi - -if "src.agent" not in sys.modules: - agent_module = ModuleType("src.agent") - - class _AgentService: - async def astream(self, text: str, thread_id: str): - yield text - - def _get_agent_service(): - return _AgentService() - - agent_module.AgentService = _AgentService - agent_module.get_agent_service = _get_agent_service - sys.modules["src.agent"] = agent_module - -from lambda_agent_api.client import MsgUserMessage # noqa: E402 -from src.api.external import process_message # noqa: E402 - - -def test_build_thread_key_uses_platform_user_and_chat_id(): - assert build_thread_key("matrix", "@alice:example.org", "C1") == "6:matrix18:@alice:example.org2:C1" - - -def test_build_thread_key_does_not_collide_when_user_id_contains_colons(): - left = build_thread_key("matrix", "@alice:example.org", "C1") - right = build_thread_key("matrix", "@alice", "example.org:C1") - - assert left != right - - -@pytest.mark.asyncio -async def test_stream_message_yields_text_chunks_and_end(aiohttp_server): - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") - - async def handler(request): - ws = web.WebSocketResponse() - await ws.prepare(request) - - assert request.query["thread_id"] == thread_key - - await ws.send_json({"type": "STATUS"}) - - message = await ws.receive_json() - assert message == {"type": "USER_MESSAGE", "text": "hello"} - - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hel"}) - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "lo"}) - await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 7}) - await ws.close() - return ws - - app = web.Application() - app.router.add_get("/agent_ws/", handler) - server = await aiohttp_server(app) - - client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/")))) - - chunks = [] - async for chunk in client.stream_message( - thread_key=thread_key, - text="hello", - ): - chunks.append(chunk) - - assert chunks == [ - MessageChunk(message_id=thread_key, delta="hel", finished=False, tokens_used=0), - MessageChunk(message_id=thread_key, delta="lo", finished=False, tokens_used=0), - MessageChunk(message_id=thread_key, delta="", finished=True, tokens_used=7), - ] - - -@pytest.mark.asyncio -async def test_send_message_collects_streamed_chunks_and_tokens(aiohttp_server): - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") - - async def handler(request): - ws = web.WebSocketResponse() - await ws.prepare(request) - - assert request.query["thread_id"] == thread_key - - await ws.send_json({"type": "STATUS"}) - - message = await ws.receive_json() - assert message == {"type": "USER_MESSAGE", "text": "hello world"} - - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "hello "}) - await ws.send_json({"type": "AGENT_EVENT_TEXT_CHUNK", "text": "world"}) - await ws.send_json({"type": "AGENT_EVENT_END", "tokens_used": 11}) - await ws.close() - return ws - - app = web.Application() - app.router.add_get("/agent_ws/", handler) - server = await aiohttp_server(app) - - client = AgentSessionClient(AgentSessionConfig(base_ws_url=str(server.make_url("/agent_ws/")))) - - result = await client.send_message( - thread_key=thread_key, - text="hello world", - ) - - assert result == MessageResponse( - message_id=thread_key, - response="hello world", - tokens_used=11, - finished=True, - ) - - -@pytest.mark.asyncio -async def test_process_message_requires_thread_id_query_param(): - class FakeWebSocket: - query_params = {} - - async def send_text(self, text: str) -> None: - raise AssertionError(f"send_text should not be called: {text}") - - class FakeAgentService: - async def astream(self, text: str, thread_id: str): - yield text - - with pytest.raises(ValueError, match="thread_id query parameter is required"): - await process_message( - FakeWebSocket(), - MsgUserMessage(text="hello"), - FakeAgentService(), - ) - - -@pytest.mark.asyncio -async def test_process_message_passes_thread_id_to_agent_service(): - class FakeWebSocket: - def __init__(self) -> None: - self.query_params = {"thread_id": "6:matrix18:@alice:example.org2:C1"} - self.sent_messages: list[str] = [] - - async def send_text(self, text: str) -> None: - self.sent_messages.append(text) - - class FakeAgentService: - def __init__(self) -> None: - self.calls: list[tuple[str, str]] = [] - - async def astream(self, text: str, thread_id: str): - self.calls.append((text, thread_id)) - yield "hello" - - ws = FakeWebSocket() - agent_service = FakeAgentService() - await process_message(ws, MsgUserMessage(text="hello"), agent_service) - - assert agent_service.calls == [("hello", "6:matrix18:@alice:example.org2:C1")] - assert any("AGENT_EVENT_TEXT_CHUNK" in message for message in ws.sent_messages) - assert any("AGENT_EVENT_END" in message for message in ws.sent_messages) diff --git a/tests/platform/test_prototype_state.py b/tests/platform/test_prototype_state.py deleted file mode 100644 index b5f5dc3..0000000 --- a/tests/platform/test_prototype_state.py +++ /dev/null @@ -1,91 +0,0 @@ -import pytest - -from core.protocol import SettingsAction -from sdk.interface import UserSettings -from sdk.prototype_state import PrototypeStateStore - - -@pytest.mark.asyncio -async def test_get_or_create_user_is_stable_per_surface_identity(): - store = PrototypeStateStore() - - first = await store.get_or_create_user("@alice:example.org", "matrix", "Alice") - second = await store.get_or_create_user("@alice:example.org", "matrix") - - assert first.user_id == "usr-matrix-@alice:example.org" - assert first.is_new is True - assert store._users["matrix:@alice:example.org"].is_new is False - - first.display_name = "Mallory" - first.is_new = False - - assert second.user_id == first.user_id - assert second.is_new is False - assert second.display_name == "Alice" - assert store._users["matrix:@alice:example.org"].display_name == "Alice" - assert store._users["matrix:@alice:example.org"].is_new is False - - -@pytest.mark.asyncio -async def test_settings_defaults_match_existing_mock_shape(): - store = PrototypeStateStore() - - settings = await store.get_settings("usr-matrix-@alice:example.org") - - assert isinstance(settings, UserSettings) - assert settings.skills == { - "web-search": True, - "fetch-url": True, - "email": False, - "browser": False, - "image-gen": False, - "files": True, - } - assert settings.safety == { - "email-send": True, - "file-delete": True, - "social-post": True, - } - assert settings.soul == {"name": "Лямбда", "instructions": ""} - assert settings.plan == {"name": "Beta", "tokens_used": 0, "tokens_limit": 1000} - - -@pytest.mark.asyncio -async def test_get_settings_returns_connectors_copy(): - store = PrototypeStateStore() - store._settings["usr-matrix-@alice:example.org"] = { - "connectors": {"github": {"enabled": True}}, - } - - settings = await store.get_settings("usr-matrix-@alice:example.org") - settings.connectors["github"]["enabled"] = False - settings.connectors["slack"] = {"enabled": True} - - assert store._settings["usr-matrix-@alice:example.org"]["connectors"] == { - "github": {"enabled": True}, - } - - -@pytest.mark.asyncio -async def test_update_settings_supports_toggle_skill_and_setters(): - store = PrototypeStateStore() - - await store.update_settings( - "usr-matrix-@alice:example.org", - SettingsAction(action="toggle_skill", payload={"skill": "browser", "enabled": True}), - ) - await store.update_settings( - "usr-matrix-@alice:example.org", - SettingsAction(action="set_soul", payload={"field": "instructions", "value": "Be concise"}), - ) - await store.update_settings( - "usr-matrix-@alice:example.org", - SettingsAction(action="set_safety", payload={"trigger": "social-post", "enabled": False}), - ) - - settings = await store.get_settings("usr-matrix-@alice:example.org") - - assert settings.skills["browser"] is True - assert settings.skills["web-search"] is True - assert settings.soul["instructions"] == "Be concise" - assert settings.safety["social-post"] is False diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py deleted file mode 100644 index 7225cfd..0000000 --- a/tests/platform/test_real.py +++ /dev/null @@ -1,116 +0,0 @@ -import pytest - -from core.protocol import SettingsAction -from sdk.agent_session import build_thread_key -from sdk.interface import MessageChunk, MessageResponse, UserSettings -from sdk.prototype_state import PrototypeStateStore -from sdk.real import RealPlatformClient - - -class FakeAgentSessionClient: - def __init__(self) -> None: - self.send_calls: list[tuple[str, str]] = [] - self.stream_calls: list[tuple[str, str]] = [] - - async def send_message(self, *, thread_key: str, text: str) -> MessageResponse: - self.send_calls.append((thread_key, text)) - return MessageResponse( - message_id=thread_key, - response=f"echo:{text}", - tokens_used=3, - finished=True, - ) - - async def stream_message(self, *, thread_key: str, text: str): - self.stream_calls.append((thread_key, text)) - yield MessageChunk(message_id=thread_key, delta=text[:2], finished=False) - yield MessageChunk(message_id=thread_key, delta=text[2:], finished=True, tokens_used=3) - - -@pytest.mark.asyncio -async def test_real_platform_client_get_or_create_user_uses_local_state(): - client = RealPlatformClient( - agent_sessions=FakeAgentSessionClient(), - prototype_state=PrototypeStateStore(), - ) - - first = await client.get_or_create_user("u1", "matrix", "Alice") - second = await client.get_or_create_user("u1", "matrix") - - assert first.user_id == "usr-matrix-u1" - assert first.is_new is True - assert second.user_id == first.user_id - assert second.is_new is False - assert second.display_name == "Alice" - - -@pytest.mark.asyncio -async def test_real_platform_client_send_message_uses_surface_user_thread_identity(): - agent_sessions = FakeAgentSessionClient() - client = RealPlatformClient( - agent_sessions=agent_sessions, - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") - result = await client.send_message("@alice:example.org", "C1", "hello") - - assert result == MessageResponse( - message_id=thread_key, - response="echo:hello", - tokens_used=3, - finished=True, - ) - assert agent_sessions.send_calls == [(thread_key, "hello")] - - -@pytest.mark.asyncio -async def test_real_platform_client_stream_message_uses_surface_user_thread_identity(): - agent_sessions = FakeAgentSessionClient() - client = RealPlatformClient( - agent_sessions=agent_sessions, - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - - thread_key = build_thread_key("matrix", "@alice:example.org", "C1") - chunks = [] - async for chunk in client.stream_message("@alice:example.org", "C1", "hello"): - chunks.append(chunk) - - assert chunks == [ - MessageChunk( - message_id=thread_key, - delta="he", - finished=False, - tokens_used=0, - ), - MessageChunk( - message_id=thread_key, - delta="llo", - finished=True, - tokens_used=3, - ), - ] - assert agent_sessions.stream_calls == [(thread_key, "hello")] - - -@pytest.mark.asyncio -async def test_real_platform_client_settings_are_local(): - client = RealPlatformClient( - agent_sessions=FakeAgentSessionClient(), - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - - await client.update_settings( - "usr-matrix-u1", - SettingsAction(action="toggle_skill", payload={"skill": "browser", "enabled": True}), - ) - - settings = await client.get_settings("usr-matrix-u1") - - assert isinstance(settings, UserSettings) - assert settings.skills["browser"] is True - assert settings.skills["web-search"] is True diff --git a/uv.lock b/uv.lock index 35c8460..0c37403 100644 --- a/uv.lock +++ b/uv.lock @@ -1095,20 +1095,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, ] -[[package]] -name = "pytest-aiohttp" -version = "1.1.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "aiohttp" }, - { name = "pytest" }, - { name = "pytest-asyncio" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/72/4b/d326890c153f2c4ce1bf45d07683c08c10a1766058a22934620bc6ac6592/pytest_aiohttp-1.1.0.tar.gz", hash = "sha256:147de8cb164f3fc9d7196967f109ab3c0b93ea3463ab50631e56438eab7b5adc", size = 12842, upload-time = "2025-01-23T12:44:04.465Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ba/0f/e6af71c02e0f1098eaf7d2dbf3ffdf0a69fc1e0ef174f96af05cef161f1b/pytest_aiohttp-1.1.0-py3-none-any.whl", hash = "sha256:f39a11693a0dce08dd6c542d241e199dd8047a6e6596b2bcfa60d373f143456d", size = 8932, upload-time = "2025-01-23T12:44:03.27Z" }, -] - [[package]] name = "pytest-asyncio" version = "1.3.0" @@ -1316,7 +1302,6 @@ version = "0.1.0" source = { editable = "." } dependencies = [ { name = "aiogram" }, - { name = "aiohttp" }, { name = "httpx" }, { name = "matrix-nio" }, { name = "pydantic" }, @@ -1328,7 +1313,6 @@ dependencies = [ dev = [ { name = "mypy" }, { name = "pytest" }, - { name = "pytest-aiohttp" }, { name = "pytest-asyncio" }, { name = "pytest-cov" }, { name = "ruff" }, @@ -1337,13 +1321,11 @@ dev = [ [package.metadata] requires-dist = [ { name = "aiogram", specifier = ">=3.4,<4" }, - { name = "aiohttp", specifier = ">=3.9" }, { name = "httpx", specifier = ">=0.27" }, { name = "matrix-nio", specifier = ">=0.21" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.8" }, { name = "pydantic", specifier = ">=2.5" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" }, - { name = "pytest-aiohttp", marker = "extra == 'dev'", specifier = ">=1.0" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.1" }, { name = "python-dotenv", specifier = ">=1.0" },