From e5dc569daac34ba0a5f82069ce78c6fb7a25917c Mon Sep 17 00:00:00 2001 From: Himess Date: Sat, 14 Mar 2026 11:03:20 -0700 Subject: [PATCH 1/2] fix: salvage gateway dedup and executor cleanup from PR #993 Salvages the two still-relevant fixes from PR #993 onto current main: - use a 3-tuple LOCAL delivery key so explicit/local-origin targets are not duplicated - shut down the previous agent-loop ThreadPoolExecutor when resizing the global pool Adds regression tests for both behaviors. --- environments/agent_loop.py | 2 ++ gateway/delivery.py | 2 +- tests/gateway/test_delivery.py | 11 ++++++++++- tests/test_agent_loop.py | 19 +++++++++++++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/environments/agent_loop.py b/environments/agent_loop.py index ab8c0236..dec3bc4e 100644 --- a/environments/agent_loop.py +++ b/environments/agent_loop.py @@ -39,7 +39,9 @@ def resize_tool_pool(max_workers: int): Safe to call before any tasks are submitted. """ global _tool_executor + old_executor = _tool_executor _tool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) + old_executor.shutdown(wait=False) logger.info("Tool thread pool resized to %d workers", max_workers) logger = logging.getLogger(__name__) diff --git a/gateway/delivery.py b/gateway/delivery.py index 630ab638..69ec6376 100644 --- a/gateway/delivery.py +++ b/gateway/delivery.py @@ -161,7 +161,7 @@ class DeliveryRouter: # Always include local if configured if self.config.always_log_local: - local_key = (Platform.LOCAL, None) + local_key = (Platform.LOCAL, None, None) if local_key not in seen_platforms: targets.append(DeliveryTarget(platform=Platform.LOCAL)) diff --git a/tests/gateway/test_delivery.py b/tests/gateway/test_delivery.py index 42eba781..3894897f 100644 --- a/tests/gateway/test_delivery.py +++ b/tests/gateway/test_delivery.py @@ -1,7 +1,7 @@ """Tests for the delivery routing module.""" from gateway.config import Platform, GatewayConfig, PlatformConfig, HomeChannel -from gateway.delivery import DeliveryTarget, parse_deliver_spec +from gateway.delivery import DeliveryRouter, DeliveryTarget, parse_deliver_spec from gateway.session import SessionSource @@ -85,3 +85,12 @@ class TestTargetToStringRoundtrip: reparsed = DeliveryTarget.parse(s) assert reparsed.platform == Platform.TELEGRAM assert reparsed.chat_id == "999" + + +class TestDeliveryRouter: + def test_resolve_targets_does_not_duplicate_local_when_explicit(self): + router = DeliveryRouter(GatewayConfig(always_log_local=True)) + + targets = router.resolve_targets(["local"]) + + assert [target.platform for target in targets] == [Platform.LOCAL] diff --git a/tests/test_agent_loop.py b/tests/test_agent_loop.py index bb0ccd06..b95ff780 100644 --- a/tests/test_agent_loop.py +++ b/tests/test_agent_loop.py @@ -484,3 +484,22 @@ class TestResizeToolPool: """resize_tool_pool should not raise.""" resize_tool_pool(16) # Small pool for testing resize_tool_pool(128) # Restore default + + def test_resize_shuts_down_previous_executor(self, monkeypatch): + """Replacing the global tool executor should shut down the old pool.""" + import environments.agent_loop as agent_loop_module + + old_executor = MagicMock() + new_executor = MagicMock() + + monkeypatch.setattr(agent_loop_module, "_tool_executor", old_executor) + monkeypatch.setattr( + agent_loop_module.concurrent.futures, + "ThreadPoolExecutor", + MagicMock(return_value=new_executor), + ) + + resize_tool_pool(16) + + old_executor.shutdown.assert_called_once_with(wait=False) + assert agent_loop_module._tool_executor is new_executor From 94af51f621de55c1f8ebbe0dbc6c2a54ad4fd0ed Mon Sep 17 00:00:00 2001 From: teknium1 Date: Sat, 14 Mar 2026 11:03:25 -0700 Subject: [PATCH 2/2] fix: harden trajectory compressor summary content handling Normalize summary-model content before stripping so empty or non-string responses do not trigger retry/fallback paths. Adds sync and async regression tests for None content. --- tests/test_trajectory_compressor.py | 34 ++++++++++++++++++++++++++++- trajectory_compressor.py | 33 ++++++++++++++++------------ 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/tests/test_trajectory_compressor.py b/tests/test_trajectory_compressor.py index 75fbd5a2..c95a3af9 100644 --- a/tests/test_trajectory_compressor.py +++ b/tests/test_trajectory_compressor.py @@ -1,7 +1,10 @@ """Tests for trajectory_compressor.py — config, metrics, and compression logic.""" import json -from unittest.mock import patch, MagicMock +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch, MagicMock + +import pytest from trajectory_compressor import ( CompressionConfig, @@ -384,3 +387,32 @@ class TestTokenCounting: tc.tokenizer.encode = MagicMock(side_effect=Exception("fail")) # Should fallback to len(text) // 4 assert tc.count_tokens("12345678") == 2 + + +class TestGenerateSummary: + def test_generate_summary_handles_none_content(self): + tc = _make_compressor() + tc.client = MagicMock() + tc.client.chat.completions.create.return_value = SimpleNamespace( + choices=[SimpleNamespace(message=SimpleNamespace(content=None))] + ) + metrics = TrajectoryMetrics() + + summary = tc._generate_summary("Turn content", metrics) + + assert summary == "[CONTEXT SUMMARY]:" + + @pytest.mark.asyncio + async def test_generate_summary_async_handles_none_content(self): + tc = _make_compressor() + tc.async_client = MagicMock() + tc.async_client.chat.completions.create = AsyncMock( + return_value=SimpleNamespace( + choices=[SimpleNamespace(message=SimpleNamespace(content=None))] + ) + ) + metrics = TrajectoryMetrics() + + summary = await tc._generate_summary_async("Turn content", metrics) + + assert summary == "[CONTEXT SUMMARY]:" diff --git a/trajectory_compressor.py b/trajectory_compressor.py index ef81d6e2..1bfed6bf 100644 --- a/trajectory_compressor.py +++ b/trajectory_compressor.py @@ -495,6 +495,21 @@ class TrajectoryCompressor: parts.append(f"[Turn {i} - {role.upper()}]:\n{value}") return "\n\n".join(parts) + + @staticmethod + def _coerce_summary_content(content: Any) -> str: + """Normalize summary-model output to a safe string.""" + if not isinstance(content, str): + content = str(content) if content else "" + return content.strip() + + @staticmethod + def _ensure_summary_prefix(summary: str) -> str: + """Normalize summary text to include the expected prefix exactly once.""" + text = (summary or "").strip() + if text.startswith("[CONTEXT SUMMARY]:"): + return text + return "[CONTEXT SUMMARY]:" if not text else f"[CONTEXT SUMMARY]: {text}" def _generate_summary(self, content: str, metrics: TrajectoryMetrics) -> str: """ @@ -545,13 +560,8 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix.""" max_tokens=self.config.summary_target_tokens * 2, ) - summary = response.choices[0].message.content.strip() - - # Ensure it starts with the prefix - if not summary.startswith("[CONTEXT SUMMARY]:"): - summary = "[CONTEXT SUMMARY]: " + summary - - return summary + summary = self._coerce_summary_content(response.choices[0].message.content) + return self._ensure_summary_prefix(summary) except Exception as e: metrics.summarization_errors += 1 @@ -612,13 +622,8 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix.""" max_tokens=self.config.summary_target_tokens * 2, ) - summary = response.choices[0].message.content.strip() - - # Ensure it starts with the prefix - if not summary.startswith("[CONTEXT SUMMARY]:"): - summary = "[CONTEXT SUMMARY]: " + summary - - return summary + summary = self._coerce_summary_content(response.choices[0].message.content) + return self._ensure_summary_prefix(summary) except Exception as e: metrics.summarization_errors += 1