From 7abbaf7e7a43d82c4d11248b6fc4eb9101a49f95 Mon Sep 17 00:00:00 2001 From: Vladislav Yashnov Date: Wed, 6 May 2026 15:16:22 +0300 Subject: [PATCH] max bot upd, max-agents.yaml --- .env.example | 5 + adapter/max/bot.py | 356 +++++++++++++++++++++++++++++++++-------- config/max-agents.yaml | 5 + docker-compose.max.yml | 24 +++ 4 files changed, 321 insertions(+), 69 deletions(-) create mode 100644 config/max-agents.yaml create mode 100644 docker-compose.max.yml diff --git a/.env.example b/.env.example index cc5f2e0..ebc56e4 100644 --- a/.env.example +++ b/.env.example @@ -30,3 +30,8 @@ SURFACES_WORKSPACE_DIR=/agents # Docker volume names (created automatically on first run) SURFACES_SHARED_VOLUME=surfaces-agents SURFACES_BOT_STATE_VOLUME=surfaces-bot-state + +# MAX Surface +MAX_BOT_TOKEN=real_max_token +MAX_API_URL=https://api.max.ru/v1 +MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml \ No newline at end of file diff --git a/adapter/max/bot.py b/adapter/max/bot.py index 2e539b1..ffb7f6e 100644 --- a/adapter/max/bot.py +++ b/adapter/max/bot.py @@ -1,41 +1,197 @@ """MAX surface bot runtime.""" +from __future__ import annotations + import os import asyncio -import aiohttp -from typing import Optional +import logging +from pathlib import Path -from adapter.max.agent_registry import load_from_env, AgentRegistry, AgentConfig -from adapter.max.store import ChatStore -from adapter.max.files import FileHandler +import aiohttp +import structlog + +from adapter.max.agent_registry import load_from_env, AgentRegistry from adapter.max.converter import ( max_message_to_incoming, max_attachment_to_internal, ) -from adapter.max.handlers.chat import ChatHandler +from adapter.max.files import FileHandler +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler from adapter.max.handlers.attachments import AttachmentHandler from adapter.max.handlers.help import get_help -from core.protocol import IncomingMessage, IncomingCommand, IncomingCallback +from adapter.max.store import ChatStore, RoomMeta +from core.chat import ChatManager +from core.auth import AuthManager +from core.handler import EventDispatcher +from core.protocol import ( + Attachment, + IncomingMessage, + IncomingCommand, + IncomingCallback, + OutgoingEvent, + OutgoingMessage, + OutgoingNotification, + OutgoingTyping, + OutgoingUI, +) +from core.settings import SettingsManager +from core.store import InMemoryStore, StateStore + +from sdk.interface import ( + MessageChunk, + MessageResponse, + PlatformClient, + PlatformError, + User, + UserSettings, +) +from sdk.real import RealPlatformClient + +logger = structlog.get_logger(__name__) + + +# --------------------------------------------------------------------------- +# Routed MAX platform client — копия логики RoutedPlatformClient из Matrix +# --------------------------------------------------------------------------- + +class RoutedMaxPlatformClient(PlatformClient): + """Маршрутизирует запросы к нужному агенту на основе chat_id.""" + + def __init__(self, *, store: ChatStore, delegates: dict[str, PlatformClient]): + if not delegates: + raise ValueError("RoutedMaxPlatformClient requires at least one delegate") + self._store = store + self._delegates = dict(delegates) + self._default_client = next(iter(self._delegates.values())) + + async def get_or_create_user( + self, external_id: str, platform: str, display_name: str | None = None + ) -> User: + return await self._default_client.get_or_create_user( + external_id=external_id, platform=platform, display_name=display_name + ) + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> MessageResponse: + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + return await delegate.send_message(user_id, platform_chat_id, text, attachments) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ): + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): + yield chunk + + async def get_settings(self, user_id: str) -> UserSettings: + return await self._default_client.get_settings(user_id) + + async def update_settings(self, user_id: str, action) -> None: + await self._default_client.update_settings(user_id, action) + + async def close(self) -> None: + for delegate in self._delegates.values(): + close_fn = getattr(delegate, "close", None) + if callable(close_fn): + await close_fn() + + async def _resolve_delegate( + self, user_id: str, local_chat_id: str + ) -> tuple[PlatformClient, str]: + room = self._store.get_room_by_platform_chat_id(local_chat_id) + if room is None: + raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND") + + agent_id = room.agent_id + platform_chat_id = room.platform_chat_id + + if not agent_id or not platform_chat_id: + raise PlatformError( + f"routing incomplete for chat: {local_chat_id}", code="ROUTE_INCOMPLETE" + ) + + delegate = self._delegates.get(str(agent_id)) + if delegate is None: + raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND") + + return delegate, str(platform_chat_id) + + +# --------------------------------------------------------------------------- +# MAX Surface +# --------------------------------------------------------------------------- class MaxSurface: def __init__(self): + # Env self.token = os.environ["MAX_BOT_TOKEN"] self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") self.agent_base_url = os.environ.get("AGENT_BASE_URL", "") + # Registry self.registry: AgentRegistry = load_from_env() + + # MAX-specific store for chat ↔ agent mapping self.store = ChatStore() self.files = FileHandler(self.workspace_dir) - self.chat_handler = ChatHandler(self.store) + self.max_chat_handler = MaxChatHandler(self.store) self.attach_handler = AttachmentHandler(self.store) - self.session: Optional[aiohttp.ClientSession] = None + + # Core store (in-memory, lost on restart — OK for MVP) + self.core_store: StateStore = InMemoryStore() + + # Platform client per agent + delegates: dict[str, PlatformClient] = {} + for agent in self.registry.agents: + base = self.agent_base_url or agent.base_url.rstrip("/") + delegates[agent.id] = RealPlatformClient( + agent_id=agent.id, + agent_base_url=base, + prototype_state=None, + platform="max", + ) + + # Routed platform + self.platform = RoutedMaxPlatformClient( + store=self.store, + delegates=delegates, + ) + + # Core managers + self.chat_mgr = ChatManager(self.platform, self.core_store) + self.auth_mgr = AuthManager(self.platform, self.core_store) + self.settings_mgr = SettingsManager(self.platform, self.core_store) + + # Event dispatcher — это и есть "ядро" + self.dispatcher = EventDispatcher( + platform=self.platform, + chat_mgr=self.chat_mgr, + auth_mgr=self.auth_mgr, + settings_mgr=self.settings_mgr, + ) + + # HTTP session for MAX API + self.session: aiohttp.ClientSession | None = None + + # ------------------------------------------------------------------ + # Long polling + # ------------------------------------------------------------------ async def start(self): self.session = aiohttp.ClientSession( headers={"Authorization": f"Bearer {self.token}"} ) - print("[MAX Surface] Starting long poll...") + logger.info("max_surface_starting", api_url=self.api_url) offset = 0 while True: @@ -45,7 +201,7 @@ class MaxSurface: offset = update["update_id"] + 1 await self._process_update(update) except Exception as e: - print(f"[MAX Surface] Error: {e}") + logger.error("max_poll_error", error=str(e)) await asyncio.sleep(5) async def _get_updates(self, offset: int) -> list: @@ -62,16 +218,20 @@ class MaxSurface: elif "callback_query" in update: await self._handle_callback(update["callback_query"]) + # ------------------------------------------------------------------ + # Message handling + # ------------------------------------------------------------------ + async def _handle_message(self, message: dict) -> None: text = message.get("text", "") or message.get("caption", "") user_id = str(message["from"]["id"]) chat_id = str(message["chat"]["id"]) - message_id = str(message["message_id"]) + # Ensure room exists room = self.store.get_room_by_max_chat_id(chat_id) if room is None: agent = self.registry.get_agent_for_user(user_id) - platform_chat_id = self.chat_handler.handle_new( + platform_chat_id = self.max_chat_handler.handle_new( max_chat_id=chat_id, user_id=user_id, agent_id=agent.id, @@ -80,6 +240,7 @@ class MaxSurface: else: agent = self.registry.get_agent_by_id(room.agent_id) + # Handle attachments attachments = [] if "attachment" in message: att = message["attachment"] @@ -98,6 +259,23 @@ class MaxSurface: ) self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) + # File-only message → stage and return + if attachments and not text: + return + + # Merge staged attachments + queued = self.store.pop_attachments(chat_id) + if queued: + for ws_path, filename in queued: + attachments.append( + Attachment( + type="document", + filename=filename, + workspace_path=ws_path, + ) + ) + + # Convert to incoming event incoming = max_message_to_incoming( text=text, user_id=user_id, @@ -105,6 +283,7 @@ class MaxSurface: attachments=attachments, ) + # Surface-level commands if isinstance(incoming, IncomingCommand): response_text = await self._handle_surface_command( incoming, max_chat_id=chat_id, user_id=user_id, agent=agent @@ -113,24 +292,31 @@ class MaxSurface: await self._send_message(chat_id, response_text) return - if isinstance(incoming, IncomingCallback): - await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, + # Dispatch to core + try: + outgoing_events = await self.dispatcher.dispatch(incoming) + except PlatformError as exc: + logger.warning( + "max_dispatch_platform_error", + user_id=user_id, + chat_id=chat_id, + code=exc.code, + error=str(exc), ) - return + outgoing_events = [ + OutgoingMessage( + chat_id=room.platform_chat_id, + text="Сервис временно недоступен. Попробуйте ещё раз позже.", + ) + ] - if isinstance(incoming, IncomingMessage) and (incoming.text or attachments): - queued = self.store.pop_attachments(chat_id) - await self._send_typing(chat_id) - agent_response = await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, - attachments=queued, - ) - await self._send_message(chat_id, agent_response) + # Send outgoing events back to MAX + for event in outgoing_events: + await self._send_outgoing(chat_id, event, agent.workspace_path) + + # ------------------------------------------------------------------ + # Callbacks + # ------------------------------------------------------------------ async def _handle_callback(self, callback: dict) -> None: user_id = str(callback["from"]["id"]) @@ -150,22 +336,29 @@ class MaxSurface: message_id=message_id, ) - agent = self.registry.get_agent_by_id(room.agent_id) - await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, - ) + try: + outgoing_events = await self.dispatcher.dispatch(incoming) + except PlatformError: + return + + for event in outgoing_events: + agent = self.registry.get_agent_by_id(room.agent_id) + ws = agent.workspace_path if agent else "/agents/0" + await self._send_outgoing(chat_id, event, ws) + + # ------------------------------------------------------------------ + # Surface commands + # ------------------------------------------------------------------ async def _handle_surface_command( - self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent: AgentConfig - ) -> Optional[str]: + self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent + ) -> str | None: command = cmd.command args = cmd.args if command == "new": name = " ".join(args) if args else None - self.chat_handler.handle_new( + self.max_chat_handler.handle_new( max_chat_id=max_chat_id, user_id=user_id, agent_id=agent.id, @@ -174,17 +367,17 @@ class MaxSurface: return f"New chat created: {name or 'Unnamed'}" elif command == "chats": - return self.chat_handler.handle_chats(user_id) + return self.max_chat_handler.handle_chats(user_id) elif command == "rename": new_name = " ".join(args) if args else "" - return self.chat_handler.handle_rename(max_chat_id, new_name) + return self.max_chat_handler.handle_rename(max_chat_id, new_name) elif command == "archive": - return self.chat_handler.handle_archive(max_chat_id) + return self.max_chat_handler.handle_archive(max_chat_id) elif command in ("clear", "reset"): - return self.chat_handler.handle_clear(max_chat_id) + return self.max_chat_handler.handle_clear(max_chat_id) elif command == "list": return self.attach_handler.handle_list(max_chat_id) @@ -198,35 +391,56 @@ class MaxSurface: return None - async def _call_agent( - self, - agent: AgentConfig, - platform_chat_id: str, - message=None, - attachments: list = None, - ) -> str: - payload = { - "chat_id": platform_chat_id, - "agent_id": agent.id, - } - if message: - if isinstance(message, IncomingMessage): - payload["message"] = message.text - elif isinstance(message, IncomingCallback): - payload["action"] = message.action - payload["payload"] = message.payload - elif isinstance(message, IncomingCommand): - payload["command"] = message.command - payload["args"] = message.args - if attachments: - payload["attachments"] = [a[0] for a in attachments] + # ------------------------------------------------------------------ + # Outgoing to MAX + # ------------------------------------------------------------------ - base = self.agent_base_url or agent.base_url.rstrip("/") - url = f"{base}/chat/{agent.id}" + async def _send_outgoing( + self, max_chat_id: str, event: OutgoingEvent, workspace_path: str + ) -> None: + if isinstance(event, OutgoingTyping): + await self._send_typing(max_chat_id) + return - async with self.session.post(url, json=payload) as resp: - data = await resp.json() - return data.get("response", "") + if isinstance(event, OutgoingNotification): + text = f"[{event.level.upper()}] {event.text}" + await self._send_message(max_chat_id, text) + return + + if isinstance(event, OutgoingMessage): + if event.text: + await self._send_message(max_chat_id, event.text) + + # Upload outgoing files + for att in event.attachments: + if not att.workspace_path: + continue + if self.files.file_exists(att.workspace_path, workspace_path): + # Read file and upload to MAX + file_data = self.files.read_outgoing_file( + att.workspace_path, workspace_path + ) + # MAX file upload logic — зависит от API MAX + # Пока просто отправляем имя файла текстом + await self._send_message( + max_chat_id, + f"[Файл: {att.filename or att.workspace_path}]", + ) + return + + if isinstance(event, OutgoingUI): + lines = [event.text] + if event.buttons: + for btn in event.buttons: + lines.append(f" {btn.label}") + lines.append("") + lines.append("Ответьте !yes для подтверждения или !no для отмены.") + await self._send_message(max_chat_id, "\n".join(lines)) + return + + # ------------------------------------------------------------------ + # Low-level MAX API + # ------------------------------------------------------------------ async def _send_message(self, chat_id: str, text: str) -> None: async with self.session.post( @@ -243,6 +457,10 @@ class MaxSurface: await resp.json() +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + async def main(): surface = MaxSurface() await surface.start() diff --git a/config/max-agents.yaml b/config/max-agents.yaml new file mode 100644 index 0000000..8713355 --- /dev/null +++ b/config/max-agents.yaml @@ -0,0 +1,5 @@ +agents: + - id: agent-0 + label: "Agent 0" + base_url: "http://agent-proxy:7000/agent_0/" + workspace_path: "/agents/0" \ No newline at end of file diff --git a/docker-compose.max.yml b/docker-compose.max.yml new file mode 100644 index 0000000..cd75da1 --- /dev/null +++ b/docker-compose.max.yml @@ -0,0 +1,24 @@ +services: + max-bot: + build: + context: . + target: development + environment: + - MAX_BOT_TOKEN=${MAX_BOT_TOKEN} + - MAX_API_URL=${MAX_API_URL:-https://api.max.ru/v1} + - MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml + - AGENT_BASE_URL=http://platform-agent:8000 + - SURFACES_WORKSPACE_DIR=/agents + volumes: + - surfaces-agents:/agents + command: python -m adapter.max.bot + + platform-agent: + image: platform-agent:latest + environment: + - WORKSPACE_DIR=/workspace + volumes: + - surfaces-agents:/workspace + +volumes: + surfaces-agents: \ No newline at end of file