Merge remote-tracking branch 'origin/main' into feature/homeassistant-integration

# Conflicts:
#	run_agent.py
0xbyt4 2026-03-01 11:59:12 +03:00
commit 3fdf03390e
50 changed files with 7354 additions and 358 deletions


@@ -0,0 +1,168 @@
"""Tests for agent.auxiliary_client resolution chain, especially the Codex fallback."""
import json
import os
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from agent.auxiliary_client import (
get_text_auxiliary_client,
get_vision_auxiliary_client,
auxiliary_max_tokens_param,
_read_codex_access_token,
)
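# Resolution order exercised by these tests (inferred from the assertions
# below, not from the implementation): OPENROUTER_API_KEY wins first, then
# Nous auth, then an explicit OPENAI_BASE_URL endpoint, then the Codex
# ~/.codex/auth.json token, and finally (None, None) when nothing is set.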
@pytest.fixture(autouse=True)
def _clean_env(monkeypatch):
"""Strip provider env vars so each test starts clean."""
for key in (
"OPENROUTER_API_KEY", "OPENAI_BASE_URL", "OPENAI_API_KEY",
"OPENAI_MODEL", "LLM_MODEL", "NOUS_INFERENCE_BASE_URL",
):
monkeypatch.delenv(key, raising=False)
@pytest.fixture
def codex_auth_dir(tmp_path, monkeypatch):
"""Provide a writable ~/.codex/ directory with a valid auth.json."""
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
auth_file = codex_dir / "auth.json"
auth_file.write_text(json.dumps({
"tokens": {
"access_token": "codex-test-token-abc123",
"refresh_token": "codex-refresh-xyz",
}
}))
monkeypatch.setattr(
"agent.auxiliary_client._read_codex_access_token",
lambda: "codex-test-token-abc123",
)
return codex_dir
class TestReadCodexAccessToken:
def test_valid_auth_file(self, tmp_path):
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
auth = codex_dir / "auth.json"
auth.write_text(json.dumps({
"tokens": {"access_token": "tok-123", "refresh_token": "r-456"}
}))
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path):
result = _read_codex_access_token()
assert result == "tok-123"
def test_missing_file_returns_none(self, tmp_path):
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path):
result = _read_codex_access_token()
assert result is None
def test_empty_token_returns_none(self, tmp_path):
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
auth = codex_dir / "auth.json"
auth.write_text(json.dumps({"tokens": {"access_token": " "}}))
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path):
result = _read_codex_access_token()
assert result is None
def test_malformed_json_returns_none(self, tmp_path):
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
(codex_dir / "auth.json").write_text("{bad json")
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path):
result = _read_codex_access_token()
assert result is None
def test_missing_tokens_key_returns_none(self, tmp_path):
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
(codex_dir / "auth.json").write_text(json.dumps({"other": "data"}))
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path):
result = _read_codex_access_token()
assert result is None
class TestGetTextAuxiliaryClient:
"""Test the full resolution chain for get_text_auxiliary_client."""
def test_openrouter_takes_priority(self, monkeypatch, codex_auth_dir):
monkeypatch.setenv("OPENROUTER_API_KEY", "or-key")
with patch("agent.auxiliary_client.OpenAI") as mock_openai:
client, model = get_text_auxiliary_client()
assert model == "google/gemini-3-flash-preview"
mock_openai.assert_called_once()
call_kwargs = mock_openai.call_args
assert call_kwargs.kwargs["api_key"] == "or-key"
def test_nous_takes_priority_over_codex(self, monkeypatch, codex_auth_dir):
with patch("agent.auxiliary_client._read_nous_auth") as mock_nous, \
patch("agent.auxiliary_client.OpenAI") as mock_openai:
mock_nous.return_value = {"access_token": "nous-tok"}
client, model = get_text_auxiliary_client()
assert model == "gemini-3-flash"
def test_custom_endpoint_over_codex(self, monkeypatch, codex_auth_dir):
monkeypatch.setenv("OPENAI_BASE_URL", "http://localhost:1234/v1")
monkeypatch.setenv("OPENAI_API_KEY", "lm-studio-key")
# Re-apply the codex token patch (the codex_auth_dir fixture already set it)
monkeypatch.setattr(
"agent.auxiliary_client._read_codex_access_token",
lambda: "codex-test-token-abc123",
)
with patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client.OpenAI") as mock_openai:
client, model = get_text_auxiliary_client()
assert model == "gpt-4o-mini"
call_kwargs = mock_openai.call_args
assert call_kwargs.kwargs["base_url"] == "http://localhost:1234/v1"
def test_codex_fallback_when_nothing_else(self, codex_auth_dir):
with patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client.OpenAI") as mock_openai:
client, model = get_text_auxiliary_client()
assert model == "gpt-5.3-codex"
# Returns a CodexAuxiliaryClient wrapper, not a raw OpenAI client
from agent.auxiliary_client import CodexAuxiliaryClient
assert isinstance(client, CodexAuxiliaryClient)
def test_returns_none_when_nothing_available(self):
with patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client._read_codex_access_token", return_value=None):
client, model = get_text_auxiliary_client()
assert client is None
assert model is None
class TestCodexNotInVisionClient:
"""Codex fallback should NOT apply to vision tasks."""
def test_vision_returns_none_without_openrouter_nous(self):
with patch("agent.auxiliary_client._read_nous_auth", return_value=None):
client, model = get_vision_auxiliary_client()
assert client is None
assert model is None
class TestAuxiliaryMaxTokensParam:
def test_codex_fallback_uses_max_tokens(self, monkeypatch):
"""Codex adapter translates max_tokens internally, so we return max_tokens."""
with patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client._read_codex_access_token", return_value="tok"):
result = auxiliary_max_tokens_param(1024)
assert result == {"max_tokens": 1024}
def test_openrouter_uses_max_tokens(self, monkeypatch):
monkeypatch.setenv("OPENROUTER_API_KEY", "or-key")
result = auxiliary_max_tokens_param(1024)
assert result == {"max_tokens": 1024}
def test_no_provider_uses_max_tokens(self):
with patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client._read_codex_access_token", return_value=None):
result = auxiliary_max_tokens_param(1024)
assert result == {"max_tokens": 1024}

tests/agent/test_redact.py (new file, 173 lines)

@@ -0,0 +1,173 @@
"""Tests for agent.redact -- secret masking in logs and output."""
import logging
import pytest
from agent.redact import redact_sensitive_text, RedactingFormatter
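# A minimal sketch of the masking rule these tests pin down; the prefix
# length and cutoff are assumptions chosen to satisfy the assertions below,
# not the real agent.redact constants.
def _mask_sketch(token: str, keep: int = 6, min_len: int = 16) -> str:
    # Long tokens keep a recognizable prefix; short ones are fully masked.
    return token[:keep] + "..." if len(token) >= min_len else "***"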
class TestKnownPrefixes:
def test_openai_sk_key(self):
text = "Using key sk-proj-abc123def456ghi789jkl012"
result = redact_sensitive_text(text)
assert "sk-pro" in result
assert "abc123def456" not in result
assert "..." in result
def test_openrouter_sk_key(self):
text = "OPENROUTER_API_KEY=sk-or-v1-abcdefghijklmnopqrstuvwxyz1234567890"
result = redact_sensitive_text(text)
assert "abcdefghijklmnop" not in result
def test_github_pat_classic(self):
result = redact_sensitive_text("token: ghp_abc123def456ghi789jkl")
assert "abc123def456" not in result
def test_github_pat_fine_grained(self):
result = redact_sensitive_text("github_pat_abc123def456ghi789jklmno")
assert "abc123def456" not in result
def test_slack_token(self):
token = "xoxb-" + "0" * 12 + "-" + "a" * 14
result = redact_sensitive_text(token)
assert "a" * 14 not in result
def test_google_api_key(self):
result = redact_sensitive_text("AIzaSyB-abc123def456ghi789jklmno012345")
assert "abc123def456" not in result
def test_perplexity_key(self):
result = redact_sensitive_text("pplx-abcdef123456789012345")
assert "abcdef12345" not in result
def test_fal_key(self):
result = redact_sensitive_text("fal_abc123def456ghi789jkl")
assert "abc123def456" not in result
def test_short_token_fully_masked(self):
result = redact_sensitive_text("key=sk-short1234567")
assert "***" in result
class TestEnvAssignments:
def test_export_api_key(self):
text = "export OPENAI_API_KEY=sk-proj-abc123def456ghi789jkl012"
result = redact_sensitive_text(text)
assert "OPENAI_API_KEY=" in result
assert "abc123def456" not in result
def test_quoted_value(self):
text = 'MY_SECRET_TOKEN="supersecretvalue123456789"'
result = redact_sensitive_text(text)
assert "MY_SECRET_TOKEN=" in result
assert "supersecretvalue" not in result
def test_non_secret_env_unchanged(self):
text = "HOME=/home/user"
result = redact_sensitive_text(text)
assert result == text
def test_path_unchanged(self):
text = "PATH=/usr/local/bin:/usr/bin"
result = redact_sensitive_text(text)
assert result == text
class TestJsonFields:
def test_json_api_key(self):
text = '{"apiKey": "sk-proj-abc123def456ghi789jkl012"}'
result = redact_sensitive_text(text)
assert "abc123def456" not in result
def test_json_token(self):
text = '{"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.longtoken.here"}'
result = redact_sensitive_text(text)
assert "eyJhbGciOiJSUzI1NiIs" not in result
def test_json_non_secret_unchanged(self):
text = '{"name": "John", "model": "gpt-4"}'
result = redact_sensitive_text(text)
assert result == text
class TestAuthHeaders:
def test_bearer_token(self):
text = "Authorization: Bearer sk-proj-abc123def456ghi789jkl012"
result = redact_sensitive_text(text)
assert "Authorization: Bearer" in result
assert "abc123def456" not in result
def test_case_insensitive(self):
text = "authorization: bearer mytoken123456789012345678"
result = redact_sensitive_text(text)
assert "mytoken12345" not in result
class TestTelegramTokens:
def test_bot_token(self):
text = "bot123456789:ABCDEfghij-KLMNopqrst_UVWXyz12345"
result = redact_sensitive_text(text)
assert "ABCDEfghij" not in result
assert "123456789:***" in result
def test_raw_token(self):
text = "12345678901:ABCDEfghijKLMNopqrstUVWXyz1234567890"
result = redact_sensitive_text(text)
assert "ABCDEfghij" not in result
class TestPassthrough:
def test_empty_string(self):
assert redact_sensitive_text("") == ""
def test_none_returns_none(self):
assert redact_sensitive_text(None) is None
def test_normal_text_unchanged(self):
text = "Hello world, this is a normal log message with no secrets."
assert redact_sensitive_text(text) == text
def test_code_unchanged(self):
text = "def main():\n print('hello')\n return 42"
assert redact_sensitive_text(text) == text
def test_url_without_key_unchanged(self):
text = "Connecting to https://api.openai.com/v1/chat/completions"
assert redact_sensitive_text(text) == text
class TestRedactingFormatter:
def test_formats_and_redacts(self):
formatter = RedactingFormatter("%(message)s")
record = logging.LogRecord(
name="test", level=logging.INFO, pathname="", lineno=0,
msg="Key is sk-proj-abc123def456ghi789jkl012",
args=(), exc_info=None,
)
result = formatter.format(record)
assert "abc123def456" not in result
assert "sk-pro" in result
class TestPrintenvSimulation:
"""Simulate what happens when the agent runs `env` or `printenv`."""
def test_full_env_dump(self):
env_dump = """HOME=/home/user
PATH=/usr/local/bin:/usr/bin
OPENAI_API_KEY=sk-proj-abc123def456ghi789jkl012mno345
OPENROUTER_API_KEY=sk-or-v1-reallyLongSecretKeyValue12345678
FIRECRAWL_API_KEY=fc-shortkey123456789012
TELEGRAM_BOT_TOKEN=bot987654321:ABCDEfghij-KLMNopqrst_UVWXyz12345
SHELL=/bin/bash
USER=teknium"""
result = redact_sensitive_text(env_dump)
# Secrets should be masked
assert "abc123def456" not in result
assert "reallyLongSecretKey" not in result
assert "ABCDEfghij" not in result
# Non-secrets should survive
assert "HOME=/home/user" in result
assert "SHELL=/bin/bash" in result
assert "USER=teknium" in result


@@ -0,0 +1,374 @@
"""
Tests for subagent progress relay (issue #169).
Verifies that:
- KawaiiSpinner.print_above() works with and without active spinner
- _build_child_progress_callback handles CLI/gateway/no-display paths
- Thinking events are relayed correctly
- Parallel callbacks don't share state
"""
import io
import sys
import time
import threading
import pytest
from unittest.mock import MagicMock, patch
from agent.display import KawaiiSpinner
from tools.delegate_tool import _build_child_progress_callback
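# Relay behavior pinned down below (as described in issue #169): CLI children
# print above the parent spinner via print_above(); gateway children batch
# tool events and flush every 5 calls; "_thinking" events reach the CLI
# spinner only and are never relayed to the gateway.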
# =========================================================================
# KawaiiSpinner.print_above tests
# =========================================================================
class TestPrintAbove:
"""Tests for KawaiiSpinner.print_above method."""
def test_print_above_without_spinner_running(self):
"""print_above should write to stdout even when spinner is not running."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf # Redirect to buffer
spinner.print_above("hello world")
output = buf.getvalue()
assert "hello world" in output
def test_print_above_with_spinner_running(self):
"""print_above should clear spinner line and print text."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
spinner.running = True # Pretend spinner is running (don't start thread)
spinner.print_above("tool line")
output = buf.getvalue()
assert "tool line" in output
assert "\r" in output # Should start with carriage return to clear spinner line
def test_print_above_uses_captured_stdout(self):
"""print_above should use self._out, not sys.stdout.
This ensures it works inside redirect_stdout(devnull)."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
# Simulate redirect_stdout(devnull)
old_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
spinner.print_above("should go to buf")
finally:
sys.stdout = old_stdout
assert "should go to buf" in buf.getvalue()
# =========================================================================
# _build_child_progress_callback tests
# =========================================================================
class TestBuildChildProgressCallback:
"""Tests for child progress callback builder."""
def test_returns_none_when_no_display(self):
"""Should return None when parent has no spinner or callback."""
parent = MagicMock()
parent._delegate_spinner = None
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
assert cb is None
def test_cli_spinner_tool_event(self):
"""Should print tool line above spinner for CLI path."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
assert cb is not None
cb("web_search", "quantum computing")
output = buf.getvalue()
assert "web_search" in output
assert "quantum computing" in output
assert "├─" in output
def test_cli_spinner_thinking_event(self):
"""Should print thinking line above spinner for CLI path."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
cb("_thinking", "I'll search for papers first")
output = buf.getvalue()
assert "💭" in output
assert "search for papers" in output
def test_gateway_batched_progress(self):
"""Gateway path should batch tool calls and flush at BATCH_SIZE."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
# Send 4 tool calls — shouldn't flush yet (BATCH_SIZE = 5)
for i in range(4):
cb(f"tool_{i}", f"arg_{i}")
parent_cb.assert_not_called()
# 5th call should trigger flush
cb("tool_4", "arg_4")
parent_cb.assert_called_once()
call_args = parent_cb.call_args
assert "tool_0" in call_args[0][1]
assert "tool_4" in call_args[0][1]
def test_thinking_not_relayed_to_gateway(self):
"""Thinking events should NOT be sent to gateway (too noisy)."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
cb("_thinking", "some reasoning text")
parent_cb.assert_not_called()
def test_parallel_callbacks_independent(self):
"""Each child's callback should have independent batch state."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb0 = _build_child_progress_callback(0, parent)
cb1 = _build_child_progress_callback(1, parent)
# Send 3 calls to each — neither should flush (batch size = 5)
for i in range(3):
cb0(f"tool_{i}")
cb1(f"other_{i}")
parent_cb.assert_not_called()
def test_task_index_prefix_in_batch_mode(self):
"""Batch mode (task_count > 1) should show 1-indexed prefix for all tasks."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
# task_index=0 in a batch of 3 → prefix "[1]"
cb0 = _build_child_progress_callback(0, parent, task_count=3)
cb0("web_search", "test")
output = buf.getvalue()
assert "[1]" in output
# task_index=2 in a batch of 3 → prefix "[3]"
buf.truncate(0)
buf.seek(0)
cb2 = _build_child_progress_callback(2, parent, task_count=3)
cb2("web_search", "test")
output = buf.getvalue()
assert "[3]" in output
def test_single_task_no_prefix(self):
"""Single task (task_count=1) should not show index prefix."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent, task_count=1)
cb("web_search", "test")
output = buf.getvalue()
assert "[" not in output
# =========================================================================
# Integration: thinking callback in run_agent.py
# =========================================================================
class TestThinkingCallback:
"""Tests for the _thinking callback in AIAgent conversation loop."""
def _simulate_thinking_callback(self, content, callback, delegate_depth=1):
"""Simulate the exact code path from run_agent.py for the thinking callback.
delegate_depth: simulates self._delegate_depth.
0 = main agent (should NOT fire), >=1 = subagent (should fire).
"""
import re
if (content and callback and delegate_depth > 0):
_think_text = content.strip()
_think_text = re.sub(
r'</?(?:REASONING_SCRATCHPAD|think|reasoning)>', '', _think_text
).strip()
first_line = _think_text.split('\n')[0][:80] if _think_text else ""
if first_line:
try:
callback("_thinking", first_line)
except Exception:
pass
def test_thinking_callback_fires_on_content(self):
"""tool_progress_callback should receive _thinking event
when assistant message has content."""
calls = []
self._simulate_thinking_callback(
"I'll research quantum computing first, then summarize.",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert calls[0][0] == "_thinking"
assert "quantum computing" in calls[0][1]
def test_thinking_callback_skipped_when_no_content(self):
"""Should not fire when assistant has no content."""
calls = []
self._simulate_thinking_callback(
None,
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 0
def test_thinking_callback_truncates_long_content(self):
"""Should truncate long content to 80 chars."""
calls = []
self._simulate_thinking_callback(
"A" * 200 + "\nSecond line should be ignored",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert len(calls[0][1]) == 80
def test_thinking_callback_skipped_for_main_agent(self):
"""Main agent (delegate_depth=0) should NOT fire thinking events.
This prevents gateway spam on Telegram/Discord."""
calls = []
self._simulate_thinking_callback(
"I'll help you with that request.",
lambda name, preview=None: calls.append((name, preview)),
delegate_depth=0,
)
assert len(calls) == 0
def test_thinking_callback_strips_reasoning_scratchpad(self):
"""REASONING_SCRATCHPAD tags should be stripped before display."""
calls = []
self._simulate_thinking_callback(
"<REASONING_SCRATCHPAD>I need to analyze this carefully</REASONING_SCRATCHPAD>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert "<REASONING_SCRATCHPAD>" not in calls[0][1]
assert "analyze this carefully" in calls[0][1]
def test_thinking_callback_strips_think_tags(self):
"""<think> tags should be stripped before display."""
calls = []
self._simulate_thinking_callback(
"<think>Let me think about this problem</think>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert "<think>" not in calls[0][1]
assert "think about this problem" in calls[0][1]
def test_thinking_callback_empty_after_strip(self):
"""Should not fire when content is only XML tags."""
calls = []
self._simulate_thinking_callback(
"<REASONING_SCRATCHPAD></REASONING_SCRATCHPAD>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 0
# =========================================================================
# Gateway batch flush tests
# =========================================================================
class TestBatchFlush:
"""Tests for gateway batch flush on subagent completion."""
def test_flush_sends_remaining_batch(self):
"""_flush should send remaining tool names to gateway."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
# Send 3 tools (below batch size of 5)
cb("web_search", "query1")
cb("read_file", "file.txt")
cb("write_file", "out.txt")
parent_cb.assert_not_called()
# Flush should send the remaining 3
cb._flush()
parent_cb.assert_called_once()
summary = parent_cb.call_args[0][1]
assert "web_search" in summary
assert "write_file" in summary
def test_flush_noop_when_batch_empty(self):
"""_flush should not send anything when batch is empty."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
cb._flush()
parent_cb.assert_not_called()
def test_flush_noop_when_no_parent_callback(self):
"""_flush should not crash when there's no parent callback."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
cb("web_search", "test")
cb._flush() # Should not crash
if __name__ == "__main__":
pytest.main([__file__, "-v"])


@@ -0,0 +1,184 @@
"""
Tests for MEDIA tag extraction from tool results.
Verifies that MEDIA tags (e.g., from TTS tool) are only extracted from
messages in the CURRENT turn, not from the full conversation history.
This prevents voice messages from accumulating and being sent multiple
times per reply. (Regression test for #160)
"""
import pytest
import re
def extract_media_tags_fixed(result_messages, history_len):
"""
Extract MEDIA tags from tool results, but ONLY from new messages
(those added after history_len). This is the fixed behavior.
Args:
result_messages: Full list of messages including history + new
history_len: Length of history before this turn
Returns:
Tuple of (media_tags list, has_voice_directive bool)
"""
media_tags = []
has_voice_directive = False
# Only process new messages from this turn
new_messages = result_messages[history_len:] if len(result_messages) > history_len else []
for msg in new_messages:
if msg.get("role") == "tool" or msg.get("role") == "function":
content = msg.get("content", "")
if "MEDIA:" in content:
for match in re.finditer(r'MEDIA:(\S+)', content):
path = match.group(1).strip().rstrip('",}')
if path:
media_tags.append(f"MEDIA:{path}")
if "[[audio_as_voice]]" in content:
has_voice_directive = True
return media_tags, has_voice_directive
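# Example: with history_len == 2, only result_messages[2:] are scanned, so a
# MEDIA tag emitted by a tool in an earlier turn can never be re-collected.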
def extract_media_tags_broken(result_messages):
"""
The BROKEN behavior: extract MEDIA tags from ALL messages including history.
This causes TTS voice messages to accumulate and be re-sent on every reply.
"""
media_tags = []
has_voice_directive = False
for msg in result_messages:
if msg.get("role") == "tool" or msg.get("role") == "function":
content = msg.get("content", "")
if "MEDIA:" in content:
for match in re.finditer(r'MEDIA:(\S+)', content):
path = match.group(1).strip().rstrip('",}')
if path:
media_tags.append(f"MEDIA:{path}")
if "[[audio_as_voice]]" in content:
has_voice_directive = True
return media_tags, has_voice_directive
class TestMediaExtraction:
"""Tests for MEDIA tag extraction from tool results."""
def test_media_tags_not_extracted_from_history(self):
"""MEDIA tags from previous turns should NOT be extracted again."""
# Simulate conversation history with a TTS call from a previous turn
history = [
{"role": "user", "content": "Say hello as audio"},
{"role": "assistant", "content": None, "tool_calls": [{"id": "1", "function": {"name": "text_to_speech"}}]},
{"role": "tool", "tool_call_id": "1", "content": '{"success": true, "media_tag": "[[audio_as_voice]]\\nMEDIA:/path/to/audio1.ogg"}'},
{"role": "assistant", "content": "I've said hello for you!"},
]
# New turn: user asks a simple question
new_messages = [
{"role": "user", "content": "What time is it?"},
{"role": "assistant", "content": "It's 3:30 AM."},
]
all_messages = history + new_messages
history_len = len(history)
# Fixed behavior: should extract NO media tags (none in new messages)
tags, voice_directive = extract_media_tags_fixed(all_messages, history_len)
assert tags == [], "Fixed extraction should not find tags in history"
assert voice_directive is False
# Broken behavior: would incorrectly extract the old media tag
broken_tags, broken_voice = extract_media_tags_broken(all_messages)
assert len(broken_tags) == 1, "Broken extraction finds tags in history"
assert "audio1.ogg" in broken_tags[0]
def test_media_tags_extracted_from_current_turn(self):
"""MEDIA tags from the current turn SHOULD be extracted."""
# History without TTS
history = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
]
# New turn with TTS call
new_messages = [
{"role": "user", "content": "Say goodbye as audio"},
{"role": "assistant", "content": None, "tool_calls": [{"id": "2", "function": {"name": "text_to_speech"}}]},
{"role": "tool", "tool_call_id": "2", "content": '{"success": true, "media_tag": "[[audio_as_voice]]\\nMEDIA:/path/to/audio2.ogg"}'},
{"role": "assistant", "content": "I've said goodbye!"},
]
all_messages = history + new_messages
history_len = len(history)
# Fixed behavior: should extract the new media tag
tags, voice_directive = extract_media_tags_fixed(all_messages, history_len)
assert len(tags) == 1, "Should extract media tag from current turn"
assert "audio2.ogg" in tags[0]
assert voice_directive is True
def test_multiple_tts_calls_in_history_not_accumulated(self):
"""Multiple TTS calls in history should NOT accumulate in new responses."""
# History with multiple TTS calls
history = [
{"role": "user", "content": "Say hello"},
{"role": "tool", "tool_call_id": "1", "content": 'MEDIA:/audio/hello.ogg'},
{"role": "assistant", "content": "Done!"},
{"role": "user", "content": "Say goodbye"},
{"role": "tool", "tool_call_id": "2", "content": 'MEDIA:/audio/goodbye.ogg'},
{"role": "assistant", "content": "Done!"},
{"role": "user", "content": "Say thanks"},
{"role": "tool", "tool_call_id": "3", "content": 'MEDIA:/audio/thanks.ogg'},
{"role": "assistant", "content": "Done!"},
]
# New turn: no TTS
new_messages = [
{"role": "user", "content": "What time is it?"},
{"role": "assistant", "content": "3 PM"},
]
all_messages = history + new_messages
history_len = len(history)
# Fixed: no tags
tags, _ = extract_media_tags_fixed(all_messages, history_len)
assert tags == [], "Should not accumulate tags from history"
# Broken: would have 3 tags (all the old ones)
broken_tags, _ = extract_media_tags_broken(all_messages)
assert len(broken_tags) == 3, "Broken version accumulates all history tags"
def test_deduplication_within_current_turn(self):
"""Multiple MEDIA tags in current turn should be deduplicated."""
history = []
# Current turn with multiple tool calls producing same media
new_messages = [
{"role": "user", "content": "Multiple TTS"},
{"role": "tool", "tool_call_id": "1", "content": 'MEDIA:/audio/same.ogg'},
{"role": "tool", "tool_call_id": "2", "content": 'MEDIA:/audio/same.ogg'}, # duplicate
{"role": "tool", "tool_call_id": "3", "content": 'MEDIA:/audio/different.ogg'},
{"role": "assistant", "content": "Done!"},
]
all_messages = history + new_messages
tags, _ = extract_media_tags_fixed(all_messages, 0)
# Even though same.ogg appears twice, deduplication happens after extraction
# The extraction itself should get both, then caller deduplicates
assert len(tags) == 3 # Raw extraction gets all
# Deduplication as done in the actual code:
seen = set()
unique = [t for t in tags if t not in seen and not seen.add(t)]
assert len(unique) == 2 # After dedup: same.ogg and different.ogg
if __name__ == "__main__":
pytest.main([__file__, "-v"])


@@ -0,0 +1,210 @@
import json
import time
import base64
from contextlib import contextmanager
from pathlib import Path
from types import SimpleNamespace
import pytest
import yaml
from hermes_cli.auth import (
AuthError,
DEFAULT_CODEX_BASE_URL,
PROVIDER_REGISTRY,
_persist_codex_auth_payload,
_login_openai_codex,
login_command,
get_codex_auth_status,
get_provider_auth_state,
read_codex_auth_file,
resolve_codex_runtime_credentials,
resolve_provider,
)
def _write_codex_auth(codex_home: Path, *, access_token: str = "access", refresh_token: str = "refresh") -> Path:
codex_home.mkdir(parents=True, exist_ok=True)
auth_file = codex_home / "auth.json"
auth_file.write_text(
json.dumps(
{
"auth_mode": "oauth",
"last_refresh": "2026-02-26T00:00:00Z",
"tokens": {
"access_token": access_token,
"refresh_token": refresh_token,
},
}
)
)
return auth_file
def _jwt_with_exp(exp_epoch: int) -> str:
payload = {"exp": exp_epoch}
encoded = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8")).rstrip(b"=").decode("utf-8")
return f"h.{encoded}.s"
def test_read_codex_auth_file_success(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
auth_file = _write_codex_auth(codex_home)
monkeypatch.setenv("CODEX_HOME", str(codex_home))
payload = read_codex_auth_file()
assert payload["auth_path"] == auth_file
assert payload["tokens"]["access_token"] == "access"
assert payload["tokens"]["refresh_token"] == "refresh"
def test_resolve_codex_runtime_credentials_missing_access_token(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home, access_token="")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
with pytest.raises(AuthError) as exc:
resolve_codex_runtime_credentials()
assert exc.value.code == "codex_auth_missing_access_token"
assert exc.value.relogin_required is True
def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
expiring_token = _jwt_with_exp(int(time.time()) - 10)
_write_codex_auth(codex_home, access_token=expiring_token, refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
called = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
called["count"] += 1
assert auth_path == codex_home / "auth.json"
assert lock_held is True
return {"access_token": "access-new", "refresh_token": "refresh-new"}
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
resolved = resolve_codex_runtime_credentials()
assert called["count"] == 1
assert resolved["api_key"] == "access-new"
def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
called = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
called["count"] += 1
assert lock_held is True
return {"access_token": "access-forced", "refresh_token": "refresh-new"}
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False)
assert called["count"] == 1
assert resolved["api_key"] == "access-forced"
def test_resolve_codex_runtime_credentials_uses_file_lock_on_refresh(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
lock_calls = {"enter": 0, "exit": 0}
@contextmanager
def _fake_lock(auth_path, timeout_seconds=15.0):
assert auth_path == codex_home / "auth.json"
lock_calls["enter"] += 1
try:
yield
finally:
lock_calls["exit"] += 1
refresh_calls = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
refresh_calls["count"] += 1
assert lock_held is True
return {"access_token": "access-updated", "refresh_token": "refresh-updated"}
monkeypatch.setattr("hermes_cli.auth._codex_auth_file_lock", _fake_lock)
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False)
assert refresh_calls["count"] == 1
assert lock_calls["enter"] == 1
assert lock_calls["exit"] == 1
assert resolved["api_key"] == "access-updated"
def test_resolve_provider_explicit_codex_does_not_fallback(monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
assert resolve_provider("openai-codex") == "openai-codex"
def test_persist_codex_auth_payload_writes_atomically(tmp_path):
auth_path = tmp_path / "auth.json"
auth_path.write_text('{"stale":true}\n')
payload = {
"auth_mode": "oauth",
"tokens": {
"access_token": "next-access",
"refresh_token": "next-refresh",
},
"last_refresh": "2026-02-26T00:00:00Z",
}
_persist_codex_auth_payload(auth_path, payload)
stored = json.loads(auth_path.read_text())
assert stored == payload
assert list(tmp_path.glob(".auth.json.*.tmp")) == []
def test_get_codex_auth_status_not_logged_in(tmp_path, monkeypatch):
monkeypatch.setenv("CODEX_HOME", str(tmp_path / "missing-codex-home"))
status = get_codex_auth_status()
assert status["logged_in"] is False
assert "error" in status
def test_login_openai_codex_persists_provider_state(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes-home"
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("CODEX_HOME", str(codex_home))
# Mock input() to accept existing credentials
monkeypatch.setattr("builtins.input", lambda _: "y")
_login_openai_codex(SimpleNamespace(), PROVIDER_REGISTRY["openai-codex"])
state = get_provider_auth_state("openai-codex")
assert state is not None
assert state["source"] == "codex-auth-json"
assert state["auth_file"].endswith("auth.json")
config_path = hermes_home / "config.yaml"
config = yaml.safe_load(config_path.read_text())
assert config["model"]["provider"] == "openai-codex"
assert config["model"]["base_url"] == DEFAULT_CODEX_BASE_URL
def test_login_command_shows_deprecation(monkeypatch, capsys):
"""login_command is deprecated and directs users to hermes model."""
with pytest.raises(SystemExit) as exc_info:
login_command(SimpleNamespace())
assert exc_info.value.code == 0
captured = capsys.readouterr()
assert "hermes model" in captured.out

tests/test_cli_init.py (new file, 80 lines)

@@ -0,0 +1,80 @@
"""Tests for HermesCLI initialization -- catches configuration bugs
that only manifest at runtime (not in mocked unit tests)."""
import os
import sys
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
def _make_cli(**kwargs):
"""Create a HermesCLI instance with minimal mocking."""
from cli import HermesCLI
with patch("cli.get_tool_definitions", return_value=[]):
return HermesCLI(**kwargs)
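# Resolution order these tests imply: an explicit max_turns argument wins,
# then the config file's agent.max_turns, then HERMES_MAX_ITERATIONS, then a
# positive built-in default -- the resolved value is never None.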
class TestMaxTurnsResolution:
"""max_turns must always resolve to a positive integer, never None."""
def test_default_max_turns_is_integer(self):
cli = _make_cli()
assert isinstance(cli.max_turns, int)
assert cli.max_turns > 0
def test_explicit_max_turns_honored(self):
cli = _make_cli(max_turns=25)
assert cli.max_turns == 25
def test_none_max_turns_gets_default(self):
cli = _make_cli(max_turns=None)
assert isinstance(cli.max_turns, int)
assert cli.max_turns > 0
def test_env_var_max_turns(self, monkeypatch):
"""Env var is used when config file doesn't set max_turns."""
monkeypatch.setenv("HERMES_MAX_ITERATIONS", "42")
import cli as cli_module
original = cli_module.CLI_CONFIG["agent"].get("max_turns")
cli_module.CLI_CONFIG["agent"]["max_turns"] = None
try:
cli_obj = _make_cli()
assert cli_obj.max_turns == 42
finally:
if original is not None:
cli_module.CLI_CONFIG["agent"]["max_turns"] = original
def test_max_turns_never_none_for_agent(self):
"""The value passed to AIAgent must never be None (causes TypeError in run_conversation)."""
cli = _make_cli()
assert cli.max_turns is not None
class TestVerboseAndToolProgress:
def test_default_verbose_is_bool(self):
cli = _make_cli()
assert isinstance(cli.verbose, bool)
def test_tool_progress_mode_is_string(self):
cli = _make_cli()
assert isinstance(cli.tool_progress_mode, str)
assert cli.tool_progress_mode in ("off", "new", "all", "verbose")
class TestProviderResolution:
def test_api_key_is_string_or_none(self):
cli = _make_cli()
assert cli.api_key is None or isinstance(cli.api_key, str)
def test_base_url_is_string(self):
cli = _make_cli()
assert isinstance(cli.base_url, str)
assert cli.base_url.startswith("http")
def test_model_is_string(self):
cli = _make_cli()
assert isinstance(cli.model, str)
assert len(cli.model) > 0


@@ -0,0 +1,187 @@
import importlib
import sys
import types
from contextlib import nullcontext
from types import SimpleNamespace
from hermes_cli.auth import AuthError
from hermes_cli import main as hermes_main
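# The stub installer below lets `cli` be imported on machines without
# prompt_toolkit: every prompt_toolkit symbol cli touches is replaced with an
# inert placeholder, which suffices because these tests never render a UI.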
def _install_prompt_toolkit_stubs():
class _Dummy:
def __init__(self, *args, **kwargs):
pass
class _Condition:
def __init__(self, func):
self.func = func
def __bool__(self):
return bool(self.func())
class _ANSI(str):
pass
root = types.ModuleType("prompt_toolkit")
history = types.ModuleType("prompt_toolkit.history")
styles = types.ModuleType("prompt_toolkit.styles")
patch_stdout = types.ModuleType("prompt_toolkit.patch_stdout")
application = types.ModuleType("prompt_toolkit.application")
layout = types.ModuleType("prompt_toolkit.layout")
processors = types.ModuleType("prompt_toolkit.layout.processors")
filters = types.ModuleType("prompt_toolkit.filters")
dimension = types.ModuleType("prompt_toolkit.layout.dimension")
menus = types.ModuleType("prompt_toolkit.layout.menus")
widgets = types.ModuleType("prompt_toolkit.widgets")
key_binding = types.ModuleType("prompt_toolkit.key_binding")
completion = types.ModuleType("prompt_toolkit.completion")
formatted_text = types.ModuleType("prompt_toolkit.formatted_text")
history.FileHistory = _Dummy
styles.Style = _Dummy
patch_stdout.patch_stdout = lambda *args, **kwargs: nullcontext()
application.Application = _Dummy
layout.Layout = _Dummy
layout.HSplit = _Dummy
layout.Window = _Dummy
layout.FormattedTextControl = _Dummy
layout.ConditionalContainer = _Dummy
processors.Processor = _Dummy
processors.Transformation = _Dummy
processors.PasswordProcessor = _Dummy
processors.ConditionalProcessor = _Dummy
filters.Condition = _Condition
dimension.Dimension = _Dummy
menus.CompletionsMenu = _Dummy
widgets.TextArea = _Dummy
key_binding.KeyBindings = _Dummy
completion.Completer = _Dummy
completion.Completion = _Dummy
formatted_text.ANSI = _ANSI
root.print_formatted_text = lambda *args, **kwargs: None
sys.modules.setdefault("prompt_toolkit", root)
sys.modules.setdefault("prompt_toolkit.history", history)
sys.modules.setdefault("prompt_toolkit.styles", styles)
sys.modules.setdefault("prompt_toolkit.patch_stdout", patch_stdout)
sys.modules.setdefault("prompt_toolkit.application", application)
sys.modules.setdefault("prompt_toolkit.layout", layout)
sys.modules.setdefault("prompt_toolkit.layout.processors", processors)
sys.modules.setdefault("prompt_toolkit.filters", filters)
sys.modules.setdefault("prompt_toolkit.layout.dimension", dimension)
sys.modules.setdefault("prompt_toolkit.layout.menus", menus)
sys.modules.setdefault("prompt_toolkit.widgets", widgets)
sys.modules.setdefault("prompt_toolkit.key_binding", key_binding)
sys.modules.setdefault("prompt_toolkit.completion", completion)
sys.modules.setdefault("prompt_toolkit.formatted_text", formatted_text)
def _import_cli():
try:
importlib.import_module("prompt_toolkit")
except ModuleNotFoundError:
_install_prompt_toolkit_stubs()
return importlib.import_module("cli")
def test_hermes_cli_init_does_not_eagerly_resolve_runtime_provider(monkeypatch):
cli = _import_cli()
calls = {"count": 0}
def _unexpected_runtime_resolve(**kwargs):
calls["count"] += 1
raise AssertionError("resolve_runtime_provider should not be called in HermesCLI.__init__")
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _unexpected_runtime_resolve)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)
assert shell is not None
assert calls["count"] == 0
def test_runtime_resolution_failure_is_not_sticky(monkeypatch):
cli = _import_cli()
calls = {"count": 0}
def _runtime_resolve(**kwargs):
calls["count"] += 1
if calls["count"] == 1:
raise RuntimeError("temporary auth failure")
return {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "test-key",
"source": "env/config",
}
class _DummyAgent:
def __init__(self, *args, **kwargs):
self.kwargs = kwargs
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _runtime_resolve)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
monkeypatch.setattr(cli, "AIAgent", _DummyAgent)
shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)
assert shell._init_agent() is False
assert shell._init_agent() is True
assert calls["count"] == 2
assert shell.agent is not None
def test_runtime_resolution_rebuilds_agent_on_routing_change(monkeypatch):
cli = _import_cli()
def _runtime_resolve(**kwargs):
return {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": "https://same-endpoint.example/v1",
"api_key": "same-key",
"source": "env/config",
}
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _runtime_resolve)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)
shell.provider = "openrouter"
shell.api_mode = "chat_completions"
shell.base_url = "https://same-endpoint.example/v1"
shell.api_key = "same-key"
shell.agent = object()
assert shell._ensure_runtime_credentials() is True
assert shell.agent is None
assert shell.provider == "openai-codex"
assert shell.api_mode == "codex_responses"
def test_cmd_model_falls_back_to_auto_on_invalid_provider(monkeypatch, capsys):
monkeypatch.setattr(
"hermes_cli.config.load_config",
lambda: {"model": {"default": "gpt-5", "provider": "invalid-provider"}},
)
monkeypatch.setattr("hermes_cli.config.save_config", lambda cfg: None)
monkeypatch.setattr("hermes_cli.config.get_env_value", lambda key: "")
monkeypatch.setattr("hermes_cli.config.save_env_value", lambda key, value: None)
def _resolve_provider(requested, **kwargs):
if requested == "invalid-provider":
raise AuthError("Unknown provider 'invalid-provider'.", code="invalid_provider")
return "openrouter"
monkeypatch.setattr("hermes_cli.auth.resolve_provider", _resolve_provider)
monkeypatch.setattr(hermes_main, "_prompt_provider_choice", lambda choices: len(choices) - 1)
hermes_main.cmd_model(SimpleNamespace())
output = capsys.readouterr().out
assert "Warning:" in output
assert "falling back to auto provider detection" in output.lower()
assert "No change." in output


@@ -0,0 +1,180 @@
import asyncio
import sys
import types
from types import SimpleNamespace
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
sys.modules.setdefault("fal_client", types.SimpleNamespace())
import cron.scheduler as cron_scheduler
import gateway.run as gateway_run
import run_agent
from gateway.config import Platform
from gateway.session import SessionSource
def _patch_agent_bootstrap(monkeypatch):
monkeypatch.setattr(
run_agent,
"get_tool_definitions",
lambda **kwargs: [
{
"type": "function",
"function": {
"name": "terminal",
"description": "Run shell commands.",
"parameters": {"type": "object", "properties": {}},
},
}
],
)
monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
def _codex_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=5, output_tokens=3, total_tokens=8),
status="completed",
model="gpt-5-codex",
)
class _UnauthorizedError(RuntimeError):
def __init__(self):
super().__init__("Error code: 401 - unauthorized")
self.status_code = 401
class _FakeOpenAI:
def __init__(self, **kwargs):
self.kwargs = kwargs
def close(self):
return None
class _Codex401ThenSuccessAgent(run_agent.AIAgent):
refresh_attempts = 0
last_init = {}
def __init__(self, *args, **kwargs):
kwargs.setdefault("skip_context_files", True)
kwargs.setdefault("skip_memory", True)
kwargs.setdefault("max_iterations", 4)
type(self).last_init = dict(kwargs)
super().__init__(*args, **kwargs)
self._cleanup_task_resources = lambda task_id: None
self._persist_session = lambda messages, history=None: None
self._save_trajectory = lambda messages, user_message, completed: None
self._save_session_log = lambda messages: None
def _try_refresh_codex_client_credentials(self, *, force: bool = True) -> bool:
type(self).refresh_attempts += 1
return True
def run_conversation(self, user_message: str, conversation_history=None):
calls = {"api": 0}
def _fake_api_call(api_kwargs):
calls["api"] += 1
if calls["api"] == 1:
raise _UnauthorizedError()
return _codex_message_response("Recovered via refresh")
self._interruptible_api_call = _fake_api_call
return super().run_conversation(user_message, conversation_history=conversation_history)
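# The fake agent above exercises the recovery path under test: the first API
# call raises a 401, AIAgent is expected to refresh Codex credentials exactly
# once via _try_refresh_codex_client_credentials, and the retried call
# succeeds with "Recovered via refresh".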
def test_cron_run_job_codex_path_handles_internal_401_refresh(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
monkeypatch.setattr(run_agent, "AIAgent", _Codex401ThenSuccessAgent)
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda requested=None: {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
},
)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
_Codex401ThenSuccessAgent.refresh_attempts = 0
_Codex401ThenSuccessAgent.last_init = {}
success, output, final_response, error = cron_scheduler.run_job(
{"id": "job-1", "name": "Codex Refresh Test", "prompt": "ping"}
)
assert success is True
assert error is None
assert final_response == "Recovered via refresh"
assert "Recovered via refresh" in output
assert _Codex401ThenSuccessAgent.refresh_attempts == 1
assert _Codex401ThenSuccessAgent.last_init["provider"] == "openai-codex"
assert _Codex401ThenSuccessAgent.last_init["api_mode"] == "codex_responses"
def test_gateway_run_agent_codex_path_handles_internal_401_refresh(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
monkeypatch.setattr(run_agent, "AIAgent", _Codex401ThenSuccessAgent)
monkeypatch.setattr(
gateway_run,
"_resolve_runtime_agent_kwargs",
lambda: {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
},
)
monkeypatch.setenv("HERMES_TOOL_PROGRESS", "false")
_Codex401ThenSuccessAgent.refresh_attempts = 0
_Codex401ThenSuccessAgent.last_init = {}
runner = gateway_run.GatewayRunner.__new__(gateway_run.GatewayRunner)
runner.adapters = {}
runner._ephemeral_system_prompt = ""
runner._prefill_messages = []
runner._reasoning_config = None
runner._running_agents = {}
from unittest.mock import MagicMock, AsyncMock
runner.hooks = MagicMock()
runner.hooks.emit = AsyncMock()
runner.hooks.loaded_hooks = []
runner._session_db = None
source = SessionSource(
platform=Platform.LOCAL,
chat_id="cli",
chat_name="CLI",
chat_type="dm",
user_id="user-1",
)
result = asyncio.run(
runner._run_agent(
message="ping",
context_prompt="",
history=[],
source=source,
session_id="session-1",
session_key="agent:main:local:dm",
)
)
assert result["final_response"] == "Recovered via refresh"
assert _Codex401ThenSuccessAgent.refresh_attempts == 1
assert _Codex401ThenSuccessAgent.last_init["provider"] == "openai-codex"
assert _Codex401ThenSuccessAgent.last_init["api_mode"] == "codex_responses"


@@ -0,0 +1,40 @@
import json
from hermes_cli.codex_models import DEFAULT_CODEX_MODELS, get_codex_model_ids
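# Ordering pinned down below (inferred from the assertions): the model named
# in config.toml ranks first, cached codex slugs follow by priority, and both
# non-codex slugs ("gpt-4o") and entries with visibility == "hidden" are
# filtered out.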
def test_get_codex_model_ids_prioritizes_default_and_cache(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
codex_home.mkdir(parents=True, exist_ok=True)
(codex_home / "config.toml").write_text('model = "gpt-5.2-codex"\n')
(codex_home / "models_cache.json").write_text(
json.dumps(
{
"models": [
{"slug": "gpt-5.3-codex", "priority": 20, "supported_in_api": True},
{"slug": "gpt-5.1-codex", "priority": 5, "supported_in_api": True},
{"slug": "gpt-4o", "priority": 1, "supported_in_api": True},
{"slug": "gpt-5-hidden-codex", "priority": 2, "visibility": "hidden"},
]
}
)
)
monkeypatch.setenv("CODEX_HOME", str(codex_home))
models = get_codex_model_ids()
assert models[0] == "gpt-5.2-codex"
assert "gpt-5.1-codex" in models
assert "gpt-5.3-codex" in models
assert "gpt-4o" not in models
assert "gpt-5-hidden-codex" not in models
def test_get_codex_model_ids_falls_back_to_curated_defaults(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
codex_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("CODEX_HOME", str(codex_home))
models = get_codex_model_ids()
assert models[: len(DEFAULT_CODEX_MODELS)] == DEFAULT_CODEX_MODELS


@@ -0,0 +1,51 @@
"""Tests for detect_external_credentials() -- Phase 2 credential sync."""
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from hermes_cli.auth import detect_external_credentials
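# detect_external_credentials() is expected to return a list of dicts carrying
# at least "provider", "label", and "path" keys (inferred from the assertions
# below); an empty list means no external CLI credentials were found.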
class TestDetectCodexCLI:
def test_detects_valid_codex_auth(self, tmp_path):
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
auth = codex_dir / "auth.json"
auth.write_text(json.dumps({
"tokens": {"access_token": "tok-123", "refresh_token": "ref-456"}
}))
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir):
result = detect_external_credentials()
codex_hits = [c for c in result if c["provider"] == "openai-codex"]
assert len(codex_hits) == 1
assert "Codex CLI" in codex_hits[0]["label"]
assert str(auth) == codex_hits[0]["path"]
def test_skips_codex_without_access_token(self, tmp_path):
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
(codex_dir / "auth.json").write_text(json.dumps({"tokens": {}}))
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir):
result = detect_external_credentials()
assert not any(c["provider"] == "openai-codex" for c in result)
def test_skips_missing_codex_dir(self, tmp_path):
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=tmp_path / "nonexistent"):
result = detect_external_credentials()
assert not any(c["provider"] == "openai-codex" for c in result)
def test_skips_malformed_codex_auth(self, tmp_path):
codex_dir = tmp_path / ".codex"
codex_dir.mkdir()
(codex_dir / "auth.json").write_text("{bad json")
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir):
result = detect_external_credentials()
assert not any(c["provider"] == "openai-codex" for c in result)
def test_returns_empty_when_nothing_found(self, tmp_path):
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=tmp_path / ".codex"):
result = detect_external_credentials()
assert result == []


@@ -0,0 +1,225 @@
"""Tests for flush_memories() working correctly across all provider modes.
Catches the bug where Codex mode called chat.completions.create on a
Responses-only client, which would fail silently or with a 404.
"""
import json
import os
import sys
import types
from types import SimpleNamespace
from unittest.mock import patch, MagicMock, call
import pytest
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
sys.modules.setdefault("fal_client", types.SimpleNamespace())
import run_agent
class _FakeOpenAI:
def __init__(self, **kwargs):
self.kwargs = kwargs
self.api_key = kwargs.get("api_key", "test")
self.base_url = kwargs.get("base_url", "http://test")
def close(self):
pass
def _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter"):
"""Build an AIAgent with mocked internals, ready for flush_memories testing."""
monkeypatch.setattr(run_agent, "get_tool_definitions", lambda **kw: [
{
"type": "function",
"function": {
"name": "memory",
"description": "Manage memories.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string"},
"target": {"type": "string"},
"content": {"type": "string"},
},
},
},
},
])
monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
agent = run_agent.AIAgent(
api_key="test-key",
base_url="https://test.example.com/v1",
provider=provider,
api_mode=api_mode,
max_iterations=4,
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
# Give it a valid memory store
agent._memory_store = MagicMock()
agent._memory_flush_min_turns = 1
agent._user_turn_count = 5
return agent
def _chat_response_with_memory_call():
"""Simulated chat completions response with a memory tool call."""
return SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(
content=None,
tool_calls=[SimpleNamespace(
function=SimpleNamespace(
name="memory",
arguments=json.dumps({
"action": "add",
"target": "notes",
"content": "User prefers dark mode.",
}),
),
)],
),
)],
usage=SimpleNamespace(prompt_tokens=100, completion_tokens=20, total_tokens=120),
)
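# Dispatch matrix these tests pin down for flush_memories():
#   auxiliary client available      -> aux.chat.completions.create(...)
#   no auxiliary, chat_completions  -> self.client.chat.completions.create(...)
#   no auxiliary, codex_responses   -> self._run_codex_stream(...) (Responses API)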
class TestFlushMemoriesUsesAuxiliaryClient:
"""When an auxiliary client is available, flush_memories should use it
instead of self.client -- especially critical in Codex mode."""
def test_flush_uses_auxiliary_when_available(self, monkeypatch):
agent = _make_agent(monkeypatch, api_mode="codex_responses", provider="openai-codex")
mock_aux_client = MagicMock()
mock_aux_client.chat.completions.create.return_value = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(mock_aux_client, "gpt-4o-mini")):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "Remember this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory:
agent.flush_memories(messages)
mock_aux_client.chat.completions.create.assert_called_once()
call_kwargs = mock_aux_client.chat.completions.create.call_args
assert call_kwargs.kwargs.get("model") == "gpt-4o-mini" or call_kwargs[1].get("model") == "gpt-4o-mini"
def test_flush_uses_main_client_when_no_auxiliary(self, monkeypatch):
"""Non-Codex mode with no auxiliary falls back to self.client."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
agent.client = MagicMock()
agent.client.chat.completions.create.return_value = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(None, None)):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "Save this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved."):
agent.flush_memories(messages)
agent.client.chat.completions.create.assert_called_once()
def test_flush_executes_memory_tool_calls(self, monkeypatch):
"""Verify that memory tool calls from the flush response actually get executed."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
mock_aux_client = MagicMock()
mock_aux_client.chat.completions.create.return_value = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(mock_aux_client, "gpt-4o-mini")):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Note this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory:
agent.flush_memories(messages)
mock_memory.assert_called_once()
call_kwargs = mock_memory.call_args
assert call_kwargs.kwargs["action"] == "add"
assert call_kwargs.kwargs["target"] == "notes"
assert "dark mode" in call_kwargs.kwargs["content"]
def test_flush_strips_artifacts_from_messages(self, monkeypatch):
"""After flush, the flush prompt and any response should be removed from messages."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
mock_aux_client = MagicMock()
mock_aux_client.chat.completions.create.return_value = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(mock_aux_client, "gpt-4o-mini")):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Remember X"},
]
original_len = len(messages)
with patch("tools.memory_tool.memory_tool", return_value="Saved."):
agent.flush_memories(messages)
# Messages should not grow from the flush
assert len(messages) <= original_len
# No flush sentinel should remain
for msg in messages:
assert "_flush_sentinel" not in msg
class TestFlushMemoriesCodexFallback:
"""When no auxiliary client exists and we're in Codex mode, flush should
use the Codex Responses API path instead of chat.completions."""
def test_codex_mode_no_aux_uses_responses_api(self, monkeypatch):
agent = _make_agent(monkeypatch, api_mode="codex_responses", provider="openai-codex")
codex_response = SimpleNamespace(
output=[
SimpleNamespace(
type="function_call",
call_id="call_1",
name="memory",
arguments=json.dumps({
"action": "add",
"target": "notes",
"content": "Codex flush test",
}),
),
],
usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60),
status="completed",
model="gpt-5-codex",
)
with patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(None, None)), \
patch.object(agent, "_run_codex_stream", return_value=codex_response) as mock_stream, \
patch.object(agent, "_build_api_kwargs") as mock_build, \
patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory:
mock_build.return_value = {
"model": "gpt-5-codex",
"instructions": "test",
"input": [],
"tools": [],
"max_output_tokens": 4096,
}
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Save this"},
]
agent.flush_memories(messages)
mock_stream.assert_called_once()
mock_memory.assert_called_once()
assert mock_memory.call_args.kwargs["content"] == "Codex flush test"

View file

@ -0,0 +1,460 @@
"""Provider parity tests: verify that AIAgent builds correct API kwargs
and handles responses properly for all supported providers.
Ensures changes to one provider path don't silently break another.
"""
import json
import os
import sys
import types
from types import SimpleNamespace
from unittest.mock import patch, MagicMock
import pytest
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
sys.modules.setdefault("fal_client", types.SimpleNamespace())
from run_agent import AIAgent
# ── Helpers ──────────────────────────────────────────────────────────────────
def _tool_defs(*names):
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
class _FakeOpenAI:
def __init__(self, **kw):
self.api_key = kw.get("api_key", "test")
self.base_url = kw.get("base_url", "http://test")
def close(self):
pass
def _make_agent(monkeypatch, provider, api_mode="chat_completions", base_url="https://openrouter.ai/api/v1"):
monkeypatch.setattr("run_agent.get_tool_definitions", lambda **kw: _tool_defs("web_search", "terminal"))
monkeypatch.setattr("run_agent.check_toolset_requirements", lambda: {})
monkeypatch.setattr("run_agent.OpenAI", _FakeOpenAI)
return AIAgent(
api_key="test-key",
base_url=base_url,
provider=provider,
api_mode=api_mode,
max_iterations=4,
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
# ── _build_api_kwargs tests ─────────────────────────────────────────────────
class TestBuildApiKwargsOpenRouter:
def test_uses_chat_completions_format(self, monkeypatch):
agent = _make_agent(monkeypatch, "openrouter")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "messages" in kwargs
assert "model" in kwargs
assert kwargs["messages"][-1]["content"] == "hi"
def test_includes_reasoning_in_extra_body(self, monkeypatch):
agent = _make_agent(monkeypatch, "openrouter")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
extra = kwargs.get("extra_body", {})
assert "reasoning" in extra
assert extra["reasoning"]["enabled"] is True
def test_includes_tools(self, monkeypatch):
agent = _make_agent(monkeypatch, "openrouter")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "tools" in kwargs
tool_names = [t["function"]["name"] for t in kwargs["tools"]]
assert "web_search" in tool_names
def test_no_responses_api_fields(self, monkeypatch):
agent = _make_agent(monkeypatch, "openrouter")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "input" not in kwargs
assert "instructions" not in kwargs
assert "store" not in kwargs
class TestBuildApiKwargsNousPortal:
def test_includes_nous_product_tags(self, monkeypatch):
agent = _make_agent(monkeypatch, "nous", base_url="https://inference-api.nousresearch.com/v1")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
extra = kwargs.get("extra_body", {})
assert extra.get("tags") == ["product=hermes-agent"]
def test_uses_chat_completions_format(self, monkeypatch):
agent = _make_agent(monkeypatch, "nous", base_url="https://inference-api.nousresearch.com/v1")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "messages" in kwargs
assert "input" not in kwargs
class TestBuildApiKwargsCustomEndpoint:
def test_uses_chat_completions_format(self, monkeypatch):
agent = _make_agent(monkeypatch, "custom", base_url="http://localhost:1234/v1")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "messages" in kwargs
assert "input" not in kwargs
def test_no_openrouter_extra_body(self, monkeypatch):
agent = _make_agent(monkeypatch, "custom", base_url="http://localhost:1234/v1")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
extra = kwargs.get("extra_body", {})
assert "reasoning" not in extra
class TestBuildApiKwargsCodex:
def test_uses_responses_api_format(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "input" in kwargs
assert "instructions" in kwargs
assert "messages" not in kwargs
assert kwargs["store"] is False
def test_includes_reasoning_config(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "reasoning" in kwargs
assert kwargs["reasoning"]["effort"] == "medium"
def test_includes_encrypted_content_in_include(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "reasoning.encrypted_content" in kwargs.get("include", [])
def test_tools_converted_to_responses_format(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
tools = kwargs.get("tools", [])
assert len(tools) > 0
# Responses format has "name" at top level, not nested under "function"
assert "name" in tools[0]
assert "function" not in tools[0]
# ── Message conversion tests ────────────────────────────────────────────────
class TestChatMessagesToResponsesInput:
"""Verify _chat_messages_to_responses_input for Codex mode."""
def test_user_message_passes_through(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "user", "content": "hello"}]
items = agent._chat_messages_to_responses_input(messages)
assert items == [{"role": "user", "content": "hello"}]
def test_system_messages_filtered(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [
{"role": "system", "content": "be helpful"},
{"role": "user", "content": "hello"},
]
items = agent._chat_messages_to_responses_input(messages)
assert len(items) == 1
assert items[0]["role"] == "user"
def test_assistant_tool_calls_become_function_call_items(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{
"role": "assistant",
"content": "",
"tool_calls": [{
"id": "call_abc",
"call_id": "call_abc",
"function": {"name": "web_search", "arguments": '{"query": "test"}'},
}],
}]
items = agent._chat_messages_to_responses_input(messages)
fc_items = [i for i in items if i.get("type") == "function_call"]
assert len(fc_items) == 1
assert fc_items[0]["name"] == "web_search"
assert fc_items[0]["call_id"] == "call_abc"
def test_tool_results_become_function_call_output(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "tool", "tool_call_id": "call_abc", "content": "result here"}]
items = agent._chat_messages_to_responses_input(messages)
assert items[0]["type"] == "function_call_output"
assert items[0]["call_id"] == "call_abc"
assert items[0]["output"] == "result here"
def test_encrypted_reasoning_replayed(self, monkeypatch):
"""Encrypted reasoning items from previous turns must be included in input."""
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [
{"role": "user", "content": "think about this"},
{
"role": "assistant",
"content": "I thought about it.",
"codex_reasoning_items": [
{"type": "reasoning", "id": "rs_abc", "encrypted_content": "gAAAA_test_blob"},
],
},
{"role": "user", "content": "continue"},
]
items = agent._chat_messages_to_responses_input(messages)
reasoning_items = [i for i in items if i.get("type") == "reasoning"]
assert len(reasoning_items) == 1
assert reasoning_items[0]["encrypted_content"] == "gAAAA_test_blob"
def test_no_reasoning_items_for_non_codex_messages(self, monkeypatch):
"""Messages without codex_reasoning_items should not inject anything."""
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [
{"role": "assistant", "content": "hi"},
{"role": "user", "content": "hello"},
]
items = agent._chat_messages_to_responses_input(messages)
reasoning_items = [i for i in items if i.get("type") == "reasoning"]
assert len(reasoning_items) == 0
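# Worked example of the mapping this class exercises (illustrative literals):
# a chat-style transcript becomes flat Responses input items.
_CHAT_SIDE = [
    {"role": "user", "content": "hi"},
    {"role": "assistant", "content": "", "tool_calls": [
        {"id": "call_1", "function": {"name": "terminal", "arguments": "{}"}},
    ]},
    {"role": "tool", "tool_call_id": "call_1", "content": "ok"},
]
_RESPONSES_SIDE = [
    {"role": "user", "content": "hi"},
    {"type": "function_call", "call_id": "call_1", "name": "terminal", "arguments": "{}"},
    {"type": "function_call_output", "call_id": "call_1", "output": "ok"},
]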
# ── Response normalization tests ─────────────────────────────────────────────
class TestNormalizeCodexResponse:
"""Verify _normalize_codex_response extracts all fields correctly."""
def _make_codex_agent(self, monkeypatch):
return _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
def test_text_response(self, monkeypatch):
agent = self._make_codex_agent(monkeypatch)
response = SimpleNamespace(
output=[
SimpleNamespace(type="message", status="completed",
content=[SimpleNamespace(type="output_text", text="Hello!")],
phase="final_answer"),
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
assert msg.content == "Hello!"
assert reason == "stop"
def test_reasoning_summary_extracted(self, monkeypatch):
agent = self._make_codex_agent(monkeypatch)
response = SimpleNamespace(
output=[
SimpleNamespace(type="reasoning",
encrypted_content="gAAAA_blob",
summary=[SimpleNamespace(type="summary_text", text="Thinking about math")],
id="rs_123", status=None),
SimpleNamespace(type="message", status="completed",
content=[SimpleNamespace(type="output_text", text="42")],
phase="final_answer"),
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
assert msg.content == "42"
assert "math" in msg.reasoning
assert reason == "stop"
def test_encrypted_content_captured(self, monkeypatch):
agent = self._make_codex_agent(monkeypatch)
response = SimpleNamespace(
output=[
SimpleNamespace(type="reasoning",
encrypted_content="gAAAA_secret_blob_123",
summary=[SimpleNamespace(type="summary_text", text="Thinking")],
id="rs_456", status=None),
SimpleNamespace(type="message", status="completed",
content=[SimpleNamespace(type="output_text", text="done")],
phase="final_answer"),
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
assert msg.codex_reasoning_items is not None
assert len(msg.codex_reasoning_items) == 1
assert msg.codex_reasoning_items[0]["encrypted_content"] == "gAAAA_secret_blob_123"
assert msg.codex_reasoning_items[0]["id"] == "rs_456"
def test_no_encrypted_content_when_missing(self, monkeypatch):
agent = self._make_codex_agent(monkeypatch)
response = SimpleNamespace(
output=[
SimpleNamespace(type="message", status="completed",
content=[SimpleNamespace(type="output_text", text="no reasoning")],
phase="final_answer"),
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
assert msg.codex_reasoning_items is None
def test_tool_calls_extracted(self, monkeypatch):
agent = self._make_codex_agent(monkeypatch)
response = SimpleNamespace(
output=[
SimpleNamespace(type="function_call", status="completed",
call_id="call_xyz", name="web_search",
arguments='{"query":"test"}', id="fc_xyz"),
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
assert reason == "tool_calls"
assert len(msg.tool_calls) == 1
assert msg.tool_calls[0].function.name == "web_search"
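# Sketch of the normalized shape this class expects for the
# reasoning-plus-message case (an assumed outline, not a real API object):
_NORMALIZED_SKETCH = SimpleNamespace(
    content="42",
    reasoning="Thinking about math",
    tool_calls=None,
    codex_reasoning_items=[
        {"type": "reasoning", "id": "rs_123", "encrypted_content": "gAAAA_blob"},
    ],
)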
# ── Chat completions response handling (OpenRouter/Nous) ─────────────────────
class TestBuildAssistantMessage:
"""Verify _build_assistant_message works for all provider response formats."""
def test_openrouter_reasoning_fields(self, monkeypatch):
agent = _make_agent(monkeypatch, "openrouter")
msg = SimpleNamespace(
content="answer",
tool_calls=None,
reasoning="I thought about it",
reasoning_content=None,
reasoning_details=None,
)
result = agent._build_assistant_message(msg, "stop")
assert result["content"] == "answer"
assert result["reasoning"] == "I thought about it"
assert "codex_reasoning_items" not in result
def test_openrouter_reasoning_details_preserved_unmodified(self, monkeypatch):
"""reasoning_details must be passed back exactly as received for
multi-turn continuity (OpenRouter, Anthropic, OpenAI all need this)."""
agent = _make_agent(monkeypatch, "openrouter")
original_detail = {
"type": "thinking",
"thinking": "deep thoughts here",
"signature": "sig123_opaque_blob",
"encrypted_content": "some_provider_blob",
"extra_field": "should_not_be_dropped",
}
msg = SimpleNamespace(
content="answer",
tool_calls=None,
reasoning=None,
reasoning_content=None,
reasoning_details=[original_detail],
)
result = agent._build_assistant_message(msg, "stop")
stored = result["reasoning_details"][0]
# ALL fields must survive, not just type/text/signature
assert stored["signature"] == "sig123_opaque_blob"
assert stored["encrypted_content"] == "some_provider_blob"
assert stored["extra_field"] == "should_not_be_dropped"
assert stored["thinking"] == "deep thoughts here"
def test_codex_preserves_encrypted_reasoning(self, monkeypatch):
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
msg = SimpleNamespace(
content="result",
tool_calls=None,
reasoning="summary text",
reasoning_content=None,
reasoning_details=None,
codex_reasoning_items=[
{"type": "reasoning", "id": "rs_1", "encrypted_content": "gAAAA_blob"},
],
)
result = agent._build_assistant_message(msg, "stop")
assert result["codex_reasoning_items"] == [
{"type": "reasoning", "id": "rs_1", "encrypted_content": "gAAAA_blob"},
]
def test_plain_message_no_codex_items(self, monkeypatch):
agent = _make_agent(monkeypatch, "openrouter")
msg = SimpleNamespace(
content="simple",
tool_calls=None,
reasoning=None,
reasoning_content=None,
reasoning_details=None,
)
result = agent._build_assistant_message(msg, "stop")
assert "codex_reasoning_items" not in result
# ── Auxiliary client provider resolution ─────────────────────────────────────
class TestAuxiliaryClientProviderPriority:
"""Verify auxiliary client resolution doesn't break for any provider."""
def test_openrouter_always_wins(self, monkeypatch):
monkeypatch.setenv("OPENROUTER_API_KEY", "or-key")
from agent.auxiliary_client import get_text_auxiliary_client
with patch("agent.auxiliary_client.OpenAI") as mock:
client, model = get_text_auxiliary_client()
assert model == "google/gemini-3-flash-preview"
assert "openrouter" in str(mock.call_args.kwargs["base_url"]).lower()
def test_nous_when_no_openrouter(self, monkeypatch):
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
from agent.auxiliary_client import get_text_auxiliary_client
with patch("agent.auxiliary_client._read_nous_auth", return_value={"access_token": "nous-tok"}), \
patch("agent.auxiliary_client.OpenAI") as mock:
client, model = get_text_auxiliary_client()
assert model == "gemini-3-flash"
def test_custom_endpoint_when_no_nous(self, monkeypatch):
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.setenv("OPENAI_BASE_URL", "http://localhost:1234/v1")
monkeypatch.setenv("OPENAI_API_KEY", "local-key")
from agent.auxiliary_client import get_text_auxiliary_client
with patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client.OpenAI") as mock:
client, model = get_text_auxiliary_client()
assert mock.call_args.kwargs["base_url"] == "http://localhost:1234/v1"
def test_codex_fallback_last_resort(self, monkeypatch):
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
from agent.auxiliary_client import get_text_auxiliary_client, CodexAuxiliaryClient
with patch("agent.auxiliary_client._read_nous_auth", return_value=None), \
patch("agent.auxiliary_client._read_codex_access_token", return_value="codex-tok"), \
patch("agent.auxiliary_client.OpenAI"):
client, model = get_text_auxiliary_client()
assert model == "gpt-5.3-codex"
assert isinstance(client, CodexAuxiliaryClient)
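# The priority chain these tests enforce, summarized (the actual logic lives
# in agent.auxiliary_client.get_text_auxiliary_client):
#   1. OPENROUTER_API_KEY          -> OpenRouter, google/gemini-3-flash-preview
#   2. Nous portal auth            -> gemini-3-flash
#   3. OPENAI_BASE_URL + API key   -> custom OpenAI-compatible endpoint
#   4. ~/.codex/auth.json token    -> CodexAuxiliaryClient, gpt-5.3-codex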

View file

@ -0,0 +1,748 @@
import sys
import types
from types import SimpleNamespace
import pytest
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
sys.modules.setdefault("fal_client", types.SimpleNamespace())
import run_agent
def _patch_agent_bootstrap(monkeypatch):
monkeypatch.setattr(
run_agent,
"get_tool_definitions",
lambda **kwargs: [
{
"type": "function",
"function": {
"name": "terminal",
"description": "Run shell commands.",
"parameters": {"type": "object", "properties": {}},
},
}
],
)
monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
def _build_agent(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://chatgpt.com/backend-api/codex",
api_key="codex-token",
quiet_mode=True,
max_iterations=4,
skip_context_files=True,
skip_memory=True,
)
agent._cleanup_task_resources = lambda task_id: None
agent._persist_session = lambda messages, history=None: None
agent._save_trajectory = lambda messages, user_message, completed: None
agent._save_session_log = lambda messages: None
return agent
def _codex_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=5, output_tokens=3, total_tokens=8),
status="completed",
model="gpt-5-codex",
)
def _codex_tool_call_response():
return SimpleNamespace(
output=[
SimpleNamespace(
type="function_call",
id="fc_1",
call_id="call_1",
name="terminal",
arguments="{}",
)
],
usage=SimpleNamespace(input_tokens=12, output_tokens=4, total_tokens=16),
status="completed",
model="gpt-5-codex",
)
def _codex_incomplete_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
status="in_progress",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
status="in_progress",
model="gpt-5-codex",
)
def _codex_commentary_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
phase="commentary",
status="completed",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
status="completed",
model="gpt-5-codex",
)
def _codex_ack_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
status="completed",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
status="completed",
model="gpt-5-codex",
)
class _FakeResponsesStream:
def __init__(self, *, final_response=None, final_error=None):
self._final_response = final_response
self._final_error = final_error
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def __iter__(self):
return iter(())
def get_final_response(self):
if self._final_error is not None:
raise self._final_error
return self._final_response
class _FakeCreateStream:
def __init__(self, events):
self._events = list(events)
self.closed = False
def __iter__(self):
return iter(self._events)
def close(self):
self.closed = True
def _codex_request_kwargs():
return {
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
"input": [{"role": "user", "content": "Ping"}],
"tools": None,
"store": False,
}
def test_api_mode_uses_explicit_provider_when_codex(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://openrouter.ai/api/v1",
provider="openai-codex",
api_key="codex-token",
quiet_mode=True,
max_iterations=1,
skip_context_files=True,
skip_memory=True,
)
assert agent.api_mode == "codex_responses"
assert agent.provider == "openai-codex"
def test_api_mode_normalizes_provider_case(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://openrouter.ai/api/v1",
provider="OpenAI-Codex",
api_key="codex-token",
quiet_mode=True,
max_iterations=1,
skip_context_files=True,
skip_memory=True,
)
assert agent.provider == "openai-codex"
assert agent.api_mode == "codex_responses"
def test_api_mode_respects_explicit_openrouter_provider_over_codex_url(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://chatgpt.com/backend-api/codex",
provider="openrouter",
api_key="test-token",
quiet_mode=True,
max_iterations=1,
skip_context_files=True,
skip_memory=True,
)
assert agent.api_mode == "chat_completions"
assert agent.provider == "openrouter"
def test_build_api_kwargs_codex(monkeypatch):
agent = _build_agent(monkeypatch)
kwargs = agent._build_api_kwargs(
[
{"role": "system", "content": "You are Hermes."},
{"role": "user", "content": "Ping"},
]
)
assert kwargs["model"] == "gpt-5-codex"
assert kwargs["instructions"] == "You are Hermes."
assert kwargs["store"] is False
assert isinstance(kwargs["input"], list)
assert kwargs["input"][0]["role"] == "user"
assert kwargs["tools"][0]["type"] == "function"
assert kwargs["tools"][0]["name"] == "terminal"
assert kwargs["tools"][0]["strict"] is False
assert "function" not in kwargs["tools"][0]
assert kwargs["store"] is False
assert "timeout" not in kwargs
assert "max_tokens" not in kwargs
assert "extra_body" not in kwargs
def test_run_codex_stream_retries_when_completed_event_missing(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0}
def _fake_stream(**kwargs):
calls["stream"] += 1
if calls["stream"] == 1:
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
return _FakeResponsesStream(final_response=_codex_message_response("stream ok"))
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=lambda **kwargs: _codex_message_response("fallback"),
)
)
response = agent._run_codex_stream(_codex_request_kwargs())
assert calls["stream"] == 2
assert response.output[0].content[0].text == "stream ok"
def test_run_codex_stream_falls_back_to_create_after_stream_completion_error(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0, "create": 0}
def _fake_stream(**kwargs):
calls["stream"] += 1
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
def _fake_create(**kwargs):
calls["create"] += 1
return _codex_message_response("create fallback ok")
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=_fake_create,
)
)
response = agent._run_codex_stream(_codex_request_kwargs())
assert calls["stream"] == 2
assert calls["create"] == 1
assert response.output[0].content[0].text == "create fallback ok"
def test_run_codex_stream_fallback_parses_create_stream_events(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0, "create": 0}
create_stream = _FakeCreateStream(
[
SimpleNamespace(type="response.created"),
SimpleNamespace(type="response.in_progress"),
SimpleNamespace(type="response.completed", response=_codex_message_response("streamed create ok")),
]
)
def _fake_stream(**kwargs):
calls["stream"] += 1
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
def _fake_create(**kwargs):
calls["create"] += 1
assert kwargs.get("stream") is True
return create_stream
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=_fake_create,
)
)
response = agent._run_codex_stream(_codex_request_kwargs())
assert calls["stream"] == 2
assert calls["create"] == 1
assert create_stream.closed is True
assert response.output[0].content[0].text == "streamed create ok"
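# Retry ladder pinned down by the three stream tests above, sketched:
#   attempt 1: responses.stream(...) -> "response.completed missing" error
#   attempt 2: responses.stream(...) -> retried once; may succeed or fail again
#   fallback : responses.create(stream=True), drain the events, close the
#              stream, and return the response carried by the final
#              "response.completed" event.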
def test_run_conversation_codex_plain_text(monkeypatch):
agent = _build_agent(monkeypatch)
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))
result = agent.run_conversation("Say OK")
assert result["completed"] is True
assert result["final_response"] == "OK"
assert result["messages"][-1]["role"] == "assistant"
assert result["messages"][-1]["content"] == "OK"
def test_run_conversation_codex_refreshes_after_401_and_retries(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"api": 0, "refresh": 0}
class _UnauthorizedError(RuntimeError):
def __init__(self):
super().__init__("Error code: 401 - unauthorized")
self.status_code = 401
def _fake_api_call(api_kwargs):
calls["api"] += 1
if calls["api"] == 1:
raise _UnauthorizedError()
return _codex_message_response("Recovered after refresh")
def _fake_refresh(*, force=True):
calls["refresh"] += 1
assert force is True
return True
monkeypatch.setattr(agent, "_interruptible_api_call", _fake_api_call)
monkeypatch.setattr(agent, "_try_refresh_codex_client_credentials", _fake_refresh)
result = agent.run_conversation("Say OK")
assert calls["api"] == 2
assert calls["refresh"] == 1
assert result["completed"] is True
assert result["final_response"] == "Recovered after refresh"
def test_try_refresh_codex_client_credentials_rebuilds_client(monkeypatch):
agent = _build_agent(monkeypatch)
closed = {"value": False}
rebuilt = {"kwargs": None}
class _ExistingClient:
def close(self):
closed["value"] = True
class _RebuiltClient:
pass
def _fake_openai(**kwargs):
rebuilt["kwargs"] = kwargs
return _RebuiltClient()
monkeypatch.setattr(
"hermes_cli.auth.resolve_codex_runtime_credentials",
lambda force_refresh=True: {
"api_key": "new-codex-token",
"base_url": "https://chatgpt.com/backend-api/codex",
},
)
monkeypatch.setattr(run_agent, "OpenAI", _fake_openai)
agent.client = _ExistingClient()
ok = agent._try_refresh_codex_client_credentials(force=True)
assert ok is True
assert closed["value"] is True
assert rebuilt["kwargs"]["api_key"] == "new-codex-token"
assert rebuilt["kwargs"]["base_url"] == "https://chatgpt.com/backend-api/codex"
assert isinstance(agent.client, _RebuiltClient)
def test_run_conversation_codex_tool_round_trip(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [_codex_tool_call_response(), _codex_message_response("done")]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("run a command")
assert result["completed"] is True
assert result["final_response"] == "done"
assert any(msg.get("tool_calls") for msg in result["messages"] if msg.get("role") == "assistant")
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_chat_messages_to_responses_input_uses_call_id_for_function_call(monkeypatch):
agent = _build_agent(monkeypatch)
items = agent._chat_messages_to_responses_input(
[
{"role": "user", "content": "Run terminal"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_abc123",
"type": "function",
"function": {"name": "terminal", "arguments": "{}"},
}
],
},
{"role": "tool", "tool_call_id": "call_abc123", "content": '{"ok":true}'},
]
)
function_call = next(item for item in items if item.get("type") == "function_call")
function_output = next(item for item in items if item.get("type") == "function_call_output")
assert function_call["call_id"] == "call_abc123"
assert "id" not in function_call
assert function_output["call_id"] == "call_abc123"
def test_chat_messages_to_responses_input_accepts_call_pipe_fc_ids(monkeypatch):
agent = _build_agent(monkeypatch)
items = agent._chat_messages_to_responses_input(
[
{"role": "user", "content": "Run terminal"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_pair123|fc_pair123",
"type": "function",
"function": {"name": "terminal", "arguments": "{}"},
}
],
},
{"role": "tool", "tool_call_id": "call_pair123|fc_pair123", "content": '{"ok":true}'},
]
)
function_call = next(item for item in items if item.get("type") == "function_call")
function_output = next(item for item in items if item.get("type") == "function_call_output")
assert function_call["call_id"] == "call_pair123"
assert "id" not in function_call
assert function_output["call_id"] == "call_pair123"
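# A minimal sketch of the id normalization those two tests imply: composite
# "call_...|fc_..." ids keep only the call_ half. "_split_call_id" is a
# hypothetical name used here for illustration.
def _split_call_id(tool_call_id: str) -> str:
    return tool_call_id.split("|", 1)[0]
assert _split_call_id("call_pair123|fc_pair123") == "call_pair123"
assert _split_call_id("call_abc123") == "call_abc123"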
def test_preflight_codex_api_kwargs_strips_optional_function_call_id(monkeypatch):
agent = _build_agent(monkeypatch)
preflight = agent._preflight_codex_api_kwargs(
{
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
"input": [
{"role": "user", "content": "hi"},
{
"type": "function_call",
"id": "call_bad",
"call_id": "call_good",
"name": "terminal",
"arguments": "{}",
},
],
"tools": [],
"store": False,
}
)
fn_call = next(item for item in preflight["input"] if item.get("type") == "function_call")
assert fn_call["call_id"] == "call_good"
assert "id" not in fn_call
def test_preflight_codex_api_kwargs_rejects_function_call_output_without_call_id(monkeypatch):
agent = _build_agent(monkeypatch)
with pytest.raises(ValueError, match="function_call_output is missing call_id"):
agent._preflight_codex_api_kwargs(
{
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
"input": [{"type": "function_call_output", "output": "{}"}],
"tools": [],
"store": False,
}
)
def test_preflight_codex_api_kwargs_rejects_unsupported_request_fields(monkeypatch):
agent = _build_agent(monkeypatch)
kwargs = _codex_request_kwargs()
kwargs["some_unknown_field"] = "value"
with pytest.raises(ValueError, match="unsupported field"):
agent._preflight_codex_api_kwargs(kwargs)
def test_preflight_codex_api_kwargs_allows_reasoning_and_temperature(monkeypatch):
agent = _build_agent(monkeypatch)
kwargs = _codex_request_kwargs()
kwargs["reasoning"] = {"effort": "high", "summary": "auto"}
kwargs["include"] = ["reasoning.encrypted_content"]
kwargs["temperature"] = 0.7
kwargs["max_output_tokens"] = 4096
result = agent._preflight_codex_api_kwargs(kwargs)
assert result["reasoning"] == {"effort": "high", "summary": "auto"}
assert result["include"] == ["reasoning.encrypted_content"]
assert result["temperature"] == 0.7
assert result["max_output_tokens"] == 4096
def test_run_conversation_codex_replay_payload_keeps_call_id(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [_codex_tool_call_response(), _codex_message_response("done")]
requests = []
def _fake_api_call(api_kwargs):
requests.append(api_kwargs)
return responses.pop(0)
monkeypatch.setattr(agent, "_interruptible_api_call", _fake_api_call)
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("run a command")
assert result["completed"] is True
assert result["final_response"] == "done"
assert len(requests) >= 2
replay_input = requests[1]["input"]
function_call = next(item for item in replay_input if item.get("type") == "function_call")
function_output = next(item for item in replay_input if item.get("type") == "function_call_output")
assert function_call["call_id"] == "call_1"
assert "id" not in function_call
assert function_output["call_id"] == "call_1"
def test_run_conversation_codex_continues_after_incomplete_interim_message(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_incomplete_message_response("I'll inspect the repo structure first."),
_codex_tool_call_response(),
_codex_message_response("Architecture summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("analyze repo")
assert result["completed"] is True
assert result["final_response"] == "Architecture summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "inspect the repo structure" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(monkeypatch):
agent = _build_agent(monkeypatch)
assistant_message, finish_reason = agent._normalize_codex_response(
_codex_commentary_message_response("I'll inspect the repository first.")
)
assert finish_reason == "incomplete"
assert "inspect the repository" in (assistant_message.content or "")
def test_run_conversation_codex_continues_after_commentary_phase_message(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_commentary_message_response("I'll inspect the repo structure first."),
_codex_tool_call_response(),
_codex_message_response("Architecture summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("analyze repo")
assert result["completed"] is True
assert result["final_response"] == "Architecture summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "inspect the repo structure" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_run_conversation_codex_continues_after_ack_stop_message(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_ack_message_response(
"Absolutely — I can do that. I'll inspect ~/openclaw-studio and report back with a walkthrough."
),
_codex_tool_call_response(),
_codex_message_response("Architecture summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("look into ~/openclaw-studio and tell me how it works")
assert result["completed"] is True
assert result["final_response"] == "Architecture summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "inspect ~/openclaw-studio" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(
msg.get("role") == "user"
and "Continue now. Execute the required tool calls" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_run_conversation_codex_continues_after_ack_for_directory_listing_prompt(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_ack_message_response(
"I'll check what's in the current directory and call out 3 notable items."
),
_codex_tool_call_response(),
_codex_message_response("Directory summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("look at current directory and list 3 notable things")
assert result["completed"] is True
assert result["final_response"] == "Directory summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "current directory" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(
msg.get("role") == "user"
and "Continue now. Execute the required tool calls" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])

View file

@ -0,0 +1,95 @@
from hermes_cli import runtime_provider as rp
def test_resolve_runtime_provider_codex(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr(
rp,
"resolve_codex_runtime_credentials",
lambda: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
"source": "codex-auth-json",
"auth_file": "/tmp/auth.json",
"codex_home": "/tmp/codex",
"last_refresh": "2026-02-26T00:00:00Z",
},
)
resolved = rp.resolve_runtime_provider(requested="openai-codex")
assert resolved["provider"] == "openai-codex"
assert resolved["api_mode"] == "codex_responses"
assert resolved["base_url"] == "https://chatgpt.com/backend-api/codex"
assert resolved["api_key"] == "codex-token"
assert resolved["requested_provider"] == "openai-codex"
def test_resolve_runtime_provider_openrouter_explicit(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(
requested="openrouter",
explicit_api_key="test-key",
explicit_base_url="https://example.com/v1/",
)
assert resolved["provider"] == "openrouter"
assert resolved["api_mode"] == "chat_completions"
assert resolved["api_key"] == "test-key"
assert resolved["base_url"] == "https://example.com/v1"
assert resolved["source"] == "explicit"
def test_resolve_runtime_provider_openrouter_ignores_codex_config_base_url(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(
rp,
"_get_model_config",
lambda: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
},
)
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(requested="openrouter")
assert resolved["provider"] == "openrouter"
assert resolved["base_url"] == rp.OPENROUTER_BASE_URL
def test_resolve_runtime_provider_auto_uses_custom_config_base_url(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(
rp,
"_get_model_config",
lambda: {
"provider": "auto",
"base_url": "https://custom.example/v1/",
},
)
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(requested="auto")
assert resolved["provider"] == "openrouter"
assert resolved["base_url"] == "https://custom.example/v1"
def test_resolve_requested_provider_precedence(monkeypatch):
monkeypatch.setenv("HERMES_INFERENCE_PROVIDER", "nous")
monkeypatch.setattr(rp, "_get_model_config", lambda: {"provider": "openai-codex"})
assert rp.resolve_requested_provider("openrouter") == "openrouter"
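# Precedence implied by these tests: an explicitly requested provider beats
# both HERMES_INFERENCE_PROVIDER and the model-config "provider"; trailing
# slashes are stripped from base URLs; and a codex base_url in config never
# leaks into an explicitly requested openrouter run.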

View file

@ -30,6 +30,9 @@ def _make_mock_parent(depth=0):
"""Create a mock parent agent with the fields delegate_task expects."""
parent = MagicMock()
parent.base_url = "https://openrouter.ai/api/v1"
parent.api_key = "parent-key"
parent.provider = "openrouter"
parent.api_mode = "chat_completions"
parent.model = "anthropic/claude-sonnet-4"
parent.platform = "cli"
parent.providers_allowed = None
@ -218,6 +221,30 @@ class TestDelegateTask(unittest.TestCase):
delegate_task(goal="Test tracking", parent_agent=parent)
self.assertEqual(len(parent._active_children), 0)
def test_child_inherits_runtime_credentials(self):
parent = _make_mock_parent(depth=0)
parent.base_url = "https://chatgpt.com/backend-api/codex"
parent.api_key = "codex-token"
parent.provider = "openai-codex"
parent.api_mode = "codex_responses"
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "ok",
"completed": True,
"api_calls": 1,
}
MockAgent.return_value = mock_child
delegate_task(goal="Test runtime inheritance", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["base_url"], parent.base_url)
self.assertEqual(kwargs["api_key"], parent.api_key)
self.assertEqual(kwargs["provider"], parent.provider)
self.assertEqual(kwargs["api_mode"], parent.api_mode)
class TestBlockedTools(unittest.TestCase):
def test_blocked_tools_constant(self):

View file

@ -0,0 +1,483 @@
"""Live integration tests for file operations and terminal tools.
These tests run REAL commands through the LocalEnvironment -- no mocks.
They verify that shell noise is properly filtered, commands actually work,
and the tool outputs are EXACTLY what the agent would see.
Every test with output validates against a known-good value AND
asserts zero contamination from shell noise via _assert_clean().
"""
import json
import os
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from tools.environments.local import LocalEnvironment, _clean_shell_noise, _SHELL_NOISE_SUBSTRINGS
from tools.file_operations import ShellFileOperations
# ── Shared noise detection ───────────────────────────────────────────────
# Every known shell noise pattern. If ANY of these appear in output that
# isn't explicitly expected, the test fails with a clear message.
_ALL_NOISE_PATTERNS = list(_SHELL_NOISE_SUBSTRINGS) + [
"bash: ",
"Inappropriate ioctl",
]
def _assert_clean(text: str, context: str = "output"):
"""Assert text contains zero shell noise contamination."""
if not text:
return
for noise in _ALL_NOISE_PATTERNS:
assert noise not in text, (
f"Shell noise leaked into {context}: found {noise!r} in:\n"
f"{text[:500]}"
)
# ── Fixtures ─────────────────────────────────────────────────────────────
# Deterministic file content used across tests. Every byte is known,
# so any unexpected text in results is immediately caught.
SIMPLE_CONTENT = "alpha\nbravo\ncharlie\n"
NUMBERED_CONTENT = "\n".join(f"LINE_{i:04d}" for i in range(1, 51)) + "\n"
SPECIAL_CONTENT = "single 'quotes' and \"doubles\" and $VARS and `backticks` and \\backslash\n"
MULTIFILE_A = "def func_alpha():\n return 42\n"
MULTIFILE_B = "def func_bravo():\n return 99\n"
MULTIFILE_C = "nothing relevant here\n"
@pytest.fixture
def env(tmp_path):
"""A real LocalEnvironment rooted in a temp directory."""
return LocalEnvironment(cwd=str(tmp_path), timeout=15)
@pytest.fixture
def ops(env, tmp_path):
"""ShellFileOperations wired to the real local environment."""
return ShellFileOperations(env, cwd=str(tmp_path))
@pytest.fixture
def populated_dir(tmp_path):
"""A temp directory with known files for search/read tests."""
(tmp_path / "alpha.py").write_text(MULTIFILE_A)
(tmp_path / "bravo.py").write_text(MULTIFILE_B)
(tmp_path / "notes.txt").write_text(MULTIFILE_C)
(tmp_path / "data.csv").write_text("col1,col2\n1,2\n3,4\n")
return tmp_path
# ── _clean_shell_noise unit tests ────────────────────────────────────────
class TestCleanShellNoise:
def test_single_noise_line(self):
output = "bash: no job control in this shell\nhello world\n"
result = _clean_shell_noise(output)
assert result == "hello world\n"
def test_double_noise_lines(self):
output = (
"bash: cannot set terminal process group (-1): Inappropriate ioctl for device\n"
"bash: no job control in this shell\n"
"actual output here\n"
)
result = _clean_shell_noise(output)
assert result == "actual output here\n"
_assert_clean(result)
def test_tcsetattr_noise(self):
output = (
"bash: [12345: 2 (255)] tcsetattr: Inappropriate ioctl for device\n"
"real content\n"
)
result = _clean_shell_noise(output)
assert result == "real content\n"
_assert_clean(result)
def test_triple_noise_lines(self):
output = (
"bash: cannot set terminal process group (-1): Inappropriate ioctl for device\n"
"bash: no job control in this shell\n"
"bash: [999: 2 (255)] tcsetattr: Inappropriate ioctl for device\n"
"clean\n"
)
result = _clean_shell_noise(output)
assert result == "clean\n"
def test_no_noise_untouched(self):
assert _clean_shell_noise("hello\nworld\n") == "hello\nworld\n"
def test_empty_string(self):
assert _clean_shell_noise("") == ""
def test_only_noise_produces_empty(self):
output = "bash: no job control in this shell\n"
result = _clean_shell_noise(output)
_assert_clean(result)
def test_noise_in_middle_not_stripped(self):
"""Only LEADING noise is stripped -- noise in the middle is real output."""
output = "real\nbash: no job control in this shell\nmore real\n"
result = _clean_shell_noise(output)
assert result == output
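# A minimal sketch of the behaviour pinned down above (the real function is
# tools.environments.local._clean_shell_noise): noise lines are stripped only
# from the TOP of the output; the first real line ends the stripping.
def _clean_shell_noise_sketch(output: str) -> str:
    lines = output.splitlines(keepends=True)
    start = 0
    while start < len(lines) and any(noise in lines[start] for noise in _SHELL_NOISE_SUBSTRINGS):
        start += 1
    return "".join(lines[start:])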
# ── LocalEnvironment.execute() ───────────────────────────────────────────
class TestLocalEnvironmentExecute:
def test_echo_exact_output(self, env):
result = env.execute("echo DETERMINISTIC_OUTPUT_12345")
assert result["returncode"] == 0
assert result["output"].strip() == "DETERMINISTIC_OUTPUT_12345"
_assert_clean(result["output"])
def test_printf_no_trailing_newline(self, env):
result = env.execute("printf 'exact'")
assert result["returncode"] == 0
assert result["output"] == "exact"
_assert_clean(result["output"])
def test_exit_code_propagated(self, env):
result = env.execute("exit 42")
assert result["returncode"] == 42
def test_stderr_captured_in_output(self, env):
result = env.execute("echo STDERR_TEST >&2")
assert "STDERR_TEST" in result["output"]
_assert_clean(result["output"])
def test_cwd_respected(self, env, tmp_path):
subdir = tmp_path / "subdir_test"
subdir.mkdir()
result = env.execute("pwd", cwd=str(subdir))
assert result["returncode"] == 0
assert result["output"].strip() == str(subdir)
_assert_clean(result["output"])
def test_multiline_exact(self, env):
result = env.execute("echo AAA; echo BBB; echo CCC")
lines = [l for l in result["output"].strip().split("\n") if l.strip()]
assert lines == ["AAA", "BBB", "CCC"]
_assert_clean(result["output"])
def test_env_var_home(self, env):
result = env.execute("echo $HOME")
assert result["returncode"] == 0
home = result["output"].strip()
assert home == str(Path.home())
_assert_clean(result["output"])
def test_pipe_exact(self, env):
result = env.execute("echo 'one two three' | wc -w")
assert result["returncode"] == 0
assert result["output"].strip() == "3"
_assert_clean(result["output"])
def test_cat_deterministic_content(self, env, tmp_path):
f = tmp_path / "det.txt"
f.write_text(SIMPLE_CONTENT)
result = env.execute(f"cat {f}")
assert result["returncode"] == 0
assert result["output"] == SIMPLE_CONTENT
_assert_clean(result["output"])
# ── _has_command ─────────────────────────────────────────────────────────
class TestHasCommand:
def test_finds_echo(self, ops):
assert ops._has_command("echo") is True
def test_finds_cat(self, ops):
assert ops._has_command("cat") is True
def test_finds_sed(self, ops):
assert ops._has_command("sed") is True
def test_finds_wc(self, ops):
assert ops._has_command("wc") is True
def test_finds_find(self, ops):
assert ops._has_command("find") is True
def test_missing_command(self, ops):
assert ops._has_command("nonexistent_tool_xyz_abc_999") is False
def test_rg_or_grep_available(self, ops):
assert ops._has_command("rg") or ops._has_command("grep"), \
"Neither rg nor grep found -- search_files will break"
# ── read_file ────────────────────────────────────────────────────────────
class TestReadFile:
def test_exact_content(self, ops, tmp_path):
f = tmp_path / "exact.txt"
f.write_text(SIMPLE_CONTENT)
result = ops.read_file(str(f))
assert result.error is None
# Content has line numbers prepended, check the actual text is there
assert "alpha" in result.content
assert "bravo" in result.content
assert "charlie" in result.content
assert result.total_lines == 3
_assert_clean(result.content)
def test_absolute_path(self, ops, tmp_path):
f = tmp_path / "abs.txt"
f.write_text("ABSOLUTE_PATH_CONTENT\n")
result = ops.read_file(str(f))
assert result.error is None
assert "ABSOLUTE_PATH_CONTENT" in result.content
_assert_clean(result.content)
def test_tilde_expansion(self, ops):
test_path = Path.home() / ".hermes_test_tilde_9f8a7b"
try:
test_path.write_text("TILDE_EXPANSION_OK\n")
result = ops.read_file("~/.hermes_test_tilde_9f8a7b")
assert result.error is None
assert "TILDE_EXPANSION_OK" in result.content
_assert_clean(result.content)
finally:
test_path.unlink(missing_ok=True)
def test_nonexistent_returns_error(self, ops, tmp_path):
result = ops.read_file(str(tmp_path / "ghost.txt"))
assert result.error is not None
def test_pagination_exact_window(self, ops, tmp_path):
f = tmp_path / "numbered.txt"
f.write_text(NUMBERED_CONTENT)
result = ops.read_file(str(f), offset=10, limit=5)
assert result.error is None
assert "LINE_0010" in result.content
assert "LINE_0014" in result.content
assert "LINE_0009" not in result.content
assert "LINE_0015" not in result.content
assert result.total_lines == 50
_assert_clean(result.content)
def test_no_noise_in_content(self, ops, tmp_path):
f = tmp_path / "noise_check.txt"
f.write_text("ONLY_THIS_CONTENT\n")
result = ops.read_file(str(f))
assert result.error is None
_assert_clean(result.content)
# ── write_file ───────────────────────────────────────────────────────────
class TestWriteFile:
def test_write_and_verify(self, ops, tmp_path):
path = str(tmp_path / "written.txt")
result = ops.write_file(path, SIMPLE_CONTENT)
assert result.error is None
assert result.bytes_written == len(SIMPLE_CONTENT.encode())
assert Path(path).read_text() == SIMPLE_CONTENT
def test_creates_nested_dirs(self, ops, tmp_path):
path = str(tmp_path / "a" / "b" / "c" / "deep.txt")
result = ops.write_file(path, "DEEP_CONTENT\n")
assert result.error is None
assert result.dirs_created is True
assert Path(path).read_text() == "DEEP_CONTENT\n"
def test_overwrites_exact(self, ops, tmp_path):
path = str(tmp_path / "overwrite.txt")
Path(path).write_text("OLD_DATA\n")
result = ops.write_file(path, "NEW_DATA\n")
assert result.error is None
assert Path(path).read_text() == "NEW_DATA\n"
def test_large_content_via_stdin(self, ops, tmp_path):
path = str(tmp_path / "large.txt")
content = "X" * 200_000 + "\n"
result = ops.write_file(path, content)
assert result.error is None
assert Path(path).read_text() == content
def test_special_characters_preserved(self, ops, tmp_path):
path = str(tmp_path / "special.txt")
result = ops.write_file(path, SPECIAL_CONTENT)
assert result.error is None
assert Path(path).read_text() == SPECIAL_CONTENT
def test_roundtrip_read_write(self, ops, tmp_path):
"""Write -> read back -> verify exact match."""
path = str(tmp_path / "roundtrip.txt")
ops.write_file(path, SIMPLE_CONTENT)
result = ops.read_file(path)
assert result.error is None
assert "alpha" in result.content
assert "charlie" in result.content
_assert_clean(result.content)
# ── patch_replace ────────────────────────────────────────────────────────
class TestPatchReplace:
def test_exact_replacement(self, ops, tmp_path):
path = str(tmp_path / "patch.txt")
Path(path).write_text("hello world\n")
result = ops.patch_replace(path, "world", "earth")
assert result.error is None
assert Path(path).read_text() == "hello earth\n"
def test_not_found_error(self, ops, tmp_path):
path = str(tmp_path / "patch2.txt")
Path(path).write_text("hello\n")
result = ops.patch_replace(path, "NONEXISTENT_STRING", "replacement")
assert result.error is not None
assert "Could not find" in result.error
def test_multiline_patch(self, ops, tmp_path):
path = str(tmp_path / "multi.txt")
Path(path).write_text("line1\nline2\nline3\n")
result = ops.patch_replace(path, "line2", "REPLACED")
assert result.error is None
assert Path(path).read_text() == "line1\nREPLACED\nline3\n"
# ── search ───────────────────────────────────────────────────────────────
class TestSearch:
def test_content_search_finds_exact_match(self, ops, populated_dir):
result = ops.search("func_alpha", str(populated_dir), target="content")
assert result.error is None
assert result.total_count >= 1
assert any("func_alpha" in m.content for m in result.matches)
for m in result.matches:
_assert_clean(m.content)
_assert_clean(m.path)
def test_content_search_no_false_positives(self, ops, populated_dir):
result = ops.search("ZZZZZ_NONEXISTENT", str(populated_dir), target="content")
assert result.error is None
assert result.total_count == 0
assert len(result.matches) == 0
def test_file_search_finds_py_files(self, ops, populated_dir):
result = ops.search("*.py", str(populated_dir), target="files")
assert result.error is None
assert result.total_count >= 2
# Verify only expected files appear
found_names = set()
for f in result.files:
name = Path(f).name
found_names.add(name)
_assert_clean(f)
assert "alpha.py" in found_names
assert "bravo.py" in found_names
assert "notes.txt" not in found_names
def test_file_search_no_false_file_entries(self, ops, populated_dir):
"""Every entry in the files list must be a real path, not noise."""
result = ops.search("*.py", str(populated_dir), target="files")
assert result.error is None
for f in result.files:
_assert_clean(f)
assert Path(f).exists(), f"Search returned non-existent path: {f}"
def test_content_search_with_glob_filter(self, ops, populated_dir):
result = ops.search("return", str(populated_dir), target="content", file_glob="*.py")
assert result.error is None
for m in result.matches:
assert m.path.endswith(".py"), f"Non-py file in results: {m.path}"
_assert_clean(m.content)
_assert_clean(m.path)
def test_search_output_has_zero_noise(self, ops, populated_dir):
"""Dedicated noise check: search must return only real content."""
result = ops.search("func", str(populated_dir), target="content")
assert result.error is None
for m in result.matches:
_assert_clean(m.content)
_assert_clean(m.path)
# ── _expand_path ─────────────────────────────────────────────────────────
class TestExpandPath:
def test_tilde_exact(self, ops):
result = ops._expand_path("~/test.txt")
expected = f"{Path.home()}/test.txt"
assert result == expected
_assert_clean(result)
def test_absolute_unchanged(self, ops):
assert ops._expand_path("/tmp/test.txt") == "/tmp/test.txt"
def test_relative_unchanged(self, ops):
assert ops._expand_path("relative/path.txt") == "relative/path.txt"
def test_bare_tilde(self, ops):
result = ops._expand_path("~")
assert result == str(Path.home())
_assert_clean(result)
# ── Terminal output cleanliness ──────────────────────────────────────────
class TestTerminalOutputCleanliness:
"""Every command the agent might run must produce noise-free output."""
def test_echo(self, env):
result = env.execute("echo CLEAN_TEST")
assert result["output"].strip() == "CLEAN_TEST"
_assert_clean(result["output"])
def test_cat(self, env, tmp_path):
f = tmp_path / "cat_test.txt"
f.write_text("CAT_CONTENT_EXACT\n")
result = env.execute(f"cat {f}")
assert result["output"] == "CAT_CONTENT_EXACT\n"
_assert_clean(result["output"])
def test_ls(self, env, tmp_path):
(tmp_path / "file_a.txt").write_text("")
(tmp_path / "file_b.txt").write_text("")
result = env.execute(f"ls {tmp_path}")
_assert_clean(result["output"])
assert "file_a.txt" in result["output"]
assert "file_b.txt" in result["output"]
def test_wc(self, env, tmp_path):
f = tmp_path / "wc_test.txt"
f.write_text("one\ntwo\nthree\n")
result = env.execute(f"wc -l < {f}")
assert result["output"].strip() == "3"
_assert_clean(result["output"])
def test_head(self, env, tmp_path):
f = tmp_path / "head_test.txt"
f.write_text(NUMBERED_CONTENT)
result = env.execute(f"head -n 3 {f}")
expected = "LINE_0001\nLINE_0002\nLINE_0003\n"
assert result["output"] == expected
_assert_clean(result["output"])
def test_env_var_expansion(self, env):
result = env.execute("echo $HOME")
assert result["output"].strip() == str(Path.home())
_assert_clean(result["output"])
def test_command_substitution(self, env):
result = env.execute("echo $(echo NESTED)")
assert result["output"].strip() == "NESTED"
_assert_clean(result["output"])
def test_command_v_detection(self, env):
"""This is how _has_command works -- must return clean 'yes'."""
result = env.execute("command -v cat >/dev/null 2>&1 && echo 'yes'")
assert result["output"].strip() == "yes"
_assert_clean(result["output"])