Enhance tool execution and logging in HermesAgentLoop
- Increased thread pool size for tool execution from 8 to 128 to improve concurrency and prevent starvation.
- Added a function to resize the tool executor dynamically based on configuration.
- Enhanced logging to track API call durations and tool execution times, including warnings for slow tools.
- Improved overall performance monitoring by logging detailed information for each turn in the agent loop.
This commit is contained in:
parent
ad042fdd68
commit
5ec75e38b9
1 changed file with 50 additions and 11 deletions
|
|
@ -15,6 +15,7 @@ import asyncio
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import uuid
|
import uuid
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import Any, Dict, List, Optional, Set
|
from typing import Any, Dict, List, Optional, Set
|
||||||
|
|
@ -24,7 +25,22 @@ from model_tools import handle_function_call
|
||||||
# Thread pool for running sync tool calls that internally use asyncio.run()
|
# Thread pool for running sync tool calls that internally use asyncio.run()
|
||||||
# (e.g., mini-swe-agent's modal/docker backends). Running them in a separate
|
# (e.g., mini-swe-agent's modal/docker backends). Running them in a separate
|
||||||
# thread gives them a clean event loop so they don't deadlock inside Atropos's loop.
|
# thread gives them a clean event loop so they don't deadlock inside Atropos's loop.
|
||||||
_tool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
|
# Size must be large enough for concurrent eval tasks (e.g., 89 TB2 tasks all
|
||||||
|
# making tool calls). Too small = thread pool starvation, tasks queue for minutes.
|
||||||
|
# Resized at runtime by HermesAgentBaseEnv.__init__ via resize_tool_pool().
|
||||||
|
_tool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=128)
|
||||||
|
|
||||||
|
|
||||||
|
def resize_tool_pool(max_workers: int):
    """
    Replace the global tool executor with a new one of the given size.

    Called by HermesAgentBaseEnv.__init__ based on config.tool_pool_size.

    The new executor is created first, so an invalid size (ThreadPoolExecutor
    raises ValueError for max_workers <= 0) leaves the existing pool in place.
    The previous executor is then shut down without blocking, so its worker
    threads are released instead of lingering for the life of the process.

    Args:
        max_workers: Maximum number of worker threads for the new pool.
    """
    global _tool_executor
    previous = _tool_executor
    _tool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    # wait=False returns immediately: work already submitted to the old pool
    # still runs to completion, after which its threads exit. New submissions
    # go to the replacement pool bound above.
    previous.shutdown(wait=False)
    logger.info("Tool thread pool resized to %d workers", max_workers)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -155,7 +171,11 @@ class HermesAgentLoop:
|
||||||
reasoning_per_turn = []
|
reasoning_per_turn = []
|
||||||
tool_errors: List[ToolError] = []
|
tool_errors: List[ToolError] = []
|
||||||
|
|
||||||
|
import time as _time
|
||||||
|
|
||||||
for turn in range(self.max_turns):
|
for turn in range(self.max_turns):
|
||||||
|
turn_start = _time.monotonic()
|
||||||
|
|
||||||
# Build the chat_completion kwargs
|
# Build the chat_completion kwargs
|
||||||
chat_kwargs = {
|
chat_kwargs = {
|
||||||
"messages": messages,
|
"messages": messages,
|
||||||
|
|
@ -172,10 +192,12 @@ class HermesAgentLoop:
|
||||||
chat_kwargs["max_tokens"] = self.max_tokens
|
chat_kwargs["max_tokens"] = self.max_tokens
|
||||||
|
|
||||||
# Make the API call -- standard OpenAI spec
|
# Make the API call -- standard OpenAI spec
|
||||||
|
api_start = _time.monotonic()
|
||||||
try:
|
try:
|
||||||
response = await self.server.chat_completion(**chat_kwargs)
|
response = await self.server.chat_completion(**chat_kwargs)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("API call failed on turn %d: %s", turn + 1, e)
|
api_elapsed = _time.monotonic() - api_start
|
||||||
|
logger.error("API call failed on turn %d (%.1fs): %s", turn + 1, api_elapsed, e)
|
||||||
return AgentResult(
|
return AgentResult(
|
||||||
messages=messages,
|
messages=messages,
|
||||||
managed_state=self._get_managed_state(),
|
managed_state=self._get_managed_state(),
|
||||||
|
|
@ -185,8 +207,10 @@ class HermesAgentLoop:
|
||||||
tool_errors=tool_errors,
|
tool_errors=tool_errors,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
api_elapsed = _time.monotonic() - api_start
|
||||||
|
|
||||||
if not response or not response.choices:
|
if not response or not response.choices:
|
||||||
logger.warning("Empty response on turn %d", turn + 1)
|
logger.warning("Empty response on turn %d (api=%.1fs)", turn + 1, api_elapsed)
|
||||||
return AgentResult(
|
return AgentResult(
|
||||||
messages=messages,
|
messages=messages,
|
||||||
managed_state=self._get_managed_state(),
|
managed_state=self._get_managed_state(),
|
||||||
|
|
@ -265,14 +289,16 @@ class HermesAgentLoop:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if tool_name == "terminal":
|
if tool_name == "terminal":
|
||||||
import os
|
|
||||||
backend = os.getenv("TERMINAL_ENV", "local")
|
backend = os.getenv("TERMINAL_ENV", "local")
|
||||||
cmd_preview = args.get("command", "")[:80]
|
cmd_preview = args.get("command", "")[:80]
|
||||||
print(f" 🖥️ [{backend}] $ {cmd_preview}")
|
logger.info(
|
||||||
|
"[%s] $ %s", self.task_id[:8], cmd_preview,
|
||||||
|
)
|
||||||
|
|
||||||
# Run tool calls in a thread pool so backends that use
|
# Run tool calls in a thread pool so backends that use
|
||||||
# asyncio.run() internally (modal, docker) get a clean
|
# asyncio.run() internally (modal, docker) get a clean
|
||||||
# event loop instead of deadlocking inside Atropos's loop.
|
# event loop instead of deadlocking inside Atropos's loop.
|
||||||
|
tool_submit_time = _time.monotonic()
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
tool_result = await loop.run_in_executor(
|
tool_result = await loop.run_in_executor(
|
||||||
_tool_executor,
|
_tool_executor,
|
||||||
|
|
@ -280,6 +306,16 @@ class HermesAgentLoop:
|
||||||
tool_name, args, task_id=self.task_id
|
tool_name, args, task_id=self.task_id
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
tool_elapsed = _time.monotonic() - tool_submit_time
|
||||||
|
|
||||||
|
# Log slow tools and thread pool stats for debugging
|
||||||
|
pool_active = _tool_executor._work_queue.qsize()
|
||||||
|
if tool_elapsed > 30:
|
||||||
|
logger.warning(
|
||||||
|
"[%s] turn %d: %s took %.1fs (pool queue=%d)",
|
||||||
|
self.task_id[:8], turn + 1, tool_name,
|
||||||
|
tool_elapsed, pool_active,
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tool_result = json.dumps(
|
tool_result = json.dumps(
|
||||||
{"error": f"Tool execution failed: {type(e).__name__}: {str(e)}"}
|
{"error": f"Tool execution failed: {type(e).__name__}: {str(e)}"}
|
||||||
|
|
@ -320,10 +356,11 @@ class HermesAgentLoop:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug(
|
turn_elapsed = _time.monotonic() - turn_start
|
||||||
"Turn %d: %d tool calls executed",
|
logger.info(
|
||||||
turn + 1,
|
"[%s] turn %d: api=%.1fs, %d tools, turn_total=%.1fs",
|
||||||
len(assistant_msg.tool_calls),
|
self.task_id[:8], turn + 1, api_elapsed,
|
||||||
|
len(assistant_msg.tool_calls), turn_elapsed,
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
@ -336,8 +373,10 @@ class HermesAgentLoop:
|
||||||
msg_dict["reasoning_content"] = reasoning
|
msg_dict["reasoning_content"] = reasoning
|
||||||
messages.append(msg_dict)
|
messages.append(msg_dict)
|
||||||
|
|
||||||
logger.debug(
|
turn_elapsed = _time.monotonic() - turn_start
|
||||||
"Turn %d: model finished naturally (no tool calls)", turn + 1
|
logger.info(
|
||||||
|
"[%s] turn %d: api=%.1fs, no tools (finished), turn_total=%.1fs",
|
||||||
|
self.task_id[:8], turn + 1, api_elapsed, turn_elapsed,
|
||||||
)
|
)
|
||||||
|
|
||||||
return AgentResult(
|
return AgentResult(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue