BrowserUse_and_ComputerUse_.../browser_env/browser_use_runner.py

785 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
import re
import threading
import time
import uuid
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib import error, parse, request
from browser_use import Agent, Browser, ChatOpenAI
logger = logging.getLogger(__name__)
_EVENT_LIMIT = int(os.getenv("BROWSER_USE_EVENT_LIMIT", "300"))
_EVENT_TTL_SEC = int(os.getenv("BROWSER_USE_EVENT_TTL_SEC", "3600"))
_STATE_POLL_INTERVAL = float(os.getenv("BROWSER_USE_STATE_POLL_INTERVAL", "3.0"))
_RUNS = {}
_RUNS_LOCK = threading.RLock()
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
_SPACE_RE = re.compile(r"\s+")
_URL_RE = re.compile(r"https?://[^\s\"'<>),]+")
_CAPTCHA_RE = re.compile(
r"(captcha|cloudflare|verify you are human|human verification|checking your browser|"
r"anti[- ]?bot|challenge|blocked)",
re.IGNORECASE,
)
def _utc_now_iso():
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _clean_text(value, limit=220):
    """Normalise *value* into a single-line, length-bounded string.

    ANSI colour escapes are removed, every whitespace run collapses to a single
    space and the result is stripped. Falsy values (``None``, ``""``, ``0``)
    become ``""``.

    Args:
        value: Anything; it is converted with ``str()`` first.
        limit: Maximum length of the result. Longer text is cut and ends with a
            single "…" so the total stays within *limit*. A non-positive limit
            yields ``""``.

    Returns:
        The cleaned string.
    """
    text = _ANSI_RE.sub("", str(value or ""))
    text = _SPACE_RE.sub(" ", text).strip()
    if limit <= 0:
        return ""
    if len(text) > limit:
        # Reserve one character for the ellipsis so truncation is visible and the
        # result never exceeds *limit*.
        return text[: limit - 1].rstrip() + "…"
    return text
def _short_url(url):
if not url:
return ""
parsed = parse.urlparse(url)
if parsed.scheme in {"http", "https"} and parsed.netloc:
path = parsed.path or "/"
if len(path) > 64:
path = path[:61] + "..."
return f"{parsed.netloc}{path}"
return _clean_text(url, limit=100)
def _extract_url(text):
    """Return the first http(s) URL found in *text*, or "" when there is none."""
    found = _URL_RE.search(text or "")
    if found is None:
        return ""
    return found.group(0)
def _public_data(value):
if value is None:
return {}
if isinstance(value, dict):
return value
if hasattr(value, "model_dump"):
try:
dumped = value.model_dump(exclude_none=True)
return dumped if isinstance(dumped, dict) else {}
except Exception:
return {}
if hasattr(value, "dict"):
try:
dumped = value.dict(exclude_none=True)
return dumped if isinstance(dumped, dict) else {}
except Exception:
return {}
return {}
def _compact_value(value, limit=120):
if value is None:
return ""
if isinstance(value, (list, tuple)):
value = " ".join(str(item) for item in value if item is not None)
if isinstance(value, dict):
return ""
return _clean_text(value, limit=limit)
def _first_param(params, *names):
if not isinstance(params, dict):
return ""
for name in names:
if name in params:
value = _compact_value(params.get(name))
if value:
return value
return ""
def _strip_browser_use_hint(text):
return re.sub(r"\s*💡\s*This is an autocomplete field\..*$", "", text, flags=re.IGNORECASE).strip()
def _strip_browser_use_prefix(text):
return re.sub(r"^\s*[🔗🔍✅❌⚠️🖱️⌨️📖📍➡️🌐]+\s*", "", text or "").strip()
def _extract_quoted_label(text):
match = re.search(r'"([^"]{1,90})"', text or "")
if match:
return _clean_text(match.group(1), limit=80)
match = re.search(r"'([^']{1,90})'", text or "")
if match:
return _clean_text(match.group(1), limit=80)
return ""
def _extract_attribute_label(text):
for attr in ("aria-label", "name", "title", "id"):
match = re.search(rf"\b{attr}=([^\s]+)", text or "", flags=re.IGNORECASE)
if match:
value = match.group(1).strip("\"'")
if value:
return _clean_text(value.replace("_", " "), limit=80)
return ""
def _normalize_model_action(action):
    """Reduce one browser-use model action to a ``(name, params)`` pair.

    Shapes understood (after conversion with ``_public_data``):

    * wrappers keyed ``action``/``actions``: a non-empty list recurses into its
      first item, a dict recurses into itself, a non-blank string is the name;
    * explicit ``{"name"|"type": ..., "parameters"|"params"|"input"|"args": {...}}``;
    * the single-key form ``{"click_element_by_index": {...}}``, optionally
      accompanied by an ``interacted_element`` side entry that is ignored when
      picking the action key.

    Returns:
        ``(name, params)``. Objects that do not dump to a non-empty dict give
        their class name (``"action"`` for ``None``) with ``{}``; ambiguous data
        gives ``("action", data)``.
    """
    # NOTE(review): browser-use's AgentHistoryList.model_actions() appears to add an
    # "interacted_element" entry next to the action key. It is context, not an
    # action, and would otherwise make the single-key detection below fail.
    # Confirm against the installed browser-use version.
    metadata_keys = ("interacted_element",)

    data = _public_data(action)
    if not data:
        name = action.__class__.__name__ if action is not None else "action"
        return name, {}

    container = data.get("action") or data.get("actions")
    if isinstance(container, list) and container:
        return _normalize_model_action(container[0])
    if isinstance(container, dict):
        return _normalize_model_action(container)
    if isinstance(container, str) and container.strip():
        explicit_params = (
            data.get("parameters") or data.get("params") or data.get("input") or data.get("args") or data
        )
        return container, _public_data(explicit_params) or data

    explicit_name = data.get("name") or data.get("type")
    explicit_params = data.get("parameters") or data.get("params") or data.get("input") or data.get("args")
    if explicit_name:
        return str(explicit_name), _public_data(explicit_params) or data

    candidates = {key: value for key, value in data.items() if key not in metadata_keys}
    # Prefer the only key that carries a non-empty value ...
    action_items = [(key, value) for key, value in candidates.items() if value not in (None, {}, [])]
    if len(action_items) == 1:
        name, params = action_items[0]
        return str(name), _public_data(params)
    # ... otherwise accept a lone key even when its parameters are empty.
    if len(candidates) == 1:
        name, params = next(iter(candidates.items()))
        return str(name), _public_data(params)
    return "action", data
def _format_model_action(action):
    """Translate a model-chosen browser action into a Russian progress message.

    The action name is lower-cased and normalised to ``snake_case`` and then
    matched against known action families. The checks are order-sensitive:
    earlier branches win when a name contains several tokens.

    Returns:
        ``(phase, message)``, or ``None`` when the action is not worth
        announcing (a click without a readable label, element lookups, or
        unknown actions).
    """
    name, params = _normalize_model_action(action)
    normalized = re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_")

    if normalized in {"go_to_url", "open_url", "navigate", "navigate_to", "open"} or "url" in normalized:
        url = _first_param(params, "url", "href")
        target = _short_url(url) if url else "новую страницу"
        return "navigation", f"Перехожу на {target}."
    if normalized in {"search", "search_google", "search_browser"} or "search" in normalized:
        query = _first_param(params, "query", "text", "search_query", "value")
        if query:
            return "search", f"Ищу: {query}."
        return "search", "Ищу информацию на странице."
    if normalized in {"input_text", "type", "fill", "fill_form", "set_value"} or any(
        token in normalized for token in ("input", "type", "fill")
    ):
        value = _first_param(params, "text", "value", "query", "content")
        if value:
            return "input", f"Ввожу в поле: {value}."
        return "input", "Ввожу данные в поле."
    if normalized in {"click", "click_element", "click_element_by_index"} or "click" in normalized:
        label = _first_param(params, "label", "text", "title", "element", "button", "aria_label", "selector")
        # A bare element index means nothing to the user, so such clicks are not announced here.
        if label and not label.isdigit():
            return "action", f"Нажимаю: {label}."
        return None
    if "scroll" in normalized:
        direction = _first_param(params, "direction")
        if direction:
            return "action", f"Прокручиваю страницу: {direction}."
        # NOTE(review): recent browser-use versions describe scrolling with a boolean
        # ``down`` parameter instead of a direction string; confirm for your version.
        down_flag = params.get("down") if isinstance(params, dict) else None
        if "up" in normalized or down_flag is False:
            return "action", "Прокручиваю страницу вверх."
        return "action", "Прокручиваю страницу вниз."
    if normalized in {"send_keys", "press_key", "keyboard"} or any(token in normalized for token in ("key", "press")):
        keys = _first_param(params, "keys", "key", "text")
        if keys:
            return "action", f"Нажимаю клавиши: {keys}."
        return "action", "Нажимаю клавиши."
    if "extract" in normalized:
        query = _first_param(params, "query", "goal", "instruction")
        if query:
            return "read", f"Извлекаю данные: {query}."
        return "read", "Извлекаю данные со страницы."
    # Element lookups are silent; this branch also stops later checks (e.g. "wait") from matching them.
    if normalized in {"find_elements", "find_element", "locate_element"} or "find_element" in normalized:
        return None
    if "wait" in normalized:
        return "action", "Жду загрузку страницы."
    if normalized in {"evaluate", "execute_script", "run_javascript"} or "javascript" in normalized:
        return "action", "Проверяю страницу скриптом."
    if "done" in normalized or "finish" in normalized:
        return "done", "Завершаю браузерную задачу."
    return None
def _format_action_result_content(content):
    """Translate a browser-use action-result text into a Russian ``(phase, message)`` pair.

    The text is first cleaned (at most 320 chars), stripped of the trailing
    autocomplete hint and of leading status emoji, then matched against known
    result phrasings. Checks run top to bottom and the first match wins.

    Args:
        content: Raw result text; anything ``_clean_text`` accepts.

    Returns:
        ``None`` when nothing is left after cleaning, otherwise ``(phase, message)``
        with phase "navigation", "action", "input", "done" or "read". Text that
        matches no known phrasing becomes a "read" message quoting up to 180
        characters of it.
    """
    content = _strip_browser_use_prefix(_strip_browser_use_hint(_clean_text(content, limit=320)))
    if not content:
        return None
    # Match on a lower-cased copy, but slice the original so URLs/labels keep their case.
    lower = content.lower()
    if lower.startswith("navigated to "):
        url = content[len("Navigated to ") :].strip()
        return "navigation", f"Открыл страницу: {_short_url(url) or url}."
    if lower.startswith("opened new tab with url "):
        url = content[len("Opened new tab with url ") :].strip()
        return "navigation", f"Открыл новую вкладку: {_short_url(url) or url}."
    if lower.startswith(("http://", "https://")):
        return "navigation", f"Открыл страницу: {_short_url(content) or content}."
    if lower.startswith("clicked "):
        # Prefer a quoted label, then an attribute value (aria-label, name, title, id).
        label = _extract_quoted_label(content) or _extract_attribute_label(content)
        if label:
            return "action", f"Нажал: {label}."
        target = content[len("Clicked ") :].strip()
        # Drop everything from the first attribute dump onward, then translate common tag names.
        target = re.sub(r"\s+(aria-label|id|role|name|type)=.*$", "", target, flags=re.IGNORECASE).strip()
        # NOTE(review): these are plain substring replacements, so words such as "inputs" are altered too.
        target = target.replace("button", "кнопку").replace("input", "поле ввода").replace("span", "элемент")
        if target in {"кнопку", "поле ввода", "элемент"}:
            return "action", f"Нажал {target}."
        return "action", f"Нажал: {target or 'элемент'}."
    if lower.startswith("typed "):
        value = _extract_quoted_label(content)
        if value:
            # NOTE(review): any quoted value is reported as a search query, even for non-search fields.
            return "input", f"Ввёл в поиск: {value}."
        typed = content[len("Typed ") :].strip()
        return "input", f"Ввёл текст: {_clean_text(typed, 80)}."
    wait_match = re.match(r"waited for\s+(.+)", content, flags=re.IGNORECASE)
    if wait_match:
        # "seconds" is replaced before "second" so the plural does not become "сек.s".
        duration = wait_match.group(1).replace("seconds", "сек.").replace("second", "сек.")
        return "action", f"Жду загрузку: {duration.rstrip('.')}."
    if lower.startswith("scrolled"):
        # NOTE(review): "up" is a substring test on the whole text; any word containing "up" reads as upward.
        direction = "вверх" if "up" in lower else "вниз"
        amount_match = re.search(r"scrolled\s+(?:up|down)\s+(.+)", content, flags=re.IGNORECASE)
        amount = _clean_text(amount_match.group(1), 40) if amount_match else ""
        return "action", f"Прокрутил страницу {direction}{f' на {amount}' if amount else ''}."
    if lower.startswith("no elements found matching"):
        return "read", "Не нашёл подходящее поле или кнопку на странице."
    if lower.startswith("task completed successfully"):
        return "done", "Browser-use сообщил, что задача выполнена."
    # Fallback: quote whatever the action returned.
    return "read", f"Получил данные со страницы: {_clean_text(content, 180)}"
def _call_history(history, method_name):
method = getattr(history, method_name, None)
if not callable(method):
return []
try:
value = method()
except Exception:
return []
return list(value) if isinstance(value, (list, tuple)) else []
def _prune_runs_locked(now=None):
    """Drop finished runs whose last update is older than ``_EVENT_TTL_SEC``.

    Expects the caller to already hold ``_RUNS_LOCK`` (hence the ``_locked``
    suffix). Runs that are not marked done are never pruned.

    Args:
        now: Reference timestamp in ``time.time()`` seconds; defaults to the
            current time. An explicit ``0`` is honoured instead of being
            treated as "not given".
    """
    if now is None:
        now = time.time()
    expired = [
        run_id
        for run_id, run in _RUNS.items()
        if run.get("done") and now - run.get("updated_at", now) > _EVENT_TTL_SEC
    ]
    for run_id in expired:
        _RUNS.pop(run_id, None)
def _ensure_run(run_id):
if not run_id:
return None
with _RUNS_LOCK:
_prune_runs_locked()
run = _RUNS.get(run_id)
if run is None:
now = time.time()
run = {
"run_id": run_id,
"seq": 0,
"events": [],
"done": False,
"started_at": now,
"updated_at": now,
}
_RUNS[run_id] = run
return run
def _append_event(run_id, phase, message, level="info", **data):
if not run_id or not message:
return None
message = _clean_text(message)
if not message:
return None
with _RUNS_LOCK:
run = _ensure_run(run_id)
if run is None:
return None
if run["events"]:
last = run["events"][-1]
if last.get("phase") == phase and last.get("message") == message:
return last
run["seq"] += 1
event = {
"seq": run["seq"],
"ts": _utc_now_iso(),
"phase": phase,
"level": level,
"message": message,
}
for key, value in data.items():
if value is not None:
event[key] = value
run["events"].append(event)
if len(run["events"]) > _EVENT_LIMIT:
run["events"] = run["events"][-_EVENT_LIMIT:]
run["updated_at"] = time.time()
return event
def _finish_run(run_id):
if not run_id:
return
with _RUNS_LOCK:
run = _ensure_run(run_id)
if run is not None:
run["done"] = True
run["updated_at"] = time.time()
def _get_events(run_id, after=0):
    """Snapshot the events of *run_id* whose ``seq`` is strictly greater than *after*.

    Returns ``None`` for an unknown run. Otherwise it returns a JSON-ready
    payload with the matching events (the event dicts are shared, not copied),
    the run id and the run's ``done`` flag.
    """
    with _RUNS_LOCK:
        record = _RUNS.get(run_id)
        if record is None:
            return None
        fresh_events = [ev for ev in record["events"] if int(ev.get("seq", 0)) > after]
        is_done = bool(record.get("done"))
    return {
        "success": True,
        "run_id": run_id,
        "events": fresh_events,
        "done": is_done,
    }
def _browser_log_to_event(raw_message, levelno):
    """Map one ``browser_use`` log message to a ``(phase, message, level)`` event triple.

    Rules are checked in priority order and the first hit wins:
    1. anti-bot / captcha wording;
    2. ERROR-level records;
    3. WARNING-level records that mention a failure;
    4. "step N" markers;
    5. keyword heuristics for navigation, clicks, typing, scrolling, reading
       and completion.

    Args:
        raw_message: The log record's formatted message.
        levelno: The record's numeric level (``logging.INFO``, ``logging.ERROR``, ...).

    Returns:
        The event triple, or ``None`` when the cleaned message is empty or no
        rule matches.
    """
    msg = _clean_text(raw_message, limit=320)
    if not msg:
        return None
    lower = msg.lower()
    # Checked first so an anti-bot page is surfaced even when it is logged as an error.
    # NOTE(review): _CAPTCHA_RE also matches generic words such as "challenge" and "blocked".
    if _CAPTCHA_RE.search(lower):
        return (
            "human_help",
            "Похоже, на странице проверка или капча. Откройте экран браузера и помогите пройти её.",
            "help",
        )
    if levelno >= logging.ERROR:
        return ("error", f"Browser-use сообщил об ошибке: {_clean_text(msg, 180)}", "error")
    if levelno >= logging.WARNING and any(token in lower for token in ("failed", "error", "timeout")):
        return ("warning", f"Возникла проблема в браузере: {_clean_text(msg, 180)}", "warning")
    step_match = re.search(r"\bstep\s+(\d+)\b", lower)
    if step_match:
        return (
            "step",
            f"Шаг {step_match.group(1)}: анализирую страницу и выбираю следующее действие.",
            "info",
        )
    # The remaining checks are plain substring tests on the lower-cased message.
    # NOTE(review): tokens like "press", "type", "read" and "content" also occur inside
    # unrelated words (e.g. "expression", "content-type", "already"), so some messages
    # may be misclassified.
    if "go_to_url" in lower or "navigate" in lower or "open url" in lower:
        url = _extract_url(msg)
        target = _short_url(url) if url else "новую страницу"
        return ("navigation", f"Перехожу на {target}.", "info")
    if "click" in lower or "press" in lower or "button" in lower:
        return ("action", "Кликаю по элементу на странице.", "info")
    if any(token in lower for token in ("input_text", "type", "typing", "fill", "insert text")):
        return ("input", "Ввожу данные в поле на странице.", "info")
    if "scroll" in lower:
        return ("action", "Прокручиваю страницу.", "info")
    if any(token in lower for token in ("extract", "read", "content")):
        return ("read", "Читаю содержимое страницы.", "info")
    if "done" in lower or "final result" in lower or "finished" in lower:
        return ("done", "Browser-use закончил выполнение задачи.", "done")
    return None
class BrowserUseLiveLogHandler(logging.Handler):
    """Logging handler that turns ``browser_use`` log records into run events.

    One instance is attached per run. Records are converted with
    ``_browser_log_to_event`` and appended to the run's event feed. Identical
    messages are throttled, so the same text is emitted at most once per
    *dedupe_window* seconds.

    The handler sits on the process-wide ``browser_use`` logger. When several
    runs execute concurrently (the RPC server handles each request on its own
    thread), every attached handler would otherwise receive every run's
    records. To keep the feeds apart, only records logged from the thread that
    created the handler are accepted.
    """

    # Size at which expired entries are purged from the throttle table.
    _MAX_TRACKED_MESSAGES = 256

    def __init__(self, run_id, dedupe_window=8.0):
        super().__init__(level=logging.INFO)
        self.run_id = run_id
        self.dedupe_window = dedupe_window
        self._owner_thread = threading.get_ident()
        self._last_emit_by_message = {}

    def emit(self, record):
        if not record.name.startswith("browser_use"):
            return
        # record.thread is None when logging.logThreads is disabled; accept such records.
        if record.thread is not None and record.thread != self._owner_thread:
            return
        try:
            converted = _browser_log_to_event(record.getMessage(), record.levelno)
            if not converted:
                return
            phase, message, level = converted
            now = time.monotonic()
            last_at = self._last_emit_by_message.get(message)
            if last_at is not None and now - last_at < self.dedupe_window:
                return
            self._remember(message, now)
            _append_event(self.run_id, phase, message, level=level)
        except Exception:
            logger.debug("Failed to convert browser-use log record", exc_info=True)

    def _remember(self, message, now):
        """Record when *message* was emitted, purging expired entries once the table is large."""
        table = self._last_emit_by_message
        if len(table) >= self._MAX_TRACKED_MESSAGES:
            expired = [key for key, stamp in table.items() if now - stamp >= self.dedupe_window]
            for key in expired:
                del table[key]
        table[message] = now
def _fetch_json(url):
with request.urlopen(url, timeout=2) as resp:
return json.loads(resp.read().decode("utf-8"))
async def _watch_browser_state(run_id, cdp_url, stop_event):
if not run_id or not cdp_url:
return
json_url = f"{cdp_url.rstrip('/')}/json"
last_url = None
human_help_sent = False
while not stop_event.is_set():
try:
pages = await asyncio.to_thread(_fetch_json, json_url)
if isinstance(pages, list):
page = next(
(
item
for item in pages
if item.get("type") == "page"
and item.get("url")
and not item.get("url", "").startswith(("devtools://", "chrome://"))
),
None,
)
if page:
url = page.get("url", "")
title = _clean_text(page.get("title", ""), limit=90)
if url and url != last_url:
short = _short_url(url)
if title:
message = f"Я на странице: {title}{short}"
else:
message = f"Я на странице: {short}"
_append_event(run_id, "page", message, url=url, title=title)
last_url = url
if not human_help_sent and _CAPTCHA_RE.search(f"{title} {url}"):
_append_event(
run_id,
"human_help",
"Похоже, на странице антибот-проверка. Откройте экран браузера и помогите пройти её.",
level="help",
url=url,
title=title,
)
human_help_sent = True
except Exception:
pass
try:
await asyncio.wait_for(stop_event.wait(), timeout=_STATE_POLL_INTERVAL)
except asyncio.TimeoutError:
continue
def _make_step_end_hook(run_id, cursor):
async def _on_step_end(agent):
history = getattr(agent, "history", None)
if history is None:
return
actions = _call_history(history, "model_actions")
start = int(cursor.get("actions", 0))
for action in actions[start:]:
formatted_action = _format_model_action(action)
if not formatted_action:
continue
phase, message = formatted_action
_append_event(
run_id,
phase,
message,
action_name=_normalize_model_action(action)[0],
)
cursor["actions"] = len(actions)
results = _call_history(history, "action_results")
result_start = int(cursor.get("results", 0))
for result in results[result_start:]:
data = _public_data(result)
error_text = _compact_value(data.get("error") or data.get("exception"), limit=180)
if error_text:
_append_event(run_id, "error", f"Ошибка действия: {error_text}", level="error")
continue
content = _compact_value(data.get("extracted_content") or data.get("content"), limit=320)
if content and not _CAPTCHA_RE.search(content):
formatted = _format_action_result_content(content)
if formatted:
phase, message = formatted
_append_event(run_id, phase, message)
elif content:
_append_event(
run_id,
"human_help",
"Похоже, страница просит проверку человека. Откройте экран браузера и помогите пройти её.",
level="help",
)
cursor["results"] = len(results)
return _on_step_end
def _json_response(handler, status_code, payload):
    """Write *payload* as a UTF-8 JSON HTTP response through *handler*.

    Non-ASCII text is emitted as-is (``ensure_ascii=False``), and
    ``Content-Length`` is the byte length of the encoded body.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(body))),
    )
    handler.send_response(status_code)
    for header_name, header_value in headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(body)
async def run_browser_task(task, run_id=None):
    """Run one browser-use agent task against the CDP-attached Chromium.

    Progress is streamed into the event feed of *run_id* (when given):
    setup steps, a background page watcher, converted ``browser_use`` log
    lines and per-step action/result summaries. Before returning, the
    watcher task, the log handler, the logger level and the browser are
    released, and the run is marked done.

    Failures while creating the browser, the LLM or the agent, or while
    running it, are reported in the returned dict rather than raised.

    Args:
        task: Natural-language instruction for the agent.
        run_id: Optional event-feed id; without it no events are recorded.

    Returns:
        On success: ``{"success": True, "run_id", "result", "browser_view", "events"}``,
        where ``events`` holds at most the last 50 events. Otherwise:
        ``{"success": False, "run_id", "error"}``.
    """
    _ensure_run(run_id)
    _append_event(run_id, "start", "Запускаю browser-use задачу.")
    logger.info(f"run_browser_task started with task: {task[:200]}...")
    cdp_url = os.getenv("BROWSER_CDP_URL", "http://127.0.0.1:9222")
    browser_view_url = os.getenv("BROWSER_VIEW_URL", "")
    logger.info(f"CDP URL: {cdp_url}")
    if browser_view_url:
        _append_event(run_id, "view", f"Экран браузера доступен здесь: {browser_view_url}", view_url=browser_view_url)

    browser = None
    state_stop = asyncio.Event()
    # Background watcher that reports URL changes / anti-bot pages while the agent works.
    state_task = asyncio.create_task(_watch_browser_state(run_id, cdp_url, state_stop)) if run_id else None
    # Mirror browser_use log lines into this run's event feed.
    live_log_handler = BrowserUseLiveLogHandler(run_id) if run_id else None
    browser_use_logger = logging.getLogger("browser_use")
    old_browser_use_level = browser_use_logger.level
    if live_log_handler:
        browser_use_logger.addHandler(live_log_handler)
        # INFO records must reach the handler for step/action messages.
        if browser_use_logger.getEffectiveLevel() > logging.INFO:
            browser_use_logger.setLevel(logging.INFO)

    async def _cleanup():
        """Stop the watcher, detach the log handler, close the browser and mark the run done."""
        state_stop.set()
        if state_task:
            try:
                await asyncio.wait_for(state_task, timeout=1)
            except Exception:
                state_task.cancel()
        if live_log_handler:
            browser_use_logger.removeHandler(live_log_handler)
            browser_use_logger.setLevel(old_browser_use_level)
        try:
            if browser is not None:
                logger.info("Closing browser...")
                await browser.close()
                logger.info("Browser closed")
        except Exception as close_err:
            logger.warning(f"Browser close failed: {close_err}")
        _finish_run(run_id)

    try:
        logger.info("Creating Browser...")
        _append_event(run_id, "setup", "Подключаюсь к Chromium через CDP.")
        browser = Browser(cdp_url=cdp_url)
        logger.info("Browser created")
    except Exception as err:
        logger.error(f"Browser creation failed: {err}", exc_info=True)
        _append_event(run_id, "error", f"Не удалось создать браузер: {_clean_text(err, 180)}", level="error")
        await _cleanup()
        return {"success": False, "run_id": run_id, "error": f"Browser creation failed: {err}"}

    try:
        logger.info("Creating LLM...")
        model = os.getenv("MODEL_DEFAULT", "qwen3.5-122b")
        api_key = os.getenv("OPENAI_API_KEY")
        base_url = os.getenv("OPENAI_BASE_URL")
        logger.info(
            "LLM params: model=%s, base_url=%s, api_key=%s",
            model,
            (base_url[:80] + "...") if base_url and len(base_url) > 80 else base_url,
            "set" if api_key else "missing",
        )
        _append_event(run_id, "setup", f"Готовлю модель управления браузером: {model}.")
        llm = ChatOpenAI(
            model=model,
            api_key=api_key,
            base_url=base_url,
            temperature=0.0,
        )
        logger.info("LLM created")
    except Exception as err:
        logger.error(f"LLM creation failed: {err}", exc_info=True)
        _append_event(run_id, "error", f"Не удалось создать модель для browser-use: {_clean_text(err, 180)}", level="error")
        await _cleanup()
        return {"success": False, "run_id": run_id, "error": f"LLM creation failed: {err}"}

    try:
        logger.info("Creating Agent...")
        agent = Agent(task=task, llm=llm, browser=browser)
        logger.info("Agent created")
        _append_event(run_id, "setup", "Агент browser-use готов, начинаю действия на странице.")
    except Exception as err:
        logger.error(f"Agent creation failed: {err}", exc_info=True)
        _append_event(run_id, "error", f"Не удалось создать browser-use агента: {_clean_text(err, 180)}", level="error")
        await _cleanup()
        return {"success": False, "run_id": run_id, "error": f"Agent creation failed: {err}"}

    try:
        logger.info("Running agent...")
        _append_event(run_id, "running", "Выполняю задачу в браузере.")
        step_cursor = {"actions": 0, "results": 0}
        try:
            history = await agent.run(on_step_end=_make_step_end_hook(run_id, step_cursor))
        except TypeError as hook_err:
            # Fall back only when run() itself rejected the keyword. Any other TypeError
            # that mentions the hook (e.g. "_on_step_end() takes 1 positional argument ...")
            # is raised after the agent already executed steps, and a second run would
            # repeat those browser actions.
            if "unexpected keyword argument 'on_step_end'" not in str(hook_err):
                raise
            logger.warning("browser-use does not support on_step_end hooks; falling back to plain run()")
            history = await agent.run()
        logger.info("Agent run completed")
        # Report actions the hook did not see (e.g. the final step, or no hook support at all).
        for action in _call_history(history, "model_actions")[int(step_cursor.get("actions", 0)):]:
            formatted_action = _format_model_action(action)
            if not formatted_action:
                continue
            phase, message = formatted_action
            _append_event(run_id, phase, message, action_name=_normalize_model_action(action)[0])
        _append_event(run_id, "done", "Браузерная задача завершена.", level="done")
        return {
            "success": True,
            "run_id": run_id,
            "result": history.final_result(),
            "browser_view": browser_view_url,
            "events": (_get_events(run_id, after=0) or {}).get("events", [])[-50:],
        }
    except Exception as err:
        logger.error(f"Agent run failed: {err}", exc_info=True)
        _append_event(
            run_id,
            "error",
            f"Браузерная автоматизация завершилась с ошибкой: {_clean_text(err, 180)}",
            level="error",
        )
        return {"success": False, "run_id": run_id, "error": f"Browser automation failed: {err}"}
    finally:
        await _cleanup()
class BrowserUseRPCHandler(BaseHTTPRequestHandler):
    """HTTP front-end for browser-use runs.

    Endpoints:

    * ``GET /health``: 200 when the browser debug endpoint (``BROWSER_HEALTH_URL``)
      answers, 503 otherwise.
    * ``GET /events?run_id=...&after=N``: the run's events with ``seq > N``;
      404 for an unknown run.
    * ``POST /run``: body ``{"task": str, "run_id": optional}``. The task runs
      synchronously on this request's thread. The response is 200 on success,
      500 on failure, and 400 for a malformed request.
    """

    def log_message(self, format_str, *args):
        """Route http.server's access log through this module's logger."""
        logger.info(f"HTTP {self.address_string()}: {format_str % args}")

    def do_GET(self):
        """Serve ``/events`` and ``/health``; any other path gets a JSON 404."""
        logger.info(f"GET {self.path}")
        parsed = parse.urlparse(self.path)
        if parsed.path == "/events":
            params = parse.parse_qs(parsed.query)
            run_id = (params.get("run_id") or [""])[0]
            try:
                after = int((params.get("after") or ["0"])[0])
            except ValueError:
                after = 0
            payload = _get_events(run_id, after=after)
            if payload is None:
                _json_response(self, 404, {"success": False, "error": "Run not found"})
            else:
                _json_response(self, 200, payload)
            return
        if parsed.path != "/health":
            _json_response(self, 404, {"success": False, "error": "Not found"})
            return
        try:
            debug_url = os.getenv("BROWSER_HEALTH_URL", "http://127.0.0.1:9222/json/version")
            # Only reachability matters; the response body is not inspected.
            with request.urlopen(debug_url, timeout=2):
                pass
            _json_response(self, 200, {"success": True})
        except Exception as err:
            logger.error(f"Health check failed: {err}")
            _json_response(self, 503, {"success": False, "error": f"Browser is not ready: {err}"})

    def do_POST(self):
        """Run a browser task for ``POST /run`` and return its result as JSON.

        Malformed bodies (bad Content-Length, invalid UTF-8/JSON, non-object
        JSON) and a missing or blank ``task`` get a 400. The run record is
        created only after validation, so rejected requests do not leave
        never-finished runs behind; those would never be pruned.
        """
        logger.info(f"POST {self.path}")
        if self.path != "/run":
            _json_response(self, 404, {"success": False, "error": "Not found"})
            return
        try:
            payload = self._read_json_body()
        except ValueError as err:
            # Covers a non-numeric Content-Length, UnicodeDecodeError and json.JSONDecodeError.
            logger.error(f"JSON decode error: {err}")
            _json_response(self, 400, {"success": False, "error": "Invalid JSON payload"})
            return
        if not isinstance(payload, dict):
            _json_response(self, 400, {"success": False, "error": "Invalid JSON payload"})
            return
        task = payload.get("task", "")
        if not isinstance(task, str) or not task.strip():
            _json_response(self, 400, {"success": False, "error": "Field 'task' is required"})
            return
        # Run ids are string keys so that /events?run_id=... can find them.
        run_id = str(payload.get("run_id") or uuid.uuid4().hex)
        _ensure_run(run_id)
        logger.info(f"RPC task received, run_id: {run_id}, task preview: {task[:200]}...")
        try:
            logger.info("Starting browser task...")
            result = asyncio.run(run_browser_task(task.strip(), run_id=run_id))
            if isinstance(result, dict):
                result.setdefault("run_id", run_id)
            logger.info(f"Task completed: {result}")
            code = 200 if result.get("success") else 500
            _json_response(self, code, result)
        except error.URLError as err:
            logger.error(f"URLError: {err}")
            _json_response(self, 503, {"success": False, "error": f"Transport error: {err}"})
        except Exception as err:
            logger.error(f"do_POST exception: {err}", exc_info=True)
            _json_response(self, 500, {"success": False, "error": f"Internal error: {err}"})
        finally:
            # Guarantee the run becomes prunable even if run_browser_task never reached its cleanup.
            _finish_run(run_id)

    def _read_json_body(self):
        """Read the request body and decode it as JSON; an empty body decodes to ``{}``.

        Raises:
            ValueError: For a non-numeric Content-Length, or a body that is not
                valid UTF-8 JSON (``UnicodeDecodeError`` and
                ``json.JSONDecodeError`` are ``ValueError`` subclasses).
        """
        content_length = int(self.headers.get("Content-Length", "0"))
        # rfile.read(-1) reads until EOF, which can block on a kept-alive connection.
        raw = self.rfile.read(content_length) if content_length > 0 else b""
        return json.loads(raw.decode("utf-8") if raw else "{}")
def main():
    """Configure logging and serve the browser-use RPC API until interrupted.

    The host comes from ``BROWSER_USE_RPC_HOST`` (default ``0.0.0.0``) and the
    port from ``BROWSER_USE_RPC_PORT`` (default ``8787``). Ctrl+C stops serving,
    and the listening socket is always closed on the way out.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler()]
    )
    host = os.getenv("BROWSER_USE_RPC_HOST", "0.0.0.0")
    port = int(os.getenv("BROWSER_USE_RPC_PORT", "8787"))
    server = ThreadingHTTPServer((host, port), BrowserUseRPCHandler)
    logger.info(f"browser-use RPC listening on {host}:{port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        logger.info("browser-use RPC shutting down")
    finally:
        server.server_close()


if __name__ == "__main__":
    main()