feat: add persistent memory system + SQLite session store

Two-part implementation:

Part A - Curated Bounded Memory:
- New memory tool (tools/memory_tool.py) with MEMORY.md + USER.md stores
- Character-limited (2200/1375 chars), § delimited entries
- Frozen snapshot injected into system prompt at session start
- Model manages pruning via replace/remove with substring matching
- Usage indicator shown in system prompt header

Part B - SQLite Session Store:
- New hermes_state.py with SessionDB class, FTS5 full-text search
- Gateway session.py rewritten to dual-write SQLite + legacy JSONL
- Compression-triggered session splitting with parent_session_id chains
- New session_search tool with Gemini Flash summarization of matched sessions
- CLI session lifecycle (create on launch, close on exit)

Also:
- System prompt now cached per session, only rebuilt on compression
  (fixes prefix cache invalidation from date/time changes every turn)
- Config version bumped to 3, hermes doctor checks for new artifacts
- Disabled in batch_runner and RL environments
This commit is contained in:
teknium1 2026-02-19 00:57:31 -08:00
parent 655303f2f1
commit 440c244cac
19 changed files with 2397 additions and 327 deletions

385
tools/memory_tool.py Normal file
View file

@ -0,0 +1,385 @@
#!/usr/bin/env python3
"""
Memory Tool Module - Persistent Curated Memory
Provides bounded, file-backed memory that persists across sessions. Two stores:
- MEMORY.md: agent's personal notes and observations (environment facts, project
conventions, tool quirks, things learned)
- USER.md: what the agent knows about the user (preferences, communication style,
expectations, workflow habits)
Both are injected into the system prompt as a frozen snapshot at session start.
Mid-session writes update files on disk immediately (durable) but do NOT change
the system prompt -- this preserves the prefix cache for the entire session.
The snapshot refreshes on the next session start.
Entry delimiter: § (section sign). Entries can be multiline.
Character limits (not tokens) because char counts are model-independent.
Design:
- Single `memory` tool with action parameter: add, replace, remove, read
- replace/remove use short unique substring matching (not full text or IDs)
- Behavioral guidance lives in the tool schema description
- Frozen snapshot pattern: system prompt is stable, tool responses show live state
"""
import json
import os
import fcntl
from pathlib import Path
from typing import Dict, Any, List, Optional
# Where memory files live
MEMORY_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "memories"
ENTRY_DELIMITER = "\n§\n"
class MemoryStore:
"""
Bounded curated memory with file persistence. One instance per AIAgent.
Maintains two parallel states:
- _system_prompt_snapshot: frozen at load time, used for system prompt injection.
Never mutated mid-session. Keeps prefix cache stable.
- memory_entries / user_entries: live state, mutated by tool calls, persisted to disk.
Tool responses always reflect this live state.
"""
def __init__(self, memory_char_limit: int = 2200, user_char_limit: int = 1375):
self.memory_entries: List[str] = []
self.user_entries: List[str] = []
self.memory_char_limit = memory_char_limit
self.user_char_limit = user_char_limit
# Frozen snapshot for system prompt -- set once at load_from_disk()
self._system_prompt_snapshot: Dict[str, str] = {"memory": "", "user": ""}
def load_from_disk(self):
"""Load entries from MEMORY.md and USER.md, capture system prompt snapshot."""
MEMORY_DIR.mkdir(parents=True, exist_ok=True)
self.memory_entries = self._read_file(MEMORY_DIR / "MEMORY.md")
self.user_entries = self._read_file(MEMORY_DIR / "USER.md")
# Capture frozen snapshot for system prompt injection
self._system_prompt_snapshot = {
"memory": self._render_block("memory", self.memory_entries),
"user": self._render_block("user", self.user_entries),
}
def save_to_disk(self, target: str):
"""Persist entries to the appropriate file. Called after every mutation."""
MEMORY_DIR.mkdir(parents=True, exist_ok=True)
if target == "memory":
self._write_file(MEMORY_DIR / "MEMORY.md", self.memory_entries)
elif target == "user":
self._write_file(MEMORY_DIR / "USER.md", self.user_entries)
def _entries_for(self, target: str) -> List[str]:
if target == "user":
return self.user_entries
return self.memory_entries
def _set_entries(self, target: str, entries: List[str]):
if target == "user":
self.user_entries = entries
else:
self.memory_entries = entries
def _char_count(self, target: str) -> int:
entries = self._entries_for(target)
if not entries:
return 0
return len(ENTRY_DELIMITER.join(entries))
def _char_limit(self, target: str) -> int:
if target == "user":
return self.user_char_limit
return self.memory_char_limit
def add(self, target: str, content: str) -> Dict[str, Any]:
"""Append a new entry. Returns error if it would exceed the char limit."""
content = content.strip()
if not content:
return {"success": False, "error": "Content cannot be empty."}
entries = self._entries_for(target)
limit = self._char_limit(target)
# Calculate what the new total would be
new_entries = entries + [content]
new_total = len(ENTRY_DELIMITER.join(new_entries))
if new_total > limit:
current = self._char_count(target)
return {
"success": False,
"error": (
f"Memory at {current:,}/{limit:,} chars. "
f"Adding this entry ({len(content)} chars) would exceed the limit. "
f"Replace or remove existing entries first."
),
"current_entries": entries,
"usage": f"{current:,}/{limit:,}",
}
entries.append(content)
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry added.")
def replace(self, target: str, old_text: str, new_content: str) -> Dict[str, Any]:
"""Find entry containing old_text substring, replace it with new_content."""
old_text = old_text.strip()
new_content = new_content.strip()
if not old_text:
return {"success": False, "error": "old_text cannot be empty."}
if not new_content:
return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."}
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
}
idx = matches[0][0]
limit = self._char_limit(target)
# Check that replacement doesn't blow the budget
test_entries = entries.copy()
test_entries[idx] = new_content
new_total = len(ENTRY_DELIMITER.join(test_entries))
if new_total > limit:
return {
"success": False,
"error": (
f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
f"Shorten the new content or remove other entries first."
),
}
entries[idx] = new_content
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry replaced.")
def remove(self, target: str, old_text: str) -> Dict[str, Any]:
"""Remove the entry containing old_text substring."""
old_text = old_text.strip()
if not old_text:
return {"success": False, "error": "old_text cannot be empty."}
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
}
idx = matches[0][0]
entries.pop(idx)
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry removed.")
def read(self, target: str) -> Dict[str, Any]:
"""Return live current entries and usage stats."""
return self._success_response(target)
def format_for_system_prompt(self, target: str) -> Optional[str]:
"""
Return the frozen snapshot for system prompt injection.
This returns the state captured at load_from_disk() time, NOT the live
state. Mid-session writes do not affect this. This keeps the system
prompt stable across all turns, preserving the prefix cache.
Returns None if the snapshot is empty (no entries at load time).
"""
block = self._system_prompt_snapshot.get(target, "")
return block if block else None
# -- Internal helpers --
def _success_response(self, target: str, message: str = None) -> Dict[str, Any]:
entries = self._entries_for(target)
current = self._char_count(target)
limit = self._char_limit(target)
pct = int((current / limit) * 100) if limit > 0 else 0
resp = {
"success": True,
"target": target,
"entries": entries,
"usage": f"{pct}% — {current:,}/{limit:,} chars",
"entry_count": len(entries),
}
if message:
resp["message"] = message
return resp
def _render_block(self, target: str, entries: List[str]) -> str:
"""Render a system prompt block with header and usage indicator."""
if not entries:
return ""
limit = self._char_limit(target)
content = ENTRY_DELIMITER.join(entries)
current = len(content)
pct = int((current / limit) * 100) if limit > 0 else 0
if target == "user":
header = f"USER PROFILE (who the user is) [{pct}% — {current:,}/{limit:,} chars]"
else:
header = f"MEMORY (your personal notes) [{pct}% — {current:,}/{limit:,} chars]"
separator = "" * 46
return f"{separator}\n{header}\n{separator}\n{content}"
@staticmethod
def _read_file(path: Path) -> List[str]:
"""Read a memory file and split into entries."""
if not path.exists():
return []
try:
with open(path, "r", encoding="utf-8") as f:
fcntl.flock(f, fcntl.LOCK_SH)
try:
raw = f.read()
finally:
fcntl.flock(f, fcntl.LOCK_UN)
except (OSError, IOError):
return []
if not raw.strip():
return []
entries = [e.strip() for e in raw.split("§")]
return [e for e in entries if e]
@staticmethod
def _write_file(path: Path, entries: List[str]):
"""Write entries to a memory file with file locking."""
content = ENTRY_DELIMITER.join(entries) if entries else ""
try:
with open(path, "w", encoding="utf-8") as f:
fcntl.flock(f, fcntl.LOCK_EX)
try:
f.write(content)
finally:
fcntl.flock(f, fcntl.LOCK_UN)
except (OSError, IOError) as e:
raise RuntimeError(f"Failed to write memory file {path}: {e}")
def memory_tool(
action: str,
target: str = "memory",
content: str = None,
old_text: str = None,
store: Optional[MemoryStore] = None,
) -> str:
"""
Single entry point for the memory tool. Dispatches to MemoryStore methods.
Returns JSON string with results.
"""
if store is None:
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
if target not in ("memory", "user"):
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
if action == "add":
if not content:
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
if not content:
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
result = store.remove(target, old_text)
elif action == "read":
result = store.read(target)
else:
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove, read"}, ensure_ascii=False)
return json.dumps(result, ensure_ascii=False)
def check_memory_requirements() -> bool:
"""Memory tool has no external requirements -- always available."""
return True
# =============================================================================
# OpenAI Function-Calling Schema
# =============================================================================
MEMORY_SCHEMA = {
"name": "memory",
"description": (
"Manage persistent memory (visible in system prompt). Targets: "
"'memory' (your notes) or 'user' (user profile).\n"
"Actions: add, replace, remove, read. For replace/remove, old_text "
"is a short unique snippet to identify the entry.\n"
"Usage indicator in system prompt shows capacity. When >80%, "
"consolidate/replace before adding. Prefer replacing over removing.\n"
"Write: non-obvious facts, user preferences, tool quirks. "
"Skip: trivial info, things in skills, re-discoverable content."
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["add", "replace", "remove", "read"],
"description": "The action to perform."
},
"target": {
"type": "string",
"enum": ["memory", "user"],
"description": "Which memory store: 'memory' for personal notes, 'user' for user profile."
},
"content": {
"type": "string",
"description": "The entry content. Required for 'add' and 'replace'."
},
"old_text": {
"type": "string",
"description": "Short unique substring identifying the entry to replace or remove."
},
},
"required": ["action", "target"],
},
}

View file

@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""
Session Search Tool - Long-Term Conversation Recall
Searches past session transcripts in SQLite via FTS5, then summarizes the top
matching sessions using a cheap/fast model (same pattern as web_extract).
Returns focused summaries of past conversations rather than raw transcripts,
keeping the main model's context window clean.
Flow:
1. FTS5 search finds matching messages ranked by relevance
2. Groups by session, takes the top N unique sessions (default 3)
3. Loads each session's conversation, truncates to ~100k chars centered on matches
4. Sends to Gemini Flash with a focused summarization prompt
5. Returns per-session summaries with metadata
"""
import asyncio
import concurrent.futures
import json
import os
import logging
from typing import Dict, Any, List, Optional
from openai import AsyncOpenAI
SUMMARIZER_MODEL = "google/gemini-3-flash-preview"
MAX_SESSION_CHARS = 100_000
MAX_SUMMARY_TOKENS = 2000
_summarizer_client = None
def _get_client() -> AsyncOpenAI:
"""Lazy-init the summarizer client (shared with web_tools pattern)."""
global _summarizer_client
if _summarizer_client is None:
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise ValueError("OPENROUTER_API_KEY not set")
_summarizer_client = AsyncOpenAI(
api_key=api_key,
base_url="https://openrouter.ai/api/v1",
)
return _summarizer_client
def _format_conversation(messages: List[Dict[str, Any]]) -> str:
"""Format session messages into a readable transcript for summarization."""
parts = []
for msg in messages:
role = msg.get("role", "unknown").upper()
content = msg.get("content") or ""
tool_name = msg.get("tool_name")
if role == "TOOL" and tool_name:
# Truncate long tool outputs
if len(content) > 500:
content = content[:250] + "\n...[truncated]...\n" + content[-250:]
parts.append(f"[TOOL:{tool_name}]: {content}")
elif role == "ASSISTANT":
# Include tool call names if present
tool_calls = msg.get("tool_calls")
if tool_calls and isinstance(tool_calls, list):
tc_names = []
for tc in tool_calls:
if isinstance(tc, dict):
name = tc.get("name") or tc.get("function", {}).get("name", "?")
tc_names.append(name)
if tc_names:
parts.append(f"[ASSISTANT]: [Called: {', '.join(tc_names)}]")
if content:
parts.append(f"[ASSISTANT]: {content}")
else:
parts.append(f"[ASSISTANT]: {content}")
else:
parts.append(f"[{role}]: {content}")
return "\n\n".join(parts)
def _truncate_around_matches(
full_text: str, query: str, max_chars: int = MAX_SESSION_CHARS
) -> str:
"""
Truncate a conversation transcript to max_chars, centered around
where the query terms appear. Keeps content near matches, trims the edges.
"""
if len(full_text) <= max_chars:
return full_text
# Find the first occurrence of any query term
query_terms = query.lower().split()
text_lower = full_text.lower()
first_match = len(full_text)
for term in query_terms:
pos = text_lower.find(term)
if pos != -1 and pos < first_match:
first_match = pos
if first_match == len(full_text):
# No match found, take from the start
first_match = 0
# Center the window around the first match
half = max_chars // 2
start = max(0, first_match - half)
end = min(len(full_text), start + max_chars)
if end - start < max_chars:
start = max(0, end - max_chars)
truncated = full_text[start:end]
prefix = "...[earlier conversation truncated]...\n\n" if start > 0 else ""
suffix = "\n\n...[later conversation truncated]..." if end < len(full_text) else ""
return prefix + truncated + suffix
async def _summarize_session(
conversation_text: str, query: str, session_meta: Dict[str, Any]
) -> Optional[str]:
"""Summarize a single session conversation focused on the search query."""
system_prompt = (
"You are reviewing a past conversation transcript to help recall what happened. "
"Summarize the conversation with a focus on the search topic. Include:\n"
"1. What the user asked about or wanted to accomplish\n"
"2. What actions were taken and what the outcomes were\n"
"3. Key decisions, solutions found, or conclusions reached\n"
"4. Any specific commands, files, URLs, or technical details that were important\n"
"5. Anything left unresolved or notable\n\n"
"Be thorough but concise. Preserve specific details (commands, paths, error messages) "
"that would be useful to recall. Write in past tense as a factual recap."
)
source = session_meta.get("source", "unknown")
started = session_meta.get("started_at", "unknown")
user_prompt = (
f"Search topic: {query}\n"
f"Session source: {source}\n"
f"Session started: {started}\n\n"
f"CONVERSATION TRANSCRIPT:\n{conversation_text}\n\n"
f"Summarize this conversation with focus on: {query}"
)
max_retries = 3
for attempt in range(max_retries):
try:
response = await _get_client().chat.completions.create(
model=SUMMARIZER_MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
temperature=0.1,
max_tokens=MAX_SUMMARY_TOKENS,
)
return response.choices[0].message.content.strip()
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(1 * (attempt + 1))
else:
logging.warning(f"Session summarization failed after {max_retries} attempts: {e}")
return None
def session_search(
query: str,
role_filter: str = None,
limit: int = 3,
db=None,
) -> str:
"""
Search past sessions and return focused summaries of matching conversations.
Uses FTS5 to find matches, then summarizes the top sessions with Gemini Flash.
"""
if db is None:
return json.dumps({"success": False, "error": "Session database not available."}, ensure_ascii=False)
if not query or not query.strip():
return json.dumps({"success": False, "error": "Query cannot be empty."}, ensure_ascii=False)
query = query.strip()
limit = min(limit, 5) # Cap at 5 sessions to avoid excessive LLM calls
try:
# Parse role filter
role_list = None
if role_filter and role_filter.strip():
role_list = [r.strip() for r in role_filter.split(",") if r.strip()]
# FTS5 search -- get matches ranked by relevance
raw_results = db.search_messages(
query=query,
role_filter=role_list,
limit=50, # Get more matches to find unique sessions
offset=0,
)
if not raw_results:
return json.dumps({
"success": True,
"query": query,
"results": [],
"count": 0,
"message": "No matching sessions found.",
}, ensure_ascii=False)
# Group by session_id, keep order (highest ranked first)
seen_sessions = {}
for result in raw_results:
sid = result["session_id"]
if sid not in seen_sessions:
seen_sessions[sid] = result
if len(seen_sessions) >= limit:
break
# Summarize each matching session
summaries = []
for session_id, match_info in seen_sessions.items():
try:
# Load full conversation
messages = db.get_messages_as_conversation(session_id)
if not messages:
continue
# Get session metadata
session_meta = db.get_session(session_id) or {}
# Format and truncate
conversation_text = _format_conversation(messages)
conversation_text = _truncate_around_matches(conversation_text, query)
# Summarize with Gemini Flash (handle both async and sync contexts)
coro = _summarize_session(conversation_text, query, session_meta)
try:
asyncio.get_running_loop()
# Already in an async context (gateway) -- run in a thread
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
summary = pool.submit(lambda: asyncio.run(coro)).result(timeout=30)
except RuntimeError:
# No running loop (normal CLI) -- use asyncio.run directly
summary = asyncio.run(coro)
if summary:
summaries.append({
"session_id": session_id,
"source": match_info.get("source", "unknown"),
"model": match_info.get("model"),
"session_started": match_info.get("session_started"),
"summary": summary,
})
except Exception as e:
logging.warning(f"Failed to summarize session {session_id}: {e}")
continue
return json.dumps({
"success": True,
"query": query,
"results": summaries,
"count": len(summaries),
"sessions_searched": len(seen_sessions),
}, ensure_ascii=False)
except Exception as e:
return json.dumps({"success": False, "error": f"Search failed: {str(e)}"}, ensure_ascii=False)
def check_session_search_requirements() -> bool:
"""Requires SQLite state database and OpenRouter API key."""
if not os.getenv("OPENROUTER_API_KEY"):
return False
try:
from hermes_state import DEFAULT_DB_PATH
return DEFAULT_DB_PATH.parent.exists()
except ImportError:
return False
SESSION_SEARCH_SCHEMA = {
"name": "session_search",
"description": (
"Search and recall past conversations. Finds matching sessions using "
"full-text search, then provides a focused summary of each matching "
"conversation.\n\n"
"Use this when you need to recall:\n"
"- A solution or approach from a previous session\n"
"- Something the user said or asked about before\n"
"- A command, file path, or technical detail from past work\n"
"- The outcome of a previous task\n\n"
"Supports search syntax:\n"
" Keywords: docker deployment\n"
" Phrases: '\"exact phrase\"'\n"
" Boolean: docker OR kubernetes, python NOT java\n"
" Prefix: deploy*\n\n"
"Returns summaries (not raw transcripts) of the top matching sessions, "
"focused on your search topic. Max 3 sessions per search."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query — keywords, phrases, or boolean expressions to find in past sessions.",
},
"role_filter": {
"type": "string",
"description": "Optional: only search messages from specific roles (comma-separated). E.g. 'user,assistant' to skip tool outputs.",
},
"limit": {
"type": "integer",
"description": "Max sessions to summarize (default: 3, max: 5).",
"default": 3,
},
},
"required": ["query"],
},
}