refactor: implement structured logging across multiple modules

- Introduced logging functionality in cli.py, run_agent.py, scheduler.py, and various tool modules to replace print statements with structured logging.
- Enhanced error handling and informational messages to improve debugging and monitoring capabilities.
- Ensured consistent logging practices across the codebase, facilitating better traceability and maintenance.
This commit is contained in:
teknium1 2026-02-21 03:11:11 -08:00
parent b6247b71b5
commit a885d2f240
14 changed files with 303 additions and 303 deletions

View file

@ -27,6 +27,7 @@ Usage:
"""
import json
import logging
import os
import signal
import sys
@ -40,6 +41,8 @@ import uuid
from pathlib import Path
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Global interrupt event: set by the agent when a user interrupt arrives.
@ -87,13 +90,11 @@ def _get_scratch_dir() -> Path:
# Create user-specific subdirectory
user_scratch = scratch / os.getenv("USER", "hermes") / "hermes-agent"
user_scratch.mkdir(parents=True, exist_ok=True)
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal] Using /scratch for sandboxes: {user_scratch}")
logger.info("Using /scratch for sandboxes: %s", user_scratch)
return user_scratch
# Fall back to /tmp
if not os.getenv("HERMES_QUIET"):
print("[Terminal] Warning: /scratch not available, using /tmp (limited space)")
logger.warning("/scratch not available, using /tmp (limited space)")
return Path(tempfile.gettempdir())
@ -155,9 +156,9 @@ def _get_or_build_sif(image: str, executable: str = "apptainer") -> str:
if sif_path.exists():
return str(sif_path)
print(f"[Terminal] Building SIF image (one-time setup)...")
print(f"[Terminal] Source: {image}")
print(f"[Terminal] Target: {sif_path}")
logger.info("Building SIF image (one-time setup)...")
logger.info(" Source: %s", image)
logger.info(" Target: %s", sif_path)
# Ensure tmp directory exists for build
tmp_dir = cache_dir / "tmp"
@ -177,21 +178,21 @@ def _get_or_build_sif(image: str, executable: str = "apptainer") -> str:
env=env
)
if result.returncode != 0:
print(f"[Terminal] ⚠️ SIF build failed, falling back to docker:// URL")
print(f"[Terminal] Error: {result.stderr[:500]}")
logger.warning("SIF build failed, falling back to docker:// URL")
logger.warning(" Error: %s", result.stderr[:500])
return image
print(f"[Terminal] ✅ SIF image built successfully")
logger.info("SIF image built successfully")
return str(sif_path)
except subprocess.TimeoutExpired:
print(f"[Terminal] ⚠️ SIF build timed out, falling back to docker:// URL")
logger.warning("SIF build timed out, falling back to docker:// URL")
# Clean up partial file
if sif_path.exists():
sif_path.unlink()
return image
except Exception as e:
print(f"[Terminal] ⚠️ SIF build error: {e}, falling back to docker:// URL")
logger.warning("SIF build error: %s, falling back to docker:// URL", e)
return image
@ -218,8 +219,8 @@ def _check_disk_usage_warning():
total_gb = total_bytes / (1024 ** 3)
if total_gb > DISK_USAGE_WARNING_THRESHOLD_GB:
print(f"⚠️ [Terminal] WARNING: Disk usage ({total_gb:.1f}GB) exceeds threshold ({DISK_USAGE_WARNING_THRESHOLD_GB}GB)")
print(f" Consider running cleanup_all_environments() or reducing parallel workers")
logger.warning("Disk usage (%.1fGB) exceeds threshold (%.0fGB). Consider running cleanup_all_environments().",
total_gb, DISK_USAGE_WARNING_THRESHOLD_GB)
return True
return False
@ -280,7 +281,7 @@ def _save_permanent_allowlist(patterns: set):
config["command_allowlist"] = list(patterns)
save_config(config)
except Exception as e:
print(f" ⚠️ Could not save allowlist: {e}")
logger.warning("Could not save allowlist: %s", e)
def _detect_dangerous_command(command: str) -> tuple:
@ -822,7 +823,7 @@ class _SingularityEnvironment:
raise RuntimeError(f"Failed to start instance: {result.stderr}")
self._instance_started = True
print(f"[Singularity] Instance {self.instance_id} started (persistent container)", flush=True)
logger.info("Singularity instance %s started (persistent container)", self.instance_id)
except subprocess.TimeoutExpired:
raise RuntimeError("Instance start timed out")
@ -881,9 +882,9 @@ class _SingularityEnvironment:
text=True,
timeout=30,
)
print(f"[Singularity] Instance {self.instance_id} stopped", flush=True)
logger.info("Singularity instance %s stopped", self.instance_id)
except Exception as e:
print(f"[Singularity] Warning: failed to stop instance {self.instance_id}: {e}", flush=True)
logger.warning("Failed to stop Singularity instance %s: %s", self.instance_id, e)
self._instance_started = False
def stop(self):
@ -1282,11 +1283,9 @@ def _get_env_config() -> Dict[str, Any]:
# inside a container. Also catch Windows-style paths (C:\...).
host_prefixes = ("/Users/", "/home/", "C:\\", "C:/")
if any(cwd.startswith(p) for p in host_prefixes) and cwd != default_cwd:
if not os.getenv("HERMES_QUIET"):
print(
f"[Terminal] Ignoring TERMINAL_CWD={cwd!r} for {env_type} backend "
f"(host path won't exist in sandbox). Using {default_cwd!r} instead."
)
logger.info("Ignoring TERMINAL_CWD=%r for %s backend "
"(host path won't exist in sandbox). Using %r instead.",
cwd, env_type, default_cwd)
cwd = default_cwd
return {
@ -1406,16 +1405,14 @@ def _cleanup_inactive_envs(lifetime_seconds: int = 300):
elif hasattr(env, 'terminate'):
env.terminate()
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
logger.info("Cleaned up inactive environment for task: %s", task_id)
except Exception as e:
error_str = str(e)
if not os.getenv("HERMES_QUIET"):
if "404" in error_str or "not found" in error_str.lower():
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
else:
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
if "404" in error_str or "not found" in error_str.lower():
logger.info("Environment for task %s already cleaned up", task_id)
else:
logger.warning("Error cleaning up environment for task %s: %s", task_id, e)
def _cleanup_thread_worker():
@ -1427,8 +1424,7 @@ def _cleanup_thread_worker():
config = _get_env_config()
_cleanup_inactive_envs(config["lifetime_seconds"])
except Exception as e:
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal Cleanup] Error in cleanup thread: {e}")
logger.warning("Error in cleanup thread: %s", e)
for _ in range(60):
if not _cleanup_running:
@ -1493,7 +1489,7 @@ def cleanup_all_environments():
cleanup_vm(task_id)
cleaned += 1
except Exception as e:
print(f"[Terminal Cleanup] Error cleaning {task_id}: {e}")
logger.error("Error cleaning %s: %s", task_id, e)
# Also clean any orphaned directories
scratch_dir = _get_scratch_dir()
@ -1501,12 +1497,12 @@ def cleanup_all_environments():
for path in glob.glob(str(scratch_dir / "hermes-*")):
try:
shutil.rmtree(path, ignore_errors=True)
print(f"[Terminal Cleanup] Removed orphaned: {path}")
logger.info("Removed orphaned: %s", path)
except OSError:
pass
if not os.getenv("HERMES_QUIET") and cleaned > 0:
print(f"[Terminal Cleanup] Cleaned {cleaned} environments")
if cleaned > 0:
logger.info("Cleaned %d environments", cleaned)
return cleaned
@ -1545,16 +1541,14 @@ def cleanup_vm(task_id: str):
elif hasattr(env, 'terminate'):
env.terminate()
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")
logger.info("Manually cleaned up environment for task: %s", task_id)
except Exception as e:
if not os.getenv("HERMES_QUIET"):
error_str = str(e)
if "404" in error_str or "not found" in error_str.lower():
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
else:
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
error_str = str(e)
if "404" in error_str or "not found" in error_str.lower():
logger.info("Environment for task %s already cleaned up", task_id)
else:
logger.warning("Error cleaning up environment for task %s: %s", task_id, e)
def _atexit_cleanup():
@ -1562,7 +1556,7 @@ def _atexit_cleanup():
_stop_cleanup_thread()
if _active_environments:
count = len(_active_environments)
print(f"\n[Terminal Cleanup] Shutting down {count} remaining sandbox(es)...")
logger.info("Shutting down %d remaining sandbox(es)...", count)
cleanup_all_environments()
atexit.register(_atexit_cleanup)
@ -1679,8 +1673,7 @@ def terminal_tool(
if needs_creation:
if env_type in ("singularity", "local"):
_check_disk_usage_warning()
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal] Creating new {env_type} environment for task {effective_task_id[:8]}...", flush=True)
logger.info("Creating new %s environment for task %s...", env_type, effective_task_id[:8])
try:
ssh_config = None
if env_type == "ssh":
@ -1710,8 +1703,7 @@ def terminal_tool(
_active_environments[effective_task_id] = new_env
_last_activity[effective_task_id] = time.time()
env = new_env
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal] {env_type} environment ready for task {effective_task_id[:8]}", flush=True)
logger.info("%s environment ready for task %s", env_type, effective_task_id[:8])
# Check for dangerous commands (only for local/ssh in interactive modes)
# Skip check if force=True (user has confirmed they want to run it)
@ -1828,17 +1820,13 @@ def terminal_tool(
if retry_count < max_retries:
retry_count += 1
wait_time = 2 ** retry_count
print(f"⚠️ Terminal: execution error, retrying in {wait_time}s (attempt {retry_count}/{max_retries})")
print(f" Command: {command[:200]}")
print(f" Error: {type(e).__name__}: {e}")
print(f" Task ID: {effective_task_id}, Backend: {env_type}")
logger.warning("Execution error, retrying in %ds (attempt %d/%d) - Command: %s - Error: %s: %s - Task: %s, Backend: %s",
wait_time, retry_count, max_retries, command[:200], type(e).__name__, e, effective_task_id, env_type)
time.sleep(wait_time)
continue
print(f"❌ Terminal: execution failed after {max_retries} retries")
print(f" Command: {command[:200]}")
print(f" Error: {type(e).__name__}: {e}")
print(f" Task ID: {effective_task_id}, Backend: {env_type}")
logger.error("Execution failed after %d retries - Command: %s - Error: %s: %s - Task: %s, Backend: %s",
max_retries, command[:200], type(e).__name__, e, effective_task_id, env_type)
return json.dumps({
"output": "",
"exit_code": -1,
@ -1908,7 +1896,7 @@ def check_terminal_requirements() -> bool:
else:
return False
except Exception as e:
print(f"Terminal requirements check failed: {e}")
logger.error("Terminal requirements check failed: %s", e)
return False