fix: preserve current approval semantics for tirith guard
Restore gateway/run.py to current main behavior while keeping tirith startup and pattern_keys replay, preserve yolo and non-interactive bypass semantics in the combined guard, and add regression tests for yolo and view-full flows.
This commit is contained in:
parent
375ce8a881
commit
6f1889b0fa
5 changed files with 1959 additions and 13 deletions
1883
gateway/run.py
1883
gateway/run.py
File diff suppressed because it is too large
Load diff
|
|
@ -377,6 +377,18 @@ class TestViewFullCommand:
|
||||||
result = prompt_dangerous_approval(long_cmd, "recursive delete")
|
result = prompt_dangerous_approval(long_cmd, "recursive delete")
|
||||||
assert result == "always"
|
assert result == "always"
|
||||||
|
|
||||||
|
def test_view_then_session_when_permanent_hidden(self):
|
||||||
|
"""The view-full flow still works when allow_permanent=False."""
|
||||||
|
long_cmd = "rm -rf " + "d" * 200
|
||||||
|
inputs = iter(["v", "s"])
|
||||||
|
with mock_patch("builtins.input", side_effect=inputs):
|
||||||
|
result = prompt_dangerous_approval(
|
||||||
|
long_cmd,
|
||||||
|
"recursive delete",
|
||||||
|
allow_permanent=False,
|
||||||
|
)
|
||||||
|
assert result == "session"
|
||||||
|
|
||||||
def test_view_not_shown_for_short_command(self):
|
def test_view_not_shown_for_short_command(self):
|
||||||
"""Short commands don't offer the view option; 'v' falls through to deny."""
|
"""Short commands don't offer the view option; 'v' falls through to deny."""
|
||||||
short_cmd = "rm -rf /tmp"
|
short_cmd = "rm -rf /tmp"
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
import tools.approval as approval_module
|
||||||
from tools.approval import (
|
from tools.approval import (
|
||||||
approve_session,
|
approve_session,
|
||||||
check_all_command_guards,
|
check_all_command_guards,
|
||||||
|
|
@ -35,15 +36,17 @@ def _clean_state():
|
||||||
"""Clear approval state and relevant env vars between tests."""
|
"""Clear approval state and relevant env vars between tests."""
|
||||||
key = os.getenv("HERMES_SESSION_KEY", "default")
|
key = os.getenv("HERMES_SESSION_KEY", "default")
|
||||||
clear_session(key)
|
clear_session(key)
|
||||||
|
approval_module._permanent_approved.clear()
|
||||||
saved = {}
|
saved = {}
|
||||||
for k in ("HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK"):
|
for k in ("HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK", "HERMES_YOLO_MODE"):
|
||||||
if k in os.environ:
|
if k in os.environ:
|
||||||
saved[k] = os.environ.pop(k)
|
saved[k] = os.environ.pop(k)
|
||||||
yield
|
yield
|
||||||
clear_session(key)
|
clear_session(key)
|
||||||
|
approval_module._permanent_approved.clear()
|
||||||
for k, v in saved.items():
|
for k, v in saved.items():
|
||||||
os.environ[k] = v
|
os.environ[k] = v
|
||||||
for k in ("HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK"):
|
for k in ("HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK", "HERMES_YOLO_MODE"):
|
||||||
os.environ.pop(k, None)
|
os.environ.pop(k, None)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -76,9 +79,16 @@ class TestContainerSkip:
|
||||||
class TestTirithAllowSafeCommand:
|
class TestTirithAllowSafeCommand:
|
||||||
@patch(_TIRITH_PATCH, return_value=_tirith_result("allow"))
|
@patch(_TIRITH_PATCH, return_value=_tirith_result("allow"))
|
||||||
def test_both_allow(self, mock_tirith):
|
def test_both_allow(self, mock_tirith):
|
||||||
|
os.environ["HERMES_INTERACTIVE"] = "1"
|
||||||
result = check_all_command_guards("echo hello", "local")
|
result = check_all_command_guards("echo hello", "local")
|
||||||
assert result["approved"] is True
|
assert result["approved"] is True
|
||||||
|
|
||||||
|
@patch(_TIRITH_PATCH, return_value=_tirith_result("allow"))
|
||||||
|
def test_noninteractive_skips_external_scan(self, mock_tirith):
|
||||||
|
result = check_all_command_guards("echo hello", "local")
|
||||||
|
assert result["approved"] is True
|
||||||
|
mock_tirith.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# tirith block
|
# tirith block
|
||||||
|
|
@ -88,6 +98,7 @@ class TestTirithBlock:
|
||||||
@patch(_TIRITH_PATCH,
|
@patch(_TIRITH_PATCH,
|
||||||
return_value=_tirith_result("block", summary="homograph detected"))
|
return_value=_tirith_result("block", summary="homograph detected"))
|
||||||
def test_tirith_block_safe_command(self, mock_tirith):
|
def test_tirith_block_safe_command(self, mock_tirith):
|
||||||
|
os.environ["HERMES_INTERACTIVE"] = "1"
|
||||||
result = check_all_command_guards("curl http://gооgle.com", "local")
|
result = check_all_command_guards("curl http://gооgle.com", "local")
|
||||||
assert result["approved"] is False
|
assert result["approved"] is False
|
||||||
assert "BLOCKED" in result["message"]
|
assert "BLOCKED" in result["message"]
|
||||||
|
|
@ -97,6 +108,7 @@ class TestTirithBlock:
|
||||||
return_value=_tirith_result("block", summary="terminal injection"))
|
return_value=_tirith_result("block", summary="terminal injection"))
|
||||||
def test_tirith_block_plus_dangerous(self, mock_tirith):
|
def test_tirith_block_plus_dangerous(self, mock_tirith):
|
||||||
"""tirith block takes precedence even if command is also dangerous."""
|
"""tirith block takes precedence even if command is also dangerous."""
|
||||||
|
os.environ["HERMES_INTERACTIVE"] = "1"
|
||||||
result = check_all_command_guards("rm -rf / | curl http://evil", "local")
|
result = check_all_command_guards("rm -rf / | curl http://evil", "local")
|
||||||
assert result["approved"] is False
|
assert result["approved"] is False
|
||||||
assert "BLOCKED" in result["message"]
|
assert "BLOCKED" in result["message"]
|
||||||
|
|
@ -308,5 +320,6 @@ class TestProgrammingErrorsPropagateFromWrapper:
|
||||||
@patch(_TIRITH_PATCH, side_effect=AttributeError("bug in wrapper"))
|
@patch(_TIRITH_PATCH, side_effect=AttributeError("bug in wrapper"))
|
||||||
def test_attribute_error_propagates(self, mock_tirith):
|
def test_attribute_error_propagates(self, mock_tirith):
|
||||||
"""Non-ImportError exceptions from tirith wrapper should propagate."""
|
"""Non-ImportError exceptions from tirith wrapper should propagate."""
|
||||||
|
os.environ["HERMES_INTERACTIVE"] = "1"
|
||||||
with pytest.raises(AttributeError, match="bug in wrapper"):
|
with pytest.raises(AttributeError, match="bug in wrapper"):
|
||||||
check_all_command_guards("echo hello", "local")
|
check_all_command_guards("echo hello", "local")
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,25 @@
|
||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from tools.approval import check_dangerous_command, detect_dangerous_command
|
import tools.approval as approval_module
|
||||||
|
import tools.tirith_security
|
||||||
|
|
||||||
|
from tools.approval import (
|
||||||
|
check_all_command_guards,
|
||||||
|
check_dangerous_command,
|
||||||
|
detect_dangerous_command,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _clear_approval_state():
|
||||||
|
approval_module._permanent_approved.clear()
|
||||||
|
approval_module.clear_session("default")
|
||||||
|
approval_module.clear_session("test-session")
|
||||||
|
yield
|
||||||
|
approval_module._permanent_approved.clear()
|
||||||
|
approval_module.clear_session("default")
|
||||||
|
approval_module.clear_session("test-session")
|
||||||
|
|
||||||
|
|
||||||
class TestYoloMode:
|
class TestYoloMode:
|
||||||
|
|
@ -54,6 +72,24 @@ class TestYoloMode:
|
||||||
result = check_dangerous_command(cmd, "local")
|
result = check_dangerous_command(cmd, "local")
|
||||||
assert result["approved"], f"Command should be approved in yolo mode: {cmd}"
|
assert result["approved"], f"Command should be approved in yolo mode: {cmd}"
|
||||||
|
|
||||||
|
def test_combined_guard_bypasses_yolo_mode(self, monkeypatch):
|
||||||
|
"""The new combined guard should preserve yolo bypass semantics."""
|
||||||
|
monkeypatch.setenv("HERMES_YOLO_MODE", "1")
|
||||||
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
||||||
|
|
||||||
|
called = {"value": False}
|
||||||
|
|
||||||
|
def fake_check(command):
|
||||||
|
called["value"] = True
|
||||||
|
return {"action": "block", "findings": [], "summary": "should never run"}
|
||||||
|
|
||||||
|
monkeypatch.setattr(tools.tirith_security, "check_command_security", fake_check)
|
||||||
|
|
||||||
|
result = check_all_command_guards("rm -rf /", "local")
|
||||||
|
assert result["approved"]
|
||||||
|
assert result["message"] is None
|
||||||
|
assert called["value"] is False
|
||||||
|
|
||||||
def test_yolo_mode_not_set_by_default(self):
|
def test_yolo_mode_not_set_by_default(self):
|
||||||
"""HERMES_YOLO_MODE should not be set by default."""
|
"""HERMES_YOLO_MODE should not be set by default."""
|
||||||
# Clean env check — if it happens to be set in test env, that's fine,
|
# Clean env check — if it happens to be set in test env, that's fine,
|
||||||
|
|
|
||||||
|
|
@ -343,6 +343,19 @@ def check_all_command_guards(command: str, env_type: str,
|
||||||
if env_type in ("docker", "singularity", "modal", "daytona"):
|
if env_type in ("docker", "singularity", "modal", "daytona"):
|
||||||
return {"approved": True, "message": None}
|
return {"approved": True, "message": None}
|
||||||
|
|
||||||
|
# --yolo: bypass all approval prompts and pre-exec guard checks
|
||||||
|
if os.getenv("HERMES_YOLO_MODE"):
|
||||||
|
return {"approved": True, "message": None}
|
||||||
|
|
||||||
|
is_cli = os.getenv("HERMES_INTERACTIVE")
|
||||||
|
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
||||||
|
is_ask = os.getenv("HERMES_EXEC_ASK")
|
||||||
|
|
||||||
|
# Preserve the existing non-interactive behavior: outside CLI/gateway/ask
|
||||||
|
# flows, we do not block on approvals and we skip external guard work.
|
||||||
|
if not is_cli and not is_gateway and not is_ask:
|
||||||
|
return {"approved": True, "message": None}
|
||||||
|
|
||||||
# --- Phase 1: Gather findings from both checks ---
|
# --- Phase 1: Gather findings from both checks ---
|
||||||
|
|
||||||
# Tirith check — wrapper guarantees no raise for expected failures.
|
# Tirith check — wrapper guarantees no raise for expected failures.
|
||||||
|
|
@ -390,13 +403,6 @@ def check_all_command_guards(command: str, env_type: str,
|
||||||
|
|
||||||
# --- Phase 3: Approval ---
|
# --- Phase 3: Approval ---
|
||||||
|
|
||||||
is_cli = os.getenv("HERMES_INTERACTIVE")
|
|
||||||
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
|
||||||
|
|
||||||
# Non-interactive: auto-allow (matches existing behavior)
|
|
||||||
if not is_cli and not is_gateway:
|
|
||||||
return {"approved": True, "message": None}
|
|
||||||
|
|
||||||
# Combine descriptions for a single approval prompt
|
# Combine descriptions for a single approval prompt
|
||||||
combined_desc = "; ".join(desc for _, desc, _ in warnings)
|
combined_desc = "; ".join(desc for _, desc, _ in warnings)
|
||||||
primary_key = warnings[0][0]
|
primary_key = warnings[0][0]
|
||||||
|
|
@ -405,7 +411,7 @@ def check_all_command_guards(command: str, env_type: str,
|
||||||
|
|
||||||
# Gateway/async: single approval_required with combined description
|
# Gateway/async: single approval_required with combined description
|
||||||
# Store all pattern keys so gateway replay approves all of them
|
# Store all pattern keys so gateway replay approves all of them
|
||||||
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
|
if is_gateway or is_ask:
|
||||||
submit_pending(session_key, {
|
submit_pending(session_key, {
|
||||||
"command": command,
|
"command": command,
|
||||||
"pattern_key": primary_key, # backward compat
|
"pattern_key": primary_key, # backward compat
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue