BrowserUse_and_ComputerUse_.../api/services/browser_runtime_manager.py

464 lines
15 KiB
Python

"""Provision isolated browser-use Docker runtimes for API runs."""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import subprocess
import tempfile
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib import request
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Fallback endpoints of the shared browser service, used when BROWSER_URL /
# BROWSER_USE_RPC_URL are unset or blank.
_DEFAULT_SHARED_CDP_URL = "http://browser:9222"
_DEFAULT_SHARED_RPC_URL = "http://browser:8787/run"
# Fallback Docker image and network for the per-owner runtime containers.
_DEFAULT_RUNTIME_IMAGE = "browser-use-browser-runtime:latest"
_DEFAULT_RUNTIME_NETWORK = "browser-net"
# Fallback idle TTL of a runtime and start-up (health check) timeout, in seconds.
_DEFAULT_TTL_SECONDS = 900
_DEFAULT_START_TIMEOUT = 45
# Whether runtimes are started with their UI enabled when
# BROWSER_RUNTIME_ENABLE_UI is not set.
_DEFAULT_ENABLE_UI = True
# Held around load/modify/save cycles on the runtime registry file. This is a
# threading.Lock, so it only coordinates threads inside this process.
_REGISTRY_LOCK = threading.Lock()
# Guards reads and writes of _VIEW_URL_CACHE.
_VIEW_URL_CACHE_LOCK = threading.Lock()
# Cache for the view URL discovered from the tunnel logs: "value" is the last
# result ("" when nothing was found) and "expires_at" is the time.time()
# timestamp until which a non-empty value is reused.
_VIEW_URL_CACHE: dict[str, Any] = {"value": "", "expires_at": 0.0}
@dataclass(frozen=True)
class BrowserRuntimeConfig:
    """Immutable snapshot of the browser-runtime settings.

    Instances are produced by ``get_browser_runtime_config`` from environment
    variables; being frozen, fields cannot be reassigned after construction.
    """

    # Isolation mode: "shared", "docker-per-principal" or "docker-per-task".
    mode: str
    # Docker image used for per-owner runtime containers.
    runtime_image: str
    # Docker network the runtime containers are attached to.
    runtime_network: str
    # Idle time in seconds after which a registered runtime is cleaned up.
    runtime_ttl_seconds: int
    # Seconds to wait for a freshly started runtime to pass its health check.
    runtime_start_timeout: int
    # CDP endpoint returned to callers in "shared" mode.
    shared_cdp_url: str
    # Whether runtime containers are started with BROWSER_ENABLE_UI=true.
    enable_ui: bool
def _state_dir() -> Path:
    """Return the directory that holds this module's on-disk state.

    The location comes from ``BROWSER_RUNTIME_STATE_DIR`` and falls back to
    ``/tmp/browser-use-api`` when that variable is not set.
    """
    configured = os.environ.get("BROWSER_RUNTIME_STATE_DIR", "/tmp/browser-use-api")
    return Path(configured)
def _registry_path() -> Path:
    """Return the path of the JSON runtime registry inside the state directory."""
    return _state_dir().joinpath("docker_runtimes.json")
def _as_int(value: Any, default: int) -> int:
    """Coerce *value* to an integer of at least 1.

    Returns *default* unchanged when ``int(value)`` raises ``TypeError`` or
    ``ValueError`` (for example ``None`` or a non-numeric string).
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        return default
    # Clamp to a minimum of 1, mirroring max(1, parsed).
    return parsed if parsed > 1 else 1
def _as_bool(value: Any, default: bool) -> bool:
    """Interpret *value* as a boolean flag.

    ``None`` yields *default* and real booleans are returned as-is. Anything
    else is stringified, trimmed and lower-cased, and is true only when the
    result is one of ``1``, ``true``, ``yes`` or ``on``.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    normalized = str(value).strip().lower()
    return normalized in ("1", "true", "yes", "on")
def get_browser_runtime_config() -> BrowserRuntimeConfig:
    """Build a ``BrowserRuntimeConfig`` from the process environment.

    ``BROWSER_USE_ISOLATION_MODE`` selects the mode; any value other than
    ``shared``, ``docker-per-principal`` or ``docker-per-task`` is logged and
    replaced by ``shared``. String settings that are unset or blank use the
    module defaults, integer settings go through ``_as_int`` and the UI flag
    through ``_as_bool``.
    """

    def _text_setting(name: str, fallback: str) -> str:
        # A blank or whitespace-only variable counts as unset.
        return os.getenv(name, fallback).strip() or fallback

    supported_modes = ("shared", "docker-per-principal", "docker-per-task")
    mode = str(os.getenv("BROWSER_USE_ISOLATION_MODE", "shared")).strip().lower()
    if mode not in supported_modes:
        logger.warning("Unknown browser-use isolation mode %r; falling back to shared", mode)
        mode = "shared"

    ttl_seconds = _as_int(os.getenv("BROWSER_RUNTIME_TTL_SECONDS"), _DEFAULT_TTL_SECONDS)
    start_timeout = _as_int(os.getenv("BROWSER_RUNTIME_START_TIMEOUT"), _DEFAULT_START_TIMEOUT)
    ui_enabled = _as_bool(os.getenv("BROWSER_RUNTIME_ENABLE_UI"), _DEFAULT_ENABLE_UI)

    return BrowserRuntimeConfig(
        mode=mode,
        runtime_image=_text_setting("BROWSER_RUNTIME_IMAGE", _DEFAULT_RUNTIME_IMAGE),
        runtime_network=_text_setting("BROWSER_RUNTIME_NETWORK", _DEFAULT_RUNTIME_NETWORK),
        runtime_ttl_seconds=ttl_seconds,
        runtime_start_timeout=start_timeout,
        shared_cdp_url=_text_setting("BROWSER_URL", _DEFAULT_SHARED_CDP_URL),
        enable_ui=ui_enabled,
    )
def resolve_isolation_owner(
    mode: str,
    task_id: str | None,
    metadata: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> str:
    """Pick the identifier that owns an isolated browser runtime.

    Args:
        mode: Isolation mode. In ``docker-per-task`` mode only *task_id* is
            considered.
        task_id: Identifier of the current task, if any.
        metadata: Optional request metadata; its ``user_id`` and
            ``session_id`` entries are consulted, in that order.
        thread_id: Identifier of the conversation thread, if any.

    Returns:
        The first usable identifier, stripped of surrounding whitespace, in
        the order ``user_id``, ``session_id``, *thread_id*, *task_id*; or
        ``"default"`` when none is usable.
    """

    def _clean(value: Any) -> str:
        return "" if value is None else str(value).strip()

    if mode == "docker-per-task":
        return _clean(task_id) or "default"

    metadata = metadata or {}
    candidates = (
        metadata.get("user_id"),
        metadata.get("session_id"),
        thread_id,
        task_id,
    )
    for candidate in candidates:
        owner = _clean(candidate)
        # A blank or whitespace-only identifier must not collapse the caller
        # into the shared "default" owner while a later identifier is usable,
        # otherwise unrelated callers would share one runtime.
        if owner:
            return owner
    return "default"
def hash_runtime_owner(owner: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of *owner* (UTF-8)."""
    digest = hashlib.sha256()
    digest.update(owner.encode("utf-8"))
    return digest.hexdigest()[:16]
def _normalize_browser_view_base_url(raw_url: str) -> str:
    """Reduce a browser-view URL to its base, without page or trailing slash.

    The input is trimmed. If it contains ``/vnc.html`` the URL is cut at the
    first occurrence; otherwise the same is tried with ``/index.html``.
    Trailing slashes are then removed. Empty or ``None`` input yields ``""``.
    """
    cleaned = (raw_url or "").strip()
    if not cleaned:
        return ""
    for page in ("/vnc.html", "/index.html"):
        head, found, _tail = cleaned.partition(page)
        if found:
            cleaned = head
            break
    return cleaned.rstrip("/")
def _discover_browser_view_base_url_from_tunnel() -> str:
    """Find the public browser-view base URL in the tunnel container's logs.

    A non-empty cached value is returned while it has not expired. Otherwise
    the last 200 log lines of the ``browser-use-tunnel`` container are
    scanned and the last ``https://`` URL found is normalized and returned.
    The outcome is cached for 60 seconds when a URL was found and for 10
    seconds when not. Any failure is logged at debug level and treated as
    "nothing found".
    """
    started = time.time()
    with _VIEW_URL_CACHE_LOCK:
        cached_url = str(_VIEW_URL_CACHE.get("value", "") or "")
        valid_until = float(_VIEW_URL_CACHE.get("expires_at", 0.0) or 0.0)
    if cached_url and started < valid_until:
        return cached_url

    discovered = ""
    try:
        logs = _run_docker(["logs", "--tail", "200", "browser-use-tunnel"], check=False)
        # Both streams are searched; empty ones are left out of the join.
        streams = [logs.stdout or "", logs.stderr or ""]
        text = "\n".join(filter(None, streams))
        urls = re.findall(r"https://[^\s\"'<>]+", text)
        if urls:
            discovered = _normalize_browser_view_base_url(urls[-1])
    except Exception as exc:
        logger.debug("Failed to discover browser view URL from tunnel logs: %s", exc)
        discovered = ""

    lifetime = 60 if discovered else 10
    with _VIEW_URL_CACHE_LOCK:
        _VIEW_URL_CACHE["value"] = discovered
        _VIEW_URL_CACHE["expires_at"] = started + lifetime
    return discovered
def get_browser_view_url(
    task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> str:
    """Return the URL at which the caller's browser can be viewed.

    The base URL comes from ``BROWSER_VIEW_BASE_URL``, then
    ``BROWSER_VIEW_URL``, then discovery from the tunnel logs; ``""`` is
    returned when none yields a value. In ``shared`` mode the URL points at
    the single shared viewer; otherwise it embeds the hash of the owner
    resolved from *task_id*, *metadata* and *thread_id*.
    """
    configured = os.getenv("BROWSER_VIEW_BASE_URL", "") or os.getenv("BROWSER_VIEW_URL", "")
    base_url = (
        _normalize_browser_view_base_url(configured)
        or _discover_browser_view_base_url_from_tunnel()
    )
    if not base_url:
        return ""

    config = get_browser_runtime_config()
    if config.mode == "shared":
        return f"{base_url}/vnc.html?path=websockify"

    owner_hash = hash_runtime_owner(
        resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
    )
    return f"{base_url}/view/{owner_hash}/vnc.html?path=view/{owner_hash}/websockify"
def _shared_rpc_url() -> str:
    """Return the RPC endpoint of the shared browser service.

    Reads ``BROWSER_USE_RPC_URL``; an unset or blank value falls back to the
    module default.
    """
    configured = os.getenv("BROWSER_USE_RPC_URL", _DEFAULT_SHARED_RPC_URL).strip()
    if configured:
        return configured
    return _DEFAULT_SHARED_RPC_URL
def _runtime_rpc_url(container_name: str) -> str:
    """Return the RPC endpoint (port 8787, path ``/run``) of a runtime container."""
    return "http://{}:8787/run".format(container_name)
def _container_name(owner_hash: str) -> str:
    """Return the Docker container name used for the given owner hash."""
    return "browser-use-browser-{}".format(owner_hash)
def _volume_name(owner_hash: str) -> str:
    """Return the Docker volume name holding the given owner's browser profile."""
    return "browser-use-profile-{}".format(owner_hash)
def _load_registry() -> dict[str, Any]:
    """Load the runtime registry from disk.

    Returns the parsed JSON document when it is a dict whose ``"runtimes"``
    entry is itself a dict. A missing file, a read/parse failure (logged as a
    warning) or any other shape yields a fresh ``{"runtimes": {}}``.
    """
    registry_file = _registry_path()
    if registry_file.exists():
        try:
            with registry_file.open("r", encoding="utf-8") as handle:
                loaded = json.load(handle) or {}
        except Exception as exc:
            logger.warning("Failed to read browser-use runtime registry %s: %s", registry_file, exc)
        else:
            if isinstance(loaded, dict) and isinstance(loaded.get("runtimes"), dict):
                return loaded
    return {"runtimes": {}}
def _save_registry(payload: dict[str, Any]) -> None:
    """Write *payload* to the registry file via a temporary file.

    The JSON is written to a temporary file in the registry's directory,
    flushed and fsynced, then moved over the registry path with
    ``os.replace``. If anything fails, the temporary file is removed on a
    best-effort basis and the original exception propagates.
    """
    target = _registry_path()
    directory = target.parent
    directory.mkdir(parents=True, exist_ok=True)
    handle_fd, scratch_path = tempfile.mkstemp(
        prefix=".browser_use_",
        suffix=".tmp",
        dir=str(directory),
    )
    try:
        with os.fdopen(handle_fd, "w", encoding="utf-8") as out:
            json.dump(payload, out, indent=2, sort_keys=True)
            # Push the data to disk before the rename makes it visible.
            out.flush()
            os.fsync(out.fileno())
        os.replace(scratch_path, target)
    except Exception:
        try:
            os.unlink(scratch_path)
        except OSError:
            # The temporary file may already be gone; nothing left to clean.
            pass
        raise
def _run_docker(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run ``docker <args>`` and return the completed process.

    Output is captured as text and the call is limited to 120 seconds.

    Args:
        args: Arguments passed to the ``docker`` executable.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        The ``subprocess.CompletedProcess`` of the invocation.

    Raises:
        RuntimeError: If *check* is true and docker exits with a non-zero
            status. The message carries stderr (or stdout when stderr is
            empty).
        subprocess.TimeoutExpired: If the command exceeds the 120 s timeout.
        OSError: If the ``docker`` executable cannot be started.
    """
    cmd = ["docker", *args]
    # Environment values passed with -e/--env can be secrets (for example
    # OPENAI_API_KEY), so only a redacted rendering is logged or put into
    # exception messages.
    printable = _redact_docker_command(cmd)
    logger.debug("browser-use docker cmd: %s", printable)
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=120,
    )
    if check and result.returncode != 0:
        stderr = (result.stderr or result.stdout or "").strip()
        raise RuntimeError(f"Docker command failed ({printable}): {stderr}")
    return result


def _redact_docker_command(cmd: list[str]) -> str:
    """Render *cmd* for logs with the values of ``-e``/``--env`` options masked."""
    rendered: list[str] = []
    mask_next = False
    for arg in cmd:
        if mask_next:
            # "-e NAME=value" becomes "-e NAME=***"; a bare "-e NAME" is kept.
            name, sep, _value = arg.partition("=")
            rendered.append(f"{name}=***" if sep else arg)
            mask_next = False
        elif arg in ("-e", "--env"):
            rendered.append(arg)
            mask_next = True
        elif arg.startswith("--env="):
            name, sep, _value = arg[len("--env="):].partition("=")
            rendered.append(f"--env={name}=***" if sep else arg)
        else:
            rendered.append(arg)
    return " ".join(rendered)
def _ensure_docker_access() -> None:
    """Verify Docker is usable by running ``docker version``.

    Raises whatever ``_run_docker`` raises in checking mode when the command
    fails.
    """
    version_args = ["version"]
    _run_docker(version_args, check=True)
def _container_exists(container_name: str) -> bool:
    """Return whether ``docker inspect <container_name>`` exits successfully."""
    probe = _run_docker(["inspect", container_name], check=False)
    exit_status = probe.returncode
    return exit_status == 0
def _container_running(container_name: str) -> bool:
    """Return whether Docker reports the container's ``State.Running`` as true.

    A failing ``docker inspect`` (for example an unknown container) counts as
    not running.
    """
    probe = _run_docker(
        ["inspect", "-f", "{{.State.Running}}", container_name],
        check=False,
    )
    if probe.returncode != 0:
        return False
    reported_state = probe.stdout.strip().lower()
    return reported_state == "true"
def _remove_container(container_name: str) -> None:
    """Force-remove the named container; a failing ``docker rm`` is ignored.

    Does nothing when *container_name* is empty.
    """
    if not container_name:
        return
    _run_docker(["rm", "-f", container_name], check=False)
def _volume_exists(volume_name: str) -> bool:
    """Return whether ``docker volume inspect <volume_name>`` exits successfully."""
    probe = _run_docker(["volume", "inspect", volume_name], check=False)
    exit_status = probe.returncode
    return exit_status == 0
def _ensure_volume(volume_name: str, owner_hash: str) -> None:
    """Create the named Docker volume unless it already exists.

    A new volume is labelled ``browser_use.runtime=true`` and
    ``browser_use.owner_hash=<owner_hash>``. Creation runs in checking mode,
    so a failing ``docker volume create`` raises.
    """
    if not _volume_exists(volume_name):
        labels = (
            "browser_use.runtime=true",
            f"browser_use.owner_hash={owner_hash}",
        )
        label_args = [token for label in labels for token in ("--label", label)]
        _run_docker(["volume", "create", *label_args, volume_name], check=True)
def _remove_volume(volume_name: str) -> None:
    """Force-remove the named volume; a failing ``docker volume rm`` is ignored.

    Does nothing when *volume_name* is empty.
    """
    if not volume_name:
        return
    _run_docker(["volume", "rm", "-f", volume_name], check=False)
def _runtime_env_args(browser_view_url: str, config: BrowserRuntimeConfig) -> list[str]:
    """Build the ``-e NAME=value`` arguments for a runtime container.

    Always sets the UI flag (from ``config.enable_ui``), the data directory
    and the RPC host/port. ``BROWSER_VIEW_URL`` is added when
    *browser_view_url* is non-empty. ``MODEL_DEFAULT``, ``OPENAI_API_KEY`` and
    ``OPENAI_BASE_URL`` are forwarded from this process when they are set.
    """
    ui_flag = "true" if config.enable_ui else "false"
    pairs: list[tuple[str, str]] = [
        ("BROWSER_ENABLE_UI", ui_flag),
        ("BROWSER_DATA_DIR", "/data"),
        ("BROWSER_USE_RPC_HOST", "0.0.0.0"),
        ("BROWSER_USE_RPC_PORT", "8787"),
    ]
    if browser_view_url:
        pairs.append(("BROWSER_VIEW_URL", browser_view_url))
    # Forward model settings only when present in this process's environment.
    forwarded = ("MODEL_DEFAULT", "OPENAI_API_KEY", "OPENAI_BASE_URL")
    pairs.extend((name, os.environ[name]) for name in forwarded if name in os.environ)
    return [token for name, value in pairs for token in ("-e", f"{name}={value}")]
def _start_runtime_container(
    container_name: str,
    volume_name: str,
    owner_hash: str,
    browser_view_url: str,
    config: BrowserRuntimeConfig,
) -> None:
    """Start a detached runtime container for one owner.

    Makes sure the profile volume exists, then runs ``docker run -d`` with
    the given name on ``config.runtime_network``, a 2g shared-memory size,
    the manager's labels, the environment from ``_runtime_env_args`` and the
    volume mounted at ``/data``, using ``config.runtime_image``. The run is in
    checking mode, so a failing ``docker run`` raises.
    """
    _ensure_volume(volume_name, owner_hash)

    labels = (
        "browser_use.runtime=true",
        f"browser_use.owner_hash={owner_hash}",
        "browser_use.managed_by=browser_runtime_manager",
    )
    label_args = [token for label in labels for token in ("--label", label)]
    env_args = _runtime_env_args(browser_view_url, config)

    command = ["run", "-d", "--name", container_name]
    command += ["--network", config.runtime_network]
    command += ["--shm-size", "2g"]
    command += label_args
    command += env_args
    command += ["-v", f"{volume_name}:/data"]
    command.append(config.runtime_image)
    _run_docker(command, check=True)
def _wait_for_runtime(container_name: str, timeout_seconds: int) -> None:
    """Block until the runtime's health endpoint answers with a 2xx status.

    Polls ``http://<container_name>:8787/health`` about once per second, with
    a 2 second timeout per request.

    Args:
        container_name: Host name of the runtime container.
        timeout_seconds: Overall time budget. With a budget of zero or less
            no request is made.

    Raises:
        RuntimeError: If the endpoint did not answer with a 2xx status within
            the budget. The message includes the last observed error.
    """
    # A monotonic clock keeps the budget correct even if the wall clock is
    # adjusted while waiting.
    deadline = time.monotonic() + timeout_seconds
    health_url = f"http://{container_name}:8787/health"
    last_error = ""
    while time.monotonic() < deadline:
        try:
            with request.urlopen(health_url, timeout=2) as response:
                if 200 <= response.status < 300:
                    return
                last_error = f"HTTP {response.status}"
        except Exception as exc:
            # Connection errors are expected while the container boots.
            last_error = str(exc)
        # Never sleep past the deadline.
        pause = min(1.0, deadline - time.monotonic())
        if pause > 0:
            time.sleep(pause)
    raise RuntimeError(f"Browser runtime {container_name} did not become ready: {last_error}")
def _cleanup_expired_runtimes_locked(registry: dict[str, Any], config: BrowserRuntimeConfig) -> None:
    """Remove runtimes whose idle time reached the configured TTL.

    Mutates *registry* in place and does not save it. The ``_locked`` suffix
    suggests the caller is expected to hold ``_REGISTRY_LOCK``.

    For each expired entry the container is force-removed; the volume is
    removed too when the entry's mode is ``docker-per-task``. Entries without
    a usable ``last_used`` timestamp are kept. Entries that are not dicts
    carry nothing to clean up and are dropped with a warning.

    Args:
        registry: Registry document; a ``"runtimes"`` dict is created when
            missing.
        config: Supplies ``runtime_ttl_seconds``.
    """
    now = time.time()
    runtimes = registry.setdefault("runtimes", {})
    for runtime_key, entry in list(runtimes.items()):
        if not isinstance(entry, dict):
            # A hand-edited or corrupted registry must not break every run.
            logger.warning("Dropping malformed browser-use runtime registry entry %s", runtime_key)
            del runtimes[runtime_key]
            continue
        try:
            last_used = float(entry.get("last_used", 0) or 0)
        except (TypeError, ValueError):
            # An unparsable timestamp is treated like a missing one.
            last_used = 0.0
        if not last_used or now - last_used < config.runtime_ttl_seconds:
            continue
        container_name = str(entry.get("container_name", "") or "")
        volume_name = str(entry.get("volume_name", "") or "")
        mode = str(entry.get("mode", "") or "")
        logger.info("Cleaning expired browser-use runtime %s (%s)", runtime_key, container_name)
        _remove_container(container_name)
        if mode == "docker-per-task":
            _remove_volume(volume_name)
        # Iteration runs over a snapshot, so deleting here is safe.
        del runtimes[runtime_key]
def ensure_browser_runtime(
    task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> dict[str, str]:
    """Return connection details for the caller's browser runtime.

    In ``shared`` mode the shared endpoints are returned and Docker is not
    touched. Otherwise the owner is resolved from *task_id*, *metadata* and
    *thread_id*; expired runtimes are cleaned up; the owner's container is
    reused when it is running, or (re)created and waited for when it is not;
    and the registry entry's ``last_used`` timestamp is refreshed and saved.

    Returns:
        A dict with the keys ``cdp_url``, ``rpc_url``, ``browser_view``,
        ``isolation_mode`` and ``owner_hash`` (empty in ``shared`` mode).

    Raises:
        RuntimeError: If Docker is not usable, the container cannot be
            started, or the runtime does not become ready in time.
    """
    config = get_browser_runtime_config()
    if config.mode == "shared":
        return {
            "cdp_url": config.shared_cdp_url,
            "rpc_url": _shared_rpc_url(),
            "browser_view": get_browser_view_url(task_id=task_id, metadata=metadata, thread_id=thread_id),
            "isolation_mode": "shared",
            "owner_hash": "",
        }

    _ensure_docker_access()
    owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
    owner_hash = hash_runtime_owner(owner)
    runtime_key = f"{config.mode}:{owner_hash}"
    container_name = _container_name(owner_hash)
    volume_name = _volume_name(owner_hash)
    browser_view_url = get_browser_view_url(task_id=task_id, metadata=metadata, thread_id=thread_id)

    with _REGISTRY_LOCK:
        registry = _load_registry()
        _cleanup_expired_runtimes_locked(registry, config)
        if not _container_running(container_name):
            # A stopped or half-created container blocks the name; remove it.
            if _container_exists(container_name):
                _remove_container(container_name)
            try:
                _start_runtime_container(container_name, volume_name, owner_hash, browser_view_url, config)
                _wait_for_runtime(container_name, config.runtime_start_timeout)
            except Exception:
                # The container is not registered yet, so TTL cleanup would
                # never reap it, and a later call could reuse it without a
                # health check. Remove it before reporting the failure.
                logger.warning("Browser runtime %s failed to start; removing it", container_name)
                try:
                    _remove_container(container_name)
                except Exception:
                    logger.exception("Failed to remove browser runtime %s after failed start", container_name)
                raise
        registry.setdefault("runtimes", {})[runtime_key] = {
            "container_name": container_name,
            "volume_name": volume_name,
            "last_used": time.time(),
            "mode": config.mode,
            "owner_hash": owner_hash,
        }
        _save_registry(registry)

    return {
        "cdp_url": f"http://{container_name}:9222",
        "rpc_url": _runtime_rpc_url(container_name),
        "browser_view": browser_view_url,
        "isolation_mode": config.mode,
        "owner_hash": owner_hash,
    }
def cleanup_browser_runtime(
    task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    thread_id: str | None = None,
) -> None:
    """Tear down the runtime of a finished task.

    Only acts in ``docker-per-task`` mode: the owner's container and volume
    are force-removed and its registry entry is deleted and saved. In every
    other mode this is a no-op.
    """
    config = get_browser_runtime_config()
    if config.mode == "docker-per-task":
        owner_hash = hash_runtime_owner(
            resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
        )
        registry_key = f"{config.mode}:{owner_hash}"
        container = _container_name(owner_hash)
        volume = _volume_name(owner_hash)
        with _REGISTRY_LOCK:
            registry = _load_registry()
            _remove_container(container)
            _remove_volume(volume)
            registry.setdefault("runtimes", {}).pop(registry_key, None)
            _save_registry(registry)