From 8117d0adabe39e47973eaff9290a4340b92f63ba Mon Sep 17 00:00:00 2001
From: teknium1
Date: Mon, 16 Feb 2026 19:37:40 -0800
Subject: [PATCH] Refactor file operations and environment management in
 file_tools and terminal_tool

- Fixed the ShellFileOperations cache so stale entries are invalidated
  when their underlying environments are cleaned up.
- Improved thread safety by reusing terminal_tool's per-task creation
  locks during environment creation and cleanup.
- Moved slow sandbox teardown out of the global lock so cleanup of
  inactive environments no longer blocks other tool calls.
- Improved error handling and messages for clearer user feedback during
  environment cleanup.
---
 tools/file_tools.py    | 134 ++++++++++++++++++++--------------------
 tools/terminal_tool.py | 136 +++++++++++++++++++++++------------------
 2 files changed, 146 insertions(+), 124 deletions(-)
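The core invariant behind the first bullet: a cached `ShellFileOperations` wrapper is only valid while the environment it wraps is still alive, so every cache hit must be re-validated against the live-environment registry. A minimal sketch of that pattern, with hypothetical names (`_cache`, `_live_envs`, `get_cached_wrapper`) standing in for the module's actual ones:

```python
import threading
import time

_cache_lock = threading.Lock()     # guards _cache
_registry_lock = threading.Lock()  # guards _live_envs / _last_used
_cache: dict = {}      # task_id -> wrapper built on top of an environment
_live_envs: dict = {}  # task_id -> environment object
_last_used: dict = {}  # task_id -> timestamp of last use

def get_cached_wrapper(task_id: str):
    """Return the cached wrapper only if its environment is still alive."""
    with _cache_lock:
        cached = _cache.get(task_id)
    if cached is None:
        return None
    # Check liveness under the registry lock; never hold both locks at
    # once, which keeps the lock ordering trivially deadlock-free.
    with _registry_lock:
        alive = task_id in _live_envs
        if alive:
            _last_used[task_id] = time.time()
    if alive:
        return cached
    # Stale hit: the reaper killed the environment, so drop the wrapper.
    with _cache_lock:
        _cache.pop(task_id, None)
    return None
```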
diff --git a/tools/file_tools.py b/tools/file_tools.py
index 35d44950..4ee7a592 100644
--- a/tools/file_tools.py
+++ b/tools/file_tools.py
@@ -13,86 +13,88 @@ _file_ops_cache: dict = {}
 def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
     """Get or create ShellFileOperations for a terminal environment.
-    
+
     Respects the TERMINAL_ENV setting -- if the task_id doesn't have an
     environment yet, creates one using the configured backend (local,
     docker, modal, etc.) rather than always defaulting to local.
+
+    Thread-safe: uses the same per-task creation locks as terminal_tool to
+    prevent duplicate sandbox creation from concurrent tool calls.
     """
     from tools.terminal_tool import (
         _active_environments, _env_lock, _create_environment,
         _get_env_config, _last_activity, _start_cleanup_thread,
         _check_disk_usage_warning,
+        _creation_locks, _creation_locks_lock,
     )
+    import threading
     import time
-    
-    # Fast path: check cache without heavy locks
-    with _file_ops_lock:
-        if task_id in _file_ops_cache:
-            return _file_ops_cache[task_id]
-
-    # Check if we need to create a new environment.
-    # Uses the same per-task creation locks as terminal_tool to prevent
-    # duplicate sandbox creation from concurrent tool calls.
-    from tools.terminal_tool import _creation_locks, _creation_locks_lock
-
-    needs_creation = False
-    with _env_lock:
-        if task_id not in _active_environments:
-            needs_creation = True
-
-    if needs_creation:
-        # Per-task lock: only one thread creates the sandbox, others wait
-        with _creation_locks_lock:
-            if task_id not in _creation_locks:
-                _creation_locks[task_id] = __import__("threading").Lock()
-            task_lock = _creation_locks[task_id]
-
-        with task_lock:
-            # Double-check after acquiring the per-task lock
-            with _env_lock:
-                if task_id in _active_environments:
-                    needs_creation = False
-
-            if needs_creation:
-                from tools.terminal_tool import _task_env_overrides
-
-                config = _get_env_config()
-                env_type = config["env_type"]
-                overrides = _task_env_overrides.get(task_id, {})
-
-                if env_type == "docker":
-                    image = overrides.get("docker_image") or config["docker_image"]
-                elif env_type == "singularity":
-                    image = overrides.get("singularity_image") or config["singularity_image"]
-                elif env_type == "modal":
-                    image = overrides.get("modal_image") or config["modal_image"]
-                else:
-                    image = ""
-
-                cwd = overrides.get("cwd") or config["cwd"]
-                if not os.getenv("HERMES_QUIET"):
-                    print(f"[FileTools] Creating new {env_type} environment for task {task_id[:8]}...", flush=True)
-
-                new_env = _create_environment(
-                    env_type=env_type,
-                    image=image,
-                    cwd=cwd,
-                    timeout=config["timeout"],
-                )
-
-                with _env_lock:
-                    _active_environments[task_id] = new_env
-                    _last_activity[task_id] = __import__("time").time()
-
-                _start_cleanup_thread()
-                if not os.getenv("HERMES_QUIET"):
-                    print(f"[FileTools] {env_type} environment ready for task {task_id[:8]}", flush=True)
-
-    # Now get the environment and build file_ops
-    with _env_lock:
-        _last_activity[task_id] = time.time()
-        terminal_env = _active_environments[task_id]
-    
+
+    # Fast path: check cache -- but also verify the underlying environment
+    # is still alive (it may have been killed by the cleanup thread).
+    with _file_ops_lock:
+        cached = _file_ops_cache.get(task_id)
+    if cached is not None:
+        with _env_lock:
+            env_alive = task_id in _active_environments
+            if env_alive:
+                _last_activity[task_id] = time.time()
+        if env_alive:
+            return cached
+        # Environment was cleaned up -- invalidate the stale cache entry
+        with _file_ops_lock:
+            _file_ops_cache.pop(task_id, None)
+
+    # Need to ensure the environment exists before building file_ops.
+    # Acquire the per-task lock so only one thread creates the sandbox.
+    with _creation_locks_lock:
+        if task_id not in _creation_locks:
+            _creation_locks[task_id] = threading.Lock()
+        task_lock = _creation_locks[task_id]
+
+    with task_lock:
+        # Double-check: another thread may have created it while we waited
+        with _env_lock:
+            if task_id in _active_environments:
+                _last_activity[task_id] = time.time()
+                terminal_env = _active_environments[task_id]
+            else:
+                terminal_env = None
+
+        if terminal_env is None:
+            from tools.terminal_tool import _task_env_overrides
+
+            config = _get_env_config()
+            env_type = config["env_type"]
+            overrides = _task_env_overrides.get(task_id, {})
+
+            if env_type == "docker":
+                image = overrides.get("docker_image") or config["docker_image"]
+            elif env_type == "singularity":
+                image = overrides.get("singularity_image") or config["singularity_image"]
+            elif env_type == "modal":
+                image = overrides.get("modal_image") or config["modal_image"]
+            else:
+                image = ""
+
+            cwd = overrides.get("cwd") or config["cwd"]
+            if not os.getenv("HERMES_QUIET"):
+                print(f"[FileTools] Creating new {env_type} environment for task {task_id[:8]}...", flush=True)
+
+            terminal_env = _create_environment(
+                env_type=env_type,
+                image=image,
+                cwd=cwd,
+                timeout=config["timeout"],
+            )
+
+            with _env_lock:
+                _active_environments[task_id] = terminal_env
+                _last_activity[task_id] = time.time()
+
+            _start_cleanup_thread()
+            if not os.getenv("HERMES_QUIET"):
+                print(f"[FileTools] {env_type} environment ready for task {task_id[:8]}", flush=True)
+
+    # Build file_ops from the (guaranteed live) environment and cache it
     file_ops = ShellFileOperations(terminal_env)
     with _file_ops_lock:
         _file_ops_cache[task_id] = file_ops
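The creation path above is double-checked locking with one lock per task: a global lock guards only the dict of per-task locks, and the slow sandbox launch runs under the task's own lock so unrelated tasks never wait on each other. A condensed sketch of the same shape, assuming a generic `_expensive_create` stand-in rather than the real `_create_environment`:

```python
import threading

_locks_lock = threading.Lock()  # guards _creation_locks itself
_creation_locks: dict = {}      # task_id -> that task's creation lock
_envs_lock = threading.Lock()   # guards the environment registry
_envs: dict = {}                # task_id -> environment object

def _expensive_create(task_id: str) -> object:
    # Stand-in for a slow sandbox launch (Docker, Modal, ...).
    return object()

def get_or_create_env(task_id: str) -> object:
    """Create each task's environment exactly once under concurrency."""
    # Global lock held only long enough to fetch this task's lock.
    with _locks_lock:
        task_lock = _creation_locks.setdefault(task_id, threading.Lock())
    # Creation is serialized per task; unrelated tasks never contend here.
    with task_lock:
        # Double-check: another thread may have finished creating the
        # environment while we were waiting on task_lock.
        with _envs_lock:
            env = _envs.get(task_id)
        if env is None:
            env = _expensive_create(task_id)  # slow; no registry lock held
            with _envs_lock:
                _envs[task_id] = env
        return env
```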
diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py
index 43b21f8e..c48508a8 100644
--- a/tools/terminal_tool.py
+++ b/tools/terminal_tool.py
@@ -1274,49 +1274,56 @@ def _cleanup_inactive_envs(lifetime_seconds: int = 300):
     global _active_environments, _last_activity
     current_time = time.time()
-    tasks_to_cleanup = []
+
+    # Phase 1: collect stale entries and remove them from the tracking dicts
+    # while holding the lock. Do NOT call env.cleanup() inside the lock --
+    # Modal and Docker teardown can block for 10-15s, which would stall every
+    # concurrent terminal/file tool call waiting on _env_lock.
+    envs_to_stop = []  # list of (task_id, env) pairs
     with _env_lock:
         for task_id, last_time in list(_last_activity.items()):
             if current_time - last_time > lifetime_seconds:
-                tasks_to_cleanup.append(task_id)
+                env = _active_environments.pop(task_id, None)
+                _last_activity.pop(task_id, None)
+                _task_workdirs.pop(task_id, None)
+                if env is not None:
+                    envs_to_stop.append((task_id, env))
 
-    for task_id in tasks_to_cleanup:
-        try:
-            if task_id in _active_environments:
-                env = _active_environments[task_id]
-                # Try various cleanup methods
-                if hasattr(env, 'cleanup'):
-                    env.cleanup()
-                elif hasattr(env, 'stop'):
-                    env.stop()
-                elif hasattr(env, 'terminate'):
-                    env.terminate()
-
-            del _active_environments[task_id]
-            if not os.getenv("HERMES_QUIET"):
-                print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
-
-            if task_id in _last_activity:
-                del _last_activity[task_id]
-            if task_id in _task_workdirs:
-                del _task_workdirs[task_id]
-
-        except Exception as e:
-            error_str = str(e)
-            if not os.getenv("HERMES_QUIET"):
-                if "404" in error_str or "not found" in error_str.lower():
-                    print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
-                else:
-                    print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
-
-        # Always remove from tracking dicts
-        if task_id in _active_environments:
-            del _active_environments[task_id]
-        if task_id in _last_activity:
-            del _last_activity[task_id]
-        if task_id in _task_workdirs:
-            del _task_workdirs[task_id]
+    # Also purge per-task creation locks for cleaned-up tasks
+    with _creation_locks_lock:
+        for task_id, _ in envs_to_stop:
+            _creation_locks.pop(task_id, None)
+
+    # Phase 2: stop the actual sandboxes OUTSIDE the lock so other tool calls
+    # are not blocked while Modal/Docker sandboxes shut down.
+    for task_id, env in envs_to_stop:
+        # Invalidate the stale file_ops cache entry (bug fix: prevents
+        # ShellFileOperations from referencing a dead sandbox)
+        try:
+            from tools.file_tools import clear_file_ops_cache
+            clear_file_ops_cache(task_id)
+        except ImportError:
+            pass
+
+        try:
+            if hasattr(env, 'cleanup'):
+                env.cleanup()
+            elif hasattr(env, 'stop'):
+                env.stop()
+            elif hasattr(env, 'terminate'):
+                env.terminate()
+
+            if not os.getenv("HERMES_QUIET"):
+                print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
+
+        except Exception as e:
+            error_str = str(e)
+            if not os.getenv("HERMES_QUIET"):
+                if "404" in error_str or "not found" in error_str.lower():
+                    print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
+                else:
+                    print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
 
 
 def _cleanup_thread_worker():
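The `_cleanup_inactive_envs` rewrite above is a two-phase reaper: phase 1 detaches stale entries from the shared dicts under the lock (cheap dict work), phase 2 runs the slow teardown with no lock held. A self-contained sketch of that shape, with `_FakeEnv` and the module-level names as illustrative stand-ins:

```python
import threading
import time

_envs_lock = threading.Lock()
_envs: dict = {}       # task_id -> environment object
_last_used: dict = {}  # task_id -> timestamp of last use

class _FakeEnv:
    def stop(self) -> None:
        time.sleep(0.01)  # stands in for a 10-15s Modal/Docker teardown

def reap_idle_envs(lifetime_seconds: float = 300.0) -> None:
    now = time.time()
    to_stop = []  # (task_id, env) pairs detached from the registry
    # Phase 1: cheap dict mutations under the lock.
    with _envs_lock:
        for task_id, last in list(_last_used.items()):
            if now - last > lifetime_seconds:
                env = _envs.pop(task_id, None)
                _last_used.pop(task_id, None)
                if env is not None:
                    to_stop.append((task_id, env))
    # Phase 2: slow teardown with the lock released, so concurrent tool
    # calls that need _envs_lock are never stalled behind a shutdown.
    for task_id, env in to_stop:
        try:
            env.stop()
        except Exception:
            pass  # a dead sandbox is already the state we wanted
```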
@@ -1415,34 +1422,47 @@ def cleanup_vm(task_id: str):
     """Manually clean up a specific environment by task_id."""
     global _active_environments, _last_activity, _task_workdirs
 
-    with _env_lock:
-        try:
-            if task_id in _active_environments:
-                env = _active_environments[task_id]
-                if hasattr(env, 'cleanup'):
-                    env.cleanup()
-                elif hasattr(env, 'stop'):
-                    env.stop()
-                elif hasattr(env, 'terminate'):
-                    env.terminate()
-
-                del _active_environments[task_id]
-                if not os.getenv("HERMES_QUIET"):
-                    print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")
-
-            if task_id in _task_workdirs:
-                del _task_workdirs[task_id]
-
-            if task_id in _last_activity:
-                del _last_activity[task_id]
-
-        except Exception as e:
-            if not os.getenv("HERMES_QUIET"):
-                error_str = str(e)
-                if "404" in error_str or "not found" in error_str.lower():
-                    print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
-                else:
-                    print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
+    # Remove from the tracking dicts while holding the lock, but defer the
+    # actual (potentially slow) env.cleanup() call to outside the lock so
+    # other tool calls aren't blocked.
+    env = None
+    with _env_lock:
+        env = _active_environments.pop(task_id, None)
+        _task_workdirs.pop(task_id, None)
+        _last_activity.pop(task_id, None)
+
+    # Clean up the per-task creation lock
+    with _creation_locks_lock:
+        _creation_locks.pop(task_id, None)
+
+    # Invalidate the stale file_ops cache entry
+    try:
+        from tools.file_tools import clear_file_ops_cache
+        clear_file_ops_cache(task_id)
+    except ImportError:
+        pass
+
+    if env is None:
+        return
+
+    try:
+        if hasattr(env, 'cleanup'):
+            env.cleanup()
+        elif hasattr(env, 'stop'):
+            env.stop()
+        elif hasattr(env, 'terminate'):
+            env.terminate()
+
+        if not os.getenv("HERMES_QUIET"):
+            print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")
+
+    except Exception as e:
+        if not os.getenv("HERMES_QUIET"):
+            error_str = str(e)
+            if "404" in error_str or "not found" in error_str.lower():
+                print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
+            else:
+                print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
 
 
 def _atexit_cleanup():
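Both hunks repeat the same duck-typed probe across heterogeneous backends (`cleanup`/`stop`/`terminate`) and the same 404-is-success error handling; a follow-up could factor it into one helper. A hypothetical sketch (`_stop_env` is not a name introduced by this patch):

```python
def _stop_env(env: object, task_id: str) -> None:
    """Best-effort teardown across heterogeneous sandbox backends."""
    try:
        # Backends expose different shutdown methods; probe in order of
        # preference and call the first one that exists.
        for name in ("cleanup", "stop", "terminate"):
            method = getattr(env, name, None)
            if callable(method):
                method()
                break
    except Exception as exc:
        msg = str(exc)
        # A 404 / "not found" means the sandbox is already gone, which
        # is the end state we wanted -- treat it as success.
        if "404" in msg or "not found" in msg.lower():
            print(f"[Cleanup] environment for task {task_id} already gone")
        else:
            print(f"[Cleanup] error stopping environment for task {task_id}: {exc}")
```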