diff --git a/.env.example b/.env.example index 16fbeacf..3afe0b99 100644 --- a/.env.example +++ b/.env.example @@ -18,5 +18,9 @@ BROWSER_VIEW_URL= BROWSER_API_HOST=0.0.0.0 BROWSER_API_PORT=8088 BROWSER_USE_RPC_URL=http://browser:8787/run +BROWSER_USE_EVENTS_URL=http://browser:8787/events BROWSER_USE_RPC_TIMEOUT=900 -BROWSER_API_MAX_CONCURRENCY=2 \ No newline at end of file +BROWSER_LIVE_LOGS=true +BROWSER_LIVE_LOG_POLL_INTERVAL=1.5 +BROWSER_LIVE_LOG_MAX_EVENTS=40 +BROWSER_API_MAX_CONCURRENCY=2 diff --git a/READ.md b/READ.md index 41abd358..e91b3841 100644 --- a/READ.md +++ b/READ.md @@ -15,9 +15,13 @@ docker compose logs tunnel После команды логов листаешь терминал и ищешь ссылку https в рамке. Её вписываешь в переменную BROWSER_VIEW_URL. Чтобы увидеть действия агента, переходишь по данной сслыке и выбираешь vnc.html. Далее в мессенджере просишь агента сделать что-то через tool browser-use. +Во время выполнения Hermes будет обновлять одно progress-сообщение в Telegram: +текущая страница, короткие действия, ошибки и просьба помочь, если замечена капча +или антибот-проверка. Частоту и объем можно настроить через +`BROWSER_LIVE_LOG_POLL_INTERVAL` и `BROWSER_LIVE_LOG_MAX_EVENTS` в `.env`. Возможно придётся перезапустить контейнеры, но при перезапуске контейнеров меняется ссылка. 
```commandline docker compose down docker compose up -d ``` -## Удачного пользования \ No newline at end of file +## Удачного пользования diff --git a/browser_env/browser_use_runner.py b/browser_env/browser_use_runner.py index 08ed6b42..cb7786a6 100644 --- a/browser_env/browser_use_runner.py +++ b/browser_env/browser_use_runner.py @@ -1,11 +1,567 @@ import asyncio import json +import logging import os +import re +import threading +import time +import uuid +from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from urllib import error, request +from urllib import error, parse, request from browser_use import Agent, Browser, ChatOpenAI +logger = logging.getLogger(__name__) + +_EVENT_LIMIT = int(os.getenv("BROWSER_USE_EVENT_LIMIT", "300")) +_EVENT_TTL_SEC = int(os.getenv("BROWSER_USE_EVENT_TTL_SEC", "3600")) +_STATE_POLL_INTERVAL = float(os.getenv("BROWSER_USE_STATE_POLL_INTERVAL", "3.0")) + +_RUNS = {} +_RUNS_LOCK = threading.RLock() + +_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") +_SPACE_RE = re.compile(r"\s+") +_URL_RE = re.compile(r"https?://[^\s\"'<>),]+") +_CAPTCHA_RE = re.compile( + r"(captcha|cloudflare|verify you are human|human verification|checking your browser|" + r"anti[- ]?bot|challenge|blocked)", + re.IGNORECASE, +) + + +def _utc_now_iso(): + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def _clean_text(value, limit=220): + text = _ANSI_RE.sub("", str(value or "")) + text = _SPACE_RE.sub(" ", text).strip() + if len(text) > limit: + return text[: limit - 1].rstrip() + "…" + return text + + +def _short_url(url): + if not url: + return "" + parsed = parse.urlparse(url) + if parsed.scheme in {"http", "https"} and parsed.netloc: + path = parsed.path or "/" + if len(path) > 64: + path = path[:61] + "..." 
+ return f"{parsed.netloc}{path}" + return _clean_text(url, limit=100) + + +def _extract_url(text): + match = _URL_RE.search(text or "") + return match.group(0) if match else "" + + +def _public_data(value): + if value is None: + return {} + if isinstance(value, dict): + return value + if hasattr(value, "model_dump"): + try: + dumped = value.model_dump(exclude_none=True) + return dumped if isinstance(dumped, dict) else {} + except Exception: + return {} + if hasattr(value, "dict"): + try: + dumped = value.dict(exclude_none=True) + return dumped if isinstance(dumped, dict) else {} + except Exception: + return {} + return {} + + +def _compact_value(value, limit=120): + if value is None: + return "" + if isinstance(value, (list, tuple)): + value = " ".join(str(item) for item in value if item is not None) + if isinstance(value, dict): + return "" + return _clean_text(value, limit=limit) + + +def _first_param(params, *names): + if not isinstance(params, dict): + return "" + for name in names: + if name in params: + value = _compact_value(params.get(name)) + if value: + return value + return "" + + +def _strip_browser_use_hint(text): + return re.sub(r"\s*💡\s*This is an autocomplete field\..*$", "", text, flags=re.IGNORECASE).strip() + + +def _strip_browser_use_prefix(text): + return re.sub(r"^\s*[🔗🔍✅❌⚠️🖱️⌨️📖📍➡️🌐]+\s*", "", text or "").strip() + + +def _extract_quoted_label(text): + match = re.search(r'"([^"]{1,90})"', text or "") + if match: + return _clean_text(match.group(1), limit=80) + match = re.search(r"'([^']{1,90})'", text or "") + if match: + return _clean_text(match.group(1), limit=80) + return "" + + +def _extract_attribute_label(text): + for attr in ("aria-label", "name", "title", "id"): + match = re.search(rf"\b{attr}=([^\s]+)", text or "", flags=re.IGNORECASE) + if match: + value = match.group(1).strip("\"'") + if value: + return _clean_text(value.replace("_", " "), limit=80) + return "" + + +def _normalize_model_action(action): + data = 
_public_data(action) + if not data: + name = action.__class__.__name__ if action is not None else "action" + return name, {} + + container = data.get("action") or data.get("actions") + if isinstance(container, list) and container: + return _normalize_model_action(container[0]) + if isinstance(container, dict): + return _normalize_model_action(container) + if isinstance(container, str) and container.strip(): + explicit_params = data.get("parameters") or data.get("params") or data.get("input") or data.get("args") or data + return container, _public_data(explicit_params) or data + + explicit_name = data.get("name") or data.get("type") + explicit_params = data.get("parameters") or data.get("params") or data.get("input") or data.get("args") + if explicit_name: + return str(explicit_name), _public_data(explicit_params) or data + + action_items = [(key, value) for key, value in data.items() if value not in (None, {}, [])] + if len(action_items) == 1: + name, params = action_items[0] + return str(name), _public_data(params) + + if len(data) == 1: + name, params = next(iter(data.items())) + return str(name), _public_data(params) + + return "action", data + + +def _format_model_action(action): + name, params = _normalize_model_action(action) + normalized = re.sub(r"[^a-z0-9]+", "_", name.lower()).strip("_") + label = _first_param(params, "label", "text", "title", "element", "button", "aria_label", "selector") + index = _first_param(params, "index", "element_index", "idx") + + if normalized in {"go_to_url", "open_url", "navigate", "navigate_to", "open"} or "url" in normalized: + url = _first_param(params, "url", "href") + target = _short_url(url) if url else "новую страницу" + return "navigation", f"Перехожу на {target}." + + if normalized in {"search", "search_google", "search_browser"} or "search" in normalized: + query = _first_param(params, "query", "text", "search_query", "value") + if query: + return "search", f"Ищу: {query}." 
+ return "search", "Ищу информацию на странице." + + if normalized in {"input_text", "type", "fill", "fill_form", "set_value"} or any( + token in normalized for token in ("input", "type", "fill") + ): + value = _first_param(params, "text", "value", "query", "content") + if value: + return "input", f"Ввожу в поле: {value}." + return "input", "Ввожу данные в поле." + + if normalized in {"click", "click_element", "click_element_by_index"} or "click" in normalized: + if label and not label.isdigit(): + return "action", f"Нажимаю: {label}." + return None + + if "scroll" in normalized: + direction = _first_param(params, "direction") + if direction: + return "action", f"Прокручиваю страницу: {direction}." + if "up" in normalized: + return "action", "Прокручиваю страницу вверх." + return "action", "Прокручиваю страницу вниз." + + if normalized in {"send_keys", "press_key", "keyboard"} or any(token in normalized for token in ("key", "press")): + keys = _first_param(params, "keys", "key", "text") + if keys: + return "action", f"Нажимаю клавиши: {keys}." + return "action", "Нажимаю клавиши." + + if "extract" in normalized: + query = _first_param(params, "query", "goal", "instruction") + if query: + return "read", f"Извлекаю данные: {query}." + return "read", "Извлекаю данные со страницы." + + if normalized in {"find_elements", "find_element", "locate_element"} or "find_element" in normalized: + return None + + if "wait" in normalized: + return "action", "Жду загрузку страницы." + + if normalized in {"evaluate", "execute_script", "run_javascript"} or "javascript" in normalized: + return "action", "Проверяю страницу скриптом." + + if "done" in normalized or "finish" in normalized: + return "done", "Завершаю браузерную задачу." 
+ + return None + + +def _format_action_result_content(content): + content = _strip_browser_use_prefix(_strip_browser_use_hint(_clean_text(content, limit=320))) + if not content: + return None + + lower = content.lower() + + if lower.startswith("navigated to "): + url = content[len("Navigated to ") :].strip() + return "navigation", f"Открыл страницу: {_short_url(url) or url}." + + if lower.startswith("opened new tab with url "): + url = content[len("Opened new tab with url ") :].strip() + return "navigation", f"Открыл новую вкладку: {_short_url(url) or url}." + + if lower.startswith(("http://", "https://")): + return "navigation", f"Открыл страницу: {_short_url(content) or content}." + + if lower.startswith("clicked "): + label = _extract_quoted_label(content) or _extract_attribute_label(content) + if label: + return "action", f"Нажал: {label}." + + target = content[len("Clicked ") :].strip() + target = re.sub(r"\s+(aria-label|id|role|name|type)=.*$", "", target, flags=re.IGNORECASE).strip() + target = target.replace("button", "кнопку").replace("input", "поле ввода").replace("span", "элемент") + if target in {"кнопку", "поле ввода", "элемент"}: + return "action", f"Нажал {target}." + return "action", f"Нажал: {target or 'элемент'}." + + if lower.startswith("typed "): + value = _extract_quoted_label(content) + if value: + return "input", f"Ввёл в поиск: {value}." + typed = content[len("Typed ") :].strip() + return "input", f"Ввёл текст: {_clean_text(typed, 80)}." + + wait_match = re.match(r"waited for\s+(.+)", content, flags=re.IGNORECASE) + if wait_match: + duration = wait_match.group(1).replace("seconds", "сек.").replace("second", "сек.") + return "action", f"Жду загрузку: {duration.rstrip('.')}." 
+ + if lower.startswith("scrolled"): + direction = "вверх" if "up" in lower else "вниз" + amount_match = re.search(r"scrolled\s+(?:up|down)\s+(.+)", content, flags=re.IGNORECASE) + amount = _clean_text(amount_match.group(1), 40) if amount_match else "" + return "action", f"Прокрутил страницу {direction}{f' на {amount}' if amount else ''}." + + if lower.startswith("no elements found matching"): + return "read", "Не нашёл подходящее поле или кнопку на странице." + + if lower.startswith("task completed successfully"): + return "done", "Browser-use сообщил, что задача выполнена." + + return "read", f"Получил данные со страницы: {_clean_text(content, 180)}" + + +def _call_history(history, method_name): + method = getattr(history, method_name, None) + if not callable(method): + return [] + try: + value = method() + except Exception: + return [] + return list(value) if isinstance(value, (list, tuple)) else [] + + +def _prune_runs_locked(now=None): + now = now or time.time() + for run_id, run in list(_RUNS.items()): + if run.get("done") and now - run.get("updated_at", now) > _EVENT_TTL_SEC: + _RUNS.pop(run_id, None) + + +def _ensure_run(run_id): + if not run_id: + return None + with _RUNS_LOCK: + _prune_runs_locked() + run = _RUNS.get(run_id) + if run is None: + now = time.time() + run = { + "run_id": run_id, + "seq": 0, + "events": [], + "done": False, + "started_at": now, + "updated_at": now, + } + _RUNS[run_id] = run + return run + + +def _append_event(run_id, phase, message, level="info", **data): + if not run_id or not message: + return None + + message = _clean_text(message) + if not message: + return None + + with _RUNS_LOCK: + run = _ensure_run(run_id) + if run is None: + return None + + if run["events"]: + last = run["events"][-1] + if last.get("phase") == phase and last.get("message") == message: + return last + + run["seq"] += 1 + event = { + "seq": run["seq"], + "ts": _utc_now_iso(), + "phase": phase, + "level": level, + "message": message, + } + for key, value 
in data.items(): + if value is not None: + event[key] = value + + run["events"].append(event) + if len(run["events"]) > _EVENT_LIMIT: + run["events"] = run["events"][-_EVENT_LIMIT:] + run["updated_at"] = time.time() + return event + + +def _finish_run(run_id): + if not run_id: + return + with _RUNS_LOCK: + run = _ensure_run(run_id) + if run is not None: + run["done"] = True + run["updated_at"] = time.time() + + +def _get_events(run_id, after=0): + with _RUNS_LOCK: + run = _RUNS.get(run_id) + if run is None: + return None + events = [event for event in run["events"] if int(event.get("seq", 0)) > after] + return { + "success": True, + "run_id": run_id, + "events": events, + "done": bool(run.get("done")), + } + + +def _browser_log_to_event(raw_message, levelno): + msg = _clean_text(raw_message, limit=320) + if not msg: + return None + + lower = msg.lower() + + if _CAPTCHA_RE.search(lower): + return ( + "human_help", + "Похоже, на странице проверка или капча. Откройте экран браузера и помогите пройти её.", + "help", + ) + + if levelno >= logging.ERROR: + return ("error", f"Browser-use сообщил об ошибке: {_clean_text(msg, 180)}", "error") + + if levelno >= logging.WARNING and any(token in lower for token in ("failed", "error", "timeout")): + return ("warning", f"Возникла проблема в браузере: {_clean_text(msg, 180)}", "warning") + + step_match = re.search(r"\bstep\s+(\d+)\b", lower) + if step_match: + return ( + "step", + f"Шаг {step_match.group(1)}: анализирую страницу и выбираю следующее действие.", + "info", + ) + + if "go_to_url" in lower or "navigate" in lower or "open url" in lower: + url = _extract_url(msg) + target = _short_url(url) if url else "новую страницу" + return ("navigation", f"Перехожу на {target}.", "info") + + if "click" in lower or "press" in lower or "button" in lower: + return ("action", "Кликаю по элементу на странице.", "info") + + if any(token in lower for token in ("input_text", "type", "typing", "fill", "insert text")): + return ("input", 
"Ввожу данные в поле на странице.", "info") + + if "scroll" in lower: + return ("action", "Прокручиваю страницу.", "info") + + if any(token in lower for token in ("extract", "read", "content")): + return ("read", "Читаю содержимое страницы.", "info") + + if "done" in lower or "final result" in lower or "finished" in lower: + return ("done", "Browser-use закончил выполнение задачи.", "done") + + return None + + +class BrowserUseLiveLogHandler(logging.Handler): + def __init__(self, run_id): + super().__init__(level=logging.INFO) + self.run_id = run_id + self._last_emit_by_message = {} + + def emit(self, record): + if not record.name.startswith("browser_use"): + return + try: + converted = _browser_log_to_event(record.getMessage(), record.levelno) + if not converted: + return + phase, message, level = converted + now = time.monotonic() + last_at = self._last_emit_by_message.get(message) + if last_at is not None and now - last_at < 8: + return + self._last_emit_by_message[message] = now + _append_event(self.run_id, phase, message, level=level) + except Exception: + logger.debug("Failed to convert browser-use log record", exc_info=True) + + +def _fetch_json(url): + with request.urlopen(url, timeout=2) as resp: + return json.loads(resp.read().decode("utf-8")) + + +async def _watch_browser_state(run_id, cdp_url, stop_event): + if not run_id or not cdp_url: + return + + json_url = f"{cdp_url.rstrip('/')}/json" + last_url = None + human_help_sent = False + + while not stop_event.is_set(): + try: + pages = await asyncio.to_thread(_fetch_json, json_url) + if isinstance(pages, list): + page = next( + ( + item + for item in pages + if item.get("type") == "page" + and item.get("url") + and not item.get("url", "").startswith(("devtools://", "chrome://")) + ), + None, + ) + if page: + url = page.get("url", "") + title = _clean_text(page.get("title", ""), limit=90) + if url and url != last_url: + short = _short_url(url) + if title: + message = f"Я на странице: {title} — {short}" + 
else: + message = f"Я на странице: {short}" + _append_event(run_id, "page", message, url=url, title=title) + last_url = url + + if not human_help_sent and _CAPTCHA_RE.search(f"{title} {url}"): + _append_event( + run_id, + "human_help", + "Похоже, на странице антибот-проверка. Откройте экран браузера и помогите пройти её.", + level="help", + url=url, + title=title, + ) + human_help_sent = True + except Exception: + logger.debug("Browser state poll failed for run %s", run_id, exc_info=True)  # best-effort poller: record the failure and keep watching + + try: + await asyncio.wait_for(stop_event.wait(), timeout=_STATE_POLL_INTERVAL) + except asyncio.TimeoutError: + continue + + +def _make_step_end_hook(run_id, cursor): + async def _on_step_end(agent): + history = getattr(agent, "history", None) + if history is None: + return + + actions = _call_history(history, "model_actions") + start = int(cursor.get("actions", 0)) + for action in actions[start:]: + formatted_action = _format_model_action(action) + if not formatted_action: + continue + phase, message = formatted_action + _append_event( + run_id, + phase, + message, + action_name=_normalize_model_action(action)[0], + ) + cursor["actions"] = len(actions) + + results = _call_history(history, "action_results") + result_start = int(cursor.get("results", 0)) + for result in results[result_start:]: + data = _public_data(result) + error_text = _compact_value(data.get("error") or data.get("exception"), limit=180) + if error_text: + _append_event(run_id, "error", f"Ошибка действия: {error_text}", level="error") + continue + + content = _compact_value(data.get("extracted_content") or data.get("content"), limit=320) + if content and not _CAPTCHA_RE.search(content): + formatted = _format_action_result_content(content) + if formatted: + phase, message = formatted + _append_event(run_id, phase, message) + elif content: + _append_event( + run_id, + "human_help", + "Похоже, страница просит проверку человека.
Откройте экран браузера и помогите пройти её.", + level="help", + ) + cursor["results"] = len(results) + + return _on_step_end + def _json_response(handler, status_code, payload): data = json.dumps(payload, ensure_ascii=False).encode("utf-8") @@ -16,40 +572,154 @@ def _json_response(handler, status_code, payload): handler.wfile.write(data) -async def run_browser_task(task): +async def run_browser_task(task, run_id=None): + _ensure_run(run_id) + _append_event(run_id, "start", "Запускаю browser-use задачу.") + logger.info(f"run_browser_task started with task: {task[:200]}...") cdp_url = os.getenv("BROWSER_CDP_URL", "http://127.0.0.1:9222") browser_view_url = os.getenv("BROWSER_VIEW_URL", "") + logger.info(f"CDP URL: {cdp_url}") + if browser_view_url: + _append_event(run_id, "view", f"Экран браузера доступен здесь: {browser_view_url}", view_url=browser_view_url) - browser = Browser(cdp_url=cdp_url) + browser = None + state_stop = asyncio.Event() + state_task = asyncio.create_task(_watch_browser_state(run_id, cdp_url, state_stop)) if run_id else None + live_log_handler = BrowserUseLiveLogHandler(run_id) if run_id else None + browser_use_logger = logging.getLogger("browser_use") + old_browser_use_level = browser_use_logger.level + if live_log_handler: + browser_use_logger.addHandler(live_log_handler) + if browser_use_logger.getEffectiveLevel() > logging.INFO: + browser_use_logger.setLevel(logging.INFO) - llm = ChatOpenAI( - model=os.getenv("MODEL_DEFAULT", "qwen3.5-122b"), - api_key=os.getenv("OPENAI_API_KEY"), - base_url=os.getenv("OPENAI_BASE_URL"), - temperature=0.0, - ) - - agent = Agent(task=task, llm=llm, browser=browser) + async def _cleanup(): + state_stop.set() + if state_task: + try: + await asyncio.wait_for(state_task, timeout=1) + except Exception: + state_task.cancel() + if live_log_handler: + browser_use_logger.removeHandler(live_log_handler) + browser_use_logger.setLevel(old_browser_use_level) + try: + if browser is not None: + logger.info("Closing 
browser...") + await browser.close() + logger.info("Browser closed") + except Exception as close_err: + logger.warning(f"Browser close failed: {close_err}") + _finish_run(run_id) try: - history = await agent.run() + logger.info("Creating Browser...") + _append_event(run_id, "setup", "Подключаюсь к Chromium через CDP.") + browser = Browser(cdp_url=cdp_url) + logger.info("Browser created") + except Exception as err: + logger.error(f"Browser creation failed: {err}", exc_info=True) + _append_event(run_id, "error", f"Не удалось создать браузер: {_clean_text(err, 180)}", level="error") + await _cleanup() + return {"success": False, "run_id": run_id, "error": f"Browser creation failed: {err}"} + + try: + logger.info("Creating LLM...") + model = os.getenv("MODEL_DEFAULT", "qwen3.5-122b") + api_key = os.getenv("OPENAI_API_KEY") + base_url = os.getenv("OPENAI_BASE_URL") + logger.info( + "LLM params: model=%s, base_url=%s, api_key=%s", + model, + (base_url[:80] + "...") if base_url and len(base_url) > 80 else base_url, + "set" if api_key else "missing", + ) + _append_event(run_id, "setup", f"Готовлю модель управления браузером: {model}.") + llm = ChatOpenAI( + model=model, + api_key=api_key, + base_url=base_url, + temperature=0.0, + ) + logger.info("LLM created") + except Exception as err: + logger.error(f"LLM creation failed: {err}", exc_info=True) + _append_event(run_id, "error", f"Не удалось создать модель для browser-use: {_clean_text(err, 180)}", level="error") + await _cleanup() + return {"success": False, "run_id": run_id, "error": f"LLM creation failed: {err}"} + + try: + logger.info("Creating Agent...") + agent = Agent(task=task, llm=llm, browser=browser) + logger.info("Agent created") + _append_event(run_id, "setup", "Агент browser-use готов, начинаю действия на странице.") + except Exception as err: + logger.error(f"Agent creation failed: {err}", exc_info=True) + _append_event(run_id, "error", f"Не удалось создать browser-use агента: {_clean_text(err, 180)}", 
level="error") + await _cleanup() + return {"success": False, "run_id": run_id, "error": f"Agent creation failed: {err}"} + + try: + logger.info("Running agent...") + _append_event(run_id, "running", "Выполняю задачу в браузере.") + step_cursor = {"actions": 0, "results": 0} + try: + history = await agent.run(on_step_end=_make_step_end_hook(run_id, step_cursor)) + except TypeError as hook_err: + if "on_step_end" not in str(hook_err): + raise + logger.warning("browser-use does not support on_step_end hooks; falling back to plain run()") + history = await agent.run() + logger.info("Agent run completed") + for action in _call_history(history, "model_actions")[int(step_cursor.get("actions", 0)):]: + formatted_action = _format_model_action(action) + if not formatted_action: + continue + phase, message = formatted_action + _append_event(run_id, phase, message, action_name=_normalize_model_action(action)[0]) + _append_event(run_id, "done", "Браузерная задача завершена.", level="done") return { "success": True, + "run_id": run_id, "result": history.final_result(), "browser_view": browser_view_url, + "events": (_get_events(run_id, after=0) or {}).get("events", [])[-50:], } except Exception as err: - return {"success": False, "error": f"Browser automation failed: {err}"} + logger.error(f"Agent run failed: {err}", exc_info=True) + _append_event( + run_id, + "error", + f"Браузерная автоматизация завершилась с ошибкой: {_clean_text(err, 180)}", + level="error", + ) + return {"success": False, "run_id": run_id, "error": f"Browser automation failed: {err}"} finally: - try: - await browser.close() - except Exception: - pass + await _cleanup() class BrowserUseRPCHandler(BaseHTTPRequestHandler): + def log_message(self, format_str, *args): + logger.info(f"HTTP {self.address_string()}: {format_str % args}") + def do_GET(self): - if self.path != "/health": + logger.info(f"GET {self.path}") + parsed = parse.urlparse(self.path) + if parsed.path == "/events": + params = 
parse.parse_qs(parsed.query) + run_id = (params.get("run_id") or [""])[0] + try: + after = int((params.get("after") or ["0"])[0]) + except ValueError: + after = 0 + payload = _get_events(run_id, after=after) + if payload is None: + _json_response(self, 404, {"success": False, "error": "Run not found"}) + else: + _json_response(self, 200, payload) + return + + if parsed.path != "/health": _json_response(self, 404, {"success": False, "error": "Not found"}) return @@ -59,9 +729,11 @@ class BrowserUseRPCHandler(BaseHTTPRequestHandler): pass _json_response(self, 200, {"success": True}) except Exception as err: + logger.error(f"Health check failed: {err}") _json_response(self, 503, {"success": False, "error": f"Browser is not ready: {err}"}) def do_POST(self): + logger.info(f"POST {self.path}") if self.path != "/run": _json_response(self, 404, {"success": False, "error": "Not found"}) return @@ -71,29 +743,42 @@ class BrowserUseRPCHandler(BaseHTTPRequestHandler): raw = self.rfile.read(content_length) payload = json.loads(raw.decode("utf-8") if raw else "{}") task = payload.get("task", "") if not isinstance(task, str) or not task.strip(): _json_response(self, 400, {"success": False, "error": "Field 'task' is required"}) return - result = asyncio.run(run_browser_task(task.strip())) + # Register and log the run only after `task` is validated: slicing a non-str task would raise (500 instead of 400), and a rejected request must not leave a never-finished run in _RUNS. + run_id = payload.get("run_id") or uuid.uuid4().hex + _ensure_run(run_id) + logger.info(f"RPC task received, run_id: {run_id}, task preview: {task[:200]}...") + logger.info("Starting browser task...") + result = asyncio.run(run_browser_task(task.strip(), run_id=run_id)) + if isinstance(result, dict): + result.setdefault("run_id", run_id) + logger.info(f"Task completed: {result}") code = 200 if result.get("success") else 500 _json_response(self, code, result) - except json.JSONDecodeError: + except json.JSONDecodeError as err: + logger.error(f"JSON decode error: {err}") _json_response(self, 400, {"success": False, "error": "Invalid JSON payload"}) except error.URLError as err: +
logger.error(f"URLError: {err}") _json_response(self, 503, {"success": False, "error": f"Transport error: {err}"}) except Exception as err: + logger.error(f"do_POST exception: {err}", exc_info=True) _json_response(self, 500, {"success": False, "error": f"Internal error: {err}"}) - def log_message(self, format_str, *args): - return - def main(): + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler()] + ) host = os.getenv("BROWSER_USE_RPC_HOST", "0.0.0.0") port = int(os.getenv("BROWSER_USE_RPC_PORT", "8787")) server = ThreadingHTTPServer((host, port), BrowserUseRPCHandler) - print(f"browser-use RPC listening on {host}:{port}") + logger.info(f"browser-use RPC listening on {host}:{port}") server.serve_forever() diff --git a/hermes_code/gateway/run.py b/hermes_code/gateway/run.py index c8cfae5d..9451e27d 100644 --- a/hermes_code/gateway/run.py +++ b/hermes_code/gateway/run.py @@ -4936,6 +4936,15 @@ class GatewayRunner: """Callback invoked by agent when a tool is called.""" if not progress_queue: return + + # Long-running tools can emit already-formatted live updates. + # Keep these out of the normal "tool(args)" formatter so Telegram + # receives readable, compact lines in the edited progress message. 
+ if isinstance(args, dict) and args.get("_browser_live"): + msg = (preview or "").strip() + if msg: + progress_queue.put(msg) + return # "new" mode: only report when tool changes if progress_mode == "new" and tool_name == last_tool[0]: @@ -4990,6 +4999,32 @@ class GatewayRunner: if not adapter: return + max_progress_chars = int(os.getenv("HERMES_TOOL_PROGRESS_MAX_CHARS", "3500")) + + def _progress_text(lines): + """Join non-blank progress lines; when over max_progress_chars keep only the newest lines behind an ellipsis.""" + visible = [str(line) for line in lines if str(line).strip()] + text = "\n".join(visible) + if len(text) <= max_progress_chars: + return text + + prefix = "…\n" + kept = [] + current_len = len(prefix) + for line in reversed(visible): + next_len = current_len + len(line) + (1 if kept else 0) + if next_len > max_progress_chars: + break + kept.append(line) + current_len = next_len + if not kept and visible: + # Even the newest line is over the cap: keep its tail rather than sending a bare ellipsis. + budget = max(max_progress_chars - len(prefix), 0) + newest = visible[-1] + return prefix + newest[len(newest) - budget:] + kept.reverse() + return prefix + "\n".join(kept) + progress_lines = [] # Accumulated tool lines progress_msg_id = None # ID of the progress message to edit can_edit = True # False once an edit fails (platform doesn't support it) @@ -5010,7 +5045,7 @@ class GatewayRunner: if can_edit and progress_msg_id is not None: # Try to edit the existing progress message - full_text = "\n".join(progress_lines) + full_text = _progress_text(progress_lines) result = await adapter.edit_message( chat_id=source.chat_id, message_id=progress_msg_id, @@ -5024,7 +5059,7 @@ class GatewayRunner: else: if can_edit: # First tool: send all accumulated text as new message - full_text = "\n".join(progress_lines) + full_text = _progress_text(progress_lines) result = await adapter.send(chat_id=source.chat_id, content=full_text, metadata=_progress_metadata) else: # Editing unsupported: send just this line @@ -5053,7 +5088,7 @@ class GatewayRunner: break # Final edit with all remaining tools (only if editing works) if can_edit and progress_lines and progress_msg_id: - full_text = "\n".join(progress_lines) + full_text = _progress_text(progress_lines) try: await adapter.edit_message( chat_id=source.chat_id, diff --git
a/hermes_code/model_tools.py b/hermes_code/model_tools.py index ceae2ceb..e57695fa 100644 --- a/hermes_code/model_tools.py +++ b/hermes_code/model_tools.py @@ -374,6 +374,7 @@ def handle_function_call( enabled_tools: Optional[List[str]] = None, honcho_manager: Optional[Any] = None, honcho_session_key: Optional[str] = None, + tool_progress_callback: Optional[Any] = None, ) -> str: """ Main function call dispatcher that routes calls to the tool registry. @@ -387,6 +388,8 @@ def handle_function_call( execute_code uses this list to determine which sandbox tools to generate. Falls back to the process-global ``_last_resolved_tool_names`` for backward compat. + tool_progress_callback: Optional gateway/CLI progress callback that + long-running tools can use for live status updates. Returns: Function result as a JSON string. @@ -420,6 +423,7 @@ def handle_function_call( enabled_tools=sandbox_enabled, honcho_manager=honcho_manager, honcho_session_key=honcho_session_key, + tool_progress_callback=tool_progress_callback, ) else: result = registry.dispatch( @@ -428,6 +432,7 @@ def handle_function_call( user_task=user_task, honcho_manager=honcho_manager, honcho_session_key=honcho_session_key, + tool_progress_callback=tool_progress_callback, ) try: diff --git a/hermes_code/run_agent.py b/hermes_code/run_agent.py index 08e2807b..3780526e 100644 --- a/hermes_code/run_agent.py +++ b/hermes_code/run_agent.py @@ -4709,6 +4709,7 @@ class AIAgent: enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, honcho_manager=self._honcho, honcho_session_key=self._honcho_session_key, + tool_progress_callback=self.tool_progress_callback, ) def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None: @@ -5077,6 +5078,7 @@ class AIAgent: enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, honcho_manager=self._honcho, honcho_session_key=self._honcho_session_key, + 
tool_progress_callback=self.tool_progress_callback, ) _spinner_result = function_result except Exception as tool_error: @@ -5093,6 +5095,7 @@ class AIAgent: enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, honcho_manager=self._honcho, honcho_session_key=self._honcho_session_key, + tool_progress_callback=self.tool_progress_callback, ) except Exception as tool_error: function_result = f"Error executing tool '{function_name}': {tool_error}" diff --git a/hermes_code/tests/gateway/test_run_progress_topics.py b/hermes_code/tests/gateway/test_run_progress_topics.py index c4839133..b8fcc8e1 100644 --- a/hermes_code/tests/gateway/test_run_progress_topics.py +++ b/hermes_code/tests/gateway/test_run_progress_topics.py @@ -71,6 +71,38 @@ class FakeAgent: } +class FakeBrowserLiveAgent(FakeAgent): + def run_conversation(self, message, conversation_history=None, task_id=None): + self.tool_progress_callback( + "internet_browser", + "📍 Я на странице: example.com", + {"_browser_live": True}, + ) + time.sleep(0.35) + return { + "final_response": "done", + "messages": [], + "api_calls": 1, + } + + +class FakeLongBrowserLiveAgent(FakeAgent): + def run_conversation(self, message, conversation_history=None, task_id=None): + for index in range(8): + self.tool_progress_callback( + "internet_browser", + f"📍 Событие браузера номер {index}: " + ("x" * 50), + {"_browser_live": True}, + ) + time.sleep(0.05) + time.sleep(0.35) + return { + "final_response": "done", + "messages": [], + "api_calls": 1, + } + + def _make_runner(adapter): gateway_run = importlib.import_module("gateway.run") GatewayRunner = gateway_run.GatewayRunner @@ -133,3 +165,81 @@ async def test_run_agent_progress_stays_in_originating_topic(monkeypatch, tmp_pa ] assert adapter.edits assert all(call["metadata"] == {"thread_id": "17585"} for call in adapter.typing) + + +@pytest.mark.asyncio +async def test_browser_live_progress_uses_raw_message(monkeypatch, tmp_path): + 
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = FakeBrowserLiveAgent + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + adapter = ProgressCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = importlib.import_module("gateway.run") + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}) + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", + ) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-1", + session_key="agent:main:telegram:group:-1001:17585", + ) + + assert result["final_response"] == "done" + assert adapter.sent[0]["content"] == "📍 Я на странице: example.com" + assert "internet_browser" not in adapter.sent[0]["content"] + + +@pytest.mark.asyncio +async def test_browser_live_progress_is_capped_for_telegram(monkeypatch, tmp_path): + monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all") + monkeypatch.setenv("HERMES_TOOL_PROGRESS_MAX_CHARS", "180") + + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = FakeLongBrowserLiveAgent + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + adapter = ProgressCaptureAdapter() + runner = _make_runner(adapter) + gateway_run = importlib.import_module("gateway.run") + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}) + 
source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", + ) + + result = await runner._run_agent( + message="hello", + context_prompt="", + history=[], + source=source, + session_id="sess-1", + session_key="agent:main:telegram:group:-1001:17585", + ) + + assert result["final_response"] == "done" + assert adapter.edits + assert all(len(call["content"]) <= 180 for call in adapter.edits) + assert adapter.edits[-1]["content"].startswith("…\n") diff --git a/hermes_code/tests/tools/test_browser_use_live_events.py b/hermes_code/tests/tools/test_browser_use_live_events.py new file mode 100644 index 00000000..1f2941e1 --- /dev/null +++ b/hermes_code/tests/tools/test_browser_use_live_events.py @@ -0,0 +1,154 @@ +import asyncio + +from tools import browser_use_tool + + +def test_build_events_url_replaces_run_endpoint(monkeypatch): + monkeypatch.delenv("BROWSER_USE_EVENTS_URL", raising=False) + + assert ( + browser_use_tool._build_events_url("http://browser:8787/run") + == "http://browser:8787/events" + ) + assert ( + browser_use_tool._build_events_url("http://browser:8787/api/run?x=1") + == "http://browser:8787/api/events" + ) + + +def test_build_events_url_uses_override(monkeypatch): + monkeypatch.setenv("BROWSER_USE_EVENTS_URL", "http://agent/events") + + assert browser_use_tool._build_events_url("http://browser:8787/run") == "http://agent/events" + + +def test_format_event_for_progress_adds_human_help_link(): + text = browser_use_tool._format_event_for_progress( + { + "phase": "human_help", + "level": "help", + "message": "Похоже, нужна капча.", + }, + vnc_url="https://vnc.example", + ) + + assert text == "🧑‍💻 Нужна помощь: Похоже, нужна капча. 
Экран: https://vnc.example" + + +def test_emit_unseen_events_keeps_high_priority_after_limit(): + calls = [] + + def progress_callback(name, preview, args): + calls.append((name, preview, args)) + + last_seq = {"value": 0} + emitted = browser_use_tool._emit_unseen_events( + [ + {"seq": 1, "phase": "page", "message": "Я на странице: example.com"}, + {"seq": 2, "phase": "page", "message": "Я на странице: example.org"}, + { + "seq": 3, + "phase": "human_help", + "level": "help", + "message": "Нужна капча.", + }, + ], + last_seq, + progress_callback, + "https://vnc.example", + max_events=1, + ) + + assert emitted == 2 + assert last_seq["value"] == 3 + assert [call[1] for call in calls] == [ + "📍 Я на странице: example.com", + "🧑‍💻 Нужна помощь: Нужна капча. Экран: https://vnc.example", + ] + assert all(call[0] == "internet_browser" for call in calls) + assert all(call[2]["_browser_live"] is True for call in calls) + + +def test_poll_live_events_emits_until_done(monkeypatch): + responses = [ + { + "success": True, + "events": [{"seq": 1, "phase": "start", "message": "Запускаю."}], + "done": False, + }, + { + "success": True, + "events": [{"seq": 2, "phase": "done", "level": "done", "message": "Готово."}], + "done": True, + }, + ] + + def fake_fetch(events_url, run_id, after): + return responses.pop(0) + + calls = [] + + def progress_callback(name, preview, args): + calls.append((name, preview, args)) + + monkeypatch.setattr(browser_use_tool, "_fetch_browser_events", fake_fetch) + monkeypatch.setenv("BROWSER_LIVE_LOG_POLL_INTERVAL", "0.1") + + async def run_poll(): + stop_event = asyncio.Event() + last_seq = {"value": 0} + await browser_use_tool._poll_live_events( + "http://browser:8787/events", + "run-1", + progress_callback, + stop_event, + "", + last_seq, + ) + return last_seq + + last_seq = asyncio.run(run_poll()) + + assert last_seq["value"] == 2 + assert [call[1] for call in calls] == ["🌐 Запускаю.", "✅ Готово."] + + +def 
test_poll_live_events_applies_event_limit_per_poll(monkeypatch): + responses = [ + { + "success": True, + "events": [{"seq": 1, "phase": "page", "message": "Первая страница."}], + "done": False, + }, + { + "success": True, + "events": [{"seq": 2, "phase": "action", "message": "Нажимаю play."}], + "done": True, + }, + ] + + def fake_fetch(events_url, run_id, after): + return responses.pop(0) + + calls = [] + + def progress_callback(name, preview, args): + calls.append(preview) + + monkeypatch.setattr(browser_use_tool, "_fetch_browser_events", fake_fetch) + monkeypatch.setenv("BROWSER_LIVE_LOG_POLL_INTERVAL", "0.1") + monkeypatch.setenv("BROWSER_LIVE_LOG_MAX_EVENTS", "1") + + async def run_poll(): + await browser_use_tool._poll_live_events( + "http://browser:8787/events", + "run-1", + progress_callback, + asyncio.Event(), + "", + {"value": 0}, + ) + + asyncio.run(run_poll()) + + assert calls == ["📍 Первая страница.", "🖱️ Нажимаю play."] diff --git a/hermes_code/tests/tools/test_browser_use_runner_events.py b/hermes_code/tests/tools/test_browser_use_runner_events.py new file mode 100644 index 00000000..e50beb9e --- /dev/null +++ b/hermes_code/tests/tools/test_browser_use_runner_events.py @@ -0,0 +1,216 @@ +import importlib.util +import asyncio +import logging +import sys +import types +from pathlib import Path + + +def _load_runner(monkeypatch): + fake_browser_use = types.ModuleType("browser_use") + fake_browser_use.Agent = object + fake_browser_use.Browser = object + fake_browser_use.ChatOpenAI = object + monkeypatch.setitem(sys.modules, "browser_use", fake_browser_use) + + path = Path(__file__).resolve().parents[3] / "browser_env" / "browser_use_runner.py" + spec = importlib.util.spec_from_file_location("test_browser_use_runner", path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def test_event_store_returns_incremental_events(monkeypatch): + runner = _load_runner(monkeypatch) + runner._RUNS.clear() + + 
runner._append_event("run-1", "start", "Запускаю.") + runner._append_event("run-1", "page", "Я на странице: example.com") + runner._finish_run("run-1") + + payload = runner._get_events("run-1", after=1) + + assert payload["success"] is True + assert payload["done"] is True + assert [event["message"] for event in payload["events"]] == ["Я на странице: example.com"] + + +def test_browser_use_log_translation_detects_actions_and_captcha(monkeypatch): + runner = _load_runner(monkeypatch) + + assert runner._browser_log_to_event("Step 3: deciding next action", logging.INFO) == ( + "step", + "Шаг 3: анализирую страницу и выбираю следующее действие.", + "info", + ) + assert runner._browser_log_to_event("Action: click button Sign in", logging.INFO) == ( + "action", + "Кликаю по элементу на странице.", + "info", + ) + assert runner._browser_log_to_event("Cloudflare captcha detected", logging.INFO) == ( + "human_help", + "Похоже, на странице проверка или капча. Откройте экран браузера и помогите пройти её.", + "help", + ) + + +def test_model_action_formatter_reports_specific_actions(monkeypatch): + runner = _load_runner(monkeypatch) + + assert runner._format_model_action({"go_to_url": {"url": "https://music.yandex.ru/search?text=Дора"}}) == ( + "navigation", + "Перехожу на music.yandex.ru/search.", + ) + assert runner._format_model_action({"input_text": {"text": "Дора", "index": 4}}) == ( + "input", + "Ввожу в поле: Дора.", + ) + assert runner._format_model_action({"click_element_by_index": {"index": 12}}) is None + assert runner._format_model_action({"click": {"label": "Playback"}}) == ( + "action", + "Нажимаю: Playback.", + ) + assert runner._format_model_action({"send_keys": {"keys": "Enter"}}) == ( + "action", + "Нажимаю клавиши: Enter.", + ) + assert runner._format_model_action({"scroll_down": {"amount": 614}}) == ( + "action", + "Прокручиваю страницу вниз.", + ) + assert runner._format_model_action({"action": [{"click_element_by_index": {"index": 5}}]}) is None + assert 
runner._format_model_action({"action": "evaluate", "code": "document.querySelector('#q')"}) == ( + "action", + "Проверяю страницу скриптом.", + ) + + +def test_action_result_formatter_humanizes_browser_use_content(monkeypatch): + runner = _load_runner(monkeypatch) + + assert runner._format_action_result_content("Navigated to https://youtube.com") == ( + "navigation", + "Открыл страницу: youtube.com/.", + ) + assert runner._format_action_result_content("🔗 Navigated to https://music.vk.com") == ( + "navigation", + "Открыл страницу: music.vk.com/.", + ) + assert runner._format_action_result_content( + "🔗 Opened new tab with url https://yandex.ru/search/?text=ВК+Музыка+Дора" + ) == ( + "navigation", + "Открыл новую вкладку: yandex.ru/search/.", + ) + assert runner._format_action_result_content("https://music.yandex.ru/search?text=Дора") == ( + "navigation", + "Открыл страницу: music.yandex.ru/search.", + ) + assert runner._format_action_result_content('Clicked button "Accept all" aria-label=Accept the use') == ( + "action", + "Нажал: Accept all.", + ) + assert runner._format_action_result_content('Clicked div "Меню" id=header__burger_menu') == ( + "action", + "Нажал: Меню.", + ) + assert runner._format_action_result_content("Clicked a aria-label=Home") == ( + "action", + "Нажал: Home.", + ) + assert runner._format_action_result_content("Clicked button aria-label=Playback") == ( + "action", + "Нажал: Playback.", + ) + assert runner._format_action_result_content("Clicked button") == ( + "action", + "Нажал кнопку.", + ) + assert runner._format_action_result_content( + "Typed 'Дора' 💡 This is an autocomplete field. 
Wait for suggestions to appear" + ) == ( + "input", + "Ввёл в поиск: Дора.", + ) + assert runner._format_action_result_content("Waited for 3 seconds") == ( + "action", + "Жду загрузку: 3 сек.", + ) + assert runner._format_action_result_content("🔍 Scrolled up 1.5 pages") == ( + "action", + "Прокрутил страницу вверх на 1.5 pages.", + ) + assert runner._format_action_result_content("🔍 Scrolled down 613px") == ( + "action", + "Прокрутил страницу вниз на 613px.", + ) + assert runner._format_action_result_content( + "No elements found matching \"input[type='text'], [role='searchbox']\"." + ) == ( + "read", + "Не нашёл подходящее поле или кнопку на странице.", + ) + assert runner._format_action_result_content("Task Completed Successfully! I found band 'Дора'") == ( + "done", + "Browser-use сообщил, что задача выполнена.", + ) + + +def test_step_end_hook_emits_history_actions(monkeypatch): + runner = _load_runner(monkeypatch) + runner._RUNS.clear() + + class FakeHistory: + def model_actions(self): + return [ + {"go_to_url": {"url": "https://music.yandex.ru"}}, + {"input_text": {"text": "Дора", "index": 3}}, + {"click_element_by_index": {"index": 7}}, + ] + + def action_results(self): + return [] + + agent = types.SimpleNamespace(history=FakeHistory()) + hook = runner._make_step_end_hook("run-2", {"actions": 0, "results": 0}) + + asyncio.run(hook(agent)) + payload = runner._get_events("run-2", after=0) + + assert [event["message"] for event in payload["events"]] == [ + "Перехожу на music.yandex.ru/.", + "Ввожу в поле: Дора.", + ] + + asyncio.run(hook(agent)) + assert runner._get_events("run-2", after=2)["events"] == [] + + +def test_step_end_hook_emits_action_results(monkeypatch): + runner = _load_runner(monkeypatch) + runner._RUNS.clear() + + class FakeHistory: + def model_actions(self): + return () + + def action_results(self): + return ( + {"extracted_content": "Clicked button \"Play (k)\" aria-label=Play (k)"}, + {"error": "Element not found"}, + {"extracted_content": 
"Cloudflare captcha"}, + ) + + agent = types.SimpleNamespace(history=FakeHistory()) + hook = runner._make_step_end_hook("run-3", {"actions": 0, "results": 0}) + + asyncio.run(hook(agent)) + payload = runner._get_events("run-3", after=0) + + assert [event["phase"] for event in payload["events"]] == ["action", "error", "human_help"] + assert [event["message"] for event in payload["events"]] == [ + "Нажал: Play (k).", + "Ошибка действия: Element not found", + "Похоже, страница просит проверку человека. Откройте экран браузера и помогите пройти её.", + ] diff --git a/hermes_code/tools/browser_use_tool.py b/hermes_code/tools/browser_use_tool.py index 167b61b3..2daadcf0 100644 --- a/hermes_code/tools/browser_use_tool.py +++ b/hermes_code/tools/browser_use_tool.py @@ -1,24 +1,294 @@ +import asyncio import json +import logging import os +import socket +import uuid from urllib import error, request +from urllib import parse as urlparse + from tools.registry import registry +logger = logging.getLogger("hermes.browser_use_tool") -def run_browser_task(task): + +def _env_bool(name: str, default: bool) -> bool: + value = os.getenv(name) + if value is None: + return default + return value.strip().lower() not in {"0", "false", "no", "off"} + + +def _env_int(name: str, default: int, minimum: int = 1) -> int: + try: + return max(minimum, int(os.getenv(name, str(default)))) + except (TypeError, ValueError): + return default + + +def _env_float(name: str, default: float, minimum: float = 0.1) -> float: + try: + return max(minimum, float(os.getenv(name, str(default)))) + except (TypeError, ValueError): + return default + + +def _build_events_url(rpc_url: str) -> str: + override = os.getenv("BROWSER_USE_EVENTS_URL", "").strip() + if override: + return override + + parsed = urlparse.urlparse(rpc_url) + path = parsed.path or "/run" + if path.endswith("/run"): + path = path[: -len("/run")] + "/events" + else: + path = path.rstrip("/") + "/events" + return 
urlparse.urlunparse(parsed._replace(path=path, query="", fragment="")) + + +def _fetch_browser_events(events_url: str, run_id: str, after: int) -> dict: + query = urlparse.urlencode({"run_id": run_id, "after": str(after)}) + separator = "&" if urlparse.urlparse(events_url).query else "?" + url = f"{events_url}{separator}{query}" + with request.urlopen(url, timeout=5) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def _emit_browser_progress(progress_callback, text: str, **metadata) -> None: + if not progress_callback or not text: + return + try: + progress_callback( + "internet_browser", + text, + {"_browser_live": True, **metadata}, + ) + except Exception as exc: + logger.debug("Browser progress callback failed: %s", exc) + + +def _format_event_for_progress(event: dict, vnc_url: str = "") -> str: + if not isinstance(event, dict): + return "" + + message = str(event.get("message") or "").strip() + if not message: + return "" + + phase = str(event.get("phase") or "") + level = str(event.get("level") or "info") + + if level == "help": + suffix = f" Экран: {vnc_url}" if vnc_url else "" + return f"🧑‍💻 Нужна помощь: {message}{suffix}" + if level == "error": + return f"⚠️ {message}" + if level == "done" or phase == "done": + return f"✅ {message}" + if phase == "view": + return f"🌐 {message}" + if phase == "page": + return f"📍 {message}" + if phase == "navigation": + return f"➡️ {message}" + if phase == "action": + return f"🖱️ {message}" + if phase == "input": + return f"⌨️ {message}" + if phase == "read": + return f"📖 {message}" + return f"🌐 {message}" + + +def _emit_unseen_events( + events, + last_seq: dict, + progress_callback, + vnc_url: str, + *, + max_events: int, +) -> int: + if not isinstance(events, list): + return 0 + + emitted = 0 + for event in events: + if not isinstance(event, dict): + continue + seq = int(event.get("seq") or 0) + if seq and seq <= int(last_seq.get("value", 0)): + continue + + level = str(event.get("level") or "info") + 
high_priority = level in {"help", "error", "done"} or event.get("phase") in {"start", "view"} + if emitted >= max_events and not high_priority: + if seq: + last_seq["value"] = max(int(last_seq.get("value", 0)), seq) + continue + + text = _format_event_for_progress(event, vnc_url=vnc_url) + if text: + _emit_browser_progress(progress_callback, text, browser_event=event) + emitted += 1 + if seq: + last_seq["value"] = max(int(last_seq.get("value", 0)), seq) + return emitted + + +async def _poll_live_events( + events_url: str, + run_id: str, + progress_callback, + stop_event: asyncio.Event, + vnc_url: str, + last_seq: dict, +) -> None: + if not progress_callback or not events_url or not run_id: + return + + interval = _env_float("BROWSER_LIVE_LOG_POLL_INTERVAL", 1.5) + max_events = _env_int("BROWSER_LIVE_LOG_MAX_EVENTS", 40) + failures = 0 + misses = 0 + + while not stop_event.is_set(): + try: + data = await asyncio.to_thread( + _fetch_browser_events, + events_url, + run_id, + int(last_seq.get("value", 0)), + ) + except error.HTTPError as exc: + if exc.code == 404: + misses += 1 + if misses >= 4: + return + else: + failures += 1 + if failures >= 4: + return + except Exception as exc: + failures += 1 + if failures >= 4: + logger.debug("Stopping browser live event polling after repeated failures: %s", exc) + return + else: + failures = 0 + misses = 0 + _emit_unseen_events( + data.get("events", []), + last_seq, + progress_callback, + vnc_url, + max_events=max_events, + ) + if data.get("done"): + return + + try: + await asyncio.wait_for(stop_event.wait(), timeout=interval) + except asyncio.TimeoutError: + continue + + +async def run_browser_task( + task: str, + honcho_session_key: str = None, + progress_callback=None, +): if not task or not str(task).strip(): return json.dumps({"success": False, "error": "Task is required"}, ensure_ascii=False) - rpc_url = os.getenv("BROWSER_USE_RPC_URL", "http://browser:8787/run") - timeout_sec = int(os.getenv("BROWSER_USE_RPC_TIMEOUT", 
"900")) - payload = json.dumps({"task": task}).encode("utf-8") - req = request.Request(rpc_url, data=payload, headers={"Content-Type": "application/json"}, method="POST") + browser_host = "browser" + browser_port = 9222 + vnc_url = os.getenv("BROWSER_VIEW_URL", "") try: - with request.urlopen(req, timeout=timeout_sec) as resp: - body = resp.read().decode("utf-8") - return body + browser_ip = socket.gethostbyname(browser_host) + cdp_url = f"http://{browser_ip}:{browser_port}" + except Exception: + cdp_url = f"http://{browser_host}:{browser_port}" + + rpc_url = os.getenv("BROWSER_USE_RPC_URL", "http://browser:8787/run") + events_url = _build_events_url(rpc_url) + timeout_sec = int(os.getenv("BROWSER_USE_RPC_TIMEOUT", "900")) + run_id = uuid.uuid4().hex + payload = json.dumps({"task": task.strip(), "run_id": run_id}).encode("utf-8") + req = request.Request(rpc_url, data=payload, headers={"Content-Type": "application/json"}, method="POST") + live_logs_enabled = _env_bool("BROWSER_LIVE_LOGS", True) and bool(progress_callback) + stop_event = asyncio.Event() + last_seq = {"value": 0} + poll_task = None + + if live_logs_enabled: + if vnc_url: + _emit_browser_progress( + progress_callback, + f"Открыл браузерный экран: {vnc_url}", + run_id=run_id, + cdp_url=cdp_url, + ) + else: + _emit_browser_progress( + progress_callback, + "Запускаю browser-use. 
Экран noVNC не настроен: задайте BROWSER_VIEW_URL.", + run_id=run_id, + cdp_url=cdp_url, + ) + poll_task = asyncio.create_task( + _poll_live_events( + events_url, + run_id, + progress_callback, + stop_event, + vnc_url, + last_seq, + ) + ) + + try: + def _do_rpc(): + with request.urlopen(req, timeout=timeout_sec) as resp: + return resp.read().decode("utf-8") + + body = await asyncio.to_thread(_do_rpc) + + try: + resp_json = json.loads(body) + if isinstance(resp_json, dict): + if "vnc_url" not in resp_json: + resp_json["vnc_url"] = vnc_url + resp_json.setdefault("run_id", run_id) + + if live_logs_enabled: + _emit_unseen_events( + resp_json.get("events", []), + last_seq, + progress_callback, + vnc_url, + max_events=_env_int("BROWSER_LIVE_LOG_MAX_EVENTS", 40), + ) + if resp_json.get("success"): + _emit_browser_progress(progress_callback, "✅ Browser-use завершил задачу.", run_id=run_id) + else: + _emit_browser_progress( + progress_callback, + f"⚠️ Browser-use завершился с ошибкой: {resp_json.get('error', 'unknown error')}", + run_id=run_id, + ) + + return json.dumps(resp_json, ensure_ascii=False) + except Exception: + pass + + return body + except error.HTTPError as http_err: body = http_err.read().decode("utf-8", errors="replace") + if live_logs_enabled: + _emit_browser_progress(progress_callback, f"⚠️ Ошибка browser-use RPC: HTTP {http_err.code}", run_id=run_id) return json.dumps( { "success": False, @@ -28,6 +298,8 @@ def run_browser_task(task): ensure_ascii=False, ) except Exception as err: + if live_logs_enabled: + _emit_browser_progress(progress_callback, f"⚠️ Ошибка browser-use RPC: {err}", run_id=run_id) return json.dumps( { "success": False, @@ -35,31 +307,37 @@ def run_browser_task(task): }, ensure_ascii=False, ) + finally: + stop_event.set() + if poll_task: + try: + await asyncio.wait_for(poll_task, timeout=3) + except asyncio.TimeoutError: + poll_task.cancel() + except Exception as exc: + logger.debug("Browser live polling finished with error: %s", exc) 
registry.register( name="internet_browser", - toolset="browse_cmd", + toolset="browse_cmd", schema={ "name": "internet_browser", - "description": ( - "ГЛАВНЫЙ ИНСТРУМЕНТ ДЛЯ ВЕБ-СЕРФИНГА. Вызывай этот инструмент НАПРЯМУЮ (через стандартный tool call/function call). " - "КАТЕГОРИЧЕСКИ ЗАПРЕЩЕНО использовать `execute_code` или `delegate_task` для работы с браузером. " - "Не пиши Python-скрипты! Просто передай в этот инструмент параметр `task`. " - "Используй для любых задач в интернете: поиск товаров (Wildberries, Ozon), чтение статей, клики, навигация." - ), + "description": "ГЛАВНЫЙ ИНСТРУМЕНТ ДЛЯ ВЕБ-СЕРФИНГА. Вызывай напрямую.", "parameters": { "type": "object", "properties": { - "task": { - "type": "string", - "description": "Подробная задача на естественном языке. Например: 'Зайди на wildberries.ru, найди черную футболку и верни цену'." - } + "task": {"type": "string", "description": "Задача для браузера"} }, "required": ["task"] } }, - - handler=lambda args, **kw: run_browser_task(args.get("task")), + handler=lambda args, **kw: asyncio.run( + run_browser_task( + args.get("task", ""), + kw.get("honcho_session_key", ""), + kw.get("tool_progress_callback"), + ) + ), emoji="🌐", -) \ No newline at end of file +)