Merge PR #736: feat(honcho): async writes, memory modes, session title integration, setup CLI

Authored by erosika. Builds on #38 and #243.

Adds async write support, configurable memory modes, context prefetch pipeline,
4 new Honcho tools (honcho_context, honcho_profile, honcho_search, honcho_conclude),
full 'hermes honcho' CLI, session strategies, AI peer identity, recallMode A/B,
gateway lifecycle management, and comprehensive docs.

Cherry-picks fixes from PRs #831/#832 (adavyas).

Co-authored-by: erosika <erosika@users.noreply.github.com>
Co-authored-by: adavyas <adavyas@users.noreply.github.com>
This commit is contained in:
Teknium 2026-03-12 19:05:11 -07:00 committed by GitHub
commit 475dd58a8e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 4688 additions and 355 deletions

View file

@ -0,0 +1,103 @@
"""Tests for gateway-owned Honcho lifecycle helpers."""
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import Platform
from gateway.platforms.base import MessageEvent
from gateway.session import SessionSource
def _make_runner():
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner._honcho_managers = {}
runner._honcho_configs = {}
runner._running_agents = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner.adapters = {}
runner.hooks = MagicMock()
runner.hooks.emit = AsyncMock()
return runner
def _make_event(text="/reset"):
return MessageEvent(
text=text,
source=SessionSource(
platform=Platform.TELEGRAM,
chat_id="chat-1",
user_id="user-1",
user_name="alice",
),
)
class TestGatewayHonchoLifecycle:
def test_gateway_reuses_honcho_manager_for_session_key(self):
runner = _make_runner()
hcfg = SimpleNamespace(
enabled=True,
api_key="honcho-key",
ai_peer="hermes",
peer_name="alice",
context_tokens=123,
peer_memory_mode=lambda peer: "hybrid",
)
manager = MagicMock()
with (
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
patch("honcho_integration.client.get_honcho_client", return_value=MagicMock()),
patch("honcho_integration.session.HonchoSessionManager", return_value=manager) as mock_mgr_cls,
):
first_mgr, first_cfg = runner._get_or_create_gateway_honcho("session-key")
second_mgr, second_cfg = runner._get_or_create_gateway_honcho("session-key")
assert first_mgr is manager
assert second_mgr is manager
assert first_cfg is hcfg
assert second_cfg is hcfg
mock_mgr_cls.assert_called_once()
def test_gateway_skips_honcho_manager_when_disabled(self):
runner = _make_runner()
hcfg = SimpleNamespace(
enabled=False,
api_key="honcho-key",
ai_peer="hermes",
peer_name="alice",
)
with (
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
patch("honcho_integration.client.get_honcho_client") as mock_client,
patch("honcho_integration.session.HonchoSessionManager") as mock_mgr_cls,
):
manager, cfg = runner._get_or_create_gateway_honcho("session-key")
assert manager is None
assert cfg is hcfg
mock_client.assert_not_called()
mock_mgr_cls.assert_not_called()
@pytest.mark.asyncio
async def test_reset_shuts_down_gateway_honcho_manager(self):
runner = _make_runner()
event = _make_event()
runner._shutdown_gateway_honcho = MagicMock()
runner.session_store = MagicMock()
runner.session_store._generate_session_key.return_value = "gateway-key"
runner.session_store._entries = {
"gateway-key": SimpleNamespace(session_id="old-session"),
}
runner.session_store.reset_session.return_value = SimpleNamespace(session_id="new-session")
result = await runner._handle_reset_command(event)
runner._shutdown_gateway_honcho.assert_called_once_with("gateway-key")
assert "Session reset" in result

View file

@ -0,0 +1,560 @@
"""Tests for the async-memory Honcho improvements.
Covers:
- write_frequency parsing (async / turn / session / int)
- memory_mode parsing
- resolve_session_name with session_title
- HonchoSessionManager.save() routing per write_frequency
- async writer thread lifecycle and retry
- flush_all() drains pending messages
- shutdown() joins the thread
- memory_mode gating helpers (unit-level)
"""
import json
import queue
import threading
import time
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import pytest
from honcho_integration.client import HonchoClientConfig
from honcho_integration.session import (
HonchoSession,
HonchoSessionManager,
_ASYNC_SHUTDOWN,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_session(**kwargs) -> HonchoSession:
return HonchoSession(
key=kwargs.get("key", "cli:test"),
user_peer_id=kwargs.get("user_peer_id", "eri"),
assistant_peer_id=kwargs.get("assistant_peer_id", "hermes"),
honcho_session_id=kwargs.get("honcho_session_id", "cli-test"),
messages=kwargs.get("messages", []),
)
def _make_manager(write_frequency="turn", memory_mode="hybrid") -> HonchoSessionManager:
cfg = HonchoClientConfig(
write_frequency=write_frequency,
memory_mode=memory_mode,
api_key="test-key",
enabled=True,
)
mgr = HonchoSessionManager(config=cfg)
mgr._honcho = MagicMock()
return mgr
# ---------------------------------------------------------------------------
# write_frequency parsing from config file
# ---------------------------------------------------------------------------
class TestWriteFrequencyParsing:
def test_string_async(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "async"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "async"
def test_string_turn(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "turn"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "turn"
def test_string_session(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "session"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "session"
def test_integer_frequency(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": 5}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == 5
def test_integer_string_coerced(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "writeFrequency": "3"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == 3
def test_host_block_overrides_root(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({
"apiKey": "k",
"writeFrequency": "turn",
"hosts": {"hermes": {"writeFrequency": "session"}},
}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "session"
def test_defaults_to_async(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.write_frequency == "async"
# ---------------------------------------------------------------------------
# memory_mode parsing from config file
# ---------------------------------------------------------------------------
class TestMemoryModeParsing:
def test_hybrid(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "memoryMode": "hybrid"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.memory_mode == "hybrid"
def test_honcho_only(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k", "memoryMode": "honcho"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.memory_mode == "honcho"
def test_defaults_to_hybrid(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({"apiKey": "k"}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.memory_mode == "hybrid"
def test_host_block_overrides_root(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({
"apiKey": "k",
"memoryMode": "hybrid",
"hosts": {"hermes": {"memoryMode": "honcho"}},
}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.memory_mode == "honcho"
def test_object_form_sets_default_and_overrides(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({
"apiKey": "k",
"hosts": {"hermes": {"memoryMode": {
"default": "hybrid",
"hermes": "honcho",
}}},
}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.memory_mode == "hybrid"
assert cfg.peer_memory_mode("hermes") == "honcho"
assert cfg.peer_memory_mode("unknown") == "hybrid" # falls through to default
def test_object_form_no_default_falls_back_to_hybrid(self, tmp_path):
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({
"apiKey": "k",
"hosts": {"hermes": {"memoryMode": {"hermes": "honcho"}}},
}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.memory_mode == "hybrid"
assert cfg.peer_memory_mode("hermes") == "honcho"
assert cfg.peer_memory_mode("other") == "hybrid"
def test_global_string_host_object_override(self, tmp_path):
"""Host object form overrides global string."""
cfg_file = tmp_path / "config.json"
cfg_file.write_text(json.dumps({
"apiKey": "k",
"memoryMode": "honcho",
"hosts": {"hermes": {"memoryMode": {"default": "hybrid", "hermes": "honcho"}}},
}))
cfg = HonchoClientConfig.from_global_config(config_path=cfg_file)
assert cfg.memory_mode == "hybrid" # host default wins over global "honcho"
assert cfg.peer_memory_mode("hermes") == "honcho"
# ---------------------------------------------------------------------------
# resolve_session_name with session_title
# ---------------------------------------------------------------------------
class TestResolveSessionNameTitle:
def test_manual_override_beats_title(self):
cfg = HonchoClientConfig(sessions={"/my/project": "manual-name"})
result = cfg.resolve_session_name("/my/project", session_title="the-title")
assert result == "manual-name"
def test_title_beats_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="my-project")
assert result == "my-project"
def test_title_with_peer_prefix(self):
cfg = HonchoClientConfig(peer_name="eri", session_peer_prefix=True)
result = cfg.resolve_session_name("/some/dir", session_title="aeris")
assert result == "eri-aeris"
def test_title_sanitized(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="my project/name!")
# trailing dashes stripped by .strip('-')
assert result == "my-project-name"
def test_title_all_invalid_chars_falls_back_to_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="!!! ###")
# sanitized to empty → falls back to dirname
assert result == "dir"
def test_none_title_falls_back_to_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title=None)
assert result == "dir"
def test_empty_title_falls_back_to_dirname(self):
cfg = HonchoClientConfig()
result = cfg.resolve_session_name("/some/dir", session_title="")
assert result == "dir"
def test_per_session_uses_session_id(self):
cfg = HonchoClientConfig(session_strategy="per-session")
result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd")
assert result == "20260309_175514_9797dd"
def test_per_session_with_peer_prefix(self):
cfg = HonchoClientConfig(session_strategy="per-session", peer_name="eri", session_peer_prefix=True)
result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd")
assert result == "eri-20260309_175514_9797dd"
def test_per_session_no_id_falls_back_to_dirname(self):
cfg = HonchoClientConfig(session_strategy="per-session")
result = cfg.resolve_session_name("/some/dir", session_id=None)
assert result == "dir"
def test_title_beats_session_id(self):
cfg = HonchoClientConfig(session_strategy="per-session")
result = cfg.resolve_session_name("/some/dir", session_title="my-title", session_id="20260309_175514_9797dd")
assert result == "my-title"
def test_manual_beats_session_id(self):
cfg = HonchoClientConfig(session_strategy="per-session", sessions={"/some/dir": "pinned"})
result = cfg.resolve_session_name("/some/dir", session_id="20260309_175514_9797dd")
assert result == "pinned"
def test_global_strategy_returns_workspace(self):
cfg = HonchoClientConfig(session_strategy="global", workspace_id="my-workspace")
result = cfg.resolve_session_name("/some/dir")
assert result == "my-workspace"
# ---------------------------------------------------------------------------
# save() routing per write_frequency
# ---------------------------------------------------------------------------
class TestSaveRouting:
def _make_session_with_message(self, mgr=None):
sess = _make_session()
sess.add_message("user", "hello")
sess.add_message("assistant", "hi")
if mgr:
mgr._cache[sess.key] = sess
return sess
def test_turn_flushes_immediately(self):
mgr = _make_manager(write_frequency="turn")
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess)
mock_flush.assert_called_once_with(sess)
def test_session_mode_does_not_flush(self):
mgr = _make_manager(write_frequency="session")
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess)
mock_flush.assert_not_called()
def test_async_mode_enqueues(self):
mgr = _make_manager(write_frequency="async")
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess)
# flush_session should NOT be called synchronously
mock_flush.assert_not_called()
assert not mgr._async_queue.empty()
def test_int_frequency_flushes_on_nth_turn(self):
mgr = _make_manager(write_frequency=3)
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.save(sess) # turn 1
mgr.save(sess) # turn 2
assert mock_flush.call_count == 0
mgr.save(sess) # turn 3
assert mock_flush.call_count == 1
def test_int_frequency_skips_other_turns(self):
mgr = _make_manager(write_frequency=5)
sess = self._make_session_with_message(mgr)
with patch.object(mgr, "_flush_session") as mock_flush:
for _ in range(4):
mgr.save(sess)
assert mock_flush.call_count == 0
mgr.save(sess) # turn 5
assert mock_flush.call_count == 1
# ---------------------------------------------------------------------------
# flush_all()
# ---------------------------------------------------------------------------
class TestFlushAll:
def test_flushes_all_cached_sessions(self):
mgr = _make_manager(write_frequency="session")
s1 = _make_session(key="s1", honcho_session_id="s1")
s2 = _make_session(key="s2", honcho_session_id="s2")
s1.add_message("user", "a")
s2.add_message("user", "b")
mgr._cache = {"s1": s1, "s2": s2}
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.flush_all()
assert mock_flush.call_count == 2
def test_flush_all_drains_async_queue(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "pending")
mgr._async_queue.put(sess)
with patch.object(mgr, "_flush_session") as mock_flush:
mgr.flush_all()
# Called at least once for the queued item
assert mock_flush.call_count >= 1
def test_flush_all_tolerates_errors(self):
mgr = _make_manager(write_frequency="session")
sess = _make_session()
mgr._cache = {"key": sess}
with patch.object(mgr, "_flush_session", side_effect=RuntimeError("oops")):
# Should not raise
mgr.flush_all()
# ---------------------------------------------------------------------------
# async writer thread lifecycle
# ---------------------------------------------------------------------------
class TestAsyncWriterThread:
def test_thread_started_on_async_mode(self):
mgr = _make_manager(write_frequency="async")
assert mgr._async_thread is not None
assert mgr._async_thread.is_alive()
mgr.shutdown()
def test_no_thread_for_turn_mode(self):
mgr = _make_manager(write_frequency="turn")
assert mgr._async_thread is None
assert mgr._async_queue is None
def test_shutdown_joins_thread(self):
mgr = _make_manager(write_frequency="async")
assert mgr._async_thread.is_alive()
mgr.shutdown()
assert not mgr._async_thread.is_alive()
def test_async_writer_calls_flush(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "async msg")
flushed = []
def capture(s):
flushed.append(s)
return True
mgr._flush_session = capture
mgr._async_queue.put(sess)
# Give the daemon thread time to process
deadline = time.time() + 2.0
while not flushed and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
assert len(flushed) == 1
assert flushed[0] is sess
def test_shutdown_sentinel_stops_loop(self):
mgr = _make_manager(write_frequency="async")
thread = mgr._async_thread
mgr.shutdown()
thread.join(timeout=3)
assert not thread.is_alive()
# ---------------------------------------------------------------------------
# async retry on failure
# ---------------------------------------------------------------------------
class TestAsyncWriterRetry:
def test_retries_once_on_failure(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "msg")
call_count = [0]
def flaky_flush(s):
call_count[0] += 1
if call_count[0] == 1:
raise ConnectionError("network blip")
# second call succeeds silently
mgr._flush_session = flaky_flush
with patch("time.sleep"): # skip the 2s sleep in retry
mgr._async_queue.put(sess)
deadline = time.time() + 3.0
while call_count[0] < 2 and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
assert call_count[0] == 2
def test_drops_after_two_failures(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "msg")
call_count = [0]
def always_fail(s):
call_count[0] += 1
raise RuntimeError("always broken")
mgr._flush_session = always_fail
with patch("time.sleep"):
mgr._async_queue.put(sess)
deadline = time.time() + 3.0
while call_count[0] < 2 and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
# Should have tried exactly twice (initial + one retry) and not crashed
assert call_count[0] == 2
assert not mgr._async_thread.is_alive()
def test_retries_when_flush_reports_failure(self):
mgr = _make_manager(write_frequency="async")
sess = _make_session()
sess.add_message("user", "msg")
call_count = [0]
def fail_then_succeed(_session):
call_count[0] += 1
return call_count[0] > 1
mgr._flush_session = fail_then_succeed
with patch("time.sleep"):
mgr._async_queue.put(sess)
deadline = time.time() + 3.0
while call_count[0] < 2 and time.time() < deadline:
time.sleep(0.05)
mgr.shutdown()
assert call_count[0] == 2
class TestMemoryFileMigrationTargets:
def test_soul_upload_targets_ai_peer(self, tmp_path):
mgr = _make_manager(write_frequency="turn")
session = _make_session(
key="cli:test",
user_peer_id="custom-user",
assistant_peer_id="custom-ai",
honcho_session_id="cli-test",
)
mgr._cache[session.key] = session
user_peer = MagicMock(name="user-peer")
ai_peer = MagicMock(name="ai-peer")
mgr._peers_cache[session.user_peer_id] = user_peer
mgr._peers_cache[session.assistant_peer_id] = ai_peer
honcho_session = MagicMock()
mgr._sessions_cache[session.honcho_session_id] = honcho_session
(tmp_path / "MEMORY.md").write_text("memory facts", encoding="utf-8")
(tmp_path / "USER.md").write_text("user profile", encoding="utf-8")
(tmp_path / "SOUL.md").write_text("ai identity", encoding="utf-8")
uploaded = mgr.migrate_memory_files(session.key, str(tmp_path))
assert uploaded is True
assert honcho_session.upload_file.call_count == 3
peer_by_upload_name = {}
for call_args in honcho_session.upload_file.call_args_list:
payload = call_args.kwargs["file"]
peer_by_upload_name[payload[0]] = call_args.kwargs["peer"]
assert peer_by_upload_name["consolidated_memory.md"] is user_peer
assert peer_by_upload_name["user_profile.md"] is user_peer
assert peer_by_upload_name["agent_soul.md"] is ai_peer
# ---------------------------------------------------------------------------
# HonchoClientConfig dataclass defaults for new fields
# ---------------------------------------------------------------------------
class TestNewConfigFieldDefaults:
def test_write_frequency_default(self):
cfg = HonchoClientConfig()
assert cfg.write_frequency == "async"
def test_memory_mode_default(self):
cfg = HonchoClientConfig()
assert cfg.memory_mode == "hybrid"
def test_write_frequency_set(self):
cfg = HonchoClientConfig(write_frequency="turn")
assert cfg.write_frequency == "turn"
def test_memory_mode_set(self):
cfg = HonchoClientConfig(memory_mode="honcho")
assert cfg.memory_mode == "honcho"
def test_peer_memory_mode_falls_back_to_global(self):
cfg = HonchoClientConfig(memory_mode="honcho")
assert cfg.peer_memory_mode("any-peer") == "honcho"
def test_peer_memory_mode_override(self):
cfg = HonchoClientConfig(memory_mode="hybrid", peer_memory_modes={"hermes": "honcho"})
assert cfg.peer_memory_mode("hermes") == "honcho"
assert cfg.peer_memory_mode("other") == "hybrid"
class TestPrefetchCacheAccessors:
def test_set_and_pop_context_result(self):
mgr = _make_manager(write_frequency="turn")
payload = {"representation": "Known user", "card": "prefers concise replies"}
mgr.set_context_result("cli:test", payload)
assert mgr.pop_context_result("cli:test") == payload
assert mgr.pop_context_result("cli:test") == {}
def test_set_and_pop_dialectic_result(self):
mgr = _make_manager(write_frequency="turn")
mgr.set_dialectic_result("cli:test", "Resume with toolset cleanup")
assert mgr.pop_dialectic_result("cli:test") == "Resume with toolset cleanup"
assert mgr.pop_dialectic_result("cli:test") == ""

View file

@ -0,0 +1,29 @@
"""Tests for Honcho CLI helpers."""
from honcho_integration.cli import _resolve_api_key
class TestResolveApiKey:
def test_prefers_host_scoped_key(self):
cfg = {
"apiKey": "root-key",
"hosts": {
"hermes": {
"apiKey": "host-key",
}
},
}
assert _resolve_api_key(cfg) == "host-key"
def test_falls_back_to_root_key(self):
cfg = {
"apiKey": "root-key",
"hosts": {"hermes": {}},
}
assert _resolve_api_key(cfg) == "root-key"
def test_falls_back_to_env_key(self, monkeypatch):
monkeypatch.setenv("HONCHO_API_KEY", "env-key")
assert _resolve_api_key({}) == "env-key"
monkeypatch.delenv("HONCHO_API_KEY", raising=False)

View file

@ -25,7 +25,8 @@ class TestHonchoClientConfigDefaults:
assert config.environment == "production"
assert config.enabled is False
assert config.save_messages is True
assert config.session_strategy == "per-directory"
assert config.session_strategy == "per-session"
assert config.recall_mode == "hybrid"
assert config.session_peer_prefix is False
assert config.linked_hosts == []
assert config.sessions == {}
@ -134,6 +135,41 @@ class TestFromGlobalConfig:
assert config.workspace_id == "root-ws"
assert config.ai_peer == "root-ai"
def test_session_strategy_default_from_global_config(self, tmp_path):
"""from_global_config with no sessionStrategy should match dataclass default."""
config_file = tmp_path / "config.json"
config_file.write_text(json.dumps({"apiKey": "key"}))
config = HonchoClientConfig.from_global_config(config_path=config_file)
assert config.session_strategy == "per-session"
def test_context_tokens_host_block_wins(self, tmp_path):
"""Host block contextTokens should override root."""
config_file = tmp_path / "config.json"
config_file.write_text(json.dumps({
"apiKey": "key",
"contextTokens": 1000,
"hosts": {"hermes": {"contextTokens": 2000}},
}))
config = HonchoClientConfig.from_global_config(config_path=config_file)
assert config.context_tokens == 2000
def test_recall_mode_from_config(self, tmp_path):
"""recallMode is read from config, host block wins."""
config_file = tmp_path / "config.json"
config_file.write_text(json.dumps({
"apiKey": "key",
"recallMode": "tools",
"hosts": {"hermes": {"recallMode": "context"}},
}))
config = HonchoClientConfig.from_global_config(config_path=config_file)
assert config.recall_mode == "context"
def test_recall_mode_default(self, tmp_path):
config_file = tmp_path / "config.json"
config_file.write_text(json.dumps({"apiKey": "key"}))
config = HonchoClientConfig.from_global_config(config_path=config_file)
assert config.recall_mode == "hybrid"
def test_corrupt_config_falls_back_to_env(self, tmp_path):
config_file = tmp_path / "config.json"
config_file.write_text("not valid json{{{")
@ -177,6 +213,40 @@ class TestResolveSessionName:
# Should use os.getcwd() basename
assert result == Path.cwd().name
def test_per_repo_uses_git_root(self):
config = HonchoClientConfig(session_strategy="per-repo")
with patch.object(
HonchoClientConfig, "_git_repo_name", return_value="hermes-agent"
):
result = config.resolve_session_name("/home/user/hermes-agent/subdir")
assert result == "hermes-agent"
def test_per_repo_with_peer_prefix(self):
config = HonchoClientConfig(
session_strategy="per-repo", peer_name="eri", session_peer_prefix=True
)
with patch.object(
HonchoClientConfig, "_git_repo_name", return_value="groudon"
):
result = config.resolve_session_name("/home/user/groudon/src")
assert result == "eri-groudon"
def test_per_repo_falls_back_to_dirname_outside_git(self):
config = HonchoClientConfig(session_strategy="per-repo")
with patch.object(
HonchoClientConfig, "_git_repo_name", return_value=None
):
result = config.resolve_session_name("/home/user/not-a-repo")
assert result == "not-a-repo"
def test_per_repo_manual_override_still_wins(self):
config = HonchoClientConfig(
session_strategy="per-repo",
sessions={"/home/user/proj": "custom-session"},
)
result = config.resolve_session_name("/home/user/proj")
assert result == "custom-session"
class TestGetLinkedWorkspaces:
def test_resolves_linked_hosts(self):

View file

@ -93,8 +93,8 @@ class TestRealSubagentInterrupt(unittest.TestCase):
mock_client.close = MagicMock()
MockOpenAI.return_value = mock_client
# Also need to patch the system prompt builder
with patch('run_agent.build_system_prompt', return_value="You are a test agent"):
# Patch the instance method so it skips prompt assembly
with patch.object(AIAgent, '_build_system_prompt', return_value="You are a test agent"):
# Signal when child starts
original_run = AIAgent.run_conversation

View file

@ -13,6 +13,7 @@ from unittest.mock import MagicMock, patch, PropertyMock
import pytest
from honcho_integration.client import HonchoClientConfig
from run_agent import AIAgent
from agent.prompt_builder import DEFAULT_AGENT_IDENTITY, PLATFORM_HINTS
@ -1209,17 +1210,15 @@ class TestSystemPromptStability:
assert "User prefers Python over JavaScript" in agent._cached_system_prompt
def test_honcho_prefetch_skipped_on_continuing_session(self):
"""Honcho prefetch should not be called when conversation_history
is non-empty (continuing session)."""
def test_honcho_prefetch_runs_on_continuing_session(self):
"""Honcho prefetch is consumed on continuing sessions via ephemeral context."""
conversation_history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
]
# The guard: `not conversation_history` is False when history exists
should_prefetch = not conversation_history
assert should_prefetch is False
recall_mode = "hybrid"
should_prefetch = bool(conversation_history) and recall_mode != "tools"
assert should_prefetch is True
def test_honcho_prefetch_runs_on_first_turn(self):
"""Honcho prefetch should run when conversation_history is empty."""
@ -1228,6 +1227,190 @@ class TestSystemPromptStability:
assert should_prefetch is True
class TestHonchoActivation:
def test_disabled_config_skips_honcho_init(self):
hcfg = HonchoClientConfig(
enabled=False,
api_key="honcho-key",
peer_name="user",
ai_peer="hermes",
)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
patch("honcho_integration.client.get_honcho_client") as mock_client,
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
)
assert agent._honcho is None
assert agent._honcho_config is hcfg
mock_client.assert_not_called()
def test_injected_honcho_manager_skips_fresh_client_init(self):
hcfg = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
memory_mode="hybrid",
peer_name="user",
ai_peer="hermes",
recall_mode="hybrid",
)
manager = MagicMock()
manager._config = hcfg
manager.get_or_create.return_value = SimpleNamespace(messages=[])
manager.get_prefetch_context.return_value = {"representation": "Known user", "card": ""}
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("honcho_integration.client.get_honcho_client") as mock_client,
patch("tools.honcho_tools.set_session_context"),
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
honcho_session_key="gateway-session",
honcho_manager=manager,
honcho_config=hcfg,
)
assert agent._honcho is manager
manager.get_or_create.assert_called_once_with("gateway-session")
manager.get_prefetch_context.assert_called_once_with("gateway-session")
manager.set_context_result.assert_called_once_with(
"gateway-session",
{"representation": "Known user", "card": ""},
)
mock_client.assert_not_called()
def test_recall_mode_context_suppresses_honcho_tools(self):
hcfg = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
memory_mode="hybrid",
peer_name="user",
ai_peer="hermes",
recall_mode="context",
)
manager = MagicMock()
manager._config = hcfg
manager.get_or_create.return_value = SimpleNamespace(messages=[])
manager.get_prefetch_context.return_value = {"representation": "Known user", "card": ""}
with (
patch(
"run_agent.get_tool_definitions",
side_effect=[
_make_tool_defs("web_search"),
_make_tool_defs(
"web_search",
"honcho_context",
"honcho_profile",
"honcho_search",
"honcho_conclude",
),
],
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("tools.honcho_tools.set_session_context"),
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
honcho_session_key="gateway-session",
honcho_manager=manager,
honcho_config=hcfg,
)
assert "web_search" in agent.valid_tool_names
assert "honcho_context" not in agent.valid_tool_names
assert "honcho_profile" not in agent.valid_tool_names
assert "honcho_search" not in agent.valid_tool_names
assert "honcho_conclude" not in agent.valid_tool_names
def test_inactive_honcho_strips_stale_honcho_tools(self):
hcfg = HonchoClientConfig(
enabled=False,
api_key="honcho-key",
peer_name="user",
ai_peer="hermes",
)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search", "honcho_context")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
patch("honcho_integration.client.get_honcho_client") as mock_client,
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
)
assert agent._honcho is None
assert "web_search" in agent.valid_tool_names
assert "honcho_context" not in agent.valid_tool_names
mock_client.assert_not_called()
class TestHonchoPrefetchScheduling:
def test_honcho_prefetch_includes_cached_dialectic(self, agent):
agent._honcho = MagicMock()
agent._honcho_session_key = "session-key"
agent._honcho.pop_context_result.return_value = {}
agent._honcho.pop_dialectic_result.return_value = "Continue with the migration checklist."
context = agent._honcho_prefetch("what next?")
assert "Continuity synthesis" in context
assert "migration checklist" in context
def test_queue_honcho_prefetch_skips_tools_mode(self, agent):
agent._honcho = MagicMock()
agent._honcho_session_key = "session-key"
agent._honcho_config = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
recall_mode="tools",
)
agent._queue_honcho_prefetch("what next?")
agent._honcho.prefetch_context.assert_not_called()
agent._honcho.prefetch_dialectic.assert_not_called()
def test_queue_honcho_prefetch_runs_when_context_enabled(self, agent):
agent._honcho = MagicMock()
agent._honcho_session_key = "session-key"
agent._honcho_config = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
recall_mode="hybrid",
)
agent._queue_honcho_prefetch("what next?")
agent._honcho.prefetch_context.assert_called_once_with("session-key", "what next?")
agent._honcho.prefetch_dialectic.assert_called_once_with("session-key", "what next?")
# ---------------------------------------------------------------------------
# Iteration budget pressure warnings
# ---------------------------------------------------------------------------