Merge pull request #1419 from NousResearch/fix/1264-env-secret-blocklist

fix(security): block gateway and tool env vars in subprocesses
This commit is contained in:
Teknium 2026-03-15 03:22:58 -07:00 committed by GitHub
commit 0de200cf4d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 215 additions and 27 deletions

View file

@ -1,10 +1,11 @@
"""Tests for provider env var blocklist in LocalEnvironment. """Tests for subprocess env sanitization in LocalEnvironment.
Verifies that Hermes-internal provider env vars (OPENAI_BASE_URL, etc.) Verifies that Hermes-managed provider, tool, and gateway env vars are
are stripped from subprocess environments so external CLIs are not stripped from subprocess environments so external CLIs are not silently
silently misrouted. misrouted or handed Hermes secrets.
See: https://github.com/NousResearch/hermes-agent/issues/1002 See: https://github.com/NousResearch/hermes-agent/issues/1002
See: https://github.com/NousResearch/hermes-agent/issues/1264
""" """
import os import os
@ -110,6 +111,30 @@ class TestProviderEnvBlocklist:
for var in extra_provider_vars: for var in extra_provider_vars:
assert var not in result_env, f"{var} leaked into subprocess env" assert var not in result_env, f"{var} leaked into subprocess env"
def test_tool_and_gateway_vars_are_stripped(self):
"""Tool and gateway secrets/config must not leak into subprocess env."""
leaked_vars = {
"TELEGRAM_BOT_TOKEN": "bot-token",
"TELEGRAM_HOME_CHANNEL": "12345",
"DISCORD_HOME_CHANNEL": "67890",
"SLACK_APP_TOKEN": "xapp-secret",
"WHATSAPP_ALLOWED_USERS": "+15555550123",
"SIGNAL_ACCOUNT": "+15555550124",
"HASS_TOKEN": "ha-secret",
"EMAIL_PASSWORD": "email-secret",
"FIRECRAWL_API_KEY": "fc-secret",
"BROWSERBASE_PROJECT_ID": "bb-project",
"ELEVENLABS_API_KEY": "el-secret",
"GITHUB_TOKEN": "ghp_secret",
"GH_TOKEN": "gh_alias_secret",
"GATEWAY_ALLOW_ALL_USERS": "true",
"GATEWAY_ALLOWED_USERS": "alice,bob",
}
result_env = _run_with_env(extra_os_env=leaked_vars)
for var in leaked_vars:
assert var not in result_env, f"{var} leaked into subprocess env"
def test_safe_vars_are_preserved(self): def test_safe_vars_are_preserved(self):
"""Standard env vars (PATH, HOME, USER) must still be passed through.""" """Standard env vars (PATH, HOME, USER) must still be passed through."""
result_env = _run_with_env() result_env = _run_with_env()
@ -205,3 +230,56 @@ class TestBlocklistCoverage:
"HELICONE_API_KEY", "HELICONE_API_KEY",
} }
assert extras.issubset(_HERMES_PROVIDER_ENV_BLOCKLIST) assert extras.issubset(_HERMES_PROVIDER_ENV_BLOCKLIST)
def test_optional_tool_and_messaging_vars_are_in_blocklist(self):
"""Tool/messaging vars from OPTIONAL_ENV_VARS should stay covered."""
from hermes_cli.config import OPTIONAL_ENV_VARS
for name, metadata in OPTIONAL_ENV_VARS.items():
category = metadata.get("category")
if category in {"tool", "messaging"}:
assert name in _HERMES_PROVIDER_ENV_BLOCKLIST, (
f"Optional env var {name} (category={category}) missing from blocklist"
)
elif category == "setting" and metadata.get("password"):
assert name in _HERMES_PROVIDER_ENV_BLOCKLIST, (
f"Secret setting env var {name} missing from blocklist"
)
def test_gateway_runtime_vars_are_in_blocklist(self):
extras = {
"TELEGRAM_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL_NAME",
"DISCORD_HOME_CHANNEL",
"DISCORD_HOME_CHANNEL_NAME",
"DISCORD_REQUIRE_MENTION",
"DISCORD_FREE_RESPONSE_CHANNELS",
"DISCORD_AUTO_THREAD",
"SLACK_HOME_CHANNEL",
"SLACK_HOME_CHANNEL_NAME",
"SLACK_ALLOWED_USERS",
"WHATSAPP_ENABLED",
"WHATSAPP_MODE",
"WHATSAPP_ALLOWED_USERS",
"SIGNAL_HTTP_URL",
"SIGNAL_ACCOUNT",
"SIGNAL_ALLOWED_USERS",
"SIGNAL_GROUP_ALLOWED_USERS",
"SIGNAL_HOME_CHANNEL",
"SIGNAL_HOME_CHANNEL_NAME",
"SIGNAL_IGNORE_STORIES",
"HASS_TOKEN",
"HASS_URL",
"EMAIL_ADDRESS",
"EMAIL_PASSWORD",
"EMAIL_IMAP_HOST",
"EMAIL_SMTP_HOST",
"EMAIL_HOME_ADDRESS",
"EMAIL_HOME_ADDRESS_NAME",
"GATEWAY_ALLOWED_USERS",
"GH_TOKEN",
"GITHUB_APP_ID",
"GITHUB_APP_PRIVATE_KEY_PATH",
"GITHUB_APP_INSTALLATION_ID",
}
assert extras.issubset(_HERMES_PROVIDER_ENV_BLOCKLIST)

View file

@ -1,11 +1,13 @@
"""Tests for tools/process_registry.py — ProcessRegistry query methods, pruning, checkpoint.""" """Tests for tools/process_registry.py — ProcessRegistry query methods, pruning, checkpoint."""
import json import json
import os
import time import time
import pytest import pytest
from pathlib import Path from pathlib import Path
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from tools.environments.local import _HERMES_PROVIDER_ENV_FORCE_PREFIX
from tools.process_registry import ( from tools.process_registry import (
ProcessRegistry, ProcessRegistry,
ProcessSession, ProcessSession,
@ -213,6 +215,54 @@ class TestPruning:
assert total <= MAX_PROCESSES assert total <= MAX_PROCESSES
# =========================================================================
# Spawn env sanitization
# =========================================================================
class TestSpawnEnvSanitization:
def test_spawn_local_strips_blocked_vars_from_background_env(self, registry):
captured = {}
def fake_popen(cmd, **kwargs):
captured["env"] = kwargs["env"]
proc = MagicMock()
proc.pid = 4321
proc.stdout = iter([])
proc.stdin = MagicMock()
proc.poll.return_value = None
return proc
fake_thread = MagicMock()
with patch.dict(os.environ, {
"PATH": "/usr/bin:/bin",
"HOME": "/home/user",
"USER": "tester",
"TELEGRAM_BOT_TOKEN": "bot-secret",
"FIRECRAWL_API_KEY": "fc-secret",
}, clear=True), \
patch("tools.process_registry._find_shell", return_value="/bin/bash"), \
patch("subprocess.Popen", side_effect=fake_popen), \
patch("threading.Thread", return_value=fake_thread), \
patch.object(registry, "_write_checkpoint"):
registry.spawn_local(
"echo hello",
cwd="/tmp",
env_vars={
"MY_CUSTOM_VAR": "keep-me",
"TELEGRAM_BOT_TOKEN": "drop-me",
f"{_HERMES_PROVIDER_ENV_FORCE_PREFIX}TELEGRAM_BOT_TOKEN": "forced-bot-token",
},
)
env = captured["env"]
assert env["MY_CUSTOM_VAR"] == "keep-me"
assert env["TELEGRAM_BOT_TOKEN"] == "forced-bot-token"
assert "FIRECRAWL_API_KEY" not in env
assert f"{_HERMES_PROVIDER_ENV_FORCE_PREFIX}TELEGRAM_BOT_TOKEN" not in env
assert env["PYTHONUNBUFFERED"] == "1"
# ========================================================================= # =========================================================================
# Checkpoint # Checkpoint
# ========================================================================= # =========================================================================

View file

@ -27,11 +27,12 @@ _HERMES_PROVIDER_ENV_FORCE_PREFIX = "_HERMES_FORCE_"
def _build_provider_env_blocklist() -> frozenset: def _build_provider_env_blocklist() -> frozenset:
"""Derive the blocklist from the provider registry + known extras. """Derive the blocklist from provider, tool, and gateway config.
Automatically picks up api_key_env_vars and base_url_env_var from Automatically picks up api_key_env_vars and base_url_env_var from
every registered provider, so adding a new provider to auth.py is every registered provider, plus tool/messaging env vars from the
enough no manual list to keep in sync. optional config registry, so new Hermes-managed secrets are blocked
in subprocesses without having to maintain multiple static lists.
""" """
blocked: set[str] = set() blocked: set[str] = set()
@ -44,7 +45,18 @@ def _build_provider_env_blocklist() -> frozenset:
except ImportError: except ImportError:
pass pass
# Vars not in the registry but still Hermes-internal / conflict-prone try:
from hermes_cli.config import OPTIONAL_ENV_VARS
for name, metadata in OPTIONAL_ENV_VARS.items():
category = metadata.get("category")
if category in {"tool", "messaging"}:
blocked.add(name)
elif category == "setting" and metadata.get("password"):
blocked.add(name)
except ImportError:
pass
# Vars not covered above but still Hermes-internal / conflict-prone.
blocked.update({ blocked.update({
"OPENAI_BASE_URL", "OPENAI_BASE_URL",
"OPENAI_API_KEY", "OPENAI_API_KEY",
@ -67,6 +79,41 @@ def _build_provider_env_blocklist() -> frozenset:
"FIREWORKS_API_KEY", # Fireworks AI "FIREWORKS_API_KEY", # Fireworks AI
"XAI_API_KEY", # xAI (Grok) "XAI_API_KEY", # xAI (Grok)
"HELICONE_API_KEY", # LLM Observability proxy "HELICONE_API_KEY", # LLM Observability proxy
# Gateway/runtime config not represented in OPTIONAL_ENV_VARS.
"TELEGRAM_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL_NAME",
"DISCORD_HOME_CHANNEL",
"DISCORD_HOME_CHANNEL_NAME",
"DISCORD_REQUIRE_MENTION",
"DISCORD_FREE_RESPONSE_CHANNELS",
"DISCORD_AUTO_THREAD",
"SLACK_HOME_CHANNEL",
"SLACK_HOME_CHANNEL_NAME",
"SLACK_ALLOWED_USERS",
"WHATSAPP_ENABLED",
"WHATSAPP_MODE",
"WHATSAPP_ALLOWED_USERS",
"SIGNAL_HTTP_URL",
"SIGNAL_ACCOUNT",
"SIGNAL_ALLOWED_USERS",
"SIGNAL_GROUP_ALLOWED_USERS",
"SIGNAL_HOME_CHANNEL",
"SIGNAL_HOME_CHANNEL_NAME",
"SIGNAL_IGNORE_STORIES",
"HASS_TOKEN",
"HASS_URL",
"EMAIL_ADDRESS",
"EMAIL_PASSWORD",
"EMAIL_IMAP_HOST",
"EMAIL_SMTP_HOST",
"EMAIL_HOME_ADDRESS",
"EMAIL_HOME_ADDRESS_NAME",
"GATEWAY_ALLOWED_USERS",
# Skills Hub / GitHub app auth paths and aliases.
"GH_TOKEN",
"GITHUB_APP_ID",
"GITHUB_APP_PRIVATE_KEY_PATH",
"GITHUB_APP_INSTALLATION_ID",
}) })
return frozenset(blocked) return frozenset(blocked)
@ -74,6 +121,30 @@ def _build_provider_env_blocklist() -> frozenset:
_HERMES_PROVIDER_ENV_BLOCKLIST = _build_provider_env_blocklist() _HERMES_PROVIDER_ENV_BLOCKLIST = _build_provider_env_blocklist()
def _sanitize_subprocess_env(base_env: dict | None, extra_env: dict | None = None) -> dict:
"""Filter Hermes-managed secrets from a subprocess environment.
`_HERMES_FORCE_<VAR>` entries in ``extra_env`` opt a blocked variable back in
intentionally for callers that truly need it.
"""
sanitized: dict[str, str] = {}
for key, value in (base_env or {}).items():
if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
continue
if key not in _HERMES_PROVIDER_ENV_BLOCKLIST:
sanitized[key] = value
for key, value in (extra_env or {}).items():
if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
real_key = key[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):]
sanitized[real_key] = value
elif key not in _HERMES_PROVIDER_ENV_BLOCKLIST:
sanitized[key] = value
return sanitized
def _find_bash() -> str: def _find_bash() -> str:
"""Find bash for command execution. """Find bash for command execution.
@ -249,18 +320,11 @@ class LocalEnvironment(BaseEnvironment):
# Ensure PATH always includes standard dirs — systemd services # Ensure PATH always includes standard dirs — systemd services
# and some terminal multiplexers inherit a minimal PATH. # and some terminal multiplexers inherit a minimal PATH.
_SANE_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" _SANE_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
# Strip Hermes-internal provider vars so external CLIs # Strip Hermes-managed provider/tool/gateway vars so external CLIs
# (e.g. codex) are not silently misrouted. Callers that # are not silently misrouted or handed Hermes secrets. Callers that
# truly need a blocked var can opt in by prefixing the key # truly need a blocked var can opt in by prefixing the key with
# with _HERMES_FORCE_ in self.env (e.g. _HERMES_FORCE_OPENAI_API_KEY). # _HERMES_FORCE_ in self.env (e.g. _HERMES_FORCE_OPENAI_API_KEY).
merged = dict(os.environ | self.env) run_env = _sanitize_subprocess_env(os.environ, self.env)
run_env = {}
for k, v in merged.items():
if k.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
real_key = k[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):]
run_env[real_key] = v
elif k not in _HERMES_PROVIDER_ENV_BLOCKLIST:
run_env[k] = v
existing_path = run_env.get("PATH", "") existing_path = run_env.get("PATH", "")
if "/usr/bin" not in existing_path.split(":"): if "/usr/bin" not in existing_path.split(":"):
run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH

View file

@ -42,7 +42,7 @@ import time
import uuid import uuid
_IS_WINDOWS = platform.system() == "Windows" _IS_WINDOWS = platform.system() == "Windows"
from tools.environments.local import _find_shell, _HERMES_PROVIDER_ENV_BLOCKLIST from tools.environments.local import _find_shell, _sanitize_subprocess_env
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
@ -155,9 +155,7 @@ class ProcessRegistry:
else: else:
from ptyprocess import PtyProcess as _PtyProcessCls from ptyprocess import PtyProcess as _PtyProcessCls
user_shell = _find_shell() user_shell = _find_shell()
pty_env = {k: v for k, v in os.environ.items() pty_env = _sanitize_subprocess_env(os.environ, env_vars)
if k not in _HERMES_PROVIDER_ENV_BLOCKLIST}
pty_env.update(env_vars or {})
pty_env["PYTHONUNBUFFERED"] = "1" pty_env["PYTHONUNBUFFERED"] = "1"
pty_proc = _PtyProcessCls.spawn( pty_proc = _PtyProcessCls.spawn(
[user_shell, "-lic", command], [user_shell, "-lic", command],
@ -198,9 +196,7 @@ class ProcessRegistry:
# Force unbuffered output for Python scripts so progress is visible # Force unbuffered output for Python scripts so progress is visible
# during background execution (libraries like tqdm/datasets buffer when # during background execution (libraries like tqdm/datasets buffer when
# stdout is a pipe, hiding output from process(action="poll")). # stdout is a pipe, hiding output from process(action="poll")).
bg_env = {k: v for k, v in os.environ.items() bg_env = _sanitize_subprocess_env(os.environ, env_vars)
if k not in _HERMES_PROVIDER_ENV_BLOCKLIST}
bg_env.update(env_vars or {})
bg_env["PYTHONUNBUFFERED"] = "1" bg_env["PYTHONUNBUFFERED"] = "1"
proc = subprocess.Popen( proc = subprocess.Popen(
[user_shell, "-lic", command], [user_shell, "-lic", command],