Merge origin/main into hermes/hermes-5d160594

This commit is contained in:
teknium1 2026-03-14 19:34:05 -07:00
commit 3229e434b8
78 changed files with 3762 additions and 395 deletions

View file

@ -15,10 +15,30 @@ from agent.prompt_builder import (
build_context_files_prompt,
CONTEXT_FILE_MAX_CHARS,
DEFAULT_AGENT_IDENTITY,
MEMORY_GUIDANCE,
SESSION_SEARCH_GUIDANCE,
PLATFORM_HINTS,
)
# =========================================================================
# Guidance constants
# =========================================================================
class TestGuidanceConstants:
def test_memory_guidance_discourages_task_logs(self):
assert "durable facts" in MEMORY_GUIDANCE
assert "Do NOT save task progress" in MEMORY_GUIDANCE
assert "session_search" in MEMORY_GUIDANCE
assert "like a diary" not in MEMORY_GUIDANCE
assert ">80%" not in MEMORY_GUIDANCE
def test_session_search_guidance_is_simple_cross_session_recall(self):
assert "relevant cross-session context exists" in SESSION_SEARCH_GUIDANCE
assert "recent turns of the current session" not in SESSION_SEARCH_GUIDANCE
# =========================================================================
# Context injection scanning
# =========================================================================
@ -435,6 +455,7 @@ class TestPromptBuilderConstants:
assert "whatsapp" in PLATFORM_HINTS
assert "telegram" in PLATFORM_HINTS
assert "discord" in PLATFORM_HINTS
assert "cron" in PLATFORM_HINTS
assert "cli" in PLATFORM_HINTS

View file

@ -6,7 +6,7 @@ from unittest.mock import patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _deliver_result, run_job
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job
class TestResolveOrigin:
@ -44,6 +44,56 @@ class TestResolveOrigin:
assert _resolve_origin(job) is None
class TestResolveDeliveryTarget:
def test_origin_delivery_preserves_thread_id(self):
job = {
"deliver": "origin",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
def test_bare_platform_uses_matching_origin_chat(self):
job = {
"deliver": "telegram",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
def test_bare_platform_falls_back_to_home_channel(self, monkeypatch):
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "-2002")
job = {
"deliver": "telegram",
"origin": {
"platform": "discord",
"chat_id": "abc",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-2002",
"thread_id": None,
}
class TestDeliverResultMirrorLogging:
"""Verify that mirror_to_session failures are logged, not silently swallowed."""

View file

@ -1,7 +1,7 @@
"""Tests for the delivery routing module."""
from gateway.config import Platform, GatewayConfig, PlatformConfig, HomeChannel
from gateway.delivery import DeliveryTarget, parse_deliver_spec
from gateway.delivery import DeliveryRouter, DeliveryTarget, parse_deliver_spec
from gateway.session import SessionSource
@ -85,3 +85,12 @@ class TestTargetToStringRoundtrip:
reparsed = DeliveryTarget.parse(s)
assert reparsed.platform == Platform.TELEGRAM
assert reparsed.chat_id == "999"
class TestDeliveryRouter:
def test_resolve_targets_does_not_duplicate_local_when_explicit(self):
router = DeliveryRouter(GatewayConfig(always_log_local=True))
targets = router.resolve_targets(["local"])
assert [target.platform for target in targets] == [Platform.LOCAL]

View file

@ -0,0 +1,46 @@
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter
from gateway.run import GatewayRunner
class _FatalAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="token"), Platform.TELEGRAM)
async def connect(self) -> bool:
self._set_fatal_error(
"telegram_token_lock",
"Another local Hermes gateway is already using this Telegram bot token.",
retryable=False,
)
return False
async def disconnect(self) -> None:
self._mark_disconnected()
async def send(self, chat_id, content, reply_to=None, metadata=None):
raise NotImplementedError
async def get_chat_info(self, chat_id):
return {"id": chat_id}
@pytest.mark.asyncio
async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monkeypatch, tmp_path):
config = GatewayConfig(
platforms={
Platform.TELEGRAM: PlatformConfig(enabled=True, token="token")
},
sessions_dir=tmp_path / "sessions",
)
runner = GatewayRunner(config)
monkeypatch.setattr(runner, "_create_adapter", lambda platform, platform_config: _FatalAdapter())
ok = await runner.start()
assert ok is True
assert runner.should_exit_cleanly is True
assert "already using this Telegram bot token" in runner.exit_reason

View file

@ -25,3 +25,77 @@ class TestGatewayPidState:
assert status.get_running_pid() is None
assert not pid_path.exists()
class TestGatewayRuntimeStatus:
def test_write_runtime_status_records_platform_failure(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
status.write_runtime_status(
gateway_state="startup_failed",
exit_reason="telegram conflict",
platform="telegram",
platform_state="fatal",
error_code="telegram_polling_conflict",
error_message="another poller is active",
)
payload = status.read_runtime_status()
assert payload["gateway_state"] == "startup_failed"
assert payload["exit_reason"] == "telegram conflict"
assert payload["platforms"]["telegram"]["state"] == "fatal"
assert payload["platforms"]["telegram"]["error_code"] == "telegram_polling_conflict"
assert payload["platforms"]["telegram"]["error_message"] == "another poller is active"
class TestScopedLocks:
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text(json.dumps({
"pid": 99999,
"start_time": 123,
"kind": "hermes-gateway",
}))
monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
assert acquired is False
assert existing["pid"] == 99999
def test_acquire_scoped_lock_replaces_stale_record(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text(json.dumps({
"pid": 99999,
"start_time": 123,
"kind": "hermes-gateway",
}))
def fake_kill(pid, sig):
raise ProcessLookupError
monkeypatch.setattr(status.os, "kill", fake_kill)
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
assert acquired is True
payload = json.loads(lock_path.read_text())
assert payload["pid"] == os.getpid()
assert payload["metadata"]["platform"] == "telegram"
def test_release_scoped_lock_only_removes_current_owner(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
acquired, _ = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
assert acquired is True
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
assert lock_path.exists()
status.release_scoped_lock("telegram-bot-token", "secret")
assert not lock_path.exists()

View file

@ -0,0 +1,100 @@
import asyncio
import sys
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.config import PlatformConfig
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
telegram_mod = MagicMock()
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
telegram_mod.constants.ChatType.GROUP = "group"
telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
sys.modules.setdefault(name, telegram_mod)
_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
@pytest.mark.asyncio
async def test_connect_rejects_same_host_token_lock(monkeypatch):
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))
monkeypatch.setattr(
"gateway.status.acquire_scoped_lock",
lambda scope, identity, metadata=None: (False, {"pid": 4242}),
)
ok = await adapter.connect()
assert ok is False
assert adapter.fatal_error_code == "telegram_token_lock"
assert adapter.has_fatal_error is True
assert "already using this Telegram bot token" in adapter.fatal_error_message
@pytest.mark.asyncio
async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch):
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))
fatal_handler = AsyncMock()
adapter.set_fatal_error_handler(fatal_handler)
monkeypatch.setattr(
"gateway.status.acquire_scoped_lock",
lambda scope, identity, metadata=None: (True, None),
)
monkeypatch.setattr(
"gateway.status.release_scoped_lock",
lambda scope, identity: None,
)
captured = {}
async def fake_start_polling(**kwargs):
captured["error_callback"] = kwargs["error_callback"]
updater = SimpleNamespace(
start_polling=AsyncMock(side_effect=fake_start_polling),
stop=AsyncMock(),
)
bot = SimpleNamespace(set_my_commands=AsyncMock())
app = SimpleNamespace(
bot=bot,
updater=updater,
add_handler=MagicMock(),
initialize=AsyncMock(),
start=AsyncMock(),
)
builder = MagicMock()
builder.token.return_value = builder
builder.build.return_value = app
monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder)))
ok = await adapter.connect()
assert ok is True
assert callable(captured["error_callback"])
conflict = type("Conflict", (Exception,), {})
captured["error_callback"](conflict("Conflict: terminated by other getUpdates request; make sure that only one bot instance is running"))
await asyncio.sleep(0)
await asyncio.sleep(0)
assert adapter.fatal_error_code == "telegram_polling_conflict"
assert adapter.has_fatal_error is True
updater.stop.assert_awaited()
fatal_handler.assert_awaited_once()

View file

@ -81,20 +81,21 @@ def _make_document(
return doc
def _make_message(document=None, caption=None):
"""Build a mock Telegram Message with the given document."""
def _make_message(document=None, caption=None, media_group_id=None, photo=None):
"""Build a mock Telegram Message with the given document/photo."""
msg = MagicMock()
msg.message_id = 42
msg.text = caption or ""
msg.caption = caption
msg.date = None
# Media flags — all None except document
msg.photo = None
# Media flags — all None except explicit payload
msg.photo = photo
msg.video = None
msg.audio = None
msg.voice = None
msg.sticker = None
msg.document = document
msg.media_group_id = media_group_id
# Chat / user
msg.chat = MagicMock()
msg.chat.id = 100
@ -165,6 +166,12 @@ class TestDocumentTypeDetection:
# TestDocumentDownloadBlock
# ---------------------------------------------------------------------------
def _make_photo(file_obj=None):
photo = MagicMock()
photo.get_file = AsyncMock(return_value=file_obj or _make_file_obj(b"photo-bytes"))
return photo
class TestDocumentDownloadBlock:
@pytest.mark.asyncio
async def test_supported_pdf_is_cached(self, adapter):
@ -339,6 +346,50 @@ class TestDocumentDownloadBlock:
adapter.handle_message.assert_called_once()
# ---------------------------------------------------------------------------
# TestMediaGroups — media group (album) buffering
# ---------------------------------------------------------------------------
class TestMediaGroups:
@pytest.mark.asyncio
async def test_photo_album_is_buffered_and_combined(self, adapter):
first_photo = _make_photo(_make_file_obj(b"first"))
second_photo = _make_photo(_make_file_obj(b"second"))
msg1 = _make_message(caption="two images", media_group_id="album-1", photo=[first_photo])
msg2 = _make_message(media_group_id="album-1", photo=[second_photo])
with patch("gateway.platforms.telegram.cache_image_from_bytes", side_effect=["/tmp/one.jpg", "/tmp/two.jpg"]):
await adapter._handle_media_message(_make_update(msg1), MagicMock())
await adapter._handle_media_message(_make_update(msg2), MagicMock())
assert adapter.handle_message.await_count == 0
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.call_args[0][0]
assert event.text == "two images"
assert event.media_urls == ["/tmp/one.jpg", "/tmp/two.jpg"]
assert len(event.media_types) == 2
@pytest.mark.asyncio
async def test_disconnect_cancels_pending_media_group_flush(self, adapter):
first_photo = _make_photo(_make_file_obj(b"first"))
msg = _make_message(caption="two images", media_group_id="album-2", photo=[first_photo])
with patch("gateway.platforms.telegram.cache_image_from_bytes", return_value="/tmp/one.jpg"):
await adapter._handle_media_message(_make_update(msg), MagicMock())
assert "album-2" in adapter._media_group_events
assert "album-2" in adapter._media_group_tasks
await adapter.disconnect()
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
assert adapter._media_group_events == {}
assert adapter._media_group_tasks == {}
adapter.handle_message.assert_not_awaited()
# ---------------------------------------------------------------------------
# TestSendDocument — outbound file attachment delivery
# ---------------------------------------------------------------------------

View file

@ -88,7 +88,7 @@ class TestHandleUpdateCommand:
@pytest.mark.asyncio
async def test_no_hermes_binary(self, tmp_path):
"""Returns error when hermes is not on PATH."""
"""Returns error when hermes is not on PATH and hermes_cli is not importable."""
runner = _make_runner()
event = _make_event()
@ -102,10 +102,77 @@ class TestHandleUpdateCommand:
with patch("gateway.run._hermes_home", tmp_path), \
patch("gateway.run.__file__", fake_file), \
patch("shutil.which", return_value=None):
patch("shutil.which", return_value=None), \
patch("importlib.util.find_spec", return_value=None):
result = await runner._handle_update_command(event)
assert "not found on PATH" in result
assert "Could not locate" in result
assert "hermes update" in result
@pytest.mark.asyncio
async def test_fallback_to_sys_executable(self, tmp_path):
"""Falls back to sys.executable -m hermes_cli.main when hermes not on PATH."""
runner = _make_runner()
event = _make_event()
fake_root = tmp_path / "project"
fake_root.mkdir()
(fake_root / ".git").mkdir()
(fake_root / "gateway").mkdir()
(fake_root / "gateway" / "run.py").touch()
fake_file = str(fake_root / "gateway" / "run.py")
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
mock_popen = MagicMock()
fake_spec = MagicMock()
with patch("gateway.run._hermes_home", hermes_home), \
patch("gateway.run.__file__", fake_file), \
patch("shutil.which", return_value=None), \
patch("importlib.util.find_spec", return_value=fake_spec), \
patch("subprocess.Popen", mock_popen):
result = await runner._handle_update_command(event)
assert "Starting Hermes update" in result
call_args = mock_popen.call_args[0][0]
# The update_cmd uses sys.executable -m hermes_cli.main
joined = " ".join(call_args) if isinstance(call_args, list) else call_args
assert "hermes_cli.main" in joined or "bash" in call_args[0]
@pytest.mark.asyncio
async def test_resolve_hermes_bin_prefers_which(self, tmp_path):
"""_resolve_hermes_bin returns argv parts from shutil.which when available."""
from gateway.run import _resolve_hermes_bin
with patch("shutil.which", return_value="/custom/path/hermes"):
result = _resolve_hermes_bin()
assert result == ["/custom/path/hermes"]
@pytest.mark.asyncio
async def test_resolve_hermes_bin_fallback(self):
"""_resolve_hermes_bin falls back to sys.executable argv when which fails."""
import sys
from gateway.run import _resolve_hermes_bin
fake_spec = MagicMock()
with patch("shutil.which", return_value=None), \
patch("importlib.util.find_spec", return_value=fake_spec):
result = _resolve_hermes_bin()
assert result == [sys.executable, "-m", "hermes_cli.main"]
@pytest.mark.asyncio
async def test_resolve_hermes_bin_returns_none_when_both_fail(self):
"""_resolve_hermes_bin returns None when both strategies fail."""
from gateway.run import _resolve_hermes_bin
with patch("shutil.which", return_value=None), \
patch("importlib.util.find_spec", return_value=None):
result = _resolve_hermes_bin()
assert result is None
@pytest.mark.asyncio
async def test_writes_pending_marker(self, tmp_path):

View file

@ -0,0 +1,107 @@
"""Tests for cmd_update — branch fallback when remote branch doesn't exist."""
import subprocess
from types import SimpleNamespace
from unittest.mock import patch
import pytest
from hermes_cli.main import cmd_update, PROJECT_ROOT
def _make_run_side_effect(branch="main", verify_ok=True, commit_count="0"):
"""Build a side_effect function for subprocess.run that simulates git commands."""
def side_effect(cmd, **kwargs):
joined = " ".join(str(c) for c in cmd)
# git rev-parse --abbrev-ref HEAD (get current branch)
if "rev-parse" in joined and "--abbrev-ref" in joined:
return subprocess.CompletedProcess(cmd, 0, stdout=f"{branch}\n", stderr="")
# git rev-parse --verify origin/{branch} (check remote branch exists)
if "rev-parse" in joined and "--verify" in joined:
rc = 0 if verify_ok else 128
return subprocess.CompletedProcess(cmd, rc, stdout="", stderr="")
# git rev-list HEAD..origin/{branch} --count
if "rev-list" in joined:
return subprocess.CompletedProcess(cmd, 0, stdout=f"{commit_count}\n", stderr="")
# Fallback: return a successful CompletedProcess with empty stdout
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
return side_effect
@pytest.fixture
def mock_args():
return SimpleNamespace()
class TestCmdUpdateBranchFallback:
"""cmd_update falls back to main when current branch has no remote counterpart."""
@patch("shutil.which", return_value=None)
@patch("subprocess.run")
def test_update_falls_back_to_main_when_branch_not_on_remote(
self, mock_run, _mock_which, mock_args, capsys
):
mock_run.side_effect = _make_run_side_effect(
branch="fix/stoicneko", verify_ok=False, commit_count="3"
)
cmd_update(mock_args)
commands = [" ".join(str(a) for a in c.args[0]) for c in mock_run.call_args_list]
# rev-list should use origin/main, not origin/fix/stoicneko
rev_list_cmds = [c for c in commands if "rev-list" in c]
assert len(rev_list_cmds) == 1
assert "origin/main" in rev_list_cmds[0]
assert "origin/fix/stoicneko" not in rev_list_cmds[0]
# pull should use main, not fix/stoicneko
pull_cmds = [c for c in commands if "pull" in c]
assert len(pull_cmds) == 1
assert "main" in pull_cmds[0]
@patch("shutil.which", return_value=None)
@patch("subprocess.run")
def test_update_uses_current_branch_when_on_remote(
self, mock_run, _mock_which, mock_args, capsys
):
mock_run.side_effect = _make_run_side_effect(
branch="main", verify_ok=True, commit_count="2"
)
cmd_update(mock_args)
commands = [" ".join(str(a) for a in c.args[0]) for c in mock_run.call_args_list]
rev_list_cmds = [c for c in commands if "rev-list" in c]
assert len(rev_list_cmds) == 1
assert "origin/main" in rev_list_cmds[0]
pull_cmds = [c for c in commands if "pull" in c]
assert len(pull_cmds) == 1
assert "main" in pull_cmds[0]
@patch("shutil.which", return_value=None)
@patch("subprocess.run")
def test_update_already_up_to_date(
self, mock_run, _mock_which, mock_args, capsys
):
mock_run.side_effect = _make_run_side_effect(
branch="main", verify_ok=True, commit_count="0"
)
cmd_update(mock_args)
captured = capsys.readouterr()
assert "Already up to date!" in captured.out
# Should NOT have called pull
commands = [" ".join(str(a) for a in c.args[0]) for c in mock_run.call_args_list]
pull_cmds = [c for c in commands if "pull" in c]
assert len(pull_cmds) == 0

View file

@ -59,15 +59,16 @@ def test_systemd_install_checks_linger_status(monkeypatch, tmp_path, capsys):
unit_path = tmp_path / "systemd" / "user" / "hermes-gateway.service"
monkeypatch.setattr(gateway, "get_systemd_unit_path", lambda: unit_path)
monkeypatch.setattr(gateway, "get_systemd_linger_status", lambda: (False, ""))
calls = []
helper_calls = []
def fake_run(cmd, check=False, **kwargs):
calls.append((cmd, check))
return SimpleNamespace(returncode=0, stdout="", stderr="")
monkeypatch.setattr(gateway.subprocess, "run", fake_run)
monkeypatch.setattr(gateway, "_ensure_linger_enabled", lambda: helper_calls.append(True))
gateway.systemd_install(force=False)
@ -77,6 +78,5 @@ def test_systemd_install_checks_linger_status(monkeypatch, tmp_path, capsys):
["systemctl", "--user", "daemon-reload"],
["systemctl", "--user", "enable", gateway.SERVICE_NAME],
]
assert helper_calls == [True]
assert "Service installed and enabled" in out
assert "Systemd linger is disabled" in out
assert "loginctl enable-linger" in out

View file

@ -0,0 +1,120 @@
"""Tests for gateway linger auto-enable behavior on headless Linux installs."""
from types import SimpleNamespace
import hermes_cli.gateway as gateway
class TestEnsureLingerEnabled:
def test_linger_already_enabled_via_file(self, monkeypatch, capsys):
monkeypatch.setattr(gateway, "is_linux", lambda: True)
monkeypatch.setattr("getpass.getuser", lambda: "testuser")
monkeypatch.setattr(gateway, "Path", lambda _path: SimpleNamespace(exists=lambda: True))
calls = []
monkeypatch.setattr(gateway.subprocess, "run", lambda *args, **kwargs: calls.append((args, kwargs)))
gateway._ensure_linger_enabled()
out = capsys.readouterr().out
assert "Systemd linger is enabled" in out
assert calls == []
def test_status_enabled_skips_enable(self, monkeypatch, capsys):
monkeypatch.setattr(gateway, "is_linux", lambda: True)
monkeypatch.setattr("getpass.getuser", lambda: "testuser")
monkeypatch.setattr(gateway, "Path", lambda _path: SimpleNamespace(exists=lambda: False))
monkeypatch.setattr(gateway, "get_systemd_linger_status", lambda: (True, ""))
calls = []
monkeypatch.setattr(gateway.subprocess, "run", lambda *args, **kwargs: calls.append((args, kwargs)))
gateway._ensure_linger_enabled()
out = capsys.readouterr().out
assert "Systemd linger is enabled" in out
assert calls == []
def test_loginctl_success_enables_linger(self, monkeypatch, capsys):
monkeypatch.setattr(gateway, "is_linux", lambda: True)
monkeypatch.setattr("getpass.getuser", lambda: "testuser")
monkeypatch.setattr(gateway, "Path", lambda _path: SimpleNamespace(exists=lambda: False))
monkeypatch.setattr(gateway, "get_systemd_linger_status", lambda: (False, ""))
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/loginctl")
run_calls = []
def fake_run(cmd, capture_output=False, text=False, check=False):
run_calls.append((cmd, capture_output, text, check))
return SimpleNamespace(returncode=0, stdout="", stderr="")
monkeypatch.setattr(gateway.subprocess, "run", fake_run)
gateway._ensure_linger_enabled()
out = capsys.readouterr().out
assert "Enabling linger" in out
assert "Linger enabled" in out
assert run_calls == [(["loginctl", "enable-linger", "testuser"], True, True, False)]
def test_missing_loginctl_shows_manual_guidance(self, monkeypatch, capsys):
monkeypatch.setattr(gateway, "is_linux", lambda: True)
monkeypatch.setattr("getpass.getuser", lambda: "testuser")
monkeypatch.setattr(gateway, "Path", lambda _path: SimpleNamespace(exists=lambda: False))
monkeypatch.setattr(gateway, "get_systemd_linger_status", lambda: (None, "loginctl not found"))
monkeypatch.setattr("shutil.which", lambda name: None)
calls = []
monkeypatch.setattr(gateway.subprocess, "run", lambda *args, **kwargs: calls.append((args, kwargs)))
gateway._ensure_linger_enabled()
out = capsys.readouterr().out
assert "sudo loginctl enable-linger testuser" in out
assert "loginctl not found" in out
assert calls == []
def test_loginctl_failure_shows_manual_guidance(self, monkeypatch, capsys):
monkeypatch.setattr(gateway, "is_linux", lambda: True)
monkeypatch.setattr("getpass.getuser", lambda: "testuser")
monkeypatch.setattr(gateway, "Path", lambda _path: SimpleNamespace(exists=lambda: False))
monkeypatch.setattr(gateway, "get_systemd_linger_status", lambda: (False, ""))
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/loginctl")
monkeypatch.setattr(
gateway.subprocess,
"run",
lambda *args, **kwargs: SimpleNamespace(returncode=1, stdout="", stderr="Permission denied"),
)
gateway._ensure_linger_enabled()
out = capsys.readouterr().out
assert "sudo loginctl enable-linger testuser" in out
assert "Permission denied" in out
def test_systemd_install_calls_linger_helper(monkeypatch, tmp_path, capsys):
unit_path = tmp_path / "systemd" / "user" / "hermes-gateway.service"
monkeypatch.setattr(gateway, "get_systemd_unit_path", lambda: unit_path)
calls = []
def fake_run(cmd, check=False, **kwargs):
calls.append((cmd, check))
return SimpleNamespace(returncode=0, stdout="", stderr="")
helper_calls = []
monkeypatch.setattr(gateway.subprocess, "run", fake_run)
monkeypatch.setattr(gateway, "_ensure_linger_enabled", lambda: helper_calls.append(True))
gateway.systemd_install(force=False)
out = capsys.readouterr().out
assert unit_path.exists()
assert [cmd for cmd, _ in calls] == [
["systemctl", "--user", "daemon-reload"],
["systemctl", "--user", "enable", gateway.SERVICE_NAME],
]
assert helper_calls == [True]
assert "Service installed and enabled" in out

View file

@ -0,0 +1,22 @@
from hermes_cli.gateway import _runtime_health_lines
def test_runtime_health_lines_include_fatal_platform_and_startup_reason(monkeypatch):
monkeypatch.setattr(
"gateway.status.read_runtime_status",
lambda: {
"gateway_state": "startup_failed",
"exit_reason": "telegram conflict",
"platforms": {
"telegram": {
"state": "fatal",
"error_message": "another poller is active",
}
},
},
)
lines = _runtime_health_lines()
assert "⚠ telegram: another poller is active" in lines
assert "⚠ Last startup issue: telegram conflict" in lines

View file

@ -0,0 +1,48 @@
"""Tests for CLI placeholder text in config/setup output."""
import os
from argparse import Namespace
from unittest.mock import patch
import pytest
from hermes_cli.config import config_command, show_config
from hermes_cli.setup import _print_setup_summary
def test_config_set_usage_marks_placeholders(capsys):
args = Namespace(config_command="set", key=None, value=None)
with pytest.raises(SystemExit) as exc:
config_command(args)
assert exc.value.code == 1
out = capsys.readouterr().out
assert "Usage: hermes config set <key> <value>" in out
def test_config_unknown_command_help_marks_placeholders(capsys):
args = Namespace(config_command="wat")
with pytest.raises(SystemExit) as exc:
config_command(args)
assert exc.value.code == 1
out = capsys.readouterr().out
assert "hermes config set <key> <value> Set a config value" in out
def test_show_config_marks_placeholders(tmp_path, capsys):
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
show_config()
out = capsys.readouterr().out
assert "hermes config set <key> <value>" in out
def test_setup_summary_marks_placeholders(tmp_path, capsys):
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
_print_setup_summary({"tts": {"provider": "edge"}}, tmp_path)
out = capsys.readouterr().out
assert "hermes config set <key> <value>" in out

View file

@ -3,7 +3,7 @@
from __future__ import annotations
from hermes_cli.config import load_config, save_config, save_env_value
from hermes_cli.setup import setup_model_provider
from hermes_cli.setup import _print_setup_summary, setup_model_provider
def _read_env(home):
@ -50,11 +50,15 @@ def test_setup_keep_current_custom_from_config_does_not_fall_through(tmp_path, m
calls = {"count": 0}
def fake_prompt_choice(_question, choices, default=0):
def fake_prompt_choice(question, choices, default=0):
calls["count"] += 1
if calls["count"] == 1:
assert choices[-1] == "Keep current (Custom: https://example.invalid/v1)"
return len(choices) - 1
if calls["count"] == 2:
assert question == "Configure vision:"
assert choices[-1] == "Skip for now"
return len(choices) - 1
raise AssertionError("Model menu should not appear for keep-current custom")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
@ -70,7 +74,7 @@ def test_setup_keep_current_custom_from_config_does_not_fall_through(tmp_path, m
assert reloaded["model"]["provider"] == "custom"
assert reloaded["model"]["default"] == "custom/model"
assert reloaded["model"]["base_url"] == "https://example.invalid/v1"
assert calls["count"] == 1
assert calls["count"] == 2
def test_setup_keep_current_config_provider_uses_provider_specific_model_menu(tmp_path, monkeypatch):
@ -88,13 +92,17 @@ def test_setup_keep_current_config_provider_uses_provider_specific_model_menu(tm
captured = {"provider_choices": None, "model_choices": None}
calls = {"count": 0}
def fake_prompt_choice(_question, choices, default=0):
def fake_prompt_choice(question, choices, default=0):
calls["count"] += 1
if calls["count"] == 1:
captured["provider_choices"] = list(choices)
assert choices[-1] == "Keep current (Anthropic)"
return len(choices) - 1
if calls["count"] == 2:
assert question == "Configure vision:"
assert choices[-1] == "Skip for now"
return len(choices) - 1
if calls["count"] == 3:
captured["model_choices"] = list(choices)
return len(choices) - 1 # keep current model
raise AssertionError("Unexpected extra prompt_choice call")
@ -113,7 +121,43 @@ def test_setup_keep_current_config_provider_uses_provider_specific_model_menu(tm
assert captured["model_choices"] is not None
assert captured["model_choices"][0] == "claude-opus-4-6"
assert "anthropic/claude-opus-4.6 (recommended)" not in captured["model_choices"]
assert calls["count"] == 2
assert calls["count"] == 3
def test_setup_keep_current_anthropic_can_configure_openai_vision_default(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
config = load_config()
config["model"] = {
"default": "claude-opus-4-6",
"provider": "anthropic",
}
save_config(config)
picks = iter([
9, # keep current provider
1, # configure vision with OpenAI
5, # use default gpt-4o-mini vision model
4, # keep current Anthropic model
])
monkeypatch.setattr("hermes_cli.setup.prompt_choice", lambda *args, **kwargs: next(picks))
monkeypatch.setattr(
"hermes_cli.setup.prompt",
lambda message, *args, **kwargs: "sk-openai" if "OpenAI API key" in message else "",
)
monkeypatch.setattr("hermes_cli.setup.prompt_yes_no", lambda *args, **kwargs: False)
monkeypatch.setattr("hermes_cli.auth.get_active_provider", lambda: None)
monkeypatch.setattr("hermes_cli.auth.detect_external_credentials", lambda: [])
monkeypatch.setattr("hermes_cli.models.provider_model_ids", lambda provider: [])
setup_model_provider(config)
env = _read_env(tmp_path)
assert env.get("OPENAI_API_KEY") == "sk-openai"
assert env.get("OPENAI_BASE_URL") == "https://api.openai.com/v1"
assert env.get("AUXILIARY_VISION_MODEL") == "gpt-4o-mini"
def test_setup_switch_custom_to_codex_clears_custom_endpoint_and_updates_config(tmp_path, monkeypatch):
@ -144,7 +188,7 @@ def test_setup_switch_custom_to_codex_clears_custom_endpoint_and_updates_config(
"hermes_cli.auth.resolve_codex_runtime_credentials",
lambda *args, **kwargs: {
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-access-token",
"api_key": "codex-...oken",
},
)
monkeypatch.setattr(
@ -163,3 +207,22 @@ def test_setup_switch_custom_to_codex_clears_custom_endpoint_and_updates_config(
assert reloaded["model"]["provider"] == "openai-codex"
assert reloaded["model"]["default"] == "openai/gpt-5.3-codex"
assert reloaded["model"]["base_url"] == "https://chatgpt.com/backend-api/codex"
def test_setup_summary_marks_codex_auth_as_vision_available(tmp_path, monkeypatch, capsys):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
(tmp_path / "auth.json").write_text(
'{"active_provider":"openai-codex","providers":{"openai-codex":{"tokens":{"access_token":"tok"}}}}'
)
monkeypatch.setattr("shutil.which", lambda _name: None)
_print_setup_summary(load_config(), tmp_path)
output = capsys.readouterr().out
assert "Vision (image analysis)" in output
assert "missing run 'hermes setup' to configure" not in output
assert "Mixture of Agents" in output
assert "missing OPENROUTER_API_KEY" in output

View file

@ -3,7 +3,7 @@ from io import StringIO
import pytest
from rich.console import Console
from hermes_cli.skills_hub import do_check, do_list, do_update
from hermes_cli.skills_hub import do_check, do_list, do_update, handle_skills_slash
class _DummyLockFile:

View file

@ -0,0 +1,26 @@
import sys
from types import SimpleNamespace
def test_cli_skills_install_accepts_yes_alias(monkeypatch):
from hermes_cli.main import main
captured = {}
def fake_skills_command(args):
captured["identifier"] = args.identifier
captured["force"] = args.force
monkeypatch.setattr("hermes_cli.skills_hub.skills_command", fake_skills_command)
monkeypatch.setattr(
sys,
"argv",
["hermes", "skills", "install", "official/email/agentmail", "--yes"],
)
main()
assert captured == {
"identifier": "official/email/agentmail",
"force": True,
}

View file

@ -484,3 +484,22 @@ class TestResizeToolPool:
"""resize_tool_pool should not raise."""
resize_tool_pool(16) # Small pool for testing
resize_tool_pool(128) # Restore default
def test_resize_shuts_down_previous_executor(self, monkeypatch):
"""Replacing the global tool executor should shut down the old pool."""
import environments.agent_loop as agent_loop_module
old_executor = MagicMock()
new_executor = MagicMock()
monkeypatch.setattr(agent_loop_module, "_tool_executor", old_executor)
monkeypatch.setattr(
agent_loop_module.concurrent.futures,
"ThreadPoolExecutor",
MagicMock(return_value=new_executor),
)
resize_tool_pool(16)
old_executor.shutdown.assert_called_once_with(wait=False)
assert agent_loop_module._tool_executor is new_executor

View file

@ -0,0 +1,100 @@
import queue
import threading
import time
from types import SimpleNamespace
from unittest.mock import MagicMock
from cli import HermesCLI
def _make_cli_stub():
cli = HermesCLI.__new__(HermesCLI)
cli._approval_state = None
cli._approval_deadline = 0
cli._approval_lock = threading.Lock()
cli._invalidate = MagicMock()
cli._app = SimpleNamespace(invalidate=MagicMock())
return cli
class TestCliApprovalUi:
def test_approval_callback_includes_view_for_long_commands(self):
cli = _make_cli_stub()
command = "sudo dd if=/tmp/githubcli-keyring.gpg of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress"
result = {}
def _run_callback():
result["value"] = cli._approval_callback(command, "disk copy")
thread = threading.Thread(target=_run_callback, daemon=True)
thread.start()
deadline = time.time() + 2
while cli._approval_state is None and time.time() < deadline:
time.sleep(0.01)
assert cli._approval_state is not None
assert "view" in cli._approval_state["choices"]
cli._approval_state["response_queue"].put("deny")
thread.join(timeout=2)
assert result["value"] == "deny"
def test_handle_approval_selection_view_expands_in_place(self):
cli = _make_cli_stub()
cli._approval_state = {
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
"description": "disk copy",
"choices": ["once", "session", "always", "deny", "view"],
"selected": 4,
"response_queue": queue.Queue(),
}
cli._handle_approval_selection()
assert cli._approval_state is not None
assert cli._approval_state["show_full"] is True
assert "view" not in cli._approval_state["choices"]
assert cli._approval_state["selected"] == 3
assert cli._approval_state["response_queue"].empty()
def test_approval_display_places_title_inside_box_not_border(self):
cli = _make_cli_stub()
cli._approval_state = {
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
"description": "disk copy",
"choices": ["once", "session", "always", "deny", "view"],
"selected": 0,
"response_queue": queue.Queue(),
}
fragments = cli._get_approval_display_fragments()
rendered = "".join(text for _style, text in fragments)
lines = rendered.splitlines()
assert lines[0].startswith("")
assert "Dangerous Command" not in lines[0]
assert any("Dangerous Command" in line for line in lines[1:3])
assert "Show full command" in rendered
assert "githubcli-archive-keyring.gpg" not in rendered
def test_approval_display_shows_full_command_after_view(self):
cli = _make_cli_stub()
full_command = "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress"
cli._approval_state = {
"command": full_command,
"description": "disk copy",
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"show_full": True,
"response_queue": queue.Queue(),
}
fragments = cli._get_approval_display_fragments()
rendered = "".join(text for _style, text in fragments)
assert "..." not in rendered
assert "githubcli-" in rendered
assert "archive-" in rendered
assert "keyring.gpg" in rendered
assert "status=progress" in rendered

View file

@ -0,0 +1,117 @@
"""Tests for slash command prefix matching in HermesCLI.process_command."""
from unittest.mock import MagicMock, patch
from cli import HermesCLI
def _make_cli():
cli_obj = HermesCLI.__new__(HermesCLI)
cli_obj.config = {}
cli_obj.console = MagicMock()
cli_obj.agent = None
cli_obj.conversation_history = []
cli_obj.session_id = None
cli_obj._pending_input = MagicMock()
return cli_obj
class TestSlashCommandPrefixMatching:
def test_unique_prefix_dispatches_command(self):
"""/con should dispatch to /config when it uniquely matches."""
cli_obj = _make_cli()
with patch.object(cli_obj, 'show_config') as mock_config:
cli_obj.process_command("/con")
mock_config.assert_called_once()
def test_unique_prefix_with_args_does_not_recurse(self):
"""/con set key value should expand to /config set key value without infinite recursion."""
cli_obj = _make_cli()
dispatched = []
original = cli_obj.process_command.__func__
def counting_process_command(self_inner, cmd):
dispatched.append(cmd)
if len(dispatched) > 5:
raise RecursionError("process_command called too many times")
return original(self_inner, cmd)
with patch.object(type(cli_obj), 'process_command', counting_process_command):
try:
cli_obj.process_command("/con set key value")
except RecursionError:
assert False, "process_command recursed infinitely"
# Should have been called at most twice: once for /con set..., once for /config set...
assert len(dispatched) <= 2
def test_exact_command_with_args_does_not_recurse(self):
"""/config set key value hits exact branch and does not loop back to prefix."""
cli_obj = _make_cli()
call_count = [0]
original_pc = HermesCLI.process_command
def guarded(self_inner, cmd):
call_count[0] += 1
if call_count[0] > 10:
raise RecursionError("Infinite recursion detected")
return original_pc(self_inner, cmd)
with patch.object(HermesCLI, 'process_command', guarded):
try:
cli_obj.process_command("/config set key value")
except RecursionError:
assert False, "Recursed infinitely on /config set key value"
assert call_count[0] <= 3
def test_ambiguous_prefix_shows_suggestions(self):
"""/re matches multiple commands — should show ambiguous message."""
cli_obj = _make_cli()
cli_obj.process_command("/re")
printed = " ".join(str(c) for c in cli_obj.console.print.call_args_list)
assert "Ambiguous" in printed or "Did you mean" in printed
def test_unknown_command_shows_error(self):
"""/xyz should show unknown command error."""
cli_obj = _make_cli()
cli_obj.process_command("/xyz")
printed = " ".join(str(c) for c in cli_obj.console.print.call_args_list)
assert "Unknown command" in printed
def test_exact_command_still_works(self):
"""/help should still work as exact match."""
cli_obj = _make_cli()
with patch.object(cli_obj, 'show_help') as mock_help:
cli_obj.process_command("/help")
mock_help.assert_called_once()
def test_skill_command_prefix_matches(self):
"""A prefix that uniquely matches a skill command should dispatch it."""
cli_obj = _make_cli()
fake_skill = {"/test-skill-xyz": {"name": "Test Skill", "description": "test"}}
printed = []
cli_obj.console.print = lambda *a, **kw: printed.append(str(a))
import cli as cli_mod
with patch.object(cli_mod, '_skill_commands', fake_skill):
cli_obj.process_command("/test-skill-xy")
# Should NOT show "Unknown command" — should have dispatched or attempted skill
unknown = any("Unknown command" in p for p in printed)
assert not unknown, f"Expected skill prefix to match, got: {printed}"
def test_ambiguous_between_builtin_and_skill(self):
"""Ambiguous prefix spanning builtin + skill commands shows suggestions."""
cli_obj = _make_cli()
# /help-extra is a fake skill that shares /hel prefix with /help
fake_skill = {"/help-extra": {"name": "Help Extra", "description": "test"}}
import cli as cli_mod
with patch.object(cli_mod, '_skill_commands', fake_skill), patch.object(cli_obj, 'show_help') as mock_help:
cli_obj.process_command("/help")
# /help is an exact match so should work normally, not show ambiguous
mock_help.assert_called_once()
printed = " ".join(str(c) for c in cli_obj.console.print.call_args_list)
assert "Ambiguous" not in printed

View file

@ -186,6 +186,11 @@ def test_codex_provider_replaces_incompatible_default_model(monkeypatch):
monkeypatch.delenv("LLM_MODEL", raising=False)
monkeypatch.delenv("OPENAI_MODEL", raising=False)
# Ensure local user config does not leak a model into the test
monkeypatch.setitem(cli.CLI_CONFIG, "model", {
"default": "",
"base_url": "https://openrouter.ai/api/v1",
})
def _runtime_resolve(**kwargs):
return {
@ -240,6 +245,11 @@ def test_codex_provider_uses_config_model(monkeypatch):
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _runtime_resolve)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
# Prevent live API call from overriding the config model
monkeypatch.setattr(
"hermes_cli.codex_models.get_codex_model_ids",
lambda access_token=None: ["gpt-5.2-codex"],
)
shell = cli.HermesCLI(compact=True, max_turns=1)

View file

@ -150,7 +150,7 @@ def test_custom_endpoint_auto_provider_prefers_openai_key(monkeypatch):
monkeypatch.setenv("OPENAI_BASE_URL", "https://my-vllm-server.example.com/v1")
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "sk-vllm-key")
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-should-not-leak")
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-...leak")
resolved = rp.resolve_runtime_provider(requested="auto")
@ -158,6 +158,107 @@ def test_custom_endpoint_auto_provider_prefers_openai_key(monkeypatch):
assert resolved["api_key"] == "sk-vllm-key"
def test_named_custom_provider_uses_saved_credentials(monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.setattr(
rp,
"load_config",
lambda: {
"custom_providers": [
{
"name": "Local",
"base_url": "http://1.2.3.4:1234/v1",
"api_key": "local-provider-key",
}
]
},
)
monkeypatch.setattr(
rp,
"resolve_provider",
lambda *a, **k: (_ for _ in ()).throw(
AssertionError(
"resolve_provider should not be called for named custom providers"
)
),
)
resolved = rp.resolve_runtime_provider(requested="local")
assert resolved["provider"] == "openrouter"
assert resolved["api_mode"] == "chat_completions"
assert resolved["base_url"] == "http://1.2.3.4:1234/v1"
assert resolved["api_key"] == "local-provider-key"
assert resolved["requested_provider"] == "local"
assert resolved["source"] == "custom_provider:Local"
def test_named_custom_provider_falls_back_to_openai_api_key(monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "env-openai-key")
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.setattr(
rp,
"load_config",
lambda: {
"custom_providers": [
{
"name": "Local LLM",
"base_url": "http://localhost:1234/v1",
}
]
},
)
monkeypatch.setattr(
rp,
"resolve_provider",
lambda *a, **k: (_ for _ in ()).throw(
AssertionError(
"resolve_provider should not be called for named custom providers"
)
),
)
resolved = rp.resolve_runtime_provider(requested="custom:local-llm")
assert resolved["base_url"] == "http://localhost:1234/v1"
assert resolved["api_key"] == "env-openai-key"
assert resolved["requested_provider"] == "custom:local-llm"
def test_named_custom_provider_does_not_shadow_builtin_provider(monkeypatch):
monkeypatch.setattr(
rp,
"load_config",
lambda: {
"custom_providers": [
{
"name": "nous",
"base_url": "http://localhost:1234/v1",
"api_key": "shadow-key",
}
]
},
)
monkeypatch.setattr(
rp,
"resolve_nous_runtime_credentials",
lambda **kwargs: {
"base_url": "https://inference-api.nousresearch.com/v1",
"api_key": "nous-runtime-key",
"source": "portal",
"expires_at": None,
},
)
resolved = rp.resolve_runtime_provider(requested="nous")
assert resolved["provider"] == "nous"
assert resolved["base_url"] == "https://inference-api.nousresearch.com/v1"
assert resolved["api_key"] == "nous-runtime-key"
assert resolved["requested_provider"] == "nous"
def test_explicit_openrouter_skips_openai_base_url(monkeypatch):
"""When the user explicitly requests openrouter, OPENAI_BASE_URL
(which may point to a custom endpoint) must not override the

View file

@ -328,6 +328,34 @@ class TestCronTimezone:
"Overdue job was skipped — _ensure_aware likely shifted absolute time"
)
def test_get_due_jobs_naive_cross_timezone(self, tmp_path, monkeypatch):
"""Naive past timestamps must be detected as due even when Hermes tz
is behind system local tz the scenario that triggered #806."""
import cron.jobs as jobs_module
monkeypatch.setattr(jobs_module, "CRON_DIR", tmp_path / "cron")
monkeypatch.setattr(jobs_module, "JOBS_FILE", tmp_path / "cron" / "jobs.json")
monkeypatch.setattr(jobs_module, "OUTPUT_DIR", tmp_path / "cron" / "output")
# Use a Hermes timezone far behind UTC so that the numeric wall time
# of the naive timestamp exceeds _hermes_now's wall time — this would
# have caused a false "not due" with the old replace(tzinfo=...) approach.
os.environ["HERMES_TIMEZONE"] = "Pacific/Midway" # UTC-11
hermes_time.reset_cache()
from cron.jobs import create_job, load_jobs, save_jobs, get_due_jobs
create_job(prompt="Cross-tz job", schedule="every 1h")
jobs = load_jobs()
# Force a naive past timestamp (system-local wall time, 10 min ago)
naive_past = (datetime.now() - timedelta(minutes=10)).isoformat()
jobs[0]["next_run_at"] = naive_past
save_jobs(jobs)
due = get_due_jobs()
assert len(due) == 1, (
"Naive past timestamp should be due regardless of Hermes timezone"
)
def test_create_job_stores_tz_aware_timestamps(self, tmp_path, monkeypatch):
"""New jobs store timezone-aware created_at and next_run_at."""
import cron.jobs as jobs_module

View file

@ -1,7 +1,10 @@
"""Tests for trajectory_compressor.py — config, metrics, and compression logic."""
import json
from unittest.mock import patch, MagicMock
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from trajectory_compressor import (
CompressionConfig,
@ -384,3 +387,32 @@ class TestTokenCounting:
tc.tokenizer.encode = MagicMock(side_effect=Exception("fail"))
# Should fallback to len(text) // 4
assert tc.count_tokens("12345678") == 2
class TestGenerateSummary:
def test_generate_summary_handles_none_content(self):
tc = _make_compressor()
tc.client = MagicMock()
tc.client.chat.completions.create.return_value = SimpleNamespace(
choices=[SimpleNamespace(message=SimpleNamespace(content=None))]
)
metrics = TrajectoryMetrics()
summary = tc._generate_summary("Turn content", metrics)
assert summary == "[CONTEXT SUMMARY]:"
@pytest.mark.asyncio
async def test_generate_summary_async_handles_none_content(self):
tc = _make_compressor()
tc.async_client = MagicMock()
tc.async_client.chat.completions.create = AsyncMock(
return_value=SimpleNamespace(
choices=[SimpleNamespace(message=SimpleNamespace(content=None))]
)
)
metrics = TrajectoryMetrics()
summary = await tc._generate_summary_async("Turn content", metrics)
assert summary == "[CONTEXT SUMMARY]:"

View file

@ -0,0 +1,96 @@
"""Regression tests for browser session cleanup and screenshot recovery."""
from unittest.mock import patch
class TestScreenshotPathRecovery:
def test_extracts_standard_absolute_path(self):
from tools.browser_tool import _extract_screenshot_path_from_text
assert (
_extract_screenshot_path_from_text("Screenshot saved to /tmp/foo.png")
== "/tmp/foo.png"
)
def test_extracts_quoted_absolute_path(self):
from tools.browser_tool import _extract_screenshot_path_from_text
assert (
_extract_screenshot_path_from_text(
"Screenshot saved to '/Users/david/.hermes/browser_screenshots/shot.png'"
)
== "/Users/david/.hermes/browser_screenshots/shot.png"
)
class TestBrowserCleanup:
def setup_method(self):
from tools import browser_tool
self.browser_tool = browser_tool
self.orig_active_sessions = browser_tool._active_sessions.copy()
self.orig_session_last_activity = browser_tool._session_last_activity.copy()
self.orig_recording_sessions = browser_tool._recording_sessions.copy()
self.orig_cleanup_done = browser_tool._cleanup_done
def teardown_method(self):
self.browser_tool._active_sessions.clear()
self.browser_tool._active_sessions.update(self.orig_active_sessions)
self.browser_tool._session_last_activity.clear()
self.browser_tool._session_last_activity.update(self.orig_session_last_activity)
self.browser_tool._recording_sessions.clear()
self.browser_tool._recording_sessions.update(self.orig_recording_sessions)
self.browser_tool._cleanup_done = self.orig_cleanup_done
def test_cleanup_browser_clears_tracking_state(self):
browser_tool = self.browser_tool
browser_tool._active_sessions["task-1"] = {
"session_name": "sess-1",
"bb_session_id": None,
}
browser_tool._session_last_activity["task-1"] = 123.0
with (
patch("tools.browser_tool._maybe_stop_recording") as mock_stop,
patch(
"tools.browser_tool._run_browser_command",
return_value={"success": True},
) as mock_run,
patch("tools.browser_tool.os.path.exists", return_value=False),
):
browser_tool.cleanup_browser("task-1")
assert "task-1" not in browser_tool._active_sessions
assert "task-1" not in browser_tool._session_last_activity
mock_stop.assert_called_once_with("task-1")
mock_run.assert_called_once_with("task-1", "close", [], timeout=10)
def test_browser_close_delegates_to_cleanup_browser(self):
import json
browser_tool = self.browser_tool
browser_tool._active_sessions["task-2"] = {"session_name": "sess-2"}
with patch("tools.browser_tool.cleanup_browser") as mock_cleanup:
result = json.loads(browser_tool.browser_close("task-2"))
assert result == {"success": True, "closed": True}
mock_cleanup.assert_called_once_with("task-2")
def test_emergency_cleanup_clears_all_tracking_state(self):
browser_tool = self.browser_tool
browser_tool._cleanup_done = False
browser_tool._active_sessions["task-1"] = {"session_name": "sess-1"}
browser_tool._active_sessions["task-2"] = {"session_name": "sess-2"}
browser_tool._session_last_activity["task-1"] = 1.0
browser_tool._session_last_activity["task-2"] = 2.0
browser_tool._recording_sessions.update({"task-1", "task-2"})
with patch("tools.browser_tool.cleanup_all_browsers") as mock_cleanup_all:
browser_tool._emergency_cleanup_all_sessions()
mock_cleanup_all.assert_called_once_with()
assert browser_tool._active_sessions == {}
assert browser_tool._session_last_activity == {}
assert browser_tool._recording_sessions == set()
assert browser_tool._cleanup_done is True

View file

@ -1,11 +1,8 @@
"""Tests for the --force flag dangerous verdict bypass fix in skills_guard.py.
"""Regression tests for skills guard policy precedence.
Regression test: the old code had `if result.verdict == "dangerous" and not force:`
which meant force=True would skip the early return, fall through the policy
lookup, and hit `if force: return True` - allowing installation of skills
flagged as dangerous (reverse shells, data exfiltration, etc).
The docstring explicitly states: "never overrides dangerous".
Official/builtin skills should follow the INSTALL_POLICY table even when their
scan verdict is dangerous, and --force should override blocked verdicts for
non-builtin sources.
"""
@ -44,10 +41,6 @@ def _new_should_allow(verdict, trust_level, force):
}
VERDICT_INDEX = {"safe": 0, "caution": 1, "dangerous": 2}
# Fixed: no `and not force` - dangerous is always blocked
if verdict == "dangerous":
return False
policy = INSTALL_POLICY.get(trust_level, INSTALL_POLICY["community"])
vi = VERDICT_INDEX.get(verdict, 2)
decision = policy[vi]
@ -61,35 +54,28 @@ def _new_should_allow(verdict, trust_level, force):
return False
class TestForceNeverOverridesDangerous:
"""The core bug: --force bypassed the dangerous verdict block."""
class TestPolicyPrecedenceForDangerousVerdicts:
def test_builtin_dangerous_is_allowed_by_policy(self):
assert _new_should_allow("dangerous", "builtin", force=False) is True
def test_old_code_allows_dangerous_with_force(self):
"""Old code: force=True lets dangerous skills through."""
assert _old_should_allow("dangerous", "community", force=True) is True
def test_trusted_dangerous_is_blocked_without_force(self):
assert _new_should_allow("dangerous", "trusted", force=False) is False
def test_new_code_blocks_dangerous_with_force(self):
"""Fixed code: force=True still blocks dangerous skills."""
assert _new_should_allow("dangerous", "community", force=True) is False
def test_force_overrides_dangerous_for_community(self):
assert _new_should_allow("dangerous", "community", force=True) is True
def test_new_code_blocks_dangerous_trusted_with_force(self):
"""Fixed code: even trusted + force cannot install dangerous."""
assert _new_should_allow("dangerous", "trusted", force=True) is False
def test_force_overrides_dangerous_for_trusted(self):
assert _new_should_allow("dangerous", "trusted", force=True) is True
def test_force_still_overrides_caution(self):
"""force=True should still work for caution verdicts."""
assert _new_should_allow("caution", "community", force=True) is True
def test_caution_community_blocked_without_force(self):
"""Caution + community is blocked without force (unchanged)."""
assert _new_should_allow("caution", "community", force=False) is False
def test_safe_always_allowed(self):
"""Safe verdict is always allowed regardless of force."""
assert _new_should_allow("safe", "community", force=False) is True
assert _new_should_allow("safe", "community", force=True) is True
def test_dangerous_blocked_without_force(self):
"""Dangerous is blocked without force (both old and new agree)."""
assert _old_should_allow("dangerous", "community", force=False) is False
assert _new_should_allow("dangerous", "community", force=False) is False
def test_old_code_happened_to_allow_forced_dangerous_community(self):
assert _old_should_allow("dangerous", "community", force=True) is True

View file

@ -9,9 +9,24 @@ from tools.memory_tool import (
memory_tool,
_scan_memory_content,
ENTRY_DELIMITER,
MEMORY_SCHEMA,
)
# =========================================================================
# Tool schema guidance
# =========================================================================
class TestMemorySchema:
def test_discourages_diary_style_task_logs(self):
description = MEMORY_SCHEMA["description"]
assert "Do NOT save task progress" in description
assert "session_search" in description
assert "like a diary" not in description
assert "temporary task state" in description
assert ">80%" not in description
# =========================================================================
# Security scanning
# =========================================================================

View file

@ -2,6 +2,7 @@
import asyncio
import json
import os
import sys
from pathlib import Path
from types import SimpleNamespace
@ -29,6 +30,118 @@ def _install_telegram_mock(monkeypatch, bot):
class TestSendMessageTool:
def test_cron_duplicate_target_is_skipped_and_explained(self):
home = SimpleNamespace(chat_id="-1001")
config, _telegram_cfg = _make_config()
config.get_home_channel = lambda _platform: home
with patch.dict(
os.environ,
{
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
},
clear=False,
), \
patch("gateway.config.load_gateway_config", return_value=config), \
patch("tools.interrupt.is_interrupted", return_value=False), \
patch("model_tools._run_async", side_effect=_run_async_immediately), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
result = json.loads(
send_message_tool(
{
"action": "send",
"target": "telegram",
"message": "hello",
}
)
)
assert result["success"] is True
assert result["skipped"] is True
assert result["reason"] == "cron_auto_delivery_duplicate_target"
assert "final response" in result["note"]
send_mock.assert_not_awaited()
mirror_mock.assert_not_called()
def test_cron_different_target_still_sends(self):
config, telegram_cfg = _make_config()
with patch.dict(
os.environ,
{
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
},
clear=False,
), \
patch("gateway.config.load_gateway_config", return_value=config), \
patch("tools.interrupt.is_interrupted", return_value=False), \
patch("model_tools._run_async", side_effect=_run_async_immediately), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
result = json.loads(
send_message_tool(
{
"action": "send",
"target": "telegram:-1002",
"message": "hello",
}
)
)
assert result["success"] is True
assert result.get("skipped") is not True
send_mock.assert_awaited_once_with(
Platform.TELEGRAM,
telegram_cfg,
"-1002",
"hello",
thread_id=None,
media_files=[],
)
mirror_mock.assert_called_once_with("telegram", "-1002", "hello", source_label="cli", thread_id=None)
def test_cron_same_chat_different_thread_still_sends(self):
config, telegram_cfg = _make_config()
with patch.dict(
os.environ,
{
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
"HERMES_CRON_AUTO_DELIVER_THREAD_ID": "17585",
},
clear=False,
), \
patch("gateway.config.load_gateway_config", return_value=config), \
patch("tools.interrupt.is_interrupted", return_value=False), \
patch("model_tools._run_async", side_effect=_run_async_immediately), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
result = json.loads(
send_message_tool(
{
"action": "send",
"target": "telegram:-1001:99999",
"message": "hello",
}
)
)
assert result["success"] is True
assert result.get("skipped") is not True
send_mock.assert_awaited_once_with(
Platform.TELEGRAM,
telegram_cfg,
"-1001",
"hello",
thread_id="99999",
media_files=[],
)
mirror_mock.assert_called_once_with("telegram", "-1001", "hello", source_label="cli", thread_id="99999")
def test_sends_to_explicit_telegram_topic_target(self):
config, telegram_cfg = _make_config()

View file

@ -9,9 +9,21 @@ from tools.session_search_tool import (
_format_conversation,
_truncate_around_matches,
MAX_SESSION_CHARS,
SESSION_SEARCH_SCHEMA,
)
# =========================================================================
# Tool schema guidance
# =========================================================================
class TestSessionSearchSchema:
def test_keeps_cross_session_recall_guidance_without_current_session_nudge(self):
description = SESSION_SEARCH_SCHEMA["description"]
assert "past conversations" in description
assert "recent turns of the current session" not in description
# =========================================================================
# _format_timestamp
# =========================================================================

View file

@ -46,9 +46,9 @@ from tools.skills_guard import (
class TestResolveTrustLevel:
def test_builtin_not_exposed(self):
# builtin is only used internally, not resolved from source string
assert _resolve_trust_level("openai/skills") == "trusted"
def test_official_sources_resolve_to_builtin(self):
assert _resolve_trust_level("official") == "builtin"
assert _resolve_trust_level("official/email/agentmail") == "builtin"
def test_trusted_repos(self):
assert _resolve_trust_level("openai/skills") == "trusted"
@ -116,11 +116,17 @@ class TestShouldAllowInstall:
allowed, _ = should_allow_install(self._result("trusted", "caution", f))
assert allowed is True
def test_dangerous_blocked_even_trusted(self):
def test_trusted_dangerous_blocked_without_force(self):
f = [Finding("x", "critical", "c", "f", 1, "m", "d")]
allowed, _ = should_allow_install(self._result("trusted", "dangerous", f))
assert allowed is False
def test_builtin_dangerous_allowed_without_force(self):
f = [Finding("x", "critical", "c", "f", 1, "m", "d")]
allowed, reason = should_allow_install(self._result("builtin", "dangerous", f))
assert allowed is True
assert "builtin source" in reason
def test_force_overrides_caution(self):
f = [Finding("x", "high", "c", "f", 1, "m", "d")]
allowed, reason = should_allow_install(self._result("community", "caution", f), force=True)
@ -132,22 +138,21 @@ class TestShouldAllowInstall:
allowed, _ = should_allow_install(self._result("community", "dangerous", f), force=False)
assert allowed is False
def test_force_never_overrides_dangerous(self):
"""--force must not bypass dangerous verdict (regression test)."""
def test_force_overrides_dangerous_for_community(self):
f = [Finding("x", "critical", "c", "f", 1, "m", "d")]
allowed, reason = should_allow_install(
self._result("community", "dangerous", f), force=True
)
assert allowed is False
assert "DANGEROUS" in reason
assert allowed is True
assert "Force-installed" in reason
def test_force_never_overrides_dangerous_trusted(self):
"""--force must not bypass dangerous even for trusted sources."""
def test_force_overrides_dangerous_for_trusted(self):
f = [Finding("x", "critical", "c", "f", 1, "m", "d")]
allowed, _ = should_allow_install(
allowed, reason = should_allow_install(
self._result("trusted", "dangerous", f), force=True
)
assert allowed is False
assert allowed is True
assert "Force-installed" in reason
# ---------------------------------------------------------------------------