Integrate per-user browser runtimes into subagent API
This commit is contained in:
parent
952b2e7d17
commit
280247e1e5
11 changed files with 777 additions and 21 deletions
464
api/services/browser_runtime_manager.py
Normal file
464
api/services/browser_runtime_manager.py
Normal file
|
|
@ -0,0 +1,464 @@
|
|||
"""Provision isolated browser-use Docker runtimes for API runs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib import request
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_SHARED_CDP_URL = "http://browser:9222"
|
||||
_DEFAULT_SHARED_RPC_URL = "http://browser:8787/run"
|
||||
_DEFAULT_RUNTIME_IMAGE = "browser-use-browser-runtime:latest"
|
||||
_DEFAULT_RUNTIME_NETWORK = "browser-net"
|
||||
_DEFAULT_TTL_SECONDS = 900
|
||||
_DEFAULT_START_TIMEOUT = 45
|
||||
_DEFAULT_ENABLE_UI = True
|
||||
_REGISTRY_LOCK = threading.Lock()
|
||||
_VIEW_URL_CACHE_LOCK = threading.Lock()
|
||||
_VIEW_URL_CACHE: dict[str, Any] = {"value": "", "expires_at": 0.0}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BrowserRuntimeConfig:
|
||||
mode: str
|
||||
runtime_image: str
|
||||
runtime_network: str
|
||||
runtime_ttl_seconds: int
|
||||
runtime_start_timeout: int
|
||||
shared_cdp_url: str
|
||||
enable_ui: bool
|
||||
|
||||
|
||||
def _state_dir() -> Path:
|
||||
return Path(os.getenv("BROWSER_RUNTIME_STATE_DIR", "/tmp/browser-use-api"))
|
||||
|
||||
|
||||
def _registry_path() -> Path:
|
||||
return _state_dir() / "docker_runtimes.json"
|
||||
|
||||
|
||||
def _as_int(value: Any, default: int) -> int:
|
||||
try:
|
||||
return max(1, int(value))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
def _as_bool(value: Any, default: bool) -> bool:
|
||||
if value is None:
|
||||
return default
|
||||
if isinstance(value, bool):
|
||||
return value
|
||||
return str(value).strip().lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def get_browser_runtime_config() -> BrowserRuntimeConfig:
|
||||
mode = str(os.getenv("BROWSER_USE_ISOLATION_MODE", "shared")).strip().lower()
|
||||
if mode not in {"shared", "docker-per-principal", "docker-per-task"}:
|
||||
logger.warning("Unknown browser-use isolation mode %r; falling back to shared", mode)
|
||||
mode = "shared"
|
||||
|
||||
return BrowserRuntimeConfig(
|
||||
mode=mode,
|
||||
runtime_image=os.getenv("BROWSER_RUNTIME_IMAGE", _DEFAULT_RUNTIME_IMAGE).strip()
|
||||
or _DEFAULT_RUNTIME_IMAGE,
|
||||
runtime_network=os.getenv("BROWSER_RUNTIME_NETWORK", _DEFAULT_RUNTIME_NETWORK).strip()
|
||||
or _DEFAULT_RUNTIME_NETWORK,
|
||||
runtime_ttl_seconds=_as_int(
|
||||
os.getenv("BROWSER_RUNTIME_TTL_SECONDS"),
|
||||
_DEFAULT_TTL_SECONDS,
|
||||
),
|
||||
runtime_start_timeout=_as_int(
|
||||
os.getenv("BROWSER_RUNTIME_START_TIMEOUT"),
|
||||
_DEFAULT_START_TIMEOUT,
|
||||
),
|
||||
shared_cdp_url=os.getenv("BROWSER_URL", _DEFAULT_SHARED_CDP_URL).strip()
|
||||
or _DEFAULT_SHARED_CDP_URL,
|
||||
enable_ui=_as_bool(
|
||||
os.getenv("BROWSER_RUNTIME_ENABLE_UI"),
|
||||
_DEFAULT_ENABLE_UI,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def resolve_isolation_owner(
|
||||
mode: str,
|
||||
task_id: str | None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
thread_id: str | None = None,
|
||||
) -> str:
|
||||
if mode == "docker-per-task":
|
||||
return (task_id or "default").strip() or "default"
|
||||
|
||||
metadata = metadata or {}
|
||||
for key in ("user_id", "session_id"):
|
||||
value = metadata.get(key)
|
||||
if value not in (None, ""):
|
||||
return str(value).strip() or "default"
|
||||
|
||||
return (thread_id or task_id or "default").strip() or "default"
|
||||
|
||||
|
||||
def hash_runtime_owner(owner: str) -> str:
|
||||
return hashlib.sha256(owner.encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
|
||||
def _normalize_browser_view_base_url(raw_url: str) -> str:
|
||||
url = (raw_url or "").strip()
|
||||
if not url:
|
||||
return ""
|
||||
for marker in ("/vnc.html", "/index.html"):
|
||||
idx = url.find(marker)
|
||||
if idx != -1:
|
||||
url = url[:idx]
|
||||
break
|
||||
return url.rstrip("/")
|
||||
|
||||
|
||||
def _discover_browser_view_base_url_from_tunnel() -> str:
|
||||
now = time.time()
|
||||
with _VIEW_URL_CACHE_LOCK:
|
||||
cached_value = str(_VIEW_URL_CACHE.get("value", "") or "")
|
||||
expires_at = float(_VIEW_URL_CACHE.get("expires_at", 0.0) or 0.0)
|
||||
if cached_value and now < expires_at:
|
||||
return cached_value
|
||||
|
||||
try:
|
||||
result = _run_docker(["logs", "--tail", "200", "browser-use-tunnel"], check=False)
|
||||
combined = "\n".join(part for part in [result.stdout or "", result.stderr or ""] if part)
|
||||
matches = re.findall(r"https://[^\s\"'<>]+", combined)
|
||||
base_url = _normalize_browser_view_base_url(matches[-1]) if matches else ""
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to discover browser view URL from tunnel logs: %s", exc)
|
||||
base_url = ""
|
||||
|
||||
with _VIEW_URL_CACHE_LOCK:
|
||||
_VIEW_URL_CACHE["value"] = base_url
|
||||
_VIEW_URL_CACHE["expires_at"] = now + (60 if base_url else 10)
|
||||
|
||||
return base_url
|
||||
|
||||
|
||||
def get_browser_view_url(
|
||||
task_id: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
thread_id: str | None = None,
|
||||
) -> str:
|
||||
base_url = _normalize_browser_view_base_url(
|
||||
os.getenv("BROWSER_VIEW_BASE_URL", "") or os.getenv("BROWSER_VIEW_URL", "")
|
||||
)
|
||||
if not base_url:
|
||||
base_url = _discover_browser_view_base_url_from_tunnel()
|
||||
if not base_url:
|
||||
return ""
|
||||
|
||||
config = get_browser_runtime_config()
|
||||
if config.mode == "shared":
|
||||
return f"{base_url}/vnc.html?path=websockify"
|
||||
|
||||
owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
|
||||
owner_hash = hash_runtime_owner(owner)
|
||||
return f"{base_url}/view/{owner_hash}/vnc.html?path=view/{owner_hash}/websockify"
|
||||
|
||||
|
||||
def _shared_rpc_url() -> str:
|
||||
return os.getenv("BROWSER_USE_RPC_URL", _DEFAULT_SHARED_RPC_URL).strip() or _DEFAULT_SHARED_RPC_URL
|
||||
|
||||
|
||||
def _runtime_rpc_url(container_name: str) -> str:
|
||||
return f"http://{container_name}:8787/run"
|
||||
|
||||
|
||||
def _container_name(owner_hash: str) -> str:
|
||||
return f"browser-use-browser-{owner_hash}"
|
||||
|
||||
|
||||
def _volume_name(owner_hash: str) -> str:
|
||||
return f"browser-use-profile-{owner_hash}"
|
||||
|
||||
|
||||
def _load_registry() -> dict[str, Any]:
|
||||
path = _registry_path()
|
||||
if not path.exists():
|
||||
return {"runtimes": {}}
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as fh:
|
||||
data = json.load(fh) or {}
|
||||
if isinstance(data, dict) and isinstance(data.get("runtimes"), dict):
|
||||
return data
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to read browser-use runtime registry %s: %s", path, exc)
|
||||
return {"runtimes": {}}
|
||||
|
||||
|
||||
def _save_registry(payload: dict[str, Any]) -> None:
|
||||
path = _registry_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), prefix=".browser_use_", suffix=".tmp")
|
||||
try:
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||||
json.dump(payload, fh, indent=2, sort_keys=True)
|
||||
fh.flush()
|
||||
os.fsync(fh.fileno())
|
||||
os.replace(tmp_path, path)
|
||||
except Exception:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
def _run_docker(args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
|
||||
cmd = ["docker", *args]
|
||||
logger.debug("browser-use docker cmd: %s", " ".join(cmd))
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
if check and result.returncode != 0:
|
||||
stderr = (result.stderr or result.stdout or "").strip()
|
||||
raise RuntimeError(f"Docker command failed ({' '.join(cmd)}): {stderr}")
|
||||
return result
|
||||
|
||||
|
||||
def _ensure_docker_access() -> None:
|
||||
_run_docker(["version"], check=True)
|
||||
|
||||
|
||||
def _container_exists(container_name: str) -> bool:
|
||||
result = _run_docker(["inspect", container_name], check=False)
|
||||
return result.returncode == 0
|
||||
|
||||
|
||||
def _container_running(container_name: str) -> bool:
|
||||
result = _run_docker(["inspect", "-f", "{{.State.Running}}", container_name], check=False)
|
||||
return result.returncode == 0 and result.stdout.strip().lower() == "true"
|
||||
|
||||
|
||||
def _remove_container(container_name: str) -> None:
|
||||
if container_name:
|
||||
_run_docker(["rm", "-f", container_name], check=False)
|
||||
|
||||
|
||||
def _volume_exists(volume_name: str) -> bool:
|
||||
result = _run_docker(["volume", "inspect", volume_name], check=False)
|
||||
return result.returncode == 0
|
||||
|
||||
|
||||
def _ensure_volume(volume_name: str, owner_hash: str) -> None:
|
||||
if _volume_exists(volume_name):
|
||||
return
|
||||
_run_docker(
|
||||
[
|
||||
"volume",
|
||||
"create",
|
||||
"--label",
|
||||
"browser_use.runtime=true",
|
||||
"--label",
|
||||
f"browser_use.owner_hash={owner_hash}",
|
||||
volume_name,
|
||||
],
|
||||
check=True,
|
||||
)
|
||||
|
||||
|
||||
def _remove_volume(volume_name: str) -> None:
|
||||
if volume_name:
|
||||
_run_docker(["volume", "rm", "-f", volume_name], check=False)
|
||||
|
||||
|
||||
def _runtime_env_args(browser_view_url: str, config: BrowserRuntimeConfig) -> list[str]:
|
||||
env: dict[str, str] = {
|
||||
"BROWSER_ENABLE_UI": "true" if config.enable_ui else "false",
|
||||
"BROWSER_DATA_DIR": "/data",
|
||||
"BROWSER_USE_RPC_HOST": "0.0.0.0",
|
||||
"BROWSER_USE_RPC_PORT": "8787",
|
||||
}
|
||||
|
||||
if browser_view_url:
|
||||
env["BROWSER_VIEW_URL"] = browser_view_url
|
||||
|
||||
for key in ("MODEL_DEFAULT", "OPENAI_API_KEY", "OPENAI_BASE_URL"):
|
||||
value = os.getenv(key)
|
||||
if value is not None:
|
||||
env[key] = value
|
||||
|
||||
args: list[str] = []
|
||||
for key, value in env.items():
|
||||
args.extend(["-e", f"{key}={value}"])
|
||||
return args
|
||||
|
||||
|
||||
def _start_runtime_container(
|
||||
container_name: str,
|
||||
volume_name: str,
|
||||
owner_hash: str,
|
||||
browser_view_url: str,
|
||||
config: BrowserRuntimeConfig,
|
||||
) -> None:
|
||||
_ensure_volume(volume_name, owner_hash)
|
||||
run_args = [
|
||||
"run",
|
||||
"-d",
|
||||
"--name",
|
||||
container_name,
|
||||
"--network",
|
||||
config.runtime_network,
|
||||
"--shm-size",
|
||||
"2g",
|
||||
"--label",
|
||||
"browser_use.runtime=true",
|
||||
"--label",
|
||||
f"browser_use.owner_hash={owner_hash}",
|
||||
"--label",
|
||||
"browser_use.managed_by=browser_runtime_manager",
|
||||
*_runtime_env_args(browser_view_url, config),
|
||||
"-v",
|
||||
f"{volume_name}:/data",
|
||||
config.runtime_image,
|
||||
]
|
||||
_run_docker(run_args, check=True)
|
||||
|
||||
|
||||
def _wait_for_runtime(container_name: str, timeout_seconds: int) -> None:
|
||||
deadline = time.time() + timeout_seconds
|
||||
health_url = f"http://{container_name}:8787/health"
|
||||
last_error = ""
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
with request.urlopen(health_url, timeout=2) as response:
|
||||
if 200 <= response.status < 300:
|
||||
return
|
||||
last_error = f"HTTP {response.status}"
|
||||
except Exception as exc:
|
||||
last_error = str(exc)
|
||||
time.sleep(1)
|
||||
raise RuntimeError(f"Browser runtime {container_name} did not become ready: {last_error}")
|
||||
|
||||
|
||||
def _cleanup_expired_runtimes_locked(registry: dict[str, Any], config: BrowserRuntimeConfig) -> None:
|
||||
now = time.time()
|
||||
runtimes = registry.setdefault("runtimes", {})
|
||||
expired_keys: list[str] = []
|
||||
for runtime_key, entry in list(runtimes.items()):
|
||||
last_used = float(entry.get("last_used", 0) or 0)
|
||||
if not last_used or now - last_used < config.runtime_ttl_seconds:
|
||||
continue
|
||||
|
||||
container_name = str(entry.get("container_name", "") or "")
|
||||
volume_name = str(entry.get("volume_name", "") or "")
|
||||
mode = str(entry.get("mode", "") or "")
|
||||
logger.info("Cleaning expired browser-use runtime %s (%s)", runtime_key, container_name)
|
||||
_remove_container(container_name)
|
||||
if mode == "docker-per-task":
|
||||
_remove_volume(volume_name)
|
||||
expired_keys.append(runtime_key)
|
||||
|
||||
for runtime_key in expired_keys:
|
||||
runtimes.pop(runtime_key, None)
|
||||
|
||||
|
||||
def ensure_browser_runtime(
|
||||
task_id: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
thread_id: str | None = None,
|
||||
) -> dict[str, str]:
|
||||
config = get_browser_runtime_config()
|
||||
if config.mode == "shared":
|
||||
return {
|
||||
"cdp_url": config.shared_cdp_url,
|
||||
"rpc_url": _shared_rpc_url(),
|
||||
"browser_view": get_browser_view_url(task_id=task_id, metadata=metadata, thread_id=thread_id),
|
||||
"isolation_mode": "shared",
|
||||
"owner_hash": "",
|
||||
}
|
||||
|
||||
_ensure_docker_access()
|
||||
owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
|
||||
owner_hash = hash_runtime_owner(owner)
|
||||
runtime_key = f"{config.mode}:{owner_hash}"
|
||||
container_name = _container_name(owner_hash)
|
||||
volume_name = _volume_name(owner_hash)
|
||||
browser_view_url = get_browser_view_url(task_id=task_id, metadata=metadata, thread_id=thread_id)
|
||||
|
||||
with _REGISTRY_LOCK:
|
||||
registry = _load_registry()
|
||||
_cleanup_expired_runtimes_locked(registry, config)
|
||||
|
||||
if _container_running(container_name):
|
||||
registry.setdefault("runtimes", {})[runtime_key] = {
|
||||
"container_name": container_name,
|
||||
"volume_name": volume_name,
|
||||
"last_used": time.time(),
|
||||
"mode": config.mode,
|
||||
"owner_hash": owner_hash,
|
||||
}
|
||||
_save_registry(registry)
|
||||
return {
|
||||
"cdp_url": f"http://{container_name}:9222",
|
||||
"rpc_url": _runtime_rpc_url(container_name),
|
||||
"browser_view": browser_view_url,
|
||||
"isolation_mode": config.mode,
|
||||
"owner_hash": owner_hash,
|
||||
}
|
||||
|
||||
if _container_exists(container_name):
|
||||
_remove_container(container_name)
|
||||
|
||||
_start_runtime_container(container_name, volume_name, owner_hash, browser_view_url, config)
|
||||
_wait_for_runtime(container_name, config.runtime_start_timeout)
|
||||
|
||||
registry.setdefault("runtimes", {})[runtime_key] = {
|
||||
"container_name": container_name,
|
||||
"volume_name": volume_name,
|
||||
"last_used": time.time(),
|
||||
"mode": config.mode,
|
||||
"owner_hash": owner_hash,
|
||||
}
|
||||
_save_registry(registry)
|
||||
|
||||
return {
|
||||
"cdp_url": f"http://{container_name}:9222",
|
||||
"rpc_url": _runtime_rpc_url(container_name),
|
||||
"browser_view": browser_view_url,
|
||||
"isolation_mode": config.mode,
|
||||
"owner_hash": owner_hash,
|
||||
}
|
||||
|
||||
|
||||
def cleanup_browser_runtime(
|
||||
task_id: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
thread_id: str | None = None,
|
||||
) -> None:
|
||||
config = get_browser_runtime_config()
|
||||
if config.mode != "docker-per-task":
|
||||
return
|
||||
|
||||
owner = resolve_isolation_owner(config.mode, task_id, metadata, thread_id)
|
||||
owner_hash = hash_runtime_owner(owner)
|
||||
runtime_key = f"{config.mode}:{owner_hash}"
|
||||
container_name = _container_name(owner_hash)
|
||||
volume_name = _volume_name(owner_hash)
|
||||
|
||||
with _REGISTRY_LOCK:
|
||||
registry = _load_registry()
|
||||
_remove_container(container_name)
|
||||
_remove_volume(volume_name)
|
||||
registry.setdefault("runtimes", {}).pop(runtime_key, None)
|
||||
_save_registry(registry)
|
||||
|
|
@ -5,6 +5,7 @@ from typing import Any
|
|||
from api.clients.browser_rpc_contracts import BrowserRpcError, BrowserRpcRunner
|
||||
from api.domain.task_status import TaskStatus
|
||||
from api.repositories.task_store import TaskRecord, TaskStore
|
||||
from api.services.browser_runtime_manager import cleanup_browser_runtime, ensure_browser_runtime
|
||||
|
||||
|
||||
class TaskService:
|
||||
|
|
@ -108,20 +109,28 @@ class TaskService:
|
|||
await self._store.publish(task_id, self._event(task_id, "started", {"status": TaskStatus.running.value}))
|
||||
|
||||
async with self._semaphore:
|
||||
runtime: dict[str, str] | None = None
|
||||
try:
|
||||
if rec.cancel_requested:
|
||||
await self._store.set_cancelled(task_id)
|
||||
await self._store.publish(task_id, self._event(task_id, "cancelled", {"status": TaskStatus.cancelled.value}))
|
||||
return
|
||||
|
||||
runtime = await asyncio.to_thread(
|
||||
ensure_browser_runtime,
|
||||
task_id=task_id,
|
||||
metadata=rec.metadata,
|
||||
thread_id=rec.thread_id,
|
||||
)
|
||||
rpc_timeout = float(rec.timeout)
|
||||
if self._rpc_timeout_cap is not None:
|
||||
rpc_timeout = min(rpc_timeout, self._rpc_timeout_cap)
|
||||
|
||||
raw = await asyncio.wait_for(
|
||||
self._rpc_client.run(task=rec.task, timeout_sec=rpc_timeout),
|
||||
self._rpc_client.run(task=rec.task, timeout_sec=rpc_timeout, rpc_url=runtime.get("rpc_url")),
|
||||
timeout=float(rec.timeout) + 5,
|
||||
)
|
||||
raw = self._with_runtime_metadata(raw, runtime)
|
||||
success = bool(raw.get("success"))
|
||||
await self._store.set_done(
|
||||
task_id=task_id,
|
||||
|
|
@ -188,6 +197,16 @@ class TaskService:
|
|||
"status": failed.status.value,
|
||||
"error": failed.error,
|
||||
}))
|
||||
finally:
|
||||
try:
|
||||
await asyncio.to_thread(
|
||||
cleanup_browser_runtime,
|
||||
task_id=task_id,
|
||||
metadata=rec.metadata,
|
||||
thread_id=rec.thread_id,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _publish_history_events(self, rec: TaskRecord) -> None:
|
||||
for index, item in enumerate(rec.history, start=1):
|
||||
|
|
@ -225,3 +244,17 @@ class TaskService:
|
|||
normalized.append(event)
|
||||
return normalized
|
||||
|
||||
@staticmethod
|
||||
def _with_runtime_metadata(raw: dict[str, Any], runtime: dict[str, str] | None) -> dict[str, Any]:
|
||||
if not isinstance(raw, dict) or not runtime:
|
||||
return raw
|
||||
|
||||
enriched = dict(raw)
|
||||
browser_view = runtime.get("browser_view")
|
||||
if browser_view and not enriched.get("browser_view"):
|
||||
enriched["browser_view"] = browser_view
|
||||
enriched["isolation_mode"] = runtime.get("isolation_mode", "shared")
|
||||
owner_hash = runtime.get("owner_hash")
|
||||
if owner_hash:
|
||||
enriched["owner_hash"] = owner_hash
|
||||
return enriched
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue