Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Vladislav Yashnov
7abbaf7e7a max bot upd, max-agents.yaml 2026-05-06 15:16:22 +03:00
Vladislav Yashnov
3118b3c99a bot fix 2026-05-06 00:35:17 +03:00
Vladislav Yashnov
eed1533cdc max first steps 2026-05-06 00:24:47 +03:00
13 changed files with 846 additions and 0 deletions

View file

@ -30,3 +30,8 @@ SURFACES_WORKSPACE_DIR=/agents
# Docker volume names (created automatically on first run) # Docker volume names (created automatically on first run)
SURFACES_SHARED_VOLUME=surfaces-agents SURFACES_SHARED_VOLUME=surfaces-agents
SURFACES_BOT_STATE_VOLUME=surfaces-bot-state SURFACES_BOT_STATE_VOLUME=surfaces-bot-state
# MAX Surface
MAX_BOT_TOKEN=real_max_token
MAX_API_URL=https://api.max.ru/v1
MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml

1
adapter/max/__init__.py Normal file
View file

@ -0,0 +1 @@
"""MAX surface adapter."""

View file

@ -0,0 +1,50 @@
"""Agent registry for MAX surface."""
import os
import yaml
from typing import List, Optional
from dataclasses import dataclass, field
@dataclass
class AgentConfig:
id: str
label: str
base_url: str
workspace_path: str
@dataclass
class AgentRegistry:
agents: List[AgentConfig] = field(default_factory=list)
def get_agent_for_user(self, user_id: str) -> AgentConfig:
return self.agents[0]
def get_agent_by_id(self, agent_id: str) -> Optional[AgentConfig]:
for agent in self.agents:
if agent.id == agent_id:
return agent
return None
def load_agent_registry(path: str) -> AgentRegistry:
with open(path, "r") as f:
data = yaml.safe_load(f)
registry = AgentRegistry()
for a in data.get("agents", []):
registry.agents.append(AgentConfig(
id=a["id"],
label=a.get("label", ""),
base_url=a["base_url"],
workspace_path=a["workspace_path"],
))
return registry
def load_from_env() -> AgentRegistry:
path = os.environ.get(
"MAX_AGENT_REGISTRY_PATH",
"/app/config/max-agents.yaml",
)
return load_agent_registry(path)

470
adapter/max/bot.py Normal file
View file

@ -0,0 +1,470 @@
"""MAX surface bot runtime."""
from __future__ import annotations
import os
import asyncio
import logging
from pathlib import Path
import aiohttp
import structlog
from adapter.max.agent_registry import load_from_env, AgentRegistry
from adapter.max.converter import (
max_message_to_incoming,
max_attachment_to_internal,
)
from adapter.max.files import FileHandler
from adapter.max.handlers.chat import ChatHandler as MaxChatHandler
from adapter.max.handlers.attachments import AttachmentHandler
from adapter.max.handlers.help import get_help
from adapter.max.store import ChatStore, RoomMeta
from core.chat import ChatManager
from core.auth import AuthManager
from core.handler import EventDispatcher
from core.protocol import (
Attachment,
IncomingMessage,
IncomingCommand,
IncomingCallback,
OutgoingEvent,
OutgoingMessage,
OutgoingNotification,
OutgoingTyping,
OutgoingUI,
)
from core.settings import SettingsManager
from core.store import InMemoryStore, StateStore
from sdk.interface import (
MessageChunk,
MessageResponse,
PlatformClient,
PlatformError,
User,
UserSettings,
)
from sdk.real import RealPlatformClient
logger = structlog.get_logger(__name__)
# ---------------------------------------------------------------------------
# Routed MAX platform client — копия логики RoutedPlatformClient из Matrix
# ---------------------------------------------------------------------------
class RoutedMaxPlatformClient(PlatformClient):
"""Маршрутизирует запросы к нужному агенту на основе chat_id."""
def __init__(self, *, store: ChatStore, delegates: dict[str, PlatformClient]):
if not delegates:
raise ValueError("RoutedMaxPlatformClient requires at least one delegate")
self._store = store
self._delegates = dict(delegates)
self._default_client = next(iter(self._delegates.values()))
async def get_or_create_user(
self, external_id: str, platform: str, display_name: str | None = None
) -> User:
return await self._default_client.get_or_create_user(
external_id=external_id, platform=platform, display_name=display_name
)
async def send_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
) -> MessageResponse:
delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id)
return await delegate.send_message(user_id, platform_chat_id, text, attachments)
async def stream_message(
self,
user_id: str,
chat_id: str,
text: str,
attachments: list[Attachment] | None = None,
):
delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id)
async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments):
yield chunk
async def get_settings(self, user_id: str) -> UserSettings:
return await self._default_client.get_settings(user_id)
async def update_settings(self, user_id: str, action) -> None:
await self._default_client.update_settings(user_id, action)
async def close(self) -> None:
for delegate in self._delegates.values():
close_fn = getattr(delegate, "close", None)
if callable(close_fn):
await close_fn()
async def _resolve_delegate(
self, user_id: str, local_chat_id: str
) -> tuple[PlatformClient, str]:
room = self._store.get_room_by_platform_chat_id(local_chat_id)
if room is None:
raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND")
agent_id = room.agent_id
platform_chat_id = room.platform_chat_id
if not agent_id or not platform_chat_id:
raise PlatformError(
f"routing incomplete for chat: {local_chat_id}", code="ROUTE_INCOMPLETE"
)
delegate = self._delegates.get(str(agent_id))
if delegate is None:
raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND")
return delegate, str(platform_chat_id)
# ---------------------------------------------------------------------------
# MAX Surface
# ---------------------------------------------------------------------------
class MaxSurface:
def __init__(self):
# Env
self.token = os.environ["MAX_BOT_TOKEN"]
self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1")
self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents")
self.agent_base_url = os.environ.get("AGENT_BASE_URL", "")
# Registry
self.registry: AgentRegistry = load_from_env()
# MAX-specific store for chat ↔ agent mapping
self.store = ChatStore()
self.files = FileHandler(self.workspace_dir)
self.max_chat_handler = MaxChatHandler(self.store)
self.attach_handler = AttachmentHandler(self.store)
# Core store (in-memory, lost on restart — OK for MVP)
self.core_store: StateStore = InMemoryStore()
# Platform client per agent
delegates: dict[str, PlatformClient] = {}
for agent in self.registry.agents:
base = self.agent_base_url or agent.base_url.rstrip("/")
delegates[agent.id] = RealPlatformClient(
agent_id=agent.id,
agent_base_url=base,
prototype_state=None,
platform="max",
)
# Routed platform
self.platform = RoutedMaxPlatformClient(
store=self.store,
delegates=delegates,
)
# Core managers
self.chat_mgr = ChatManager(self.platform, self.core_store)
self.auth_mgr = AuthManager(self.platform, self.core_store)
self.settings_mgr = SettingsManager(self.platform, self.core_store)
# Event dispatcher — это и есть "ядро"
self.dispatcher = EventDispatcher(
platform=self.platform,
chat_mgr=self.chat_mgr,
auth_mgr=self.auth_mgr,
settings_mgr=self.settings_mgr,
)
# HTTP session for MAX API
self.session: aiohttp.ClientSession | None = None
# ------------------------------------------------------------------
# Long polling
# ------------------------------------------------------------------
async def start(self):
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Bearer {self.token}"}
)
logger.info("max_surface_starting", api_url=self.api_url)
offset = 0
while True:
try:
updates = await self._get_updates(offset)
for update in updates:
offset = update["update_id"] + 1
await self._process_update(update)
except Exception as e:
logger.error("max_poll_error", error=str(e))
await asyncio.sleep(5)
async def _get_updates(self, offset: int) -> list:
async with self.session.get(
f"{self.api_url}/updates",
params={"offset": offset, "timeout": 30},
) as resp:
data = await resp.json()
return data.get("result", [])
async def _process_update(self, update: dict) -> None:
if "message" in update:
await self._handle_message(update["message"])
elif "callback_query" in update:
await self._handle_callback(update["callback_query"])
# ------------------------------------------------------------------
# Message handling
# ------------------------------------------------------------------
async def _handle_message(self, message: dict) -> None:
text = message.get("text", "") or message.get("caption", "")
user_id = str(message["from"]["id"])
chat_id = str(message["chat"]["id"])
# Ensure room exists
room = self.store.get_room_by_max_chat_id(chat_id)
if room is None:
agent = self.registry.get_agent_for_user(user_id)
platform_chat_id = self.max_chat_handler.handle_new(
max_chat_id=chat_id,
user_id=user_id,
agent_id=agent.id,
)
room = self.store.get_room_by_max_chat_id(chat_id)
else:
agent = self.registry.get_agent_by_id(room.agent_id)
# Handle attachments
attachments = []
if "attachment" in message:
att = message["attachment"]
internal_att = max_attachment_to_internal(
filename=att["filename"],
mime_type=att.get("mime_type", "application/octet-stream"),
download_url=att["download_url"],
)
attachments.append(internal_att)
workspace_path = await self.files.download_attachment(
download_url=att["download_url"],
filename=att["filename"],
agent_workspace=agent.workspace_path,
headers={"Authorization": f"Bearer {self.token}"},
)
self.store.stage_attachment(chat_id, (workspace_path, att["filename"]))
# File-only message → stage and return
if attachments and not text:
return
# Merge staged attachments
queued = self.store.pop_attachments(chat_id)
if queued:
for ws_path, filename in queued:
attachments.append(
Attachment(
type="document",
filename=filename,
workspace_path=ws_path,
)
)
# Convert to incoming event
incoming = max_message_to_incoming(
text=text,
user_id=user_id,
chat_id=room.platform_chat_id,
attachments=attachments,
)
# Surface-level commands
if isinstance(incoming, IncomingCommand):
response_text = await self._handle_surface_command(
incoming, max_chat_id=chat_id, user_id=user_id, agent=agent
)
if response_text:
await self._send_message(chat_id, response_text)
return
# Dispatch to core
try:
outgoing_events = await self.dispatcher.dispatch(incoming)
except PlatformError as exc:
logger.warning(
"max_dispatch_platform_error",
user_id=user_id,
chat_id=chat_id,
code=exc.code,
error=str(exc),
)
outgoing_events = [
OutgoingMessage(
chat_id=room.platform_chat_id,
text="Сервис временно недоступен. Попробуйте ещё раз позже.",
)
]
# Send outgoing events back to MAX
for event in outgoing_events:
await self._send_outgoing(chat_id, event, agent.workspace_path)
# ------------------------------------------------------------------
# Callbacks
# ------------------------------------------------------------------
async def _handle_callback(self, callback: dict) -> None:
user_id = str(callback["from"]["id"])
chat_id = str(callback["message"]["chat"]["id"])
message_id = str(callback["message"]["message_id"])
data = callback.get("data", "")
room = self.store.get_room_by_max_chat_id(chat_id)
if room is None:
return
incoming = max_message_to_incoming(
text="",
user_id=user_id,
chat_id=room.platform_chat_id,
callback_data=data,
message_id=message_id,
)
try:
outgoing_events = await self.dispatcher.dispatch(incoming)
except PlatformError:
return
for event in outgoing_events:
agent = self.registry.get_agent_by_id(room.agent_id)
ws = agent.workspace_path if agent else "/agents/0"
await self._send_outgoing(chat_id, event, ws)
# ------------------------------------------------------------------
# Surface commands
# ------------------------------------------------------------------
async def _handle_surface_command(
self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent
) -> str | None:
command = cmd.command
args = cmd.args
if command == "new":
name = " ".join(args) if args else None
self.max_chat_handler.handle_new(
max_chat_id=max_chat_id,
user_id=user_id,
agent_id=agent.id,
name=name,
)
return f"New chat created: {name or 'Unnamed'}"
elif command == "chats":
return self.max_chat_handler.handle_chats(user_id)
elif command == "rename":
new_name = " ".join(args) if args else ""
return self.max_chat_handler.handle_rename(max_chat_id, new_name)
elif command == "archive":
return self.max_chat_handler.handle_archive(max_chat_id)
elif command in ("clear", "reset"):
return self.max_chat_handler.handle_clear(max_chat_id)
elif command == "list":
return self.attach_handler.handle_list(max_chat_id)
elif command == "remove":
idx = args[0] if args else ""
return self.attach_handler.handle_remove(max_chat_id, idx)
elif command == "help":
return get_help()
return None
# ------------------------------------------------------------------
# Outgoing to MAX
# ------------------------------------------------------------------
async def _send_outgoing(
self, max_chat_id: str, event: OutgoingEvent, workspace_path: str
) -> None:
if isinstance(event, OutgoingTyping):
await self._send_typing(max_chat_id)
return
if isinstance(event, OutgoingNotification):
text = f"[{event.level.upper()}] {event.text}"
await self._send_message(max_chat_id, text)
return
if isinstance(event, OutgoingMessage):
if event.text:
await self._send_message(max_chat_id, event.text)
# Upload outgoing files
for att in event.attachments:
if not att.workspace_path:
continue
if self.files.file_exists(att.workspace_path, workspace_path):
# Read file and upload to MAX
file_data = self.files.read_outgoing_file(
att.workspace_path, workspace_path
)
# MAX file upload logic — зависит от API MAX
# Пока просто отправляем имя файла текстом
await self._send_message(
max_chat_id,
f"[Файл: {att.filename or att.workspace_path}]",
)
return
if isinstance(event, OutgoingUI):
lines = [event.text]
if event.buttons:
for btn in event.buttons:
lines.append(f" {btn.label}")
lines.append("")
lines.append("Ответьте !yes для подтверждения или !no для отмены.")
await self._send_message(max_chat_id, "\n".join(lines))
return
# ------------------------------------------------------------------
# Low-level MAX API
# ------------------------------------------------------------------
async def _send_message(self, chat_id: str, text: str) -> None:
async with self.session.post(
f"{self.api_url}/sendMessage",
json={"chat_id": chat_id, "text": text},
) as resp:
await resp.json()
async def _send_typing(self, chat_id: str) -> None:
async with self.session.post(
f"{self.api_url}/sendChatAction",
json={"chat_id": chat_id, "action": "typing"},
) as resp:
await resp.json()
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
async def main():
surface = MaxSurface()
await surface.start()
if __name__ == "__main__":
asyncio.run(main())

88
adapter/max/converter.py Normal file
View file

@ -0,0 +1,88 @@
"""MAX event to internal protocol converter."""
from typing import Union, List
from core.protocol import (
IncomingMessage,
IncomingCommand,
IncomingCallback,
Attachment,
)
def _extract_command(text: str) -> Union[IncomingCommand, IncomingCallback, None]:
if not text.startswith("!"):
return None
parts = text.strip().split(maxsplit=1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
if cmd == "!yes":
return IncomingCallback(
user_id="",
platform="max",
chat_id="",
action="confirm",
)
elif cmd == "!no":
return IncomingCallback(
user_id="",
platform="max",
chat_id="",
action="cancel",
)
else:
return IncomingCommand(
user_id="",
platform="max",
chat_id="",
command=cmd.lstrip("!"),
args=args.split() if args else [],
)
def max_message_to_incoming(
*,
text: str,
user_id: str,
chat_id: str,
attachments: List[Attachment] = None,
callback_data: str = None,
message_id: str = None,
) -> Union[IncomingMessage, IncomingCommand, IncomingCallback]:
if callback_data:
return IncomingCallback(
user_id=user_id,
platform="max",
chat_id=chat_id,
action=callback_data,
payload={"message_id": message_id} if message_id else {},
)
if text:
cmd = _extract_command(text)
if cmd is not None:
cmd.user_id = user_id
cmd.chat_id = chat_id
return cmd
return IncomingMessage(
user_id=user_id,
platform="max",
chat_id=chat_id,
text=text or "",
attachments=attachments or [],
)
def max_attachment_to_internal(
*,
filename: str,
mime_type: str,
download_url: str,
) -> Attachment:
return Attachment(
type="document",
url=download_url,
filename=filename,
mime_type=mime_type,
)

51
adapter/max/files.py Normal file
View file

@ -0,0 +1,51 @@
"""File handling for MAX surface."""
import os
import aiohttp
from pathlib import Path
class FileHandler:
def __init__(self, workspace_root: str):
self.workspace_root = workspace_root
def _make_unique_filename(self, directory: str, filename: str) -> str:
base = Path(filename).stem
ext = Path(filename).suffix
candidate = filename
counter = 1
while os.path.exists(os.path.join(directory, candidate)):
candidate = f"{base} ({counter}){ext}"
counter += 1
return candidate
async def download_attachment(
self,
download_url: str,
filename: str,
agent_workspace: str,
headers: dict = None,
) -> str:
full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/"))
os.makedirs(full_dir, exist_ok=True)
unique_name = self._make_unique_filename(full_dir, filename)
filepath = os.path.join(full_dir, unique_name)
async with aiohttp.ClientSession() as session:
async with session.get(download_url, headers=headers) as resp:
resp.raise_for_status()
with open(filepath, "wb") as f:
f.write(await resp.read())
return unique_name
def read_outgoing_file(self, workspace_path: str, agent_workspace: str) -> bytes:
full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/"))
filepath = os.path.join(full_dir, workspace_path.lstrip("/"))
with open(filepath, "rb") as f:
return f.read()
def file_exists(self, workspace_path: str, agent_workspace: str) -> bool:
full_dir = os.path.join(self.workspace_root, agent_workspace.strip("/"))
filepath = os.path.join(full_dir, workspace_path.lstrip("/"))
return os.path.exists(filepath)

View file

@ -0,0 +1 @@
"""MAX surface handlers."""

View file

@ -0,0 +1,29 @@
"""Attachment queue handlers for MAX surface."""
from adapter.max.store import ChatStore
class AttachmentHandler:
def __init__(self, store: ChatStore):
self.store = store
def handle_list(self, max_chat_id: str) -> str:
attachments = self.store.get_attachments(max_chat_id)
if not attachments:
return "Attachment queue is empty."
lines = [f" {i+1}. {name}" for i, (_, name) in enumerate(attachments)]
return "\n".join(lines)
def handle_remove(self, max_chat_id: str, index: str) -> str:
attachments = self.store.staged_attachments.get(max_chat_id, [])
if index.lower() == "all":
self.store.staged_attachments[max_chat_id] = []
return "All attachments removed from queue."
try:
idx = int(index) - 1
if 0 <= idx < len(attachments):
removed = attachments.pop(idx)
return f"Removed: {removed[1]}"
return "Invalid index."
except ValueError:
return "Usage: !remove <number> or !remove all"

View file

@ -0,0 +1,48 @@
"""Chat management handlers for MAX surface."""
import uuid
from adapter.max.store import ChatStore, RoomMeta
class ChatHandler:
def __init__(self, store: ChatStore):
self.store = store
def handle_new(self, max_chat_id: str, user_id: str, agent_id: str, name: str = None) -> str:
platform_chat_id = str(uuid.uuid4())
room = RoomMeta(
platform_chat_id=platform_chat_id,
max_chat_id=max_chat_id,
name=name or "New Chat",
user_id=user_id,
agent_id=agent_id,
)
self.store.add_room(room)
return platform_chat_id
def handle_chats(self, user_id: str) -> str:
rooms = self.store.list_rooms_for_user(user_id)
if not rooms:
return "No active chats."
lines = [f" {i+1}. {r.name}" for i, r in enumerate(rooms)]
return "\n".join(lines)
def handle_rename(self, max_chat_id: str, new_name: str) -> str:
room = self.store.get_room_by_max_chat_id(max_chat_id)
if not room:
return "Chat not found."
room.name = new_name
return f"Chat renamed to: {new_name}"
def handle_archive(self, max_chat_id: str) -> str:
room = self.store.get_room_by_max_chat_id(max_chat_id)
if not room:
return "Chat not found."
self.store.remove_room(max_chat_id)
return "Chat archived."
def handle_clear(self, max_chat_id: str) -> str:
room = self.store.get_room_by_max_chat_id(max_chat_id)
if not room:
return "Chat not found."
room.platform_chat_id = str(uuid.uuid4())
return "Chat context cleared."

View file

@ -0,0 +1,26 @@
"""Help handler for MAX surface."""
HELP_TEXT = """
Available commands:
Chat management:
!new [name] Create a new chat
!chats List active chats
!rename <name> Rename current chat
!archive Archive current chat
!clear / !reset Reset chat context
Attachments:
!list Show attachment queue
!remove <n> Remove attachment from queue
!remove all Clear attachment queue
Actions:
!yes Confirm agent action
!no Cancel agent action
!help Show this help
"""
def get_help() -> str:
return HELP_TEXT.strip()

48
adapter/max/store.py Normal file
View file

@ -0,0 +1,48 @@
"""Chat store for MAX surface."""
from dataclasses import dataclass, field
from typing import Dict, Optional
@dataclass
class RoomMeta:
platform_chat_id: str
max_chat_id: str
name: str
user_id: str
agent_id: str
@dataclass
class ChatStore:
rooms: Dict[str, RoomMeta] = field(default_factory=dict)
staged_attachments: Dict[str, list] = field(default_factory=dict)
def get_room_by_max_chat_id(self, max_chat_id: str) -> Optional[RoomMeta]:
return self.rooms.get(max_chat_id)
def get_room_by_platform_chat_id(self, platform_chat_id: str) -> Optional[RoomMeta]:
for room in self.rooms.values():
if room.platform_chat_id == platform_chat_id:
return room
return None
def add_room(self, room: RoomMeta) -> None:
self.rooms[room.max_chat_id] = room
def remove_room(self, max_chat_id: str) -> None:
self.rooms.pop(max_chat_id, None)
self.staged_attachments.pop(max_chat_id, None)
def list_rooms_for_user(self, user_id: str) -> list:
return [r for r in self.rooms.values() if r.user_id == user_id]
def stage_attachment(self, max_chat_id: str, attachment: tuple) -> None:
if max_chat_id not in self.staged_attachments:
self.staged_attachments[max_chat_id] = []
self.staged_attachments[max_chat_id].append(attachment)
def pop_attachments(self, max_chat_id: str) -> list:
return self.staged_attachments.pop(max_chat_id, [])
def get_attachments(self, max_chat_id: str) -> list:
return self.staged_attachments.get(max_chat_id, [])

5
config/max-agents.yaml Normal file
View file

@ -0,0 +1,5 @@
agents:
- id: agent-0
label: "Agent 0"
base_url: "http://agent-proxy:7000/agent_0/"
workspace_path: "/agents/0"

24
docker-compose.max.yml Normal file
View file

@ -0,0 +1,24 @@
services:
max-bot:
build:
context: .
target: development
environment:
- MAX_BOT_TOKEN=${MAX_BOT_TOKEN}
- MAX_API_URL=${MAX_API_URL:-https://api.max.ru/v1}
- MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml
- AGENT_BASE_URL=http://platform-agent:8000
- SURFACES_WORKSPACE_DIR=/agents
volumes:
- surfaces-agents:/agents
command: python -m adapter.max.bot
platform-agent:
image: platform-agent:latest
environment:
- WORKSPACE_DIR=/workspace
volumes:
- surfaces-agents:/workspace
volumes:
surfaces-agents: