"""Provision isolated browser-use Docker runtimes for API runs.""" from __future__ import annotations import hashlib import json import logging import os import re import subprocess import tempfile import threading import time from dataclasses import dataclass from pathlib import Path from typing import Any from urllib import request logger = logging.getLogger(__name__) _DEFAULT_SHARED_CDP_URL = "http://browser:9222" _DEFAULT_SHARED_RPC_URL = "http://browser:8787/run" _DEFAULT_RUNTIME_IMAGE = "browser-use-browser-runtime:latest" _DEFAULT_RUNTIME_NETWORK = "browser-net" _DEFAULT_TTL_SECONDS = 900 _DEFAULT_START_TIMEOUT = 45 _DEFAULT_ENABLE_UI = True _REGISTRY_LOCK = threading.Lock() _VIEW_URL_CACHE_LOCK = threading.Lock() _VIEW_URL_CACHE: dict[str, Any] = {"value": "", "expires_at": 0.0} @dataclass(frozen=True) class BrowserRuntimeConfig: mode: str runtime_image: str runtime_network: str runtime_ttl_seconds: int runtime_start_timeout: int shared_cdp_url: str enable_ui: bool def _state_dir() -> Path: return Path(os.getenv("BROWSER_RUNTIME_STATE_DIR", "/tmp/browser-use-api")) def _registry_path() -> Path: return _state_dir() / "docker_runtimes.json" def _as_int(value: Any, default: int) -> int: try: return max(1, int(value)) except (TypeError, ValueError): return default def _as_bool(value: Any, default: bool) -> bool: if value is None: return default if isinstance(value, bool): return value return str(value).strip().lower() in {"1", "true", "yes", "on"} def get_browser_runtime_config() -> BrowserRuntimeConfig: mode = str(os.getenv("BROWSER_USE_ISOLATION_MODE", "shared")).strip().lower() if mode not in {"shared", "docker-per-principal", "docker-per-task"}: logger.warning("Unknown browser-use isolation mode %r; falling back to shared", mode) mode = "shared" return BrowserRuntimeConfig( mode=mode, runtime_image=os.getenv("BROWSER_RUNTIME_IMAGE", _DEFAULT_RUNTIME_IMAGE).strip() or _DEFAULT_RUNTIME_IMAGE, runtime_network=os.getenv("BROWSER_RUNTIME_NETWORK", 
_DEFAULT_RUNTIME_NETWORK).strip() or _DEFAULT_RUNTIME_NETWORK, runtime_ttl_seconds=_as_int( os.getenv("BROWSER_RUNTIME_TTL_SECONDS"), _DEFAULT_TTL_SECONDS, ), runtime_start_timeout=_as_int( os.getenv("BROWSER_RUNTIME_START_TIMEOUT"), _DEFAULT_START_TIMEOUT, ), shared_cdp_url=os.getenv("BROWSER_URL", _DEFAULT_SHARED_CDP_URL).strip() or _DEFAULT_SHARED_CDP_URL, enable_ui=_as_bool( os.getenv("BROWSER_RUNTIME_ENABLE_UI"), _DEFAULT_ENABLE_UI, ), ) def resolve_isolation_owner( mode: str, task_id: str | None, metadata: dict[str, Any] | None = None, thread_id: str | None = None, ) -> str: if mode == "docker-per-task": return (task_id or "default").strip() or "default" metadata = metadata or {} for key in ("user_id", "session_id"): value = metadata.get(key) if value not in (None, ""): return str(value).strip() or "default" return (thread_id or task_id or "default").strip() or "default" def hash_runtime_owner(owner: str) -> str: return hashlib.sha256(owner.encode("utf-8")).hexdigest()[:16] def _normalize_browser_view_base_url(raw_url: str) -> str: url = (raw_url or "").strip() if not url: return "" for marker in ("/vnc.html", "/index.html"): idx = url.find(marker) if idx != -1: url = url[:idx] break return url.rstrip("/") def _discover_browser_view_base_url_from_tunnel() -> str: now = time.time() with _VIEW_URL_CACHE_LOCK: cached_value = str(_VIEW_URL_CACHE.get("value", "") or "") expires_at = float(_VIEW_URL_CACHE.get("expires_at", 0.0) or 0.0) if cached_value and now < expires_at: return cached_value try: result = _run_docker(["logs", "--tail", "200", "browser-use-tunnel"], check=False) combined = "\n".join(part for part in [result.stdout or "", result.stderr or ""] if part) matches = re.findall(r"https://[^\s\"'<>]+", combined) base_url = _normalize_browser_view_base_url(matches[-1]) if matches else "" except Exception as exc: logger.debug("Failed to discover browser view URL from tunnel logs: %s", exc) base_url = "" with _VIEW_URL_CACHE_LOCK: _VIEW_URL_CACHE["value"] = 
base_url _VIEW_URL_CACHE["expires_at"] = now + (60 if base_url else 10) return base_url def get_browser_view_url( task_id: str | None = None, metadata: dict[str, Any] | None = None, thread_id: str | None = None, ) -> str: base_url = _normalize_browser_view_base_url( os.getenv("BROWSER_VIEW_BASE_URL", "") or os.getenv("BROWSER_VIEW_URL", "") ) if not base_url: base_url = _discover_browser_view_base_url_from_tunnel() if not base_url: return "" config = get_browser_runtime_config() if config.mode == "shared": return f"{base_url}/vnc.html?path=websockify" owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id) owner_hash = hash_runtime_owner(owner) return f"{base_url}/view/{owner_hash}/vnc.html?path=view/{owner_hash}/websockify" def _shared_rpc_url() -> str: return os.getenv("BROWSER_USE_RPC_URL", _DEFAULT_SHARED_RPC_URL).strip() or _DEFAULT_SHARED_RPC_URL def _runtime_rpc_url(container_name: str) -> str: return f"http://{container_name}:8787/run" def _container_name(owner_hash: str) -> str: return f"browser-use-browser-{owner_hash}" def _volume_name(owner_hash: str) -> str: return f"browser-use-profile-{owner_hash}" def _load_registry() -> dict[str, Any]: path = _registry_path() if not path.exists(): return {"runtimes": {}} try: with open(path, "r", encoding="utf-8") as fh: data = json.load(fh) or {} if isinstance(data, dict) and isinstance(data.get("runtimes"), dict): return data except Exception as exc: logger.warning("Failed to read browser-use runtime registry %s: %s", path, exc) return {"runtimes": {}} def _save_registry(payload: dict[str, Any]) -> None: path = _registry_path() path.parent.mkdir(parents=True, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), prefix=".browser_use_", suffix=".tmp") try: with os.fdopen(fd, "w", encoding="utf-8") as fh: json.dump(payload, fh, indent=2, sort_keys=True) fh.flush() os.fsync(fh.fileno()) os.replace(tmp_path, path) except Exception: try: os.unlink(tmp_path) except OSError: pass raise 
# Substrings that mark an environment variable name as carrying a secret.
_SENSITIVE_ENV_MARKERS = ("KEY", "TOKEN", "SECRET", "PASSWORD")


def _redact_docker_cmd(cmd: list[str]) -> str:
    """Render *cmd* as one string with secret ``-e``/``--env`` values masked.

    Only ``NAME=value`` arguments directly following ``-e`` or ``--env`` are
    inspected; the value is replaced by ``***`` when NAME contains one of the
    sensitive markers. *cmd* itself is not modified.
    """
    rendered: list[str] = []
    previous = ""
    for part in cmd:
        shown = part
        if previous in ("-e", "--env") and "=" in part:
            name = part.partition("=")[0]
            if any(marker in name.upper() for marker in _SENSITIVE_ENV_MARKERS):
                shown = f"{name}=***"
        rendered.append(shown)
        previous = part
    return " ".join(rendered)


def _run_docker(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run ``docker <args>`` and return the completed process.

    Output is captured as text and the call is limited to 120 seconds.

    Raises:
        RuntimeError: when *check* is true and docker exits non-zero. The
            message carries stderr (or stdout when stderr is empty).
    """
    cmd = ["docker", *args]
    # The command may carry `-e OPENAI_API_KEY=...`; never log or raise it raw.
    printable = _redact_docker_cmd(cmd)
    logger.debug("browser-use docker cmd: %s", printable)
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=120,
    )
    if check and result.returncode != 0:
        stderr = (result.stderr or result.stdout or "").strip()
        raise RuntimeError(f"Docker command failed ({printable}): {stderr}")
    return result


def _ensure_docker_access() -> None:
    """Raise if ``docker version`` fails."""
    _run_docker(["version"], check=True)


def _container_exists(container_name: str) -> bool:
    """Return True when ``docker inspect`` succeeds for *container_name*."""
    result = _run_docker(["inspect", container_name], check=False)
    return result.returncode == 0


def _container_running(container_name: str) -> bool:
    """Return True when the container exists and reports State.Running."""
    result = _run_docker(["inspect", "-f", "{{.State.Running}}", container_name], check=False)
    return result.returncode == 0 and result.stdout.strip().lower() == "true"


def _remove_container(container_name: str) -> None:
    """Force-remove *container_name*; failures and blank names are ignored."""
    if container_name:
        _run_docker(["rm", "-f", container_name], check=False)


def _volume_exists(volume_name: str) -> bool:
    """Return True when ``docker volume inspect`` succeeds for *volume_name*."""
    result = _run_docker(["volume", "inspect", volume_name], check=False)
    return result.returncode == 0


def _ensure_volume(volume_name: str, owner_hash: str) -> None:
    """Create the labelled profile volume unless it already exists."""
    if _volume_exists(volume_name):
        return
    _run_docker(
        [
            "volume",
            "create",
            "--label",
            "browser_use.runtime=true",
            "--label",
            f"browser_use.owner_hash={owner_hash}",
            volume_name,
        ],
        check=True,
    )


def _remove_volume(volume_name: str) -> None:
    """Force-remove *volume_name*; failures and blank names are ignored."""
    if volume_name:
        _run_docker(["volume", "rm", "-f", volume_name], check=False)


def _runtime_env_args(browser_view_url: str, config: BrowserRuntimeConfig) -> list[str]:
    """Build the ``-e NAME=value`` arguments for a runtime container.

    Always sets the UI flag, data dir and RPC host/port. Adds
    ``BROWSER_VIEW_URL`` when known, and forwards ``MODEL_DEFAULT``,
    ``OPENAI_API_KEY`` and ``OPENAI_BASE_URL`` when set in this process.
    """
    env: dict[str, str] = {
        "BROWSER_ENABLE_UI": "true" if config.enable_ui else "false",
        "BROWSER_DATA_DIR": "/data",
        "BROWSER_USE_RPC_HOST": "0.0.0.0",
        "BROWSER_USE_RPC_PORT": "8787",
    }
    if browser_view_url:
        env["BROWSER_VIEW_URL"] = browser_view_url

    for key in ("MODEL_DEFAULT", "OPENAI_API_KEY", "OPENAI_BASE_URL"):
        value = os.getenv(key)
        if value is not None:
            env[key] = value

    args: list[str] = []
    for key, value in env.items():
        args.extend(["-e", f"{key}={value}"])
    return args


def _start_runtime_container(
    container_name: str,
    volume_name: str,
    owner_hash: str,
    browser_view_url: str,
    config: BrowserRuntimeConfig,
) -> None:
    """Start a detached runtime container with its profile volume at /data.

    Raises:
        RuntimeError: when creating the volume or ``docker run`` fails.
    """
    _ensure_volume(volume_name, owner_hash)
    run_args = [
        "run",
        "-d",
        "--name",
        container_name,
        "--network",
        config.runtime_network,
        "--shm-size",
        "2g",
        "--label",
        "browser_use.runtime=true",
        "--label",
        f"browser_use.owner_hash={owner_hash}",
        "--label",
        "browser_use.managed_by=browser_runtime_manager",
        *_runtime_env_args(browser_view_url, config),
        "-v",
        f"{volume_name}:/data",
        config.runtime_image,
    ]
    _run_docker(run_args, check=True)


def _wait_for_runtime(container_name: str, timeout_seconds: int) -> None:
    """Poll the runtime's ``/health`` endpoint until it answers with 2xx.

    Polls roughly once per second with a 2 second request timeout.

    Raises:
        RuntimeError: when no 2xx response arrives within *timeout_seconds*;
            the message carries the last error seen.
    """
    # Monotonic clock: wall-clock adjustments must not shorten or extend the wait.
    deadline = time.monotonic() + timeout_seconds
    health_url = f"http://{container_name}:8787/health"
    last_error = ""
    while time.monotonic() < deadline:
        try:
            with request.urlopen(health_url, timeout=2) as response:
                if 200 <= response.status < 300:
                    return
                last_error = f"HTTP {response.status}"
        except Exception as exc:
            # Connection refused / DNS failure is expected while booting.
            last_error = str(exc)
        time.sleep(1)
    raise RuntimeError(f"Browser runtime {container_name} did not become ready: {last_error}")


def _cleanup_expired_runtimes_locked(registry: dict[str, Any], config: BrowserRuntimeConfig) -> None:
    """Remove runtimes idle for at least the configured TTL from *registry*.

    The ``_locked`` suffix suggests the caller holds ``_REGISTRY_LOCK``; the
    callers visible in this file do. *registry* is mutated in place and not
    saved here. Entries without a ``last_used`` timestamp are never expired.
    Volumes are removed only for ``docker-per-task`` entries.
    """
    now = time.time()
    runtimes = registry.setdefault("runtimes", {})
    expired_keys: list[str] = []
    for runtime_key, entry in list(runtimes.items()):
        last_used = float(entry.get("last_used", 0) or 0)
        if not last_used or now - last_used < config.runtime_ttl_seconds:
            continue
        container_name = str(entry.get("container_name", "") or "")
        volume_name = str(entry.get("volume_name", "") or "")
        mode = str(entry.get("mode", "") or "")
        logger.info("Cleaning expired browser-use runtime %s (%s)", runtime_key, container_name)
        _remove_container(container_name)
        if mode == "docker-per-task":
            _remove_volume(volume_name)
        expired_keys.append(runtime_key)

    for runtime_key in expired_keys:
        runtimes.pop(runtime_key, None)


def ensure_browser_runtime(
    task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> dict[str, str]:
    """Return connection details for the browser runtime serving this request.

    In shared mode the configured shared endpoints are returned without
    touching Docker. Otherwise the owner's container is reused when running,
    or (re)created and awaited, and its registry entry is refreshed.

    Returns:
        A dict with ``cdp_url``, ``rpc_url``, ``browser_view``,
        ``isolation_mode`` and ``owner_hash`` (empty in shared mode).

    Raises:
        RuntimeError: when a docker command fails or the new runtime does not
            become healthy within the configured start timeout.
    """
    config = get_browser_runtime_config()
    if config.mode == "shared":
        return {
            "cdp_url": config.shared_cdp_url,
            "rpc_url": _shared_rpc_url(),
            "browser_view": get_browser_view_url(task_id=task_id, metadata=metadata, thread_id=thread_id),
            "isolation_mode": "shared",
            "owner_hash": "",
        }

    _ensure_docker_access()
    owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
    owner_hash = hash_runtime_owner(owner)
    runtime_key = f"{config.mode}:{owner_hash}"
    container_name = _container_name(owner_hash)
    volume_name = _volume_name(owner_hash)
    browser_view_url = get_browser_view_url(task_id=task_id, metadata=metadata, thread_id=thread_id)

    # NOTE(review): the lock is held for the whole container start and health
    # wait, so concurrent requests for other owners queue behind it.
    with _REGISTRY_LOCK:
        registry = _load_registry()
        _cleanup_expired_runtimes_locked(registry, config)

        if not _container_running(container_name):
            # A stopped leftover with the same name would make `docker run` fail.
            if _container_exists(container_name):
                _remove_container(container_name)
            _start_runtime_container(container_name, volume_name, owner_hash, browser_view_url, config)
            # NOTE(review): if this raises, the container stays up but is not
            # registered; the next call finds it running and reuses it
            # without a health check. Confirm that is intended.
            _wait_for_runtime(container_name, config.runtime_start_timeout)

        # Refresh last_used so the TTL counts from the most recent request.
        registry.setdefault("runtimes", {})[runtime_key] = {
            "container_name": container_name,
            "volume_name": volume_name,
            "last_used": time.time(),
            "mode": config.mode,
            "owner_hash": owner_hash,
        }
        _save_registry(registry)

    return {
        "cdp_url": f"http://{container_name}:9222",
        "rpc_url": _runtime_rpc_url(container_name),
        "browser_view": browser_view_url,
        "isolation_mode": config.mode,
        "owner_hash": owner_hash,
    }


def cleanup_browser_runtime(
    task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> None:
    """Tear down the per-task runtime for this request.

    Does nothing unless the isolation mode is ``docker-per-task``. Removes
    the container and its volume, then drops the registry entry.
    """
    config = get_browser_runtime_config()
    if config.mode != "docker-per-task":
        return

    owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
    owner_hash = hash_runtime_owner(owner)
    runtime_key = f"{config.mode}:{owner_hash}"
    container_name = _container_name(owner_hash)
    volume_name = _volume_name(owner_hash)

    with _REGISTRY_LOCK:
        registry = _load_registry()
        _remove_container(container_name)
        _remove_volume(volume_name)
        registry.setdefault("runtimes", {}).pop(runtime_key, None)
        _save_registry(registry)