fix: thread safety for concurrent subagent delegation (#1672)
* fix: thread safety for concurrent subagent delegation Four thread-safety fixes that prevent crashes and data races when running multiple subagents concurrently via delegate_task: 1. Remove redirect_stdout/stderr from delegate_tool — mutating global sys.stdout races with the spinner thread when multiple children start concurrently, causing segfaults. Children already run with quiet_mode=True so the redirect was redundant. 2. Split _run_single_child into _build_child_agent (main thread) + _run_single_child (worker thread). AIAgent construction creates httpx/SSL clients which are not thread-safe to initialize concurrently. 3. Add threading.Lock to SessionDB — subagents share the parent's SessionDB and call create_session/append_message from worker threads with no synchronization. 4. Add _active_children_lock to AIAgent — interrupt() iterates _active_children while worker threads append/remove children. 5. Add _client_cache_lock to auxiliary_client — multiple subagent threads may resolve clients concurrently via call_llm(). Based on PR #1471 by peteromallet. * feat: Honcho base_url override via config.yaml + quick command alias type Two features salvaged from PR #1576: 1. Honcho base_url override: allows pointing Hermes at a remote self-hosted Honcho deployment via config.yaml: honcho: base_url: "http://192.168.x.x:8000" When set, this overrides the Honcho SDK's environment mapping (production/local), enabling LAN/VPN Honcho deployments without requiring the server to live on localhost. Uses config.yaml instead of env var (HONCHO_URL) per project convention. 2. Quick command alias type: adds a new 'alias' quick command type that rewrites to another slash command before normal dispatch: quick_commands: sc: type: alias target: /context Supports both CLI and gateway. Arguments are forwarded to the target command. Based on PR #1576 by redhelix. --------- Co-authored-by: peteromallet <peteromallet@users.noreply.github.com> Co-authored-by: redhelix <redhelix@users.noreply.github.com>
This commit is contained in:
parent
fd61ae13e5
commit
1d5a39e002
14 changed files with 397 additions and 272 deletions
|
|
@ -16,13 +16,10 @@ The parent's context only sees the delegation call and the summary result,
|
|||
never the child's intermediate tool calls or reasoning.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
|
@ -150,7 +147,7 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
|
|||
return _callback
|
||||
|
||||
|
||||
def _run_single_child(
|
||||
def _build_child_agent(
|
||||
task_index: int,
|
||||
goal: str,
|
||||
context: Optional[str],
|
||||
|
|
@ -158,16 +155,15 @@ def _run_single_child(
|
|||
model: Optional[str],
|
||||
max_iterations: int,
|
||||
parent_agent,
|
||||
task_count: int = 1,
|
||||
# Credential overrides from delegation config (provider:model resolution)
|
||||
override_provider: Optional[str] = None,
|
||||
override_base_url: Optional[str] = None,
|
||||
override_api_key: Optional[str] = None,
|
||||
override_api_mode: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
):
|
||||
"""
|
||||
Spawn and run a single child agent. Called from within a thread.
|
||||
Returns a structured result dict.
|
||||
Build a child AIAgent on the main thread (thread-safe construction).
|
||||
Returns the constructed child agent without running it.
|
||||
|
||||
When override_* params are set (from delegation config), the child uses
|
||||
those credentials instead of inheriting from the parent. This enables
|
||||
|
|
@ -176,8 +172,6 @@ def _run_single_child(
|
|||
"""
|
||||
from run_agent import AIAgent
|
||||
|
||||
child_start = time.monotonic()
|
||||
|
||||
# When no explicit toolsets given, inherit from parent's enabled toolsets
|
||||
# so disabled tools (e.g. web) don't leak to subagents.
|
||||
if toolsets:
|
||||
|
|
@ -188,65 +182,84 @@ def _run_single_child(
|
|||
child_toolsets = _strip_blocked_tools(DEFAULT_TOOLSETS)
|
||||
|
||||
child_prompt = _build_child_system_prompt(goal, context)
|
||||
# Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
|
||||
parent_api_key = getattr(parent_agent, "api_key", None)
|
||||
if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
|
||||
parent_api_key = parent_agent._client_kwargs.get("api_key")
|
||||
|
||||
try:
|
||||
# Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
|
||||
parent_api_key = getattr(parent_agent, "api_key", None)
|
||||
if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
|
||||
parent_api_key = parent_agent._client_kwargs.get("api_key")
|
||||
# Build progress callback to relay tool calls to parent display
|
||||
child_progress_cb = _build_child_progress_callback(task_index, parent_agent)
|
||||
|
||||
# Build progress callback to relay tool calls to parent display
|
||||
child_progress_cb = _build_child_progress_callback(task_index, parent_agent, task_count)
|
||||
# Share the parent's iteration budget so subagent tool calls
|
||||
# count toward the session-wide limit.
|
||||
shared_budget = getattr(parent_agent, "iteration_budget", None)
|
||||
|
||||
# Share the parent's iteration budget so subagent tool calls
|
||||
# count toward the session-wide limit.
|
||||
shared_budget = getattr(parent_agent, "iteration_budget", None)
|
||||
# Resolve effective credentials: config override > parent inherit
|
||||
effective_model = model or parent_agent.model
|
||||
effective_provider = override_provider or getattr(parent_agent, "provider", None)
|
||||
effective_base_url = override_base_url or parent_agent.base_url
|
||||
effective_api_key = override_api_key or parent_api_key
|
||||
effective_api_mode = override_api_mode or getattr(parent_agent, "api_mode", None)
|
||||
|
||||
# Resolve effective credentials: config override > parent inherit
|
||||
effective_model = model or parent_agent.model
|
||||
effective_provider = override_provider or getattr(parent_agent, "provider", None)
|
||||
effective_base_url = override_base_url or parent_agent.base_url
|
||||
effective_api_key = override_api_key or parent_api_key
|
||||
effective_api_mode = override_api_mode or getattr(parent_agent, "api_mode", None)
|
||||
child = AIAgent(
|
||||
base_url=effective_base_url,
|
||||
api_key=effective_api_key,
|
||||
model=effective_model,
|
||||
provider=effective_provider,
|
||||
api_mode=effective_api_mode,
|
||||
max_iterations=max_iterations,
|
||||
max_tokens=getattr(parent_agent, "max_tokens", None),
|
||||
reasoning_config=getattr(parent_agent, "reasoning_config", None),
|
||||
prefill_messages=getattr(parent_agent, "prefill_messages", None),
|
||||
enabled_toolsets=child_toolsets,
|
||||
quiet_mode=True,
|
||||
ephemeral_system_prompt=child_prompt,
|
||||
log_prefix=f"[subagent-{task_index}]",
|
||||
platform=parent_agent.platform,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
clarify_callback=None,
|
||||
session_db=getattr(parent_agent, '_session_db', None),
|
||||
providers_allowed=parent_agent.providers_allowed,
|
||||
providers_ignored=parent_agent.providers_ignored,
|
||||
providers_order=parent_agent.providers_order,
|
||||
provider_sort=parent_agent.provider_sort,
|
||||
tool_progress_callback=child_progress_cb,
|
||||
iteration_budget=shared_budget,
|
||||
)
|
||||
|
||||
child = AIAgent(
|
||||
base_url=effective_base_url,
|
||||
api_key=effective_api_key,
|
||||
model=effective_model,
|
||||
provider=effective_provider,
|
||||
api_mode=effective_api_mode,
|
||||
max_iterations=max_iterations,
|
||||
max_tokens=getattr(parent_agent, "max_tokens", None),
|
||||
reasoning_config=getattr(parent_agent, "reasoning_config", None),
|
||||
prefill_messages=getattr(parent_agent, "prefill_messages", None),
|
||||
enabled_toolsets=child_toolsets,
|
||||
quiet_mode=True,
|
||||
ephemeral_system_prompt=child_prompt,
|
||||
log_prefix=f"[subagent-{task_index}]",
|
||||
platform=parent_agent.platform,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
clarify_callback=None,
|
||||
session_db=getattr(parent_agent, '_session_db', None),
|
||||
providers_allowed=parent_agent.providers_allowed,
|
||||
providers_ignored=parent_agent.providers_ignored,
|
||||
providers_order=parent_agent.providers_order,
|
||||
provider_sort=parent_agent.provider_sort,
|
||||
tool_progress_callback=child_progress_cb,
|
||||
iteration_budget=shared_budget,
|
||||
)
|
||||
# Set delegation depth so children can't spawn grandchildren
|
||||
child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1
|
||||
|
||||
# Set delegation depth so children can't spawn grandchildren
|
||||
child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1
|
||||
|
||||
# Register child for interrupt propagation
|
||||
if hasattr(parent_agent, '_active_children'):
|
||||
# Register child for interrupt propagation
|
||||
if hasattr(parent_agent, '_active_children'):
|
||||
lock = getattr(parent_agent, '_active_children_lock', None)
|
||||
if lock:
|
||||
with lock:
|
||||
parent_agent._active_children.append(child)
|
||||
else:
|
||||
parent_agent._active_children.append(child)
|
||||
|
||||
# Run with stdout/stderr suppressed to prevent interleaved output
|
||||
devnull = io.StringIO()
|
||||
with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
|
||||
result = child.run_conversation(user_message=goal)
|
||||
return child
|
||||
|
||||
def _run_single_child(
|
||||
task_index: int,
|
||||
goal: str,
|
||||
child=None,
|
||||
parent_agent=None,
|
||||
**_kwargs,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Run a pre-built child agent. Called from within a thread.
|
||||
Returns a structured result dict.
|
||||
"""
|
||||
child_start = time.monotonic()
|
||||
|
||||
# Get the progress callback from the child agent
|
||||
child_progress_cb = getattr(child, 'tool_progress_callback', None)
|
||||
|
||||
try:
|
||||
result = child.run_conversation(user_message=goal)
|
||||
|
||||
# Flush any remaining batched progress to gateway
|
||||
if child_progress_cb and hasattr(child_progress_cb, '_flush'):
|
||||
|
|
@ -355,11 +368,15 @@ def _run_single_child(
|
|||
# Unregister child from interrupt propagation
|
||||
if hasattr(parent_agent, '_active_children'):
|
||||
try:
|
||||
parent_agent._active_children.remove(child)
|
||||
lock = getattr(parent_agent, '_active_children_lock', None)
|
||||
if lock:
|
||||
with lock:
|
||||
parent_agent._active_children.remove(child)
|
||||
else:
|
||||
parent_agent._active_children.remove(child)
|
||||
except (ValueError, UnboundLocalError) as e:
|
||||
logger.debug("Could not remove child from active_children: %s", e)
|
||||
|
||||
|
||||
def delegate_task(
|
||||
goal: Optional[str] = None,
|
||||
context: Optional[str] = None,
|
||||
|
|
@ -428,51 +445,38 @@ def delegate_task(
|
|||
# Track goal labels for progress display (truncated for readability)
|
||||
task_labels = [t["goal"][:40] for t in task_list]
|
||||
|
||||
if n_tasks == 1:
|
||||
# Single task -- run directly (no thread pool overhead)
|
||||
t = task_list[0]
|
||||
result = _run_single_child(
|
||||
task_index=0,
|
||||
goal=t["goal"],
|
||||
context=t.get("context"),
|
||||
toolsets=t.get("toolsets") or toolsets,
|
||||
model=creds["model"],
|
||||
max_iterations=effective_max_iter,
|
||||
parent_agent=parent_agent,
|
||||
task_count=1,
|
||||
override_provider=creds["provider"],
|
||||
override_base_url=creds["base_url"],
|
||||
# Build all child agents on the main thread (thread-safe construction)
|
||||
children = []
|
||||
for i, t in enumerate(task_list):
|
||||
child = _build_child_agent(
|
||||
task_index=i, goal=t["goal"], context=t.get("context"),
|
||||
toolsets=t.get("toolsets") or toolsets, model=creds["model"],
|
||||
max_iterations=effective_max_iter, parent_agent=parent_agent,
|
||||
override_provider=creds["provider"], override_base_url=creds["base_url"],
|
||||
override_api_key=creds["api_key"],
|
||||
override_api_mode=creds["api_mode"],
|
||||
)
|
||||
children.append((i, t, child))
|
||||
|
||||
if n_tasks == 1:
|
||||
# Single task -- run directly (no thread pool overhead)
|
||||
_i, _t, child = children[0]
|
||||
result = _run_single_child(0, _t["goal"], child, parent_agent)
|
||||
results.append(result)
|
||||
else:
|
||||
# Batch -- run in parallel with per-task progress lines
|
||||
completed_count = 0
|
||||
spinner_ref = getattr(parent_agent, '_delegate_spinner', None)
|
||||
|
||||
# Save stdout/stderr before the executor — redirect_stdout in child
|
||||
# threads races on sys.stdout and can leave it as devnull permanently.
|
||||
_saved_stdout = sys.stdout
|
||||
_saved_stderr = sys.stderr
|
||||
|
||||
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHILDREN) as executor:
|
||||
futures = {}
|
||||
for i, t in enumerate(task_list):
|
||||
for i, t, child in children:
|
||||
future = executor.submit(
|
||||
_run_single_child,
|
||||
task_index=i,
|
||||
goal=t["goal"],
|
||||
context=t.get("context"),
|
||||
toolsets=t.get("toolsets") or toolsets,
|
||||
model=creds["model"],
|
||||
max_iterations=effective_max_iter,
|
||||
child=child,
|
||||
parent_agent=parent_agent,
|
||||
task_count=n_tasks,
|
||||
override_provider=creds["provider"],
|
||||
override_base_url=creds["base_url"],
|
||||
override_api_key=creds["api_key"],
|
||||
override_api_mode=creds["api_mode"],
|
||||
)
|
||||
futures[future] = i
|
||||
|
||||
|
|
@ -515,10 +519,6 @@ def delegate_task(
|
|||
except Exception as e:
|
||||
logger.debug("Spinner update_text failed: %s", e)
|
||||
|
||||
# Restore stdout/stderr in case redirect_stdout race left them as devnull
|
||||
sys.stdout = _saved_stdout
|
||||
sys.stderr = _saved_stderr
|
||||
|
||||
# Sort by task_index so results match input order
|
||||
results.sort(key=lambda r: r["task_index"])
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue