from __future__ import annotations from collections.abc import AsyncIterator from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock import pytest from adapter.matrix.bot import MatrixBot, build_runtime from adapter.matrix.routed_platform import RoutedPlatformClient from adapter.matrix.store import set_room_meta from core.chat import ChatManager from core.store import InMemoryStore from sdk.interface import MessageChunk, MessageResponse, User, UserSettings from sdk.mock import MockPlatformClient from sdk.interface import PlatformError class FakeDelegate: def __init__(self, *, name: str) -> None: self.name = name self.send_calls: list[dict] = [] self.stream_calls: list[dict] = [] self.user_calls: list[dict] = [] self.settings_calls: list[str] = [] self.update_calls: list[tuple[str, object]] = [] async def get_or_create_user( self, external_id: str, platform: str, display_name: str | None = None, ) -> User: self.user_calls.append( { "external_id": external_id, "platform": platform, "display_name": display_name, } ) return User( user_id=f"user-{self.name}", external_id=external_id, platform=platform, display_name=display_name, created_at="2025-01-01T00:00:00Z", is_new=False, ) async def send_message( self, user_id: str, chat_id: str, text: str, attachments=None, ) -> MessageResponse: self.send_calls.append( { "user_id": user_id, "chat_id": chat_id, "text": text, "attachments": attachments, } ) return MessageResponse( message_id=f"msg-{self.name}", response=f"reply-{self.name}", tokens_used=0, finished=True, ) async def stream_message( self, user_id: str, chat_id: str, text: str, attachments=None, ) -> AsyncIterator[MessageChunk]: self.stream_calls.append( { "user_id": user_id, "chat_id": chat_id, "text": text, "attachments": attachments, } ) yield MessageChunk( message_id=f"stream-{self.name}", delta=f"delta-{self.name}", finished=True, tokens_used=0, ) async def get_settings(self, user_id: str) -> UserSettings: self.settings_calls.append(user_id) return UserSettings(skills={"files": True}) async def update_settings(self, user_id: str, action: object) -> None: self.update_calls.append((user_id, action)) @pytest.fixture(autouse=True) def clear_matrix_registry_env(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("MATRIX_AGENT_REGISTRY_PATH", raising=False) monkeypatch.delenv("MATRIX_PLATFORM_BACKEND", raising=False) @pytest.mark.asyncio async def test_send_message_routes_by_room_agent_and_platform_chat_id(): store = InMemoryStore() chat_mgr = ChatManager(None, store) await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org") await set_room_meta( store, "!room:example.org", {"platform_chat_id": "41", "agent_id": "agent-2"}, ) delegates = { "agent-1": FakeDelegate(name="agent-1"), "agent-2": FakeDelegate(name="agent-2"), } platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates) response = await platform.send_message("u1", "C1", "hello", attachments=[]) assert response.response == "reply-agent-2" assert delegates["agent-1"].send_calls == [] assert delegates["agent-2"].send_calls == [ { "user_id": "u1", "chat_id": "41", "text": "hello", "attachments": [], } ] @pytest.mark.asyncio async def test_stream_message_routes_by_room_agent_and_platform_chat_id(): store = InMemoryStore() chat_mgr = ChatManager(None, store) await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org") await set_room_meta( store, "!room:example.org", {"platform_chat_id": "41", "agent_id": "agent-2"}, ) delegates = { "agent-1": FakeDelegate(name="agent-1"), "agent-2": FakeDelegate(name="agent-2"), } platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates) chunks = [chunk async for chunk in platform.stream_message("u1", "C1", "hello")] assert [chunk.delta for chunk in chunks] == ["delta-agent-2"] assert delegates["agent-1"].stream_calls == [] assert delegates["agent-2"].stream_calls == [ { "user_id": "u1", "chat_id": "41", "text": "hello", "attachments": None, } ] @pytest.mark.asyncio async def test_send_message_fails_fast_when_platform_chat_id_is_missing(): store = InMemoryStore() chat_mgr = ChatManager(None, store) await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org") await set_room_meta( store, "!room:example.org", {"agent_id": "agent-2"}, ) platform = RoutedPlatformClient( chat_mgr=chat_mgr, store=store, delegates={"agent-2": FakeDelegate(name="agent-2")}, ) with pytest.raises(PlatformError, match="routing is incomplete") as exc_info: await platform.send_message("u1", "C1", "hello") assert exc_info.value.code == "MATRIX_ROUTE_INCOMPLETE" @pytest.mark.asyncio async def test_stream_message_fails_fast_when_agent_id_is_missing(): store = InMemoryStore() chat_mgr = ChatManager(None, store) await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org") await set_room_meta( store, "!room:example.org", {"platform_chat_id": "41"}, ) platform = RoutedPlatformClient( chat_mgr=chat_mgr, store=store, delegates={"agent-2": FakeDelegate(name="agent-2")}, ) with pytest.raises(PlatformError, match="routing is incomplete") as exc_info: await anext(platform.stream_message("u1", "C1", "hello")) assert exc_info.value.code == "MATRIX_ROUTE_INCOMPLETE" @pytest.mark.asyncio async def test_routing_uses_repaired_room_metadata_without_runtime_backfill(): store = InMemoryStore() chat_mgr = ChatManager(None, store) await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org") await set_room_meta( store, "!room:example.org", {"platform_chat_id": "restored-41", "agent_id": "agent-2"}, ) delegate = FakeDelegate(name="agent-2") platform = RoutedPlatformClient( chat_mgr=chat_mgr, store=store, delegates={"agent-2": delegate}, ) await platform.send_message("u1", "C1", "hello") assert delegate.send_calls == [ { "user_id": "u1", "chat_id": "restored-41", "text": "hello", "attachments": None, } ] @pytest.mark.asyncio async def test_user_and_settings_delegate_to_default_client(): store = InMemoryStore() chat_mgr = ChatManager(None, store) delegates = { "agent-1": FakeDelegate(name="agent-1"), "agent-2": FakeDelegate(name="agent-2"), } platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates) user = await platform.get_or_create_user("ext-1", "matrix", display_name="Alice") settings = await platform.get_settings("u1") await platform.update_settings("u1", {"action": "noop"}) assert user.user_id == "user-agent-1" assert settings.skills == {"files": True} assert delegates["agent-1"].user_calls == [ { "external_id": "ext-1", "platform": "matrix", "display_name": "Alice", } ] assert delegates["agent-2"].user_calls == [] assert delegates["agent-1"].settings_calls == ["u1"] assert delegates["agent-1"].update_calls == [("u1", {"action": "noop"})] @pytest.mark.asyncio async def test_build_runtime_real_backend_uses_routed_platform_with_registry( monkeypatch: pytest.MonkeyPatch, tmp_path: Path, ): registry_path = tmp_path / "matrix-agents.yaml" registry_path.write_text( "agents:\n" " - id: agent-1\n" " label: Analyst\n" " - id: agent-2\n" " label: Research\n", encoding="utf-8", ) monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") monkeypatch.setenv("MATRIX_AGENT_REGISTRY_PATH", str(registry_path)) monkeypatch.setenv("AGENT_BASE_URL", "http://agent.example") runtime = build_runtime() assert isinstance(runtime.platform, RoutedPlatformClient) assert set(runtime.platform._delegates) == {"agent-1", "agent-2"} assert runtime.platform._delegates["agent-1"].agent_base_url == "http://agent.example" assert runtime.platform._delegates["agent-1"].agent_id == "agent-1" assert runtime.platform._delegates["agent-2"].agent_id == "agent-2" def test_build_runtime_real_backend_requires_registry_path(monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") monkeypatch.delenv("MATRIX_AGENT_REGISTRY_PATH", raising=False) monkeypatch.setenv("AGENT_BASE_URL", "http://agent.example") with pytest.raises(RuntimeError, match="MATRIX_AGENT_REGISTRY_PATH"): build_runtime() def test_build_runtime_real_backend_fails_explicitly_on_invalid_registry( monkeypatch: pytest.MonkeyPatch, tmp_path: Path, ): registry_path = tmp_path / "missing.yaml" monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real") monkeypatch.setenv("MATRIX_AGENT_REGISTRY_PATH", str(registry_path)) monkeypatch.setenv("AGENT_BASE_URL", "http://agent.example") with pytest.raises(RuntimeError, match="failed to load matrix agent registry"): build_runtime() @pytest.mark.asyncio async def test_bot_keeps_local_chat_id_for_plain_message_dispatch(): runtime = build_runtime(platform=MockPlatformClient()) await set_room_meta( runtime.store, "!chat1:example.org", { "chat_id": "C1", "matrix_user_id": "@alice:example.org", "platform_chat_id": "41", "agent_id": "agent-2", }, ) runtime.dispatcher.dispatch = AsyncMock(return_value=[]) bot = MatrixBot(SimpleNamespace(user_id="@bot:example.org"), runtime) await bot.on_room_message( SimpleNamespace(room_id="!chat1:example.org"), SimpleNamespace(sender="@alice:example.org", body="hello"), ) dispatched = runtime.dispatcher.dispatch.await_args.args[0] assert dispatched.chat_id == "C1" assert dispatched.text == "hello"