From 388130a122a0815d3ffda053d035e0c561f5ae2c Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 19 Mar 2026 10:30:50 -0700 Subject: [PATCH] fix: persist ACP sessions to SessionDB so they survive process restarts * fix: persist ACP sessions to disk so they survive process restarts The ACP adapter stored sessions entirely in-memory. When the editor restarted the ACP subprocess (idle timeout, crash, system sleep/wake, editor restart), all sessions were lost. The editor's load_session / resume_session calls would fail to find the session, forcing a new empty session and losing all conversation history. Changes: - SessionManager now persists each session as a JSON file under ~/.hermes/acp_sessions/.json - get_session() transparently restores from disk when not in memory - update_cwd(), fork_session(), list_sessions() all check disk - server.py calls save_session() after prompt completion, /reset, /compact, and model switches - cleanup() and remove_session() delete disk files too - Sessions have a 7-day TTL; expired sessions are pruned on startup - Atomic writes via tempfile + os.replace to prevent corruption - 11 new tests covering persistence, disk restoration, and TTL expiry * refactor: use SessionDB instead of JSON files for ACP session persistence Replace the standalone JSON file persistence layer with SessionDB (~/.hermes/state.db) integration. ACP sessions now: - Share the same DB as CLI and gateway sessions - Are searchable via session_search (FTS5) - Get token tracking, cost tracking, and session titles for free - Follow existing session pruning policies Key changes: - _get_db() lazily creates a SessionDB, resolving HERMES_HOME dynamically (not at import time) for test compatibility - _persist() creates session record + replaces messages in DB - _restore() loads from DB with source='acp' filter - cwd stored in model_config JSON field (no schema migration) - Model values coerced to str to handle mock agents in tests - Removed: json files, sessions_dir, ttl_days, _expire logic - Tests updated: DB-backed persistence, FTS search, tool_call round-tripping, source filtering --------- Co-authored-by: Test --- acp_adapter/server.py | 6 + acp_adapter/session.py | 294 +++++++++++++++++++++++++++++++++----- tests/acp/test_session.py | 173 +++++++++++++++++++++- 3 files changed, 438 insertions(+), 35 deletions(-) diff --git a/acp_adapter/server.py b/acp_adapter/server.py index 1081104e..92788988 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -304,6 +304,8 @@ class HermesACPAgent(acp.Agent): if result.get("messages"): state.history = result["messages"] + # Persist updated history so sessions survive process restarts. + self.session_manager.save_session(session_id) final_response = result.get("final_response", "") if final_response and conn: @@ -400,6 +402,7 @@ class HermesACPAgent(acp.Agent): cwd=state.cwd, model=new_model, ) + self.session_manager.save_session(state.session_id) provider_label = target_provider or getattr(state.agent, "provider", "auto") logger.info("Session %s: model switched to %s", state.session_id, new_model) return f"Model switched to: {new_model}\nProvider: {provider_label}" @@ -444,6 +447,7 @@ class HermesACPAgent(acp.Agent): def _cmd_reset(self, args: str, state: SessionState) -> str: state.history.clear() + self.session_manager.save_session(state.session_id) return "Conversation history cleared." def _cmd_compact(self, args: str, state: SessionState) -> str: @@ -453,6 +457,7 @@ class HermesACPAgent(acp.Agent): agent = state.agent if hasattr(agent, "compress_context"): agent.compress_context(state.history) + self.session_manager.save_session(state.session_id) return f"Context compressed. Messages: {len(state.history)}" return "Context compression not available for this agent." except Exception as e: @@ -475,5 +480,6 @@ class HermesACPAgent(acp.Agent): cwd=state.cwd, model=model_id, ) + self.session_manager.save_session(session_id) logger.info("Session %s: model switched to %s", session_id, model_id) return None diff --git a/acp_adapter/session.py b/acp_adapter/session.py index 0f5b2428..01b2ee47 100644 --- a/acp_adapter/session.py +++ b/acp_adapter/session.py @@ -1,7 +1,15 @@ -"""ACP session manager — maps ACP sessions to Hermes AIAgent instances.""" +"""ACP session manager — maps ACP sessions to Hermes AIAgent instances. + +Sessions are persisted to the shared SessionDB (``~/.hermes/state.db``) so they +survive process restarts and appear in ``session_search``. When the editor +reconnects after idle/restart, the ``load_session`` / ``resume_session`` calls +find the persisted session in the database and restore the full conversation +history. +""" from __future__ import annotations import copy +import json import logging import uuid from dataclasses import dataclass, field @@ -46,18 +54,26 @@ class SessionState: class SessionManager: - """Thread-safe manager for ACP sessions backed by Hermes AIAgent instances.""" + """Thread-safe manager for ACP sessions backed by Hermes AIAgent instances. - def __init__(self, agent_factory=None): + Sessions are held in-memory for fast access **and** persisted to the + shared SessionDB so they survive process restarts and are searchable + via ``session_search``. + """ + + def __init__(self, agent_factory=None, db=None): """ Args: agent_factory: Optional callable that creates an AIAgent-like object. Used by tests. When omitted, a real AIAgent is created using the current Hermes runtime provider configuration. + db: Optional SessionDB instance. When omitted, the default + SessionDB (``~/.hermes/state.db``) is lazily created. """ self._sessions: Dict[str, SessionState] = {} self._lock = Lock() self._agent_factory = agent_factory + self._db_instance = db # None → lazy-init on first use # ---- public API --------------------------------------------------------- @@ -77,54 +93,67 @@ class SessionManager: with self._lock: self._sessions[session_id] = state _register_task_cwd(session_id, cwd) + self._persist(state) logger.info("Created ACP session %s (cwd=%s)", session_id, cwd) return state def get_session(self, session_id: str) -> Optional[SessionState]: - """Return the session for *session_id*, or ``None``.""" + """Return the session for *session_id*, or ``None``. + + If the session is not in memory but exists in the database (e.g. after + a process restart), it is transparently restored. + """ with self._lock: - return self._sessions.get(session_id) + state = self._sessions.get(session_id) + if state is not None: + return state + # Attempt to restore from database. + return self._restore(session_id) def remove_session(self, session_id: str) -> bool: - """Remove a session. Returns True if it existed.""" + """Remove a session from memory and database. Returns True if it existed.""" with self._lock: existed = self._sessions.pop(session_id, None) is not None - if existed: + db_existed = self._delete_persisted(session_id) + if existed or db_existed: _clear_task_cwd(session_id) - return existed + return existed or db_existed def fork_session(self, session_id: str, cwd: str = ".") -> Optional[SessionState]: """Deep-copy a session's history into a new session.""" import threading - with self._lock: - original = self._sessions.get(session_id) - if original is None: - return None + original = self.get_session(session_id) # checks DB too + if original is None: + return None - new_id = str(uuid.uuid4()) - agent = self._make_agent( - session_id=new_id, - cwd=cwd, - model=original.model or None, - ) - state = SessionState( - session_id=new_id, - agent=agent, - cwd=cwd, - model=getattr(agent, "model", original.model) or original.model, - history=copy.deepcopy(original.history), - cancel_event=threading.Event(), - ) + new_id = str(uuid.uuid4()) + agent = self._make_agent( + session_id=new_id, + cwd=cwd, + model=original.model or None, + ) + state = SessionState( + session_id=new_id, + agent=agent, + cwd=cwd, + model=getattr(agent, "model", original.model) or original.model, + history=copy.deepcopy(original.history), + cancel_event=threading.Event(), + ) + with self._lock: self._sessions[new_id] = state _register_task_cwd(new_id, cwd) + self._persist(state) logger.info("Forked ACP session %s -> %s", session_id, new_id) return state def list_sessions(self) -> List[Dict[str, Any]]: - """Return lightweight info dicts for all sessions.""" + """Return lightweight info dicts for all sessions (memory + database).""" + # Collect in-memory sessions first. with self._lock: - return [ + seen_ids = set(self._sessions.keys()) + results = [ { "session_id": s.session_id, "cwd": s.cwd, @@ -134,23 +163,220 @@ class SessionManager: for s in self._sessions.values() ] + # Merge any persisted sessions not currently in memory. + db = self._get_db() + if db is not None: + try: + rows = db.search_sessions(source="acp", limit=1000) + for row in rows: + sid = row["id"] + if sid in seen_ids: + continue + # Extract cwd from model_config JSON. + cwd = "." + mc = row.get("model_config") + if mc: + try: + cwd = json.loads(mc).get("cwd", ".") + except (json.JSONDecodeError, TypeError): + pass + results.append({ + "session_id": sid, + "cwd": cwd, + "model": row.get("model") or "", + "history_len": row.get("message_count") or 0, + }) + except Exception: + logger.debug("Failed to list ACP sessions from DB", exc_info=True) + + return results + def update_cwd(self, session_id: str, cwd: str) -> Optional[SessionState]: """Update the working directory for a session and its tool overrides.""" - with self._lock: - state = self._sessions.get(session_id) - if state is None: - return None - state.cwd = cwd + state = self.get_session(session_id) # checks DB too + if state is None: + return None + state.cwd = cwd _register_task_cwd(session_id, cwd) + self._persist(state) return state def cleanup(self) -> None: - """Remove all sessions and clear task-specific cwd overrides.""" + """Remove all sessions (memory and database) and clear task-specific cwd overrides.""" with self._lock: session_ids = list(self._sessions.keys()) self._sessions.clear() for session_id in session_ids: _clear_task_cwd(session_id) + self._delete_persisted(session_id) + # Also remove any DB-only ACP sessions not currently in memory. + db = self._get_db() + if db is not None: + try: + rows = db.search_sessions(source="acp", limit=10000) + for row in rows: + sid = row["id"] + _clear_task_cwd(sid) + db.delete_session(sid) + except Exception: + logger.debug("Failed to cleanup ACP sessions from DB", exc_info=True) + + def save_session(self, session_id: str) -> None: + """Persist the current state of a session to the database. + + Called by the server after prompt completion, slash commands that + mutate history, and model switches. + """ + with self._lock: + state = self._sessions.get(session_id) + if state is not None: + self._persist(state) + + # ---- persistence via SessionDB ------------------------------------------ + + def _get_db(self): + """Lazily initialise and return the SessionDB instance. + + Returns ``None`` if the DB is unavailable (e.g. import error in a + minimal test environment). + + Note: we resolve ``HERMES_HOME`` dynamically rather than relying on + the module-level ``DEFAULT_DB_PATH`` constant, because that constant + is evaluated at import time and won't reflect env-var changes made + later (e.g. by the test fixture ``_isolate_hermes_home``). + """ + if self._db_instance is not None: + return self._db_instance + try: + import os + from pathlib import Path + from hermes_state import SessionDB + hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) + self._db_instance = SessionDB(db_path=hermes_home / "state.db") + return self._db_instance + except Exception: + logger.debug("SessionDB unavailable for ACP persistence", exc_info=True) + return None + + def _persist(self, state: SessionState) -> None: + """Write session state to the database. + + Creates the session record if it doesn't exist, then replaces all + stored messages with the current in-memory history. + """ + db = self._get_db() + if db is None: + return + + # Ensure model is a plain string (not a MagicMock or other proxy). + model_str = str(state.model) if state.model else None + cwd_json = json.dumps({"cwd": state.cwd}) + + try: + # Ensure the session record exists. + existing = db.get_session(state.session_id) + if existing is None: + db.create_session( + session_id=state.session_id, + source="acp", + model=model_str, + model_config={"cwd": state.cwd}, + ) + else: + # Update model_config (contains cwd) if changed. + try: + with db._lock: + db._conn.execute( + "UPDATE sessions SET model_config = ?, model = COALESCE(?, model) WHERE id = ?", + (cwd_json, model_str, state.session_id), + ) + db._conn.commit() + except Exception: + logger.debug("Failed to update ACP session metadata", exc_info=True) + + # Replace stored messages with current history. + db.clear_messages(state.session_id) + for msg in state.history: + db.append_message( + session_id=state.session_id, + role=msg.get("role", "user"), + content=msg.get("content"), + tool_name=msg.get("tool_name") or msg.get("name"), + tool_calls=msg.get("tool_calls"), + tool_call_id=msg.get("tool_call_id"), + ) + except Exception: + logger.warning("Failed to persist ACP session %s", state.session_id, exc_info=True) + + def _restore(self, session_id: str) -> Optional[SessionState]: + """Load a session from the database into memory, recreating the AIAgent.""" + import threading + + db = self._get_db() + if db is None: + return None + + try: + row = db.get_session(session_id) + except Exception: + logger.debug("Failed to query DB for ACP session %s", session_id, exc_info=True) + return None + + if row is None: + return None + + # Only restore ACP sessions. + if row.get("source") != "acp": + return None + + # Extract cwd from model_config. + cwd = "." + mc = row.get("model_config") + if mc: + try: + cwd = json.loads(mc).get("cwd", ".") + except (json.JSONDecodeError, TypeError): + pass + + model = row.get("model") or None + + # Load conversation history. + try: + history = db.get_messages_as_conversation(session_id) + except Exception: + logger.warning("Failed to load messages for ACP session %s", session_id, exc_info=True) + history = [] + + try: + agent = self._make_agent(session_id=session_id, cwd=cwd, model=model) + except Exception: + logger.warning("Failed to recreate agent for ACP session %s", session_id, exc_info=True) + return None + + state = SessionState( + session_id=session_id, + agent=agent, + cwd=cwd, + model=model or getattr(agent, "model", "") or "", + history=history, + cancel_event=threading.Event(), + ) + with self._lock: + self._sessions[session_id] = state + _register_task_cwd(session_id, cwd) + logger.info("Restored ACP session %s from DB (%d messages)", session_id, len(history)) + return state + + def _delete_persisted(self, session_id: str) -> bool: + """Delete a session from the database. Returns True if it existed.""" + db = self._get_db() + if db is None: + return False + try: + return db.delete_session(session_id) + except Exception: + logger.debug("Failed to delete ACP session %s from DB", session_id, exc_info=True) + return False # ---- internal ----------------------------------------------------------- diff --git a/tests/acp/test_session.py b/tests/acp/test_session.py index 79cbcf53..43d9a722 100644 --- a/tests/acp/test_session.py +++ b/tests/acp/test_session.py @@ -1,15 +1,21 @@ """Tests for acp_adapter.session — SessionManager and SessionState.""" +import json import pytest from unittest.mock import MagicMock from acp_adapter.session import SessionManager, SessionState +from hermes_state import SessionDB + + +def _mock_agent(): + return MagicMock(name="MockAIAgent") @pytest.fixture() def manager(): """SessionManager with a mock agent factory (avoids needing API keys).""" - return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent")) + return SessionManager(agent_factory=_mock_agent) # --------------------------------------------------------------------------- @@ -110,3 +116,168 @@ class TestListAndCleanup: assert manager.get_session(state.session_id) is None # Removing again returns False assert manager.remove_session(state.session_id) is False + + +# --------------------------------------------------------------------------- +# persistence — sessions survive process restarts (via SessionDB) +# --------------------------------------------------------------------------- + + +class TestPersistence: + """Verify that sessions are persisted to SessionDB and can be restored.""" + + def test_create_session_writes_to_db(self, manager): + state = manager.create_session(cwd="/project") + db = manager._get_db() + assert db is not None + row = db.get_session(state.session_id) + assert row is not None + assert row["source"] == "acp" + # cwd stored in model_config JSON + mc = json.loads(row["model_config"]) + assert mc["cwd"] == "/project" + + def test_get_session_restores_from_db(self, manager): + """Simulate process restart: create session, drop from memory, get again.""" + state = manager.create_session(cwd="/work") + state.history.append({"role": "user", "content": "hello"}) + state.history.append({"role": "assistant", "content": "hi there"}) + manager.save_session(state.session_id) + + sid = state.session_id + + # Drop from in-memory store (simulates process restart). + with manager._lock: + del manager._sessions[sid] + + # get_session should transparently restore from DB. + restored = manager.get_session(sid) + assert restored is not None + assert restored.session_id == sid + assert restored.cwd == "/work" + assert len(restored.history) == 2 + assert restored.history[0]["content"] == "hello" + assert restored.history[1]["content"] == "hi there" + # Agent should have been recreated. + assert restored.agent is not None + + def test_save_session_updates_db(self, manager): + state = manager.create_session() + state.history.append({"role": "user", "content": "test"}) + manager.save_session(state.session_id) + + db = manager._get_db() + messages = db.get_messages_as_conversation(state.session_id) + assert len(messages) == 1 + assert messages[0]["content"] == "test" + + def test_remove_session_deletes_from_db(self, manager): + state = manager.create_session() + db = manager._get_db() + assert db.get_session(state.session_id) is not None + manager.remove_session(state.session_id) + assert db.get_session(state.session_id) is None + + def test_cleanup_removes_all_from_db(self, manager): + s1 = manager.create_session() + s2 = manager.create_session() + db = manager._get_db() + assert db.get_session(s1.session_id) is not None + assert db.get_session(s2.session_id) is not None + manager.cleanup() + assert db.get_session(s1.session_id) is None + assert db.get_session(s2.session_id) is None + + def test_list_sessions_includes_db_only(self, manager): + """Sessions only in DB (not in memory) appear in list_sessions.""" + state = manager.create_session(cwd="/db-only") + sid = state.session_id + + # Drop from memory. + with manager._lock: + del manager._sessions[sid] + + listing = manager.list_sessions() + ids = {s["session_id"] for s in listing} + assert sid in ids + + def test_fork_restores_source_from_db(self, manager): + """Forking a session that is only in DB should work.""" + original = manager.create_session() + original.history.append({"role": "user", "content": "context"}) + manager.save_session(original.session_id) + + # Drop original from memory. + with manager._lock: + del manager._sessions[original.session_id] + + forked = manager.fork_session(original.session_id, cwd="/fork") + assert forked is not None + assert len(forked.history) == 1 + assert forked.history[0]["content"] == "context" + assert forked.session_id != original.session_id + + def test_update_cwd_restores_from_db(self, manager): + state = manager.create_session(cwd="/old") + sid = state.session_id + + with manager._lock: + del manager._sessions[sid] + + updated = manager.update_cwd(sid, "/new") + assert updated is not None + assert updated.cwd == "/new" + + # Should also be persisted in DB. + db = manager._get_db() + row = db.get_session(sid) + mc = json.loads(row["model_config"]) + assert mc["cwd"] == "/new" + + def test_only_restores_acp_sessions(self, manager): + """get_session should not restore non-ACP sessions from DB.""" + db = manager._get_db() + # Manually create a CLI session in the DB. + db.create_session(session_id="cli-session-123", source="cli", model="test") + # Should not be found via ACP SessionManager. + assert manager.get_session("cli-session-123") is None + + def test_sessions_searchable_via_fts(self, manager): + """ACP sessions stored in SessionDB are searchable via FTS5.""" + state = manager.create_session() + state.history.append({"role": "user", "content": "how do I configure nginx"}) + state.history.append({"role": "assistant", "content": "Here is the nginx config..."}) + manager.save_session(state.session_id) + + db = manager._get_db() + results = db.search_messages("nginx") + assert len(results) > 0 + session_ids = {r["session_id"] for r in results} + assert state.session_id in session_ids + + def test_tool_calls_persisted(self, manager): + """Messages with tool_calls should round-trip through the DB.""" + state = manager.create_session() + state.history.append({ + "role": "assistant", + "content": None, + "tool_calls": [{"id": "tc_1", "type": "function", + "function": {"name": "terminal", "arguments": "{}"}}], + }) + state.history.append({ + "role": "tool", + "content": "output here", + "tool_call_id": "tc_1", + "name": "terminal", + }) + manager.save_session(state.session_id) + + # Drop from memory, restore from DB. + with manager._lock: + del manager._sessions[state.session_id] + + restored = manager.get_session(state.session_id) + assert restored is not None + assert len(restored.history) == 2 + assert restored.history[0].get("tool_calls") is not None + assert restored.history[1].get("tool_call_id") == "tc_1"