merge: resolve conflicts with origin/main

- gateway/run.py: Take main's _resolve_gateway_model() helper
- hermes_cli/setup.py: Re-apply nous-api removal after merge brought
  it back. Fix provider_idx offset (Custom is now index 3, not 4).
- tests/hermes_cli/test_setup.py: Fix custom setup test index (3→4)
This commit is contained in:
teknium1 2026-03-12 00:29:04 -07:00
commit 2192b17670
19 changed files with 1708 additions and 368 deletions

View file

@ -8,6 +8,8 @@ from agent.prompt_builder import (
_scan_context_content,
_truncate_content,
_read_skill_description,
_read_skill_conditions,
_skill_should_show,
build_skills_system_prompt,
build_context_files_prompt,
CONTEXT_FILE_MAX_CHARS,
@ -277,3 +279,177 @@ class TestPromptBuilderConstants:
assert "telegram" in PLATFORM_HINTS
assert "discord" in PLATFORM_HINTS
assert "cli" in PLATFORM_HINTS
# =========================================================================
# Conditional skill activation
# =========================================================================
class TestReadSkillConditions:
def test_no_conditions_returns_empty_lists(self, tmp_path):
skill_file = tmp_path / "SKILL.md"
skill_file.write_text("---\nname: test\ndescription: A skill\n---\n")
conditions = _read_skill_conditions(skill_file)
assert conditions["fallback_for_toolsets"] == []
assert conditions["requires_toolsets"] == []
assert conditions["fallback_for_tools"] == []
assert conditions["requires_tools"] == []
def test_reads_fallback_for_toolsets(self, tmp_path):
skill_file = tmp_path / "SKILL.md"
skill_file.write_text(
"---\nname: ddg\ndescription: DuckDuckGo\nmetadata:\n hermes:\n fallback_for_toolsets: [web]\n---\n"
)
conditions = _read_skill_conditions(skill_file)
assert conditions["fallback_for_toolsets"] == ["web"]
def test_reads_requires_toolsets(self, tmp_path):
skill_file = tmp_path / "SKILL.md"
skill_file.write_text(
"---\nname: openhue\ndescription: Hue lights\nmetadata:\n hermes:\n requires_toolsets: [terminal]\n---\n"
)
conditions = _read_skill_conditions(skill_file)
assert conditions["requires_toolsets"] == ["terminal"]
def test_reads_multiple_conditions(self, tmp_path):
skill_file = tmp_path / "SKILL.md"
skill_file.write_text(
"---\nname: test\ndescription: Test\nmetadata:\n hermes:\n fallback_for_toolsets: [browser]\n requires_tools: [terminal]\n---\n"
)
conditions = _read_skill_conditions(skill_file)
assert conditions["fallback_for_toolsets"] == ["browser"]
assert conditions["requires_tools"] == ["terminal"]
def test_missing_file_returns_empty(self, tmp_path):
conditions = _read_skill_conditions(tmp_path / "missing.md")
assert conditions == {}
class TestSkillShouldShow:
def test_no_filter_info_always_shows(self):
assert _skill_should_show({}, None, None) is True
def test_empty_conditions_always_shows(self):
assert _skill_should_show(
{"fallback_for_toolsets": [], "requires_toolsets": [],
"fallback_for_tools": [], "requires_tools": []},
{"web_search"}, {"web"}
) is True
def test_fallback_hidden_when_toolset_available(self):
conditions = {"fallback_for_toolsets": ["web"], "requires_toolsets": [],
"fallback_for_tools": [], "requires_tools": []}
assert _skill_should_show(conditions, set(), {"web"}) is False
def test_fallback_shown_when_toolset_unavailable(self):
conditions = {"fallback_for_toolsets": ["web"], "requires_toolsets": [],
"fallback_for_tools": [], "requires_tools": []}
assert _skill_should_show(conditions, set(), set()) is True
def test_requires_shown_when_toolset_available(self):
conditions = {"fallback_for_toolsets": [], "requires_toolsets": ["terminal"],
"fallback_for_tools": [], "requires_tools": []}
assert _skill_should_show(conditions, set(), {"terminal"}) is True
def test_requires_hidden_when_toolset_missing(self):
conditions = {"fallback_for_toolsets": [], "requires_toolsets": ["terminal"],
"fallback_for_tools": [], "requires_tools": []}
assert _skill_should_show(conditions, set(), set()) is False
def test_fallback_for_tools_hidden_when_tool_available(self):
conditions = {"fallback_for_toolsets": [], "requires_toolsets": [],
"fallback_for_tools": ["web_search"], "requires_tools": []}
assert _skill_should_show(conditions, {"web_search"}, set()) is False
def test_fallback_for_tools_shown_when_tool_missing(self):
conditions = {"fallback_for_toolsets": [], "requires_toolsets": [],
"fallback_for_tools": ["web_search"], "requires_tools": []}
assert _skill_should_show(conditions, set(), set()) is True
def test_requires_tools_hidden_when_tool_missing(self):
conditions = {"fallback_for_toolsets": [], "requires_toolsets": [],
"fallback_for_tools": [], "requires_tools": ["terminal"]}
assert _skill_should_show(conditions, set(), set()) is False
def test_requires_tools_shown_when_tool_available(self):
conditions = {"fallback_for_toolsets": [], "requires_toolsets": [],
"fallback_for_tools": [], "requires_tools": ["terminal"]}
assert _skill_should_show(conditions, {"terminal"}, set()) is True
class TestBuildSkillsSystemPromptConditional:
def test_fallback_skill_hidden_when_primary_available(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skill_dir = tmp_path / "skills" / "search" / "duckduckgo"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: duckduckgo\ndescription: Free web search\nmetadata:\n hermes:\n fallback_for_toolsets: [web]\n---\n"
)
result = build_skills_system_prompt(
available_tools=set(),
available_toolsets={"web"},
)
assert "duckduckgo" not in result
def test_fallback_skill_shown_when_primary_unavailable(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skill_dir = tmp_path / "skills" / "search" / "duckduckgo"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: duckduckgo\ndescription: Free web search\nmetadata:\n hermes:\n fallback_for_toolsets: [web]\n---\n"
)
result = build_skills_system_prompt(
available_tools=set(),
available_toolsets=set(),
)
assert "duckduckgo" in result
def test_requires_skill_hidden_when_toolset_missing(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skill_dir = tmp_path / "skills" / "iot" / "openhue"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: openhue\ndescription: Hue lights\nmetadata:\n hermes:\n requires_toolsets: [terminal]\n---\n"
)
result = build_skills_system_prompt(
available_tools=set(),
available_toolsets=set(),
)
assert "openhue" not in result
def test_requires_skill_shown_when_toolset_available(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skill_dir = tmp_path / "skills" / "iot" / "openhue"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: openhue\ndescription: Hue lights\nmetadata:\n hermes:\n requires_toolsets: [terminal]\n---\n"
)
result = build_skills_system_prompt(
available_tools=set(),
available_toolsets={"terminal"},
)
assert "openhue" in result
def test_unconditional_skill_always_shown(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skill_dir = tmp_path / "skills" / "general" / "notes"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: notes\ndescription: Take notes\n---\n"
)
result = build_skills_system_prompt(
available_tools=set(),
available_toolsets=set(),
)
assert "notes" in result
def test_no_args_shows_all_skills(self, monkeypatch, tmp_path):
"""Backward compat: calling with no args shows everything."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skill_dir = tmp_path / "skills" / "search" / "duckduckgo"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: duckduckgo\ndescription: Free web search\nmetadata:\n hermes:\n fallback_for_toolsets: [web]\n---\n"
)
result = build_skills_system_prompt()
assert "duckduckgo" in result

View file

@ -0,0 +1,249 @@
"""Tests for Discord free-response defaults and mention gating."""
from datetime import datetime, timezone
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import sys
import pytest
from gateway.config import PlatformConfig
def _ensure_discord_mock():
"""Install a mock discord module when discord.py isn't available."""
if "discord" in sys.modules and hasattr(sys.modules["discord"], "__file__"):
return
discord_mod = MagicMock()
discord_mod.Intents.default.return_value = MagicMock()
discord_mod.Client = MagicMock
discord_mod.File = MagicMock
discord_mod.DMChannel = type("DMChannel", (), {})
discord_mod.Thread = type("Thread", (), {})
discord_mod.ForumChannel = type("ForumChannel", (), {})
discord_mod.ui = SimpleNamespace(View=object, button=lambda *a, **k: (lambda fn: fn), Button=object)
discord_mod.ButtonStyle = SimpleNamespace(success=1, primary=2, danger=3, green=1, blurple=2, red=3)
discord_mod.Color = SimpleNamespace(orange=lambda: 1, green=lambda: 2, blue=lambda: 3, red=lambda: 4)
discord_mod.Interaction = object
discord_mod.Embed = MagicMock
ext_mod = MagicMock()
commands_mod = MagicMock()
commands_mod.Bot = MagicMock
ext_mod.commands = commands_mod
sys.modules.setdefault("discord", discord_mod)
sys.modules.setdefault("discord.ext", ext_mod)
sys.modules.setdefault("discord.ext.commands", commands_mod)
_ensure_discord_mock()
import gateway.platforms.discord as discord_platform # noqa: E402
from gateway.platforms.discord import DiscordAdapter # noqa: E402
class FakeDMChannel:
def __init__(self, channel_id: int = 1, name: str = "dm"):
self.id = channel_id
self.name = name
class FakeTextChannel:
def __init__(self, channel_id: int = 1, name: str = "general", guild_name: str = "Hermes Server"):
self.id = channel_id
self.name = name
self.guild = SimpleNamespace(name=guild_name)
self.topic = None
class FakeForumChannel:
def __init__(self, channel_id: int = 1, name: str = "support-forum", guild_name: str = "Hermes Server"):
self.id = channel_id
self.name = name
self.guild = SimpleNamespace(name=guild_name)
self.type = 15
self.topic = None
class FakeThread:
def __init__(self, channel_id: int = 1, name: str = "thread", parent=None, guild_name: str = "Hermes Server"):
self.id = channel_id
self.name = name
self.parent = parent
self.parent_id = getattr(parent, "id", None)
self.guild = getattr(parent, "guild", None) or SimpleNamespace(name=guild_name)
self.topic = None
@pytest.fixture
def adapter(monkeypatch):
monkeypatch.setattr(discord_platform.discord, "DMChannel", FakeDMChannel, raising=False)
monkeypatch.setattr(discord_platform.discord, "Thread", FakeThread, raising=False)
monkeypatch.setattr(discord_platform.discord, "ForumChannel", FakeForumChannel, raising=False)
config = PlatformConfig(enabled=True, token="fake-token")
adapter = DiscordAdapter(config)
adapter._client = SimpleNamespace(user=SimpleNamespace(id=999))
adapter.handle_message = AsyncMock()
return adapter
def make_message(*, channel, content: str, mentions=None):
author = SimpleNamespace(id=42, display_name="Jezza", name="Jezza")
return SimpleNamespace(
id=123,
content=content,
mentions=list(mentions or []),
attachments=[],
reference=None,
created_at=datetime.now(timezone.utc),
channel=channel,
author=author,
)
@pytest.mark.asyncio
async def test_discord_defaults_to_require_mention(adapter, monkeypatch):
"""Default behavior: require @mention in server channels."""
monkeypatch.delenv("DISCORD_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
message = make_message(channel=FakeTextChannel(channel_id=123), content="hello from channel")
await adapter._handle_message(message)
# Should be ignored — no mention, require_mention defaults to true
adapter.handle_message.assert_not_awaited()
@pytest.mark.asyncio
async def test_discord_free_response_in_server_channels(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
message = make_message(channel=FakeTextChannel(channel_id=123), content="hello from channel")
await adapter._handle_message(message)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.text == "hello from channel"
assert event.source.chat_id == "123"
assert event.source.chat_type == "group"
@pytest.mark.asyncio
async def test_discord_free_response_in_threads(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
thread = FakeThread(channel_id=456, name="Ghost reader skill")
message = make_message(channel=thread, content="hello from thread")
await adapter._handle_message(message)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.text == "hello from thread"
assert event.source.chat_id == "456"
assert event.source.thread_id == "456"
assert event.source.chat_type == "thread"
@pytest.mark.asyncio
async def test_discord_forum_threads_are_handled_as_threads(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "false")
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
forum = FakeForumChannel(channel_id=222, name="support-forum")
thread = FakeThread(channel_id=456, name="Can Hermes reply here?", parent=forum)
message = make_message(channel=thread, content="hello from forum post")
await adapter._handle_message(message)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.text == "hello from forum post"
assert event.source.chat_id == "456"
assert event.source.thread_id == "456"
assert event.source.chat_type == "thread"
assert event.source.chat_name == "Hermes Server / support-forum / Can Hermes reply here?"
@pytest.mark.asyncio
async def test_discord_can_still_require_mentions_when_enabled(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
message = make_message(channel=FakeTextChannel(channel_id=789), content="ignored without mention")
await adapter._handle_message(message)
adapter.handle_message.assert_not_awaited()
@pytest.mark.asyncio
async def test_discord_free_response_channel_overrides_mention_requirement(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
monkeypatch.setenv("DISCORD_FREE_RESPONSE_CHANNELS", "789,999")
message = make_message(channel=FakeTextChannel(channel_id=789), content="allowed without mention")
await adapter._handle_message(message)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.text == "allowed without mention"
@pytest.mark.asyncio
async def test_discord_forum_parent_in_free_response_list_allows_forum_thread(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
monkeypatch.setenv("DISCORD_FREE_RESPONSE_CHANNELS", "222")
forum = FakeForumChannel(channel_id=222, name="support-forum")
thread = FakeThread(channel_id=333, name="Forum topic", parent=forum)
message = make_message(channel=thread, content="allowed from forum thread")
await adapter._handle_message(message)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.text == "allowed from forum thread"
assert event.source.chat_id == "333"
@pytest.mark.asyncio
async def test_discord_accepts_and_strips_bot_mentions_when_required(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
bot_user = adapter._client.user
message = make_message(
channel=FakeTextChannel(channel_id=321),
content=f"<@{bot_user.id}> hello with mention",
mentions=[bot_user],
)
await adapter._handle_message(message)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.text == "hello with mention"
@pytest.mark.asyncio
async def test_discord_dms_ignore_mention_requirement(adapter, monkeypatch):
monkeypatch.setenv("DISCORD_REQUIRE_MENTION", "true")
monkeypatch.delenv("DISCORD_FREE_RESPONSE_CHANNELS", raising=False)
message = make_message(channel=FakeDMChannel(channel_id=654), content="dm without mention")
await adapter._handle_message(message)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
assert event.text == "dm without mention"
assert event.source.chat_type == "dm"

View file

@ -0,0 +1,130 @@
import json
from hermes_cli.auth import _update_config_for_provider, get_active_provider
from hermes_cli.config import load_config, save_config
from hermes_cli.setup import setup_model_provider
def _clear_provider_env(monkeypatch):
for key in (
"NOUS_API_KEY",
"OPENROUTER_API_KEY",
"OPENAI_BASE_URL",
"OPENAI_API_KEY",
"LLM_MODEL",
):
monkeypatch.delenv(key, raising=False)
def test_nous_api_setup_preserves_model_provider_metadata(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
config = load_config()
monkeypatch.setattr("hermes_cli.setup.prompt_choice", lambda *args, **kwargs: 0)
prompt_values = iter(
[
"nous-api-key",
"",
"meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8",
]
)
monkeypatch.setattr(
"hermes_cli.setup.prompt",
lambda *args, **kwargs: next(prompt_values),
)
setup_model_provider(config)
save_config(config)
reloaded = load_config()
assert isinstance(reloaded["model"], dict)
assert reloaded["model"]["provider"] == "nous-api"
assert reloaded["model"]["base_url"] == "https://inference-api.nousresearch.com/v1"
assert (
reloaded["model"]["default"]
== "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8"
)
def test_nous_oauth_setup_keeps_current_model_when_syncing_disk_provider(
tmp_path, monkeypatch
):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
config = load_config()
prompt_choices = iter([1, 2])
monkeypatch.setattr(
"hermes_cli.setup.prompt_choice",
lambda *args, **kwargs: next(prompt_choices),
)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
def _fake_login_nous(*args, **kwargs):
auth_path = tmp_path / "auth.json"
auth_path.write_text(json.dumps({"active_provider": "nous", "providers": {}}))
_update_config_for_provider("nous", "https://inference.example.com/v1")
monkeypatch.setattr("hermes_cli.auth._login_nous", _fake_login_nous)
monkeypatch.setattr(
"hermes_cli.auth.resolve_nous_runtime_credentials",
lambda *args, **kwargs: {
"base_url": "https://inference.example.com/v1",
"api_key": "nous-key",
},
)
monkeypatch.setattr(
"hermes_cli.auth.fetch_nous_models",
lambda *args, **kwargs: ["gemini-3-flash"],
)
setup_model_provider(config)
save_config(config)
reloaded = load_config()
assert isinstance(reloaded["model"], dict)
assert reloaded["model"]["provider"] == "nous"
assert reloaded["model"]["base_url"] == "https://inference.example.com/v1"
assert reloaded["model"]["default"] == "anthropic/claude-opus-4.6"
def test_custom_setup_clears_active_oauth_provider(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
auth_path = tmp_path / "auth.json"
auth_path.write_text(json.dumps({"active_provider": "nous", "providers": {}}))
config = load_config()
monkeypatch.setattr("hermes_cli.setup.prompt_choice", lambda *args, **kwargs: 3)
prompt_values = iter(
[
"https://custom.example/v1",
"custom-api-key",
"custom/model",
"",
]
)
monkeypatch.setattr(
"hermes_cli.setup.prompt",
lambda *args, **kwargs: next(prompt_values),
)
setup_model_provider(config)
save_config(config)
reloaded = load_config()
assert get_active_provider() is None
assert isinstance(reloaded["model"], dict)
assert reloaded["model"]["provider"] == "custom"
assert reloaded["model"]["base_url"] == "https://custom.example/v1"
assert reloaded["model"]["default"] == "custom/model"

View file

@ -249,6 +249,85 @@ class TestCronTimezone:
due = get_due_jobs()
assert len(due) == 1
def test_ensure_aware_naive_preserves_absolute_time(self):
"""_ensure_aware must preserve the absolute instant for naive datetimes.
Regression: the old code used replace(tzinfo=hermes_tz) which shifted
absolute time when system-local tz != Hermes tz. The fix interprets
naive values as system-local wall time, then converts.
"""
from cron.jobs import _ensure_aware
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
hermes_time.reset_cache()
# Create a naive datetime — will be interpreted as system-local time
naive_dt = datetime(2026, 3, 11, 12, 0, 0)
result = _ensure_aware(naive_dt)
# The result should be in Kolkata tz
assert result.tzinfo is not None
# The UTC equivalent must match what we'd get by correctly interpreting
# the naive dt as system-local time first, then converting
system_tz = datetime.now().astimezone().tzinfo
expected_utc = naive_dt.replace(tzinfo=system_tz).astimezone(timezone.utc)
actual_utc = result.astimezone(timezone.utc)
assert actual_utc == expected_utc, (
f"Absolute time shifted: expected {expected_utc}, got {actual_utc}"
)
def test_ensure_aware_normalizes_aware_to_hermes_tz(self):
"""Already-aware datetimes should be normalized to Hermes tz."""
from cron.jobs import _ensure_aware
os.environ["HERMES_TIMEZONE"] = "Asia/Kolkata"
hermes_time.reset_cache()
# Create an aware datetime in UTC
utc_dt = datetime(2026, 3, 11, 15, 0, 0, tzinfo=timezone.utc)
result = _ensure_aware(utc_dt)
# Must be in Hermes tz (Kolkata) but same absolute instant
kolkata = ZoneInfo("Asia/Kolkata")
assert result.utctimetuple()[:5] == (2026, 3, 11, 15, 0)
expected_local = utc_dt.astimezone(kolkata)
assert result == expected_local
def test_ensure_aware_due_job_not_skipped_when_system_ahead(self, tmp_path, monkeypatch):
"""Reproduce the actual bug: system tz ahead of Hermes tz caused
overdue jobs to appear as not-yet-due.
Scenario: system is Asia/Kolkata (UTC+5:30), Hermes is UTC.
A naive timestamp from 5 minutes ago (local time) should still
be recognized as due after conversion.
"""
import cron.jobs as jobs_module
monkeypatch.setattr(jobs_module, "CRON_DIR", tmp_path / "cron")
monkeypatch.setattr(jobs_module, "JOBS_FILE", tmp_path / "cron" / "jobs.json")
monkeypatch.setattr(jobs_module, "OUTPUT_DIR", tmp_path / "cron" / "output")
os.environ["HERMES_TIMEZONE"] = "UTC"
hermes_time.reset_cache()
from cron.jobs import create_job, load_jobs, save_jobs, get_due_jobs
job = create_job(prompt="Bug repro", schedule="every 1h")
jobs = load_jobs()
# Simulate a naive timestamp that was written by datetime.now() on a
# system running in UTC+5:30 — 5 minutes in the past (local time)
naive_past = (datetime.now() - timedelta(minutes=5)).isoformat()
jobs[0]["next_run_at"] = naive_past
save_jobs(jobs)
# Must be recognized as due regardless of tz mismatch
due = get_due_jobs()
assert len(due) == 1, (
"Overdue job was skipped — _ensure_aware likely shifted absolute time"
)
def test_create_job_stores_tz_aware_timestamps(self, tmp_path, monkeypatch):
"""New jobs store timezone-aware created_at and next_run_at."""
import cron.jobs as jobs_module

View file

@ -2323,3 +2323,127 @@ class TestMCPServerTaskSamplingIntegration:
kwargs = server._sampling.session_kwargs()
assert "sampling_callback" in kwargs
assert "sampling_capabilities" in kwargs
# ---------------------------------------------------------------------------
# Discovery failed_count tracking
# ---------------------------------------------------------------------------
class TestDiscoveryFailedCount:
"""Verify discover_mcp_tools() correctly tracks failed server connections."""
def test_failed_server_increments_failed_count(self):
"""When _discover_and_register_server raises, failed_count increments."""
from tools.mcp_tool import discover_mcp_tools, _servers, _ensure_mcp_loop
fake_config = {
"good_server": {"command": "npx", "args": ["good"]},
"bad_server": {"command": "npx", "args": ["bad"]},
}
async def fake_register(name, cfg):
if name == "bad_server":
raise ConnectionError("Connection refused")
# Simulate successful registration
from tools.mcp_tool import MCPServerTask
server = MCPServerTask(name)
server.session = MagicMock()
server._tools = [_make_mcp_tool("tool_a")]
_servers[name] = server
return [f"mcp_{name}_tool_a"]
with patch("tools.mcp_tool._load_mcp_config", return_value=fake_config), \
patch("tools.mcp_tool._discover_and_register_server", side_effect=fake_register), \
patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_good_server_tool_a"]):
_ensure_mcp_loop()
# Capture the logger to verify failed_count in summary
with patch("tools.mcp_tool.logger") as mock_logger:
discover_mcp_tools()
# Find the summary info call
info_calls = [
str(call)
for call in mock_logger.info.call_args_list
if "failed" in str(call).lower() or "MCP:" in str(call)
]
# The summary should mention the failure
assert any("1 failed" in str(c) for c in info_calls), (
f"Summary should report 1 failed server, got: {info_calls}"
)
_servers.pop("good_server", None)
_servers.pop("bad_server", None)
def test_all_servers_fail_still_prints_summary(self):
"""When all servers fail, a summary with failure count is still printed."""
from tools.mcp_tool import discover_mcp_tools, _servers, _ensure_mcp_loop
fake_config = {
"srv1": {"command": "npx", "args": ["a"]},
"srv2": {"command": "npx", "args": ["b"]},
}
async def always_fail(name, cfg):
raise ConnectionError(f"Server {name} refused")
with patch("tools.mcp_tool._load_mcp_config", return_value=fake_config), \
patch("tools.mcp_tool._discover_and_register_server", side_effect=always_fail), \
patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._existing_tool_names", return_value=[]):
_ensure_mcp_loop()
with patch("tools.mcp_tool.logger") as mock_logger:
discover_mcp_tools()
# Summary must be printed even when all servers fail
info_calls = [str(call) for call in mock_logger.info.call_args_list]
assert any("2 failed" in str(c) for c in info_calls), (
f"Summary should report 2 failed servers, got: {info_calls}"
)
_servers.pop("srv1", None)
_servers.pop("srv2", None)
def test_ok_servers_excludes_failures(self):
"""ok_servers count correctly excludes failed servers."""
from tools.mcp_tool import discover_mcp_tools, _servers, _ensure_mcp_loop
fake_config = {
"ok1": {"command": "npx", "args": ["ok1"]},
"ok2": {"command": "npx", "args": ["ok2"]},
"fail1": {"command": "npx", "args": ["fail"]},
}
async def selective_register(name, cfg):
if name == "fail1":
raise ConnectionError("Refused")
from tools.mcp_tool import MCPServerTask
server = MCPServerTask(name)
server.session = MagicMock()
server._tools = [_make_mcp_tool("t")]
_servers[name] = server
return [f"mcp_{name}_t"]
with patch("tools.mcp_tool._load_mcp_config", return_value=fake_config), \
patch("tools.mcp_tool._discover_and_register_server", side_effect=selective_register), \
patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_ok1_t", "mcp_ok2_t"]):
_ensure_mcp_loop()
with patch("tools.mcp_tool.logger") as mock_logger:
discover_mcp_tools()
info_calls = [str(call) for call in mock_logger.info.call_args_list]
# Should say "2 server(s)" not "3 server(s)"
assert any("2 server" in str(c) for c in info_calls), (
f"Summary should report 2 ok servers, got: {info_calls}"
)
assert any("1 failed" in str(c) for c in info_calls), (
f"Summary should report 1 failed, got: {info_calls}"
)
_servers.pop("ok1", None)
_servers.pop("ok2", None)
_servers.pop("fail1", None)