surfaces/adapter/max/bot.py

618 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""MAX messenger surface — runtime using official MAX Bot API (long polling)."""
from __future__ import annotations
import asyncio
import logging
import os
import re
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit
import httpx
import structlog
from dotenv import load_dotenv
from adapter.max.agent_registry import AgentRegistry, AgentRegistryError, load_from_env
from adapter.max.api_client import MaxApiError, MaxBotApi
from adapter.max.converter import (
collect_max_attachments,
incoming_from_message_callback_payload,
incoming_from_text_commands,
)
from adapter.max.files import (
guess_upload_type,
read_workspace_bytes,
save_incoming_from_url,
upload_file_as_attachment,
)
from adapter.max.handlers.attachments import AttachmentHandler
from adapter.max.handlers.chat import ChatHandler as MaxChatHandler
from adapter.max.handlers.commands import register_max_handlers
from adapter.max.store import ChatStore, RoomMeta
from core.auth import AuthManager
from core.chat import ChatManager
from core.handler import EventDispatcher
from core.handlers import register_all
from core.protocol import Attachment, IncomingCommand, OutgoingEvent, OutgoingMessage
from core.protocol import OutgoingNotification, OutgoingTyping, OutgoingUI
from core.settings import SettingsManager
from core.store import InMemoryStore, StateStore
from sdk.interface import PlatformClient, PlatformError
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
logger = structlog.get_logger(__name__)
MAX_TEXT_CHARS = 4000
_POLL_TYPES_DEFAULT = ["message_created", "message_callback", "bot_started"]
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
def _normalize_agent_base_url(url: str) -> str:
parsed = urlsplit(url)
path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/"))
return urlunsplit((parsed.scheme, parsed.netloc, path, "", ""))
def _agent_base_url_from_env() -> str:
if base_url := os.environ.get("AGENT_BASE_URL"):
return base_url
if ws_url := os.environ.get("AGENT_WS_URL"):
return _normalize_agent_base_url(ws_url)
return "http://127.0.0.1:8000"
class RoutedMaxPlatformClient(PlatformClient):
"""Routes agent WS calls based on ChatStore mapping (same idea as RoutedPlatformClient)."""
def __init__(
self, *, chat_store: ChatStore, delegates: dict[str, PlatformClient], default_client: PlatformClient
):
if not delegates:
raise ValueError("RoutedMaxPlatformClient requires at least one delegate")
self._store = chat_store
self._delegates = dict(delegates)
self._default_client = default_client
async def get_or_create_user(
self, external_id: str, platform: str, display_name: str | None = None
):
return await self._default_client.get_or_create_user(
external_id=external_id, platform=platform, display_name=display_name
)
async def send_message(self, user_id: str, chat_id: str, text: str, attachments=None):
delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id)
return await delegate.send_message(user_id, platform_chat_id, text, attachments)
async def stream_message(self, user_id: str, chat_id: str, text: str, attachments=None):
delegate, platform_chat_id = await self._resolve_delegate(user_id, chat_id)
async for chunk in delegate.stream_message(user_id, platform_chat_id, text, attachments):
yield chunk
async def get_settings(self, user_id: str):
return await self._default_client.get_settings(user_id)
async def update_settings(self, user_id: str, action) -> None:
await self._default_client.update_settings(user_id, action)
async def close(self) -> None:
for delegate in self._delegates.values():
close_fn = getattr(delegate, "close", None)
if callable(close_fn):
await close_fn()
async def _resolve_delegate(self, user_id: str, local_chat_id: str):
room = self._store.get_room_by_platform_chat_id(local_chat_id)
if room is None:
raise PlatformError(f"unknown chat id: {local_chat_id}", code="CHAT_NOT_FOUND")
agent_id = room.agent_id
platform_chat_id = room.platform_chat_id
delegate = self._delegates.get(str(agent_id))
if delegate is None:
raise PlatformError(f"unknown agent id: {agent_id}", code="AGENT_NOT_FOUND")
return delegate, str(platform_chat_id)
class MaxBotApp:
def __init__(self) -> None:
self.token = os.environ["MAX_BOT_TOKEN"]
api_base = os.environ.get("MAX_API_URL", "https://platform-api.max.ru").strip().rstrip("/")
self.api = MaxBotApi(self.token, base_url=api_base)
self.surfaces_workspace = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/agents"))
agent_base_url = _agent_base_url_from_env()
try:
self.registry: AgentRegistry = load_from_env()
except (AgentRegistryError, OSError) as exc:
raise RuntimeError("failed to load MAX agent registry") from exc
self.chat_store = ChatStore()
self.max_chat_handler = MaxChatHandler(self.chat_store)
self.attach_handler = AttachmentHandler(self.chat_store)
self.core_store: StateStore = InMemoryStore()
self.prototype_state = PrototypeStateStore()
delegates: dict[str, RealPlatformClient] = {}
for agent in self.registry.agents:
base_raw = agent.base_url.strip() if agent.base_url else agent_base_url
delegates[agent.agent_id] = RealPlatformClient(
agent_id=agent.agent_id,
agent_base_url=base_raw,
prototype_state=self.prototype_state,
platform="max",
)
default_client = next(iter(delegates.values()))
self.platform: RoutedMaxPlatformClient = RoutedMaxPlatformClient(
chat_store=self.chat_store,
delegates=delegates,
default_client=default_client,
)
self.chat_mgr = ChatManager(self.platform, self.core_store)
self.auth_mgr = AuthManager(self.platform, self.core_store)
self.settings_mgr = SettingsManager(self.platform, self.core_store)
self.dispatcher = EventDispatcher(
platform=self.platform,
chat_mgr=self.chat_mgr,
auth_mgr=self.auth_mgr,
settings_mgr=self.settings_mgr,
)
register_all(self.dispatcher)
register_max_handlers(
self.dispatcher,
chat_store=self.chat_store,
max_chat_handler=self.max_chat_handler,
prototype_state=self.prototype_state,
)
poll_types = os.environ.get("MAX_UPDATE_TYPES", "").strip()
self.update_types = (
[t.strip() for t in poll_types.split(",") if t.strip()]
if poll_types
else list(_POLL_TYPES_DEFAULT)
)
self._marker: int | None = None
self.bot_user_ids: set[int] = set()
logging.basicConfig(level=logging.INFO)
async def bootstrap_identity(self) -> None:
me = await self.api.get_me()
uid = me.get("user_id")
if isinstance(uid, int):
self.bot_user_ids.add(uid)
async def ensure_user(self, max_user_id: str, *, display_name: str | None) -> None:
await self.platform.get_or_create_user(max_user_id, "max", display_name=display_name)
await self.auth_mgr.confirm(max_user_id)
async def _resolve_room(
self,
*,
max_chat_key: str,
max_user_id: str,
) -> RoomMeta:
room = self.chat_store.get_room_by_max_chat_id(max_chat_key)
if room is not None:
return room
assignment = self.registry.resolve_agent_for_user(max_user_id)
if assignment.agent_id is None:
raise RuntimeError("no agents configured")
ws_path = ""
try:
ws_path = self.registry.get(assignment.agent_id).workspace_path
except AgentRegistryError:
pass
pid = self.max_chat_handler.handle_new(
max_chat_id=max_chat_key,
user_id=max_user_id,
agent_id=assignment.agent_id,
name="Чат 1",
workspace_path=ws_path,
)
await self.chat_mgr.get_or_create(
user_id=max_user_id,
chat_id=pid,
platform="max",
surface_ref=max_chat_key,
name="Чат 1",
)
refreshed = self.chat_store.get_room_by_max_chat_id(max_chat_key)
if refreshed is None:
raise RuntimeError("max room bootstrap failed")
logger.info(
"max_chat_bootstrapped",
max_chat_key=max_chat_key,
platform_chat_id=pid,
agent_id=assignment.agent_id,
)
return refreshed
async def process_message_created(self, payload: dict) -> None:
message = payload.get("message")
if not isinstance(message, dict):
return
sender = message.get("sender") or {}
if not isinstance(sender, dict):
return
uid = sender.get("user_id")
if isinstance(uid, int):
uid_s = str(uid)
else:
return
if sender.get("is_bot"):
return
recipient = message.get("recipient") or {}
chat_id_numeric = recipient.get("chat_id")
if chat_id_numeric is None or not isinstance(chat_id_numeric, int):
dialog_uid = recipient.get("user_id")
if isinstance(dialog_uid, int):
chat_key = str(dialog_uid)
else:
return
else:
chat_key = str(chat_id_numeric)
await self.ensure_user(uid_s, display_name=sender.get("first_name"))
room = await self._resolve_room(
max_chat_key=chat_key,
max_user_id=uid_s,
)
body = message.get("body") or {}
text = ""
if isinstance(body, dict):
raw_txt = body.get("text")
text = raw_txt.strip() if isinstance(raw_txt, str) else ""
attachments_core, raw_meta = collect_max_attachments(body) if isinstance(body, dict) else ([], [])
attachments_core = await self._materialize_attachments(room, attachments_core, raw_meta)
if attachments_core and not text:
for att in attachments_core:
self.chat_store.stage_attachment(chat_key, (att.workspace_path or "", att.filename or "file"))
return
queued = self.chat_store.pop_attachments(chat_key)
merged = list(attachments_core)
for ws_path, fname in queued:
if ws_path:
merged.append(
Attachment(
type="document",
filename=fname,
workspace_path=ws_path,
)
)
incoming = incoming_from_text_commands(
text=text,
max_user_id=uid_s,
platform_chat_id=room.platform_chat_id,
attachments=merged,
)
if isinstance(incoming, IncomingMessage):
if not incoming.text.strip() and not incoming.attachments:
return
if isinstance(incoming, IncomingCommand):
if incoming.command in {"list", "remove"}:
reply = await self._handle_local_attachment_command(incoming, chat_key)
await self._send_lines(int(chat_key), reply)
return
try:
outgoing = await self.dispatcher.dispatch(incoming)
except PlatformError as exc:
logger.warning("max_dispatch_error", code=exc.code, err=str(exc))
outgoing = [
OutgoingMessage(
chat_id=room.platform_chat_id,
text="Сервис временно недоступен. Попробуйте позже.",
)
]
if not outgoing and isinstance(incoming, IncomingCommand):
outgoing = [
OutgoingMessage(
chat_id=room.platform_chat_id,
text="Неизвестная команда. Введите /help.",
),
]
await self._send_outgoing(int(chat_key), outgoing, room)
async def _handle_local_attachment_command(self, incoming: IncomingCommand, chat_key: str) -> str:
if incoming.command == "list":
return self.attach_handler.handle_list(chat_key)
return self.attach_handler.handle_remove(chat_key, incoming.args[0] if incoming.args else "")
async def _materialize_attachments(
self,
room: RoomMeta,
attachments: list[Attachment],
raw_meta: list[dict],
) -> list[Attachment]:
workspace = Path(room.workspace_path or str(self.surfaces_workspace))
out: list[Attachment] = []
for att, _meta in zip(attachments, raw_meta, strict=False):
if not att.url:
out.append(att)
continue
try:
rel = await save_incoming_from_url(
api=self.api,
workspace_root=workspace,
filename=att.filename or "file.bin",
url=att.url,
)
except (httpx.HTTPError, OSError) as exc:
logger.warning("max_attachment_download_failed", error=str(exc))
out.append(att)
continue
out.append(
Attachment(
type=att.type,
filename=att.filename,
mime_type=att.mime_type,
workspace_path=rel,
url=att.url,
)
)
return out
async def process_message_callback(self, payload: dict) -> None:
cb = payload.get("callback") or {}
if not isinstance(cb, dict):
return
callback_id = cb.get("callback_id")
user_blob = cb.get("user") or {}
uid = user_blob.get("user_id") if isinstance(user_blob, dict) else None
uid_s = str(uid) if isinstance(uid, int) else None
msg = payload.get("message") or {}
recipient = msg.get("recipient") or {} if isinstance(msg, dict) else {}
cc = recipient.get("chat_id")
if isinstance(cc, int):
chat_key = str(cc)
elif isinstance(uid_s, str):
chat_key = uid_s
else:
return
mid = ""
body = msg.get("body") if isinstance(msg, dict) else None
if isinstance(body, dict):
mb = body.get("mid")
mid = mb if isinstance(mb, str) else ""
if uid_s is None:
return
await self.ensure_user(uid_s, display_name=user_blob.get("first_name"))
room = self.chat_store.get_room_by_max_chat_id(chat_key)
if room is None:
return
payload_raw = cb.get("payload") if cb.get("payload") is not None else None
payload_str = str(payload_raw) if payload_raw is not None else ""
incoming = incoming_from_message_callback_payload(
max_user_id=uid_s,
platform_chat_id=room.platform_chat_id,
payload_raw=payload_str,
callback_message_id=mid,
)
if incoming is None:
if isinstance(callback_id, str):
await self.api.answer_callback(callback_id, notification="ok")
return
try:
outgoing = await self.dispatcher.dispatch(incoming)
except PlatformError:
outgoing = []
await self._send_outgoing(int(chat_key), outgoing, room)
if isinstance(callback_id, str):
await self.api.answer_callback(callback_id, notification=" ")
async def process_bot_started(self, payload: dict) -> None:
cid = payload.get("chat_id")
user_blob = payload.get("user") or {}
uid = user_blob.get("user_id")
chat_key = str(cid) if isinstance(cid, int) else None
if chat_key is None or not isinstance(uid, int):
return
uid_s = str(uid)
await self.ensure_user(uid_s, display_name=user_blob.get("first_name"))
await self._resolve_room(
max_chat_key=chat_key,
max_user_id=uid_s,
)
deeplink_note = ""
dl = payload.get("payload") if isinstance(payload.get("payload"), str) else None
if dl:
deeplink_note = f" (payload: {dl})"
welcome = (
"Здравствуйте, я помогу с задачами Lambda. "
f"Отправьте текст или файл.{deeplink_note}"
)
await self.api.send_message_to_chat(int(chat_key), text=welcome)
async def dispatch_update(self, update: dict) -> None:
utype = update.get("update_type")
if utype == "message_created":
await self.process_message_created(update)
elif utype == "message_callback":
await self.process_message_callback(update)
elif utype == "bot_started":
await self.process_bot_started(update)
async def _send_lines(self, max_chat_id: int, text: str) -> None:
if text:
await self._send_plain_text(max_chat_id, text)
async def _send_plain_text(self, max_chat_id: int, text: str, *, fmt: str | None = None) -> None:
chunk_size = MAX_TEXT_CHARS
for i in range(0, len(text), chunk_size):
part = text[i : i + chunk_size]
await self.api.send_message_to_chat(max_chat_id, text=part, fmt=fmt)
async def _send_outgoing(self, max_chat_id: int, events: list[OutgoingEvent], room: RoomMeta) -> None:
workspace_agent = Path(
room.workspace_path if room.workspace_path else self.surfaces_workspace,
)
for event in events:
if isinstance(event, OutgoingTyping):
await self.api.send_chat_action(max_chat_id, "typing_on")
continue
if isinstance(event, OutgoingNotification):
body = f"[{event.level.upper()}] {event.text}"
await self._send_plain_text(max_chat_id, body)
continue
if isinstance(event, OutgoingMessage):
fmt = None
if getattr(event, "parse_mode", "plain") == "markdown":
fmt = "markdown"
merged_text = getattr(event, "text", "") or ""
attachments = list(getattr(event, "attachments", []) or [])
agent_def = None
try:
agent_def = self.registry.get(room.agent_id)
except AgentRegistryError:
pass
root = (
Path(agent_def.workspace_path)
if agent_def and agent_def.workspace_path
else workspace_agent
)
req_atts: list[dict] = []
for raw_att in attachments:
wp = getattr(raw_att, "workspace_path", None)
if not wp:
continue
try:
data = read_workspace_bytes(wp, agent_workspace=str(root))
except OSError:
logger.warning("max_outgoing_missing_file", path=wp)
continue
fn = getattr(raw_att, "filename", None) or Path(str(wp)).name
mime = getattr(raw_att, "mime_type", None)
att_type = str(getattr(raw_att, "type", "") or "")
ctype = guess_upload_type(mime, attachment_type=str(att_type))
attached = await upload_file_as_attachment(
self.api, filename=fn, content=data, upload_type=ctype
)
req_atts.append(attached)
text_payload = merged_text.strip() or None
if text_payload is None and not req_atts:
continue
await self.api.send_message_to_chat(
max_chat_id,
text=text_payload,
attachments=req_atts or None,
fmt=fmt,
)
if isinstance(event, OutgoingUI):
lines = [event.text]
if getattr(event, "buttons", []):
lines.append("")
for button in event.buttons:
lines.append(f"{button.label}")
lines.append("")
lines.append("Ответьте /yes или /no (или кнопки с callback в MAX).")
merged = "\n".join(lines)
await self._send_plain_text(max_chat_id, merged)
async def run(self) -> None:
await self.bootstrap_identity()
logger.info(
"max_bot_poll_start",
update_types=self.update_types,
registry_agents=len(self.registry.agents),
)
while True:
try:
updates, marker = await self.api.get_updates(
marker=self._marker,
types=self.update_types,
timeout=40,
limit=100,
)
self._marker = marker
for u in updates:
try:
await self.dispatch_update(u)
except Exception:
logger.exception("max_update_failed", update=u)
except asyncio.CancelledError:
raise
except (MaxApiError, httpx.HTTPError) as exc:
logger.error("max_poll_fatal", error=str(exc))
await asyncio.sleep(5)
async def shutdown(self) -> None:
close = getattr(self.platform, "close", None)
if callable(close):
await close()
await self.api.aclose()
async def main() -> None:
app = MaxBotApp()
try:
await app.run()
finally:
await app.shutdown()
if __name__ == "__main__":
asyncio.run(main())