Merge pull request #1726 from NousResearch/fix/memory-tool-file-locking

fix(memory): concurrent writes silently drop entries — add file locking
This commit is contained in:
Teknium 2026-03-17 04:23:59 -07:00 committed by GitHub
commit 21b823dd3b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -23,11 +23,13 @@ Design:
- Frozen snapshot pattern: system prompt is stable, tool responses show live state
"""
import fcntl
import json
import logging
import os
import re
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Any, List, Optional
@ -120,14 +122,43 @@ class MemoryStore:
"user": self._render_block("user", self.user_entries),
}
@staticmethod
@contextmanager
def _file_lock(path: Path):
"""Acquire an exclusive file lock for read-modify-write safety.
Uses a separate .lock file so the memory file itself can still be
atomically replaced via os.replace().
"""
lock_path = path.with_suffix(path.suffix + ".lock")
lock_path.parent.mkdir(parents=True, exist_ok=True)
fd = open(lock_path, "w")
try:
fcntl.flock(fd, fcntl.LOCK_EX)
yield
finally:
fcntl.flock(fd, fcntl.LOCK_UN)
fd.close()
@staticmethod
def _path_for(target: str) -> Path:
if target == "user":
return MEMORY_DIR / "USER.md"
return MEMORY_DIR / "MEMORY.md"
def _reload_target(self, target: str):
"""Re-read entries from disk into in-memory state.
Called under file lock to get the latest state before mutating.
"""
fresh = self._read_file(self._path_for(target))
fresh = list(dict.fromkeys(fresh)) # deduplicate
self._set_entries(target, fresh)
def save_to_disk(self, target: str):
"""Persist entries to the appropriate file. Called after every mutation."""
MEMORY_DIR.mkdir(parents=True, exist_ok=True)
if target == "memory":
self._write_file(MEMORY_DIR / "MEMORY.md", self.memory_entries)
elif target == "user":
self._write_file(MEMORY_DIR / "USER.md", self.user_entries)
self._write_file(self._path_for(target), self._entries_for(target))
def _entries_for(self, target: str) -> List[str]:
if target == "user":
@ -162,33 +193,37 @@ class MemoryStore:
if scan_error:
return {"success": False, "error": scan_error}
entries = self._entries_for(target)
limit = self._char_limit(target)
with self._file_lock(self._path_for(target)):
# Re-read from disk under lock to pick up writes from other sessions
self._reload_target(target)
# Reject exact duplicates
if content in entries:
return self._success_response(target, "Entry already exists (no duplicate added).")
entries = self._entries_for(target)
limit = self._char_limit(target)
# Calculate what the new total would be
new_entries = entries + [content]
new_total = len(ENTRY_DELIMITER.join(new_entries))
# Reject exact duplicates
if content in entries:
return self._success_response(target, "Entry already exists (no duplicate added).")
if new_total > limit:
current = self._char_count(target)
return {
"success": False,
"error": (
f"Memory at {current:,}/{limit:,} chars. "
f"Adding this entry ({len(content)} chars) would exceed the limit. "
f"Replace or remove existing entries first."
),
"current_entries": entries,
"usage": f"{current:,}/{limit:,}",
}
# Calculate what the new total would be
new_entries = entries + [content]
new_total = len(ENTRY_DELIMITER.join(new_entries))
entries.append(content)
self._set_entries(target, entries)
self.save_to_disk(target)
if new_total > limit:
current = self._char_count(target)
return {
"success": False,
"error": (
f"Memory at {current:,}/{limit:,} chars. "
f"Adding this entry ({len(content)} chars) would exceed the limit. "
f"Replace or remove existing entries first."
),
"current_entries": entries,
"usage": f"{current:,}/{limit:,}",
}
entries.append(content)
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry added.")
@ -206,44 +241,47 @@ class MemoryStore:
if scan_error:
return {"success": False, "error": scan_error}
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
with self._file_lock(self._path_for(target)):
self._reload_target(target)
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
unique_texts = set(e for _, e in matches)
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
unique_texts = set(e for _, e in matches)
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
}
# All identical -- safe to replace just the first
idx = matches[0][0]
limit = self._char_limit(target)
# Check that replacement doesn't blow the budget
test_entries = entries.copy()
test_entries[idx] = new_content
new_total = len(ENTRY_DELIMITER.join(test_entries))
if new_total > limit:
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
"error": (
f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
f"Shorten the new content or remove other entries first."
),
}
# All identical -- safe to replace just the first
idx = matches[0][0]
limit = self._char_limit(target)
# Check that replacement doesn't blow the budget
test_entries = entries.copy()
test_entries[idx] = new_content
new_total = len(ENTRY_DELIMITER.join(test_entries))
if new_total > limit:
return {
"success": False,
"error": (
f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
f"Shorten the new content or remove other entries first."
),
}
entries[idx] = new_content
self._set_entries(target, entries)
self.save_to_disk(target)
entries[idx] = new_content
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry replaced.")
@ -253,28 +291,31 @@ class MemoryStore:
if not old_text:
return {"success": False, "error": "old_text cannot be empty."}
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
with self._file_lock(self._path_for(target)):
self._reload_target(target)
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
unique_texts = set(e for _, e in matches)
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
}
# All identical -- safe to remove just the first
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
idx = matches[0][0]
entries.pop(idx)
self._set_entries(target, entries)
self.save_to_disk(target)
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
unique_texts = set(e for _, e in matches)
if len(unique_texts) > 1:
previews = [e[:80] + ("..." if len(e) > 80 else "") for _, e in matches]
return {
"success": False,
"error": f"Multiple entries matched '{old_text}'. Be more specific.",
"matches": previews,
}
# All identical -- safe to remove just the first
idx = matches[0][0]
entries.pop(idx)
self._set_entries(target, entries)
self.save_to_disk(target)
return self._success_response(target, "Entry removed.")