BrowserUse_and_ComputerUse_.../hermes_code/tools/browser_use_manager.py

496 lines
15 KiB
Python

"""Provision isolated browser-use Docker runtimes with per-principal profiles."""
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import subprocess
import tempfile
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional
import requests
logger = logging.getLogger(__name__)
_DEFAULT_SHARED_CDP_URL = "http://browser:9222"
_DEFAULT_SHARED_RPC_URL = "http://browser:8787/run"
_DEFAULT_RUNTIME_IMAGE = "hermes-browser-runtime:latest"
_DEFAULT_RUNTIME_NETWORK = "hermes-net"
_DEFAULT_TTL_SECONDS = 900
_DEFAULT_START_TIMEOUT = 45
_DEFAULT_ENABLE_UI = True
_REGISTRY_LOCK = threading.Lock()
_VIEW_URL_CACHE_LOCK = threading.Lock()
_VIEW_URL_CACHE: dict[str, Any] = {"value": "", "expires_at": 0.0}
@dataclass(frozen=True)
class BrowserUseIsolationConfig:
mode: str
runtime_image: str
runtime_network: str
runtime_ttl_seconds: int
runtime_start_timeout: int
shared_cdp_url: str
enable_ui: bool
def _hermes_home() -> Path:
return Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
def _registry_path() -> Path:
return _hermes_home() / "browser_use" / "docker_runtimes.json"
def _load_runtime_config_file() -> Dict[str, Any]:
config_path = _hermes_home() / "config.yaml"
if not config_path.exists():
return {}
try:
import yaml
with open(config_path, "r", encoding="utf-8") as fh:
data = yaml.safe_load(fh) or {}
if isinstance(data, dict):
browser_cfg = data.get("browser", {})
return browser_cfg if isinstance(browser_cfg, dict) else {}
except Exception as exc:
logger.debug("Failed to load browser config for browser-use isolation: %s", exc)
return {}
def _env_or_config(env_name: str, config_key: str, default: Any) -> Any:
value = os.getenv(env_name)
if value not in (None, ""):
return value
return _load_runtime_config_file().get(config_key, default)
def _as_int(value: Any, default: int) -> int:
try:
return max(1, int(value))
except (TypeError, ValueError):
return default
def _as_bool(value: Any, default: bool) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def get_browser_use_isolation_config() -> BrowserUseIsolationConfig:
mode = str(
_env_or_config(
"BROWSER_USE_ISOLATION_MODE",
"browser_use_isolation_mode",
"shared",
)
).strip().lower()
if mode not in {"shared", "docker-per-principal", "docker-per-task"}:
logger.warning("Unknown browser-use isolation mode %r; falling back to shared", mode)
mode = "shared"
return BrowserUseIsolationConfig(
mode=mode,
runtime_image=str(
_env_or_config(
"BROWSER_RUNTIME_IMAGE",
"browser_use_runtime_image",
_DEFAULT_RUNTIME_IMAGE,
)
).strip()
or _DEFAULT_RUNTIME_IMAGE,
runtime_network=str(
_env_or_config(
"BROWSER_RUNTIME_NETWORK",
"browser_use_runtime_network",
_DEFAULT_RUNTIME_NETWORK,
)
).strip(),
runtime_ttl_seconds=_as_int(
_env_or_config(
"BROWSER_RUNTIME_TTL_SECONDS",
"browser_use_runtime_ttl_seconds",
_DEFAULT_TTL_SECONDS,
),
_DEFAULT_TTL_SECONDS,
),
runtime_start_timeout=_as_int(
_env_or_config(
"BROWSER_RUNTIME_START_TIMEOUT",
"browser_use_runtime_start_timeout",
_DEFAULT_START_TIMEOUT,
),
_DEFAULT_START_TIMEOUT,
),
shared_cdp_url=str(os.getenv("BROWSER_URL", _DEFAULT_SHARED_CDP_URL)).strip() or _DEFAULT_SHARED_CDP_URL,
enable_ui=_as_bool(
_env_or_config(
"BROWSER_RUNTIME_ENABLE_UI",
"browser_use_runtime_enable_ui",
_DEFAULT_ENABLE_UI,
),
_DEFAULT_ENABLE_UI,
),
)
def resolve_isolation_owner(mode: str, task_id: Optional[str], honcho_session_key: Optional[str]) -> str:
if mode == "docker-per-task":
return (task_id or "default").strip() or "default"
if honcho_session_key:
return honcho_session_key.strip() or (task_id or "default")
return (task_id or "default").strip() or "default"
def hash_runtime_owner(owner: str) -> str:
return hashlib.sha256(owner.encode("utf-8")).hexdigest()[:16]
def _normalize_browser_view_base_url(raw_url: str) -> str:
url = (raw_url or "").strip()
if not url:
return ""
for marker in ("/vnc.html", "/index.html"):
idx = url.find(marker)
if idx != -1:
url = url[:idx]
break
return url.rstrip("/")
def _discover_browser_view_base_url_from_tunnel() -> str:
now = time.time()
with _VIEW_URL_CACHE_LOCK:
cached_value = str(_VIEW_URL_CACHE.get("value", "") or "")
expires_at = float(_VIEW_URL_CACHE.get("expires_at", 0.0) or 0.0)
if cached_value and now < expires_at:
return cached_value
try:
result = _run_docker(["logs", "--tail", "200", "hermes-tunnel"], check=False)
combined = "\n".join(
part for part in [result.stdout or "", result.stderr or ""] if part
)
matches = re.findall(r"https://[^\s\"'<>]+", combined)
base_url = _normalize_browser_view_base_url(matches[-1]) if matches else ""
except Exception as exc:
logger.debug("Failed to discover browser view URL from hermes-tunnel logs: %s", exc)
base_url = ""
with _VIEW_URL_CACHE_LOCK:
_VIEW_URL_CACHE["value"] = base_url
_VIEW_URL_CACHE["expires_at"] = now + (60 if base_url else 10)
return base_url
def get_browser_use_view_url(
task_id: Optional[str] = None,
honcho_session_key: Optional[str] = None,
) -> str:
base_url = _normalize_browser_view_base_url(
os.getenv("BROWSER_VIEW_BASE_URL", "") or os.getenv("BROWSER_VIEW_URL", "")
)
if not base_url:
base_url = _discover_browser_view_base_url_from_tunnel()
if not base_url:
return ""
config = get_browser_use_isolation_config()
owner = resolve_isolation_owner(config.mode, task_id, honcho_session_key)
owner_hash = hash_runtime_owner(owner)
if config.mode == "shared":
return f"{base_url}/vnc.html?path=websockify"
return f"{base_url}/view/{owner_hash}/vnc.html?path=view/{owner_hash}/websockify"
def _shared_rpc_url() -> str:
return str(os.getenv("BROWSER_USE_RPC_URL", _DEFAULT_SHARED_RPC_URL)).strip() or _DEFAULT_SHARED_RPC_URL
def _runtime_rpc_url(container_name: str) -> str:
return f"http://{container_name}:8787/run"
def _load_registry() -> Dict[str, Any]:
path = _registry_path()
if not path.exists():
return {"runtimes": {}}
try:
with open(path, "r", encoding="utf-8") as fh:
data = json.load(fh) or {}
if isinstance(data, dict) and isinstance(data.get("runtimes"), dict):
return data
except Exception as exc:
logger.warning("Failed to read browser-use runtime registry %s: %s", path, exc)
return {"runtimes": {}}
def _save_registry(payload: Dict[str, Any]) -> None:
path = _registry_path()
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), prefix=".browser_use_", suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(payload, fh, indent=2, sort_keys=True)
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp_path, path)
except Exception:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
def _run_docker(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
cmd = ["docker", *args]
logger.debug("browser-use docker cmd: %s", " ".join(cmd))
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120,
)
if check and result.returncode != 0:
stderr = (result.stderr or result.stdout or "").strip()
raise RuntimeError(f"Docker command failed ({' '.join(cmd)}): {stderr}")
return result
def _ensure_docker_access() -> None:
_run_docker(["version"], check=True)
def _container_exists(container_name: str) -> bool:
result = _run_docker(["inspect", container_name], check=False)
return result.returncode == 0
def _container_running(container_name: str) -> bool:
result = _run_docker(
["inspect", "-f", "{{.State.Running}}", container_name],
check=False,
)
return result.returncode == 0 and result.stdout.strip().lower() == "true"
def _remove_container(container_name: str) -> None:
if not container_name:
return
_run_docker(["rm", "-f", container_name], check=False)
def _volume_exists(volume_name: str) -> bool:
result = _run_docker(["volume", "inspect", volume_name], check=False)
return result.returncode == 0
def _ensure_volume(volume_name: str, owner_hash: str) -> None:
if _volume_exists(volume_name):
return
_run_docker(
[
"volume",
"create",
"--label",
"hermes.browser_use=true",
"--label",
f"hermes.owner_hash={owner_hash}",
volume_name,
],
check=True,
)
def _remove_volume(volume_name: str) -> None:
if not volume_name:
return
_run_docker(["volume", "rm", "-f", volume_name], check=False)
def _start_runtime_container(
container_name: str,
volume_name: str,
owner_hash: str,
config: BrowserUseIsolationConfig,
) -> None:
_ensure_volume(volume_name, owner_hash)
run_args = [
"run",
"-d",
"--name",
container_name,
"--network",
config.runtime_network or _DEFAULT_RUNTIME_NETWORK,
"--shm-size",
"2g",
"--label",
"hermes.browser_use=true",
"--label",
f"hermes.owner_hash={owner_hash}",
"--label",
"hermes.managed_by=browser_use_manager",
"-e",
f"BROWSER_ENABLE_UI={'true' if config.enable_ui else 'false'}",
"-e",
"BROWSER_DATA_DIR=/data",
"-v",
f"{volume_name}:/data",
config.runtime_image,
]
_run_docker(run_args, check=True)
def _wait_for_cdp(container_name: str, timeout_seconds: int) -> None:
deadline = time.time() + timeout_seconds
cdp_url = f"http://{container_name}:9222/json/version"
last_error = ""
while time.time() < deadline:
try:
response = requests.get(cdp_url, timeout=2)
if response.ok:
return
last_error = f"HTTP {response.status_code}"
except Exception as exc:
last_error = str(exc)
time.sleep(1)
raise RuntimeError(f"Browser runtime {container_name} did not become ready: {last_error}")
def _cleanup_expired_runtimes_locked(registry: Dict[str, Any], config: BrowserUseIsolationConfig) -> None:
now = time.time()
runtimes = registry.setdefault("runtimes", {})
expired_keys = []
for runtime_key, entry in list(runtimes.items()):
last_used = float(entry.get("last_used", 0) or 0)
if not last_used or now - last_used < config.runtime_ttl_seconds:
continue
container_name = entry.get("container_name", "")
volume_name = entry.get("volume_name", "")
mode = entry.get("mode", "")
logger.info("Cleaning expired browser-use runtime %s (%s)", runtime_key, container_name)
_remove_container(container_name)
if mode == "docker-per-task":
_remove_volume(volume_name)
expired_keys.append(runtime_key)
for runtime_key in expired_keys:
runtimes.pop(runtime_key, None)
def ensure_isolated_browser_runtime(
task_id: Optional[str] = None,
honcho_session_key: Optional[str] = None,
) -> Dict[str, str]:
config = get_browser_use_isolation_config()
if config.mode == "shared":
return {
"cdp_url": config.shared_cdp_url,
"rpc_url": _shared_rpc_url(),
"browser_view": get_browser_use_view_url(
task_id=task_id,
honcho_session_key=honcho_session_key,
),
"isolation_mode": "shared",
"owner": "",
"owner_hash": "",
}
_ensure_docker_access()
owner = resolve_isolation_owner(config.mode, task_id, honcho_session_key)
owner_hash = hash_runtime_owner(owner)
runtime_key = f"{config.mode}:{owner_hash}"
container_name = f"hermes-browser-{owner_hash}"
volume_name = f"hermes-browser-profile-{owner_hash}"
with _REGISTRY_LOCK:
registry = _load_registry()
_cleanup_expired_runtimes_locked(registry, config)
if _container_running(container_name):
registry.setdefault("runtimes", {})[runtime_key] = {
"container_name": container_name,
"volume_name": volume_name,
"last_used": time.time(),
"mode": config.mode,
"owner_hash": owner_hash,
}
_save_registry(registry)
return {
"cdp_url": f"http://{container_name}:9222",
"rpc_url": _runtime_rpc_url(container_name),
"browser_view": get_browser_use_view_url(
task_id=task_id,
honcho_session_key=honcho_session_key,
),
"isolation_mode": config.mode,
"owner": owner,
"owner_hash": owner_hash,
}
if _container_exists(container_name):
_remove_container(container_name)
_start_runtime_container(container_name, volume_name, owner_hash, config)
_wait_for_cdp(container_name, config.runtime_start_timeout)
registry.setdefault("runtimes", {})[runtime_key] = {
"container_name": container_name,
"volume_name": volume_name,
"last_used": time.time(),
"mode": config.mode,
"owner_hash": owner_hash,
}
_save_registry(registry)
return {
"cdp_url": f"http://{container_name}:9222",
"rpc_url": _runtime_rpc_url(container_name),
"browser_view": get_browser_use_view_url(
task_id=task_id,
honcho_session_key=honcho_session_key,
),
"isolation_mode": config.mode,
"owner": owner,
"owner_hash": owner_hash,
}
def cleanup_browser_use_runtime(
task_id: Optional[str] = None,
honcho_session_key: Optional[str] = None,
) -> None:
config = get_browser_use_isolation_config()
if config.mode != "docker-per-task":
return
owner = resolve_isolation_owner(config.mode, task_id, honcho_session_key)
owner_hash = hash_runtime_owner(owner)
runtime_key = f"{config.mode}:{owner_hash}"
container_name = f"hermes-browser-{owner_hash}"
volume_name = f"hermes-browser-profile-{owner_hash}"
with _REGISTRY_LOCK:
registry = _load_registry()
_remove_container(container_name)
_remove_volume(volume_name)
registry.setdefault("runtimes", {}).pop(runtime_key, None)
_save_registry(registry)