Merge branch 'main' into fix/packaging-bugs

Teknium 2026-03-13 03:15:45 -07:00 committed by GitHub
commit 0a88b133c2
289 changed files with 48243 additions and 3815 deletions

View file

@ -184,43 +184,52 @@ def prompt_dangerous_approval(command: str, description: str,
os.environ["HERMES_SPINNER_PAUSE"] = "1"
try:
print()
print(f" ⚠️ DANGEROUS COMMAND: {description}")
print(f" {command[:80]}{'...' if len(command) > 80 else ''}")
print()
print(f" [o]nce | [s]ession | [a]lways | [d]eny")
print()
sys.stdout.flush()
is_truncated = len(command) > 80
while True:
print()
print(f" ⚠️ DANGEROUS COMMAND: {description}")
print(f" {command[:80]}{'...' if is_truncated else ''}")
print()
view_hint = " | [v]iew full" if is_truncated else ""
print(f" [o]nce | [s]ession | [a]lways | [d]eny{view_hint}")
print()
sys.stdout.flush()
result = {"choice": ""}
result = {"choice": ""}
def get_input():
try:
result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower()
except (EOFError, OSError):
result["choice"] = ""
def get_input():
try:
result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower()
except (EOFError, OSError):
result["choice"] = ""
thread = threading.Thread(target=get_input, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
thread = threading.Thread(target=get_input, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
print("\n ⏱ Timeout - denying command")
return "deny"
if thread.is_alive():
print("\n ⏱ Timeout - denying command")
return "deny"
choice = result["choice"]
if choice in ('o', 'once'):
print(" ✓ Allowed once")
return "once"
elif choice in ('s', 'session'):
print(" ✓ Allowed for this session")
return "session"
elif choice in ('a', 'always'):
print(" ✓ Added to permanent allowlist")
return "always"
else:
print(" ✗ Denied")
return "deny"
choice = result["choice"]
if choice in ('v', 'view') and is_truncated:
print()
print(" Full command:")
print(f" {command}")
                is_truncated = False  # full command already shown; drop the "..." and [v]iew hint from later prompts
continue
if choice in ('o', 'once'):
print(" ✓ Allowed once")
return "once"
elif choice in ('s', 'session'):
print(" ✓ Allowed for this session")
return "session"
elif choice in ('a', 'always'):
print(" ✓ Added to permanent allowlist")
return "always"
else:
print(" ✗ Denied")
return "deny"
except (EOFError, KeyboardInterrupt):
print("\n ✗ Cancelled")
@ -250,6 +259,10 @@ def check_dangerous_command(command: str, env_type: str,
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo: bypass all approval prompts
if os.getenv("HERMES_YOLO_MODE"):
return {"approved": True, "message": None}
is_dangerous, pattern_key, description = detect_dangerous_command(command)
if not is_dangerous:
return {"approved": True, "message": None}
@ -295,6 +308,6 @@ def check_dangerous_command(command: str, env_type: str,
elif choice == "always":
approve_session(session_key, pattern_key)
approve_permanent(pattern_key)
save_permanent_allowlist(load_permanent_allowlist() | {pattern_key})
save_permanent_allowlist(_permanent_approved)
return {"approved": True, "message": None}

View file

@ -63,7 +63,7 @@ import time
import requests
from typing import Dict, Any, Optional, List
from pathlib import Path
from agent.auxiliary_client import get_vision_auxiliary_client, get_text_auxiliary_client
from agent.auxiliary_client import call_llm
logger = logging.getLogger(__name__)
@ -80,38 +80,15 @@ DEFAULT_SESSION_TIMEOUT = 300
# Max tokens for snapshot content before summarization
SNAPSHOT_SUMMARIZE_THRESHOLD = 8000
# Vision client — for browser_vision (screenshot analysis)
# Wrapped in try/except so a broken auxiliary config doesn't prevent the entire
# browser_tool module from importing (which would disable all 10 browser tools).
try:
_aux_vision_client, _DEFAULT_VISION_MODEL = get_vision_auxiliary_client()
except Exception as _init_err:
logger.debug("Could not initialise vision auxiliary client: %s", _init_err)
_aux_vision_client, _DEFAULT_VISION_MODEL = None, None
# Text client — for page snapshot summarization (same config as web_extract)
try:
_aux_text_client, _DEFAULT_TEXT_MODEL = get_text_auxiliary_client("web_extract")
except Exception as _init_err:
logger.debug("Could not initialise text auxiliary client: %s", _init_err)
_aux_text_client, _DEFAULT_TEXT_MODEL = None, None
# Module-level alias for availability checks
EXTRACTION_MODEL = _DEFAULT_TEXT_MODEL or _DEFAULT_VISION_MODEL
def _get_vision_model() -> str:
def _get_vision_model() -> Optional[str]:
"""Model for browser_vision (screenshot analysis — multimodal)."""
return (os.getenv("AUXILIARY_VISION_MODEL", "").strip()
or _DEFAULT_VISION_MODEL
or "google/gemini-3-flash-preview")
return os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
def _get_extraction_model() -> str:
def _get_extraction_model() -> Optional[str]:
"""Model for page snapshot text summarization — same as web_extract."""
return (os.getenv("AUXILIARY_WEB_EXTRACT_MODEL", "").strip()
or _DEFAULT_TEXT_MODEL
or "google/gemini-3-flash-preview")
return os.getenv("AUXILIARY_WEB_EXTRACT_MODEL", "").strip() or None
def _is_local_mode() -> bool:
@ -941,9 +918,6 @@ def _extract_relevant_content(
Falls back to simple truncation when no auxiliary text model is configured.
"""
if _aux_text_client is None:
return _truncate_snapshot(snapshot_text)
if user_task:
extraction_prompt = (
f"You are a content extractor for a browser automation agent.\n\n"
@ -968,13 +942,16 @@ def _extract_relevant_content(
)
try:
from agent.auxiliary_client import auxiliary_max_tokens_param
response = _aux_text_client.chat.completions.create(
model=_get_extraction_model(),
messages=[{"role": "user", "content": extraction_prompt}],
**auxiliary_max_tokens_param(4000),
temperature=0.1,
)
call_kwargs = {
"task": "web_extract",
"messages": [{"role": "user", "content": extraction_prompt}],
"max_tokens": 4000,
"temperature": 0.1,
}
model = _get_extraction_model()
if model:
call_kwargs["model"] = model
response = call_llm(**call_kwargs)
return response.choices[0].message.content
except Exception:
return _truncate_snapshot(snapshot_text)
@ -1497,14 +1474,6 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
effective_task_id = task_id or "default"
# Check auxiliary vision client
if _aux_vision_client is None or _DEFAULT_VISION_MODEL is None:
return json.dumps({
"success": False,
"error": "Browser vision unavailable: no auxiliary vision model configured. "
"Set OPENROUTER_API_KEY or configure Nous Portal to enable browser vision."
}, ensure_ascii=False)
# Save screenshot to persistent location so it can be shared with users
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
screenshots_dir = hermes_home / "browser_screenshots"
@ -1562,14 +1531,13 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
f"Focus on answering the user's specific question."
)
# Use the sync auxiliary vision client directly
from agent.auxiliary_client import auxiliary_max_tokens_param
# Use the centralized LLM router
vision_model = _get_vision_model()
logger.debug("browser_vision: analysing screenshot (%d bytes) with model=%s",
len(image_data), vision_model)
response = _aux_vision_client.chat.completions.create(
model=vision_model,
messages=[
logger.debug("browser_vision: analysing screenshot (%d bytes)",
len(image_data))
call_kwargs = {
"task": "vision",
"messages": [
{
"role": "user",
"content": [
@ -1578,9 +1546,12 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
],
}
],
**auxiliary_max_tokens_param(2000),
temperature=0.1,
)
"max_tokens": 2000,
"temperature": 0.1,
}
if vision_model:
call_kwargs["model"] = vision_model
response = call_llm(**call_kwargs)
analysis = response.choices[0].message.content
response_data = {
@ -1615,10 +1586,10 @@ def _cleanup_old_screenshots(screenshots_dir, max_age_hours=24):
try:
if f.stat().st_mtime < cutoff:
f.unlink()
except Exception:
pass
except Exception:
pass # Non-critical — don't fail the screenshot operation
except Exception as e:
logger.debug("Failed to clean old screenshot %s: %s", f, e)
except Exception as e:
logger.debug("Screenshot cleanup error (non-critical): %s", e)
def _cleanup_old_recordings(max_age_hours=72):
@ -1634,10 +1605,10 @@ def _cleanup_old_recordings(max_age_hours=72):
try:
if f.stat().st_mtime < cutoff:
f.unlink()
except Exception:
pass
except Exception:
pass
except Exception as e:
logger.debug("Failed to clean old recording %s: %s", f, e)
except Exception as e:
logger.debug("Recording cleanup error (non-critical): %s", e)
# ============================================================================
@ -1745,11 +1716,11 @@ def cleanup_browser(task_id: Optional[str] = None) -> None:
pid_file = os.path.join(socket_dir, f"{session_name}.pid")
if os.path.isfile(pid_file):
try:
daemon_pid = int(open(pid_file).read().strip())
daemon_pid = int(Path(pid_file).read_text().strip())
os.kill(daemon_pid, signal.SIGTERM)
logger.debug("Killed daemon pid %s for %s", daemon_pid, session_name)
except (ProcessLookupError, ValueError, PermissionError, OSError):
pass
logger.debug("Could not kill daemon pid for %s (already dead or inaccessible)", session_name)
shutil.rmtree(socket_dir, ignore_errors=True)
logger.debug("Removed task %s from active sessions", task_id)

tools/checkpoint_manager.py (new file, 454 lines)
View file

@ -0,0 +1,454 @@
"""
Checkpoint Manager: transparent filesystem snapshots via shadow git repos.
Creates automatic snapshots of working directories before file-mutating
operations (write_file, patch), triggered once per conversation turn.
Provides rollback to any previous checkpoint.
This is NOT a tool; the LLM never sees it. It's transparent infrastructure
controlled by the ``checkpoints`` config flag or ``--checkpoints`` CLI flag.
Architecture:
    ~/.hermes/checkpoints/{sha256(abs_dir)[:16]}/  - shadow git repo
        HEAD, refs/, objects/                      - standard git internals
        HERMES_WORKDIR                             - original dir path
        info/exclude                               - default excludes
The shadow repo uses GIT_DIR + GIT_WORK_TREE so no git state leaks
into the user's project directory.
"""
import hashlib
import logging
import os
import shutil
import subprocess
import time
from pathlib import Path
from typing import Dict, List, Optional, Set
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
CHECKPOINT_BASE = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "checkpoints"
DEFAULT_EXCLUDES = [
"node_modules/",
"dist/",
"build/",
".env",
".env.*",
".env.local",
".env.*.local",
"__pycache__/",
"*.pyc",
"*.pyo",
".DS_Store",
"*.log",
".cache/",
".next/",
".nuxt/",
"coverage/",
".pytest_cache/",
".venv/",
"venv/",
".git/",
]
# Git subprocess timeout (seconds).
_GIT_TIMEOUT: int = max(10, min(60, int(os.getenv("HERMES_CHECKPOINT_TIMEOUT", "30"))))
# Max files to snapshot — skip huge directories to avoid slowdowns.
_MAX_FILES = 50_000
# ---------------------------------------------------------------------------
# Shadow repo helpers
# ---------------------------------------------------------------------------
def _shadow_repo_path(working_dir: str) -> Path:
"""Deterministic shadow repo path: sha256(abs_path)[:16]."""
abs_path = str(Path(working_dir).resolve())
dir_hash = hashlib.sha256(abs_path.encode()).hexdigest()[:16]
return CHECKPOINT_BASE / dir_hash
def _git_env(shadow_repo: Path, working_dir: str) -> dict:
"""Build env dict that redirects git to the shadow repo."""
env = os.environ.copy()
env["GIT_DIR"] = str(shadow_repo)
env["GIT_WORK_TREE"] = str(Path(working_dir).resolve())
env.pop("GIT_INDEX_FILE", None)
env.pop("GIT_NAMESPACE", None)
env.pop("GIT_ALTERNATE_OBJECT_DIRECTORIES", None)
return env
def _run_git(
args: List[str],
shadow_repo: Path,
working_dir: str,
timeout: int = _GIT_TIMEOUT,
) -> tuple:
"""Run a git command against the shadow repo. Returns (ok, stdout, stderr)."""
env = _git_env(shadow_repo, working_dir)
cmd = ["git"] + list(args)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
env=env,
cwd=str(Path(working_dir).resolve()),
)
ok = result.returncode == 0
stdout = result.stdout.strip()
stderr = result.stderr.strip()
if not ok:
logger.error(
"Git command failed: %s (rc=%d) stderr=%s",
" ".join(cmd), result.returncode, stderr,
)
return ok, stdout, stderr
except subprocess.TimeoutExpired:
msg = f"git timed out after {timeout}s: {' '.join(cmd)}"
logger.error(msg, exc_info=True)
return False, "", msg
except FileNotFoundError:
logger.error("Git executable not found: %s", " ".join(cmd), exc_info=True)
return False, "", "git not found"
except Exception as exc:
logger.error("Unexpected git error running %s: %s", " ".join(cmd), exc, exc_info=True)
return False, "", str(exc)
def _init_shadow_repo(shadow_repo: Path, working_dir: str) -> Optional[str]:
"""Initialise shadow repo if needed. Returns error string or None."""
if (shadow_repo / "HEAD").exists():
return None
shadow_repo.mkdir(parents=True, exist_ok=True)
ok, _, err = _run_git(["init"], shadow_repo, working_dir)
if not ok:
return f"Shadow repo init failed: {err}"
_run_git(["config", "user.email", "hermes@local"], shadow_repo, working_dir)
_run_git(["config", "user.name", "Hermes Checkpoint"], shadow_repo, working_dir)
info_dir = shadow_repo / "info"
info_dir.mkdir(exist_ok=True)
(info_dir / "exclude").write_text(
"\n".join(DEFAULT_EXCLUDES) + "\n", encoding="utf-8"
)
(shadow_repo / "HERMES_WORKDIR").write_text(
str(Path(working_dir).resolve()) + "\n", encoding="utf-8"
)
logger.debug("Initialised checkpoint repo at %s for %s", shadow_repo, working_dir)
return None
def _dir_file_count(path: str) -> int:
"""Quick file count estimate (stops early if over _MAX_FILES)."""
count = 0
try:
for _ in Path(path).rglob("*"):
count += 1
if count > _MAX_FILES:
return count
except (PermissionError, OSError):
pass
return count
# ---------------------------------------------------------------------------
# CheckpointManager
# ---------------------------------------------------------------------------
class CheckpointManager:
"""Manages automatic filesystem checkpoints.
Designed to be owned by AIAgent. Call ``new_turn()`` at the start of
each conversation turn and ``ensure_checkpoint(dir, reason)`` before
any file-mutating tool call. The manager deduplicates so at most one
snapshot is taken per directory per turn.
Parameters
----------
enabled : bool
Master switch (from config / CLI flag).
max_snapshots : int
Keep at most this many checkpoints per directory.
"""
def __init__(self, enabled: bool = False, max_snapshots: int = 50):
self.enabled = enabled
self.max_snapshots = max_snapshots
self._checkpointed_dirs: Set[str] = set()
self._git_available: Optional[bool] = None # lazy probe
# ------------------------------------------------------------------
# Turn lifecycle
# ------------------------------------------------------------------
def new_turn(self) -> None:
"""Reset per-turn dedup. Call at the start of each agent iteration."""
self._checkpointed_dirs.clear()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def ensure_checkpoint(self, working_dir: str, reason: str = "auto") -> bool:
"""Take a checkpoint if enabled and not already done this turn.
Returns True if a checkpoint was taken, False otherwise.
        Never raises; all errors are caught and logged.
"""
if not self.enabled:
return False
# Lazy git probe
if self._git_available is None:
self._git_available = shutil.which("git") is not None
if not self._git_available:
logger.debug("Checkpoints disabled: git not found")
if not self._git_available:
return False
abs_dir = str(Path(working_dir).resolve())
        # Skip overly broad directories (filesystem root and the home directory)
if abs_dir in ("/", str(Path.home())):
logger.debug("Checkpoint skipped: directory too broad (%s)", abs_dir)
return False
# Already checkpointed this turn?
if abs_dir in self._checkpointed_dirs:
return False
self._checkpointed_dirs.add(abs_dir)
try:
return self._take(abs_dir, reason)
except Exception as e:
logger.debug("Checkpoint failed (non-fatal): %s", e)
return False
def list_checkpoints(self, working_dir: str) -> List[Dict]:
"""List available checkpoints for a directory.
Returns a list of dicts with keys: hash, short_hash, timestamp, reason.
Most recent first.
"""
abs_dir = str(Path(working_dir).resolve())
shadow = _shadow_repo_path(abs_dir)
if not (shadow / "HEAD").exists():
return []
        ok, stdout, _ = _run_git(
            ["log", "--format=%H|%h|%aI|%s", "-n", str(self.max_snapshots)],
            shadow, abs_dir,
        )
if not ok or not stdout:
return []
results = []
for line in stdout.splitlines():
parts = line.split("|", 3)
if len(parts) == 4:
results.append({
"hash": parts[0],
"short_hash": parts[1],
"timestamp": parts[2],
"reason": parts[3],
})
return results
def restore(self, working_dir: str, commit_hash: str) -> Dict:
"""Restore files to a checkpoint state.
Uses ``git checkout <hash> -- .`` which restores tracked files
        without moving HEAD, so it is safe and reversible.
Returns dict with success/error info.
"""
abs_dir = str(Path(working_dir).resolve())
shadow = _shadow_repo_path(abs_dir)
if not (shadow / "HEAD").exists():
return {"success": False, "error": "No checkpoints exist for this directory"}
# Verify the commit exists
ok, _, err = _run_git(
["cat-file", "-t", commit_hash], shadow, abs_dir,
)
if not ok:
return {"success": False, "error": f"Checkpoint '{commit_hash}' not found", "debug": err or None}
# Take a checkpoint of current state before restoring (so you can undo the undo)
self._take(abs_dir, f"pre-rollback snapshot (restoring to {commit_hash[:8]})")
# Restore
ok, stdout, err = _run_git(
["checkout", commit_hash, "--", "."],
shadow, abs_dir, timeout=_GIT_TIMEOUT * 2,
)
if not ok:
return {"success": False, "error": "Restore failed", "debug": err or None}
# Get info about what was restored
ok2, reason_out, _ = _run_git(
["log", "--format=%s", "-1", commit_hash], shadow, abs_dir,
)
reason = reason_out if ok2 else "unknown"
return {
"success": True,
"restored_to": commit_hash[:8],
"reason": reason,
"directory": abs_dir,
}
def get_working_dir_for_path(self, file_path: str) -> str:
"""Resolve a file path to its working directory for checkpointing.
Walks up from the file's parent to find a reasonable project root
(directory containing .git, pyproject.toml, package.json, etc.).
Falls back to the file's parent directory.
"""
path = Path(file_path).resolve()
if path.is_dir():
candidate = path
else:
candidate = path.parent
# Walk up looking for project root markers
markers = {".git", "pyproject.toml", "package.json", "Cargo.toml",
"go.mod", "Makefile", "pom.xml", ".hg", "Gemfile"}
check = candidate
while check != check.parent:
if any((check / m).exists() for m in markers):
return str(check)
check = check.parent
# No project root found — use the file's parent
return str(candidate)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _take(self, working_dir: str, reason: str) -> bool:
"""Take a snapshot. Returns True on success."""
shadow = _shadow_repo_path(working_dir)
# Init if needed
err = _init_shadow_repo(shadow, working_dir)
if err:
logger.debug("Checkpoint init failed: %s", err)
return False
# Quick size guard — don't try to snapshot enormous directories
if _dir_file_count(working_dir) > _MAX_FILES:
logger.debug("Checkpoint skipped: >%d files in %s", _MAX_FILES, working_dir)
return False
# Stage everything
ok, _, err = _run_git(
["add", "-A"], shadow, working_dir, timeout=_GIT_TIMEOUT * 2,
)
if not ok:
logger.debug("Checkpoint git-add failed: %s", err)
return False
# Check if there's anything to commit
ok_diff, diff_out, _ = _run_git(
["diff", "--cached", "--quiet"], shadow, working_dir,
)
if ok_diff:
# No changes to commit
logger.debug("Checkpoint skipped: no changes in %s", working_dir)
return False
# Commit
ok, _, err = _run_git(
["commit", "-m", reason, "--allow-empty-message"],
shadow, working_dir, timeout=_GIT_TIMEOUT * 2,
)
if not ok:
logger.debug("Checkpoint commit failed: %s", err)
return False
logger.debug("Checkpoint taken in %s: %s", working_dir, reason)
# Prune old snapshots
self._prune(shadow, working_dir)
return True
def _prune(self, shadow_repo: Path, working_dir: str) -> None:
"""Keep only the last max_snapshots commits via orphan reset."""
ok, stdout, _ = _run_git(
["rev-list", "--count", "HEAD"], shadow_repo, working_dir,
)
if not ok:
return
try:
count = int(stdout)
except ValueError:
return
if count <= self.max_snapshots:
return
# For simplicity, we don't actually prune — git's pack mechanism
# handles this efficiently, and the objects are small. The log
# listing is already limited by max_snapshots.
# Full pruning would require rebase --onto or filter-branch which
# is fragile for a background feature. We just limit the log view.
logger.debug("Checkpoint repo has %d commits (limit %d)", count, self.max_snapshots)
def format_checkpoint_list(checkpoints: List[Dict], directory: str) -> str:
"""Format checkpoint list for display to user."""
if not checkpoints:
return f"No checkpoints found for {directory}"
lines = [f"📸 Checkpoints for {directory}:\n"]
for i, cp in enumerate(checkpoints, 1):
# Parse ISO timestamp to something readable
ts = cp["timestamp"]
if "T" in ts:
ts = ts.split("T")[1].split("+")[0].split("-")[0][:5] # HH:MM
date = cp["timestamp"].split("T")[0]
ts = f"{date} {ts}"
lines.append(f" {i}. {cp['short_hash']} {ts} {cp['reason']}")
lines.append(f"\nUse /rollback <number> to restore, e.g. /rollback 1")
return "\n".join(lines)

View file

@ -311,6 +311,7 @@ def _rpc_server_loop(
sys.stderr.close()
sys.stdout, sys.stderr = _real_stdout, _real_stderr
except Exception as exc:
logger.error("Tool call failed in sandbox: %s", exc, exc_info=True)
result = json.dumps({"error": str(exc)})
tool_call_counter[0] += 1
@ -327,15 +328,15 @@ def _rpc_server_loop(
conn.sendall((result + "\n").encode())
except socket.timeout:
pass
except OSError:
pass
logger.debug("RPC listener socket timeout")
except OSError as e:
logger.debug("RPC listener socket error: %s", e, exc_info=True)
finally:
if conn:
try:
conn.close()
except OSError:
pass
except OSError as e:
logger.debug("RPC conn close error: %s", e)
# ---------------------------------------------------------------------------
@ -397,9 +398,9 @@ def execute_code(
try:
# Write the auto-generated hermes_tools module
tools_src = generate_hermes_tools_module(
list(sandbox_tools) if enabled_tools else list(SANDBOX_ALLOWED_TOOLS)
)
# sandbox_tools is already the correct set (intersection with session
# tools, or SANDBOX_ALLOWED_TOOLS as fallback — see lines above).
tools_src = generate_hermes_tools_module(list(sandbox_tools))
with open(os.path.join(tmpdir, "hermes_tools.py"), "w") as f:
f.write(tools_src)
@ -457,11 +458,17 @@ def execute_code(
# --- Poll loop: watch for exit, timeout, and interrupt ---
deadline = time.monotonic() + timeout
stdout_chunks: list = []
stderr_chunks: list = []
# Background readers to avoid pipe buffer deadlocks
# Background readers to avoid pipe buffer deadlocks.
# For stdout we use a head+tail strategy: keep the first HEAD_BYTES
# and a rolling window of the last TAIL_BYTES so the final print()
# output is never lost. Stderr keeps head-only (errors appear early).
_STDOUT_HEAD_BYTES = int(MAX_STDOUT_BYTES * 0.4) # 40% head
_STDOUT_TAIL_BYTES = MAX_STDOUT_BYTES - _STDOUT_HEAD_BYTES # 60% tail
def _drain(pipe, chunks, max_bytes):
"""Simple head-only drain (used for stderr)."""
total = 0
try:
while True:
@ -472,11 +479,51 @@ def execute_code(
keep = max_bytes - total
chunks.append(data[:keep])
total += len(data)
except (ValueError, OSError) as e:
logger.debug("Error reading process output: %s", e, exc_info=True)
stdout_total_bytes = [0] # mutable ref for total bytes seen
def _drain_head_tail(pipe, head_chunks, tail_chunks, head_bytes, tail_bytes, total_ref):
"""Drain stdout keeping both head and tail data."""
head_collected = 0
from collections import deque
tail_buf = deque()
tail_collected = 0
try:
while True:
data = pipe.read(4096)
if not data:
break
total_ref[0] += len(data)
# Fill head buffer first
if head_collected < head_bytes:
keep = min(len(data), head_bytes - head_collected)
head_chunks.append(data[:keep])
head_collected += keep
data = data[keep:] # remaining goes to tail
if not data:
continue
# Everything past head goes into rolling tail buffer
tail_buf.append(data)
tail_collected += len(data)
# Evict old tail data to stay within tail_bytes budget
while tail_collected > tail_bytes and tail_buf:
oldest = tail_buf.popleft()
tail_collected -= len(oldest)
except (ValueError, OSError):
pass
# Transfer final tail to output list
tail_chunks.extend(tail_buf)
stdout_head_chunks: list = []
stdout_tail_chunks: list = []
stdout_reader = threading.Thread(
target=_drain, args=(proc.stdout, stdout_chunks, MAX_STDOUT_BYTES), daemon=True
target=_drain_head_tail,
args=(proc.stdout, stdout_head_chunks, stdout_tail_chunks,
_STDOUT_HEAD_BYTES, _STDOUT_TAIL_BYTES, stdout_total_bytes),
daemon=True
)
stderr_reader = threading.Thread(
target=_drain, args=(proc.stderr, stderr_chunks, MAX_STDERR_BYTES), daemon=True
@ -500,18 +547,27 @@ def execute_code(
stdout_reader.join(timeout=3)
stderr_reader.join(timeout=3)
stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
stdout_head = b"".join(stdout_head_chunks).decode("utf-8", errors="replace")
stdout_tail = b"".join(stdout_tail_chunks).decode("utf-8", errors="replace")
stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
# Truncation notice
if len(stdout_text) >= MAX_STDOUT_BYTES:
stdout_text = stdout_text[:MAX_STDOUT_BYTES] + "\n[output truncated at 50KB]"
# Assemble stdout with head+tail truncation
total_stdout = stdout_total_bytes[0]
if total_stdout > MAX_STDOUT_BYTES and stdout_tail:
omitted = total_stdout - len(stdout_head) - len(stdout_tail)
truncated_notice = (
f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted "
f"out of {total_stdout:,} total] ...\n\n"
)
stdout_text = stdout_head + truncated_notice + stdout_tail
else:
stdout_text = stdout_head + stdout_tail
exit_code = proc.returncode if proc.returncode is not None else -1
duration = round(time.monotonic() - exec_start, 2)
# Wait for RPC thread to finish
server_sock.close()
server_sock.close() # break accept() so thread exits promptly
rpc_thread.join(timeout=3)
# Build response
@ -547,15 +603,19 @@ def execute_code(
finally:
# Cleanup temp dir and socket
try:
server_sock.close()
except Exception as e:
logger.debug("Server socket close error: %s", e)
try:
import shutil
shutil.rmtree(tmpdir, ignore_errors=True)
except Exception as e:
logger.debug("Could not clean temp dir: %s", e)
logger.debug("Could not clean temp dir: %s", e, exc_info=True)
try:
os.unlink(sock_path)
except OSError:
pass
except OSError as e:
logger.debug("Could not remove socket file: %s", e, exc_info=True)
def _kill_process_group(proc, escalate: bool = False):
@ -565,11 +625,12 @@ def _kill_process_group(proc, escalate: bool = False):
proc.terminate()
else:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
except (ProcessLookupError, PermissionError):
except (ProcessLookupError, PermissionError) as e:
logger.debug("Could not kill process group: %s", e, exc_info=True)
try:
proc.kill()
except Exception as e:
logger.debug("Could not kill process: %s", e)
except Exception as e2:
logger.debug("Could not kill process: %s", e2, exc_info=True)
if escalate:
# Give the process 5s to exit after SIGTERM, then SIGKILL
@ -581,11 +642,12 @@ def _kill_process_group(proc, escalate: bool = False):
proc.kill()
else:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except (ProcessLookupError, PermissionError):
except (ProcessLookupError, PermissionError) as e:
logger.debug("Could not kill process group with SIGKILL: %s", e, exc_info=True)
try:
proc.kill()
except Exception as e:
logger.debug("Could not kill process: %s", e)
except Exception as e2:
logger.debug("Could not kill process: %s", e2, exc_info=True)
def _load_config() -> dict:
@ -647,7 +709,10 @@ def build_execute_code_schema(enabled_sandbox_tools: set = None) -> dict:
import_examples = [n for n in ("web_search", "terminal") if n in enabled_sandbox_tools]
if not import_examples:
import_examples = sorted(enabled_sandbox_tools)[:2]
import_str = ", ".join(import_examples) + ", ..."
if import_examples:
import_str = ", ".join(import_examples) + ", ..."
else:
import_str = "..."
description = (
"Run a Python script that can call Hermes tools programmatically. "

View file

@ -20,6 +20,7 @@ import contextlib
import io
import json
import logging
logger = logging.getLogger(__name__)
import os
import sys
import time
@ -107,8 +108,8 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
short = (preview[:55] + "...") if preview and len(preview) > 55 else (preview or "")
try:
spinner.print_above(f" {prefix}├─ 💭 \"{short}\"")
except Exception:
pass
except Exception as e:
logger.debug("Spinner print_above failed: %s", e)
# Don't relay thinking to gateway (too noisy for chat)
return
@ -129,8 +130,8 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
line += f" \"{short}\""
try:
spinner.print_above(line)
except Exception:
pass
except Exception as e:
logger.debug("Spinner print_above failed: %s", e)
if parent_cb:
_batch.append(tool_name)
@ -138,8 +139,8 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
summary = ", ".join(_batch)
try:
parent_cb("subagent_progress", f"🔀 {prefix}{summary}")
except Exception:
pass
except Exception as e:
logger.debug("Parent callback failed: %s", e)
_batch.clear()
def _flush():
@ -148,8 +149,8 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
summary = ", ".join(_batch)
try:
parent_cb("subagent_progress", f"🔀 {prefix}{summary}")
except Exception:
pass
except Exception as e:
logger.debug("Parent callback flush failed: %s", e)
_batch.clear()
_callback._flush = _flush
@ -165,10 +166,20 @@ def _run_single_child(
max_iterations: int,
parent_agent,
task_count: int = 1,
# Credential overrides from delegation config (provider:model resolution)
override_provider: Optional[str] = None,
override_base_url: Optional[str] = None,
override_api_key: Optional[str] = None,
override_api_mode: Optional[str] = None,
) -> Dict[str, Any]:
"""
Spawn and run a single child agent. Called from within a thread.
Returns a structured result dict.
When override_* params are set (from delegation config), the child uses
those credentials instead of inheriting from the parent. This enables
routing subagents to a different provider:model pair (e.g. cheap/fast
model on OpenRouter while the parent runs on Nous Portal).
"""
from run_agent import AIAgent
@ -198,12 +209,19 @@ def _run_single_child(
# count toward the session-wide limit.
shared_budget = getattr(parent_agent, "iteration_budget", None)
# Resolve effective credentials: config override > parent inherit
effective_model = model or parent_agent.model
effective_provider = override_provider or getattr(parent_agent, "provider", None)
effective_base_url = override_base_url or parent_agent.base_url
effective_api_key = override_api_key or parent_api_key
effective_api_mode = override_api_mode or getattr(parent_agent, "api_mode", None)
child = AIAgent(
base_url=parent_agent.base_url,
api_key=parent_api_key,
model=model or parent_agent.model,
provider=getattr(parent_agent, "provider", None),
api_mode=getattr(parent_agent, "api_mode", None),
base_url=effective_base_url,
api_key=effective_api_key,
model=effective_model,
provider=effective_provider,
api_mode=effective_api_mode,
max_iterations=max_iterations,
max_tokens=getattr(parent_agent, "max_tokens", None),
reasoning_config=getattr(parent_agent, "reasoning_config", None),
@ -241,8 +259,8 @@ def _run_single_child(
if child_progress_cb and hasattr(child_progress_cb, '_flush'):
try:
child_progress_cb._flush()
except Exception:
pass
except Exception as e:
logger.debug("Progress callback flush failed: %s", e)
duration = round(time.monotonic() - child_start, 2)
@ -287,8 +305,8 @@ def _run_single_child(
if hasattr(parent_agent, '_active_children'):
try:
parent_agent._active_children.remove(child)
except (ValueError, UnboundLocalError):
pass
except (ValueError, UnboundLocalError) as e:
logger.debug("Could not remove child from active_children: %s", e)
def delegate_task(
@ -326,6 +344,16 @@ def delegate_task(
default_max_iter = cfg.get("max_iterations", DEFAULT_MAX_ITERATIONS)
effective_max_iter = max_iterations or default_max_iter
# Resolve delegation credentials (provider:model pair).
# When delegation.provider is configured, this resolves the full credential
# bundle (base_url, api_key, api_mode) via the same runtime provider system
# used by CLI/gateway startup. When unconfigured, returns None values so
# children inherit from the parent.
try:
creds = _resolve_delegation_credentials(cfg, parent_agent)
except ValueError as exc:
return json.dumps({"error": str(exc)})
# Normalize to task list
if tasks and isinstance(tasks, list):
task_list = tasks[:MAX_CONCURRENT_CHILDREN]
@ -357,10 +385,14 @@ def delegate_task(
goal=t["goal"],
context=t.get("context"),
toolsets=t.get("toolsets") or toolsets,
model=None,
model=creds["model"],
max_iterations=effective_max_iter,
parent_agent=parent_agent,
task_count=1,
override_provider=creds["provider"],
override_base_url=creds["base_url"],
override_api_key=creds["api_key"],
override_api_mode=creds["api_mode"],
)
results.append(result)
else:
@ -382,10 +414,14 @@ def delegate_task(
goal=t["goal"],
context=t.get("context"),
toolsets=t.get("toolsets") or toolsets,
model=None,
model=creds["model"],
max_iterations=effective_max_iter,
parent_agent=parent_agent,
task_count=n_tasks,
override_provider=creds["provider"],
override_base_url=creds["base_url"],
override_api_key=creds["api_key"],
override_api_mode=creds["api_mode"],
)
futures[future] = i
@ -425,8 +461,8 @@ def delegate_task(
if spinner_ref and remaining > 0:
try:
spinner_ref.update_text(f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining")
except Exception:
pass
except Exception as e:
logger.debug("Spinner update_text failed: %s", e)
# Restore stdout/stderr in case redirect_stdout race left them as devnull
sys.stdout = _saved_stdout
@ -443,11 +479,78 @@ def delegate_task(
}, ensure_ascii=False)
def _resolve_delegation_credentials(cfg: dict, parent_agent) -> dict:
"""Resolve credentials for subagent delegation.
If ``delegation.provider`` is configured, resolves the full credential
bundle (base_url, api_key, api_mode, provider) via the runtime provider
    system, the same path used by CLI/gateway startup. This lets subagents
run on a completely different provider:model pair.
If no provider is configured, returns None values so the child inherits
everything from the parent agent.
Raises ValueError with a user-friendly message on credential failure.
"""
configured_model = cfg.get("model") or None
configured_provider = cfg.get("provider") or None
if not configured_provider:
# No provider override — child inherits everything from parent
return {
"model": configured_model,
"provider": None,
"base_url": None,
"api_key": None,
"api_mode": None,
}
# Provider is configured — resolve full credentials
try:
from hermes_cli.runtime_provider import resolve_runtime_provider
runtime = resolve_runtime_provider(requested=configured_provider)
except Exception as exc:
raise ValueError(
f"Cannot resolve delegation provider '{configured_provider}': {exc}. "
f"Check that the provider is configured (API key set, valid provider name). "
f"Available providers: openrouter, nous, zai, kimi-coding, minimax."
) from exc
api_key = runtime.get("api_key", "")
if not api_key:
raise ValueError(
f"Delegation provider '{configured_provider}' resolved but has no API key. "
f"Set the appropriate environment variable or run 'hermes login'."
)
return {
"model": configured_model,
"provider": runtime.get("provider"),
"base_url": runtime.get("base_url"),
"api_key": api_key,
"api_mode": runtime.get("api_mode"),
}
def _load_config() -> dict:
"""Load delegation config from CLI_CONFIG if available."""
"""Load delegation config from CLI_CONFIG or persistent config.
Checks the runtime config (cli.py CLI_CONFIG) first, then falls back
to the persistent config (hermes_cli/config.py load_config()) so that
``delegation.model`` / ``delegation.provider`` are picked up regardless
of the entry point (CLI, gateway, cron).
"""
try:
from cli import CLI_CONFIG
return CLI_CONFIG.get("delegation", {})
cfg = CLI_CONFIG.get("delegation", {})
if cfg:
return cfg
except Exception:
pass
try:
from hermes_cli.config import load_config
full = load_config()
return full.get("delegation", {})
except Exception:
return {}
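
A small sketch of how the resolver behaves, written as if it lived next to _resolve_delegation_credentials in the same module; only the config keys read above (provider, model, max_iterations) are grounded in this diff, and the provider and model values below are placeholders.

class _ParentStub:
    # Minimal stand-in for the real AIAgent parent; used only for illustration.
    model = "parent/model"

# No provider configured: every credential comes back None, so the child
# inherits base_url / api_key / api_mode from the parent agent.
creds = _resolve_delegation_credentials({"model": None, "provider": None}, _ParentStub())
assert creds["provider"] is None and creds["api_key"] is None

# Provider configured: the full bundle is resolved via resolve_runtime_provider(),
# and a missing API key raises ValueError with a user-facing message.
delegation_cfg = {
    "provider": "openrouter",           # must already have credentials set up
    "model": "some/cheap-fast-model",   # placeholder model id
    "max_iterations": 15,
}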

View file

@ -59,8 +59,16 @@ class BaseEnvironment(ABC):
# Shared helpers (eliminate duplication across backends)
# ------------------------------------------------------------------
def _prepare_command(self, command: str) -> str:
"""Transform sudo commands if SUDO_PASSWORD is available."""
def _prepare_command(self, command: str) -> tuple[str, str | None]:
"""Transform sudo commands if SUDO_PASSWORD is available.
Returns:
            (transformed_command, sudo_stdin); see _transform_sudo_command
for the full contract. Callers that drive a subprocess directly
should prepend sudo_stdin (when not None) to any stdin_data they
pass to Popen. Callers that embed stdin via heredoc (modal,
daytona) handle sudo_stdin in their own execute() method.
"""
from tools.terminal_tool import _transform_sudo_command
return _transform_sudo_command(command)
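
The merge rule the docstring asks direct-subprocess callers to implement, pulled out as a tiny standalone sketch; merge_sudo_stdin is a hypothetical helper named here for illustration only, and the backends below inline the same logic rather than calling a shared function.

from typing import Optional

def merge_sudo_stdin(sudo_stdin: Optional[str], stdin_data: Optional[str]) -> Optional[str]:
    # sudo -S consumes exactly one line (the password) before handing the rest
    # of stdin to the child process, so prepending the password line is safe
    # even when the caller also supplies stdin_data.
    if sudo_stdin is not None and stdin_data is not None:
        return sudo_stdin + stdin_data
    if sudo_stdin is not None:
        return sudo_stdin
    return stdin_data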

View file

@ -6,6 +6,7 @@ and resumed on next creation, preserving the filesystem across sessions.
"""
import logging
import time
import math
import shlex
import threading
@ -142,10 +143,9 @@ class DaytonaEnvironment(BaseEnvironment):
t = threading.Thread(target=_run, daemon=True)
t.start()
# Wait for timeout + generous buffer for network/SDK overhead
deadline = timeout + 10
deadline = time.monotonic() + timeout + 10
while t.is_alive():
t.join(timeout=0.2)
deadline -= 0.2
if is_interrupted():
with self._lock:
try:
@ -156,7 +156,7 @@ class DaytonaEnvironment(BaseEnvironment):
"output": "[Command interrupted - Daytona sandbox stopped]",
"returncode": 130,
}
if deadline <= 0:
if time.monotonic() > deadline:
# Shell timeout didn't fire and SDK is hung — force stop
with self._lock:
try:
@ -181,7 +181,20 @@ class DaytonaEnvironment(BaseEnvironment):
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
command = f"{command} << '{marker}'\n{stdin_data}\n{marker}"
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Daytona sandboxes execute commands via the Daytona SDK and cannot
# pipe subprocess stdin directly the way a local Popen can. When a
# sudo password is present, use a shell-level pipe from printf so that
# the password feeds sudo -S without appearing as an echo argument
# embedded in the shell string. The password is still visible in the
# remote sandbox's command line, but it is not exposed on the user's
# local machine — which is the primary threat being mitigated.
if sudo_stdin is not None:
import shlex
exec_command = (
f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}"
)
effective_cwd = cwd or self.cwd or None
effective_timeout = timeout or self.timeout

View file

@ -7,6 +7,7 @@ persistence via bind mounts.
import logging
import os
import shutil
import subprocess
import sys
import threading
@ -19,13 +20,57 @@ from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
# Common Docker Desktop install paths checked when 'docker' is not in PATH.
# macOS Intel: /usr/local/bin, macOS Apple Silicon (Homebrew): /opt/homebrew/bin,
# Docker Desktop app bundle: /Applications/Docker.app/Contents/Resources/bin
_DOCKER_SEARCH_PATHS = [
"/usr/local/bin/docker",
"/opt/homebrew/bin/docker",
"/Applications/Docker.app/Contents/Resources/bin/docker",
]
_docker_executable: Optional[str] = None # resolved once, cached
def find_docker() -> Optional[str]:
"""Locate the docker CLI binary.
Checks ``shutil.which`` first (respects PATH), then probes well-known
install locations on macOS where Docker Desktop may not be in PATH
(e.g. when running as a gateway service via launchd).
Returns the absolute path, or ``None`` if docker cannot be found.
"""
global _docker_executable
if _docker_executable is not None:
return _docker_executable
found = shutil.which("docker")
if found:
_docker_executable = found
return found
for path in _DOCKER_SEARCH_PATHS:
if os.path.isfile(path) and os.access(path, os.X_OK):
_docker_executable = path
logger.info("Found docker at non-PATH location: %s", path)
return path
return None
# Security flags applied to every container.
# The container itself is the security boundary (isolated from host).
# We drop all capabilities, block privilege escalation, and limit PIDs.
# We drop all capabilities then add back the minimum needed:
# DAC_OVERRIDE - root can write to bind-mounted dirs owned by host user
# CHOWN/FOWNER - package managers (pip, npm, apt) need to set file ownership
# Block privilege escalation and limit PIDs.
# /tmp is size-limited and nosuid but allows exec (needed by pip/npm builds).
_SECURITY_ARGS = [
"--cap-drop", "ALL",
"--cap-add", "DAC_OVERRIDE",
"--cap-add", "CHOWN",
"--cap-add", "FOWNER",
"--security-opt", "no-new-privileges",
"--pids-limit", "256",
"--tmpfs", "/tmp:rw,nosuid,size=512m",
@ -139,9 +184,14 @@ class DockerEnvironment(BaseEnvironment):
all_run_args = list(_SECURITY_ARGS) + writable_args + resource_args + volume_args
logger.info(f"Docker run_args: {all_run_args}")
# Resolve the docker executable once so it works even when
# /usr/local/bin is not in PATH (common on macOS gateway/service).
docker_exe = find_docker() or "docker"
self._inner = _Docker(
image=image, cwd=cwd, timeout=timeout,
run_args=all_run_args,
executable=docker_exe,
)
self._container_id = self._inner.container_id
@ -156,8 +206,9 @@ class DockerEnvironment(BaseEnvironment):
if _storage_opt_ok is not None:
return _storage_opt_ok
try:
docker = find_docker() or "docker"
result = subprocess.run(
["docker", "info", "--format", "{{.Driver}}"],
[docker, "info", "--format", "{{.Driver}}"],
capture_output=True, text=True, timeout=10,
)
driver = result.stdout.strip().lower()
@ -167,14 +218,14 @@ class DockerEnvironment(BaseEnvironment):
# overlay2 only supports storage-opt on XFS with pquota.
# Probe by attempting a dry-ish run — the fastest reliable check.
probe = subprocess.run(
["docker", "create", "--storage-opt", "size=1m", "hello-world"],
[docker, "create", "--storage-opt", "size=1m", "hello-world"],
capture_output=True, text=True, timeout=15,
)
if probe.returncode == 0:
# Clean up the created container
container_id = probe.stdout.strip()
if container_id:
subprocess.run(["docker", "rm", container_id],
subprocess.run([docker, "rm", container_id],
capture_output=True, timeout=5)
_storage_opt_ok = True
else:
@ -187,10 +238,18 @@ class DockerEnvironment(BaseEnvironment):
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
work_dir = cwd or self.cwd
effective_timeout = timeout or self.timeout
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
# docker exec -w doesn't expand ~, so prepend a cd into the command
if work_dir == "~" or work_dir.startswith("~/"):
exec_command = f"cd {work_dir} && {exec_command}"
@ -198,7 +257,7 @@ class DockerEnvironment(BaseEnvironment):
assert self._inner.container_id, "Container not started"
cmd = [self._inner.config.executable, "exec"]
if stdin_data is not None:
if effective_stdin is not None:
cmd.append("-i")
cmd.extend(["-w", work_dir])
for key in self._inner.config.forward_env:
@ -213,12 +272,12 @@ class DockerEnvironment(BaseEnvironment):
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if stdin_data:
if effective_stdin:
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass

View file

@ -161,7 +161,18 @@ class LocalEnvironment(BaseEnvironment):
work_dir = cwd or self.cwd or os.getcwd()
effective_timeout = timeout or self.timeout
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Merge the sudo password (if any) with caller-supplied stdin_data.
# sudo -S reads exactly one line (the password) then passes the rest
# of stdin to the child, so prepending is safe even when stdin_data
# is also present.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
try:
# The fence wrapper uses bash syntax (semicolons, $?, printf).
@ -195,14 +206,14 @@ class LocalEnvironment(BaseEnvironment):
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin is not None else subprocess.DEVNULL,
preexec_fn=None if _IS_WINDOWS else os.setsid,
)
if stdin_data is not None:
if effective_stdin is not None:
def _write_stdin():
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except (BrokenPipeError, OSError):
pass

View file

@ -50,7 +50,7 @@ class ModalEnvironment(BaseEnvironment):
def __init__(
self,
image: str,
cwd: str = "~",
cwd: str = "/root",
timeout: int = 60,
modal_sandbox_kwargs: Optional[Dict[str, Any]] = None,
persistent_filesystem: bool = True,
@ -95,6 +95,7 @@ class ModalEnvironment(BaseEnvironment):
startup_timeout=180.0,
runtime_timeout=3600.0,
modal_sandbox_kwargs=sandbox_kwargs,
install_pipx=True, # Required: installs pipx + swe-rex runtime (swerex-remote)
)
def execute(self, command: str, cwd: str = "", *,
@ -106,7 +107,20 @@ class ModalEnvironment(BaseEnvironment):
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
command = f"{command} << '{marker}'\n{stdin_data}\n{marker}"
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Modal sandboxes execute commands via the Modal SDK and cannot pipe
# subprocess stdin directly the way a local Popen can. When a sudo
# password is present, use a shell-level pipe from printf so that the
# password feeds sudo -S without appearing as an echo argument embedded
# in the shell string. The password is still visible in the remote
# sandbox's command line, but it is not exposed on the user's local
# machine — which is the primary threat being mitigated.
if sudo_stdin is not None:
import shlex
exec_command = (
f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}"
)
# Run in a background thread so we can poll for interrupts
result_holder = {"value": None, "error": None}
@ -137,6 +151,10 @@ class ModalEnvironment(BaseEnvironment):
def cleanup(self):
"""Snapshot the filesystem (if persistent) then stop the sandbox."""
# Check if _inner was ever set (init may have failed)
if not hasattr(self, '_inner') or self._inner is None:
return
if self._persistent:
try:
sandbox = getattr(self._inner, 'deployment', None)

View file

@ -228,7 +228,15 @@ class SingularityEnvironment(BaseEnvironment):
effective_timeout = timeout or self.timeout
work_dir = cwd or self.cwd
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command
if work_dir == "~" or work_dir.startswith("~/"):
@ -245,12 +253,12 @@ class SingularityEnvironment(BaseEnvironment):
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if stdin_data:
if effective_stdin:
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass

View file

@ -69,15 +69,23 @@ class SSHEnvironment(BaseEnvironment):
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
work_dir = cwd or self.cwd
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
wrapped = f'cd {work_dir} && {exec_command}'
effective_timeout = timeout or self.timeout
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
cmd = self._build_ssh_command()
cmd.extend(["bash", "-c", wrapped])
try:
kwargs = self._build_run_kwargs(timeout, stdin_data)
kwargs = self._build_run_kwargs(timeout, effective_stdin)
# Remove timeout from kwargs -- we handle it in the poll loop
kwargs.pop("timeout", None)
@ -87,13 +95,13 @@ class SSHEnvironment(BaseEnvironment):
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if stdin_data:
if effective_stdin:
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass

View file

@ -400,10 +400,16 @@ class ShellFileOperations(FileOperations):
return home
elif path.startswith('~/'):
return home + path[1:] # Replace ~ with home
# ~username format - let shell expand it
expand_result = self._exec(f"echo {path}")
if expand_result.exit_code == 0:
return expand_result.stdout.strip()
# ~username format - extract and validate username before
# letting shell expand it (prevent shell injection via
# paths like "~; rm -rf /").
rest = path[1:] # strip leading ~
slash_idx = rest.find('/')
username = rest[:slash_idx] if slash_idx >= 0 else rest
if username and re.fullmatch(r'[a-zA-Z0-9._-]+', username):
expand_result = self._exec(f"echo {path}")
if expand_result.exit_code == 0 and expand_result.stdout.strip():
return expand_result.stdout.strip()
return path
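
A quick illustration of what the username whitelist accepts and rejects; only paths whose ~username segment passes the character check ever reach the shell, everything else is returned untouched.

import re

_USER_RE = re.compile(r'[a-zA-Z0-9._-]+')
for candidate in ("~alice/project", "~; rm -rf /", "~$(whoami)/x"):
    rest = candidate[1:]
    slash_idx = rest.find('/')
    username = rest[:slash_idx] if slash_idx >= 0 else rest
    ok = bool(username and _USER_RE.fullmatch(username))
    print(candidate, "->", "expand via shell" if ok else "returned as-is")
# ~alice/project -> expand via shell
# ~; rm -rf /    -> returned as-is
# ~$(whoami)/x   -> returned as-is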
@ -956,37 +962,35 @@ class ShellFileOperations(FileOperations):
# rg match lines: "file:lineno:content" (colon separator)
# rg context lines: "file-lineno-content" (dash separator)
# rg group seps: "--"
# Note: on Windows, paths contain drive letters (e.g. C:\path),
# so naive split(":") breaks. Use regex to handle both platforms.
_match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
_ctx_re = re.compile(r'^([A-Za-z]:)?(.*?)-(\d+)-(.*)$')
matches = []
for line in result.stdout.strip().split('\n'):
if not line or line == "--":
continue
# Try match line first (colon-separated: file:line:content)
parts = line.split(':', 2)
if len(parts) >= 3:
try:
matches.append(SearchMatch(
path=parts[0],
line_number=int(parts[1]),
content=parts[2][:500]
))
continue
except ValueError:
pass
m = _match_re.match(line)
if m:
matches.append(SearchMatch(
path=(m.group(1) or '') + m.group(2),
line_number=int(m.group(3)),
content=m.group(4)[:500]
))
continue
# Try context line (dash-separated: file-line-content)
# Only attempt if context was requested to avoid false positives
if context > 0:
parts = line.split('-', 2)
if len(parts) >= 3:
try:
matches.append(SearchMatch(
path=parts[0],
line_number=int(parts[1]),
content=parts[2][:500]
))
except ValueError:
pass
m = _ctx_re.match(line)
if m:
matches.append(SearchMatch(
path=(m.group(1) or '') + m.group(2),
line_number=int(m.group(3)),
content=m.group(4)[:500]
))
total = len(matches)
page = matches[offset:offset + limit]
@ -1053,34 +1057,33 @@ class ShellFileOperations(FileOperations):
# grep match lines: "file:lineno:content" (colon)
# grep context lines: "file-lineno-content" (dash)
# grep group seps: "--"
# Note: on Windows, paths contain drive letters (e.g. C:\path),
# so naive split(":") breaks. Use regex to handle both platforms.
_match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
_ctx_re = re.compile(r'^([A-Za-z]:)?(.*?)-(\d+)-(.*)$')
matches = []
for line in result.stdout.strip().split('\n'):
if not line or line == "--":
continue
parts = line.split(':', 2)
if len(parts) >= 3:
try:
matches.append(SearchMatch(
path=parts[0],
line_number=int(parts[1]),
content=parts[2][:500]
))
continue
except ValueError:
pass
m = _match_re.match(line)
if m:
matches.append(SearchMatch(
path=(m.group(1) or '') + m.group(2),
line_number=int(m.group(3)),
content=m.group(4)[:500]
))
continue
if context > 0:
parts = line.split('-', 2)
if len(parts) >= 3:
try:
matches.append(SearchMatch(
path=parts[0],
line_number=int(parts[1]),
content=parts[2][:500]
))
except ValueError:
pass
m = _ctx_re.match(line)
if m:
matches.append(SearchMatch(
path=(m.group(1) or '') + m.group(2),
line_number=int(m.group(3)),
content=m.group(4)[:500]
))
total = len(matches)
page = matches[offset:offset + limit]
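
Why the regex replaces the naive split, shown on a Windows-style and a POSIX-style match line (same pattern as _match_re above):

import re

_match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
for line in (r"C:\repo\app.py:42:    total += 1",
             "src/app.py:42:    total += 1"):
    m = _match_re.match(line)
    path = (m.group(1) or '') + m.group(2)
    print(path, int(m.group(3)), m.group(4))
# A naive line.split(':', 2) would have produced path == "C" for the first line.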

View file

@ -14,6 +14,14 @@ logger = logging.getLogger(__name__)
_file_ops_lock = threading.Lock()
_file_ops_cache: dict = {}
# Track files read per task to detect re-read loops after context compression.
# Per task_id we store:
# "last_key": the key of the most recent read/search call (or None)
# "consecutive": how many times that exact call has been repeated in a row
# "read_history": set of (path, offset, limit) tuples for get_read_files_summary
_read_tracker_lock = threading.Lock()
_read_tracker: dict = {}
def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
"""Get or create ShellFileOperations for a terminal environment.
@ -91,6 +99,7 @@ def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
"container_memory": config.get("container_memory", 5120),
"container_disk": config.get("container_disk", 51200),
"container_persistent": config.get("container_persistent", True),
"docker_volumes": config.get("docker_volumes", []),
}
terminal_env = _create_environment(
env_type=env_type,
@ -131,11 +140,97 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
result = file_ops.read_file(path, offset, limit)
if result.content:
result.content = redact_sensitive_text(result.content)
return json.dumps(result.to_dict(), ensure_ascii=False)
result_dict = result.to_dict()
# Track reads to detect *consecutive* re-read loops.
# The counter resets whenever any other tool is called in between,
# so only truly back-to-back identical reads trigger warnings/blocks.
read_key = ("read", path, offset, limit)
with _read_tracker_lock:
task_data = _read_tracker.setdefault(task_id, {
"last_key": None, "consecutive": 0, "read_history": set(),
})
task_data["read_history"].add((path, offset, limit))
if task_data["last_key"] == read_key:
task_data["consecutive"] += 1
else:
task_data["last_key"] = read_key
task_data["consecutive"] = 1
count = task_data["consecutive"]
if count >= 4:
# Hard block: stop returning content to break the loop
return json.dumps({
"error": (
f"BLOCKED: You have read this exact file region {count} times in a row. "
"The content has NOT changed. You already have this information. "
"STOP re-reading and proceed with your task."
),
"path": path,
"already_read": count,
}, ensure_ascii=False)
elif count >= 3:
result_dict["_warning"] = (
f"You have read this exact file region {count} times consecutively. "
"The content has not changed since your last read. Use the information you already have. "
"If you are stuck in a loop, stop reading and proceed with writing or responding."
)
return json.dumps(result_dict, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
def get_read_files_summary(task_id: str = "default") -> list:
"""Return a list of files read in this session for the given task.
Used by context compression to preserve file-read history across
compression boundaries.
"""
with _read_tracker_lock:
task_data = _read_tracker.get(task_id, {})
read_history = task_data.get("read_history", set())
seen_paths: dict = {}
for (path, offset, limit) in read_history:
if path not in seen_paths:
seen_paths[path] = []
seen_paths[path].append(f"lines {offset}-{offset + limit - 1}")
return [
{"path": p, "regions": regions}
for p, regions in sorted(seen_paths.items())
]
def clear_read_tracker(task_id: str = None):
"""Clear the read tracker.
Call with a task_id to clear just that task, or without to clear all.
Should be called when a session is destroyed to prevent memory leaks
in long-running gateway processes.
"""
with _read_tracker_lock:
if task_id:
_read_tracker.pop(task_id, None)
else:
_read_tracker.clear()
def notify_other_tool_call(task_id: str = "default"):
"""Reset consecutive read/search counter for a task.
Called by the tool dispatcher (model_tools.py) whenever a tool OTHER
than read_file / search_files is executed. This ensures we only warn
or block on *truly consecutive* repeated reads: if the agent does
anything else in between (write, patch, terminal, etc.), the counter
resets and the next read is treated as fresh.
"""
with _read_tracker_lock:
task_data = _read_tracker.get(task_id)
if task_data:
task_data["last_key"] = None
task_data["consecutive"] = 0
def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
"""Write content to a file."""
try:
@ -143,7 +238,7 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
result = file_ops.write_file(path, content)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
print(f"[FileTools] write_file error: {type(e).__name__}: {e}", flush=True)
logger.error("write_file error: %s: %s", type(e).__name__, e)
return json.dumps({"error": str(e)}, ensure_ascii=False)
@ -184,6 +279,30 @@ def search_tool(pattern: str, target: str = "content", path: str = ".",
task_id: str = "default") -> str:
"""Search for content or files."""
try:
# Track searches to detect *consecutive* repeated search loops.
search_key = ("search", pattern, target, str(path), file_glob or "")
with _read_tracker_lock:
task_data = _read_tracker.setdefault(task_id, {
"last_key": None, "consecutive": 0, "read_history": set(),
})
if task_data["last_key"] == search_key:
task_data["consecutive"] += 1
else:
task_data["last_key"] = search_key
task_data["consecutive"] = 1
count = task_data["consecutive"]
if count >= 4:
return json.dumps({
"error": (
f"BLOCKED: You have run this exact search {count} times in a row. "
"The results have NOT changed. You already have this information. "
"STOP re-searching and proceed with your task."
),
"pattern": pattern,
"already_searched": count,
}, ensure_ascii=False)
file_ops = _get_file_ops(task_id)
result = file_ops.search(
pattern=pattern, path=path, target=target, file_glob=file_glob,
@ -194,6 +313,13 @@ def search_tool(pattern: str, target: str = "content", path: str = ".",
if hasattr(m, 'content') and m.content:
m.content = redact_sensitive_text(m.content)
result_dict = result.to_dict()
if count >= 3:
result_dict["_warning"] = (
f"You have run this exact search {count} times consecutively. "
"The results have not changed. Use the information you already have."
)
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when results were truncated — explicit next offset is clearer
# than relying on the model to infer it from total_count vs match count.
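The remainder of that logic falls outside this hunk; a minimal sketch of the idea, with the field names assumed rather than read from the file (it would run before the json.dumps above):
# Assumed result fields: "total_count", "matches", "next_offset"
# (names inferred from the comment above, not from this file).
returned = len(result_dict.get("matches", []))
total_count = result_dict.get("total_count", returned)
if total_count > returned:
    result_dict["next_offset"] = returned
    result_dict["_hint"] = (
        f"Showing {returned} of {total_count} matches; "
        f"pass offset={returned} to continue."
    )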

View file

@ -1,8 +1,16 @@
"""Honcho tool for querying user context via dialectic reasoning.
"""Honcho tools for user context retrieval.
Registers ``query_user_context`` -- an LLM-callable tool that asks Honcho
about the current user's history, preferences, goals, and communication
style. The session key is injected at runtime by the agent loop via
Registers three complementary tools, ordered by capability:
honcho_context: dialectic Q&A (LLM-powered, direct answers)
honcho_search: semantic search (fast, no LLM, raw excerpts)
honcho_profile: peer card (fast, no LLM, structured facts)
Use honcho_context when you need Honcho to synthesize an answer.
Use honcho_search or honcho_profile when you want raw data to reason
over yourself.
The session key is injected at runtime by the agent loop via
``set_session_context()``.
"""
@ -34,54 +42,6 @@ def clear_session_context() -> None:
_session_key = None
# ── Tool schema ──
HONCHO_TOOL_SCHEMA = {
"name": "query_user_context",
"description": (
"Query Honcho to retrieve relevant context about the user based on their "
"history and preferences. Use this when you need to understand the user's "
"background, preferences, past interactions, or goals. This helps you "
"personalize your responses and provide more relevant assistance."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"A natural language question about the user. Examples: "
"'What are this user's main goals?', "
"'What communication style does this user prefer?', "
"'What topics has this user discussed recently?', "
"'What is this user's technical expertise level?'"
),
}
},
"required": ["query"],
},
}
# ── Tool handler ──
def _handle_query_user_context(args: dict, **kw) -> str:
"""Execute the Honcho context query."""
query = args.get("query", "")
if not query:
return json.dumps({"error": "Missing required parameter: query"})
if not _session_manager or not _session_key:
return json.dumps({"error": "Honcho is not active for this session."})
try:
result = _session_manager.get_user_context(_session_key, query)
return json.dumps({"result": result})
except Exception as e:
logger.error("Error querying Honcho user context: %s", e)
return json.dumps({"error": f"Failed to query user context: {e}"})
# ── Availability check ──
def _check_honcho_available() -> bool:
@ -89,14 +49,201 @@ def _check_honcho_available() -> bool:
return _session_manager is not None and _session_key is not None
# ── honcho_profile ──
_PROFILE_SCHEMA = {
"name": "honcho_profile",
"description": (
"Retrieve the user's peer card from Honcho — a curated list of key facts "
"about them (name, role, preferences, communication style, patterns). "
"Fast, no LLM reasoning, minimal cost. "
"Use this at conversation start or when you need a quick factual snapshot. "
"Use honcho_context instead when you need Honcho to synthesize an answer."
),
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
}
def _handle_honcho_profile(args: dict, **kw) -> str:
if not _session_manager or not _session_key:
return json.dumps({"error": "Honcho is not active for this session."})
try:
card = _session_manager.get_peer_card(_session_key)
if not card:
return json.dumps({"result": "No profile facts available yet. The user's profile builds over time through conversations."})
return json.dumps({"result": card})
except Exception as e:
logger.error("Error fetching Honcho peer card: %s", e)
return json.dumps({"error": f"Failed to fetch profile: {e}"})
# ── honcho_search ──
_SEARCH_SCHEMA = {
"name": "honcho_search",
"description": (
"Semantic search over Honcho's stored context about the user. "
"Returns raw excerpts ranked by relevance to your query — no LLM synthesis. "
"Cheaper and faster than honcho_context. "
"Good when you want to find specific past facts and reason over them yourself. "
"Use honcho_context when you need a direct synthesized answer."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "What to search for in Honcho's memory (e.g. 'programming languages', 'past projects', 'timezone').",
},
"max_tokens": {
"type": "integer",
"description": "Token budget for returned context (default 800, max 2000).",
},
},
"required": ["query"],
},
}
def _handle_honcho_search(args: dict, **kw) -> str:
query = args.get("query", "")
if not query:
return json.dumps({"error": "Missing required parameter: query"})
if not _session_manager or not _session_key:
return json.dumps({"error": "Honcho is not active for this session."})
max_tokens = min(int(args.get("max_tokens", 800)), 2000)
try:
result = _session_manager.search_context(_session_key, query, max_tokens=max_tokens)
if not result:
return json.dumps({"result": "No relevant context found."})
return json.dumps({"result": result})
except Exception as e:
logger.error("Error searching Honcho context: %s", e)
return json.dumps({"error": f"Failed to search context: {e}"})
# ── honcho_context (dialectic — LLM-powered) ──
_QUERY_SCHEMA = {
"name": "honcho_context",
"description": (
"Ask Honcho a natural language question and get a synthesized answer. "
"Uses Honcho's LLM (dialectic reasoning) — higher cost than honcho_profile or honcho_search. "
"Can query about any peer: the user (default), the AI assistant, or any named peer. "
"Examples: 'What are the user's main goals?', 'What has hermes been working on?', "
"'What is the user's technical expertise level?'"
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "A natural language question.",
},
"peer": {
"type": "string",
"description": "Which peer to query about: 'user' (default) or 'ai'. Omit for user.",
},
},
"required": ["query"],
},
}
def _handle_honcho_context(args: dict, **kw) -> str:
query = args.get("query", "")
if not query:
return json.dumps({"error": "Missing required parameter: query"})
if not _session_manager or not _session_key:
return json.dumps({"error": "Honcho is not active for this session."})
peer_target = args.get("peer", "user")
try:
result = _session_manager.dialectic_query(_session_key, query, peer=peer_target)
return json.dumps({"result": result or "No result from Honcho."})
except Exception as e:
logger.error("Error querying Honcho context: %s", e)
return json.dumps({"error": f"Failed to query context: {e}"})
# ── honcho_conclude ──
_CONCLUDE_SCHEMA = {
"name": "honcho_conclude",
"description": (
"Write a conclusion about the user back to Honcho's memory. "
"Conclusions are persistent facts that build the user's profile — "
"preferences, corrections, clarifications, project context, or anything "
"the user tells you that should be remembered across sessions. "
"Use this when the user explicitly states a preference, corrects you, "
"or shares something they want remembered. "
"Examples: 'User prefers dark mode', 'User's project uses Python 3.11', "
"'User corrected: their name is spelled Eri not Eric'."
),
"parameters": {
"type": "object",
"properties": {
"conclusion": {
"type": "string",
"description": "A factual statement about the user to persist in memory.",
}
},
"required": ["conclusion"],
},
}
def _handle_honcho_conclude(args: dict, **kw) -> str:
conclusion = args.get("conclusion", "")
if not conclusion:
return json.dumps({"error": "Missing required parameter: conclusion"})
if not _session_manager or not _session_key:
return json.dumps({"error": "Honcho is not active for this session."})
try:
ok = _session_manager.create_conclusion(_session_key, conclusion)
if ok:
return json.dumps({"result": f"Conclusion saved: {conclusion}"})
return json.dumps({"error": "Failed to save conclusion."})
except Exception as e:
logger.error("Error creating Honcho conclusion: %s", e)
return json.dumps({"error": f"Failed to save conclusion: {e}"})
# ── Registration ──
from tools.registry import registry
registry.register(
name="query_user_context",
name="honcho_profile",
toolset="honcho",
schema=HONCHO_TOOL_SCHEMA,
handler=_handle_query_user_context,
schema=_PROFILE_SCHEMA,
handler=_handle_honcho_profile,
check_fn=_check_honcho_available,
)
registry.register(
name="honcho_search",
toolset="honcho",
schema=_SEARCH_SCHEMA,
handler=_handle_honcho_search,
check_fn=_check_honcho_available,
)
registry.register(
name="honcho_context",
toolset="honcho",
schema=_QUERY_SCHEMA,
handler=_handle_honcho_context,
check_fn=_check_honcho_available,
)
registry.register(
name="honcho_conclude",
toolset="honcho",
schema=_CONCLUDE_SCHEMA,
handler=_handle_honcho_conclude,
check_fn=_check_honcho_available,
)
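As a rough illustration of how the handlers above divide the work once set_session_context() has wired up a session (the argument payloads are assumptions):
_handle_honcho_profile({})                                             # cheap structured snapshot
_handle_honcho_search({"query": "past projects", "max_tokens": 400})   # raw excerpts, no LLM
_handle_honcho_context({"query": "What are the user's main goals?"})   # synthesized answer
_handle_honcho_conclude({"conclusion": "User prefers dark mode"})      # persist a durable fact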

View file

@ -209,7 +209,7 @@ def _upscale_image(image_url: str, original_prompt: str) -> Dict[str, Any]:
return None
except Exception as e:
logger.error("Error upscaling image: %s", e)
logger.error("Error upscaling image: %s", e, exc_info=True)
return None
@ -377,7 +377,7 @@ def image_generate_tool(
except Exception as e:
generation_time = (datetime.datetime.now() - start_time).total_seconds()
error_msg = f"Error generating image: {str(e)}"
logger.error("%s", error_msg)
logger.error("%s", error_msg, exc_info=True)
# Prepare error response - minimal format
response_data = {

View file

@ -456,17 +456,13 @@ class SamplingHandler:
# Resolve model
model = self._resolve_model(getattr(params, "modelPreferences", None))
# Get auxiliary LLM client
from agent.auxiliary_client import get_text_auxiliary_client
client, default_model = get_text_auxiliary_client()
if client is None:
self.metrics["errors"] += 1
return self._error("No LLM provider available for sampling")
# Get auxiliary LLM client via centralized router
from agent.auxiliary_client import call_llm
resolved_model = model or default_model
# Model whitelist check (we need to resolve model before calling)
resolved_model = model or self.model_override or ""
# Model whitelist check
if self.allowed_models and resolved_model not in self.allowed_models:
if self.allowed_models and resolved_model and resolved_model not in self.allowed_models:
logger.warning(
"MCP server '%s' requested model '%s' not in allowed_models",
self.server_name, resolved_model,
@ -484,20 +480,15 @@ class SamplingHandler:
# Build LLM call kwargs
max_tokens = min(params.maxTokens, self.max_tokens_cap)
call_kwargs: dict = {
"model": resolved_model,
"messages": messages,
"max_tokens": max_tokens,
}
call_temperature = None
if hasattr(params, "temperature") and params.temperature is not None:
call_kwargs["temperature"] = params.temperature
if stop := getattr(params, "stopSequences", None):
call_kwargs["stop"] = stop
call_temperature = params.temperature
# Forward server-provided tools
call_tools = None
server_tools = getattr(params, "tools", None)
if server_tools:
call_kwargs["tools"] = [
call_tools = [
{
"type": "function",
"function": {
@ -508,9 +499,6 @@ class SamplingHandler:
}
for t in server_tools
]
if tool_choice := getattr(params, "toolChoice", None):
mode = getattr(tool_choice, "mode", "auto")
call_kwargs["tool_choice"] = {"auto": "auto", "required": "required", "none": "none"}.get(mode, "auto")
logger.log(
self.audit_level,
@ -520,7 +508,15 @@ class SamplingHandler:
# Offload sync LLM call to thread (non-blocking)
def _sync_call():
return client.chat.completions.create(**call_kwargs)
return call_llm(
task="mcp",
model=resolved_model or None,
messages=messages,
temperature=call_temperature,
max_tokens=max_tokens,
tools=call_tools,
timeout=self.timeout,
)
try:
response = await asyncio.wait_for(
@ -538,6 +534,14 @@ class SamplingHandler:
f"Sampling LLM call failed: {_sanitize_error(str(exc))}"
)
# Guard against empty choices (content filtering, provider errors)
if not getattr(response, "choices", None):
self.metrics["errors"] += 1
return self._error(
f"LLM returned empty response (no choices) for server "
f"'{self.server_name}'"
)
# Track metrics
choice = response.choices[0]
self.metrics["requests"] += 1
@ -1323,29 +1327,23 @@ def discover_mcp_tools() -> List[str]:
async def _discover_one(name: str, cfg: dict) -> List[str]:
"""Connect to a single server and return its registered tool names."""
transport_desc = cfg.get("url", f'{cfg.get("command", "?")} {" ".join(cfg.get("args", [])[:2])}')
try:
registered = await _discover_and_register_server(name, cfg)
transport_type = "HTTP" if "url" in cfg else "stdio"
return registered
except Exception as exc:
logger.warning(
"Failed to connect to MCP server '%s': %s",
name, exc,
)
return []
return await _discover_and_register_server(name, cfg)
async def _discover_all():
nonlocal failed_count
server_names = list(new_servers.keys())
# Connect to all servers in PARALLEL
results = await asyncio.gather(
*(_discover_one(name, cfg) for name, cfg in new_servers.items()),
return_exceptions=True,
)
for result in results:
for name, result in zip(server_names, results):
if isinstance(result, Exception):
failed_count += 1
logger.warning("MCP discovery error: %s", result)
logger.warning(
"Failed to connect to MCP server '%s': %s",
name, result,
)
elif isinstance(result, list):
all_tools.extend(result)
else:

View file

@ -1,39 +1,30 @@
"""Shared OpenRouter API client for Hermes tools.
Provides a single lazy-initialized AsyncOpenAI client that all tool modules
can share, eliminating the duplicated _get_openrouter_client() /
_get_summarizer_client() pattern previously copy-pasted across web_tools,
vision_tools, mixture_of_agents_tool, and session_search_tool.
can share. Routes through the centralized provider router in
agent/auxiliary_client.py so auth, headers, and API format are handled
consistently.
"""
import os
from openai import AsyncOpenAI
from hermes_constants import OPENROUTER_BASE_URL
_client: AsyncOpenAI | None = None
_client = None
def get_async_client() -> AsyncOpenAI:
"""Return a shared AsyncOpenAI client pointed at OpenRouter.
def get_async_client():
"""Return a shared async OpenAI-compatible client for OpenRouter.
The client is created lazily on first call and reused thereafter.
Uses the centralized provider router for auth and client construction.
Raises ValueError if OPENROUTER_API_KEY is not set.
"""
global _client
if _client is None:
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
from agent.auxiliary_client import resolve_provider_client
client, _model = resolve_provider_client("openrouter", async_mode=True)
if client is None:
raise ValueError("OPENROUTER_API_KEY environment variable not set")
_client = AsyncOpenAI(
api_key=api_key,
base_url=OPENROUTER_BASE_URL,
default_headers={
"HTTP-Referer": "https://github.com/NousResearch/hermes-agent",
"X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent",
},
)
_client = client
return _client
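A short usage sketch, assuming the caller wants a plain OpenAI-compatible chat completion; the model slug below is a placeholder, not taken from this diff.
import asyncio
async def _demo():
    client = get_async_client()  # lazily built via the provider router, then reused
    resp = await client.chat.completions.create(
        model="example/placeholder-model",  # placeholder slug
        messages=[{"role": "user", "content": "ping"}],
        max_tokens=16,
    )
    print(resp.choices[0].message.content)
asyncio.run(_demo())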

View file

@ -148,11 +148,14 @@ class ProcessRegistry:
if use_pty:
# Try PTY mode for interactive CLI tools
try:
import ptyprocess
if _IS_WINDOWS:
from winpty import PtyProcess as _PtyProcessCls
else:
from ptyprocess import PtyProcess as _PtyProcessCls
user_shell = _find_shell()
pty_env = os.environ | (env_vars or {})
pty_env["PYTHONUNBUFFERED"] = "1"
pty_proc = ptyprocess.PtyProcess.spawn(
pty_proc = _PtyProcessCls.spawn(
[user_shell, "-lic", command],
cwd=session.cwd,
env=pty_env,

View file

@ -54,9 +54,10 @@ ENVIRONMENTS_DIR = TINKER_ATROPOS_ROOT / "tinker_atropos" / "environments"
CONFIGS_DIR = TINKER_ATROPOS_ROOT / "configs"
LOGS_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "logs" / "rl_training"
# Ensure logs directory exists
LOGS_DIR.mkdir(parents=True, exist_ok=True)
def _ensure_logs_dir():
"""Lazily create logs directory on first use (avoid side effects at import time)."""
if TINKER_ATROPOS_ROOT.exists():
LOGS_DIR.mkdir(exist_ok=True)
# ============================================================================
# Locked Configuration (Infrastructure Settings)
@ -314,6 +315,8 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
"""
run_id = run_state.run_id
_ensure_logs_dir()
# Log file paths
api_log = LOGS_DIR / f"api_{run_id}.log"
trainer_log = LOGS_DIR / f"trainer_{run_id}.log"
@ -323,7 +326,10 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
# Step 1: Start the Atropos API server (run-api)
print(f"[{run_id}] Starting Atropos API server (run-api)...")
api_log_file = open(api_log, "w")
# File must stay open while the subprocess runs; we store the handle
# on run_state so _stop_training_run() can close it when done.
api_log_file = open(api_log, "w") # closed by _stop_training_run
run_state.api_log_file = api_log_file
run_state.api_process = subprocess.Popen(
["run-api"],
stdout=api_log_file,
@ -337,6 +343,7 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
if run_state.api_process.poll() is not None:
run_state.status = "failed"
run_state.error_message = f"API server exited with code {run_state.api_process.returncode}. Check {api_log}"
_stop_training_run(run_state)
return
print(f"[{run_id}] Atropos API server started")
@ -344,7 +351,8 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
# Step 2: Start the Tinker trainer
print(f"[{run_id}] Starting Tinker trainer: launch_training.py --config {config_path}")
trainer_log_file = open(trainer_log, "w")
trainer_log_file = open(trainer_log, "w") # closed by _stop_training_run
run_state.trainer_log_file = trainer_log_file
run_state.trainer_process = subprocess.Popen(
[sys.executable, "launch_training.py", "--config", str(config_path)],
stdout=trainer_log_file,
@ -360,8 +368,7 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
if run_state.trainer_process.poll() is not None:
run_state.status = "failed"
run_state.error_message = f"Trainer exited with code {run_state.trainer_process.returncode}. Check {trainer_log}"
if run_state.api_process:
run_state.api_process.terminate()
_stop_training_run(run_state)
return
print(f"[{run_id}] Trainer started, inference server on port 8001")
@ -380,11 +387,13 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
if not env_info:
run_state.status = "failed"
run_state.error_message = f"Environment '{run_state.environment}' not found"
_stop_training_run(run_state)
return
print(f"[{run_id}] Starting environment: {env_info.file_path} serve")
env_log_file = open(env_log, "w")
env_log_file = open(env_log, "w") # closed by _stop_training_run
run_state.env_log_file = env_log_file
run_state.env_process = subprocess.Popen(
[sys.executable, str(env_info.file_path), "serve", "--config", str(config_path)],
stdout=env_log_file,
@ -398,10 +407,7 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
if run_state.env_process.poll() is not None:
run_state.status = "failed"
run_state.error_message = f"Environment exited with code {run_state.env_process.returncode}. Check {env_log}"
if run_state.trainer_process:
run_state.trainer_process.terminate()
if run_state.api_process:
run_state.api_process.terminate()
_stop_training_run(run_state)
return
run_state.status = "running"
@ -480,6 +486,16 @@ def _stop_training_run(run_state: RunState):
if run_state.status == "running":
run_state.status = "stopped"
# Close log file handles that were opened for subprocess stdout.
for attr in ("env_log_file", "trainer_log_file", "api_log_file"):
fh = getattr(run_state, attr, None)
if fh is not None:
try:
fh.close()
except Exception:
pass
setattr(run_state, attr, None)
# ============================================================================
# Environment Discovery Tools
@ -1079,6 +1095,7 @@ async def rl_test_inference(
}
# Create output directory for test results
_ensure_logs_dir()
test_output_dir = LOGS_DIR / "inference_tests"
test_output_dir.mkdir(exist_ok=True)

View file

@ -8,10 +8,13 @@ human-friendly channel names to IDs. Works in both CLI and gateway contexts.
import json
import logging
import os
import re
import time
logger = logging.getLogger(__name__)
_TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$")
SEND_MESSAGE_SCHEMA = {
"name": "send_message",
@ -33,7 +36,7 @@ SEND_MESSAGE_SCHEMA = {
},
"target": {
"type": "string",
"description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', or 'platform:chat_id'. Examples: 'telegram', 'discord:#bot-home', 'slack:#engineering', 'signal:+15551234567'"
"description": "Delivery target. Format: 'platform' (uses home channel), 'platform:#channel-name', 'platform:chat_id', or Telegram topic 'telegram:chat_id:thread_id'. Examples: 'telegram', 'telegram:-1001234567890:17585', 'discord:#bot-home', 'slack:#engineering', 'signal:+15551234567'"
},
"message": {
"type": "string",
@ -73,23 +76,30 @@ def _handle_send(args):
parts = target.split(":", 1)
platform_name = parts[0].strip().lower()
chat_id = parts[1].strip() if len(parts) > 1 else None
target_ref = parts[1].strip() if len(parts) > 1 else None
chat_id = None
thread_id = None
if target_ref:
chat_id, thread_id, is_explicit = _parse_target_ref(platform_name, target_ref)
else:
is_explicit = False
# Resolve human-friendly channel names to numeric IDs
if chat_id and not chat_id.lstrip("-").isdigit():
if target_ref and not is_explicit:
try:
from gateway.channel_directory import resolve_channel_name
resolved = resolve_channel_name(platform_name, chat_id)
resolved = resolve_channel_name(platform_name, target_ref)
if resolved:
chat_id = resolved
chat_id, thread_id, _ = _parse_target_ref(platform_name, resolved)
else:
return json.dumps({
"error": f"Could not resolve '{chat_id}' on {platform_name}. "
"error": f"Could not resolve '{target_ref}' on {platform_name}. "
f"Use send_message(action='list') to see available targets."
})
except Exception:
return json.dumps({
"error": f"Could not resolve '{chat_id}' on {platform_name}. "
"error": f"Could not resolve '{target_ref}' on {platform_name}. "
f"Try using a numeric channel ID instead."
})
@ -109,6 +119,7 @@ def _handle_send(args):
"slack": Platform.SLACK,
"whatsapp": Platform.WHATSAPP,
"signal": Platform.SIGNAL,
"email": Platform.EMAIL,
}
platform = platform_map.get(platform_name)
if not platform:
@ -134,7 +145,7 @@ def _handle_send(args):
try:
from model_tools import _run_async
result = _run_async(_send_to_platform(platform, pconfig, chat_id, message))
result = _run_async(_send_to_platform(platform, pconfig, chat_id, message, thread_id=thread_id))
if used_home_channel and isinstance(result, dict) and result.get("success"):
result["note"] = f"Sent to {platform_name} home channel (chat_id: {chat_id})"
@ -143,7 +154,7 @@ def _handle_send(args):
try:
from gateway.mirror import mirror_to_session
source_label = os.getenv("HERMES_SESSION_PLATFORM", "cli")
if mirror_to_session(platform_name, chat_id, message, source_label=source_label):
if mirror_to_session(platform_name, chat_id, message, source_label=source_label, thread_id=thread_id):
result["mirrored"] = True
except Exception:
pass
@ -153,26 +164,42 @@ def _handle_send(args):
return json.dumps({"error": f"Send failed: {e}"})
async def _send_to_platform(platform, pconfig, chat_id, message):
def _parse_target_ref(platform_name: str, target_ref: str):
"""Parse a tool target into chat_id/thread_id and whether it is explicit."""
if platform_name == "telegram":
match = _TELEGRAM_TOPIC_TARGET_RE.fullmatch(target_ref)
if match:
return match.group(1), match.group(2), True
if target_ref.lstrip("-").isdigit():
return target_ref, None, True
return None, None, False
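A few illustrative inputs and the tuples they produce (the expected values follow from the regex and digit check above):
# Telegram forum topic: chat id plus thread id, explicit.
assert _parse_target_ref("telegram", "-1001234567890:17585") == ("-1001234567890", "17585", True)
# Plain numeric chat id on any platform: explicit, no thread.
assert _parse_target_ref("discord", "123456789") == ("123456789", None, True)
# Human-friendly channel name: not explicit, so it falls through to name resolution.
assert _parse_target_ref("slack", "#engineering") == (None, None, False)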
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None):
"""Route a message to the appropriate platform sender."""
from gateway.config import Platform
if platform == Platform.TELEGRAM:
return await _send_telegram(pconfig.token, chat_id, message)
return await _send_telegram(pconfig.token, chat_id, message, thread_id=thread_id)
elif platform == Platform.DISCORD:
return await _send_discord(pconfig.token, chat_id, message)
elif platform == Platform.SLACK:
return await _send_slack(pconfig.token, chat_id, message)
elif platform == Platform.SIGNAL:
return await _send_signal(pconfig.extra, chat_id, message)
elif platform == Platform.EMAIL:
return await _send_email(pconfig.extra, chat_id, message)
return {"error": f"Direct sending not yet implemented for {platform.value}"}
async def _send_telegram(token, chat_id, message):
async def _send_telegram(token, chat_id, message, thread_id=None):
"""Send via Telegram Bot API (one-shot, no polling needed)."""
try:
from telegram import Bot
bot = Bot(token=token)
msg = await bot.send_message(chat_id=int(chat_id), text=message)
send_kwargs = {"chat_id": int(chat_id), "text": message}
if thread_id is not None:
send_kwargs["message_thread_id"] = int(thread_id)
msg = await bot.send_message(**send_kwargs)
return {"success": True, "platform": "telegram", "chat_id": chat_id, "message_id": str(msg.message_id)}
except ImportError:
return {"error": "python-telegram-bot not installed. Run: pip install python-telegram-bot"}
@ -259,6 +286,35 @@ async def _send_signal(extra, chat_id, message):
return {"error": f"Signal send failed: {e}"}
async def _send_email(extra, chat_id, message):
"""Send via SMTP (one-shot, no persistent connection needed)."""
import smtplib
from email.mime.text import MIMEText
address = extra.get("address") or os.getenv("EMAIL_ADDRESS", "")
password = os.getenv("EMAIL_PASSWORD", "")
smtp_host = extra.get("smtp_host") or os.getenv("EMAIL_SMTP_HOST", "")
smtp_port = int(os.getenv("EMAIL_SMTP_PORT", "587"))
if not all([address, password, smtp_host]):
return {"error": "Email not configured (EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_SMTP_HOST required)"}
try:
msg = MIMEText(message, "plain", "utf-8")
msg["From"] = address
msg["To"] = chat_id
msg["Subject"] = "Hermes Agent"
server = smtplib.SMTP(smtp_host, smtp_port)
server.starttls()
server.login(address, password)
server.send_message(msg)
server.quit()
return {"success": True, "platform": "email", "chat_id": chat_id}
except Exception as e:
return {"error": f"Email send failed: {e}"}
def _check_send_message():
"""Gate send_message on gateway running (always available on messaging platforms)."""
platform = os.getenv("HERMES_SESSION_PLATFORM", "")

View file

@ -22,13 +22,7 @@ import os
import logging
from typing import Dict, Any, List, Optional, Union
from openai import AsyncOpenAI, OpenAI
from agent.auxiliary_client import get_async_text_auxiliary_client
# Resolve the async auxiliary client at import time so we have the model slug.
# Handles Codex Responses API adapter transparently.
_async_aux_client, _SUMMARIZER_MODEL = get_async_text_auxiliary_client()
from agent.auxiliary_client import async_call_llm
MAX_SESSION_CHARS = 100_000
MAX_SUMMARY_TOKENS = 10000
@ -156,26 +150,22 @@ async def _summarize_session(
f"Summarize this conversation with focus on: {query}"
)
if _async_aux_client is None or _SUMMARIZER_MODEL is None:
logging.warning("No auxiliary model available for session summarization")
return None
max_retries = 3
for attempt in range(max_retries):
try:
from agent.auxiliary_client import get_auxiliary_extra_body, auxiliary_max_tokens_param
_extra = get_auxiliary_extra_body()
response = await _async_aux_client.chat.completions.create(
model=_SUMMARIZER_MODEL,
response = await async_call_llm(
task="session_search",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
**({} if not _extra else {"extra_body": _extra}),
temperature=0.1,
**auxiliary_max_tokens_param(MAX_SUMMARY_TOKENS),
max_tokens=MAX_SUMMARY_TOKENS,
)
return response.choices[0].message.content.strip()
except RuntimeError:
logging.warning("No auxiliary model available for session summarization")
return None
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(1 * (attempt + 1))
@ -333,8 +323,6 @@ def session_search(
def check_session_search_requirements() -> bool:
"""Requires SQLite state database and an auxiliary text model."""
if _async_aux_client is None:
return False
try:
from hermes_state import DEFAULT_DB_PATH
return DEFAULT_DB_PATH.parent.exists()

View file

@ -37,6 +37,7 @@ import logging
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import Dict, Any, Optional
@ -190,6 +191,38 @@ def _validate_file_path(file_path: str) -> Optional[str]:
return None
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
"""
Atomically write text content to a file.
Uses a temporary file in the same directory and os.replace() to ensure
the target file is never left in a partially-written state if the process
crashes or is interrupted.
Args:
file_path: Target file path
content: Content to write
encoding: Text encoding (default: utf-8)
"""
file_path.parent.mkdir(parents=True, exist_ok=True)
fd, temp_path = tempfile.mkstemp(
dir=str(file_path.parent),
prefix=f".{file_path.name}.tmp.",
suffix="",
)
try:
with os.fdopen(fd, "w", encoding=encoding) as f:
f.write(content)
os.replace(temp_path, file_path)
except Exception:
# Clean up temp file on error
try:
os.unlink(temp_path)
except OSError:
pass
raise
# =============================================================================
# Core actions
# =============================================================================
@ -218,9 +251,9 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
skill_dir = _resolve_skill_dir(name, category)
skill_dir.mkdir(parents=True, exist_ok=True)
# Write SKILL.md
# Write SKILL.md atomically
skill_md = skill_dir / "SKILL.md"
skill_md.write_text(content, encoding="utf-8")
_atomic_write_text(skill_md, content)
# Security scan — roll back on block
scan_error = _security_scan_skill(skill_dir)
@ -256,13 +289,13 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
skill_md = existing["path"] / "SKILL.md"
# Back up original content for rollback
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
skill_md.write_text(content, encoding="utf-8")
_atomic_write_text(skill_md, content)
# Security scan — roll back on block
scan_error = _security_scan_skill(existing["path"])
if scan_error:
if original_content is not None:
skill_md.write_text(original_content, encoding="utf-8")
_atomic_write_text(skill_md, original_content)
return {"success": False, "error": scan_error}
return {
@ -342,12 +375,12 @@ def _patch_skill(
}
original_content = content # for rollback
target.write_text(new_content, encoding="utf-8")
_atomic_write_text(target, new_content)
# Security scan — roll back on block
scan_error = _security_scan_skill(skill_dir)
if scan_error:
target.write_text(original_content, encoding="utf-8")
_atomic_write_text(target, original_content)
return {"success": False, "error": scan_error}
replacements = count if replace_all else 1
@ -394,13 +427,13 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
target.parent.mkdir(parents=True, exist_ok=True)
# Back up for rollback
original_content = target.read_text(encoding="utf-8") if target.exists() else None
target.write_text(file_content, encoding="utf-8")
_atomic_write_text(target, file_content)
# Security scan — roll back on block
scan_error = _security_scan_skill(existing["path"])
if scan_error:
if original_content is not None:
target.write_text(original_content, encoding="utf-8")
_atomic_write_text(target, original_content)
else:
target.unlink(missing_ok=True)
return {"success": False, "error": scan_error}

View file

@ -29,7 +29,7 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import List, Tuple
from hermes_constants import OPENROUTER_BASE_URL
# ---------------------------------------------------------------------------
@ -934,25 +934,12 @@ def llm_audit_skill(skill_path: Path, static_result: ScanResult,
if not model:
return static_result
# Call the LLM via the OpenAI SDK (same pattern as run_agent.py)
# Call the LLM via the centralized provider router
try:
from openai import OpenAI
import os
from agent.auxiliary_client import call_llm
api_key = os.getenv("OPENROUTER_API_KEY", "")
if not api_key:
return static_result
client = OpenAI(
base_url=OPENROUTER_BASE_URL,
api_key=api_key,
default_headers={
"HTTP-Referer": "https://github.com/NousResearch/hermes-agent",
"X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent",
},
)
response = client.chat.completions.create(
response = call_llm(
provider="openrouter",
model=model,
messages=[{
"role": "user",

View file

@ -572,14 +572,23 @@ class ClawHubSource(SkillSource):
logger.warning("ClawHub fetch failed for %s: could not resolve latest version", slug)
return None
version_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions/{latest_version}")
if not isinstance(version_data, dict):
return None
# Primary method: download the skill as a ZIP bundle from /download
files = self._download_zip(slug, latest_version)
# Fallback: try the version metadata endpoint for inline/raw content
if "SKILL.md" not in files:
version_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions/{latest_version}")
if isinstance(version_data, dict):
# Files may be nested under version_data["version"]["files"]
files = self._extract_files(version_data) or files
if "SKILL.md" not in files:
nested = version_data.get("version", {})
if isinstance(nested, dict):
files = self._extract_files(nested) or files
files = self._extract_files(version_data)
if "SKILL.md" not in files:
logger.warning(
"ClawHub fetch for %s resolved version %s but no inline/raw file content was available",
"ClawHub fetch for %s resolved version %s but could not retrieve file content",
slug,
latest_version,
)
@ -674,6 +683,65 @@ class ClawHubSource(SkillSource):
return files
def _download_zip(self, slug: str, version: str) -> Dict[str, str]:
"""Download skill as a ZIP bundle from the /download endpoint and extract text files."""
import io
import zipfile
files: Dict[str, str] = {}
max_retries = 3
for attempt in range(max_retries):
try:
resp = httpx.get(
f"{self.BASE_URL}/download",
params={"slug": slug, "version": version},
timeout=30,
follow_redirects=True,
)
if resp.status_code == 429:
retry_after = int(resp.headers.get("retry-after", "5"))
retry_after = min(retry_after, 15) # Cap wait time
logger.debug(
"ClawHub download rate-limited for %s, retrying in %ds (attempt %d/%d)",
slug, retry_after, attempt + 1, max_retries,
)
time.sleep(retry_after)
continue
if resp.status_code != 200:
logger.debug("ClawHub ZIP download for %s v%s returned %s", slug, version, resp.status_code)
return files
with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
for info in zf.infolist():
if info.is_dir():
continue
# Sanitize path — strip leading slashes and ..
name = info.filename.lstrip("/")
if ".." in name or name.startswith("/"):
continue
# Only extract text-sized files (skip large binaries)
if info.file_size > 500_000:
logger.debug("Skipping large file in ZIP: %s (%d bytes)", name, info.file_size)
continue
try:
raw = zf.read(info.filename)
files[name] = raw.decode("utf-8")
except (UnicodeDecodeError, KeyError):
logger.debug("Skipping non-text file in ZIP: %s", name)
continue
return files
except zipfile.BadZipFile:
logger.warning("ClawHub returned invalid ZIP for %s v%s", slug, version)
return files
except httpx.HTTPError as exc:
logger.debug("ClawHub ZIP download failed for %s v%s: %s", slug, version, exc)
return files
logger.debug("ClawHub ZIP download exhausted retries for %s v%s", slug, version)
return files
def _fetch_text(self, url: str) -> Optional[str]:
try:
resp = httpx.get(url, timeout=20)

File diff suppressed because it is too large

View file

@ -29,6 +29,7 @@ Usage:
import json
import logging
import os
import platform
import signal
import sys
import time
@ -83,8 +84,8 @@ def _check_disk_usage_warning():
if f.is_file():
try:
total_bytes += f.stat().st_size
except OSError:
pass
except OSError as e:
logger.debug("Could not stat file %s: %s", f, e)
total_gb = total_bytes / (1024 ** 3)
@ -192,23 +193,35 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
result = {"password": None, "done": False}
def read_password_thread():
"""Read password from /dev/tty with echo disabled."""
"""Read password with echo disabled. Uses msvcrt on Windows, /dev/tty on Unix."""
tty_fd = None
old_attrs = None
try:
import termios
tty_fd = os.open("/dev/tty", os.O_RDONLY)
old_attrs = termios.tcgetattr(tty_fd)
new_attrs = termios.tcgetattr(tty_fd)
new_attrs[3] = new_attrs[3] & ~termios.ECHO
termios.tcsetattr(tty_fd, termios.TCSAFLUSH, new_attrs)
chars = []
while True:
b = os.read(tty_fd, 1)
if not b or b in (b"\n", b"\r"):
break
chars.append(b)
result["password"] = b"".join(chars).decode("utf-8", errors="replace")
if platform.system() == "Windows":
import msvcrt
chars = []
while True:
c = msvcrt.getwch()
if c in ("\r", "\n"):
break
if c == "\x03":
raise KeyboardInterrupt
chars.append(c)
result["password"] = "".join(chars)
else:
import termios
tty_fd = os.open("/dev/tty", os.O_RDONLY)
old_attrs = termios.tcgetattr(tty_fd)
new_attrs = termios.tcgetattr(tty_fd)
new_attrs[3] = new_attrs[3] & ~termios.ECHO
termios.tcsetattr(tty_fd, termios.TCSAFLUSH, new_attrs)
chars = []
while True:
b = os.read(tty_fd, 1)
if not b or b in (b"\n", b"\r"):
break
chars.append(b)
result["password"] = b"".join(chars).decode("utf-8", errors="replace")
except (EOFError, KeyboardInterrupt, OSError):
result["password"] = ""
except Exception:
@ -218,13 +231,13 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
try:
import termios as _termios
_termios.tcsetattr(tty_fd, _termios.TCSAFLUSH, old_attrs)
except Exception:
pass
except Exception as e:
logger.debug("Failed to restore terminal attributes: %s", e)
if tty_fd is not None:
try:
os.close(tty_fd)
except Exception:
pass
except Exception as e:
logger.debug("Failed to close tty fd: %s", e)
result["done"] = True
try:
@ -278,32 +291,50 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
del os.environ["HERMES_SPINNER_PAUSE"]
def _transform_sudo_command(command: str) -> str:
def _transform_sudo_command(command: str) -> tuple[str, str | None]:
"""
Transform sudo commands to use -S flag if SUDO_PASSWORD is available.
This is a shared helper used by all execution environments to provide
consistent sudo handling across local, SSH, and container environments.
If SUDO_PASSWORD is set (via env, config, or interactive prompt):
'sudo apt install curl' -> password piped via sudo -S
Returns:
(transformed_command, sudo_stdin) where:
- transformed_command has every bare ``sudo`` replaced with
``sudo -S -p ''`` so sudo reads its password from stdin.
- sudo_stdin is the password string with a trailing newline that the
caller must prepend to the process's stdin stream. sudo -S reads
exactly one line (the password) and passes the rest of stdin to the
child command, so prepending is safe even when the caller also has
its own stdin_data to pipe.
- If no password is available, sudo_stdin is None and the command is
returned unchanged so it fails gracefully with
"sudo: a password is required".
Callers that drive a subprocess directly (local, ssh, docker, singularity)
should prepend sudo_stdin to their stdin_data and pass the merged bytes to
Popen's stdin pipe.
Callers that cannot pipe subprocess stdin (modal, daytona) must embed the
password in the command string themselves; see their execute() methods for
how they handle the non-None sudo_stdin case.
If SUDO_PASSWORD is not set and in interactive mode (HERMES_INTERACTIVE=1):
Prompts user for password with 45s timeout, caches for session.
If SUDO_PASSWORD is not set and NOT interactive:
Command runs as-is (fails gracefully with "sudo: a password is required").
"""
global _cached_sudo_password
import re
# Check if command even contains sudo
if not re.search(r'\bsudo\b', command):
return command # No sudo in command, return as-is
return command, None # No sudo in command, nothing to do
# Try to get password from: env var -> session cache -> interactive prompt
sudo_password = os.getenv("SUDO_PASSWORD", "") or _cached_sudo_password
if not sudo_password:
# No password configured - check if we're in interactive mode
if os.getenv("HERMES_INTERACTIVE"):
@ -311,21 +342,21 @@ def _transform_sudo_command(command: str) -> str:
sudo_password = _prompt_for_sudo_password(timeout_seconds=45)
if sudo_password:
_cached_sudo_password = sudo_password # Cache for session
if not sudo_password:
return command # No password, let it fail gracefully
return command, None # No password, let it fail gracefully
def replace_sudo(match):
# Replace 'sudo' with password-piped version
# The -S flag makes sudo read password from stdin
# The -p '' suppresses the password prompt
# Use shlex.quote() to prevent shell injection via password content
import shlex
return f"echo {shlex.quote(sudo_password)} | sudo -S -p ''"
# Replace bare 'sudo' with 'sudo -S -p ""'.
# The password is returned as sudo_stdin and must be written to the
# process's stdin pipe by the caller — it never appears in any
# command-line argument or shell string.
return "sudo -S -p ''"
# Match 'sudo' at word boundaries (not 'visudo' or 'sudoers')
# This handles: sudo, sudo -flag, etc.
return re.sub(r'\bsudo\b', replace_sudo, command)
transformed = re.sub(r'\bsudo\b', replace_sudo, command)
# Trailing newline is required: sudo -S reads one line for the password.
return transformed, sudo_password + "\n"
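A caller-side sketch of the contract described above, for an environment that drives subprocess stdin directly; the wrapper function and shell invocation are assumptions, not code from this file.
import subprocess
def run_with_sudo(command: str, stdin_data: bytes = b"") -> subprocess.CompletedProcess:
    transformed, sudo_stdin = _transform_sudo_command(command)
    merged_stdin = stdin_data
    if sudo_stdin is not None:
        # sudo -S consumes exactly the first line; the rest flows on to the child command.
        merged_stdin = sudo_stdin.encode() + stdin_data
    return subprocess.run(
        ["/bin/bash", "-lc", transformed],
        input=merged_stdin,
        capture_output=True,
    )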
# Environment classes now live in tools/environments/
@ -403,6 +434,23 @@ def clear_task_env_overrides(task_id: str):
_task_env_overrides.pop(task_id, None)
# Configuration from environment variables
def _parse_env_var(name: str, default: str, converter=int, type_label: str = "integer"):
"""Parse an environment variable with *converter*, raising a clear error on bad values.
Without this wrapper, a single malformed env var (e.g. TERMINAL_TIMEOUT=5m)
causes an unhandled ValueError that kills every terminal command.
"""
raw = os.getenv(name, default)
try:
return converter(raw)
except (ValueError, json.JSONDecodeError):
raise ValueError(
f"Invalid value for {name}: {raw!r} (expected {type_label}). "
f"Check ~/.hermes/.env or environment variables."
)
def _get_env_config() -> Dict[str, Any]:
"""Get terminal environment configuration from environment variables."""
# Default image with Python and Node.js for maximum compatibility
@ -415,7 +463,7 @@ def _get_env_config() -> Dict[str, Any]:
if env_type == "local":
default_cwd = os.getcwd()
else:
default_cwd = "~"
default_cwd = "/root"
# Read TERMINAL_CWD but sanity-check it for container backends.
# If the CWD looks like a host-local path that can't exist inside a
@ -424,7 +472,8 @@ def _get_env_config() -> Dict[str, Any]:
# SSH is excluded since /home/ paths are valid on remote machines.
cwd = os.getenv("TERMINAL_CWD", default_cwd)
if env_type in ("modal", "docker", "singularity", "daytona") and cwd:
host_prefixes = ("/Users/", "C:\\", "C:/")
# Host paths that won't exist inside containers
host_prefixes = ("/Users/", "/home/", "C:\\", "C:/")
if any(cwd.startswith(p) for p in host_prefixes) and cwd != default_cwd:
logger.info("Ignoring TERMINAL_CWD=%r for %s backend "
"(host path won't exist in sandbox). Using %r instead.",
@ -438,19 +487,19 @@ def _get_env_config() -> Dict[str, Any]:
"modal_image": os.getenv("TERMINAL_MODAL_IMAGE", default_image),
"daytona_image": os.getenv("TERMINAL_DAYTONA_IMAGE", default_image),
"cwd": cwd,
"timeout": int(os.getenv("TERMINAL_TIMEOUT", "180")),
"lifetime_seconds": int(os.getenv("TERMINAL_LIFETIME_SECONDS", "300")),
"timeout": _parse_env_var("TERMINAL_TIMEOUT", "180"),
"lifetime_seconds": _parse_env_var("TERMINAL_LIFETIME_SECONDS", "300"),
# SSH-specific config
"ssh_host": os.getenv("TERMINAL_SSH_HOST", ""),
"ssh_user": os.getenv("TERMINAL_SSH_USER", ""),
"ssh_port": int(os.getenv("TERMINAL_SSH_PORT", "22")),
"ssh_port": _parse_env_var("TERMINAL_SSH_PORT", "22"),
"ssh_key": os.getenv("TERMINAL_SSH_KEY", ""),
# Container resource config (applies to docker, singularity, modal, daytona -- ignored for local/ssh)
"container_cpu": float(os.getenv("TERMINAL_CONTAINER_CPU", "1")),
"container_memory": int(os.getenv("TERMINAL_CONTAINER_MEMORY", "5120")), # MB (default 5GB)
"container_disk": int(os.getenv("TERMINAL_CONTAINER_DISK", "51200")), # MB (default 50GB)
"container_cpu": _parse_env_var("TERMINAL_CONTAINER_CPU", "1", float, "number"),
"container_memory": _parse_env_var("TERMINAL_CONTAINER_MEMORY", "5120"), # MB (default 5GB)
"container_disk": _parse_env_var("TERMINAL_CONTAINER_DISK", "51200"), # MB (default 50GB)
"container_persistent": os.getenv("TERMINAL_CONTAINER_PERSISTENT", "true").lower() in ("true", "1", "yes"),
"docker_volumes": json.loads(os.getenv("TERMINAL_DOCKER_VOLUMES", "[]")),
"docker_volumes": _parse_env_var("TERMINAL_DOCKER_VOLUMES", "[]", json.loads, "valid JSON"),
}
@ -504,7 +553,12 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int,
if memory > 0:
sandbox_kwargs["memory"] = memory
if disk > 0:
sandbox_kwargs["ephemeral_disk"] = disk
try:
import inspect, modal
if "ephemeral_disk" in inspect.signature(modal.Sandbox.create).parameters:
sandbox_kwargs["ephemeral_disk"] = disk
except Exception:
pass
return _ModalEnvironment(
image=image, cwd=cwd, timeout=timeout,
@ -658,8 +712,8 @@ def get_active_environments_info() -> Dict[str, Any]:
try:
size = sum(f.stat().st_size for f in Path(path).rglob('*') if f.is_file())
total_size += size
except OSError:
pass
except OSError as e:
logger.debug("Could not stat path %s: %s", path, e)
info["total_disk_usage_mb"] = round(total_size / (1024 * 1024), 2)
return info
@ -686,8 +740,8 @@ def cleanup_all_environments():
try:
shutil.rmtree(path, ignore_errors=True)
logger.info("Removed orphaned: %s", path)
except OSError:
pass
except OSError as e:
logger.debug("Failed to remove orphaned path %s: %s", path, e)
if cleaned > 0:
logger.info("Cleaned %d environments", cleaned)
@ -1080,9 +1134,14 @@ def check_terminal_requirements() -> bool:
return True
elif env_type == "docker":
from minisweagent.environments.docker import DockerEnvironment
# Check if docker is available
# Check if docker is available (use find_docker for macOS PATH issues)
from tools.environments.docker import find_docker
import subprocess
result = subprocess.run(["docker", "version"], capture_output=True, timeout=5)
docker = find_docker()
if not docker:
logger.error("Docker executable not found in PATH or common install locations")
return False
result = subprocess.run([docker, "version"], capture_output=True, timeout=5)
return result.returncode == 0
elif env_type == "singularity":
from minisweagent.environments.singularity import SingularityEnvironment

View file

@ -105,8 +105,17 @@ class TodoStore:
"cancelled": "[~]",
}
lines = ["[Your task list was preserved across context compression]"]
for item in self._items:
# Only inject pending/in_progress items — completed/cancelled ones
# cause the model to re-do finished work after compression.
active_items = [
item for item in self._items
if item["status"] in ("pending", "in_progress")
]
if not active_items:
return None
lines = ["[Your active task list was preserved across context compression]"]
for item in active_items:
marker = markers.get(item["status"], "[?]")
lines.append(f"- {marker} {item['id']}. {item['content']} ({item['status']})")

View file

@ -83,7 +83,11 @@ def _load_tts_config() -> Dict[str, Any]:
from hermes_cli.config import load_config
config = load_config()
return config.get("tts", {})
except Exception:
except ImportError:
logger.debug("hermes_cli.config not available, using default TTS config")
return {}
except Exception as e:
logger.warning("Failed to load TTS config: %s", e, exc_info=True)
return {}
@ -115,15 +119,23 @@ def _convert_to_opus(mp3_path: str) -> Optional[str]:
ogg_path = mp3_path.rsplit(".", 1)[0] + ".ogg"
try:
subprocess.run(
result = subprocess.run(
["ffmpeg", "-i", mp3_path, "-acodec", "libopus",
"-ac", "1", "-b:a", "64k", "-vbr", "off", ogg_path, "-y"],
capture_output=True, timeout=30,
)
if result.returncode != 0:
logger.warning("ffmpeg conversion failed with return code %d: %s",
result.returncode, result.stderr.decode('utf-8', errors='ignore')[:200])
return None
if os.path.exists(ogg_path) and os.path.getsize(ogg_path) > 0:
return ogg_path
except subprocess.TimeoutExpired:
logger.warning("ffmpeg OGG conversion timed out after 30s")
except FileNotFoundError:
logger.warning("ffmpeg not found in PATH")
except Exception as e:
logger.warning("ffmpeg OGG conversion failed: %s", e)
logger.warning("ffmpeg OGG conversion failed: %s", e, exc_info=True)
return None
@ -369,10 +381,21 @@ def text_to_speech_tool(
"voice_compatible": voice_compatible,
}, ensure_ascii=False)
except Exception as e:
error_msg = f"TTS generation failed ({provider}): {e}"
except ValueError as e:
# Configuration errors (missing API keys, etc.)
error_msg = f"TTS configuration error ({provider}): {e}"
logger.error("%s", error_msg)
return json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
except FileNotFoundError as e:
# Missing dependencies or files
error_msg = f"TTS dependency missing ({provider}): {e}"
logger.error("%s", error_msg, exc_info=True)
return json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
except Exception as e:
# Unexpected errors
error_msg = f"TTS generation failed ({provider}): {e}"
logger.error("%s", error_msg, exc_info=True)
return json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
# ===========================================================================

View file

@ -27,37 +27,21 @@ Usage:
)
"""
import asyncio
import base64
import json
import logging
import os
import asyncio
import uuid
import base64
from pathlib import Path
from typing import Dict, Any, Optional
from typing import Any, Awaitable, Dict, Optional
from urllib.parse import urlparse
import httpx
from openai import AsyncOpenAI
from agent.auxiliary_client import get_vision_auxiliary_client
from agent.auxiliary_client import async_call_llm
from tools.debug_helpers import DebugSession
logger = logging.getLogger(__name__)
# Resolve vision auxiliary client at module level; build an async wrapper.
_aux_sync_client, DEFAULT_VISION_MODEL = get_vision_auxiliary_client()
_aux_async_client: AsyncOpenAI | None = None
if _aux_sync_client is not None:
_async_kwargs = {
"api_key": _aux_sync_client.api_key,
"base_url": str(_aux_sync_client.base_url),
}
if "openrouter" in str(_aux_sync_client.base_url).lower():
_async_kwargs["default_headers"] = {
"HTTP-Referer": "https://github.com/NousResearch/hermes-agent",
"X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent",
}
_aux_async_client = AsyncOpenAI(**_async_kwargs)
_debug = DebugSession("vision_tools", env_var="VISION_TOOLS_DEBUG")
@ -73,15 +57,18 @@ def _validate_image_url(url: str) -> bool:
"""
if not url or not isinstance(url, str):
return False
# Check if it's a valid URL format
if not (url.startswith('http://') or url.startswith('https://')):
# Basic HTTP/HTTPS URL check
if not (url.startswith("http://") or url.startswith("https://")):
return False
# Check for common image extensions (optional, as URLs may not have extensions)
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg']
return True # Allow all HTTP/HTTPS URLs for flexibility
# Parse to ensure we at least have a network location; still allow URLs
# without file extensions (e.g. CDN endpoints that redirect to images).
parsed = urlparse(url)
if not parsed.netloc:
return False
return True # Allow all well-formed HTTP/HTTPS URLs for flexibility
async def _download_image(image_url: str, destination: Path, max_retries: int = 3) -> Path:
@ -131,7 +118,12 @@ async def _download_image(image_url: str, destination: Path, max_retries: int =
logger.warning("Retrying in %ss...", wait_time)
await asyncio.sleep(wait_time)
else:
logger.error("Image download failed after %s attempts: %s", max_retries, str(e)[:100])
logger.error(
"Image download failed after %s attempts: %s",
max_retries,
str(e)[:100],
exc_info=True,
)
raise last_error
@ -188,7 +180,7 @@ def _image_to_base64_data_url(image_path: Path, mime_type: Optional[str] = None)
async def vision_analyze_tool(
image_url: str,
user_prompt: str,
model: str = DEFAULT_VISION_MODEL
model: str = None,
) -> str:
"""
Analyze an image from a URL or local file path using vision AI.
@ -248,14 +240,6 @@ async def vision_analyze_tool(
logger.info("Analyzing image: %s", image_url[:60])
logger.info("User prompt: %s", user_prompt[:100])
# Check auxiliary vision client availability
if _aux_async_client is None or DEFAULT_VISION_MODEL is None:
return json.dumps({
"success": False,
"analysis": "Vision analysis unavailable: no auxiliary vision model configured. "
"Set OPENROUTER_API_KEY or configure Nous Portal to enable vision tools."
}, indent=2, ensure_ascii=False)
# Determine if this is a local file path or a remote URL
local_path = Path(image_url)
if local_path.is_file():
@ -311,18 +295,18 @@ async def vision_analyze_tool(
}
]
logger.info("Processing image with %s...", model)
logger.info("Processing image with vision model...")
# Call the vision API
from agent.auxiliary_client import get_auxiliary_extra_body, auxiliary_max_tokens_param
_extra = get_auxiliary_extra_body()
response = await _aux_async_client.chat.completions.create(
model=model,
messages=messages,
temperature=0.1,
**auxiliary_max_tokens_param(2000),
**({} if not _extra else {"extra_body": _extra}),
)
# Call the vision API via centralized router
call_kwargs = {
"task": "vision",
"messages": messages,
"temperature": 0.1,
"max_tokens": 2000,
}
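        # Only pin an explicit model when the caller overrides it; otherwise the
        # router is left to choose its configured default for the "vision" task
        # (assumption: async_call_llm does task-based model selection).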
if model:
call_kwargs["model"] = model
response = await async_call_llm(**call_kwargs)
# Extract the analysis
analysis = response.choices[0].message.content.strip()
@@ -347,12 +331,30 @@ async def vision_analyze_tool(
except Exception as e:
error_msg = f"Error analyzing image: {str(e)}"
logger.error("%s", error_msg)
logger.error("%s", error_msg, exc_info=True)
# Detect vision capability errors — give the model a clear message
# so it can inform the user instead of a cryptic API error.
err_str = str(e).lower()
if any(hint in err_str for hint in (
"does not support", "not support image", "invalid_request",
"content_policy", "image_url", "multimodal",
"unrecognized request argument", "image input",
)):
analysis = (
f"{model} does not support vision or our request was not "
f"accepted by the server. Error: {e}"
)
else:
analysis = (
"There was a problem with the request and the image could not "
f"be analyzed. Error: {e}"
)
# Prepare error response
result = {
"success": False,
"analysis": "There was a problem with the request and the image could not be analyzed."
"analysis": analysis,
}
debug_call_data["error"] = error_msg
@@ -368,12 +370,25 @@ async def vision_analyze_tool(
temp_image_path.unlink()
logger.debug("Cleaned up temporary image file")
except Exception as cleanup_error:
logger.warning("Could not delete temporary file: %s", cleanup_error)
logger.warning(
"Could not delete temporary file: %s", cleanup_error, exc_info=True
)
def check_vision_requirements() -> bool:
"""Check if an auxiliary vision model is available."""
return _aux_async_client is not None
try:
from agent.auxiliary_client import resolve_provider_client
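        # Probe each provider the auxiliary router can resolve; any available
        # client is treated as sufficient to enable the vision tools.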
client, _ = resolve_provider_client("openrouter")
if client is not None:
return True
client, _ = resolve_provider_client("nous")
if client is not None:
return True
client, _ = resolve_provider_client("custom")
return client is not None
except Exception:
return False
def get_debug_session_info() -> Dict[str, Any]:
@@ -401,10 +416,9 @@ if __name__ == "__main__":
print("Set OPENROUTER_API_KEY or configure Nous Portal to enable vision tools.")
exit(1)
else:
print(f"✅ Vision model available: {DEFAULT_VISION_MODEL}")
print("✅ Vision model available")
print("🛠️ Vision tools ready for use!")
print(f"🧠 Using model: {DEFAULT_VISION_MODEL}")
# Show debug mode status
if _debug.active:
@@ -464,13 +478,14 @@ VISION_ANALYZE_SCHEMA = {
}
def _handle_vision_analyze(args, **kw):
def _handle_vision_analyze(args: Dict[str, Any], **kw: Any) -> Awaitable[str]:
image_url = args.get("image_url", "")
question = args.get("question", "")
full_prompt = f"Fully describe and explain everything about this image, then answer the following question:\n\n{question}"
model = (os.getenv("AUXILIARY_VISION_MODEL", "").strip()
or DEFAULT_VISION_MODEL
or "google/gemini-3-flash-preview")
full_prompt = (
"Fully describe and explain everything about this image, then answer the "
f"following question:\n\n{question}"
)
model = os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
return vision_analyze_tool(image_url, full_prompt, model)
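    # Returned un-awaited (hence the Awaitable[str] annotation); presumably the
    # tool dispatcher awaits the coroutine when it executes the call.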

View file

@@ -47,8 +47,7 @@ import re
import asyncio
from typing import List, Dict, Any, Optional
from firecrawl import Firecrawl
from openai import AsyncOpenAI
from agent.auxiliary_client import get_async_text_auxiliary_client
from agent.auxiliary_client import async_call_llm
from tools.debug_helpers import DebugSession
logger = logging.getLogger(__name__)
@@ -83,15 +82,8 @@ def _get_firecrawl_client():
DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000
# Resolve async auxiliary client at module level.
# Handles Codex Responses API adapter transparently.
_aux_async_client, _DEFAULT_SUMMARIZER_MODEL = get_async_text_auxiliary_client("web_extract")
# Allow per-task override via config.yaml auxiliary.web_extract_model
DEFAULT_SUMMARIZER_MODEL = (
os.getenv("AUXILIARY_WEB_EXTRACT_MODEL", "").strip()
or _DEFAULT_SUMMARIZER_MODEL
)
# Allow per-task override via env var
DEFAULT_SUMMARIZER_MODEL = os.getenv("AUXILIARY_WEB_EXTRACT_MODEL", "").strip() or None
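# None means "no override": downstream calls omit the model argument so
# async_call_llm can fall back to its default for the "web_extract" task.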
_debug = DebugSession("web_tools", env_var="WEB_TOOLS_DEBUG")
@@ -249,22 +241,22 @@ Create a markdown summary that captures all key information in a well-organized,
for attempt in range(max_retries):
try:
if _aux_async_client is None:
logger.warning("No auxiliary model available for web content processing")
return None
from agent.auxiliary_client import get_auxiliary_extra_body, auxiliary_max_tokens_param
_extra = get_auxiliary_extra_body()
response = await _aux_async_client.chat.completions.create(
model=model,
messages=[
call_kwargs = {
"task": "web_extract",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1,
**auxiliary_max_tokens_param(max_tokens),
**({} if not _extra else {"extra_body": _extra}),
)
"temperature": 0.1,
"max_tokens": max_tokens,
}
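            # Pass a model only when explicitly overridden (e.g. via
            # AUXILIARY_WEB_EXTRACT_MODEL); otherwise the router's default for
            # the "web_extract" task applies.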
if model:
call_kwargs["model"] = model
response = await async_call_llm(**call_kwargs)
return response.choices[0].message.content.strip()
except RuntimeError:
logger.warning("No auxiliary model available for web content processing")
return None
except Exception as api_error:
last_error = api_error
if attempt < max_retries - 1:
@@ -368,25 +360,18 @@ Synthesize these into ONE cohesive, comprehensive summary that:
Create a single, unified markdown summary."""
try:
if _aux_async_client is None:
logger.warning("No auxiliary model for synthesis, concatenating summaries")
fallback = "\n\n".join(summaries)
if len(fallback) > max_output_size:
fallback = fallback[:max_output_size] + "\n\n[... truncated ...]"
return fallback
from agent.auxiliary_client import get_auxiliary_extra_body, auxiliary_max_tokens_param
_extra = get_auxiliary_extra_body()
response = await _aux_async_client.chat.completions.create(
model=model,
messages=[
call_kwargs = {
"task": "web_extract",
"messages": [
{"role": "system", "content": "You synthesize multiple summaries into one cohesive, comprehensive summary. Be thorough but concise."},
{"role": "user", "content": synthesis_prompt}
],
temperature=0.1,
**auxiliary_max_tokens_param(20000),
**({} if not _extra else {"extra_body": _extra}),
)
"temperature": 0.1,
"max_tokens": 20000,
}
if model:
call_kwargs["model"] = model
response = await async_call_llm(**call_kwargs)
final_summary = response.choices[0].message.content.strip()
# Enforce hard cap
@@ -713,8 +698,8 @@ async def web_extract_tool(
debug_call_data["pages_extracted"] = pages_extracted
debug_call_data["original_response_size"] = len(json.dumps(response))
# Process each result with LLM if enabled and auxiliary client is available
if use_llm_processing and _aux_async_client is not None:
# Process each result with LLM if enabled
if use_llm_processing:
logger.info("Processing extracted content with LLM (parallel)...")
debug_call_data["processing_applied"].append("llm_processing")
@@ -780,10 +765,6 @@ async def web_extract_tool(
else:
logger.warning("%s (no content to process)", url)
else:
if use_llm_processing and _aux_async_client is None:
logger.warning("LLM processing requested but no auxiliary model available, returning raw content")
debug_call_data["processing_applied"].append("llm_processing_unavailable")
# Print summary of extracted pages for debugging (original behavior)
for result in response.get('results', []):
url = result.get('url', 'Unknown URL')
@@ -1013,8 +994,8 @@ async def web_crawl_tool(
debug_call_data["pages_crawled"] = pages_crawled
debug_call_data["original_response_size"] = len(json.dumps(response))
# Process each result with LLM if enabled and auxiliary client is available
if use_llm_processing and _aux_async_client is not None:
# Process each result with LLM if enabled
if use_llm_processing:
logger.info("Processing crawled content with LLM (parallel)...")
debug_call_data["processing_applied"].append("llm_processing")
@@ -1080,10 +1061,6 @@ async def web_crawl_tool(
else:
logger.warning("%s (no content to process)", page_url)
else:
if use_llm_processing and _aux_async_client is None:
logger.warning("LLM processing requested but no auxiliary model available, returning raw content")
debug_call_data["processing_applied"].append("llm_processing_unavailable")
# Print summary of crawled pages for debugging (original behavior)
for result in response.get('results', []):
page_url = result.get('url', 'Unknown URL')
@@ -1138,7 +1115,15 @@ def check_firecrawl_api_key() -> bool:
def check_auxiliary_model() -> bool:
"""Check if an auxiliary text model is available for LLM content processing."""
return _aux_async_client is not None
try:
from agent.auxiliary_client import resolve_provider_client
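        # Probe every provider the router supports; one resolvable client is
        # enough to enable LLM post-processing of fetched pages.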
for p in ("openrouter", "nous", "custom", "codex"):
client, _ = resolve_provider_client(p)
if client is not None:
return True
return False
except Exception:
return False
def get_debug_session_info() -> Dict[str, Any]: