diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index debd2fa..fc1b57a 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -30,11 +30,13 @@ from adapter.matrix.files import ( matrix_msgtype_for_attachment, resolve_workspace_attachment_path, ) +from adapter.matrix.agent_registry import load_agent_registry from adapter.matrix.handlers import register_matrix_handlers from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.handlers.context_commands import ( LOAD_PROMPT, ) +from adapter.matrix.routed_platform import RoutedPlatformClient from adapter.matrix.room_router import resolve_chat_id from adapter.matrix.store import ( add_staged_attachment, @@ -118,13 +120,31 @@ def _agent_base_url_from_env() -> str: return "http://127.0.0.1:8000" -def _build_platform_from_env() -> PlatformClient: +def _build_platform_from_env(*, store: StateStore, chat_mgr: ChatManager) -> PlatformClient: backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() if backend == "real": + prototype_state = PrototypeStateStore() + registry_path = os.environ.get("MATRIX_AGENT_REGISTRY_PATH", "").strip() + if registry_path: + registry = load_agent_registry(registry_path) + delegates = { + agent.agent_id: RealPlatformClient( + agent_id=agent.agent_id, + agent_base_url=_agent_base_url_from_env(), + prototype_state=prototype_state, + platform="matrix", + ) + for agent in registry.agents + } + return RoutedPlatformClient( + chat_mgr=chat_mgr, + store=store, + delegates=delegates, + ) return RealPlatformClient( agent_id="matrix-bot", agent_base_url=_agent_base_url_from_env(), - prototype_state=PrototypeStateStore(), + prototype_state=prototype_state, platform="matrix", ) return MockPlatformClient() @@ -135,9 +155,10 @@ def build_runtime( store: StateStore | None = None, client: AsyncClient | None = None, ) -> MatrixRuntime: - platform = platform or _build_platform_from_env() store = store or InMemoryStore() chat_mgr = ChatManager(platform, store) + platform = platform or _build_platform_from_env(store=store, chat_mgr=chat_mgr) + chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) settings_mgr = SettingsManager(platform, store) prototype_state = getattr(platform, "_prototype_state", None) @@ -224,10 +245,7 @@ class MatrixBot: ) return local_chat_id = await resolve_chat_id(self.runtime.store, room.room_id, sender) - dispatch_chat_id = local_chat_id - if not body.startswith("!"): - dispatch_chat_id = (room_meta or {}).get("platform_chat_id") or local_chat_id - incoming = from_room_event(event, room_id=room.room_id, chat_id=dispatch_chat_id) + incoming = from_room_event(event, room_id=room.room_id, chat_id=local_chat_id) if incoming is None: return if isinstance(incoming, IncomingCommand) and incoming.command in { @@ -274,7 +292,7 @@ class MatrixBot: ) outgoing = [ OutgoingMessage( - chat_id=dispatch_chat_id, + chat_id=local_chat_id, text="Сервис временно недоступен. Попробуйте ещё раз позже.", ) ] diff --git a/adapter/matrix/routed_platform.py b/adapter/matrix/routed_platform.py new file mode 100644 index 0000000..8f505e5 --- /dev/null +++ b/adapter/matrix/routed_platform.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator, Mapping + +from adapter.matrix.store import get_room_meta +from core.chat import ChatManager +from core.store import StateStore +from sdk.interface import ( + Attachment, + MessageChunk, + MessageResponse, + PlatformClient, + PlatformError, + User, + UserSettings, +) + + +class RoutedPlatformClient(PlatformClient): + def __init__( + self, + *, + chat_mgr: ChatManager, + store: StateStore, + delegates: Mapping[str, PlatformClient], + ) -> None: + if not delegates: + raise ValueError("RoutedPlatformClient requires at least one delegate") + self._chat_mgr = chat_mgr + self._store = store + self._delegates = dict(delegates) + self._default_client = next(iter(self._delegates.values())) + self._prototype_state = getattr(self._default_client, "_prototype_state", None) + + async def get_or_create_user( + self, + external_id: str, + platform: str, + display_name: str | None = None, + ) -> User: + return await self._default_client.get_or_create_user( + external_id=external_id, + platform=platform, + display_name=display_name, + ) + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> MessageResponse: + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + return await delegate.send_message(user_id, platform_chat_id, text, attachments) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> AsyncIterator[MessageChunk]: + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): + yield chunk + + async def get_settings(self, user_id: str) -> UserSettings: + return await self._default_client.get_settings(user_id) + + async def update_settings(self, user_id: str, action) -> None: + await self._default_client.update_settings(user_id, action) + + async def close(self) -> None: + for delegate in self._delegates.values(): + close = getattr(delegate, "close", None) + if callable(close): + await close() + + async def _resolve_delegate(self, user_id: str, local_chat_id: str) -> tuple[PlatformClient, str]: + chat = await self._chat_mgr.get(local_chat_id, user_id) + if chat is None: + raise PlatformError( + f"unknown matrix chat id: {local_chat_id}", + code="MATRIX_CHAT_NOT_FOUND", + ) + + room_meta = await get_room_meta(self._store, chat.surface_ref) + if room_meta is None: + raise PlatformError( + f"matrix room is not bound: {chat.surface_ref}", + code="MATRIX_ROOM_NOT_BOUND", + ) + + agent_id = room_meta.get("agent_id") + platform_chat_id = room_meta.get("platform_chat_id") + if not agent_id or not platform_chat_id: + raise PlatformError( + f"matrix room routing is incomplete: {chat.surface_ref}", + code="MATRIX_ROUTE_INCOMPLETE", + ) + + delegate = self._delegates.get(str(agent_id)) + if delegate is None: + raise PlatformError( + f"unknown matrix agent id: {agent_id}", + code="MATRIX_AGENT_NOT_FOUND", + ) + + return delegate, str(platform_chat_id) diff --git a/tests/adapter/matrix/test_routed_platform.py b/tests/adapter/matrix/test_routed_platform.py new file mode 100644 index 0000000..fc2f89d --- /dev/null +++ b/tests/adapter/matrix/test_routed_platform.py @@ -0,0 +1,240 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from adapter.matrix.bot import MatrixBot, build_runtime +from adapter.matrix.routed_platform import RoutedPlatformClient +from adapter.matrix.store import set_room_meta +from core.chat import ChatManager +from core.store import InMemoryStore +from sdk.interface import MessageChunk, MessageResponse, User, UserSettings +from sdk.mock import MockPlatformClient + + +class FakeDelegate: + def __init__(self, *, name: str) -> None: + self.name = name + self.send_calls: list[dict] = [] + self.stream_calls: list[dict] = [] + self.user_calls: list[dict] = [] + self.settings_calls: list[str] = [] + self.update_calls: list[tuple[str, object]] = [] + + async def get_or_create_user( + self, + external_id: str, + platform: str, + display_name: str | None = None, + ) -> User: + self.user_calls.append( + { + "external_id": external_id, + "platform": platform, + "display_name": display_name, + } + ) + return User( + user_id=f"user-{self.name}", + external_id=external_id, + platform=platform, + display_name=display_name, + created_at="2025-01-01T00:00:00Z", + is_new=False, + ) + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments=None, + ) -> MessageResponse: + self.send_calls.append( + { + "user_id": user_id, + "chat_id": chat_id, + "text": text, + "attachments": attachments, + } + ) + return MessageResponse( + message_id=f"msg-{self.name}", + response=f"reply-{self.name}", + tokens_used=0, + finished=True, + ) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments=None, + ) -> AsyncIterator[MessageChunk]: + self.stream_calls.append( + { + "user_id": user_id, + "chat_id": chat_id, + "text": text, + "attachments": attachments, + } + ) + yield MessageChunk( + message_id=f"stream-{self.name}", + delta=f"delta-{self.name}", + finished=True, + tokens_used=0, + ) + + async def get_settings(self, user_id: str) -> UserSettings: + self.settings_calls.append(user_id) + return UserSettings(skills={"files": True}) + + async def update_settings(self, user_id: str, action: object) -> None: + self.update_calls.append((user_id, action)) + + +@pytest.mark.asyncio +async def test_send_message_routes_by_room_agent_and_platform_chat_id(): + store = InMemoryStore() + chat_mgr = ChatManager(None, store) + await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org") + await set_room_meta( + store, + "!room:example.org", + {"platform_chat_id": "41", "agent_id": "agent-2"}, + ) + delegates = { + "agent-1": FakeDelegate(name="agent-1"), + "agent-2": FakeDelegate(name="agent-2"), + } + platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates) + + response = await platform.send_message("u1", "C1", "hello", attachments=[]) + + assert response.response == "reply-agent-2" + assert delegates["agent-1"].send_calls == [] + assert delegates["agent-2"].send_calls == [ + { + "user_id": "u1", + "chat_id": "41", + "text": "hello", + "attachments": [], + } + ] + + +@pytest.mark.asyncio +async def test_stream_message_routes_by_room_agent_and_platform_chat_id(): + store = InMemoryStore() + chat_mgr = ChatManager(None, store) + await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org") + await set_room_meta( + store, + "!room:example.org", + {"platform_chat_id": "41", "agent_id": "agent-2"}, + ) + delegates = { + "agent-1": FakeDelegate(name="agent-1"), + "agent-2": FakeDelegate(name="agent-2"), + } + platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates) + + chunks = [chunk async for chunk in platform.stream_message("u1", "C1", "hello")] + + assert [chunk.delta for chunk in chunks] == ["delta-agent-2"] + assert delegates["agent-1"].stream_calls == [] + assert delegates["agent-2"].stream_calls == [ + { + "user_id": "u1", + "chat_id": "41", + "text": "hello", + "attachments": None, + } + ] + + +@pytest.mark.asyncio +async def test_user_and_settings_delegate_to_default_client(): + store = InMemoryStore() + chat_mgr = ChatManager(None, store) + delegates = { + "agent-1": FakeDelegate(name="agent-1"), + "agent-2": FakeDelegate(name="agent-2"), + } + platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates) + + user = await platform.get_or_create_user("ext-1", "matrix", display_name="Alice") + settings = await platform.get_settings("u1") + await platform.update_settings("u1", {"action": "noop"}) + + assert user.user_id == "user-agent-1" + assert settings.skills == {"files": True} + assert delegates["agent-1"].user_calls == [ + { + "external_id": "ext-1", + "platform": "matrix", + "display_name": "Alice", + } + ] + assert delegates["agent-2"].user_calls == [] + assert delegates["agent-1"].settings_calls == ["u1"] + assert delegates["agent-1"].update_calls == [("u1", {"action": "noop"})] + + +@pytest.mark.asyncio +async def test_build_runtime_real_backend_uses_routed_platform_with_registry( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +): + registry_path = tmp_path / "matrix-agents.yaml" + registry_path.write_text( + "agents:\n" + " - id: agent-1\n" + " label: Analyst\n" + " - id: agent-2\n" + " label: Research\n", + encoding="utf-8", + ) + monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") + monkeypatch.setenv("MATRIX_AGENT_REGISTRY_PATH", str(registry_path)) + monkeypatch.setenv("AGENT_BASE_URL", "http://agent.example") + + runtime = build_runtime() + + assert isinstance(runtime.platform, RoutedPlatformClient) + assert set(runtime.platform._delegates) == {"agent-1", "agent-2"} + assert runtime.platform._delegates["agent-1"].agent_base_url == "http://agent.example" + assert runtime.platform._delegates["agent-1"].agent_id == "agent-1" + assert runtime.platform._delegates["agent-2"].agent_id == "agent-2" + + +@pytest.mark.asyncio +async def test_bot_keeps_local_chat_id_for_plain_message_dispatch(): + runtime = build_runtime(platform=MockPlatformClient()) + await set_room_meta( + runtime.store, + "!chat1:example.org", + { + "chat_id": "C1", + "matrix_user_id": "@alice:example.org", + "platform_chat_id": "41", + "agent_id": "agent-2", + }, + ) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + bot = MatrixBot(SimpleNamespace(user_id="@bot:example.org"), runtime) + + await bot.on_room_message( + SimpleNamespace(room_id="!chat1:example.org"), + SimpleNamespace(sender="@alice:example.org", body="hello"), + ) + + dispatched = runtime.dispatcher.dispatch.await_args.args[0] + assert dispatched.chat_id == "C1" + assert dispatched.text == "hello"