surfaces/adapter/matrix/bot.py

971 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import logging
import os
import re
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
import structlog
from dotenv import load_dotenv
from nio import (
AsyncClient,
AsyncClientConfig,
InviteMemberEvent,
MatrixRoom,
RoomMemberEvent,
RoomMessage,
RoomMessageAudio,
RoomMessageFile,
RoomMessageImage,
RoomMessageText,
RoomMessageVideo,
)
from nio.responses import SyncResponse
from adapter.matrix.agent_registry import AgentRegistry, AgentRegistryError, load_agent_registry
from adapter.matrix.converter import from_room_event
from adapter.matrix.files import (
download_matrix_attachment,
matrix_msgtype_for_attachment,
resolve_workspace_attachment_path,
)
from adapter.matrix.handlers import register_matrix_handlers
from adapter.matrix.handlers.auth import (
default_agent_notice,
handle_invite,
provision_workspace_chat,
restore_workspace_access,
)
from adapter.matrix.handlers.context_commands import (
LOAD_PROMPT,
)
from adapter.matrix.reconciliation import reconcile_startup_state
from adapter.matrix.room_router import resolve_chat_id
from adapter.matrix.routed_platform import RoutedPlatformClient
from adapter.matrix.store import (
add_staged_attachment,
clear_load_pending,
clear_staged_attachments,
get_load_pending,
get_room_meta,
get_staged_attachments,
next_platform_chat_id,
remove_staged_attachment_at,
set_pending_confirm,
set_platform_chat_id,
set_room_meta,
)
from core.auth import AuthManager
from core.chat import ChatManager
from core.handler import EventDispatcher
from core.handlers import register_all
from core.protocol import (
Attachment,
IncomingCommand,
IncomingMessage,
OutgoingEvent,
OutgoingMessage,
OutgoingNotification,
OutgoingTyping,
OutgoingUI,
)
from core.settings import SettingsManager
from core.store import InMemoryStore, SQLiteStore, StateStore
from sdk.interface import PlatformClient, PlatformError
from sdk.mock import MockPlatformClient
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
logger = structlog.get_logger(__name__)
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
@dataclass
class MatrixRuntime:
platform: PlatformClient
store: StateStore
chat_mgr: ChatManager
auth_mgr: AuthManager
settings_mgr: SettingsManager
dispatcher: EventDispatcher
agent_routing_enabled: bool = False
registry: AgentRegistry | None = None
def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> EventDispatcher:
chat_mgr = ChatManager(platform, store)
auth_mgr = AuthManager(platform, store)
settings_mgr = SettingsManager(platform, store)
prototype_state = getattr(platform, "_prototype_state", None)
agent_base_url = _agent_base_url_from_env()
registry = _load_agent_registry_from_env()
dispatcher = EventDispatcher(
platform=platform, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr
)
register_all(dispatcher)
register_matrix_handlers(
dispatcher,
store=store,
registry=registry,
prototype_state=prototype_state,
agent_base_url=agent_base_url,
)
return dispatcher
def _normalize_agent_base_url(url: str) -> str:
parsed = urlsplit(url)
path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))
def _ws_debug_enabled() -> bool:
value = os.environ.get("SURFACES_DEBUG_WS", "")
return value.strip().lower() in {"1", "true", "yes", "on"}
def _configure_debug_logging() -> None:
if not _ws_debug_enabled():
return
root_logger = logging.getLogger()
if not root_logger.handlers:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)-8s] %(name)s %(message)s",
)
elif root_logger.level > logging.INFO:
root_logger.setLevel(logging.INFO)
logging.getLogger("lambda_agent_api").setLevel(logging.INFO)
logging.getLogger("lambda_agent_api.agent_api").setLevel(logging.INFO)
def _agent_base_url_from_env() -> str:
if base_url := os.environ.get("AGENT_BASE_URL"):
return base_url
if ws_url := os.environ.get("AGENT_WS_URL"):
return _normalize_agent_base_url(ws_url)
return "http://127.0.0.1:8000"
def _load_agent_registry_from_env(required: bool = False) -> AgentRegistry | None:
registry_path = os.environ.get("MATRIX_AGENT_REGISTRY_PATH", "").strip()
if not registry_path:
if required:
raise RuntimeError(
"MATRIX_AGENT_REGISTRY_PATH is required when MATRIX_PLATFORM_BACKEND=real"
)
return None
try:
registry = load_agent_registry(registry_path)
except (AgentRegistryError, OSError) as exc:
raise RuntimeError(f"failed to load matrix agent registry: {registry_path}") from exc
if _ws_debug_enabled():
logger.warning(
"matrix_agent_registry_loaded",
registry_path=registry_path,
agent_count=len(registry.agents),
)
for agent in registry.agents:
logger.warning(
"matrix_agent_registry_entry",
registry_path=registry_path,
agent_id=agent.agent_id,
label=agent.label,
configured_base_url=agent.base_url,
normalized_base_url=_normalize_agent_base_url(agent.base_url)
if agent.base_url
else "",
workspace_path=agent.workspace_path,
)
return registry
def _build_platform_from_env(*, store: StateStore, chat_mgr: ChatManager) -> PlatformClient:
backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower()
if _ws_debug_enabled():
logger.warning(
"matrix_platform_backend_selected",
backend=backend,
global_agent_base_url=_agent_base_url_from_env(),
registry_path=os.environ.get("MATRIX_AGENT_REGISTRY_PATH", "").strip(),
)
if backend == "real":
prototype_state = PrototypeStateStore()
registry = _load_agent_registry_from_env(required=True)
assert registry is not None
global_base_url = _agent_base_url_from_env()
delegates = {
agent.agent_id: RealPlatformClient(
agent_id=agent.agent_id,
agent_base_url=agent.base_url or global_base_url,
prototype_state=prototype_state,
platform="matrix",
)
for agent in registry.agents
}
return RoutedPlatformClient(
chat_mgr=chat_mgr,
store=store,
delegates=delegates,
)
return MockPlatformClient()
def build_runtime(
platform: PlatformClient | None = None,
store: StateStore | None = None,
client: AsyncClient | None = None,
) -> MatrixRuntime:
store = store or InMemoryStore()
chat_mgr = ChatManager(platform, store)
platform = platform or _build_platform_from_env(store=store, chat_mgr=chat_mgr)
chat_mgr = ChatManager(platform, store)
auth_mgr = AuthManager(platform, store)
settings_mgr = SettingsManager(platform, store)
prototype_state = getattr(platform, "_prototype_state", None)
agent_base_url = _agent_base_url_from_env()
registry = _load_agent_registry_from_env()
dispatcher = EventDispatcher(
platform=platform, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr
)
register_all(dispatcher)
register_matrix_handlers(
dispatcher,
client=client,
store=store,
registry=registry,
prototype_state=prototype_state,
agent_base_url=agent_base_url,
)
return MatrixRuntime(
platform=platform,
store=store,
chat_mgr=chat_mgr,
auth_mgr=auth_mgr,
settings_mgr=settings_mgr,
dispatcher=dispatcher,
agent_routing_enabled=isinstance(platform, RoutedPlatformClient),
registry=registry,
)
class MatrixBot:
def __init__(self, client: AsyncClient, runtime: MatrixRuntime) -> None:
self.client = client
self.runtime = runtime
async def _ensure_platform_chat_id(self, room_id: str, room_meta: dict | None) -> None:
if not room_meta:
return
if room_meta.get("redirect_room_id"):
return
if room_meta.get("platform_chat_id"):
return
await set_platform_chat_id(
self.runtime.store,
room_id,
await next_platform_chat_id(self.runtime.store),
)
async def _refresh_room_agent_assignment(
self, room_id: str, matrix_user_id: str, room_meta: dict | None
) -> tuple[dict | None, bool]:
if not room_meta or room_meta.get("redirect_room_id") or self.runtime.registry is None:
return room_meta, False
assignment = self.runtime.registry.resolve_agent_for_user(matrix_user_id)
updated = dict(room_meta)
should_warn_default = False
if assignment.source == "configured" and (
updated.get("agent_id") != assignment.agent_id
or updated.get("agent_assignment") != "configured"
):
updated["agent_id"] = assignment.agent_id
updated["agent_assignment"] = "configured"
updated.pop("default_agent_notice_sent", None)
elif assignment.source == "default":
if not updated.get("agent_id"):
updated["agent_id"] = assignment.agent_id
if updated.get("agent_id") == assignment.agent_id:
updated["agent_assignment"] = "default"
should_warn_default = not updated.get("default_agent_notice_sent")
updated["default_agent_notice_sent"] = True
if updated != room_meta:
await set_room_meta(self.runtime.store, room_id, updated)
return updated, should_warn_default
return room_meta, should_warn_default
async def on_room_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
if getattr(event, "sender", None) == self.client.user_id:
return
sender = getattr(event, "sender", None)
body = (getattr(event, "body", None) or "").strip()
room_meta = await get_room_meta(self.runtime.store, room.room_id)
if room_meta is not None and not room_meta.get("redirect_room_id"):
await self._ensure_platform_chat_id(room.room_id, room_meta)
room_meta, warn_default_agent = await self._refresh_room_agent_assignment(
room.room_id, sender, room_meta
)
if warn_default_agent and not body.startswith("!"):
await self._send_all(
room.room_id,
[OutgoingMessage(chat_id=room.room_id, text=default_agent_notice())],
)
load_pending = await get_load_pending(self.runtime.store, sender, room.room_id)
if load_pending is not None and (body.isdigit() or body == "!cancel"):
outgoing = await self._handle_load_selection(sender, room.room_id, body, load_pending)
await self._send_all(room.room_id, outgoing)
return
if room_meta is None:
outgoing = await self._bootstrap_unregistered_room(room, sender)
if outgoing:
await self._send_all(room.room_id, outgoing)
return
elif room_meta.get("redirect_room_id"):
display_name = getattr(room, "display_name", None) or sender
if body == "!new":
try:
created = await provision_workspace_chat(
self.client,
sender,
display_name,
self.runtime.platform,
self.runtime.store,
self.runtime.auth_mgr,
self.runtime.chat_mgr,
registry=self.runtime.registry,
)
except Exception as exc:
logger.warning(
"matrix_entry_room_new_chat_failed",
room_id=room.room_id,
sender=sender,
error=str(exc),
)
await self._send_all(
room.room_id,
[
OutgoingMessage(
chat_id=room.room_id,
text="Не удалось создать новый рабочий чат.",
)
],
)
return
welcome = f"Создал новый рабочий чат {created['room_name']}."
if created.get("agent_assignment") == "default":
welcome = f"{welcome}\n\n{default_agent_notice()}"
await self.client.room_send(
created["chat_room_id"],
"m.room.message",
{"msgtype": "m.text", "body": welcome},
)
await set_room_meta(
self.runtime.store,
room.room_id,
{
**room_meta,
"redirect_room_id": created["chat_room_id"],
"redirect_chat_id": created["chat_id"],
},
)
await self._send_all(
room.room_id,
[
OutgoingMessage(
chat_id=room.room_id,
text=(
f"Создал рабочий чат {created['room_name']} "
f"({created['chat_id']}) и отправил приглашение."
),
)
],
)
return
restored = await restore_workspace_access(
self.client,
sender,
display_name,
self.runtime.platform,
self.runtime.store,
self.runtime.auth_mgr,
self.runtime.chat_mgr,
registry=self.runtime.registry,
)
redirect_room_id = room_meta["redirect_room_id"]
redirect_chat_id = room_meta.get("redirect_chat_id", "рабочий чат")
if restored.get("created_new_chat"):
text = (
f"Создал новый рабочий чат {restored['room_name']} "
f"({restored['chat_id']}) и отправил приглашение."
)
else:
text = (
f"Рабочий чат уже создан: {redirect_chat_id}. "
"Я повторно отправил приглашения в пространство Lambda и рабочие чаты. "
"Чтобы создать новый чат, напишите !new здесь."
)
await self._send_all(
room.room_id,
[
OutgoingMessage(
chat_id=room.room_id,
text=text,
)
],
)
logger.info(
"matrix_redirect_entry_room",
room_id=room.room_id,
redirect_room_id=redirect_room_id,
user=sender,
)
return
if not body.startswith("!") and self.runtime.agent_routing_enabled:
pass
local_chat_id = await resolve_chat_id(self.runtime.store, room.room_id, sender)
incoming = from_room_event(event, room_id=room.room_id, chat_id=local_chat_id)
if incoming is None:
return
if isinstance(incoming, IncomingCommand) and incoming.command in {
"matrix_list_attachments",
"matrix_remove_attachment",
}:
outgoing = await self._handle_staged_attachment_command(
room.room_id,
sender,
incoming,
)
await self._send_all(room.room_id, outgoing)
return
if self._is_file_only_event(event, incoming):
materialized = await self._materialize_incoming_attachments(
room.room_id,
sender,
incoming,
)
await self._stage_attachments(room.room_id, sender, materialized.attachments)
return
if isinstance(incoming, IncomingMessage) and incoming.attachments:
incoming = await self._materialize_incoming_attachments(
room.room_id,
sender,
incoming,
)
clear_staged_after_dispatch = False
if isinstance(incoming, IncomingMessage) and incoming.text:
incoming, clear_staged_after_dispatch = await self._merge_staged_attachments(
room.room_id,
sender,
incoming,
)
agent_id = (room_meta or {}).get("agent_id")
if _ws_debug_enabled() and not body.startswith("!"):
logger.warning(
"matrix_incoming_message_route",
room_id=room.room_id,
sender=sender,
local_chat_id=local_chat_id,
agent_id=agent_id,
platform_chat_id=(room_meta or {}).get("platform_chat_id"),
)
workspace_root = self._agent_workspace_root(agent_id)
try:
outgoing = await self.runtime.dispatcher.dispatch(incoming)
except PlatformError as exc:
logger.warning(
"matrix_message_platform_error",
room_id=room.room_id,
sender=getattr(event, "sender", None),
code=exc.code,
error=str(exc),
)
outgoing = [
OutgoingMessage(
chat_id=local_chat_id,
text="Сервис временно недоступен. Попробуйте ещё раз позже.",
)
]
else:
if clear_staged_after_dispatch:
await clear_staged_attachments(self.runtime.store, room.room_id, sender)
await self._send_all(room.room_id, outgoing, workspace_root=workspace_root)
def _is_file_only_event(
self, event: RoomMessage, incoming: IncomingMessage | IncomingCommand
) -> bool:
return (
isinstance(incoming, IncomingMessage)
and bool(incoming.attachments)
and not isinstance(event, RoomMessageText)
)
async def _stage_attachments(
self,
room_id: str,
user_id: str,
attachments: list,
) -> None:
for attachment in attachments:
await add_staged_attachment(
self.runtime.store,
room_id,
user_id,
{
"type": attachment.type,
"url": attachment.url,
"filename": attachment.filename,
"mime_type": attachment.mime_type,
"workspace_path": attachment.workspace_path,
},
)
async def _format_staged_attachments(
self,
room_id: str,
user_id: str,
*,
include_hint: bool = False,
) -> str:
attachments = await get_staged_attachments(self.runtime.store, room_id, user_id)
if not attachments:
return "Нет сохраненных вложений."
lines = ["Вложения в очереди:"]
for index, attachment in enumerate(attachments, start=1):
lines.append(f"{index}. {attachment.get('filename') or 'attachment'}")
if include_hint:
lines.extend(
[
"",
"Следующее сообщение отправит файлы агенту.",
"Команды: !list, !remove <n>, !remove all",
]
)
return "\n".join(lines)
async def _handle_staged_attachment_command(
self,
room_id: str,
user_id: str,
incoming: IncomingCommand,
) -> list[OutgoingEvent]:
if incoming.command == "matrix_list_attachments":
return [
OutgoingMessage(
chat_id=incoming.chat_id,
text=await self._format_staged_attachments(room_id, user_id),
)
]
arg = incoming.args[0] if incoming.args else ""
if arg == "all":
await clear_staged_attachments(self.runtime.store, room_id, user_id)
return [OutgoingMessage(chat_id=incoming.chat_id, text="Все вложения удалены.")]
try:
index = int(arg) - 1
except ValueError:
return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")]
removed = await remove_staged_attachment_at(self.runtime.store, room_id, user_id, index)
if removed is None:
return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")]
return [
OutgoingMessage(
chat_id=incoming.chat_id,
text=await self._format_staged_attachments(room_id, user_id),
)
]
async def _merge_staged_attachments(
self,
room_id: str,
user_id: str,
incoming: IncomingMessage,
) -> tuple[IncomingMessage, bool]:
staged = await get_staged_attachments(self.runtime.store, room_id, user_id)
if not staged:
return incoming, False
attachments = [
Attachment(
type=item.get("type", "document"),
url=item.get("url"),
filename=item.get("filename"),
mime_type=item.get("mime_type"),
workspace_path=item.get("workspace_path"),
)
for item in staged
]
return (
IncomingMessage(
user_id=incoming.user_id,
platform=incoming.platform,
chat_id=incoming.chat_id,
text=incoming.text,
attachments=attachments,
reply_to=incoming.reply_to,
),
True,
)
def _agent_workspace_root(self, agent_id: str | None) -> Path:
default = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace"))
if agent_id is None or self.runtime.registry is None:
return default
try:
agent = self.runtime.registry.get(agent_id)
if agent.workspace_path:
return Path(agent.workspace_path)
except Exception:
pass
return default
async def _materialize_incoming_attachments(
self,
room_id: str,
matrix_user_id: str,
incoming: IncomingMessage,
) -> IncomingMessage:
room_meta = await get_room_meta(self.runtime.store, room_id)
agent_id = (room_meta or {}).get("agent_id")
workspace_root = self._agent_workspace_root(agent_id)
materialized = []
for attachment in incoming.attachments:
materialized.append(
await download_matrix_attachment(
client=self.client,
workspace_root=workspace_root,
matrix_user_id=matrix_user_id,
room_id=room_id,
attachment=attachment,
)
)
return IncomingMessage(
user_id=incoming.user_id,
platform=incoming.platform,
chat_id=incoming.chat_id,
text=incoming.text,
attachments=materialized,
reply_to=incoming.reply_to,
)
async def _bootstrap_unregistered_room(
self,
room: MatrixRoom,
sender: str,
) -> list[OutgoingEvent] | None:
if not hasattr(self.client, "room_create") or not hasattr(self.client, "room_put_state"):
return None
display_name = getattr(room, "display_name", None) or sender
try:
created = await provision_workspace_chat(
self.client,
sender,
display_name,
self.runtime.platform,
self.runtime.store,
self.runtime.auth_mgr,
self.runtime.chat_mgr,
registry=self.runtime.registry,
)
except Exception as exc:
logger.warning(
"matrix_unregistered_room_bootstrap_failed",
room_id=room.room_id,
sender=sender,
error=str(exc),
)
return [
OutgoingMessage(
chat_id=room.room_id,
text="Не удалось подготовить рабочий чат. Попробуйте ещё раз позже.",
)
]
welcome = (
f"Привет, {created['user'].display_name or sender}! Пиши — я здесь.\n\n"
"Команды: !new · !chats · !rename · !archive · !context · !save · !load · !help"
)
if created.get("agent_assignment") == "default":
welcome = f"{welcome}\n\n{default_agent_notice()}"
await set_room_meta(
self.runtime.store,
room.room_id,
{
"matrix_user_id": sender,
"redirect_room_id": created["chat_room_id"],
"redirect_chat_id": created["chat_id"],
},
)
await self.client.room_send(
created["chat_room_id"],
"m.room.message",
{"msgtype": "m.text", "body": welcome},
)
return [
OutgoingMessage(
chat_id=room.room_id,
text=(
f"Создал рабочий чат {created['room_name']} ({created['chat_id']}) "
"и добавил его в пространство Lambda. "
"Открой приглашённую комнату для продолжения."
),
)
]
async def _handle_load_selection(
self,
user_id: str,
room_id: str,
text: str,
pending: dict,
) -> list[OutgoingEvent]:
saves = pending.get("saves", [])
if text in {"0", "!cancel"}:
await clear_load_pending(self.runtime.store, user_id, room_id)
return [OutgoingMessage(chat_id=room_id, text="Отменено.")]
index = int(text) - 1
if index < 0 or index >= len(saves):
return [
OutgoingMessage(
chat_id=room_id,
text=f"Неверный номер. Введи от 1 до {len(saves)} или 0 для отмены.",
)
]
name = saves[index]["name"]
await clear_load_pending(self.runtime.store, user_id, room_id)
prototype_state = getattr(self.runtime.platform, "_prototype_state", None)
if prototype_state is not None:
room_meta = await get_room_meta(self.runtime.store, room_id)
context_keys = []
if room_meta is not None:
platform_chat_id = room_meta.get("platform_chat_id")
if platform_chat_id:
context_keys.append(platform_chat_id)
chat_id = room_meta.get("chat_id")
if chat_id:
context_keys.append(chat_id)
if not context_keys:
context_keys.append(room_id)
for context_key in dict.fromkeys(context_keys):
await prototype_state.set_current_session(context_key, name)
try:
await self.runtime.platform.send_message(
user_id,
room_id,
LOAD_PROMPT.format(name=name),
)
except Exception as exc:
logger.warning("load_agent_call_failed", error=str(exc))
return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")]
return [
OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")
]
async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None:
if getattr(event, "sender", None) == self.client.user_id:
return
membership = getattr(event, "membership", None)
if membership == "invite":
await handle_invite(
self.client,
room,
event,
self.runtime.platform,
self.runtime.store,
self.runtime.auth_mgr,
self.runtime.chat_mgr,
self.runtime.registry,
)
async def _send_all(
self,
room_id: str,
outgoing: list[OutgoingEvent],
workspace_root: Path | None = None,
) -> None:
for event in outgoing:
await send_outgoing(
self.client,
room_id,
event,
store=self.runtime.store,
workspace_root=workspace_root,
)
async def prepare_live_sync(client: AsyncClient) -> str | None:
response = await client.sync(timeout=0, full_state=True)
if isinstance(response, SyncResponse):
return response.next_batch
return None
async def send_outgoing(
client: AsyncClient,
room_id: str,
event: OutgoingEvent,
store: StateStore | None = None,
workspace_root: Path | None = None,
) -> None:
if isinstance(event, OutgoingTyping):
await client.room_typing(room_id, event.is_typing, timeout=25000)
return
if isinstance(event, OutgoingNotification):
body = f"[{event.level.upper()}] {event.text}"
await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body})
return
if isinstance(event, OutgoingMessage):
if event.text:
await client.room_send(
room_id, "m.room.message", {"msgtype": "m.text", "body": event.text}
)
if event.attachments:
workspace_root = workspace_root or Path(
os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")
)
for attachment in event.attachments:
if not attachment.workspace_path:
continue
file_path = resolve_workspace_attachment_path(
workspace_root, attachment.workspace_path
)
with file_path.open("rb") as handle:
upload_response, _ = await client.upload(
handle,
content_type=attachment.mime_type or "application/octet-stream",
filename=attachment.filename or file_path.name,
filesize=file_path.stat().st_size,
)
content_uri = getattr(upload_response, "content_uri", None)
if not content_uri:
raise RuntimeError(f"Matrix upload failed for {file_path}")
await client.room_send(
room_id,
"m.room.message",
{
"msgtype": matrix_msgtype_for_attachment(attachment),
"body": attachment.filename or file_path.name,
"url": content_uri,
},
)
return
if isinstance(event, OutgoingUI):
lines = [event.text]
if event.buttons:
lines.append("")
for button in event.buttons:
lines.append(f" {button.label}")
lines.append("")
lines.append("Ответьте !yes для подтверждения или !no для отмены.")
body = "\n".join(lines)
await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body})
if event.buttons and store is not None:
action_id = event.buttons[0].action
payload = event.buttons[0].payload
room_meta = await get_room_meta(store, room_id)
matrix_user_id = room_meta.get("matrix_user_id") if room_meta else None
if matrix_user_id:
await set_pending_confirm(
store,
matrix_user_id,
room_id,
{
"action_id": action_id,
"description": event.text,
"payload": payload,
},
)
return
async def main() -> None:
_configure_debug_logging()
homeserver = os.environ.get("MATRIX_HOMESERVER")
user_id = os.environ.get("MATRIX_USER_ID")
device_id = os.environ.get("MATRIX_DEVICE_ID", "")
password = os.environ.get("MATRIX_PASSWORD")
token = os.environ.get("MATRIX_ACCESS_TOKEN")
db_path = os.environ.get("MATRIX_DB_PATH", "lambda_matrix.db")
store_path = os.environ.get("MATRIX_STORE_PATH", "matrix_store")
if not homeserver or not user_id:
raise RuntimeError("MATRIX_HOMESERVER and MATRIX_USER_ID are required")
client_config = AsyncClientConfig(
request_timeout=120,
max_timeouts=12,
max_limit_exceeded=20,
backoff_factor=0.5,
max_timeout_retry_wait_time=15,
)
client = AsyncClient(
homeserver,
user=user_id,
device_id=device_id,
store_path=store_path,
config=client_config,
)
runtime = build_runtime(store=SQLiteStore(db_path), client=client)
if token:
client.access_token = token
elif password:
await client.login(password=password, device_name="surfaces-bot")
since_token = await prepare_live_sync(client)
await reconcile_startup_state(client, runtime)
bot = MatrixBot(client, runtime)
client.add_event_callback(
bot.on_room_message,
(
RoomMessageText,
RoomMessageFile,
RoomMessageImage,
RoomMessageVideo,
RoomMessageAudio,
),
)
client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent))
logger.info(
"Matrix bot starting",
homeserver=homeserver,
user_id=user_id,
store_path=store_path,
request_timeout=client_config.request_timeout,
)
if _ws_debug_enabled():
logger.warning(
"matrix_ws_debug_enabled",
homeserver=homeserver,
user_id=user_id,
backend=os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower(),
global_agent_base_url=_agent_base_url_from_env(),
registry_path=os.environ.get("MATRIX_AGENT_REGISTRY_PATH", "").strip(),
)
try:
await client.sync_forever(timeout=30000, since=since_token)
finally:
close = getattr(runtime.platform, "close", None)
if callable(close):
await close()
await client.close()
if __name__ == "__main__":
asyncio.run(main())