feat: add matrix routed platform facade

This commit is contained in:
Mikhail Putilovskij 2026-04-24 13:22:05 +03:00
parent 7627012f24
commit 242f4aadd3
3 changed files with 376 additions and 8 deletions

View file

@ -30,11 +30,13 @@ from adapter.matrix.files import (
matrix_msgtype_for_attachment, matrix_msgtype_for_attachment,
resolve_workspace_attachment_path, resolve_workspace_attachment_path,
) )
from adapter.matrix.agent_registry import load_agent_registry
from adapter.matrix.handlers import register_matrix_handlers from adapter.matrix.handlers import register_matrix_handlers
from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat
from adapter.matrix.handlers.context_commands import ( from adapter.matrix.handlers.context_commands import (
LOAD_PROMPT, LOAD_PROMPT,
) )
from adapter.matrix.routed_platform import RoutedPlatformClient
from adapter.matrix.room_router import resolve_chat_id from adapter.matrix.room_router import resolve_chat_id
from adapter.matrix.store import ( from adapter.matrix.store import (
add_staged_attachment, add_staged_attachment,
@ -118,13 +120,31 @@ def _agent_base_url_from_env() -> str:
return "http://127.0.0.1:8000" return "http://127.0.0.1:8000"
def _build_platform_from_env() -> PlatformClient: def _build_platform_from_env(*, store: StateStore, chat_mgr: ChatManager) -> PlatformClient:
backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
if backend == "real": if backend == "real":
prototype_state = PrototypeStateStore()
registry_path = os.environ.get("MATRIX_AGENT_REGISTRY_PATH", "").strip()
if registry_path:
registry = load_agent_registry(registry_path)
delegates = {
agent.agent_id: RealPlatformClient(
agent_id=agent.agent_id,
agent_base_url=_agent_base_url_from_env(),
prototype_state=prototype_state,
platform="matrix",
)
for agent in registry.agents
}
return RoutedPlatformClient(
chat_mgr=chat_mgr,
store=store,
delegates=delegates,
)
return RealPlatformClient( return RealPlatformClient(
agent_id="matrix-bot", agent_id="matrix-bot",
agent_base_url=_agent_base_url_from_env(), agent_base_url=_agent_base_url_from_env(),
prototype_state=PrototypeStateStore(), prototype_state=prototype_state,
platform="matrix", platform="matrix",
) )
return MockPlatformClient() return MockPlatformClient()
@ -135,9 +155,10 @@ def build_runtime(
store: StateStore | None = None, store: StateStore | None = None,
client: AsyncClient | None = None, client: AsyncClient | None = None,
) -> MatrixRuntime: ) -> MatrixRuntime:
platform = platform or _build_platform_from_env()
store = store or InMemoryStore() store = store or InMemoryStore()
chat_mgr = ChatManager(platform, store) chat_mgr = ChatManager(platform, store)
platform = platform or _build_platform_from_env(store=store, chat_mgr=chat_mgr)
chat_mgr = ChatManager(platform, store)
auth_mgr = AuthManager(platform, store) auth_mgr = AuthManager(platform, store)
settings_mgr = SettingsManager(platform, store) settings_mgr = SettingsManager(platform, store)
prototype_state = getattr(platform, "_prototype_state", None) prototype_state = getattr(platform, "_prototype_state", None)
@ -224,10 +245,7 @@ class MatrixBot:
) )
return return
local_chat_id = await resolve_chat_id(self.runtime.store, room.room_id, sender) local_chat_id = await resolve_chat_id(self.runtime.store, room.room_id, sender)
dispatch_chat_id = local_chat_id incoming = from_room_event(event, room_id=room.room_id, chat_id=local_chat_id)
if not body.startswith("!"):
dispatch_chat_id = (room_meta or {}).get("platform_chat_id") or local_chat_id
incoming = from_room_event(event, room_id=room.room_id, chat_id=dispatch_chat_id)
if incoming is None: if incoming is None:
return return
if isinstance(incoming, IncomingCommand) and incoming.command in { if isinstance(incoming, IncomingCommand) and incoming.command in {
@ -274,7 +292,7 @@ class MatrixBot:
) )
outgoing = [ outgoing = [
OutgoingMessage( OutgoingMessage(
chat_id=dispatch_chat_id, chat_id=local_chat_id,
text="Сервис временно недоступен. Попробуйте ещё раз позже.", text="Сервис временно недоступен. Попробуйте ещё раз позже.",
) )
] ]

View file

@ -0,0 +1,110 @@
from __future__ import annotations
from collections.abc import AsyncIterator, Mapping
from adapter.matrix.store import get_room_meta
from core.chat import ChatManager
from core.store import StateStore
from sdk.interface import (
Attachment,
MessageChunk,
MessageResponse,
PlatformClient,
PlatformError,
User,
UserSettings,
)
class RoutedPlatformClient(PlatformClient):
def __init__(
self,
*,
chat_mgr: ChatManager,
store: StateStore,
delegates: Mapping[str, PlatformClient],
) -> None:
if not delegates:
raise ValueError("RoutedPlatformClient requires at least one delegate")
self._chat_mgr = chat_mgr
self._store = store
self._delegates = dict(delegates)
self._default_client = next(iter(self._delegates.values()))
self._prototype_state = getattr(self._default_client, "_prototype_state", None)
async def get_or_create_user(
self,
external_id: str,
platform: str,
display_name: str | None = None,
) -> User:
return await self._default_client.get_or_create_user(
external_id=external_id,
platform=platform,
display_name=display_name,
)
async def send_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> MessageResponse:
delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id)
return await delegate.send_message(user_id, platform_chat_id, text, attachments)
async def stream_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> AsyncIterator[MessageChunk]:
delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id)
async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments):
yield chunk
async def get_settings(self, user_id: str) -> UserSettings:
return await self._default_client.get_settings(user_id)
async def update_settings(self, user_id: str, action) -> None:
await self._default_client.update_settings(user_id, action)
async def close(self) -> None:
for delegate in self._delegates.values():
close = getattr(delegate, "close", None)
if callable(close):
await close()
async def _resolve_delegate(self, user_id: str, local_chat_id: str) -> tuple[PlatformClient, str]:
chat = await self._chat_mgr.get(local_chat_id, user_id)
if chat is None:
raise PlatformError(
f"unknown matrix chat id: {local_chat_id}",
code="MATRIX_CHAT_NOT_FOUND",
)
room_meta = await get_room_meta(self._store, chat.surface_ref)
if room_meta is None:
raise PlatformError(
f"matrix room is not bound: {chat.surface_ref}",
code="MATRIX_ROOM_NOT_BOUND",
)
agent_id = room_meta.get("agent_id")
platform_chat_id = room_meta.get("platform_chat_id")
if not agent_id or not platform_chat_id:
raise PlatformError(
f"matrix room routing is incomplete: {chat.surface_ref}",
code="MATRIX_ROUTE_INCOMPLETE",
)
delegate = self._delegates.get(str(agent_id))
if delegate is None:
raise PlatformError(
f"unknown matrix agent id: {agent_id}",
code="MATRIX_AGENT_NOT_FOUND",
)
return delegate, str(platform_chat_id)

View file

@ -0,0 +1,240 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from adapter.matrix.bot import MatrixBot, build_runtime
from adapter.matrix.routed_platform import RoutedPlatformClient
from adapter.matrix.store import set_room_meta
from core.chat import ChatManager
from core.store import InMemoryStore
from sdk.interface import MessageChunk, MessageResponse, User, UserSettings
from sdk.mock import MockPlatformClient
class FakeDelegate:
def __init__(self, *, name: str) -> None:
self.name = name
self.send_calls: list[dict] = []
self.stream_calls: list[dict] = []
self.user_calls: list[dict] = []
self.settings_calls: list[str] = []
self.update_calls: list[tuple[str, object]] = []
async def get_or_create_user(
self,
external_id: str,
platform: str,
display_name: str | None = None,
) -> User:
self.user_calls.append(
{
"external_id": external_id,
"platform": platform,
"display_name": display_name,
}
)
return User(
user_id=f"user-{self.name}",
external_id=external_id,
platform=platform,
display_name=display_name,
created_at="2025-01-01T00:00:00Z",
is_new=False,
)
async def send_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments=None,
) -> MessageResponse:
self.send_calls.append(
{
"user_id": user_id,
"chat_id": chat_id,
"text": text,
"attachments": attachments,
}
)
return MessageResponse(
message_id=f"msg-{self.name}",
response=f"reply-{self.name}",
tokens_used=0,
finished=True,
)
async def stream_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments=None,
) -> AsyncIterator[MessageChunk]:
self.stream_calls.append(
{
"user_id": user_id,
"chat_id": chat_id,
"text": text,
"attachments": attachments,
}
)
yield MessageChunk(
message_id=f"stream-{self.name}",
delta=f"delta-{self.name}",
finished=True,
tokens_used=0,
)
async def get_settings(self, user_id: str) -> UserSettings:
self.settings_calls.append(user_id)
return UserSettings(skills={"files": True})
async def update_settings(self, user_id: str, action: object) -> None:
self.update_calls.append((user_id, action))
@pytest.mark.asyncio
async def test_send_message_routes_by_room_agent_and_platform_chat_id():
store = InMemoryStore()
chat_mgr = ChatManager(None, store)
await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org")
await set_room_meta(
store,
"!room:example.org",
{"platform_chat_id": "41", "agent_id": "agent-2"},
)
delegates = {
"agent-1": FakeDelegate(name="agent-1"),
"agent-2": FakeDelegate(name="agent-2"),
}
platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates)
response = await platform.send_message("u1", "C1", "hello", attachments=[])
assert response.response == "reply-agent-2"
assert delegates["agent-1"].send_calls == []
assert delegates["agent-2"].send_calls == [
{
"user_id": "u1",
"chat_id": "41",
"text": "hello",
"attachments": [],
}
]
@pytest.mark.asyncio
async def test_stream_message_routes_by_room_agent_and_platform_chat_id():
store = InMemoryStore()
chat_mgr = ChatManager(None, store)
await chat_mgr.get_or_create("u1", "C1", "matrix", "!room:example.org")
await set_room_meta(
store,
"!room:example.org",
{"platform_chat_id": "41", "agent_id": "agent-2"},
)
delegates = {
"agent-1": FakeDelegate(name="agent-1"),
"agent-2": FakeDelegate(name="agent-2"),
}
platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates)
chunks = [chunk async for chunk in platform.stream_message("u1", "C1", "hello")]
assert [chunk.delta for chunk in chunks] == ["delta-agent-2"]
assert delegates["agent-1"].stream_calls == []
assert delegates["agent-2"].stream_calls == [
{
"user_id": "u1",
"chat_id": "41",
"text": "hello",
"attachments": None,
}
]
@pytest.mark.asyncio
async def test_user_and_settings_delegate_to_default_client():
store = InMemoryStore()
chat_mgr = ChatManager(None, store)
delegates = {
"agent-1": FakeDelegate(name="agent-1"),
"agent-2": FakeDelegate(name="agent-2"),
}
platform = RoutedPlatformClient(chat_mgr=chat_mgr, store=store, delegates=delegates)
user = await platform.get_or_create_user("ext-1", "matrix", display_name="Alice")
settings = await platform.get_settings("u1")
await platform.update_settings("u1", {"action": "noop"})
assert user.user_id == "user-agent-1"
assert settings.skills == {"files": True}
assert delegates["agent-1"].user_calls == [
{
"external_id": "ext-1",
"platform": "matrix",
"display_name": "Alice",
}
]
assert delegates["agent-2"].user_calls == []
assert delegates["agent-1"].settings_calls == ["u1"]
assert delegates["agent-1"].update_calls == [("u1", {"action": "noop"})]
@pytest.mark.asyncio
async def test_build_runtime_real_backend_uses_routed_platform_with_registry(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
):
registry_path = tmp_path / "matrix-agents.yaml"
registry_path.write_text(
"agents:\n"
" - id: agent-1\n"
" label: Analyst\n"
" - id: agent-2\n"
" label: Research\n",
encoding="utf-8",
)
monkeypatch.setenv("MATRIX_PLATFORM_BACKEND", "real")
monkeypatch.setenv("MATRIX_AGENT_REGISTRY_PATH", str(registry_path))
monkeypatch.setenv("AGENT_BASE_URL", "http://agent.example")
runtime = build_runtime()
assert isinstance(runtime.platform, RoutedPlatformClient)
assert set(runtime.platform._delegates) == {"agent-1", "agent-2"}
assert runtime.platform._delegates["agent-1"].agent_base_url == "http://agent.example"
assert runtime.platform._delegates["agent-1"].agent_id == "agent-1"
assert runtime.platform._delegates["agent-2"].agent_id == "agent-2"
@pytest.mark.asyncio
async def test_bot_keeps_local_chat_id_for_plain_message_dispatch():
runtime = build_runtime(platform=MockPlatformClient())
await set_room_meta(
runtime.store,
"!chat1:example.org",
{
"chat_id": "C1",
"matrix_user_id": "@alice:example.org",
"platform_chat_id": "41",
"agent_id": "agent-2",
},
)
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
bot = MatrixBot(SimpleNamespace(user_id="@bot:example.org"), runtime)
await bot.on_room_message(
SimpleNamespace(room_id="!chat1:example.org"),
SimpleNamespace(sender="@alice:example.org", body="hello"),
)
dispatched = runtime.dispatcher.dispatch.await_args.args[0]
assert dispatched.chat_id == "C1"
assert dispatched.text == "hello"