252 lines
No EOL
8.7 KiB
Python
252 lines
No EOL
8.7 KiB
Python
"""MAX surface bot runtime."""
|
|
import os
|
|
import asyncio
|
|
import aiohttp
|
|
from typing import Optional
|
|
|
|
from adapter.max.agent_registry import load_from_env, AgentRegistry, AgentConfig
|
|
from adapter.max.store import ChatStore
|
|
from adapter.max.files import FileHandler
|
|
from adapter.max.converter import (
|
|
max_message_to_incoming,
|
|
max_attachment_to_internal,
|
|
)
|
|
from adapter.max.handlers.chat import ChatHandler
|
|
from adapter.max.handlers.attachments import AttachmentHandler
|
|
from adapter.max.handlers.help import get_help
|
|
from core.protocol import IncomingMessage, IncomingCommand, IncomingCallback
|
|
|
|
|
|
class MaxSurface:
    """Long-polling bridge between the MAX Bot API and agent backends.

    The surface polls ``{api_url}/updates``, maps each MAX chat to a room in
    ``ChatStore``, handles surface-level commands locally and forwards
    everything else to the agent's ``/chat/{agent_id}`` HTTP endpoint.
    """

    def __init__(self):
        """Read configuration from the environment and build collaborators.

        Environment variables:
            MAX_BOT_TOKEN: bot token (required).
            MAX_API_URL: MAX API base URL.
            SURFACES_WORKSPACE_DIR: root directory handed to ``FileHandler``.
            AGENT_BASE_URL: optional override for every agent's base URL.

        Raises:
            KeyError: if ``MAX_BOT_TOKEN`` is not set.
        """
        self.token = os.environ["MAX_BOT_TOKEN"]
        self.api_url = os.environ.get("MAX_API_URL", "https://api.max.ru/v1")
        self.workspace_dir = os.environ.get("SURFACES_WORKSPACE_DIR", "/agents")
        self.agent_base_url = os.environ.get("AGENT_BASE_URL", "")

        self.registry: AgentRegistry = load_from_env()
        self.store = ChatStore()
        self.files = FileHandler(self.workspace_dir)
        self.chat_handler = ChatHandler(self.store)
        self.attach_handler = AttachmentHandler(self.store)
        # Created in start(); None until the polling loop is running.
        self.session: Optional[aiohttp.ClientSession] = None

    async def start(self):
        """Open the HTTP session and poll for updates until cancelled.

        Errors raised while fetching or processing updates are printed and
        followed by a 5 second pause; the loop then resumes. The session is
        closed when the loop is left (e.g. on task cancellation).
        """
        self.session = aiohttp.ClientSession(
            headers={"Authorization": f"Bearer {self.token}"}
        )
        print("[MAX Surface] Starting long poll...")
        offset = 0

        try:
            while True:
                try:
                    updates = await self._get_updates(offset)
                    for update in updates:
                        # Advance before processing so an update whose
                        # handler raises is not fetched again.
                        offset = update["update_id"] + 1
                        await self._process_update(update)
                except Exception as e:
                    print(f"[MAX Surface] Error: {e}")
                    await asyncio.sleep(5)
        finally:
            # Release the connection pool instead of leaking it on exit.
            await self.session.close()
            self.session = None

    async def _get_updates(self, offset: int) -> list:
        """Fetch pending updates starting at ``offset``.

        Raises:
            aiohttp.ClientResponseError: on a 4xx/5xx reply, so the caller's
                error handler backs off instead of re-polling immediately.
        """
        async with self.session.get(
            f"{self.api_url}/updates",
            params={"offset": offset, "timeout": 30},
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()
            # Treat a missing or null "result" as "no updates".
            return data.get("result") or []

    async def _process_update(self, update: dict) -> None:
        """Dispatch an update to the message or callback handler.

        Updates carrying neither ``message`` nor ``callback_query`` are
        ignored.
        """
        if "message" in update:
            await self._handle_message(update["message"])
        elif "callback_query" in update:
            await self._handle_callback(update["callback_query"])

    async def _handle_message(self, message: dict) -> None:
        """Handle one incoming chat message.

        Resolves (or creates) the room for the chat, stages any attachment
        in the agent workspace, then either runs a surface command or
        forwards the message to the agent and relays its reply.
        """
        text = message.get("text", "") or message.get("caption", "") or ""
        user_id = str(message["from"]["id"])
        chat_id = str(message["chat"]["id"])

        room = self.store.get_room_by_max_chat_id(chat_id)
        if room is None:
            # First message from this chat: create a room, then re-read it.
            agent = self.registry.get_agent_for_user(user_id)
            self.chat_handler.handle_new(
                max_chat_id=chat_id,
                user_id=user_id,
                agent_id=agent.id,
            )
            room = self.store.get_room_by_max_chat_id(chat_id)
        else:
            agent = self.registry.get_agent_by_id(room.agent_id)

        attachments = []
        if "attachment" in message:
            att = message["attachment"]
            attachments.append(
                max_attachment_to_internal(
                    filename=att["filename"],
                    mime_type=att.get("mime_type", "application/octet-stream"),
                    download_url=att["download_url"],
                )
            )

            # The download is authenticated with the bot token.
            workspace_path = await self.files.download_attachment(
                download_url=att["download_url"],
                filename=att["filename"],
                agent_workspace=agent.workspace_path,
                headers={"Authorization": f"Bearer {self.token}"},
            )
            self.store.stage_attachment(chat_id, (workspace_path, att["filename"]))

        incoming = max_message_to_incoming(
            text=text,
            user_id=user_id,
            chat_id=room.platform_chat_id,
            attachments=attachments,
        )

        if isinstance(incoming, IncomingCommand):
            response_text = await self._handle_surface_command(
                incoming, max_chat_id=chat_id, user_id=user_id, agent=agent
            )
            if response_text:
                await self._send_message(chat_id, response_text)
            return

        if isinstance(incoming, IncomingCallback):
            await self._call_agent(
                agent=agent,
                platform_chat_id=room.platform_chat_id,
                message=incoming,
            )
            return

        if isinstance(incoming, IncomingMessage) and (incoming.text or attachments):
            queued = self.store.pop_attachments(chat_id)
            await self._send_typing(chat_id)
            agent_response = await self._call_agent(
                agent=agent,
                platform_chat_id=room.platform_chat_id,
                message=incoming,
                attachments=queued,
            )
            # Mirror the command branch: never post an empty reply.
            if agent_response:
                await self._send_message(chat_id, agent_response)

    async def _handle_callback(self, callback: dict) -> None:
        """Forward a callback query to the agent that owns the chat's room.

        Callbacks for chats without a known room are dropped.
        """
        user_id = str(callback["from"]["id"])
        chat_id = str(callback["message"]["chat"]["id"])
        message_id = str(callback["message"]["message_id"])
        data = callback.get("data", "")

        room = self.store.get_room_by_max_chat_id(chat_id)
        if room is None:
            return

        incoming = max_message_to_incoming(
            text="",
            user_id=user_id,
            chat_id=room.platform_chat_id,
            callback_data=data,
            message_id=message_id,
        )

        agent = self.registry.get_agent_by_id(room.agent_id)
        await self._call_agent(
            agent=agent,
            platform_chat_id=room.platform_chat_id,
            message=incoming,
        )

    async def _handle_surface_command(
        self, cmd: IncomingCommand, max_chat_id: str, user_id: str, agent: AgentConfig
    ) -> Optional[str]:
        """Execute a surface-level command.

        Returns:
            The reply text produced by the matching handler, or ``None`` when
            the command is not one the surface handles.
        """
        command = cmd.command
        args = cmd.args

        if command == "new":
            name = " ".join(args) if args else None
            self.chat_handler.handle_new(
                max_chat_id=max_chat_id,
                user_id=user_id,
                agent_id=agent.id,
                name=name,
            )
            return f"New chat created: {name or 'Unnamed'}"

        if command == "chats":
            return self.chat_handler.handle_chats(user_id)

        if command == "rename":
            new_name = " ".join(args) if args else ""
            return self.chat_handler.handle_rename(max_chat_id, new_name)

        if command == "archive":
            return self.chat_handler.handle_archive(max_chat_id)

        if command in ("clear", "reset"):
            return self.chat_handler.handle_clear(max_chat_id)

        if command == "list":
            return self.attach_handler.handle_list(max_chat_id)

        if command == "remove":
            idx = args[0] if args else ""
            return self.attach_handler.handle_remove(max_chat_id, idx)

        if command == "help":
            return get_help()

        return None

    async def _call_agent(
        self,
        agent: AgentConfig,
        platform_chat_id: str,
        message=None,
        attachments: Optional[list] = None,
    ) -> str:
        """POST a request to the agent's chat endpoint and return its reply.

        Args:
            agent: target agent; supplies ``id`` and the fallback ``base_url``.
            platform_chat_id: chat identifier sent as ``chat_id``.
            message: optional ``IncomingMessage``, ``IncomingCallback`` or
                ``IncomingCommand`` whose fields are copied into the payload.
            attachments: optional sequence of items whose first element is
                sent in the ``attachments`` list.

        Returns:
            The ``response`` field of the agent's JSON reply, or ``""`` when
            it is missing or null.
        """
        payload = {
            "chat_id": platform_chat_id,
            "agent_id": agent.id,
        }
        if message:
            if isinstance(message, IncomingMessage):
                payload["message"] = message.text
            elif isinstance(message, IncomingCallback):
                payload["action"] = message.action
                payload["payload"] = message.payload
            elif isinstance(message, IncomingCommand):
                payload["command"] = message.command
                payload["args"] = message.args
        if attachments:
            payload["attachments"] = [a[0] for a in attachments]

        # Strip the trailing slash from whichever base is used so the URL
        # never contains "//chat".
        base = (self.agent_base_url or agent.base_url).rstrip("/")
        url = f"{base}/chat/{agent.id}"

        # NOTE(review): self.session is created with the MAX bot
        # Authorization header as a default, so that header is sent to the
        # agent endpoint as well - confirm this is intended.
        async with self.session.post(url, json=payload) as resp:
            data = await resp.json()
            return data.get("response") or ""

    async def _send_message(self, chat_id: str, text: str) -> None:
        """Send ``text`` to ``chat_id`` via the ``sendMessage`` endpoint."""
        async with self.session.post(
            f"{self.api_url}/sendMessage",
            json={"chat_id": chat_id, "text": text},
        ) as resp:
            await resp.json()

    async def _send_typing(self, chat_id: str) -> None:
        """Send a ``typing`` chat action to ``chat_id``."""
        async with self.session.post(
            f"{self.api_url}/sendChatAction",
            json={"chat_id": chat_id, "action": "typing"},
        ) as resp:
            await resp.json()
|
|
|
|
|
|
async def main():
    """Build a ``MaxSurface`` and run its polling loop."""
    await MaxSurface().start()
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: run main() to completion with asyncio.run.
    asyncio.run(main())