diff --git a/.env.example b/.env.example index cc5f2e0..4037b6d 100644 --- a/.env.example +++ b/.env.example @@ -30,3 +30,9 @@ SURFACES_WORKSPACE_DIR=/agents # Docker volume names (created automatically on first run) SURFACES_SHARED_VOLUME=surfaces-agents SURFACES_BOT_STATE_VOLUME=surfaces-bot-state + +# MAX Surface +MAX_BOT_TOKEN=real_max_token +MAX_API_URL=https://platform-api.max.ru +MAX_AGENT_REGISTRY_PATH=/app/config/max-agents.yaml +MAX_PLATFORM_BACKEND=mock # ← ДОБАВИТЬ: "mock" для локалки, "real" для прода \ No newline at end of file diff --git a/adapter/max/__init__.py b/adapter/max/__init__.py new file mode 100644 index 0000000..c12c8ba --- /dev/null +++ b/adapter/max/__init__.py @@ -0,0 +1 @@ +"""MAX surface adapter.""" \ No newline at end of file diff --git a/adapter/max/agent_registry.py b/adapter/max/agent_registry.py new file mode 100644 index 0000000..edec923 --- /dev/null +++ b/adapter/max/agent_registry.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal + +import yaml + +import structlog + + + +logger = structlog.get_logger() + +class AgentRegistryError(ValueError): + pass + + +@dataclass(frozen=True) +class AgentDefinition: + agent_id: str + label: str + base_url: str = field(default="") + workspace_path: str = field(default="") + + +@dataclass(frozen=True) +class AgentAssignment: + agent_id: str | None + source: Literal["configured", "default", "none"] + + @property + def is_default(self) -> bool: + return self.source == "default" + + +class AgentRegistry: + """Same contract as Matrix agent registry: user_agents maps MAX user_id string -> agent_id.""" + + def __init__( + self, + agents: list[AgentDefinition], + user_agents: Mapping[str, str] | None = None, + ) -> None: + self.agents = tuple(agents) + self._by_id = {agent.agent_id: agent for agent in self.agents} + self._user_agents: dict[str, str] = dict(user_agents or {}) + + def get(self, agent_id: str) -> AgentDefinition: + try: + return self._by_id[agent_id] + except KeyError as exc: + raise AgentRegistryError(f"unknown agent id: {agent_id}") from exc + + def get_agent_id_for_user(self, max_user_id: str) -> str | None: + return self._user_agents.get(max_user_id) + + def resolve_agent_for_user(self, max_user_id: str) -> AgentAssignment: + agent_id = self.get_agent_id_for_user(max_user_id) + if agent_id is not None: + return AgentAssignment(agent_id=agent_id, source="configured") + if self.agents: + return AgentAssignment(agent_id=self.agents[0].agent_id, source="default") + return AgentAssignment(agent_id=None, source="none") + + +def _required_text(entry: Mapping[str, object], key: str) -> str: + value = entry.get(key) + if not isinstance(value, str): + raise AgentRegistryError("each agent entry requires id and label") + text = value.strip() + if not text: + raise AgentRegistryError("each agent entry requires id and label") + return text + + +def _optional_text(entry: Mapping[str, object], key: str) -> str: + value = entry.get(key) + if value is None: + return "" + if not isinstance(value, str): + raise AgentRegistryError(f"agent entry field '{key}' must be a string") + return value.strip() + + +def _load_registry_data(path: str | Path) -> dict[str, object]: + try: + raw = yaml.safe_load(Path(path).read_text(encoding="utf-8")) + except yaml.YAMLError as exc: + raise AgentRegistryError("invalid agent registry YAML") from exc + if raw is None: + return {} + if not isinstance(raw, Mapping): + raise AgentRegistryError("agent registry must be a mapping with an agents list") + return dict(raw) + + +def load_agent_registry(path: str | Path) -> AgentRegistry: + raw = _load_registry_data(path) + entries = raw.get("agents") + if not isinstance(entries, list) or not entries: + raise AgentRegistryError("agents registry must contain a non-empty agents list") + + agents: list[AgentDefinition] = [] + seen: set[str] = set() + for entry in entries: + if not isinstance(entry, Mapping): + raise AgentRegistryError("each agent entry requires id and label") + agent_id = _required_text(entry, "id") + label = _required_text(entry, "label") + base_url = _optional_text(entry, "base_url") + workspace_path = _optional_text(entry, "workspace_path") + if agent_id in seen: + raise AgentRegistryError(f"duplicate agent id: {agent_id}") + seen.add(agent_id) + agents.append( + AgentDefinition( + agent_id=agent_id, + label=label, + base_url=base_url, + workspace_path=workspace_path, + ) + ) + + user_agents = raw.get("user_agents") + if user_agents is not None: + if not isinstance(user_agents, Mapping): + raise AgentRegistryError("user_agents must be a mapping of user id strings to agent ids") + normalized: dict[str, str] = {} + for uid, aid in user_agents.items(): + if not isinstance(uid, str) or not isinstance(aid, str): + raise AgentRegistryError("user_agents keys and values must be strings") + normalized[uid.strip()] = aid.strip() + user_agents_map: Mapping[str, str] = normalized + else: + user_agents_map = {} + + return AgentRegistry(agents=agents, user_agents=user_agents_map) + + +def resolve_agent_for_user(self, max_user_id: str) -> AgentAssignment: + agent_id = self.get_agent_id_for_user(max_user_id) + if agent_id is not None: + return AgentAssignment(agent_id=agent_id, source="configured") + if self.agents: + # Логируем использование default агента + logger.warning("using_default_agent_for_user", user_id=max_user_id, agent_id=self.agents[0].agent_id) + return AgentAssignment(agent_id=self.agents[0].agent_id, source="default") + return AgentAssignment(agent_id=None, source="none") + + +def load_from_env() -> AgentRegistry: + import os + + path = os.environ.get("MAX_AGENT_REGISTRY_PATH", "/app/config/max-agents.yaml") + return load_agent_registry(path) diff --git a/adapter/max/api_client.py b/adapter/max/api_client.py new file mode 100644 index 0000000..7f3810e --- /dev/null +++ b/adapter/max/api_client.py @@ -0,0 +1,153 @@ +"""HTTP client for MAX Bot API (platform-api.max.ru).""" +from __future__ import annotations + +import logging +from typing import Any + +import httpx + +logger = logging.getLogger(__name__) + + +class MaxApiError(Exception): + def __init__(self, status: int, payload: Any): + super().__init__(f"MAX API error {status}: {payload}") + self.status = status + self.payload = payload + + +class MaxBotApi: + """ + Minimal async client. Auth: raw token in Authorization header (same as official TS SDK). + """ + + def __init__(self, token: str, base_url: str = "https://platform-api.max.ru") -> None: + self._token = token + self._base = base_url.rstrip("/") + self._client = httpx.AsyncClient( + base_url=self._base, + headers={"Authorization": token}, + timeout=httpx.Timeout(120.0, connect=30.0), + ) + + async def aclose(self) -> None: + await self._client.aclose() + + async def _request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + json: Any | None = None, + ) -> Any: + response = await self._client.request(method, path, params=params, json=json) + payload: Any + try: + payload = response.json() + except Exception: + payload = response.text + if response.status_code >= 400: + if isinstance(payload, dict): + raise MaxApiError( + response.status_code, + {"code": payload.get("code"), "message": payload.get("message", payload)}, + ) + raise MaxApiError(response.status_code, payload) + return payload + + async def get_me(self) -> dict[str, Any]: + data = await self._request("GET", "/me") + return dict(data) if isinstance(data, dict) else {} + + async def get_updates( + self, + *, + marker: int | None = None, + limit: int = 100, + timeout: int = 30, + types: list[str] | None = None, + ) -> tuple[list[dict[str, Any]], int | None]: + params: dict[str, Any] = {"limit": limit, "timeout": timeout} + if marker is not None: + params["marker"] = marker + if types: + params["types"] = ",".join(types) + data = await self._request("GET", "/updates", params=params) + if not isinstance(data, dict): + return [], None + raw_updates = data.get("updates") or [] + updates = [u for u in raw_updates if isinstance(u, dict)] + marker_out = data.get("marker") + return updates, marker_out if isinstance(marker_out, int) else None + + async def send_message_to_chat( + self, + chat_id: int, + *, + text: str | None = None, + attachments: list[dict[str, Any]] | None = None, + fmt: str | None = None, + ) -> dict[str, Any]: + params: dict[str, Any] = {"chat_id": chat_id} + body: dict[str, Any] = {} + if text is not None: + body["text"] = text + if attachments is not None: + body["attachments"] = attachments + if fmt: + body["format"] = fmt + return await self._request("POST", "/messages", params=params, json=body) + + async def send_message_to_user( + self, + user_id: int, + *, + text: str | None = None, + attachments: list[dict[str, Any]] | None = None, + fmt: str | None = None, + ) -> dict[str, Any]: + params: dict[str, Any] = {"user_id": user_id} + body: dict[str, Any] = {} + if text is not None: + body["text"] = text + if attachments is not None: + body["attachments"] = attachments + if fmt: + body["format"] = fmt + return await self._request("POST", "/messages", params=params, json=body) + + async def send_chat_action(self, chat_id: int, action: str) -> Any: + return await self._request( + "POST", + f"/chats/{chat_id}/actions", + json={"action": action}, + ) + + async def get_upload_url(self, upload_type: str) -> dict[str, Any]: + data = await self._request("POST", "/uploads", params={"type": upload_type}) + return dict(data) if isinstance(data, dict) else {} + + async def answer_callback( + self, + callback_id: str, + *, + message: dict[str, Any] | None = None, + notification: str | None = None, + ) -> Any: + body: dict[str, Any] = {} + if message is not None: + body["message"] = message + if notification is not None: + body["notification"] = notification + return await self._request( + "POST", + "/answers", + params={"callback_id": callback_id}, + json=body if body else {}, + ) + + async def download_file(self, url: str) -> bytes: + response = await self._client.get(url) + response.raise_for_status() + return response.content diff --git a/adapter/max/bot.py b/adapter/max/bot.py new file mode 100644 index 0000000..574dee4 --- /dev/null +++ b/adapter/max/bot.py @@ -0,0 +1,655 @@ +"""MAX messenger surface — runtime using official MAX Bot API (long polling).""" +from __future__ import annotations + +import asyncio +import logging +import os +import re +from pathlib import Path +from urllib.parse import urlsplit, urlunsplit + +import httpx +import structlog +from dotenv import load_dotenv + +from adapter.max.agent_registry import AgentRegistry, AgentRegistryError, load_from_env +from adapter.max.api_client import MaxApiError, MaxBotApi +from adapter.max.converter import ( + collect_max_attachments, + incoming_from_message_callback_payload, + incoming_from_text_commands, +) +from adapter.max.files import ( + guess_upload_type, + read_workspace_bytes, + save_incoming_from_url, + upload_file_as_attachment, +) +from adapter.max.handlers.attachments import AttachmentHandler +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.handlers.commands import register_max_handlers +from adapter.max.store import ChatStore, RoomMeta +from core.auth import AuthManager +from core.chat import ChatManager +from core.handler import EventDispatcher +from core.handlers import register_all +from core.protocol import ( + Attachment, + IncomingCommand, + IncomingMessage, + OutgoingEvent, + OutgoingMessage, + OutgoingNotification, + OutgoingTyping, + OutgoingUI +) +from core.settings import SettingsManager +from core.store import InMemoryStore, StateStore +from sdk.interface import PlatformClient, PlatformError +from sdk.prototype_state import PrototypeStateStore + +logger = structlog.get_logger(__name__) + +MAX_TEXT_CHARS = 4000 +_POLL_TYPES_DEFAULT = ["message_created", "message_callback", "bot_started"] + +load_dotenv(Path(__file__).resolve().parents[2] / ".env") + + +def _normalize_agent_base_url(url: str) -> str: + parsed = urlsplit(url) + path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) + return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) + + +def _agent_base_url_from_env() -> str: + if base_url := os.environ.get("AGENT_BASE_URL"): + return base_url + if ws_url := os.environ.get("AGENT_WS_URL"): + return _normalize_agent_base_url(ws_url) + return "http://127.0.0.1:8000" + + +class RoutedMaxPlatformClient(PlatformClient): + """Routes agent WS calls based on ChatStore mapping (same idea as RoutedPlatformClient).""" + + def __init__( + self, *, chat_store: ChatStore, delegates: dict[str, PlatformClient], default_client: PlatformClient + ): + if not delegates: + raise ValueError("RoutedMaxPlatformClient requires at least one delegate") + self._store = chat_store + self._delegates = dict(delegates) + self._default_client = default_client + self.agent_healthy = True + + async def get_or_create_user( + self, external_id: str, platform: str, display_name: str | None = None + ): + return await self._default_client.get_or_create_user( + external_id=external_id, platform=platform, display_name=display_name + ) + + async def _check_agent_health(self): + try: + async with httpx.AsyncClient() as client: + resp = await client.get(f"{self.agent_base_url}/health", timeout=2.0) + self.agent_healthy = resp.status_code == 200 + except Exception: + self.agent_healthy = False + + async def send_message(self, user_id: str, chat_id: str, text: str, attachments=None): + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + return await delegate.send_message(user_id, platform_chat_id, text, attachments) + + async def stream_message(self, user_id: str, chat_id: str, text: str, attachments=None): + delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) + async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): + yield chunk + + async def get_settings(self, user_id: str): + return await self._default_client.get_settings(user_id) + + async def update_settings(self, user_id: str, action) -> None: + await self._default_client.update_settings(user_id, action) + + async def close(self) -> None: + for delegate in self._delegates.values(): + close_fn = getattr(delegate, "close", None) + if callable(close_fn): + await close_fn() + + async def _resolve_delegate(self, user_id: str, local_chat_id: str): + room = self._store.get_room_by_platform_chat_id(local_chat_id) + if room is None: + raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND") + + agent_id = room.agent_id + platform_chat_id = room.platform_chat_id + + delegate = self._delegates.get(str(agent_id)) + if delegate is None: + raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND") + + return delegate, str(platform_chat_id) + + +class MaxBotApp: + def __init__(self) -> None: + self.token = os.environ["MAX_BOT_TOKEN"] + api_base = os.environ.get("MAX_API_URL", "https://platform-api.max.ru").strip().rstrip("/") + self.api = MaxBotApi(self.token, base_url=api_base) + self.surfaces_workspace = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/agents")) + agent_base_url = _agent_base_url_from_env() + + try: + self.registry: AgentRegistry = load_from_env() + except (AgentRegistryError, OSError) as exc: + raise RuntimeError("failed to load MAX agent registry") from exc + + + self.chat_store = ChatStore() + self.max_chat_handler = MaxChatHandler(self.chat_store) + self.attach_handler = AttachmentHandler(self.chat_store) + + self.core_store: StateStore = InMemoryStore() + self.prototype_state = PrototypeStateStore() + + backend_mode = os.environ.get("MAX_PLATFORM_BACKEND", "mock").strip().lower() + logger.info("max_platform_backend_selected", backend=backend_mode) + + if backend_mode == "real": + from sdk.real import RealPlatformClient + delegates: dict[str, RealPlatformClient] = {} + for agent in self.registry.agents: + base_raw = agent.base_url.strip() if agent.base_url else agent_base_url + delegates[agent.agent_id] = RealPlatformClient( + agent_id=agent.agent_id, + agent_base_url=base_raw, + prototype_state=self.prototype_state, + platform="max", + ) + + if not delegates: + raise RuntimeError("No agents configured for real backend") + + default_client = next(iter(delegates.values())) + self.platform: RoutedMaxPlatformClient = RoutedMaxPlatformClient( + chat_store=self.chat_store, + delegates=delegates, + default_client=default_client, + ) + else: + # Mock backend for local development/testing + logger.warning("max_using_mock_backend", note="No real agent connection") + from sdk.mock import MockPlatformClient + self.platform = MockPlatformClient() # type: ignore[assignment] + + self.chat_mgr = ChatManager(self.platform, self.core_store) + self.auth_mgr = AuthManager(self.platform, self.core_store) + self.settings_mgr = SettingsManager(self.platform, self.core_store) + self.dispatcher = EventDispatcher( + platform=self.platform, + chat_mgr=self.chat_mgr, + auth_mgr=self.auth_mgr, + settings_mgr=self.settings_mgr, + ) + + register_all(self.dispatcher) + register_max_handlers( + self.dispatcher, + chat_store=self.chat_store, + max_chat_handler=self.max_chat_handler, + prototype_state=self.prototype_state, + ) + + poll_types = os.environ.get("MAX_UPDATE_TYPES", "").strip() + self.update_types = ( + [t.strip() for t in poll_types.split(",") if t.strip()] + if poll_types + else list(_POLL_TYPES_DEFAULT) + ) + + self._marker: int | None = None + self.bot_user_ids: set[int] = set() + logging.basicConfig(level=logging.INFO) + + async def bootstrap_identity(self) -> None: + me = await self.api.get_me() + uid = me.get("user_id") + if isinstance(uid, int): + self.bot_user_ids.add(uid) + + async def ensure_user(self, max_user_id: str, *, display_name: str | None) -> None: + await self.platform.get_or_create_user(max_user_id, "max", display_name=display_name) + await self.auth_mgr.confirm(max_user_id) + + async def _resolve_room( + self, + *, + max_chat_key: str, + max_user_id: str, + ) -> RoomMeta: + room = self.chat_store.get_room_by_max_chat_id(max_chat_key) + if room is not None: + return room + + assignment = self.registry.resolve_agent_for_user(max_user_id) + if assignment.agent_id is None: + raise RuntimeError("no agents configured") + + ws_path = "" + try: + ws_path = self.registry.get(assignment.agent_id).workspace_path + except AgentRegistryError: + pass + + pid = self.max_chat_handler.handle_new( + max_chat_id=max_chat_key, + user_id=max_user_id, + agent_id=assignment.agent_id, + name="Чат 1", + workspace_path=ws_path, + ) + + await self.chat_mgr.get_or_create( + user_id=max_user_id, + chat_id=pid, + platform="max", + surface_ref=max_chat_key, + name="Чат 1", + ) + + refreshed = self.chat_store.get_room_by_max_chat_id(max_chat_key) + if refreshed is None: + raise RuntimeError("max room bootstrap failed") + + logger.info( + "max_chat_bootstrapped", + max_chat_key=max_chat_key, + platform_chat_id=pid, + agent_id=assignment.agent_id, + ) + + return refreshed + + async def process_message_created(self, payload: dict) -> None: + logger.info( + "DEBUG_PAYLOAD", + body=payload.get("message", {}).get("body"), + ) + message = payload.get("message") + if not isinstance(message, dict): + return + + sender = message.get("sender") or {} + if not isinstance(sender, dict): + return + + uid = sender.get("user_id") + if isinstance(uid, int): + uid_s = str(uid) + else: + return + + if sender.get("is_bot"): + return + + recipient = message.get("recipient") or {} + chat_id_numeric = recipient.get("chat_id") + if chat_id_numeric is None or not isinstance(chat_id_numeric, int): + dialog_uid = recipient.get("user_id") + if isinstance(dialog_uid, int): + chat_key = str(dialog_uid) + else: + return + else: + chat_key = str(chat_id_numeric) + + await self.ensure_user(uid_s, display_name=sender.get("first_name")) + + room = await self._resolve_room( + max_chat_key=chat_key, + max_user_id=uid_s, + ) + + body = message.get("body") or {} + text = "" + if isinstance(body, dict): + raw_txt = body.get("text") + text = raw_txt.strip() if isinstance(raw_txt, str) else "" + + attachments_core, raw_meta = collect_max_attachments(body) if isinstance(body, dict) else ([], []) + attachments_core = await self._materialize_attachments(room, attachments_core, raw_meta) + + if attachments_core and not text: + logger.info("QUEUE_ADD", chat_key=chat_key, count=len(attachments_core)) + for att in attachments_core: + logger.info("QUEUE_ITEM", filename=att.filename) + self.chat_store.stage_attachment(chat_key, (att.workspace_path or "", att.filename or "file")) + return + + queued = self.chat_store.pop_attachments(chat_key) + merged = list(attachments_core) + for ws_path, fname in queued: + if ws_path: + merged.append( + Attachment( + type="document", + filename=fname, + workspace_path=ws_path, + ) + ) + + incoming = incoming_from_text_commands( + text=text, + max_user_id=uid_s, + platform_chat_id=room.platform_chat_id, + attachments=merged, + ) + + if isinstance(incoming, IncomingMessage): + if not incoming.text.strip() and not incoming.attachments: + return + + if isinstance(incoming, IncomingCommand): + if incoming.command in {"list", "remove"}: + reply = await self._handle_local_attachment_command(incoming, chat_key) + await self._send_lines(int(chat_key), reply) + return + + try: + outgoing = await self.dispatcher.dispatch(incoming) + except PlatformError as exc: + logger.warning("max_dispatch_error", code=exc.code, err=str(exc)) + outgoing = [ + OutgoingMessage( + chat_id=room.platform_chat_id, + text="Сервис временно недоступен. Попробуйте позже.", + ) + ] + + if not outgoing and isinstance(incoming, IncomingCommand): + outgoing = [ + OutgoingMessage( + chat_id=room.platform_chat_id, + text="Неизвестная команда. Введите /help.", + ), + ] + + await self._send_outgoing(int(chat_key), outgoing, room) + + async def _handle_local_attachment_command(self, incoming: IncomingCommand, chat_key: str) -> str: + if incoming.command == "list": + logger.info("QUEUE_LIST_REQUEST", chat_key=chat_key) + return self.attach_handler.handle_list(chat_key) + return self.attach_handler.handle_remove(chat_key, incoming.args[0] if incoming.args else "") + + async def _materialize_attachments( + self, + room: RoomMeta, + attachments: list[Attachment], + raw_meta: list[dict], + ) -> list[Attachment]: + workspace = Path(room.workspace_path or str(self.surfaces_workspace)) + out: list[Attachment] = [] + for att, _meta in zip(attachments, raw_meta, strict=False): + if not att.url: + out.append(att) + continue + try: + rel = await save_incoming_from_url( + api=self.api, + workspace_root=workspace, + filename=att.filename or "file.bin", + url=att.url, + ) + except (httpx.HTTPError, OSError) as exc: + logger.warning("max_attachment_download_failed", error=str(exc)) + out.append(att) + continue + out.append( + Attachment( + type=att.type, + filename=att.filename, + mime_type=att.mime_type, + workspace_path=rel, + url=att.url, + ) + ) + return out + + async def process_message_callback(self, payload: dict) -> None: + cb = payload.get("callback") or {} + if not isinstance(cb, dict): + return + callback_id = cb.get("callback_id") + user_blob = cb.get("user") or {} + uid = user_blob.get("user_id") if isinstance(user_blob, dict) else None + uid_s = str(uid) if isinstance(uid, int) else None + + msg = payload.get("message") or {} + recipient = msg.get("recipient") or {} if isinstance(msg, dict) else {} + cc = recipient.get("chat_id") + if isinstance(cc, int): + chat_key = str(cc) + elif isinstance(uid_s, str): + chat_key = uid_s + else: + return + + mid = "" + body = msg.get("body") if isinstance(msg, dict) else None + if isinstance(body, dict): + mb = body.get("mid") + mid = mb if isinstance(mb, str) else "" + + if uid_s is None: + return + + await self.ensure_user(uid_s, display_name=user_blob.get("first_name")) + + room = self.chat_store.get_room_by_max_chat_id(chat_key) + if room is None: + return + + payload_raw = cb.get("payload") if cb.get("payload") is not None else None + payload_str = str(payload_raw) if payload_raw is not None else "" + + incoming = incoming_from_message_callback_payload( + max_user_id=uid_s, + platform_chat_id=room.platform_chat_id, + payload_raw=payload_str, + callback_message_id=mid, + ) + if incoming is None: + if isinstance(callback_id, str): + await self.api.answer_callback(callback_id, notification="ok") + return + + try: + outgoing = await self.dispatcher.dispatch(incoming) + except PlatformError: + outgoing = [] + + await self._send_outgoing(int(chat_key), outgoing, room) + + if isinstance(callback_id, str): + await self.api.answer_callback(callback_id, notification=" ") + + async def process_bot_started(self, payload: dict) -> None: + cid = payload.get("chat_id") + user_blob = payload.get("user") or {} + uid = user_blob.get("user_id") + + chat_key = str(cid) if isinstance(cid, int) else None + if chat_key is None or not isinstance(uid, int): + return + + uid_s = str(uid) + await self.ensure_user(uid_s, display_name=user_blob.get("first_name")) + + await self._resolve_room( + max_chat_key=chat_key, + max_user_id=uid_s, + ) + + deeplink_note = "" + dl = payload.get("payload") if isinstance(payload.get("payload"), str) else None + if dl: + deeplink_note = f" (payload: {dl})" + + welcome = ( + "Здравствуйте, я бот Lambda, который готов помочь с задачами." + f"Отправьте текст или файл.{deeplink_note}" + ) + + await self.api.send_message_to_chat(int(chat_key), text=welcome) + + async def dispatch_update(self, update: dict) -> None: + utype = update.get("update_type") + if utype == "message_created": + await self.process_message_created(update) + elif utype == "message_callback": + await self.process_message_callback(update) + elif utype == "bot_started": + await self.process_bot_started(update) + + async def _send_lines(self, max_chat_id: int, text: str) -> None: + if text: + await self._send_plain_text(max_chat_id, text) + + async def _send_plain_text(self, max_chat_id: int, text: str, *, fmt: str | None = None) -> None: + chunk_size = MAX_TEXT_CHARS + for i in range(0, len(text), chunk_size): + part = text[i : i + chunk_size] + await self.api.send_message_to_chat(max_chat_id, text=part, fmt=fmt) + + async def _send_outgoing(self, max_chat_id: int, events: list[OutgoingEvent], room: RoomMeta) -> None: + workspace_agent = Path( + room.workspace_path if room.workspace_path else self.surfaces_workspace, + ) + + for event in events: + if isinstance(event, OutgoingTyping): + await self.api.send_chat_action(max_chat_id, "typing_on") + continue + + if isinstance(event, OutgoingNotification): + body = f"[{event.level.upper()}] {event.text}" + await self._send_plain_text(max_chat_id, body) + continue + + if isinstance(event, OutgoingMessage): + fmt = None + if getattr(event, "parse_mode", "plain") == "markdown": + fmt = "markdown" + + merged_text = getattr(event, "text", "") or "" + attachments = list(getattr(event, "attachments", []) or []) + + agent_def = None + try: + agent_def = self.registry.get(room.agent_id) + except AgentRegistryError: + pass + + root = ( + Path(agent_def.workspace_path) + if agent_def and agent_def.workspace_path + else workspace_agent + ) + + req_atts: list[dict] = [] + for raw_att in attachments: + wp = getattr(raw_att, "workspace_path", None) + if not wp: + continue + try: + data = read_workspace_bytes(wp, agent_workspace=str(root)) + except OSError: + logger.warning("max_outgoing_missing_file", path=wp) + continue + + fn = getattr(raw_att, "filename", None) or Path(str(wp)).name + mime = getattr(raw_att, "mime_type", None) + att_type = str(getattr(raw_att, "type", "") or "") + ctype = guess_upload_type(mime, attachment_type=str(att_type)) + attached = await upload_file_as_attachment( + self.api, filename=fn, content=data, upload_type=ctype + ) + req_atts.append(attached) + + text_payload = merged_text.strip() or None + + if text_payload is None and not req_atts: + continue + + await self.api.send_message_to_chat( + max_chat_id, + text=text_payload, + attachments=req_atts or None, + fmt=fmt, + ) + + if isinstance(event, OutgoingUI): + lines = [event.text] + if getattr(event, "buttons", []): + lines.append("") + for button in event.buttons: + lines.append(f"• {button.label}") + lines.append("") + lines.append("Ответьте /yes или /no (или кнопки с callback в MAX).") + + merged = "\n".join(lines) + await self._send_plain_text(max_chat_id, merged) + + async def run(self) -> None: + await self.bootstrap_identity() + + logger.info( + "max_bot_poll_start", + update_types=self.update_types, + registry_agents=len(self.registry.agents), + ) + + while True: + try: + updates, marker = await self.api.get_updates( + marker=self._marker, + types=self.update_types, + timeout=40, + limit=100, + ) + + self._marker = marker + + for u in updates: + try: + await self.dispatch_update(u) + except Exception: + logger.exception("max_update_failed", update=u) + + except asyncio.CancelledError: + raise + except (MaxApiError, httpx.HTTPError) as exc: + logger.error("max_poll_fatal", error=str(exc)) + await asyncio.sleep(5) + + async def shutdown(self) -> None: + close = getattr(self.platform, "close", None) + if callable(close): + await close() + await self.api.aclose() + + +async def main() -> None: + app = MaxBotApp() + try: + await app.run() + finally: + await app.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/adapter/max/converter.py b/adapter/max/converter.py new file mode 100644 index 0000000..f7d2584 --- /dev/null +++ b/adapter/max/converter.py @@ -0,0 +1,153 @@ +"""MAX Bot API payloads -> core Incoming* types.""" +from __future__ import annotations + +from typing import Any + +from core.protocol import Attachment, IncomingCallback, IncomingCommand, IncomingMessage + + +def incoming_from_text_commands( + *, + text: str, + max_user_id: str, + platform_chat_id: str, + attachments: list[Attachment], +) -> IncomingMessage | IncomingCommand | IncomingCallback: + """Парсинг текста: только slash-команды (как в Telegram), обычное сообщение иначе.""" + + stripped = text.strip() + proto = stripped.lower() + + if proto in {"/yes"}: + return IncomingCallback( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + action="confirm", + payload={}, + ) + if proto in {"/no"}: + return IncomingCallback( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + action="cancel", + payload={}, + ) + + if not stripped.startswith("/"): + return IncomingMessage( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + text=text, + attachments=attachments, + reply_to=None, + ) + + raw = stripped[1:] + parts = raw.split(maxsplit=1) + if not parts: + return None + name = (parts[0] or "").lower() + tail = parts[1] if len(parts) > 1 else "" + + return IncomingCommand( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + command=name, + args=tail.split() if tail else [], + ) + + +def incoming_from_message_callback_payload( + *, + max_user_id: str, + platform_chat_id: str, + payload_raw: str | None, + callback_message_id: str | None, +) -> IncomingCallback | None: + if not payload_raw: + return None + if payload_raw in {"confirm", "cancel", "toggle_skill"}: + return IncomingCallback( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + action=payload_raw, + payload={"message_id": callback_message_id or ""}, + ) + return IncomingCallback( + user_id=max_user_id, + platform="max", + chat_id=platform_chat_id, + action="max_callback", + payload={"payload": payload_raw, "message_id": callback_message_id}, + ) + + +def attachment_from_max_dict(raw: dict[str, Any]) -> tuple[Attachment, dict[str, Any]] | None: + """Return core Attachment placeholder + raw attachment for download.""" + + kind = raw.get("type") + payload = raw.get("payload") + if not isinstance(kind, str) or not isinstance(payload, dict): + return None + + url = payload.get("url") + if not isinstance(url, str): + url = "" + + token = payload.get("token") + filename = "attachment.bin" + mapped = "document" + mime: str | None = None + + if kind == "image": + mapped = "image" + filename = "image.jpg" + elif kind == "video": + mapped = "video" + filename = "video.mp4" + elif kind == "audio": + mapped = "audio" + mime = payload.get("mime_type") if isinstance(payload.get("mime_type"), str) else "audio/mpeg" + filename = "audio.bin" + elif kind == "file": + fname = payload.get("filename") + filename = fname if isinstance(fname, str) and fname else "file.bin" + mapped = "document" + else: + return None + + attachment = Attachment( + type=mapped, + url=url or None, + content=None, + filename=filename, + mime_type=mime, + ) + meta = dict(raw) + if token: + meta["_download_token_hint"] = token + return attachment, meta + + +def collect_max_attachments(message_body: dict[str, Any]) -> tuple[list[Attachment], list[dict[str, Any]]]: + attachments = message_body.get("attachments") + if not isinstance(attachments, list): + return [], [] + + core_list: list[Attachment] = [] + raw_list: list[dict[str, Any]] = [] + + for item in attachments: + if isinstance(item, dict): + parsed = attachment_from_max_dict(item) + if parsed is None: + continue + core_a, raw_a = parsed + core_list.append(core_a) + raw_list.append(raw_a) + return core_list, raw_list diff --git a/adapter/max/files.py b/adapter/max/files.py new file mode 100644 index 0000000..47433be --- /dev/null +++ b/adapter/max/files.py @@ -0,0 +1,149 @@ +"""Incoming / outgoing file helpers for MAX (aligned with Matrix workspace layout).""" +from __future__ import annotations + +import mimetypes +import re +from pathlib import Path, PurePosixPath + +import httpx +from adapter.max.api_client import MaxBotApi + + +def _sanitize_filename(value: str) -> str: + """Безопасное имя файла с поддержкой кириллицы.""" + filename = PurePosixPath(str(value).replace("\\", "/")).name.strip() + cleaned = re.sub(r'[\x00-\x1f\x7f<>"|?*]+', "_", filename) + cleaned = cleaned.strip(" .") + return cleaned or "attachment.bin" + + +def _with_copy_index(filename: str, index: int) -> str: + """Generate filename with copy index: file.txt -> file (1).txt""" + path = Path(filename) + suffix = path.suffix + stem = path.stem if suffix else filename + return f"{stem} ({index}){suffix}" + + +def _unique_workspace_relative_path(workspace_root: Path, filename: str) -> tuple[str, Path]: + """Generate unique filename if file already exists in workspace.""" + safe_name = _sanitize_filename(filename) + candidate = workspace_root / safe_name + + if not candidate.exists(): + return safe_name, candidate + + index = 1 + while True: + indexed_name = _with_copy_index(safe_name, index) + candidate = workspace_root / indexed_name + if not candidate.exists(): + return indexed_name, candidate + index += 1 + + +def build_agent_workspace_path( + *, + workspace_root: Path, + filename: str, +) -> tuple[str, Path]: + """Saves user files directly to {workspace_root}/{filename}.""" + return _unique_workspace_relative_path(workspace_root, filename) + + +def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path: + """Resolve relative workspace path to absolute Path object.""" + path = Path(workspace_path) + if path.is_absolute(): + return path + return workspace_root / path + +# === Конец вспомогательных функций === + + +def guess_upload_type(mime_type: str | None, *, attachment_type: str) -> str: + if attachment_type == "image": + return "image" + if attachment_type == "video": + return "video" + if attachment_type == "audio": + return "audio" + mime = mime_type or "" + if mime.startswith("image/"): + return "image" + if mime.startswith("video/"): + return "video" + if mime.startswith("audio/"): + return "audio" + return "file" + + +async def save_incoming_from_url( + *, + api: MaxBotApi, # Теперь тип известен + workspace_root: Path, + filename: str, + url: str, +) -> str: + """Скачивает файл по URL и сохраняет в workspace.""" + data = await api.download_file(url) + workspace_root.mkdir(parents=True, exist_ok=True) + + # Используем добавленную функцию + relative_path, absolute_path = build_agent_workspace_path( + workspace_root=workspace_root, + filename=filename, + ) + + absolute_path.parent.mkdir(parents=True, exist_ok=True) + absolute_path.write_bytes(data) + return relative_path + + +async def upload_file_as_attachment( + api: MaxBotApi, # Теперь тип известен + *, + filename: str, + content: bytes, + upload_type: str, +) -> dict: + """Загружает файл в MAX для отправки пользователю.""" + meta = await api.get_upload_url(upload_type) + upload_url = meta.get("url") + token = meta.get("token") + if not isinstance(upload_url, str) or not upload_url: + raise RuntimeError("MAX uploads response missing url") + + async with httpx.AsyncClient(timeout=httpx.Timeout(120.0)) as client: + response = await client.post( + upload_url, + files={"data": (filename, content, guess_mimetype(filename))}, + ) + response.raise_for_status() + + payload: dict = {} + if token: + payload["token"] = token + if upload_type == "image": + return {"type": "image", "payload": payload} + + type_map = { + "file": "file", + "video": "video", + "audio": "audio", + } + mapped = type_map.get(upload_type, "file") + return {"type": mapped, "payload": payload} + + +def guess_mimetype(filename: str) -> str: + mime, _ = mimetypes.guess_type(filename) + return mime or "application/octet-stream" + + +def read_workspace_bytes(workspace_path: str | Path, *, agent_workspace: str) -> bytes: + """Читает файл из workspace по относительному пути.""" + root = Path(agent_workspace) + # Теперь эта функция определена выше + resolved = resolve_workspace_attachment_path(root, str(workspace_path)) + return resolved.read_bytes() \ No newline at end of file diff --git a/adapter/max/handlers/__init__.py b/adapter/max/handlers/__init__.py new file mode 100644 index 0000000..93da8fa --- /dev/null +++ b/adapter/max/handlers/__init__.py @@ -0,0 +1,5 @@ +"""MAX surface handlers.""" + +from adapter.max.handlers.commands import register_max_handlers + +__all__ = ["register_max_handlers"] diff --git a/adapter/max/handlers/attachments.py b/adapter/max/handlers/attachments.py new file mode 100644 index 0000000..ab94150 --- /dev/null +++ b/adapter/max/handlers/attachments.py @@ -0,0 +1,29 @@ +"""Attachment queue handlers for MAX surface.""" +from adapter.max.store import ChatStore + + +class AttachmentHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_list(self, max_chat_id: str) -> str: + attachments = self.store.get_attachments(max_chat_id) + if not attachments: + return "Очередь вложений пуста." + lines = [f" {i+1}. {name}" for i, (_, name) in enumerate(attachments)] + return "\n".join(lines) + + def handle_remove(self, max_chat_id: str, index: str) -> str: + attachments = self.store.staged_attachments.get(max_chat_id, []) + if index.lower() == "all": + self.store.staged_attachments[max_chat_id] = [] + return "Все вложения удалены из очереди." + + try: + idx = int(index) - 1 + if 0 <= idx < len(attachments): + removed = attachments.pop(idx) + return f"Удалено: {removed[1]}" + return "Неверный номер." + except ValueError: + return "Использование: /remove <номер> или /remove all" diff --git a/adapter/max/handlers/chat.py b/adapter/max/handlers/chat.py new file mode 100644 index 0000000..f8b4b3c --- /dev/null +++ b/adapter/max/handlers/chat.py @@ -0,0 +1,57 @@ +"""Chat management handlers for MAX surface.""" +import uuid +from adapter.max.store import ChatStore, RoomMeta + + +class ChatHandler: + def __init__(self, store: ChatStore): + self.store = store + + def handle_new( + self, + max_chat_id: str, + user_id: str, + agent_id: str, + name: str | None = None, + *, + workspace_path: str = "", + ) -> str: + platform_chat_id = str(uuid.uuid4()) + room = RoomMeta( + platform_chat_id=platform_chat_id, + max_chat_id=max_chat_id, + name=name or "New Chat", + user_id=user_id, + agent_id=agent_id, + workspace_path=workspace_path, + ) + self.store.add_room(room) + return platform_chat_id + + def handle_chats(self, user_id: str) -> str: + rooms = self.store.list_rooms_for_user(user_id) + if not rooms: + return "Нет активных чатов." + lines = [f" {i+1}. {r.name}" for i, r in enumerate(rooms)] + return "\n".join(lines) + + def handle_rename(self, max_chat_id: str, new_name: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Чат не найден." + room.name = new_name + return f"Чат переименован в: {new_name}" + + def handle_archive(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Чат не найден." + self.store.remove_room(max_chat_id) + return "Чат архивирован." + + def handle_clear(self, max_chat_id: str) -> str: + room = self.store.get_room_by_max_chat_id(max_chat_id) + if not room: + return "Чат не найден." + room.platform_chat_id = str(uuid.uuid4()) + return "Контекст чата очищен." \ No newline at end of file diff --git a/adapter/max/handlers/commands.py b/adapter/max/handlers/commands.py new file mode 100644 index 0000000..a488143 --- /dev/null +++ b/adapter/max/handlers/commands.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.handlers.help import get_help +from adapter.max.store import ChatStore +from sdk.prototype_state import PrototypeStateStore + +from core.handler import EventDispatcher +from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage + +_SINGLE_DIALOG_HINT = ( + "В MAX один диалог с ботом. Чтобы сбросить контекст агента, используйте /clear или /reset." +) + + +def register_max_handlers( + dispatcher: EventDispatcher, + *, + chat_store: ChatStore, + max_chat_handler: MaxChatHandler, + prototype_state: PrototypeStateStore, +) -> None: + async def _room_or_error( + event: IncomingCommand, + ) -> tuple[str, str] | list[OutgoingMessage]: + room = chat_store.get_room_by_platform_chat_id(event.chat_id) + if room is None: + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Состояние ещё не готово. Напишите сообщение ещё раз.", + ) + ] + return room.max_chat_id, room.platform_chat_id + + async def handle_max_help(event: IncomingCommand, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + if not await auth_mgr.is_authenticated(event.user_id): + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Введите /start чтобы начать.", + ) + ] + return [OutgoingMessage(chat_id=event.chat_id, text=get_help())] + + async def handle_max_no_multichat(event: IncomingCommand, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + if not await auth_mgr.is_authenticated(event.user_id): + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Введите /start чтобы начать.", + ) + ] + _ = event + return [OutgoingMessage(chat_id=event.chat_id, text=_SINGLE_DIALOG_HINT)] + + async def handle_max_clear(event: IncomingCommand, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + if not await auth_mgr.is_authenticated(event.user_id): + return [ + OutgoingMessage( + chat_id=event.chat_id, + text="Введите /start чтобы начать.", + ) + ] + routed = await _room_or_error(event) + if isinstance(routed, list): + return routed + max_chat_id, platform_chat_old = routed + + await prototype_state.clear_current_session(platform_chat_old) + await chat_mgr.archive(platform_chat_old, event.user_id) + max_chat_handler.handle_clear(max_chat_id) + room = chat_store.get_room_by_max_chat_id(max_chat_id) + if room is None: + return [ + OutgoingMessage( + chat_id=platform_chat_old, + text="Не удалось сбросить контекст.", + ) + ] + platform_chat_new = room.platform_chat_id + await chat_mgr.get_or_create( + user_id=event.user_id, + chat_id=platform_chat_new, + platform="max", + surface_ref=max_chat_id, + name=room.name, + ) + return [ + OutgoingMessage( + chat_id=platform_chat_new, + text="Контекст агента сброшен.", + ) + ] + + for cmd in ("new", "rename", "archive", "chats"): + dispatcher.register(IncomingCommand, cmd, handle_max_no_multichat) + + dispatcher.register(IncomingCommand, "reset", handle_max_clear) + dispatcher.register(IncomingCommand, "clear", handle_max_clear) + dispatcher.register(IncomingCommand, "help", handle_max_help) + + async def handle_max_plain_callback(event: IncomingCallback, auth_mgr, platform, chat_mgr, settings_mgr): # noqa: ARG001 + payload = str(event.payload.get("payload", "")) + return [ + OutgoingMessage( + chat_id=event.chat_id, + text=f"Неизвестное действие кнопки: {payload}", + ) + ] + + dispatcher.register(IncomingCallback, "max_callback", handle_max_plain_callback) diff --git a/adapter/max/handlers/help.py b/adapter/max/handlers/help.py new file mode 100644 index 0000000..b066119 --- /dev/null +++ b/adapter/max/handlers/help.py @@ -0,0 +1,23 @@ +"""Help text for MAX surface (single dialog, slash commands).""" + +HELP_TEXT = """ +Команды: + + /start — начать + /help — эта справка + /clear или /reset — сбросить контекст агента + +Вложения (файл без текста ставится в очередь): + + /list — очередь вложений + /remove n — убрать из очереди + /remove all — очистить очередь + +Подтверждения агента: + + /yes / /no +""" + + +def get_help() -> str: + return HELP_TEXT.strip() diff --git a/adapter/max/store.py b/adapter/max/store.py new file mode 100644 index 0000000..ae9878e --- /dev/null +++ b/adapter/max/store.py @@ -0,0 +1,49 @@ +"""Chat store for MAX surface.""" +from dataclasses import dataclass, field +from typing import Dict, Optional + + +@dataclass +class RoomMeta: + platform_chat_id: str + max_chat_id: str + name: str + user_id: str + agent_id: str + workspace_path: str = "" + + +@dataclass +class ChatStore: + rooms: Dict[str, RoomMeta] = field(default_factory=dict) + staged_attachments: Dict[str, list] = field(default_factory=dict) + + def get_room_by_max_chat_id(self, max_chat_id: str) -> Optional[RoomMeta]: + return self.rooms.get(max_chat_id) + + def get_room_by_platform_chat_id(self, platform_chat_id: str) -> Optional[RoomMeta]: + for room in self.rooms.values(): + if room.platform_chat_id == platform_chat_id: + return room + return None + + def add_room(self, room: RoomMeta) -> None: + self.rooms[room.max_chat_id] = room + + def remove_room(self, max_chat_id: str) -> None: + self.rooms.pop(max_chat_id, None) + self.staged_attachments.pop(max_chat_id, None) + + def list_rooms_for_user(self, user_id: str) -> list: + return [r for r in self.rooms.values() if r.user_id == user_id] + + def stage_attachment(self, max_chat_id: str, attachment: tuple) -> None: + if max_chat_id not in self.staged_attachments: + self.staged_attachments[max_chat_id] = [] + self.staged_attachments[max_chat_id].append(attachment) + + def pop_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.pop(max_chat_id, []) + + def get_attachments(self, max_chat_id: str) -> list: + return self.staged_attachments.get(max_chat_id, []) \ No newline at end of file diff --git a/config/max-agents.yaml b/config/max-agents.yaml new file mode 100644 index 0000000..5dfc4dd --- /dev/null +++ b/config/max-agents.yaml @@ -0,0 +1,10 @@ +# Пример реестра агентов для MAX (формат совпадает с matrix-agents). +# +# user_agents: +# "123456789": agent-0 +# +agents: + - id: agent-0 + label: "Agent 0" + base_url: "http://agent-proxy:7000/agent_0/" + workspace_path: "/agents/0" diff --git a/docker-compose.max.yml b/docker-compose.max.yml new file mode 100644 index 0000000..266a44e --- /dev/null +++ b/docker-compose.max.yml @@ -0,0 +1,63 @@ +# Локальный MAX + platform-agent из исходников (аналог docker-compose.fullstack.yml для Matrix). +# Продакшен: только max-bot из docker-compose.prod.yml; AGENT_BASE_URL — URL агента, который поднимает команда платформы. +services: + max-bot: + build: + context: . + target: development + env_file: .env + environment: + MAX_BOT_TOKEN: ${MAX_BOT_TOKEN:?set MAX_BOT_TOKEN in .env} + MAX_API_URL: ${MAX_API_URL:-https://platform-api.max.ru} + MAX_AGENT_REGISTRY_PATH: ${MAX_AGENT_REGISTRY_PATH:-/app/config/max-agents.yaml} + AGENT_BASE_URL: http://platform-agent:8000 + SURFACES_WORKSPACE_DIR: ${SURFACES_WORKSPACE_DIR:-/agents} + PYTHONUNBUFFERED: "1" + depends_on: + platform-agent: + condition: service_healthy + volumes: + - agents:/agents + - ./config:/app/config:ro + command: python -m adapter.max.bot + restart: unless-stopped + + platform-agent: + build: + context: ./external/platform-agent + target: development + additional_contexts: + agent_api: ./external/platform-agent_api + env_file: .env + environment: + PYTHONUNBUFFERED: "1" + AGENT_ID: ${AGENT_ID:-max-dev} + PROVIDER_MODEL: ${PROVIDER_MODEL:-openai/gpt-4o-mini} + PROVIDER_URL: ${PROVIDER_URL:-} + PROVIDER_API_KEY: ${PROVIDER_API_KEY:-} + COMPOSIO_API_KEY: ${COMPOSIO_API_KEY:-} + volumes: + - ./external/platform-agent/src:/app/src + - ./external/platform-agent_api:/agent_api + - agents:/workspace + command: > + sh -lc " + mkdir -p /workspace && + chown -R agent:agent /workspace && + exec /app/.venv/bin/uvicorn src.main:app --host 0.0.0.0 --port 8000 --no-access-log + " + ports: + - "8000:8000" + healthcheck: + test: + - CMD-SHELL + - python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000/openapi.json', timeout=2).read()" + interval: 60s + timeout: 5s + retries: 5 + start_period: 15s + restart: unless-stopped + +volumes: + agents: + name: ${SURFACES_SHARED_VOLUME:-surfaces-agents} diff --git a/pyproject.toml b/pyproject.toml index 73dfbd7..27fc01a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,3 +40,6 @@ target-version = "py311" [tool.ruff.lint] select = ["E", "F", "I", "UP", "B"] + +[tool.setuptools.packages.find] +include = ["core*", "adapter*", "sdk*"] diff --git a/tests/adapter/max/__init__.py b/tests/adapter/max/__init__.py new file mode 100644 index 0000000..f13a104 --- /dev/null +++ b/tests/adapter/max/__init__.py @@ -0,0 +1 @@ +# MAX adapter tests diff --git a/tests/adapter/max/test_agent_registry.py b/tests/adapter/max/test_agent_registry.py new file mode 100644 index 0000000..f69a9f1 --- /dev/null +++ b/tests/adapter/max/test_agent_registry.py @@ -0,0 +1,88 @@ +from pathlib import Path + +import pytest + +from adapter.max.agent_registry import AgentRegistryError, load_agent_registry + + +def test_load_agent_registry_reads_yaml(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "agents:\n" + " - id: agent-1\n" + " label: One\n" + " base_url: http://localhost:8000/a1/\n" + " workspace_path: /agents/1\n", + encoding="utf-8", + ) + reg = load_agent_registry(path) + assert [a.agent_id for a in reg.agents] == ["agent-1"] + a = reg.get("agent-1") + assert a.label == "One" + assert a.base_url == "http://localhost:8000/a1/" + assert a.workspace_path == "/agents/1" + + +def test_user_agents_resolve(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "user_agents:\n" + ' "42": agent-1\n' + "agents:\n" + " - id: agent-1\n" + " label: One\n" + " - id: agent-2\n" + " label: Two\n", + encoding="utf-8", + ) + reg = load_agent_registry(path) + assert reg.resolve_agent_for_user("42").agent_id == "agent-1" + assert reg.resolve_agent_for_user("42").source == "configured" + assert reg.resolve_agent_for_user("999").agent_id == "agent-1" + assert reg.resolve_agent_for_user("999").source == "default" + + +def test_duplicate_ids_rejected(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "agents:\n" + " - id: a\n" + " label: A\n" + " - id: a\n" + " label: B\n", + encoding="utf-8", + ) + with pytest.raises(AgentRegistryError, match="duplicate agent id"): + load_agent_registry(path) + + +def test_empty_agents_rejected(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text("agents: []\n", encoding="utf-8") + with pytest.raises(AgentRegistryError, match="non-empty"): + load_agent_registry(path) + + +def test_user_agents_must_be_strings(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "user_agents:\n" + " 42: agent-1\n" + "agents:\n" + " - id: agent-1\n" + " label: One\n", + encoding="utf-8", + ) + with pytest.raises(AgentRegistryError, match="user_agents"): + load_agent_registry(path) + + +def test_unknown_agent_raises(tmp_path: Path): + path = tmp_path / "max.yaml" + path.write_text( + "agents:\n - id: a\n label: A\n", + encoding="utf-8", + ) + reg = load_agent_registry(path) + with pytest.raises(AgentRegistryError, match="unknown agent id"): + reg.get("missing") diff --git a/tests/adapter/max/test_api_client.py b/tests/adapter/max/test_api_client.py new file mode 100644 index 0000000..206d2d9 --- /dev/null +++ b/tests/adapter/max/test_api_client.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock + +import httpx +import pytest + +from adapter.max.api_client import MaxApiError, MaxBotApi + + +@pytest.mark.asyncio +async def test_get_updates_returns_marker_and_updates(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock( + return_value=httpx.Response( + 200, + json={ + "updates": [{"update_type": "message_created", "timestamp": 1}], + "marker": 7, + }, + ) + ) + updates, marker = await api.get_updates(types=["message_created"]) + assert len(updates) == 1 + assert updates[0]["update_type"] == "message_created" + assert marker == 7 + + _, kwargs = api._client.request.call_args + assert kwargs["params"]["types"] == "message_created" + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_get_updates_non_dict_body(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock(return_value=httpx.Response(200, text="oops")) + updates, marker = await api.get_updates() + assert updates == [] + assert marker is None + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_http_error_raises_max_api_error(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock( + return_value=httpx.Response(401, json={"code": "verify.token", "message": "bad"}) + ) + with pytest.raises(MaxApiError) as ei: + await api.get_me() + assert ei.value.status == 401 + assert "bad" in str(ei.value).lower() or ei.value.payload + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_send_message_to_chat_posts_json_body(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.request = AsyncMock(return_value=httpx.Response(200, json={"message": {}})) + + await api.send_message_to_chat(12345, text="hi", attachments=None, fmt=None) + + args, kw = api._client.request.call_args + assert args[0] == "POST" + assert args[1] == "/messages" + assert kw["params"]["chat_id"] == 12345 + assert kw["json"] == {"text": "hi"} + finally: + await api.aclose() + + +@pytest.mark.asyncio +async def test_download_file_uses_get(): + api = MaxBotApi("token-x", base_url="http://max.test") + try: + api._client.get = AsyncMock(return_value=httpx.Response(200, content=b"\xff\xd8")) + + buf = await api.download_file("https://files.example/bin") + + assert buf == b"\xff\xd8" + api._client.get.assert_awaited_once() + finally: + await api.aclose() diff --git a/tests/adapter/max/test_converter.py b/tests/adapter/max/test_converter.py new file mode 100644 index 0000000..b5f1a6e --- /dev/null +++ b/tests/adapter/max/test_converter.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +import pytest + +from adapter.max.converter import ( + attachment_from_max_dict, + collect_max_attachments, + incoming_from_message_callback_payload, + incoming_from_text_commands, +) +from core.protocol import Attachment, IncomingCallback, IncomingCommand, IncomingMessage + + +@pytest.mark.parametrize( + "text,expect_type", + [ + ("Hello", IncomingMessage), + (" plain ", IncomingMessage), + ], +) +def test_plain_text_to_message(text, expect_type): + r = incoming_from_text_commands( + text=text, + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert type(r) is expect_type + assert r.text == text + + +def test_slash_command_split(): + r = incoming_from_text_commands( + text="/new title here", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(r, IncomingCommand) + assert r.command == "new" + assert r.args == ["title", "here"] + + +def test_slash_command_no_args(): + r = incoming_from_text_commands( + text="/help", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(r, IncomingCommand) + assert r.command == "help" + assert r.args == [] + + +def test_bang_prefix_is_plain_message_not_command(): + """MAX: только / считается командой.""" + r = incoming_from_text_commands( + text="!help", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(r, IncomingMessage) + assert r.text == "!help" + + +def test_yes_no_callbacks(): + yes = incoming_from_text_commands( + text="/yes", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(yes, IncomingCallback) + assert yes.action == "confirm" + + no = incoming_from_text_commands( + text="/NO", + max_user_id="10", + platform_chat_id="pc-1", + attachments=[], + ) + assert isinstance(no, IncomingCallback) + assert no.action == "cancel" + + +def test_incoming_message_keeps_attachments(): + at = [Attachment(type="document", filename="a.txt")] + r = incoming_from_text_commands( + text="see file", + max_user_id="10", + platform_chat_id="pc-1", + attachments=at, + ) + assert isinstance(r, IncomingMessage) + assert r.attachments == at + + +def test_message_callback_known_actions(): + c = incoming_from_message_callback_payload( + max_user_id="10", + platform_chat_id="pc", + payload_raw="confirm", + callback_message_id="mid", + ) + assert c is not None + assert isinstance(c, IncomingCallback) + assert c.action == "confirm" + assert c.payload.get("message_id") == "mid" + + +def test_message_callback_unknown_becomes_max_callback(): + c = incoming_from_message_callback_payload( + max_user_id="10", + platform_chat_id="pc", + payload_raw="my_payload", + callback_message_id=None, + ) + assert c is not None + assert c.action == "max_callback" + assert c.payload["payload"] == "my_payload" + + +def test_attachment_from_max_file(): + parsed = attachment_from_max_dict( + { + "type": "file", + "payload": { + "url": "https://cdn.example/f", + "filename": "doc.pdf", + "token": "tok", + }, + } + ) + assert parsed is not None + att, raw = parsed + assert att.filename == "doc.pdf" + assert att.type == "document" + assert att.url == "https://cdn.example/f" + assert raw.get("_download_token_hint") == "tok" + + +def test_collect_max_attachments_skips_unknown(): + core, raw = collect_max_attachments( + { + "attachments": [ + {"type": "file", "payload": {"url": "u", "filename": "x.bin"}}, + {"type": "sticker", "payload": {}}, + ] + } + ) + assert len(core) == len(raw) == 1 + assert core[0].filename == "x.bin" diff --git a/tests/adapter/max/test_dispatcher_max.py b/tests/adapter/max/test_dispatcher_max.py new file mode 100644 index 0000000..e4c85a6 --- /dev/null +++ b/tests/adapter/max/test_dispatcher_max.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import pytest + +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.handlers.commands import register_max_handlers +from adapter.max.store import ChatStore, RoomMeta +from core.auth import AuthManager +from core.chat import ChatManager +from core.handler import EventDispatcher +from core.handlers import register_all +from core.protocol import IncomingCommand, OutgoingMessage +from core.settings import SettingsManager +from core.store import InMemoryStore +from sdk.mock import MockPlatformClient +from sdk.prototype_state import PrototypeStateStore + + +def _build_dispatcher() -> tuple[ + EventDispatcher, + ChatManager, + AuthManager, + ChatStore, + str, +]: + store_mem = InMemoryStore() + chat_store = ChatStore() + chat_handler = MaxChatHandler(chat_store) + prototype_state = PrototypeStateStore() + platform = MockPlatformClient() + chat_mgr = ChatManager(platform, store_mem) + auth_mgr = AuthManager(platform, store_mem) + settings_mgr = SettingsManager(platform, store_mem) + dispatcher = EventDispatcher( + platform=platform, + chat_mgr=chat_mgr, + auth_mgr=auth_mgr, + settings_mgr=settings_mgr, + ) + register_all(dispatcher) + register_max_handlers( + dispatcher, + chat_store=chat_store, + max_chat_handler=chat_handler, + prototype_state=prototype_state, + ) + + pid = "550e8400-e29b-41d4-a716-446655440000" + chat_store.add_room( + RoomMeta( + platform_chat_id=pid, + max_chat_id="777", + name="Чат", + user_id="u1", + agent_id="agent-0", + workspace_path="/agents/0", + ) + ) + return dispatcher, chat_mgr, auth_mgr, chat_store, pid + + +@pytest.mark.asyncio +async def test_dispatcher_new_is_single_dialog_hint(): + dispatcher, _chat_mgr, auth_mgr, _chat_store, pid = _build_dispatcher() + await auth_mgr.confirm("u1") + + out = await dispatcher.dispatch( + IncomingCommand(user_id="u1", platform="max", chat_id=pid, command="new"), + ) + assert len(out) == 1 + assert isinstance(out[0], OutgoingMessage) + assert "один диалог" in out[0].text.lower() + + +@pytest.mark.asyncio +async def test_dispatcher_clear_rotates_platform_chat(): + dispatcher, chat_mgr, auth_mgr, chat_store, pid = _build_dispatcher() + await auth_mgr.confirm("u1") + await chat_mgr.get_or_create( + user_id="u1", + chat_id=pid, + platform="max", + surface_ref="777", + name="Чат", + ) + + out = await dispatcher.dispatch( + IncomingCommand(user_id="u1", platform="max", chat_id=pid, command="clear"), + ) + assert len(out) == 1 + msg = out[0] + assert isinstance(msg, OutgoingMessage) + assert msg.chat_id != pid + assert "сброшен" in msg.text.lower() + + room = chat_store.get_room_by_max_chat_id("777") + assert room is not None + assert room.platform_chat_id == msg.chat_id diff --git a/tests/adapter/max/test_store.py b/tests/adapter/max/test_store.py new file mode 100644 index 0000000..176dc10 --- /dev/null +++ b/tests/adapter/max/test_store.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import uuid + +from adapter.max.handlers.attachments import AttachmentHandler +from adapter.max.handlers.chat import ChatHandler as MaxChatHandler +from adapter.max.store import ChatStore, RoomMeta + + +def test_chat_store_room_roundtrip(): + store = ChatStore() + r = RoomMeta( + platform_chat_id="pid-1", + max_chat_id="100", + name="Main", + user_id="42", + agent_id="agent-0", + workspace_path="/agents/0", + ) + store.add_room(r) + assert store.get_room_by_max_chat_id("100") is r + assert store.get_room_by_platform_chat_id("pid-1") is r + + +def test_staged_attachments(): + store = ChatStore() + store.stage_attachment("100", ("rel/path.txt", "path.txt")) + assert store.get_attachments("100") + popped = store.pop_attachments("100") + assert len(popped) == 1 + assert store.pop_attachments("100") == [] + + +def test_remove_room_clears_staging(): + store = ChatStore() + store.stage_attachment("100", ("a", "a")) + store.add_room( + RoomMeta( + platform_chat_id="x", + max_chat_id="100", + name="", + user_id="u", + agent_id="a", + ) + ) + store.remove_room("100") + assert store.get_room_by_max_chat_id("100") is None + assert store.get_attachments("100") == [] + + +def test_chat_handler_clear_rotates_platform_id(): + store = ChatStore() + h = MaxChatHandler(store) + pid1 = str(uuid.uuid4()) + store.add_room( + RoomMeta( + platform_chat_id=pid1, + max_chat_id="100", + name="Tab", + user_id="42", + agent_id="agent-0", + workspace_path="/agents/0", + ) + ) + h.handle_clear("100") + room = store.get_room_by_max_chat_id("100") + assert room is not None + assert room.platform_chat_id != pid1 + + +def test_attachment_handler_list_remove(): + store = ChatStore() + h = AttachmentHandler(store) + store.stage_attachment("100", ("a", "f1.bin")) + assert "f1.bin" in h.handle_list("100") + msg = h.handle_remove("100", "1") + assert "Удалено" in msg or "удалено" in msg.lower() + assert "пуста" in h.handle_list("100").lower() or "пусто" in h.handle_list("100").lower()