"""`to_captcha` custom action for browser-use.

When the LLM agent sees a CAPTCHA on the page (reCAPTCHA / hCaptcha / Cloudflare Turnstile),
it calls the `to_captcha` action. The action:
1. Notifies browser-api (POST /api/browser/tasks/{task_id}/captcha/notify),
   passing the URL of the noVNC viewer so that the user can solve the CAPTCHA by hand.
2. In parallel POLLS the DOM every ~1.5 s for either of these criteria:
   * the reCAPTCHA/hCaptcha/Turnstile iframe has disappeared
   * the hidden textarea/input that carries the token has been filled
   As soon as one criterion fires it sends POST /captcha/solved (detector=dom_poller)
   and returns control to the browser-use Agent. The agent continues from the very step
   it stopped at, because browser-use keeps a shared browser context.
3. If the auto-detector has not seen the CAPTCHA solved within timeout_seconds,
   it moves captcha_state to timeout_prompt (through the API), giving the user a chance
   to answer "extend" (POST /captcha/extend) or "abort" (POST /captcha/abort).
4. On abort the action returns success=False, so the Agent receives an error signal.

There is NO user-side "done" confirmation. A solution is detected only by the DOM detector
or by an external call to /captcha/solved.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import time
|
||
from typing import Any
|
||
from urllib import error, request
|
||
|
||
|
||
CAPTCHA_KIND_DETECTORS: tuple[tuple[str, str], ...] = (
|
||
("recaptcha_v2", "() => !!document.querySelector('iframe[src*=\"recaptcha\"]')"),
|
||
("hcaptcha", "() => !!document.querySelector('iframe[src*=\"hcaptcha.com\"]')"),
|
||
("turnstile", "() => !!document.querySelector('iframe[src*=\"challenges.cloudflare.com\"]')"),
|
||
)
|
||
|
||
CAPTCHA_TOKEN_CHECKS: tuple[str, ...] = (
|
||
"() => { const el = document.querySelector('textarea[name=\"g-recaptcha-response\"]'); return !!(el && el.value && el.value.length > 20); }",
|
||
"() => { const el = document.querySelector('textarea[name=\"h-captcha-response\"]'); return !!(el && el.value && el.value.length > 20); }",
|
||
"() => { const el = document.querySelector('input[name=\"cf-turnstile-response\"]'); return !!(el && el.value && el.value.length > 5); }",
|
||
)
|
||
|
||
# Селекторы, по которым считаем что капча на странице ещё видна.
|
||
CAPTCHA_PRESENCE_CHECK = (
|
||
"() => !!document.querySelector("
|
||
"'iframe[src*=\"recaptcha\"], iframe[src*=\"hcaptcha.com\"], iframe[src*=\"challenges.cloudflare.com\"]'"
|
||
")"
|
||
)
|
||
|
||
|
||
async def _safe_eval(page: Any, js: str) -> bool:
|
||
"""Безопасно выполняет JS-проверку, прячет ошибки навигации/закрытой страницы."""
|
||
try:
|
||
result = await page.evaluate(js)
|
||
return bool(result)
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
async def detect_captcha_kind(page: Any) -> str | None:
|
||
for name, js in CAPTCHA_KIND_DETECTORS:
|
||
if await _safe_eval(page, js):
|
||
return name
|
||
if await _safe_eval(page, CAPTCHA_PRESENCE_CHECK):
|
||
return "unknown"
|
||
return None
|
||
|
||
|
||
async def is_captcha_solved(page: Any) -> bool:
|
||
"""Капча считается решённой, если ни одного captcha-iframe нет, ИЛИ хотя бы один токен заполнен."""
|
||
for js in CAPTCHA_TOKEN_CHECKS:
|
||
if await _safe_eval(page, js):
|
||
return True
|
||
still_present = await _safe_eval(page, CAPTCHA_PRESENCE_CHECK)
|
||
return not still_present
|
||
|
||
|
||
def _http_post(url: str, payload: dict[str, Any] | None = None, timeout: float = 10.0) -> dict[str, Any]:
|
||
body = json.dumps(payload or {}).encode("utf-8")
|
||
req = request.Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST")
|
||
try:
|
||
with request.urlopen(req, timeout=timeout) as resp:
|
||
raw = resp.read().decode("utf-8")
|
||
return json.loads(raw) if raw else {}
|
||
except error.HTTPError as exc:
|
||
raw = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
|
||
return {"_http_error": exc.code, "_body": raw}
|
||
except Exception as exc:
|
||
return {"_error": str(exc)}
|
||
|
||
|
||
def _http_get(url: str, timeout: float = 35.0) -> dict[str, Any]:
    """GET *url* and return the decoded JSON object.

    Never raises for ordinary failures; problems are reported in the result:
      * ``{"_http_error": <status>, "_body": <text>}`` for an HTTP error status;
      * ``{"_error": <message>}`` for anything else (bad URL, connection
        failure, invalid JSON in the response);
      * ``{"_value": <parsed>}`` when the response is valid JSON but not an
        object, so callers can always use ``.get``.
    An empty response body yields ``{}``.
    """
    try:
        # Built inside the try: Request() itself raises on a URL without a scheme.
        req = request.Request(url, method="GET")
        with request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8")
        parsed = json.loads(raw) if raw else {}
    except error.HTTPError as exc:
        raw = ""
        if exc.fp:
            try:
                raw = exc.read().decode("utf-8", errors="replace")
            finally:
                # HTTPError wraps the live response; release it explicitly.
                exc.close()
        return {"_http_error": exc.code, "_body": raw}
    except Exception as exc:
        return {"_error": str(exc)}
    return parsed if isinstance(parsed, dict) else {"_value": parsed}
|
||
|
||
|
||
async def run_to_captcha(
|
||
page: Any,
|
||
reason: str | None = None,
|
||
*,
|
||
task_id: str | None = None,
|
||
api_base: str | None = None,
|
||
view_url: str | None = None,
|
||
timeout_seconds: int | None = None,
|
||
poll_interval: float = 1.5,
|
||
) -> dict[str, Any]:
|
||
"""Основной сценарий. Вызывается из custom action browser-use.
|
||
|
||
Возвращает dict вида {"success": bool, "captcha_kind": str, "resolved_by": str|None, "error": str|None}.
|
||
"""
|
||
|
||
resolved_task_id = task_id or os.getenv("CURRENT_TASK_ID")
|
||
resolved_api_base = (api_base or os.getenv("BROWSER_API_INTERNAL_URL", "http://browser-api:8088/api/browser")).rstrip("/")
|
||
resolved_view_url = view_url or os.getenv("BROWSER_VIEW_URL", "")
|
||
resolved_timeout = int(timeout_seconds if timeout_seconds is not None else os.getenv("CAPTCHA_TIMEOUT_SECONDS", "300"))
|
||
|
||
if not resolved_task_id:
|
||
return {"success": False, "error": "to_captcha: CURRENT_TASK_ID is not set; tool cannot reach the API"}
|
||
|
||
captcha_kind = await detect_captcha_kind(page) or "unknown"
|
||
|
||
notify_resp = await asyncio.to_thread(
|
||
_http_post,
|
||
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/notify",
|
||
{
|
||
"browser_view_url": resolved_view_url or None,
|
||
"captcha_kind": captcha_kind,
|
||
"reason": reason,
|
||
"timeout_seconds": resolved_timeout,
|
||
},
|
||
)
|
||
if notify_resp.get("_error") or notify_resp.get("_http_error"):
|
||
return {
|
||
"success": False,
|
||
"captcha_kind": captcha_kind,
|
||
"error": f"to_captcha: notify failed: {notify_resp}",
|
||
}
|
||
|
||
deadline = time.time() + resolved_timeout
|
||
prompted_user = False
|
||
|
||
while True:
|
||
# 1) DOM-проверка: решилось ли само?
|
||
if await is_captcha_solved(page):
|
||
await asyncio.to_thread(
|
||
_http_post,
|
||
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/solved",
|
||
{"detector": "dom_poller"},
|
||
)
|
||
return {
|
||
"success": True,
|
||
"captcha_kind": captcha_kind,
|
||
"resolved_by": "dom_poller",
|
||
"browser_view_url": resolved_view_url,
|
||
}
|
||
|
||
# 2) Статус из API: вдруг внешний вызов abort/extend/solved
|
||
status = await asyncio.to_thread(
|
||
_http_get,
|
||
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha",
|
||
)
|
||
state = (status or {}).get("state")
|
||
if state == "solved":
|
||
return {
|
||
"success": True,
|
||
"captcha_kind": captcha_kind,
|
||
"resolved_by": "external",
|
||
"browser_view_url": resolved_view_url,
|
||
}
|
||
if state == "aborted":
|
||
return {
|
||
"success": False,
|
||
"captcha_kind": captcha_kind,
|
||
"error": "to_captcha: aborted by user",
|
||
"browser_view_url": resolved_view_url,
|
||
}
|
||
if state == "extended":
|
||
api_deadline = (status or {}).get("deadline")
|
||
if isinstance(api_deadline, (int, float)) and api_deadline > deadline:
|
||
deadline = float(api_deadline)
|
||
prompted_user = False
|
||
|
||
# 3) Таймаут — спрашиваем пользователя «продлить/отменить» один раз
|
||
if time.time() >= deadline:
|
||
if not prompted_user:
|
||
await asyncio.to_thread(
|
||
_http_post,
|
||
f"{resolved_api_base}/tasks/{resolved_task_id}/captcha/timeout-prompt",
|
||
{},
|
||
)
|
||
prompted_user = True
|
||
deadline = time.time() + min(60, resolved_timeout)
|
||
continue
|
||
return {
|
||
"success": False,
|
||
"captcha_kind": captcha_kind,
|
||
"error": "to_captcha: timeout (no user response)",
|
||
"browser_view_url": resolved_view_url,
|
||
}
|
||
|
||
await asyncio.sleep(poll_interval)
|
||
|
||
|
||
def register(controller: Any) -> Any:
    """Register the `to_captcha` action on the given browser-use Controller.

    Args:
        controller: object exposing an ``action(description)`` decorator factory.

    Returns:
        Whatever the controller's decorator returns for the action (the
        ``to_captcha`` coroutine function if the decorator passes it through).
    """
    # Function-scope import: needed only to recognise awaitable page getters.
    import inspect

    @controller.action(
        "Pause the run, ask the human to solve the on-page CAPTCHA via the live browser view, "
        "and resume automatically once the DOM detector sees the challenge gone. "
        "Call this ONLY when the current page is blocked by reCAPTCHA, hCaptcha or Cloudflare Turnstile."
    )
    async def to_captcha(reason: str = "", browser=None, page=None) -> dict[str, Any]:
        """Resolve the current page and delegate to ``run_to_captcha``."""
        actual_page = page
        if actual_page is None and browser is not None:
            # Accept either getter name on the browser object.
            get_page = getattr(browser, "get_current_page", None) or getattr(browser, "get_page", None)
            if callable(get_page):
                actual_page = get_page()
                # isawaitable also covers Futures/Tasks, not only coroutine objects.
                if inspect.isawaitable(actual_page):
                    actual_page = await actual_page
        if actual_page is None:
            return {"success": False, "error": "to_captcha: browser-use did not provide a page"}
        return await run_to_captcha(actual_page, reason=reason or None)

    return to_captcha
|