fix: always fall back to non-streaming on ANY streaming error
Previously the fallback only triggered on specific error keywords like 'streaming is not supported'. Many third-party providers have partial or broken streaming — rejecting stream=True, crashing on stream_options, dropping connections mid-stream, returning malformed chunks, etc. Now: any exception during the streaming API call triggers an automatic fallback to the standard non-streaming request path. The error is logged at INFO level for diagnostics but never surfaces to the user. If the fallback also fails, THAT error propagates normally. This ensures streaming is additive — it improves UX when it works but never breaks providers that don't support it. Tests: 2 new (any-error fallback, double-failure propagation), 15 total.
This commit is contained in:
parent
5479bb0e0c
commit
99369b926c
2 changed files with 62 additions and 21 deletions
20
run_agent.py
20
run_agent.py
|
|
@ -3208,23 +3208,17 @@ class AIAgent:
|
||||||
else:
|
else:
|
||||||
result["response"] = _call_chat_completions()
|
result["response"] = _call_chat_completions()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
err_text = str(e).lower()
|
# Always fall back to non-streaming on ANY streaming error.
|
||||||
# Fall back to non-streaming if provider doesn't support it.
|
# Many third-party/extrinsic providers have partial or broken
|
||||||
# Be specific in matching — "stream" alone is too broad and
|
# streaming support — rejecting stream=True, crashing on
|
||||||
# catches unrelated errors like "stream_options" rejections.
|
# stream_options, dropping connections mid-stream, etc.
|
||||||
stream_unsupported = any(
|
# A clean fallback to the standard request path ensures the
|
||||||
kw in err_text
|
# agent still works even if streaming doesn't.
|
||||||
for kw in ("streaming is not", "streaming not support",
|
logger.info("Streaming failed, falling back to non-streaming: %s", e)
|
||||||
"does not support stream", "not available")
|
|
||||||
)
|
|
||||||
if stream_unsupported:
|
|
||||||
logger.info("Streaming not supported by provider, falling back to non-streaming: %s", e)
|
|
||||||
try:
|
try:
|
||||||
result["response"] = self._interruptible_api_call(api_kwargs)
|
result["response"] = self._interruptible_api_call(api_kwargs)
|
||||||
except Exception as fallback_err:
|
except Exception as fallback_err:
|
||||||
result["error"] = fallback_err
|
result["error"] = fallback_err
|
||||||
else:
|
|
||||||
result["error"] = e
|
|
||||||
finally:
|
finally:
|
||||||
request_client = request_client_holder.get("client")
|
request_client = request_client_holder.get("client")
|
||||||
if request_client is not None:
|
if request_client is not None:
|
||||||
|
|
|
||||||
|
|
@ -321,7 +321,7 @@ class TestStreamingCallbacks:
|
||||||
|
|
||||||
|
|
||||||
class TestStreamingFallback:
|
class TestStreamingFallback:
|
||||||
"""Verify fallback to non-streaming on unsupported providers."""
|
"""Verify fallback to non-streaming on ANY streaming error."""
|
||||||
|
|
||||||
@patch("run_agent.AIAgent._interruptible_api_call")
|
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||||
|
|
@ -367,16 +367,63 @@ class TestStreamingFallback:
|
||||||
assert response.choices[0].message.content == "fallback response"
|
assert response.choices[0].message.content == "fallback response"
|
||||||
mock_non_stream.assert_called_once()
|
mock_non_stream.assert_called_once()
|
||||||
|
|
||||||
|
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||||
def test_non_stream_error_raises(self, mock_close, mock_create):
|
def test_any_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream):
|
||||||
"""Non-streaming errors propagate normally."""
|
"""ANY streaming error triggers fallback — not just specific messages."""
|
||||||
from run_agent import AIAgent
|
from run_agent import AIAgent
|
||||||
|
|
||||||
mock_client = MagicMock()
|
mock_client = MagicMock()
|
||||||
mock_client.chat.completions.create.side_effect = Exception("Rate limit exceeded")
|
mock_client.chat.completions.create.side_effect = Exception(
|
||||||
|
"Connection reset by peer"
|
||||||
|
)
|
||||||
mock_create.return_value = mock_client
|
mock_create.return_value = mock_client
|
||||||
|
|
||||||
|
fallback_response = SimpleNamespace(
|
||||||
|
id="fallback",
|
||||||
|
model="test",
|
||||||
|
choices=[SimpleNamespace(
|
||||||
|
index=0,
|
||||||
|
message=SimpleNamespace(
|
||||||
|
role="assistant",
|
||||||
|
content="fallback after connection error",
|
||||||
|
tool_calls=None,
|
||||||
|
reasoning_content=None,
|
||||||
|
),
|
||||||
|
finish_reason="stop",
|
||||||
|
)],
|
||||||
|
usage=None,
|
||||||
|
)
|
||||||
|
mock_non_stream.return_value = fallback_response
|
||||||
|
|
||||||
|
agent = AIAgent(
|
||||||
|
model="test/model",
|
||||||
|
quiet_mode=True,
|
||||||
|
skip_context_files=True,
|
||||||
|
skip_memory=True,
|
||||||
|
)
|
||||||
|
agent.api_mode = "chat_completions"
|
||||||
|
agent._interrupt_requested = False
|
||||||
|
|
||||||
|
response = agent._interruptible_streaming_api_call({})
|
||||||
|
|
||||||
|
assert response.choices[0].message.content == "fallback after connection error"
|
||||||
|
mock_non_stream.assert_called_once()
|
||||||
|
|
||||||
|
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||||
|
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||||
|
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||||
|
def test_fallback_error_propagates(self, mock_close, mock_create, mock_non_stream):
|
||||||
|
"""When both streaming AND fallback fail, the fallback error propagates."""
|
||||||
|
from run_agent import AIAgent
|
||||||
|
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.chat.completions.create.side_effect = Exception("stream broke")
|
||||||
|
mock_create.return_value = mock_client
|
||||||
|
|
||||||
|
mock_non_stream.side_effect = Exception("Rate limit exceeded")
|
||||||
|
|
||||||
agent = AIAgent(
|
agent = AIAgent(
|
||||||
model="test/model",
|
model="test/model",
|
||||||
quiet_mode=True,
|
quiet_mode=True,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue