Add a claude code-like CLI

- Introduced `cli-config.yaml.example` to provide a template for configuring the CLI behavior, including model settings, terminal tool configurations, agent behavior, and toolsets.
- Created `cli.py` for an interactive terminal interface, allowing users to start the Hermes Agent with various options and toolsets.
- Added `hermes` launcher script for convenient CLI access.
- Updated `model_tools.py` to support quiet mode for suppressing output during tool initialization and execution.
- Enhanced logging in various tools to respect quiet mode, improving user experience by reducing unnecessary output.
- Added `prompt_toolkit` to `requirements.txt` for improved CLI interaction capabilities.
- Created `TODO.md` for future improvements and enhancements to the Hermes Agent framework.
This commit is contained in:
teknium 2026-01-31 06:30:48 +00:00
parent 8e986584f4
commit bc76a032ba
10 changed files with 2251 additions and 118 deletions

View file

@ -64,11 +64,13 @@ def _get_scratch_dir() -> Path:
# Create user-specific subdirectory
user_scratch = scratch / os.getenv("USER", "hermes") / "hermes-agent"
user_scratch.mkdir(parents=True, exist_ok=True)
print(f"[Terminal] Using /scratch for sandboxes: {user_scratch}")
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal] Using /scratch for sandboxes: {user_scratch}")
return user_scratch
# Fall back to /tmp
print("[Terminal] Warning: /scratch not available, using /tmp (limited space)")
if not os.getenv("HERMES_QUIET"):
print("[Terminal] Warning: /scratch not available, using /tmp (limited space)")
return Path(tempfile.gettempdir())
@ -307,6 +309,144 @@ class _SingularityEnvironment:
"""Cleanup on destruction."""
self.cleanup()
class _SSHEnvironment:
"""
SSH-based remote execution environment.
Runs commands on a remote machine over SSH, keeping the agent code
completely isolated from the execution environment. Uses SSH ControlMaster
for connection persistence (faster subsequent commands).
Security benefits:
- Agent cannot modify its own code
- Remote machine acts as a sandbox
- Clear separation between agent and execution environment
"""
def __init__(self, host: str, user: str, cwd: str = "/tmp", timeout: int = 60,
port: int = 22, key_path: str = ""):
self.host = host
self.user = user
self.cwd = cwd
self.timeout = timeout
self.port = port
self.key_path = key_path
# Create control socket directory for connection persistence
self.control_dir = Path(tempfile.gettempdir()) / "hermes-ssh"
self.control_dir.mkdir(parents=True, exist_ok=True)
self.control_socket = self.control_dir / f"{user}@{host}:{port}.sock"
# Test connection and establish ControlMaster
self._establish_connection()
def _build_ssh_command(self, extra_args: list = None) -> list:
"""Build base SSH command with connection options."""
cmd = ["ssh"]
# Connection multiplexing for performance
cmd.extend(["-o", f"ControlPath={self.control_socket}"])
cmd.extend(["-o", "ControlMaster=auto"])
cmd.extend(["-o", "ControlPersist=300"]) # Keep connection alive for 5 min
# Standard options
cmd.extend(["-o", "BatchMode=yes"]) # No password prompts
cmd.extend(["-o", "StrictHostKeyChecking=accept-new"]) # Accept new hosts
cmd.extend(["-o", "ConnectTimeout=10"])
# Port
if self.port != 22:
cmd.extend(["-p", str(self.port)])
# Private key
if self.key_path:
cmd.extend(["-i", self.key_path])
# Extra args (like -t for TTY)
if extra_args:
cmd.extend(extra_args)
# Target
cmd.append(f"{self.user}@{self.host}")
return cmd
def _establish_connection(self):
"""Test SSH connection and establish ControlMaster."""
cmd = self._build_ssh_command()
cmd.append("echo 'SSH connection established'")
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=15
)
if result.returncode != 0:
error_msg = result.stderr.strip() or result.stdout.strip()
raise RuntimeError(f"SSH connection failed: {error_msg}")
except subprocess.TimeoutExpired:
raise RuntimeError(f"SSH connection to {self.user}@{self.host} timed out")
def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
"""Execute a command on the remote host via SSH."""
work_dir = cwd or self.cwd
effective_timeout = timeout or self.timeout
# Wrap command to run in the correct directory
# Use bash -c to handle complex commands properly
wrapped_command = f'cd {work_dir} && {command}'
cmd = self._build_ssh_command()
cmd.extend(["bash", "-c", wrapped_command])
try:
result = subprocess.run(
cmd,
text=True,
timeout=effective_timeout,
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
return {"output": result.stdout, "returncode": result.returncode}
except subprocess.TimeoutExpired:
return {"output": f"Command timed out after {effective_timeout}s", "returncode": 124}
except Exception as e:
return {"output": f"SSH execution error: {str(e)}", "returncode": 1}
def cleanup(self):
"""Close the SSH ControlMaster connection."""
if self.control_socket.exists():
try:
# Send exit command to ControlMaster
cmd = ["ssh", "-o", f"ControlPath={self.control_socket}", "-O", "exit",
f"{self.user}@{self.host}"]
subprocess.run(cmd, capture_output=True, timeout=5)
except:
pass
# Remove socket file
try:
self.control_socket.unlink()
except:
pass
def stop(self):
"""Alias for cleanup."""
self.cleanup()
def __del__(self):
"""Cleanup on destruction."""
try:
self.cleanup()
except:
pass
# Tool description for LLM
TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment.
@ -348,25 +488,31 @@ _cleanup_running = False
def _get_env_config() -> Dict[str, Any]:
"""Get terminal environment configuration from environment variables."""
return {
"env_type": os.getenv("TERMINAL_ENV", "local"), # local, docker, singularity, or modal
"env_type": os.getenv("TERMINAL_ENV", "local"), # local, docker, singularity, modal, or ssh
"docker_image": os.getenv("TERMINAL_DOCKER_IMAGE", "python:3.11"),
"singularity_image": os.getenv("TERMINAL_SINGULARITY_IMAGE", "docker://python:3.11"),
"modal_image": os.getenv("TERMINAL_MODAL_IMAGE", "python:3.11"),
"cwd": os.getenv("TERMINAL_CWD", "/tmp"),
"timeout": int(os.getenv("TERMINAL_TIMEOUT", "60")),
"lifetime_seconds": int(os.getenv("TERMINAL_LIFETIME_SECONDS", "300")),
# SSH-specific config
"ssh_host": os.getenv("TERMINAL_SSH_HOST", ""),
"ssh_user": os.getenv("TERMINAL_SSH_USER", ""),
"ssh_port": int(os.getenv("TERMINAL_SSH_PORT", "22")),
"ssh_key": os.getenv("TERMINAL_SSH_KEY", ""), # Path to private key (optional, uses ssh-agent if empty)
}
def _create_environment(env_type: str, image: str, cwd: str, timeout: int):
def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: dict = None):
"""
Create an execution environment from mini-swe-agent.
Args:
env_type: One of "local", "docker", "singularity", "modal"
image: Docker/Singularity/Modal image name (ignored for local)
env_type: One of "local", "docker", "singularity", "modal", "ssh"
image: Docker/Singularity/Modal image name (ignored for local/ssh)
cwd: Working directory
timeout: Default command timeout
ssh_config: SSH connection config (for env_type="ssh")
Returns:
Environment instance with execute() method
@ -387,8 +533,20 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int):
from minisweagent.environments.extra.swerex_modal import SwerexModalEnvironment
return SwerexModalEnvironment(image=image, cwd=cwd, timeout=timeout)
elif env_type == "ssh":
if not ssh_config or not ssh_config.get("host") or not ssh_config.get("user"):
raise ValueError("SSH environment requires ssh_host and ssh_user to be configured")
return _SSHEnvironment(
host=ssh_config["host"],
user=ssh_config["user"],
port=ssh_config.get("port", 22),
key_path=ssh_config.get("key", ""),
cwd=cwd,
timeout=timeout
)
else:
raise ValueError(f"Unknown environment type: {env_type}. Use 'local', 'docker', 'singularity', or 'modal'")
raise ValueError(f"Unknown environment type: {env_type}. Use 'local', 'docker', 'singularity', 'modal', or 'ssh'")
def _cleanup_inactive_envs(lifetime_seconds: int = 300):
@ -416,7 +574,8 @@ def _cleanup_inactive_envs(lifetime_seconds: int = 300):
env.terminate()
del _active_environments[task_id]
print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
if task_id in _last_activity:
del _last_activity[task_id]
@ -425,10 +584,11 @@ def _cleanup_inactive_envs(lifetime_seconds: int = 300):
except Exception as e:
error_str = str(e)
if "404" in error_str or "not found" in error_str.lower():
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
else:
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
if not os.getenv("HERMES_QUIET"):
if "404" in error_str or "not found" in error_str.lower():
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
else:
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
# Always remove from tracking dicts
if task_id in _active_environments:
@ -448,7 +608,8 @@ def _cleanup_thread_worker():
config = _get_env_config()
_cleanup_inactive_envs(config["lifetime_seconds"])
except Exception as e:
print(f"[Terminal Cleanup] Error in cleanup thread: {e}")
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal Cleanup] Error in cleanup thread: {e}")
for _ in range(60):
if not _cleanup_running:
@ -545,7 +706,8 @@ def cleanup_vm(task_id: str):
env.terminate()
del _active_environments[task_id]
print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")
if not os.getenv("HERMES_QUIET"):
print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")
if task_id in _task_workdirs:
del _task_workdirs[task_id]
@ -554,11 +716,12 @@ def cleanup_vm(task_id: str):
del _last_activity[task_id]
except Exception as e:
error_str = str(e)
if "404" in error_str or "not found" in error_str.lower():
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
else:
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
if not os.getenv("HERMES_QUIET"):
error_str = str(e)
if "404" in error_str or "not found" in error_str.lower():
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
else:
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
atexit.register(_stop_cleanup_thread)
@ -616,9 +779,10 @@ def terminal_tool(
# Use task_id for environment isolation
effective_task_id = task_id or "default"
# For local environment, create a unique subdirectory per task
# For local environment in batch mode, create a unique subdirectory per task
# This prevents parallel tasks from overwriting each other's files
if env_type == "local":
# In CLI mode (HERMES_QUIET), use the cwd directly without subdirectories
if env_type == "local" and not os.getenv("HERMES_QUIET"):
import uuid
with _env_lock:
if effective_task_id not in _task_workdirs:
@ -637,11 +801,22 @@ def terminal_tool(
_check_disk_usage_warning()
try:
# Build SSH config if using SSH environment
ssh_config = None
if env_type == "ssh":
ssh_config = {
"host": config.get("ssh_host", ""),
"user": config.get("ssh_user", ""),
"port": config.get("ssh_port", 22),
"key": config.get("ssh_key", ""),
}
_active_environments[effective_task_id] = _create_environment(
env_type=env_type,
image=image,
cwd=cwd,
timeout=effective_timeout
timeout=effective_timeout,
ssh_config=ssh_config
)
except ImportError as e:
return json.dumps({