Merge PR #616: fix: retry with rebuilt payload after compression
Authored by tripledoublev. After context compression on 413/400 errors, the inner retry loop was reusing the stale pre-compression api_messages payload. Fix breaks out of the inner retry loop so the outer loop rebuilds api_messages from the now-compressed messages list. Adds regression test verifying the second request actually contains the compressed payload.
This commit is contained in:
commit
899dfdcfb9
2 changed files with 65 additions and 8 deletions
24
run_agent.py
24
run_agent.py
|
|
@ -3284,7 +3284,7 @@ class AIAgent:
|
||||||
api_messages = []
|
api_messages = []
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
api_msg = msg.copy()
|
api_msg = msg.copy()
|
||||||
|
|
||||||
# For ALL assistant messages, pass reasoning back to the API
|
# For ALL assistant messages, pass reasoning back to the API
|
||||||
# This ensures multi-turn reasoning context is preserved
|
# This ensures multi-turn reasoning context is preserved
|
||||||
if msg.get("role") == "assistant":
|
if msg.get("role") == "assistant":
|
||||||
|
|
@ -3292,7 +3292,7 @@ class AIAgent:
|
||||||
if reasoning_text:
|
if reasoning_text:
|
||||||
# Add reasoning_content for API compatibility (Moonshot AI, Novita, OpenRouter)
|
# Add reasoning_content for API compatibility (Moonshot AI, Novita, OpenRouter)
|
||||||
api_msg["reasoning_content"] = reasoning_text
|
api_msg["reasoning_content"] = reasoning_text
|
||||||
|
|
||||||
# Remove 'reasoning' field - it's for trajectory storage only
|
# Remove 'reasoning' field - it's for trajectory storage only
|
||||||
# We've copied it to 'reasoning_content' for the API above
|
# We've copied it to 'reasoning_content' for the API above
|
||||||
if "reasoning" in api_msg:
|
if "reasoning" in api_msg:
|
||||||
|
|
@ -3303,7 +3303,7 @@ class AIAgent:
|
||||||
# Keep 'reasoning_details' - OpenRouter uses this for multi-turn reasoning context
|
# Keep 'reasoning_details' - OpenRouter uses this for multi-turn reasoning context
|
||||||
# The signature field helps maintain reasoning continuity
|
# The signature field helps maintain reasoning continuity
|
||||||
api_messages.append(api_msg)
|
api_messages.append(api_msg)
|
||||||
|
|
||||||
# Build the final system message: cached prompt + ephemeral system prompt.
|
# Build the final system message: cached prompt + ephemeral system prompt.
|
||||||
# The ephemeral part is appended here (not baked into the cached prompt)
|
# The ephemeral part is appended here (not baked into the cached prompt)
|
||||||
# so it stays out of the session DB and logs.
|
# so it stays out of the session DB and logs.
|
||||||
|
|
@ -3316,21 +3316,21 @@ class AIAgent:
|
||||||
effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip()
|
effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip()
|
||||||
if effective_system:
|
if effective_system:
|
||||||
api_messages = [{"role": "system", "content": effective_system}] + api_messages
|
api_messages = [{"role": "system", "content": effective_system}] + api_messages
|
||||||
|
|
||||||
# Inject ephemeral prefill messages right after the system prompt
|
# Inject ephemeral prefill messages right after the system prompt
|
||||||
# but before conversation history. Same API-call-time-only pattern.
|
# but before conversation history. Same API-call-time-only pattern.
|
||||||
if self.prefill_messages:
|
if self.prefill_messages:
|
||||||
sys_offset = 1 if effective_system else 0
|
sys_offset = 1 if effective_system else 0
|
||||||
for idx, pfm in enumerate(self.prefill_messages):
|
for idx, pfm in enumerate(self.prefill_messages):
|
||||||
api_messages.insert(sys_offset + idx, pfm.copy())
|
api_messages.insert(sys_offset + idx, pfm.copy())
|
||||||
|
|
||||||
# Apply Anthropic prompt caching for Claude models via OpenRouter.
|
# Apply Anthropic prompt caching for Claude models via OpenRouter.
|
||||||
# Auto-detected: if model name contains "claude" and base_url is OpenRouter,
|
# Auto-detected: if model name contains "claude" and base_url is OpenRouter,
|
||||||
# inject cache_control breakpoints (system + last 3 messages) to reduce
|
# inject cache_control breakpoints (system + last 3 messages) to reduce
|
||||||
# input token costs by ~75% on multi-turn conversations.
|
# input token costs by ~75% on multi-turn conversations.
|
||||||
if self._use_prompt_caching:
|
if self._use_prompt_caching:
|
||||||
api_messages = apply_anthropic_cache_control(api_messages, cache_ttl=self._cache_ttl)
|
api_messages = apply_anthropic_cache_control(api_messages, cache_ttl=self._cache_ttl)
|
||||||
|
|
||||||
# Safety net: strip orphaned tool results / add stubs for missing
|
# Safety net: strip orphaned tool results / add stubs for missing
|
||||||
# results before sending to the API. The compressor handles this
|
# results before sending to the API. The compressor handles this
|
||||||
# during compression, but orphans can also sneak in from session
|
# during compression, but orphans can also sneak in from session
|
||||||
|
|
@ -3374,6 +3374,7 @@ class AIAgent:
|
||||||
max_compression_attempts = 3
|
max_compression_attempts = 3
|
||||||
codex_auth_retry_attempted = False
|
codex_auth_retry_attempted = False
|
||||||
nous_auth_retry_attempted = False
|
nous_auth_retry_attempted = False
|
||||||
|
restart_with_compressed_messages = False
|
||||||
|
|
||||||
finish_reason = "stop"
|
finish_reason = "stop"
|
||||||
response = None # Guard against UnboundLocalError if all retries fail
|
response = None # Guard against UnboundLocalError if all retries fail
|
||||||
|
|
@ -3707,7 +3708,8 @@ class AIAgent:
|
||||||
if len(messages) < original_len:
|
if len(messages) < original_len:
|
||||||
print(f"{self.log_prefix} 🗜️ Compressed {original_len} → {len(messages)} messages, retrying...")
|
print(f"{self.log_prefix} 🗜️ Compressed {original_len} → {len(messages)} messages, retrying...")
|
||||||
time.sleep(2) # Brief pause between compression retries
|
time.sleep(2) # Brief pause between compression retries
|
||||||
continue # Retry with compressed messages
|
restart_with_compressed_messages = True
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
print(f"{self.log_prefix}❌ Payload too large and cannot compress further.")
|
print(f"{self.log_prefix}❌ Payload too large and cannot compress further.")
|
||||||
logging.error(f"{self.log_prefix}413 payload too large. Cannot compress further.")
|
logging.error(f"{self.log_prefix}413 payload too large. Cannot compress further.")
|
||||||
|
|
@ -3775,7 +3777,8 @@ class AIAgent:
|
||||||
if len(messages) < original_len:
|
if len(messages) < original_len:
|
||||||
print(f"{self.log_prefix} 🗜️ Compressed {original_len} → {len(messages)} messages, retrying...")
|
print(f"{self.log_prefix} 🗜️ Compressed {original_len} → {len(messages)} messages, retrying...")
|
||||||
time.sleep(2) # Brief pause between compression retries
|
time.sleep(2) # Brief pause between compression retries
|
||||||
continue # Retry with compressed messages or new tier
|
restart_with_compressed_messages = True
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
# Can't compress further and already at minimum tier
|
# Can't compress further and already at minimum tier
|
||||||
print(f"{self.log_prefix}❌ Context length exceeded and cannot compress further.")
|
print(f"{self.log_prefix}❌ Context length exceeded and cannot compress further.")
|
||||||
|
|
@ -3862,6 +3865,11 @@ class AIAgent:
|
||||||
if interrupted:
|
if interrupted:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
if restart_with_compressed_messages:
|
||||||
|
api_call_count -= 1
|
||||||
|
self.iteration_budget.refund()
|
||||||
|
continue
|
||||||
|
|
||||||
# Guard: if all retries exhausted without a successful response
|
# Guard: if all retries exhausted without a successful response
|
||||||
# (e.g. repeated context-length errors that exhausted retry_count),
|
# (e.g. repeated context-length errors that exhausted retry_count),
|
||||||
# the `response` variable is still None. Break out cleanly.
|
# the `response` variable is still None. Break out cleanly.
|
||||||
|
|
|
||||||
|
|
@ -234,6 +234,55 @@ class TestHTTP413Compression:
|
||||||
mock_compress.assert_called_once()
|
mock_compress.assert_called_once()
|
||||||
assert result["completed"] is True
|
assert result["completed"] is True
|
||||||
|
|
||||||
|
def test_context_length_retry_rebuilds_request_after_compression(self, agent):
|
||||||
|
"""Retry must send the compressed transcript, not the stale oversized payload."""
|
||||||
|
err_400 = Exception(
|
||||||
|
"Error code: 400 - {'error': {'message': "
|
||||||
|
"\"This endpoint's maximum context length is 128000 tokens. "
|
||||||
|
"Please reduce the length of the messages.\"}}"
|
||||||
|
)
|
||||||
|
err_400.status_code = 400
|
||||||
|
ok_resp = _mock_response(content="Recovered after real compression", finish_reason="stop")
|
||||||
|
|
||||||
|
request_payloads = []
|
||||||
|
|
||||||
|
def _side_effect(**kwargs):
|
||||||
|
request_payloads.append(kwargs)
|
||||||
|
if len(request_payloads) == 1:
|
||||||
|
raise err_400
|
||||||
|
return ok_resp
|
||||||
|
|
||||||
|
agent.client.chat.completions.create.side_effect = _side_effect
|
||||||
|
|
||||||
|
prefill = [
|
||||||
|
{"role": "user", "content": "previous question"},
|
||||||
|
{"role": "assistant", "content": "previous answer"},
|
||||||
|
]
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(agent, "_compress_context") as mock_compress,
|
||||||
|
patch.object(agent, "_persist_session"),
|
||||||
|
patch.object(agent, "_save_trajectory"),
|
||||||
|
patch.object(agent, "_cleanup_task_resources"),
|
||||||
|
):
|
||||||
|
mock_compress.return_value = (
|
||||||
|
[{"role": "user", "content": "compressed summary"}],
|
||||||
|
"compressed prompt",
|
||||||
|
)
|
||||||
|
result = agent.run_conversation("hello", conversation_history=prefill)
|
||||||
|
|
||||||
|
assert result["completed"] is True
|
||||||
|
assert len(request_payloads) == 2
|
||||||
|
assert len(request_payloads[1]["messages"]) < len(request_payloads[0]["messages"])
|
||||||
|
assert request_payloads[1]["messages"][0] == {
|
||||||
|
"role": "system",
|
||||||
|
"content": "compressed prompt",
|
||||||
|
}
|
||||||
|
assert request_payloads[1]["messages"][1] == {
|
||||||
|
"role": "user",
|
||||||
|
"content": "compressed summary",
|
||||||
|
}
|
||||||
|
|
||||||
def test_413_cannot_compress_further(self, agent):
|
def test_413_cannot_compress_further(self, agent):
|
||||||
"""When compression can't reduce messages, return partial result."""
|
"""When compression can't reduce messages, return partial result."""
|
||||||
err_413 = _make_413_error()
|
err_413 = _make_413_error()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue