From eed1533cdc368a096e9524978fe0013b76924320 Mon Sep 17 00:00:00 2001 From: Vladislav Yashnov Date: Wed, 6 May 2026 00:24:47 +0300 Subject: [PATCH 1/5] max first steps --- adapter/max/__init__.py | 1 + adapter/max/agent_registry.py | 50 ++++++ adapter/max/bot.py | 247 ++++++++++++++++++++++++++++ adapter/max/converter.py | 88 ++++++++++ adapter/max/files.py | 51 ++++++ adapter/max/handlers/__init__.py | 1 + adapter/max/handlers/attachments.py | 29 ++++ adapter/max/handlers/chat.py | 48 ++++++ adapter/max/handlers/help.py | 26 +++ adapter/max/store.py | 48 ++++++ 10 files changed, 589 insertions(+) create mode 100644 adapter/max/__init__.py create mode 100644 adapter/max/agent_registry.py create mode 100644 adapter/max/bot.py create mode 100644 adapter/max/converter.py create mode 100644 adapter/max/files.py create mode 100644 adapter/max/handlers/__init__.py create mode 100644 adapter/max/handlers/attachments.py create mode 100644 adapter/max/handlers/chat.py create mode 100644 adapter/max/handlers/help.py create mode 100644 adapter/max/store.py diff --git a/adapter/max/__init__.py b/adapter/max/__init__.py new file mode 100644 index 0000000..c12c8ba --- /dev/null +++ b/adapter/max/__init__.py @@ -0,0 +1 @@ +"""MAX surface adapter.""" \ No newline at end of file diff --git a/adapter/max/agent_registry.py b/adapter/max/agent_registry.py new file mode 100644 index 0000000..a3c2b67 --- /dev/null +++ b/adapter/max/agent_registry.py @@ -0,0 +1,50 @@ +"""Agent registry for MAX surface.""" +import os +import yaml +from typing import List, Optional +from dataclasses import dataclass, field + + +@dataclass +class AgentConfig: + id: str + label: str + base_url: str + workspace_path: str + + +@dataclass +class AgentRegistry: + agents: List[AgentConfig] = field(default_factory=list) + + def get_agent_for_user(self, user_id: str) -> AgentConfig: + return self.agents[0] + + def get_agent_by_id(self, agent_id: str) -> Optional[AgentConfig]: + for agent in self.agents: + if agent.id == agent_id: + return agent + return None + + +def load_agent_registry(path: str) -> AgentRegistry: + with open(path, "r") as f: + data = yaml.safe_load(f) + + registry = AgentRegistry() + for a in data.get("agents", []): + registry.agents.append(AgentConfig( + id=a["id"], + label=a.get("label", ""), + base_url=a["base_url"], + workspace_path=a["workspace_path"], + )) + return registry + + +def load_from_env() -> AgentRegistry: + path = os.environ.get( + "MAX_AGENT_REGISTRY_PATH", + "/app/config/max-agents.yaml", + ) + return load_agent_registry(path) \ No newline at end of file diff --git a/adapter/max/bot.py b/adapter/max/bot.py new file mode 100644 index 0000000..af857f7 --- /dev/null +++ b/adapter/max/bot.py @@ -0,0 +1,247 @@ +"""MAX surface bot runtime.""" +import os +import asyncio +import aiohttp +from typing import Optional + +from adapter.max.agent_registry import load_from_env, AgentRegistry, AgentConfig +from adapter.max.store import ChatStore +from adapter.max.files import FileHandler +from adapter.max.converter import ( + max_message_to_incoming, + max_attachment_to_internal, +) +from adapter.max.handlers.chat import ChatHandler +from adapter.max.handlers.attachments import AttachmentHandler +from adapter.max.handlers.help import get_help +from core.protocol import IncomingMessage, IncomingCommand, IncomingCallback + + +class MaxSurface: + def __init__(self): + self.token = os.environ["MAX_BOT_TOKEN"] + self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") + self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") + + self.registry: AgentRegistry = load_from_env() + self.store = ChatStore() + self.files = FileHandler(self.workspace_dir) + self.chat_handler = ChatHandler(self.store) + self.attach_handler = AttachmentHandler(self.store) + self.session: Optional[aiohttp.ClientSession] = None + + async def start(self): + self.session = aiohttp.ClientSession( + headers={"Authorization": f"Bearer {self.token}"} + ) + print("[MAX Surface] Starting long poll...") + offset = 0 + + while True: + try: + updates = await self._get_updates(offset) + for update in updates: + offset = update["update_id"] + 1 + await self._process_update(update) + except Exception as e: + print(f"[MAX Surface] Error: {e}") + await asyncio.sleep(5) + + async def _get_updates(self, offset: int) -> list: + async with self.session.get( + f"{self.api_url}/updates", + params={"offset": offset, "timeout": 30}, + ) as resp: + data = await resp.json() + return data.get("result", []) + + async def _process_update(self, update: dict) -> None: + if "message" in update: + await self._handle_message(update["message"]) + elif "callback_query" in update: + await self._handle_callback(update["callback_query"]) + + async def _handle_message(self, message: dict) -> None: + text = message.get("text", "") or message.get("caption", "") + user_id = str(message["from"]["id"]) + chat_id = str(message["chat"]["id"]) + message_id = str(message["message_id"]) + + room = self.store.get_room_by_max_chat_id(chat_id) + if room is None: + agent = self.registry.get_agent_for_user(user_id) + platform_chat_id = self.chat_handler.handle_new( + max_chat_id=chat_id, + user_id=user_id, + agent_id=agent.id, + ) + room = self.store.get_room_by_max_chat_id(chat_id) + else: + agent = self.registry.get_agent_by_id(room.agent_id) + + attachments = [] + if "attachment" in message: + att = message["attachment"] + internal_att = max_attachment_to_internal( + filename=att["filename"], + mime_type=att.get("mime_type", "application/octet-stream"), + download_url=att["download_url"], + ) + attachments.append(internal_att) + + workspace_path = await self.files.download_attachment( + download_url=att["download_url"], + filename=att["filename"], + agent_workspace=agent.workspace_path, + headers={"Authorization": f"Bearer {self.token}"}, + ) + self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) + + incoming = max_message_to_incoming( + text=text, + user_id=user_id, + chat_id=room.platform_chat_id, + attachments=attachments, + ) + + if isinstance(incoming, IncomingCommand): + response_text = await self._handle_surface_command( + incoming, max_chat_id=chat_id, user_id=user_id, agent=agent + ) + if response_text: + await self._send_message(chat_id, response_text) + return + + if isinstance(incoming, IncomingCallback): + await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + ) + return + + if isinstance(incoming, IncomingMessage) and (incoming.text or attachments): + queued = self.store.pop_attachments(chat_id) + await self._send_typing(chat_id) + agent_response = await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + attachments=queued, + ) + await self._send_message(chat_id, agent_response) + + async def _handle_callback(self, callback: dict) -> None: + user_id = str(callback["from"]["id"]) + chat_id = str(callback["message"]["chat"]["id"]) + message_id = str(callback["message"]["message_id"]) + data = callback.get("data", "") + + room = self.store.get_room_by_max_chat_id(chat_id) + if room is None: + return + + incoming = max_message_to_incoming( + text="", + user_id=user_id, + chat_id=room.platform_chat_id, + callback_data=data, + message_id=message_id, + ) + + agent = self.registry.get_agent_by_id(room.agent_id) + await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + ) + + async def _handle_surface_command( + self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent: AgentConfig + ) -> Optional[str]: + command = cmd.command + args = cmd.args + + if command == "new": + name = " ".join(args) if args else None + self.chat_handler.handle_new( + max_chat_id=max_chat_id, + user_id=user_id, + agent_id=agent.id, + name=name, + ) + return f"New chat created: {name or 'Unnamed'}" + + elif command == "chats": + return self.chat_handler.handle_chats(user_id) + + elif command == "rename": + return self.chat_handler.handle_rename(max_chat_id, args) + + elif command == "archive": + return self.chat_handler.handle_archive(max_chat_id) + + elif command in ("clear", "reset"): + return self.chat_handler.handle_clear(max_chat_id) + + elif command == "list": + return self.attach_handler.handle_list(max_chat_id) + + elif command == "remove": + return self.attach_handler.handle_remove(max_chat_id, args) + + elif command == "help": + return get_help() + + return None + + async def _call_agent( + self, + agent: AgentConfig, + platform_chat_id: str, + message=None, + attachments: list = None, + ) -> str: + payload = { + "chat_id": platform_chat_id, + "agent_id": agent.id, + } + if message: + if isinstance(message, IncomingMessage): + payload["message"] = message.text + elif isinstance(message, IncomingCallback): + payload["action"] = message.action + payload["payload"] = message.payload + elif isinstance(message, IncomingCommand): + payload["command"] = message.command + payload["args"] = message.args + if attachments: + payload["attachments"] = [a[0] for a in attachments] + + url = f"{agent.base_url}/chat/{agent.id}" + async with self.session.post(url, json=payload) as resp: + data = await resp.json() + return data.get("response", "") + + async def _send_message(self, chat_id: str, text: str) -> None: + async with self.session.post( + f"{self.api_url}/sendMessage", + json={"chat_id": chat_id, "text": text}, + ) as resp: + await resp.json() + + async def _send_typing(self, chat_id: str) -> None: + async with self.session.post( + f"{self.api_url}/sendChatAction", + json={"chat_id": chat_id, "action": "typing"}, + ) as resp: + await resp.json() + + +async def main(): + surface = MaxSurface() + await surface.start() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/adapter/max/converter.py b/adapter/max/converter.py new file mode 100644 index 0000000..2bbfd8b --- /dev/null +++ b/adapter/max/converter.py @@ -0,0 +1,88 @@ +"""MAX event to internal protocol converter.""" +from typing import Union, List +from core.protocol import ( + IncomingMessage, + IncomingCommand, + IncomingCallback, + Attachment, +) + + +def _extract_command(text: str) -> Union[IncomingCommand, IncomingCallback, None]: + if not text.startswith("!"): + return None + + parts = text.strip().split(maxsplit=1) + cmd = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + + if cmd == "!yes": + return IncomingCallback( + user_id="", + platform="max", + chat_id="", + action="confirm", + ) + elif cmd == "!no": + return IncomingCallback( + user_id="", + platform="max", + chat_id="", + action="cancel", + ) + else: + return IncomingCommand( + user_id="", + platform="max", + chat_id="", + command=cmd.lstrip("!"), + args=args.split() if args else [], + ) + + +def max_message_to_incoming( + *, + text: str, + user_id: str, + chat_id: str, + attachments: List[Attachment] = None, + callback_data: str = None, + message_id: str = None, +) -> Union[IncomingMessage, IncomingCommand, IncomingCallback]: + if callback_data: + return IncomingCallback( + user_id=user_id, + platform="max", + chat_id=chat_id, + action=callback_data, + payload={"message_id": message_id} if message_id else {}, + ) + + if text: + cmd = _extract_command(text) + if cmd is not None: + cmd.user_id = user_id + cmd.chat_id = chat_id + return cmd + + return IncomingMessage( + user_id=user_id, + platform="max", + chat_id=chat_id, + text=text or "", + attachments=attachments or [], + ) + + +def max_attachment_to_internal( + *, + filename: str, + mime_type: str, + download_url: str, +) -> Attachment: + return Attachment( + type="document", + url=download_url, + filename=filename, + mime_type=mime_type, + ) \ No newline at end of file diff --git a/adapter/max/files.py b/adapter/max/files.py new file mode 100644 index 0000000..58b87fb --- /dev/null +++ b/adapter/max/files.py @@ -0,0 +1,51 @@ +"""File handling for MAX surface.""" +import os +import aiohttp +from pathlib import Path + + +class FileHandler: + def __init__(self, workspace_root: str): + self.workspace_root = workspace_root + + def _make_unique_filename(self, directory: str, filename: str) -> str: + base = Path(filename).stem + ext = Path(filename).suffix + candidate = filename + counter = 1 + while os.path.exists(os.path.join(directory, candidate)): + candidate = f"{base} ({counter}){ext}" + counter += 1 + return candidate + + async def download_attachment( + self, + download_url: str, + filename: str, + agent_workspace: str, + headers: dict = None, + ) -> str: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + os.makedirs(full_dir, exist_ok=True) + + unique_name = self._make_unique_filename(full_dir, filename) + filepath = os.path.join(full_dir, unique_name) + + async with aiohttp.ClientSession() as session: + async with session.get(download_url, headers=headers) as resp: + resp.raise_for_status() + with open(filepath, "wb") as f: + f.write(await resp.read()) + + return unique_name + + def read_outgoing_file(self, workspace_path: str, agent_workspace: str) -> bytes: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + filepath = os.path.join(full_dir, workspace_path.lstrip("/")) + with open(filepath, "rb") as f: + return f.read() + + def file_exists(self, workspace_path: str, agent_workspace: str) -> bool: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + filepath = os.path.join(full_dir, workspace_path.lstrip("/")) + return os.path.exists(filepath) \ No newline at end of file diff --git a/adapter/max/handlers/__init__.py b/adapter/max/handlers/__init__.py new file mode 100644 index 0000000..3355924 --- /dev/null +++ b/adapter/max/handlers/__init__.py @@ -0,0 +1 @@ +"""MAX surface handlers.""" \ No newline at end of file diff --git a/adapter/max/handlers/attachments.py b/adapter/max/handlers/attachments.py new file mode 100644 index 0000000..c4d0da5 --- /dev/null +++ b/adapter/max/handlers/attachments.py @@ -0,0 +1,29 @@ +"""Attachment queue handlers for MAX surface.""" +from adapter.max.store import ChatStore + + +class AttachmentHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_list(self, max_chat_id: str) -> str: + attachments = self.store.get_attachments(max_chat_id) + if not attachments: + return "Attachment queue is empty." + lines = [f" {i+1}. {name}" for i, (_, name) in enumerate(attachments)] + return "\n".join(lines) + + def handle_remove(self, max_chat_id: str, index: str) -> str: + attachments = self.store.staged_attachments.get(max_chat_id, []) + if index.lower() == "all": + self.store.staged_attachments[max_chat_id] = [] + return "All attachments removed from queue." + + try: + idx = int(index) - 1 + if 0 <= idx < len(attachments): + removed = attachments.pop(idx) + return f"Removed: {removed[1]}" + return "Invalid index." + except ValueError: + return "Usage: !remove or !remove all" \ No newline at end of file diff --git a/adapter/max/handlers/chat.py b/adapter/max/handlers/chat.py new file mode 100644 index 0000000..2933d5f --- /dev/null +++ b/adapter/max/handlers/chat.py @@ -0,0 +1,48 @@ +"""Chat management handlers for MAX surface.""" +import uuid +from adapter.max.store import ChatStore, RoomMeta + + +class ChatHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_new(self, max_chat_id: str, user_id: str, agent_id: str, name: str = None) -> str: + platform_chat_id = str(uuid.uuid4()) + room = RoomMeta( + platform_chat_id=platform_chat_id, + max_chat_id=max_chat_id, + name=name or "New Chat", + user_id=user_id, + agent_id=agent_id, + ) + self.store.add_room(room) + return platform_chat_id + + def handle_chats(self, user_id: str) -> str: + rooms = self.store.list_rooms_for_user(user_id) + if not rooms: + return "No active chats." + lines = [f" {i+1}. {r.name}" for i, r in enumerate(rooms)] + return "\n".join(lines) + + def handle_rename(self, max_chat_id: str, new_name: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + room.name = new_name + return f"Chat renamed to: {new_name}" + + def handle_archive(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + self.store.remove_room(max_chat_id) + return "Chat archived." + + def handle_clear(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + room.platform_chat_id = str(uuid.uuid4()) + return "Chat context cleared." \ No newline at end of file diff --git a/adapter/max/handlers/help.py b/adapter/max/handlers/help.py new file mode 100644 index 0000000..24181c2 --- /dev/null +++ b/adapter/max/handlers/help.py @@ -0,0 +1,26 @@ +"""Help handler for MAX surface.""" + +HELP_TEXT = """ +Available commands: + +Chat management: + !new [name] — Create a new chat + !chats — List active chats + !rename — Rename current chat + !archive — Archive current chat + !clear / !reset — Reset chat context + +Attachments: + !list — Show attachment queue + !remove — Remove attachment from queue + !remove all — Clear attachment queue + +Actions: + !yes — Confirm agent action + !no — Cancel agent action + !help — Show this help +""" + + +def get_help() -> str: + return HELP_TEXT.strip() \ No newline at end of file diff --git a/adapter/max/store.py b/adapter/max/store.py new file mode 100644 index 0000000..d8e6ce6 --- /dev/null +++ b/adapter/max/store.py @@ -0,0 +1,48 @@ +"""Chat store for MAX surface.""" +from dataclasses import dataclass, field +from typing import Dict, Optional + + +@dataclass +class RoomMeta: + platform_chat_id: str + max_chat_id: str + name: str + user_id: str + agent_id: str + + +@dataclass +class ChatStore: + rooms: Dict[str, RoomMeta] = field(default_factory=dict) + staged_attachments: Dict[str, list] = field(default_factory=dict) + + def get_room_by_max_chat_id(self, max_chat_id: str) -> Optional[RoomMeta]: + return self.rooms.get(max_chat_id) + + def get_room_by_platform_chat_id(self, platform_chat_id: str) -> Optional[RoomMeta]: + for room in self.rooms.values(): + if room.platform_chat_id == platform_chat_id: + return room + return None + + def add_room(self, room: RoomMeta) -> None: + self.rooms[room.max_chat_id] = room + + def remove_room(self, max_chat_id: str) -> None: + self.rooms.pop(max_chat_id, None) + self.staged_attachments.pop(max_chat_id, None) + + def list_rooms_for_user(self, user_id: str) -> list: + return [r for r in self.rooms.values() if r.user_id == user_id] + + def stage_attachment(self, max_chat_id: str, attachment: tuple) -> None: + if max_chat_id not in self.staged_attachments: + self.staged_attachments[max_chat_id] = [] + self.staged_attachments[max_chat_id].append(attachment) + + def pop_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.pop(max_chat_id, []) + + def get_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.get(max_chat_id, []) \ No newline at end of file From 3118b3c99a8d508a7138794c39019d9bfcdc1b97 Mon Sep 17 00:00:00 2001 From: Vladislav Yashnov Date: Wed, 6 May 2026 00:35:17 +0300 Subject: [PATCH 2/5] bot fix --- adapter/max/bot.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/adapter/max/bot.py b/adapter/max/bot.py index af857f7..2e539b1 100644 --- a/adapter/max/bot.py +++ b/adapter/max/bot.py @@ -22,6 +22,7 @@ class MaxSurface: self.token = os.environ["MAX_BOT_TOKEN"] self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") + self.agent_base_url = os.environ.get("AGENT_BASE_URL", "") self.registry: AgentRegistry = load_from_env() self.store = ChatStore() @@ -176,7 +177,8 @@ class MaxSurface: return self.chat_handler.handle_chats(user_id) elif command == "rename": - return self.chat_handler.handle_rename(max_chat_id, args) + new_name = " ".join(args) if args else "" + return self.chat_handler.handle_rename(max_chat_id, new_name) elif command == "archive": return self.chat_handler.handle_archive(max_chat_id) @@ -188,7 +190,8 @@ class MaxSurface: return self.attach_handler.handle_list(max_chat_id) elif command == "remove": - return self.attach_handler.handle_remove(max_chat_id, args) + idx = args[0] if args else "" + return self.attach_handler.handle_remove(max_chat_id, idx) elif command == "help": return get_help() @@ -218,7 +221,9 @@ class MaxSurface: if attachments: payload["attachments"] = [a[0] for a in attachments] - url = f"{agent.base_url}/chat/{agent.id}" + base = self.agent_base_url or agent.base_url.rstrip("/") + url = f"{base}/chat/{agent.id}" + async with self.session.post(url, json=payload) as resp: data = await resp.json() return data.get("response", "") From 7abbaf7e7a43d82c4d11248b6fc4eb9101a49f95 Mon Sep 17 00:00:00 2001 From: Vladislav Yashnov Date: Wed, 6 May 2026 15:16:22 +0300 Subject: [PATCH 3/5] max bot upd, max-agents.yaml --- .env.example | 5 + adapter/max/bot.py | 356 +++++++++++++++++++++++++++++++++-------- config/max-agents.yaml | 5 + docker-compose.max.yml | 24 +++ 4 files changed, 321 insertions(+), 69 deletions(-) create mode 100644 config/max-agents.yaml create mode 100644 docker-compose.max.yml diff --git a/.env.example b/.env.example index cc5f2e0..ebc56e4 100644 --- a/.env.example +++ b/.env.example @@ -30,3 +30,8 @@ SURFACES_WORKSPACE_DIR=/agents # Docker volume names (created automatically on first run) SURFACES_SHARED_VOLUME=surfaces-agents SURFACES_BOT_STATE_VOLUME=surfaces-bot-state + +# MAX Surface +MAX_BOT_TOKEN=real_max_token +MAX_API_URL=https://api.max.ru/v1 +MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml \ No newline at end of file diff --git a/adapter/max/bot.py b/adapter/max/bot.py index 2e539b1..ffb7f6e 100644 --- a/adapter/max/bot.py +++ b/adapter/max/bot.py @@ -1,41 +1,197 @@ """MAX surface bot runtime.""" +from __future__ import annotations + import os import asyncio -import aiohttp -from typing import Optional +import logging +from pathlib import Path -from adapter.max.agent_registry import load_from_env, AgentRegistry, AgentConfig -from adapter.max.store import ChatStore -from adapter.max.files import FileHandler +import aiohttp +import structlog + +from adapter.max.agent_registry import load_from_env, AgentRegistry from adapter.max.converter import ( max_message_to_incoming, max_attachment_to_internal, ) -from adapter.max.handlers.chat import ChatHandler +from adapter.max.files import FileHandler +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler from adapter.max.handlers.attachments import AttachmentHandler from adapter.max.handlers.help import get_help -from core.protocol import IncomingMessage, IncomingCommand, IncomingCallback +from adapter.max.store import ChatStore, RoomMeta +from core.chat import ChatManager +from core.auth import AuthManager +from core.handler import EventDispatcher +from core.protocol import ( + Attachment, + IncomingMessage, + IncomingCommand, + IncomingCallback, + OutgoingEvent, + OutgoingMessage, + OutgoingNotification, + OutgoingTyping, + OutgoingUI, +) +from core.settings import SettingsManager +from core.store import InMemoryStore, StateStore + +from sdk.interface import ( + MessageChunk, + MessageResponse, + PlatformClient, + PlatformError, + User, + UserSettings, +) +from sdk.real import RealPlatformClient + +logger = structlog.get_logger(__name__) + + +# --------------------------------------------------------------------------- +# Routed MAX platform client — копия логики RoutedPlatformClient из Matrix +# --------------------------------------------------------------------------- + +class RoutedMaxPlatformClient(PlatformClient): + """Маршрутизирует запросы к нужному агенту на основе chat_id.""" + + def __init__(self, *, store: ChatStore, delegates: dict[str, PlatformClient]): + if not delegates: + raise ValueError("RoutedMaxPlatformClient requires at least one delegate") + self._store = store + self._delegates = dict(delegates) + self._default_client = next(iter(self._delegates.values())) + + async def get_or_create_user( + self, external_id: str, platform: str, display_name: str | None = None + ) -> User: + return await self._default_client.get_or_create_user( + external_id=external_id, platform=platform, display_name=display_name + ) + + async def send_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ) -> MessageResponse: + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + return await delegate.send_message(user_id, platform_chat_id, text, attachments) + + async def stream_message( + self, + user_id: str, + chat_id: str, + text: str, + attachments: list[Attachment] | None = None, + ): + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): + yield chunk + + async def get_settings(self, user_id: str) -> UserSettings: + return await self._default_client.get_settings(user_id) + + async def update_settings(self, user_id: str, action) -> None: + await self._default_client.update_settings(user_id, action) + + async def close(self) -> None: + for delegate in self._delegates.values(): + close_fn = getattr(delegate, "close", None) + if callable(close_fn): + await close_fn() + + async def _resolve_delegate( + self, user_id: str, local_chat_id: str + ) -> tuple[PlatformClient, str]: + room = self._store.get_room_by_platform_chat_id(local_chat_id) + if room is None: + raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND") + + agent_id = room.agent_id + platform_chat_id = room.platform_chat_id + + if not agent_id or not platform_chat_id: + raise PlatformError( + f"routing incomplete for chat: {local_chat_id}", code="ROUTE_INCOMPLETE" + ) + + delegate = self._delegates.get(str(agent_id)) + if delegate is None: + raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND") + + return delegate, str(platform_chat_id) + + +# --------------------------------------------------------------------------- +# MAX Surface +# --------------------------------------------------------------------------- class MaxSurface: def __init__(self): + # Env self.token = os.environ["MAX_BOT_TOKEN"] self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") self.agent_base_url = os.environ.get("AGENT_BASE_URL", "") + # Registry self.registry: AgentRegistry = load_from_env() + + # MAX-specific store for chat ↔ agent mapping self.store = ChatStore() self.files = FileHandler(self.workspace_dir) - self.chat_handler = ChatHandler(self.store) + self.max_chat_handler = MaxChatHandler(self.store) self.attach_handler = AttachmentHandler(self.store) - self.session: Optional[aiohttp.ClientSession] = None + + # Core store (in-memory, lost on restart — OK for MVP) + self.core_store: StateStore = InMemoryStore() + + # Platform client per agent + delegates: dict[str, PlatformClient] = {} + for agent in self.registry.agents: + base = self.agent_base_url or agent.base_url.rstrip("/") + delegates[agent.id] = RealPlatformClient( + agent_id=agent.id, + agent_base_url=base, + prototype_state=None, + platform="max", + ) + + # Routed platform + self.platform = RoutedMaxPlatformClient( + store=self.store, + delegates=delegates, + ) + + # Core managers + self.chat_mgr = ChatManager(self.platform, self.core_store) + self.auth_mgr = AuthManager(self.platform, self.core_store) + self.settings_mgr = SettingsManager(self.platform, self.core_store) + + # Event dispatcher — это и есть "ядро" + self.dispatcher = EventDispatcher( + platform=self.platform, + chat_mgr=self.chat_mgr, + auth_mgr=self.auth_mgr, + settings_mgr=self.settings_mgr, + ) + + # HTTP session for MAX API + self.session: aiohttp.ClientSession | None = None + + # ------------------------------------------------------------------ + # Long polling + # ------------------------------------------------------------------ async def start(self): self.session = aiohttp.ClientSession( headers={"Authorization": f"Bearer {self.token}"} ) - print("[MAX Surface] Starting long poll...") + logger.info("max_surface_starting", api_url=self.api_url) offset = 0 while True: @@ -45,7 +201,7 @@ class MaxSurface: offset = update["update_id"] + 1 await self._process_update(update) except Exception as e: - print(f"[MAX Surface] Error: {e}") + logger.error("max_poll_error", error=str(e)) await asyncio.sleep(5) async def _get_updates(self, offset: int) -> list: @@ -62,16 +218,20 @@ class MaxSurface: elif "callback_query" in update: await self._handle_callback(update["callback_query"]) + # ------------------------------------------------------------------ + # Message handling + # ------------------------------------------------------------------ + async def _handle_message(self, message: dict) -> None: text = message.get("text", "") or message.get("caption", "") user_id = str(message["from"]["id"]) chat_id = str(message["chat"]["id"]) - message_id = str(message["message_id"]) + # Ensure room exists room = self.store.get_room_by_max_chat_id(chat_id) if room is None: agent = self.registry.get_agent_for_user(user_id) - platform_chat_id = self.chat_handler.handle_new( + platform_chat_id = self.max_chat_handler.handle_new( max_chat_id=chat_id, user_id=user_id, agent_id=agent.id, @@ -80,6 +240,7 @@ class MaxSurface: else: agent = self.registry.get_agent_by_id(room.agent_id) + # Handle attachments attachments = [] if "attachment" in message: att = message["attachment"] @@ -98,6 +259,23 @@ class MaxSurface: ) self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) + # File-only message → stage and return + if attachments and not text: + return + + # Merge staged attachments + queued = self.store.pop_attachments(chat_id) + if queued: + for ws_path, filename in queued: + attachments.append( + Attachment( + type="document", + filename=filename, + workspace_path=ws_path, + ) + ) + + # Convert to incoming event incoming = max_message_to_incoming( text=text, user_id=user_id, @@ -105,6 +283,7 @@ class MaxSurface: attachments=attachments, ) + # Surface-level commands if isinstance(incoming, IncomingCommand): response_text = await self._handle_surface_command( incoming, max_chat_id=chat_id, user_id=user_id, agent=agent @@ -113,24 +292,31 @@ class MaxSurface: await self._send_message(chat_id, response_text) return - if isinstance(incoming, IncomingCallback): - await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, + # Dispatch to core + try: + outgoing_events = await self.dispatcher.dispatch(incoming) + except PlatformError as exc: + logger.warning( + "max_dispatch_platform_error", + user_id=user_id, + chat_id=chat_id, + code=exc.code, + error=str(exc), ) - return + outgoing_events = [ + OutgoingMessage( + chat_id=room.platform_chat_id, + text="Сервис временно недоступен. Попробуйте ещё раз позже.", + ) + ] - if isinstance(incoming, IncomingMessage) and (incoming.text or attachments): - queued = self.store.pop_attachments(chat_id) - await self._send_typing(chat_id) - agent_response = await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, - attachments=queued, - ) - await self._send_message(chat_id, agent_response) + # Send outgoing events back to MAX + for event in outgoing_events: + await self._send_outgoing(chat_id, event, agent.workspace_path) + + # ------------------------------------------------------------------ + # Callbacks + # ------------------------------------------------------------------ async def _handle_callback(self, callback: dict) -> None: user_id = str(callback["from"]["id"]) @@ -150,22 +336,29 @@ class MaxSurface: message_id=message_id, ) - agent = self.registry.get_agent_by_id(room.agent_id) - await self._call_agent( - agent=agent, - platform_chat_id=room.platform_chat_id, - message=incoming, - ) + try: + outgoing_events = await self.dispatcher.dispatch(incoming) + except PlatformError: + return + + for event in outgoing_events: + agent = self.registry.get_agent_by_id(room.agent_id) + ws = agent.workspace_path if agent else "/agents/0" + await self._send_outgoing(chat_id, event, ws) + + # ------------------------------------------------------------------ + # Surface commands + # ------------------------------------------------------------------ async def _handle_surface_command( - self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent: AgentConfig - ) -> Optional[str]: + self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent + ) -> str | None: command = cmd.command args = cmd.args if command == "new": name = " ".join(args) if args else None - self.chat_handler.handle_new( + self.max_chat_handler.handle_new( max_chat_id=max_chat_id, user_id=user_id, agent_id=agent.id, @@ -174,17 +367,17 @@ class MaxSurface: return f"New chat created: {name or 'Unnamed'}" elif command == "chats": - return self.chat_handler.handle_chats(user_id) + return self.max_chat_handler.handle_chats(user_id) elif command == "rename": new_name = " ".join(args) if args else "" - return self.chat_handler.handle_rename(max_chat_id, new_name) + return self.max_chat_handler.handle_rename(max_chat_id, new_name) elif command == "archive": - return self.chat_handler.handle_archive(max_chat_id) + return self.max_chat_handler.handle_archive(max_chat_id) elif command in ("clear", "reset"): - return self.chat_handler.handle_clear(max_chat_id) + return self.max_chat_handler.handle_clear(max_chat_id) elif command == "list": return self.attach_handler.handle_list(max_chat_id) @@ -198,35 +391,56 @@ class MaxSurface: return None - async def _call_agent( - self, - agent: AgentConfig, - platform_chat_id: str, - message=None, - attachments: list = None, - ) -> str: - payload = { - "chat_id": platform_chat_id, - "agent_id": agent.id, - } - if message: - if isinstance(message, IncomingMessage): - payload["message"] = message.text - elif isinstance(message, IncomingCallback): - payload["action"] = message.action - payload["payload"] = message.payload - elif isinstance(message, IncomingCommand): - payload["command"] = message.command - payload["args"] = message.args - if attachments: - payload["attachments"] = [a[0] for a in attachments] + # ------------------------------------------------------------------ + # Outgoing to MAX + # ------------------------------------------------------------------ - base = self.agent_base_url or agent.base_url.rstrip("/") - url = f"{base}/chat/{agent.id}" + async def _send_outgoing( + self, max_chat_id: str, event: OutgoingEvent, workspace_path: str + ) -> None: + if isinstance(event, OutgoingTyping): + await self._send_typing(max_chat_id) + return - async with self.session.post(url, json=payload) as resp: - data = await resp.json() - return data.get("response", "") + if isinstance(event, OutgoingNotification): + text = f"[{event.level.upper()}] {event.text}" + await self._send_message(max_chat_id, text) + return + + if isinstance(event, OutgoingMessage): + if event.text: + await self._send_message(max_chat_id, event.text) + + # Upload outgoing files + for att in event.attachments: + if not att.workspace_path: + continue + if self.files.file_exists(att.workspace_path, workspace_path): + # Read file and upload to MAX + file_data = self.files.read_outgoing_file( + att.workspace_path, workspace_path + ) + # MAX file upload logic — зависит от API MAX + # Пока просто отправляем имя файла текстом + await self._send_message( + max_chat_id, + f"[Файл: {att.filename or att.workspace_path}]", + ) + return + + if isinstance(event, OutgoingUI): + lines = [event.text] + if event.buttons: + for btn in event.buttons: + lines.append(f" {btn.label}") + lines.append("") + lines.append("Ответьте !yes для подтверждения или !no для отмены.") + await self._send_message(max_chat_id, "\n".join(lines)) + return + + # ------------------------------------------------------------------ + # Low-level MAX API + # ------------------------------------------------------------------ async def _send_message(self, chat_id: str, text: str) -> None: async with self.session.post( @@ -243,6 +457,10 @@ class MaxSurface: await resp.json() +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + async def main(): surface = MaxSurface() await surface.start() diff --git a/config/max-agents.yaml b/config/max-agents.yaml new file mode 100644 index 0000000..8713355 --- /dev/null +++ b/config/max-agents.yaml @@ -0,0 +1,5 @@ +agents: + - id: agent-0 + label: "Agent 0" + base_url: "http://agent-proxy:7000/agent_0/" + workspace_path: "/agents/0" \ No newline at end of file diff --git a/docker-compose.max.yml b/docker-compose.max.yml new file mode 100644 index 0000000..cd75da1 --- /dev/null +++ b/docker-compose.max.yml @@ -0,0 +1,24 @@ +services: + max-bot: + build: + context: . + target: development + environment: + - MAX_BOT_TOKEN=${MAX_BOT_TOKEN} + - MAX_API_URL=${MAX_API_URL:-https://api.max.ru/v1} + - MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml + - AGENT_BASE_URL=http://platform-agent:8000 + - SURFACES_WORKSPACE_DIR=/agents + volumes: + - surfaces-agents:/agents + command: python -m adapter.max.bot + + platform-agent: + image: platform-agent:latest + environment: + - WORKSPACE_DIR=/workspace + volumes: + - surfaces-agents:/workspace + +volumes: + surfaces-agents: \ No newline at end of file From 2ad1438e1cede9f2f66bfdea14b6d5ee3a4adef6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80?= =?UTF-8?q?=D0=B0=20=D0=9F=D1=80=D0=BE=D0=BD=D0=B8=D0=BD=D0=B0?= Date: Fri, 15 May 2026 10:22:43 +0300 Subject: [PATCH 4/5] fix max-bot, add tests --- adapter/max/agent_registry.py | 161 ++++- adapter/max/api_client.py | 153 +++++ adapter/max/bot.py | 776 ++++++++++++++--------- adapter/max/converter.py | 209 +++--- adapter/max/files.py | 119 ++-- adapter/max/handlers/__init__.py | 6 +- adapter/max/handlers/attachments.py | 10 +- adapter/max/handlers/chat.py | 25 +- adapter/max/handlers/commands.py | 112 ++++ adapter/max/handlers/help.py | 34 +- adapter/max/store.py | 1 + tests/adapter/max/__init__.py | 1 + tests/adapter/max/test_agent_registry.py | 88 +++ tests/adapter/max/test_api_client.py | 90 +++ tests/adapter/max/test_converter.py | 154 +++++ tests/adapter/max/test_dispatcher_max.py | 98 +++ tests/adapter/max/test_store.py | 78 +++ 17 files changed, 1621 insertions(+), 494 deletions(-) create mode 100644 adapter/max/api_client.py create mode 100644 adapter/max/handlers/commands.py create mode 100644 tests/adapter/max/__init__.py create mode 100644 tests/adapter/max/test_agent_registry.py create mode 100644 tests/adapter/max/test_api_client.py create mode 100644 tests/adapter/max/test_converter.py create mode 100644 tests/adapter/max/test_dispatcher_max.py create mode 100644 tests/adapter/max/test_store.py diff --git a/adapter/max/agent_registry.py b/adapter/max/agent_registry.py index a3c2b67..cc3c0af 100644 --- a/adapter/max/agent_registry.py +++ b/adapter/max/agent_registry.py @@ -1,50 +1,141 @@ -"""Agent registry for MAX surface.""" -import os -import yaml -from typing import List, Optional +from __future__ import annotations + +from collections.abc import Mapping from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal + +import yaml -@dataclass -class AgentConfig: - id: str +class AgentRegistryError(ValueError): + pass + + +@dataclass(frozen=True) +class AgentDefinition: + agent_id: str label: str - base_url: str - workspace_path: str + base_url: str = field(default="") + workspace_path: str = field(default="") + + +@dataclass(frozen=True) +class AgentAssignment: + agent_id: str | None + source: Literal["configured", "default", "none"] + + @property + def is_default(self) -> bool: + return self.source == "default" -@dataclass class AgentRegistry: - agents: List[AgentConfig] = field(default_factory=list) + """Same contract as Matrix agent registry: user_agents maps MAX user_id string -> agent_id.""" - def get_agent_for_user(self, user_id: str) -> AgentConfig: - return self.agents[0] + def __init__( + self, + agents: list[AgentDefinition], + user_agents: Mapping[str, str] | None = None, + ) -> None: + self.agents = tuple(agents) + self._by_id = {agent.agent_id: agent for agent in self.agents} + self._user_agents: dict[str, str] = dict(user_agents or {}) - def get_agent_by_id(self, agent_id: str) -> Optional[AgentConfig]: - for agent in self.agents: - if agent.id == agent_id: - return agent - return None + def get(self, agent_id: str) -> AgentDefinition: + try: + return self._by_id[agent_id] + except KeyError as exc: + raise AgentRegistryError(f"unknown agent id: {agent_id}") from exc + + def get_agent_id_for_user(self, max_user_id: str) -> str | None: + return self._user_agents.get(max_user_id) + + def resolve_agent_for_user(self, max_user_id: str) -> AgentAssignment: + agent_id = self.get_agent_id_for_user(max_user_id) + if agent_id is not None: + return AgentAssignment(agent_id=agent_id, source="configured") + if self.agents: + return AgentAssignment(agent_id=self.agents[0].agent_id, source="default") + return AgentAssignment(agent_id=None, source="none") -def load_agent_registry(path: str) -> AgentRegistry: - with open(path, "r") as f: - data = yaml.safe_load(f) +def _required_text(entry: Mapping[str, object], key: str) -> str: + value = entry.get(key) + if not isinstance(value, str): + raise AgentRegistryError("each agent entry requires id and label") + text = value.strip() + if not text: + raise AgentRegistryError("each agent entry requires id and label") + return text - registry = AgentRegistry() - for a in data.get("agents", []): - registry.agents.append(AgentConfig( - id=a["id"], - label=a.get("label", ""), - base_url=a["base_url"], - workspace_path=a["workspace_path"], - )) - return registry + +def _optional_text(entry: Mapping[str, object], key: str) -> str: + value = entry.get(key) + if value is None: + return "" + if not isinstance(value, str): + raise AgentRegistryError(f"agent entry field '{key}' must be a string") + return value.strip() + + +def _load_registry_data(path: str | Path) -> dict[str, object]: + try: + raw = yaml.safe_load(Path(path).read_text(encoding="utf-8")) + except yaml.YAMLError as exc: + raise AgentRegistryError("invalid agent registry YAML") from exc + if raw is None: + return {} + if not isinstance(raw, Mapping): + raise AgentRegistryError("agent registry must be a mapping with an agents list") + return dict(raw) + + +def load_agent_registry(path: str | Path) -> AgentRegistry: + raw = _load_registry_data(path) + entries = raw.get("agents") + if not isinstance(entries, list) or not entries: + raise AgentRegistryError("agents registry must contain a non-empty agents list") + + agents: list[AgentDefinition] = [] + seen: set[str] = set() + for entry in entries: + if not isinstance(entry, Mapping): + raise AgentRegistryError("each agent entry requires id and label") + agent_id = _required_text(entry, "id") + label = _required_text(entry, "label") + base_url = _optional_text(entry, "base_url") + workspace_path = _optional_text(entry, "workspace_path") + if agent_id in seen: + raise AgentRegistryError(f"duplicate agent id: {agent_id}") + seen.add(agent_id) + agents.append( + AgentDefinition( + agent_id=agent_id, + label=label, + base_url=base_url, + workspace_path=workspace_path, + ) + ) + + user_agents = raw.get("user_agents") + if user_agents is not None: + if not isinstance(user_agents, Mapping): + raise AgentRegistryError("user_agents must be a mapping of user id strings to agent ids") + normalized: dict[str, str] = {} + for uid, aid in user_agents.items(): + if not isinstance(uid, str) or not isinstance(aid, str): + raise AgentRegistryError("user_agents keys and values must be strings") + normalized[uid.strip()] = aid.strip() + user_agents_map: Mapping[str, str] = normalized + else: + user_agents_map = {} + + return AgentRegistry(agents=agents, user_agents=user_agents_map) def load_from_env() -> AgentRegistry: - path = os.environ.get( - "MAX_AGENT_REGISTRY_PATH", - "/app/config/max-agents.yaml", - ) - return load_agent_registry(path) \ No newline at end of file + import os + + path = os.environ.get("MAX_AGENT_REGISTRY_PATH", "/app/config/max-agents.yaml") + return load_agent_registry(path) diff --git a/adapter/max/api_client.py b/adapter/max/api_client.py new file mode 100644 index 0000000..7f3810e --- /dev/null +++ b/adapter/max/api_client.py @@ -0,0 +1,153 @@ +"""HTTP client for MAX Bot API (platform-api.max.ru).""" +from __future__ import annotations + +import logging +from typing import Any + +import httpx + +logger = logging.getLogger(__name__) + + +class MaxApiError(Exception): + def __init__(self, status: int, payload: Any): + super().__init__(f"MAX API error {status}: {payload}") + self.status = status + self.payload = payload + + +class MaxBotApi: + """ + Minimal async client. Auth: raw token in Authorization header (same as official TS SDK). + """ + + def __init__(self, token: str, base_url: str = "https://platform-api.max.ru") -> None: + self._token = token + self._base = base_url.rstrip("/") + self._client = httpx.AsyncClient( + base_url=self._base, + headers={"Authorization": token}, + timeout=httpx.Timeout(120.0, connect=30.0), + ) + + async def aclose(self) -> None: + await self._client.aclose() + + async def _request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json: Any | None = None, + ) -> Any: + response = await self._client.request(method, path, params=params, json=json) + payload: Any + try: + payload = response.json() + except Exception: + payload = response.text + if response.status_code >= 400: + if isinstance(payload, dict): + raise MaxApiError( + response.status_code, + {"code": payload.get("code"), "message": payload.get("message", payload)}, + ) + raise MaxApiError(response.status_code, payload) + return payload + + async def get_me(self) -> dict[str, Any]: + data = await self._request("GET", "/me") + return dict(data) if isinstance(data, dict) else {} + + async def get_updates( + self, + *, + marker: int | None = None, + limit: int = 100, + timeout: int = 30, + types: list[str] | None = None, + ) -> tuple[list[dict[str, Any]], int | None]: + params: dict[str, Any] = {"limit": limit, "timeout": timeout} + if marker is not None: + params["marker"] = marker + if types: + params["types"] = ",".join(types) + data = await self._request("GET", "/updates", params=params) + if not isinstance(data, dict): + return [], None + raw_updates = data.get("updates") or [] + updates = [u for u in raw_updates if isinstance(u, dict)] + marker_out = data.get("marker") + return updates, marker_out if isinstance(marker_out, int) else None + + async def send_message_to_chat( + self, + chat_id: int, + *, + text: str | None = None, + attachments: list[dict[str, Any]] | None = None, + fmt: str | None = None, + ) -> dict[str, Any]: + params: dict[str, Any] = {"chat_id": chat_id} + body: dict[str, Any] = {} + if text is not None: + body["text"] = text + if attachments is not None: + body["attachments"] = attachments + if fmt: + body["format"] = fmt + return await self._request("POST", "/messages", params=params, json=body) + + async def send_message_to_user( + self, + user_id: int, + *, + text: str | None = None, + attachments: list[dict[str, Any]] | None = None, + fmt: str | None = None, + ) -> dict[str, Any]: + params: dict[str, Any] = {"user_id": user_id} + body: dict[str, Any] = {} + if text is not None: + body["text"] = text + if attachments is not None: + body["attachments"] = attachments + if fmt: + body["format"] = fmt + return await self._request("POST", "/messages", params=params, json=body) + + async def send_chat_action(self, chat_id: int, action: str) -> Any: + return await self._request( + "POST", + f"/chats/{chat_id}/actions", + json={"action": action}, + ) + + async def get_upload_url(self, upload_type: str) -> dict[str, Any]: + data = await self._request("POST", "/uploads", params={"type": upload_type}) + return dict(data) if isinstance(data, dict) else {} + + async def answer_callback( + self, + callback_id: str, + *, + message: dict[str, Any] | None = None, + notification: str | None = None, + ) -> Any: + body: dict[str, Any] = {} + if message is not None: + body["message"] = message + if notification is not None: + body["notification"] = notification + return await self._request( + "POST", + "/answers", + params={"callback_id": callback_id}, + json=body if body else {}, + ) + + async def download_file(self, url: str) -> bytes: + response = await self._client.get(url) + response.raise_for_status() + return response.content diff --git a/adapter/max/bot.py b/adapter/max/bot.py index ffb7f6e..0200d3f 100644 --- a/adapter/max/bot.py +++ b/adapter/max/bot.py @@ -1,98 +1,97 @@ -"""MAX surface bot runtime.""" +"""MAX messenger surface — runtime using official MAX Bot API (long polling).""" from __future__ import annotations -import os import asyncio import logging +import os +import re from pathlib import Path +from urllib.parse import urlsplit, urlunsplit -import aiohttp +import httpx import structlog +from dotenv import load_dotenv -from adapter.max.agent_registry import load_from_env, AgentRegistry +from adapter.max.agent_registry import AgentRegistry, AgentRegistryError, load_from_env +from adapter.max.api_client import MaxApiError, MaxBotApi from adapter.max.converter import ( - max_message_to_incoming, - max_attachment_to_internal, + collect_max_attachments, + incoming_from_message_callback_payload, + incoming_from_text_commands, +) +from adapter.max.files import ( + guess_upload_type, + read_workspace_bytes, + save_incoming_from_url, + upload_file_as_attachment, ) -from adapter.max.files import FileHandler -from adapter.max.handlers.chat import ChatHandler as MaxChatHandler from adapter.max.handlers.attachments import AttachmentHandler -from adapter.max.handlers.help import get_help +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.handlers.commands import register_max_handlers from adapter.max.store import ChatStore, RoomMeta - -from core.chat import ChatManager from core.auth import AuthManager +from core.chat import ChatManager from core.handler import EventDispatcher -from core.protocol import ( - Attachment, - IncomingMessage, - IncomingCommand, - IncomingCallback, - OutgoingEvent, - OutgoingMessage, - OutgoingNotification, - OutgoingTyping, - OutgoingUI, -) +from core.handlers import register_all +from core.protocol import Attachment, IncomingCommand, OutgoingEvent, OutgoingMessage +from core.protocol import OutgoingNotification, OutgoingTyping, OutgoingUI from core.settings import SettingsManager from core.store import InMemoryStore, StateStore - -from sdk.interface import ( - MessageChunk, - MessageResponse, - PlatformClient, - PlatformError, - User, - UserSettings, -) +from sdk.interface import PlatformClient, PlatformError +from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient logger = structlog.get_logger(__name__) +MAX_TEXT_CHARS = 4000 +_POLL_TYPES_DEFAULT = ["message_created", "message_callback", "bot_started"] + +load_dotenv(Path(__file__).resolve().parents[2] / ".env") + + +def _normalize_agent_base_url(url: str) -> str: + parsed = urlsplit(url) + path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) + return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) + + +def _agent_base_url_from_env() -> str: + if base_url := os.environ.get("AGENT_BASE_URL"): + return base_url + if ws_url := os.environ.get("AGENT_WS_URL"): + return _normalize_agent_base_url(ws_url) + return "http://127.0.0.1:8000" -# --------------------------------------------------------------------------- -# Routed MAX platform client — копия логики RoutedPlatformClient из Matrix -# --------------------------------------------------------------------------- class RoutedMaxPlatformClient(PlatformClient): - """Маршрутизирует запросы к нужному агенту на основе chat_id.""" + """Routes agent WS calls based on ChatStore mapping (same idea as RoutedPlatformClient).""" - def __init__(self, *, store: ChatStore, delegates: dict[str, PlatformClient]): + def __init__( + self, *, chat_store: ChatStore, delegates: dict[str, PlatformClient], default_client: PlatformClient + ): if not delegates: raise ValueError("RoutedMaxPlatformClient requires at least one delegate") - self._store = store + self._store = chat_store self._delegates = dict(delegates) - self._default_client = next(iter(self._delegates.values())) + self._default_client = default_client async def get_or_create_user( self, external_id: str, platform: str, display_name: str | None = None - ) -> User: + ): return await self._default_client.get_or_create_user( external_id=external_id, platform=platform, display_name=display_name ) - async def send_message( - self, - user_id: str, - chat_id: str, - text: str, - attachments: list[Attachment] | None = None, - ) -> MessageResponse: + async def send_message(self, user_id: str, chat_id: str, text: str, attachments=None): delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) return await delegate.send_message(user_id, platform_chat_id, text, attachments) - async def stream_message( - self, - user_id: str, - chat_id: str, - text: str, - attachments: list[Attachment] | None = None, - ): + async def stream_message(self, user_id: str, chat_id: str, text: str, attachments=None): delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): yield chunk - async def get_settings(self, user_id: str) -> UserSettings: + async def get_settings(self, user_id: str): return await self._default_client.get_settings(user_id) async def update_settings(self, user_id: str, action) -> None: @@ -104,9 +103,7 @@ class RoutedMaxPlatformClient(PlatformClient): if callable(close_fn): await close_fn() - async def _resolve_delegate( - self, user_id: str, local_chat_id: str - ) -> tuple[PlatformClient, str]: + async def _resolve_delegate(self, user_id: str, local_chat_id: str): room = self._store.get_room_by_platform_chat_id(local_chat_id) if room is None: raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND") @@ -114,11 +111,6 @@ class RoutedMaxPlatformClient(PlatformClient): agent_id = room.agent_id platform_chat_id = room.platform_chat_id - if not agent_id or not platform_chat_id: - raise PlatformError( - f"routing incomplete for chat: {local_chat_id}", code="ROUTE_INCOMPLETE" - ) - delegate = self._delegates.get(str(agent_id)) if delegate is None: raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND") @@ -126,53 +118,46 @@ class RoutedMaxPlatformClient(PlatformClient): return delegate, str(platform_chat_id) -# --------------------------------------------------------------------------- -# MAX Surface -# --------------------------------------------------------------------------- - -class MaxSurface: - def __init__(self): - # Env +class MaxBotApp: + def __init__(self) -> None: self.token = os.environ["MAX_BOT_TOKEN"] - self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") - self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") - self.agent_base_url = os.environ.get("AGENT_BASE_URL", "") + api_base = os.environ.get("MAX_API_URL", "https://platform-api.max.ru").strip().rstrip("/") + self.api = MaxBotApi(self.token, base_url=api_base) + self.surfaces_workspace = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/agents")) + agent_base_url = _agent_base_url_from_env() - # Registry - self.registry: AgentRegistry = load_from_env() + try: + self.registry: AgentRegistry = load_from_env() + except (AgentRegistryError, OSError) as exc: + raise RuntimeError("failed to load MAX agent registry") from exc - # MAX-specific store for chat ↔ agent mapping - self.store = ChatStore() - self.files = FileHandler(self.workspace_dir) - self.max_chat_handler = MaxChatHandler(self.store) - self.attach_handler = AttachmentHandler(self.store) + self.chat_store = ChatStore() + self.max_chat_handler = MaxChatHandler(self.chat_store) + self.attach_handler = AttachmentHandler(self.chat_store) - # Core store (in-memory, lost on restart — OK for MVP) self.core_store: StateStore = InMemoryStore() + self.prototype_state = PrototypeStateStore() - # Platform client per agent - delegates: dict[str, PlatformClient] = {} + delegates: dict[str, RealPlatformClient] = {} for agent in self.registry.agents: - base = self.agent_base_url or agent.base_url.rstrip("/") - delegates[agent.id] = RealPlatformClient( - agent_id=agent.id, - agent_base_url=base, - prototype_state=None, + base_raw = agent.base_url.strip() if agent.base_url else agent_base_url + delegates[agent.agent_id] = RealPlatformClient( + agent_id=agent.agent_id, + agent_base_url=base_raw, + prototype_state=self.prototype_state, platform="max", ) - # Routed platform - self.platform = RoutedMaxPlatformClient( - store=self.store, + default_client = next(iter(delegates.values())) + self.platform: RoutedMaxPlatformClient = RoutedMaxPlatformClient( + chat_store=self.chat_store, delegates=delegates, + default_client=default_client, ) - # Core managers self.chat_mgr = ChatManager(self.platform, self.core_store) self.auth_mgr = AuthManager(self.platform, self.core_store) self.settings_mgr = SettingsManager(self.platform, self.core_store) - - # Event dispatcher — это и есть "ядро" self.dispatcher = EventDispatcher( platform=self.platform, chat_mgr=self.chat_mgr, @@ -180,291 +165,454 @@ class MaxSurface: settings_mgr=self.settings_mgr, ) - # HTTP session for MAX API - self.session: aiohttp.ClientSession | None = None - - # ------------------------------------------------------------------ - # Long polling - # ------------------------------------------------------------------ - - async def start(self): - self.session = aiohttp.ClientSession( - headers={"Authorization": f"Bearer {self.token}"} + register_all(self.dispatcher) + register_max_handlers( + self.dispatcher, + chat_store=self.chat_store, + max_chat_handler=self.max_chat_handler, + prototype_state=self.prototype_state, ) - logger.info("max_surface_starting", api_url=self.api_url) - offset = 0 - while True: - try: - updates = await self._get_updates(offset) - for update in updates: - offset = update["update_id"] + 1 - await self._process_update(update) - except Exception as e: - logger.error("max_poll_error", error=str(e)) - await asyncio.sleep(5) + poll_types = os.environ.get("MAX_UPDATE_TYPES", "").strip() + self.update_types = ( + [t.strip() for t in poll_types.split(",") if t.strip()] + if poll_types + else list(_POLL_TYPES_DEFAULT) + ) - async def _get_updates(self, offset: int) -> list: - async with self.session.get( - f"{self.api_url}/updates", - params={"offset": offset, "timeout": 30}, - ) as resp: - data = await resp.json() - return data.get("result", []) + self._marker: int | None = None + self.bot_user_ids: set[int] = set() + logging.basicConfig(level=logging.INFO) - async def _process_update(self, update: dict) -> None: - if "message" in update: - await self._handle_message(update["message"]) - elif "callback_query" in update: - await self._handle_callback(update["callback_query"]) + async def bootstrap_identity(self) -> None: + me = await self.api.get_me() + uid = me.get("user_id") + if isinstance(uid, int): + self.bot_user_ids.add(uid) - # ------------------------------------------------------------------ - # Message handling - # ------------------------------------------------------------------ + async def ensure_user(self, max_user_id: str, *, display_name: str | None) -> None: + await self.platform.get_or_create_user(max_user_id, "max", display_name=display_name) + await self.auth_mgr.confirm(max_user_id) - async def _handle_message(self, message: dict) -> None: - text = message.get("text", "") or message.get("caption", "") - user_id = str(message["from"]["id"]) - chat_id = str(message["chat"]["id"]) + async def _resolve_room( + self, + *, + max_chat_key: str, + max_user_id: str, + ) -> RoomMeta: + room = self.chat_store.get_room_by_max_chat_id(max_chat_key) + if room is not None: + return room - # Ensure room exists - room = self.store.get_room_by_max_chat_id(chat_id) - if room is None: - agent = self.registry.get_agent_for_user(user_id) - platform_chat_id = self.max_chat_handler.handle_new( - max_chat_id=chat_id, - user_id=user_id, - agent_id=agent.id, - ) - room = self.store.get_room_by_max_chat_id(chat_id) - else: - agent = self.registry.get_agent_by_id(room.agent_id) + assignment = self.registry.resolve_agent_for_user(max_user_id) + if assignment.agent_id is None: + raise RuntimeError("no agents configured") - # Handle attachments - attachments = [] - if "attachment" in message: - att = message["attachment"] - internal_att = max_attachment_to_internal( - filename=att["filename"], - mime_type=att.get("mime_type", "application/octet-stream"), - download_url=att["download_url"], - ) - attachments.append(internal_att) + ws_path = "" + try: + ws_path = self.registry.get(assignment.agent_id).workspace_path + except AgentRegistryError: + pass - workspace_path = await self.files.download_attachment( - download_url=att["download_url"], - filename=att["filename"], - agent_workspace=agent.workspace_path, - headers={"Authorization": f"Bearer {self.token}"}, - ) - self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) + pid = self.max_chat_handler.handle_new( + max_chat_id=max_chat_key, + user_id=max_user_id, + agent_id=assignment.agent_id, + name="Чат 1", + workspace_path=ws_path, + ) - # File-only message → stage and return - if attachments and not text: + await self.chat_mgr.get_or_create( + user_id=max_user_id, + chat_id=pid, + platform="max", + surface_ref=max_chat_key, + name="Чат 1", + ) + + refreshed = self.chat_store.get_room_by_max_chat_id(max_chat_key) + if refreshed is None: + raise RuntimeError("max room bootstrap failed") + + logger.info( + "max_chat_bootstrapped", + max_chat_key=max_chat_key, + platform_chat_id=pid, + agent_id=assignment.agent_id, + ) + + return refreshed + + async def process_message_created(self, payload: dict) -> None: + message = payload.get("message") + if not isinstance(message, dict): return - # Merge staged attachments - queued = self.store.pop_attachments(chat_id) - if queued: - for ws_path, filename in queued: - attachments.append( + sender = message.get("sender") or {} + if not isinstance(sender, dict): + return + + uid = sender.get("user_id") + if isinstance(uid, int): + uid_s = str(uid) + else: + return + + if sender.get("is_bot"): + return + + recipient = message.get("recipient") or {} + chat_id_numeric = recipient.get("chat_id") + if chat_id_numeric is None or not isinstance(chat_id_numeric, int): + dialog_uid = recipient.get("user_id") + if isinstance(dialog_uid, int): + chat_key = str(dialog_uid) + else: + return + else: + chat_key = str(chat_id_numeric) + + await self.ensure_user(uid_s, display_name=sender.get("first_name")) + + room = await self._resolve_room( + max_chat_key=chat_key, + max_user_id=uid_s, + ) + + body = message.get("body") or {} + text = "" + if isinstance(body, dict): + raw_txt = body.get("text") + text = raw_txt.strip() if isinstance(raw_txt, str) else "" + + attachments_core, raw_meta = collect_max_attachments(body) if isinstance(body, dict) else ([], []) + attachments_core = await self._materialize_attachments(room, attachments_core, raw_meta) + + if attachments_core and not text: + for att in attachments_core: + self.chat_store.stage_attachment(chat_key, (att.workspace_path or "", att.filename or "file")) + return + + queued = self.chat_store.pop_attachments(chat_key) + merged = list(attachments_core) + for ws_path, fname in queued: + if ws_path: + merged.append( Attachment( type="document", - filename=filename, + filename=fname, workspace_path=ws_path, ) ) - # Convert to incoming event - incoming = max_message_to_incoming( + incoming = incoming_from_text_commands( text=text, - user_id=user_id, - chat_id=room.platform_chat_id, - attachments=attachments, + max_user_id=uid_s, + platform_chat_id=room.platform_chat_id, + attachments=merged, ) - # Surface-level commands - if isinstance(incoming, IncomingCommand): - response_text = await self._handle_surface_command( - incoming, max_chat_id=chat_id, user_id=user_id, agent=agent - ) - if response_text: - await self._send_message(chat_id, response_text) - return + if isinstance(incoming, IncomingMessage): + if not incoming.text.strip() and not incoming.attachments: + return + + if isinstance(incoming, IncomingCommand): + if incoming.command in {"list", "remove"}: + reply = await self._handle_local_attachment_command(incoming, chat_key) + await self._send_lines(int(chat_key), reply) + return - # Dispatch to core try: - outgoing_events = await self.dispatcher.dispatch(incoming) + outgoing = await self.dispatcher.dispatch(incoming) except PlatformError as exc: - logger.warning( - "max_dispatch_platform_error", - user_id=user_id, - chat_id=chat_id, - code=exc.code, - error=str(exc), - ) - outgoing_events = [ + logger.warning("max_dispatch_error", code=exc.code, err=str(exc)) + outgoing = [ OutgoingMessage( chat_id=room.platform_chat_id, - text="Сервис временно недоступен. Попробуйте ещё раз позже.", + text="Сервис временно недоступен. Попробуйте позже.", ) ] - # Send outgoing events back to MAX - for event in outgoing_events: - await self._send_outgoing(chat_id, event, agent.workspace_path) + if not outgoing and isinstance(incoming, IncomingCommand): + outgoing = [ + OutgoingMessage( + chat_id=room.platform_chat_id, + text="Неизвестная команда. Введите /help.", + ), + ] - # ------------------------------------------------------------------ - # Callbacks - # ------------------------------------------------------------------ + await self._send_outgoing(int(chat_key), outgoing, room) - async def _handle_callback(self, callback: dict) -> None: - user_id = str(callback["from"]["id"]) - chat_id = str(callback["message"]["chat"]["id"]) - message_id = str(callback["message"]["message_id"]) - data = callback.get("data", "") + async def _handle_local_attachment_command(self, incoming: IncomingCommand, chat_key: str) -> str: + if incoming.command == "list": + return self.attach_handler.handle_list(chat_key) + return self.attach_handler.handle_remove(chat_key, incoming.args[0] if incoming.args else "") - room = self.store.get_room_by_max_chat_id(chat_id) + async def _materialize_attachments( + self, + room: RoomMeta, + attachments: list[Attachment], + raw_meta: list[dict], + ) -> list[Attachment]: + workspace = Path(room.workspace_path or str(self.surfaces_workspace)) + out: list[Attachment] = [] + for att, _meta in zip(attachments, raw_meta, strict=False): + if not att.url: + out.append(att) + continue + try: + rel = await save_incoming_from_url( + api=self.api, + workspace_root=workspace, + filename=att.filename or "file.bin", + url=att.url, + ) + except (httpx.HTTPError, OSError) as exc: + logger.warning("max_attachment_download_failed", error=str(exc)) + out.append(att) + continue + out.append( + Attachment( + type=att.type, + filename=att.filename, + mime_type=att.mime_type, + workspace_path=rel, + url=att.url, + ) + ) + return out + + async def process_message_callback(self, payload: dict) -> None: + cb = payload.get("callback") or {} + if not isinstance(cb, dict): + return + callback_id = cb.get("callback_id") + user_blob = cb.get("user") or {} + uid = user_blob.get("user_id") if isinstance(user_blob, dict) else None + uid_s = str(uid) if isinstance(uid, int) else None + + msg = payload.get("message") or {} + recipient = msg.get("recipient") or {} if isinstance(msg, dict) else {} + cc = recipient.get("chat_id") + if isinstance(cc, int): + chat_key = str(cc) + elif isinstance(uid_s, str): + chat_key = uid_s + else: + return + + mid = "" + body = msg.get("body") if isinstance(msg, dict) else None + if isinstance(body, dict): + mb = body.get("mid") + mid = mb if isinstance(mb, str) else "" + + if uid_s is None: + return + + await self.ensure_user(uid_s, display_name=user_blob.get("first_name")) + + room = self.chat_store.get_room_by_max_chat_id(chat_key) if room is None: return - incoming = max_message_to_incoming( - text="", - user_id=user_id, - chat_id=room.platform_chat_id, - callback_data=data, - message_id=message_id, + payload_raw = cb.get("payload") if cb.get("payload") is not None else None + payload_str = str(payload_raw) if payload_raw is not None else "" + + incoming = incoming_from_message_callback_payload( + max_user_id=uid_s, + platform_chat_id=room.platform_chat_id, + payload_raw=payload_str, + callback_message_id=mid, ) + if incoming is None: + if isinstance(callback_id, str): + await self.api.answer_callback(callback_id, notification="ok") + return try: - outgoing_events = await self.dispatcher.dispatch(incoming) + outgoing = await self.dispatcher.dispatch(incoming) except PlatformError: + outgoing = [] + + await self._send_outgoing(int(chat_key), outgoing, room) + + if isinstance(callback_id, str): + await self.api.answer_callback(callback_id, notification=" ") + + async def process_bot_started(self, payload: dict) -> None: + cid = payload.get("chat_id") + user_blob = payload.get("user") or {} + uid = user_blob.get("user_id") + + chat_key = str(cid) if isinstance(cid, int) else None + if chat_key is None or not isinstance(uid, int): return - for event in outgoing_events: - agent = self.registry.get_agent_by_id(room.agent_id) - ws = agent.workspace_path if agent else "/agents/0" - await self._send_outgoing(chat_id, event, ws) + uid_s = str(uid) + await self.ensure_user(uid_s, display_name=user_blob.get("first_name")) - # ------------------------------------------------------------------ - # Surface commands - # ------------------------------------------------------------------ + await self._resolve_room( + max_chat_key=chat_key, + max_user_id=uid_s, + ) - async def _handle_surface_command( - self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent - ) -> str | None: - command = cmd.command - args = cmd.args + deeplink_note = "" + dl = payload.get("payload") if isinstance(payload.get("payload"), str) else None + if dl: + deeplink_note = f" (payload: {dl})" - if command == "new": - name = " ".join(args) if args else None - self.max_chat_handler.handle_new( - max_chat_id=max_chat_id, - user_id=user_id, - agent_id=agent.id, - name=name, - ) - return f"New chat created: {name or 'Unnamed'}" + welcome = ( + "Здравствуйте, я помогу с задачами Lambda. " + f"Отправьте текст или файл.{deeplink_note}" + ) - elif command == "chats": - return self.max_chat_handler.handle_chats(user_id) + await self.api.send_message_to_chat(int(chat_key), text=welcome) - elif command == "rename": - new_name = " ".join(args) if args else "" - return self.max_chat_handler.handle_rename(max_chat_id, new_name) + async def dispatch_update(self, update: dict) -> None: + utype = update.get("update_type") + if utype == "message_created": + await self.process_message_created(update) + elif utype == "message_callback": + await self.process_message_callback(update) + elif utype == "bot_started": + await self.process_bot_started(update) - elif command == "archive": - return self.max_chat_handler.handle_archive(max_chat_id) + async def _send_lines(self, max_chat_id: int, text: str) -> None: + if text: + await self._send_plain_text(max_chat_id, text) - elif command in ("clear", "reset"): - return self.max_chat_handler.handle_clear(max_chat_id) + async def _send_plain_text(self, max_chat_id: int, text: str, *, fmt: str | None = None) -> None: + chunk_size = MAX_TEXT_CHARS + for i in range(0, len(text), chunk_size): + part = text[i : i + chunk_size] + await self.api.send_message_to_chat(max_chat_id, text=part, fmt=fmt) - elif command == "list": - return self.attach_handler.handle_list(max_chat_id) + async def _send_outgoing(self, max_chat_id: int, events: list[OutgoingEvent], room: RoomMeta) -> None: + workspace_agent = Path( + room.workspace_path if room.workspace_path else self.surfaces_workspace, + ) - elif command == "remove": - idx = args[0] if args else "" - return self.attach_handler.handle_remove(max_chat_id, idx) + for event in events: + if isinstance(event, OutgoingTyping): + await self.api.send_chat_action(max_chat_id, "typing_on") + continue - elif command == "help": - return get_help() + if isinstance(event, OutgoingNotification): + body = f"[{event.level.upper()}] {event.text}" + await self._send_plain_text(max_chat_id, body) + continue - return None + if isinstance(event, OutgoingMessage): + fmt = None + if getattr(event, "parse_mode", "plain") == "markdown": + fmt = "markdown" - # ------------------------------------------------------------------ - # Outgoing to MAX - # ------------------------------------------------------------------ + merged_text = getattr(event, "text", "") or "" + attachments = list(getattr(event, "attachments", []) or []) - async def _send_outgoing( - self, max_chat_id: str, event: OutgoingEvent, workspace_path: str - ) -> None: - if isinstance(event, OutgoingTyping): - await self._send_typing(max_chat_id) - return + agent_def = None + try: + agent_def = self.registry.get(room.agent_id) + except AgentRegistryError: + pass - if isinstance(event, OutgoingNotification): - text = f"[{event.level.upper()}] {event.text}" - await self._send_message(max_chat_id, text) - return + root = ( + Path(agent_def.workspace_path) + if agent_def and agent_def.workspace_path + else workspace_agent + ) - if isinstance(event, OutgoingMessage): - if event.text: - await self._send_message(max_chat_id, event.text) + req_atts: list[dict] = [] + for raw_att in attachments: + wp = getattr(raw_att, "workspace_path", None) + if not wp: + continue + try: + data = read_workspace_bytes(wp, agent_workspace=str(root)) + except OSError: + logger.warning("max_outgoing_missing_file", path=wp) + continue - # Upload outgoing files - for att in event.attachments: - if not att.workspace_path: + fn = getattr(raw_att, "filename", None) or Path(str(wp)).name + mime = getattr(raw_att, "mime_type", None) + att_type = str(getattr(raw_att, "type", "") or "") + ctype = guess_upload_type(mime, attachment_type=str(att_type)) + attached = await upload_file_as_attachment( + self.api, filename=fn, content=data, upload_type=ctype + ) + req_atts.append(attached) + + text_payload = merged_text.strip() or None + + if text_payload is None and not req_atts: continue - if self.files.file_exists(att.workspace_path, workspace_path): - # Read file and upload to MAX - file_data = self.files.read_outgoing_file( - att.workspace_path, workspace_path - ) - # MAX file upload logic — зависит от API MAX - # Пока просто отправляем имя файла текстом - await self._send_message( - max_chat_id, - f"[Файл: {att.filename or att.workspace_path}]", - ) - return - if isinstance(event, OutgoingUI): - lines = [event.text] - if event.buttons: - for btn in event.buttons: - lines.append(f" {btn.label}") - lines.append("") - lines.append("Ответьте !yes для подтверждения или !no для отмены.") - await self._send_message(max_chat_id, "\n".join(lines)) - return + await self.api.send_message_to_chat( + max_chat_id, + text=text_payload, + attachments=req_atts or None, + fmt=fmt, + ) - # ------------------------------------------------------------------ - # Low-level MAX API - # ------------------------------------------------------------------ + if isinstance(event, OutgoingUI): + lines = [event.text] + if getattr(event, "buttons", []): + lines.append("") + for button in event.buttons: + lines.append(f"• {button.label}") + lines.append("") + lines.append("Ответьте /yes или /no (или кнопки с callback в MAX).") - async def _send_message(self, chat_id: str, text: str) -> None: - async with self.session.post( - f"{self.api_url}/sendMessage", - json={"chat_id": chat_id, "text": text}, - ) as resp: - await resp.json() + merged = "\n".join(lines) + await self._send_plain_text(max_chat_id, merged) - async def _send_typing(self, chat_id: str) -> None: - async with self.session.post( - f"{self.api_url}/sendChatAction", - json={"chat_id": chat_id, "action": "typing"}, - ) as resp: - await resp.json() + async def run(self) -> None: + await self.bootstrap_identity() + + logger.info( + "max_bot_poll_start", + update_types=self.update_types, + registry_agents=len(self.registry.agents), + ) + + while True: + try: + updates, marker = await self.api.get_updates( + marker=self._marker, + types=self.update_types, + timeout=40, + limit=100, + ) + + self._marker = marker + + for u in updates: + try: + await self.dispatch_update(u) + except Exception: + logger.exception("max_update_failed", update=u) + + except asyncio.CancelledError: + raise + except (MaxApiError, httpx.HTTPError) as exc: + logger.error("max_poll_fatal", error=str(exc)) + await asyncio.sleep(5) + + async def shutdown(self) -> None: + close = getattr(self.platform, "close", None) + if callable(close): + await close() + await self.api.aclose() -# --------------------------------------------------------------------------- -# Entry point -# --------------------------------------------------------------------------- - -async def main(): - surface = MaxSurface() - await surface.start() +async def main() -> None: + app = MaxBotApp() + try: + await app.run() + finally: + await app.shutdown() if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/adapter/max/converter.py b/adapter/max/converter.py index 2bbfd8b..758e2b1 100644 --- a/adapter/max/converter.py +++ b/adapter/max/converter.py @@ -1,88 +1,151 @@ -"""MAX event to internal protocol converter.""" -from typing import Union, List -from core.protocol import ( - IncomingMessage, - IncomingCommand, - IncomingCallback, - Attachment, -) +"""MAX Bot API payloads -> core Incoming* types.""" +from __future__ import annotations + +from typing import Any + +from core.protocol import Attachment, IncomingCallback, IncomingCommand, IncomingMessage -def _extract_command(text: str) -> Union[IncomingCommand, IncomingCallback, None]: - if not text.startswith("!"): - return None - - parts = text.strip().split(maxsplit=1) - cmd = parts[0].lower() - args = parts[1] if len(parts) > 1 else "" - - if cmd == "!yes": - return IncomingCallback( - user_id="", - platform="max", - chat_id="", - action="confirm", - ) - elif cmd == "!no": - return IncomingCallback( - user_id="", - platform="max", - chat_id="", - action="cancel", - ) - else: - return IncomingCommand( - user_id="", - platform="max", - chat_id="", - command=cmd.lstrip("!"), - args=args.split() if args else [], - ) - - -def max_message_to_incoming( +def incoming_from_text_commands( *, text: str, - user_id: str, - chat_id: str, - attachments: List[Attachment] = None, - callback_data: str = None, - message_id: str = None, -) -> Union[IncomingMessage, IncomingCommand, IncomingCallback]: - if callback_data: + max_user_id: str, + platform_chat_id: str, + attachments: list[Attachment], +) -> IncomingMessage | IncomingCommand | IncomingCallback: + """Парсинг текста: только slash-команды (как в Telegram), обычное сообщение иначе.""" + + stripped = text.strip() + proto = stripped.lower() + + if proto in {"/yes"}: return IncomingCallback( - user_id=user_id, + user_id=max_user_id, platform="max", - chat_id=chat_id, - action=callback_data, - payload={"message_id": message_id} if message_id else {}, + chat_id=platform_chat_id, + action="confirm", + payload={}, + ) + if proto in {"/no"}: + return IncomingCallback( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + action="cancel", + payload={}, ) - if text: - cmd = _extract_command(text) - if cmd is not None: - cmd.user_id = user_id - cmd.chat_id = chat_id - return cmd + if not stripped.startswith("/"): + return IncomingMessage( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + text=text, + attachments=attachments, + reply_to=None, + ) - return IncomingMessage( - user_id=user_id, + raw = stripped[1:] + parts = raw.split(maxsplit=1) + name = (parts[0] or "").lower() + tail = parts[1] if len(parts) > 1 else "" + + return IncomingCommand( + user_id=max_user_id, platform="max", - chat_id=chat_id, - text=text or "", - attachments=attachments or [], + chat_id=platform_chat_id, + command=name, + args=tail.split() if tail else [], ) -def max_attachment_to_internal( +def incoming_from_message_callback_payload( *, - filename: str, - mime_type: str, - download_url: str, -) -> Attachment: - return Attachment( - type="document", - url=download_url, + max_user_id: str, + platform_chat_id: str, + payload_raw: str | None, + callback_message_id: str | None, +) -> IncomingCallback | None: + if not payload_raw: + return None + if payload_raw in {"confirm", "cancel", "toggle_skill"}: + return IncomingCallback( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + action=payload_raw, + payload={"message_id": callback_message_id or ""}, + ) + return IncomingCallback( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + action="max_callback", + payload={"payload": payload_raw, "message_id": callback_message_id}, + ) + + +def attachment_from_max_dict(raw: dict[str, Any]) -> tuple[Attachment, dict[str, Any]] | None: + """Return core Attachment placeholder + raw attachment for download.""" + + kind = raw.get("type") + payload = raw.get("payload") + if not isinstance(kind, str) or not isinstance(payload, dict): + return None + + url = payload.get("url") + if not isinstance(url, str): + url = "" + + token = payload.get("token") + filename = "attachment.bin" + mapped = "document" + mime: str | None = None + + if kind == "image": + mapped = "image" + filename = "image.jpg" + elif kind == "video": + mapped = "video" + filename = "video.mp4" + elif kind == "audio": + mapped = "audio" + mime = payload.get("mime_type") if isinstance(payload.get("mime_type"), str) else "audio/mpeg" + filename = "audio.bin" + elif kind == "file": + fname = payload.get("filename") + filename = fname if isinstance(fname, str) and fname else "file.bin" + mapped = "document" + else: + return None + + attachment = Attachment( + type=mapped, + url=url or None, + content=None, filename=filename, - mime_type=mime_type, - ) \ No newline at end of file + mime_type=mime, + ) + meta = dict(raw) + if token: + meta["_download_token_hint"] = token + return attachment, meta + + +def collect_max_attachments(message_body: dict[str, Any]) -> tuple[list[Attachment], list[dict[str, Any]]]: + attachments = message_body.get("attachments") + if not isinstance(attachments, list): + return [], [] + + core_list: list[Attachment] = [] + raw_list: list[dict[str, Any]] = [] + + for item in attachments: + if isinstance(item, dict): + parsed = attachment_from_max_dict(item) + if parsed is None: + continue + core_a, raw_a = parsed + core_list.append(core_a) + raw_list.append(raw_a) + return core_list, raw_list diff --git a/adapter/max/files.py b/adapter/max/files.py index 58b87fb..8e70718 100644 --- a/adapter/max/files.py +++ b/adapter/max/files.py @@ -1,51 +1,88 @@ -"""File handling for MAX surface.""" -import os -import aiohttp +"""Incoming / outgoing file helpers for MAX (aligned with Matrix workspace layout).""" +from __future__ import annotations + +import mimetypes from pathlib import Path +import httpx -class FileHandler: - def __init__(self, workspace_root: str): - self.workspace_root = workspace_root - def _make_unique_filename(self, directory: str, filename: str) -> str: - base = Path(filename).stem - ext = Path(filename).suffix - candidate = filename - counter = 1 - while os.path.exists(os.path.join(directory, candidate)): - candidate = f"{base} ({counter}){ext}" - counter += 1 - return candidate +def guess_upload_type(mime_type: str | None, *, attachment_type: str) -> str: + if attachment_type == "image": + return "image" + if attachment_type == "video": + return "video" + if attachment_type == "audio": + return "audio" + mime = mime_type or "" + if mime.startswith("image/"): + return "image" + if mime.startswith("video/"): + return "video" + if mime.startswith("audio/"): + return "audio" + return "file" - async def download_attachment( - self, - download_url: str, - filename: str, - agent_workspace: str, - headers: dict = None, - ) -> str: - full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) - os.makedirs(full_dir, exist_ok=True) - unique_name = self._make_unique_filename(full_dir, filename) - filepath = os.path.join(full_dir, unique_name) +async def save_incoming_from_url( + *, + api: MaxBotApi, + workspace_root: Path, + filename: str, + url: str, +) -> str: + data = await api.download_file(url) + workspace_root.mkdir(parents=True, exist_ok=True) + relative_path, absolute_path = build_agent_workspace_path( + workspace_root=workspace_root, + filename=filename, + ) + absolute_path.parent.mkdir(parents=True, exist_ok=True) + absolute_path.write_bytes(data) + return relative_path - async with aiohttp.ClientSession() as session: - async with session.get(download_url, headers=headers) as resp: - resp.raise_for_status() - with open(filepath, "wb") as f: - f.write(await resp.read()) - return unique_name +async def upload_file_as_attachment( + api: MaxBotApi, + *, + filename: str, + content: bytes, + upload_type: str, +) -> dict: + meta = await api.get_upload_url(upload_type) + upload_url = meta.get("url") + token = meta.get("token") + if not isinstance(upload_url, str) or not upload_url: + raise RuntimeError("MAX uploads response missing url") - def read_outgoing_file(self, workspace_path: str, agent_workspace: str) -> bytes: - full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) - filepath = os.path.join(full_dir, workspace_path.lstrip("/")) - with open(filepath, "rb") as f: - return f.read() + async with httpx.AsyncClient(timeout=httpx.Timeout(120.0)) as client: + response = await client.post( + upload_url, + files={"data": (filename, content, guess_mimetype(filename))}, + ) + response.raise_for_status() - def file_exists(self, workspace_path: str, agent_workspace: str) -> bool: - full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) - filepath = os.path.join(full_dir, workspace_path.lstrip("/")) - return os.path.exists(filepath) \ No newline at end of file + payload: dict = {} + if token: + payload["token"] = token + if upload_type == "image": + return {"type": "image", "payload": payload} + + type_map = { + "file": "file", + "video": "video", + "audio": "audio", + } + mapped = type_map.get(upload_type, "file") + return {"type": mapped, "payload": payload} + + +def guess_mimetype(filename: str) -> str: + mime, _ = mimetypes.guess_type(filename) + return mime or "application/octet-stream" + + +def read_workspace_bytes(workspace_path: str | Path, *, agent_workspace: str) -> bytes: + root = Path(agent_workspace) + resolved = resolve_workspace_attachment_path(root, str(workspace_path)) + return resolved.read_bytes() diff --git a/adapter/max/handlers/__init__.py b/adapter/max/handlers/__init__.py index 3355924..93da8fa 100644 --- a/adapter/max/handlers/__init__.py +++ b/adapter/max/handlers/__init__.py @@ -1 +1,5 @@ -"""MAX surface handlers.""" \ No newline at end of file +"""MAX surface handlers.""" + +from adapter.max.handlers.commands import register_max_handlers + +__all__ = ["register_max_handlers"] diff --git a/adapter/max/handlers/attachments.py b/adapter/max/handlers/attachments.py index c4d0da5..ab94150 100644 --- a/adapter/max/handlers/attachments.py +++ b/adapter/max/handlers/attachments.py @@ -9,7 +9,7 @@ class AttachmentHandler: def handle_list(self, max_chat_id: str) -> str: attachments = self.store.get_attachments(max_chat_id) if not attachments: - return "Attachment queue is empty." + return "Очередь вложений пуста." lines = [f" {i+1}. {name}" for i, (_, name) in enumerate(attachments)] return "\n".join(lines) @@ -17,13 +17,13 @@ class AttachmentHandler: attachments = self.store.staged_attachments.get(max_chat_id, []) if index.lower() == "all": self.store.staged_attachments[max_chat_id] = [] - return "All attachments removed from queue." + return "Все вложения удалены из очереди." try: idx = int(index) - 1 if 0 <= idx < len(attachments): removed = attachments.pop(idx) - return f"Removed: {removed[1]}" - return "Invalid index." + return f"Удалено: {removed[1]}" + return "Неверный номер." except ValueError: - return "Usage: !remove or !remove all" \ No newline at end of file + return "Использование: /remove <номер> или /remove all" diff --git a/adapter/max/handlers/chat.py b/adapter/max/handlers/chat.py index 2933d5f..f8b4b3c 100644 --- a/adapter/max/handlers/chat.py +++ b/adapter/max/handlers/chat.py @@ -7,7 +7,15 @@ class ChatHandler: def __init__(self, store: ChatStore): self.store = store - def handle_new(self, max_chat_id: str, user_id: str, agent_id: str, name: str = None) -> str: + def handle_new( + self, + max_chat_id: str, + user_id: str, + agent_id: str, + name: str | None = None, + *, + workspace_path: str = "", + ) -> str: platform_chat_id = str(uuid.uuid4()) room = RoomMeta( platform_chat_id=platform_chat_id, @@ -15,6 +23,7 @@ class ChatHandler: name=name or "New Chat", user_id=user_id, agent_id=agent_id, + workspace_path=workspace_path, ) self.store.add_room(room) return platform_chat_id @@ -22,27 +31,27 @@ class ChatHandler: def handle_chats(self, user_id: str) -> str: rooms = self.store.list_rooms_for_user(user_id) if not rooms: - return "No active chats." + return "Нет активных чатов." lines = [f" {i+1}. {r.name}" for i, r in enumerate(rooms)] return "\n".join(lines) def handle_rename(self, max_chat_id: str, new_name: str) -> str: room = self.store.get_room_by_max_chat_id(max_chat_id) if not room: - return "Chat not found." + return "Чат не найден." room.name = new_name - return f"Chat renamed to: {new_name}" + return f"Чат переименован в: {new_name}" def handle_archive(self, max_chat_id: str) -> str: room = self.store.get_room_by_max_chat_id(max_chat_id) if not room: - return "Chat not found." + return "Чат не найден." self.store.remove_room(max_chat_id) - return "Chat archived." + return "Чат архивирован." def handle_clear(self, max_chat_id: str) -> str: room = self.store.get_room_by_max_chat_id(max_chat_id) if not room: - return "Chat not found." + return "Чат не найден." room.platform_chat_id = str(uuid.uuid4()) - return "Chat context cleared." \ No newline at end of file + return "Контекст чата очищен." \ No newline at end of file diff --git a/adapter/max/handlers/commands.py b/adapter/max/handlers/commands.py new file mode 100644 index 0000000..a488143 --- /dev/null +++ b/adapter/max/handlers/commands.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.handlers.help import get_help +from adapter.max.store import ChatStore +from sdk.prototype_state import PrototypeStateStore + +from core.handler import EventDispatcher +from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage + +_SINGLE_DIALOG_HINT = ( + "В MAX один диалог с ботом. Чтобы сбросить контекст агента, используйте /clear или /reset." +) + + +def register_max_handlers( + dispatcher: EventDispatcher, + *, + chat_store: ChatStore, + max_chat_handler: MaxChatHandler, + prototype_state: PrototypeStateStore, +) -> None: + async def _room_or_error( + event: IncomingCommand, + ) -> tuple[str, str] | list[OutgoingMessage]: + room = chat_store.get_room_by_platform_chat_id(event.chat_id) + if room is None: + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Состояние ещё не готово. Напишите сообщение ещё раз.", + ) + ] + return room.max_chat_id, room.platform_chat_id + + async def handle_max_help(event: IncomingCommand, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + if not await auth_mgr.is_authenticated(event.user_id): + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Введите /start чтобы начать.", + ) + ] + return [OutgoingMessage(chat_id=event.chat_id, text=get_help())] + + async def handle_max_no_multichat(event: IncomingCommand, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + if not await auth_mgr.is_authenticated(event.user_id): + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Введите /start чтобы начать.", + ) + ] + _ = event + return [OutgoingMessage(chat_id=event.chat_id, text=_SINGLE_DIALOG_HINT)] + + async def handle_max_clear(event: IncomingCommand, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + if not await auth_mgr.is_authenticated(event.user_id): + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Введите /start чтобы начать.", + ) + ] + routed = await _room_or_error(event) + if isinstance(routed, list): + return routed + max_chat_id, platform_chat_old = routed + + await prototype_state.clear_current_session(platform_chat_old) + await chat_mgr.archive(platform_chat_old, event.user_id) + max_chat_handler.handle_clear(max_chat_id) + room = chat_store.get_room_by_max_chat_id(max_chat_id) + if room is None: + return [ + OutgoingMessage( + chat_id=platform_chat_old, + text="Не удалось сбросить контекст.", + ) + ] + platform_chat_new = room.platform_chat_id + await chat_mgr.get_or_create( + user_id=event.user_id, + chat_id=platform_chat_new, + platform="max", + surface_ref=max_chat_id, + name=room.name, + ) + return [ + OutgoingMessage( + chat_id=platform_chat_new, + text="Контекст агента сброшен.", + ) + ] + + for cmd in ("new", "rename", "archive", "chats"): + dispatcher.register(IncomingCommand, cmd, handle_max_no_multichat) + + dispatcher.register(IncomingCommand, "reset", handle_max_clear) + dispatcher.register(IncomingCommand, "clear", handle_max_clear) + dispatcher.register(IncomingCommand, "help", handle_max_help) + + async def handle_max_plain_callback(event: IncomingCallback, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + payload = str(event.payload.get("payload", "")) + return [ + OutgoingMessage( + chat_id=event.chat_id, + text=f"Неизвестное действие кнопки: {payload}", + ) + ] + + dispatcher.register(IncomingCallback, "max_callback", handle_max_plain_callback) diff --git a/adapter/max/handlers/help.py b/adapter/max/handlers/help.py index 24181c2..cad3e32 100644 --- a/adapter/max/handlers/help.py +++ b/adapter/max/handlers/help.py @@ -1,26 +1,26 @@ -"""Help handler for MAX surface.""" +"""Help text for MAX surface (single dialog, slash commands).""" HELP_TEXT = """ -Available commands: +Команды (/ как в Telegram): -Chat management: - !new [name] — Create a new chat - !chats — List active chats - !rename — Rename current chat - !archive — Archive current chat - !clear / !reset — Reset chat context + /start — начать + /help — эта справка + /clear или /reset — сбросить контекст агента -Attachments: - !list — Show attachment queue - !remove — Remove attachment from queue - !remove all — Clear attachment queue +Вложения (файл без текста ставится в очередь): -Actions: - !yes — Confirm agent action - !no — Cancel agent action - !help — Show this help + /list — очередь вложений + /remove n — убрать из очереди + /remove all — очистить очередь + +Подтверждения агента: + + /yes / /no + +Команды вида /new, /chats, /rename, /archive в MAX не нужны — +у вас один диалог с ботом; контекст сбрасывайте через /clear. """ def get_help() -> str: - return HELP_TEXT.strip() \ No newline at end of file + return HELP_TEXT.strip() diff --git a/adapter/max/store.py b/adapter/max/store.py index d8e6ce6..ae9878e 100644 --- a/adapter/max/store.py +++ b/adapter/max/store.py @@ -10,6 +10,7 @@ class RoomMeta: name: str user_id: str agent_id: str + workspace_path: str = "" @dataclass diff --git a/tests/adapter/max/__init__.py b/tests/adapter/max/__init__.py new file mode 100644 index 0000000..f13a104 --- /dev/null +++ b/tests/adapter/max/__init__.py @@ -0,0 +1 @@ +# MAX adapter tests diff --git a/tests/adapter/max/test_agent_registry.py b/tests/adapter/max/test_agent_registry.py new file mode 100644 index 0000000..f69a9f1 --- /dev/null +++ b/tests/adapter/max/test_agent_registry.py @@ -0,0 +1,88 @@ +from pathlib import Path + +import pytest + +from adapter.max.agent_registry import AgentRegistryError, load_agent_registry + + +def test_load_agent_registry_reads_yaml(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "agents:\n" + " - id: agent-1\n" + " label: One\n" + " base_url: http://localhost:8000/a1/\n" + " workspace_path: /agents/1\n", + encoding="utf-8", + ) + reg = load_agent_registry(path) + assert [a.agent_id for a in reg.agents] == ["agent-1"] + a = reg.get("agent-1") + assert a.label == "One" + assert a.base_url == "http://localhost:8000/a1/" + assert a.workspace_path == "/agents/1" + + +def test_user_agents_resolve(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "user_agents:\n" + ' "42": agent-1\n' + "agents:\n" + " - id: agent-1\n" + " label: One\n" + " - id: agent-2\n" + " label: Two\n", + encoding="utf-8", + ) + reg = load_agent_registry(path) + assert reg.resolve_agent_for_user("42").agent_id == "agent-1" + assert reg.resolve_agent_for_user("42").source == "configured" + assert reg.resolve_agent_for_user("999").agent_id == "agent-1" + assert reg.resolve_agent_for_user("999").source == "default" + + +def test_duplicate_ids_rejected(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "agents:\n" + " - id: a\n" + " label: A\n" + " - id: a\n" + " label: B\n", + encoding="utf-8", + ) + with pytest.raises(AgentRegistryError, match="duplicate agent id"): + load_agent_registry(path) + + +def test_empty_agents_rejected(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text("agents: []\n", encoding="utf-8") + with pytest.raises(AgentRegistryError, match="non-empty"): + load_agent_registry(path) + + +def test_user_agents_must_be_strings(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "user_agents:\n" + " 42: agent-1\n" + "agents:\n" + " - id: agent-1\n" + " label: One\n", + encoding="utf-8", + ) + with pytest.raises(AgentRegistryError, match="user_agents"): + load_agent_registry(path) + + +def test_unknown_agent_raises(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "agents:\n - id: a\n label: A\n", + encoding="utf-8", + ) + reg = load_agent_registry(path) + with pytest.raises(AgentRegistryError, match="unknown agent id"): + reg.get("missing") diff --git a/tests/adapter/max/test_api_client.py b/tests/adapter/max/test_api_client.py new file mode 100644 index 0000000..206d2d9 --- /dev/null +++ b/tests/adapter/max/test_api_client.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock + +import httpx +import pytest + +from adapter.max.api_client import MaxApiError, MaxBotApi + + +@pytest.mark.asyncio +async def test_get_updates_returns_marker_and_updates(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock( + return_value=httpx.Response( + 200, + json={ + "updates": [{"update_type": "message_created", "timestamp": 1}], + "marker": 7, + }, + ) + ) + updates, marker = await api.get_updates(types=["message_created"]) + assert len(updates) == 1 + assert updates[0]["update_type"] == "message_created" + assert marker == 7 + + _, kwargs = api._client.request.call_args + assert kwargs["params"]["types"] == "message_created" + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_get_updates_non_dict_body(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock(return_value=httpx.Response(200, text="oops")) + updates, marker = await api.get_updates() + assert updates == [] + assert marker is None + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_http_error_raises_max_api_error(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock( + return_value=httpx.Response(401, json={"code": "verify.token", "message": "bad"}) + ) + with pytest.raises(MaxApiError) as ei: + await api.get_me() + assert ei.value.status == 401 + assert "bad" in str(ei.value).lower() or ei.value.payload + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_send_message_to_chat_posts_json_body(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock(return_value=httpx.Response(200, json={"message": {}})) + + await api.send_message_to_chat(12345, text="hi", attachments=None, fmt=None) + + args, kw = api._client.request.call_args + assert args[0] == "POST" + assert args[1] == "/messages" + assert kw["params"]["chat_id"] == 12345 + assert kw["json"] == {"text": "hi"} + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_download_file_uses_get(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.get = AsyncMock(return_value=httpx.Response(200, content=b"\xff\xd8")) + + buf = await api.download_file("https://files.example/bin") + + assert buf == b"\xff\xd8" + api._client.get.assert_awaited_once() + finally: + await api.aclose() diff --git a/tests/adapter/max/test_converter.py b/tests/adapter/max/test_converter.py new file mode 100644 index 0000000..b5f1a6e --- /dev/null +++ b/tests/adapter/max/test_converter.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +import pytest + +from adapter.max.converter import ( + attachment_from_max_dict, + collect_max_attachments, + incoming_from_message_callback_payload, + incoming_from_text_commands, +) +from core.protocol import Attachment, IncomingCallback, IncomingCommand, IncomingMessage + + +@pytest.mark.parametrize( + "text,expect_type", + [ + ("Hello", IncomingMessage), + (" plain ", IncomingMessage), + ], +) +def test_plain_text_to_message(text, expect_type): + r = incoming_from_text_commands( + text=text, + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert type(r) is expect_type + assert r.text == text + + +def test_slash_command_split(): + r = incoming_from_text_commands( + text="/new title here", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(r, IncomingCommand) + assert r.command == "new" + assert r.args == ["title", "here"] + + +def test_slash_command_no_args(): + r = incoming_from_text_commands( + text="/help", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(r, IncomingCommand) + assert r.command == "help" + assert r.args == [] + + +def test_bang_prefix_is_plain_message_not_command(): + """MAX: только / считается командой.""" + r = incoming_from_text_commands( + text="!help", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(r, IncomingMessage) + assert r.text == "!help" + + +def test_yes_no_callbacks(): + yes = incoming_from_text_commands( + text="/yes", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(yes, IncomingCallback) + assert yes.action == "confirm" + + no = incoming_from_text_commands( + text="/NO", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(no, IncomingCallback) + assert no.action == "cancel" + + +def test_incoming_message_keeps_attachments(): + at = [Attachment(type="document", filename="a.txt")] + r = incoming_from_text_commands( + text="see file", + max_user_id="10", + platform_chat_id="pc-1", + attachments=at, + ) + assert isinstance(r, IncomingMessage) + assert r.attachments == at + + +def test_message_callback_known_actions(): + c = incoming_from_message_callback_payload( + max_user_id="10", + platform_chat_id="pc", + payload_raw="confirm", + callback_message_id="mid", + ) + assert c is not None + assert isinstance(c, IncomingCallback) + assert c.action == "confirm" + assert c.payload.get("message_id") == "mid" + + +def test_message_callback_unknown_becomes_max_callback(): + c = incoming_from_message_callback_payload( + max_user_id="10", + platform_chat_id="pc", + payload_raw="my_payload", + callback_message_id=None, + ) + assert c is not None + assert c.action == "max_callback" + assert c.payload["payload"] == "my_payload" + + +def test_attachment_from_max_file(): + parsed = attachment_from_max_dict( + { + "type": "file", + "payload": { + "url": "https://cdn.example/f", + "filename": "doc.pdf", + "token": "tok", + }, + } + ) + assert parsed is not None + att, raw = parsed + assert att.filename == "doc.pdf" + assert att.type == "document" + assert att.url == "https://cdn.example/f" + assert raw.get("_download_token_hint") == "tok" + + +def test_collect_max_attachments_skips_unknown(): + core, raw = collect_max_attachments( + { + "attachments": [ + {"type": "file", "payload": {"url": "u", "filename": "x.bin"}}, + {"type": "sticker", "payload": {}}, + ] + } + ) + assert len(core) == len(raw) == 1 + assert core[0].filename == "x.bin" diff --git a/tests/adapter/max/test_dispatcher_max.py b/tests/adapter/max/test_dispatcher_max.py new file mode 100644 index 0000000..e4c85a6 --- /dev/null +++ b/tests/adapter/max/test_dispatcher_max.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import pytest + +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.handlers.commands import register_max_handlers +from adapter.max.store import ChatStore, RoomMeta +from core.auth import AuthManager +from core.chat import ChatManager +from core.handler import EventDispatcher +from core.handlers import register_all +from core.protocol import IncomingCommand, OutgoingMessage +from core.settings import SettingsManager +from core.store import InMemoryStore +from sdk.mock import MockPlatformClient +from sdk.prototype_state import PrototypeStateStore + + +def _build_dispatcher() -> tuple[ + EventDispatcher, + ChatManager, + AuthManager, + ChatStore, + str, +]: + store_mem = InMemoryStore() + chat_store = ChatStore() + chat_handler = MaxChatHandler(chat_store) + prototype_state = PrototypeStateStore() + platform = MockPlatformClient() + chat_mgr = ChatManager(platform, store_mem) + auth_mgr = AuthManager(platform, store_mem) + settings_mgr = SettingsManager(platform, store_mem) + dispatcher = EventDispatcher( + platform=platform, + chat_mgr=chat_mgr, + auth_mgr=auth_mgr, + settings_mgr=settings_mgr, + ) + register_all(dispatcher) + register_max_handlers( + dispatcher, + chat_store=chat_store, + max_chat_handler=chat_handler, + prototype_state=prototype_state, + ) + + pid = "550e8400-e29b-41d4-a716-446655440000" + chat_store.add_room( + RoomMeta( + platform_chat_id=pid, + max_chat_id="777", + name="Чат", + user_id="u1", + agent_id="agent-0", + workspace_path="/agents/0", + ) + ) + return dispatcher, chat_mgr, auth_mgr, chat_store, pid + + +@pytest.mark.asyncio +async def test_dispatcher_new_is_single_dialog_hint(): + dispatcher, _chat_mgr, auth_mgr, _chat_store, pid = _build_dispatcher() + await auth_mgr.confirm("u1") + + out = await dispatcher.dispatch( + IncomingCommand(user_id="u1", platform="max", chat_id=pid, command="new"), + ) + assert len(out) == 1 + assert isinstance(out[0], OutgoingMessage) + assert "один диалог" in out[0].text.lower() + + +@pytest.mark.asyncio +async def test_dispatcher_clear_rotates_platform_chat(): + dispatcher, chat_mgr, auth_mgr, chat_store, pid = _build_dispatcher() + await auth_mgr.confirm("u1") + await chat_mgr.get_or_create( + user_id="u1", + chat_id=pid, + platform="max", + surface_ref="777", + name="Чат", + ) + + out = await dispatcher.dispatch( + IncomingCommand(user_id="u1", platform="max", chat_id=pid, command="clear"), + ) + assert len(out) == 1 + msg = out[0] + assert isinstance(msg, OutgoingMessage) + assert msg.chat_id != pid + assert "сброшен" in msg.text.lower() + + room = chat_store.get_room_by_max_chat_id("777") + assert room is not None + assert room.platform_chat_id == msg.chat_id diff --git a/tests/adapter/max/test_store.py b/tests/adapter/max/test_store.py new file mode 100644 index 0000000..176dc10 --- /dev/null +++ b/tests/adapter/max/test_store.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import uuid + +from adapter.max.handlers.attachments import AttachmentHandler +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.store import ChatStore, RoomMeta + + +def test_chat_store_room_roundtrip(): + store = ChatStore() + r = RoomMeta( + platform_chat_id="pid-1", + max_chat_id="100", + name="Main", + user_id="42", + agent_id="agent-0", + workspace_path="/agents/0", + ) + store.add_room(r) + assert store.get_room_by_max_chat_id("100") is r + assert store.get_room_by_platform_chat_id("pid-1") is r + + +def test_staged_attachments(): + store = ChatStore() + store.stage_attachment("100", ("rel/path.txt", "path.txt")) + assert store.get_attachments("100") + popped = store.pop_attachments("100") + assert len(popped) == 1 + assert store.pop_attachments("100") == [] + + +def test_remove_room_clears_staging(): + store = ChatStore() + store.stage_attachment("100", ("a", "a")) + store.add_room( + RoomMeta( + platform_chat_id="x", + max_chat_id="100", + name="", + user_id="u", + agent_id="a", + ) + ) + store.remove_room("100") + assert store.get_room_by_max_chat_id("100") is None + assert store.get_attachments("100") == [] + + +def test_chat_handler_clear_rotates_platform_id(): + store = ChatStore() + h = MaxChatHandler(store) + pid1 = str(uuid.uuid4()) + store.add_room( + RoomMeta( + platform_chat_id=pid1, + max_chat_id="100", + name="Tab", + user_id="42", + agent_id="agent-0", + workspace_path="/agents/0", + ) + ) + h.handle_clear("100") + room = store.get_room_by_max_chat_id("100") + assert room is not None + assert room.platform_chat_id != pid1 + + +def test_attachment_handler_list_remove(): + store = ChatStore() + h = AttachmentHandler(store) + store.stage_attachment("100", ("a", "f1.bin")) + assert "f1.bin" in h.handle_list("100") + msg = h.handle_remove("100", "1") + assert "Удалено" in msg or "удалено" in msg.lower() + assert "пуста" in h.handle_list("100").lower() or "пусто" in h.handle_list("100").lower() From 961ee7bb0b74997667e992e319af78c426e13715 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80?= =?UTF-8?q?=D0=B0=20=D0=9F=D1=80=D0=BE=D0=BD=D0=B8=D0=BD=D0=B0?= Date: Fri, 15 May 2026 10:37:12 +0300 Subject: [PATCH 5/5] chore(max): add docker-compose.max and example MAX/agent config --- .env.example | 2 +- config/max-agents.yaml | 7 ++++- docker-compose.max.yml | 59 +++++++++++++++++++++++++++++++++++------- 3 files changed, 56 insertions(+), 12 deletions(-) diff --git a/.env.example b/.env.example index ebc56e4..27fe5dd 100644 --- a/.env.example +++ b/.env.example @@ -33,5 +33,5 @@ SURFACES_BOT_STATE_VOLUME=surfaces-bot-state # MAX Surface MAX_BOT_TOKEN=real_max_token -MAX_API_URL=https://api.max.ru/v1 +MAX_API_URL=https://platform-api.max.ru MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml \ No newline at end of file diff --git a/config/max-agents.yaml b/config/max-agents.yaml index 8713355..5dfc4dd 100644 --- a/config/max-agents.yaml +++ b/config/max-agents.yaml @@ -1,5 +1,10 @@ +# Пример реестра агентов для MAX (формат совпадает с matrix-agents). +# +# user_agents: +# "123456789": agent-0 +# agents: - id: agent-0 label: "Agent 0" base_url: "http://agent-proxy:7000/agent_0/" - workspace_path: "/agents/0" \ No newline at end of file + workspace_path: "/agents/0" diff --git a/docker-compose.max.yml b/docker-compose.max.yml index cd75da1..266a44e 100644 --- a/docker-compose.max.yml +++ b/docker-compose.max.yml @@ -1,24 +1,63 @@ +# Локальный MAX + platform-agent из исходников (аналог docker-compose.fullstack.yml для Matrix). +# Продакшен: только max-bot из docker-compose.prod.yml; AGENT_BASE_URL — URL агента, который поднимает команда платформы. services: max-bot: build: context: . target: development + env_file: .env environment: - - MAX_BOT_TOKEN=${MAX_BOT_TOKEN} - - MAX_API_URL=${MAX_API_URL:-https://api.max.ru/v1} - - MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml - - AGENT_BASE_URL=http://platform-agent:8000 - - SURFACES_WORKSPACE_DIR=/agents + MAX_BOT_TOKEN: ${MAX_BOT_TOKEN:?set MAX_BOT_TOKEN in .env} + MAX_API_URL: ${MAX_API_URL:-https://platform-api.max.ru} + MAX_AGENT_REGISTRY_PATH: ${MAX_AGENT_REGISTRY_PATH:-/app/config/max-agents.yaml} + AGENT_BASE_URL: http://platform-agent:8000 + SURFACES_WORKSPACE_DIR: ${SURFACES_WORKSPACE_DIR:-/agents} + PYTHONUNBUFFERED: "1" + depends_on: + platform-agent: + condition: service_healthy volumes: - - surfaces-agents:/agents + - agents:/agents + - ./config:/app/config:ro command: python -m adapter.max.bot + restart: unless-stopped platform-agent: - image: platform-agent:latest + build: + context: ./external/platform-agent + target: development + additional_contexts: + agent_api: ./external/platform-agent_api + env_file: .env environment: - - WORKSPACE_DIR=/workspace + PYTHONUNBUFFERED: "1" + AGENT_ID: ${AGENT_ID:-max-dev} + PROVIDER_MODEL: ${PROVIDER_MODEL:-openai/gpt-4o-mini} + PROVIDER_URL: ${PROVIDER_URL:-} + PROVIDER_API_KEY: ${PROVIDER_API_KEY:-} + COMPOSIO_API_KEY: ${COMPOSIO_API_KEY:-} volumes: - - surfaces-agents:/workspace + - ./external/platform-agent/src:/app/src + - ./external/platform-agent_api:/agent_api + - agents:/workspace + command: > + sh -lc " + mkdir -p /workspace && + chown -R agent:agent /workspace && + exec /app/.venv/bin/uvicorn src.main:app --host 0.0.0.0 --port 8000 --no-access-log + " + ports: + - "8000:8000" + healthcheck: + test: + - CMD-SHELL + - python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/openapi.json', timeout=2).read()" + interval: 60s + timeout: 5s + retries: 5 + start_period: 15s + restart: unless-stopped volumes: - surfaces-agents: \ No newline at end of file + agents: + name: ${SURFACES_SHARED_VOLUME:-surfaces-agents}