From c98ee985259470a271dfbcb5fc7715263965f315 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Sat, 21 Feb 2026 12:15:40 -0800 Subject: [PATCH] feat: implement interactive prompts for sudo password and command approval in CLI - Added methods for handling sudo password and dangerous command approval prompts using a callback mechanism in cli.py. - Integrated these prompts with the prompt_toolkit UI for improved user experience. - Updated terminal_tool.py to support callback registration for interactive prompts, enhancing the CLI's interactivity. - Introduced a background thread for API calls in run_agent.py to allow for interrupt handling during long-running operations. - Enhanced error handling for interrupted API calls, ensuring graceful degradation of user experience. --- cli.py | 357 +++++++++++++++++++++++++++++++++++++---- run_agent.py | 45 +++++- tools/terminal_tool.py | 46 +++++- 3 files changed, 409 insertions(+), 39 deletions(-) diff --git a/cli.py b/cli.py index 0ff5ad5c..35903b56 100755 --- a/cli.py +++ b/cli.py @@ -281,6 +281,7 @@ from cron import create_job, list_jobs, remove_job, get_job, run_daemon as run_c # Resource cleanup imports for safe shutdown (terminal VMs, browser sessions) from tools.terminal_tool import cleanup_all_environments as _cleanup_all_terminals +from tools.terminal_tool import set_sudo_password_callback, set_approval_callback from tools.browser_tool import _emergency_cleanup_all_sessions as _cleanup_all_browsers # Guard to prevent cleanup from running multiple times on exit @@ -1582,6 +1583,100 @@ class HermesCLI: "Use your best judgement to make the choice and proceed." ) + def _sudo_password_callback(self) -> str: + """ + Prompt for sudo password through the prompt_toolkit UI. + + Called from the agent thread when a sudo command is encountered. + Uses the same clarify-style mechanism: sets UI state, waits on a + queue for the user's response via the Enter key binding. + """ + import time as _time + + timeout = 45 + response_queue = queue.Queue() + + self._sudo_state = { + "response_queue": response_queue, + } + self._sudo_deadline = _time.monotonic() + timeout + + if hasattr(self, '_app') and self._app: + self._app.invalidate() + + while True: + try: + result = response_queue.get(timeout=1) + self._sudo_state = None + self._sudo_deadline = 0 + if hasattr(self, '_app') and self._app: + self._app.invalidate() + if result: + _cprint(f"\n{_DIM} ✓ Password received (cached for session){_RST}") + else: + _cprint(f"\n{_DIM} ⏭ Skipped{_RST}") + return result + except queue.Empty: + remaining = self._sudo_deadline - _time.monotonic() + if remaining <= 0: + break + if hasattr(self, '_app') and self._app: + self._app.invalidate() + + self._sudo_state = None + self._sudo_deadline = 0 + if hasattr(self, '_app') and self._app: + self._app.invalidate() + _cprint(f"\n{_DIM} ⏱ Timeout — continuing without sudo{_RST}") + return "" + + def _approval_callback(self, command: str, description: str) -> str: + """ + Prompt for dangerous command approval through the prompt_toolkit UI. + + Called from the agent thread. Shows a selection UI similar to clarify + with choices: once / session / always / deny. + """ + import time as _time + + timeout = 60 + response_queue = queue.Queue() + choices = ["once", "session", "always", "deny"] + + self._approval_state = { + "command": command, + "description": description, + "choices": choices, + "selected": 0, + "response_queue": response_queue, + } + self._approval_deadline = _time.monotonic() + timeout + + if hasattr(self, '_app') and self._app: + self._app.invalidate() + + while True: + try: + result = response_queue.get(timeout=1) + self._approval_state = None + self._approval_deadline = 0 + if hasattr(self, '_app') and self._app: + self._app.invalidate() + return result + except queue.Empty: + remaining = self._approval_deadline - _time.monotonic() + if remaining <= 0: + break + if hasattr(self, '_app') and self._app: + self._app.invalidate() + + self._approval_state = None + self._approval_deadline = 0 + if hasattr(self, '_app') and self._app: + self._app.invalidate() + _cprint(f"\n{_DIM} ⏱ Timeout — denying command{_RST}") + return "deny" + def chat(self, message: str) -> Optional[str]: """ Send a message to the agent and get a response. @@ -1724,6 +1819,18 @@ class HermesCLI: self._clarify_state = None # dict with question, choices, selected, response_queue self._clarify_freetext = False # True when user chose "Other" and is typing self._clarify_deadline = 0 # monotonic timestamp when the clarify times out + + # Sudo password prompt state (similar mechanism to clarify) + self._sudo_state = None # dict with response_queue when active + self._sudo_deadline = 0 + + # Dangerous command approval state (similar mechanism to clarify) + self._approval_state = None # dict with command, description, choices, selected, response_queue + self._approval_deadline = 0 + + # Register callbacks so terminal_tool prompts route through our UI + set_sudo_password_callback(self._sudo_password_callback) + set_approval_callback(self._approval_callback) # Key bindings for the input area kb = KeyBindings() @@ -1732,7 +1839,9 @@ class HermesCLI: def handle_enter(event): """Handle Enter key - submit input. - Routes to the correct queue based on agent state: + Routes to the correct queue based on active UI state: + - Sudo password prompt: password goes to sudo response queue + - Approval selection: selected choice goes to approval response queue - Clarify freetext mode: answer goes to the clarify response queue - Clarify choice mode: selected choice goes to the clarify response queue - Agent running: goes to _interrupt_queue (chat() monitors this) @@ -1740,6 +1849,26 @@ class HermesCLI: Commands (starting with /) always go to _pending_input so they're handled as commands, not sent as interrupt text to the agent. """ + # --- Sudo password prompt: submit the typed password --- + if self._sudo_state: + text = event.app.current_buffer.text + self._sudo_state["response_queue"].put(text) + self._sudo_state = None + event.app.current_buffer.reset() + event.app.invalidate() + return + + # --- Approval selection: confirm the highlighted choice --- + if self._approval_state: + state = self._approval_state + selected = state["selected"] + choices = state["choices"] + if 0 <= selected < len(choices): + state["response_queue"].put(choices[selected]) + self._approval_state = None + event.app.invalidate() + return + # --- Clarify freetext mode: user typed their own answer --- if self._clarify_freetext and self._clarify_state: text = event.app.current_buffer.text.strip() @@ -1802,31 +1931,71 @@ class HermesCLI: max_idx = len(choices) # last index is the "Other" option self._clarify_state["selected"] = min(max_idx, self._clarify_state["selected"] + 1) event.app.invalidate() - + + # --- Dangerous command approval: arrow-key navigation --- + + @kb.add('up', filter=Condition(lambda: bool(self._approval_state))) + def approval_up(event): + if self._approval_state: + self._approval_state["selected"] = max(0, self._approval_state["selected"] - 1) + event.app.invalidate() + + @kb.add('down', filter=Condition(lambda: bool(self._approval_state))) + def approval_down(event): + if self._approval_state: + max_idx = len(self._approval_state["choices"]) - 1 + self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1) + event.app.invalidate() + @kb.add('c-c') def handle_ctrl_c(event): - """Handle Ctrl+C - interrupt agent or force exit on double press. + """Handle Ctrl+C - cancel interactive prompts, interrupt agent, or exit. - First Ctrl+C: interrupt the running agent gracefully. - Second Ctrl+C within 2 seconds (or when agent is idle): force exit. + Priority: + 1. Cancel active sudo/approval/clarify prompt + 2. Interrupt the running agent (first press) + 3. Force exit (second press within 2s, or when idle) """ import time as _time now = _time.time() - + + # Cancel sudo prompt + if self._sudo_state: + self._sudo_state["response_queue"].put("") + self._sudo_state = None + event.app.current_buffer.reset() + event.app.invalidate() + return + + # Cancel approval prompt (deny) + if self._approval_state: + self._approval_state["response_queue"].put("deny") + self._approval_state = None + event.app.invalidate() + return + + # Cancel clarify prompt + if self._clarify_state: + self._clarify_state["response_queue"].put( + "The user cancelled. Use your best judgement to proceed." + ) + self._clarify_state = None + self._clarify_freetext = False + event.app.current_buffer.reset() + event.app.invalidate() + return + if self._agent_running and self.agent: - # Check for double Ctrl+C (second press within 2 seconds) if now - self._last_ctrl_c_time < 2.0: print("\n⚡ Force exiting...") self._should_exit = True event.app.exit() return - # First Ctrl+C: try graceful interrupt self._last_ctrl_c_time = now print("\n⚡ Interrupting agent... (press Ctrl+C again to force exit)") self.agent.interrupt() else: - # Agent not running, exit immediately self._should_exit = True event.app.exit() @@ -1841,6 +2010,10 @@ class HermesCLI: cli_ref = self def get_prompt(): + if cli_ref._sudo_state: + return [('class:sudo-prompt', '🔐 ❯ ')] + if cli_ref._approval_state: + return [('class:prompt-working', '⚠ ❯ ')] if cli_ref._clarify_freetext: return [('class:clarify-selected', '✎ ❯ ')] if cli_ref._clarify_state: @@ -1861,14 +2034,23 @@ class HermesCLI: complete_while_typing=True, ) - # Dynamic height: return the exact line count so the TextArea is - # always exactly as tall as its content -- no extra blank space. - # The bottom rule sits directly below the last line of text and - # pushes down only when the user adds a newline. + # Dynamic height: accounts for both explicit newlines AND visual + # wrapping of long lines so the input area always fits its content. + # The prompt characters ("❯ " etc.) consume ~4 columns. def _input_height(): try: - lines = input_area.buffer.document.line_count - return min(max(lines, 1), 8) + doc = input_area.buffer.document + available_width = (cli_ref.console.width or 80) - 4 # subtract prompt width + if available_width < 10: + available_width = 40 + visual_lines = 0 + for line in doc.lines: + # Each logical line takes at least 1 visual row; long lines wrap + if len(line) == 0: + visual_lines += 1 + else: + visual_lines += max(1, -(-len(line) // available_width)) # ceil division + return min(max(visual_lines, 1), 8) except Exception: return 1 @@ -1895,15 +2077,29 @@ class HermesCLI: input_area.buffer.on_text_changed += _on_text_changed - # Hint line above input: shows placeholder when agent is working - # and the user hasn't typed anything yet. Disappears when idle - # or when the user starts typing. + # Hint line above input: context-sensitive instructions for the + # current UI state (sudo prompt, approval, clarify, interrupt). def get_hint_text(): - if not cli_ref._agent_running: - return [] - # When clarify is active, show a different hint with countdown + import time as _time + + # Sudo password prompt + if cli_ref._sudo_state: + remaining = max(0, int(cli_ref._sudo_deadline - _time.monotonic())) + return [ + ('class:hint', ' type password (hidden) and press Enter, or Enter to skip'), + ('class:clarify-countdown', f' ({remaining}s)'), + ] + + # Dangerous command approval + if cli_ref._approval_state: + remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic())) + return [ + ('class:hint', ' ↑/↓ to select, Enter to confirm'), + ('class:clarify-countdown', f' ({remaining}s)'), + ] + + # Clarify question if cli_ref._clarify_state: - import time as _time remaining = max(0, int(cli_ref._clarify_deadline - _time.monotonic())) countdown = f' ({remaining}s)' if cli_ref._clarify_deadline else '' if cli_ref._clarify_freetext: @@ -1915,13 +2111,18 @@ class HermesCLI: ('class:hint', ' ↑/↓ to select, Enter to confirm'), ('class:clarify-countdown', countdown), ] + + if not cli_ref._agent_running: + return [] + + # Agent is running — show interrupt hint only when buffer is empty buf = input_area.buffer if buf.text: - return [] - return [('class:hint', ' type here to interrupt')] + return [('class:hint', ' press Enter to send interrupt')] + return [('class:hint', ' type a message + Enter to interrupt, or Ctrl+C to cancel')] def get_hint_height(): - if cli_ref._clarify_state: + if cli_ref._sudo_state or cli_ref._approval_state or cli_ref._clarify_state: return 1 return 1 if cli_ref._agent_running else 0 @@ -1987,7 +2188,84 @@ class HermesCLI: ), filter=Condition(lambda: cli_ref._clarify_state is not None), ) - + + # --- Sudo password: display widget --- + + def _get_sudo_display(): + state = cli_ref._sudo_state + if not state: + return [] + lines = [] + lines.append(('class:sudo-border', '╭─ ')) + lines.append(('class:sudo-title', '🔐 Sudo Password Required')) + lines.append(('class:sudo-border', ' ──────────────────────────╮\n')) + lines.append(('class:sudo-border', '│\n')) + lines.append(('class:sudo-border', '│ ')) + lines.append(('class:sudo-text', 'Enter password below (hidden), or press Enter to skip')) + lines.append(('', '\n')) + lines.append(('class:sudo-border', '│\n')) + lines.append(('class:sudo-border', '╰──────────────────────────────────────────────────╯\n')) + return lines + + sudo_widget = ConditionalContainer( + Window( + FormattedTextControl(_get_sudo_display), + wrap_lines=True, + ), + filter=Condition(lambda: cli_ref._sudo_state is not None), + ) + + # --- Dangerous command approval: display widget --- + + def _get_approval_display(): + state = cli_ref._approval_state + if not state: + return [] + command = state["command"] + description = state["description"] + choices = state["choices"] + selected = state.get("selected", 0) + + cmd_display = command[:70] + '...' if len(command) > 70 else command + choice_labels = { + "once": "Allow once", + "session": "Allow for this session", + "always": "Add to permanent allowlist", + "deny": "Deny", + } + + lines = [] + lines.append(('class:approval-border', '╭─ ')) + lines.append(('class:approval-title', '⚠️ Dangerous Command')) + lines.append(('class:approval-border', ' ───────────────────────────────╮\n')) + lines.append(('class:approval-border', '│\n')) + lines.append(('class:approval-border', '│ ')) + lines.append(('class:approval-desc', description)) + lines.append(('', '\n')) + lines.append(('class:approval-border', '│ ')) + lines.append(('class:approval-cmd', cmd_display)) + lines.append(('', '\n')) + lines.append(('class:approval-border', '│\n')) + for i, choice in enumerate(choices): + lines.append(('class:approval-border', '│ ')) + label = choice_labels.get(choice, choice) + if i == selected: + lines.append(('class:approval-selected', f'❯ {label}')) + else: + lines.append(('class:approval-choice', f' {label}')) + lines.append(('', '\n')) + lines.append(('class:approval-border', '│\n')) + lines.append(('class:approval-border', '╰──────────────────────────────────────────────────────╯\n')) + return lines + + approval_widget = ConditionalContainer( + Window( + FormattedTextControl(_get_approval_display), + wrap_lines=True, + ), + filter=Condition(lambda: cli_ref._approval_state is not None), + ) + # Horizontal rules above and below the input (bronze, 1 line each). # The bottom rule moves down as the TextArea grows with newlines. input_rule_top = Window( @@ -1999,16 +2277,14 @@ class HermesCLI: height=1, ) - # Layout: spacer + ruled input at bottom, completions below. - # Using inline CompletionsMenu (not a Float) so it reliably appears even - # after agent output has filled the terminal via patch_stdout. Float-based - # menus lose their rendering space in non-full-screen mode once scrollback - # pushes the app area to the very bottom of the terminal. - # The clarify_widget appears above the input area when the agent - # asks a multiple-choice or open-ended question. + # Layout: interactive prompt widgets + ruled input at bottom. + # The sudo, approval, and clarify widgets appear above the input when + # the corresponding interactive prompt is active. layout = Layout( HSplit([ Window(height=0), + sudo_widget, + approval_widget, clarify_widget, spacer, input_rule_top, @@ -2039,6 +2315,18 @@ class HermesCLI: 'clarify-selected': '#FFD700 bold', 'clarify-active-other': '#FFD700 italic', 'clarify-countdown': '#CD7F32', + # Sudo password panel + 'sudo-prompt': '#FF6B6B bold', + 'sudo-border': '#CD7F32', + 'sudo-title': '#FF6B6B bold', + 'sudo-text': '#FFF8DC', + # Dangerous command approval panel + 'approval-border': '#CD7F32', + 'approval-title': '#FF8C00 bold', + 'approval-desc': '#FFF8DC bold', + 'approval-cmd': '#AAAAAA italic', + 'approval-choice': '#AAAAAA', + 'approval-selected': '#FFD700 bold', }) # Create the application @@ -2126,6 +2414,9 @@ class HermesCLI: pass finally: self._should_exit = True + # Unregister terminal_tool callbacks to avoid dangling references + set_sudo_password_callback(None) + set_approval_callback(None) # Close session in SQLite if hasattr(self, '_session_db') and self._session_db and self.agent: try: diff --git a/run_agent.py b/run_agent.py index b44360ab..854d9b9f 100644 --- a/run_agent.py +++ b/run_agent.py @@ -2285,6 +2285,35 @@ class AIAgent: if self._memory_store: self._memory_store.load_from_disk() + def _interruptible_api_call(self, api_kwargs: dict): + """ + Run the API call in a background thread so the main conversation loop + can detect interrupts without waiting for the full HTTP round-trip. + + Returns the API response, or raises InterruptedError if the agent was + interrupted while waiting. + """ + result = {"response": None, "error": None} + + def _call(): + try: + result["response"] = self.client.chat.completions.create(**api_kwargs) + except Exception as e: + result["error"] = e + + t = threading.Thread(target=_call, daemon=True) + t.start() + # Poll every 0.3s so interrupts are noticed quickly + while t.is_alive(): + t.join(timeout=0.3) + if self._interrupt_requested: + # Can't cancel the HTTP request cleanly, but we can stop + # waiting and let the thread finish in the background. + raise InterruptedError("Agent interrupted during API call") + if result["error"] is not None: + raise result["error"] + return result["response"] + def _build_api_kwargs(self, api_messages: list) -> dict: """Build the keyword arguments dict for the chat completions API call.""" provider_preferences = {} @@ -2778,7 +2807,7 @@ class AIAgent: if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}: self._dump_api_request_debug(api_kwargs, reason="preflight") - response = self.client.chat.completions.create(**api_kwargs) + response = self._interruptible_api_call(api_kwargs) api_duration = time.time() - api_start_time @@ -2935,6 +2964,16 @@ class AIAgent: break # Success, exit retry loop + except InterruptedError: + if thinking_spinner: + thinking_spinner.stop("") + thinking_spinner = None + print(f"{self.log_prefix}⚡ Interrupted during API call.") + self._flush_messages_to_session_db(messages, conversation_history) + interrupted = True + final_response = "Operation interrupted." + break + except Exception as api_error: # Stop spinner before printing error messages if thinking_spinner: @@ -3053,6 +3092,10 @@ class AIAgent: } time.sleep(0.2) # Check interrupt every 200ms + # If the API call was interrupted, skip response processing + if interrupted: + break + try: assistant_message = response.choices[0].message diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index e05ae6ab..6b06d342 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -231,6 +231,26 @@ def _check_disk_usage_warning(): # Session-cached sudo password (persists until CLI exits) _cached_sudo_password: str = "" +# Optional UI callbacks for interactive prompts. When set, these are called +# instead of the default /dev/tty or input() readers. The CLI registers these +# so prompts route through prompt_toolkit's event loop. +# _sudo_password_callback() -> str (return password or "" to skip) +# _approval_callback(command, description) -> str ("once"/"session"/"always"/"deny") +_sudo_password_callback = None +_approval_callback = None + + +def set_sudo_password_callback(cb): + """Register a callback for sudo password prompts (used by CLI).""" + global _sudo_password_callback + _sudo_password_callback = cb + + +def set_approval_callback(cb): + """Register a callback for dangerous command approval prompts (used by CLI).""" + global _approval_callback + _approval_callback = cb + # ============================================================================= # Dangerous Command Approval System # ============================================================================= @@ -319,16 +339,26 @@ def _prompt_dangerous_approval(command: str, description: str, timeout_seconds: """ Prompt user to approve a dangerous command (CLI only). + If an _approval_callback is registered (by the CLI), delegates to it so the + prompt integrates with prompt_toolkit's UI. Otherwise falls back to the + raw input() approach (works outside the TUI, e.g. tests). + Returns: 'once', 'session', 'always', or 'deny' """ import sys import threading + # Use the registered callback when available (prompt_toolkit-compatible) + if _approval_callback is not None: + try: + return _approval_callback(command, description) + except Exception: + return "deny" + # Pause spinner if one is running os.environ["HERMES_SPINNER_PAUSE"] = "1" try: - # Use simple ASCII art for compatibility (no ANSI color codes) print() print(f" ⚠️ DANGEROUS COMMAND: {description}") print(f" {command[:80]}{'...' if len(command) > 80 else ''}") @@ -484,12 +514,20 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str: - Any error occurs Only works in interactive mode (HERMES_INTERACTIVE=1). - Reads directly from /dev/tty with echo disabled to avoid conflicts - with prompt_toolkit's patch_stdout / Application input handling. + If a _sudo_password_callback is registered (by the CLI), delegates to it + so the prompt integrates with prompt_toolkit's UI. Otherwise reads + directly from /dev/tty with echo disabled. """ import sys import time as time_module + # Use the registered callback when available (prompt_toolkit-compatible) + if _sudo_password_callback is not None: + try: + return _sudo_password_callback() or "" + except Exception: + return "" + result = {"password": None, "done": False} def read_password_thread(): @@ -500,11 +538,9 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str: import termios tty_fd = os.open("/dev/tty", os.O_RDONLY) old_attrs = termios.tcgetattr(tty_fd) - # Disable echo (ECHO) but keep canonical mode (ICANON) for line buffering new_attrs = termios.tcgetattr(tty_fd) new_attrs[3] = new_attrs[3] & ~termios.ECHO termios.tcsetattr(tty_fd, termios.TCSAFLUSH, new_attrs) - # Read one line (up to newline) chars = [] while True: b = os.read(tty_fd, 1)