surfaces/adapter/matrix/bot.py
Mikhail Putilovskij e733119d1e feat: enforce agent routing and persist restart state
Task 4: stale room blocking + agent_id binding
- MatrixBot._check_agent_routing: blocks normal messages when user has no
  selected agent or room is bound to a different agent
- agent_routing_enabled flag on MatrixRuntime activates the check only
  in real multi-agent mode (RoutedPlatformClient)
- make_handle_new_chat now writes agent_id into new room metadata when
  user already has a selected agent

Task 5: durable restart state tests
- test_restart_persistence.py proves selected_agent_id, room agent_id,
  platform_chat_id, and the sequence counter all survive SQLiteStore
  close/reopen; also covers clean startup with no prior state
2026-04-24 14:01:49 +03:00

780 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import os
import re
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
import structlog
from dotenv import load_dotenv
from nio import (
AsyncClient,
AsyncClientConfig,
InviteMemberEvent,
MatrixRoom,
RoomMemberEvent,
RoomMessage,
RoomMessageAudio,
RoomMessageFile,
RoomMessageImage,
RoomMessageText,
RoomMessageVideo,
)
from nio.responses import SyncResponse
from adapter.matrix.converter import from_room_event
from adapter.matrix.files import (
download_matrix_attachment,
matrix_msgtype_for_attachment,
resolve_workspace_attachment_path,
)
from adapter.matrix.agent_registry import AgentRegistry, AgentRegistryError, load_agent_registry
from adapter.matrix.handlers import register_matrix_handlers
from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat
from adapter.matrix.handlers.context_commands import (
LOAD_PROMPT,
)
from adapter.matrix.routed_platform import RoutedPlatformClient
from adapter.matrix.room_router import resolve_chat_id
from adapter.matrix.store import (
add_staged_attachment,
clear_load_pending,
clear_staged_attachments,
get_load_pending,
get_room_meta,
get_selected_agent_id,
get_staged_attachments,
next_platform_chat_id,
remove_staged_attachment_at,
set_pending_confirm,
set_platform_chat_id,
set_room_agent_id,
set_room_meta,
)
from core.auth import AuthManager
from core.chat import ChatManager
from core.handler import EventDispatcher
from core.handlers import register_all
from core.protocol import (
Attachment,
IncomingCommand,
IncomingMessage,
OutgoingEvent,
OutgoingMessage,
OutgoingNotification,
OutgoingTyping,
OutgoingUI,
)
from core.settings import SettingsManager
from core.store import InMemoryStore, SQLiteStore, StateStore
from sdk.interface import PlatformClient, PlatformError
from sdk.mock import MockPlatformClient
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
# Load environment variables from the ".env" file located in the directory
# two levels above the one that contains this module (parents[2]).
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
@dataclass
class MatrixRuntime:
    """Bundle of collaborators that a running Matrix bot works with.

    Instances are assembled by ``build_runtime`` and handed to ``MatrixBot``.
    """

    # Backend client that messages are ultimately sent to.
    platform: PlatformClient
    # Persistent (or in-memory) key/value state shared by all managers.
    store: StateStore
    # Managers built on top of ``platform`` and ``store``.
    chat_mgr: ChatManager
    auth_mgr: AuthManager
    settings_mgr: SettingsManager
    # Dispatcher with the core and Matrix-specific handlers registered.
    dispatcher: EventDispatcher
    # When True, MatrixBot runs its agent-routing check on non-command messages.
    agent_routing_enabled: bool = False
def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> EventDispatcher:
    """Create an ``EventDispatcher`` with core and Matrix handlers registered.

    Args:
        platform: Backend client shared by every manager and the dispatcher.
        store: State store shared by every manager and the Matrix handlers.

    Returns:
        The fully registered dispatcher.

    Raises:
        RuntimeError: Propagated from ``_load_agent_registry_from_env`` when a
            configured registry cannot be loaded.
    """
    managers = {
        "chat_mgr": ChatManager(platform, store),
        "auth_mgr": AuthManager(platform, store),
        "settings_mgr": SettingsManager(platform, store),
    }
    # Not every platform client carries prototype state; None is passed on then.
    shared_state = getattr(platform, "_prototype_state", None)
    base_url = _agent_base_url_from_env()
    agent_registry = _load_agent_registry_from_env()

    dispatcher = EventDispatcher(platform=platform, **managers)
    register_all(dispatcher)
    register_matrix_handlers(
        dispatcher,
        store=store,
        registry=agent_registry,
        prototype_state=shared_state,
        agent_base_url=base_url,
    )
    return dispatcher
def _normalize_agent_base_url(url: str) -> str:
    """Reduce an agent WebSocket URL to its base URL.

    A trailing ``/agent_ws`` endpoint (optionally preceded by ``/v1`` and
    optionally followed by one more path segment) is removed from the path.
    Query string and fragment are dropped; scheme and host are kept as given.

    Args:
        url: URL to normalise.

    Returns:
        The URL rebuilt from scheme, netloc and the trimmed path.
    """
    parts = urlsplit(url)
    path_without_slash = parts.path.rstrip("/")
    endpoint_suffix = re.compile(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$")
    base_path = endpoint_suffix.sub("", path_without_slash)
    return urlunsplit([parts.scheme, parts.netloc, base_path, "", ""])
def _agent_base_url_from_env() -> str:
    """Resolve the agent base URL from the environment.

    Precedence: a non-empty ``AGENT_BASE_URL`` is returned verbatim; otherwise
    a non-empty ``AGENT_WS_URL`` is normalised with
    ``_normalize_agent_base_url``; otherwise the local default is used.

    Returns:
        The base URL string.
    """
    explicit_base = os.environ.get("AGENT_BASE_URL")
    if explicit_base:
        return explicit_base

    websocket_url = os.environ.get("AGENT_WS_URL")
    if websocket_url:
        return _normalize_agent_base_url(websocket_url)

    return "http://127.0.0.1:8000"
def _load_agent_registry_from_env(required: bool = False) -> AgentRegistry | None:
    """Load the agent registry named by ``MATRIX_AGENT_REGISTRY_PATH``.

    Args:
        required: When True, a missing or blank path is an error instead of
            yielding ``None``.

    Returns:
        The loaded registry, or ``None`` when no path is configured and the
        registry is not required.

    Raises:
        RuntimeError: If the path is required but not configured, or if
            loading fails with ``AgentRegistryError`` or ``OSError``.
    """
    registry_path = os.environ.get("MATRIX_AGENT_REGISTRY_PATH", "").strip()

    if registry_path:
        try:
            return load_agent_registry(registry_path)
        except (AgentRegistryError, OSError) as exc:
            raise RuntimeError(
                f"failed to load matrix agent registry: {registry_path}"
            ) from exc

    if not required:
        return None

    raise RuntimeError(
        "MATRIX_AGENT_REGISTRY_PATH is required when MATRIX_PLATFORM_BACKEND=real"
    )
def _build_platform_from_env(*, store: StateStore, chat_mgr: ChatManager) -> PlatformClient:
    """Pick and construct the platform client selected by the environment.

    ``MATRIX_PLATFORM_BACKEND`` (default ``"mock"``, compared after stripping
    and lower-casing) chooses the backend. Only the value ``"real"`` builds a
    ``RoutedPlatformClient`` with one ``RealPlatformClient`` delegate per
    registry agent; every other value yields a ``MockPlatformClient``.

    Args:
        store: State store handed to the routed client.
        chat_mgr: Chat manager handed to the routed client.

    Returns:
        The constructed platform client.

    Raises:
        RuntimeError: In ``"real"`` mode when the agent registry is missing or
            cannot be loaded.
    """
    backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
    if backend != "real":
        return MockPlatformClient()

    # One prototype state object is shared by all per-agent delegates.
    prototype_state = PrototypeStateStore()
    registry = _load_agent_registry_from_env(required=True)
    assert registry is not None  # required=True raises instead of returning None

    delegates = {}
    for agent in registry.agents:
        delegates[agent.agent_id] = RealPlatformClient(
            agent_id=agent.agent_id,
            agent_base_url=_agent_base_url_from_env(),
            prototype_state=prototype_state,
            platform="matrix",
        )

    return RoutedPlatformClient(
        chat_mgr=chat_mgr,
        store=store,
        delegates=delegates,
    )
def build_runtime(
    platform: PlatformClient | None = None,
    store: StateStore | None = None,
    client: AsyncClient | None = None,
) -> MatrixRuntime:
    """Assemble the full ``MatrixRuntime`` used by ``MatrixBot``.

    Args:
        platform: Platform client to use. When ``None`` one is built from the
            environment via ``_build_platform_from_env``.
        store: State store to use. When ``None`` an ``InMemoryStore`` is
            created.
        client: Matrix client passed through to the Matrix handlers.

    Returns:
        A runtime whose ``agent_routing_enabled`` flag is True exactly when
        the platform is a ``RoutedPlatformClient``.

    Raises:
        RuntimeError: Propagated from the environment helpers when the agent
            registry is required/configured but cannot be loaded.
    """
    # Compare against None explicitly: a caller-supplied store or platform
    # that happens to be falsy (for example an empty container-like object)
    # must not be silently swapped for a default.
    if store is None:
        store = InMemoryStore()

    # Bootstrap chat manager: it exists only so that an environment-built
    # platform can be given a ChatManager before the platform itself exists.
    # NOTE(review): when `platform` is None this manager is created with
    # platform=None and is the one the routed client keeps — confirm the
    # routed client never relies on that manager's platform.
    chat_mgr = ChatManager(platform, store)
    if platform is None:
        platform = _build_platform_from_env(store=store, chat_mgr=chat_mgr)

    # The managers exposed on the runtime are always bound to the final platform.
    chat_mgr = ChatManager(platform, store)
    auth_mgr = AuthManager(platform, store)
    settings_mgr = SettingsManager(platform, store)
    prototype_state = getattr(platform, "_prototype_state", None)
    agent_base_url = _agent_base_url_from_env()
    registry = _load_agent_registry_from_env()

    dispatcher = EventDispatcher(
        platform=platform,
        chat_mgr=chat_mgr,
        auth_mgr=auth_mgr,
        settings_mgr=settings_mgr,
    )
    register_all(dispatcher)
    register_matrix_handlers(
        dispatcher,
        client=client,
        store=store,
        registry=registry,
        prototype_state=prototype_state,
        agent_base_url=agent_base_url,
    )

    return MatrixRuntime(
        platform=platform,
        store=store,
        chat_mgr=chat_mgr,
        auth_mgr=auth_mgr,
        settings_mgr=settings_mgr,
        dispatcher=dispatcher,
        agent_routing_enabled=isinstance(platform, RoutedPlatformClient),
    )
class MatrixBot:
    """Bridge between a Matrix client and the runtime's event dispatcher.

    Room messages are converted with ``from_room_event``, dispatched through
    ``runtime.dispatcher`` and the resulting outgoing events are sent back to
    the originating room with ``send_outgoing``. Around that core the bot
    handles unregistered rooms, redirect (entry) rooms, pending ``!load``
    selections, staged file attachments and per-room agent routing.
    """

    def __init__(self, client: AsyncClient, runtime: MatrixRuntime) -> None:
        """Store the Matrix client and the runtime; performs no I/O."""
        self.client = client
        self.runtime = runtime

    async def _ensure_platform_chat_id(self, room_id: str, room_meta: dict | None) -> None:
        """Give the room a platform chat id if it does not have one yet.

        Nothing happens when the room has no metadata, is a redirect (entry)
        room, or already has a ``platform_chat_id``.

        Args:
            room_id: Matrix room id whose metadata is updated.
            room_meta: Metadata previously read for the room, or ``None``.
        """
        if not room_meta:
            return
        if room_meta.get("redirect_room_id"):
            return
        if room_meta.get("platform_chat_id"):
            return
        await set_platform_chat_id(
            self.runtime.store,
            room_id,
            await next_platform_chat_id(self.runtime.store),
        )

    async def on_room_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
        """Handle one incoming room message event.

        Processing order:

        1. Ignore events sent by the bot itself.
        2. Ensure a registered, non-redirect room has a platform chat id.
        3. If a ``!load`` selection is pending and the body is a number or
           ``!cancel``, resolve the selection and stop.
        4. Unregistered room: try to bootstrap a workspace chat; redirect
           room: point the user at the already created chat.
        5. For non-command text in agent-routing mode, run the routing check.
        6. Convert the event, handle staged-attachment commands and file-only
           events locally, otherwise dispatch and send the replies.

        Args:
            room: Room the event arrived in.
            event: The message event (text or media).
        """
        sender = getattr(event, "sender", None)
        if sender == self.client.user_id:
            return

        body = (getattr(event, "body", None) or "").strip()
        room_meta = await get_room_meta(self.runtime.store, room.room_id)
        if room_meta is not None and not room_meta.get("redirect_room_id"):
            await self._ensure_platform_chat_id(room.room_id, room_meta)

        load_pending = await get_load_pending(self.runtime.store, sender, room.room_id)
        if load_pending is not None and (body.isdigit() or body == "!cancel"):
            outgoing = await self._handle_load_selection(
                sender, room.room_id, body, load_pending
            )
            await self._send_all(room.room_id, outgoing)
            return

        if room_meta is None:
            outgoing = await self._bootstrap_unregistered_room(room, sender)
            if outgoing:
                await self._send_all(room.room_id, outgoing)
                return
            # A None result means the client cannot create rooms; the message
            # then continues through the normal pipeline below.
        elif room_meta.get("redirect_room_id"):
            redirect_room_id = room_meta["redirect_room_id"]
            redirect_chat_id = room_meta.get("redirect_chat_id", "рабочий чат")
            await self._send_all(
                room.room_id,
                [
                    OutgoingMessage(
                        chat_id=room.room_id,
                        text=(
                            f"Рабочий чат уже создан: {redirect_chat_id}. "
                            "Открой приглашённую комнату для продолжения."
                        ),
                    )
                ],
            )
            logger.info(
                "matrix_redirect_entry_room",
                room_id=room.room_id,
                redirect_room_id=redirect_room_id,
                user=sender,
            )
            return

        # Commands (bodies starting with "!") bypass the routing check so the
        # user can still select an agent or create a new chat.
        if not body.startswith("!") and self.runtime.agent_routing_enabled:
            block = await self._check_agent_routing(room.room_id, sender, room_meta)
            if block is not None:
                await self._send_all(room.room_id, block)
                return

        local_chat_id = await resolve_chat_id(self.runtime.store, room.room_id, sender)
        incoming = from_room_event(event, room_id=room.room_id, chat_id=local_chat_id)
        if incoming is None:
            return

        if isinstance(incoming, IncomingCommand) and incoming.command in {
            "matrix_list_attachments",
            "matrix_remove_attachment",
        }:
            outgoing = await self._handle_staged_attachment_command(
                room.room_id,
                sender,
                incoming,
            )
            await self._send_all(room.room_id, outgoing)
            return

        if self._is_file_only_event(event, incoming):
            # Media without accompanying text is downloaded and queued; it is
            # forwarded together with the user's next text message.
            materialized = await self._materialize_incoming_attachments(
                room.room_id,
                sender,
                incoming,
            )
            await self._stage_attachments(room.room_id, sender, materialized.attachments)
            return

        if isinstance(incoming, IncomingMessage) and incoming.attachments:
            incoming = await self._materialize_incoming_attachments(
                room.room_id,
                sender,
                incoming,
            )

        clear_staged_after_dispatch = False
        if isinstance(incoming, IncomingMessage) and incoming.text:
            incoming, clear_staged_after_dispatch = await self._merge_staged_attachments(
                room.room_id,
                sender,
                incoming,
            )

        try:
            outgoing = await self.runtime.dispatcher.dispatch(incoming)
        except PlatformError as exc:
            logger.warning(
                "matrix_message_platform_error",
                room_id=room.room_id,
                sender=sender,
                code=exc.code,
                error=str(exc),
            )
            outgoing = [
                OutgoingMessage(
                    chat_id=local_chat_id,
                    text="Сервис временно недоступен. Попробуйте ещё раз позже.",
                )
            ]
        else:
            # Staged files are dropped only after a successful dispatch so a
            # platform error does not lose them.
            if clear_staged_after_dispatch:
                await clear_staged_attachments(self.runtime.store, room.room_id, sender)
        await self._send_all(room.room_id, outgoing)

    def _is_file_only_event(
        self, event: RoomMessage, incoming: IncomingMessage | IncomingCommand
    ) -> bool:
        """Return True for a non-text event that converted to a message with attachments."""
        return (
            isinstance(incoming, IncomingMessage)
            and bool(incoming.attachments)
            and not isinstance(event, RoomMessageText)
        )

    async def _stage_attachments(
        self,
        room_id: str,
        user_id: str,
        attachments: list,
    ) -> None:
        """Queue attachments in the store for the given room and user.

        Each attachment is stored as a plain dict with the keys ``type``,
        ``url``, ``filename``, ``mime_type`` and ``workspace_path``.
        """
        for attachment in attachments:
            await add_staged_attachment(
                self.runtime.store,
                room_id,
                user_id,
                {
                    "type": attachment.type,
                    "url": attachment.url,
                    "filename": attachment.filename,
                    "mime_type": attachment.mime_type,
                    "workspace_path": attachment.workspace_path,
                },
            )

    async def _format_staged_attachments(
        self,
        room_id: str,
        user_id: str,
        *,
        include_hint: bool = False,
    ) -> str:
        """Render the staged attachments of a room/user as a numbered list.

        Args:
            room_id: Room whose queue is rendered.
            user_id: Owner of the queue.
            include_hint: Append usage hints after the list.

        Returns:
            The list text, or a fixed "nothing staged" message when empty.
        """
        attachments = await get_staged_attachments(self.runtime.store, room_id, user_id)
        if not attachments:
            return "Нет сохраненных вложений."

        lines = ["Вложения в очереди:"]
        lines.extend(
            f"{index}. {attachment.get('filename') or 'attachment'}"
            for index, attachment in enumerate(attachments, start=1)
        )
        if include_hint:
            lines.extend(
                [
                    "",
                    "Следующее сообщение отправит файлы агенту.",
                    "Команды: !list, !remove <n>, !remove all",
                ]
            )
        return "\n".join(lines)

    async def _handle_staged_attachment_command(
        self,
        room_id: str,
        user_id: str,
        incoming: IncomingCommand,
    ) -> list[OutgoingEvent]:
        """Execute the list/remove commands for staged attachments.

        ``matrix_list_attachments`` replies with the current queue. Any other
        command handled here is treated as a removal whose first argument is
        either ``all`` or a 1-based position.

        Returns:
            A single-message reply addressed to ``incoming.chat_id``.
        """
        if incoming.command == "matrix_list_attachments":
            return [
                OutgoingMessage(
                    chat_id=incoming.chat_id,
                    text=await self._format_staged_attachments(room_id, user_id),
                )
            ]

        arg = incoming.args[0] if incoming.args else ""
        if arg == "all":
            await clear_staged_attachments(self.runtime.store, room_id, user_id)
            return [OutgoingMessage(chat_id=incoming.chat_id, text="Все вложения удалены.")]

        not_found = [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")]
        try:
            index = int(arg) - 1
        except ValueError:
            return not_found
        # Positions are 1-based for the user, so "0" or a negative number can
        # never name an item. Reject them here rather than passing a negative
        # index to the store helper, whose treatment of it is not defined in
        # this module.
        if index < 0:
            return not_found

        removed = await remove_staged_attachment_at(
            self.runtime.store, room_id, user_id, index
        )
        if removed is None:
            return not_found
        return [
            OutgoingMessage(
                chat_id=incoming.chat_id,
                text=await self._format_staged_attachments(room_id, user_id),
            )
        ]

    async def _merge_staged_attachments(
        self,
        room_id: str,
        user_id: str,
        incoming: IncomingMessage,
    ) -> tuple[IncomingMessage, bool]:
        """Attach previously staged files to an outgoing text message.

        Returns:
            ``(message, merged)``. When nothing is staged the original message
            is returned with ``False``. Otherwise a new message is returned
            with ``True``, carrying the staged attachments followed by any
            attachments the message already had.
        """
        staged = await get_staged_attachments(self.runtime.store, room_id, user_id)
        if not staged:
            return incoming, False

        staged_attachments = [
            Attachment(
                type=item.get("type", "document"),
                url=item.get("url"),
                filename=item.get("filename"),
                mime_type=item.get("mime_type"),
                workspace_path=item.get("workspace_path"),
            )
            for item in staged
        ]
        # Keep the message's own (already downloaded) attachments instead of
        # replacing them with the staged ones.
        own_attachments = list(incoming.attachments or [])
        return (
            IncomingMessage(
                user_id=incoming.user_id,
                platform=incoming.platform,
                chat_id=incoming.chat_id,
                text=incoming.text,
                attachments=[*staged_attachments, *own_attachments],
                reply_to=incoming.reply_to,
            ),
            True,
        )

    async def _materialize_incoming_attachments(
        self,
        room_id: str,
        matrix_user_id: str,
        incoming: IncomingMessage,
    ) -> IncomingMessage:
        """Download every attachment of *incoming* into the workspace.

        The workspace root comes from ``SURFACES_WORKSPACE_DIR`` (default
        ``/workspace``). Attachments are downloaded one after another with
        ``download_matrix_attachment``.

        Returns:
            A copy of *incoming* whose attachments are the downloaded ones.
        """
        workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace"))
        materialized = []
        for attachment in incoming.attachments:
            materialized.append(
                await download_matrix_attachment(
                    client=self.client,
                    workspace_root=workspace_root,
                    matrix_user_id=matrix_user_id,
                    room_id=room_id,
                    attachment=attachment,
                )
            )
        return IncomingMessage(
            user_id=incoming.user_id,
            platform=incoming.platform,
            chat_id=incoming.chat_id,
            text=incoming.text,
            attachments=materialized,
            reply_to=incoming.reply_to,
        )

    async def _bootstrap_unregistered_room(
        self,
        room: MatrixRoom,
        sender: str,
    ) -> list[OutgoingEvent] | None:
        """Provision a workspace chat for a user writing in an unknown room.

        On success the current room is recorded as a redirect to the new chat
        room, a welcome message is posted into the new room, and a notice for
        the current room is returned.

        Returns:
            ``None`` when the client lacks ``room_create`` or
            ``room_put_state``; otherwise the messages to send to *room*
            (either the success notice or a failure notice).
        """
        if not hasattr(self.client, "room_create") or not hasattr(
            self.client, "room_put_state"
        ):
            return None

        display_name = getattr(room, "display_name", None) or sender
        try:
            created = await provision_workspace_chat(
                self.client,
                sender,
                display_name,
                self.runtime.platform,
                self.runtime.store,
                self.runtime.auth_mgr,
                self.runtime.chat_mgr,
            )
        except Exception as exc:
            # Boundary handler: any provisioning failure is logged and turned
            # into a user-facing notice instead of crashing the sync callback.
            logger.warning(
                "matrix_unregistered_room_bootstrap_failed",
                room_id=room.room_id,
                sender=sender,
                error=str(exc),
            )
            return [
                OutgoingMessage(
                    chat_id=room.room_id,
                    text="Не удалось подготовить рабочий чат. Попробуйте ещё раз позже.",
                )
            ]

        welcome = (
            f"Привет, {created['user'].display_name or sender}! Пиши — я здесь.\n\n"
            "Команды: !new · !chats · !rename · !archive · !context · !save · !load · !help"
        )
        await set_room_meta(
            self.runtime.store,
            room.room_id,
            {
                "matrix_user_id": sender,
                "redirect_room_id": created["chat_room_id"],
                "redirect_chat_id": created["chat_id"],
            },
        )
        await self.client.room_send(
            created["chat_room_id"],
            "m.room.message",
            {"msgtype": "m.text", "body": welcome},
        )
        return [
            OutgoingMessage(
                chat_id=room.room_id,
                text=(
                    f"Создал рабочий чат {created['room_name']} ({created['chat_id']}) "
                    "и добавил его в пространство Lambda. "
                    "Открой приглашённую комнату для продолжения."
                ),
            )
        ]

    async def _handle_load_selection(
        self,
        user_id: str,
        room_id: str,
        text: str,
        pending: dict,
    ) -> list[OutgoingEvent]:
        """Resolve the user's answer to a pending ``!load`` prompt.

        ``0`` or ``!cancel`` clears the pending state. A valid 1-based number
        selects the save with that position from ``pending["saves"]``, records
        it as the current session in the platform's prototype state (if the
        platform has one) and asks the agent to load it. Any other numeric
        input yields an "invalid number" reply and keeps the prompt pending.

        Args:
            user_id: Matrix user who answered.
            room_id: Room the prompt belongs to.
            text: Stripped message body (a digit string or ``!cancel``).
            pending: Pending-load state; its ``saves`` entry is a list of
                dicts that each have a ``name`` key.

        Returns:
            A single-message reply for the room.
        """
        saves = pending.get("saves", [])
        if text in {"0", "!cancel"}:
            await clear_load_pending(self.runtime.store, user_id, room_id)
            return [OutgoingMessage(chat_id=room_id, text="Отменено.")]

        try:
            index = int(text) - 1
        except ValueError:
            # The caller admits bodies for which str.isdigit() is true; that
            # includes characters (e.g. superscript digits) int() rejects.
            index = -1
        if index < 0 or index >= len(saves):
            return [
                OutgoingMessage(
                    chat_id=room_id,
                    text=f"Неверный номер. Введи от 1 до {len(saves)} или 0 для отмены.",
                )
            ]

        name = saves[index]["name"]
        await clear_load_pending(self.runtime.store, user_id, room_id)

        prototype_state = getattr(self.runtime.platform, "_prototype_state", None)
        if prototype_state is not None:
            room_meta = await get_room_meta(self.runtime.store, room_id)
            context_keys = []
            if room_meta is not None:
                platform_chat_id = room_meta.get("platform_chat_id")
                if platform_chat_id:
                    context_keys.append(platform_chat_id)
                chat_id = room_meta.get("chat_id")
                if chat_id:
                    context_keys.append(chat_id)
            if not context_keys:
                context_keys.append(room_id)
            # dict.fromkeys removes duplicate keys while keeping their order.
            for context_key in dict.fromkeys(context_keys):
                await prototype_state.set_current_session(context_key, name)

        try:
            await self.runtime.platform.send_message(
                user_id,
                room_id,
                LOAD_PROMPT.format(name=name),
            )
        except Exception as exc:
            # Boundary handler: report the failure to the user instead of
            # letting it escape the sync callback.
            logger.warning("load_agent_call_failed", error=str(exc))
            return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")]
        return [
            OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")
        ]

    async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None:
        """Handle membership events; invites are passed to ``handle_invite``.

        Events sent by the bot itself and non-invite memberships are ignored.
        """
        if getattr(event, "sender", None) == self.client.user_id:
            return
        membership = getattr(event, "membership", None)
        if membership == "invite":
            await handle_invite(
                self.client,
                room,
                event,
                self.runtime.platform,
                self.runtime.store,
                self.runtime.auth_mgr,
                self.runtime.chat_mgr,
            )

    async def _check_agent_routing(
        self,
        room_id: str,
        sender: str,
        room_meta: dict,
    ) -> list[OutgoingEvent] | None:
        """Decide whether a normal message may be routed from this room.

        The message is blocked when the sender has no selected agent, or when
        the room is already bound (``agent_id`` in its metadata) to an agent
        other than the sender's selection. A room without a binding is bound
        to the selected agent.

        Returns:
            The blocking reply to send, or ``None`` when the message may
            proceed.
        """
        selected_agent_id = await get_selected_agent_id(self.runtime.store, sender)
        if not selected_agent_id:
            return [
                OutgoingMessage(
                    chat_id=room_id,
                    text="Выбери агент через !agent прежде чем отправлять сообщения.",
                )
            ]

        room_agent_id = room_meta.get("agent_id")
        if room_agent_id and room_agent_id != selected_agent_id:
            return [
                OutgoingMessage(
                    chat_id=room_id,
                    text=(
                        f"Этот чат привязан к агенту «{room_agent_id}». "
                        "Создай новый чат командой !new."
                    ),
                )
            ]

        if not room_agent_id:
            await set_room_agent_id(self.runtime.store, room_id, selected_agent_id)
            # Re-read the metadata after the write before ensuring the chat id.
            await self._ensure_platform_chat_id(
                room_id, await get_room_meta(self.runtime.store, room_id)
            )
        return None

    async def _send_all(self, room_id: str, outgoing: list[OutgoingEvent]) -> None:
        """Send every outgoing event to the room, in order."""
        for event in outgoing:
            await send_outgoing(self.client, room_id, event, store=self.runtime.store)
async def prepare_live_sync(client: AsyncClient) -> str | None:
    """Run one immediate full-state sync and return its ``next_batch`` token.

    Args:
        client: Matrix client to sync with.

    Returns:
        The ``next_batch`` token when the sync produced a ``SyncResponse``,
        otherwise ``None``.
    """
    initial = await client.sync(timeout=0, full_state=True)
    return initial.next_batch if isinstance(initial, SyncResponse) else None
async def send_outgoing(
    client: AsyncClient,
    room_id: str,
    event: OutgoingEvent,
    store: StateStore | None = None,
) -> None:
    """Deliver one outgoing event to a Matrix room.

    * ``OutgoingTyping`` becomes a typing notification.
    * ``OutgoingNotification`` becomes a text message prefixed with its level.
    * ``OutgoingMessage`` sends its text (if any) and uploads each attachment
      that has a ``workspace_path``, posting one message per uploaded file.
    * ``OutgoingUI`` is rendered as text with its button labels; when a store
      is given, the first button is recorded as the pending confirmation for
      the room's ``matrix_user_id``.

    Events of any other type are ignored.

    Args:
        client: Matrix client used for all calls.
        room_id: Target room.
        event: Event to deliver.
        store: Optional state store, needed only to record UI confirmations.

    Raises:
        RuntimeError: If an attachment upload returns no ``content_uri``.
    """

    async def post_text(text: str) -> None:
        await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": text})

    if isinstance(event, OutgoingTyping):
        await client.room_typing(room_id, event.is_typing, timeout=25000)

    elif isinstance(event, OutgoingNotification):
        await post_text(f"[{event.level.upper()}] {event.text}")

    elif isinstance(event, OutgoingMessage):
        if event.text:
            await post_text(event.text)
        if event.attachments:
            workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace"))
            for item in event.attachments:
                # Attachments without a workspace file cannot be uploaded.
                if not item.workspace_path:
                    continue
                local_file = resolve_workspace_attachment_path(
                    workspace_root, item.workspace_path
                )
                with local_file.open("rb") as stream:
                    upload_response, _ = await client.upload(
                        stream,
                        content_type=item.mime_type or "application/octet-stream",
                        filename=item.filename or local_file.name,
                        filesize=local_file.stat().st_size,
                    )
                content_uri = getattr(upload_response, "content_uri", None)
                if not content_uri:
                    raise RuntimeError(f"Matrix upload failed for {local_file}")
                await client.room_send(
                    room_id,
                    "m.room.message",
                    {
                        "msgtype": matrix_msgtype_for_attachment(item),
                        "body": item.filename or local_file.name,
                        "url": content_uri,
                    },
                )

    elif isinstance(event, OutgoingUI):
        rendered = [event.text]
        if event.buttons:
            rendered.append("")
            rendered.extend(f" {button.label}" for button in event.buttons)
            rendered.append("")
            rendered.append("Ответьте !yes для подтверждения или !no для отмены.")
        await post_text("\n".join(rendered))

        if event.buttons and store is not None:
            # Only the first button is remembered as the action to confirm.
            first_button = event.buttons[0]
            action_id = first_button.action
            payload = first_button.payload
            room_meta = await get_room_meta(store, room_id)
            matrix_user_id = room_meta.get("matrix_user_id") if room_meta else None
            if matrix_user_id:
                await set_pending_confirm(
                    store,
                    matrix_user_id,
                    room_id,
                    {
                        "action_id": action_id,
                        "description": event.text,
                        "payload": payload,
                    },
                )
async def main() -> None:
    """Configure the Matrix client from the environment and run the bot.

    Environment variables read:

    * ``MATRIX_HOMESERVER``, ``MATRIX_USER_ID`` — required.
    * ``MATRIX_ACCESS_TOKEN`` or ``MATRIX_PASSWORD`` — one is required; the
      token wins when both are set.
    * ``MATRIX_DEVICE_ID`` (default ``""``), ``MATRIX_DB_PATH`` (default
      ``lambda_matrix.db``), ``MATRIX_STORE_PATH`` (default ``matrix_store``).

    The function performs an initial sync, registers the message and member
    callbacks and then syncs until interrupted. The platform client (when it
    exposes ``close``) and the Matrix client are closed on every exit path.

    Raises:
        RuntimeError: If required configuration is missing or the password
            login does not yield an access token.
    """
    homeserver = os.environ.get("MATRIX_HOMESERVER")
    user_id = os.environ.get("MATRIX_USER_ID")
    device_id = os.environ.get("MATRIX_DEVICE_ID", "")
    password = os.environ.get("MATRIX_PASSWORD")
    token = os.environ.get("MATRIX_ACCESS_TOKEN")
    db_path = os.environ.get("MATRIX_DB_PATH", "lambda_matrix.db")
    store_path = os.environ.get("MATRIX_STORE_PATH", "matrix_store")

    if not homeserver or not user_id:
        raise RuntimeError("MATRIX_HOMESERVER and MATRIX_USER_ID are required")
    # Fail fast: without credentials every later request would be unauthenticated.
    if not token and not password:
        raise RuntimeError("MATRIX_ACCESS_TOKEN or MATRIX_PASSWORD is required")

    client_config = AsyncClientConfig(
        request_timeout=120,
        max_timeouts=12,
        max_limit_exceeded=20,
        backoff_factor=0.5,
        max_timeout_retry_wait_time=15,
    )
    client = AsyncClient(
        homeserver,
        user=user_id,
        device_id=device_id,
        store_path=store_path,
        config=client_config,
    )

    runtime: MatrixRuntime | None = None
    try:
        runtime = build_runtime(store=SQLiteStore(db_path), client=client)

        if token:
            client.access_token = token
            # MatrixBot drops the bot's own messages by comparing the sender
            # with client.user_id. With a pre-issued token no login response
            # arrives to populate that attribute, so set it explicitly.
            # NOTE(review): confirm against the installed matrix-nio version.
            client.user_id = user_id
        else:
            await client.login(password=password, device_name="surfaces-bot")
            # The login call reports failure through its response rather than
            # an exception; a missing access token means it did not succeed.
            if not client.access_token:
                raise RuntimeError("Matrix login failed")

        since_token = await prepare_live_sync(client)
        bot = MatrixBot(client, runtime)
        client.add_event_callback(
            bot.on_room_message,
            (
                RoomMessageText,
                RoomMessageFile,
                RoomMessageImage,
                RoomMessageVideo,
                RoomMessageAudio,
            ),
        )
        client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent))
        logger.info(
            "Matrix bot starting",
            homeserver=homeserver,
            user_id=user_id,
            store_path=store_path,
            request_timeout=client_config.request_timeout,
        )
        # Start from the token of the initial sync so earlier history is not replayed.
        await client.sync_forever(timeout=30000, since=since_token)
    finally:
        try:
            close = getattr(runtime.platform, "close", None) if runtime is not None else None
            if callable(close):
                await close()
        finally:
            # Always release the Matrix client, even if closing the platform failed.
            await client.close()
if __name__ == "__main__":
    # Script entry point: run the bot's coroutine on a fresh event loop.
    asyncio.run(main())