from __future__ import annotations import asyncio import json import os import re import time from dataclasses import dataclass from pathlib import Path from urllib.parse import urlsplit, urlunsplit import aiohttp import structlog from aiohttp import web from dotenv import load_dotenv from adapter.web.converter import json_to_incoming, outgoing_to_json from adapter.web.store import WebSessionStore from core.auth import AuthManager from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import IncomingCommand, IncomingMessage, OutgoingEvent, OutgoingMessage from core.settings import SettingsManager from core.store import InMemoryStore, StateStore from sdk.interface import PlatformClient, PlatformError from sdk.mock import MockPlatformClient logger = structlog.get_logger(__name__) load_dotenv(Path(__file__).resolve().parents[2] / ".env") PLATFORM = "web" @dataclass class WebRuntime: platform: PlatformClient store: StateStore chat_mgr: ChatManager auth_mgr: AuthManager settings_mgr: SettingsManager dispatcher: EventDispatcher session_store: WebSessionStore workspace_dir: Path def _agent_base_url_from_env() -> str: if base_url := os.environ.get("AGENT_BASE_URL"): return base_url if ws_url := os.environ.get("AGENT_WS_URL"): parsed = urlsplit(ws_url) path = re.sub(r"(?:/v1)?/agent_ws(?:/[^/]+)?/?$", "", parsed.path.rstrip("/")) return urlunsplit((parsed.scheme, parsed.netloc, path, "", "")) return "http://127.0.0.1:8000" def _build_platform_from_env(*, store: StateStore, chat_mgr: ChatManager) -> PlatformClient: backend = os.environ.get("WEB_PLATFORM_BACKEND", "mock").strip().lower() if backend == "real": from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient prototype_state = PrototypeStateStore() base_url = _agent_base_url_from_env() return RealPlatformClient( agent_id="web-agent", agent_base_url=base_url, prototype_state=prototype_state, platform=PLATFORM, ) return MockPlatformClient() def build_runtime( platform: PlatformClient | None = None, store: StateStore | None = None, ) -> WebRuntime: store = store or InMemoryStore() session_store = WebSessionStore() workspace_dir = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")) chat_mgr = ChatManager(platform, store) platform = platform or _build_platform_from_env(store=store, chat_mgr=chat_mgr) chat_mgr = ChatManager(platform, store) auth_mgr = AuthManager(platform, store) settings_mgr = SettingsManager(platform, store) dispatcher = EventDispatcher( platform=platform, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr ) register_all(dispatcher) return WebRuntime( platform=platform, store=store, chat_mgr=chat_mgr, auth_mgr=auth_mgr, settings_mgr=settings_mgr, dispatcher=dispatcher, session_store=session_store, workspace_dir=workspace_dir, ) class WebBot: def __init__(self, runtime: WebRuntime) -> None: self.runtime = runtime self._connections: dict[str, web.WebSocketResponse] = {} async def handle_ws(self, request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse(max_msg_size=0) await ws.prepare(request) session_id = "" user_id = "" logger.info("web_ws_connected", peer=request.remote) try: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: session_id, user_id = await self._on_text(ws, msg.data, session_id, user_id) elif msg.type == aiohttp.WSMsgType.BINARY: await self._on_binary(ws, msg.data, session_id, user_id) elif msg.type == aiohttp.WSMsgType.ERROR: logger.error("web_ws_error", error=ws.exception()) except asyncio.CancelledError: pass finally: if user_id: self._connections.pop(user_id, None) logger.info("web_ws_disconnected", user_id=user_id, peer=request.remote) return ws async def _on_text( self, ws: web.WebSocketResponse, data: str, session_id: str, user_id: str, ) -> tuple[str, str]: try: payload = json.loads(data) except json.JSONDecodeError: await self._send(ws, json.dumps({"type": "error", "text": "Invalid JSON"})) return session_id, user_id msg_type = payload.get("type", "") if msg_type == "auth": session_token = payload.get("session_token", "") if session_token: uid = await self.runtime.session_store.get_user_id(session_token) if uid: session_id = session_token user_id = uid self._connections[user_id] = ws await self._send( ws, json.dumps( {"type": "connected", "user_id": user_id, "session_token": session_id}, ensure_ascii=False, ), ) await self._send_history(ws, user_id) return session_id, user_id session_id = await self.runtime.session_store.create_session( payload.get("display_name") ) user_id = await self.runtime.session_store.get_user_id(session_id) self._connections[user_id] = ws await self._send( ws, json.dumps( {"type": "connected", "user_id": user_id, "session_token": session_id}, ensure_ascii=False, ), ) return session_id, user_id if not user_id: await self._send(ws, json.dumps({"type": "error", "text": "Not authenticated"})) return session_id, user_id try: incoming = json_to_incoming(payload) except Exception as exc: logger.warning("web_convert_error", error=str(exc)) await self._send(ws, json.dumps( {"type": "error", "text": "Ошибка обработки сообщения"}, ensure_ascii=False, )) return session_id, user_id if incoming is None: await self._send(ws, json.dumps( {"type": "error", "text": "Неизвестный тип сообщения"}, ensure_ascii=False, )) return session_id, user_id now = time.time() user_text = payload.get("text", "") if not user_text and payload.get("type") == "command": cmd = payload.get("command", "") args = payload.get("args", []) user_text = f"/{cmd} {' '.join(args)}".strip() await self._save_message(user_id, { "type": "message", "chat_id": incoming.chat_id, "text": user_text, "from": "user", "timestamp": now, "attachments": getattr(incoming, "attachments", None), }) # Handle chat-management commands at web surface level if isinstance(incoming, IncomingCommand): cmd = incoming.command chat_id = incoming.chat_id if cmd == "delete": # Delete chat history, name, and archive state for store_key in ( f"web_history:{user_id}:{chat_id}", f"web_chat_name:{user_id}:{chat_id}", f"web_chat_archived:{user_id}:{chat_id}", ): await self.runtime.store.delete(store_key) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=f"Чат {chat_id} удалён") )) return session_id, user_id if cmd == "archive": target = incoming.args[0] if incoming.args else chat_id await self.runtime.store.set(f"web_chat_archived:{user_id}:{target}", "1") await self._save_message(user_id, { "type": "message", "chat_id": target, "text": f"Чат {target} архивирован", "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=target, text=f"Чат {target} архивирован") )) await self._send(ws, json.dumps({ "type": "chat_archived", "chat_id": target, })) return session_id, user_id if cmd == "unarchive": target = incoming.args[0] if incoming.args else chat_id await self.runtime.store.delete(f"web_chat_archived:{user_id}:{target}") await self._save_message(user_id, { "type": "message", "chat_id": target, "text": f"Чат {target} восстановлен из архива", "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=target, text=f"Чат {target} восстановлен из архива") )) await self._send(ws, json.dumps({ "type": "chat_unarchived", "chat_id": target, })) return session_id, user_id if cmd == "new": # Generate next available chat_id existing = set() for key in await self.runtime.store.keys(f"web_history:{user_id}:"): cid = key.rsplit(":", 1)[-1] existing.add(cid) n = 1 while f"C{n}" in existing: n += 1 new_id = f"C{n}" default_name = f"Чат {n}" await self.runtime.store.set(f"web_chat_name:{user_id}:{new_id}", default_name) await self._save_message(user_id, { "type": "message", "chat_id": new_id, "text": f"Создан новый чат {new_id}", "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=new_id, text=f"Создан чат {new_id}") )) await self._send(ws, json.dumps({ "type": "chat_new", "chat_id": new_id, })) await self._send(ws, json.dumps({ "type": "chat_name", "chat_id": new_id, "name": default_name, })) return session_id, user_id if cmd == "chats": chat_list = [] for key in await self.runtime.store.keys(f"web_history:{user_id}:"): cid = key.rsplit(":", 1)[-1] is_archived = await self.runtime.store.get(f"web_chat_archived:{user_id}:{cid}") name = await self.runtime.store.get(f"web_chat_name:{user_id}:{cid}") label = name or cid if is_archived: label += " (архив)" chat_list.append(f"• {label}") text = "Ваши чаты:\n" + "\n".join(chat_list) if chat_list else "Нет чатов" await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=text) )) return session_id, user_id if cmd == "save": logger.info("web_cmd_save_entered", args=incoming.args, cmd=cmd) if not incoming.args: await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text="⚠ Укажите имя: /save <имя>") )) return session_id, user_id name = " ".join(incoming.args) sessions = await self.runtime.store.get(f"web_sessions:{user_id}") or [] if any(s["name"] == name for s in sessions): await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=f"⚠ Сессия «{name}» уже существует.") )) return session_id, user_id sessions.append({"name": name, "created_at": time.strftime("%Y-%m-%dT%H:%M:%S")}) await self.runtime.store.set(f"web_sessions:{user_id}", sessions) await self._save_message(user_id, { "type": "message", "chat_id": chat_id, "text": f"💾 Сохраняю контекст как «{name}»...", "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=f"💾 Сохраняю контекст как «{name}»...") )) prompt = IncomingMessage( user_id=user_id, platform=PLATFORM, chat_id=chat_id, text=f"Summarize our conversation and save to /workspace/contexts/{name}.md. Reply only with: Saved: {name}", attachments=[], ) try: outgoing = await self.runtime.dispatcher.dispatch(prompt) except PlatformError as exc: logger.warning("web_save_dispatch_error", user_id=user_id, error=str(exc)) text = f"⚠ Ошибка сохранения: {exc}" outgoing = [OutgoingMessage(chat_id=chat_id, text=text)] except Exception as exc: logger.warning("web_save_dispatch_error", user_id=user_id, error=str(exc)) text = f"⚠ Ошибка сохранения: {exc}" outgoing = [OutgoingMessage(chat_id=chat_id, text=text)] save_msg = next((o for o in outgoing if isinstance(o, OutgoingMessage)), None) save_text = save_msg.text if save_msg else f"✅ Сохранено: {name}" await self._save_message(user_id, { "type": "message", "chat_id": chat_id, "text": save_text, "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=save_text) )) return session_id, user_id if cmd == "load": sessions = await self.runtime.store.get(f"web_sessions:{user_id}") or [] if not incoming.args: if not sessions: await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text="Нет сохранённых сессий. Используй /save [имя].") )) return session_id, user_id session_list = [{"name": s["name"], "created_at": s["created_at"][:10]} for s in sessions] await self._send(ws, json.dumps({"type": "sessions", "sessions": session_list})) return session_id, user_id name = " ".join(incoming.args) match = next((s for s in sessions if s["name"] == name), None) if sessions else None if not match: await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=f"⚠ Сессия «{name}» не найдена.") )) return session_id, user_id await self.runtime.store.set(f"web_current_session:{user_id}", name) await self._save_message(user_id, { "type": "message", "chat_id": chat_id, "text": f"📂 Загружаю контекст «{name}»...", "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=f"📂 Загружаю контекст «{name}»...") )) prompt = IncomingMessage( user_id=user_id, platform=PLATFORM, chat_id=chat_id, text=f"Load context from /workspace/contexts/{name}.md and use it as background for our conversation. Reply: Loaded: {name}", attachments=[], ) try: outgoing = await self.runtime.dispatcher.dispatch(prompt) except PlatformError as exc: logger.warning("web_load_dispatch_error", user_id=user_id, error=str(exc)) text = f"⚠ Ошибка загрузки: {exc}" outgoing = [OutgoingMessage(chat_id=chat_id, text=text)] except Exception as exc: logger.warning("web_load_dispatch_error", user_id=user_id, error=str(exc)) text = f"⚠ Ошибка загрузки: {exc}" outgoing = [OutgoingMessage(chat_id=chat_id, text=text)] load_msg = next((o for o in outgoing if isinstance(o, OutgoingMessage)), None) load_text = load_msg.text if load_msg else f"✅ Загружено: {name}" await self._save_message(user_id, { "type": "message", "chat_id": chat_id, "text": load_text, "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=load_text) )) return session_id, user_id if cmd == "context": sessions = await self.runtime.store.get(f"web_sessions:{user_id}") or [] current = await self.runtime.store.get(f"web_current_session:{user_id}") lines = [f"Контекст чата: {chat_id}"] lines.append(f"Сессия: {current or 'не загружена'}") lines.append(f"Сохранения ({len(sessions)}):") for s in sessions: lines.append(f" - {s['name']} ({s['created_at'][:10]})") text = "\n".join(lines) await self._save_message(user_id, { "type": "message", "chat_id": chat_id, "text": text, "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=text) )) return session_id, user_id if cmd in ("reset", "clear"): await self.runtime.store.delete(f"web_current_session:{user_id}") await self._save_message(user_id, { "type": "message", "chat_id": chat_id, "text": "🔄 Контекст сброшен. Агент не помнит предыдущий разговор.", "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text="🔄 Контекст сброшен.") )) return session_id, user_id # Handle rename at web surface level for persistence if isinstance(incoming, IncomingCommand) and incoming.command == "rename": new_name = " ".join(incoming.args) if incoming.args else "" if not new_name: await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=incoming.chat_id, text="Укажите имя: /rename <название>") )) else: # Check for duplicate names is_dup = False for key in await self.runtime.store.keys(f"web_chat_name:{user_id}:"): existing = await self.runtime.store.get(key) if existing and existing.lower() == new_name.lower(): cid = key.rsplit(":", 1)[-1] if cid != incoming.chat_id: is_dup = True break if is_dup: await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=incoming.chat_id, text=f"Чат с именем «{new_name}» уже существует") )) return session_id, user_id chat_id = incoming.chat_id await self.runtime.store.set(f"web_chat_name:{user_id}:{chat_id}", new_name) await self._save_message(user_id, { "type": "message", "chat_id": chat_id, "text": f"Переименован в: {new_name}", "from": "agent", "timestamp": time.time(), }) await self._send(ws, outgoing_to_json( OutgoingMessage(chat_id=chat_id, text=f"Переименован в: {new_name}") )) await self._send(ws, json.dumps( {"type": "chat_name", "chat_id": chat_id, "name": new_name} )) return session_id, user_id try: outgoing = await self.runtime.dispatcher.dispatch(incoming) except PlatformError as exc: logger.warning("web_platform_error", user_id=user_id, code=exc.code, error=str(exc)) outgoing = [ OutgoingMessage( chat_id=incoming.chat_id, text="Сервис временно недоступен. Попробуйте ещё раз позже.", ) ] except Exception as exc: logger.warning("web_dispatch_error", user_id=user_id, error=str(exc)) outgoing = [ OutgoingMessage( chat_id=incoming.chat_id, text=f"Ошибка: {exc}", ) ] for event in outgoing: if isinstance(event, OutgoingMessage): await self._save_message(user_id, { "type": "message", "chat_id": event.chat_id, "text": event.text, "from": "agent", "timestamp": time.time(), "attachments": event.attachments, }) await self._send_outgoing(user_id, outgoing) return session_id, user_id async def _on_binary( self, ws: web.WebSocketResponse, raw: bytes, session_id: str, user_id: str, ) -> None: pass async def _send_outgoing(self, user_id: str, outgoing: list[OutgoingEvent]) -> None: ws = self._connections.get(user_id) if ws is None: return for event in outgoing: await self._send(ws, outgoing_to_json(event)) async def _history_key(self, user_id: str, chat_id: str) -> str: return f"web_history:{user_id}:{chat_id}" async def _save_message(self, user_id: str, msg: dict) -> None: key = await self._history_key(user_id, msg["chat_id"]) history = await self.runtime.store.get(key) or {"messages": []} entry = dict(msg) if entry.get("attachments"): entry["attachments"] = [ { "type": getattr(a, "type", "document"), "filename": getattr(a, "filename", None), "mime_type": getattr(a, "mime_type", None), "workspace_path": getattr(a, "workspace_path", None), "download_url": f"/files/{a.workspace_path}" if a.workspace_path else None, } for a in entry["attachments"] ] history["messages"].append(entry) history["messages"] = history["messages"][-200:] await self.runtime.store.set(key, history) async def _load_history(self, user_id: str) -> dict[str, list[dict]]: result: dict[str, list[dict]] = {} for raw_key in await self.runtime.store.keys(f"web_history:{user_id}:"): chat_id = raw_key.rsplit(":", 1)[-1] data = await self.runtime.store.get(raw_key) if data and "messages" in data: result[chat_id] = data["messages"] return result async def _send_history(self, ws: web.WebSocketResponse, user_id: str) -> None: history = await self._load_history(user_id) if not history: return payload: dict = {"type": "history", "chats": history} chat_names = {} for key in await self.runtime.store.keys(f"web_chat_name:{user_id}:"): cid = key.rsplit(":", 1)[-1] name = await self.runtime.store.get(key) if name: chat_names[cid] = name if chat_names: payload["chat_names"] = chat_names archived_chats = [] for key in await self.runtime.store.keys(f"web_chat_archived:{user_id}:"): cid = key.rsplit(":", 1)[-1] val = await self.runtime.store.get(key) if val: archived_chats.append(cid) if archived_chats: payload["archived"] = archived_chats await self._send(ws, json.dumps(payload, ensure_ascii=False)) async def handle_file_download(self, request: web.Request) -> web.Response: filepath = self.runtime.workspace_dir / request.match_info["path"] if not filepath.exists() or not filepath.is_file(): raise web.HTTPNotFound safe_path = filepath.resolve() safe_workspace = self.runtime.workspace_dir.resolve() if not str(safe_path).startswith(str(safe_workspace)): raise web.HTTPForbidden return web.FileResponse(filepath) async def handle_upload(self, request: web.Request) -> web.Response: reader = await request.multipart() field = await reader.next() if not field or not field.filename: return web.json_response({"error": "no file"}, status=400) user_id = request.query.get("user_id", "unknown") safe_name = re.sub(r"[^\w.-]", "_", field.filename) filename = f"{user_id}_{safe_name}" workspace = self.runtime.workspace_dir filepath = workspace / filename filepath.parent.mkdir(parents=True, exist_ok=True) with open(filepath, "wb") as f: while True: chunk = await field.read_chunk() if not chunk: break f.write(chunk) return web.json_response({ "attachment_id": filename, "filename": field.filename, "workspace_path": filename, }) async def _send(self, ws: web.WebSocketResponse, text: str) -> None: try: await ws.send_str(text) except ConnectionResetError: pass async def main() -> None: port = int(os.environ.get("WEB_PORT", "8080")) runtime = build_runtime() bot = WebBot(runtime) app = web.Application() static_dir = Path(__file__).resolve().parent / "static" if static_dir.is_dir(): app.router.add_static("/static/", path=str(static_dir), name="static") app.router.add_get("/ws", bot.handle_ws) async def index(request: web.Request) -> web.FileResponse: index_file = static_dir / "index.html" if index_file.is_file(): return web.FileResponse(index_file) return web.Response(text="Web Chat surface — frontend not built", content_type="text/plain") app.router.add_get("/", index) app.router.add_post("/upload", bot.handle_upload) app.router.add_get("/files/{path:.*}", bot.handle_file_download) logger.info("Web surface starting", port=port) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, "0.0.0.0", port) await site.start() try: await asyncio.Event().wait() finally: close = getattr(runtime.platform, "close", None) if callable(close): await close() await runner.cleanup() if __name__ == "__main__": asyncio.run(main())