# surfaces/adapter/matrix/bot.py
#
# Note: this file contains non-ASCII (Cyrillic) characters in user-facing
# strings; some viewers flag them as ambiguous Unicode.


from __future__ import annotations
import asyncio
import os
import re
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
import structlog
from dotenv import load_dotenv
from nio import (
AsyncClient,
AsyncClientConfig,
InviteMemberEvent,
MatrixRoom,
RoomMemberEvent,
RoomMessage,
RoomMessageAudio,
RoomMessageFile,
RoomMessageImage,
RoomMessageText,
RoomMessageVideo,
)
from nio.responses import SyncResponse
from adapter.matrix.converter import from_room_event
from adapter.matrix.files import (
download_matrix_attachment,
matrix_msgtype_for_attachment,
resolve_workspace_attachment_path,
)
from adapter.matrix.handlers import register_matrix_handlers
from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat
from adapter.matrix.handlers.context_commands import (
LOAD_PROMPT,
)
from adapter.matrix.room_router import resolve_chat_id
from adapter.matrix.store import (
add_staged_attachment,
clear_load_pending,
clear_staged_attachments,
get_load_pending,
get_room_meta,
get_staged_attachments,
next_platform_chat_id,
remove_staged_attachment_at,
set_pending_confirm,
set_platform_chat_id,
set_room_meta,
)
from core.auth import AuthManager
from core.chat import ChatManager
from core.handler import EventDispatcher
from core.handlers import register_all
from core.protocol import (
Attachment,
IncomingCommand,
IncomingMessage,
OutgoingEvent,
OutgoingMessage,
OutgoingNotification,
OutgoingTyping,
OutgoingUI,
)
from core.settings import SettingsManager
from core.store import InMemoryStore, SQLiteStore, StateStore
from sdk.interface import PlatformClient, PlatformError
from sdk.mock import MockPlatformClient
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
# Module-level structured logger used by every function and method below.
logger = structlog.get_logger(__name__)
# Import-time side effect: load variables from the ".env" file found two
# directory levels above the directory that contains this file.
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
@dataclass
class MatrixRuntime:
    """Bundle of the service objects the Matrix adapter is wired with.

    Instances are produced by ``build_runtime`` and consumed by ``MatrixBot``.
    """

    # Client used to talk to the platform backend.
    platform: PlatformClient
    # State store the managers and adapter helpers work against.
    store: StateStore
    # Managers constructed from the platform client and the store.
    chat_mgr: ChatManager
    auth_mgr: AuthManager
    settings_mgr: SettingsManager
    # Dispatcher with the core and Matrix-specific handlers registered.
    dispatcher: EventDispatcher
def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> EventDispatcher:
    """Create an ``EventDispatcher`` with core and Matrix handlers registered.

    Args:
        platform: Platform client shared by the managers and the dispatcher.
        store: State store shared by the managers and the Matrix handlers.

    Returns:
        The fully registered dispatcher.
    """
    managers = {
        "chat_mgr": ChatManager(platform, store),
        "auth_mgr": AuthManager(platform, store),
        "settings_mgr": SettingsManager(platform, store),
    }
    # Not every platform client carries prototype state, hence the getattr.
    proto_state = getattr(platform, "_prototype_state", None)
    base_url = _agent_base_url_from_env()
    event_dispatcher = EventDispatcher(platform=platform, **managers)
    register_all(event_dispatcher)
    register_matrix_handlers(
        event_dispatcher,
        store=store,
        prototype_state=proto_state,
        agent_base_url=base_url,
    )
    return event_dispatcher
def _normalize_agent_base_url(url: str) -> str:
    """Reduce an agent websocket URL to its base URL.

    A trailing ``/agent_ws`` path segment (optionally preceded by ``/v1`` and
    optionally followed by one more segment) is removed. The scheme and
    network location are kept; query string and fragment are dropped.
    """
    parts = urlsplit(url)
    trimmed_path = parts.path.rstrip("/")
    ws_suffix = re.compile(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$")
    base_path = ws_suffix.sub("", trimmed_path)
    return urlunsplit((parts.scheme, parts.netloc, base_path, "", ""))
def _agent_base_url_from_env() -> str:
    """Resolve the agent base URL from the environment.

    Precedence: a non-empty ``AGENT_BASE_URL`` is returned as-is; otherwise a
    non-empty ``AGENT_WS_URL`` is normalized to its base URL; otherwise the
    local default ``http://127.0.0.1:8000`` is used.
    """
    explicit_base = os.environ.get("AGENT_BASE_URL")
    if explicit_base:
        return explicit_base
    websocket_url = os.environ.get("AGENT_WS_URL")
    if websocket_url:
        return _normalize_agent_base_url(websocket_url)
    return "http://127.0.0.1:8000"
def _build_platform_from_env() -> PlatformClient:
    """Pick the platform client implementation from ``MATRIX_PLATFORM_BACKEND``.

    The value is compared case-insensitively after trimming whitespace. Only
    ``real`` selects ``RealPlatformClient``; any other value (including the
    default ``mock``) yields a ``MockPlatformClient``.
    """
    backend_name = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
    if backend_name != "real":
        return MockPlatformClient()
    return RealPlatformClient(
        agent_id="matrix-bot",
        agent_base_url=_agent_base_url_from_env(),
        prototype_state=PrototypeStateStore(),
        platform="matrix",
    )
def build_runtime(
    platform: PlatformClient | None = None,
    store: StateStore | None = None,
    client: AsyncClient | None = None,
) -> MatrixRuntime:
    """Assemble the managers and dispatcher used by the Matrix bot.

    Args:
        platform: Platform client to use. When ``None``, one is built from
            the environment.
        store: State store to use. When ``None``, an ``InMemoryStore`` is
            created.
        client: Matrix client forwarded to the Matrix handler registration.

    Returns:
        A ``MatrixRuntime`` holding the platform, store, managers and the
        dispatcher with core and Matrix handlers registered.
    """
    # Compare against None explicitly: a caller-supplied object that happens
    # to be falsy (for example an empty store defining __len__) must not be
    # silently replaced by a default.
    if platform is None:
        platform = _build_platform_from_env()
    if store is None:
        store = InMemoryStore()
    chat_mgr = ChatManager(platform, store)
    auth_mgr = AuthManager(platform, store)
    settings_mgr = SettingsManager(platform, store)
    # Not every platform client carries prototype state, hence the getattr.
    prototype_state = getattr(platform, "_prototype_state", None)
    agent_base_url = _agent_base_url_from_env()
    dispatcher = EventDispatcher(
        platform=platform, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr
    )
    register_all(dispatcher)
    register_matrix_handlers(
        dispatcher,
        client=client,
        store=store,
        prototype_state=prototype_state,
        agent_base_url=agent_base_url,
    )
    return MatrixRuntime(
        platform=platform,
        store=store,
        chat_mgr=chat_mgr,
        auth_mgr=auth_mgr,
        settings_mgr=settings_mgr,
        dispatcher=dispatcher,
    )
class MatrixBot:
    """Handle Matrix room events and relay them to the core dispatcher.

    The bot receives nio callbacks (``on_room_message``, ``on_member``),
    converts them to core protocol events, dispatches them through the
    runtime's dispatcher and sends the resulting outgoing events back to the
    originating room.
    """

    def __init__(self, client: AsyncClient, runtime: MatrixRuntime) -> None:
        """Store the Matrix client and the runtime the bot operates on."""
        self.client = client
        self.runtime = runtime

    async def _ensure_platform_chat_id(self, room_id: str, room_meta: dict | None) -> None:
        """Assign a platform chat id to the room unless it already has one.

        Nothing is written when the metadata is missing/empty, when the room
        is a redirect entry room, or when a platform chat id is already set.
        """
        if not room_meta:
            return
        if room_meta.get("redirect_room_id"):
            return
        if room_meta.get("platform_chat_id"):
            return
        await set_platform_chat_id(
            self.runtime.store,
            room_id,
            await next_platform_chat_id(self.runtime.store),
        )

    async def on_room_message(self, room: MatrixRoom, event: RoomMessage) -> None:
        """Process one incoming room message event.

        The steps, in order: ignore the bot's own messages; make sure the
        room has a platform chat id; resolve a pending ``!load`` selection;
        bootstrap unregistered rooms or answer in redirect entry rooms;
        handle staged-attachment commands; stage file-only events; otherwise
        dispatch the message and send the replies to the room.

        Args:
            room: Room the event was received in.
            event: Text or media message event.
        """
        sender = getattr(event, "sender", None)
        if sender == self.client.user_id:
            return
        body = (getattr(event, "body", None) or "").strip()
        store = self.runtime.store
        room_id = room.room_id

        room_meta = await get_room_meta(store, room_id)
        if (
            room_meta is not None
            and not room_meta.get("redirect_room_id")
            and not room_meta.get("platform_chat_id")
        ):
            await self._ensure_platform_chat_id(room_id, room_meta)
            # Re-read the metadata: the dict fetched above may not reflect the
            # id that was just written, and the dispatch chat id below is
            # taken from it.
            refreshed_meta = await get_room_meta(store, room_id)
            if refreshed_meta is not None:
                room_meta = refreshed_meta

        load_pending = await get_load_pending(store, sender, room_id)
        if load_pending is not None and (body.isdigit() or body == "!cancel"):
            outgoing = await self._handle_load_selection(sender, room_id, body, load_pending)
            await self._send_all(room_id, outgoing)
            return

        if room_meta is None:
            outgoing = await self._bootstrap_unregistered_room(room, sender)
            if outgoing:
                await self._send_all(room_id, outgoing)
                return
            # No bootstrap result (the client cannot create rooms): continue
            # and treat the event as a regular message.
        elif room_meta.get("redirect_room_id"):
            redirect_room_id = room_meta["redirect_room_id"]
            redirect_chat_id = room_meta.get("redirect_chat_id", "рабочий чат")
            await self._send_all(
                room_id,
                [
                    OutgoingMessage(
                        chat_id=room_id,
                        text=(
                            f"Рабочий чат уже создан: {redirect_chat_id}. "
                            "Открой приглашённую комнату для продолжения."
                        ),
                    )
                ],
            )
            logger.info(
                "matrix_redirect_entry_room",
                room_id=room_id,
                redirect_room_id=redirect_room_id,
                user=sender,
            )
            return

        local_chat_id = await resolve_chat_id(store, room_id, sender)
        dispatch_chat_id = local_chat_id
        # Commands ("!...") keep the local chat id; plain messages prefer the
        # platform chat id when the room has one.
        if not body.startswith("!"):
            dispatch_chat_id = (room_meta or {}).get("platform_chat_id") or local_chat_id

        incoming = from_room_event(event, room_id=room_id, chat_id=dispatch_chat_id)
        if incoming is None:
            return

        if isinstance(incoming, IncomingCommand) and incoming.command in {
            "matrix_list_attachments",
            "matrix_remove_attachment",
        }:
            outgoing = await self._handle_staged_attachment_command(room_id, sender, incoming)
            await self._send_all(room_id, outgoing)
            return

        if self._is_file_only_event(event, incoming):
            # Media without accompanying text is downloaded and staged; it is
            # attached to the user's next text message.
            materialized = await self._materialize_incoming_attachments(
                room_id,
                sender,
                incoming,
            )
            await self._stage_attachments(room_id, sender, materialized.attachments)
            return

        if isinstance(incoming, IncomingMessage) and incoming.attachments:
            incoming = await self._materialize_incoming_attachments(
                room_id,
                sender,
                incoming,
            )

        clear_staged_after_dispatch = False
        if isinstance(incoming, IncomingMessage) and incoming.text:
            incoming, clear_staged_after_dispatch = await self._merge_staged_attachments(
                room_id,
                sender,
                incoming,
            )

        try:
            outgoing = await self.runtime.dispatcher.dispatch(incoming)
        except PlatformError as exc:
            logger.warning(
                "matrix_message_platform_error",
                room_id=room_id,
                sender=sender,
                code=exc.code,
                error=str(exc),
            )
            outgoing = [
                OutgoingMessage(
                    chat_id=dispatch_chat_id,
                    text="Сервис временно недоступен. Попробуйте ещё раз позже.",
                )
            ]
        else:
            # Staged files are dropped only after a successful dispatch so a
            # platform error does not lose them.
            if clear_staged_after_dispatch:
                await clear_staged_attachments(store, room_id, sender)
        await self._send_all(room_id, outgoing)

    def _is_file_only_event(
        self, event: RoomMessage, incoming: IncomingMessage | IncomingCommand
    ) -> bool:
        """Return True for a non-text event that converted to a message with attachments."""
        return (
            isinstance(incoming, IncomingMessage)
            and bool(incoming.attachments)
            and not isinstance(event, RoomMessageText)
        )

    async def _stage_attachments(
        self,
        room_id: str,
        user_id: str,
        attachments: list,
    ) -> None:
        """Persist each attachment as a plain dict in the user's staging area for the room."""
        for attachment in attachments:
            await add_staged_attachment(
                self.runtime.store,
                room_id,
                user_id,
                {
                    "type": attachment.type,
                    "url": attachment.url,
                    "filename": attachment.filename,
                    "mime_type": attachment.mime_type,
                    "workspace_path": attachment.workspace_path,
                },
            )

    async def _format_staged_attachments(
        self,
        room_id: str,
        user_id: str,
        *,
        include_hint: bool = False,
    ) -> str:
        """Render the staged attachments as a numbered, user-facing list.

        Args:
            room_id: Room whose staging area is read.
            user_id: User whose staging area is read.
            include_hint: Append usage hints about sending and the
                ``!list`` / ``!remove`` commands.

        Returns:
            The formatted text, or a "nothing staged" message.
        """
        attachments = await get_staged_attachments(self.runtime.store, room_id, user_id)
        if not attachments:
            return "Нет сохраненных вложений."
        lines = ["Вложения в очереди:"]
        lines.extend(
            f"{index}. {attachment.get('filename') or 'attachment'}"
            for index, attachment in enumerate(attachments, start=1)
        )
        if include_hint:
            lines.extend(
                [
                    "",
                    "Следующее сообщение отправит файлы агенту.",
                    "Команды: !list, !remove <n>, !remove all",
                ]
            )
        return "\n".join(lines)

    async def _handle_staged_attachment_command(
        self,
        room_id: str,
        user_id: str,
        incoming: IncomingCommand,
    ) -> list[OutgoingEvent]:
        """Execute the list/remove commands for staged attachments.

        ``matrix_list_attachments`` returns the current list. Any other
        command is treated as a removal whose first argument is either
        ``all`` or a 1-based index.
        """
        if incoming.command == "matrix_list_attachments":
            return [
                OutgoingMessage(
                    chat_id=incoming.chat_id,
                    text=await self._format_staged_attachments(room_id, user_id),
                )
            ]
        arg = incoming.args[0] if incoming.args else ""
        if arg == "all":
            await clear_staged_attachments(self.runtime.store, room_id, user_id)
            return [OutgoingMessage(chat_id=incoming.chat_id, text="Все вложения удалены.")]
        try:
            index = int(arg) - 1  # users count from 1
        except ValueError:
            return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")]
        removed = await remove_staged_attachment_at(self.runtime.store, room_id, user_id, index)
        if removed is None:
            return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")]
        return [
            OutgoingMessage(
                chat_id=incoming.chat_id,
                text=await self._format_staged_attachments(room_id, user_id),
            )
        ]

    async def _merge_staged_attachments(
        self,
        room_id: str,
        user_id: str,
        incoming: IncomingMessage,
    ) -> tuple[IncomingMessage, bool]:
        """Attach the user's staged files to an outgoing text message.

        Returns:
            ``(message, merged)``. When nothing is staged the original message
            is returned with ``False``. Otherwise a new message is returned
            whose attachments are the staged files followed by the message's
            own attachments, with ``True`` so the caller can clear the
            staging area after a successful dispatch.
        """
        staged = await get_staged_attachments(self.runtime.store, room_id, user_id)
        if not staged:
            return incoming, False
        staged_attachments = [
            Attachment(
                type=item.get("type", "document"),
                url=item.get("url"),
                filename=item.get("filename"),
                mime_type=item.get("mime_type"),
                workspace_path=item.get("workspace_path"),
            )
            for item in staged
        ]
        # Keep attachments that arrived with the message itself; replacing
        # them with the staged list would silently drop downloaded files.
        own_attachments = list(incoming.attachments or [])
        return (
            IncomingMessage(
                user_id=incoming.user_id,
                platform=incoming.platform,
                chat_id=incoming.chat_id,
                text=incoming.text,
                attachments=[*staged_attachments, *own_attachments],
                reply_to=incoming.reply_to,
            ),
            True,
        )

    async def _materialize_incoming_attachments(
        self,
        room_id: str,
        matrix_user_id: str,
        incoming: IncomingMessage,
    ) -> IncomingMessage:
        """Download every attachment of the message into the workspace.

        The workspace root comes from ``SURFACES_WORKSPACE_DIR`` (default
        ``/workspace``). Returns a copy of the message whose attachments are
        the results of ``download_matrix_attachment``.
        """
        workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace"))
        materialized = [
            await download_matrix_attachment(
                client=self.client,
                workspace_root=workspace_root,
                matrix_user_id=matrix_user_id,
                room_id=room_id,
                attachment=attachment,
            )
            for attachment in incoming.attachments
        ]
        return IncomingMessage(
            user_id=incoming.user_id,
            platform=incoming.platform,
            chat_id=incoming.chat_id,
            text=incoming.text,
            attachments=materialized,
            reply_to=incoming.reply_to,
        )

    async def _bootstrap_unregistered_room(
        self,
        room: MatrixRoom,
        sender: str,
    ) -> list[OutgoingEvent] | None:
        """Provision a workspace chat for a sender writing in an unknown room.

        On success the current room is marked as a redirect entry room, a
        welcome message is posted to the new chat room, and a notice pointing
        to the new room is returned.

        Returns:
            Events to send to the current room, or ``None`` when the client
            lacks ``room_create`` / ``room_put_state`` and no bootstrap is
            attempted.
        """
        if not hasattr(self.client, "room_create") or not hasattr(self.client, "room_put_state"):
            return None
        display_name = getattr(room, "display_name", None) or sender
        try:
            created = await provision_workspace_chat(
                self.client,
                sender,
                display_name,
                self.runtime.platform,
                self.runtime.store,
                self.runtime.auth_mgr,
                self.runtime.chat_mgr,
            )
        except Exception as exc:
            # Deliberately broad: any provisioning failure is logged and
            # reported to the user instead of propagating out of the callback.
            logger.warning(
                "matrix_unregistered_room_bootstrap_failed",
                room_id=room.room_id,
                sender=sender,
                error=str(exc),
            )
            return [
                OutgoingMessage(
                    chat_id=room.room_id,
                    text="Не удалось подготовить рабочий чат. Попробуйте ещё раз позже.",
                )
            ]
        welcome = (
            f"Привет, {created['user'].display_name or sender}! Пиши — я здесь.\n\n"
            "Команды: !new · !chats · !rename · !archive · !context · !save · !load · !help"
        )
        # Remember that this room only redirects to the newly created chat.
        await set_room_meta(
            self.runtime.store,
            room.room_id,
            {
                "matrix_user_id": sender,
                "redirect_room_id": created["chat_room_id"],
                "redirect_chat_id": created["chat_id"],
            },
        )
        await self.client.room_send(
            created["chat_room_id"],
            "m.room.message",
            {"msgtype": "m.text", "body": welcome},
        )
        return [
            OutgoingMessage(
                chat_id=room.room_id,
                text=(
                    f"Создал рабочий чат {created['room_name']} ({created['chat_id']}) "
                    "и добавил его в пространство Lambda. "
                    "Открой приглашённую комнату для продолжения."
                ),
            )
        ]

    async def _handle_load_selection(
        self,
        user_id: str,
        room_id: str,
        text: str,
        pending: dict,
    ) -> list[OutgoingEvent]:
        """Resolve the user's answer to a pending ``!load`` prompt.

        Args:
            user_id: Matrix user who answered.
            room_id: Room the prompt was shown in.
            text: The answer: ``0`` / ``!cancel`` to abort, or a 1-based index
                into the pending saves.
            pending: Pending-load record; its ``saves`` list holds dicts with
                a ``name`` key.

        Returns:
            The events to send back to the room.
        """
        saves = pending.get("saves", [])
        if text in {"0", "!cancel"}:
            await clear_load_pending(self.runtime.store, user_id, room_id)
            return [OutgoingMessage(chat_id=room_id, text="Отменено.")]
        try:
            index = int(text) - 1
        except ValueError:
            # str.isdigit() accepts characters such as "²" that int() rejects;
            # treat those as an out-of-range choice instead of crashing.
            index = -1
        if index < 0 or index >= len(saves):
            return [
                OutgoingMessage(
                    chat_id=room_id,
                    text=f"Неверный номер. Введи от 1 до {len(saves)} или 0 для отмены.",
                )
            ]
        name = saves[index]["name"]
        await clear_load_pending(self.runtime.store, user_id, room_id)
        prototype_state = getattr(self.runtime.platform, "_prototype_state", None)
        if prototype_state is not None:
            room_meta = await get_room_meta(self.runtime.store, room_id)
            context_keys = []
            if room_meta is not None:
                platform_chat_id = room_meta.get("platform_chat_id")
                if platform_chat_id:
                    context_keys.append(platform_chat_id)
                chat_id = room_meta.get("chat_id")
                if chat_id:
                    context_keys.append(chat_id)
            if not context_keys:
                context_keys.append(room_id)
            # dict.fromkeys removes duplicates while keeping insertion order.
            for context_key in dict.fromkeys(context_keys):
                await prototype_state.set_current_session(context_key, name)
        try:
            await self.runtime.platform.send_message(
                user_id,
                room_id,
                LOAD_PROMPT.format(name=name),
            )
        except Exception as exc:
            # Best effort: report the failure to the user rather than raising.
            logger.warning("load_agent_call_failed", error=str(exc))
            return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")]
        return [
            OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")
        ]

    async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None:
        """Handle membership events; invites from other users go to ``handle_invite``."""
        if getattr(event, "sender", None) == self.client.user_id:
            return
        membership = getattr(event, "membership", None)
        if membership == "invite":
            await handle_invite(
                self.client,
                room,
                event,
                self.runtime.platform,
                self.runtime.store,
                self.runtime.auth_mgr,
                self.runtime.chat_mgr,
            )

    async def _send_all(self, room_id: str, outgoing: list[OutgoingEvent]) -> None:
        """Send the outgoing events to the room one by one, in order."""
        for event in outgoing:
            await send_outgoing(self.client, room_id, event, store=self.runtime.store)
async def prepare_live_sync(client: AsyncClient) -> str | None:
    """Run one immediate full-state sync and return its ``next_batch`` token.

    Returns ``None`` when the sync result is not a ``SyncResponse``.
    """
    initial = await client.sync(timeout=0, full_state=True)
    return initial.next_batch if isinstance(initial, SyncResponse) else None
async def send_outgoing(
    client: AsyncClient,
    room_id: str,
    event: OutgoingEvent,
    store: StateStore | None = None,
) -> None:
    """Deliver one outgoing core event to a Matrix room.

    Handling per event type:
      * ``OutgoingTyping`` sets the typing indicator.
      * ``OutgoingNotification`` sends a text message prefixed with the
        upper-cased level.
      * ``OutgoingMessage`` sends its text (if any), then uploads and sends
        each attachment that has a ``workspace_path``.
      * ``OutgoingUI`` sends the text plus button labels and, when a store is
        given, records the first button as the pending confirmation for the
        room's ``matrix_user_id``.

    Other event types are ignored.

    Raises:
        RuntimeError: If an attachment upload yields no content URI.
    """

    async def post(content: dict) -> None:
        # Every message in this function is an "m.room.message" to room_id.
        await client.room_send(room_id, "m.room.message", content)

    if isinstance(event, OutgoingTyping):
        await client.room_typing(room_id, event.is_typing, timeout=25000)

    elif isinstance(event, OutgoingNotification):
        notice = f"[{event.level.upper()}] {event.text}"
        await post({"msgtype": "m.text", "body": notice})

    elif isinstance(event, OutgoingMessage):
        if event.text:
            await post({"msgtype": "m.text", "body": event.text})
        if event.attachments:
            root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace"))
            for item in event.attachments:
                # Attachments without a workspace file cannot be uploaded.
                if not item.workspace_path:
                    continue
                local_path = resolve_workspace_attachment_path(root, item.workspace_path)
                with local_path.open("rb") as stream:
                    uploaded, _ = await client.upload(
                        stream,
                        content_type=item.mime_type or "application/octet-stream",
                        filename=item.filename or local_path.name,
                        filesize=local_path.stat().st_size,
                    )
                mxc_uri = getattr(uploaded, "content_uri", None)
                if not mxc_uri:
                    raise RuntimeError(f"Matrix upload failed for {local_path}")
                await post(
                    {
                        "msgtype": matrix_msgtype_for_attachment(item),
                        "body": item.filename or local_path.name,
                        "url": mxc_uri,
                    }
                )

    elif isinstance(event, OutgoingUI):
        text_lines = [event.text]
        if event.buttons:
            text_lines.append("")
            text_lines.extend(f" {button.label}" for button in event.buttons)
            text_lines.append("")
            text_lines.append("Ответьте !yes для подтверждения или !no для отмены.")
        await post({"msgtype": "m.text", "body": "\n".join(text_lines)})
        if not event.buttons or store is None:
            return
        # Only the first button is recorded as the action to confirm.
        first_button = event.buttons[0]
        action_id = first_button.action
        payload = first_button.payload
        meta = await get_room_meta(store, room_id)
        owner_id = meta.get("matrix_user_id") if meta else None
        if owner_id:
            await set_pending_confirm(
                store,
                owner_id,
                room_id,
                {
                    "action_id": action_id,
                    "description": event.text,
                    "payload": payload,
                },
            )
async def main() -> None:
    """Configure the Matrix client from the environment and run the bot.

    Environment variables:
        MATRIX_HOMESERVER, MATRIX_USER_ID: required.
        MATRIX_ACCESS_TOKEN or MATRIX_PASSWORD: one is required; the token
            takes precedence.
        MATRIX_DEVICE_ID: optional, defaults to an empty string.
        MATRIX_DB_PATH: SQLite state path, defaults to ``lambda_matrix.db``.
        MATRIX_STORE_PATH: nio store path, defaults to ``matrix_store``.

    Raises:
        RuntimeError: If required settings are missing or the password login
            does not leave the client logged in.
    """
    homeserver = os.environ.get("MATRIX_HOMESERVER")
    user_id = os.environ.get("MATRIX_USER_ID")
    device_id = os.environ.get("MATRIX_DEVICE_ID", "")
    password = os.environ.get("MATRIX_PASSWORD")
    token = os.environ.get("MATRIX_ACCESS_TOKEN")
    db_path = os.environ.get("MATRIX_DB_PATH", "lambda_matrix.db")
    store_path = os.environ.get("MATRIX_STORE_PATH", "matrix_store")
    if not homeserver or not user_id:
        raise RuntimeError("MATRIX_HOMESERVER and MATRIX_USER_ID are required")
    # Fail fast instead of syncing with an unauthenticated client.
    if not token and not password:
        raise RuntimeError("MATRIX_ACCESS_TOKEN or MATRIX_PASSWORD is required")
    client_config = AsyncClientConfig(
        request_timeout=120,
        max_timeouts=12,
        max_limit_exceeded=20,
        backoff_factor=0.5,
        max_timeout_retry_wait_time=15,
    )
    client = AsyncClient(
        homeserver,
        user=user_id,
        device_id=device_id,
        store_path=store_path,
        config=client_config,
    )
    runtime = None
    # Everything after client creation runs under try/finally so the client
    # (and the platform, once built) is closed even if startup fails.
    try:
        runtime = build_runtime(store=SQLiteStore(db_path), client=client)
        if token:
            client.access_token = token
        else:
            login_response = await client.login(password=password, device_name="surfaces-bot")
            # nio reports login failures through the response object rather
            # than by raising, so verify that the client is logged in.
            if not client.logged_in:
                raise RuntimeError(f"Matrix login failed: {login_response}")
        # Initial sync happens before callbacks are registered; the returned
        # token is where the live sync loop starts from.
        since_token = await prepare_live_sync(client)
        bot = MatrixBot(client, runtime)
        client.add_event_callback(
            bot.on_room_message,
            (
                RoomMessageText,
                RoomMessageFile,
                RoomMessageImage,
                RoomMessageVideo,
                RoomMessageAudio,
            ),
        )
        client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent))
        logger.info(
            "Matrix bot starting",
            homeserver=homeserver,
            user_id=user_id,
            store_path=store_path,
            request_timeout=client_config.request_timeout,
        )
        await client.sync_forever(timeout=30000, since=since_token)
    finally:
        try:
            close = getattr(runtime.platform, "close", None) if runtime is not None else None
            if callable(close):
                await close()
        finally:
            # Close the Matrix client even if closing the platform fails.
            await client.close()
# Script entry point: run the bot when the module is executed directly.
if __name__ == "__main__":
    asyncio.run(main())