merge: resolve conflicts with main (plugins + stop commands)

This commit is contained in:
teknium1 2026-03-16 07:32:00 -07:00
commit c2769dffe0
18 changed files with 1877 additions and 43 deletions

View file

@ -73,9 +73,15 @@ DEFAULT_AGENT_IDENTITY = (
MEMORY_GUIDANCE = (
"You have persistent memory across sessions. Save durable facts using the memory "
"tool: user preferences, environment details, tool quirks, and stable conventions. "
"Memory is injected into every turn, so keep it compact. Do NOT save task progress, "
"session outcomes, or completed-work logs to memory; use session_search to recall "
"those from past transcripts."
"Memory is injected into every turn, so keep it compact and focused on facts that "
"will still matter later.\n"
"Prioritize what reduces future user steering — the most valuable memory is one "
"that prevents the user from having to correct or remind you again. "
"User preferences and recurring corrections matter more than procedural task details.\n"
"Do NOT save task progress, session outcomes, completed-work logs, or temporary TODO "
"state to memory; use session_search to recall those from past transcripts. "
"If you've discovered a new way to do something, solved a problem that could be "
"necessary later, save it as a skill with the skill tool."
)
SESSION_SEARCH_GUIDANCE = (
@ -86,8 +92,11 @@ SESSION_SEARCH_GUIDANCE = (
SKILLS_GUIDANCE = (
"After completing a complex task (5+ tool calls), fixing a tricky error, "
"or discovering a non-trivial workflow, consider saving the approach as a "
"skill with skill_manage so you can reuse it next time."
"or discovering a non-trivial workflow, save the approach as a "
"skill with skill_manage so you can reuse it next time.\n"
"When using a skill and finding it outdated, incomplete, or wrong, "
"patch it immediately with skill_manage(action='patch') — don't wait to be asked. "
"Skills that aren't maintained become liabilities."
)
PLATFORM_HINTS = {
@ -326,6 +335,9 @@ def build_skills_system_prompt(
"Before replying, scan the skills below. If one clearly matches your task, "
"load it with skill_view(name) and follow its instructions. "
"If a skill has issues, fix it with skill_manage(action='patch').\n"
"After difficult/iterative tasks, offer to save as a skill. "
"If a skill you loaded was missing steps, had wrong commands, or needed "
"pitfalls you discovered, update it before finishing.\n"
"\n"
"<available_skills>\n"
+ "\n".join(index_lines) + "\n"

108
cli.py
View file

@ -204,6 +204,7 @@ def load_cli_config() -> Dict[str, Any]:
"compact": False,
"resume_display": "full",
"show_reasoning": False,
"show_cost": False,
"skin": "default",
},
"clarify": {
@ -395,7 +396,13 @@ def load_cli_config() -> Dict[str, Any]:
"provider": "AUXILIARY_WEB_EXTRACT_PROVIDER",
"model": "AUXILIARY_WEB_EXTRACT_MODEL",
"base_url": "AUXILIARY_WEB_EXTRACT_BASE_URL",
"api_key": "AUXILIARY_WEB_EXTRACT_API_KEY",
"api_key": "AUXILI..._KEY",
},
"approval": {
"provider": "AUXILIARY_APPROVAL_PROVIDER",
"model": "AUXILIARY_APPROVAL_MODEL",
"base_url": "AUXILIARY_APPROVAL_BASE_URL",
"api_key": "AUXILIARY_APPROVAL_API_KEY",
},
}
@ -1017,6 +1024,8 @@ class HermesCLI:
self.bell_on_complete = CLI_CONFIG["display"].get("bell_on_complete", False)
# show_reasoning: display model thinking/reasoning before the response
self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False)
# show_cost: display $ cost in the status bar (off by default)
self.show_cost = CLI_CONFIG["display"].get("show_cost", False)
self.verbose = verbose if verbose is not None else (self.tool_progress_mode == "verbose")
# Configuration - priority: CLI args > env vars > config file
@ -1270,13 +1279,22 @@ class HermesCLI:
width = width or shutil.get_terminal_size((80, 24)).columns
percent = snapshot["context_percent"]
percent_label = f"{percent}%" if percent is not None else "--"
cost_label = f"${snapshot['session_cost']:.2f}" if snapshot["pricing_known"] else "cost n/a"
duration_label = snapshot["duration"]
show_cost = getattr(self, "show_cost", False)
if show_cost:
cost_label = f"${snapshot['session_cost']:.2f}" if snapshot["pricing_known"] else "cost n/a"
else:
cost_label = None
if width < 52:
return f"{snapshot['model_short']} · {duration_label}"
if width < 76:
return f"{snapshot['model_short']} · {percent_label} · {cost_label} · {duration_label}"
parts = [f"{snapshot['model_short']}", percent_label]
if cost_label:
parts.append(cost_label)
parts.append(duration_label)
return " · ".join(parts)
if snapshot["context_length"]:
ctx_total = _format_context_length(snapshot["context_length"])
@ -1285,7 +1303,11 @@ class HermesCLI:
else:
context_label = "ctx --"
return f"{snapshot['model_short']}{context_label}{percent_label}{cost_label}{duration_label}"
parts = [f"{snapshot['model_short']}", context_label, percent_label]
if cost_label:
parts.append(cost_label)
parts.append(duration_label)
return "".join(parts)
except Exception:
return f"{self.model if getattr(self, 'model', None) else 'Hermes'}"
@ -1293,8 +1315,13 @@ class HermesCLI:
try:
snapshot = self._get_status_bar_snapshot()
width = shutil.get_terminal_size((80, 24)).columns
cost_label = f"${snapshot['session_cost']:.2f}" if snapshot["pricing_known"] else "cost n/a"
duration_label = snapshot["duration"]
show_cost = getattr(self, "show_cost", False)
if show_cost:
cost_label = f"${snapshot['session_cost']:.2f}" if snapshot["pricing_known"] else "cost n/a"
else:
cost_label = None
if width < 52:
return [
@ -1308,17 +1335,23 @@ class HermesCLI:
percent = snapshot["context_percent"]
percent_label = f"{percent}%" if percent is not None else "--"
if width < 76:
return [
frags = [
("class:status-bar", ""),
("class:status-bar-strong", snapshot["model_short"]),
("class:status-bar-dim", " · "),
(self._status_bar_context_style(percent), percent_label),
("class:status-bar-dim", " · "),
("class:status-bar-dim", cost_label),
]
if cost_label:
frags.extend([
("class:status-bar-dim", " · "),
("class:status-bar-dim", cost_label),
])
frags.extend([
("class:status-bar-dim", " · "),
("class:status-bar-dim", duration_label),
("class:status-bar", " "),
]
])
return frags
if snapshot["context_length"]:
ctx_total = _format_context_length(snapshot["context_length"])
@ -1328,7 +1361,7 @@ class HermesCLI:
context_label = "ctx --"
bar_style = self._status_bar_context_style(percent)
return [
frags = [
("class:status-bar", ""),
("class:status-bar-strong", snapshot["model_short"]),
("class:status-bar-dim", ""),
@ -1337,12 +1370,18 @@ class HermesCLI:
(bar_style, self._build_context_bar(percent)),
("class:status-bar-dim", " "),
(bar_style, percent_label),
("class:status-bar-dim", ""),
("class:status-bar-dim", cost_label),
]
if cost_label:
frags.extend([
("class:status-bar-dim", ""),
("class:status-bar-dim", cost_label),
])
frags.extend([
("class:status-bar-dim", ""),
("class:status-bar-dim", duration_label),
("class:status-bar", " "),
]
])
return frags
except Exception:
return [("class:status-bar", f" {self._build_status_bar_text()} ")]
@ -1989,6 +2028,26 @@ class HermesCLI:
# Treat as a git hash
return ref
def _handle_stop_command(self):
"""Handle /stop — kill all running background processes.
Inspired by OpenAI Codex's separation of interrupt (stop current turn)
from /stop (clean up background processes). See openai/codex#14602.
"""
from tools.process_registry import get_registry
registry = get_registry()
processes = registry.list_processes()
running = [p for p in processes if p.get("status") == "running"]
if not running:
print(" No running background processes.")
return
print(f" Stopping {len(running)} background process(es)...")
killed = registry.kill_all()
print(f" ✅ Stopped {killed} process(es).")
def _handle_paste_command(self):
"""Handle /paste — explicitly check clipboard for an image.
@ -3239,8 +3298,31 @@ class HermesCLI:
self._reload_mcp()
elif cmd_lower.startswith("/browser"):
self._handle_browser_command(cmd_original)
elif cmd_lower == "/plugins":
try:
from hermes_cli.plugins import get_plugin_manager
mgr = get_plugin_manager()
plugins = mgr.list_plugins()
if not plugins:
print("No plugins installed.")
print(f"Drop plugin directories into ~/.hermes/plugins/ to get started.")
else:
print(f"Plugins ({len(plugins)}):")
for p in plugins:
status = "" if p["enabled"] else ""
version = f" v{p['version']}" if p["version"] else ""
tools = f"{p['tools']} tools" if p["tools"] else ""
hooks = f"{p['hooks']} hooks" if p["hooks"] else ""
parts = [x for x in [tools, hooks] if x]
detail = f" ({', '.join(parts)})" if parts else ""
error = f"{p['error']}" if p["error"] else ""
print(f" {status} {p['name']}{version}{detail}{error}")
except Exception as e:
print(f"Plugin system error: {e}")
elif cmd_lower.startswith("/rollback"):
self._handle_rollback_command(cmd_original)
elif cmd_lower == "/stop":
self._handle_stop_command()
elif cmd_lower.startswith("/background"):
self._handle_background_command(cmd_original)
elif cmd_lower.startswith("/skin"):

View file

@ -157,6 +157,12 @@ if _config_path.exists():
"base_url": "AUXILIARY_WEB_EXTRACT_BASE_URL",
"api_key": "AUXILIARY_WEB_EXTRACT_API_KEY",
},
"approval": {
"provider": "AUXILIARY_APPROVAL_PROVIDER",
"model": "AUXILIARY_APPROVAL_MODEL",
"base_url": "AUXILIARY_APPROVAL_BASE_URL",
"api_key": "AUXILIARY_APPROVAL_API_KEY",
},
}
for _task_key, _env_map in _aux_task_env.items():
_task_cfg = _auxiliary_cfg.get(_task_key, {})

View file

@ -7,7 +7,9 @@ interactive CLI.
from __future__ import annotations
import os
from collections.abc import Callable, Mapping
from pathlib import Path
from typing import Any
from prompt_toolkit.completion import Completer, Completion
@ -26,6 +28,7 @@ COMMANDS_BY_CATEGORY = {
"/title": "Set a title for the current session (usage: /title My Session Name)",
"/compress": "Manually compress conversation context (flush memories + summarize)",
"/rollback": "List or restore filesystem checkpoints (usage: /rollback [number])",
"/stop": "Kill all running background processes",
"/background": "Run a prompt in the background (usage: /background <prompt>)",
},
"Configuration": {
@ -46,6 +49,7 @@ COMMANDS_BY_CATEGORY = {
"/cron": "Manage scheduled tasks (list, add/create, edit, pause, resume, run, remove)",
"/reload-mcp": "Reload MCP servers from config.yaml",
"/browser": "Connect browser tools to your live Chrome (usage: /browser connect|disconnect|status)",
"/plugins": "List installed plugins and their status",
},
"Info": {
"/help": "Show this help message",
@ -93,9 +97,88 @@ class SlashCommandCompleter(Completer):
"""
return f"{cmd_name} " if cmd_name == word else cmd_name
@staticmethod
def _extract_path_word(text: str) -> str | None:
"""Extract the current word if it looks like a file path.
Returns the path-like token under the cursor, or None if the
current word doesn't look like a path. A word is path-like when
it starts with ``./``, ``../``, ``~/``, ``/``, or contains a
``/`` separator (e.g. ``src/main.py``).
"""
if not text:
return None
# Walk backwards to find the start of the current "word".
# Words are delimited by spaces, but paths can contain almost anything.
i = len(text) - 1
while i >= 0 and text[i] != " ":
i -= 1
word = text[i + 1:]
if not word:
return None
# Only trigger path completion for path-like tokens
if word.startswith(("./", "../", "~/", "/")) or "/" in word:
return word
return None
@staticmethod
def _path_completions(word: str, limit: int = 30):
"""Yield Completion objects for file paths matching *word*."""
expanded = os.path.expanduser(word)
# Split into directory part and prefix to match inside it
if expanded.endswith("/"):
search_dir = expanded
prefix = ""
else:
search_dir = os.path.dirname(expanded) or "."
prefix = os.path.basename(expanded)
try:
entries = os.listdir(search_dir)
except OSError:
return
count = 0
prefix_lower = prefix.lower()
for entry in sorted(entries):
if prefix and not entry.lower().startswith(prefix_lower):
continue
if count >= limit:
break
full_path = os.path.join(search_dir, entry)
is_dir = os.path.isdir(full_path)
# Build the completion text (what replaces the typed word)
if word.startswith("~"):
display_path = "~/" + os.path.relpath(full_path, os.path.expanduser("~"))
elif os.path.isabs(word):
display_path = full_path
else:
# Keep relative
display_path = os.path.relpath(full_path)
if is_dir:
display_path += "/"
suffix = "/" if is_dir else ""
meta = "dir" if is_dir else _file_size_label(full_path)
yield Completion(
display_path,
start_position=-len(word),
display=entry + suffix,
display_meta=meta,
)
count += 1
def get_completions(self, document, complete_event):
text = document.text_before_cursor
if not text.startswith("/"):
# Try file path completion for non-slash input
path_word = self._extract_path_word(text)
if path_word is not None:
yield from self._path_completions(path_word)
return
word = text[1:]
@ -121,3 +204,18 @@ class SlashCommandCompleter(Completer):
display=cmd,
display_meta=f"{short_desc}",
)
def _file_size_label(path: str) -> str:
"""Return a compact human-readable file size, or '' on error."""
try:
size = os.path.getsize(path)
except OSError:
return ""
if size < 1024:
return f"{size}B"
if size < 1024 * 1024:
return f"{size / 1024:.0f}K"
if size < 1024 * 1024 * 1024:
return f"{size / (1024 * 1024):.1f}M"
return f"{size / (1024 * 1024 * 1024):.1f}G"

View file

@ -185,6 +185,12 @@ DEFAULT_CONFIG = {
"base_url": "",
"api_key": "",
},
"approval": {
"provider": "auto",
"model": "", # fast/cheap model recommended (e.g. gemini-flash, haiku)
"base_url": "",
"api_key": "",
},
"mcp": {
"provider": "auto",
"model": "",
@ -205,6 +211,7 @@ DEFAULT_CONFIG = {
"resume_display": "full",
"bell_on_complete": False,
"show_reasoning": False,
"show_cost": False, # Show $ cost in the status bar (off by default)
"skin": "default",
},
@ -296,6 +303,14 @@ DEFAULT_CONFIG = {
"auto_thread": True, # Auto-create threads on @mention in channels (like Slack)
},
# Approval mode for dangerous commands:
# manual — always prompt the user (default)
# smart — use auxiliary LLM to auto-approve low-risk commands, prompt for high-risk
# off — skip all approval prompts (equivalent to --yolo)
"approvals": {
"mode": "manual",
},
# Permanently allowed dangerous command patterns (added via "always" approval)
"command_allowlist": [],
# User-defined quick commands that bypass the agent loop (type: exec only)

449
hermes_cli/plugins.py Normal file
View file

@ -0,0 +1,449 @@
"""
Hermes Plugin System
====================
Discovers, loads, and manages plugins from three sources:
1. **User plugins** ``~/.hermes/plugins/<name>/``
2. **Project plugins** ``./.hermes/plugins/<name>/``
3. **Pip plugins** packages that expose the ``hermes_agent.plugins``
entry-point group.
Each directory plugin must contain a ``plugin.yaml`` manifest **and** an
``__init__.py`` with a ``register(ctx)`` function.
Lifecycle hooks
---------------
Plugins may register callbacks for any of the hooks in ``VALID_HOOKS``.
The agent core calls ``invoke_hook(name, **kwargs)`` at the appropriate
points.
Tool registration
-----------------
``PluginContext.register_tool()`` delegates to ``tools.registry.register()``
so plugin-defined tools appear alongside the built-in tools.
"""
from __future__ import annotations
import importlib
import importlib.metadata
import importlib.util
import logging
import os
import sys
import types
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
try:
import yaml
except ImportError: # pragma: no cover yaml is optional at import time
yaml = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
VALID_HOOKS: Set[str] = {
"pre_tool_call",
"post_tool_call",
"pre_llm_call",
"post_llm_call",
"on_session_start",
"on_session_end",
}
ENTRY_POINTS_GROUP = "hermes_agent.plugins"
_NS_PARENT = "hermes_plugins"
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class PluginManifest:
"""Parsed representation of a plugin.yaml manifest."""
name: str
version: str = ""
description: str = ""
author: str = ""
requires_env: List[str] = field(default_factory=list)
provides_tools: List[str] = field(default_factory=list)
provides_hooks: List[str] = field(default_factory=list)
source: str = "" # "user", "project", or "entrypoint"
path: Optional[str] = None
@dataclass
class LoadedPlugin:
"""Runtime state for a single loaded plugin."""
manifest: PluginManifest
module: Optional[types.ModuleType] = None
tools_registered: List[str] = field(default_factory=list)
hooks_registered: List[str] = field(default_factory=list)
enabled: bool = False
error: Optional[str] = None
# ---------------------------------------------------------------------------
# PluginContext handed to each plugin's ``register()`` function
# ---------------------------------------------------------------------------
class PluginContext:
"""Facade given to plugins so they can register tools and hooks."""
def __init__(self, manifest: PluginManifest, manager: "PluginManager"):
self.manifest = manifest
self._manager = manager
# -- tool registration --------------------------------------------------
def register_tool(
self,
name: str,
toolset: str,
schema: dict,
handler: Callable,
check_fn: Callable | None = None,
requires_env: list | None = None,
is_async: bool = False,
description: str = "",
emoji: str = "",
) -> None:
"""Register a tool in the global registry **and** track it as plugin-provided."""
from tools.registry import registry
registry.register(
name=name,
toolset=toolset,
schema=schema,
handler=handler,
check_fn=check_fn,
requires_env=requires_env,
is_async=is_async,
description=description,
emoji=emoji,
)
self._manager._plugin_tool_names.add(name)
logger.debug("Plugin %s registered tool: %s", self.manifest.name, name)
# -- hook registration --------------------------------------------------
def register_hook(self, hook_name: str, callback: Callable) -> None:
"""Register a lifecycle hook callback.
Unknown hook names produce a warning but are still stored so
forward-compatible plugins don't break.
"""
if hook_name not in VALID_HOOKS:
logger.warning(
"Plugin '%s' registered unknown hook '%s' "
"(valid: %s)",
self.manifest.name,
hook_name,
", ".join(sorted(VALID_HOOKS)),
)
self._manager._hooks.setdefault(hook_name, []).append(callback)
logger.debug("Plugin %s registered hook: %s", self.manifest.name, hook_name)
# ---------------------------------------------------------------------------
# PluginManager
# ---------------------------------------------------------------------------
class PluginManager:
"""Central manager that discovers, loads, and invokes plugins."""
def __init__(self) -> None:
self._plugins: Dict[str, LoadedPlugin] = {}
self._hooks: Dict[str, List[Callable]] = {}
self._plugin_tool_names: Set[str] = set()
self._discovered: bool = False
# -----------------------------------------------------------------------
# Public
# -----------------------------------------------------------------------
def discover_and_load(self) -> None:
"""Scan all plugin sources and load each plugin found."""
if self._discovered:
return
self._discovered = True
manifests: List[PluginManifest] = []
# 1. User plugins (~/.hermes/plugins/)
hermes_home = os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))
user_dir = Path(hermes_home) / "plugins"
manifests.extend(self._scan_directory(user_dir, source="user"))
# 2. Project plugins (./.hermes/plugins/)
project_dir = Path.cwd() / ".hermes" / "plugins"
manifests.extend(self._scan_directory(project_dir, source="project"))
# 3. Pip / entry-point plugins
manifests.extend(self._scan_entry_points())
# Load each manifest
for manifest in manifests:
self._load_plugin(manifest)
if manifests:
logger.info(
"Plugin discovery complete: %d found, %d enabled",
len(self._plugins),
sum(1 for p in self._plugins.values() if p.enabled),
)
# -----------------------------------------------------------------------
# Directory scanning
# -----------------------------------------------------------------------
def _scan_directory(self, path: Path, source: str) -> List[PluginManifest]:
"""Read ``plugin.yaml`` manifests from subdirectories of *path*."""
manifests: List[PluginManifest] = []
if not path.is_dir():
return manifests
for child in sorted(path.iterdir()):
if not child.is_dir():
continue
manifest_file = child / "plugin.yaml"
if not manifest_file.exists():
manifest_file = child / "plugin.yml"
if not manifest_file.exists():
logger.debug("Skipping %s (no plugin.yaml)", child)
continue
try:
if yaml is None:
logger.warning("PyYAML not installed cannot load %s", manifest_file)
continue
data = yaml.safe_load(manifest_file.read_text()) or {}
manifest = PluginManifest(
name=data.get("name", child.name),
version=str(data.get("version", "")),
description=data.get("description", ""),
author=data.get("author", ""),
requires_env=data.get("requires_env", []),
provides_tools=data.get("provides_tools", []),
provides_hooks=data.get("provides_hooks", []),
source=source,
path=str(child),
)
manifests.append(manifest)
except Exception as exc:
logger.warning("Failed to parse %s: %s", manifest_file, exc)
return manifests
# -----------------------------------------------------------------------
# Entry-point scanning
# -----------------------------------------------------------------------
def _scan_entry_points(self) -> List[PluginManifest]:
"""Check ``importlib.metadata`` for pip-installed plugins."""
manifests: List[PluginManifest] = []
try:
eps = importlib.metadata.entry_points()
# Python 3.12+ returns a SelectableGroups; earlier returns dict
if hasattr(eps, "select"):
group_eps = eps.select(group=ENTRY_POINTS_GROUP)
elif isinstance(eps, dict):
group_eps = eps.get(ENTRY_POINTS_GROUP, [])
else:
group_eps = [ep for ep in eps if ep.group == ENTRY_POINTS_GROUP]
for ep in group_eps:
manifest = PluginManifest(
name=ep.name,
source="entrypoint",
path=ep.value,
)
manifests.append(manifest)
except Exception as exc:
logger.debug("Entry-point scan failed: %s", exc)
return manifests
# -----------------------------------------------------------------------
# Loading
# -----------------------------------------------------------------------
def _load_plugin(self, manifest: PluginManifest) -> None:
"""Import a plugin module and call its ``register(ctx)`` function."""
loaded = LoadedPlugin(manifest=manifest)
try:
if manifest.source in ("user", "project"):
module = self._load_directory_module(manifest)
else:
module = self._load_entrypoint_module(manifest)
loaded.module = module
# Call register()
register_fn = getattr(module, "register", None)
if register_fn is None:
loaded.error = "no register() function"
logger.warning("Plugin '%s' has no register() function", manifest.name)
else:
ctx = PluginContext(manifest, self)
register_fn(ctx)
loaded.tools_registered = [
t for t in self._plugin_tool_names
if t not in {
n
for name, p in self._plugins.items()
for n in p.tools_registered
}
]
loaded.hooks_registered = list(
{
h
for h, cbs in self._hooks.items()
if cbs # non-empty
}
- {
h
for name, p in self._plugins.items()
for h in p.hooks_registered
}
)
loaded.enabled = True
except Exception as exc:
loaded.error = str(exc)
logger.warning("Failed to load plugin '%s': %s", manifest.name, exc)
self._plugins[manifest.name] = loaded
def _load_directory_module(self, manifest: PluginManifest) -> types.ModuleType:
"""Import a directory-based plugin as ``hermes_plugins.<name>``."""
plugin_dir = Path(manifest.path) # type: ignore[arg-type]
init_file = plugin_dir / "__init__.py"
if not init_file.exists():
raise FileNotFoundError(f"No __init__.py in {plugin_dir}")
# Ensure the namespace parent package exists
if _NS_PARENT not in sys.modules:
ns_pkg = types.ModuleType(_NS_PARENT)
ns_pkg.__path__ = [] # type: ignore[attr-defined]
ns_pkg.__package__ = _NS_PARENT
sys.modules[_NS_PARENT] = ns_pkg
module_name = f"{_NS_PARENT}.{manifest.name.replace('-', '_')}"
spec = importlib.util.spec_from_file_location(
module_name,
init_file,
submodule_search_locations=[str(plugin_dir)],
)
if spec is None or spec.loader is None:
raise ImportError(f"Cannot create module spec for {init_file}")
module = importlib.util.module_from_spec(spec)
module.__package__ = module_name
module.__path__ = [str(plugin_dir)] # type: ignore[attr-defined]
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
def _load_entrypoint_module(self, manifest: PluginManifest) -> types.ModuleType:
"""Load a pip-installed plugin via its entry-point reference."""
eps = importlib.metadata.entry_points()
if hasattr(eps, "select"):
group_eps = eps.select(group=ENTRY_POINTS_GROUP)
elif isinstance(eps, dict):
group_eps = eps.get(ENTRY_POINTS_GROUP, [])
else:
group_eps = [ep for ep in eps if ep.group == ENTRY_POINTS_GROUP]
for ep in group_eps:
if ep.name == manifest.name:
return ep.load()
raise ImportError(
f"Entry point '{manifest.name}' not found in group '{ENTRY_POINTS_GROUP}'"
)
# -----------------------------------------------------------------------
# Hook invocation
# -----------------------------------------------------------------------
def invoke_hook(self, hook_name: str, **kwargs: Any) -> None:
"""Call all registered callbacks for *hook_name*.
Each callback is wrapped in its own try/except so a misbehaving
plugin cannot break the core agent loop.
"""
callbacks = self._hooks.get(hook_name, [])
for cb in callbacks:
try:
cb(**kwargs)
except Exception as exc:
logger.warning(
"Hook '%s' callback %s raised: %s",
hook_name,
getattr(cb, "__name__", repr(cb)),
exc,
)
# -----------------------------------------------------------------------
# Introspection
# -----------------------------------------------------------------------
def list_plugins(self) -> List[Dict[str, Any]]:
"""Return a list of info dicts for all discovered plugins."""
result: List[Dict[str, Any]] = []
for name, loaded in sorted(self._plugins.items()):
result.append(
{
"name": name,
"version": loaded.manifest.version,
"description": loaded.manifest.description,
"source": loaded.manifest.source,
"enabled": loaded.enabled,
"tools": len(loaded.tools_registered),
"hooks": len(loaded.hooks_registered),
"error": loaded.error,
}
)
return result
# ---------------------------------------------------------------------------
# Module-level singleton & convenience functions
# ---------------------------------------------------------------------------
_plugin_manager: Optional[PluginManager] = None
def get_plugin_manager() -> PluginManager:
"""Return (and lazily create) the global PluginManager singleton."""
global _plugin_manager
if _plugin_manager is None:
_plugin_manager = PluginManager()
return _plugin_manager
def discover_plugins() -> None:
"""Discover and load all plugins (idempotent)."""
get_plugin_manager().discover_and_load()
def invoke_hook(hook_name: str, **kwargs: Any) -> None:
"""Invoke a lifecycle hook on all loaded plugins."""
get_plugin_manager().invoke_hook(hook_name, **kwargs)
def get_plugin_tool_names() -> Set[str]:
"""Return the set of tool names registered by plugins."""
return get_plugin_manager()._plugin_tool_names

View file

@ -113,6 +113,13 @@ try:
except Exception as e:
logger.debug("MCP tool discovery failed: %s", e)
# Plugin tool discovery (user/project/pip plugins)
try:
from hermes_cli.plugins import discover_plugins
discover_plugins()
except Exception as e:
logger.debug("Plugin discovery failed: %s", e)
# =============================================================================
# Backward-compat constants (built once after discovery)
@ -222,6 +229,16 @@ def get_tool_definitions(
for ts_name in get_all_toolsets():
tools_to_include.update(resolve_toolset(ts_name))
# Always include plugin-registered tools — they bypass the toolset filter
# because their toolsets are dynamic (created at plugin load time).
try:
from hermes_cli.plugins import get_plugin_tool_names
plugin_tools = get_plugin_tool_names()
if plugin_tools:
tools_to_include.update(plugin_tools)
except Exception:
pass
# Ask the registry for schemas (only returns tools whose check_fn passes)
filtered_tools = registry.get_definitions(tools_to_include, quiet=quiet_mode)
@ -300,25 +317,39 @@ def handle_function_call(
if function_name in _AGENT_LOOP_TOOLS:
return json.dumps({"error": f"{function_name} must be handled by the agent loop"})
try:
from hermes_cli.plugins import invoke_hook
invoke_hook("pre_tool_call", tool_name=function_name, args=function_args, task_id=task_id or "")
except Exception:
pass
if function_name == "execute_code":
# Prefer the caller-provided list so subagents can't overwrite
# the parent's tool set via the process-global.
sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names
return registry.dispatch(
result = registry.dispatch(
function_name, function_args,
task_id=task_id,
enabled_tools=sandbox_enabled,
honcho_manager=honcho_manager,
honcho_session_key=honcho_session_key,
)
else:
result = registry.dispatch(
function_name, function_args,
task_id=task_id,
user_task=user_task,
honcho_manager=honcho_manager,
honcho_session_key=honcho_session_key,
)
return registry.dispatch(
function_name, function_args,
task_id=task_id,
user_task=user_task,
honcho_manager=honcho_manager,
honcho_session_key=honcho_session_key,
)
try:
from hermes_cli.plugins import invoke_hook
invoke_hook("post_tool_call", tool_name=function_name, args=function_args, result=result, task_id=task_id or "")
except Exception:
pass
return result
except Exception as e:
error_msg = f"Error executing {function_name}: {str(e)}"

View file

@ -812,7 +812,7 @@ class AIAgent:
logger.debug("peer %s memory_mode=honcho: local USER.md writes disabled", _hcfg.peer_name or "user")
# Skills config: nudge interval for skill creation reminders
self._skill_nudge_interval = 15
self._skill_nudge_interval = 10
try:
from hermes_cli.config import load_config as _load_skills_config
skills_config = _load_skills_config().get("skills", {})
@ -3542,7 +3542,8 @@ class AIAgent:
flush_content = (
"[System: The session is being compressed. "
"Please save anything worth remembering to your memories.]"
"Save anything worth remembering — prioritize user preferences, "
"corrections, and recurring patterns over task-specific details.]"
)
_sentinel = f"__flush_{id(self)}_{time.monotonic()}"
flush_msg = {"role": "user", "content": flush_content, "_flush_sentinel": _sentinel}
@ -4541,8 +4542,9 @@ class AIAgent:
self._turns_since_memory += 1
if self._turns_since_memory >= self._memory_nudge_interval:
user_message += (
"\n\n[System: You've had several exchanges in this session. "
"Consider whether there's anything worth saving to your memories.]"
"\n\n[System: You've had several exchanges. Consider: "
"has the user shared preferences, corrected you, or revealed "
"something about their workflow worth remembering for future sessions?]"
)
self._turns_since_memory = 0
@ -4552,8 +4554,9 @@ class AIAgent:
and self._iters_since_skill >= self._skill_nudge_interval
and "skill_manage" in self.valid_tool_names):
user_message += (
"\n\n[System: The previous task involved many steps. "
"If you discovered a reusable workflow, consider saving it as a skill.]"
"\n\n[System: The previous task involved many tool calls. "
"Save the approach as a skill if it's reusable, or update "
"any existing skill you used if it was wrong or incomplete.]"
)
self._iters_since_skill = 0

View file

@ -26,6 +26,12 @@ def _isolate_hermes_home(tmp_path, monkeypatch):
(fake_home / "memories").mkdir()
(fake_home / "skills").mkdir()
monkeypatch.setenv("HERMES_HOME", str(fake_home))
# Reset plugin singleton so tests don't leak plugins from ~/.hermes/plugins/
try:
import hermes_cli.plugins as _plugins_mod
monkeypatch.setattr(_plugins_mod, "_plugin_manager", None)
except Exception:
pass
# Tests should not inherit the agent's current gateway/messaging surface.
# Individual tests that need gateway behavior set these explicitly.
monkeypatch.delenv("HERMES_SESSION_PLATFORM", raising=False)

View file

@ -12,7 +12,8 @@ EXPECTED_COMMANDS = {
"/personality", "/clear", "/history", "/new", "/reset", "/retry",
"/undo", "/save", "/config", "/cron", "/skills", "/platforms",
"/verbose", "/reasoning", "/compress", "/title", "/usage", "/insights", "/paste",
"/reload-mcp", "/rollback", "/background", "/skin", "/voice", "/browser", "/quit",
"/reload-mcp", "/rollback", "/stop", "/background", "/skin", "/voice", "/browser", "/quit",
"/plugins",
}

View file

@ -0,0 +1,184 @@
"""Tests for file path autocomplete in the CLI completer."""
import os
from unittest.mock import MagicMock
import pytest
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import to_plain_text
from hermes_cli.commands import SlashCommandCompleter, _file_size_label
def _display_names(completions):
"""Extract plain-text display names from a list of Completion objects."""
return [to_plain_text(c.display) for c in completions]
def _display_metas(completions):
"""Extract plain-text display_meta from a list of Completion objects."""
return [to_plain_text(c.display_meta) if c.display_meta else "" for c in completions]
@pytest.fixture
def completer():
return SlashCommandCompleter()
class TestExtractPathWord:
def test_relative_path(self):
assert SlashCommandCompleter._extract_path_word("look at ./src/main.py") == "./src/main.py"
def test_home_path(self):
assert SlashCommandCompleter._extract_path_word("edit ~/docs/") == "~/docs/"
def test_absolute_path(self):
assert SlashCommandCompleter._extract_path_word("read /etc/hosts") == "/etc/hosts"
def test_parent_path(self):
assert SlashCommandCompleter._extract_path_word("check ../config.yaml") == "../config.yaml"
def test_path_with_slash_in_middle(self):
assert SlashCommandCompleter._extract_path_word("open src/utils/helpers.py") == "src/utils/helpers.py"
def test_plain_word_not_path(self):
assert SlashCommandCompleter._extract_path_word("hello world") is None
def test_empty_string(self):
assert SlashCommandCompleter._extract_path_word("") is None
def test_single_word_no_slash(self):
assert SlashCommandCompleter._extract_path_word("README.md") is None
def test_word_after_space(self):
assert SlashCommandCompleter._extract_path_word("fix the bug in ./tools/") == "./tools/"
def test_just_dot_slash(self):
assert SlashCommandCompleter._extract_path_word("./") == "./"
def test_just_tilde_slash(self):
assert SlashCommandCompleter._extract_path_word("~/") == "~/"
class TestPathCompletions:
def test_lists_current_directory(self, tmp_path):
(tmp_path / "file_a.py").touch()
(tmp_path / "file_b.txt").touch()
(tmp_path / "subdir").mkdir()
old_cwd = os.getcwd()
os.chdir(tmp_path)
try:
completions = list(SlashCommandCompleter._path_completions("./"))
names = _display_names(completions)
assert "file_a.py" in names
assert "file_b.txt" in names
assert "subdir/" in names
finally:
os.chdir(old_cwd)
def test_filters_by_prefix(self, tmp_path):
(tmp_path / "alpha.py").touch()
(tmp_path / "beta.py").touch()
(tmp_path / "alpha_test.py").touch()
completions = list(SlashCommandCompleter._path_completions(f"{tmp_path}/alpha"))
names = _display_names(completions)
assert "alpha.py" in names
assert "alpha_test.py" in names
assert "beta.py" not in names
def test_directories_have_trailing_slash(self, tmp_path):
(tmp_path / "mydir").mkdir()
(tmp_path / "myfile.txt").touch()
completions = list(SlashCommandCompleter._path_completions(f"{tmp_path}/"))
names = _display_names(completions)
metas = _display_metas(completions)
assert "mydir/" in names
idx = names.index("mydir/")
assert metas[idx] == "dir"
def test_home_expansion(self, tmp_path, monkeypatch):
monkeypatch.setenv("HOME", str(tmp_path))
(tmp_path / "testfile.md").touch()
completions = list(SlashCommandCompleter._path_completions("~/test"))
names = _display_names(completions)
assert "testfile.md" in names
def test_nonexistent_dir_returns_empty(self):
completions = list(SlashCommandCompleter._path_completions("/nonexistent_dir_xyz/"))
assert completions == []
def test_respects_limit(self, tmp_path):
for i in range(50):
(tmp_path / f"file_{i:03d}.txt").touch()
completions = list(SlashCommandCompleter._path_completions(f"{tmp_path}/", limit=10))
assert len(completions) == 10
def test_case_insensitive_prefix(self, tmp_path):
(tmp_path / "README.md").touch()
completions = list(SlashCommandCompleter._path_completions(f"{tmp_path}/read"))
names = _display_names(completions)
assert "README.md" in names
class TestIntegration:
"""Test the completer produces path completions via the prompt_toolkit API."""
def test_slash_commands_still_work(self, completer):
doc = Document("/hel", cursor_position=4)
event = MagicMock()
completions = list(completer.get_completions(doc, event))
names = _display_names(completions)
assert "/help" in names
def test_path_completion_triggers_on_dot_slash(self, completer, tmp_path):
(tmp_path / "test.py").touch()
old_cwd = os.getcwd()
os.chdir(tmp_path)
try:
doc = Document("edit ./te", cursor_position=9)
event = MagicMock()
completions = list(completer.get_completions(doc, event))
names = _display_names(completions)
assert "test.py" in names
finally:
os.chdir(old_cwd)
def test_no_completion_for_plain_words(self, completer):
doc = Document("hello world", cursor_position=11)
event = MagicMock()
completions = list(completer.get_completions(doc, event))
assert completions == []
def test_absolute_path_triggers_completion(self, completer):
doc = Document("check /etc/hos", cursor_position=14)
event = MagicMock()
completions = list(completer.get_completions(doc, event))
names = _display_names(completions)
# /etc/hosts should exist on Linux
assert any("host" in n.lower() for n in names)
class TestFileSizeLabel:
def test_bytes(self, tmp_path):
f = tmp_path / "small.txt"
f.write_text("hi")
assert _file_size_label(str(f)) == "2B"
def test_kilobytes(self, tmp_path):
f = tmp_path / "medium.txt"
f.write_bytes(b"x" * 2048)
assert _file_size_label(str(f)) == "2K"
def test_megabytes(self, tmp_path):
f = tmp_path / "large.bin"
f.write_bytes(b"x" * (2 * 1024 * 1024))
assert _file_size_label(str(f)) == "2.0M"
def test_nonexistent(self):
assert _file_size_label("/nonexistent_xyz") == ""

View file

@ -65,24 +65,39 @@ class TestCLIStatusBar:
assert "claude-sonnet-4-20250514" in text
assert "12.4K/200K" in text
assert "6%" in text
assert "$0.06" in text
assert "$0.06" not in text # cost hidden by default
assert "15m" in text
def test_build_status_bar_text_shows_cost_when_enabled(self):
cli_obj = _attach_agent(
_make_cli(),
prompt_tokens=10000,
completion_tokens=2400,
total_tokens=12400,
api_calls=7,
context_tokens=12400,
context_length=200_000,
)
cli_obj.show_cost = True
text = cli_obj._build_status_bar_text(width=120)
assert "$" in text # cost is shown when enabled
def test_build_status_bar_text_collapses_for_narrow_terminal(self):
cli_obj = _attach_agent(
_make_cli(),
prompt_tokens=10_230,
completion_tokens=2_220,
total_tokens=12_450,
prompt_tokens=10000,
completion_tokens=2400,
total_tokens=12400,
api_calls=7,
context_tokens=12_450,
context_tokens=12400,
context_length=200_000,
)
text = cli_obj._build_status_bar_text(width=60)
assert "" in text
assert "$0.06" in text
assert "$0.06" not in text # cost hidden by default
assert "15m" in text
assert "200K" not in text

340
tests/test_plugins.py Normal file
View file

@ -0,0 +1,340 @@
"""Tests for the Hermes plugin system (hermes_cli.plugins)."""
import logging
import os
import sys
import types
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import yaml
from hermes_cli.plugins import (
ENTRY_POINTS_GROUP,
VALID_HOOKS,
LoadedPlugin,
PluginContext,
PluginManager,
PluginManifest,
get_plugin_manager,
get_plugin_tool_names,
discover_plugins,
invoke_hook,
)
# ── Helpers ────────────────────────────────────────────────────────────────
def _make_plugin_dir(base: Path, name: str, *, register_body: str = "pass",
manifest_extra: dict | None = None) -> Path:
"""Create a minimal plugin directory with plugin.yaml + __init__.py."""
plugin_dir = base / name
plugin_dir.mkdir(parents=True, exist_ok=True)
manifest = {"name": name, "version": "0.1.0", "description": f"Test plugin {name}"}
if manifest_extra:
manifest.update(manifest_extra)
(plugin_dir / "plugin.yaml").write_text(yaml.dump(manifest))
(plugin_dir / "__init__.py").write_text(
f"def register(ctx):\n {register_body}\n"
)
return plugin_dir
# ── TestPluginDiscovery ────────────────────────────────────────────────────
class TestPluginDiscovery:
"""Tests for plugin discovery from directories and entry points."""
def test_discover_user_plugins(self, tmp_path, monkeypatch):
"""Plugins in ~/.hermes/plugins/ are discovered."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(plugins_dir, "hello_plugin")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
assert "hello_plugin" in mgr._plugins
assert mgr._plugins["hello_plugin"].enabled
def test_discover_project_plugins(self, tmp_path, monkeypatch):
"""Plugins in ./.hermes/plugins/ are discovered."""
project_dir = tmp_path / "project"
project_dir.mkdir()
monkeypatch.chdir(project_dir)
plugins_dir = project_dir / ".hermes" / "plugins"
_make_plugin_dir(plugins_dir, "proj_plugin")
mgr = PluginManager()
mgr.discover_and_load()
assert "proj_plugin" in mgr._plugins
assert mgr._plugins["proj_plugin"].enabled
def test_discover_is_idempotent(self, tmp_path, monkeypatch):
"""Calling discover_and_load() twice does not duplicate plugins."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(plugins_dir, "once_plugin")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
mgr.discover_and_load() # second call should no-op
assert len(mgr._plugins) == 1
def test_discover_skips_dir_without_manifest(self, tmp_path, monkeypatch):
"""Directories without plugin.yaml are silently skipped."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
(plugins_dir / "no_manifest").mkdir(parents=True)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
assert len(mgr._plugins) == 0
def test_entry_points_scanned(self, tmp_path, monkeypatch):
"""Entry-point based plugins are discovered (mocked)."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
fake_module = types.ModuleType("fake_ep_plugin")
fake_module.register = lambda ctx: None # type: ignore[attr-defined]
fake_ep = MagicMock()
fake_ep.name = "ep_plugin"
fake_ep.value = "fake_ep_plugin:register"
fake_ep.group = ENTRY_POINTS_GROUP
fake_ep.load.return_value = fake_module
def fake_entry_points():
result = MagicMock()
result.select = MagicMock(return_value=[fake_ep])
return result
with patch("importlib.metadata.entry_points", fake_entry_points):
mgr = PluginManager()
mgr.discover_and_load()
assert "ep_plugin" in mgr._plugins
# ── TestPluginLoading ──────────────────────────────────────────────────────
class TestPluginLoading:
"""Tests for plugin module loading."""
def test_load_missing_init(self, tmp_path, monkeypatch):
"""Plugin dir without __init__.py records an error."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
plugin_dir = plugins_dir / "bad_plugin"
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.yaml").write_text(yaml.dump({"name": "bad_plugin"}))
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
assert "bad_plugin" in mgr._plugins
assert not mgr._plugins["bad_plugin"].enabled
assert mgr._plugins["bad_plugin"].error is not None
def test_load_missing_register_fn(self, tmp_path, monkeypatch):
"""Plugin without register() function records an error."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
plugin_dir = plugins_dir / "no_reg"
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.yaml").write_text(yaml.dump({"name": "no_reg"}))
(plugin_dir / "__init__.py").write_text("# no register function\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
assert "no_reg" in mgr._plugins
assert not mgr._plugins["no_reg"].enabled
assert "no register()" in mgr._plugins["no_reg"].error
def test_load_registers_namespace_module(self, tmp_path, monkeypatch):
"""Directory plugins are importable under hermes_plugins.<name>."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(plugins_dir, "ns_plugin")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
# Clean up any prior namespace module
sys.modules.pop("hermes_plugins.ns_plugin", None)
mgr = PluginManager()
mgr.discover_and_load()
assert "hermes_plugins.ns_plugin" in sys.modules
# ── TestPluginHooks ────────────────────────────────────────────────────────
class TestPluginHooks:
"""Tests for lifecycle hook registration and invocation."""
def test_register_and_invoke_hook(self, tmp_path, monkeypatch):
"""Registered hooks are called on invoke_hook()."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(
plugins_dir, "hook_plugin",
register_body='ctx.register_hook("pre_tool_call", lambda **kw: None)',
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
# Should not raise
mgr.invoke_hook("pre_tool_call", tool_name="test", args={}, task_id="t1")
def test_hook_exception_does_not_propagate(self, tmp_path, monkeypatch):
"""A hook callback that raises does NOT crash the caller."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(
plugins_dir, "bad_hook",
register_body='ctx.register_hook("post_tool_call", lambda **kw: 1/0)',
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
# Should not raise despite 1/0
mgr.invoke_hook("post_tool_call", tool_name="x", args={}, result="r", task_id="")
def test_invalid_hook_name_warns(self, tmp_path, monkeypatch, caplog):
"""Registering an unknown hook name logs a warning."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(
plugins_dir, "warn_plugin",
register_body='ctx.register_hook("on_banana", lambda **kw: None)',
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
with caplog.at_level(logging.WARNING, logger="hermes_cli.plugins"):
mgr = PluginManager()
mgr.discover_and_load()
assert any("on_banana" in record.message for record in caplog.records)
# ── TestPluginContext ──────────────────────────────────────────────────────
class TestPluginContext:
"""Tests for the PluginContext facade."""
def test_register_tool_adds_to_registry(self, tmp_path, monkeypatch):
"""PluginContext.register_tool() puts the tool in the global registry."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
plugin_dir = plugins_dir / "tool_plugin"
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.yaml").write_text(yaml.dump({"name": "tool_plugin"}))
(plugin_dir / "__init__.py").write_text(
'def register(ctx):\n'
' ctx.register_tool(\n'
' name="plugin_echo",\n'
' toolset="plugin_tool_plugin",\n'
' schema={"name": "plugin_echo", "description": "Echo", "parameters": {"type": "object", "properties": {}}},\n'
' handler=lambda args, **kw: "echo",\n'
' )\n'
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
assert "plugin_echo" in mgr._plugin_tool_names
from tools.registry import registry
assert "plugin_echo" in registry._tools
# ── TestPluginToolVisibility ───────────────────────────────────────────────
class TestPluginToolVisibility:
"""Plugin-registered tools appear in get_tool_definitions()."""
def test_plugin_tools_in_definitions(self, tmp_path, monkeypatch):
"""Tools from plugins bypass the toolset filter."""
import hermes_cli.plugins as plugins_mod
plugins_dir = tmp_path / "hermes_test" / "plugins"
plugin_dir = plugins_dir / "vis_plugin"
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.yaml").write_text(yaml.dump({"name": "vis_plugin"}))
(plugin_dir / "__init__.py").write_text(
'def register(ctx):\n'
' ctx.register_tool(\n'
' name="vis_tool",\n'
' toolset="plugin_vis_plugin",\n'
' schema={"name": "vis_tool", "description": "Visible", "parameters": {"type": "object", "properties": {}}},\n'
' handler=lambda args, **kw: "ok",\n'
' )\n'
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
monkeypatch.setattr(plugins_mod, "_plugin_manager", mgr)
from model_tools import get_tool_definitions
tools = get_tool_definitions(enabled_toolsets=["terminal"], quiet_mode=True)
tool_names = [t["function"]["name"] for t in tools]
assert "vis_tool" in tool_names
# ── TestPluginManagerList ──────────────────────────────────────────────────
class TestPluginManagerList:
"""Tests for PluginManager.list_plugins()."""
def test_list_empty(self):
"""Empty manager returns empty list."""
mgr = PluginManager()
assert mgr.list_plugins() == []
def test_list_returns_sorted(self, tmp_path, monkeypatch):
"""list_plugins() returns results sorted by name."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(plugins_dir, "zulu")
_make_plugin_dir(plugins_dir, "alpha")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
listing = mgr.list_plugins()
names = [p["name"] for p in listing]
assert names == sorted(names)
def test_list_with_plugins(self, tmp_path, monkeypatch):
"""list_plugins() returns info dicts for each discovered plugin."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(plugins_dir, "alpha")
_make_plugin_dir(plugins_dir, "beta")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
listing = mgr.list_plugins()
names = [p["name"] for p in listing]
assert "alpha" in names
assert "beta" in names
for p in listing:
assert "enabled" in p
assert "tools" in p
assert "hooks" in p

View file

@ -4,6 +4,7 @@ This module is the single source of truth for the dangerous command system:
- Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
- Per-session approval state (thread-safe, keyed by session_key)
- Approval prompting (CLI interactive + gateway async)
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
- Permanent allowlist persistence (config.yaml)
"""
@ -283,6 +284,68 @@ def prompt_dangerous_approval(command: str, description: str,
sys.stdout.flush()
def _get_approval_mode() -> str:
"""Read the approval mode from config. Returns 'manual', 'smart', or 'off'."""
try:
from hermes_cli.config import load_config
config = load_config()
return config.get("approvals", {}).get("mode", "manual")
except Exception:
return "manual"
def _smart_approve(command: str, description: str) -> str:
"""Use the auxiliary LLM to assess risk and decide approval.
Returns 'approve' if the LLM determines the command is safe,
'deny' if genuinely dangerous, or 'escalate' if uncertain.
Inspired by OpenAI Codex's Smart Approvals guardian subagent
(openai/codex#13860).
"""
try:
from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
client, model = get_text_auxiliary_client(task="approval")
if not client or not model:
logger.debug("Smart approvals: no aux client available, escalating")
return "escalate"
prompt = f"""You are a security reviewer for an AI coding agent. A terminal command was flagged by pattern matching as potentially dangerous.
Command: {command}
Flagged reason: {description}
Assess the ACTUAL risk of this command. Many flagged commands are false positives for example, `python -c "print('hello')"` is flagged as "script execution via -c flag" but is completely harmless.
Rules:
- APPROVE if the command is clearly safe (benign script execution, safe file operations, development tools, package installs, git operations, etc.)
- DENY if the command could genuinely damage the system (recursive delete of important paths, overwriting system files, fork bombs, wiping disks, dropping databases, etc.)
- ESCALATE if you're uncertain
Respond with exactly one word: APPROVE, DENY, or ESCALATE"""
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**auxiliary_max_tokens_param(16),
temperature=0,
)
answer = (response.choices[0].message.content or "").strip().upper()
if "APPROVE" in answer:
return "approve"
elif "DENY" in answer:
return "deny"
else:
return "escalate"
except Exception as e:
logger.debug("Smart approvals: LLM call failed (%s), escalating", e)
return "escalate"
def check_dangerous_command(command: str, env_type: str,
approval_callback=None) -> dict:
"""Check if a command is dangerous and handle approval.
@ -372,8 +435,9 @@ def check_all_command_guards(command: str, env_type: str,
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo: bypass all approval prompts and pre-exec guard checks
if os.getenv("HERMES_YOLO_MODE"):
# --yolo or approvals.mode=off: bypass all approval prompts
approval_mode = _get_approval_mode()
if os.getenv("HERMES_YOLO_MODE") or approval_mode == "off":
return {"approved": True, "message": None}
is_cli = os.getenv("HERMES_INTERACTIVE")
@ -430,6 +494,31 @@ def check_all_command_guards(command: str, env_type: str,
if not warnings:
return {"approved": True, "message": None}
# --- Phase 2.5: Smart approval (auxiliary LLM risk assessment) ---
# When approvals.mode=smart, ask the aux LLM before prompting the user.
# Inspired by OpenAI Codex's Smart Approvals guardian subagent
# (openai/codex#13860).
if approval_mode == "smart":
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
verdict = _smart_approve(command, combined_desc_for_llm)
if verdict == "approve":
# Auto-approve and grant session-level approval for these patterns
for key, _, _ in warnings:
approve_session(session_key, key)
logger.debug("Smart approval: auto-approved '%s' (%s)",
command[:60], combined_desc_for_llm)
return {"approved": True, "message": None,
"smart_approved": True}
elif verdict == "deny":
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
return {
"approved": False,
"message": f"BLOCKED by smart approval: {combined_desc_for_llm}. "
"The command was assessed as genuinely dangerous. Do NOT retry.",
"smart_denied": True,
}
# verdict == "escalate" → fall through to manual prompt
# --- Phase 3: Approval ---
# Combine descriptions for a single approval prompt

View file

@ -439,11 +439,13 @@ MEMORY_SCHEMA = {
"Memory is injected into future turns, so keep it compact and focused on facts "
"that will still matter later.\n\n"
"WHEN TO SAVE (do this proactively, don't wait to be asked):\n"
"- User corrects you or says 'remember this' / 'don't do that again'\n"
"- User shares a preference, habit, or personal detail (name, role, timezone, coding style)\n"
"- You discover something about the environment (OS, installed tools, project structure)\n"
"- User corrects you or says 'remember this' / 'don't do that again'\n"
"- You learn a convention, API quirk, or workflow specific to this user's setup\n"
"- You identify a stable fact that will be useful again in future sessions\n\n"
"PRIORITY: User preferences and corrections > environment facts > procedural knowledge. "
"The most valuable memory prevents the user from having to repeat themselves.\n\n"
"Do NOT save task progress, session outcomes, completed-work logs, or temporary TODO "
"state to memory; use session_search to recall those from past transcripts.\n"
"If you've discovered a new way to do something, solved a problem that could be "

View file

@ -561,7 +561,8 @@ SKILL_MANAGE_SCHEMA = {
"user-corrected approach worked, non-trivial workflow discovered, "
"or user asks you to remember a procedure.\n"
"Update when: instructions stale/wrong, OS-specific failures, "
"missing steps or pitfalls found during use.\n\n"
"missing steps or pitfalls found during use. "
"If you used a skill and hit issues not covered by it, patch it immediately.\n\n"
"After difficult/iterative tasks, offer to save as a skill. "
"Skip for simple one-offs. Confirm with user before creating/deleting.\n\n"
"Good skills: trigger conditions, numbered steps with exact commands, "

View file

@ -0,0 +1,438 @@
---
sidebar_position: 10
---
# Build a Hermes Plugin
This guide walks through building a complete Hermes plugin from scratch. By the end you'll have a working plugin with multiple tools, lifecycle hooks, shipped data files, and a bundled skill — everything the plugin system supports.
## What you're building
A **calculator** plugin with two tools:
- `calculate` — evaluate math expressions (`2**16`, `sqrt(144)`, `pi * 5**2`)
- `unit_convert` — convert between units (`100 F → 37.78 C`, `5 km → 3.11 mi`)
Plus a hook that logs every tool call, and a bundled skill file.
## Step 1: Create the plugin directory
```bash
mkdir -p ~/.hermes/plugins/calculator
cd ~/.hermes/plugins/calculator
```
## Step 2: Write the manifest
Create `plugin.yaml`:
```yaml
name: calculator
version: 1.0.0
description: Math calculator — evaluate expressions and convert units
provides:
tools: true
hooks: true
```
This tells Hermes: "I'm a plugin called calculator, I provide tools and hooks." That's all the manifest needs.
Optional fields you could add:
```yaml
author: Your Name
requires_env: # gate loading on env vars
- SOME_API_KEY # plugin disabled if missing
```
## Step 3: Write the tool schemas
Create `schemas.py` — this is what the LLM reads to decide when to call your tools:
```python
"""Tool schemas — what the LLM sees."""
CALCULATE = {
"name": "calculate",
"description": (
"Evaluate a mathematical expression and return the result. "
"Supports arithmetic (+, -, *, /, **), functions (sqrt, sin, cos, "
"log, abs, round, floor, ceil), and constants (pi, e). "
"Use this for any math the user asks about."
),
"parameters": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Math expression to evaluate (e.g., '2**10', 'sqrt(144)')",
},
},
"required": ["expression"],
},
}
UNIT_CONVERT = {
"name": "unit_convert",
"description": (
"Convert a value between units. Supports length (m, km, mi, ft, in), "
"weight (kg, lb, oz, g), temperature (C, F, K), data (B, KB, MB, GB, TB), "
"and time (s, min, hr, day)."
),
"parameters": {
"type": "object",
"properties": {
"value": {
"type": "number",
"description": "The numeric value to convert",
},
"from_unit": {
"type": "string",
"description": "Source unit (e.g., 'km', 'lb', 'F', 'GB')",
},
"to_unit": {
"type": "string",
"description": "Target unit (e.g., 'mi', 'kg', 'C', 'MB')",
},
},
"required": ["value", "from_unit", "to_unit"],
},
}
```
**Why schemas matter:** The `description` field is how the LLM decides when to use your tool. Be specific about what it does and when to use it. The `parameters` define what arguments the LLM passes.
## Step 4: Write the tool handlers
Create `tools.py` — this is the code that actually executes when the LLM calls your tools:
```python
"""Tool handlers — the code that runs when the LLM calls each tool."""
import json
import math
# Safe globals for expression evaluation — no file/network access
_SAFE_MATH = {
"abs": abs, "round": round, "min": min, "max": max,
"pow": pow, "sqrt": math.sqrt, "sin": math.sin, "cos": math.cos,
"tan": math.tan, "log": math.log, "log2": math.log2, "log10": math.log10,
"floor": math.floor, "ceil": math.ceil,
"pi": math.pi, "e": math.e,
"factorial": math.factorial,
}
def calculate(args: dict, **kwargs) -> str:
"""Evaluate a math expression safely.
Rules for handlers:
1. Receive args (dict) — the parameters the LLM passed
2. Do the work
3. Return a JSON string — ALWAYS, even on error
4. Accept **kwargs for forward compatibility
"""
expression = args.get("expression", "").strip()
if not expression:
return json.dumps({"error": "No expression provided"})
try:
result = eval(expression, {"__builtins__": {}}, _SAFE_MATH)
return json.dumps({"expression": expression, "result": result})
except ZeroDivisionError:
return json.dumps({"expression": expression, "error": "Division by zero"})
except Exception as e:
return json.dumps({"expression": expression, "error": f"Invalid: {e}"})
# Conversion tables — values are in base units
_LENGTH = {"m": 1, "km": 1000, "mi": 1609.34, "ft": 0.3048, "in": 0.0254, "cm": 0.01}
_WEIGHT = {"kg": 1, "g": 0.001, "lb": 0.453592, "oz": 0.0283495}
_DATA = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
_TIME = {"s": 1, "ms": 0.001, "min": 60, "hr": 3600, "day": 86400}
def _convert_temp(value, from_u, to_u):
# Normalize to Celsius
c = {"F": (value - 32) * 5/9, "K": value - 273.15}.get(from_u, value)
# Convert to target
return {"F": c * 9/5 + 32, "K": c + 273.15}.get(to_u, c)
def unit_convert(args: dict, **kwargs) -> str:
"""Convert between units."""
value = args.get("value")
from_unit = args.get("from_unit", "").strip()
to_unit = args.get("to_unit", "").strip()
if value is None or not from_unit or not to_unit:
return json.dumps({"error": "Need value, from_unit, and to_unit"})
try:
# Temperature
if from_unit.upper() in {"C","F","K"} and to_unit.upper() in {"C","F","K"}:
result = _convert_temp(float(value), from_unit.upper(), to_unit.upper())
return json.dumps({"input": f"{value} {from_unit}", "result": round(result, 4),
"output": f"{round(result, 4)} {to_unit}"})
# Ratio-based conversions
for table in (_LENGTH, _WEIGHT, _DATA, _TIME):
lc = {k.lower(): v for k, v in table.items()}
if from_unit.lower() in lc and to_unit.lower() in lc:
result = float(value) * lc[from_unit.lower()] / lc[to_unit.lower()]
return json.dumps({"input": f"{value} {from_unit}",
"result": round(result, 6),
"output": f"{round(result, 6)} {to_unit}"})
return json.dumps({"error": f"Cannot convert {from_unit} → {to_unit}"})
except Exception as e:
return json.dumps({"error": f"Conversion failed: {e}"})
```
**Key rules for handlers:**
1. **Signature:** `def my_handler(args: dict, **kwargs) -> str`
2. **Return:** Always a JSON string. Success and errors alike.
3. **Never raise:** Catch all exceptions, return error JSON instead.
4. **Accept `**kwargs`:** Hermes may pass additional context in the future.
## Step 5: Write the registration
Create `__init__.py` — this wires schemas to handlers:
```python
"""Calculator plugin — registration."""
import logging
from . import schemas, tools
logger = logging.getLogger(__name__)
# Track tool usage via hooks
_call_log = []
def _on_post_tool_call(tool_name, args, result, task_id, **kwargs):
"""Hook: runs after every tool call (not just ours)."""
_call_log.append({"tool": tool_name, "session": task_id})
if len(_call_log) > 100:
_call_log.pop(0)
logger.debug("Tool called: %s (session %s)", tool_name, task_id)
def register(ctx):
"""Wire schemas to handlers and register hooks."""
ctx.register_tool(name="calculate", toolset="calculator",
schema=schemas.CALCULATE, handler=tools.calculate)
ctx.register_tool(name="unit_convert", toolset="calculator",
schema=schemas.UNIT_CONVERT, handler=tools.unit_convert)
# This hook fires for ALL tool calls, not just ours
ctx.register_hook("post_tool_call", _on_post_tool_call)
```
**What `register()` does:**
- Called exactly once at startup
- `ctx.register_tool()` puts your tool in the registry — the model sees it immediately
- `ctx.register_hook()` subscribes to lifecycle events
- If this function crashes, the plugin is disabled but Hermes continues fine
## Step 6: Test it
Start Hermes:
```bash
hermes
```
You should see `calculator: calculate, unit_convert` in the banner's tool list.
Try these prompts:
```
What's 2 to the power of 16?
Convert 100 fahrenheit to celsius
What's the square root of 2 times pi?
How many gigabytes is 1.5 terabytes?
```
Check plugin status:
```
/plugins
```
Output:
```
Plugins (1):
✓ calculator v1.0.0 (2 tools, 1 hooks)
```
## Your plugin's final structure
```
~/.hermes/plugins/calculator/
├── plugin.yaml # "I'm calculator, I provide tools and hooks"
├── __init__.py # Wiring: schemas → handlers, register hooks
├── schemas.py # What the LLM reads (descriptions + parameter specs)
└── tools.py # What runs (calculate, unit_convert functions)
```
Four files, clear separation:
- **Manifest** declares what the plugin is
- **Schemas** describe tools for the LLM
- **Handlers** implement the actual logic
- **Registration** connects everything
## What else can plugins do?
### Ship data files
Put any files in your plugin directory and read them at import time:
```python
# In tools.py or __init__.py
from pathlib import Path
_PLUGIN_DIR = Path(__file__).parent
_DATA_FILE = _PLUGIN_DIR / "data" / "languages.yaml"
with open(_DATA_FILE) as f:
_DATA = yaml.safe_load(f)
```
### Bundle a skill
Include a `skill.md` file and install it during registration:
```python
import shutil
from pathlib import Path
def _install_skill():
"""Copy our skill to ~/.hermes/skills/ on first load."""
try:
from hermes_cli.config import get_hermes_home
dest = get_hermes_home() / "skills" / "my-plugin" / "SKILL.md"
except Exception:
dest = Path.home() / ".hermes" / "skills" / "my-plugin" / "SKILL.md"
if dest.exists():
return # don't overwrite user edits
source = Path(__file__).parent / "skill.md"
if source.exists():
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, dest)
def register(ctx):
ctx.register_tool(...)
_install_skill()
```
### Gate on environment variables
If your plugin needs an API key:
```yaml
# plugin.yaml
requires_env:
- WEATHER_API_KEY
```
If `WEATHER_API_KEY` isn't set, the plugin is disabled with a clear message. No crash, no error in the agent — just "Plugin weather disabled (missing: WEATHER_API_KEY)".
### Conditional tool availability
For tools that depend on optional libraries:
```python
ctx.register_tool(
name="my_tool",
schema={...},
handler=my_handler,
check_fn=lambda: _has_optional_lib(), # False = tool hidden from model
)
```
### Register multiple hooks
```python
def register(ctx):
ctx.register_hook("pre_tool_call", before_any_tool)
ctx.register_hook("post_tool_call", after_any_tool)
ctx.register_hook("on_session_start", on_new_session)
ctx.register_hook("on_session_end", on_session_end)
```
Available hooks:
| Hook | When | Arguments |
|------|------|-----------|
| `pre_tool_call` | Before any tool runs | `tool_name`, `args`, `task_id` |
| `post_tool_call` | After any tool returns | `tool_name`, `args`, `result`, `task_id` |
| `pre_llm_call` | Before LLM API call | `messages`, `model` |
| `post_llm_call` | After LLM response | `messages`, `response`, `model` |
| `on_session_start` | Session begins | `session_id`, `platform` |
| `on_session_end` | Session ends | `session_id`, `platform` |
Hooks are observers — they can't modify arguments or return values. If a hook crashes, it's logged and skipped; other hooks and the tool continue normally.
### Distribute via pip
For sharing plugins publicly, add an entry point to your Python package:
```toml
# pyproject.toml
[project.entry-points."hermes_agent.plugins"]
my-plugin = "my_plugin_package"
```
```bash
pip install hermes-plugin-calculator
# Plugin auto-discovered on next hermes startup
```
## Common mistakes
**Handler doesn't return JSON string:**
```python
# Wrong — returns a dict
def handler(args, **kwargs):
return {"result": 42}
# Right — returns a JSON string
def handler(args, **kwargs):
return json.dumps({"result": 42})
```
**Missing `**kwargs` in handler signature:**
```python
# Wrong — will break if Hermes passes extra context
def handler(args):
...
# Right
def handler(args, **kwargs):
...
```
**Handler raises exceptions:**
```python
# Wrong — exception propagates, tool call fails
def handler(args, **kwargs):
result = 1 / int(args["value"]) # ZeroDivisionError!
return json.dumps({"result": result})
# Right — catch and return error JSON
def handler(args, **kwargs):
try:
result = 1 / int(args.get("value", 0))
return json.dumps({"result": result})
except Exception as e:
return json.dumps({"error": str(e)})
```
**Schema description too vague:**
```python
# Bad — model doesn't know when to use it
"description": "Does stuff"
# Good — model knows exactly when and how
"description": "Evaluate a mathematical expression. Use for arithmetic, trig, logarithms. Supports: +, -, *, /, **, sqrt, sin, cos, log, pi, e."
```

View file

@ -0,0 +1,62 @@
---
sidebar_position: 20
---
# Plugins
Hermes has a plugin system for adding custom tools, hooks, and integrations without modifying core code.
**→ [Build a Hermes Plugin](/docs/guides/build-a-hermes-plugin)** — step-by-step guide with a complete working example.
## Quick overview
Drop a directory into `~/.hermes/plugins/` with a `plugin.yaml` and Python code:
```
~/.hermes/plugins/my-plugin/
├── plugin.yaml # manifest
├── __init__.py # register() — wires schemas to handlers
├── schemas.py # tool schemas (what the LLM sees)
└── tools.py # tool handlers (what runs when called)
```
Start Hermes — your tools appear alongside built-in tools. The model can call them immediately.
## What plugins can do
| Capability | How |
|-----------|-----|
| Add tools | `ctx.register_tool(name, schema, handler)` |
| Add hooks | `ctx.register_hook("post_tool_call", callback)` |
| Ship data files | `Path(__file__).parent / "data" / "file.yaml"` |
| Bundle skills | Copy `skill.md` to `~/.hermes/skills/` at load time |
| Gate on env vars | `requires_env: [API_KEY]` in plugin.yaml |
| Distribute via pip | `[project.entry-points."hermes_agent.plugins"]` |
## Plugin discovery
| Source | Path | Use case |
|--------|------|----------|
| User | `~/.hermes/plugins/` | Personal plugins |
| Project | `.hermes/plugins/` | Project-specific plugins |
| pip | `hermes_agent.plugins` entry_points | Distributed packages |
## Available hooks
| Hook | Fires when |
|------|-----------|
| `pre_tool_call` | Before any tool executes |
| `post_tool_call` | After any tool returns |
| `pre_llm_call` | Before LLM API request |
| `post_llm_call` | After LLM API response |
| `on_session_start` | Session begins |
| `on_session_end` | Session ends |
## Managing plugins
```
/plugins # list loaded plugins in a session
hermes config set display.show_cost true # show cost in status bar
```
See the **[full guide](/docs/guides/build-a-hermes-plugin)** for handler contracts, schema format, hook behavior, error handling, and common mistakes.