fix: harden sentinel guard for /stop during setup and shutdown
- /stop during sentinel returns helpful message instead of queuing - Shutdown loop skips sentinel entries instead of catching AttributeError - _handle_stop_command guards against sentinel (defensive) - Added tests for both edge cases (7 total race guard tests)
This commit is contained in:
parent
aaa96713d4
commit
fc061c2fee
2 changed files with 82 additions and 4 deletions
|
|
@ -1056,6 +1056,8 @@ class GatewayRunner:
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
for session_key, agent in list(self._running_agents.items()):
|
for session_key, agent in list(self._running_agents.items()):
|
||||||
|
if agent is _AGENT_PENDING_SENTINEL:
|
||||||
|
continue
|
||||||
try:
|
try:
|
||||||
agent.interrupt("Gateway shutting down")
|
agent.interrupt("Gateway shutting down")
|
||||||
logger.debug("Interrupted running agent for session %s during shutdown", session_key[:20])
|
logger.debug("Interrupted running agent for session %s during shutdown", session_key[:20])
|
||||||
|
|
@ -1354,8 +1356,12 @@ class GatewayRunner:
|
||||||
|
|
||||||
running_agent = self._running_agents.get(_quick_key)
|
running_agent = self._running_agents.get(_quick_key)
|
||||||
if running_agent is _AGENT_PENDING_SENTINEL:
|
if running_agent is _AGENT_PENDING_SENTINEL:
|
||||||
# Agent is being set up but not ready yet — queue the message
|
# Agent is being set up but not ready yet.
|
||||||
# so it will be picked up after the agent starts.
|
if event.get_command() == "stop":
|
||||||
|
# Nothing to interrupt — agent hasn't started yet.
|
||||||
|
return "⏳ The agent is still starting up — nothing to stop yet."
|
||||||
|
# Queue the message so it will be picked up after the
|
||||||
|
# agent starts.
|
||||||
adapter = self.adapters.get(source.platform)
|
adapter = self.adapters.get(source.platform)
|
||||||
if adapter:
|
if adapter:
|
||||||
adapter._pending_messages[_quick_key] = event
|
adapter._pending_messages[_quick_key] = event
|
||||||
|
|
@ -2326,8 +2332,10 @@ class GatewayRunner:
|
||||||
session_entry = self.session_store.get_or_create_session(source)
|
session_entry = self.session_store.get_or_create_session(source)
|
||||||
session_key = session_entry.session_key
|
session_key = session_entry.session_key
|
||||||
|
|
||||||
if session_key in self._running_agents:
|
agent = self._running_agents.get(session_key)
|
||||||
agent = self._running_agents[session_key]
|
if agent is _AGENT_PENDING_SENTINEL:
|
||||||
|
return "⏳ The agent is still starting up — nothing to stop yet."
|
||||||
|
if agent:
|
||||||
agent.interrupt()
|
agent.interrupt()
|
||||||
return "⚡ Stopping the current task... The agent will finish its current step and respond."
|
return "⚡ Stopping the current task... The agent will finish its current step and respond."
|
||||||
else:
|
else:
|
||||||
|
|
|
||||||
|
|
@ -195,3 +195,73 @@ async def test_command_messages_do_not_leave_sentinel():
|
||||||
assert session_key not in runner._running_agents, (
|
assert session_key not in runner._running_agents, (
|
||||||
"Command handlers must not leave sentinel in _running_agents"
|
"Command handlers must not leave sentinel in _running_agents"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Test 6: /stop during sentinel returns helpful message
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_stop_during_sentinel_returns_message():
|
||||||
|
"""If /stop arrives while the sentinel is set (agent still starting),
|
||||||
|
it should return a helpful message instead of crashing or queuing."""
|
||||||
|
runner = _make_runner()
|
||||||
|
event1 = _make_event(text="hello")
|
||||||
|
session_key = build_session_key(event1.source)
|
||||||
|
|
||||||
|
barrier = asyncio.Event()
|
||||||
|
|
||||||
|
async def slow_inner(self_inner, ev, src, qk):
|
||||||
|
await barrier.wait()
|
||||||
|
return "ok"
|
||||||
|
|
||||||
|
with patch.object(GatewayRunner, "_handle_message_with_agent", slow_inner):
|
||||||
|
task1 = asyncio.create_task(runner._handle_message(event1))
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
# Sentinel should be set
|
||||||
|
assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL
|
||||||
|
|
||||||
|
# Send /stop — should get a message, not crash
|
||||||
|
stop_event = _make_event(text="/stop")
|
||||||
|
result = await runner._handle_message(stop_event)
|
||||||
|
assert result is not None, "/stop during sentinel should return a message"
|
||||||
|
assert "starting up" in result.lower()
|
||||||
|
|
||||||
|
# Should NOT be queued as pending
|
||||||
|
adapter = runner.adapters[Platform.TELEGRAM]
|
||||||
|
assert session_key not in adapter._pending_messages
|
||||||
|
|
||||||
|
barrier.set()
|
||||||
|
await task1
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Test 7: Shutdown skips sentinel entries
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_shutdown_skips_sentinel():
|
||||||
|
"""During gateway shutdown, sentinel entries in _running_agents
|
||||||
|
should be skipped without raising AttributeError."""
|
||||||
|
runner = _make_runner()
|
||||||
|
session_key = "telegram:dm:99999"
|
||||||
|
|
||||||
|
# Simulate a sentinel in _running_agents
|
||||||
|
runner._running_agents[session_key] = _AGENT_PENDING_SENTINEL
|
||||||
|
|
||||||
|
# Also add a real agent mock to verify it still gets interrupted
|
||||||
|
real_agent = MagicMock()
|
||||||
|
runner._running_agents["telegram:dm:88888"] = real_agent
|
||||||
|
|
||||||
|
runner.adapters = {} # No adapters to disconnect
|
||||||
|
runner._running = True
|
||||||
|
runner._shutdown_event = asyncio.Event()
|
||||||
|
runner._exit_reason = None
|
||||||
|
runner._shutdown_all_gateway_honcho = lambda: None
|
||||||
|
|
||||||
|
with patch("gateway.status.remove_pid_file"), \
|
||||||
|
patch("gateway.status.write_runtime_status"):
|
||||||
|
await runner.stop()
|
||||||
|
|
||||||
|
# Real agent should have been interrupted
|
||||||
|
real_agent.interrupt.assert_called_once()
|
||||||
|
# Should not have raised on the sentinel
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue