BrowserUse_and_ComputerUse_.../browser_env/tools/captcha_tool.py

233 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""to_captcha custom action для browser-use.
Когда LLM-агент видит на странице капчу (reCAPTCHA / hCaptcha / Cloudflare Turnstile),
он вызывает action `to_captcha`. Action:
1. Уведомляет browser-api (POST /api/browser/tasks/{task_id}/captcha/notify),
передавая URL noVNC-просмотрщика, чтобы пользователь решил капчу руками.
2. Параллельно ОПРАШИВАЕТ DOM каждые ~1.5 сек:
* iframe reCAPTCHA/hCaptcha/Turnstile исчез
* скрытый textarea/input с токеном заполнен
Как только один из критериев сработал — POST /captcha/solved (detector=dom_poller),
возвращает управление browser-use Agent. Агент продолжает с того же шага,
где остановился, потому что browser-use держит общий browser context.
3. Если за timeout_seconds капчу автодетектор не увидел решённой —
поднимает captcha_state в timeout_prompt (через API), даёт пользователю шанс
ответить «продлить» (POST /captcha/extend) или «отменить» (POST /captcha/abort).
4. На abort action возвращает success=False — Agent получит сигнал об ошибке.
Пользовательского подтверждения «готово» НЕТ. Решение засекается только DOM-детектором
либо внешним вызовом /captcha/solved.
"""
from __future__ import annotations
import asyncio
import json
import os
import time
from typing import Any
from urllib import error, request
CAPTCHA_KIND_DETECTORS: tuple[tuple[str, str], ...] = (
("recaptcha_v2", "() => !!document.querySelector('iframe[src*=\"recaptcha\"]')"),
("hcaptcha", "() => !!document.querySelector('iframe[src*=\"hcaptcha.com\"]')"),
("turnstile", "() => !!document.querySelector('iframe[src*=\"challenges.cloudflare.com\"]')"),
)
CAPTCHA_TOKEN_CHECKS: tuple[str, ...] = (
"() => { const el = document.querySelector('textarea[name=\"g-recaptcha-response\"]'); return !!(el && el.value && el.value.length > 20); }",
"() => { const el = document.querySelector('textarea[name=\"h-captcha-response\"]'); return !!(el && el.value && el.value.length > 20); }",
"() => { const el = document.querySelector('input[name=\"cf-turnstile-response\"]'); return !!(el && el.value && el.value.length > 5); }",
)
# Селекторы, по которым считаем что капча на странице ещё видна.
CAPTCHA_PRESENCE_CHECK = (
"() => !!document.querySelector("
"'iframe[src*=\"recaptcha\"], iframe[src*=\"hcaptcha.com\"], iframe[src*=\"challenges.cloudflare.com\"]'"
")"
)
async def _safe_eval(page: Any, js: str) -> bool:
"""Безопасно выполняет JS-проверку, прячет ошибки навигации/закрытой страницы."""
try:
result = await page.evaluate(js)
return bool(result)
except Exception:
return False
async def detect_captcha_kind(page: Any) -> str | None:
for name, js in CAPTCHA_KIND_DETECTORS:
if await _safe_eval(page, js):
return name
if await _safe_eval(page, CAPTCHA_PRESENCE_CHECK):
return "unknown"
return None
async def is_captcha_solved(page: Any) -> bool:
"""Капча считается решённой, если ни одного captcha-iframe нет, ИЛИ хотя бы один токен заполнен."""
for js in CAPTCHA_TOKEN_CHECKS:
if await _safe_eval(page, js):
return True
still_present = await _safe_eval(page, CAPTCHA_PRESENCE_CHECK)
return not still_present
def _http_post(url: str, payload: dict[str, Any] | None = None, timeout: float = 10.0) -> dict[str, Any]:
body = json.dumps(payload or {}).encode("utf-8")
req = request.Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST")
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw else {}
except error.HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
return {"_http_error": exc.code, "_body": raw}
except Exception as exc:
return {"_error": str(exc)}
def _http_get(url: str, timeout: float = 35.0) -> dict[str, Any]:
req = request.Request(url, method="GET")
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw else {}
except error.HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
return {"_http_error": exc.code, "_body": raw}
except Exception as exc:
return {"_error": str(exc)}
async def run_to_captcha(
page: Any,
reason: str | None = None,
*,
task_id: str | None = None,
api_base: str | None = None,
view_url: str | None = None,
timeout_seconds: int | None = None,
poll_interval: float = 1.5,
) -> dict[str, Any]:
"""Основной сценарий. Вызывается из custom action browser-use.
Возвращает dict вида {"success": bool, "captcha_kind": str, "resolved_by": str|None, "error": str|None}.
"""
resolved_task_id = task_id or os.getenv("CURRENT_TASK_ID")
resolved_api_base = (api_base or os.getenv("BROWSER_API_INTERNAL_URL", "http://browser-api:8088/api/browser")).rstrip("/")
resolved_view_url = view_url or os.getenv("BROWSER_VIEW_URL", "")
resolved_timeout = int(timeout_seconds if timeout_seconds is not None else os.getenv("CAPTCHA_TIMEOUT_SECONDS", "300"))
if not resolved_task_id:
return {"success": False, "error": "to_captcha: CURRENT_TASK_ID is not set; tool cannot reach the API"}
captcha_kind = await detect_captcha_kind(page) or "unknown"
notify_resp = await asyncio.to_thread(
_http_post,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/notify",
{
"browser_view_url": resolved_view_url or None,
"captcha_kind": captcha_kind,
"reason": reason,
"timeout_seconds": resolved_timeout,
},
)
if notify_resp.get("_error") or notify_resp.get("_http_error"):
return {
"success": False,
"captcha_kind": captcha_kind,
"error": f"to_captcha: notify failed: {notify_resp}",
}
deadline = time.time() + resolved_timeout
prompted_user = False
while True:
# 1) DOM-проверка: решилось ли само?
if await is_captcha_solved(page):
await asyncio.to_thread(
_http_post,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/solved",
{"detector": "dom_poller"},
)
return {
"success": True,
"captcha_kind": captcha_kind,
"resolved_by": "dom_poller",
"browser_view_url": resolved_view_url,
}
# 2) Статус из API: вдруг внешний вызов abort/extend/solved
status = await asyncio.to_thread(
_http_get,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha",
)
state = (status or {}).get("state")
if state == "solved":
return {
"success": True,
"captcha_kind": captcha_kind,
"resolved_by": "external",
"browser_view_url": resolved_view_url,
}
if state == "aborted":
return {
"success": False,
"captcha_kind": captcha_kind,
"error": "to_captcha: aborted by user",
"browser_view_url": resolved_view_url,
}
if state == "extended":
api_deadline = (status or {}).get("deadline")
if isinstance(api_deadline, (int, float)) and api_deadline > deadline:
deadline = float(api_deadline)
prompted_user = False
# 3) Таймаут — спрашиваем пользователя «продлить/отменить» один раз
if time.time() >= deadline:
if not prompted_user:
await asyncio.to_thread(
_http_post,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/timeout-prompt",
{},
)
prompted_user = True
deadline = time.time() + min(60, resolved_timeout)
continue
return {
"success": False,
"captcha_kind": captcha_kind,
"error": "to_captcha: timeout (no user response)",
"browser_view_url": resolved_view_url,
}
await asyncio.sleep(poll_interval)
def register(controller: Any) -> None:
"""Регистрирует action `to_captcha` на переданном browser-use Controller."""
@controller.action(
"Pause the run, ask the human to solve the on-page CAPTCHA via the live browser view, "
"and resume automatically once the DOM detector sees the challenge gone. "
"Call this ONLY when the current page is blocked by reCAPTCHA, hCaptcha or Cloudflare Turnstile."
)
async def to_captcha(reason: str = "", browser=None, page=None) -> dict[str, Any]:
actual_page = page
if actual_page is None and browser is not None:
get_page = getattr(browser, "get_current_page", None) or getattr(browser, "get_page", None)
if callable(get_page):
actual_page = get_page()
if asyncio.iscoroutine(actual_page):
actual_page = await actual_page
if actual_page is None:
return {"success": False, "error": "to_captcha: browser-use did not provide a page"}
return await run_to_captcha(actual_page, reason=reason or None)
return to_captcha