"""MAX messenger surface — runtime using official MAX Bot API (long polling).""" from __future__ import annotations import asyncio import logging import os import re from pathlib import Path from urllib.parse import urlsplit, urlunsplit import httpx import structlog from dotenv import load_dotenv from adapter.max.agent_registry import AgentRegistry, AgentRegistryError, load_from_env from adapter.max.api_client import MaxApiError, MaxBotApi from adapter.max.converter import ( collect_max_attachments, incoming_from_message_callback_payload, incoming_from_text_commands, ) from adapter.max.files import ( guess_upload_type, read_workspace_bytes, save_incoming_from_url, upload_file_as_attachment, ) from adapter.max.handlers.attachments import AttachmentHandler from adapter.max.handlers.chat import ChatHandler as MaxChatHandler from adapter.max.handlers.commands import register_max_handlers from adapter.max.store import ChatStore, RoomMeta from core.auth import AuthManager from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import Attachment, IncomingCommand, OutgoingEvent, OutgoingMessage from core.protocol import OutgoingNotification, OutgoingTyping, OutgoingUI from core.settings import SettingsManager from core.store import InMemoryStore, StateStore from sdk.interface import PlatformClient, PlatformError from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient logger = structlog.get_logger(__name__) MAX_TEXT_CHARS = 4000 _POLL_TYPES_DEFAULT = ["message_created", "message_callback", "bot_started"] load_dotenv(Path(__file__).resolve().parents[2] / ".env") def _normalize_agent_base_url(url: str) -> str: parsed = urlsplit(url) path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) def _agent_base_url_from_env() -> str: if base_url := os.environ.get("AGENT_BASE_URL"): return base_url if ws_url := os.environ.get("AGENT_WS_URL"): return _normalize_agent_base_url(ws_url) return "http://127.0.0.1:8000" class RoutedMaxPlatformClient(PlatformClient): """Routes agent WS calls based on ChatStore mapping (same idea as RoutedPlatformClient).""" def __init__( self, *, chat_store: ChatStore, delegates: dict[str, PlatformClient], default_client: PlatformClient ): if not delegates: raise ValueError("RoutedMaxPlatformClient requires at least one delegate") self._store = chat_store self._delegates = dict(delegates) self._default_client = default_client async def get_or_create_user( self, external_id: str, platform: str, display_name: str | None = None ): return await self._default_client.get_or_create_user( external_id=external_id, platform=platform, display_name=display_name ) async def send_message(self, user_id: str, chat_id: str, text: str, attachments=None): delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) return await delegate.send_message(user_id, platform_chat_id, text, attachments) async def stream_message(self, user_id: str, chat_id: str, text: str, attachments=None): delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id) async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments): yield chunk async def get_settings(self, user_id: str): return await self._default_client.get_settings(user_id) async def update_settings(self, user_id: str, action) -> None: await self._default_client.update_settings(user_id, action) async def close(self) -> None: for delegate in self._delegates.values(): close_fn = getattr(delegate, "close", None) if callable(close_fn): await close_fn() async def _resolve_delegate(self, user_id: str, local_chat_id: str): room = self._store.get_room_by_platform_chat_id(local_chat_id) if room is None: raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND") agent_id = room.agent_id platform_chat_id = room.platform_chat_id delegate = self._delegates.get(str(agent_id)) if delegate is None: raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND") return delegate, str(platform_chat_id) class MaxBotApp: def __init__(self) -> None: self.token = os.environ["MAX_BOT_TOKEN"] api_base = os.environ.get("MAX_API_URL", "https://platform-api.max.ru").strip().rstrip("/") self.api = MaxBotApi(self.token, base_url=api_base) self.surfaces_workspace = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/agents")) agent_base_url = _agent_base_url_from_env() try: self.registry: AgentRegistry = load_from_env() except (AgentRegistryError, OSError) as exc: raise RuntimeError("failed to load MAX agent registry") from exc self.chat_store = ChatStore() self.max_chat_handler = MaxChatHandler(self.chat_store) self.attach_handler = AttachmentHandler(self.chat_store) self.core_store: StateStore = InMemoryStore() self.prototype_state = PrototypeStateStore() delegates: dict[str, RealPlatformClient] = {} for agent in self.registry.agents: base_raw = agent.base_url.strip() if agent.base_url else agent_base_url delegates[agent.agent_id] = RealPlatformClient( agent_id=agent.agent_id, agent_base_url=base_raw, prototype_state=self.prototype_state, platform="max", ) default_client = next(iter(delegates.values())) self.platform: RoutedMaxPlatformClient = RoutedMaxPlatformClient( chat_store=self.chat_store, delegates=delegates, default_client=default_client, ) self.chat_mgr = ChatManager(self.platform, self.core_store) self.auth_mgr = AuthManager(self.platform, self.core_store) self.settings_mgr = SettingsManager(self.platform, self.core_store) self.dispatcher = EventDispatcher( platform=self.platform, chat_mgr=self.chat_mgr, auth_mgr=self.auth_mgr, settings_mgr=self.settings_mgr, ) register_all(self.dispatcher) register_max_handlers( self.dispatcher, chat_store=self.chat_store, max_chat_handler=self.max_chat_handler, prototype_state=self.prototype_state, ) poll_types = os.environ.get("MAX_UPDATE_TYPES", "").strip() self.update_types = ( [t.strip() for t in poll_types.split(",") if t.strip()] if poll_types else list(_POLL_TYPES_DEFAULT) ) self._marker: int | None = None self.bot_user_ids: set[int] = set() logging.basicConfig(level=logging.INFO) async def bootstrap_identity(self) -> None: me = await self.api.get_me() uid = me.get("user_id") if isinstance(uid, int): self.bot_user_ids.add(uid) async def ensure_user(self, max_user_id: str, *, display_name: str | None) -> None: await self.platform.get_or_create_user(max_user_id, "max", display_name=display_name) await self.auth_mgr.confirm(max_user_id) async def _resolve_room( self, *, max_chat_key: str, max_user_id: str, ) -> RoomMeta: room = self.chat_store.get_room_by_max_chat_id(max_chat_key) if room is not None: return room assignment = self.registry.resolve_agent_for_user(max_user_id) if assignment.agent_id is None: raise RuntimeError("no agents configured") ws_path = "" try: ws_path = self.registry.get(assignment.agent_id).workspace_path except AgentRegistryError: pass pid = self.max_chat_handler.handle_new( max_chat_id=max_chat_key, user_id=max_user_id, agent_id=assignment.agent_id, name="Чат 1", workspace_path=ws_path, ) await self.chat_mgr.get_or_create( user_id=max_user_id, chat_id=pid, platform="max", surface_ref=max_chat_key, name="Чат 1", ) refreshed = self.chat_store.get_room_by_max_chat_id(max_chat_key) if refreshed is None: raise RuntimeError("max room bootstrap failed") logger.info( "max_chat_bootstrapped", max_chat_key=max_chat_key, platform_chat_id=pid, agent_id=assignment.agent_id, ) return refreshed async def process_message_created(self, payload: dict) -> None: message = payload.get("message") if not isinstance(message, dict): return sender = message.get("sender") or {} if not isinstance(sender, dict): return uid = sender.get("user_id") if isinstance(uid, int): uid_s = str(uid) else: return if sender.get("is_bot"): return recipient = message.get("recipient") or {} chat_id_numeric = recipient.get("chat_id") if chat_id_numeric is None or not isinstance(chat_id_numeric, int): dialog_uid = recipient.get("user_id") if isinstance(dialog_uid, int): chat_key = str(dialog_uid) else: return else: chat_key = str(chat_id_numeric) await self.ensure_user(uid_s, display_name=sender.get("first_name")) room = await self._resolve_room( max_chat_key=chat_key, max_user_id=uid_s, ) body = message.get("body") or {} text = "" if isinstance(body, dict): raw_txt = body.get("text") text = raw_txt.strip() if isinstance(raw_txt, str) else "" attachments_core, raw_meta = collect_max_attachments(body) if isinstance(body, dict) else ([], []) attachments_core = await self._materialize_attachments(room, attachments_core, raw_meta) if attachments_core and not text: for att in attachments_core: self.chat_store.stage_attachment(chat_key, (att.workspace_path or "", att.filename or "file")) return queued = self.chat_store.pop_attachments(chat_key) merged = list(attachments_core) for ws_path, fname in queued: if ws_path: merged.append( Attachment( type="document", filename=fname, workspace_path=ws_path, ) ) incoming = incoming_from_text_commands( text=text, max_user_id=uid_s, platform_chat_id=room.platform_chat_id, attachments=merged, ) if isinstance(incoming, IncomingMessage): if not incoming.text.strip() and not incoming.attachments: return if isinstance(incoming, IncomingCommand): if incoming.command in {"list", "remove"}: reply = await self._handle_local_attachment_command(incoming, chat_key) await self._send_lines(int(chat_key), reply) return try: outgoing = await self.dispatcher.dispatch(incoming) except PlatformError as exc: logger.warning("max_dispatch_error", code=exc.code, err=str(exc)) outgoing = [ OutgoingMessage( chat_id=room.platform_chat_id, text="Сервис временно недоступен. Попробуйте позже.", ) ] if not outgoing and isinstance(incoming, IncomingCommand): outgoing = [ OutgoingMessage( chat_id=room.platform_chat_id, text="Неизвестная команда. Введите /help.", ), ] await self._send_outgoing(int(chat_key), outgoing, room) async def _handle_local_attachment_command(self, incoming: IncomingCommand, chat_key: str) -> str: if incoming.command == "list": return self.attach_handler.handle_list(chat_key) return self.attach_handler.handle_remove(chat_key, incoming.args[0] if incoming.args else "") async def _materialize_attachments( self, room: RoomMeta, attachments: list[Attachment], raw_meta: list[dict], ) -> list[Attachment]: workspace = Path(room.workspace_path or str(self.surfaces_workspace)) out: list[Attachment] = [] for att, _meta in zip(attachments, raw_meta, strict=False): if not att.url: out.append(att) continue try: rel = await save_incoming_from_url( api=self.api, workspace_root=workspace, filename=att.filename or "file.bin", url=att.url, ) except (httpx.HTTPError, OSError) as exc: logger.warning("max_attachment_download_failed", error=str(exc)) out.append(att) continue out.append( Attachment( type=att.type, filename=att.filename, mime_type=att.mime_type, workspace_path=rel, url=att.url, ) ) return out async def process_message_callback(self, payload: dict) -> None: cb = payload.get("callback") or {} if not isinstance(cb, dict): return callback_id = cb.get("callback_id") user_blob = cb.get("user") or {} uid = user_blob.get("user_id") if isinstance(user_blob, dict) else None uid_s = str(uid) if isinstance(uid, int) else None msg = payload.get("message") or {} recipient = msg.get("recipient") or {} if isinstance(msg, dict) else {} cc = recipient.get("chat_id") if isinstance(cc, int): chat_key = str(cc) elif isinstance(uid_s, str): chat_key = uid_s else: return mid = "" body = msg.get("body") if isinstance(msg, dict) else None if isinstance(body, dict): mb = body.get("mid") mid = mb if isinstance(mb, str) else "" if uid_s is None: return await self.ensure_user(uid_s, display_name=user_blob.get("first_name")) room = self.chat_store.get_room_by_max_chat_id(chat_key) if room is None: return payload_raw = cb.get("payload") if cb.get("payload") is not None else None payload_str = str(payload_raw) if payload_raw is not None else "" incoming = incoming_from_message_callback_payload( max_user_id=uid_s, platform_chat_id=room.platform_chat_id, payload_raw=payload_str, callback_message_id=mid, ) if incoming is None: if isinstance(callback_id, str): await self.api.answer_callback(callback_id, notification="ok") return try: outgoing = await self.dispatcher.dispatch(incoming) except PlatformError: outgoing = [] await self._send_outgoing(int(chat_key), outgoing, room) if isinstance(callback_id, str): await self.api.answer_callback(callback_id, notification=" ") async def process_bot_started(self, payload: dict) -> None: cid = payload.get("chat_id") user_blob = payload.get("user") or {} uid = user_blob.get("user_id") chat_key = str(cid) if isinstance(cid, int) else None if chat_key is None or not isinstance(uid, int): return uid_s = str(uid) await self.ensure_user(uid_s, display_name=user_blob.get("first_name")) await self._resolve_room( max_chat_key=chat_key, max_user_id=uid_s, ) deeplink_note = "" dl = payload.get("payload") if isinstance(payload.get("payload"), str) else None if dl: deeplink_note = f" (payload: {dl})" welcome = ( "Здравствуйте, я помогу с задачами Lambda. " f"Отправьте текст или файл.{deeplink_note}" ) await self.api.send_message_to_chat(int(chat_key), text=welcome) async def dispatch_update(self, update: dict) -> None: utype = update.get("update_type") if utype == "message_created": await self.process_message_created(update) elif utype == "message_callback": await self.process_message_callback(update) elif utype == "bot_started": await self.process_bot_started(update) async def _send_lines(self, max_chat_id: int, text: str) -> None: if text: await self._send_plain_text(max_chat_id, text) async def _send_plain_text(self, max_chat_id: int, text: str, *, fmt: str | None = None) -> None: chunk_size = MAX_TEXT_CHARS for i in range(0, len(text), chunk_size): part = text[i : i + chunk_size] await self.api.send_message_to_chat(max_chat_id, text=part, fmt=fmt) async def _send_outgoing(self, max_chat_id: int, events: list[OutgoingEvent], room: RoomMeta) -> None: workspace_agent = Path( room.workspace_path if room.workspace_path else self.surfaces_workspace, ) for event in events: if isinstance(event, OutgoingTyping): await self.api.send_chat_action(max_chat_id, "typing_on") continue if isinstance(event, OutgoingNotification): body = f"[{event.level.upper()}] {event.text}" await self._send_plain_text(max_chat_id, body) continue if isinstance(event, OutgoingMessage): fmt = None if getattr(event, "parse_mode", "plain") == "markdown": fmt = "markdown" merged_text = getattr(event, "text", "") or "" attachments = list(getattr(event, "attachments", []) or []) agent_def = None try: agent_def = self.registry.get(room.agent_id) except AgentRegistryError: pass root = ( Path(agent_def.workspace_path) if agent_def and agent_def.workspace_path else workspace_agent ) req_atts: list[dict] = [] for raw_att in attachments: wp = getattr(raw_att, "workspace_path", None) if not wp: continue try: data = read_workspace_bytes(wp, agent_workspace=str(root)) except OSError: logger.warning("max_outgoing_missing_file", path=wp) continue fn = getattr(raw_att, "filename", None) or Path(str(wp)).name mime = getattr(raw_att, "mime_type", None) att_type = str(getattr(raw_att, "type", "") or "") ctype = guess_upload_type(mime, attachment_type=str(att_type)) attached = await upload_file_as_attachment( self.api, filename=fn, content=data, upload_type=ctype ) req_atts.append(attached) text_payload = merged_text.strip() or None if text_payload is None and not req_atts: continue await self.api.send_message_to_chat( max_chat_id, text=text_payload, attachments=req_atts or None, fmt=fmt, ) if isinstance(event, OutgoingUI): lines = [event.text] if getattr(event, "buttons", []): lines.append("") for button in event.buttons: lines.append(f"• {button.label}") lines.append("") lines.append("Ответьте /yes или /no (или кнопки с callback в MAX).") merged = "\n".join(lines) await self._send_plain_text(max_chat_id, merged) async def run(self) -> None: await self.bootstrap_identity() logger.info( "max_bot_poll_start", update_types=self.update_types, registry_agents=len(self.registry.agents), ) while True: try: updates, marker = await self.api.get_updates( marker=self._marker, types=self.update_types, timeout=40, limit=100, ) self._marker = marker for u in updates: try: await self.dispatch_update(u) except Exception: logger.exception("max_update_failed", update=u) except asyncio.CancelledError: raise except (MaxApiError, httpx.HTTPError) as exc: logger.error("max_poll_fatal", error=str(exc)) await asyncio.sleep(5) async def shutdown(self) -> None: close = getattr(self.platform, "close", None) if callable(close): await close() await self.api.aclose() async def main() -> None: app = MaxBotApp() try: await app.run() finally: await app.shutdown() if __name__ == "__main__": asyncio.run(main())