Merge remote-tracking branch 'origin/main' into feat/honcho-async-memory

Made-with: Cursor

# Conflicts:
#	cli.py
#	tests/test_run_agent.py
This commit is contained in:
Erosika 2026-03-11 12:22:56 -04:00
commit a0b0dbe6b2
138 changed files with 17829 additions and 1109 deletions

View file

@ -187,6 +187,30 @@ def _resolve_runtime_agent_kwargs() -> dict:
}
def _resolve_gateway_model() -> str:
"""Read model from env/config — mirrors the resolution in _run_agent_sync.
Without this, temporary AIAgent instances (memory flush, /compress) fall
back to the hardcoded default ("anthropic/claude-opus-4.6") which fails
when the active provider is openai-codex.
"""
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
import yaml as _y
_cfg_path = _hermes_home / "config.yaml"
if _cfg_path.exists():
with open(_cfg_path, encoding="utf-8") as _f:
_cfg = _y.safe_load(_f) or {}
_model_cfg = _cfg.get("model", {})
if isinstance(_model_cfg, str):
model = _model_cfg
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
except Exception:
pass
return model
class GatewayRunner:
"""
Main gateway controller.
@ -321,8 +345,14 @@ class GatewayRunner:
if not runtime_kwargs.get("api_key"):
return
# Resolve model from config — AIAgent's default is OpenRouter-
# formatted ("anthropic/claude-opus-4.6") which fails when the
# active provider is openai-codex.
model = _resolve_gateway_model()
tmp_agent = AIAgent(
**runtime_kwargs,
model=model,
max_iterations=8,
quiet_mode=True,
enabled_toolsets=["memory", "skills"],
@ -743,6 +773,13 @@ class GatewayRunner:
return None
return HomeAssistantAdapter(config)
elif platform == Platform.EMAIL:
from gateway.platforms.email import EmailAdapter, check_email_requirements
if not check_email_requirements():
logger.warning("Email: EMAIL_ADDRESS, EMAIL_PASSWORD, EMAIL_IMAP_HOST, or EMAIL_SMTP_HOST not set")
return None
return EmailAdapter(config)
return None
def _is_user_authorized(self, source: SessionSource) -> bool:
@ -772,6 +809,7 @@ class GatewayRunner:
Platform.WHATSAPP: "WHATSAPP_ALLOWED_USERS",
Platform.SLACK: "SLACK_ALLOWED_USERS",
Platform.SIGNAL: "SIGNAL_ALLOWED_USERS",
Platform.EMAIL: "EMAIL_ALLOWED_USERS",
}
platform_allow_all_map = {
Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS",
@ -779,6 +817,7 @@ class GatewayRunner:
Platform.WHATSAPP: "WHATSAPP_ALLOW_ALL_USERS",
Platform.SLACK: "SLACK_ALLOW_ALL_USERS",
Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS",
Platform.EMAIL: "EMAIL_ALLOW_ALL_USERS",
}
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)
@ -877,7 +916,8 @@ class GatewayRunner:
_known_commands = {"new", "reset", "help", "status", "stop", "model",
"personality", "retry", "undo", "sethome", "set-home",
"compress", "usage", "insights", "reload-mcp", "reload_mcp",
"update", "title", "resume", "provider", "rollback"}
"update", "title", "resume", "provider", "rollback",
"background"}
if command and command in _known_commands:
await self.hooks.emit(f"command:{command}", {
"platform": source.platform.value if source.platform else "",
@ -939,7 +979,36 @@ class GatewayRunner:
if command == "rollback":
return await self._handle_rollback_command(event)
if command == "background":
return await self._handle_background_command(event)
# User-defined quick commands (bypass agent loop, no LLM call)
if command:
quick_commands = self.config.get("quick_commands", {})
if command in quick_commands:
qcmd = quick_commands[command]
if qcmd.get("type") == "exec":
exec_cmd = qcmd.get("command", "")
if exec_cmd:
try:
proc = await asyncio.create_subprocess_shell(
exec_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
output = (stdout or stderr).decode().strip()
return output if output else "Command returned no output."
except asyncio.TimeoutError:
return "Quick command timed out (30s)."
except Exception as e:
return f"Quick command error: {e}"
else:
return f"Quick command '/{command}' has no command defined."
else:
return f"Quick command '/{command}' has unsupported type (only 'exec' is supported)."
# Skill slash commands: /skill-name loads the skill and sends to agent
if command:
try:
@ -1021,9 +1090,12 @@ class GatewayRunner:
# repeated truncation/context failures. Detect this early and
# compress proactively — before the agent even starts. (#628)
#
# Thresholds are derived from the SAME compression config the
# agent uses (compression.threshold × model context length) so
# CLI and messaging platforms behave identically.
# Token source priority:
# 1. Actual API-reported prompt_tokens from the last turn
# (stored in session_entry.last_prompt_tokens)
# 2. Rough char-based estimate (str(msg)//4) with a 1.4x
# safety factor to account for overestimation on tool-heavy
# conversations (code/JSON tokenizes at 5-7+ chars/token).
# -----------------------------------------------------------------
if history and len(history) >= 4:
from agent.model_metadata import (
@ -1074,31 +1146,48 @@ class GatewayRunner:
_compress_token_threshold = int(
_hyg_context_length * _hyg_threshold_pct
)
# Warn if still huge after compression (95% of context)
_warn_token_threshold = int(_hyg_context_length * 0.95)
_msg_count = len(history)
_approx_tokens = estimate_messages_tokens_rough(history)
# Prefer actual API-reported tokens from the last turn
# (stored in session entry) over the rough char-based estimate.
# The rough estimate (str(msg)//4) overestimates by 30-50% on
# tool-heavy/code-heavy conversations, causing premature compression.
_stored_tokens = session_entry.last_prompt_tokens
if _stored_tokens > 0:
_approx_tokens = _stored_tokens
_token_source = "actual"
else:
_approx_tokens = estimate_messages_tokens_rough(history)
# Apply safety factor only for rough estimates
_compress_token_threshold = int(
_compress_token_threshold * 1.4
)
_warn_token_threshold = int(_warn_token_threshold * 1.4)
_token_source = "estimated"
_needs_compress = _approx_tokens >= _compress_token_threshold
if _needs_compress:
logger.info(
"Session hygiene: %s messages, ~%s tokens — auto-compressing "
"Session hygiene: %s messages, ~%s tokens (%s) — auto-compressing "
"(threshold: %s%% of %s = %s tokens)",
_msg_count, f"{_approx_tokens:,}",
_msg_count, f"{_approx_tokens:,}", _token_source,
int(_hyg_threshold_pct * 100),
f"{_hyg_context_length:,}",
f"{_compress_token_threshold:,}",
)
_hyg_adapter = self.adapters.get(source.platform)
_hyg_meta = {"thread_id": source.thread_id} if source.thread_id else None
if _hyg_adapter:
try:
await _hyg_adapter.send(
source.chat_id,
f"🗜️ Session is large ({_msg_count} messages, "
f"~{_approx_tokens:,} tokens). Auto-compressing..."
f"~{_approx_tokens:,} tokens). Auto-compressing...",
metadata=_hyg_meta,
)
except Exception:
pass
@ -1118,6 +1207,7 @@ class GatewayRunner:
if len(_hyg_msgs) >= 4:
_hyg_agent = AIAgent(
**_hyg_runtime,
model=_hyg_model,
max_iterations=4,
quiet_mode=True,
enabled_toolsets=["memory"],
@ -1136,6 +1226,8 @@ class GatewayRunner:
self.session_store.rewrite_transcript(
session_entry.session_id, _compressed
)
# Reset stored token count — transcript was rewritten
session_entry.last_prompt_tokens = 0
history = _compressed
_new_count = len(_compressed)
_new_tokens = estimate_messages_tokens_rough(
@ -1156,7 +1248,8 @@ class GatewayRunner:
f"🗜️ Compressed: {_msg_count}"
f"{_new_count} messages, "
f"~{_approx_tokens:,}"
f"~{_new_tokens:,} tokens"
f"~{_new_tokens:,} tokens",
metadata=_hyg_meta,
)
except Exception:
pass
@ -1176,7 +1269,8 @@ class GatewayRunner:
"after compression "
f"(~{_new_tokens:,} tokens). "
"Consider using /reset to start "
"fresh if you experience issues."
"fresh if you experience issues.",
metadata=_hyg_meta,
)
except Exception:
pass
@ -1188,6 +1282,7 @@ class GatewayRunner:
# Compression failed and session is dangerously large
if _approx_tokens >= _warn_token_threshold:
_hyg_adapter = self.adapters.get(source.platform)
_hyg_meta = {"thread_id": source.thread_id} if source.thread_id else None
if _hyg_adapter:
try:
await _hyg_adapter.send(
@ -1197,7 +1292,8 @@ class GatewayRunner:
f"~{_approx_tokens:,} tokens) and "
"auto-compression failed. Consider "
"using /compress or /reset to avoid "
"issues."
"issues.",
metadata=_hyg_meta,
)
except Exception:
pass
@ -1393,6 +1489,11 @@ class GatewayRunner:
{"role": "assistant", "content": response, "timestamp": ts}
)
else:
# The agent already persisted these messages to SQLite via
# _flush_messages_to_session_db(), so skip the DB write here
# to prevent the duplicate-write bug (#860). We still write
# to JSONL for backward compatibility and as a backup.
agent_persisted = self._session_db is not None
for msg in new_messages:
# Skip system messages (they're rebuilt each run)
if msg.get("role") == "system":
@ -1400,11 +1501,15 @@ class GatewayRunner:
# Add timestamp to each message for debugging
entry = {**msg, "timestamp": ts}
self.session_store.append_to_transcript(
session_entry.session_id, entry
session_entry.session_id, entry,
skip_db=agent_persisted,
)
# Update session
self.session_store.update_session(session_entry.session_key)
# Update session with actual prompt token count from the agent
self.session_store.update_session(
session_entry.session_key,
last_prompt_tokens=agent_result.get("last_prompt_tokens", 0),
)
return response
@ -1512,6 +1617,7 @@ class GatewayRunner:
"`/usage` — Show token usage for this session",
"`/insights [days]` — Show usage insights and analytics",
"`/rollback [number]` — List or restore filesystem checkpoints",
"`/background <prompt>` — Run a prompt in a separate background session",
"`/reload-mcp` — Reload MCP servers from config",
"`/update` — Update Hermes Agent to the latest version",
"`/help` — Show this message",
@ -1745,14 +1851,39 @@ class GatewayRunner:
if not args:
lines = ["🎭 **Available Personalities**\n"]
lines.append("• `none` — (no personality overlay)")
for name, prompt in personalities.items():
preview = prompt[:50] + "..." if len(prompt) > 50 else prompt
if isinstance(prompt, dict):
preview = prompt.get("description") or prompt.get("system_prompt", "")[:50]
else:
preview = prompt[:50] + "..." if len(prompt) > 50 else prompt
lines.append(f"• `{name}` — {preview}")
lines.append(f"\nUsage: `/personality <name>`")
return "\n".join(lines)
if args in personalities:
new_prompt = personalities[args]
def _resolve_prompt(value):
if isinstance(value, dict):
parts = [value.get("system_prompt", "")]
if value.get("tone"):
parts.append(f'Tone: {value["tone"]}')
if value.get("style"):
parts.append(f'Style: {value["style"]}')
return "\n".join(p for p in parts if p)
return str(value)
if args in ("none", "default", "neutral"):
try:
if "agent" not in config or not isinstance(config.get("agent"), dict):
config["agent"] = {}
config["agent"]["system_prompt"] = ""
with open(config_path, "w") as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
except Exception as e:
return f"⚠️ Failed to save personality change: {e}"
self._ephemeral_system_prompt = ""
return "🎭 Personality cleared — using base agent behavior.\n_(takes effect on next message)_"
elif args in personalities:
new_prompt = _resolve_prompt(personalities[args])
# Write to config.yaml, same pattern as CLI save_config_value.
try:
@ -1769,7 +1900,7 @@ class GatewayRunner:
return f"🎭 Personality set to **{args}**\n_(takes effect on next message)_"
available = ", ".join(f"`{n}`" for n in personalities.keys())
available = "`none`, " + ", ".join(f"`{n}`" for n in personalities.keys())
return f"Unknown personality: `{args}`\n\nAvailable: {available}"
async def _handle_retry_command(self, event: MessageEvent) -> str:
@ -1793,6 +1924,8 @@ class GatewayRunner:
# Truncate history to before the last user message and persist
truncated = history[:last_user_idx]
self.session_store.rewrite_transcript(session_entry.session_id, truncated)
# Reset stored token count — transcript was truncated
session_entry.last_prompt_tokens = 0
# Re-send by creating a fake text event with the old message
retry_event = MessageEvent(
@ -1824,6 +1957,8 @@ class GatewayRunner:
removed_msg = history[last_user_idx].get("content", "")
removed_count = len(history) - last_user_idx
self.session_store.rewrite_transcript(session_entry.session_id, history[:last_user_idx])
# Reset stored token count — transcript was truncated
session_entry.last_prompt_tokens = 0
preview = removed_msg[:40] + "..." if len(removed_msg) > 40 else removed_msg
return f"↩️ Undid {removed_count} message(s).\nRemoved: \"{preview}\""
@ -1917,6 +2052,197 @@ class GatewayRunner:
)
return f"{result['error']}"
async def _handle_background_command(self, event: MessageEvent) -> str:
"""Handle /background <prompt> — run a prompt in a separate background session.
Spawns a new AIAgent in a background thread with its own session.
When it completes, sends the result back to the same chat without
modifying the active session's conversation history.
"""
prompt = event.get_command_args().strip()
if not prompt:
return (
"Usage: /background <prompt>\n"
"Example: /background Summarize the top HN stories today\n\n"
"Runs the prompt in a separate session. "
"You can keep chatting — the result will appear here when done."
)
source = event.source
task_id = f"bg_{datetime.now().strftime('%H%M%S')}_{os.urandom(3).hex()}"
# Fire-and-forget the background task
asyncio.create_task(
self._run_background_task(prompt, source, task_id)
)
preview = prompt[:60] + ("..." if len(prompt) > 60 else "")
return f'🔄 Background task started: "{preview}"\nTask ID: {task_id}\nYou can keep chatting — results will appear when done.'
async def _run_background_task(
self, prompt: str, source: "SessionSource", task_id: str
) -> None:
"""Execute a background agent task and deliver the result to the chat."""
from run_agent import AIAgent
adapter = self.adapters.get(source.platform)
if not adapter:
logger.warning("No adapter for platform %s in background task %s", source.platform, task_id)
return
_thread_metadata = {"thread_id": source.thread_id} if source.thread_id else None
try:
runtime_kwargs = _resolve_runtime_agent_kwargs()
if not runtime_kwargs.get("api_key"):
await adapter.send(
source.chat_id,
f"❌ Background task {task_id} failed: no provider credentials configured.",
metadata=_thread_metadata,
)
return
# Read model from config via shared helper
model = _resolve_gateway_model()
# Determine toolset (same logic as _run_agent)
default_toolset_map = {
Platform.LOCAL: "hermes-cli",
Platform.TELEGRAM: "hermes-telegram",
Platform.DISCORD: "hermes-discord",
Platform.WHATSAPP: "hermes-whatsapp",
Platform.SLACK: "hermes-slack",
Platform.SIGNAL: "hermes-signal",
Platform.HOMEASSISTANT: "hermes-homeassistant",
Platform.EMAIL: "hermes-email",
}
platform_toolsets_config = {}
try:
config_path = _hermes_home / 'config.yaml'
if config_path.exists():
import yaml
with open(config_path, 'r', encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
platform_toolsets_config = user_config.get("platform_toolsets", {})
except Exception:
pass
platform_config_key = {
Platform.LOCAL: "cli",
Platform.TELEGRAM: "telegram",
Platform.DISCORD: "discord",
Platform.WHATSAPP: "whatsapp",
Platform.SLACK: "slack",
Platform.SIGNAL: "signal",
Platform.HOMEASSISTANT: "homeassistant",
Platform.EMAIL: "email",
}.get(source.platform, "telegram")
config_toolsets = platform_toolsets_config.get(platform_config_key)
if config_toolsets and isinstance(config_toolsets, list):
enabled_toolsets = config_toolsets
else:
default_toolset = default_toolset_map.get(source.platform, "hermes-telegram")
enabled_toolsets = [default_toolset]
platform_key = "cli" if source.platform == Platform.LOCAL else source.platform.value
pr = self._provider_routing
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))
def run_sync():
agent = AIAgent(
model=model,
**runtime_kwargs,
max_iterations=max_iterations,
quiet_mode=True,
verbose_logging=False,
enabled_toolsets=enabled_toolsets,
reasoning_config=self._reasoning_config,
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
provider_sort=pr.get("sort"),
provider_require_parameters=pr.get("require_parameters", False),
provider_data_collection=pr.get("data_collection"),
session_id=task_id,
platform=platform_key,
session_db=self._session_db,
fallback_model=self._fallback_model,
)
return agent.run_conversation(
user_message=prompt,
task_id=task_id,
)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, run_sync)
response = result.get("final_response", "") if result else ""
if not response and result and result.get("error"):
response = f"Error: {result['error']}"
# Extract media files from the response
if response:
media_files, response = adapter.extract_media(response)
images, text_content = adapter.extract_images(response)
preview = prompt[:60] + ("..." if len(prompt) > 60 else "")
header = f'✅ Background task complete\nPrompt: "{preview}"\n\n'
if text_content:
await adapter.send(
chat_id=source.chat_id,
content=header + text_content,
metadata=_thread_metadata,
)
elif not images and not media_files:
await adapter.send(
chat_id=source.chat_id,
content=header + "(No response generated)",
metadata=_thread_metadata,
)
# Send extracted images
for image_url, alt_text in (images or []):
try:
await adapter.send_image(
chat_id=source.chat_id,
image_url=image_url,
caption=alt_text,
)
except Exception:
pass
# Send media files
for media_path in (media_files or []):
try:
await adapter.send_file(
chat_id=source.chat_id,
file_path=media_path,
)
except Exception:
pass
else:
preview = prompt[:60] + ("..." if len(prompt) > 60 else "")
await adapter.send(
chat_id=source.chat_id,
content=f'✅ Background task complete\nPrompt: "{preview}"\n\n(No response generated)',
metadata=_thread_metadata,
)
except Exception as e:
logger.exception("Background task %s failed", task_id)
try:
await adapter.send(
chat_id=source.chat_id,
content=f"❌ Background task {task_id} failed: {e}",
metadata=_thread_metadata,
)
except Exception:
pass
async def _handle_compress_command(self, event: MessageEvent) -> str:
"""Handle /compress command -- manually compress conversation context."""
source = event.source
@ -1934,6 +2260,9 @@ class GatewayRunner:
if not runtime_kwargs.get("api_key"):
return "No provider configured -- cannot compress."
# Resolve model from config (same reason as memory flush above).
model = _resolve_gateway_model()
msgs = [
{"role": m.get("role"), "content": m.get("content")}
for m in history
@ -1944,6 +2273,7 @@ class GatewayRunner:
tmp_agent = AIAgent(
**runtime_kwargs,
model=model,
max_iterations=4,
quiet_mode=True,
enabled_toolsets=["memory"],
@ -1957,6 +2287,10 @@ class GatewayRunner:
)
self.session_store.rewrite_transcript(session_entry.session_id, compressed)
# Reset stored token count — transcript changed, old value is stale
self.session_store.update_session(
session_entry.session_key, last_prompt_tokens=0,
)
new_count = len(compressed)
new_tokens = estimate_messages_tokens_rough(compressed)
@ -2600,6 +2934,7 @@ class GatewayRunner:
Platform.SLACK: "hermes-slack",
Platform.SIGNAL: "hermes-signal",
Platform.HOMEASSISTANT: "hermes-homeassistant",
Platform.EMAIL: "hermes-email",
}
# Try to load platform_toolsets from config
@ -2623,6 +2958,7 @@ class GatewayRunner:
Platform.SLACK: "slack",
Platform.SIGNAL: "signal",
Platform.HOMEASSISTANT: "homeassistant",
Platform.EMAIL: "email",
}.get(source.platform, "telegram")
# Use config override if present (list of toolsets), otherwise hardcoded default
@ -2776,7 +3112,7 @@ class GatewayRunner:
# Restore typing indicator
await asyncio.sleep(0.3)
await adapter.send_typing(source.chat_id)
await adapter.send_typing(source.chat_id, metadata=_progress_metadata)
except queue.Empty:
await asyncio.sleep(0.3)
@ -2854,21 +3190,7 @@ class GatewayRunner:
except Exception:
pass
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
import yaml as _y
_cfg_path = _hermes_home / "config.yaml"
if _cfg_path.exists():
with open(_cfg_path, encoding="utf-8") as _f:
_cfg = _y.safe_load(_f) or {}
_model_cfg = _cfg.get("model", {})
if isinstance(_model_cfg, str):
model = _model_cfg
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
except Exception:
pass
model = _resolve_gateway_model()
try:
runtime_kwargs = _resolve_runtime_agent_kwargs()
@ -2974,6 +3296,13 @@ class GatewayRunner:
# Return final response, or a message if something went wrong
final_response = result.get("final_response")
# Extract last actual prompt token count from the agent's compressor
_last_prompt_toks = 0
_agent = agent_holder[0]
if _agent and hasattr(_agent, "context_compressor"):
_last_prompt_toks = getattr(_agent.context_compressor, "last_prompt_tokens", 0)
if not final_response:
error_msg = f"⚠️ {result['error']}" if result.get("error") else "(No response generated)"
return {
@ -2982,6 +3311,7 @@ class GatewayRunner:
"api_calls": result.get("api_calls", 0),
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"last_prompt_tokens": _last_prompt_toks,
}
# Scan tool results for MEDIA:<path> tags that need to be delivered
@ -3025,6 +3355,7 @@ class GatewayRunner:
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"last_prompt_tokens": _last_prompt_toks,
}
# Start progress message sender if enabled