"""to_captcha custom action для browser-use. Когда LLM-агент видит на странице капчу (reCAPTCHA / hCaptcha / Cloudflare Turnstile), он вызывает action `to_captcha`. Action: 1. Уведомляет browser-api (POST /api/browser/tasks/{task_id}/captcha/notify), передавая URL noVNC-просмотрщика, чтобы пользователь решил капчу руками. 2. Параллельно ОПРАШИВАЕТ DOM каждые ~1.5 сек: * iframe reCAPTCHA/hCaptcha/Turnstile исчез * скрытый textarea/input с токеном заполнен Как только один из критериев сработал — POST /captcha/solved (detector=dom_poller), возвращает управление browser-use Agent. Агент продолжает с того же шага, где остановился, потому что browser-use держит общий browser context. 3. Если за timeout_seconds капчу автодетектор не увидел решённой — поднимает captcha_state в timeout_prompt (через API), даёт пользователю шанс ответить «продлить» (POST /captcha/extend) или «отменить» (POST /captcha/abort). 4. На abort action возвращает success=False — Agent получит сигнал об ошибке. Пользовательского подтверждения «готово» НЕТ. Решение засекается только DOM-детектором либо внешним вызовом /captcha/solved. """ from __future__ import annotations import asyncio import json import os import time from typing import Any from urllib import error, request CAPTCHA_KIND_DETECTORS: tuple[tuple[str, str], ...] = ( ("recaptcha_v2", "() => !!document.querySelector('iframe[src*=\"recaptcha\"]')"), ("hcaptcha", "() => !!document.querySelector('iframe[src*=\"hcaptcha.com\"]')"), ("turnstile", "() => !!document.querySelector('iframe[src*=\"challenges.cloudflare.com\"]')"), ) CAPTCHA_TOKEN_CHECKS: tuple[str, ...] = ( "() => { const el = document.querySelector('textarea[name=\"g-recaptcha-response\"]'); return !!(el && el.value && el.value.length > 20); }", "() => { const el = document.querySelector('textarea[name=\"h-captcha-response\"]'); return !!(el && el.value && el.value.length > 20); }", "() => { const el = document.querySelector('input[name=\"cf-turnstile-response\"]'); return !!(el && el.value && el.value.length > 5); }", ) # Селекторы, по которым считаем что капча на странице ещё видна. CAPTCHA_PRESENCE_CHECK = ( "() => !!document.querySelector(" "'iframe[src*=\"recaptcha\"], iframe[src*=\"hcaptcha.com\"], iframe[src*=\"challenges.cloudflare.com\"]'" ")" ) async def _safe_eval(page: Any, js: str) -> bool: """Безопасно выполняет JS-проверку, прячет ошибки навигации/закрытой страницы.""" try: result = await page.evaluate(js) return bool(result) except Exception: return False async def detect_captcha_kind(page: Any) -> str | None: for name, js in CAPTCHA_KIND_DETECTORS: if await _safe_eval(page, js): return name if await _safe_eval(page, CAPTCHA_PRESENCE_CHECK): return "unknown" return None async def is_captcha_solved(page: Any) -> bool: """Капча считается решённой, если ни одного captcha-iframe нет, ИЛИ хотя бы один токен заполнен.""" for js in CAPTCHA_TOKEN_CHECKS: if await _safe_eval(page, js): return True still_present = await _safe_eval(page, CAPTCHA_PRESENCE_CHECK) return not still_present def _http_post(url: str, payload: dict[str, Any] | None = None, timeout: float = 10.0) -> dict[str, Any]: body = json.dumps(payload or {}).encode("utf-8") req = request.Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST") try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") return json.loads(raw) if raw else {} except error.HTTPError as exc: raw = exc.read().decode("utf-8", errors="replace") if exc.fp else "" return {"_http_error": exc.code, "_body": raw} except Exception as exc: return {"_error": str(exc)} def _http_get(url: str, timeout: float = 35.0) -> dict[str, Any]: req = request.Request(url, method="GET") try: with request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") return json.loads(raw) if raw else {} except error.HTTPError as exc: raw = exc.read().decode("utf-8", errors="replace") if exc.fp else "" return {"_http_error": exc.code, "_body": raw} except Exception as exc: return {"_error": str(exc)} async def run_to_captcha( page: Any, reason: str | None = None, *, task_id: str | None = None, api_base: str | None = None, view_url: str | None = None, timeout_seconds: int | None = None, poll_interval: float = 1.5, ) -> dict[str, Any]: """Основной сценарий. Вызывается из custom action browser-use. Возвращает dict вида {"success": bool, "captcha_kind": str, "resolved_by": str|None, "error": str|None}. """ resolved_task_id = task_id or os.getenv("CURRENT_TASK_ID") resolved_api_base = (api_base or os.getenv("BROWSER_API_INTERNAL_URL", "http://browser-api:8088/api/browser")).rstrip("/") resolved_view_url = view_url or os.getenv("BROWSER_VIEW_URL", "") resolved_timeout = int(timeout_seconds if timeout_seconds is not None else os.getenv("CAPTCHA_TIMEOUT_SECONDS", "300")) if not resolved_task_id: return {"success": False, "error": "to_captcha: CURRENT_TASK_ID is not set; tool cannot reach the API"} captcha_kind = await detect_captcha_kind(page) or "unknown" notify_resp = await asyncio.to_thread( _http_post, f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/notify", { "browser_view_url": resolved_view_url or None, "captcha_kind": captcha_kind, "reason": reason, "timeout_seconds": resolved_timeout, }, ) if notify_resp.get("_error") or notify_resp.get("_http_error"): return { "success": False, "captcha_kind": captcha_kind, "error": f"to_captcha: notify failed: {notify_resp}", } deadline = time.time() + resolved_timeout prompted_user = False while True: # 1) DOM-проверка: решилось ли само? if await is_captcha_solved(page): await asyncio.to_thread( _http_post, f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/solved", {"detector": "dom_poller"}, ) return { "success": True, "captcha_kind": captcha_kind, "resolved_by": "dom_poller", "browser_view_url": resolved_view_url, } # 2) Статус из API: вдруг внешний вызов abort/extend/solved status = await asyncio.to_thread( _http_get, f"{resolved_api_base}/tasks/{resolved_task_id}/captcha", ) state = (status or {}).get("state") if state == "solved": return { "success": True, "captcha_kind": captcha_kind, "resolved_by": "external", "browser_view_url": resolved_view_url, } if state == "aborted": return { "success": False, "captcha_kind": captcha_kind, "error": "to_captcha: aborted by user", "browser_view_url": resolved_view_url, } if state == "extended": api_deadline = (status or {}).get("deadline") if isinstance(api_deadline, (int, float)) and api_deadline > deadline: deadline = float(api_deadline) prompted_user = False # 3) Таймаут — спрашиваем пользователя «продлить/отменить» один раз if time.time() >= deadline: if not prompted_user: await asyncio.to_thread( _http_post, f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/timeout-prompt", {}, ) prompted_user = True deadline = time.time() + min(60, resolved_timeout) continue return { "success": False, "captcha_kind": captcha_kind, "error": "to_captcha: timeout (no user response)", "browser_view_url": resolved_view_url, } await asyncio.sleep(poll_interval) def register(controller: Any) -> None: """Регистрирует action `to_captcha` на переданном browser-use Controller.""" @controller.action( "Pause the run, ask the human to solve the on-page CAPTCHA via the live browser view, " "and resume automatically once the DOM detector sees the challenge gone. " "Call this ONLY when the current page is blocked by reCAPTCHA, hCaptcha or Cloudflare Turnstile." ) async def to_captcha(reason: str = "", browser=None, page=None) -> dict[str, Any]: actual_page = page if actual_page is None and browser is not None: get_page = getattr(browser, "get_current_page", None) or getattr(browser, "get_page", None) if callable(get_page): actual_page = get_page() if asyncio.iscoroutine(actual_page): actual_page = await actual_page if actual_page is None: return {"success": False, "error": "to_captcha: browser-use did not provide a page"} return await run_to_captcha(actual_page, reason=reason or None) return to_captcha