from __future__ import annotations import asyncio import os from dataclasses import dataclass from pathlib import Path import structlog from nio import ( AsyncClient, AsyncClientConfig, InviteMemberEvent, MatrixRoom, RoomMemberEvent, RoomMessageText, ) from nio.responses import SyncResponse from dotenv import load_dotenv from adapter.matrix.converter import from_room_event from adapter.matrix.handlers import register_matrix_handlers from adapter.matrix.handlers.context_commands import ( LOAD_PROMPT, SAVE_PROMPT, _call_reset_endpoint, _sanitize_session_name, ) from adapter.matrix.handlers.auth import handle_invite from adapter.matrix.room_router import resolve_chat_id from adapter.matrix.store import ( clear_load_pending, clear_reset_pending, get_load_pending, get_reset_pending, get_room_meta, set_pending_confirm, ) from core.auth import AuthManager from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import ( OutgoingEvent, OutgoingMessage, OutgoingNotification, OutgoingTyping, OutgoingUI, ) from core.settings import SettingsManager from core.store import InMemoryStore, SQLiteStore, StateStore from sdk.agent_api_wrapper import AgentApiWrapper from sdk.interface import PlatformClient, PlatformError from sdk.mock import MockPlatformClient from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient logger = structlog.get_logger(__name__) load_dotenv(Path(__file__).resolve().parents[2] / ".env") @dataclass class MatrixRuntime: platform: PlatformClient store: StateStore chat_mgr: ChatManager auth_mgr: AuthManager settings_mgr: SettingsManager dispatcher: EventDispatcher def build_event_dispatcher(platform: PlatformClient, store: StateStore) -> EventDispatcher: chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) settings_mgr = SettingsManager(platform, store) prototype_state = getattr(platform, "_prototype_state", None) agent_api = getattr(platform, "_agent_api", None) agent_base_url = os.environ.get("AGENT_BASE_URL", "http://127.0.0.1:8000") dispatcher = EventDispatcher( platform=platform, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr ) register_all(dispatcher) register_matrix_handlers( dispatcher, store=store, agent_api=agent_api, prototype_state=prototype_state, agent_base_url=agent_base_url, ) return dispatcher def _build_platform_from_env() -> PlatformClient: backend = os.environ.get("MATRIX_PLATFORM_BACKEND", "mock").strip().lower() if backend == "real": ws_url = os.environ["AGENT_WS_URL"] return RealPlatformClient( agent_api=AgentApiWrapper(agent_id="matrix-bot", url=ws_url), prototype_state=PrototypeStateStore(), platform="matrix", ) return MockPlatformClient() def build_runtime( platform: PlatformClient | None = None, store: StateStore | None = None, client: AsyncClient | None = None, ) -> MatrixRuntime: platform = platform or _build_platform_from_env() store = store or InMemoryStore() chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) settings_mgr = SettingsManager(platform, store) prototype_state = getattr(platform, "_prototype_state", None) agent_api = getattr(platform, "_agent_api", None) agent_base_url = os.environ.get("AGENT_BASE_URL", "http://127.0.0.1:8000") dispatcher = EventDispatcher( platform=platform, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr ) register_all(dispatcher) register_matrix_handlers( dispatcher, client=client, store=store, agent_api=agent_api, prototype_state=prototype_state, agent_base_url=agent_base_url, ) return MatrixRuntime( platform=platform, store=store, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr, dispatcher=dispatcher, ) class MatrixBot: def __init__(self, client: AsyncClient, runtime: MatrixRuntime) -> None: self.client = client self.runtime = runtime async def on_room_message(self, room: MatrixRoom, event: RoomMessageText) -> None: if getattr(event, "sender", None) == self.client.user_id: return sender = getattr(event, "sender", None) body = (getattr(event, "body", None) or "").strip() load_pending = await get_load_pending(self.runtime.store, sender, room.room_id) if load_pending is not None and (body.isdigit() or body == "!cancel"): outgoing = await self._handle_load_selection(sender, room.room_id, body, load_pending) await self._send_all(room.room_id, outgoing) return reset_pending = await get_reset_pending(self.runtime.store, sender, room.room_id) if reset_pending is not None and (body in {"!yes", "!no"} or body.startswith("!save ")): outgoing = await self._handle_reset_selection(sender, room.room_id, body) await self._send_all(room.room_id, outgoing) return chat_id = await resolve_chat_id(self.runtime.store, room.room_id, sender) incoming = from_room_event(event, room_id=room.room_id, chat_id=chat_id) if incoming is None: return try: outgoing = await self.runtime.dispatcher.dispatch(incoming) except PlatformError as exc: logger.warning( "matrix_message_platform_error", room_id=room.room_id, sender=getattr(event, "sender", None), code=exc.code, error=str(exc), ) outgoing = [ OutgoingMessage( chat_id=chat_id, text="Сервис временно недоступен. Попробуйте ещё раз позже." ) ] await self._send_all(room.room_id, outgoing) async def _handle_load_selection( self, user_id: str, room_id: str, text: str, pending: dict, ) -> list[OutgoingEvent]: saves = pending.get("saves", []) if text in {"0", "!cancel"}: await clear_load_pending(self.runtime.store, user_id, room_id) return [OutgoingMessage(chat_id=room_id, text="Отменено.")] index = int(text) - 1 if index < 0 or index >= len(saves): return [ OutgoingMessage( chat_id=room_id, text=f"Неверный номер. Введи от 1 до {len(saves)} или 0 для отмены.", ) ] name = saves[index]["name"] await clear_load_pending(self.runtime.store, user_id, room_id) prototype_state = getattr(self.runtime.platform, "_prototype_state", None) if prototype_state is not None: await prototype_state.set_current_session(user_id, name) try: await self.runtime.platform.send_message( user_id, room_id, LOAD_PROMPT.format(name=name), ) except Exception as exc: logger.warning("load_agent_call_failed", error=str(exc)) return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")] return [OutgoingMessage(chat_id=room_id, text=f"Загрузка: {name}")] async def _handle_reset_selection( self, user_id: str, room_id: str, text: str, ) -> list[OutgoingEvent]: agent_base_url = os.environ.get("AGENT_BASE_URL", "http://127.0.0.1:8000") prototype_state = getattr(self.runtime.platform, "_prototype_state", None) await clear_reset_pending(self.runtime.store, user_id, room_id) if text == "!no": return [OutgoingMessage(chat_id=room_id, text="Отменено.")] if text.startswith("!save "): name = _sanitize_session_name(text[len("!save ") :].strip()) if name is None: return [ OutgoingMessage( chat_id=room_id, text="Имя сохранения может содержать только буквы, цифры, _ и -.", ) ] try: await self.runtime.platform.send_message( user_id, room_id, SAVE_PROMPT.format(name=name), ) if prototype_state is not None: await prototype_state.add_saved_session(user_id, name) except Exception as exc: logger.warning("save_before_reset_failed", error=str(exc)) return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при сохранении: {exc}")] if prototype_state is not None: await prototype_state.clear_current_session(user_id) return await _call_reset_endpoint(agent_base_url, room_id) async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None: if getattr(event, "sender", None) == self.client.user_id: return membership = getattr(event, "membership", None) if membership == "invite": await handle_invite( self.client, room, event, self.runtime.platform, self.runtime.store, self.runtime.auth_mgr, self.runtime.chat_mgr, ) async def _send_all(self, room_id: str, outgoing: list[OutgoingEvent]) -> None: for event in outgoing: await send_outgoing(self.client, room_id, event, store=self.runtime.store) async def prepare_live_sync(client: AsyncClient) -> str | None: response = await client.sync(timeout=0, full_state=True) if isinstance(response, SyncResponse): return response.next_batch return None async def send_outgoing( client: AsyncClient, room_id: str, event: OutgoingEvent, store: StateStore | None = None, ) -> None: if isinstance(event, OutgoingTyping): await client.room_typing(room_id, event.is_typing, timeout=25000) return if isinstance(event, OutgoingNotification): body = f"[{event.level.upper()}] {event.text}" await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body}) return if isinstance(event, OutgoingMessage): await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": event.text}) return if isinstance(event, OutgoingUI): lines = [event.text] if event.buttons: lines.append("") for button in event.buttons: lines.append(f" {button.label}") lines.append("") lines.append("Ответьте !yes для подтверждения или !no для отмены.") body = "\n".join(lines) await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body}) if event.buttons and store is not None: action_id = event.buttons[0].action payload = event.buttons[0].payload room_meta = await get_room_meta(store, room_id) matrix_user_id = room_meta.get("matrix_user_id") if room_meta else None if matrix_user_id: await set_pending_confirm( store, matrix_user_id, room_id, { "action_id": action_id, "description": event.text, "payload": payload, }, ) return async def main() -> None: homeserver = os.environ.get("MATRIX_HOMESERVER") user_id = os.environ.get("MATRIX_USER_ID") device_id = os.environ.get("MATRIX_DEVICE_ID", "") password = os.environ.get("MATRIX_PASSWORD") token = os.environ.get("MATRIX_ACCESS_TOKEN") db_path = os.environ.get("MATRIX_DB_PATH", "lambda_matrix.db") store_path = os.environ.get("MATRIX_STORE_PATH", "matrix_store") if not homeserver or not user_id: raise RuntimeError("MATRIX_HOMESERVER and MATRIX_USER_ID are required") client_config = AsyncClientConfig( request_timeout=120, max_timeouts=12, max_limit_exceeded=20, backoff_factor=0.5, max_timeout_retry_wait_time=15, ) client = AsyncClient( homeserver, user=user_id, device_id=device_id, store_path=store_path, config=client_config, ) runtime = build_runtime(store=SQLiteStore(db_path), client=client) if token: client.access_token = token elif password: await client.login(password=password, device_name="surfaces-bot") since_token = await prepare_live_sync(client) bot = MatrixBot(client, runtime) client.add_event_callback(bot.on_room_message, RoomMessageText) client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent)) logger.info( "Matrix bot starting", homeserver=homeserver, user_id=user_id, store_path=store_path, request_timeout=client_config.request_timeout, ) try: if isinstance(runtime.platform, RealPlatformClient): await runtime.platform.agent_api.connect() await client.sync_forever(timeout=30000, since=since_token) finally: if isinstance(runtime.platform, RealPlatformClient): await runtime.platform.agent_api.close() await client.close() if __name__ == "__main__": asyncio.run(main())