fix(gateway): replace bare text approval with /approve and /deny commands (#2002)

The gateway approval system previously intercepted bare 'yes'/'no' text
from the user's next message to approve/deny dangerous commands. This was
fragile and dangerous — if the agent asked a clarify question and the user
said 'yes' to answer it, the gateway would execute the pending dangerous
command instead. (Fixes #1888)

Changes:
- Remove bare text matching ('yes', 'y', 'approve', 'ok', etc.) from
  _handle_message approval check
- Add /approve and /deny as gateway-only slash commands in the command
  registry
- /approve supports scoping: /approve (one-time), /approve session,
  /approve always (permanent)
- Add 5-minute timeout for stale approvals
- Gateway appends structured instructions to the agent response when a
  dangerous command is pending, telling the user exactly how to respond
- 9 tests covering approve, deny, timeout, scoping, and verification
  that bare 'yes' no longer triggers execution

Credit to @solo386 and @FlyByNight69420 for identifying and reporting
this security issue in PR #1971 and issue #1888.

Co-authored-by: Test <test@test.com>
This commit is contained in:
Teknium 2026-03-18 16:58:20 -07:00 committed by GitHub
parent 67d707e851
commit 7b6d14e62a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 338 additions and 26 deletions

View file

@ -1441,6 +1441,12 @@ class GatewayRunner:
if canonical == "reload-mcp":
return await self._handle_reload_mcp_command(event)
if canonical == "approve":
return await self._handle_approve_command(event)
if canonical == "deny":
return await self._handle_deny_command(event)
if canonical == "update":
return await self._handle_update_command(event)
@ -1518,32 +1524,9 @@ class GatewayRunner:
except Exception as e:
logger.debug("Skill command check failed (non-fatal): %s", e)
# Check for pending exec approval responses
session_key_preview = self._session_key_for_source(source)
if session_key_preview in self._pending_approvals:
user_text = event.text.strip().lower()
if user_text in ("yes", "y", "approve", "ok", "go", "do it"):
approval = self._pending_approvals.pop(session_key_preview)
cmd = approval["command"]
pattern_keys = approval.get("pattern_keys", [])
if not pattern_keys:
pk = approval.get("pattern_key", "")
pattern_keys = [pk] if pk else []
logger.info("User approved dangerous command: %s...", cmd[:60])
from tools.terminal_tool import terminal_tool
from tools.approval import approve_session
for pk in pattern_keys:
approve_session(session_key_preview, pk)
result = terminal_tool(command=cmd, force=True)
return f"✅ Command approved and executed.\n\n```\n{result[:3500]}\n```"
elif user_text in ("no", "n", "deny", "cancel", "nope"):
self._pending_approvals.pop(session_key_preview)
return "❌ Command denied."
elif user_text in ("full", "show", "view", "show full", "view full"):
# Show full command without consuming the approval
cmd = self._pending_approvals[session_key_preview]["command"]
return f"Full command:\n\n```\n{cmd}\n```\n\nReply yes/no to approve or deny."
# If it's not clearly an approval/denial, fall through to normal processing
# Pending exec approvals are handled by /approve and /deny commands above.
# No bare text matching — "yes" in normal conversation must not trigger
# execution of a dangerous command.
# Get or create session
session_entry = self.session_store.get_or_create_session(source)
@ -2059,9 +2042,22 @@ class GatewayRunner:
# Check if the agent encountered a dangerous command needing approval
try:
from tools.approval import pop_pending
import time as _time
pending = pop_pending(session_key)
if pending:
pending["timestamp"] = _time.time()
self._pending_approvals[session_key] = pending
# Append structured instructions so the user knows how to respond
cmd_preview = pending.get("command", "")
if len(cmd_preview) > 200:
cmd_preview = cmd_preview[:200] + "..."
approval_hint = (
f"\n\n⚠️ **Dangerous command requires approval:**\n"
f"```\n{cmd_preview}\n```\n"
f"Reply `/approve` to execute, `/approve session` to approve this pattern "
f"for the session, or `/deny` to cancel."
)
response = (response or "") + approval_hint
except Exception as e:
logger.debug("Failed to check pending approvals: %s", e)
@ -3696,6 +3692,78 @@ class GatewayRunner:
logger.warning("MCP reload failed: %s", e)
return f"❌ MCP reload failed: {e}"
# ------------------------------------------------------------------
# /approve & /deny — explicit dangerous-command approval
# ------------------------------------------------------------------
_APPROVAL_TIMEOUT_SECONDS = 300 # 5 minutes
async def _handle_approve_command(self, event: MessageEvent) -> str:
"""Handle /approve command — execute a pending dangerous command.
Usage:
/approve approve and execute the pending command
/approve session approve and remember for this session
/approve always approve this pattern permanently
"""
source = event.source
session_key = self._session_key_for_source(source)
if session_key not in self._pending_approvals:
return "No pending command to approve."
import time as _time
approval = self._pending_approvals[session_key]
# Check for timeout
ts = approval.get("timestamp", 0)
if _time.time() - ts > self._APPROVAL_TIMEOUT_SECONDS:
self._pending_approvals.pop(session_key, None)
return "⚠️ Approval expired (timed out after 5 minutes). Ask the agent to try again."
self._pending_approvals.pop(session_key)
cmd = approval["command"]
pattern_keys = approval.get("pattern_keys", [])
if not pattern_keys:
pk = approval.get("pattern_key", "")
pattern_keys = [pk] if pk else []
# Determine approval scope from args
args = event.get_command_args().strip().lower()
from tools.approval import approve_session, approve_permanent
if args in ("always", "permanent", "permanently"):
for pk in pattern_keys:
approve_permanent(pk)
scope_msg = " (pattern approved permanently)"
elif args in ("session", "ses"):
for pk in pattern_keys:
approve_session(session_key, pk)
scope_msg = " (pattern approved for this session)"
else:
# One-time approval — just approve for session so the immediate
# replay works, but don't advertise it as session-wide
for pk in pattern_keys:
approve_session(session_key, pk)
scope_msg = ""
logger.info("User approved dangerous command via /approve: %s...%s", cmd[:60], scope_msg)
from tools.terminal_tool import terminal_tool
result = terminal_tool(command=cmd, force=True)
return f"✅ Command approved and executed{scope_msg}.\n\n```\n{result[:3500]}\n```"
async def _handle_deny_command(self, event: MessageEvent) -> str:
"""Handle /deny command — reject a pending dangerous command."""
source = event.source
session_key = self._session_key_for_source(source)
if session_key not in self._pending_approvals:
return "No pending command to deny."
self._pending_approvals.pop(session_key)
logger.info("User denied dangerous command via /deny")
return "❌ Command denied."
async def _handle_update_command(self, event: MessageEvent) -> str:
"""Handle /update command — update Hermes Agent to the latest version.