From eed1533cdc368a096e9524978fe0013b76924320 Mon Sep 17 00:00:00 2001 From: Vladislav Yashnov Date: Wed, 6 May 2026 00:24:47 +0300 Subject: [PATCH 1/3] max first steps --- adapter/max/__init__.py | 1 + adapter/max/agent_registry.py | 50 ++++++ adapter/max/bot.py | 247 ++++++++++++++++++++++++++++ adapter/max/converter.py | 88 ++++++++++ adapter/max/files.py | 51 ++++++ adapter/max/handlers/__init__.py | 1 + adapter/max/handlers/attachments.py | 29 ++++ adapter/max/handlers/chat.py | 48 ++++++ adapter/max/handlers/help.py | 26 +++ adapter/max/store.py | 48 ++++++ 10 files changed, 589 insertions(+) create mode 100644 adapter/max/__init__.py create mode 100644 adapter/max/agent_registry.py create mode 100644 adapter/max/bot.py create mode 100644 adapter/max/converter.py create mode 100644 adapter/max/files.py create mode 100644 adapter/max/handlers/__init__.py create mode 100644 adapter/max/handlers/attachments.py create mode 100644 adapter/max/handlers/chat.py create mode 100644 adapter/max/handlers/help.py create mode 100644 adapter/max/store.py diff --git a/adapter/max/__init__.py b/adapter/max/__init__.py new file mode 100644 index 0000000..c12c8ba --- /dev/null +++ b/adapter/max/__init__.py @@ -0,0 +1 @@ +"""MAX surface adapter.""" \ No newline at end of file diff --git a/adapter/max/agent_registry.py b/adapter/max/agent_registry.py new file mode 100644 index 0000000..a3c2b67 --- /dev/null +++ b/adapter/max/agent_registry.py @@ -0,0 +1,50 @@ +"""Agent registry for MAX surface.""" +import os +import yaml +from typing import List, Optional +from dataclasses import dataclass, field + + +@dataclass +class AgentConfig: + id: str + label: str + base_url: str + workspace_path: str + + +@dataclass +class AgentRegistry: + agents: List[AgentConfig] = field(default_factory=list) + + def get_agent_for_user(self, user_id: str) -> AgentConfig: + return self.agents[0] + + def get_agent_by_id(self, agent_id: str) -> Optional[AgentConfig]: + for agent in self.agents: + if agent.id == agent_id: + return agent + return None + + +def load_agent_registry(path: str) -> AgentRegistry: + with open(path, "r") as f: + data = yaml.safe_load(f) + + registry = AgentRegistry() + for a in data.get("agents", []): + registry.agents.append(AgentConfig( + id=a["id"], + label=a.get("label", ""), + base_url=a["base_url"], + workspace_path=a["workspace_path"], + )) + return registry + + +def load_from_env() -> AgentRegistry: + path = os.environ.get( + "MAX_AGENT_REGISTRY_PATH", + "/app/config/max-agents.yaml", + ) + return load_agent_registry(path) \ No newline at end of file diff --git a/adapter/max/bot.py b/adapter/max/bot.py new file mode 100644 index 0000000..af857f7 --- /dev/null +++ b/adapter/max/bot.py @@ -0,0 +1,247 @@ +"""MAX surface bot runtime.""" +import os +import asyncio +import aiohttp +from typing import Optional + +from adapter.max.agent_registry import load_from_env, AgentRegistry, AgentConfig +from adapter.max.store import ChatStore +from adapter.max.files import FileHandler +from adapter.max.converter import ( + max_message_to_incoming, + max_attachment_to_internal, +) +from adapter.max.handlers.chat import ChatHandler +from adapter.max.handlers.attachments import AttachmentHandler +from adapter.max.handlers.help import get_help +from core.protocol import IncomingMessage, IncomingCommand, IncomingCallback + + +class MaxSurface: + def __init__(self): + self.token = os.environ["MAX_BOT_TOKEN"] + self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") + self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") + + self.registry: AgentRegistry = load_from_env() + self.store = ChatStore() + self.files = FileHandler(self.workspace_dir) + self.chat_handler = ChatHandler(self.store) + self.attach_handler = AttachmentHandler(self.store) + self.session: Optional[aiohttp.ClientSession] = None + + async def start(self): + self.session = aiohttp.ClientSession( + headers={"Authorization": f"Bearer {self.token}"} + ) + print("[MAX Surface] Starting long poll...") + offset = 0 + + while True: + try: + updates = await self._get_updates(offset) + for update in updates: + offset = update["update_id"] + 1 + await self._process_update(update) + except Exception as e: + print(f"[MAX Surface] Error: {e}") + await asyncio.sleep(5) + + async def _get_updates(self, offset: int) -> list: + async with self.session.get( + f"{self.api_url}/updates", + params={"offset": offset, "timeout": 30}, + ) as resp: + data = await resp.json() + return data.get("result", []) + + async def _process_update(self, update: dict) -> None: + if "message" in update: + await self._handle_message(update["message"]) + elif "callback_query" in update: + await self._handle_callback(update["callback_query"]) + + async def _handle_message(self, message: dict) -> None: + text = message.get("text", "") or message.get("caption", "") + user_id = str(message["from"]["id"]) + chat_id = str(message["chat"]["id"]) + message_id = str(message["message_id"]) + + room = self.store.get_room_by_max_chat_id(chat_id) + if room is None: + agent = self.registry.get_agent_for_user(user_id) + platform_chat_id = self.chat_handler.handle_new( + max_chat_id=chat_id, + user_id=user_id, + agent_id=agent.id, + ) + room = self.store.get_room_by_max_chat_id(chat_id) + else: + agent = self.registry.get_agent_by_id(room.agent_id) + + attachments = [] + if "attachment" in message: + att = message["attachment"] + internal_att = max_attachment_to_internal( + filename=att["filename"], + mime_type=att.get("mime_type", "application/octet-stream"), + download_url=att["download_url"], + ) + attachments.append(internal_att) + + workspace_path = await self.files.download_attachment( + download_url=att["download_url"], + filename=att["filename"], + agent_workspace=agent.workspace_path, + headers={"Authorization": f"Bearer {self.token}"}, + ) + self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) + + incoming = max_message_to_incoming( + text=text, + user_id=user_id, + chat_id=room.platform_chat_id, + attachments=attachments, + ) + + if isinstance(incoming, IncomingCommand): + response_text = await self._handle_surface_command( + incoming, max_chat_id=chat_id, user_id=user_id, agent=agent + ) + if response_text: + await self._send_message(chat_id, response_text) + return + + if isinstance(incoming, IncomingCallback): + await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + ) + return + + if isinstance(incoming, IncomingMessage) and (incoming.text or attachments): + queued = self.store.pop_attachments(chat_id) + await self._send_typing(chat_id) + agent_response = await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + attachments=queued, + ) + await self._send_message(chat_id, agent_response) + + async def _handle_callback(self, callback: dict) -> None: + user_id = str(callback["from"]["id"]) + chat_id = str(callback["message"]["chat"]["id"]) + message_id = str(callback["message"]["message_id"]) + data = callback.get("data", "") + + room = self.store.get_room_by_max_chat_id(chat_id) + if room is None: + return + + incoming = max_message_to_incoming( + text="", + user_id=user_id, + chat_id=room.platform_chat_id, + callback_data=data, + message_id=message_id, + ) + + agent = self.registry.get_agent_by_id(room.agent_id) + await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + ) + + async def _handle_surface_command( + self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent: AgentConfig + ) -> Optional[str]: + command = cmd.command + args = cmd.args + + if command == "new": + name = " ".join(args) if args else None + self.chat_handler.handle_new( + max_chat_id=max_chat_id, + user_id=user_id, + agent_id=agent.id, + name=name, + ) + return f"New chat created: {name or 'Unnamed'}" + + elif command == "chats": + return self.chat_handler.handle_chats(user_id) + + elif command == "rename": + return self.chat_handler.handle_rename(max_chat_id, args) + + elif command == "archive": + return self.chat_handler.handle_archive(max_chat_id) + + elif command in ("clear", "reset"): + return self.chat_handler.handle_clear(max_chat_id) + + elif command == "list": + return self.attach_handler.handle_list(max_chat_id) + + elif command == "remove": + return self.attach_handler.handle_remove(max_chat_id, args) + + elif command == "help": + return get_help() + + return None + + async def _call_agent( + self, + agent: AgentConfig, + platform_chat_id: str, + message=None, + attachments: list = None, + ) -> str: + payload = { + "chat_id": platform_chat_id, + "agent_id": agent.id, + } + if message: + if isinstance(message, IncomingMessage): + payload["message"] = message.text + elif isinstance(message, IncomingCallback): + payload["action"] = message.action + payload["payload"] = message.payload + elif isinstance(message, IncomingCommand): + payload["command"] = message.command + payload["args"] = message.args + if attachments: + payload["attachments"] = [a[0] for a in attachments] + + url = f"{agent.base_url}/chat/{agent.id}" + async with self.session.post(url, json=payload) as resp: + data = await resp.json() + return data.get("response", "") + + async def _send_message(self, chat_id: str, text: str) -> None: + async with self.session.post( + f"{self.api_url}/sendMessage", + json={"chat_id": chat_id, "text": text}, + ) as resp: + await resp.json() + + async def _send_typing(self, chat_id: str) -> None: + async with self.session.post( + f"{self.api_url}/sendChatAction", + json={"chat_id": chat_id, "action": "typing"}, + ) as resp: + await resp.json() + + +async def main(): + surface = MaxSurface() + await surface.start() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/adapter/max/converter.py b/adapter/max/converter.py new file mode 100644 index 0000000..2bbfd8b --- /dev/null +++ b/adapter/max/converter.py @@ -0,0 +1,88 @@ +"""MAX event to internal protocol converter.""" +from typing import Union, List +from core.protocol import ( + IncomingMessage, + IncomingCommand, + IncomingCallback, + Attachment, +) + + +def _extract_command(text: str) -> Union[IncomingCommand, IncomingCallback, None]: + if not text.startswith("!"): + return None + + parts = text.strip().split(maxsplit=1) + cmd = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + + if cmd == "!yes": + return IncomingCallback( + user_id="", + platform="max", + chat_id="", + action="confirm", + ) + elif cmd == "!no": + return IncomingCallback( + user_id="", + platform="max", + chat_id="", + action="cancel", + ) + else: + return IncomingCommand( + user_id="", + platform="max", + chat_id="", + command=cmd.lstrip("!"), + args=args.split() if args else [], + ) + + +def max_message_to_incoming( + *, + text: str, + user_id: str, + chat_id: str, + attachments: List[Attachment] = None, + callback_data: str = None, + message_id: str = None, +) -> Union[IncomingMessage, IncomingCommand, IncomingCallback]: + if callback_data: + return IncomingCallback( + user_id=user_id, + platform="max", + chat_id=chat_id, + action=callback_data, + payload={"message_id": message_id} if message_id else {}, + ) + + if text: + cmd = _extract_command(text) + if cmd is not None: + cmd.user_id = user_id + cmd.chat_id = chat_id + return cmd + + return IncomingMessage( + user_id=user_id, + platform="max", + chat_id=chat_id, + text=text or "", + attachments=attachments or [], + ) + + +def max_attachment_to_internal( + *, + filename: str, + mime_type: str, + download_url: str, +) -> Attachment: + return Attachment( + type="document", + url=download_url, + filename=filename, + mime_type=mime_type, + ) \ No newline at end of file diff --git a/adapter/max/files.py b/adapter/max/files.py new file mode 100644 index 0000000..58b87fb --- /dev/null +++ b/adapter/max/files.py @@ -0,0 +1,51 @@ +"""File handling for MAX surface.""" +import os +import aiohttp +from pathlib import Path + + +class FileHandler: + def __init__(self, workspace_root: str): + self.workspace_root = workspace_root + + def _make_unique_filename(self, directory: str, filename: str) -> str: + base = Path(filename).stem + ext = Path(filename).suffix + candidate = filename + counter = 1 + while os.path.exists(os.path.join(directory, candidate)): + candidate = f"{base} ({counter}){ext}" + counter += 1 + return candidate + + async def download_attachment( + self, + download_url: str, + filename: str, + agent_workspace: str, + headers: dict = None, + ) -> str: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + os.makedirs(full_dir, exist_ok=True) + + unique_name = self._make_unique_filename(full_dir, filename) + filepath = os.path.join(full_dir, unique_name) + + async with aiohttp.ClientSession() as session: + async with session.get(download_url, headers=headers) as resp: + resp.raise_for_status() + with open(filepath, "wb") as f: + f.write(await resp.read()) + + return unique_name + + def read_outgoing_file(self, workspace_path: str, agent_workspace: str) -> bytes: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + filepath = os.path.join(full_dir, workspace_path.lstrip("/")) + with open(filepath, "rb") as f: + return f.read() + + def file_exists(self, workspace_path: str, agent_workspace: str) -> bool: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + filepath = os.path.join(full_dir, workspace_path.lstrip("/")) + return os.path.exists(filepath) \ No newline at end of file diff --git a/adapter/max/handlers/__init__.py b/adapter/max/handlers/__init__.py new file mode 100644 index 0000000..3355924 --- /dev/null +++ b/adapter/max/handlers/__init__.py @@ -0,0 +1 @@ +"""MAX surface handlers.""" \ No newline at end of file diff --git a/adapter/max/handlers/attachments.py b/adapter/max/handlers/attachments.py new file mode 100644 index 0000000..c4d0da5 --- /dev/null +++ b/adapter/max/handlers/attachments.py @@ -0,0 +1,29 @@ +"""Attachment queue handlers for MAX surface.""" +from adapter.max.store import ChatStore + + +class AttachmentHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_list(self, max_chat_id: str) -> str: + attachments = self.store.get_attachments(max_chat_id) + if not attachments: + return "Attachment queue is empty." + lines = [f" {i+1}. {name}" for i, (_, name) in enumerate(attachments)] + return "\n".join(lines) + + def handle_remove(self, max_chat_id: str, index: str) -> str: + attachments = self.store.staged_attachments.get(max_chat_id, []) + if index.lower() == "all": + self.store.staged_attachments[max_chat_id] = [] + return "All attachments removed from queue." + + try: + idx = int(index) - 1 + if 0 <= idx < len(attachments): + removed = attachments.pop(idx) + return f"Removed: {removed[1]}" + return "Invalid index." + except ValueError: + return "Usage: !remove or !remove all" \ No newline at end of file diff --git a/adapter/max/handlers/chat.py b/adapter/max/handlers/chat.py new file mode 100644 index 0000000..2933d5f --- /dev/null +++ b/adapter/max/handlers/chat.py @@ -0,0 +1,48 @@ +"""Chat management handlers for MAX surface.""" +import uuid +from adapter.max.store import ChatStore, RoomMeta + + +class ChatHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_new(self, max_chat_id: str, user_id: str, agent_id: str, name: str = None) -> str: + platform_chat_id = str(uuid.uuid4()) + room = RoomMeta( + platform_chat_id=platform_chat_id, + max_chat_id=max_chat_id, + name=name or "New Chat", + user_id=user_id, + agent_id=agent_id, + ) + self.store.add_room(room) + return platform_chat_id + + def handle_chats(self, user_id: str) -> str: + rooms = self.store.list_rooms_for_user(user_id) + if not rooms: + return "No active chats." + lines = [f" {i+1}. {r.name}" for i, r in enumerate(rooms)] + return "\n".join(lines) + + def handle_rename(self, max_chat_id: str, new_name: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + room.name = new_name + return f"Chat renamed to: {new_name}" + + def handle_archive(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + self.store.remove_room(max_chat_id) + return "Chat archived." + + def handle_clear(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + room.platform_chat_id = str(uuid.uuid4()) + return "Chat context cleared." \ No newline at end of file diff --git a/adapter/max/handlers/help.py b/adapter/max/handlers/help.py new file mode 100644 index 0000000..24181c2 --- /dev/null +++ b/adapter/max/handlers/help.py @@ -0,0 +1,26 @@ +"""Help handler for MAX surface.""" + +HELP_TEXT = """ +Available commands: + +Chat management: + !new [name] — Create a new chat + !chats — List active chats + !rename — Rename current chat + !archive — Archive current chat + !clear / !reset — Reset chat context + +Attachments: + !list — Show attachment queue + !remove — Remove attachment from queue + !remove all — Clear attachment queue + +Actions: + !yes — Confirm agent action + !no — Cancel agent action + !help — Show this help +""" + + +def get_help() -> str: + return HELP_TEXT.strip() \ No newline at end of file diff --git a/adapter/max/store.py b/adapter/max/store.py new file mode 100644 index 0000000..d8e6ce6 --- /dev/null +++ b/adapter/max/store.py @@ -0,0 +1,48 @@ +"""Chat store for MAX surface.""" +from dataclasses import dataclass, field +from typing import Dict, Optional + + +@dataclass +class RoomMeta: + platform_chat_id: str + max_chat_id: str + name: str + user_id: str + agent_id: str + + +@dataclass +class ChatStore: + rooms: Dict[str, RoomMeta] = field(default_factory=dict) + staged_attachments: Dict[str, list] = field(default_factory=dict) + + def get_room_by_max_chat_id(self, max_chat_id: str) -> Optional[RoomMeta]: + return self.rooms.get(max_chat_id) + + def get_room_by_platform_chat_id(self, platform_chat_id: str) -> Optional[RoomMeta]: + for room in self.rooms.values(): + if room.platform_chat_id == platform_chat_id: + return room + return None + + def add_room(self, room: RoomMeta) -> None: + self.rooms[room.max_chat_id] = room + + def remove_room(self, max_chat_id: str) -> None: + self.rooms.pop(max_chat_id, None) + self.staged_attachments.pop(max_chat_id, None) + + def list_rooms_for_user(self, user_id: str) -> list: + return [r for r in self.rooms.values() if r.user_id == user_id] + + def stage_attachment(self, max_chat_id: str, attachment: tuple) -> None: + if max_chat_id not in self.staged_attachments: + self.staged_attachments[max_chat_id] = [] + self.staged_attachments[max_chat_id].append(attachment) + + def pop_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.pop(max_chat_id, []) + + def get_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.get(max_chat_id, []) \ No newline at end of file From 3118b3c99a8d508a7138794c39019d9bfcdc1b97 Mon Sep 17 00:00:00 2001 From: Vladislav Yashnov Date: Wed, 6 May 2026 00:35:17 +0300 Subject: [PATCH 2/3] bot fix --- adapter/max/bot.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/adapter/max/bot.py b/adapter/max/bot.py index af857f7..2e539b1 100644 --- a/adapter/max/bot.py +++ b/adapter/max/bot.py @@ -22,6 +22,7 @@ class MaxSurface: self.token = os.environ["MAX_BOT_TOKEN"] self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") + self.agent_base_url = os.environ.get("AGENT_BASE_URL", "") self.registry: AgentRegistry = load_from_env() self.store = ChatStore() @@ -176,7 +177,8 @@ class MaxSurface: return self.chat_handler.handle_chats(user_id) elif command == "rename": - return self.chat_handler.handle_rename(max_chat_id, args) + new_name = " ".join(args) if args else "" + return self.chat_handler.handle_rename(max_chat_id, new_name) elif command == "archive": return self.chat_handler.handle_archive(max_chat_id) @@ -188,7 +190,8 @@ class MaxSurface: return self.attach_handler.handle_list(max_chat_id) elif command == "remove": - return self.attach_handler.handle_remove(max_chat_id, args) + idx = args[0] if args else "" + return self.attach_handler.handle_remove(max_chat_id, idx) elif command == "help": return get_help() @@ -218,7 +221,9 @@ class MaxSurface: if attachments: payload["attachments"] = [a[0] for a in attachments] - url = f"{agent.base_url}/chat/{agent.id}" + base = self.agent_base_url or agent.base_url.rstrip("/") + url = f"{base}/chat/{agent.id}" + async with self.session.post(url, json=payload) as resp: data = await resp.json() return data.get("response", "") From 7abbaf7e7a43d82c4d11248b6fc4eb9101a49f95 Mon Sep 17 00:00:00 2001 From: Vladislav Yashnov Date: Wed, 6 May 2026 15:16:22 +0300 Subject: [PATCH 3/3] max bot upd, max-agents.yaml --- .env.example | 5 + adapter/max/bot.py | 356 +++++++++++++++++++++++++++++++++-------- config/max-agents.yaml | 5 + docker-compose.max.yml | 24 +++ 4 files changed, 321 insertions(+), 69 deletions(-) create mode 100644 config/max-agents.yaml create mode 100644 docker-compose.max.yml diff --git a/.env.example b/.env.example index cc5f2e0..ebc56e4 100644 --- a/.env.example +++ b/.env.example @@ -30,3 +30,8 @@ SURFACES_WORKSPACE_DIR=/agents # Docker volume names (created automatically on first run) SURFACES_SHARED_VOLUME=surfaces-agents SURFACES_BOT_STATE_VOLUME=surfaces-bot-state + +# MAX Surface +MAX_BOT_TOKEN=real_max_token +MAX_API_URL=https://api.max.ru/v1 +MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml \ No newline at end of file diff --git a/adapter/max/bot.py b/adapter/max/bot.py index 2e539b1..ffb7f6e 100644 --- a/adapter/max/bot.py +++ b/adapter/max/bot.py @@ -1,41 +1,197 @@ """MAX surface bot runtime.""" +from __future__ import annotations + import os import asyncio -import aiohttp -from typing import Optional +import logging +from pathlib import Path -from adapter.max.agent_registry import load_from_env, AgentRegistry, AgentConfig -from adapter.max.store import ChatStore -from adapter.max.files import FileHandler +import aiohttp +import structlog + +from adapter.max.agent_registry import load_from_env, AgentRegistry from adapter.max.converter import ( max_message_to_incoming, max_attachment_to_internal, ) -from adapter.max.handlers.chat import ChatHandler +from adapter.max.files import FileHandler +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler from adapter.max.handlers.attachments import AttachmentHandler from adapter.max.handlers.help import get_help -from core.protocol import IncomingMessage, IncomingCommand, IncomingCallback +from adapter.max.store import ChatStore, RoomMeta +from core.chat import ChatManager +from core.auth import AuthManager +from core.handler import EventDispatcher +from core.protocol import ( + Attachment, + IncomingMessage, + IncomingCommand, + IncomingCallback, + OutgoingEvent, + OutgoingMessage, + OutgoingNotification, + OutgoingTyping, + OutgoingUI, +) +from core.settings import SettingsManager +from core.store import InMemoryStore, StateStore + +from sdk.interface import ( + MessageChunk, + MessageResponse, + PlatformClient, + PlatformError, + User, + UserSettings, +) +from sdk.real import RealPlatformClient + +logger = structlog.get_logger(__name__) + + +# --------------------------------------------------------------------------- +# Routed MAX platform client — копия логики RoutedPlatformClient из Matrix +# --------------------------------------------------------------------------- + +class RoutedMaxPlatformClient(PlatformClient): + """Маршрутизирует запросы к нужному агенту на основе chat_id.""" + + def __init__(self, *, store: ChatStore, delegates: dict[str, PlatformClient]): + if not delegates: + raise ValueError("RoutedMaxPlatformClient requires at least one delegate") + self._store = store + self._delegates = dict(delegates) + self._default_client = next(iter(self._delegates.values())) + + async def get_or_create_user( + self, external_id: str, platform: str, display_name: str | None = None + ) -> User: + return await self._default_client.get_or_create_user( + external_id=external_id, platform=platform, display_name=display_name + ) + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> MessageResponse: + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + return await delegate.send_message(user_id, platform_chat_id, text, attachments) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ): + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): + yield chunk + + async def get_settings(self, user_id: str) -> UserSettings: + return await self._default_client.get_settings(user_id) + + async def update_settings(self, user_id: str, action) -> None: + await self._default_client.update_settings(user_id, action) + + async def close(self) -> None: + for delegate in self._delegates.values(): + close_fn = getattr(delegate, "close", None) + if callable(close_fn): + await close_fn() + + async def _resolve_delegate( + self, user_id: str, local_chat_id: str + ) -> tuple[PlatformClient, str]: + room = self._store.get_room_by_platform_chat_id(local_chat_id) + if room is None: + raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND") + + agent_id = room.agent_id + platform_chat_id = room.platform_chat_id + + if not agent_id or not platform_chat_id: + raise PlatformError( + f"routing incomplete for chat: {local_chat_id}", code="ROUTE_INCOMPLETE" + ) + + delegate = self._delegates.get(str(agent_id)) + if delegate is None: + raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND") + + return delegate, str(platform_chat_id) + + +# --------------------------------------------------------------------------- +# MAX Surface +# --------------------------------------------------------------------------- class MaxSurface: def __init__(self): + # Env self.token = os.environ["MAX_BOT_TOKEN"] self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") self.agent_base_url = os.environ.get("AGENT_BASE_URL", "") + # Registry self.registry: AgentRegistry = load_from_env() + + # MAX-specific store for chat ↔ agent mapping self.store = ChatStore() self.files = FileHandler(self.workspace_dir) - self.chat_handler = ChatHandler(self.store) + self.max_chat_handler = MaxChatHandler(self.store) self.attach_handler = AttachmentHandler(self.store) - self.session: Optional[aiohttp.ClientSession] = None + + # Core store (in-memory, lost on restart — OK for MVP) + self.core_store: StateStore = InMemoryStore() + + # Platform client per agent + delegates: dict[str, PlatformClient] = {} + for agent in self.registry.agents: + base = self.agent_base_url or agent.base_url.rstrip("/") + delegates[agent.id] = RealPlatformClient( + agent_id=agent.id, + agent_base_url=base, + prototype_state=None, + platform="max", + ) + + # Routed platform + self.platform = RoutedMaxPlatformClient( + store=self.store, + delegates=delegates, + ) + + # Core managers + self.chat_mgr = ChatManager(self.platform, self.core_store) + self.auth_mgr = AuthManager(self.platform, self.core_store) + self.settings_mgr = SettingsManager(self.platform, self.core_store) + + # Event dispatcher — это и есть "ядро" + self.dispatcher = EventDispatcher( + platform=self.platform, + chat_mgr=self.chat_mgr, + auth_mgr=self.auth_mgr, + settings_mgr=self.settings_mgr, + ) + + # HTTP session for MAX API + self.session: aiohttp.ClientSession | None = None + + # ------------------------------------------------------------------ + # Long polling + # ------------------------------------------------------------------ async def start(self): self.session = aiohttp.ClientSession( headers={"Authorization": f"Bearer {self.token}"} ) - print("[MAX Surface] Starting long poll...") + logger.info("max_surface_starting", api_url=self.api_url) offset = 0 while True: @@ -45,7 +201,7 @@ class MaxSurface: offset = update["update_id"] + 1 await self._process_update(update) except Exception as e: - print(f"[MAX Surface] Error: {e}") + logger.error("max_poll_error", error=str(e)) await asyncio.sleep(5) async def _get_updates(self, offset: int) -> list: @@ -62,16 +218,20 @@ class MaxSurface: elif "callback_query" in update: await self._handle_callback(update["callback_query"]) + # ------------------------------------------------------------------ + # Message handling + # ------------------------------------------------------------------ + async def _handle_message(self, message: dict) -> None: text = message.get("text", "") or message.get("caption", "") user_id = str(message["from"]["id"]) chat_id = str(message["chat"]["id"]) - message_id = str(message["message_id"]) + # Ensure room exists room = self.store.get_room_by_max_chat_id(chat_id) if room is None: agent = self.registry.get_agent_for_user(user_id) - platform_chat_id = self.chat_handler.handle_new( + platform_chat_id = self.max_chat_handler.handle_new( max_chat_id=chat_id, user_id=user_id, agent_id=agent.id, @@ -80,6 +240,7 @@ class MaxSurface: else: agent = self.registry.get_agent_by_id(room.agent_id) + # Handle attachments attachments = [] if "attachment" in message: att = message["attachment"] @@ -98,6 +259,23 @@ class MaxSurface: ) self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) + # File-only message → stage and return + if attachments and not text: + return + + # Merge staged attachments + queued = self.store.pop_attachments(chat_id) + if queued: + for ws_path, filename in queued: + attachments.append( + Attachment( + type="document", + filename=filename, + workspace_path=ws_path, + ) + ) + + # Convert to incoming event incoming = max_message_to_incoming( text=text, user_id=user_id, @@ -105,6 +283,7 @@ class MaxSurface: attachments=attachments, ) + # Surface-level commands if isinstance(incoming, IncomingCommand): response_text = await self._handle_surface_command( incoming, max_chat_id=chat_id, user_id=user_id, agent=agent @@ -113,24 +292,31 @@ class MaxSurface: await self._send_message(chat_id, response_text) return - if isinstance(incoming, IncomingCallback): - await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, + # Dispatch to core + try: + outgoing_events = await self.dispatcher.dispatch(incoming) + except PlatformError as exc: + logger.warning( + "max_dispatch_platform_error", + user_id=user_id, + chat_id=chat_id, + code=exc.code, + error=str(exc), ) - return + outgoing_events = [ + OutgoingMessage( + chat_id=room.platform_chat_id, + text="Сервис временно недоступен. Попробуйте ещё раз позже.", + ) + ] - if isinstance(incoming, IncomingMessage) and (incoming.text or attachments): - queued = self.store.pop_attachments(chat_id) - await self._send_typing(chat_id) - agent_response = await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, - attachments=queued, - ) - await self._send_message(chat_id, agent_response) + # Send outgoing events back to MAX + for event in outgoing_events: + await self._send_outgoing(chat_id, event, agent.workspace_path) + + # ------------------------------------------------------------------ + # Callbacks + # ------------------------------------------------------------------ async def _handle_callback(self, callback: dict) -> None: user_id = str(callback["from"]["id"]) @@ -150,22 +336,29 @@ class MaxSurface: message_id=message_id, ) - agent = self.registry.get_agent_by_id(room.agent_id) - await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, - ) + try: + outgoing_events = await self.dispatcher.dispatch(incoming) + except PlatformError: + return + + for event in outgoing_events: + agent = self.registry.get_agent_by_id(room.agent_id) + ws = agent.workspace_path if agent else "/agents/0" + await self._send_outgoing(chat_id, event, ws) + + # ------------------------------------------------------------------ + # Surface commands + # ------------------------------------------------------------------ async def _handle_surface_command( - self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent: AgentConfig - ) -> Optional[str]: + self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent + ) -> str | None: command = cmd.command args = cmd.args if command == "new": name = " ".join(args) if args else None - self.chat_handler.handle_new( + self.max_chat_handler.handle_new( max_chat_id=max_chat_id, user_id=user_id, agent_id=agent.id, @@ -174,17 +367,17 @@ class MaxSurface: return f"New chat created: {name or 'Unnamed'}" elif command == "chats": - return self.chat_handler.handle_chats(user_id) + return self.max_chat_handler.handle_chats(user_id) elif command == "rename": new_name = " ".join(args) if args else "" - return self.chat_handler.handle_rename(max_chat_id, new_name) + return self.max_chat_handler.handle_rename(max_chat_id, new_name) elif command == "archive": - return self.chat_handler.handle_archive(max_chat_id) + return self.max_chat_handler.handle_archive(max_chat_id) elif command in ("clear", "reset"): - return self.chat_handler.handle_clear(max_chat_id) + return self.max_chat_handler.handle_clear(max_chat_id) elif command == "list": return self.attach_handler.handle_list(max_chat_id) @@ -198,35 +391,56 @@ class MaxSurface: return None - async def _call_agent( - self, - agent: AgentConfig, - platform_chat_id: str, - message=None, - attachments: list = None, - ) -> str: - payload = { - "chat_id": platform_chat_id, - "agent_id": agent.id, - } - if message: - if isinstance(message, IncomingMessage): - payload["message"] = message.text - elif isinstance(message, IncomingCallback): - payload["action"] = message.action - payload["payload"] = message.payload - elif isinstance(message, IncomingCommand): - payload["command"] = message.command - payload["args"] = message.args - if attachments: - payload["attachments"] = [a[0] for a in attachments] + # ------------------------------------------------------------------ + # Outgoing to MAX + # ------------------------------------------------------------------ - base = self.agent_base_url or agent.base_url.rstrip("/") - url = f"{base}/chat/{agent.id}" + async def _send_outgoing( + self, max_chat_id: str, event: OutgoingEvent, workspace_path: str + ) -> None: + if isinstance(event, OutgoingTyping): + await self._send_typing(max_chat_id) + return - async with self.session.post(url, json=payload) as resp: - data = await resp.json() - return data.get("response", "") + if isinstance(event, OutgoingNotification): + text = f"[{event.level.upper()}] {event.text}" + await self._send_message(max_chat_id, text) + return + + if isinstance(event, OutgoingMessage): + if event.text: + await self._send_message(max_chat_id, event.text) + + # Upload outgoing files + for att in event.attachments: + if not att.workspace_path: + continue + if self.files.file_exists(att.workspace_path, workspace_path): + # Read file and upload to MAX + file_data = self.files.read_outgoing_file( + att.workspace_path, workspace_path + ) + # MAX file upload logic — зависит от API MAX + # Пока просто отправляем имя файла текстом + await self._send_message( + max_chat_id, + f"[Файл: {att.filename or att.workspace_path}]", + ) + return + + if isinstance(event, OutgoingUI): + lines = [event.text] + if event.buttons: + for btn in event.buttons: + lines.append(f" {btn.label}") + lines.append("") + lines.append("Ответьте !yes для подтверждения или !no для отмены.") + await self._send_message(max_chat_id, "\n".join(lines)) + return + + # ------------------------------------------------------------------ + # Low-level MAX API + # ------------------------------------------------------------------ async def _send_message(self, chat_id: str, text: str) -> None: async with self.session.post( @@ -243,6 +457,10 @@ class MaxSurface: await resp.json() +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + async def main(): surface = MaxSurface() await surface.start() diff --git a/config/max-agents.yaml b/config/max-agents.yaml new file mode 100644 index 0000000..8713355 --- /dev/null +++ b/config/max-agents.yaml @@ -0,0 +1,5 @@ +agents: + - id: agent-0 + label: "Agent 0" + base_url: "http://agent-proxy:7000/agent_0/" + workspace_path: "/agents/0" \ No newline at end of file diff --git a/docker-compose.max.yml b/docker-compose.max.yml new file mode 100644 index 0000000..cd75da1 --- /dev/null +++ b/docker-compose.max.yml @@ -0,0 +1,24 @@ +services: + max-bot: + build: + context: . + target: development + environment: + - MAX_BOT_TOKEN=${MAX_BOT_TOKEN} + - MAX_API_URL=${MAX_API_URL:-https://api.max.ru/v1} + - MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml + - AGENT_BASE_URL=http://platform-agent:8000 + - SURFACES_WORKSPACE_DIR=/agents + volumes: + - surfaces-agents:/agents + command: python -m adapter.max.bot + + platform-agent: + image: platform-agent:latest + environment: + - WORKSPACE_DIR=/workspace + volumes: + - surfaces-agents:/workspace + +volumes: + surfaces-agents: \ No newline at end of file