add new tool: to_captcha

This commit is contained in:
VladislavIlin7 2026-04-21 23:32:09 +03:00
parent 50589232d6
commit f1f32d8366
14 changed files with 1008 additions and 130 deletions

View file

@ -1922,6 +1922,7 @@ class HermesCLI:
platform="cli",
session_db=self._session_db,
clarify_callback=self._clarify_callback,
captcha_callback=self._captcha_callback,
reasoning_callback=(
self._stream_reasoning_delta if (self.streaming_enabled and self.show_reasoning)
else self._on_reasoning if (self.show_reasoning or self.verbose)
@ -5113,6 +5114,40 @@ class HermesCLI:
"Use your best judgement to make the choice and proceed."
)
def _captcha_callback(self, payload):
"""Prompt the user to complete a paused CAPTCHA flow in the live browser."""
import time as _time
timeout = int((payload.get("verification") or {}).get("max_wait_seconds", 900))
response_queue = queue.Queue()
self._captcha_state = {
"payload": payload,
"response_queue": response_queue,
}
self._captcha_deadline = _time.monotonic() + timeout
self._invalidate()
last_refresh = _time.monotonic()
while True:
try:
result = response_queue.get(timeout=1)
self._captcha_deadline = 0
return result
except queue.Empty:
remaining = self._captcha_deadline - _time.monotonic()
if remaining <= 0:
break
now = _time.monotonic()
if now - last_refresh >= 5.0:
last_refresh = now
self._invalidate()
self._captcha_state = None
self._captcha_deadline = 0
self._invalidate()
_cprint(f"\n{_DIM}(captcha wait timed out after {timeout}s){_RST}")
return {"action": "timeout", "user_response": ""}
def _sudo_password_callback(self) -> str:
"""
Prompt for sudo password through the prompt_toolkit UI.
@ -5812,6 +5847,8 @@ class HermesCLI:
return [("class:sudo-prompt", f"🔑 {state_suffix}")]
if self._approval_state:
return [("class:prompt-working", f"{state_suffix}")]
if self._captcha_state:
return [("class:prompt-working", f"🧩 {state_suffix}")]
if self._clarify_freetext:
return [("class:clarify-selected", f"{state_suffix}")]
if self._clarify_state:
@ -5878,6 +5915,7 @@ class HermesCLI:
sudo_widget,
secret_widget,
approval_widget,
captcha_widget,
clarify_widget,
spinner_widget,
spacer,
@ -5900,6 +5938,7 @@ class HermesCLI:
sudo_widget,
secret_widget,
approval_widget,
captcha_widget,
clarify_widget,
spinner_widget,
spacer,
@ -5983,6 +6022,10 @@ class HermesCLI:
self._approval_deadline = 0
self._approval_lock = threading.Lock() # serialize concurrent approval prompts (delegation race fix)
# CAPTCHA / human verification prompt state
self._captcha_state = None # dict with payload + response_queue
self._captcha_deadline = 0
# Slash command loading state
self._command_running = False
self._command_status = ""
@ -6058,6 +6101,23 @@ class HermesCLI:
event.app.invalidate()
return
# --- CAPTCHA prompt: accept ready/done/abort style input ---
if self._captcha_state:
text = event.app.current_buffer.text.strip()
normalized = text.lower()
if normalized in {"abort", "cancel", "stop"}:
action = "abort"
elif text:
action = "ready"
else:
return
self._captcha_state["response_queue"].put({"action": action, "user_response": text})
self._captcha_state = None
self._captcha_deadline = 0
event.app.current_buffer.reset()
event.app.invalidate()
return
# --- Clarify freetext mode: user typed their own answer ---
if self._clarify_freetext and self._clarify_state:
text = event.app.current_buffer.text.strip()
@ -6194,7 +6254,7 @@ class HermesCLI:
# Buffer.auto_up/auto_down handle both: cursor movement when multi-line,
# history browsing when on the first/last line (or single-line input).
_normal_input = Condition(
lambda: not self._clarify_state and not self._approval_state and not self._sudo_state and not self._secret_state
lambda: not self._clarify_state and not self._approval_state and not self._captcha_state and not self._sudo_state and not self._secret_state
)
@kb.add('up', filter=_normal_input)
@ -6261,6 +6321,14 @@ class HermesCLI:
event.app.invalidate()
return
if self._captcha_state:
self._captcha_state["response_queue"].put({"action": "timeout", "user_response": ""})
self._captcha_state = None
self._captcha_deadline = 0
event.app.current_buffer.reset()
event.app.invalidate()
return
# Cancel clarify prompt
if self._clarify_state:
self._clarify_state["response_queue"].put(
@ -6334,7 +6402,7 @@ class HermesCLI:
# Guard: don't START recording during agent run or interactive prompts
if cli_ref._agent_running:
return
if cli_ref._clarify_state or cli_ref._sudo_state or cli_ref._approval_state:
if cli_ref._clarify_state or cli_ref._sudo_state or cli_ref._approval_state or cli_ref._captcha_state:
return
# Guard: don't start while a previous stop/transcribe cycle is
# still running — recorder.stop() holds AudioRecorder._lock and
@ -6554,6 +6622,8 @@ class HermesCLI:
return "type secret (hidden), Enter to skip"
if cli_ref._approval_state:
return ""
if cli_ref._captcha_state:
return "type ready/done after you solve the challenge, or abort to cancel"
if cli_ref._clarify_freetext:
return "type your answer here and press Enter"
if cli_ref._clarify_state:
@ -6597,6 +6667,13 @@ class HermesCLI:
('class:clarify-countdown', f' ({remaining}s)'),
]
if cli_ref._captcha_state:
remaining = max(0, int(cli_ref._captcha_deadline - _time.monotonic()))
return [
('class:hint', " complete the challenge in the browser, then type 'ready'"),
('class:clarify-countdown', f' ({remaining}s)'),
]
if cli_ref._clarify_state:
remaining = max(0, int(cli_ref._clarify_deadline - _time.monotonic()))
countdown = f' ({remaining}s)' if cli_ref._clarify_deadline else ''
@ -6619,7 +6696,7 @@ class HermesCLI:
return []
def get_hint_height():
if cli_ref._sudo_state or cli_ref._secret_state or cli_ref._approval_state or cli_ref._clarify_state or cli_ref._command_running:
if cli_ref._sudo_state or cli_ref._secret_state or cli_ref._approval_state or cli_ref._captcha_state or cli_ref._clarify_state or cli_ref._command_running:
return 1
# Keep a 1-line spacer while agent runs so output doesn't push
# right up against the top rule of the input area
@ -6644,7 +6721,7 @@ class HermesCLI:
height=get_hint_height,
)
# --- Clarify tool: dynamic display widget for questions + choices ---
# --- Interactive panels: CAPTCHA + clarify ---
def _panel_box_width(title: str, content_lines: list[str], min_width: int = 46, max_width: int = 76) -> int:
"""Choose a stable panel width wide enough for the title and content."""
@ -6672,6 +6749,45 @@ class HermesCLI:
def _append_blank_panel_line(lines, border_style: str, box_width: int) -> None:
lines.append((border_style, "" + (" " * box_width) + "\n"))
def _get_captcha_display():
    """Build the styled text fragments for the manual-CAPTCHA panel.

    Returns:
        An empty list when no CAPTCHA prompt is active; otherwise a list of
        ``(style, text)`` tuples forming a boxed panel with the title, the
        instructions, the CAPTCHA type, the task id, the browser URL and the
        ready/abort hints taken from the pending payload.
    """
    state = cli_ref._captcha_state
    if not state:
        return []

    payload = state.get("payload") or {}
    title = "Manual CAPTCHA Required"
    browser_view_url = payload.get("browser_view_url") or "Browser view URL is not configured."
    body_lines = [
        payload.get("instructions") or "Open the live browser and complete the verification challenge.",
        f"Type: {payload.get('captcha_type', 'unknown')}",
        f"Task ID: {payload.get('task_id', '')}",
        f"Browser: {browser_view_url}",
        "When the challenge disappears, type 'ready' or 'done' and press Enter.",
        "Type 'abort' to stop this browser task.",
    ]

    box_width = _panel_box_width(title, body_lines, min_width=56, max_width=94)
    inner_text_width = max(8, box_width - 2)

    lines = []
    # Top border: "╭─ <title> ────…╮". The rule and corner glyphs were empty
    # strings, so the title row had no fill and no closing corner.
    lines.append(('class:captcha-border', '╭─ '))
    lines.append(('class:captcha-title', title))
    lines.append(('class:captcha-border', ' ' + ('─' * max(0, box_width - len(title) - 3)) + '╮\n'))
    _append_blank_panel_line(lines, 'class:captcha-border', box_width)

    for text in body_lines:
        # The browser URL row gets the link style; every other row is plain text.
        style = 'class:captcha-link' if text.startswith("Browser: ") else 'class:captcha-text'
        for wrapped in _wrap_panel_text(text, inner_text_width):
            _append_panel_line(lines, 'class:captcha-border', style, wrapped, box_width)

    _append_blank_panel_line(lines, 'class:captcha-border', box_width)
    # Bottom border, same overall width as the top row (box_width + 2 corners).
    lines.append(('class:captcha-border', '╰' + ('─' * box_width) + '╯\n'))
    return lines
captcha_widget = ConditionalContainer(
Window(
FormattedTextControl(_get_captcha_display),
wrap_lines=True,
),
filter=Condition(lambda: cli_ref._captcha_state is not None),
)
def _get_clarify_display():
"""Build styled text for the clarify question/choices panel."""
state = cli_ref._clarify_state
@ -6897,6 +7013,7 @@ class HermesCLI:
sudo_widget=sudo_widget,
secret_widget=secret_widget,
approval_widget=approval_widget,
captcha_widget=captcha_widget,
clarify_widget=clarify_widget,
spinner_widget=spinner_widget,
spacer=spacer,
@ -6954,6 +7071,11 @@ class HermesCLI:
'approval-cmd': '#AAAAAA italic',
'approval-choice': '#AAAAAA',
'approval-selected': '#FFD700 bold',
# CAPTCHA panel
'captcha-border': '#CD7F32',
'captcha-title': '#FFBF00 bold',
'captcha-text': '#FFF8DC',
'captcha-link': '#87CEEB underline',
# Voice mode
'voice-prompt': '#87CEEB',
'voice-recording': '#FF4444 bold',

View file

@ -152,6 +152,7 @@ def _discover_tools():
"tools.memory_tool",
"tools.session_search_tool",
"tools.clarify_tool",
"tools.to_captcha_tool",
"tools.code_execution_tool",
"tools.delegate_tool",
"tools.process_registry",
@ -362,7 +363,7 @@ def get_tool_definitions(
# because they need agent-level state (TodoStore, MemoryStore, etc.).
# The registry still holds their schemas; dispatch just returns a stub error
# so if something slips through, the LLM sees a sensible message.
_AGENT_LOOP_TOOLS = {"todo", "memory", "session_search", "delegate_task"}
_AGENT_LOOP_TOOLS = {"todo", "memory", "session_search", "delegate_task", "to_captcha"}
_READ_SEARCH_TOOLS = {"read_file", "search_files"}

View file

@ -400,6 +400,7 @@ class AIAgent:
thinking_callback: callable = None,
reasoning_callback: callable = None,
clarify_callback: callable = None,
captcha_callback: callable = None,
step_callback: callable = None,
stream_delta_callback: callable = None,
tool_gen_callback: callable = None,
@ -447,6 +448,7 @@ class AIAgent:
tool_progress_callback (callable): Callback function(tool_name, args_preview) for progress notifications
clarify_callback (callable): Callback function(question, choices) -> str for interactive user questions.
Provided by the platform layer (CLI or gateway). If None, the clarify tool returns an error.
captcha_callback (callable): Callback function(payload_dict) -> dict for manual CAPTCHA completion flows.
max_tokens (int): Maximum tokens for model responses (optional, uses model default if not set)
reasoning_config (Dict): OpenRouter reasoning configuration override (e.g. {"effort": "none"} to disable thinking).
If None, defaults to {"enabled": True, "effort": "medium"} for OpenRouter. Set to disable/customize reasoning.
@ -529,6 +531,7 @@ class AIAgent:
self.thinking_callback = thinking_callback
self.reasoning_callback = reasoning_callback
self.clarify_callback = clarify_callback
self.captcha_callback = captcha_callback
self.step_callback = step_callback
self.stream_delta_callback = stream_delta_callback
self.status_callback = status_callback
@ -4693,6 +4696,18 @@ class AIAgent:
choices=function_args.get("choices"),
callback=self.clarify_callback,
)
elif function_name == "to_captcha":
from tools.to_captcha_tool import to_captcha_tool as _to_captcha_tool
return _to_captcha_tool(
task_id=function_args.get("task_id", ""),
browser_view_url=function_args.get("browser_view_url"),
captcha_type=function_args.get("captcha_type"),
instructions=function_args.get("instructions"),
detected_at=function_args.get("detected_at"),
verification=function_args.get("verification"),
resume_token=function_args.get("resume_token"),
callback=self.captcha_callback,
)
elif function_name == "delegate_task":
from tools.delegate_tool import delegate_task as _delegate_task
return _delegate_task(
@ -4711,6 +4726,53 @@ class AIAgent:
honcho_session_key=self._honcho_session_key,
)
def _maybe_resolve_captcha(self, function_name: str, function_result: str, effective_task_id: str) -> str:
"""Bridge paused browser tasks into the dedicated CAPTCHA orchestration flow."""
if function_name != "internet_browser":
return function_result
try:
payload = json.loads(function_result)
except (json.JSONDecodeError, TypeError):
return function_result
if not isinstance(payload, dict):
return function_result
if payload.get("status") != "awaiting_user_captcha":
return function_result
human = payload.get("human_intervention") or {}
captcha_args = {
"task_id": payload.get("task_id") or human.get("task_id") or effective_task_id,
"browser_view_url": human.get("browser_view_url"),
"captcha_type": human.get("captcha_type"),
"instructions": human.get("instructions"),
"detected_at": human.get("detected_at"),
"verification": human.get("verification"),
"resume_token": human.get("resume_token"),
}
captcha_result = self._invoke_tool("to_captcha", captcha_args, effective_task_id)
try:
captcha_payload = json.loads(captcha_result)
except (json.JSONDecodeError, TypeError):
return captcha_result
if not isinstance(captcha_payload, dict):
return captcha_result
if captcha_payload.get("status") == "resumed":
task_result = captcha_payload.get("task_result")
if isinstance(task_result, dict):
return json.dumps(task_result, ensure_ascii=False)
return json.dumps(
{
"success": False,
"status": captcha_payload.get("status", "still_blocked"),
"task_id": captcha_args["task_id"],
"human_intervention": human,
"captcha_flow": captcha_payload,
},
ensure_ascii=False,
)
def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
"""Execute multiple tool calls concurrently using a thread pool.
@ -4843,6 +4905,8 @@ class AIAgent:
tool_duration = 0.0
else:
function_name, function_args, function_result, tool_duration, is_error = r
function_result = self._maybe_resolve_captcha(function_name, function_result, effective_task_id)
is_error, _ = _detect_tool_failure(function_name, function_result)
if is_error:
result_preview = function_result[:200] if len(function_result) > 200 else function_result
@ -5029,6 +5093,21 @@ class AIAgent:
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
self._vprint(f" {_get_cute_tool_message_impl('clarify', function_args, tool_duration, result=function_result)}")
elif function_name == "to_captcha":
from tools.to_captcha_tool import to_captcha_tool as _to_captcha_tool
function_result = _to_captcha_tool(
task_id=function_args.get("task_id", ""),
browser_view_url=function_args.get("browser_view_url"),
captcha_type=function_args.get("captcha_type"),
instructions=function_args.get("instructions"),
detected_at=function_args.get("detected_at"),
verification=function_args.get("verification"),
resume_token=function_args.get("resume_token"),
callback=self.captcha_callback,
)
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
self._vprint(f" {_get_cute_tool_message_impl('to_captcha', function_args, tool_duration, result=function_result)}")
elif function_name == "delegate_task":
from tools.delegate_tool import delegate_task as _delegate_task
tasks_arg = function_args.get("tasks")
@ -5099,6 +5178,7 @@ class AIAgent:
logger.error("handle_function_call raised for %s: %s", function_name, tool_error, exc_info=True)
tool_duration = time.time() - tool_start_time
function_result = self._maybe_resolve_captcha(function_name, function_result, effective_task_id)
result_preview = function_result if self.verbose_logging else (
function_result[:200] if len(function_result) > 200 else function_result
)

View file

@ -1,65 +1,130 @@
import json
import os
import time
from urllib import error, request
from tools.registry import registry
def run_browser_task(task):
def _browser_api_base_url() -> str:
return os.getenv("BROWSER_API_URL", "http://browser-api:8088/api/browser").rstrip("/")
def _http_json(url: str, method: str = "GET", payload: dict | None = None, timeout_sec: int = 30) -> dict:
body = None
headers = {"Content-Type": "application/json"}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
req = request.Request(url, data=body, headers=headers, method=method)
try:
with request.urlopen(req, timeout=timeout_sec) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw else {}
except error.HTTPError as http_err:
raw = http_err.read().decode("utf-8", errors="replace")
try:
data = json.loads(raw) if raw else {}
except json.JSONDecodeError:
data = {"details": raw}
return {
"success": False,
"error": f"Browser API returned HTTP {http_err.code}",
"details": data,
}
except Exception as err:
return {"success": False, "error": f"Browser API request failed: {err}"}
def run_browser_task(task: str):
if not task or not str(task).strip():
return json.dumps({"success": False, "error": "Task is required"}, ensure_ascii=False)
rpc_url = os.getenv("BROWSER_USE_RPC_URL", "http://browser:8787/run")
timeout_sec = int(os.getenv("BROWSER_USE_RPC_TIMEOUT", "900"))
payload = json.dumps({"task": task}).encode("utf-8")
req = request.Request(rpc_url, data=payload, headers={"Content-Type": "application/json"}, method="POST")
poll_interval = float(os.getenv("BROWSER_API_POLL_INTERVAL", "1.5"))
api_base = _browser_api_base_url()
try:
with request.urlopen(req, timeout=timeout_sec) as resp:
body = resp.read().decode("utf-8")
return body
except error.HTTPError as http_err:
body = http_err.read().decode("utf-8", errors="replace")
accepted = _http_json(
f"{api_base}/tasks",
method="POST",
payload={"task": task, "timeout": timeout_sec, "metadata": {"source": "internet_browser"}},
timeout_sec=30,
)
task_id = accepted.get("task_id")
if not task_id:
return json.dumps(
{
"success": False,
"error": f"browser-use RPC returned HTTP {http_err.code}",
"details": body,
},
ensure_ascii=False,
)
except Exception as err:
return json.dumps(
{
"success": False,
"error": f"browser-use RPC request failed: {err}",
"error": accepted.get("error", "Browser task was not accepted"),
"details": accepted,
},
ensure_ascii=False,
)
deadline = time.time() + timeout_sec + 10
status_url = f"{api_base}/tasks/{task_id}"
result_url = f"{api_base}/tasks/{task_id}/result"
while time.time() < deadline:
status_payload = _http_json(status_url, timeout_sec=15)
status = status_payload.get("status")
if not status and status_payload.get("error"):
return json.dumps(
{
"success": False,
"status": "failed",
"task_id": task_id,
"error": status_payload.get("error"),
"details": status_payload.get("details"),
},
ensure_ascii=False,
)
if status == "awaiting_user_captcha":
return json.dumps(
{
"success": False,
"status": status,
"task_id": task_id,
"human_intervention": status_payload.get("human_intervention"),
},
ensure_ascii=False,
)
if status in {"succeeded", "failed"}:
result_payload = _http_json(result_url, timeout_sec=30)
result_payload.setdefault("task_id", task_id)
return json.dumps(result_payload, ensure_ascii=False)
time.sleep(poll_interval)
return json.dumps(
{
"success": False,
"status": "failed",
"task_id": task_id,
"error": "Timed out while waiting for browser task result",
},
ensure_ascii=False,
)
registry.register(
name="internet_browser",
toolset="browse_cmd",
toolset="browse_cmd",
schema={
"name": "internet_browser",
"description": (
"ГЛАВНЫЙ ИНСТРУМЕНТ ДЛЯ ВЕБ-СЕРФИНГА. Вызывай этот инструмент НАПРЯМУЮ (через стандартный tool call/function call). "
"КАТЕГОРИЧЕСКИ ЗАПРЕЩЕНО использовать `execute_code` или `delegate_task` для работы с браузером. "
"Не пиши Python-скрипты! Просто передай в этот инструмент параметр `task`. "
"Используй для любых задач в интернете: поиск товаров (Wildberries, Ozon), чтение статей, клики, навигация."
"Main browser automation tool for internet tasks. Call it directly via a normal tool/function call. "
"Do not use execute_code or delegate_task for browser work. Pass the task in natural language."
),
"parameters": {
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "Подробная задача на естественном языке. Например: 'Зайди на wildberries.ru, найди черную футболку и верни цену'."
"type": "string",
"description": "Detailed natural-language browser task."
}
},
"required": ["task"]
}
},
handler=lambda args, **kw: run_browser_task(args.get("task")),
emoji="🌐",
)
)