Implement interrupt handling for agent and CLI input and persistent prompt line at bottom of CLI :)
- Enhanced the AIAgent class to support interrupt requests, allowing for graceful interruption of ongoing tasks and processing of new messages. - Updated the HermesCLI to manage user input in a persistent manner, enabling real-time interruption of the agent's conversation. - Introduced a mechanism in the GatewayRunner to handle incoming messages while an agent is running, allowing for immediate response to user commands. - Improved overall user experience by providing feedback during interruptions and ensuring that pending messages are processed correctly.
This commit is contained in:
parent
beeb7896e0
commit
9bfe185a2e
3 changed files with 336 additions and 34 deletions
104
gateway/run.py
104
gateway/run.py
|
|
@ -72,6 +72,11 @@ class GatewayRunner:
|
|||
self.delivery_router = DeliveryRouter(self.config)
|
||||
self._running = False
|
||||
self._shutdown_event = asyncio.Event()
|
||||
|
||||
# Track running agents per session for interrupt support
|
||||
# Key: session_key, Value: AIAgent instance
|
||||
self._running_agents: Dict[str, Any] = {}
|
||||
self._pending_messages: Dict[str, str] = {} # Queued messages during interrupt
|
||||
|
||||
async def start(self) -> bool:
|
||||
"""
|
||||
|
|
@ -217,10 +222,11 @@ class GatewayRunner:
|
|||
This is the core message processing pipeline:
|
||||
1. Check user authorization
|
||||
2. Check for commands (/new, /reset, etc.)
|
||||
3. Get or create session
|
||||
4. Build context for agent
|
||||
5. Run agent conversation
|
||||
6. Return response
|
||||
3. Check for running agent and interrupt if needed
|
||||
4. Get or create session
|
||||
5. Build context for agent
|
||||
6. Run agent conversation
|
||||
7. Return response
|
||||
"""
|
||||
source = event.source
|
||||
|
||||
|
|
@ -229,7 +235,7 @@ class GatewayRunner:
|
|||
print(f"[gateway] Unauthorized user: {source.user_id} ({source.user_name}) on {source.platform.value}")
|
||||
return None # Silently ignore unauthorized users
|
||||
|
||||
# Check for reset commands
|
||||
# Check for commands
|
||||
command = event.get_command()
|
||||
if command in ["new", "reset"]:
|
||||
return await self._handle_reset_command(event)
|
||||
|
|
@ -237,8 +243,21 @@ class GatewayRunner:
|
|||
if command == "status":
|
||||
return await self._handle_status_command(event)
|
||||
|
||||
if command == "stop":
|
||||
return await self._handle_stop_command(event)
|
||||
|
||||
# Get or create session
|
||||
session_entry = self.session_store.get_or_create_session(source)
|
||||
session_key = session_entry.session_key
|
||||
|
||||
# Check if there's already a running agent for this session
|
||||
if session_key in self._running_agents:
|
||||
running_agent = self._running_agents[session_key]
|
||||
print(f"[gateway] ⚡ Interrupting running agent for session {session_key[:20]}...")
|
||||
running_agent.interrupt(event.text)
|
||||
# Store the new message to be processed after current agent finishes
|
||||
self._pending_messages[session_key] = event.text
|
||||
return None # Don't respond yet - let the interrupt handle it
|
||||
|
||||
# Build session context
|
||||
context = build_session_context(source, self.config, session_entry)
|
||||
|
|
@ -259,7 +278,8 @@ class GatewayRunner:
|
|||
context_prompt=context_prompt,
|
||||
history=history,
|
||||
source=source,
|
||||
session_id=session_entry.session_id
|
||||
session_id=session_entry.session_id,
|
||||
session_key=session_key
|
||||
)
|
||||
|
||||
# Append to transcript
|
||||
|
|
@ -309,6 +329,10 @@ class GatewayRunner:
|
|||
|
||||
connected_platforms = [p.value for p in self.adapters.keys()]
|
||||
|
||||
# Check if there's an active agent
|
||||
session_key = session_entry.session_key
|
||||
is_running = session_key in self._running_agents
|
||||
|
||||
lines = [
|
||||
"📊 **Hermes Gateway Status**",
|
||||
"",
|
||||
|
|
@ -316,12 +340,26 @@ class GatewayRunner:
|
|||
f"**Created:** {session_entry.created_at.strftime('%Y-%m-%d %H:%M')}",
|
||||
f"**Last Activity:** {session_entry.updated_at.strftime('%Y-%m-%d %H:%M')}",
|
||||
f"**Tokens:** {session_entry.total_tokens:,}",
|
||||
f"**Agent Running:** {'Yes ⚡' if is_running else 'No'}",
|
||||
"",
|
||||
f"**Connected Platforms:** {', '.join(connected_platforms)}",
|
||||
]
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
async def _handle_stop_command(self, event: MessageEvent) -> str:
|
||||
"""Handle /stop command - interrupt a running agent."""
|
||||
source = event.source
|
||||
session_entry = self.session_store.get_or_create_session(source)
|
||||
session_key = session_entry.session_key
|
||||
|
||||
if session_key in self._running_agents:
|
||||
agent = self._running_agents[session_key]
|
||||
agent.interrupt()
|
||||
return "⚡ Stopping the current task... The agent will finish its current step and respond."
|
||||
else:
|
||||
return "No active task to stop."
|
||||
|
||||
def _set_session_env(self, context: SessionContext) -> None:
|
||||
"""Set environment variables for the current session."""
|
||||
os.environ["HERMES_SESSION_PLATFORM"] = context.source.platform.value
|
||||
|
|
@ -341,12 +379,14 @@ class GatewayRunner:
|
|||
context_prompt: str,
|
||||
history: List[Dict[str, Any]],
|
||||
source: SessionSource,
|
||||
session_id: str
|
||||
session_id: str,
|
||||
session_key: str = None
|
||||
) -> str:
|
||||
"""
|
||||
Run the agent with the given message and context.
|
||||
|
||||
This is run in a thread pool to not block the event loop.
|
||||
Supports interruption via new messages.
|
||||
"""
|
||||
from run_agent import AIAgent
|
||||
import queue
|
||||
|
|
@ -432,6 +472,10 @@ class GatewayRunner:
|
|||
print(f"[Gateway] Progress message error: {e}")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# We need to share the agent instance for interrupt support
|
||||
agent_holder = [None] # Mutable container for the agent instance
|
||||
result_holder = [None] # Mutable container for the result
|
||||
|
||||
def run_sync():
|
||||
# Read from env var or use default (same as CLI)
|
||||
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "60"))
|
||||
|
|
@ -446,6 +490,9 @@ class GatewayRunner:
|
|||
tool_progress_callback=progress_callback if tool_progress_enabled else None,
|
||||
)
|
||||
|
||||
# Store agent reference for interrupt support
|
||||
agent_holder[0] = agent
|
||||
|
||||
# Convert transcript history to agent format
|
||||
# Transcript has timestamps; agent expects {"role": ..., "content": ...}
|
||||
agent_history = []
|
||||
|
|
@ -456,6 +503,7 @@ class GatewayRunner:
|
|||
agent_history.append({"role": role, "content": content})
|
||||
|
||||
result = agent.run_conversation(message, conversation_history=agent_history)
|
||||
result_holder[0] = result
|
||||
|
||||
# Return final response, or a message if something went wrong
|
||||
final_response = result.get("final_response")
|
||||
|
|
@ -472,14 +520,56 @@ class GatewayRunner:
|
|||
if tool_progress_enabled:
|
||||
progress_task = asyncio.create_task(send_progress_messages())
|
||||
|
||||
# Track this agent as running for this session (for interrupt support)
|
||||
# We do this in a callback after the agent is created
|
||||
async def track_agent():
|
||||
# Wait for agent to be created
|
||||
while agent_holder[0] is None:
|
||||
await asyncio.sleep(0.05)
|
||||
if session_key:
|
||||
self._running_agents[session_key] = agent_holder[0]
|
||||
|
||||
tracking_task = asyncio.create_task(track_agent())
|
||||
|
||||
try:
|
||||
# Run in thread pool to not block
|
||||
loop = asyncio.get_event_loop()
|
||||
response = await loop.run_in_executor(None, run_sync)
|
||||
|
||||
# Check if we were interrupted and have a pending message
|
||||
result = result_holder[0]
|
||||
if result and result.get("interrupted") and session_key:
|
||||
pending = self._pending_messages.pop(session_key, None)
|
||||
if pending:
|
||||
print(f"[gateway] 📨 Processing interrupted message: '{pending[:40]}...'")
|
||||
# Add an indicator to the response
|
||||
if response:
|
||||
response = response + "\n\n---\n_[Interrupted - processing your new message]_"
|
||||
|
||||
# Send the interrupted response first
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter and response:
|
||||
await adapter.send(chat_id=source.chat_id, content=response)
|
||||
|
||||
# Now process the pending message with updated history
|
||||
updated_history = result.get("messages", history)
|
||||
return await self._run_agent(
|
||||
message=pending,
|
||||
context_prompt=context_prompt,
|
||||
history=updated_history,
|
||||
source=source,
|
||||
session_id=session_id,
|
||||
session_key=session_key
|
||||
)
|
||||
finally:
|
||||
# Stop progress sender
|
||||
if progress_task:
|
||||
progress_task.cancel()
|
||||
|
||||
# Clean up tracking
|
||||
tracking_task.cancel()
|
||||
if session_key and session_key in self._running_agents:
|
||||
del self._running_agents[session_key]
|
||||
try:
|
||||
await progress_task
|
||||
except asyncio.CancelledError:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue