Hermes Agent UX Improvements

This commit is contained in:
teknium1 2026-02-22 02:16:11 -08:00
parent b1f55e3ee5
commit ededaaa874
23 changed files with 945 additions and 1545 deletions

128
agent/auxiliary_client.py Normal file
View file

@ -0,0 +1,128 @@
"""Shared auxiliary OpenAI client for cheap/fast side tasks.
Provides a single resolution chain so every consumer (context compression,
session search, web extraction, vision analysis, browser vision) picks up
the best available backend without duplicating fallback logic.
Resolution order for text tasks:
1. OpenRouter (OPENROUTER_API_KEY)
2. Nous Portal (~/.hermes/auth.json active provider)
3. Custom endpoint (OPENAI_BASE_URL + OPENAI_API_KEY)
4. None
Resolution order for vision/multimodal tasks:
1. OpenRouter
2. Nous Portal
3. None (custom endpoints can't substitute for Gemini multimodal)
"""
import json
import logging
import os
from pathlib import Path
from typing import Optional, Tuple
from openai import OpenAI
from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
# Default auxiliary models per provider
_OPENROUTER_MODEL = "google/gemini-3-flash-preview"
_NOUS_MODEL = "gemini-3-flash"
_NOUS_DEFAULT_BASE_URL = "https://inference-api.nousresearch.com/v1"
_AUTH_JSON_PATH = Path.home() / ".hermes" / "auth.json"
def _read_nous_auth() -> Optional[dict]:
"""Read and validate ~/.hermes/auth.json for an active Nous provider.
Returns the provider state dict if Nous is active with tokens,
otherwise None.
"""
try:
if not _AUTH_JSON_PATH.is_file():
return None
data = json.loads(_AUTH_JSON_PATH.read_text())
if data.get("active_provider") != "nous":
return None
provider = data.get("providers", {}).get("nous", {})
# Must have at least an access_token or agent_key
if not provider.get("agent_key") and not provider.get("access_token"):
return None
return provider
except Exception as exc:
logger.debug("Could not read Nous auth: %s", exc)
return None
def _nous_api_key(provider: dict) -> str:
"""Extract the best API key from a Nous provider state dict."""
return provider.get("agent_key") or provider.get("access_token", "")
def _nous_base_url() -> str:
"""Resolve the Nous inference base URL from env or default."""
return os.getenv("NOUS_INFERENCE_BASE_URL", _NOUS_DEFAULT_BASE_URL)
# ── Public API ──────────────────────────────────────────────────────────────
def get_text_auxiliary_client() -> Tuple[Optional[OpenAI], Optional[str]]:
"""Return (client, model_slug) for text-only auxiliary tasks.
Falls through OpenRouter -> Nous Portal -> custom endpoint -> (None, None).
"""
# 1. OpenRouter
or_key = os.getenv("OPENROUTER_API_KEY")
if or_key:
logger.debug("Auxiliary text client: OpenRouter")
return OpenAI(api_key=or_key, base_url=OPENROUTER_BASE_URL), _OPENROUTER_MODEL
# 2. Nous Portal
nous = _read_nous_auth()
if nous:
logger.debug("Auxiliary text client: Nous Portal")
return (
OpenAI(api_key=_nous_api_key(nous), base_url=_nous_base_url()),
_NOUS_MODEL,
)
# 3. Custom endpoint (both base URL and key must be set)
custom_base = os.getenv("OPENAI_BASE_URL")
custom_key = os.getenv("OPENAI_API_KEY")
if custom_base and custom_key:
model = os.getenv("OPENAI_MODEL") or os.getenv("LLM_MODEL") or "gpt-4o-mini"
logger.debug("Auxiliary text client: custom endpoint (%s)", model)
return OpenAI(api_key=custom_key, base_url=custom_base), model
# 4. Nothing available
logger.debug("Auxiliary text client: none available")
return None, None
def get_vision_auxiliary_client() -> Tuple[Optional[OpenAI], Optional[str]]:
"""Return (client, model_slug) for vision/multimodal auxiliary tasks.
Only OpenRouter and Nous Portal qualify custom endpoints cannot
substitute for Gemini multimodal.
"""
# 1. OpenRouter
or_key = os.getenv("OPENROUTER_API_KEY")
if or_key:
logger.debug("Auxiliary vision client: OpenRouter")
return OpenAI(api_key=or_key, base_url=OPENROUTER_BASE_URL), _OPENROUTER_MODEL
# 2. Nous Portal
nous = _read_nous_auth()
if nous:
logger.debug("Auxiliary vision client: Nous Portal")
return (
OpenAI(api_key=_nous_api_key(nous), base_url=_nous_base_url()),
_NOUS_MODEL,
)
# 3. Nothing suitable
logger.debug("Auxiliary vision client: none available")
return None, None

View file

@ -9,13 +9,11 @@ import logging
import os
from typing import Any, Dict, List
from openai import OpenAI
from agent.auxiliary_client import get_text_auxiliary_client
from agent.model_metadata import (
get_model_context_length,
estimate_messages_tokens_rough,
)
from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
@ -31,7 +29,6 @@ class ContextCompressor:
self,
model: str,
threshold_percent: float = 0.85,
summary_model: str = "google/gemini-3-flash-preview",
protect_first_n: int = 3,
protect_last_n: int = 4,
summary_target_tokens: int = 500,
@ -39,7 +36,6 @@ class ContextCompressor:
):
self.model = model
self.threshold_percent = threshold_percent
self.summary_model = summary_model
self.protect_first_n = protect_first_n
self.protect_last_n = protect_last_n
self.summary_target_tokens = summary_target_tokens
@ -53,8 +49,7 @@ class ContextCompressor:
self.last_completion_tokens = 0
self.last_total_tokens = 0
api_key = os.getenv("OPENROUTER_API_KEY", "")
self.client = OpenAI(api_key=api_key, base_url=OPENROUTER_BASE_URL) if api_key else None
self.client, self.summary_model = get_text_auxiliary_client()
def update_from_response(self, usage: Dict[str, Any]):
"""Update tracked token usage from API response."""
@ -155,6 +150,26 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
if not self.quiet_mode:
print(f"\n📦 Context compression triggered ({display_tokens:,} tokens ≥ {self.threshold_tokens:,} threshold)")
print(f" 📊 Model context limit: {self.context_length:,} tokens ({self.threshold_percent*100:.0f}% = {self.threshold_tokens:,})")
# Truncation fallback when no auxiliary model is available
if self.client is None:
print("⚠️ Context compression: no auxiliary model available. Falling back to message truncation.")
# Keep system message(s) at the front and the protected tail;
# simply drop the oldest non-system messages until under threshold.
kept = []
for msg in messages:
if msg.get("role") == "system":
kept.append(msg.copy())
else:
break
tail = messages[-self.protect_last_n:]
kept.extend(m.copy() for m in tail)
self.compression_count += 1
if not self.quiet_mode:
print(f" ✂️ Truncated: {len(messages)}{len(kept)} messages (dropped middle turns)")
return kept
if not self.quiet_mode:
print(f" 🗜️ Summarizing turns {compress_start+1}-{compress_end} ({len(turns_to_summarize)} turns)")
summary = self._generate_summary(turns_to_summarize)

View file

@ -4,11 +4,16 @@ Pure display functions and classes with no AIAgent dependency.
Used by AIAgent._execute_tool_calls for CLI feedback.
"""
import json
import os
import random
import threading
import time
# ANSI escape codes for coloring tool failure indicators
_RED = "\033[31m"
_RESET = "\033[0m"
# =========================================================================
# Tool preview (one-line summary of a tool call's primary argument)
@ -242,12 +247,46 @@ KAWAII_GENERIC = [
# Cute tool message (completion line that replaces the spinner)
# =========================================================================
def get_cute_tool_message(tool_name: str, args: dict, duration: float) -> str:
def _detect_tool_failure(tool_name: str, result: str | None) -> tuple[bool, str]:
"""Inspect a tool result string for signs of failure.
Returns ``(is_failure, suffix)`` where *suffix* is an informational tag
like ``" [exit 1]"`` for terminal failures, or ``" [error]"`` for generic
failures. On success, returns ``(False, "")``.
"""
if result is None:
return False, ""
if tool_name == "terminal":
try:
data = json.loads(result)
exit_code = data.get("exit_code")
if exit_code is not None and exit_code != 0:
return True, f" [exit {exit_code}]"
except (json.JSONDecodeError, TypeError, AttributeError):
pass
return False, ""
# Generic heuristic for non-terminal tools
lower = result[:500].lower()
if '"error"' in lower or '"failed"' in lower or result.startswith("Error"):
return True, " [error]"
return False, ""
def get_cute_tool_message(
tool_name: str, args: dict, duration: float, result: str | None = None,
) -> str:
"""Generate a formatted tool completion line for CLI quiet mode.
Format: ``| {emoji} {verb:9} {detail} {duration}``
When *result* is provided the line is checked for failure indicators.
Failed tool calls get a red prefix and an informational suffix.
"""
dur = f"{duration:.1f}s"
is_failure, failure_suffix = _detect_tool_failure(tool_name, result)
def _trunc(s, n=40):
s = str(s)
@ -257,105 +296,111 @@ def get_cute_tool_message(tool_name: str, args: dict, duration: float) -> str:
p = str(p)
return ("..." + p[-(n-3):]) if len(p) > n else p
def _wrap(line: str) -> str:
"""Apply red coloring and failure suffix when the tool failed."""
if not is_failure:
return line
return f"{_RED}{line}{failure_suffix}{_RESET}"
if tool_name == "web_search":
return f"┊ 🔍 search {_trunc(args.get('query', ''), 42)} {dur}"
return _wrap(f"┊ 🔍 search {_trunc(args.get('query', ''), 42)} {dur}")
if tool_name == "web_extract":
urls = args.get("urls", [])
if urls:
url = urls[0] if isinstance(urls, list) else str(urls)
domain = url.replace("https://", "").replace("http://", "").split("/")[0]
extra = f" +{len(urls)-1}" if len(urls) > 1 else ""
return f"┊ 📄 fetch {_trunc(domain, 35)}{extra} {dur}"
return f"┊ 📄 fetch pages {dur}"
return _wrap(f"┊ 📄 fetch {_trunc(domain, 35)}{extra} {dur}")
return _wrap(f"┊ 📄 fetch pages {dur}")
if tool_name == "web_crawl":
url = args.get("url", "")
domain = url.replace("https://", "").replace("http://", "").split("/")[0]
return f"┊ 🕸️ crawl {_trunc(domain, 35)} {dur}"
return _wrap(f"┊ 🕸️ crawl {_trunc(domain, 35)} {dur}")
if tool_name == "terminal":
return f"┊ 💻 $ {_trunc(args.get('command', ''), 42)} {dur}"
return _wrap(f"┊ 💻 $ {_trunc(args.get('command', ''), 42)} {dur}")
if tool_name == "process":
action = args.get("action", "?")
sid = args.get("session_id", "")[:12]
labels = {"list": "ls processes", "poll": f"poll {sid}", "log": f"log {sid}",
"wait": f"wait {sid}", "kill": f"kill {sid}", "write": f"write {sid}", "submit": f"submit {sid}"}
return f"┊ ⚙️ proc {labels.get(action, f'{action} {sid}')} {dur}"
return _wrap(f"┊ ⚙️ proc {labels.get(action, f'{action} {sid}')} {dur}")
if tool_name == "read_file":
return f"┊ 📖 read {_path(args.get('path', ''))} {dur}"
return _wrap(f"┊ 📖 read {_path(args.get('path', ''))} {dur}")
if tool_name == "write_file":
return f"┊ ✍️ write {_path(args.get('path', ''))} {dur}"
return _wrap(f"┊ ✍️ write {_path(args.get('path', ''))} {dur}")
if tool_name == "patch":
return f"┊ 🔧 patch {_path(args.get('path', ''))} {dur}"
return _wrap(f"┊ 🔧 patch {_path(args.get('path', ''))} {dur}")
if tool_name == "search_files":
pattern = _trunc(args.get("pattern", ""), 35)
target = args.get("target", "content")
verb = "find" if target == "files" else "grep"
return f"┊ 🔎 {verb:9} {pattern} {dur}"
return _wrap(f"┊ 🔎 {verb:9} {pattern} {dur}")
if tool_name == "browser_navigate":
url = args.get("url", "")
domain = url.replace("https://", "").replace("http://", "").split("/")[0]
return f"┊ 🌐 navigate {_trunc(domain, 35)} {dur}"
return _wrap(f"┊ 🌐 navigate {_trunc(domain, 35)} {dur}")
if tool_name == "browser_snapshot":
mode = "full" if args.get("full") else "compact"
return f"┊ 📸 snapshot {mode} {dur}"
return _wrap(f"┊ 📸 snapshot {mode} {dur}")
if tool_name == "browser_click":
return f"┊ 👆 click {args.get('ref', '?')} {dur}"
return _wrap(f"┊ 👆 click {args.get('ref', '?')} {dur}")
if tool_name == "browser_type":
return f"┊ ⌨️ type \"{_trunc(args.get('text', ''), 30)}\" {dur}"
return _wrap(f"┊ ⌨️ type \"{_trunc(args.get('text', ''), 30)}\" {dur}")
if tool_name == "browser_scroll":
d = args.get("direction", "down")
arrow = {"down": "", "up": "", "right": "", "left": ""}.get(d, "")
return f"{arrow} scroll {d} {dur}"
return _wrap(f"{arrow} scroll {d} {dur}")
if tool_name == "browser_back":
return f"┊ ◀️ back {dur}"
return _wrap(f"┊ ◀️ back {dur}")
if tool_name == "browser_press":
return f"┊ ⌨️ press {args.get('key', '?')} {dur}"
return _wrap(f"┊ ⌨️ press {args.get('key', '?')} {dur}")
if tool_name == "browser_close":
return f"┊ 🚪 close browser {dur}"
return _wrap(f"┊ 🚪 close browser {dur}")
if tool_name == "browser_get_images":
return f"┊ 🖼️ images extracting {dur}"
return _wrap(f"┊ 🖼️ images extracting {dur}")
if tool_name == "browser_vision":
return f"┊ 👁️ vision analyzing page {dur}"
return _wrap(f"┊ 👁️ vision analyzing page {dur}")
if tool_name == "todo":
todos_arg = args.get("todos")
merge = args.get("merge", False)
if todos_arg is None:
return f"┊ 📋 plan reading tasks {dur}"
return _wrap(f"┊ 📋 plan reading tasks {dur}")
elif merge:
return f"┊ 📋 plan update {len(todos_arg)} task(s) {dur}"
return _wrap(f"┊ 📋 plan update {len(todos_arg)} task(s) {dur}")
else:
return f"┊ 📋 plan {len(todos_arg)} task(s) {dur}"
return _wrap(f"┊ 📋 plan {len(todos_arg)} task(s) {dur}")
if tool_name == "session_search":
return f"┊ 🔍 recall \"{_trunc(args.get('query', ''), 35)}\" {dur}"
return _wrap(f"┊ 🔍 recall \"{_trunc(args.get('query', ''), 35)}\" {dur}")
if tool_name == "memory":
action = args.get("action", "?")
target = args.get("target", "")
if action == "add":
return f"┊ 🧠 memory +{target}: \"{_trunc(args.get('content', ''), 30)}\" {dur}"
return _wrap(f"┊ 🧠 memory +{target}: \"{_trunc(args.get('content', ''), 30)}\" {dur}")
elif action == "replace":
return f"┊ 🧠 memory ~{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}"
return _wrap(f"┊ 🧠 memory ~{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}")
elif action == "remove":
return f"┊ 🧠 memory -{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}"
return f"┊ 🧠 memory {action} {dur}"
return _wrap(f"┊ 🧠 memory -{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}")
return _wrap(f"┊ 🧠 memory {action} {dur}")
if tool_name == "skills_list":
return f"┊ 📚 skills list {args.get('category', 'all')} {dur}"
return _wrap(f"┊ 📚 skills list {args.get('category', 'all')} {dur}")
if tool_name == "skill_view":
return f"┊ 📚 skill {_trunc(args.get('name', ''), 30)} {dur}"
return _wrap(f"┊ 📚 skill {_trunc(args.get('name', ''), 30)} {dur}")
if tool_name == "image_generate":
return f"┊ 🎨 create {_trunc(args.get('prompt', ''), 35)} {dur}"
return _wrap(f"┊ 🎨 create {_trunc(args.get('prompt', ''), 35)} {dur}")
if tool_name == "text_to_speech":
return f"┊ 🔊 speak {_trunc(args.get('text', ''), 30)} {dur}"
return _wrap(f"┊ 🔊 speak {_trunc(args.get('text', ''), 30)} {dur}")
if tool_name == "vision_analyze":
return f"┊ 👁️ vision {_trunc(args.get('question', ''), 30)} {dur}"
return _wrap(f"┊ 👁️ vision {_trunc(args.get('question', ''), 30)} {dur}")
if tool_name == "mixture_of_agents":
return f"┊ 🧠 reason {_trunc(args.get('user_prompt', ''), 30)} {dur}"
return _wrap(f"┊ 🧠 reason {_trunc(args.get('user_prompt', ''), 30)} {dur}")
if tool_name == "send_message":
return f"┊ 📨 send {args.get('target', '?')}: \"{_trunc(args.get('message', ''), 25)}\" {dur}"
return _wrap(f"┊ 📨 send {args.get('target', '?')}: \"{_trunc(args.get('message', ''), 25)}\" {dur}")
if tool_name == "schedule_cronjob":
return f"┊ ⏰ schedule {_trunc(args.get('name', args.get('prompt', 'task')), 30)} {dur}"
return _wrap(f"┊ ⏰ schedule {_trunc(args.get('name', args.get('prompt', 'task')), 30)} {dur}")
if tool_name == "list_cronjobs":
return f"┊ ⏰ jobs listing {dur}"
return _wrap(f"┊ ⏰ jobs listing {dur}")
if tool_name == "remove_cronjob":
return f"┊ ⏰ remove job {args.get('job_id', '?')} {dur}"
return _wrap(f"┊ ⏰ remove job {args.get('job_id', '?')} {dur}")
if tool_name.startswith("rl_"):
rl = {
"rl_list_environments": "list envs", "rl_select_environment": f"select {args.get('name', '')}",
@ -364,16 +409,16 @@ def get_cute_tool_message(tool_name: str, args: dict, duration: float) -> str:
"rl_stop_training": f"stop {args.get('run_id', '?')[:12]}", "rl_get_results": f"results {args.get('run_id', '?')[:12]}",
"rl_list_runs": "list runs", "rl_test_inference": "test inference",
}
return f"┊ 🧪 rl {rl.get(tool_name, tool_name.replace('rl_', ''))} {dur}"
return _wrap(f"┊ 🧪 rl {rl.get(tool_name, tool_name.replace('rl_', ''))} {dur}")
if tool_name == "execute_code":
code = args.get("code", "")
first_line = code.strip().split("\n")[0] if code.strip() else ""
return f"┊ 🐍 exec {_trunc(first_line, 35)} {dur}"
return _wrap(f"┊ 🐍 exec {_trunc(first_line, 35)} {dur}")
if tool_name == "delegate_task":
tasks = args.get("tasks")
if tasks and isinstance(tasks, list):
return f"┊ 🔀 delegate {len(tasks)} parallel tasks {dur}"
return f"┊ 🔀 delegate {_trunc(args.get('goal', ''), 35)} {dur}"
return _wrap(f"┊ 🔀 delegate {len(tasks)} parallel tasks {dur}")
return _wrap(f"┊ 🔀 delegate {_trunc(args.get('goal', ''), 35)} {dur}")
preview = build_tool_preview(tool_name, args) or ""
return f"┊ ⚡ {tool_name[:9]:9} {_trunc(preview, 35)} {dur}"
return _wrap(f"┊ ⚡ {tool_name[:9]:9} {_trunc(preview, 35)} {dur}")