fix(gateway): persist Honcho managers across session requests
This commit is contained in:
parent
87cc5287a8
commit
87349b9bc1
2 changed files with 177 additions and 1 deletions
|
|
@ -225,6 +225,12 @@ class GatewayRunner:
|
||||||
# Track pending exec approvals per session
|
# Track pending exec approvals per session
|
||||||
# Key: session_key, Value: {"command": str, "pattern_key": str}
|
# Key: session_key, Value: {"command": str, "pattern_key": str}
|
||||||
self._pending_approvals: Dict[str, Dict[str, str]] = {}
|
self._pending_approvals: Dict[str, Dict[str, str]] = {}
|
||||||
|
|
||||||
|
# Persistent Honcho managers keyed by gateway session key.
|
||||||
|
# This preserves write_frequency="session" semantics across short-lived
|
||||||
|
# per-message AIAgent instances.
|
||||||
|
self._honcho_managers: Dict[str, Any] = {}
|
||||||
|
self._honcho_configs: Dict[str, Any] = {}
|
||||||
|
|
||||||
# Initialize session database for session_search tool support
|
# Initialize session database for session_search tool support
|
||||||
self._session_db = None
|
self._session_db = None
|
||||||
|
|
@ -241,6 +247,63 @@ class GatewayRunner:
|
||||||
# Event hook system
|
# Event hook system
|
||||||
from gateway.hooks import HookRegistry
|
from gateway.hooks import HookRegistry
|
||||||
self.hooks = HookRegistry()
|
self.hooks = HookRegistry()
|
||||||
|
|
||||||
|
def _get_or_create_gateway_honcho(self, session_key: str):
|
||||||
|
"""Return a persistent Honcho manager/config pair for this gateway session."""
|
||||||
|
if not hasattr(self, "_honcho_managers"):
|
||||||
|
self._honcho_managers = {}
|
||||||
|
if not hasattr(self, "_honcho_configs"):
|
||||||
|
self._honcho_configs = {}
|
||||||
|
|
||||||
|
if session_key in self._honcho_managers:
|
||||||
|
return self._honcho_managers[session_key], self._honcho_configs.get(session_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from honcho_integration.client import HonchoClientConfig, get_honcho_client
|
||||||
|
from honcho_integration.session import HonchoSessionManager
|
||||||
|
|
||||||
|
hcfg = HonchoClientConfig.from_global_config()
|
||||||
|
ai_mode = hcfg.peer_memory_mode(hcfg.ai_peer)
|
||||||
|
user_mode = hcfg.peer_memory_mode(hcfg.peer_name or "user")
|
||||||
|
if not hcfg.enabled or not hcfg.api_key or (ai_mode == "local" and user_mode == "local"):
|
||||||
|
return None, hcfg
|
||||||
|
|
||||||
|
client = get_honcho_client(hcfg)
|
||||||
|
manager = HonchoSessionManager(
|
||||||
|
honcho=client,
|
||||||
|
config=hcfg,
|
||||||
|
context_tokens=hcfg.context_tokens,
|
||||||
|
)
|
||||||
|
self._honcho_managers[session_key] = manager
|
||||||
|
self._honcho_configs[session_key] = hcfg
|
||||||
|
return manager, hcfg
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Gateway Honcho init failed for %s: %s", session_key, e)
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def _shutdown_gateway_honcho(self, session_key: str) -> None:
|
||||||
|
"""Flush and close the persistent Honcho manager for a gateway session."""
|
||||||
|
managers = getattr(self, "_honcho_managers", None)
|
||||||
|
configs = getattr(self, "_honcho_configs", None)
|
||||||
|
if managers is None or configs is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
manager = managers.pop(session_key, None)
|
||||||
|
configs.pop(session_key, None)
|
||||||
|
if not manager:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
manager.shutdown()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Gateway Honcho shutdown failed for %s: %s", session_key, e)
|
||||||
|
|
||||||
|
def _shutdown_all_gateway_honcho(self) -> None:
|
||||||
|
"""Flush and close all persistent Honcho managers."""
|
||||||
|
managers = getattr(self, "_honcho_managers", None)
|
||||||
|
if not managers:
|
||||||
|
return
|
||||||
|
for session_key in list(managers.keys()):
|
||||||
|
self._shutdown_gateway_honcho(session_key)
|
||||||
|
|
||||||
def _flush_memories_for_session(self, old_session_id: str):
|
def _flush_memories_for_session(self, old_session_id: str):
|
||||||
"""Prompt the agent to save memories/skills before context is lost.
|
"""Prompt the agent to save memories/skills before context is lost.
|
||||||
|
|
@ -595,6 +658,7 @@ class GatewayRunner:
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
await self._async_flush_memories(entry.session_id)
|
await self._async_flush_memories(entry.session_id)
|
||||||
|
self._shutdown_gateway_honcho(key)
|
||||||
self.session_store._pre_flushed_sessions.add(entry.session_id)
|
self.session_store._pre_flushed_sessions.add(entry.session_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Proactive memory flush failed for %s: %s", entry.session_id, e)
|
logger.debug("Proactive memory flush failed for %s: %s", entry.session_id, e)
|
||||||
|
|
@ -617,8 +681,9 @@ class GatewayRunner:
|
||||||
logger.info("✓ %s disconnected", platform.value)
|
logger.info("✓ %s disconnected", platform.value)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("✗ %s disconnect error: %s", platform.value, e)
|
logger.error("✗ %s disconnect error: %s", platform.value, e)
|
||||||
|
|
||||||
self.adapters.clear()
|
self.adapters.clear()
|
||||||
|
self._shutdown_all_gateway_honcho()
|
||||||
self._shutdown_event.set()
|
self._shutdown_event.set()
|
||||||
|
|
||||||
from gateway.status import remove_pid_file
|
from gateway.status import remove_pid_file
|
||||||
|
|
@ -1369,6 +1434,8 @@ class GatewayRunner:
|
||||||
asyncio.create_task(self._async_flush_memories(old_entry.session_id))
|
asyncio.create_task(self._async_flush_memories(old_entry.session_id))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Gateway memory flush on reset failed: %s", e)
|
logger.debug("Gateway memory flush on reset failed: %s", e)
|
||||||
|
|
||||||
|
self._shutdown_gateway_honcho(session_key)
|
||||||
|
|
||||||
# Reset the session
|
# Reset the session
|
||||||
new_entry = self.session_store.reset_session(session_key)
|
new_entry = self.session_store.reset_session(session_key)
|
||||||
|
|
@ -1989,6 +2056,8 @@ class GatewayRunner:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Memory flush on resume failed: %s", e)
|
logger.debug("Memory flush on resume failed: %s", e)
|
||||||
|
|
||||||
|
self._shutdown_gateway_honcho(session_key)
|
||||||
|
|
||||||
# Clear any running agent for this session key
|
# Clear any running agent for this session key
|
||||||
if session_key in self._running_agents:
|
if session_key in self._running_agents:
|
||||||
del self._running_agents[session_key]
|
del self._running_agents[session_key]
|
||||||
|
|
@ -2812,6 +2881,7 @@ class GatewayRunner:
|
||||||
}
|
}
|
||||||
|
|
||||||
pr = self._provider_routing
|
pr = self._provider_routing
|
||||||
|
honcho_manager, honcho_config = self._get_or_create_gateway_honcho(session_key)
|
||||||
agent = AIAgent(
|
agent = AIAgent(
|
||||||
model=model,
|
model=model,
|
||||||
**runtime_kwargs,
|
**runtime_kwargs,
|
||||||
|
|
@ -2833,6 +2903,8 @@ class GatewayRunner:
|
||||||
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
|
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
|
||||||
platform=platform_key,
|
platform=platform_key,
|
||||||
honcho_session_key=session_key,
|
honcho_session_key=session_key,
|
||||||
|
honcho_manager=honcho_manager,
|
||||||
|
honcho_config=honcho_config,
|
||||||
session_db=self._session_db,
|
session_db=self._session_db,
|
||||||
fallback_model=self._fallback_model,
|
fallback_model=self._fallback_model,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
104
tests/gateway/test_honcho_lifecycle.py
Normal file
104
tests/gateway/test_honcho_lifecycle.py
Normal file
|
|
@ -0,0 +1,104 @@
|
||||||
|
"""Tests for gateway-owned Honcho lifecycle helpers."""
|
||||||
|
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gateway.config import Platform
|
||||||
|
from gateway.platforms.base import MessageEvent
|
||||||
|
from gateway.session import SessionSource
|
||||||
|
|
||||||
|
|
||||||
|
def _make_runner():
|
||||||
|
from gateway.run import GatewayRunner
|
||||||
|
|
||||||
|
runner = object.__new__(GatewayRunner)
|
||||||
|
runner._honcho_managers = {}
|
||||||
|
runner._honcho_configs = {}
|
||||||
|
runner._running_agents = {}
|
||||||
|
runner._pending_messages = {}
|
||||||
|
runner._pending_approvals = {}
|
||||||
|
runner.adapters = {}
|
||||||
|
runner.hooks = MagicMock()
|
||||||
|
runner.hooks.emit = AsyncMock()
|
||||||
|
return runner
|
||||||
|
|
||||||
|
|
||||||
|
def _make_event(text="/reset"):
|
||||||
|
return MessageEvent(
|
||||||
|
text=text,
|
||||||
|
source=SessionSource(
|
||||||
|
platform=Platform.TELEGRAM,
|
||||||
|
chat_id="chat-1",
|
||||||
|
user_id="user-1",
|
||||||
|
user_name="alice",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGatewayHonchoLifecycle:
|
||||||
|
def test_gateway_reuses_honcho_manager_for_session_key(self):
|
||||||
|
runner = _make_runner()
|
||||||
|
hcfg = SimpleNamespace(
|
||||||
|
enabled=True,
|
||||||
|
api_key="honcho-key",
|
||||||
|
ai_peer="hermes",
|
||||||
|
peer_name="alice",
|
||||||
|
context_tokens=123,
|
||||||
|
peer_memory_mode=lambda peer: "hybrid",
|
||||||
|
)
|
||||||
|
manager = MagicMock()
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
|
||||||
|
patch("honcho_integration.client.get_honcho_client", return_value=MagicMock()),
|
||||||
|
patch("honcho_integration.session.HonchoSessionManager", return_value=manager) as mock_mgr_cls,
|
||||||
|
):
|
||||||
|
first_mgr, first_cfg = runner._get_or_create_gateway_honcho("session-key")
|
||||||
|
second_mgr, second_cfg = runner._get_or_create_gateway_honcho("session-key")
|
||||||
|
|
||||||
|
assert first_mgr is manager
|
||||||
|
assert second_mgr is manager
|
||||||
|
assert first_cfg is hcfg
|
||||||
|
assert second_cfg is hcfg
|
||||||
|
mock_mgr_cls.assert_called_once()
|
||||||
|
|
||||||
|
def test_gateway_skips_honcho_manager_in_local_mode(self):
|
||||||
|
runner = _make_runner()
|
||||||
|
hcfg = SimpleNamespace(
|
||||||
|
enabled=True,
|
||||||
|
api_key="honcho-key",
|
||||||
|
ai_peer="hermes",
|
||||||
|
peer_name="alice",
|
||||||
|
peer_memory_mode=lambda peer: "local",
|
||||||
|
)
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
|
||||||
|
patch("honcho_integration.client.get_honcho_client") as mock_client,
|
||||||
|
patch("honcho_integration.session.HonchoSessionManager") as mock_mgr_cls,
|
||||||
|
):
|
||||||
|
manager, cfg = runner._get_or_create_gateway_honcho("session-key")
|
||||||
|
|
||||||
|
assert manager is None
|
||||||
|
assert cfg is hcfg
|
||||||
|
mock_client.assert_not_called()
|
||||||
|
mock_mgr_cls.assert_not_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_reset_shuts_down_gateway_honcho_manager(self):
|
||||||
|
runner = _make_runner()
|
||||||
|
event = _make_event()
|
||||||
|
runner._shutdown_gateway_honcho = MagicMock()
|
||||||
|
runner.session_store = MagicMock()
|
||||||
|
runner.session_store._generate_session_key.return_value = "gateway-key"
|
||||||
|
runner.session_store._entries = {
|
||||||
|
"gateway-key": SimpleNamespace(session_id="old-session"),
|
||||||
|
}
|
||||||
|
runner.session_store.reset_session.return_value = SimpleNamespace(session_id="new-session")
|
||||||
|
|
||||||
|
result = await runner._handle_reset_command(event)
|
||||||
|
|
||||||
|
runner._shutdown_gateway_honcho.assert_called_once_with("gateway-key")
|
||||||
|
assert "Session reset" in result
|
||||||
Loading…
Add table
Add a link
Reference in a new issue