Refactor Terminal and AIAgent cleanup

This commit is contained in:
teknium1 2026-02-21 22:31:43 -08:00
parent 9018e9dd70
commit 9123cfb5dd
17 changed files with 1842 additions and 976 deletions

6
agent/__init__.py Normal file
View file

@ -0,0 +1,6 @@
"""Agent internals -- extracted modules from run_agent.py.
These modules contain pure utility functions and self-contained classes
that were previously embedded in the 3,600-line run_agent.py. Extracting
them makes run_agent.py focused on the AIAgent orchestrator class.
"""

182
agent/context_compressor.py Normal file
View file

@ -0,0 +1,182 @@
"""Automatic context window compression for long conversations.
Self-contained class with its own OpenAI client for summarization.
Uses Gemini Flash (cheap/fast) to summarize middle turns while
protecting head and tail context.
"""
import logging
import os
from typing import Any, Dict, List
from openai import OpenAI
from agent.model_metadata import (
get_model_context_length,
estimate_messages_tokens_rough,
)
from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
class ContextCompressor:
"""Compresses conversation context when approaching the model's context limit.
Algorithm: protect first N + last N turns, summarize everything in between.
Token tracking uses actual counts from API responses for accuracy.
"""
def __init__(
self,
model: str,
threshold_percent: float = 0.85,
summary_model: str = "google/gemini-3-flash-preview",
protect_first_n: int = 3,
protect_last_n: int = 4,
summary_target_tokens: int = 500,
quiet_mode: bool = False,
):
self.model = model
self.threshold_percent = threshold_percent
self.summary_model = summary_model
self.protect_first_n = protect_first_n
self.protect_last_n = protect_last_n
self.summary_target_tokens = summary_target_tokens
self.quiet_mode = quiet_mode
self.context_length = get_model_context_length(model)
self.threshold_tokens = int(self.context_length * threshold_percent)
self.compression_count = 0
self.last_prompt_tokens = 0
self.last_completion_tokens = 0
self.last_total_tokens = 0
api_key = os.getenv("OPENROUTER_API_KEY", "")
self.client = OpenAI(api_key=api_key, base_url=OPENROUTER_BASE_URL) if api_key else None
def update_from_response(self, usage: Dict[str, Any]):
"""Update tracked token usage from API response."""
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
self.last_completion_tokens = usage.get("completion_tokens", 0)
self.last_total_tokens = usage.get("total_tokens", 0)
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Check if context exceeds the compression threshold."""
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
return tokens >= self.threshold_tokens
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
"""Quick pre-flight check using rough estimate (before API call)."""
rough_estimate = estimate_messages_tokens_rough(messages)
return rough_estimate >= self.threshold_tokens
def get_status(self) -> Dict[str, Any]:
"""Get current compression status for display/logging."""
return {
"last_prompt_tokens": self.last_prompt_tokens,
"threshold_tokens": self.threshold_tokens,
"context_length": self.context_length,
"usage_percent": (self.last_prompt_tokens / self.context_length * 100) if self.context_length else 0,
"compression_count": self.compression_count,
}
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> str:
"""Generate a concise summary of conversation turns using a fast model."""
if not self.client:
return "[CONTEXT SUMMARY]: Previous conversation turns have been compressed to save space. The assistant performed various actions and received responses."
parts = []
for msg in turns_to_summarize:
role = msg.get("role", "unknown")
content = msg.get("content", "")
if len(content) > 2000:
content = content[:1000] + "\n...[truncated]...\n" + content[-500:]
tool_calls = msg.get("tool_calls", [])
if tool_calls:
tool_names = [tc.get("function", {}).get("name", "?") for tc in tool_calls if isinstance(tc, dict)]
content += f"\n[Tool calls: {', '.join(tool_names)}]"
parts.append(f"[{role.upper()}]: {content}")
content_to_summarize = "\n\n".join(parts)
prompt = f"""Summarize these conversation turns concisely. This summary will replace these turns in the conversation history.
Write from a neutral perspective describing:
1. What actions were taken (tool calls, searches, file operations)
2. Key information or results obtained
3. Important decisions or findings
4. Relevant data, file names, or outputs
Keep factual and informative. Target ~{self.summary_target_tokens} tokens.
---
TURNS TO SUMMARIZE:
{content_to_summarize}
---
Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
try:
response = self.client.chat.completions.create(
model=self.summary_model,
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=self.summary_target_tokens * 2,
timeout=30.0,
)
summary = response.choices[0].message.content.strip()
if not summary.startswith("[CONTEXT SUMMARY]:"):
summary = "[CONTEXT SUMMARY]: " + summary
return summary
except Exception as e:
logging.warning(f"Failed to generate context summary: {e}")
return "[CONTEXT SUMMARY]: Previous conversation turns have been compressed. The assistant performed tool calls and received responses."
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
"""Compress conversation messages by summarizing middle turns.
Keeps first N + last N turns, summarizes everything in between.
"""
n_messages = len(messages)
if n_messages <= self.protect_first_n + self.protect_last_n + 1:
if not self.quiet_mode:
print(f"⚠️ Cannot compress: only {n_messages} messages (need > {self.protect_first_n + self.protect_last_n + 1})")
return messages
compress_start = self.protect_first_n
compress_end = n_messages - self.protect_last_n
if compress_start >= compress_end:
return messages
turns_to_summarize = messages[compress_start:compress_end]
display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)
if not self.quiet_mode:
print(f"\n📦 Context compression triggered ({display_tokens:,} tokens ≥ {self.threshold_tokens:,} threshold)")
print(f" 📊 Model context limit: {self.context_length:,} tokens ({self.threshold_percent*100:.0f}% = {self.threshold_tokens:,})")
print(f" 🗜️ Summarizing turns {compress_start+1}-{compress_end} ({len(turns_to_summarize)} turns)")
summary = self._generate_summary(turns_to_summarize)
compressed = []
for i in range(compress_start):
msg = messages[i].copy()
if i == 0 and msg.get("role") == "system" and self.compression_count == 0:
msg["content"] = msg.get("content", "") + "\n\n[Note: Some earlier conversation turns may be summarized to preserve context space.]"
compressed.append(msg)
compressed.append({"role": "user", "content": summary})
for i in range(compress_end, n_messages):
compressed.append(messages[i].copy())
self.compression_count += 1
if not self.quiet_mode:
new_estimate = estimate_messages_tokens_rough(compressed)
saved_estimate = display_tokens - new_estimate
print(f" ✅ Compressed: {n_messages}{len(compressed)} messages (~{saved_estimate:,} tokens saved)")
print(f" 💡 Compression #{self.compression_count} complete")
return compressed

379
agent/display.py Normal file
View file

@ -0,0 +1,379 @@
"""CLI presentation -- spinner, kawaii faces, tool preview formatting.
Pure display functions and classes with no AIAgent dependency.
Used by AIAgent._execute_tool_calls for CLI feedback.
"""
import os
import random
import threading
import time
# =========================================================================
# Tool preview (one-line summary of a tool call's primary argument)
# =========================================================================
def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
"""Build a short preview of a tool call's primary argument for display."""
primary_args = {
"terminal": "command", "web_search": "query", "web_extract": "urls",
"read_file": "path", "write_file": "path", "patch": "path",
"search_files": "pattern", "browser_navigate": "url",
"browser_click": "ref", "browser_type": "text",
"image_generate": "prompt", "text_to_speech": "text",
"vision_analyze": "question", "mixture_of_agents": "user_prompt",
"skill_view": "name", "skills_list": "category",
"schedule_cronjob": "name",
}
if tool_name == "process":
action = args.get("action", "")
sid = args.get("session_id", "")
data = args.get("data", "")
timeout_val = args.get("timeout")
parts = [action]
if sid:
parts.append(sid[:16])
if data:
parts.append(f'"{data[:20]}"')
if timeout_val and action == "wait":
parts.append(f"{timeout_val}s")
return " ".join(parts) if parts else None
if tool_name == "todo":
todos_arg = args.get("todos")
merge = args.get("merge", False)
if todos_arg is None:
return "reading task list"
elif merge:
return f"updating {len(todos_arg)} task(s)"
else:
return f"planning {len(todos_arg)} task(s)"
if tool_name == "session_search":
query = args.get("query", "")
return f"recall: \"{query[:25]}{'...' if len(query) > 25 else ''}\""
if tool_name == "memory":
action = args.get("action", "")
target = args.get("target", "")
if action == "add":
content = args.get("content", "")
return f"+{target}: \"{content[:25]}{'...' if len(content) > 25 else ''}\""
elif action == "replace":
return f"~{target}: \"{args.get('old_text', '')[:20]}\""
elif action == "remove":
return f"-{target}: \"{args.get('old_text', '')[:20]}\""
return action
if tool_name == "send_message":
target = args.get("target", "?")
msg = args.get("message", "")
if len(msg) > 20:
msg = msg[:17] + "..."
return f"to {target}: \"{msg}\""
if tool_name.startswith("rl_"):
rl_previews = {
"rl_list_environments": "listing envs",
"rl_select_environment": args.get("name", ""),
"rl_get_current_config": "reading config",
"rl_edit_config": f"{args.get('field', '')}={args.get('value', '')}",
"rl_start_training": "starting",
"rl_check_status": args.get("run_id", "")[:16],
"rl_stop_training": f"stopping {args.get('run_id', '')[:16]}",
"rl_get_results": args.get("run_id", "")[:16],
"rl_list_runs": "listing runs",
"rl_test_inference": f"{args.get('num_steps', 3)} steps",
}
return rl_previews.get(tool_name)
key = primary_args.get(tool_name)
if not key:
for fallback_key in ("query", "text", "command", "path", "name", "prompt"):
if fallback_key in args:
key = fallback_key
break
if not key or key not in args:
return None
value = args[key]
if isinstance(value, list):
value = value[0] if value else ""
preview = str(value).strip()
if not preview:
return None
if len(preview) > max_len:
preview = preview[:max_len - 3] + "..."
return preview
# =========================================================================
# KawaiiSpinner
# =========================================================================
class KawaiiSpinner:
"""Animated spinner with kawaii faces for CLI feedback during tool execution."""
SPINNERS = {
'dots': ['', '', '', '', '', '', '', '', '', ''],
'bounce': ['', '', '', '', '', '', '', ''],
'grow': ['', '', '', '', '', '', '', '', '', '', '', '', '', ''],
'arrows': ['', '', '', '', '', '', '', ''],
'star': ['', '', '', '', '', '', '', ''],
'moon': ['🌑', '🌒', '🌓', '🌔', '🌕', '🌖', '🌗', '🌘'],
'pulse': ['', '', '', '', '', ''],
'brain': ['🧠', '💭', '💡', '', '💫', '🌟', '💡', '💭'],
'sparkle': ['', '˚', '*', '', '', '', '*', '˚'],
}
KAWAII_WAITING = [
"(。◕‿◕。)", "(◕‿◕✿)", "٩(◕‿◕。)۶", "(✿◠‿◠)", "( ˘▽˘)っ",
"♪(´ε` )", "(◕ᴗ◕✿)", "ヾ(^∇^)", "(≧◡≦)", "(★ω★)",
]
KAWAII_THINKING = [
"(。•́︿•̀。)", "(◔_◔)", "(¬‿¬)", "( •_•)>⌐■-■", "(⌐■_■)",
"(´・_・`)", "◉_◉", "(°ロ°)", "( ˘⌣˘)♡", "ヽ(>∀<☆)☆",
"٩(๑❛ᴗ❛๑)۶", "(⊙_⊙)", "(¬_¬)", "( ͡° ͜ʖ ͡°)", "ಠ_ಠ",
]
THINKING_VERBS = [
"pondering", "contemplating", "musing", "cogitating", "ruminating",
"deliberating", "mulling", "reflecting", "processing", "reasoning",
"analyzing", "computing", "synthesizing", "formulating", "brainstorming",
]
def __init__(self, message: str = "", spinner_type: str = 'dots'):
self.message = message
self.spinner_frames = self.SPINNERS.get(spinner_type, self.SPINNERS['dots'])
self.running = False
self.thread = None
self.frame_idx = 0
self.start_time = None
self.last_line_len = 0
def _animate(self):
while self.running:
if os.getenv("HERMES_SPINNER_PAUSE"):
time.sleep(0.1)
continue
frame = self.spinner_frames[self.frame_idx % len(self.spinner_frames)]
elapsed = time.time() - self.start_time
line = f" {frame} {self.message} ({elapsed:.1f}s)"
clear = '\r' + ' ' * self.last_line_len + '\r'
print(clear + line, end='', flush=True)
self.last_line_len = len(line)
self.frame_idx += 1
time.sleep(0.12)
def start(self):
if self.running:
return
self.running = True
self.start_time = time.time()
self.thread = threading.Thread(target=self._animate, daemon=True)
self.thread.start()
def update_text(self, new_message: str):
self.message = new_message
def stop(self, final_message: str = None):
self.running = False
if self.thread:
self.thread.join(timeout=0.5)
print('\r' + ' ' * (self.last_line_len + 5) + '\r', end='', flush=True)
if final_message:
print(f" {final_message}", flush=True)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
return False
# =========================================================================
# Kawaii face arrays (used by AIAgent._execute_tool_calls for spinner text)
# =========================================================================
KAWAII_SEARCH = [
"♪(´ε` )", "(。◕‿◕。)", "ヾ(^∇^)", "(◕ᴗ◕✿)", "( ˘▽˘)っ",
"٩(◕‿◕。)۶", "(✿◠‿◠)", "♪~(´ε` )", "(ノ´ヮ`)*:・゚✧", "(◎o◎)",
]
KAWAII_READ = [
"φ(゜▽゜*)♪", "( ˘▽˘)っ", "(⌐■_■)", "٩(。•́‿•̀。)۶", "(◕‿◕✿)",
"ヾ(@⌒ー⌒@)", "(✧ω✧)", "♪(๑ᴖ◡ᴖ๑)♪", "(≧◡≦)", "( ´ ▽ ` )",
]
KAWAII_TERMINAL = [
"ヽ(>∀<☆)", "(ノ°∀°)", "٩(^ᴗ^)۶", "ヾ(⌐■_■)ノ♪", "(•̀ᴗ•́)و",
"┗(0)┓", "(`・ω・´)", "( ̄▽ ̄)", "(ง •̀_•́)ง", "ヽ(´▽`)/",
]
KAWAII_BROWSER = [
"(ノ°∀°)", "(☞゚ヮ゚)☞", "( ͡° ͜ʖ ͡°)", "┌( ಠ_ಠ)┘", "(⊙_⊙)",
"ヾ(•ω•`)o", "( ̄ω ̄)", "( ˇωˇ )", "(ᵔᴥᵔ)", "(◎o◎)",
]
KAWAII_CREATE = [
"✧*。٩(ˊᗜˋ*)و✧", "(ノ◕ヮ◕)ノ*:・゚✧", "ヽ(>∀<☆)", "٩(♡ε♡)۶", "(◕‿◕)♡",
"✿◕ ‿ ◕✿", "(*≧▽≦)", "ヾ(-)", "(☆▽☆)", "°˖✧◝(⁰▿⁰)◜✧˖°",
]
KAWAII_SKILL = [
"ヾ(@⌒ー⌒@)", "(๑˃ᴗ˂)ﻭ", "٩(◕‿◕。)۶", "(✿╹◡╹)", "ヽ(・∀・)",
"(ノ´ヮ`)*:・゚✧", "♪(๑ᴖ◡ᴖ๑)♪", "(◠‿◠)", "٩(ˊᗜˋ*)و", "(^▽^)",
"ヾ(^∇^)", "(★ω★)/", "٩(。•́‿•̀。)۶", "(◕ᴗ◕✿)", "(◎o◎)",
"(✧ω✧)", "ヽ(>∀<☆)", "( ˘▽˘)っ", "(≧◡≦) ♡", "ヾ( ̄▽ ̄)",
]
KAWAII_THINK = [
"(っ°Д°;)っ", "(;′⌒`)", "(・_・ヾ", "( ´_ゝ`)", "( ̄ヘ ̄)",
"(。-`ω´-)", "( ˘︹˘ )", "(¬_¬)", "ヽ(ー_ー )", "(一_一)",
]
KAWAII_GENERIC = [
"♪(´ε` )", "(◕‿◕✿)", "ヾ(^∇^)", "٩(◕‿◕。)۶", "(✿◠‿◠)",
"(ノ´ヮ`)*:・゚✧", "ヽ(>∀<☆)", "(☆▽☆)", "( ˘▽˘)っ", "(≧◡≦)",
]
# =========================================================================
# Cute tool message (completion line that replaces the spinner)
# =========================================================================
def get_cute_tool_message(tool_name: str, args: dict, duration: float) -> str:
"""Generate a formatted tool completion line for CLI quiet mode.
Format: ``| {emoji} {verb:9} {detail} {duration}``
"""
dur = f"{duration:.1f}s"
def _trunc(s, n=40):
s = str(s)
return (s[:n-3] + "...") if len(s) > n else s
def _path(p, n=35):
p = str(p)
return ("..." + p[-(n-3):]) if len(p) > n else p
if tool_name == "web_search":
return f"┊ 🔍 search {_trunc(args.get('query', ''), 42)} {dur}"
if tool_name == "web_extract":
urls = args.get("urls", [])
if urls:
url = urls[0] if isinstance(urls, list) else str(urls)
domain = url.replace("https://", "").replace("http://", "").split("/")[0]
extra = f" +{len(urls)-1}" if len(urls) > 1 else ""
return f"┊ 📄 fetch {_trunc(domain, 35)}{extra} {dur}"
return f"┊ 📄 fetch pages {dur}"
if tool_name == "web_crawl":
url = args.get("url", "")
domain = url.replace("https://", "").replace("http://", "").split("/")[0]
return f"┊ 🕸️ crawl {_trunc(domain, 35)} {dur}"
if tool_name == "terminal":
return f"┊ 💻 $ {_trunc(args.get('command', ''), 42)} {dur}"
if tool_name == "process":
action = args.get("action", "?")
sid = args.get("session_id", "")[:12]
labels = {"list": "ls processes", "poll": f"poll {sid}", "log": f"log {sid}",
"wait": f"wait {sid}", "kill": f"kill {sid}", "write": f"write {sid}", "submit": f"submit {sid}"}
return f"┊ ⚙️ proc {labels.get(action, f'{action} {sid}')} {dur}"
if tool_name == "read_file":
return f"┊ 📖 read {_path(args.get('path', ''))} {dur}"
if tool_name == "write_file":
return f"┊ ✍️ write {_path(args.get('path', ''))} {dur}"
if tool_name == "patch":
return f"┊ 🔧 patch {_path(args.get('path', ''))} {dur}"
if tool_name == "search_files":
pattern = _trunc(args.get("pattern", ""), 35)
target = args.get("target", "content")
verb = "find" if target == "files" else "grep"
return f"┊ 🔎 {verb:9} {pattern} {dur}"
if tool_name == "browser_navigate":
url = args.get("url", "")
domain = url.replace("https://", "").replace("http://", "").split("/")[0]
return f"┊ 🌐 navigate {_trunc(domain, 35)} {dur}"
if tool_name == "browser_snapshot":
mode = "full" if args.get("full") else "compact"
return f"┊ 📸 snapshot {mode} {dur}"
if tool_name == "browser_click":
return f"┊ 👆 click {args.get('ref', '?')} {dur}"
if tool_name == "browser_type":
return f"┊ ⌨️ type \"{_trunc(args.get('text', ''), 30)}\" {dur}"
if tool_name == "browser_scroll":
d = args.get("direction", "down")
arrow = {"down": "", "up": "", "right": "", "left": ""}.get(d, "")
return f"{arrow} scroll {d} {dur}"
if tool_name == "browser_back":
return f"┊ ◀️ back {dur}"
if tool_name == "browser_press":
return f"┊ ⌨️ press {args.get('key', '?')} {dur}"
if tool_name == "browser_close":
return f"┊ 🚪 close browser {dur}"
if tool_name == "browser_get_images":
return f"┊ 🖼️ images extracting {dur}"
if tool_name == "browser_vision":
return f"┊ 👁️ vision analyzing page {dur}"
if tool_name == "todo":
todos_arg = args.get("todos")
merge = args.get("merge", False)
if todos_arg is None:
return f"┊ 📋 plan reading tasks {dur}"
elif merge:
return f"┊ 📋 plan update {len(todos_arg)} task(s) {dur}"
else:
return f"┊ 📋 plan {len(todos_arg)} task(s) {dur}"
if tool_name == "session_search":
return f"┊ 🔍 recall \"{_trunc(args.get('query', ''), 35)}\" {dur}"
if tool_name == "memory":
action = args.get("action", "?")
target = args.get("target", "")
if action == "add":
return f"┊ 🧠 memory +{target}: \"{_trunc(args.get('content', ''), 30)}\" {dur}"
elif action == "replace":
return f"┊ 🧠 memory ~{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}"
elif action == "remove":
return f"┊ 🧠 memory -{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}"
return f"┊ 🧠 memory {action} {dur}"
if tool_name == "skills_list":
return f"┊ 📚 skills list {args.get('category', 'all')} {dur}"
if tool_name == "skill_view":
return f"┊ 📚 skill {_trunc(args.get('name', ''), 30)} {dur}"
if tool_name == "image_generate":
return f"┊ 🎨 create {_trunc(args.get('prompt', ''), 35)} {dur}"
if tool_name == "text_to_speech":
return f"┊ 🔊 speak {_trunc(args.get('text', ''), 30)} {dur}"
if tool_name == "vision_analyze":
return f"┊ 👁️ vision {_trunc(args.get('question', ''), 30)} {dur}"
if tool_name == "mixture_of_agents":
return f"┊ 🧠 reason {_trunc(args.get('user_prompt', ''), 30)} {dur}"
if tool_name == "send_message":
return f"┊ 📨 send {args.get('target', '?')}: \"{_trunc(args.get('message', ''), 25)}\" {dur}"
if tool_name == "schedule_cronjob":
return f"┊ ⏰ schedule {_trunc(args.get('name', args.get('prompt', 'task')), 30)} {dur}"
if tool_name == "list_cronjobs":
return f"┊ ⏰ jobs listing {dur}"
if tool_name == "remove_cronjob":
return f"┊ ⏰ remove job {args.get('job_id', '?')} {dur}"
if tool_name.startswith("rl_"):
rl = {
"rl_list_environments": "list envs", "rl_select_environment": f"select {args.get('name', '')}",
"rl_get_current_config": "get config", "rl_edit_config": f"set {args.get('field', '?')}",
"rl_start_training": "start training", "rl_check_status": f"status {args.get('run_id', '?')[:12]}",
"rl_stop_training": f"stop {args.get('run_id', '?')[:12]}", "rl_get_results": f"results {args.get('run_id', '?')[:12]}",
"rl_list_runs": "list runs", "rl_test_inference": "test inference",
}
return f"┊ 🧪 rl {rl.get(tool_name, tool_name.replace('rl_', ''))} {dur}"
if tool_name == "execute_code":
code = args.get("code", "")
first_line = code.strip().split("\n")[0] if code.strip() else ""
return f"┊ 🐍 exec {_trunc(first_line, 35)} {dur}"
if tool_name == "delegate_task":
tasks = args.get("tasks")
if tasks and isinstance(tasks, list):
return f"┊ 🔀 delegate {len(tasks)} parallel tasks {dur}"
return f"┊ 🔀 delegate {_trunc(args.get('goal', ''), 35)} {dur}"
preview = build_tool_preview(tool_name, args) or ""
return f"┊ ⚡ {tool_name[:9]:9} {_trunc(preview, 35)} {dur}"

97
agent/model_metadata.py Normal file
View file

@ -0,0 +1,97 @@
"""Model metadata, context lengths, and token estimation utilities.
Pure utility functions with no AIAgent dependency. Used by ContextCompressor
and run_agent.py for pre-flight context checks.
"""
import logging
import time
from typing import Any, Dict, List
import requests
from hermes_constants import OPENROUTER_MODELS_URL
logger = logging.getLogger(__name__)
_model_metadata_cache: Dict[str, Dict[str, Any]] = {}
_model_metadata_cache_time: float = 0
_MODEL_CACHE_TTL = 3600
DEFAULT_CONTEXT_LENGTHS = {
"anthropic/claude-opus-4": 200000,
"anthropic/claude-opus-4.5": 200000,
"anthropic/claude-opus-4.6": 200000,
"anthropic/claude-sonnet-4": 200000,
"anthropic/claude-sonnet-4-20250514": 200000,
"anthropic/claude-haiku-4.5": 200000,
"openai/gpt-4o": 128000,
"openai/gpt-4-turbo": 128000,
"openai/gpt-4o-mini": 128000,
"google/gemini-2.0-flash": 1048576,
"google/gemini-2.5-pro": 1048576,
"meta-llama/llama-3.3-70b-instruct": 131072,
"deepseek/deepseek-chat-v3": 65536,
"qwen/qwen-2.5-72b-instruct": 32768,
}
def fetch_model_metadata(force_refresh: bool = False) -> Dict[str, Dict[str, Any]]:
"""Fetch model metadata from OpenRouter (cached for 1 hour)."""
global _model_metadata_cache, _model_metadata_cache_time
if not force_refresh and _model_metadata_cache and (time.time() - _model_metadata_cache_time) < _MODEL_CACHE_TTL:
return _model_metadata_cache
try:
response = requests.get(OPENROUTER_MODELS_URL, timeout=10)
response.raise_for_status()
data = response.json()
cache = {}
for model in data.get("data", []):
model_id = model.get("id", "")
cache[model_id] = {
"context_length": model.get("context_length", 128000),
"max_completion_tokens": model.get("top_provider", {}).get("max_completion_tokens", 4096),
"name": model.get("name", model_id),
"pricing": model.get("pricing", {}),
}
canonical = model.get("canonical_slug", "")
if canonical and canonical != model_id:
cache[canonical] = cache[model_id]
_model_metadata_cache = cache
_model_metadata_cache_time = time.time()
logger.debug("Fetched metadata for %s models from OpenRouter", len(cache))
return cache
except Exception as e:
logging.warning(f"Failed to fetch model metadata from OpenRouter: {e}")
return _model_metadata_cache or {}
def get_model_context_length(model: str) -> int:
"""Get the context length for a model (API first, then fallback defaults)."""
metadata = fetch_model_metadata()
if model in metadata:
return metadata[model].get("context_length", 128000)
for default_model, length in DEFAULT_CONTEXT_LENGTHS.items():
if default_model in model or model in default_model:
return length
return 128000
def estimate_tokens_rough(text: str) -> int:
"""Rough token estimate (~4 chars/token) for pre-flight checks."""
if not text:
return 0
return len(text) // 4
def estimate_messages_tokens_rough(messages: List[Dict[str, Any]]) -> int:
"""Rough token estimate for a message list (pre-flight only)."""
total_chars = sum(len(str(msg)) for msg in messages)
return total_chars // 4

230
agent/prompt_builder.py Normal file
View file

@ -0,0 +1,230 @@
"""System prompt assembly -- identity, platform hints, skills index, context files.
All functions are stateless. AIAgent._build_system_prompt() calls these to
assemble pieces, then combines them with memory and ephemeral prompts.
"""
import logging
import os
import re
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Constants
# =========================================================================
DEFAULT_AGENT_IDENTITY = (
"You are Hermes Agent, an intelligent AI assistant created by Nous Research. "
"You are helpful, knowledgeable, and direct. You assist users with a wide "
"range of tasks including answering questions, writing and editing code, "
"analyzing information, creative work, and executing actions via your tools. "
"You communicate clearly, admit uncertainty when appropriate, and prioritize "
"being genuinely useful over being verbose unless otherwise directed below."
)
PLATFORM_HINTS = {
"whatsapp": (
"You are on a text messaging communication platform, WhatsApp. "
"Please do not use markdown as it does not render."
),
"telegram": (
"You are on a text messaging communication platform, Telegram. "
"Please do not use markdown as it does not render."
),
"discord": (
"You are in a Discord server or group chat communicating with your user."
),
"cli": (
"You are a CLI AI Agent. Try not to use markdown but simple text "
"renderable inside a terminal."
),
}
CONTEXT_FILE_MAX_CHARS = 20_000
CONTEXT_TRUNCATE_HEAD_RATIO = 0.7
CONTEXT_TRUNCATE_TAIL_RATIO = 0.2
# =========================================================================
# Skills index
# =========================================================================
def build_skills_system_prompt() -> str:
"""Build a compact skill index for the system prompt.
Scans ~/.hermes/skills/ for SKILL.md files grouped by category so the
model can match skills at a glance without extra tool calls.
"""
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
skills_dir = hermes_home / "skills"
if not skills_dir.exists():
return ""
skills_by_category = {}
for skill_file in skills_dir.rglob("SKILL.md"):
rel_path = skill_file.relative_to(skills_dir)
parts = rel_path.parts
if len(parts) >= 2:
category = parts[0]
skill_name = parts[-2]
else:
category = "general"
skill_name = skill_file.parent.name
skills_by_category.setdefault(category, []).append(skill_name)
if not skills_by_category:
return ""
category_descriptions = {}
for category in skills_by_category:
desc_file = skills_dir / category / "DESCRIPTION.md"
if desc_file.exists():
try:
content = desc_file.read_text(encoding="utf-8")
match = re.search(r"^---\s*\n.*?description:\s*(.+?)\s*\n.*?^---", content, re.MULTILINE | re.DOTALL)
if match:
category_descriptions[category] = match.group(1).strip()
except Exception as e:
logger.debug("Could not read skill description %s: %s", desc_file, e)
index_lines = []
for category in sorted(skills_by_category.keys()):
desc = category_descriptions.get(category, "")
names = ", ".join(sorted(set(skills_by_category[category])))
if desc:
index_lines.append(f" {category}: {desc}")
else:
index_lines.append(f" {category}:")
index_lines.append(f" skills: {names}")
return (
"## Skills (mandatory)\n"
"Before replying, scan the skills below. If one clearly matches your task, "
"load it with skill_view(name) and follow its instructions. "
"If a skill has issues, fix it with skill_manage(action='patch').\n"
"\n"
"<available_skills>\n"
+ "\n".join(index_lines) + "\n"
"</available_skills>\n"
"\n"
"If none match, proceed normally without loading a skill."
)
# =========================================================================
# Context files (SOUL.md, AGENTS.md, .cursorrules)
# =========================================================================
def _truncate_content(content: str, filename: str, max_chars: int = CONTEXT_FILE_MAX_CHARS) -> str:
"""Head/tail truncation with a marker in the middle."""
if len(content) <= max_chars:
return content
head_chars = int(max_chars * CONTEXT_TRUNCATE_HEAD_RATIO)
tail_chars = int(max_chars * CONTEXT_TRUNCATE_TAIL_RATIO)
head = content[:head_chars]
tail = content[-tail_chars:]
marker = f"\n\n[...truncated {filename}: kept {head_chars}+{tail_chars} of {len(content)} chars. Use file tools to read the full file.]\n\n"
return head + marker + tail
def build_context_files_prompt(cwd: Optional[str] = None) -> str:
"""Discover and load context files for the system prompt.
Discovery: AGENTS.md (recursive), .cursorrules / .cursor/rules/*.mdc,
SOUL.md (cwd then ~/.hermes/ fallback). Each capped at 20,000 chars.
"""
if cwd is None:
cwd = os.getcwd()
cwd_path = Path(cwd).resolve()
sections = []
# AGENTS.md (hierarchical, recursive)
top_level_agents = None
for name in ["AGENTS.md", "agents.md"]:
candidate = cwd_path / name
if candidate.exists():
top_level_agents = candidate
break
if top_level_agents:
agents_files = []
for root, dirs, files in os.walk(cwd_path):
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ('node_modules', '__pycache__', 'venv', '.venv')]
for f in files:
if f.lower() == "agents.md":
agents_files.append(Path(root) / f)
agents_files.sort(key=lambda p: len(p.parts))
total_agents_content = ""
for agents_path in agents_files:
try:
content = agents_path.read_text(encoding="utf-8").strip()
if content:
rel_path = agents_path.relative_to(cwd_path)
total_agents_content += f"## {rel_path}\n\n{content}\n\n"
except Exception as e:
logger.debug("Could not read %s: %s", agents_path, e)
if total_agents_content:
total_agents_content = _truncate_content(total_agents_content, "AGENTS.md")
sections.append(total_agents_content)
# .cursorrules
cursorrules_content = ""
cursorrules_file = cwd_path / ".cursorrules"
if cursorrules_file.exists():
try:
content = cursorrules_file.read_text(encoding="utf-8").strip()
if content:
cursorrules_content += f"## .cursorrules\n\n{content}\n\n"
except Exception as e:
logger.debug("Could not read .cursorrules: %s", e)
cursor_rules_dir = cwd_path / ".cursor" / "rules"
if cursor_rules_dir.exists() and cursor_rules_dir.is_dir():
mdc_files = sorted(cursor_rules_dir.glob("*.mdc"))
for mdc_file in mdc_files:
try:
content = mdc_file.read_text(encoding="utf-8").strip()
if content:
cursorrules_content += f"## .cursor/rules/{mdc_file.name}\n\n{content}\n\n"
except Exception as e:
logger.debug("Could not read %s: %s", mdc_file, e)
if cursorrules_content:
cursorrules_content = _truncate_content(cursorrules_content, ".cursorrules")
sections.append(cursorrules_content)
# SOUL.md (cwd first, then ~/.hermes/ fallback)
soul_path = None
for name in ["SOUL.md", "soul.md"]:
candidate = cwd_path / name
if candidate.exists():
soul_path = candidate
break
if not soul_path:
global_soul = Path.home() / ".hermes" / "SOUL.md"
if global_soul.exists():
soul_path = global_soul
if soul_path:
try:
content = soul_path.read_text(encoding="utf-8").strip()
if content:
content = _truncate_content(content, "SOUL.md")
sections.append(
f"## SOUL.md\n\nIf SOUL.md is present, embody its persona and tone. "
f"Avoid stiff, generic replies; follow its guidance unless higher-priority "
f"instructions override it.\n\n{content}"
)
except Exception as e:
logger.debug("Could not read SOUL.md from %s: %s", soul_path, e)
if not sections:
return ""
return "# Project Context\n\nThe following project context files have been loaded and should be followed:\n\n" + "\n".join(sections)

68
agent/prompt_caching.py Normal file
View file

@ -0,0 +1,68 @@
"""Anthropic prompt caching (system_and_3 strategy).
Reduces input token costs by ~75% on multi-turn conversations by caching
the conversation prefix. Uses 4 cache_control breakpoints (Anthropic max):
1. System prompt (stable across all turns)
2-4. Last 3 non-system messages (rolling window)
Pure functions -- no class state, no AIAgent dependency.
"""
import copy
from typing import Any, Dict, List
def _apply_cache_marker(msg: dict, cache_marker: dict) -> None:
"""Add cache_control to a single message, handling all format variations."""
role = msg.get("role", "")
content = msg.get("content")
if role == "tool":
msg["cache_control"] = cache_marker
return
if content is None:
msg["cache_control"] = cache_marker
return
if isinstance(content, str):
msg["content"] = [{"type": "text", "text": content, "cache_control": cache_marker}]
return
if isinstance(content, list) and content:
last = content[-1]
if isinstance(last, dict):
last["cache_control"] = cache_marker
def apply_anthropic_cache_control(
api_messages: List[Dict[str, Any]],
cache_ttl: str = "5m",
) -> List[Dict[str, Any]]:
"""Apply system_and_3 caching strategy to messages for Anthropic models.
Places up to 4 cache_control breakpoints: system prompt + last 3 non-system messages.
Returns:
Deep copy of messages with cache_control breakpoints injected.
"""
messages = copy.deepcopy(api_messages)
if not messages:
return messages
marker = {"type": "ephemeral"}
if cache_ttl == "1h":
marker["ttl"] = "1h"
breakpoints_used = 0
if messages[0].get("role") == "system":
_apply_cache_marker(messages[0], marker)
breakpoints_used += 1
remaining = 4 - breakpoints_used
non_sys = [i for i in range(len(messages)) if messages[i].get("role") != "system"]
for idx in non_sys[-remaining:]:
_apply_cache_marker(messages[idx], marker)
return messages

56
agent/trajectory.py Normal file
View file

@ -0,0 +1,56 @@
"""Trajectory saving utilities and static helpers.
_convert_to_trajectory_format stays as an AIAgent method (batch_runner.py
calls agent._convert_to_trajectory_format). Only the static helpers and
the file-write logic live here.
"""
import json
import logging
from datetime import datetime
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
def convert_scratchpad_to_think(content: str) -> str:
"""Convert <REASONING_SCRATCHPAD> tags to <think> tags."""
if not content or "<REASONING_SCRATCHPAD>" not in content:
return content
return content.replace("<REASONING_SCRATCHPAD>", "<think>").replace("</REASONING_SCRATCHPAD>", "</think>")
def has_incomplete_scratchpad(content: str) -> bool:
"""Check if content has an opening <REASONING_SCRATCHPAD> without a closing tag."""
if not content:
return False
return "<REASONING_SCRATCHPAD>" in content and "</REASONING_SCRATCHPAD>" not in content
def save_trajectory(trajectory: List[Dict[str, Any]], model: str,
completed: bool, filename: str = None):
"""Append a trajectory entry to a JSONL file.
Args:
trajectory: The ShareGPT-format conversation list.
model: Model name for metadata.
completed: Whether the conversation completed successfully.
filename: Override output filename. Defaults to trajectory_samples.jsonl
or failed_trajectories.jsonl based on ``completed``.
"""
if filename is None:
filename = "trajectory_samples.jsonl" if completed else "failed_trajectories.jsonl"
entry = {
"conversations": trajectory,
"timestamp": datetime.now().isoformat(),
"model": model,
"completed": completed,
}
try:
with open(filename, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
logger.info("Trajectory saved to %s", filename)
except Exception as e:
logger.warning("Failed to save trajectory: %s", e)