merge: resolve conflicts with origin/main

This commit is contained in:
teknium1 2026-03-17 04:30:37 -07:00
commit 0897e4350e
100 changed files with 11637 additions and 1337 deletions

View file

@ -12,6 +12,7 @@ Run with: python -m pytest tests/test_delegate.py -v
import json
import os
import sys
import threading
import unittest
from unittest.mock import MagicMock, patch
@ -44,6 +45,7 @@ def _make_mock_parent(depth=0):
parent._session_db = None
parent._delegate_depth = depth
parent._active_children = []
parent._active_children_lock = threading.Lock()
return parent
@ -722,7 +724,12 @@ class TestDelegationProviderIntegration(unittest.TestCase):
}
parent = _make_mock_parent(depth=0)
with patch("tools.delegate_tool._run_single_child") as mock_run:
# Patch _build_child_agent since credentials are now passed there
# (agents are built in the main thread before being handed to workers)
with patch("tools.delegate_tool._build_child_agent") as mock_build, \
patch("tools.delegate_tool._run_single_child") as mock_run:
mock_child = MagicMock()
mock_build.return_value = mock_child
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
@ -731,7 +738,8 @@ class TestDelegationProviderIntegration(unittest.TestCase):
tasks = [{"goal": "Task A"}, {"goal": "Task B"}]
delegate_task(tasks=tasks, parent_agent=parent)
for call in mock_run.call_args_list:
self.assertEqual(mock_build.call_count, 2)
for call in mock_build.call_args_list:
self.assertEqual(call.kwargs.get("model"), "meta-llama/llama-4-scout")
self.assertEqual(call.kwargs.get("override_provider"), "openrouter")
self.assertEqual(call.kwargs.get("override_base_url"), "https://openrouter.ai/api/v1")

View file

@ -0,0 +1,210 @@
"""Tests for probe_mcp_server_tools() in tools.mcp_tool."""
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def _reset_mcp_state():
"""Ensure clean MCP module state before/after each test."""
import tools.mcp_tool as mcp
old_loop = mcp._mcp_loop
old_thread = mcp._mcp_thread
old_servers = dict(mcp._servers)
yield
mcp._servers.clear()
mcp._servers.update(old_servers)
mcp._mcp_loop = old_loop
mcp._mcp_thread = old_thread
class TestProbeMcpServerTools:
"""Tests for the lightweight probe_mcp_server_tools function."""
def test_returns_empty_when_mcp_not_available(self):
with patch("tools.mcp_tool._MCP_AVAILABLE", False):
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
def test_returns_empty_when_no_config(self):
with patch("tools.mcp_tool._load_mcp_config", return_value={}):
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
def test_returns_empty_when_all_servers_disabled(self):
config = {
"github": {"command": "npx", "enabled": False},
"slack": {"command": "npx", "enabled": "off"},
}
with patch("tools.mcp_tool._load_mcp_config", return_value=config):
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
def test_returns_tools_from_successful_server(self):
"""Successfully probed server returns its tools list."""
config = {
"github": {"command": "npx", "connect_timeout": 5},
}
mock_tool_1 = SimpleNamespace(name="create_issue", description="Create a new issue")
mock_tool_2 = SimpleNamespace(name="search_repos", description="Search repositories")
mock_server = MagicMock()
mock_server._tools = [mock_tool_1, mock_tool_2]
mock_server.shutdown = AsyncMock()
async def fake_connect(name, cfg):
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
# Simulate running the async probe
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert "github" in result
assert len(result["github"]) == 2
assert result["github"][0] == ("create_issue", "Create a new issue")
assert result["github"][1] == ("search_repos", "Search repositories")
mock_server.shutdown.assert_awaited_once()
def test_failed_server_omitted_from_results(self):
"""Servers that fail to connect are silently skipped."""
config = {
"github": {"command": "npx", "connect_timeout": 5},
"broken": {"command": "nonexistent", "connect_timeout": 5},
}
mock_tool = SimpleNamespace(name="create_issue", description="Create")
mock_server = MagicMock()
mock_server._tools = [mock_tool]
mock_server.shutdown = AsyncMock()
async def fake_connect(name, cfg):
if name == "broken":
raise ConnectionError("Server not found")
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert "github" in result
assert "broken" not in result
def test_handles_tool_without_description(self):
"""Tools without descriptions get empty string."""
config = {"github": {"command": "npx", "connect_timeout": 5}}
mock_tool = SimpleNamespace(name="my_tool") # no description attribute
mock_server = MagicMock()
mock_server._tools = [mock_tool]
mock_server.shutdown = AsyncMock()
async def fake_connect(name, cfg):
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result["github"][0] == ("my_tool", "")
def test_cleanup_called_even_on_failure(self):
"""_stop_mcp_loop is called even when probe fails."""
config = {"github": {"command": "npx", "connect_timeout": 5}}
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop", side_effect=RuntimeError("boom")), \
patch("tools.mcp_tool._stop_mcp_loop") as mock_stop:
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
mock_stop.assert_called_once()
def test_skips_disabled_servers(self):
"""Disabled servers are not probed."""
config = {
"github": {"command": "npx", "connect_timeout": 5},
"disabled_one": {"command": "npx", "enabled": False},
}
mock_tool = SimpleNamespace(name="create_issue", description="Create")
mock_server = MagicMock()
mock_server._tools = [mock_tool]
mock_server.shutdown = AsyncMock()
connect_calls = []
async def fake_connect(name, cfg):
connect_calls.append(name)
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert "github" in result
assert "disabled_one" not in result
assert "disabled_one" not in connect_calls

View file

@ -2596,17 +2596,19 @@ class TestMCPSelectiveToolLoading:
async def run():
with patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch.dict("tools.mcp_tool._servers", {}, clear=True), \
patch("tools.registry.registry", mock_registry), \
patch("toolsets.create_custom_toolset"):
return await _discover_and_register_server(
registered = await _discover_and_register_server(
"ink_existing",
{"url": "https://mcp.example.com", "tools": {"include": ["create_service"]}},
)
return registered, _existing_tool_names()
try:
registered = asyncio.run(run())
registered, existing = asyncio.run(run())
assert registered == ["mcp_ink_existing_create_service"]
assert _existing_tool_names() == ["mcp_ink_existing_create_service"]
assert existing == ["mcp_ink_existing_create_service"]
finally:
_servers.pop("ink_existing", None)

View file

@ -294,6 +294,61 @@ class TestCheckpoint:
recovered = registry.recover_from_checkpoint()
assert recovered == 0
def test_write_checkpoint_includes_watcher_metadata(self, registry, tmp_path):
with patch("tools.process_registry.CHECKPOINT_PATH", tmp_path / "procs.json"):
s = _make_session()
s.watcher_platform = "telegram"
s.watcher_chat_id = "999"
s.watcher_thread_id = "42"
s.watcher_interval = 60
registry._running[s.id] = s
registry._write_checkpoint()
data = json.loads((tmp_path / "procs.json").read_text())
assert len(data) == 1
assert data[0]["watcher_platform"] == "telegram"
assert data[0]["watcher_chat_id"] == "999"
assert data[0]["watcher_thread_id"] == "42"
assert data[0]["watcher_interval"] == 60
def test_recover_enqueues_watchers(self, registry, tmp_path):
checkpoint = tmp_path / "procs.json"
checkpoint.write_text(json.dumps([{
"session_id": "proc_live",
"command": "sleep 999",
"pid": os.getpid(), # current process — guaranteed alive
"task_id": "t1",
"session_key": "sk1",
"watcher_platform": "telegram",
"watcher_chat_id": "123",
"watcher_thread_id": "42",
"watcher_interval": 60,
}]))
with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint):
recovered = registry.recover_from_checkpoint()
assert recovered == 1
assert len(registry.pending_watchers) == 1
w = registry.pending_watchers[0]
assert w["session_id"] == "proc_live"
assert w["platform"] == "telegram"
assert w["chat_id"] == "123"
assert w["thread_id"] == "42"
assert w["check_interval"] == 60
def test_recover_skips_watcher_when_no_interval(self, registry, tmp_path):
checkpoint = tmp_path / "procs.json"
checkpoint.write_text(json.dumps([{
"session_id": "proc_live",
"command": "sleep 999",
"pid": os.getpid(),
"task_id": "t1",
"watcher_interval": 0,
}]))
with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint):
recovered = registry.recover_from_checkpoint()
assert recovered == 1
assert len(registry.pending_watchers) == 0
# =========================================================================
# Kill process

View file

@ -25,7 +25,7 @@ def _make_config():
def _install_telegram_mock(monkeypatch, bot):
parse_mode = SimpleNamespace(MARKDOWN_V2="MarkdownV2")
parse_mode = SimpleNamespace(MARKDOWN_V2="MarkdownV2", HTML="HTML")
constants_mod = SimpleNamespace(ParseMode=parse_mode)
telegram_mod = SimpleNamespace(Bot=lambda token: bot, constants=constants_mod)
monkeypatch.setitem(sys.modules, "telegram", telegram_mod)
@ -391,3 +391,97 @@ class TestSendToPlatformChunking:
assert len(sent_calls) >= 3
assert all(call == [] for call in sent_calls[:-1])
assert sent_calls[-1] == media
# ---------------------------------------------------------------------------
# HTML auto-detection in Telegram send
# ---------------------------------------------------------------------------
class TestSendTelegramHtmlDetection:
"""Verify that messages containing HTML tags are sent with parse_mode=HTML
and that plain / markdown messages use MarkdownV2."""
def _make_bot(self):
bot = MagicMock()
bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=1))
bot.send_photo = AsyncMock()
bot.send_video = AsyncMock()
bot.send_voice = AsyncMock()
bot.send_audio = AsyncMock()
bot.send_document = AsyncMock()
return bot
def test_html_message_uses_html_parse_mode(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(
_send_telegram("tok", "123", "<b>Hello</b> world")
)
bot.send_message.assert_awaited_once()
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "HTML"
assert kwargs["text"] == "<b>Hello</b> world"
def test_plain_text_uses_markdown_v2(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(
_send_telegram("tok", "123", "Just plain text, no tags")
)
bot.send_message.assert_awaited_once()
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "MarkdownV2"
def test_html_with_code_and_pre_tags(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
html = "<pre>code block</pre> and <code>inline</code>"
asyncio.run(_send_telegram("tok", "123", html))
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "HTML"
def test_closing_tag_detected(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(_send_telegram("tok", "123", "text </div> more"))
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "HTML"
def test_angle_brackets_in_math_not_detected(self, monkeypatch):
"""Expressions like 'x < 5' or '3 > 2' should not trigger HTML mode."""
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(_send_telegram("tok", "123", "if x < 5 then y > 2"))
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "MarkdownV2"
def test_html_parse_failure_falls_back_to_plain(self, monkeypatch):
"""If Telegram rejects the HTML, fall back to plain text."""
bot = self._make_bot()
bot.send_message = AsyncMock(
side_effect=[
Exception("Bad Request: can't parse entities: unsupported html tag"),
SimpleNamespace(message_id=2), # plain fallback succeeds
]
)
_install_telegram_mock(monkeypatch, bot)
result = asyncio.run(
_send_telegram("tok", "123", "<invalid>broken html</invalid>")
)
assert result["success"] is True
assert bot.send_message.await_count == 2
second_call = bot.send_message.await_args_list[1].kwargs
assert second_call["parse_mode"] is None

View file

@ -1,8 +1,11 @@
"""Tests for Firecrawl client configuration and singleton behavior.
"""Tests for web backend client configuration and singleton behavior.
Coverage:
_get_firecrawl_client() configuration matrix, singleton caching,
constructor failure recovery, return value verification, edge cases.
_get_backend() backend selection logic with env var combinations.
_get_parallel_client() Parallel client configuration, singleton caching.
check_web_api_key() unified availability check.
"""
import os
@ -117,3 +120,212 @@ class TestFirecrawlClientConfig:
from tools.web_tools import _get_firecrawl_client
with pytest.raises(ValueError):
_get_firecrawl_client()
class TestBackendSelection:
"""Test suite for _get_backend() backend selection logic.
The backend is configured via config.yaml (web.backend), set by
``hermes tools``. Falls back to key-based detection for legacy/manual
setups.
"""
_ENV_KEYS = ("PARALLEL_API_KEY", "FIRECRAWL_API_KEY", "FIRECRAWL_API_URL", "TAVILY_API_KEY")
def setup_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
def teardown_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
# ── Config-based selection (web.backend in config.yaml) ───────────
def test_config_parallel(self):
"""web.backend=parallel in config → 'parallel' regardless of keys."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "parallel"}):
assert _get_backend() == "parallel"
def test_config_firecrawl(self):
"""web.backend=firecrawl in config → 'firecrawl' even if Parallel key set."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "firecrawl"}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
assert _get_backend() == "firecrawl"
def test_config_tavily(self):
"""web.backend=tavily in config → 'tavily' regardless of other keys."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "tavily"}):
assert _get_backend() == "tavily"
def test_config_tavily_overrides_env_keys(self):
"""web.backend=tavily in config → 'tavily' even if Firecrawl key set."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "tavily"}), \
patch.dict(os.environ, {"FIRECRAWL_API_KEY": "fc-test"}):
assert _get_backend() == "tavily"
def test_config_case_insensitive(self):
"""web.backend=Parallel (mixed case) → 'parallel'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "Parallel"}):
assert _get_backend() == "parallel"
def test_config_tavily_case_insensitive(self):
"""web.backend=Tavily (mixed case) → 'tavily'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "Tavily"}):
assert _get_backend() == "tavily"
# ── Fallback (no web.backend in config) ───────────────────────────
def test_fallback_parallel_only_key(self):
"""Only PARALLEL_API_KEY set → 'parallel'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
assert _get_backend() == "parallel"
def test_fallback_tavily_only_key(self):
"""Only TAVILY_API_KEY set → 'tavily'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test"}):
assert _get_backend() == "tavily"
def test_fallback_tavily_with_firecrawl_prefers_firecrawl(self):
"""Tavily + Firecrawl keys, no config → 'firecrawl' (backward compat)."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test", "FIRECRAWL_API_KEY": "fc-test"}):
assert _get_backend() == "firecrawl"
def test_fallback_tavily_with_parallel_prefers_parallel(self):
"""Tavily + Parallel keys, no config → 'parallel' (Parallel takes priority over Tavily)."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test", "PARALLEL_API_KEY": "par-test"}):
# Parallel + no Firecrawl → parallel
assert _get_backend() == "parallel"
def test_fallback_both_keys_defaults_to_firecrawl(self):
"""Both keys set, no config → 'firecrawl' (backward compat)."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key", "FIRECRAWL_API_KEY": "fc-test"}):
assert _get_backend() == "firecrawl"
def test_fallback_firecrawl_only_key(self):
"""Only FIRECRAWL_API_KEY set → 'firecrawl'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"FIRECRAWL_API_KEY": "fc-test"}):
assert _get_backend() == "firecrawl"
def test_fallback_no_keys_defaults_to_firecrawl(self):
"""No keys, no config → 'firecrawl' (will fail at client init)."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}):
assert _get_backend() == "firecrawl"
def test_invalid_config_falls_through_to_fallback(self):
"""web.backend=invalid → ignored, uses key-based fallback."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "nonexistent"}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
assert _get_backend() == "parallel"
class TestParallelClientConfig:
"""Test suite for Parallel client initialization."""
def setup_method(self):
import tools.web_tools
tools.web_tools._parallel_client = None
os.environ.pop("PARALLEL_API_KEY", None)
def teardown_method(self):
import tools.web_tools
tools.web_tools._parallel_client = None
os.environ.pop("PARALLEL_API_KEY", None)
def test_creates_client_with_key(self):
"""PARALLEL_API_KEY set → creates Parallel client."""
with patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
from tools.web_tools import _get_parallel_client
from parallel import Parallel
client = _get_parallel_client()
assert client is not None
assert isinstance(client, Parallel)
def test_no_key_raises_with_helpful_message(self):
"""No PARALLEL_API_KEY → ValueError with guidance."""
from tools.web_tools import _get_parallel_client
with pytest.raises(ValueError, match="PARALLEL_API_KEY"):
_get_parallel_client()
def test_singleton_returns_same_instance(self):
"""Second call returns cached client."""
with patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
from tools.web_tools import _get_parallel_client
client1 = _get_parallel_client()
client2 = _get_parallel_client()
assert client1 is client2
class TestCheckWebApiKey:
"""Test suite for check_web_api_key() unified availability check."""
_ENV_KEYS = ("PARALLEL_API_KEY", "FIRECRAWL_API_KEY", "FIRECRAWL_API_URL", "TAVILY_API_KEY")
def setup_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
def teardown_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
def test_parallel_key_only(self):
with patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_firecrawl_key_only(self):
with patch.dict(os.environ, {"FIRECRAWL_API_KEY": "fc-test"}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_firecrawl_url_only(self):
with patch.dict(os.environ, {"FIRECRAWL_API_URL": "http://localhost:3002"}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_tavily_key_only(self):
with patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test"}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_no_keys_returns_false(self):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is False
def test_both_keys_returns_true(self):
with patch.dict(os.environ, {
"PARALLEL_API_KEY": "test-key",
"FIRECRAWL_API_KEY": "fc-test",
}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_all_three_keys_returns_true(self):
with patch.dict(os.environ, {
"PARALLEL_API_KEY": "test-key",
"FIRECRAWL_API_KEY": "fc-test",
"TAVILY_API_KEY": "tvly-test",
}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True

View file

@ -0,0 +1,255 @@
"""Tests for Tavily web backend integration.
Coverage:
_tavily_request() API key handling, endpoint construction, error propagation.
_normalize_tavily_search_results() search response normalization.
_normalize_tavily_documents() extract/crawl response normalization, failed_results.
web_search_tool / web_extract_tool / web_crawl_tool Tavily dispatch paths.
"""
import json
import os
import asyncio
import pytest
from unittest.mock import patch, MagicMock
# ─── _tavily_request ─────────────────────────────────────────────────────────
class TestTavilyRequest:
"""Test suite for the _tavily_request helper."""
def test_raises_without_api_key(self):
"""No TAVILY_API_KEY → ValueError with guidance."""
with patch.dict(os.environ, {}, clear=False):
os.environ.pop("TAVILY_API_KEY", None)
from tools.web_tools import _tavily_request
with pytest.raises(ValueError, match="TAVILY_API_KEY"):
_tavily_request("search", {"query": "test"})
def test_posts_with_api_key_in_body(self):
"""api_key is injected into the JSON payload."""
mock_response = MagicMock()
mock_response.json.return_value = {"results": []}
mock_response.raise_for_status = MagicMock()
with patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test-key"}):
with patch("tools.web_tools.httpx.post", return_value=mock_response) as mock_post:
from tools.web_tools import _tavily_request
result = _tavily_request("search", {"query": "hello"})
mock_post.assert_called_once()
call_kwargs = mock_post.call_args
payload = call_kwargs.kwargs.get("json") or call_kwargs[1].get("json")
assert payload["api_key"] == "tvly-test-key"
assert payload["query"] == "hello"
assert "api.tavily.com/search" in call_kwargs.args[0]
def test_raises_on_http_error(self):
"""Non-2xx responses propagate as httpx.HTTPStatusError."""
import httpx as _httpx
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = _httpx.HTTPStatusError(
"401 Unauthorized", request=MagicMock(), response=mock_response
)
with patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-bad-key"}):
with patch("tools.web_tools.httpx.post", return_value=mock_response):
from tools.web_tools import _tavily_request
with pytest.raises(_httpx.HTTPStatusError):
_tavily_request("search", {"query": "test"})
# ─── _normalize_tavily_search_results ─────────────────────────────────────────
class TestNormalizeTavilySearchResults:
"""Test search result normalization."""
def test_basic_normalization(self):
from tools.web_tools import _normalize_tavily_search_results
raw = {
"results": [
{"title": "Python Docs", "url": "https://docs.python.org", "content": "Official docs", "score": 0.9},
{"title": "Tutorial", "url": "https://example.com", "content": "A tutorial", "score": 0.8},
]
}
result = _normalize_tavily_search_results(raw)
assert result["success"] is True
web = result["data"]["web"]
assert len(web) == 2
assert web[0]["title"] == "Python Docs"
assert web[0]["url"] == "https://docs.python.org"
assert web[0]["description"] == "Official docs"
assert web[0]["position"] == 1
assert web[1]["position"] == 2
def test_empty_results(self):
from tools.web_tools import _normalize_tavily_search_results
result = _normalize_tavily_search_results({"results": []})
assert result["success"] is True
assert result["data"]["web"] == []
def test_missing_fields(self):
from tools.web_tools import _normalize_tavily_search_results
result = _normalize_tavily_search_results({"results": [{}]})
web = result["data"]["web"]
assert web[0]["title"] == ""
assert web[0]["url"] == ""
assert web[0]["description"] == ""
# ─── _normalize_tavily_documents ──────────────────────────────────────────────
class TestNormalizeTavilyDocuments:
"""Test extract/crawl document normalization."""
def test_basic_document(self):
from tools.web_tools import _normalize_tavily_documents
raw = {
"results": [{
"url": "https://example.com",
"title": "Example",
"raw_content": "Full page content here",
}]
}
docs = _normalize_tavily_documents(raw)
assert len(docs) == 1
assert docs[0]["url"] == "https://example.com"
assert docs[0]["title"] == "Example"
assert docs[0]["content"] == "Full page content here"
assert docs[0]["raw_content"] == "Full page content here"
assert docs[0]["metadata"]["sourceURL"] == "https://example.com"
def test_falls_back_to_content_when_no_raw_content(self):
from tools.web_tools import _normalize_tavily_documents
raw = {"results": [{"url": "https://example.com", "content": "Snippet"}]}
docs = _normalize_tavily_documents(raw)
assert docs[0]["content"] == "Snippet"
def test_failed_results_included(self):
from tools.web_tools import _normalize_tavily_documents
raw = {
"results": [],
"failed_results": [
{"url": "https://fail.com", "error": "timeout"},
],
}
docs = _normalize_tavily_documents(raw)
assert len(docs) == 1
assert docs[0]["url"] == "https://fail.com"
assert docs[0]["error"] == "timeout"
assert docs[0]["content"] == ""
def test_failed_urls_included(self):
from tools.web_tools import _normalize_tavily_documents
raw = {
"results": [],
"failed_urls": ["https://bad.com"],
}
docs = _normalize_tavily_documents(raw)
assert len(docs) == 1
assert docs[0]["url"] == "https://bad.com"
assert docs[0]["error"] == "extraction failed"
def test_fallback_url(self):
from tools.web_tools import _normalize_tavily_documents
raw = {"results": [{"content": "data"}]}
docs = _normalize_tavily_documents(raw, fallback_url="https://fallback.com")
assert docs[0]["url"] == "https://fallback.com"
# ─── web_search_tool (Tavily dispatch) ────────────────────────────────────────
class TestWebSearchTavily:
"""Test web_search_tool dispatch to Tavily."""
def test_search_dispatches_to_tavily(self):
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"title": "Result", "url": "https://r.com", "content": "desc", "score": 0.9}]
}
mock_response.raise_for_status = MagicMock()
with patch("tools.web_tools._get_backend", return_value="tavily"), \
patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test"}), \
patch("tools.web_tools.httpx.post", return_value=mock_response), \
patch("tools.interrupt.is_interrupted", return_value=False):
from tools.web_tools import web_search_tool
result = json.loads(web_search_tool("test query", limit=3))
assert result["success"] is True
assert len(result["data"]["web"]) == 1
assert result["data"]["web"][0]["title"] == "Result"
# ─── web_extract_tool (Tavily dispatch) ───────────────────────────────────────
class TestWebExtractTavily:
"""Test web_extract_tool dispatch to Tavily."""
def test_extract_dispatches_to_tavily(self):
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [{"url": "https://example.com", "raw_content": "Extracted content", "title": "Page"}]
}
mock_response.raise_for_status = MagicMock()
with patch("tools.web_tools._get_backend", return_value="tavily"), \
patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test"}), \
patch("tools.web_tools.httpx.post", return_value=mock_response), \
patch("tools.web_tools.process_content_with_llm", return_value=None):
from tools.web_tools import web_extract_tool
result = json.loads(asyncio.get_event_loop().run_until_complete(
web_extract_tool(["https://example.com"], use_llm_processing=False)
))
assert "results" in result
assert len(result["results"]) == 1
assert result["results"][0]["url"] == "https://example.com"
# ─── web_crawl_tool (Tavily dispatch) ─────────────────────────────────────────
class TestWebCrawlTavily:
"""Test web_crawl_tool dispatch to Tavily."""
def test_crawl_dispatches_to_tavily(self):
mock_response = MagicMock()
mock_response.json.return_value = {
"results": [
{"url": "https://example.com/page1", "raw_content": "Page 1 content", "title": "Page 1"},
{"url": "https://example.com/page2", "raw_content": "Page 2 content", "title": "Page 2"},
]
}
mock_response.raise_for_status = MagicMock()
with patch("tools.web_tools._get_backend", return_value="tavily"), \
patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test"}), \
patch("tools.web_tools.httpx.post", return_value=mock_response), \
patch("tools.web_tools.check_website_access", return_value=None), \
patch("tools.interrupt.is_interrupted", return_value=False):
from tools.web_tools import web_crawl_tool
result = json.loads(asyncio.get_event_loop().run_until_complete(
web_crawl_tool("https://example.com", use_llm_processing=False)
))
assert "results" in result
assert len(result["results"]) == 2
assert result["results"][0]["title"] == "Page 1"
def test_crawl_sends_instructions(self):
"""Instructions are included in the Tavily crawl payload."""
mock_response = MagicMock()
mock_response.json.return_value = {"results": []}
mock_response.raise_for_status = MagicMock()
with patch("tools.web_tools._get_backend", return_value="tavily"), \
patch.dict(os.environ, {"TAVILY_API_KEY": "tvly-test"}), \
patch("tools.web_tools.httpx.post", return_value=mock_response) as mock_post, \
patch("tools.web_tools.check_website_access", return_value=None), \
patch("tools.interrupt.is_interrupted", return_value=False):
from tools.web_tools import web_crawl_tool
asyncio.get_event_loop().run_until_complete(
web_crawl_tool("https://example.com", instructions="Find docs", use_llm_processing=False)
)
call_kwargs = mock_post.call_args
payload = call_kwargs.kwargs.get("json") or call_kwargs[1].get("json")
assert payload["instructions"] == "Find docs"
assert payload["url"] == "https://example.com"

View file

@ -0,0 +1,495 @@
import json
from pathlib import Path
import pytest
import yaml
from tools.website_policy import WebsitePolicyError, check_website_access, load_website_blocklist
def test_load_website_blocklist_merges_config_and_shared_file(tmp_path):
shared = tmp_path / "community-blocklist.txt"
shared.write_text("# comment\nexample.org\nsub.bad.net\n", encoding="utf-8")
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["example.com", "https://www.evil.test/path"],
"shared_files": [str(shared)],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
policy = load_website_blocklist(config_path)
assert policy["enabled"] is True
assert {rule["pattern"] for rule in policy["rules"]} == {
"example.com",
"evil.test",
"example.org",
"sub.bad.net",
}
def test_check_website_access_matches_parent_domain_subdomains(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["example.com"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
blocked = check_website_access("https://docs.example.com/page", config_path=config_path)
assert blocked is not None
assert blocked["host"] == "docs.example.com"
assert blocked["rule"] == "example.com"
def test_check_website_access_supports_wildcard_subdomains_only(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["*.tracking.example"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
assert check_website_access("https://a.tracking.example", config_path=config_path) is not None
assert check_website_access("https://www.tracking.example", config_path=config_path) is not None
assert check_website_access("https://tracking.example", config_path=config_path) is None
def test_default_config_exposes_website_blocklist_shape():
from hermes_cli.config import DEFAULT_CONFIG
website_blocklist = DEFAULT_CONFIG["security"]["website_blocklist"]
assert website_blocklist["enabled"] is False
assert website_blocklist["domains"] == []
assert website_blocklist["shared_files"] == []
def test_load_website_blocklist_uses_enabled_default_when_section_missing(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(yaml.safe_dump({"display": {"tool_progress": "all"}}, sort_keys=False), encoding="utf-8")
policy = load_website_blocklist(config_path)
assert policy == {"enabled": False, "rules": []}
def test_load_website_blocklist_raises_clean_error_for_invalid_domains_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": "example.com",
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.domains must be a list"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_shared_files_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"shared_files": "community-blocklist.txt",
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.shared_files must be a list"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_top_level_config_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(yaml.safe_dump(["not", "a", "mapping"], sort_keys=False), encoding="utf-8")
with pytest.raises(WebsitePolicyError, match="config root must be a mapping"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_security_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(yaml.safe_dump({"security": []}, sort_keys=False), encoding="utf-8")
with pytest.raises(WebsitePolicyError, match="security must be a mapping"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_website_blocklist_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": "block everything",
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist must be a mapping"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_enabled_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": "false",
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.enabled must be a boolean"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_malformed_yaml(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text("security: [oops\n", encoding="utf-8")
with pytest.raises(WebsitePolicyError, match="Invalid config YAML"):
load_website_blocklist(config_path)
def test_load_website_blocklist_wraps_shared_file_read_errors(tmp_path, monkeypatch):
shared = tmp_path / "community-blocklist.txt"
shared.write_text("example.org\n", encoding="utf-8")
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"shared_files": [str(shared)],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
def failing_read_text(self, *args, **kwargs):
raise PermissionError("no permission")
monkeypatch.setattr(Path, "read_text", failing_read_text)
# Unreadable shared files are now warned and skipped (not raised),
# so the blocklist loads successfully but without those rules.
result = load_website_blocklist(config_path)
assert result["enabled"] is True
assert result["rules"] == [] # shared file rules skipped
def test_check_website_access_uses_dynamic_hermes_home(monkeypatch, tmp_path):
hermes_home = tmp_path / "hermes-home"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["dynamic.example"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
blocked = check_website_access("https://dynamic.example/path")
assert blocked is not None
assert blocked["rule"] == "dynamic.example"
def test_check_website_access_blocks_scheme_less_urls(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["blocked.test"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
blocked = check_website_access("www.blocked.test/path", config_path=config_path)
assert blocked is not None
assert blocked["host"] == "www.blocked.test"
assert blocked["rule"] == "blocked.test"
def test_browser_navigate_returns_policy_block(monkeypatch):
from tools import browser_tool
monkeypatch.setattr(
browser_tool,
"check_website_access",
lambda url: {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
},
)
monkeypatch.setattr(
browser_tool,
"_run_browser_command",
lambda *args, **kwargs: pytest.fail("browser command should not run for blocked URL"),
)
result = json.loads(browser_tool.browser_navigate("https://blocked.test"))
assert result["success"] is False
assert result["blocked_by_policy"]["rule"] == "blocked.test"
def test_browser_navigate_allows_when_shared_file_missing(monkeypatch, tmp_path):
"""Missing shared blocklist files are warned and skipped, not fatal."""
from tools import browser_tool
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"shared_files": ["missing-blocklist.txt"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
# check_website_access should return None (allow) — missing file is skipped
result = check_website_access("https://allowed.test", config_path=config_path)
assert result is None
@pytest.mark.asyncio
async def test_web_extract_short_circuits_blocked_url(monkeypatch):
from tools import web_tools
monkeypatch.setattr(
web_tools,
"check_website_access",
lambda url: {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
},
)
monkeypatch.setattr(
web_tools,
"_get_firecrawl_client",
lambda: pytest.fail("firecrawl should not run for blocked URL"),
)
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_extract_tool(["https://blocked.test"], use_llm_processing=False))
assert result["results"][0]["url"] == "https://blocked.test"
assert "Blocked by website policy" in result["results"][0]["error"]
def test_check_website_access_fails_open_on_malformed_config(tmp_path, monkeypatch):
"""Malformed config with default path should fail open (return None), not crash."""
config_path = tmp_path / "config.yaml"
config_path.write_text("security: [oops\n", encoding="utf-8")
# With explicit config_path (test mode), errors propagate
with pytest.raises(WebsitePolicyError):
check_website_access("https://example.com", config_path=config_path)
# Simulate default path by pointing HERMES_HOME to tmp_path
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools import website_policy
website_policy.invalidate_cache()
# With default path, errors are caught and fail open
result = check_website_access("https://example.com")
assert result is None # allowed, not crashed
@pytest.mark.asyncio
async def test_web_extract_blocks_redirected_final_url(monkeypatch):
from tools import web_tools
def fake_check(url):
if url == "https://allowed.test":
return None
if url == "https://blocked.test/final":
return {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
}
pytest.fail(f"unexpected URL checked: {url}")
class FakeFirecrawlClient:
def scrape(self, url, formats):
return {
"markdown": "secret content",
"metadata": {
"title": "Redirected",
"sourceURL": "https://blocked.test/final",
},
}
monkeypatch.setattr(web_tools, "check_website_access", fake_check)
monkeypatch.setattr(web_tools, "_get_firecrawl_client", lambda: FakeFirecrawlClient())
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False))
assert result["results"][0]["url"] == "https://blocked.test/final"
assert result["results"][0]["content"] == ""
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"
@pytest.mark.asyncio
async def test_web_crawl_short_circuits_blocked_url(monkeypatch):
from tools import web_tools
# web_crawl_tool checks for Firecrawl env before website policy
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
monkeypatch.setattr(
web_tools,
"check_website_access",
lambda url: {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
},
)
monkeypatch.setattr(
web_tools,
"_get_firecrawl_client",
lambda: pytest.fail("firecrawl should not run for blocked crawl URL"),
)
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_crawl_tool("https://blocked.test", use_llm_processing=False))
assert result["results"][0]["url"] == "https://blocked.test"
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"
@pytest.mark.asyncio
async def test_web_crawl_blocks_redirected_final_url(monkeypatch):
from tools import web_tools
# web_crawl_tool checks for Firecrawl env before website policy
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
def fake_check(url):
if url == "https://allowed.test":
return None
if url == "https://blocked.test/final":
return {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
}
pytest.fail(f"unexpected URL checked: {url}")
class FakeCrawlClient:
def crawl(self, url, **kwargs):
return {
"data": [
{
"markdown": "secret crawl content",
"metadata": {
"title": "Redirected crawl page",
"sourceURL": "https://blocked.test/final",
},
}
]
}
monkeypatch.setattr(web_tools, "check_website_access", fake_check)
monkeypatch.setattr(web_tools, "_get_firecrawl_client", lambda: FakeCrawlClient())
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_crawl_tool("https://allowed.test", use_llm_processing=False))
assert result["results"][0]["content"] == ""
assert result["results"][0]["error"] == "Blocked by website policy"
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"