# surfaces/tests/adapter/matrix/test_routed_platform.py
#
# (Code-viewer listing metadata: 262 lines, 8.4 KiB, Python.)

from __future__ import annotations
from collections.abc import AsyncIterator
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from adapter.matrix.bot import MatrixBot, build_runtime
from adapter.matrix.routed_platform import RoutedPlatformClient
from adapter.matrix.store import set_room_meta
from core.chat import ChatManager
from core.store import InMemoryStore
from sdk.interface import MessageChunk, MessageResponse, User, UserSettings
from sdk.mock import MockPlatformClient
class FakeDelegate:
    """Recording stand-in for one per-agent platform client.

    Each method appends its arguments to a dedicated call log and returns a
    canned value whose identifiers embed ``name``, so a test can tell which
    delegate actually served a call.
    """

    def __init__(self, *, name: str) -> None:
        """Store ``name`` and start with every call log empty."""
        self.name = name
        # One log per method, in call order.
        self.user_calls: list[dict] = []
        self.send_calls: list[dict] = []
        self.stream_calls: list[dict] = []
        self.settings_calls: list[str] = []
        self.update_calls: list[tuple[str, object]] = []

    @staticmethod
    def _message_call(user_id: str, chat_id: str, text: str, attachments) -> dict:
        """Build the log entry shared by ``send_message`` and ``stream_message``."""
        return {
            "user_id": user_id,
            "chat_id": chat_id,
            "text": text,
            "attachments": attachments,
        }

    async def get_or_create_user(
        self,
        external_id: str,
        platform: str,
        display_name: str | None = None,
    ) -> User:
        """Log the lookup and return a fixed, non-new ``User`` named after this delegate."""
        lookup = {
            "external_id": external_id,
            "platform": platform,
            "display_name": display_name,
        }
        self.user_calls.append(lookup)
        return User(
            user_id=f"user-{self.name}",
            external_id=external_id,
            platform=platform,
            display_name=display_name,
            created_at="2025-01-01T00:00:00Z",
            is_new=False,
        )

    async def send_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments=None,
    ) -> MessageResponse:
        """Log the call and return a finished canned reply tagged with ``name``."""
        entry = self._message_call(user_id, chat_id, text, attachments)
        self.send_calls.append(entry)
        return MessageResponse(
            message_id=f"msg-{self.name}",
            response=f"reply-{self.name}",
            tokens_used=0,
            finished=True,
        )

    async def stream_message(
        self,
        user_id: str,
        chat_id: str,
        text: str,
        attachments=None,
    ) -> AsyncIterator[MessageChunk]:
        """Log the call, then yield exactly one finished chunk tagged with ``name``."""
        entry = self._message_call(user_id, chat_id, text, attachments)
        self.stream_calls.append(entry)
        yield MessageChunk(
            message_id=f"stream-{self.name}",
            delta=f"delta-{self.name}",
            finished=True,
            tokens_used=0,
        )

    async def get_settings(self, user_id: str) -> UserSettings:
        """Log ``user_id`` and return settings with only the ``files`` skill enabled."""
        self.settings_calls.append(user_id)
        return UserSettings(skills={"files": True})

    async def update_settings(self, user_id: str, action: object) -> None:
        """Log the ``(user_id, action)`` pair; nothing is applied."""
        self.update_calls.append((user_id, action))
@pytest.mark.asyncio
async def test_send_message_routes_by_room_agent_and_platform_chat_id():
    """send_message reaches the room's agent, addressed by the room's platform chat id."""
    room_id = "!room:example.org"
    store = InMemoryStore()
    chat_mgr = ChatManager(None, store)
    await chat_mgr.get_or_create("u1", "C1", "matrix", room_id)
    # The room metadata names both the target agent and the id to forward.
    room_meta = {"platform_chat_id": "41", "agent_id": "agent-2"}
    await set_room_meta(store, room_id, room_meta)
    delegates = {
        agent_id: FakeDelegate(name=agent_id) for agent_id in ("agent-1", "agent-2")
    }
    platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates)

    response = await platform.send_message("u1", "C1", "hello", attachments=[])

    expected_call = {
        "user_id": "u1",
        "chat_id": "41",
        "text": "hello",
        "attachments": [],
    }
    assert response.response == "reply-agent-2"
    assert delegates["agent-1"].send_calls == []
    assert delegates["agent-2"].send_calls == [expected_call]
@pytest.mark.asyncio
async def test_stream_message_routes_by_room_agent_and_platform_chat_id():
    """stream_message reaches the room's agent, addressed by the room's platform chat id."""
    room_id = "!room:example.org"
    store = InMemoryStore()
    chat_mgr = ChatManager(None, store)
    await chat_mgr.get_or_create("u1", "C1", "matrix", room_id)
    room_meta = {"platform_chat_id": "41", "agent_id": "agent-2"}
    await set_room_meta(store, room_id, room_meta)
    delegates = {
        agent_id: FakeDelegate(name=agent_id) for agent_id in ("agent-1", "agent-2")
    }
    platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates)

    # Drain the stream, keeping only the text deltas.
    deltas = []
    async for chunk in platform.stream_message("u1", "C1", "hello"):
        deltas.append(chunk.delta)

    expected_call = {
        "user_id": "u1",
        "chat_id": "41",
        "text": "hello",
        # No attachments were passed, so the delegate sees None.
        "attachments": None,
    }
    assert deltas == ["delta-agent-2"]
    assert delegates["agent-1"].stream_calls == []
    assert delegates["agent-2"].stream_calls == [expected_call]
@pytest.mark.asyncio
async def test_user_and_settings_delegate_to_default_client():
    """User lookup and settings calls are served by agent-1, never by agent-2.

    No room metadata is set up here; agent-1 is the first delegate registered,
    which is presumably what makes it the "default" client.
    """
    store = InMemoryStore()
    chat_mgr = ChatManager(None, store)
    delegates = {
        agent_id: FakeDelegate(name=agent_id) for agent_id in ("agent-1", "agent-2")
    }
    platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates)

    user = await platform.get_or_create_user("ext-1", "matrix", display_name="Alice")
    settings = await platform.get_settings("u1")
    await platform.update_settings("u1", {"action": "noop"})

    default_delegate = delegates["agent-1"]
    expected_lookup = {
        "external_id": "ext-1",
        "platform": "matrix",
        "display_name": "Alice",
    }
    assert user.user_id == "user-agent-1"
    assert settings.skills == {"files": True}
    assert default_delegate.user_calls == [expected_lookup]
    assert delegates["agent-2"].user_calls == []
    assert default_delegate.settings_calls == ["u1"]
    assert default_delegate.update_calls == [("u1", {"action": "noop"})]
@pytest.mark.asyncio
async def test_build_runtime_real_backend_uses_routed_platform_with_registry(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
):
    """With the "real" backend, build_runtime creates one delegate per registry agent."""
    registry_yaml = (
        "agents:\n"
        "  - id: agent-1\n"
        "    label: Analyst\n"
        "  - id: agent-2\n"
        "    label: Research\n"
    )
    registry_path = tmp_path / "matrix-agents.yaml"
    registry_path.write_text(registry_yaml, encoding="utf-8")
    environment = {
        "MATRIX_PLATFORM_BACKEND": "real",
        "MATRIX_AGENT_REGISTRY_PATH": str(registry_path),
        "AGENT_BASE_URL": "http://agent.example",
    }
    for variable, value in environment.items():
        monkeypatch.setenv(variable, value)

    runtime = build_runtime()

    assert isinstance(runtime.platform, RoutedPlatformClient)
    # Reaches into the private delegate map to inspect how it was wired.
    delegates = runtime.platform._delegates
    assert set(delegates) == {"agent-1", "agent-2"}
    assert delegates["agent-1"].agent_base_url == "http://agent.example"
    assert delegates["agent-1"].agent_id == "agent-1"
    assert delegates["agent-2"].agent_id == "agent-2"
def test_build_runtime_real_backend_requires_registry_path(monkeypatch: pytest.MonkeyPatch):
    """The "real" backend raises RuntimeError naming the unset registry-path variable."""
    # Drop the variable first so a value inherited from the caller's
    # environment cannot leak into the test.
    monkeypatch.delenv("MATRIX_AGENT_REGISTRY_PATH", raising=False)
    monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real")
    monkeypatch.setenv("AGENT_BASE_URL", "http://agent.example")

    with pytest.raises(RuntimeError, match="MATRIX_AGENT_REGISTRY_PATH"):
        build_runtime()
def test_build_runtime_real_backend_fails_explicitly_on_invalid_registry(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
):
    """A registry path that cannot be loaded surfaces as a descriptive RuntimeError."""
    # The file is deliberately never created, so loading it must fail.
    missing_registry = tmp_path / "missing.yaml"
    environment = {
        "MATRIX_PLATFORM_BACKEND": "real",
        "MATRIX_AGENT_REGISTRY_PATH": str(missing_registry),
        "AGENT_BASE_URL": "http://agent.example",
    }
    for variable, value in environment.items():
        monkeypatch.setenv(variable, value)

    with pytest.raises(RuntimeError, match="failed to load matrix agent registry"):
        build_runtime()
@pytest.mark.asyncio
async def test_bot_keeps_local_chat_id_for_plain_message_dispatch():
    """A plain room message is dispatched with the local chat id, not the platform one.

    The room metadata carries both ``chat_id`` ("C1") and ``platform_chat_id``
    ("41"); the event handed to the dispatcher must use "C1".
    """
    runtime = build_runtime(platform=MockPlatformClient())
    await set_room_meta(
        runtime.store,
        "!chat1:example.org",
        {
            "chat_id": "C1",
            "matrix_user_id": "@alice:example.org",
            "platform_chat_id": "41",
            "agent_id": "agent-2",
        },
    )
    # Replace the dispatcher so the test can capture what the bot hands over.
    dispatch_mock = AsyncMock(return_value=[])
    runtime.dispatcher.dispatch = dispatch_mock
    bot = MatrixBot(SimpleNamespace(user_id="@bot:example.org"), runtime)

    await bot.on_room_message(
        SimpleNamespace(room_id="!chat1:example.org"),
        SimpleNamespace(sender="@alice:example.org", body="hello"),
    )

    # await_args is None when the mock was never awaited; check first so a bot
    # that drops the message fails with a clear assertion, not AttributeError.
    dispatch_mock.assert_awaited()
    dispatched = dispatch_mock.await_args.args[0]
    assert dispatched.chat_id == "C1"
    assert dispatched.text == "hello"