import asyncio import json import os from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from typing import Any from urllib import error, request from browser_use import Agent, Browser, ChatOpenAI def _json_response(handler, status_code, payload): data = json.dumps(payload, ensure_ascii=False).encode("utf-8") handler.send_response(status_code) handler.send_header("Content-Type", "application/json; charset=utf-8") handler.send_header("Content-Length", str(len(data))) handler.end_headers() handler.wfile.write(data) async def run_browser_task(task): cdp_url = os.getenv("BROWSER_CDP_URL", "http://127.0.0.1:9222") browser_view_url = os.getenv("BROWSER_VIEW_URL", "") browser = Browser(cdp_url=cdp_url) llm = ChatOpenAI( model=os.getenv("MODEL_DEFAULT", "qwen3.5-122b"), api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"), temperature=0.0, ) agent = Agent(task=task, llm=llm, browser=browser) try: history = await agent.run() return { "success": True, "result": history.final_result(), "history": _extract_history_events(history), "browser_view": browser_view_url, } except Exception as err: return {"success": False, "error": f"Browser automation failed: {err}"} finally: try: await browser.close() except Exception: pass def _to_jsonable(value: Any) -> Any: if value is None or isinstance(value, (str, int, float, bool)): return value if isinstance(value, dict): return {str(key): _to_jsonable(val) for key, val in value.items()} if isinstance(value, (list, tuple, set)): return [_to_jsonable(item) for item in value] for method_name in ("model_dump", "dict", "to_dict"): method = getattr(value, method_name, None) if callable(method): try: dumped = method() return _to_jsonable(dumped) except Exception: pass return str(value) def _call_history_items(history: Any, attr_name: str) -> list[Any]: method = getattr(history, attr_name, None) if not callable(method): return [] try: raw: Any = method() except Exception: return [] if raw is None: return [] if isinstance(raw, list): return raw if isinstance(raw, (str, bytes, dict)): return [raw] try: return list(raw) except TypeError: return [raw] except Exception: return [raw] def _extract_history_events(history: Any) -> list[dict[str, Any]]: events: list[dict[str, Any]] = [] def append_many(kind: str, items: list[Any]) -> None: if not items: return for item in items: normalized = _to_jsonable(item) payload = normalized if isinstance(normalized, dict) else {"value": normalized} content = normalized if isinstance(normalized, str) else json.dumps(normalized, ensure_ascii=False) events.append( { "step": len(events) + 1, "kind": kind, "content": content, "data": payload, } ) append_many("thought", _call_history_items(history, "model_thoughts")) append_many("action", _call_history_items(history, "model_actions")) append_many("error", _call_history_items(history, "errors")) if events: return events fallback = _to_jsonable(history) return [ { "step": 1, "kind": "system", "content": fallback if isinstance(fallback, str) else json.dumps(fallback, ensure_ascii=False), "data": fallback if isinstance(fallback, dict) else {"value": fallback}, } ] class BrowserUseRPCHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path != "/health": _json_response(self, 404, {"success": False, "error": "Not found"}) return try: debug_url = os.getenv("BROWSER_HEALTH_URL", "http://127.0.0.1:9222/json/version") with request.urlopen(debug_url, timeout=2): pass _json_response(self, 200, {"success": True}) except Exception as err: _json_response(self, 503, {"success": False, "error": f"Browser is not ready: {err}"}) def do_POST(self): if self.path != "/run": _json_response(self, 404, {"success": False, "error": "Not found"}) return try: content_length = int(self.headers.get("Content-Length", "0")) raw = self.rfile.read(content_length) payload = json.loads(raw.decode("utf-8") if raw else "{}") task = payload.get("task", "") if not isinstance(task, str) or not task.strip(): _json_response(self, 400, {"success": False, "error": "Field 'task' is required"}) return result = asyncio.run(run_browser_task(task.strip())) code = 200 if result.get("success") else 500 _json_response(self, code, result) except json.JSONDecodeError: _json_response(self, 400, {"success": False, "error": "Invalid JSON payload"}) except error.URLError as err: _json_response(self, 503, {"success": False, "error": f"Transport error: {err}"}) except Exception as err: _json_response(self, 500, {"success": False, "error": f"Internal error: {err}"}) def log_message(self, format_str, *args): return def main(): host = os.getenv("BROWSER_USE_RPC_HOST", "0.0.0.0") port = int(os.getenv("BROWSER_USE_RPC_PORT", "8787")) server = ThreadingHTTPServer((host, port), BrowserUseRPCHandler) print(f"browser-use RPC listening on {host}:{port}") server.serve_forever() if __name__ == "__main__": main()