Merge origin/main, resolve conflicts (self._base_url_lower)

This commit is contained in:
Test 2026-03-18 04:09:00 -07:00
commit e7844e9c8d
54 changed files with 2281 additions and 179 deletions

View file

@ -188,6 +188,36 @@ class TestGetModelContextLength:
result = get_model_context_length("custom/model")
assert result == CONTEXT_PROBE_TIERS[0]
@patch("agent.model_metadata.fetch_model_metadata")
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
def test_custom_endpoint_metadata_beats_fuzzy_default(self, mock_endpoint_fetch, mock_fetch):
    """A context length reported by the custom endpoint's own metadata API
    must win over any name-based fuzzy default."""
    # Global registry knows nothing about this model...
    mock_fetch.return_value = {}
    # ...but the custom endpoint reports an explicit context window.
    mock_endpoint_fetch.return_value = {
        "zai-org/GLM-5-TEE": {"context_length": 65536}
    }
    result = get_model_context_length(
        "zai-org/GLM-5-TEE",
        base_url="https://llm.chutes.ai/v1",
        api_key="test-key",
    )
    # Endpoint-supplied value is used verbatim.
    assert result == 65536
@patch("agent.model_metadata.fetch_model_metadata")
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
def test_custom_endpoint_without_metadata_skips_name_based_default(self, mock_endpoint_fetch, mock_fetch):
    """When the custom endpoint reports no metadata, fall back to the first
    probe tier rather than guessing from the model name."""
    # Neither the global registry nor the endpoint has metadata.
    mock_fetch.return_value = {}
    mock_endpoint_fetch.return_value = {}
    result = get_model_context_length(
        "zai-org/GLM-5-TEE",
        base_url="https://llm.chutes.ai/v1",
        api_key="test-key",
    )
    # Conservative default: the smallest probe tier.
    assert result == CONTEXT_PROBE_TIERS[0]
# =========================================================================
# fetch_model_metadata — caching, TTL, slugs, failures
@ -258,6 +288,25 @@ class TestFetchModelMetadata:
assert "anthropic/claude-3.5-sonnet" in result
assert result["anthropic/claude-3.5-sonnet"]["context_length"] == 200000
@patch("agent.model_metadata.requests.get")
def test_provider_prefixed_models_get_bare_aliases(self, mock_get):
    """A 'provider/model' id should also be reachable under its bare model name."""
    self._reset_cache()
    # Fake a single-entry /models payload with a provider-prefixed id.
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "data": [{
            "id": "provider/test-model",
            "context_length": 123456,
            "name": "Provider: Test Model",
        }]
    }
    mock_response.raise_for_status = MagicMock()
    mock_get.return_value = mock_response
    result = fetch_model_metadata(force_refresh=True)
    # Both the full id and the bare alias resolve to the same metadata.
    assert result["provider/test-model"]["context_length"] == 123456
    assert result["test-model"]["context_length"] == 123456
@patch("agent.model_metadata.requests.get")
def test_ttl_expiry_triggers_refetch(self, mock_get):
"""Cache expires after _MODEL_CACHE_TTL seconds."""

View file

@ -309,6 +309,35 @@ class TestBuildSkillsSystemPrompt:
assert "imessage" in result
assert "Send iMessages" in result
def test_excludes_disabled_skills(self, monkeypatch, tmp_path):
    """Skills in the user's disabled list should not appear in the system prompt."""
    # Point HERMES_HOME at a scratch dir and lay out two skills on disk.
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    skills_dir = tmp_path / "skills" / "tools"
    skills_dir.mkdir(parents=True)
    enabled_skill = skills_dir / "web-search"
    enabled_skill.mkdir()
    (enabled_skill / "SKILL.md").write_text(
        "---\nname: web-search\ndescription: Search the web\n---\n"
    )
    disabled_skill = skills_dir / "old-tool"
    disabled_skill.mkdir()
    (disabled_skill / "SKILL.md").write_text(
        "---\nname: old-tool\ndescription: Deprecated tool\n---\n"
    )
    from unittest.mock import patch
    # Mark "old-tool" as disabled; only "web-search" should be advertised.
    with patch(
        "tools.skills_tool._get_disabled_skill_names",
        return_value={"old-tool"},
    ):
        result = build_skills_system_prompt()
    assert "web-search" in result
    assert "old-tool" not in result
def test_includes_setup_needed_skills(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.delenv("MISSING_API_KEY_XYZ", raising=False)

View file

@ -85,6 +85,21 @@ class TestScanSkillCommands:
result = scan_skill_commands()
assert "/generic-tool" in result
def test_excludes_disabled_skills(self, tmp_path):
    """Disabled skills should not register slash commands."""
    with (
        # Redirect the skills directory to a scratch path...
        patch("tools.skills_tool.SKILLS_DIR", tmp_path),
        # ...and mark one of the two skills as disabled.
        patch(
            "tools.skills_tool._get_disabled_skill_names",
            return_value={"disabled-skill"},
        ),
    ):
        _make_skill(tmp_path, "enabled-skill")
        _make_skill(tmp_path, "disabled-skill")
        result = scan_skill_commands()
    assert "/enabled-skill" in result
    assert "/disabled-skill" not in result
class TestBuildPreloadedSkillsPrompt:
def test_builds_prompt_for_multiple_named_skills(self, tmp_path):

View file

@ -99,3 +99,27 @@ def test_estimate_usage_cost_refuses_cache_pricing_without_official_cache_rate(m
)
assert result.status == "unknown"
def test_custom_endpoint_models_api_pricing_is_supported(monkeypatch):
    """Per-token pricing strings from a custom endpoint's /models API should
    be converted to per-million-token costs in the pricing entry."""
    monkeypatch.setattr(
        "agent.usage_pricing.fetch_endpoint_model_metadata",
        lambda base_url, api_key=None: {
            "zai-org/GLM-5-TEE": {
                "pricing": {
                    # Per-token USD strings, OpenRouter-style.
                    "prompt": "0.0000005",
                    "completion": "0.000002",
                }
            }
        },
    )
    entry = get_pricing_entry(
        "zai-org/GLM-5-TEE",
        provider="custom",
        base_url="https://llm.chutes.ai/v1",
        api_key="test-key",
    )
    # 0.0000005 USD/token -> 0.5 USD per million tokens; likewise 2.0 out.
    assert float(entry.input_cost_per_million) == 0.5
    assert float(entry.output_cost_per_million) == 2.0

View file

@ -2,7 +2,7 @@
import json
import pytest
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import patch
@ -122,11 +122,29 @@ class TestComputeNextRun:
schedule = {"kind": "once", "run_at": future}
assert compute_next_run(schedule) == future
def test_once_recent_past_within_grace_returns_time(self, monkeypatch):
    """A one-shot time that just passed (seconds ago) is still runnable."""
    now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
    run_at = "2026-03-18T04:22:00+00:00"  # 3 seconds before "now"
    monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
    schedule = {"kind": "once", "run_at": run_at}
    assert compute_next_run(schedule) == run_at
def test_once_past_returns_none(self):
    """A one-shot time well in the past (an hour ago) yields no next run."""
    past = (datetime.now() - timedelta(hours=1)).isoformat()
    schedule = {"kind": "once", "run_at": past}
    assert compute_next_run(schedule) is None
def test_once_with_last_run_returns_none_even_within_grace(self, monkeypatch):
    """Once a one-shot has already run, grace recovery must not re-trigger it."""
    now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
    run_at = "2026-03-18T04:22:00+00:00"
    monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
    schedule = {"kind": "once", "run_at": run_at}
    # last_run_at == now means the job already fired; no second run.
    assert compute_next_run(schedule, last_run_at=now.isoformat()) is None
def test_interval_first_run(self):
schedule = {"kind": "interval", "minutes": 60}
result = compute_next_run(schedule)
@ -347,6 +365,67 @@ class TestGetDueJobs:
due = get_due_jobs()
assert len(due) == 0
def test_broken_recent_one_shot_without_next_run_is_recovered(self, tmp_cron_dir, monkeypatch):
    """A recently-due one-shot whose next_run_at was lost is repaired and run."""
    now = datetime(2026, 3, 18, 4, 22, 30, tzinfo=timezone.utc)
    monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
    run_at = "2026-03-18T04:22:00+00:00"  # 30 seconds before "now"
    save_jobs(
        [{
            "id": "oneshot-recover",
            "name": "Recover me",
            "prompt": "Word of the day",
            "schedule": {"kind": "once", "run_at": run_at, "display": "once at 2026-03-18 04:22"},
            "schedule_display": "once at 2026-03-18 04:22",
            "repeat": {"times": 1, "completed": 0},
            "enabled": True,
            "state": "scheduled",
            "paused_at": None,
            "paused_reason": None,
            "created_at": "2026-03-18T04:21:00+00:00",
            "next_run_at": None,  # the "broken" field under test
            "last_run_at": None,
            "last_status": None,
            "last_error": None,
            "deliver": "local",
            "origin": None,
        }]
    )
    due = get_due_jobs()
    # The job is returned as due AND persisted with a repaired next_run_at.
    assert [job["id"] for job in due] == ["oneshot-recover"]
    assert get_job("oneshot-recover")["next_run_at"] == run_at
def test_broken_stale_one_shot_without_next_run_is_not_recovered(self, tmp_cron_dir, monkeypatch):
    """A one-shot that is too stale (8 minutes past due here) is left alone."""
    now = datetime(2026, 3, 18, 4, 30, 0, tzinfo=timezone.utc)
    monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
    save_jobs(
        [{
            "id": "oneshot-stale",
            "name": "Too old",
            "prompt": "Word of the day",
            "schedule": {"kind": "once", "run_at": "2026-03-18T04:22:00+00:00", "display": "once at 2026-03-18 04:22"},
            "schedule_display": "once at 2026-03-18 04:22",
            "repeat": {"times": 1, "completed": 0},
            "enabled": True,
            "state": "scheduled",
            "paused_at": None,
            "paused_reason": None,
            "created_at": "2026-03-18T04:21:00+00:00",
            "next_run_at": None,  # broken field; too old to repair
            "last_run_at": None,
            "last_status": None,
            "last_error": None,
            "deliver": "local",
            "origin": None,
        }]
    )
    # Neither due nor repaired.
    assert get_due_jobs() == []
    assert get_job("oneshot-stale")["next_run_at"] is None
class TestSaveJobOutput:
def test_creates_output_file(self, tmp_cron_dir):

View file

@ -115,6 +115,22 @@ class TestGatewayConfigRoundtrip:
assert restored.quick_commands == {"limits": {"type": "exec", "command": "echo ok"}}
assert restored.group_sessions_per_user is False
def test_roundtrip_preserves_unauthorized_dm_behavior(self):
    """to_dict()/from_dict() must keep both the global setting and the
    per-platform override."""
    config = GatewayConfig(
        unauthorized_dm_behavior="ignore",
        platforms={
            Platform.WHATSAPP: PlatformConfig(
                enabled=True,
                # Per-platform override lives in the adapter's extra mapping.
                extra={"unauthorized_dm_behavior": "pair"},
            ),
        },
    )
    restored = GatewayConfig.from_dict(config.to_dict())
    assert restored.unauthorized_dm_behavior == "ignore"
    assert restored.platforms[Platform.WHATSAPP].extra["unauthorized_dm_behavior"] == "pair"
class TestLoadGatewayConfig:
def test_bridges_quick_commands_from_config_yaml(self, tmp_path, monkeypatch):
@ -158,3 +174,21 @@ class TestLoadGatewayConfig:
config = load_gateway_config()
assert config.quick_commands == {}
def test_bridges_unauthorized_dm_behavior_from_config_yaml(self, tmp_path, monkeypatch):
    """config.yaml values should flow into GatewayConfig, global + per-platform."""
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    config_path = hermes_home / "config.yaml"
    config_path.write_text(
        "unauthorized_dm_behavior: ignore\n"
        "whatsapp:\n"
        " unauthorized_dm_behavior: pair\n",
        encoding="utf-8",
    )
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    config = load_gateway_config()
    assert config.unauthorized_dm_behavior == "ignore"
    assert config.platforms[Platform.WHATSAPP].extra["unauthorized_dm_behavior"] == "pair"

View file

@ -42,6 +42,26 @@ class TestGatewayPidState:
assert status.get_running_pid() == os.getpid()
def test_get_running_pid_accepts_script_style_gateway_cmdline(self, tmp_path, monkeypatch):
    """A gateway launched as `python .../main.py gateway run` is recognised."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    pid_path = tmp_path / "gateway.pid"
    pid_path.write_text(json.dumps({
        "pid": os.getpid(),
        "kind": "hermes-gateway",
        "argv": ["/venv/bin/python", "/repo/hermes_cli/main.py", "gateway", "run", "--replace"],
        "start_time": 123,
    }))
    # Pretend the PID is alive with matching start time and cmdline.
    monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
    monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
    monkeypatch.setattr(
        status,
        "_read_process_cmdline",
        lambda pid: "/venv/bin/python /repo/hermes_cli/main.py gateway run --replace",
    )
    assert status.get_running_pid() == os.getpid()
class TestGatewayRuntimeStatus:
def test_write_runtime_status_overwrites_stale_pid_on_restart(self, tmp_path, monkeypatch):

View file

@ -0,0 +1,137 @@
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent
from gateway.session import SessionSource
def _clear_auth_env(monkeypatch) -> None:
for key in (
"TELEGRAM_ALLOWED_USERS",
"DISCORD_ALLOWED_USERS",
"WHATSAPP_ALLOWED_USERS",
"SLACK_ALLOWED_USERS",
"SIGNAL_ALLOWED_USERS",
"EMAIL_ALLOWED_USERS",
"SMS_ALLOWED_USERS",
"MATTERMOST_ALLOWED_USERS",
"MATRIX_ALLOWED_USERS",
"DINGTALK_ALLOWED_USERS",
"GATEWAY_ALLOWED_USERS",
"TELEGRAM_ALLOW_ALL_USERS",
"DISCORD_ALLOW_ALL_USERS",
"WHATSAPP_ALLOW_ALL_USERS",
"SLACK_ALLOW_ALL_USERS",
"SIGNAL_ALLOW_ALL_USERS",
"EMAIL_ALLOW_ALL_USERS",
"SMS_ALLOW_ALL_USERS",
"MATTERMOST_ALLOW_ALL_USERS",
"MATRIX_ALLOW_ALL_USERS",
"DINGTALK_ALLOW_ALL_USERS",
"GATEWAY_ALLOW_ALL_USERS",
):
monkeypatch.delenv(key, raising=False)
def _make_event(platform: Platform, user_id: str, chat_id: str) -> MessageEvent:
    """Build a minimal DM MessageEvent ("hello", id "m1") for *platform*."""
    dm_source = SessionSource(
        platform=platform,
        user_id=user_id,
        chat_id=chat_id,
        user_name="tester",
        chat_type="dm",
    )
    return MessageEvent(text="hello", message_id="m1", source=dm_source)
def _make_runner(platform: Platform, config: GatewayConfig):
    """Build a GatewayRunner without running __init__, wired to one fake adapter.

    Returns (runner, adapter). adapter.send is an AsyncMock; the pairing
    store starts with no approved users.
    """
    from gateway.run import GatewayRunner
    # Bypass __init__ so no real platform adapters are started.
    runner = object.__new__(GatewayRunner)
    runner.config = config
    adapter = SimpleNamespace(send=AsyncMock())
    runner.adapters = {platform: adapter}
    runner.pairing_store = MagicMock()
    runner.pairing_store.is_approved.return_value = False
    return runner, adapter
@pytest.mark.asyncio
async def test_unauthorized_dm_pairs_by_default(monkeypatch):
    """With no config override, an unauthorized DM triggers the pairing flow."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={Platform.WHATSAPP: PlatformConfig(enabled=True)},
    )
    runner, adapter = _make_runner(Platform.WHATSAPP, config)
    runner.pairing_store.generate_code.return_value = "ABC12DEF"
    result = await runner._handle_message(
        _make_event(
            Platform.WHATSAPP,
            "15551234567@s.whatsapp.net",
            "15551234567@s.whatsapp.net",
        )
    )
    # No session is started...
    assert result is None
    # ...a pairing code is generated for the sender...
    runner.pairing_store.generate_code.assert_called_once_with(
        "whatsapp",
        "15551234567@s.whatsapp.net",
        "tester",
    )
    # ...and the code is sent back to them.
    adapter.send.assert_awaited_once()
    assert "ABC12DEF" in adapter.send.await_args.args[1]
@pytest.mark.asyncio
async def test_unauthorized_whatsapp_dm_can_be_ignored(monkeypatch):
    """A per-platform 'ignore' override suppresses the pairing reply entirely."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        platforms={
            Platform.WHATSAPP: PlatformConfig(
                enabled=True,
                extra={"unauthorized_dm_behavior": "ignore"},
            ),
        },
    )
    runner, adapter = _make_runner(Platform.WHATSAPP, config)
    result = await runner._handle_message(
        _make_event(
            Platform.WHATSAPP,
            "15551234567@s.whatsapp.net",
            "15551234567@s.whatsapp.net",
        )
    )
    # Message is dropped: no pairing code, no outbound reply.
    assert result is None
    runner.pairing_store.generate_code.assert_not_called()
    adapter.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_global_ignore_suppresses_pairing_reply(monkeypatch):
    """The gateway-wide 'ignore' setting applies when no platform override exists."""
    _clear_auth_env(monkeypatch)
    config = GatewayConfig(
        unauthorized_dm_behavior="ignore",
        platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")},
    )
    runner, adapter = _make_runner(Platform.TELEGRAM, config)
    result = await runner._handle_message(
        _make_event(
            Platform.TELEGRAM,
            "12345",
            "12345",
        )
    )
    # Silently ignored: no pairing code, no reply.
    assert result is None
    runner.pairing_store.generate_code.assert_not_called()
    adapter.send.assert_not_awaited()

View file

@ -0,0 +1,70 @@
"""Tests for banner toolset name normalization and skin color usage."""
from unittest.mock import patch
from rich.console import Console
import hermes_cli.banner as banner
import model_tools
import tools.mcp_tool
def test_display_toolset_name_strips_legacy_suffix():
    """Legacy '<name>_tools' identifiers are shown without the suffix."""
    cases = (
        ("homeassistant_tools", "homeassistant"),
        ("honcho_tools", "honcho"),
        ("web_tools", "web"),
    )
    for raw, expected in cases:
        assert banner._display_toolset_name(raw) == expected


def test_display_toolset_name_preserves_clean_names():
    """Names without the legacy suffix pass through unchanged."""
    for clean in ("browser", "file", "terminal"):
        assert banner._display_toolset_name(clean) == clean


def test_display_toolset_name_handles_empty():
    """Empty string and None both fall back to the 'unknown' placeholder."""
    for missing in ("", None):
        assert banner._display_toolset_name(missing) == "unknown"
def test_build_welcome_banner_uses_normalized_toolset_names():
    """Unavailable toolsets should not have '_tools' appended in banner output."""
    with (
        # Two toolsets unavailable, one ("web") available.
        patch.object(
            model_tools,
            "check_tool_availability",
            return_value=(
                ["web"],
                [
                    {"name": "homeassistant", "tools": ["ha_call_service"]},
                    {"name": "honcho", "tools": ["honcho_conclude"]},
                ],
            ),
        ),
        patch.object(banner, "get_available_skills", return_value={}),
        patch.object(banner, "get_update_result", return_value=None),
        patch.object(tools.mcp_tool, "get_mcp_status", return_value=[]),
    ):
        # Record plain-text output without a real terminal or colors.
        console = Console(
            record=True, force_terminal=False, color_system=None, width=160
        )
        banner.build_welcome_banner(
            console=console,
            model="anthropic/test-model",
            cwd="/tmp/project",
            tools=[
                {"function": {"name": "web_search"}},
                {"function": {"name": "read_file"}},
            ],
            # Map tool name -> owning toolset; "web_tools" is a legacy-style id.
            get_toolset_for_tool=lambda name: {
                "web_search": "web_tools",
                "read_file": "file",
            }.get(name),
        )
        output = console.export_text()
    # Normalized names appear...
    assert "homeassistant:" in output
    assert "honcho:" in output
    assert "web:" in output
    # ...and no raw "_tools" identifiers leak through.
    assert "homeassistant_tools:" not in output
    assert "honcho_tools:" not in output
    assert "web_tools:" not in output

View file

@ -0,0 +1,68 @@
"""Tests for banner get_available_skills() — disabled and platform filtering."""
from unittest.mock import patch
import pytest
# Minimal skill records in the shape produced by _find_all_skills().
_MOCK_SKILLS = [
    {"name": "skill-a", "description": "A skill", "category": "tools"},
    {"name": "skill-b", "description": "B skill", "category": "tools"},
    {"name": "skill-c", "description": "C skill", "category": "creative"},
]


def test_get_available_skills_delegates_to_find_all_skills():
    """get_available_skills should call _find_all_skills (which handles filtering)."""
    with patch("tools.skills_tool._find_all_skills", return_value=list(_MOCK_SKILLS)):
        from hermes_cli.banner import get_available_skills
        result = get_available_skills()
        # Skills are grouped by category.
        assert "tools" in result
        assert "creative" in result
        assert sorted(result["tools"]) == ["skill-a", "skill-b"]
        assert result["creative"] == ["skill-c"]


def test_get_available_skills_excludes_disabled():
    """Disabled skills should not appear in the banner count."""
    # _find_all_skills already filters disabled skills, so if we give it
    # a filtered list, get_available_skills should reflect that.
    filtered = [s for s in _MOCK_SKILLS if s["name"] != "skill-b"]
    with patch("tools.skills_tool._find_all_skills", return_value=filtered):
        from hermes_cli.banner import get_available_skills
        result = get_available_skills()
        all_names = [n for names in result.values() for n in names]
        assert "skill-b" not in all_names
        assert "skill-a" in all_names
        assert len(all_names) == 2


def test_get_available_skills_empty_when_no_skills():
    """No skills installed returns empty dict."""
    with patch("tools.skills_tool._find_all_skills", return_value=[]):
        from hermes_cli.banner import get_available_skills
        result = get_available_skills()
        assert result == {}


def test_get_available_skills_handles_import_failure():
    """If _find_all_skills import fails, return empty dict gracefully."""
    with patch("tools.skills_tool._find_all_skills", side_effect=ImportError("boom")):
        from hermes_cli.banner import get_available_skills
        result = get_available_skills()
        assert result == {}


def test_get_available_skills_null_category_becomes_general():
    """Skills with None category should be grouped under 'general'."""
    skills = [{"name": "orphan-skill", "description": "No cat", "category": None}]
    with patch("tools.skills_tool._find_all_skills", return_value=skills):
        from hermes_cli.banner import get_available_skills
        result = get_available_skills()
        assert "general" in result
        assert result["general"] == ["orphan-skill"]

View file

@ -1,6 +1,8 @@
"""Tests for hermes_cli.gateway."""
import signal
from types import SimpleNamespace
from unittest.mock import patch, call
import hermes_cli.gateway as gateway
@ -169,3 +171,84 @@ def test_install_linux_gateway_from_setup_system_choice_as_root_installs(monkeyp
assert (scope, did_install) == ("system", True)
assert calls == [(True, True, "alice")]
# ---------------------------------------------------------------------------
# _wait_for_gateway_exit
# ---------------------------------------------------------------------------
class TestWaitForGatewayExit:
    """PID-based wait with force-kill on timeout."""

    def test_returns_immediately_when_no_pid(self, monkeypatch):
        """If get_running_pid returns None, exit instantly."""
        monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
        # Should return without sleeping at all.
        gateway._wait_for_gateway_exit(timeout=1.0, force_after=0.5)

    def test_returns_when_process_exits_gracefully(self, monkeypatch):
        """Process exits after a couple of polls — no SIGKILL needed."""
        poll_count = 0

        def mock_get_running_pid():
            nonlocal poll_count
            poll_count += 1
            # Alive for two polls, then gone.
            return 12345 if poll_count <= 2 else None

        monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
        monkeypatch.setattr("time.sleep", lambda _: None)
        gateway._wait_for_gateway_exit(timeout=10.0, force_after=999.0)
        # Should have polled until None was returned.
        assert poll_count == 3

    def test_force_kills_after_grace_period(self, monkeypatch):
        """When the process doesn't exit, SIGKILL the saved PID."""
        import time as _time
        # Simulate monotonic time advancing past force_after.
        call_num = 0

        def fake_monotonic():
            nonlocal call_num
            call_num += 1
            # First two calls: initial deadline + force_deadline setup (time 0)
            # Then each loop iteration advances time
            return call_num * 2.0  # 2, 4, 6, 8, ...

        kills = []

        def mock_kill(pid, sig):
            kills.append((pid, sig))

        # get_running_pid returns the PID until kill is sent, then None
        def mock_get_running_pid():
            return None if kills else 42

        monkeypatch.setattr("time.monotonic", fake_monotonic)
        monkeypatch.setattr("time.sleep", lambda _: None)
        monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
        monkeypatch.setattr("os.kill", mock_kill)
        gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
        assert (42, signal.SIGKILL) in kills

    def test_handles_process_already_gone_on_kill(self, monkeypatch):
        """ProcessLookupError during SIGKILL is not fatal."""
        import time as _time
        call_num = 0

        def fake_monotonic():
            nonlocal call_num
            call_num += 1
            return call_num * 3.0  # Jump past force_after quickly

        def mock_kill(pid, sig):
            raise ProcessLookupError

        monkeypatch.setattr("time.monotonic", fake_monotonic)
        monkeypatch.setattr("time.sleep", lambda _: None)
        monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
        monkeypatch.setattr("os.kill", mock_kill)
        # Should not raise — ProcessLookupError means it's already gone.
        gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)

View file

@ -131,7 +131,7 @@ class TestTryActivateFallback:
def test_activates_minimax_fallback(self):
agent = _make_agent(
fallback_model={"provider": "minimax", "model": "MiniMax-M2.5"},
fallback_model={"provider": "minimax", "model": "MiniMax-M2.7"},
)
mock_client = _mock_resolve(
api_key="sk-mm-key",
@ -139,10 +139,10 @@ class TestTryActivateFallback:
)
with patch(
"agent.auxiliary_client.resolve_provider_client",
return_value=(mock_client, "MiniMax-M2.5"),
return_value=(mock_client, "MiniMax-M2.7"),
):
assert agent._try_activate_fallback() is True
assert agent.model == "MiniMax-M2.5"
assert agent.model == "MiniMax-M2.7"
assert agent.provider == "minimax"
assert agent.client is mock_client
@ -165,7 +165,7 @@ class TestTryActivateFallback:
def test_returns_false_when_no_api_key(self):
"""Fallback should fail gracefully when the API key env var is unset."""
agent = _make_agent(
fallback_model={"provider": "minimax", "model": "MiniMax-M2.5"},
fallback_model={"provider": "minimax", "model": "MiniMax-M2.7"},
)
with patch(
"agent.auxiliary_client.resolve_provider_client",

View file

@ -210,6 +210,25 @@ class TestFTS5Search:
sources = [r["source"] for r in results]
assert all(s == "telegram" for s in sources)
def test_search_default_sources_include_acp(self, db):
    """Messages from 'acp' sessions are searchable without a source filter."""
    db.create_session(session_id="s1", source="acp")
    db.append_message("s1", role="user", content="ACP question about Python")
    results = db.search_messages("Python")
    sources = [r["source"] for r in results]
    assert "acp" in sources
def test_search_default_includes_all_platforms(self, db):
    """Default search (no source_filter) should find sessions from any platform."""
    # One session + one matching message per platform.
    for src in ("cli", "telegram", "signal", "homeassistant", "acp", "matrix"):
        sid = f"s-{src}"
        db.create_session(session_id=sid, source=src)
        db.append_message(sid, role="user", content=f"universal search test from {src}")
    results = db.search_messages("universal search test")
    found_sources = {r["source"] for r in results}
    assert found_sources == {"cli", "telegram", "signal", "homeassistant", "acp", "matrix"}
def test_search_with_role_filter(self, db):
db.create_session(session_id="s1", source="cli")
db.append_message("s1", role="user", content="What is FastAPI?")

View file

@ -828,7 +828,7 @@ class TestConcurrentToolExecution:
mock_con.assert_not_called()
def test_multiple_tools_uses_concurrent_path(self, agent):
"""Multiple non-interactive tools should use concurrent path."""
"""Multiple read-only tools should use concurrent path."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="read_file", arguments='{"path":"x.py"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
@ -839,6 +839,94 @@ class TestConcurrentToolExecution:
mock_con.assert_called_once()
mock_seq.assert_not_called()
def test_terminal_batch_forces_sequential(self, agent):
    """Stateful tools should not share the concurrent execution path."""
    tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
    # "terminal" is stateful, so the whole batch must run sequentially.
    tc2 = _mock_tool_call(name="terminal", arguments='{"command":"pwd"}', call_id="c2")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
    mock_seq.assert_called_once()
    mock_con.assert_not_called()

def test_write_batch_forces_sequential(self, agent):
    """File mutations should stay ordered within a turn."""
    # read and write touch the same path ("x.py"), so order matters.
    tc1 = _mock_tool_call(name="read_file", arguments='{"path":"x.py"}', call_id="c1")
    tc2 = _mock_tool_call(name="write_file", arguments='{"path":"x.py","content":"print(1)"}', call_id="c2")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
    mock_seq.assert_called_once()
    mock_con.assert_not_called()
def test_disjoint_write_batch_uses_concurrent_path(self, agent):
    """Independent file writes should still run concurrently."""
    # Two writes to DIFFERENT paths — no ordering constraint.
    tc1 = _mock_tool_call(
        name="write_file",
        arguments='{"path":"src/a.py","content":"print(1)"}',
        call_id="c1",
    )
    tc2 = _mock_tool_call(
        name="write_file",
        arguments='{"path":"src/b.py","content":"print(2)"}',
        call_id="c2",
    )
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
    mock_con.assert_called_once()
    mock_seq.assert_not_called()

def test_overlapping_write_batch_forces_sequential(self, agent):
    """Writes to the same file must stay ordered."""
    # write_file then patch on the SAME path — must serialize.
    tc1 = _mock_tool_call(
        name="write_file",
        arguments='{"path":"src/a.py","content":"print(1)"}',
        call_id="c1",
    )
    tc2 = _mock_tool_call(
        name="patch",
        arguments='{"path":"src/a.py","old_string":"1","new_string":"2"}',
        call_id="c2",
    )
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
    mock_seq.assert_called_once()
    mock_con.assert_not_called()
def test_malformed_json_args_forces_sequential(self, agent):
    """Unparseable tool arguments should fall back to sequential."""
    tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
    # Arguments that json.loads cannot parse.
    tc2 = _mock_tool_call(name="web_search", arguments="NOT JSON {{{", call_id="c2")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
    mock_seq.assert_called_once()
    mock_con.assert_not_called()

def test_non_dict_args_forces_sequential(self, agent):
    """Tool arguments that parse to a non-dict type should fall back to sequential."""
    tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
    # Valid JSON, but a string — not an argument mapping.
    tc2 = _mock_tool_call(name="web_search", arguments='"just a string"', call_id="c2")
    mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
    messages = []
    with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
        with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
    mock_seq.assert_called_once()
    mock_con.assert_not_called()
def test_concurrent_executes_all_tools(self, agent):
"""Concurrent path should execute all tools and append results in order."""
tc1 = _mock_tool_call(name="web_search", arguments='{"q":"alpha"}', call_id="c1")
@ -965,6 +1053,39 @@ class TestConcurrentToolExecution:
assert "ok" in result
class TestPathsOverlap:
    """Unit tests for the _paths_overlap helper."""

    def test_same_path_overlaps(self):
        from run_agent import _paths_overlap
        assert _paths_overlap(Path("src/a.py"), Path("src/a.py"))

    def test_siblings_do_not_overlap(self):
        from run_agent import _paths_overlap
        assert not _paths_overlap(Path("src/a.py"), Path("src/b.py"))

    def test_parent_child_overlap(self):
        # A directory overlaps anything nested under it.
        from run_agent import _paths_overlap
        assert _paths_overlap(Path("src"), Path("src/sub/a.py"))

    def test_different_roots_do_not_overlap(self):
        from run_agent import _paths_overlap
        assert not _paths_overlap(Path("src/a.py"), Path("other/a.py"))

    def test_nested_vs_flat_do_not_overlap(self):
        # Same filename at different depths is still distinct.
        from run_agent import _paths_overlap
        assert not _paths_overlap(Path("src/sub/a.py"), Path("src/a.py"))

    def test_empty_paths_do_not_overlap(self):
        from run_agent import _paths_overlap
        assert not _paths_overlap(Path(""), Path(""))

    def test_one_empty_path_does_not_overlap(self):
        # Symmetric: empty on either side never overlaps.
        from run_agent import _paths_overlap
        assert not _paths_overlap(Path(""), Path("src/a.py"))
        assert not _paths_overlap(Path("src/a.py"), Path(""))
class TestHandleMaxIterations:
def test_returns_summary(self, agent):
resp = _mock_response(content="Here is a summary of what I did.")
@ -2774,3 +2895,135 @@ class TestNormalizeCodexDictArguments:
msg, _ = agent._normalize_codex_response(response)
tc = msg.tool_calls[0]
assert tc.function.arguments == args_str
# ---------------------------------------------------------------------------
# OAuth flag and nudge counter fixes (salvaged from PR #1797)
# ---------------------------------------------------------------------------
class TestOAuthFlagAfterCredentialRefresh:
    """_is_anthropic_oauth must update when token type changes during refresh."""

    def test_oauth_flag_updates_api_key_to_oauth(self, agent):
        """Refreshing from API key to OAuth token must set flag to True."""
        agent.api_mode = "anthropic_messages"
        agent._anthropic_api_key = "sk-ant-api-old"
        agent._anthropic_client = MagicMock()
        agent._is_anthropic_oauth = False
        with (
            # Resolver now returns an OAuth-style token ("sk-ant-setup-...").
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value="sk-ant-setup-oauth-token"),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
        ):
            result = agent._try_refresh_anthropic_client_credentials()
        assert result is True
        assert agent._is_anthropic_oauth is True

    def test_oauth_flag_updates_oauth_to_api_key(self, agent):
        """Refreshing from OAuth to API key must set flag to False."""
        agent.api_mode = "anthropic_messages"
        agent._anthropic_api_key = "sk-ant-setup-old"
        agent._anthropic_client = MagicMock()
        agent._is_anthropic_oauth = True
        with (
            # Resolver now returns a plain API key ("sk-ant-api03-...").
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value="sk-ant-api03-new-key"),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
        ):
            result = agent._try_refresh_anthropic_client_credentials()
        assert result is True
        assert agent._is_anthropic_oauth is False
class TestFallbackSetsOAuthFlag:
    """_try_activate_fallback must set _is_anthropic_oauth for Anthropic fallbacks."""

    def test_fallback_to_anthropic_oauth_sets_flag(self, agent):
        """An OAuth-style key ("sk-ant-setup-...") on the fallback client sets the flag."""
        agent._fallback_activated = False
        agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-6"}
        mock_client = MagicMock()
        mock_client.base_url = "https://api.anthropic.com/v1"
        mock_client.api_key = "sk-ant-setup-oauth-token"
        with (
            patch("agent.auxiliary_client.resolve_provider_client",
                  return_value=(mock_client, None)),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value=None),
        ):
            result = agent._try_activate_fallback()
        assert result is True
        assert agent._is_anthropic_oauth is True

    def test_fallback_to_anthropic_api_key_clears_flag(self, agent):
        """A regular API key ("sk-ant-api03-...") on the fallback client clears the flag."""
        agent._fallback_activated = False
        agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-6"}
        mock_client = MagicMock()
        mock_client.base_url = "https://api.anthropic.com/v1"
        mock_client.api_key = "sk-ant-api03-regular-key"
        with (
            patch("agent.auxiliary_client.resolve_provider_client",
                  return_value=(mock_client, None)),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value=None),
        ):
            result = agent._try_activate_fallback()
        assert result is True
        assert agent._is_anthropic_oauth is False
class TestMemoryNudgeCounterPersistence:
    """_turns_since_memory must persist across run_conversation calls."""

    def test_counters_initialized_in_init(self):
        """Counters must exist on the agent after __init__."""
        with patch("run_agent.get_tool_definitions", return_value=[]):
            fresh_agent = AIAgent(
                model="test", api_key="test-key", provider="openrouter",
                skip_context_files=True, skip_memory=True,
            )
        # Both nudge counters must be present and zeroed after construction.
        for counter in ("_turns_since_memory", "_iters_since_skill"):
            assert hasattr(fresh_agent, counter)
            assert getattr(fresh_agent, counter) == 0

    def test_counters_not_reset_in_preamble(self):
        """The run_conversation preamble must not zero the nudge counters."""
        import inspect
        src = inspect.getsource(AIAgent.run_conversation)
        # The preamble resets many fields (retry counts, budget, etc.)
        # before the main loop. Find that reset block and verify our
        # counters aren't in it. The reset block ends at iteration_budget.
        preamble = src[:src.index("self.iteration_budget = IterationBudget")]
        assert "self._turns_since_memory = 0" not in preamble
        assert "self._iters_since_skill = 0" not in preamble
class TestDeadRetryCode:
    """Unreachable retry_count >= max_retries after raise must not exist."""

    def test_no_unreachable_max_retries_after_backoff(self):
        import inspect
        # Exactly two legitimate guard sites are expected; a third would
        # indicate dead code after a raise has been reintroduced.
        body = inspect.getsource(AIAgent.run_conversation)
        occurrences = body.count("if retry_count >= max_retries:")
        assert occurrences == 2, (
            f"Expected 2 occurrences of 'if retry_count >= max_retries:' "
            f"but found {occurrences}"
        )

View file

@ -505,6 +505,42 @@ class TestToolsetInjection:
assert "mcp_fs_list_files" not in fake_toolsets["non-hermes"]["tools"]
# Original tools preserved
assert "terminal" in fake_toolsets["hermes-cli"]["tools"]
# Server name becomes a standalone toolset
assert "fs" in fake_toolsets
assert "mcp_fs_list_files" in fake_toolsets["fs"]["tools"]
assert fake_toolsets["fs"]["description"].startswith("MCP server '")
def test_server_toolset_skips_builtin_collision(self):
    """MCP server named after a built-in toolset shouldn't overwrite it."""
    from tools.mcp_tool import MCPServerTask

    discovered = [_make_mcp_tool("run", "Run command")]
    session_stub = MagicMock()
    server_registry = {}

    async def fake_connect(name, config):
        # Hand back a pre-populated server object; no real MCP connection.
        task = MCPServerTask(name)
        task.session = session_stub
        task._tools = discovered
        return task

    toolsets_stub = {
        "hermes-cli": {"tools": ["terminal"], "description": "CLI", "includes": []},
        # Built-in toolset named "terminal" — must not be overwritten
        "terminal": {"tools": ["terminal"], "description": "Terminal tools", "includes": []},
    }
    config_stub = {"terminal": {"command": "npx", "args": []}}

    with (
        patch("tools.mcp_tool._MCP_AVAILABLE", True),
        patch("tools.mcp_tool._servers", server_registry),
        patch("tools.mcp_tool._load_mcp_config", return_value=config_stub),
        patch("tools.mcp_tool._connect_server", side_effect=fake_connect),
        patch("toolsets.TOOLSETS", toolsets_stub),
    ):
        from tools.mcp_tool import discover_mcp_tools
        discover_mcp_tools()

    # Built-in toolset preserved — description unchanged
    assert toolsets_stub["terminal"]["description"] == "Terminal tools"
def test_server_connection_failure_skipped(self):
"""If one server fails to connect, others still proceed."""

View file

@ -374,6 +374,35 @@ class TestSkillView:
result = json.loads(raw)
assert result["success"] is False
def test_view_disabled_skill_blocked(self, tmp_path):
    """Disabled skills should not be viewable via skill_view."""
    dir_patch = patch("tools.skills_tool.SKILLS_DIR", tmp_path)
    disabled_patch = patch(
        "tools.skills_tool._is_skill_disabled",
        return_value=True,
    )
    with dir_patch, disabled_patch:
        _make_skill(tmp_path, "hidden-skill")
        payload = json.loads(skill_view("hidden-skill"))

    # Viewing a disabled skill must fail with a "disabled" error.
    assert payload["success"] is False
    assert "disabled" in payload["error"].lower()
def test_view_enabled_skill_allowed(self, tmp_path):
    """Non-disabled skills should be viewable normally."""
    dir_patch = patch("tools.skills_tool.SKILLS_DIR", tmp_path)
    disabled_patch = patch(
        "tools.skills_tool._is_skill_disabled",
        return_value=False,
    )
    with dir_patch, disabled_patch:
        _make_skill(tmp_path, "active-skill")
        payload = json.loads(skill_view("active-skill"))

    # An enabled skill must be viewable without error.
    assert payload["success"] is True
class TestSkillViewSecureSetupOnLoad:
def test_requests_missing_required_env_and_continues(self, tmp_path, monkeypatch):