diff --git a/adapter/max/__init__.py b/adapter/max/__init__.py new file mode 100644 index 0000000..c12c8ba --- /dev/null +++ b/adapter/max/__init__.py @@ -0,0 +1 @@ +"""MAX surface adapter.""" \ No newline at end of file diff --git a/adapter/max/agent_registry.py b/adapter/max/agent_registry.py new file mode 100644 index 0000000..a3c2b67 --- /dev/null +++ b/adapter/max/agent_registry.py @@ -0,0 +1,50 @@ +"""Agent registry for MAX surface.""" +import os +import yaml +from typing import List, Optional +from dataclasses import dataclass, field + + +@dataclass +class AgentConfig: + id: str + label: str + base_url: str + workspace_path: str + + +@dataclass +class AgentRegistry: + agents: List[AgentConfig] = field(default_factory=list) + + def get_agent_for_user(self, user_id: str) -> AgentConfig: + return self.agents[0] + + def get_agent_by_id(self, agent_id: str) -> Optional[AgentConfig]: + for agent in self.agents: + if agent.id == agent_id: + return agent + return None + + +def load_agent_registry(path: str) -> AgentRegistry: + with open(path, "r") as f: + data = yaml.safe_load(f) + + registry = AgentRegistry() + for a in data.get("agents", []): + registry.agents.append(AgentConfig( + id=a["id"], + label=a.get("label", ""), + base_url=a["base_url"], + workspace_path=a["workspace_path"], + )) + return registry + + +def load_from_env() -> AgentRegistry: + path = os.environ.get( + "MAX_AGENT_REGISTRY_PATH", + "/app/config/max-agents.yaml", + ) + return load_agent_registry(path) \ No newline at end of file diff --git a/adapter/max/bot.py b/adapter/max/bot.py new file mode 100644 index 0000000..af857f7 --- /dev/null +++ b/adapter/max/bot.py @@ -0,0 +1,247 @@ +"""MAX surface bot runtime.""" +import os +import asyncio +import aiohttp +from typing import Optional + +from adapter.max.agent_registry import load_from_env, AgentRegistry, AgentConfig +from adapter.max.store import ChatStore +from adapter.max.files import FileHandler +from adapter.max.converter import ( + max_message_to_incoming, + max_attachment_to_internal, +) +from adapter.max.handlers.chat import ChatHandler +from adapter.max.handlers.attachments import AttachmentHandler +from adapter.max.handlers.help import get_help +from core.protocol import IncomingMessage, IncomingCommand, IncomingCallback + + +class MaxSurface: + def __init__(self): + self.token = os.environ["MAX_BOT_TOKEN"] + self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1") + self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents") + + self.registry: AgentRegistry = load_from_env() + self.store = ChatStore() + self.files = FileHandler(self.workspace_dir) + self.chat_handler = ChatHandler(self.store) + self.attach_handler = AttachmentHandler(self.store) + self.session: Optional[aiohttp.ClientSession] = None + + async def start(self): + self.session = aiohttp.ClientSession( + headers={"Authorization": f"Bearer {self.token}"} + ) + print("[MAX Surface] Starting long poll...") + offset = 0 + + while True: + try: + updates = await self._get_updates(offset) + for update in updates: + offset = update["update_id"] + 1 + await self._process_update(update) + except Exception as e: + print(f"[MAX Surface] Error: {e}") + await asyncio.sleep(5) + + async def _get_updates(self, offset: int) -> list: + async with self.session.get( + f"{self.api_url}/updates", + params={"offset": offset, "timeout": 30}, + ) as resp: + data = await resp.json() + return data.get("result", []) + + async def _process_update(self, update: dict) -> None: + if "message" in update: + await self._handle_message(update["message"]) + elif "callback_query" in update: + await self._handle_callback(update["callback_query"]) + + async def _handle_message(self, message: dict) -> None: + text = message.get("text", "") or message.get("caption", "") + user_id = str(message["from"]["id"]) + chat_id = str(message["chat"]["id"]) + message_id = str(message["message_id"]) + + room = self.store.get_room_by_max_chat_id(chat_id) + if room is None: + agent = self.registry.get_agent_for_user(user_id) + platform_chat_id = self.chat_handler.handle_new( + max_chat_id=chat_id, + user_id=user_id, + agent_id=agent.id, + ) + room = self.store.get_room_by_max_chat_id(chat_id) + else: + agent = self.registry.get_agent_by_id(room.agent_id) + + attachments = [] + if "attachment" in message: + att = message["attachment"] + internal_att = max_attachment_to_internal( + filename=att["filename"], + mime_type=att.get("mime_type", "application/octet-stream"), + download_url=att["download_url"], + ) + attachments.append(internal_att) + + workspace_path = await self.files.download_attachment( + download_url=att["download_url"], + filename=att["filename"], + agent_workspace=agent.workspace_path, + headers={"Authorization": f"Bearer {self.token}"}, + ) + self.store.stage_attachment(chat_id, (workspace_path, att["filename"])) + + incoming = max_message_to_incoming( + text=text, + user_id=user_id, + chat_id=room.platform_chat_id, + attachments=attachments, + ) + + if isinstance(incoming, IncomingCommand): + response_text = await self._handle_surface_command( + incoming, max_chat_id=chat_id, user_id=user_id, agent=agent + ) + if response_text: + await self._send_message(chat_id, response_text) + return + + if isinstance(incoming, IncomingCallback): + await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + ) + return + + if isinstance(incoming, IncomingMessage) and (incoming.text or attachments): + queued = self.store.pop_attachments(chat_id) + await self._send_typing(chat_id) + agent_response = await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + attachments=queued, + ) + await self._send_message(chat_id, agent_response) + + async def _handle_callback(self, callback: dict) -> None: + user_id = str(callback["from"]["id"]) + chat_id = str(callback["message"]["chat"]["id"]) + message_id = str(callback["message"]["message_id"]) + data = callback.get("data", "") + + room = self.store.get_room_by_max_chat_id(chat_id) + if room is None: + return + + incoming = max_message_to_incoming( + text="", + user_id=user_id, + chat_id=room.platform_chat_id, + callback_data=data, + message_id=message_id, + ) + + agent = self.registry.get_agent_by_id(room.agent_id) + await self._call_agent( + agent=agent, + platform_chat_id=room.platform_chat_id, + message=incoming, + ) + + async def _handle_surface_command( + self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent: AgentConfig + ) -> Optional[str]: + command = cmd.command + args = cmd.args + + if command == "new": + name = " ".join(args) if args else None + self.chat_handler.handle_new( + max_chat_id=max_chat_id, + user_id=user_id, + agent_id=agent.id, + name=name, + ) + return f"New chat created: {name or 'Unnamed'}" + + elif command == "chats": + return self.chat_handler.handle_chats(user_id) + + elif command == "rename": + return self.chat_handler.handle_rename(max_chat_id, args) + + elif command == "archive": + return self.chat_handler.handle_archive(max_chat_id) + + elif command in ("clear", "reset"): + return self.chat_handler.handle_clear(max_chat_id) + + elif command == "list": + return self.attach_handler.handle_list(max_chat_id) + + elif command == "remove": + return self.attach_handler.handle_remove(max_chat_id, args) + + elif command == "help": + return get_help() + + return None + + async def _call_agent( + self, + agent: AgentConfig, + platform_chat_id: str, + message=None, + attachments: list = None, + ) -> str: + payload = { + "chat_id": platform_chat_id, + "agent_id": agent.id, + } + if message: + if isinstance(message, IncomingMessage): + payload["message"] = message.text + elif isinstance(message, IncomingCallback): + payload["action"] = message.action + payload["payload"] = message.payload + elif isinstance(message, IncomingCommand): + payload["command"] = message.command + payload["args"] = message.args + if attachments: + payload["attachments"] = [a[0] for a in attachments] + + url = f"{agent.base_url}/chat/{agent.id}" + async with self.session.post(url, json=payload) as resp: + data = await resp.json() + return data.get("response", "") + + async def _send_message(self, chat_id: str, text: str) -> None: + async with self.session.post( + f"{self.api_url}/sendMessage", + json={"chat_id": chat_id, "text": text}, + ) as resp: + await resp.json() + + async def _send_typing(self, chat_id: str) -> None: + async with self.session.post( + f"{self.api_url}/sendChatAction", + json={"chat_id": chat_id, "action": "typing"}, + ) as resp: + await resp.json() + + +async def main(): + surface = MaxSurface() + await surface.start() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/adapter/max/converter.py b/adapter/max/converter.py new file mode 100644 index 0000000..2bbfd8b --- /dev/null +++ b/adapter/max/converter.py @@ -0,0 +1,88 @@ +"""MAX event to internal protocol converter.""" +from typing import Union, List +from core.protocol import ( + IncomingMessage, + IncomingCommand, + IncomingCallback, + Attachment, +) + + +def _extract_command(text: str) -> Union[IncomingCommand, IncomingCallback, None]: + if not text.startswith("!"): + return None + + parts = text.strip().split(maxsplit=1) + cmd = parts[0].lower() + args = parts[1] if len(parts) > 1 else "" + + if cmd == "!yes": + return IncomingCallback( + user_id="", + platform="max", + chat_id="", + action="confirm", + ) + elif cmd == "!no": + return IncomingCallback( + user_id="", + platform="max", + chat_id="", + action="cancel", + ) + else: + return IncomingCommand( + user_id="", + platform="max", + chat_id="", + command=cmd.lstrip("!"), + args=args.split() if args else [], + ) + + +def max_message_to_incoming( + *, + text: str, + user_id: str, + chat_id: str, + attachments: List[Attachment] = None, + callback_data: str = None, + message_id: str = None, +) -> Union[IncomingMessage, IncomingCommand, IncomingCallback]: + if callback_data: + return IncomingCallback( + user_id=user_id, + platform="max", + chat_id=chat_id, + action=callback_data, + payload={"message_id": message_id} if message_id else {}, + ) + + if text: + cmd = _extract_command(text) + if cmd is not None: + cmd.user_id = user_id + cmd.chat_id = chat_id + return cmd + + return IncomingMessage( + user_id=user_id, + platform="max", + chat_id=chat_id, + text=text or "", + attachments=attachments or [], + ) + + +def max_attachment_to_internal( + *, + filename: str, + mime_type: str, + download_url: str, +) -> Attachment: + return Attachment( + type="document", + url=download_url, + filename=filename, + mime_type=mime_type, + ) \ No newline at end of file diff --git a/adapter/max/files.py b/adapter/max/files.py new file mode 100644 index 0000000..58b87fb --- /dev/null +++ b/adapter/max/files.py @@ -0,0 +1,51 @@ +"""File handling for MAX surface.""" +import os +import aiohttp +from pathlib import Path + + +class FileHandler: + def __init__(self, workspace_root: str): + self.workspace_root = workspace_root + + def _make_unique_filename(self, directory: str, filename: str) -> str: + base = Path(filename).stem + ext = Path(filename).suffix + candidate = filename + counter = 1 + while os.path.exists(os.path.join(directory, candidate)): + candidate = f"{base} ({counter}){ext}" + counter += 1 + return candidate + + async def download_attachment( + self, + download_url: str, + filename: str, + agent_workspace: str, + headers: dict = None, + ) -> str: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + os.makedirs(full_dir, exist_ok=True) + + unique_name = self._make_unique_filename(full_dir, filename) + filepath = os.path.join(full_dir, unique_name) + + async with aiohttp.ClientSession() as session: + async with session.get(download_url, headers=headers) as resp: + resp.raise_for_status() + with open(filepath, "wb") as f: + f.write(await resp.read()) + + return unique_name + + def read_outgoing_file(self, workspace_path: str, agent_workspace: str) -> bytes: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + filepath = os.path.join(full_dir, workspace_path.lstrip("/")) + with open(filepath, "rb") as f: + return f.read() + + def file_exists(self, workspace_path: str, agent_workspace: str) -> bool: + full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/")) + filepath = os.path.join(full_dir, workspace_path.lstrip("/")) + return os.path.exists(filepath) \ No newline at end of file diff --git a/adapter/max/handlers/__init__.py b/adapter/max/handlers/__init__.py new file mode 100644 index 0000000..3355924 --- /dev/null +++ b/adapter/max/handlers/__init__.py @@ -0,0 +1 @@ +"""MAX surface handlers.""" \ No newline at end of file diff --git a/adapter/max/handlers/attachments.py b/adapter/max/handlers/attachments.py new file mode 100644 index 0000000..c4d0da5 --- /dev/null +++ b/adapter/max/handlers/attachments.py @@ -0,0 +1,29 @@ +"""Attachment queue handlers for MAX surface.""" +from adapter.max.store import ChatStore + + +class AttachmentHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_list(self, max_chat_id: str) -> str: + attachments = self.store.get_attachments(max_chat_id) + if not attachments: + return "Attachment queue is empty." + lines = [f" {i+1}. {name}" for i, (_, name) in enumerate(attachments)] + return "\n".join(lines) + + def handle_remove(self, max_chat_id: str, index: str) -> str: + attachments = self.store.staged_attachments.get(max_chat_id, []) + if index.lower() == "all": + self.store.staged_attachments[max_chat_id] = [] + return "All attachments removed from queue." + + try: + idx = int(index) - 1 + if 0 <= idx < len(attachments): + removed = attachments.pop(idx) + return f"Removed: {removed[1]}" + return "Invalid index." + except ValueError: + return "Usage: !remove or !remove all" \ No newline at end of file diff --git a/adapter/max/handlers/chat.py b/adapter/max/handlers/chat.py new file mode 100644 index 0000000..2933d5f --- /dev/null +++ b/adapter/max/handlers/chat.py @@ -0,0 +1,48 @@ +"""Chat management handlers for MAX surface.""" +import uuid +from adapter.max.store import ChatStore, RoomMeta + + +class ChatHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_new(self, max_chat_id: str, user_id: str, agent_id: str, name: str = None) -> str: + platform_chat_id = str(uuid.uuid4()) + room = RoomMeta( + platform_chat_id=platform_chat_id, + max_chat_id=max_chat_id, + name=name or "New Chat", + user_id=user_id, + agent_id=agent_id, + ) + self.store.add_room(room) + return platform_chat_id + + def handle_chats(self, user_id: str) -> str: + rooms = self.store.list_rooms_for_user(user_id) + if not rooms: + return "No active chats." + lines = [f" {i+1}. {r.name}" for i, r in enumerate(rooms)] + return "\n".join(lines) + + def handle_rename(self, max_chat_id: str, new_name: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + room.name = new_name + return f"Chat renamed to: {new_name}" + + def handle_archive(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + self.store.remove_room(max_chat_id) + return "Chat archived." + + def handle_clear(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Chat not found." + room.platform_chat_id = str(uuid.uuid4()) + return "Chat context cleared." \ No newline at end of file diff --git a/adapter/max/handlers/help.py b/adapter/max/handlers/help.py new file mode 100644 index 0000000..24181c2 --- /dev/null +++ b/adapter/max/handlers/help.py @@ -0,0 +1,26 @@ +"""Help handler for MAX surface.""" + +HELP_TEXT = """ +Available commands: + +Chat management: + !new [name] — Create a new chat + !chats — List active chats + !rename — Rename current chat + !archive — Archive current chat + !clear / !reset — Reset chat context + +Attachments: + !list — Show attachment queue + !remove — Remove attachment from queue + !remove all — Clear attachment queue + +Actions: + !yes — Confirm agent action + !no — Cancel agent action + !help — Show this help +""" + + +def get_help() -> str: + return HELP_TEXT.strip() \ No newline at end of file diff --git a/adapter/max/store.py b/adapter/max/store.py new file mode 100644 index 0000000..d8e6ce6 --- /dev/null +++ b/adapter/max/store.py @@ -0,0 +1,48 @@ +"""Chat store for MAX surface.""" +from dataclasses import dataclass, field +from typing import Dict, Optional + + +@dataclass +class RoomMeta: + platform_chat_id: str + max_chat_id: str + name: str + user_id: str + agent_id: str + + +@dataclass +class ChatStore: + rooms: Dict[str, RoomMeta] = field(default_factory=dict) + staged_attachments: Dict[str, list] = field(default_factory=dict) + + def get_room_by_max_chat_id(self, max_chat_id: str) -> Optional[RoomMeta]: + return self.rooms.get(max_chat_id) + + def get_room_by_platform_chat_id(self, platform_chat_id: str) -> Optional[RoomMeta]: + for room in self.rooms.values(): + if room.platform_chat_id == platform_chat_id: + return room + return None + + def add_room(self, room: RoomMeta) -> None: + self.rooms[room.max_chat_id] = room + + def remove_room(self, max_chat_id: str) -> None: + self.rooms.pop(max_chat_id, None) + self.staged_attachments.pop(max_chat_id, None) + + def list_rooms_for_user(self, user_id: str) -> list: + return [r for r in self.rooms.values() if r.user_id == user_id] + + def stage_attachment(self, max_chat_id: str, attachment: tuple) -> None: + if max_chat_id not in self.staged_attachments: + self.staged_attachments[max_chat_id] = [] + self.staged_attachments[max_chat_id].append(attachment) + + def pop_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.pop(max_chat_id, []) + + def get_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.get(max_chat_id, []) \ No newline at end of file