add new tool: to_captcha

This commit is contained in:
VladislavIlin7 2026-05-17 01:03:55 +03:00
parent 8f86dbbdac
commit 4852345bf6
12 changed files with 716 additions and 35 deletions

View file

@ -0,0 +1,233 @@
"""to_captcha custom action для browser-use.
Когда LLM-агент видит на странице капчу (reCAPTCHA / hCaptcha / Cloudflare Turnstile),
он вызывает action `to_captcha`. Action:
1. Уведомляет browser-api (POST /api/browser/tasks/{task_id}/captcha/notify),
передавая URL noVNC-просмотрщика, чтобы пользователь решил капчу руками.
2. Параллельно ОПРАШИВАЕТ DOM каждые ~1.5 сек:
* iframe reCAPTCHA/hCaptcha/Turnstile исчез
* скрытый textarea/input с токеном заполнен
Как только один из критериев сработал POST /captcha/solved (detector=dom_poller),
возвращает управление browser-use Agent. Агент продолжает с того же шага,
где остановился, потому что browser-use держит общий browser context.
3. Если за timeout_seconds капчу автодетектор не увидел решённой
поднимает captcha_state в timeout_prompt (через API), даёт пользователю шанс
ответить «продлить» (POST /captcha/extend) или «отменить» (POST /captcha/abort).
4. На abort action возвращает success=False Agent получит сигнал об ошибке.
Пользовательского подтверждения «готово» НЕТ. Решение засекается только DOM-детектором
либо внешним вызовом /captcha/solved.
"""
from __future__ import annotations
import asyncio
import json
import os
import time
from typing import Any
from urllib import error, request
CAPTCHA_KIND_DETECTORS: tuple[tuple[str, str], ...] = (
("recaptcha_v2", "() => !!document.querySelector('iframe[src*=\"recaptcha\"]')"),
("hcaptcha", "() => !!document.querySelector('iframe[src*=\"hcaptcha.com\"]')"),
("turnstile", "() => !!document.querySelector('iframe[src*=\"challenges.cloudflare.com\"]')"),
)
CAPTCHA_TOKEN_CHECKS: tuple[str, ...] = (
"() => { const el = document.querySelector('textarea[name=\"g-recaptcha-response\"]'); return !!(el && el.value && el.value.length > 20); }",
"() => { const el = document.querySelector('textarea[name=\"h-captcha-response\"]'); return !!(el && el.value && el.value.length > 20); }",
"() => { const el = document.querySelector('input[name=\"cf-turnstile-response\"]'); return !!(el && el.value && el.value.length > 5); }",
)
# Селекторы, по которым считаем что капча на странице ещё видна.
CAPTCHA_PRESENCE_CHECK = (
"() => !!document.querySelector("
"'iframe[src*=\"recaptcha\"], iframe[src*=\"hcaptcha.com\"], iframe[src*=\"challenges.cloudflare.com\"]'"
")"
)
async def _safe_eval(page: Any, js: str) -> bool:
"""Безопасно выполняет JS-проверку, прячет ошибки навигации/закрытой страницы."""
try:
result = await page.evaluate(js)
return bool(result)
except Exception:
return False
async def detect_captcha_kind(page: Any) -> str | None:
for name, js in CAPTCHA_KIND_DETECTORS:
if await _safe_eval(page, js):
return name
if await _safe_eval(page, CAPTCHA_PRESENCE_CHECK):
return "unknown"
return None
async def is_captcha_solved(page: Any) -> bool:
"""Капча считается решённой, если ни одного captcha-iframe нет, ИЛИ хотя бы один токен заполнен."""
for js in CAPTCHA_TOKEN_CHECKS:
if await _safe_eval(page, js):
return True
still_present = await _safe_eval(page, CAPTCHA_PRESENCE_CHECK)
return not still_present
def _http_post(url: str, payload: dict[str, Any] | None = None, timeout: float = 10.0) -> dict[str, Any]:
body = json.dumps(payload or {}).encode("utf-8")
req = request.Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST")
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw else {}
except error.HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
return {"_http_error": exc.code, "_body": raw}
except Exception as exc:
return {"_error": str(exc)}
def _http_get(url: str, timeout: float = 35.0) -> dict[str, Any]:
req = request.Request(url, method="GET")
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw else {}
except error.HTTPError as exc:
raw = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
return {"_http_error": exc.code, "_body": raw}
except Exception as exc:
return {"_error": str(exc)}
async def run_to_captcha(
page: Any,
reason: str | None = None,
*,
task_id: str | None = None,
api_base: str | None = None,
view_url: str | None = None,
timeout_seconds: int | None = None,
poll_interval: float = 1.5,
) -> dict[str, Any]:
"""Основной сценарий. Вызывается из custom action browser-use.
Возвращает dict вида {"success": bool, "captcha_kind": str, "resolved_by": str|None, "error": str|None}.
"""
resolved_task_id = task_id or os.getenv("CURRENT_TASK_ID")
resolved_api_base = (api_base or os.getenv("BROWSER_API_INTERNAL_URL", "http://browser-api:8088/api/browser")).rstrip("/")
resolved_view_url = view_url or os.getenv("BROWSER_VIEW_URL", "")
resolved_timeout = int(timeout_seconds if timeout_seconds is not None else os.getenv("CAPTCHA_TIMEOUT_SECONDS", "300"))
if not resolved_task_id:
return {"success": False, "error": "to_captcha: CURRENT_TASK_ID is not set; tool cannot reach the API"}
captcha_kind = await detect_captcha_kind(page) or "unknown"
notify_resp = await asyncio.to_thread(
_http_post,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/notify",
{
"browser_view_url": resolved_view_url or None,
"captcha_kind": captcha_kind,
"reason": reason,
"timeout_seconds": resolved_timeout,
},
)
if notify_resp.get("_error") or notify_resp.get("_http_error"):
return {
"success": False,
"captcha_kind": captcha_kind,
"error": f"to_captcha: notify failed: {notify_resp}",
}
deadline = time.time() + resolved_timeout
prompted_user = False
while True:
# 1) DOM-проверка: решилось ли само?
if await is_captcha_solved(page):
await asyncio.to_thread(
_http_post,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/solved",
{"detector": "dom_poller"},
)
return {
"success": True,
"captcha_kind": captcha_kind,
"resolved_by": "dom_poller",
"browser_view_url": resolved_view_url,
}
# 2) Статус из API: вдруг внешний вызов abort/extend/solved
status = await asyncio.to_thread(
_http_get,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha",
)
state = (status or {}).get("state")
if state == "solved":
return {
"success": True,
"captcha_kind": captcha_kind,
"resolved_by": "external",
"browser_view_url": resolved_view_url,
}
if state == "aborted":
return {
"success": False,
"captcha_kind": captcha_kind,
"error": "to_captcha: aborted by user",
"browser_view_url": resolved_view_url,
}
if state == "extended":
api_deadline = (status or {}).get("deadline")
if isinstance(api_deadline, (int, float)) and api_deadline > deadline:
deadline = float(api_deadline)
prompted_user = False
# 3) Таймаут — спрашиваем пользователя «продлить/отменить» один раз
if time.time() >= deadline:
if not prompted_user:
await asyncio.to_thread(
_http_post,
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/timeout-prompt",
{},
)
prompted_user = True
deadline = time.time() + min(60, resolved_timeout)
continue
return {
"success": False,
"captcha_kind": captcha_kind,
"error": "to_captcha: timeout (no user response)",
"browser_view_url": resolved_view_url,
}
await asyncio.sleep(poll_interval)
def register(controller: Any) -> None:
"""Регистрирует action `to_captcha` на переданном browser-use Controller."""
@controller.action(
"Pause the run, ask the human to solve the on-page CAPTCHA via the live browser view, "
"and resume automatically once the DOM detector sees the challenge gone. "
"Call this ONLY when the current page is blocked by reCAPTCHA, hCaptcha or Cloudflare Turnstile."
)
async def to_captcha(reason: str = "", browser=None, page=None) -> dict[str, Any]:
actual_page = page
if actual_page is None and browser is not None:
get_page = getattr(browser, "get_current_page", None) or getattr(browser, "get_page", None)
if callable(get_page):
actual_page = get_page()
if asyncio.iscoroutine(actual_page):
actual_page = await actual_page
if actual_page is None:
return {"success": False, "error": "to_captcha: browser-use did not provide a page"}
return await run_to_captcha(actual_page, reason=reason or None)
return to_captcha