fix: improve read-loop detection — consecutive-only, correct thresholds, fix bugs

Follow-up to PR #705 (merged from 0xbyt4). Addresses several issues:

1. CONSECUTIVE-ONLY TRACKING: Redesigned the read/search tracker to only
   warn/block on truly consecutive identical calls. Any other tool call
   in between (write, patch, terminal, etc.) resets the counter via
   notify_other_tool_call(), called from handle_function_call() in
   model_tools.py. This prevents false blocks in read→edit→verify flows
   (see the sketch below).

2. THRESHOLD ADJUSTMENT: Warn on 3rd consecutive (was 2nd), block on
   4th+ consecutive (was 3rd+). Gives the model more room before
   intervening.

3. TUPLE UNPACKING BUG: Fixed get_read_files_summary() which crashed on
   search keys (5-tuple) when trying to unpack as 3-tuple. Now uses a
   separate read_history set that only tracks file reads.

4. WEB_EXTRACT DOCSTRING: Reverted incorrect removal of 'title' from
   web_extract return docs in code_execution_tool.py — the field IS
   returned by web_tools.py.

5. TESTS: Rewrote test_read_loop_detection.py (35 tests) to cover
   consecutive-only behavior, notify_other_tool_call, interleaved
   read/search, and summary-unaffected-by-searches.
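
A minimal sketch of the consecutive-only logic from items 1-3 (simplified;
the real implementation in tools/file_tools.py holds _read_tracker_lock and
also records read_history; the track() helper below is illustrative, not an
actual function in the codebase):

    WARN_AT, BLOCK_AT = 3, 4
    _tracker = {}  # task_id -> {"last_key": ..., "consecutive": int}

    def track(task_id, key):
        """Return (count, action) for a read/search identified by key."""
        data = _tracker.setdefault(task_id, {"last_key": None, "consecutive": 0})
        if data["last_key"] == key:
            data["consecutive"] += 1
        else:
            data["last_key"] = key
            data["consecutive"] = 1
        count = data["consecutive"]
        if count >= BLOCK_AT:
            return count, "block"  # return error, withhold content
        if count >= WARN_AT:
            return count, "warn"   # attach _warning, still return content
        return count, "ok"

    def notify_other_tool_call(task_id):
        """Any non-read/search tool call resets the consecutive counter."""
        data = _tracker.get(task_id)
        if data:
            data["last_key"] = None
            data["consecutive"] = 0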
Author: teknium1
Date:   2026-03-10 16:25:41 -07:00
Parent: b53d5dad67
Commit: a458b535c9

4 changed files with 223 additions and 55 deletions

--- a/model_tools.py
+++ b/model_tools.py

@@ -284,6 +284,16 @@ def handle_function_call(
     Returns:
         Function result as a JSON string.
     """
+    # Notify the read-loop tracker when a non-read/search tool runs,
+    # so the *consecutive* counter resets (reads after other work are fine).
+    _READ_SEARCH_TOOLS = {"read_file", "search_files"}
+    if function_name not in _READ_SEARCH_TOOLS:
+        try:
+            from tools.file_tools import notify_other_tool_call
+            notify_other_tool_call(task_id or "default")
+        except Exception:
+            pass  # file_tools may not be loaded yet
+
     try:
         if function_name in _AGENT_LOOP_TOOLS:
             return json.dumps({"error": f"{function_name} must be handled by the agent loop"})

--- a/tests/tools/test_read_loop_detection.py
+++ b/tests/tools/test_read_loop_detection.py

@@ -3,12 +3,14 @@
 Tests for the read-loop detection mechanism in file_tools.
 
 Verifies that:
-1. Re-reading the same file region produces a warning
-2. Different regions/files don't trigger false warnings
-3. Task isolation works (different tasks have separate trackers)
-4. get_read_files_summary returns accurate history
-5. clear_read_tracker resets state
-6. Context compression injects file-read history
+1. Only *consecutive* identical reads trigger warnings/blocks
+2. Any other tool call in between resets the consecutive counter
+3. Warn on 3rd consecutive, block on 4th+
+4. Different regions/files/tasks don't trigger false warnings
+5. get_read_files_summary returns accurate history (unaffected by search keys)
+6. clear_read_tracker resets state
+7. notify_other_tool_call resets consecutive counters
+8. Context compression injects file-read history
 
 Run with: python -m pytest tests/tools/test_read_loop_detection.py -v
 """
@@ -22,6 +24,7 @@ from tools.file_tools import (
     search_tool,
     get_read_files_summary,
     clear_read_tracker,
+    notify_other_tool_call,
     _read_tracker,
 )
@@ -57,7 +60,7 @@ def _make_fake_file_ops():
 
 
 class TestReadLoopDetection(unittest.TestCase):
-    """Verify that read_file_tool detects and warns on re-reads."""
+    """Verify that read_file_tool detects and warns on consecutive re-reads."""
 
     def setUp(self):
         clear_read_tracker()
@@ -72,50 +75,68 @@ class TestReadLoopDetection(unittest.TestCase):
         self.assertIn("content", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_second_read_same_region_has_warning(self, _mock_ops):
+    def test_second_consecutive_read_no_warning(self, _mock_ops):
+        """2nd consecutive read should NOT warn (threshold is 3)."""
         read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
         result = json.loads(
             read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
         )
-        self.assertIn("_warning", result)
-        self.assertIn("already read", result["_warning"])
-        self.assertIn("2 times", result["_warning"])
+        self.assertNotIn("_warning", result)
+        self.assertIn("content", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_third_read_is_blocked(self, _mock_ops):
-        """3rd read of the same region returns error, no content."""
+    def test_third_consecutive_read_has_warning(self, _mock_ops):
+        """3rd consecutive read of the same region triggers a warning."""
         for _ in range(2):
             read_file_tool("/tmp/test.py", task_id="t1")
         result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
+        self.assertIn("_warning", result)
+        self.assertIn("3 times", result["_warning"])
+        # Warning still returns content
+        self.assertIn("content", result)
+
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_fourth_consecutive_read_is_blocked(self, _mock_ops):
+        """4th consecutive read of the same region is BLOCKED — no content."""
+        for _ in range(3):
+            read_file_tool("/tmp/test.py", task_id="t1")
+        result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
         self.assertIn("error", result)
         self.assertIn("BLOCKED", result["error"])
+        self.assertIn("4 times", result["error"])
         self.assertNotIn("content", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_fourth_read_still_blocked(self, _mock_ops):
+    def test_fifth_consecutive_read_still_blocked(self, _mock_ops):
         """Subsequent reads remain blocked with incrementing count."""
-        for _ in range(3):
+        for _ in range(4):
             read_file_tool("/tmp/test.py", task_id="t1")
         result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
         self.assertIn("BLOCKED", result["error"])
-        self.assertIn("4 times", result["error"])
+        self.assertIn("5 times", result["error"])
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_different_region_no_warning(self, _mock_ops):
+    def test_different_region_resets_consecutive(self, _mock_ops):
+        """Reading a different region of the same file resets consecutive count."""
         read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
+        read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
+        # Now read a different region — this resets the consecutive counter
         result = json.loads(
             read_file_tool("/tmp/test.py", offset=501, limit=500, task_id="t1")
         )
         self.assertNotIn("_warning", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_different_file_no_warning(self, _mock_ops):
+    def test_different_file_resets_consecutive(self, _mock_ops):
+        """Reading a different file resets the consecutive counter."""
+        read_file_tool("/tmp/a.py", task_id="t1")
         read_file_tool("/tmp/a.py", task_id="t1")
         result = json.loads(read_file_tool("/tmp/b.py", task_id="t1"))
         self.assertNotIn("_warning", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
     def test_different_tasks_isolated(self, _mock_ops):
+        """Different task_ids have separate consecutive counters."""
         read_file_tool("/tmp/test.py", task_id="task_a")
         result = json.loads(
             read_file_tool("/tmp/test.py", task_id="task_b")
@@ -124,14 +145,63 @@ class TestReadLoopDetection(unittest.TestCase):
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
     def test_warning_still_returns_content(self, _mock_ops):
-        """Even with a warning, the file content is still returned."""
-        read_file_tool("/tmp/test.py", task_id="t1")
+        """Even with a warning (3rd read), the file content is still returned."""
+        for _ in range(2):
+            read_file_tool("/tmp/test.py", task_id="t1")
         result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
         self.assertIn("_warning", result)
         self.assertIn("content", result)
         self.assertIn("content of /tmp/test.py", result["content"])
 
 
+class TestNotifyOtherToolCall(unittest.TestCase):
+    """Verify that notify_other_tool_call resets the consecutive counter."""
+
+    def setUp(self):
+        clear_read_tracker()
+
+    def tearDown(self):
+        clear_read_tracker()
+
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_other_tool_resets_consecutive(self, _mock_ops):
+        """After another tool runs, re-reading the same file is NOT consecutive."""
+        read_file_tool("/tmp/test.py", task_id="t1")
+        read_file_tool("/tmp/test.py", task_id="t1")
+        # Simulate a different tool being called
+        notify_other_tool_call("t1")
+        # This should be treated as a fresh read (consecutive reset)
+        result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
+        self.assertNotIn("_warning", result)
+        self.assertIn("content", result)
+
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_other_tool_prevents_block(self, _mock_ops):
+        """Agent can keep reading if other tools are used in between."""
+        for i in range(10):
+            read_file_tool("/tmp/test.py", task_id="t1")
+            notify_other_tool_call("t1")
+        # After 10 reads interleaved with other tools, still no warning
+        result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
+        self.assertNotIn("_warning", result)
+        self.assertNotIn("error", result)
+        self.assertIn("content", result)
+
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_notify_on_unknown_task_is_safe(self, _mock_ops):
+        """notify_other_tool_call on a task that hasn't read anything is a no-op."""
+        notify_other_tool_call("nonexistent_task")  # Should not raise
+
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_history_survives_notify(self, _mock_ops):
+        """notify_other_tool_call resets consecutive but preserves read_history."""
+        read_file_tool("/tmp/test.py", offset=1, limit=100, task_id="t1")
+        notify_other_tool_call("t1")
+        summary = get_read_files_summary("t1")
+        self.assertEqual(len(summary), 1)
+        self.assertEqual(summary[0]["path"], "/tmp/test.py")
+
+
 class TestReadFilesSummary(unittest.TestCase):
     """Verify get_read_files_summary returns accurate file-read history."""
@@ -183,6 +253,15 @@ class TestReadFilesSummary(unittest.TestCase):
         self.assertEqual(len(summary_b), 1)
         self.assertEqual(summary_b[0]["path"], "/tmp/b.py")
 
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_summary_unaffected_by_searches(self, _mock_ops):
+        """Searches should NOT appear in the file-read summary."""
+        read_file_tool("/tmp/test.py", task_id="t1")
+        search_tool("def main", task_id="t1")
+        summary = get_read_files_summary("t1")
+        self.assertEqual(len(summary), 1)
+        self.assertEqual(summary[0]["path"], "/tmp/test.py")
+
 
 class TestClearReadTracker(unittest.TestCase):
     """Verify clear_read_tracker resets state properly."""
@@ -211,10 +290,12 @@ class TestClearReadTracker(unittest.TestCase):
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
     def test_clear_then_reread_no_warning(self, _mock_ops):
-        read_file_tool("/tmp/test.py", task_id="t1")
+        for _ in range(3):
+            read_file_tool("/tmp/test.py", task_id="t1")
         clear_read_tracker("t1")
         result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
         self.assertNotIn("_warning", result)
+        self.assertNotIn("error", result)
 
 
 class TestCompressionFileHistory(unittest.TestCase):
@@ -256,7 +337,7 @@ class TestCompressionFileHistory(unittest.TestCase):
             {"role": "user", "content": "[CONTEXT SUMMARY]: Files were analyzed."},
             messages[-1],  # last user
         ]
-        mock_compressor.last_prompt_tokens = 5000
+        mock_compressor.last_prompt_tokens = 1000
 
         # Mock the agent's _compress_context dependencies
         mock_agent = MagicMock()
@@ -272,7 +353,7 @@ class TestCompressionFileHistory(unittest.TestCase):
         from run_agent import AIAgent
         result, _ = AIAgent._compress_context(
             mock_agent, messages, "system prompt",
-            approx_tokens=5000, task_id="compress_test",
+            approx_tokens=1000, task_id="compress_test",
         )
 
         # Find the injected file-read history message
@@ -291,7 +372,7 @@
 
 
 class TestSearchLoopDetection(unittest.TestCase):
-    """Verify that search_tool detects and blocks repeated searches."""
+    """Verify that search_tool detects and blocks consecutive repeated searches."""
 
     def setUp(self):
         clear_read_tracker()
@@ -306,23 +387,38 @@ class TestSearchLoopDetection(unittest.TestCase):
         self.assertNotIn("error", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_second_search_has_warning(self, _mock_ops):
+    def test_second_consecutive_search_no_warning(self, _mock_ops):
+        """2nd consecutive search should NOT warn (threshold is 3)."""
         search_tool("def main", task_id="t1")
         result = json.loads(search_tool("def main", task_id="t1"))
-        self.assertIn("_warning", result)
-        self.assertIn("2 times", result["_warning"])
+        self.assertNotIn("_warning", result)
+        self.assertNotIn("error", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_third_search_is_blocked(self, _mock_ops):
+    def test_third_consecutive_search_has_warning(self, _mock_ops):
+        """3rd consecutive identical search triggers a warning."""
         for _ in range(2):
             search_tool("def main", task_id="t1")
         result = json.loads(search_tool("def main", task_id="t1"))
+        self.assertIn("_warning", result)
+        self.assertIn("3 times", result["_warning"])
+        # Warning still returns results
+        self.assertIn("matches", result)
+
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_fourth_consecutive_search_is_blocked(self, _mock_ops):
+        """4th consecutive identical search is BLOCKED."""
+        for _ in range(3):
+            search_tool("def main", task_id="t1")
+        result = json.loads(search_tool("def main", task_id="t1"))
         self.assertIn("error", result)
         self.assertIn("BLOCKED", result["error"])
         self.assertNotIn("matches", result)
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
-    def test_different_pattern_no_warning(self, _mock_ops):
+    def test_different_pattern_resets_consecutive(self, _mock_ops):
+        """A different search pattern resets the consecutive counter."""
+        search_tool("def main", task_id="t1")
         search_tool("def main", task_id="t1")
         result = json.loads(search_tool("class Foo", task_id="t1"))
         self.assertNotIn("_warning", result)
@@ -330,10 +426,32 @@ class TestSearchLoopDetection(unittest.TestCase):
 
     @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
     def test_different_task_isolated(self, _mock_ops):
+        """Different tasks have separate consecutive counters."""
         search_tool("def main", task_id="t1")
         result = json.loads(search_tool("def main", task_id="t2"))
         self.assertNotIn("_warning", result)
 
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_other_tool_resets_search_consecutive(self, _mock_ops):
+        """notify_other_tool_call resets search consecutive counter too."""
+        search_tool("def main", task_id="t1")
+        search_tool("def main", task_id="t1")
+        notify_other_tool_call("t1")
+        result = json.loads(search_tool("def main", task_id="t1"))
+        self.assertNotIn("_warning", result)
+        self.assertNotIn("error", result)
+
+    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
+    def test_read_between_searches_resets_consecutive(self, _mock_ops):
+        """A read_file call between searches resets search consecutive counter."""
+        search_tool("def main", task_id="t1")
+        search_tool("def main", task_id="t1")
+        # A read changes the last_key, resetting consecutive for the search
+        read_file_tool("/tmp/test.py", task_id="t1")
+        result = json.loads(search_tool("def main", task_id="t1"))
+        self.assertNotIn("_warning", result)
+        self.assertNotIn("error", result)
+
 
 class TestTodoInjectionFiltering(unittest.TestCase):
     """Verify that format_for_injection filters completed/cancelled todos."""

--- a/code_execution_tool.py
+++ b/code_execution_tool.py

@@ -78,7 +78,7 @@ _TOOL_STUBS = {
     "web_extract": (
         "web_extract",
         "urls: list",
-        '"""Extract content from URLs. Returns dict with results list of {url, content, error}."""',
+        '"""Extract content from URLs. Returns dict with results list of {url, title, content, error}."""',
        '{"urls": urls}',
     ),
     "read_file": (
@@ -616,7 +616,7 @@ _TOOL_DOC_LINES = [
      " Returns {\"data\": {\"web\": [{\"url\", \"title\", \"description\"}, ...]}}"),
     ("web_extract",
      " web_extract(urls: list[str]) -> dict\n"
-     " Returns {\"results\": [{\"url\", \"content\", \"error\"}, ...]} where content is markdown"),
+     " Returns {\"results\": [{\"url\", \"title\", \"content\", \"error\"}, ...]} where content is markdown"),
     ("read_file",
      " read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n"
      " Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}"),

--- a/tools/file_tools.py
+++ b/tools/file_tools.py

@@ -15,7 +15,10 @@ _file_ops_lock = threading.Lock()
 _file_ops_cache: dict = {}
 
 # Track files read per task to detect re-read loops after context compression.
-# Key: task_id, Value: dict mapping (path, offset, limit) -> read count
+# Per task_id we store:
+#   "last_key":     the key of the most recent read/search call (or None)
+#   "consecutive":  how many times that exact call has been repeated in a row
+#   "read_history": set of (path, offset, limit) tuples for get_read_files_summary
 _read_tracker_lock = threading.Lock()
 _read_tracker: dict = {}
@@ -139,28 +142,37 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
             result.content = redact_sensitive_text(result.content)
         result_dict = result.to_dict()
 
-        # Track reads to detect re-read loops (e.g. after context compression)
-        read_key = (path, offset, limit)
+        # Track reads to detect *consecutive* re-read loops.
+        # The counter resets whenever any other tool is called in between,
+        # so only truly back-to-back identical reads trigger warnings/blocks.
+        read_key = ("read", path, offset, limit)
         with _read_tracker_lock:
-            task_reads = _read_tracker.setdefault(task_id, {})
-            task_reads[read_key] = task_reads.get(read_key, 0) + 1
-            count = task_reads[read_key]
+            task_data = _read_tracker.setdefault(task_id, {
+                "last_key": None, "consecutive": 0, "read_history": set(),
+            })
+            task_data["read_history"].add((path, offset, limit))
+            if task_data["last_key"] == read_key:
+                task_data["consecutive"] += 1
+            else:
+                task_data["last_key"] = read_key
+                task_data["consecutive"] = 1
+            count = task_data["consecutive"]
 
-        if count >= 3:
+        if count >= 4:
             # Hard block: stop returning content to break the loop
             return json.dumps({
                 "error": (
-                    f"BLOCKED: You have read this exact file region {count} times. "
+                    f"BLOCKED: You have read this exact file region {count} times in a row. "
                     "The content has NOT changed. You already have this information. "
                     "STOP re-reading and proceed with your task."
                 ),
                 "path": path,
                 "already_read": count,
             }, ensure_ascii=False)
-        elif count > 1:
+        elif count >= 3:
             result_dict["_warning"] = (
-                f"You have already read this exact file region {count} times in this session. "
-                "The content has not changed. Use the information you already have instead of re-reading. "
+                f"You have read this exact file region {count} times consecutively. "
+                "The content has not changed since your last read. Use the information you already have. "
                 "If you are stuck in a loop, stop reading and proceed with writing or responding."
             )
@@ -176,9 +188,10 @@ def get_read_files_summary(task_id: str = "default") -> list:
     compression boundaries.
     """
     with _read_tracker_lock:
-        task_reads = _read_tracker.get(task_id, {})
-    seen_paths = {}
-    for (path, offset, limit), count in task_reads.items():
+        task_data = _read_tracker.get(task_id, {})
+        read_history = task_data.get("read_history", set())
+    seen_paths: dict = {}
+    for (path, offset, limit) in read_history:
         if path not in seen_paths:
             seen_paths[path] = []
         seen_paths[path].append(f"lines {offset}-{offset + limit - 1}")
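
An illustrative sequence for the fixed summary (hypothetical paths; the hunk
above shows how seen_paths is built but not the final return statement, so
the "regions" key below is an assumption; the tests only assert on
summary[0]["path"]):

    read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
    read_file_tool("/tmp/test.py", offset=501, limit=500, task_id="t1")
    search_tool("def main", task_id="t1")  # search key; never added to read_history

    get_read_files_summary("t1")
    # -> [{"path": "/tmp/test.py", "regions": ["lines 1-500", "lines 501-1000"]}]
    #    ("regions" key assumed from the seen_paths construction above)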
@@ -189,7 +202,12 @@ def get_read_files_summary(task_id: str = "default") -> list:
 
 
 def clear_read_tracker(task_id: str = None):
-    """Clear the read tracker. Called when starting a new conversation."""
+    """Clear the read tracker.
+
+    Call with a task_id to clear just that task, or without to clear all.
+    Should be called when a session is destroyed to prevent memory leaks
+    in long-running gateway processes.
+    """
     with _read_tracker_lock:
         if task_id:
             _read_tracker.pop(task_id, None)
@@ -197,6 +215,22 @@ def clear_read_tracker(task_id: str = None):
         _read_tracker.clear()
 
 
+def notify_other_tool_call(task_id: str = "default"):
+    """Reset the consecutive read/search counter for a task.
+
+    Called by the tool dispatcher (model_tools.py) whenever a tool OTHER
+    than read_file / search_files is executed. This ensures we only warn
+    or block on *truly consecutive* repeated reads: if the agent does
+    anything else in between (write, patch, terminal, etc.), the counter
+    resets and the next read is treated as fresh.
+    """
+    with _read_tracker_lock:
+        task_data = _read_tracker.get(task_id)
+        if task_data:
+            task_data["last_key"] = None
+            task_data["consecutive"] = 0
+
+
 def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
     """Write content to a file."""
     try:
@@ -245,17 +279,23 @@ def search_tool(pattern: str, target: str = "content", path: str = ".",
                 task_id: str = "default") -> str:
     """Search for content or files."""
     try:
-        # Track searches to detect repeated search loops
-        search_key = ("search", pattern, target, path, file_glob or "")
+        # Track searches to detect *consecutive* repeated search loops.
+        search_key = ("search", pattern, target, str(path), file_glob or "")
         with _read_tracker_lock:
-            task_reads = _read_tracker.setdefault(task_id, {})
-            task_reads[search_key] = task_reads.get(search_key, 0) + 1
-            count = task_reads[search_key]
+            task_data = _read_tracker.setdefault(task_id, {
+                "last_key": None, "consecutive": 0, "read_history": set(),
+            })
+            if task_data["last_key"] == search_key:
+                task_data["consecutive"] += 1
+            else:
+                task_data["last_key"] = search_key
+                task_data["consecutive"] = 1
+            count = task_data["consecutive"]
 
-        if count >= 3:
+        if count >= 4:
             return json.dumps({
                 "error": (
-                    f"BLOCKED: You have run this exact search {count} times. "
+                    f"BLOCKED: You have run this exact search {count} times in a row. "
                     "The results have NOT changed. You already have this information. "
                     "STOP re-searching and proceed with your task."
                 ),
@@ -274,9 +314,9 @@ def search_tool(pattern: str, target: str = "content", path: str = ".",
                 m.content = redact_sensitive_text(m.content)
         result_dict = result.to_dict()
 
-        if count > 1:
+        if count >= 3:
             result_dict["_warning"] = (
-                f"You have run this exact search {count} times in this session. "
+                f"You have run this exact search {count} times consecutively. "
                 "The results have not changed. Use the information you already have."
             )