"""MAX surface bot runtime.""" from __future__ import annotations import os import asyncio import logging from pathlib import Path import aiohttp import structlog from adapter.max.agent_registry import load_from_env, AgentRegistry from adapter.max.converter import ( max_message_to_incoming, max_attachment_to_internal, ) from adapter.max.files import FileHandler from adapter.max.handlers.chat import ChatHandler as MaxChatHandler from adapter.max.handlers.attachments import AttachmentHandler from adapter.max.handlers.help import get_help from adapter.max.store import ChatStore, RoomMeta from core.chat import ChatManager from core.auth import AuthManager from core.handler import EventDispatcher from core.protocol import ( Attachment, IncomingMessage, IncomingCommand, IncomingCallback, OutgoingEvent, OutgoingMessage, OutgoingNotification, OutgoingTyping, OutgoingUI, ) from core.settings import SettingsManager from core.store import InMemoryStore, StateStore from sdk.interface import ( MessageChunk, MessageResponse, PlatformClient, PlatformError, User, UserSettings, ) from sdk.real import RealPlatformClient logger = structlog.get_logger(__name__) # --------------------------------------------------------------------------- # Routed MAX platform client — копия логики RoutedPlatformClient из Matrix # --------------------------------------------------------------------------- class RoutedMaxPlatformClient(PlatformClient): """Маршрутизирует запросы к нужному агенту на основе chat_id.""" def __init__(self, *, store: ChatStore, delegates: dict[str, PlatformClient]): if not delegates: raise ValueError("RoutedMaxPlatformClient requires at least one delegate") self._store = store self._delegates = dict(delegates) self._default_client = next(iter(self._delegates.values())) async def get_or_create_user( self, external_id: str, platform: str, display_name: str | None = None ) -> User: return await self._default_client.get_or_create_user( external_id=external_id, platform=platform, display_name=display_name ) async def send_message( self, user_id: str, chat_id: str, text: str, attachments: list[Attachment] | None = None, ) -> MessageResponse: delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) return await delegate.send_message(user_id, platform_chat_id, text, attachments) async def stream_message( self, user_id: str, chat_id: str, text: str, attachments: list[Attachment] | None = None, ): delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): yield chunk async def get_settings(self, user_id: str) -> UserSettings: return await self._default_client.get_settings(user_id) async def update_settings(self, user_id: str, action) -> None: await self._default_client.update_settings(user_id, action) async def close(self) -> None: for delegate in self._delegates.values(): close_fn = getattr(delegate, "close", None) if callable(close_fn): await close_fn() async def _resolve_delegate( self, user_id: str, local_chat_id: str ) -> tuple[PlatformClient, str]: room = self._store.get_room_by_platform_chat_id(local_chat_id) if room is None: raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND") agent_id = room.agent_id platform_chat_id = room.platform_chat_id if not agent_id or not platform_chat_id: raise PlatformError( f"routing incomplete for chat: {local_chat_id}", code="ROUTE_INCOMPLETE" ) delegate = self._delegates.get(str(agent_id)) if delegate is None: raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND") return delegate, str(platform_chat_id) # --------------------------------------------------------------------------- # MAX Surface # --------------------------------------------------------------------------- class MaxSurface: def __init__(self): # Env self.token = os.environ["MAX_BOT_TOKEN"] self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") self.agent_base_url = os.environ.get("AGENT_BASE_URL", "") # Registry self.registry: AgentRegistry = load_from_env() # MAX-specific store for chat ↔ agent mapping self.store = ChatStore() self.files = FileHandler(self.workspace_dir) self.max_chat_handler = MaxChatHandler(self.store) self.attach_handler = AttachmentHandler(self.store) # Core store (in-memory, lost on restart — OK for MVP) self.core_store: StateStore = InMemoryStore() # Platform client per agent delegates: dict[str, PlatformClient] = {} for agent in self.registry.agents: base = self.agent_base_url or agent.base_url.rstrip("/") delegates[agent.id] = RealPlatformClient( agent_id=agent.id, agent_base_url=base, prototype_state=None, platform="max", ) # Routed platform self.platform = RoutedMaxPlatformClient( store=self.store, delegates=delegates, ) # Core managers self.chat_mgr = ChatManager(self.platform, self.core_store) self.auth_mgr = AuthManager(self.platform, self.core_store) self.settings_mgr = SettingsManager(self.platform, self.core_store) # Event dispatcher — это и есть "ядро" self.dispatcher = EventDispatcher( platform=self.platform, chat_mgr=self.chat_mgr, auth_mgr=self.auth_mgr, settings_mgr=self.settings_mgr, ) # HTTP session for MAX API self.session: aiohttp.ClientSession | None = None # ------------------------------------------------------------------ # Long polling # ------------------------------------------------------------------ async def start(self): self.session = aiohttp.ClientSession( headers={"Authorization": f"Bearer {self.token}"} ) logger.info("max_surface_starting", api_url=self.api_url) offset = 0 while True: try: updates = await self._get_updates(offset) for update in updates: offset = update["update_id"] + 1 await self._process_update(update) except Exception as e: logger.error("max_poll_error", error=str(e)) await asyncio.sleep(5) async def _get_updates(self, offset: int) -> list: async with self.session.get( f"{self.api_url}/updates", params={"offset": offset, "timeout": 30}, ) as resp: data = await resp.json() return data.get("result", []) async def _process_update(self, update: dict) -> None: if "message" in update: await self._handle_message(update["message"]) elif "callback_query" in update: await self._handle_callback(update["callback_query"]) # ------------------------------------------------------------------ # Message handling # ------------------------------------------------------------------ async def _handle_message(self, message: dict) -> None: text = message.get("text", "") or message.get("caption", "") user_id = str(message["from"]["id"]) chat_id = str(message["chat"]["id"]) # Ensure room exists room = self.store.get_room_by_max_chat_id(chat_id) if room is None: agent = self.registry.get_agent_for_user(user_id) platform_chat_id = self.max_chat_handler.handle_new( max_chat_id=chat_id, user_id=user_id, agent_id=agent.id, ) room = self.store.get_room_by_max_chat_id(chat_id) else: agent = self.registry.get_agent_by_id(room.agent_id) # Handle attachments attachments = [] if "attachment" in message: att = message["attachment"] internal_att = max_attachment_to_internal( filename=att["filename"], mime_type=att.get("mime_type", "application/octet-stream"), download_url=att["download_url"], ) attachments.append(internal_att) workspace_path = await self.files.download_attachment( download_url=att["download_url"], filename=att["filename"], agent_workspace=agent.workspace_path, headers={"Authorization": f"Bearer {self.token}"}, ) self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) # File-only message → stage and return if attachments and not text: return # Merge staged attachments queued = self.store.pop_attachments(chat_id) if queued: for ws_path, filename in queued: attachments.append( Attachment( type="document", filename=filename, workspace_path=ws_path, ) ) # Convert to incoming event incoming = max_message_to_incoming( text=text, user_id=user_id, chat_id=room.platform_chat_id, attachments=attachments, ) # Surface-level commands if isinstance(incoming, IncomingCommand): response_text = await self._handle_surface_command( incoming, max_chat_id=chat_id, user_id=user_id, agent=agent ) if response_text: await self._send_message(chat_id, response_text) return # Dispatch to core try: outgoing_events = await self.dispatcher.dispatch(incoming) except PlatformError as exc: logger.warning( "max_dispatch_platform_error", user_id=user_id, chat_id=chat_id, code=exc.code, error=str(exc), ) outgoing_events = [ OutgoingMessage( chat_id=room.platform_chat_id, text="Сервис временно недоступен. Попробуйте ещё раз позже.", ) ] # Send outgoing events back to MAX for event in outgoing_events: await self._send_outgoing(chat_id, event, agent.workspace_path) # ------------------------------------------------------------------ # Callbacks # ------------------------------------------------------------------ async def _handle_callback(self, callback: dict) -> None: user_id = str(callback["from"]["id"]) chat_id = str(callback["message"]["chat"]["id"]) message_id = str(callback["message"]["message_id"]) data = callback.get("data", "") room = self.store.get_room_by_max_chat_id(chat_id) if room is None: return incoming = max_message_to_incoming( text="", user_id=user_id, chat_id=room.platform_chat_id, callback_data=data, message_id=message_id, ) try: outgoing_events = await self.dispatcher.dispatch(incoming) except PlatformError: return for event in outgoing_events: agent = self.registry.get_agent_by_id(room.agent_id) ws = agent.workspace_path if agent else "/agents/0" await self._send_outgoing(chat_id, event, ws) # ------------------------------------------------------------------ # Surface commands # ------------------------------------------------------------------ async def _handle_surface_command( self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent ) -> str | None: command = cmd.command args = cmd.args if command == "new": name = " ".join(args) if args else None self.max_chat_handler.handle_new( max_chat_id=max_chat_id, user_id=user_id, agent_id=agent.id, name=name, ) return f"New chat created: {name or 'Unnamed'}" elif command == "chats": return self.max_chat_handler.handle_chats(user_id) elif command == "rename": new_name = " ".join(args) if args else "" return self.max_chat_handler.handle_rename(max_chat_id, new_name) elif command == "archive": return self.max_chat_handler.handle_archive(max_chat_id) elif command in ("clear", "reset"): return self.max_chat_handler.handle_clear(max_chat_id) elif command == "list": return self.attach_handler.handle_list(max_chat_id) elif command == "remove": idx = args[0] if args else "" return self.attach_handler.handle_remove(max_chat_id, idx) elif command == "help": return get_help() return None # ------------------------------------------------------------------ # Outgoing to MAX # ------------------------------------------------------------------ async def _send_outgoing( self, max_chat_id: str, event: OutgoingEvent, workspace_path: str ) -> None: if isinstance(event, OutgoingTyping): await self._send_typing(max_chat_id) return if isinstance(event, OutgoingNotification): text = f"[{event.level.upper()}] {event.text}" await self._send_message(max_chat_id, text) return if isinstance(event, OutgoingMessage): if event.text: await self._send_message(max_chat_id, event.text) # Upload outgoing files for att in event.attachments: if not att.workspace_path: continue if self.files.file_exists(att.workspace_path, workspace_path): # Read file and upload to MAX file_data = self.files.read_outgoing_file( att.workspace_path, workspace_path ) # MAX file upload logic — зависит от API MAX # Пока просто отправляем имя файла текстом await self._send_message( max_chat_id, f"[Файл: {att.filename or att.workspace_path}]", ) return if isinstance(event, OutgoingUI): lines = [event.text] if event.buttons: for btn in event.buttons: lines.append(f" {btn.label}") lines.append("") lines.append("Ответьте !yes для подтверждения или !no для отмены.") await self._send_message(max_chat_id, "\n".join(lines)) return # ------------------------------------------------------------------ # Low-level MAX API # ------------------------------------------------------------------ async def _send_message(self, chat_id: str, text: str) -> None: async with self.session.post( f"{self.api_url}/sendMessage", json={"chat_id": chat_id, "text": text}, ) as resp: await resp.json() async def _send_typing(self, chat_id: str) -> None: async with self.session.post( f"{self.api_url}/sendChatAction", json={"chat_id": chat_id, "action": "typing"}, ) as resp: await resp.json() # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- async def main(): surface = MaxSurface() await surface.start() if __name__ == "__main__": asyncio.run(main())