merge: resolve conflicts with main (URL update to hermes-agent.nousresearch.com)

This commit is contained in:
teknium1 2026-03-12 17:49:26 -07:00
commit e976879cf2
29 changed files with 3110 additions and 115 deletions

View file

@ -55,6 +55,7 @@ hermes tools # Configure which tools are enabled
hermes config set # Set individual config values hermes config set # Set individual config values
hermes gateway # Start the messaging gateway (Telegram, Discord, etc.) hermes gateway # Start the messaging gateway (Telegram, Discord, etc.)
hermes setup # Run the full setup wizard (configures everything at once) hermes setup # Run the full setup wizard (configures everything at once)
hermes claw migrate # Migrate from OpenClaw (if coming from OpenClaw)
hermes update # Update to the latest version hermes update # Update to the latest version
hermes doctor # Diagnose any issues hermes doctor # Diagnose any issues
``` ```
@ -87,6 +88,35 @@ All documentation lives at **[hermes-agent.nousresearch.com/docs](https://hermes
--- ---
## Migrating from OpenClaw
If you're coming from OpenClaw, Hermes can automatically import your settings, memories, skills, and API keys.
**During first-time setup:** The setup wizard (`hermes setup`) automatically detects `~/.openclaw` and offers to migrate before configuration begins.
**Anytime after install:**
```bash
hermes claw migrate # Interactive migration (full preset)
hermes claw migrate --dry-run # Preview what would be migrated
hermes claw migrate --preset user-data # Migrate without secrets
hermes claw migrate --overwrite # Overwrite existing conflicts
```
What gets imported:
- **SOUL.md** — persona file
- **Memories** — MEMORY.md and USER.md entries
- **Skills** — user-created skills → `~/.hermes/skills/openclaw-imports/`
- **Command allowlist** — approval patterns
- **Messaging settings** — platform configs, allowed users, working directory
- **API keys** — allowlisted secrets (Telegram, OpenRouter, OpenAI, Anthropic, ElevenLabs)
- **TTS assets** — workspace audio files
- **Workspace instructions** — AGENTS.md (with `--workspace-target`)
See `hermes claw migrate --help` for all options, or use the `openclaw-migration` skill for an interactive agent-guided migration with dry-run previews.
---
## Contributing ## Contributing
We welcome contributions! See the [Contributing Guide](https://hermes-agent.nousresearch.com/docs/developer-guide/contributing) for development setup, code style, and PR process. We welcome contributions! See the [Contributing Guide](https://hermes-agent.nousresearch.com/docs/developer-guide/contributing) for development setup, code style, and PR process.

View file

@ -56,7 +56,7 @@ _API_KEY_PROVIDER_AUX_MODELS: Dict[str, str] = {
# OpenRouter app attribution headers # OpenRouter app attribution headers
_OR_HEADERS = { _OR_HEADERS = {
"HTTP-Referer": "https://github.com/NousResearch/hermes-agent", "HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-OpenRouter-Title": "Hermes Agent", "X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent", "X-OpenRouter-Categories": "productivity,cli-agent",
} }

View file

@ -28,7 +28,7 @@ class ContextCompressor:
def __init__( def __init__(
self, self,
model: str, model: str,
threshold_percent: float = 0.85, threshold_percent: float = 0.50,
protect_first_n: int = 3, protect_first_n: int = 3,
protect_last_n: int = 4, protect_last_n: int = 4,
summary_target_tokens: int = 2500, summary_target_tokens: int = 2500,

View file

@ -63,6 +63,11 @@ def get_skin_tool_prefix() -> str:
# Tool preview (one-line summary of a tool call's primary argument) # Tool preview (one-line summary of a tool call's primary argument)
# ========================================================================= # =========================================================================
def _oneline(text: str) -> str:
"""Collapse whitespace (including newlines) to single spaces."""
return " ".join(text.split())
def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str: def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
"""Build a short preview of a tool call's primary argument for display.""" """Build a short preview of a tool call's primary argument for display."""
if not args: if not args:
@ -89,7 +94,7 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
if sid: if sid:
parts.append(sid[:16]) parts.append(sid[:16])
if data: if data:
parts.append(f'"{data[:20]}"') parts.append(f'"{_oneline(data[:20])}"')
if timeout_val and action == "wait": if timeout_val and action == "wait":
parts.append(f"{timeout_val}s") parts.append(f"{timeout_val}s")
return " ".join(parts) if parts else None return " ".join(parts) if parts else None
@ -105,24 +110,24 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
return f"planning {len(todos_arg)} task(s)" return f"planning {len(todos_arg)} task(s)"
if tool_name == "session_search": if tool_name == "session_search":
query = args.get("query", "") query = _oneline(args.get("query", ""))
return f"recall: \"{query[:25]}{'...' if len(query) > 25 else ''}\"" return f"recall: \"{query[:25]}{'...' if len(query) > 25 else ''}\""
if tool_name == "memory": if tool_name == "memory":
action = args.get("action", "") action = args.get("action", "")
target = args.get("target", "") target = args.get("target", "")
if action == "add": if action == "add":
content = args.get("content", "") content = _oneline(args.get("content", ""))
return f"+{target}: \"{content[:25]}{'...' if len(content) > 25 else ''}\"" return f"+{target}: \"{content[:25]}{'...' if len(content) > 25 else ''}\""
elif action == "replace": elif action == "replace":
return f"~{target}: \"{args.get('old_text', '')[:20]}\"" return f"~{target}: \"{_oneline(args.get('old_text', '')[:20])}\""
elif action == "remove": elif action == "remove":
return f"-{target}: \"{args.get('old_text', '')[:20]}\"" return f"-{target}: \"{_oneline(args.get('old_text', '')[:20])}\""
return action return action
if tool_name == "send_message": if tool_name == "send_message":
target = args.get("target", "?") target = args.get("target", "?")
msg = args.get("message", "") msg = _oneline(args.get("message", ""))
if len(msg) > 20: if len(msg) > 20:
msg = msg[:17] + "..." msg = msg[:17] + "..."
return f"to {target}: \"{msg}\"" return f"to {target}: \"{msg}\""
@ -156,7 +161,7 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str:
if isinstance(value, list): if isinstance(value, list):
value = value[0] if value else "" value = value[0] if value else ""
preview = str(value).strip() preview = _oneline(str(value))
if not preview: if not preview:
return None return None
if len(preview) > max_len: if len(preview) > max_len:

25
cli.py
View file

@ -175,7 +175,7 @@ def load_cli_config() -> Dict[str, Any]:
}, },
"compression": { "compression": {
"enabled": True, # Auto-compress when approaching context limit "enabled": True, # Auto-compress when approaching context limit
"threshold": 0.85, # Compress at 85% of model's context limit "threshold": 0.50, # Compress at 50% of model's context limit
"summary_model": "google/gemini-3-flash-preview", # Fast/cheap model for summaries "summary_model": "google/gemini-3-flash-preview", # Fast/cheap model for summaries
}, },
"agent": { "agent": {
@ -3608,6 +3608,19 @@ class HermesCLI:
continue continue
print(f"\n⚡ New message detected, interrupting...") print(f"\n⚡ New message detected, interrupting...")
self.agent.interrupt(interrupt_msg) self.agent.interrupt(interrupt_msg)
# Debug: log to file (stdout may be devnull from redirect_stdout)
try:
import pathlib as _pl
_dbg = _pl.Path.home() / ".hermes" / "interrupt_debug.log"
with open(_dbg, "a") as _f:
import time as _t
_f.write(f"{_t.strftime('%H:%M:%S')} interrupt fired: msg={str(interrupt_msg)[:60]!r}, "
f"children={len(self.agent._active_children)}, "
f"parent._interrupt={self.agent._interrupt_requested}\n")
for _ci, _ch in enumerate(self.agent._active_children):
_f.write(f" child[{_ci}]._interrupt={_ch._interrupt_requested}\n")
except Exception:
pass
break break
except queue.Empty: except queue.Empty:
pass # Queue empty or timeout, continue waiting pass # Queue empty or timeout, continue waiting
@ -3877,6 +3890,16 @@ class HermesCLI:
payload = (text, images) if images else text payload = (text, images) if images else text
if self._agent_running and not (text and text.startswith("/")): if self._agent_running and not (text and text.startswith("/")):
self._interrupt_queue.put(payload) self._interrupt_queue.put(payload)
# Debug: log to file when message enters interrupt queue
try:
import pathlib as _pl
_dbg = _pl.Path.home() / ".hermes" / "interrupt_debug.log"
with open(_dbg, "a") as _f:
import time as _t
_f.write(f"{_t.strftime('%H:%M:%S')} ENTER: queued interrupt msg={str(payload)[:60]!r}, "
f"agent_running={self._agent_running}\n")
except Exception:
pass
else: else:
self._pending_input.put(payload) self._pending_input.put(payload)
event.app.current_buffer.reset(append_to_history=True) event.app.current_buffer.reset(append_to_history=True)

110
docs/migration/openclaw.md Normal file
View file

@ -0,0 +1,110 @@
# Migrating from OpenClaw to Hermes Agent
This guide covers how to import your OpenClaw settings, memories, skills, and API keys into Hermes Agent.
## Three Ways to Migrate
### 1. Automatic (during first-time setup)
When you run `hermes setup` for the first time and Hermes detects `~/.openclaw`, it automatically offers to import your OpenClaw data before configuration begins. Just accept the prompt and everything is handled for you.
### 2. CLI Command (quick, scriptable)
```bash
hermes claw migrate # Full migration with confirmation prompt
hermes claw migrate --dry-run # Preview what would happen
hermes claw migrate --preset user-data # Migrate without API keys/secrets
hermes claw migrate --yes # Skip confirmation prompt
```
**All options:**
| Flag | Description |
|------|-------------|
| `--source PATH` | Path to OpenClaw directory (default: `~/.openclaw`) |
| `--dry-run` | Preview only — no files are modified |
| `--preset {user-data,full}` | Migration preset (default: `full`). `user-data` excludes secrets |
| `--overwrite` | Overwrite existing files (default: skip conflicts) |
| `--migrate-secrets` | Include allowlisted secrets (auto-enabled with `full` preset) |
| `--workspace-target PATH` | Copy workspace instructions (AGENTS.md) to this absolute path |
| `--skill-conflict {skip,overwrite,rename}` | How to handle skill name conflicts (default: `skip`) |
| `--yes`, `-y` | Skip confirmation prompts |
### 3. Agent-Guided (interactive, with previews)
Ask the agent to run the migration for you:
```
> Migrate my OpenClaw setup to Hermes
```
The agent will use the `openclaw-migration` skill to:
1. Run a dry-run first to preview changes
2. Ask about conflict resolution (SOUL.md, skills, etc.)
3. Let you choose between `user-data` and `full` presets
4. Execute the migration with your choices
5. Print a detailed summary of what was migrated
## What Gets Migrated
### `user-data` preset
| Item | Source | Destination |
|------|--------|-------------|
| SOUL.md | `~/.openclaw/workspace/SOUL.md` | `~/.hermes/SOUL.md` |
| Memory entries | `~/.openclaw/workspace/MEMORY.md` | `~/.hermes/memories/MEMORY.md` |
| User profile | `~/.openclaw/workspace/USER.md` | `~/.hermes/memories/USER.md` |
| Skills | `~/.openclaw/workspace/skills/` | `~/.hermes/skills/openclaw-imports/` |
| Command allowlist | `~/.openclaw/workspace/exec_approval_patterns.yaml` | Merged into `~/.hermes/config.yaml` |
| Messaging settings | `~/.openclaw/config.yaml` (TELEGRAM_ALLOWED_USERS, MESSAGING_CWD) | `~/.hermes/.env` |
| TTS assets | `~/.openclaw/workspace/tts/` | `~/.hermes/tts/` |
### `full` preset (adds to `user-data`)
| Item | Source | Destination |
|------|--------|-------------|
| Telegram bot token | `~/.openclaw/config.yaml` | `~/.hermes/.env` |
| OpenRouter API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
| OpenAI API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
| Anthropic API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
| ElevenLabs API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
Only these 6 allowlisted secrets are ever imported. Other credentials are skipped and reported.
## Conflict Handling
By default, the migration **will not overwrite** existing Hermes data:
- **SOUL.md** — skipped if one already exists in `~/.hermes/`
- **Memory entries** — skipped if memories already exist (to avoid duplicates)
- **Skills** — skipped if a skill with the same name already exists
- **API keys** — skipped if the key is already set in `~/.hermes/.env`
To overwrite conflicts, use `--overwrite`. The migration creates backups before overwriting.
For skills, you can also use `--skill-conflict rename` to import conflicting skills under a new name (e.g., `skill-name-imported`).
## Migration Report
Every migration (including dry runs) produces a report showing:
- **Migrated items** — what was successfully imported
- **Conflicts** — items skipped because they already exist
- **Skipped items** — items not found in the source
- **Errors** — items that failed to import
For execute runs, the full report is saved to `~/.hermes/migration/openclaw/<timestamp>/`.
## Troubleshooting
### "OpenClaw directory not found"
The migration looks for `~/.openclaw` by default. If your OpenClaw is installed elsewhere, use `--source`:
```bash
hermes claw migrate --source /path/to/.openclaw
```
### "Migration script not found"
The migration script ships with Hermes Agent. If you installed via pip (not git clone), the `optional-skills/` directory may not be present. Install the skill from the Skills Hub:
```bash
hermes skills install openclaw-migration
```
### Memory overflow
If your OpenClaw MEMORY.md or USER.md exceeds Hermes' character limits, excess entries are exported to an overflow file in the migration report directory. You can manually review and add the most important ones.

View file

@ -66,13 +66,14 @@ class SlackAdapter(BasePlatformAdapter):
- Typing indicators (not natively supported by Slack bots) - Typing indicators (not natively supported by Slack bots)
""" """
MAX_MESSAGE_LENGTH = 4000 # Slack's limit is higher but mrkdwn can inflate MAX_MESSAGE_LENGTH = 3900 # Slack hard limit is 4000 but leave room for mrkdwn
def __init__(self, config: PlatformConfig): def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.SLACK) super().__init__(config, Platform.SLACK)
self._app: Optional[AsyncApp] = None self._app: Optional[AsyncApp] = None
self._handler: Optional[AsyncSocketModeHandler] = None self._handler: Optional[AsyncSocketModeHandler] = None
self._bot_user_id: Optional[str] = None self._bot_user_id: Optional[str] = None
self._user_name_cache: Dict[str, str] = {} # user_id → display name
async def connect(self) -> bool: async def connect(self) -> bool:
"""Connect to Slack via Socket Mode.""" """Connect to Slack via Socket Mode."""
@ -152,23 +153,36 @@ class SlackAdapter(BasePlatformAdapter):
return SendResult(success=False, error="Not connected") return SendResult(success=False, error="Not connected")
try: try:
# Convert standard markdown → Slack mrkdwn
formatted = self.format_message(content)
# Split long messages, preserving code block boundaries
chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH)
thread_ts = self._resolve_thread_ts(reply_to, metadata)
last_result = None
# reply_broadcast: also post thread replies to the main channel.
# Controlled via platform config: gateway.slack.reply_broadcast
broadcast = self.config.extra.get("reply_broadcast", False)
for i, chunk in enumerate(chunks):
kwargs = { kwargs = {
"channel": chat_id, "channel": chat_id,
"text": content, "text": chunk,
} }
if thread_ts:
kwargs["thread_ts"] = thread_ts
# Only broadcast the first chunk of the first reply
if broadcast and i == 0:
kwargs["reply_broadcast"] = True
# Reply in thread if thread_ts is available last_result = await self._app.client.chat_postMessage(**kwargs)
if reply_to:
kwargs["thread_ts"] = reply_to
elif metadata and metadata.get("thread_ts"):
kwargs["thread_ts"] = metadata["thread_ts"]
result = await self._app.client.chat_postMessage(**kwargs)
return SendResult( return SendResult(
success=True, success=True,
message_id=result.get("ts"), message_id=last_result.get("ts") if last_result else None,
raw_response=result, raw_response=last_result,
) )
except Exception as e: # pragma: no cover - defensive logging except Exception as e: # pragma: no cover - defensive logging
@ -202,15 +216,185 @@ class SlackAdapter(BasePlatformAdapter):
return SendResult(success=False, error=str(e)) return SendResult(success=False, error=str(e))
async def send_typing(self, chat_id: str, metadata=None) -> None: async def send_typing(self, chat_id: str, metadata=None) -> None:
"""Slack doesn't have a direct typing indicator API for bots.""" """Slack doesn't support typing indicators for bot users.
The reactions system (👀 on receipt, on completion) serves as
the visual feedback mechanism instead.
"""
pass pass
def _resolve_thread_ts(
self,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
"""Resolve the correct thread_ts for a Slack API call.
Prefers metadata thread_id (the thread parent's ts, set by the
gateway) over reply_to (which may be a child message's ts).
"""
if metadata:
if metadata.get("thread_id"):
return metadata["thread_id"]
if metadata.get("thread_ts"):
return metadata["thread_ts"]
return reply_to
# ----- Markdown → mrkdwn conversion -----
def format_message(self, content: str) -> str:
"""Convert standard markdown to Slack mrkdwn format.
Protected regions (code blocks, inline code) are extracted first so
their contents are never modified. Standard markdown constructs
(headers, bold, italic, links) are translated to mrkdwn syntax.
"""
if not content:
return content
placeholders: dict = {}
counter = [0]
def _ph(value: str) -> str:
"""Stash value behind a placeholder that survives later passes."""
key = f"\x00SL{counter[0]}\x00"
counter[0] += 1
placeholders[key] = value
return key
text = content
# 1) Protect fenced code blocks (``` ... ```)
text = re.sub(
r'(```(?:[^\n]*\n)?[\s\S]*?```)',
lambda m: _ph(m.group(0)),
text,
)
# 2) Protect inline code (`...`)
text = re.sub(r'(`[^`]+`)', lambda m: _ph(m.group(0)), text)
# 3) Convert markdown links [text](url) → <url|text>
text = re.sub(
r'\[([^\]]+)\]\(([^)]+)\)',
lambda m: _ph(f'<{m.group(2)}|{m.group(1)}>'),
text,
)
# 4) Convert headers (## Title) → *Title* (bold)
def _convert_header(m):
inner = m.group(1).strip()
# Strip redundant bold markers inside a header
inner = re.sub(r'\*\*(.+?)\*\*', r'\1', inner)
return _ph(f'*{inner}*')
text = re.sub(
r'^#{1,6}\s+(.+)$', _convert_header, text, flags=re.MULTILINE
)
# 5) Convert bold: **text** → *text* (Slack bold)
text = re.sub(
r'\*\*(.+?)\*\*',
lambda m: _ph(f'*{m.group(1)}*'),
text,
)
# 6) Convert italic: _text_ stays as _text_ (already Slack italic)
# Single *text* → _text_ (Slack italic)
text = re.sub(
r'(?<!\*)\*([^*\n]+)\*(?!\*)',
lambda m: _ph(f'_{m.group(1)}_'),
text,
)
# 7) Convert strikethrough: ~~text~~ → ~text~
text = re.sub(
r'~~(.+?)~~',
lambda m: _ph(f'~{m.group(1)}~'),
text,
)
# 8) Convert blockquotes: > text → > text (same syntax, just ensure
# no extra escaping happens to the > character)
# Slack uses the same > prefix, so this is a no-op for content.
# 9) Restore placeholders in reverse order
for key in reversed(list(placeholders.keys())):
text = text.replace(key, placeholders[key])
return text
# ----- Reactions -----
async def _add_reaction(
self, channel: str, timestamp: str, emoji: str
) -> bool:
"""Add an emoji reaction to a message. Returns True on success."""
if not self._app:
return False
try:
await self._app.client.reactions_add(
channel=channel, timestamp=timestamp, name=emoji
)
return True
except Exception as e:
# Don't log as error — may fail if already reacted or missing scope
logger.debug("[Slack] reactions.add failed (%s): %s", emoji, e)
return False
async def _remove_reaction(
self, channel: str, timestamp: str, emoji: str
) -> bool:
"""Remove an emoji reaction from a message. Returns True on success."""
if not self._app:
return False
try:
await self._app.client.reactions_remove(
channel=channel, timestamp=timestamp, name=emoji
)
return True
except Exception as e:
logger.debug("[Slack] reactions.remove failed (%s): %s", emoji, e)
return False
# ----- User identity resolution -----
async def _resolve_user_name(self, user_id: str) -> str:
"""Resolve a Slack user ID to a display name, with caching."""
if not user_id:
return ""
if user_id in self._user_name_cache:
return self._user_name_cache[user_id]
if not self._app:
return user_id
try:
result = await self._app.client.users_info(user=user_id)
user = result.get("user", {})
# Prefer display_name → real_name → user_id
profile = user.get("profile", {})
name = (
profile.get("display_name")
or profile.get("real_name")
or user.get("real_name")
or user.get("name")
or user_id
)
self._user_name_cache[user_id] = name
return name
except Exception as e:
logger.debug("[Slack] users.info failed for %s: %s", user_id, e)
self._user_name_cache[user_id] = user_id
return user_id
async def send_image_file( async def send_image_file(
self, self,
chat_id: str, chat_id: str,
image_path: str, image_path: str,
caption: Optional[str] = None, caption: Optional[str] = None,
reply_to: Optional[str] = None, reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult: ) -> SendResult:
"""Send a local image file to Slack by uploading it.""" """Send a local image file to Slack by uploading it."""
if not self._app: if not self._app:
@ -226,7 +410,7 @@ class SlackAdapter(BasePlatformAdapter):
file=image_path, file=image_path,
filename=os.path.basename(image_path), filename=os.path.basename(image_path),
initial_comment=caption or "", initial_comment=caption or "",
thread_ts=reply_to, thread_ts=self._resolve_thread_ts(reply_to, metadata),
) )
return SendResult(success=True, raw_response=result) return SendResult(success=True, raw_response=result)
@ -246,6 +430,7 @@ class SlackAdapter(BasePlatformAdapter):
image_url: str, image_url: str,
caption: Optional[str] = None, caption: Optional[str] = None,
reply_to: Optional[str] = None, reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult: ) -> SendResult:
"""Send an image to Slack by uploading the URL as a file.""" """Send an image to Slack by uploading the URL as a file."""
if not self._app: if not self._app:
@ -264,7 +449,7 @@ class SlackAdapter(BasePlatformAdapter):
content=response.content, content=response.content,
filename="image.png", filename="image.png",
initial_comment=caption or "", initial_comment=caption or "",
thread_ts=reply_to, thread_ts=self._resolve_thread_ts(reply_to, metadata),
) )
return SendResult(success=True, raw_response=result) return SendResult(success=True, raw_response=result)
@ -286,6 +471,7 @@ class SlackAdapter(BasePlatformAdapter):
audio_path: str, audio_path: str,
caption: Optional[str] = None, caption: Optional[str] = None,
reply_to: Optional[str] = None, reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult: ) -> SendResult:
"""Send an audio file to Slack.""" """Send an audio file to Slack."""
if not self._app: if not self._app:
@ -297,7 +483,7 @@ class SlackAdapter(BasePlatformAdapter):
file=audio_path, file=audio_path,
filename=os.path.basename(audio_path), filename=os.path.basename(audio_path),
initial_comment=caption or "", initial_comment=caption or "",
thread_ts=reply_to, thread_ts=self._resolve_thread_ts(reply_to, metadata),
) )
return SendResult(success=True, raw_response=result) return SendResult(success=True, raw_response=result)
@ -316,6 +502,7 @@ class SlackAdapter(BasePlatformAdapter):
video_path: str, video_path: str,
caption: Optional[str] = None, caption: Optional[str] = None,
reply_to: Optional[str] = None, reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult: ) -> SendResult:
"""Send a video file to Slack.""" """Send a video file to Slack."""
if not self._app: if not self._app:
@ -330,7 +517,7 @@ class SlackAdapter(BasePlatformAdapter):
file=video_path, file=video_path,
filename=os.path.basename(video_path), filename=os.path.basename(video_path),
initial_comment=caption or "", initial_comment=caption or "",
thread_ts=reply_to, thread_ts=self._resolve_thread_ts(reply_to, metadata),
) )
return SendResult(success=True, raw_response=result) return SendResult(success=True, raw_response=result)
@ -351,6 +538,7 @@ class SlackAdapter(BasePlatformAdapter):
caption: Optional[str] = None, caption: Optional[str] = None,
file_name: Optional[str] = None, file_name: Optional[str] = None,
reply_to: Optional[str] = None, reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult: ) -> SendResult:
"""Send a document/file attachment to Slack.""" """Send a document/file attachment to Slack."""
if not self._app: if not self._app:
@ -367,7 +555,7 @@ class SlackAdapter(BasePlatformAdapter):
file=file_path, file=file_path,
filename=display_name, filename=display_name,
initial_comment=caption or "", initial_comment=caption or "",
thread_ts=reply_to, thread_ts=self._resolve_thread_ts(reply_to, metadata),
) )
return SendResult(success=True, raw_response=result) return SendResult(success=True, raw_response=result)
@ -419,13 +607,22 @@ class SlackAdapter(BasePlatformAdapter):
text = event.get("text", "") text = event.get("text", "")
user_id = event.get("user", "") user_id = event.get("user", "")
channel_id = event.get("channel", "") channel_id = event.get("channel", "")
thread_ts = event.get("thread_ts") or event.get("ts")
ts = event.get("ts", "") ts = event.get("ts", "")
# Determine if this is a DM or channel message # Determine if this is a DM or channel message
channel_type = event.get("channel_type", "") channel_type = event.get("channel_type", "")
is_dm = channel_type == "im" is_dm = channel_type == "im"
# Build thread_ts for session keying.
# In channels: fall back to ts so each top-level @mention starts a
# new thread/session (the bot always replies in a thread).
# In DMs: only use the real thread_ts — top-level DMs should share
# one continuous session, threaded DMs get their own session.
if is_dm:
thread_ts = event.get("thread_ts") # None for top-level DMs
else:
thread_ts = event.get("thread_ts") or ts # ts fallback for channels
# In channels, only respond if bot is mentioned # In channels, only respond if bot is mentioned
if not is_dm and self._bot_user_id: if not is_dm and self._bot_user_id:
if f"<@{self._bot_user_id}>" not in text: if f"<@{self._bot_user_id}>" not in text:
@ -521,12 +718,16 @@ class SlackAdapter(BasePlatformAdapter):
except Exception as e: # pragma: no cover - defensive logging except Exception as e: # pragma: no cover - defensive logging
logger.warning("[Slack] Failed to cache document from %s: %s", url, e, exc_info=True) logger.warning("[Slack] Failed to cache document from %s: %s", url, e, exc_info=True)
# Resolve user display name (cached after first lookup)
user_name = await self._resolve_user_name(user_id)
# Build source # Build source
source = self.build_source( source = self.build_source(
chat_id=channel_id, chat_id=channel_id,
chat_name=channel_id, # Will be resolved later if needed chat_name=channel_id, # Will be resolved later if needed
chat_type="dm" if is_dm else "group", chat_type="dm" if is_dm else "group",
user_id=user_id, user_id=user_id,
user_name=user_name,
thread_id=thread_ts, thread_id=thread_ts,
) )
@ -541,8 +742,15 @@ class SlackAdapter(BasePlatformAdapter):
reply_to_message_id=thread_ts if thread_ts != ts else None, reply_to_message_id=thread_ts if thread_ts != ts else None,
) )
# Add 👀 reaction to acknowledge receipt
await self._add_reaction(channel_id, ts, "eyes")
await self.handle_message(msg_event) await self.handle_message(msg_event)
# Replace 👀 with ✅ when done
await self._remove_reaction(channel_id, ts, "eyes")
await self._add_reaction(channel_id, ts, "white_check_mark")
async def _handle_slash_command(self, command: dict) -> None: async def _handle_slash_command(self, command: dict) -> None:
"""Handle /hermes slash command.""" """Handle /hermes slash command."""
text = command.get("text", "").strip() text = command.get("text", "").strip()
@ -556,6 +764,15 @@ class SlackAdapter(BasePlatformAdapter):
"help": "/help", "help": "/help",
"model": "/model", "personality": "/personality", "model": "/model", "personality": "/personality",
"retry": "/retry", "undo": "/undo", "retry": "/retry", "undo": "/undo",
"compact": "/compress", "compress": "/compress",
"resume": "/resume",
"background": "/background",
"usage": "/usage",
"insights": "/insights",
"title": "/title",
"reasoning": "/reasoning",
"provider": "/provider",
"rollback": "/rollback",
} }
first_word = text.split()[0] if text else "" first_word = text.split()[0] if text else ""
if first_word in subcommand_map: if first_word in subcommand_map:

View file

@ -1057,7 +1057,7 @@ class GatewayRunner:
# Read model + compression config from config.yaml — same # Read model + compression config from config.yaml — same
# source of truth the agent itself uses. # source of truth the agent itself uses.
_hyg_model = "anthropic/claude-sonnet-4.6" _hyg_model = "anthropic/claude-sonnet-4.6"
_hyg_threshold_pct = 0.85 _hyg_threshold_pct = 0.50
_hyg_compression_enabled = True _hyg_compression_enabled = True
try: try:
_hyg_cfg_path = _hermes_home / "config.yaml" _hyg_cfg_path = _hermes_home / "config.yaml"
@ -3034,6 +3034,8 @@ class GatewayRunner:
# Queue for progress messages (thread-safe) # Queue for progress messages (thread-safe)
progress_queue = queue.Queue() if tool_progress_enabled else None progress_queue = queue.Queue() if tool_progress_enabled else None
last_tool = [None] # Mutable container for tracking in closure last_tool = [None] # Mutable container for tracking in closure
last_progress_msg = [None] # Track last message for dedup
repeat_count = [0] # How many times the same message repeated
def progress_callback(tool_name: str, preview: str = None, args: dict = None): def progress_callback(tool_name: str, preview: str = None, args: dict = None):
"""Callback invoked by agent when a tool is called.""" """Callback invoked by agent when a tool is called."""
@ -3106,6 +3108,18 @@ class GatewayRunner:
else: else:
msg = f"{emoji} {tool_name}..." msg = f"{emoji} {tool_name}..."
# Dedup: collapse consecutive identical progress messages.
# Common with execute_code where models iterate with the same
# code (same boilerplate imports → identical previews).
if msg == last_progress_msg[0]:
repeat_count[0] += 1
# Update the last line in progress_lines with a counter
# via a special "dedup" queue message.
progress_queue.put(("__dedup__", msg, repeat_count[0]))
return
last_progress_msg[0] = msg
repeat_count[0] = 0
progress_queue.put(msg) progress_queue.put(msg)
# Background task to send progress messages # Background task to send progress messages
@ -3126,7 +3140,16 @@ class GatewayRunner:
while True: while True:
try: try:
msg = progress_queue.get_nowait() raw = progress_queue.get_nowait()
# Handle dedup messages: update last line with repeat counter
if isinstance(raw, tuple) and len(raw) == 3 and raw[0] == "__dedup__":
_, base_msg, count = raw
if progress_lines:
progress_lines[-1] = f"{base_msg} (×{count + 1})"
msg = progress_lines[-1] if progress_lines else base_msg
else:
msg = raw
progress_lines.append(msg) progress_lines.append(msg)
if can_edit and progress_msg_id is not None: if can_edit and progress_msg_id is not None:
@ -3163,8 +3186,13 @@ class GatewayRunner:
# Drain remaining queued messages # Drain remaining queued messages
while not progress_queue.empty(): while not progress_queue.empty():
try: try:
msg = progress_queue.get_nowait() raw = progress_queue.get_nowait()
progress_lines.append(msg) if isinstance(raw, tuple) and len(raw) == 3 and raw[0] == "__dedup__":
_, base_msg, count = raw
if progress_lines:
progress_lines[-1] = f"{base_msg} (×{count + 1})"
else:
progress_lines.append(raw)
except Exception: except Exception:
break break
# Final edit with all remaining tools (only if editing works) # Final edit with all remaining tools (only if editing works)
@ -3418,17 +3446,19 @@ class GatewayRunner:
# Monitor for interrupts from the adapter (new messages arriving) # Monitor for interrupts from the adapter (new messages arriving)
async def monitor_for_interrupt(): async def monitor_for_interrupt():
adapter = self.adapters.get(source.platform) adapter = self.adapters.get(source.platform)
if not adapter: if not adapter or not session_key:
return return
chat_id = source.chat_id
while True: while True:
await asyncio.sleep(0.2) # Check every 200ms await asyncio.sleep(0.2) # Check every 200ms
# Check if adapter has a pending interrupt for this session # Check if adapter has a pending interrupt for this session.
if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(chat_id): # Must use session_key (build_session_key output) — NOT
# source.chat_id — because the adapter stores interrupt events
# under the full session key.
if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(session_key):
agent = agent_holder[0] agent = agent_holder[0]
if agent: if agent:
pending_event = adapter.get_pending_message(chat_id) pending_event = adapter.get_pending_message(session_key)
pending_text = pending_event.text if pending_event else None pending_text = pending_event.text if pending_event else None
logger.debug("Interrupt detected from adapter, signaling agent...") logger.debug("Interrupt detected from adapter, signaling agent...")
agent.interrupt(pending_text) agent.interrupt(pending_text)
@ -3445,10 +3475,11 @@ class GatewayRunner:
result = result_holder[0] result = result_holder[0]
adapter = self.adapters.get(source.platform) adapter = self.adapters.get(source.platform)
# Get pending message from adapter if interrupted # Get pending message from adapter if interrupted.
# Use session_key (not source.chat_id) to match adapter's storage keys.
pending = None pending = None
if result and result.get("interrupted") and adapter: if result and result.get("interrupted") and adapter:
pending_event = adapter.get_pending_message(source.chat_id) pending_event = adapter.get_pending_message(session_key) if session_key else None
if pending_event: if pending_event:
pending = pending_event.text pending = pending_event.text
elif result.get("interrupt_message"): elif result.get("interrupt_message"):
@ -3460,8 +3491,8 @@ class GatewayRunner:
# Clear the adapter's interrupt event so the next _run_agent call # Clear the adapter's interrupt event so the next _run_agent call
# doesn't immediately re-trigger the interrupt before the new agent # doesn't immediately re-trigger the interrupt before the new agent
# even makes its first API call (this was causing an infinite loop). # even makes its first API call (this was causing an infinite loop).
if adapter and hasattr(adapter, '_active_sessions') and source.chat_id in adapter._active_sessions: if adapter and hasattr(adapter, '_active_sessions') and session_key and session_key in adapter._active_sessions:
adapter._active_sessions[source.chat_id].clear() adapter._active_sessions[session_key].clear()
# Don't send the interrupted response to the user — it's just noise # Don't send the interrupted response to the user — it's just noise
# like "Operation interrupted." They already know they sent a new # like "Operation interrupted." They already know they sent a new

View file

@ -299,10 +299,21 @@ def build_session_key(source: SessionSource) -> str:
"""Build a deterministic session key from a message source. """Build a deterministic session key from a message source.
This is the single source of truth for session key construction. This is the single source of truth for session key construction.
WhatsApp DMs include chat_id (multi-user), other DMs do not (single owner).
DM rules:
- WhatsApp DMs include chat_id (multi-user support).
- Other DMs include thread_id when present (e.g. Slack threaded DMs),
so each DM thread gets its own session while top-level DMs share one.
- Without thread_id or chat_id, all DMs share a single session.
Group/channel rules:
- thread_id differentiates threads within a channel.
- Without thread_id, all messages in a channel share one session.
""" """
platform = source.platform.value platform = source.platform.value
if source.chat_type == "dm": if source.chat_type == "dm":
if source.thread_id:
return f"agent:main:{platform}:dm:{source.thread_id}"
if platform == "whatsapp" and source.chat_id: if platform == "whatsapp" and source.chat_id:
return f"agent:main:{platform}:dm:{source.chat_id}" return f"agent:main:{platform}:dm:{source.chat_id}"
return f"agent:main:{platform}:dm" return f"agent:main:{platform}:dm"

296
hermes_cli/claw.py Normal file
View file

@ -0,0 +1,296 @@
"""hermes claw — OpenClaw migration commands.
Usage:
hermes claw migrate # Interactive migration from ~/.openclaw
hermes claw migrate --dry-run # Preview what would be migrated
hermes claw migrate --preset full --overwrite # Full migration, overwrite conflicts
"""
import importlib.util
import logging
import sys
from pathlib import Path
from hermes_cli.config import get_hermes_home, get_config_path, load_config, save_config
from hermes_cli.setup import (
Colors,
color,
print_header,
print_info,
print_success,
print_warning,
print_error,
prompt_yes_no,
prompt_choice,
)
logger = logging.getLogger(__name__)

# Repository root: this file lives in hermes_cli/, so two parents up.
PROJECT_ROOT = Path(__file__).parent.parent.resolve()

# Primary location: the migration script bundled in the repo's
# optional-skills tree.
_OPENCLAW_SCRIPT = (
    PROJECT_ROOT
    / "optional-skills"
    / "migration"
    / "openclaw-migration"
    / "scripts"
    / "openclaw_to_hermes.py"
)
# Fallback: user may have installed the skill from the Hub
# (same relative layout, rooted at ~/.hermes/skills instead).
_OPENCLAW_SCRIPT_INSTALLED = (
    get_hermes_home()
    / "skills"
    / "migration"
    / "openclaw-migration"
    / "scripts"
    / "openclaw_to_hermes.py"
)
def _find_migration_script() -> Path | None:
    """Return the first existing copy of openclaw_to_hermes.py.

    Checks the repo-bundled location first, then the Hub-installed
    skill path; returns None when neither exists.
    """
    candidates = (_OPENCLAW_SCRIPT, _OPENCLAW_SCRIPT_INSTALLED)
    return next((path for path in candidates if path.exists()), None)
def _load_migration_module(script_path: Path):
"""Dynamically load the migration script as a module."""
spec = importlib.util.spec_from_file_location("openclaw_to_hermes", script_path)
if spec is None or spec.loader is None:
return None
mod = importlib.util.module_from_spec(spec)
# Register in sys.modules so @dataclass can resolve the module
# (Python 3.11+ requires this for dynamically loaded modules)
sys.modules[spec.name] = mod
try:
spec.loader.exec_module(mod)
except Exception:
sys.modules.pop(spec.name, None)
raise
return mod
def claw_command(args):
    """Route hermes claw subcommands."""
    if getattr(args, "claw_action", None) == "migrate":
        _cmd_migrate(args)
        return
    # Unknown or missing subcommand — show usage help.
    print("Usage: hermes claw migrate [options]")
    print()
    print("Commands:")
    print("  migrate    Migrate settings from OpenClaw to Hermes")
    print()
    print("Run 'hermes claw migrate --help' for migration options.")
def _cmd_migrate(args):
    """Run the OpenClaw → Hermes migration.

    Flow: resolve CLI flags → validate the source directory → locate
    the migration script → confirm (unless --yes or --dry-run) →
    ensure config.yaml exists → load the script, run the Migrator →
    print a formatted report.
    """
    source_dir = Path(getattr(args, "source", None) or Path.home() / ".openclaw")
    dry_run = getattr(args, "dry_run", False)
    preset = getattr(args, "preset", "full")
    overwrite = getattr(args, "overwrite", False)
    migrate_secrets = getattr(args, "migrate_secrets", False)
    workspace_target = getattr(args, "workspace_target", None)
    skill_conflict = getattr(args, "skill_conflict", "skip")

    # If using the "full" preset, secrets are included by default
    if preset == "full":
        migrate_secrets = True

    # Banner — drawn programmatically so the box edges always line up.
    width = 57
    print()
    print(color("┌" + "─" * width + "┐", Colors.MAGENTA))
    print(color("│" + "  ⚕ Hermes — OpenClaw Migration".ljust(width) + "│", Colors.MAGENTA))
    print(color("└" + "─" * width + "┘", Colors.MAGENTA))

    # Check source directory
    if not source_dir.is_dir():
        print()
        print_error(f"OpenClaw directory not found: {source_dir}")
        print_info("Make sure your OpenClaw installation is at the expected path.")
        # Plain literal (no placeholders) — no f-prefix needed.
        print_info("You can specify a custom path: hermes claw migrate --source /path/to/.openclaw")
        return

    # Find the migration script
    script_path = _find_migration_script()
    if not script_path:
        print()
        print_error("Migration script not found.")
        print_info("Expected at one of:")
        print_info(f" {_OPENCLAW_SCRIPT}")
        print_info(f" {_OPENCLAW_SCRIPT_INSTALLED}")
        print_info("Make sure the openclaw-migration skill is installed.")
        return

    # Show what we're doing
    hermes_home = get_hermes_home()
    print()
    print_header("Migration Settings")
    print_info(f"Source: {source_dir}")
    print_info(f"Target: {hermes_home}")
    print_info(f"Preset: {preset}")
    print_info(f"Mode: {'dry run (preview only)' if dry_run else 'execute'}")
    print_info(f"Overwrite: {'yes' if overwrite else 'no (skip conflicts)'}")
    print_info(f"Secrets: {'yes (allowlisted only)' if migrate_secrets else 'no'}")
    if skill_conflict != "skip":
        print_info(f"Skill conflicts: {skill_conflict}")
    if workspace_target:
        print_info(f"Workspace: {workspace_target}")
    print()

    # For execute mode (non-dry-run), confirm unless --yes was passed
    if not dry_run and not getattr(args, "yes", False):
        if not prompt_yes_no("Proceed with migration?", default=True):
            print_info("Migration cancelled.")
            return

    # Ensure config.yaml exists before migration tries to read it
    config_path = get_config_path()
    if not config_path.exists():
        save_config(load_config())

    # Load and run the migration
    try:
        mod = _load_migration_module(script_path)
        if mod is None:
            print_error("Could not load migration script.")
            return
        selected = mod.resolve_selected_options(None, None, preset=preset)
        ws_target = Path(workspace_target).resolve() if workspace_target else None
        migrator = mod.Migrator(
            source_root=source_dir.resolve(),
            target_root=hermes_home.resolve(),
            execute=not dry_run,
            workspace_target=ws_target,
            overwrite=overwrite,
            migrate_secrets=migrate_secrets,
            output_dir=None,
            selected_options=selected,
            preset_name=preset,
            skill_conflict_mode=skill_conflict,
        )
        report = migrator.migrate()
    except Exception as e:
        print()
        print_error(f"Migration failed: {e}")
        logger.debug("OpenClaw migration error", exc_info=True)
        return

    # Print results
    _print_migration_report(report, dry_run)
def _print_migration_report(report: dict, dry_run: bool):
    """Print a formatted migration report.

    Shows per-item results grouped by status (migrated / conflict /
    skipped / error), then a one-line summary and follow-up hints.
    Dry-run mode changes only the wording, not the structure.
    """
    summary = report.get("summary", {})
    migrated = summary.get("migrated", 0)
    skipped = summary.get("skipped", 0)
    conflicts = summary.get("conflict", 0)
    errors = summary.get("error", 0)

    print()
    if dry_run:
        print_header("Dry Run Results")
        print_info("No files were modified. This is a preview of what would happen.")
    else:
        print_header("Migration Results")
    print()

    # Detailed items
    items = report.get("items", [])
    if items:
        # Group by status
        migrated_items = [i for i in items if i.get("status") == "migrated"]
        skipped_items = [i for i in items if i.get("status") == "skipped"]
        conflict_items = [i for i in items if i.get("status") == "conflict"]
        error_items = [i for i in items if i.get("status") == "error"]

        if migrated_items:
            label = "Would migrate" if dry_run else "Migrated"
            print(color(f"{label}:", Colors.GREEN))
            for item in migrated_items:
                kind = item.get("kind", "unknown")
                dest = item.get("destination", "")
                if dest:
                    # Shorten $HOME to ~ for readability.
                    dest_short = str(dest).replace(str(Path.home()), "~")
                    print(f" {kind:<22s}{dest_short}")
                else:
                    print(f" {kind}")
            print()

        if conflict_items:
            print(color(" ⚠ Conflicts (skipped — use --overwrite to force):", Colors.YELLOW))
            for item in conflict_items:
                kind = item.get("kind", "unknown")
                reason = item.get("reason", "already exists")
                print(f" {kind:<22s} {reason}")
            print()

        if skipped_items:
            print(color(" ─ Skipped:", Colors.DIM))
            for item in skipped_items:
                kind = item.get("kind", "unknown")
                reason = item.get("reason", "")
                print(f" {kind:<22s} {reason}")
            print()

        if error_items:
            print(color(" ✗ Errors:", Colors.RED))
            for item in error_items:
                kind = item.get("kind", "unknown")
                reason = item.get("reason", "unknown error")
                print(f" {kind:<22s} {reason}")
            print()

    # Summary line
    parts = []
    if migrated:
        action = "would migrate" if dry_run else "migrated"
        parts.append(f"{migrated} {action}")
    if conflicts:
        parts.append(f"{conflicts} conflict(s)")
    if skipped:
        parts.append(f"{skipped} skipped")
    if errors:
        parts.append(f"{errors} error(s)")
    if parts:
        print_info(f"Summary: {', '.join(parts)}")
    else:
        print_info("Nothing to migrate.")

    # Output directory
    output_dir = report.get("output_dir")
    if output_dir:
        print_info(f"Full report saved to: {output_dir}")

    if dry_run:
        print()
        print_info("To execute the migration, run without --dry-run:")
        print_info(f" hermes claw migrate --preset {report.get('preset', 'full')}")
    elif migrated:
        print()
        print_success("Migration complete!")

View file

@ -121,7 +121,7 @@ DEFAULT_CONFIG = {
"compression": { "compression": {
"enabled": True, "enabled": True,
"threshold": 0.85, "threshold": 0.50,
"summary_model": "google/gemini-3-flash-preview", "summary_model": "google/gemini-3-flash-preview",
"summary_provider": "auto", "summary_provider": "auto",
}, },
@ -1119,7 +1119,7 @@ def show_config():
enabled = compression.get('enabled', True) enabled = compression.get('enabled', True)
print(f" Enabled: {'yes' if enabled else 'no'}") print(f" Enabled: {'yes' if enabled else 'no'}")
if enabled: if enabled:
print(f" Threshold: {compression.get('threshold', 0.85) * 100:.0f}%") print(f" Threshold: {compression.get('threshold', 0.50) * 100:.0f}%")
print(f" Model: {compression.get('summary_model', 'google/gemini-3-flash-preview')}") print(f" Model: {compression.get('summary_model', 'google/gemini-3-flash-preview')}")
comp_provider = compression.get('summary_provider', 'auto') comp_provider = compression.get('summary_provider', 'auto')
if comp_provider != 'auto': if comp_provider != 'auto':

View file

@ -22,6 +22,8 @@ Usage:
hermes update # Update to latest version hermes update # Update to latest version
hermes uninstall # Uninstall Hermes Agent hermes uninstall # Uninstall Hermes Agent
hermes sessions browse # Interactive session picker with search hermes sessions browse # Interactive session picker with search
hermes claw migrate # Migrate from OpenClaw to Hermes
hermes claw migrate --dry-run # Preview migration without changes
""" """
import argparse import argparse
@ -1525,8 +1527,21 @@ def _model_flow_api_key_provider(config, provider_id, current_model=""):
save_env_value(base_url_env, override) save_env_value(base_url_env, override)
effective_base = override effective_base = override
# Model selection # Model selection — try live /models endpoint first, fall back to defaults
from hermes_cli.models import fetch_api_models
api_key_for_probe = existing_key or (get_env_value(key_env) if key_env else "")
live_models = fetch_api_models(api_key_for_probe, effective_base)
if live_models:
model_list = live_models
print(f" Found {len(model_list)} model(s) from {pconfig.name} API")
else:
model_list = _PROVIDER_MODELS.get(provider_id, []) model_list = _PROVIDER_MODELS.get(provider_id, [])
if model_list:
print(f" ⚠ Could not auto-detect models from API — showing defaults.")
print(f" Use \"Enter custom model name\" if you don't see your model.")
# else: no defaults either, will fall through to raw input
if model_list: if model_list:
selected = _prompt_model_selection(model_list, current_model=current_model) selected = _prompt_model_selection(model_list, current_model=current_model)
else: else:
@ -2821,6 +2836,69 @@ For more help on a command:
insights_parser.set_defaults(func=cmd_insights) insights_parser.set_defaults(func=cmd_insights)
# =========================================================================
# claw command (OpenClaw migration)
# =========================================================================
claw_parser = subparsers.add_parser(
"claw",
help="OpenClaw migration tools",
description="Migrate settings, memories, skills, and API keys from OpenClaw to Hermes"
)
claw_subparsers = claw_parser.add_subparsers(dest="claw_action")
# claw migrate
claw_migrate = claw_subparsers.add_parser(
"migrate",
help="Migrate from OpenClaw to Hermes",
description="Import settings, memories, skills, and API keys from an OpenClaw installation"
)
claw_migrate.add_argument(
"--source",
help="Path to OpenClaw directory (default: ~/.openclaw)"
)
claw_migrate.add_argument(
"--dry-run",
action="store_true",
help="Preview what would be migrated without making changes"
)
claw_migrate.add_argument(
"--preset",
choices=["user-data", "full"],
default="full",
help="Migration preset (default: full). 'user-data' excludes secrets"
)
claw_migrate.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing files (default: skip conflicts)"
)
claw_migrate.add_argument(
"--migrate-secrets",
action="store_true",
help="Include allowlisted secrets (TELEGRAM_BOT_TOKEN, API keys, etc.)"
)
claw_migrate.add_argument(
"--workspace-target",
help="Absolute path to copy workspace instructions into"
)
claw_migrate.add_argument(
"--skill-conflict",
choices=["skip", "overwrite", "rename"],
default="skip",
help="How to handle skill name conflicts (default: skip)"
)
claw_migrate.add_argument(
"--yes", "-y",
action="store_true",
help="Skip confirmation prompts"
)
def cmd_claw(args):
from hermes_cli.claw import claw_command
claw_command(args)
claw_parser.set_defaults(func=cmd_claw)
# ========================================================================= # =========================================================================
# version command # version command
# ========================================================================= # =========================================================================

View file

@ -386,44 +386,35 @@ def validate_requested_model(
"message": None, "message": None,
} }
else: else:
# API responded but model is not listed # API responded but model is not listed. Accept anyway —
# the user may have access to models not shown in the public
# listing (e.g. Z.AI Pro/Max plans can use glm-5 on coding
# endpoints even though it's not in /models). Warn but allow.
suggestions = get_close_matches(requested, api_models, n=3, cutoff=0.5) suggestions = get_close_matches(requested, api_models, n=3, cutoff=0.5)
suggestion_text = "" suggestion_text = ""
if suggestions: if suggestions:
suggestion_text = "\n Did you mean: " + ", ".join(f"`{s}`" for s in suggestions) suggestion_text = "\n Similar models: " + ", ".join(f"`{s}`" for s in suggestions)
return {
"accepted": False,
"persist": False,
"recognized": False,
"message": (
f"Error: `{requested}` is not a valid model for this provider."
f"{suggestion_text}"
),
}
# api_models is None — couldn't reach API, fall back to catalog check
provider_label = _PROVIDER_LABELS.get(normalized, normalized)
known_models = provider_model_ids(normalized)
if requested in known_models:
return { return {
"accepted": True, "accepted": True,
"persist": True, "persist": True,
"recognized": True,
"message": None,
}
# Can't validate — accept for session only
suggestion = get_close_matches(requested, known_models, n=1, cutoff=0.6)
suggestion_text = f" Did you mean `{suggestion[0]}`?" if suggestion else ""
return {
"accepted": True,
"persist": False,
"recognized": False, "recognized": False,
"message": ( "message": (
f"Could not validate `{requested}` against the live {provider_label} API. " f"Note: `{requested}` was not found in this provider's model listing. "
"Using it for this session only; config unchanged." f"It may still work if your plan supports it."
f"{suggestion_text}" f"{suggestion_text}"
), ),
} }
# api_models is None — couldn't reach API. Accept and persist,
# but warn so typos don't silently break things.
provider_label = _PROVIDER_LABELS.get(normalized, normalized)
return {
"accepted": True,
"persist": True,
"recognized": False,
"message": (
f"Could not reach the {provider_label} API to validate `{requested}`. "
f"If the service isn't down, this model may not be valid."
),
}

View file

@ -11,6 +11,7 @@ Modular wizard with independently-runnable sections:
Config files are stored in ~/.hermes/ for easy access. Config files are stored in ~/.hermes/ for easy access.
""" """
import importlib.util
import logging import logging
import os import os
import sys import sys
@ -51,6 +52,68 @@ def _set_default_model(config: Dict[str, Any], model_name: str) -> None:
config["model"] = model_cfg config["model"] = model_cfg
# Default model lists per provider — used as fallback when the live
# /models endpoint can't be reached.
# Keys are provider ids as looked up by _setup_provider_model_selection.
_DEFAULT_PROVIDER_MODELS = {
    "zai": ["glm-5", "glm-4.7", "glm-4.5", "glm-4.5-flash"],
    "kimi-coding": ["kimi-k2.5", "kimi-k2-thinking", "kimi-k2-turbo-preview"],
    "minimax": ["MiniMax-M2.5", "MiniMax-M2.5-highspeed", "MiniMax-M2.1"],
    "minimax-cn": ["MiniMax-M2.5", "MiniMax-M2.5-highspeed", "MiniMax-M2.1"],
}
def _setup_provider_model_selection(config, provider_id, current_model, prompt_choice, prompt_fn):
    """Model selection for API-key providers with live /models detection.

    Tries the provider's /models endpoint first. Falls back to a
    hardcoded default list with a warning if the endpoint is unreachable.
    Always offers a 'Custom model' escape hatch.

    Args:
        config: Mutable config dict; the chosen model is written into it
            via _set_default_model.
        provider_id: Key into PROVIDER_REGISTRY (and into
            _DEFAULT_PROVIDER_MODELS for the offline fallback).
        current_model: Currently-configured model name, shown in the
            "Keep current" choice (the default selection).
        prompt_choice: Menu prompt; called as (title, choices, default_idx)
            and expected to return the selected index.
        prompt_fn: Free-form text prompt used for the custom model name.
    """
    from hermes_cli.auth import PROVIDER_REGISTRY
    from hermes_cli.config import get_env_value
    from hermes_cli.models import fetch_api_models

    pconfig = PROVIDER_REGISTRY[provider_id]

    # Resolve API key and base URL for the probe.
    # Prefer the persisted value; fall back to the process environment.
    api_key = ""
    for ev in pconfig.api_key_env_vars:
        api_key = get_env_value(ev) or os.getenv(ev, "")
        if api_key:
            break
    base_url_env = pconfig.base_url_env_var or ""
    base_url = (get_env_value(base_url_env) if base_url_env else "") or pconfig.inference_base_url

    # Try live /models endpoint
    live_models = fetch_api_models(api_key, base_url)
    if live_models:
        provider_models = live_models
        print_info(f"Found {len(live_models)} model(s) from {pconfig.name} API")
    else:
        provider_models = _DEFAULT_PROVIDER_MODELS.get(provider_id, [])
        if provider_models:
            print_warning(
                f"Could not auto-detect models from {pconfig.name} API — showing defaults.\n"
                f" Use \"Custom model\" if the model you expect isn't listed."
            )
        # If there are no defaults either, the menu below degrades to just
        # "Custom model" / "Keep current".

    # Choice list layout: [models...] + ["Custom model", "Keep current"].
    # The index arithmetic below depends on this exact ordering.
    model_choices = list(provider_models)
    model_choices.append("Custom model")
    model_choices.append(f"Keep current ({current_model})")
    keep_idx = len(model_choices) - 1  # default selection: keep current
    model_idx = prompt_choice("Select default model:", model_choices, keep_idx)
    if model_idx < len(provider_models):
        _set_default_model(config, provider_models[model_idx])
    elif model_idx == len(provider_models):
        # "Custom model" selected — free-form entry; empty input keeps current.
        custom = prompt_fn("Enter model name")
        if custom:
            _set_default_model(config, custom)
    # else: keep current
def _sync_model_from_disk(config: Dict[str, Any]) -> None: def _sync_model_from_disk(config: Dict[str, Any]) -> None:
disk_model = load_config().get("model") disk_model = load_config().get("model")
if isinstance(disk_model, dict): if isinstance(disk_model, dict):
@ -889,7 +952,8 @@ def setup_model_provider(config: dict):
print_info(f" URL: {detected['base_url']}") print_info(f" URL: {detected['base_url']}")
if detected["id"].startswith("coding"): if detected["id"].startswith("coding"):
print_info( print_info(
f" Note: Coding Plan detected — GLM-5 is not available, using {detected['model']}" f" Note: Coding Plan endpoint detected (default model: {detected['model']}). "
f"GLM-5 may still be available depending on your plan tier."
) )
save_env_value("GLM_BASE_URL", zai_base_url) save_env_value("GLM_BASE_URL", zai_base_url)
else: else:
@ -1174,10 +1238,10 @@ def setup_model_provider(config: dict):
_set_default_model(config, custom) _set_default_model(config, custom)
_update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL) _update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
_set_model_provider(config, "openai-codex", DEFAULT_CODEX_BASE_URL) _set_model_provider(config, "openai-codex", DEFAULT_CODEX_BASE_URL)
elif selected_provider == "zai": elif selected_provider in ("zai", "kimi-coding", "minimax", "minimax-cn"):
# Coding Plan endpoints don't have GLM-5 _setup_provider_model_selection(
is_coding_plan = get_env_value("GLM_BASE_URL") and "coding" in ( config, selected_provider, current_model,
get_env_value("GLM_BASE_URL") or "" prompt_choice, prompt,
) )
if is_coding_plan: if is_coding_plan:
zai_models = ["glm-4.7", "glm-4.5", "glm-4.5-flash"] zai_models = ["glm-4.7", "glm-4.5", "glm-4.5-flash"]
@ -2111,6 +2175,114 @@ def setup_tools(config: dict, first_install: bool = False):
tools_command(first_install=first_install, config=config) tools_command(first_install=first_install, config=config)
# =============================================================================
# OpenClaw Migration
# =============================================================================

# Bundled migration script in the repo's optional-skills tree.
# NOTE(review): unlike `hermes claw migrate`, the setup wizard checks only
# this path (no Hub-installed fallback) — confirm that is intentional.
_OPENCLAW_SCRIPT = (
    PROJECT_ROOT
    / "optional-skills"
    / "migration"
    / "openclaw-migration"
    / "scripts"
    / "openclaw_to_hermes.py"
)
def _offer_openclaw_migration(hermes_home: Path) -> bool:
    """Detect ~/.openclaw and offer to migrate during first-time setup.

    Runs the bundled openclaw-migration script with the "full" preset
    (secrets included, no overwrites) after user confirmation.

    Args:
        hermes_home: Target Hermes data directory.

    Returns True if migration ran successfully, False otherwise.
    """
    openclaw_dir = Path.home() / ".openclaw"
    if not openclaw_dir.is_dir():
        return False
    if not _OPENCLAW_SCRIPT.exists():
        return False

    print()
    print_header("OpenClaw Installation Detected")
    print_info(f"Found OpenClaw data at {openclaw_dir}")
    print_info("Hermes can import your settings, memories, skills, and API keys.")
    print()
    if not prompt_yes_no("Would you like to import from OpenClaw?", default=True):
        print_info(
            "Skipping migration. You can run it later via the openclaw-migration skill."
        )
        return False

    # Ensure config.yaml exists before migration tries to read it
    config_path = get_config_path()
    if not config_path.exists():
        save_config(load_config())

    # Dynamically load the migration script
    try:
        spec = importlib.util.spec_from_file_location(
            "openclaw_to_hermes", _OPENCLAW_SCRIPT
        )
        if spec is None or spec.loader is None:
            print_warning("Could not load migration script.")
            return False
        mod = importlib.util.module_from_spec(spec)
        # Register in sys.modules so @dataclass can resolve the module
        # (Python 3.11+ requires this for dynamically loaded modules).
        # Uses the module-level `sys` import — no local alias needed.
        sys.modules[spec.name] = mod
        try:
            spec.loader.exec_module(mod)
        except Exception:
            # Roll back the registration so a broken script doesn't
            # leave a half-initialized module behind.
            sys.modules.pop(spec.name, None)
            raise

        # Run migration with the "full" preset, execute mode, no overwrite.
        # NOTE(review): skill_conflict_mode is not passed here (unlike
        # `hermes claw migrate`), so the Migrator's default applies —
        # confirm that matches the intended wizard behavior.
        selected = mod.resolve_selected_options(None, None, preset="full")
        migrator = mod.Migrator(
            source_root=openclaw_dir.resolve(),
            target_root=hermes_home.resolve(),
            execute=True,
            workspace_target=None,
            overwrite=False,
            migrate_secrets=True,
            output_dir=None,
            selected_options=selected,
            preset_name="full",
        )
        report = migrator.migrate()
    except Exception as e:
        print_warning(f"Migration failed: {e}")
        logger.debug("OpenClaw migration error", exc_info=True)
        return False

    # Print summary
    summary = report.get("summary", {})
    migrated = summary.get("migrated", 0)
    skipped = summary.get("skipped", 0)
    conflicts = summary.get("conflict", 0)
    errors = summary.get("error", 0)

    print()
    if migrated:
        print_success(f"Imported {migrated} item(s) from OpenClaw.")
    if conflicts:
        print_info(f"Skipped {conflicts} item(s) that already exist in Hermes.")
    if skipped:
        print_info(f"Skipped {skipped} item(s) (not found or unchanged).")
    if errors:
        print_warning(f"{errors} item(s) had errors — check the migration report.")

    output_dir = report.get("output_dir")
    if output_dir:
        print_info(f"Full report saved to: {output_dir}")

    print_success("Migration complete! Continuing with setup...")
    return True
# ============================================================================= # =============================================================================
# Main Wizard Orchestrator # Main Wizard Orchestrator
# ============================================================================= # =============================================================================
@ -2277,6 +2449,11 @@ def run_setup_wizard(args):
print() print()
return return
# Offer OpenClaw migration before configuration begins
if _offer_openclaw_migration(hermes_home):
# Reload config in case migration wrote to it
config = load_config()
# ── Full Setup — run all sections ── # ── Full Setup — run all sections ──
print_header("Configuration Location") print_header("Configuration Location")
print_info(f"Config file: {get_config_path()}") print_info(f"Config file: {get_config_path()}")

View file

@ -14,6 +14,22 @@ metadata:
Use this skill when a user wants to move their OpenClaw setup into Hermes Agent with minimal manual cleanup. Use this skill when a user wants to move their OpenClaw setup into Hermes Agent with minimal manual cleanup.
## CLI Command
For a quick migration driven from the command line, use the built-in CLI command:
```bash
hermes claw migrate # Interactive migration (full preset)
hermes claw migrate --dry-run # Preview what would be migrated
hermes claw migrate --preset user-data # Migrate without secrets
hermes claw migrate --overwrite # Overwrite existing conflicts
hermes claw migrate --source /custom/path/.openclaw # Custom source
```
The CLI command runs the same migration script described below. Use this skill (via the agent) when you want an interactive, guided migration with dry-run previews and per-item conflict resolution.
**First-time setup:** The `hermes setup` wizard automatically detects `~/.openclaw` and offers migration before configuration begins.
## What this skill does ## What this skill does
It uses `scripts/openclaw_to_hermes.py` to: It uses `scripts/openclaw_to_hermes.py` to:

View file

@ -454,7 +454,7 @@ class AIAgent:
effective_base = base_url effective_base = base_url
if "openrouter" in effective_base.lower(): if "openrouter" in effective_base.lower():
client_kwargs["default_headers"] = { client_kwargs["default_headers"] = {
"HTTP-Referer": "https://github.com/NousResearch/hermes-agent", "HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-OpenRouter-Title": "Hermes Agent", "X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent", "X-OpenRouter-Categories": "productivity,cli-agent",
} }
@ -481,7 +481,7 @@ class AIAgent:
"api_key": os.getenv("OPENROUTER_API_KEY", ""), "api_key": os.getenv("OPENROUTER_API_KEY", ""),
"base_url": OPENROUTER_BASE_URL, "base_url": OPENROUTER_BASE_URL,
"default_headers": { "default_headers": {
"HTTP-Referer": "https://github.com/NousResearch/hermes-agent", "HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-OpenRouter-Title": "Hermes Agent", "X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent", "X-OpenRouter-Categories": "productivity,cli-agent",
}, },
@ -687,7 +687,7 @@ class AIAgent:
# Initialize context compressor for automatic context management # Initialize context compressor for automatic context management
# Compresses conversation when approaching model's context limit # Compresses conversation when approaching model's context limit
# Configuration via config.yaml (compression section) or environment variables # Configuration via config.yaml (compression section) or environment variables
compression_threshold = float(os.getenv("CONTEXT_COMPRESSION_THRESHOLD", "0.85")) compression_threshold = float(os.getenv("CONTEXT_COMPRESSION_THRESHOLD", "0.50"))
compression_enabled = os.getenv("CONTEXT_COMPRESSION_ENABLED", "true").lower() in ("true", "1", "yes") compression_enabled = os.getenv("CONTEXT_COMPRESSION_ENABLED", "true").lower() in ("true", "1", "yes")
compression_summary_model = os.getenv("CONTEXT_COMPRESSION_MODEL") or None compression_summary_model = os.getenv("CONTEXT_COMPRESSION_MODEL") or None
@ -2585,6 +2585,31 @@ class AIAgent:
return msg return msg
@staticmethod
def _sanitize_tool_calls_for_strict_api(api_msg: dict) -> dict:
"""Strip Codex Responses API fields from tool_calls for strict providers.
Providers like Mistral strictly validate the Chat Completions schema
and reject unknown fields (call_id, response_item_id) with 422.
These fields are preserved in the internal message history this
method only modifies the outgoing API copy.
Creates new tool_call dicts rather than mutating in-place, so the
original messages list retains call_id/response_item_id for Codex
Responses API compatibility (e.g. if the session falls back to a
Codex provider later).
"""
tool_calls = api_msg.get("tool_calls")
if not isinstance(tool_calls, list):
return api_msg
_STRIP_KEYS = {"call_id", "response_item_id"}
api_msg["tool_calls"] = [
{k: v for k, v in tc.items() if k not in _STRIP_KEYS}
if isinstance(tc, dict) else tc
for tc in tool_calls
]
return api_msg
def flush_memories(self, messages: list = None, min_turns: int = None): def flush_memories(self, messages: list = None, min_turns: int = None):
"""Give the model one turn to persist memories before context is lost. """Give the model one turn to persist memories before context is lost.
@ -2622,6 +2647,7 @@ class AIAgent:
try: try:
# Build API messages for the flush call # Build API messages for the flush call
_is_strict_api = "api.mistral.ai" in self.base_url.lower()
api_messages = [] api_messages = []
for msg in messages: for msg in messages:
api_msg = msg.copy() api_msg = msg.copy()
@ -2632,6 +2658,8 @@ class AIAgent:
api_msg.pop("reasoning", None) api_msg.pop("reasoning", None)
api_msg.pop("finish_reason", None) api_msg.pop("finish_reason", None)
api_msg.pop("_flush_sentinel", None) api_msg.pop("_flush_sentinel", None)
if _is_strict_api:
self._sanitize_tool_calls_for_strict_api(api_msg)
api_messages.append(api_msg) api_messages.append(api_msg)
if self._cached_system_prompt: if self._cached_system_prompt:
@ -3111,11 +3139,14 @@ class AIAgent:
try: try:
# Build API messages, stripping internal-only fields # Build API messages, stripping internal-only fields
# (finish_reason, reasoning) that strict APIs like Mistral reject with 422 # (finish_reason, reasoning) that strict APIs like Mistral reject with 422
_is_strict_api = "api.mistral.ai" in self.base_url.lower()
api_messages = [] api_messages = []
for msg in messages: for msg in messages:
api_msg = msg.copy() api_msg = msg.copy()
for internal_field in ("reasoning", "finish_reason"): for internal_field in ("reasoning", "finish_reason"):
api_msg.pop(internal_field, None) api_msg.pop(internal_field, None)
if _is_strict_api:
self._sanitize_tool_calls_for_strict_api(api_msg)
api_messages.append(api_msg) api_messages.append(api_msg)
effective_system = self._cached_system_prompt or "" effective_system = self._cached_system_prompt or ""
@ -3509,6 +3540,12 @@ class AIAgent:
# Remove finish_reason - not accepted by strict APIs (e.g. Mistral) # Remove finish_reason - not accepted by strict APIs (e.g. Mistral)
if "finish_reason" in api_msg: if "finish_reason" in api_msg:
api_msg.pop("finish_reason") api_msg.pop("finish_reason")
# Strip Codex Responses API fields (call_id, response_item_id) for
# strict providers like Mistral that reject unknown fields with 422.
# Uses new dicts so the internal messages list retains the fields
# for Codex Responses compatibility.
if "api.mistral.ai" in self.base_url.lower():
self._sanitize_tool_calls_for_strict_api(api_msg)
# Keep 'reasoning_details' - OpenRouter uses this for multi-turn reasoning context # Keep 'reasoning_details' - OpenRouter uses this for multi-turn reasoning context
# The signature field helps maintain reasoning continuity # The signature field helps maintain reasoning continuity
api_messages.append(api_msg) api_messages.append(api_msg)

View file

@ -0,0 +1,124 @@
"""Tests verifying interrupt key consistency between adapter and gateway.
Regression test for a bug where monitor_for_interrupt() in _run_agent used
source.chat_id to query the adapter, but the adapter stores interrupts under
the full session key (build_session_key output). This mismatch meant
interrupts were never detected, causing subagents to ignore new messages.
"""
import asyncio
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
from gateway.session import SessionSource, build_session_key
class StubAdapter(BasePlatformAdapter):
"""Minimal adapter for interrupt tests."""
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
async def connect(self):
return True
async def disconnect(self):
pass
async def send(self, chat_id, content, reply_to=None, metadata=None):
return SendResult(success=True, message_id="1")
async def send_typing(self, chat_id, metadata=None):
pass
async def get_chat_info(self, chat_id):
return {"id": chat_id}
def _source(chat_id="123456", chat_type="dm", thread_id=None):
return SessionSource(
platform=Platform.TELEGRAM,
chat_id=chat_id,
chat_type=chat_type,
thread_id=thread_id,
)
class TestInterruptKeyConsistency:
"""Ensure adapter interrupt methods are queried with session_key, not chat_id."""
def test_session_key_differs_from_chat_id_for_dm(self):
"""Session key for a DM is NOT the same as chat_id."""
source = _source("123456", "dm")
session_key = build_session_key(source)
assert session_key != source.chat_id
assert session_key == "agent:main:telegram:dm"
def test_session_key_differs_from_chat_id_for_group(self):
"""Session key for a group chat includes prefix, unlike raw chat_id."""
source = _source("-1001234", "group")
session_key = build_session_key(source)
assert session_key != source.chat_id
assert "agent:main:" in session_key
assert source.chat_id in session_key
@pytest.mark.asyncio
async def test_has_pending_interrupt_requires_session_key(self):
"""has_pending_interrupt returns True only when queried with session_key."""
adapter = StubAdapter()
source = _source("123456", "dm")
session_key = build_session_key(source)
# Simulate adapter storing interrupt under session_key
interrupt_event = asyncio.Event()
adapter._active_sessions[session_key] = interrupt_event
interrupt_event.set()
# Using session_key → found
assert adapter.has_pending_interrupt(session_key) is True
# Using chat_id → NOT found (this was the bug)
assert adapter.has_pending_interrupt(source.chat_id) is False
@pytest.mark.asyncio
async def test_get_pending_message_requires_session_key(self):
"""get_pending_message returns the event only with session_key."""
adapter = StubAdapter()
source = _source("123456", "dm")
session_key = build_session_key(source)
event = MessageEvent(text="hello", source=source, message_id="42")
adapter._pending_messages[session_key] = event
# Using chat_id → None (the bug)
assert adapter.get_pending_message(source.chat_id) is None
# Using session_key → found
result = adapter.get_pending_message(session_key)
assert result is event
@pytest.mark.asyncio
async def test_handle_message_stores_under_session_key(self):
"""handle_message stores pending messages under session_key, not chat_id."""
adapter = StubAdapter()
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
source = _source("-1001234", "group")
session_key = build_session_key(source)
# Mark session as active
adapter._active_sessions[session_key] = asyncio.Event()
# Send a second message while session is active
event = MessageEvent(text="interrupt!", source=source, message_id="2")
await adapter.handle_message(event)
# Stored under session_key
assert session_key in adapter._pending_messages
# NOT stored under chat_id
assert source.chat_id not in adapter._pending_messages
# Interrupt event was set
assert adapter._active_sessions[session_key].is_set()

View file

@ -530,3 +530,277 @@ class TestMessageRouting:
} }
await adapter._handle_slack_message(event) await adapter._handle_slack_message(event)
adapter.handle_message.assert_not_called() adapter.handle_message.assert_not_called()
# ---------------------------------------------------------------------------
# TestFormatMessage — Markdown → mrkdwn conversion
# ---------------------------------------------------------------------------
class TestFormatMessage:
"""Test markdown to Slack mrkdwn conversion."""
def test_bold_conversion(self, adapter):
assert adapter.format_message("**hello**") == "*hello*"
def test_italic_asterisk_conversion(self, adapter):
assert adapter.format_message("*hello*") == "_hello_"
def test_italic_underscore_preserved(self, adapter):
assert adapter.format_message("_hello_") == "_hello_"
def test_header_to_bold(self, adapter):
assert adapter.format_message("## Section Title") == "*Section Title*"
def test_header_with_bold_content(self, adapter):
# **bold** inside a header should not double-wrap
assert adapter.format_message("## **Title**") == "*Title*"
def test_link_conversion(self, adapter):
result = adapter.format_message("[click here](https://example.com)")
assert result == "<https://example.com|click here>"
def test_strikethrough(self, adapter):
assert adapter.format_message("~~deleted~~") == "~deleted~"
def test_code_block_preserved(self, adapter):
code = "```python\nx = **not bold**\n```"
assert adapter.format_message(code) == code
def test_inline_code_preserved(self, adapter):
text = "Use `**raw**` syntax"
assert adapter.format_message(text) == "Use `**raw**` syntax"
def test_mixed_content(self, adapter):
text = "**Bold** and *italic* with `code`"
result = adapter.format_message(text)
assert "*Bold*" in result
assert "_italic_" in result
assert "`code`" in result
def test_empty_string(self, adapter):
assert adapter.format_message("") == ""
def test_none_passthrough(self, adapter):
assert adapter.format_message(None) is None
# ---------------------------------------------------------------------------
# TestReactions
# ---------------------------------------------------------------------------
class TestReactions:
"""Test emoji reaction methods."""
@pytest.mark.asyncio
async def test_add_reaction_calls_api(self, adapter):
adapter._app.client.reactions_add = AsyncMock()
result = await adapter._add_reaction("C123", "ts1", "eyes")
assert result is True
adapter._app.client.reactions_add.assert_called_once_with(
channel="C123", timestamp="ts1", name="eyes"
)
@pytest.mark.asyncio
async def test_add_reaction_handles_error(self, adapter):
adapter._app.client.reactions_add = AsyncMock(side_effect=Exception("already_reacted"))
result = await adapter._add_reaction("C123", "ts1", "eyes")
assert result is False
@pytest.mark.asyncio
async def test_remove_reaction_calls_api(self, adapter):
adapter._app.client.reactions_remove = AsyncMock()
result = await adapter._remove_reaction("C123", "ts1", "eyes")
assert result is True
@pytest.mark.asyncio
async def test_reactions_in_message_flow(self, adapter):
"""Reactions should be added on receipt and swapped on completion."""
adapter._app.client.reactions_add = AsyncMock()
adapter._app.client.reactions_remove = AsyncMock()
adapter._app.client.users_info = AsyncMock(return_value={
"user": {"profile": {"display_name": "Tyler"}}
})
event = {
"text": "hello",
"user": "U_USER",
"channel": "C123",
"channel_type": "im",
"ts": "1234567890.000001",
}
await adapter._handle_slack_message(event)
# Should have added 👀, then removed 👀, then added ✅
add_calls = adapter._app.client.reactions_add.call_args_list
remove_calls = adapter._app.client.reactions_remove.call_args_list
assert len(add_calls) == 2
assert add_calls[0].kwargs["name"] == "eyes"
assert add_calls[1].kwargs["name"] == "white_check_mark"
assert len(remove_calls) == 1
assert remove_calls[0].kwargs["name"] == "eyes"
# ---------------------------------------------------------------------------
# TestUserNameResolution
# ---------------------------------------------------------------------------
class TestUserNameResolution:
"""Test user identity resolution."""
@pytest.mark.asyncio
async def test_resolves_display_name(self, adapter):
adapter._app.client.users_info = AsyncMock(return_value={
"user": {"profile": {"display_name": "Tyler", "real_name": "Tyler B"}}
})
name = await adapter._resolve_user_name("U123")
assert name == "Tyler"
@pytest.mark.asyncio
async def test_falls_back_to_real_name(self, adapter):
adapter._app.client.users_info = AsyncMock(return_value={
"user": {"profile": {"display_name": "", "real_name": "Tyler B"}}
})
name = await adapter._resolve_user_name("U123")
assert name == "Tyler B"
@pytest.mark.asyncio
async def test_caches_result(self, adapter):
adapter._app.client.users_info = AsyncMock(return_value={
"user": {"profile": {"display_name": "Tyler"}}
})
await adapter._resolve_user_name("U123")
await adapter._resolve_user_name("U123")
# Only one API call despite two lookups
assert adapter._app.client.users_info.call_count == 1
@pytest.mark.asyncio
async def test_handles_api_error(self, adapter):
adapter._app.client.users_info = AsyncMock(side_effect=Exception("rate limited"))
name = await adapter._resolve_user_name("U123")
assert name == "U123" # Falls back to user_id
@pytest.mark.asyncio
async def test_user_name_in_message_source(self, adapter):
"""Message source should include resolved user name."""
adapter._app.client.users_info = AsyncMock(return_value={
"user": {"profile": {"display_name": "Tyler"}}
})
adapter._app.client.reactions_add = AsyncMock()
adapter._app.client.reactions_remove = AsyncMock()
event = {
"text": "hello",
"user": "U_USER",
"channel": "C123",
"channel_type": "im",
"ts": "1234567890.000001",
}
await adapter._handle_slack_message(event)
# Check the source in the MessageEvent passed to handle_message
msg_event = adapter.handle_message.call_args[0][0]
assert msg_event.source.user_name == "Tyler"
# ---------------------------------------------------------------------------
# TestSlashCommands — expanded command set
# ---------------------------------------------------------------------------
class TestSlashCommands:
"""Test slash command routing."""
@pytest.mark.asyncio
async def test_compact_maps_to_compress(self, adapter):
command = {"text": "compact", "user_id": "U1", "channel_id": "C1"}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/compress"
@pytest.mark.asyncio
async def test_resume_command(self, adapter):
command = {"text": "resume my session", "user_id": "U1", "channel_id": "C1"}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/resume my session"
@pytest.mark.asyncio
async def test_background_command(self, adapter):
command = {"text": "background run tests", "user_id": "U1", "channel_id": "C1"}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/background run tests"
@pytest.mark.asyncio
async def test_usage_command(self, adapter):
command = {"text": "usage", "user_id": "U1", "channel_id": "C1"}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/usage"
@pytest.mark.asyncio
async def test_reasoning_command(self, adapter):
command = {"text": "reasoning", "user_id": "U1", "channel_id": "C1"}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/reasoning"
# ---------------------------------------------------------------------------
# TestMessageSplitting
# ---------------------------------------------------------------------------
class TestMessageSplitting:
"""Test that long messages are split before sending."""
@pytest.mark.asyncio
async def test_long_message_split_into_chunks(self, adapter):
"""Messages over MAX_MESSAGE_LENGTH should be split."""
long_text = "x" * 5000
adapter._app.client.chat_postMessage = AsyncMock(
return_value={"ts": "ts1"}
)
await adapter.send("C123", long_text)
# Should have been called multiple times
assert adapter._app.client.chat_postMessage.call_count >= 2
@pytest.mark.asyncio
async def test_short_message_single_send(self, adapter):
"""Short messages should be sent in one call."""
adapter._app.client.chat_postMessage = AsyncMock(
return_value={"ts": "ts1"}
)
await adapter.send("C123", "hello world")
assert adapter._app.client.chat_postMessage.call_count == 1
# ---------------------------------------------------------------------------
# TestReplyBroadcast
# ---------------------------------------------------------------------------
class TestReplyBroadcast:
"""Test reply_broadcast config option."""
@pytest.mark.asyncio
async def test_broadcast_disabled_by_default(self, adapter):
adapter._app.client.chat_postMessage = AsyncMock(
return_value={"ts": "ts1"}
)
await adapter.send("C123", "hi", metadata={"thread_id": "parent_ts"})
kwargs = adapter._app.client.chat_postMessage.call_args.kwargs
assert "reply_broadcast" not in kwargs
@pytest.mark.asyncio
async def test_broadcast_enabled_via_config(self, adapter):
adapter.config.extra["reply_broadcast"] = True
adapter._app.client.chat_postMessage = AsyncMock(
return_value={"ts": "ts1"}
)
await adapter.send("C123", "hi", metadata={"thread_id": "parent_ts"})
kwargs = adapter._app.client.chat_postMessage.call_args.kwargs
assert kwargs.get("reply_broadcast") is True

View file

@ -0,0 +1,340 @@
"""Tests for hermes claw commands."""
from argparse import Namespace
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
from hermes_cli import claw as claw_mod
# ---------------------------------------------------------------------------
# _find_migration_script
# ---------------------------------------------------------------------------
class TestFindMigrationScript:
"""Test script discovery in known locations."""
def test_finds_project_root_script(self, tmp_path):
script = tmp_path / "openclaw_to_hermes.py"
script.write_text("# placeholder")
with patch.object(claw_mod, "_OPENCLAW_SCRIPT", script):
assert claw_mod._find_migration_script() == script
def test_finds_installed_script(self, tmp_path):
installed = tmp_path / "installed.py"
installed.write_text("# placeholder")
with (
patch.object(claw_mod, "_OPENCLAW_SCRIPT", tmp_path / "nonexistent.py"),
patch.object(claw_mod, "_OPENCLAW_SCRIPT_INSTALLED", installed),
):
assert claw_mod._find_migration_script() == installed
def test_returns_none_when_missing(self, tmp_path):
with (
patch.object(claw_mod, "_OPENCLAW_SCRIPT", tmp_path / "a.py"),
patch.object(claw_mod, "_OPENCLAW_SCRIPT_INSTALLED", tmp_path / "b.py"),
):
assert claw_mod._find_migration_script() is None
# ---------------------------------------------------------------------------
# claw_command routing
# ---------------------------------------------------------------------------
class TestClawCommand:
"""Test the claw_command router."""
def test_routes_to_migrate(self):
args = Namespace(claw_action="migrate", source=None, dry_run=True,
preset="full", overwrite=False, migrate_secrets=False,
workspace_target=None, skill_conflict="skip", yes=False)
with patch.object(claw_mod, "_cmd_migrate") as mock:
claw_mod.claw_command(args)
mock.assert_called_once_with(args)
def test_shows_help_for_no_action(self, capsys):
args = Namespace(claw_action=None)
claw_mod.claw_command(args)
captured = capsys.readouterr()
assert "migrate" in captured.out
# ---------------------------------------------------------------------------
# _cmd_migrate
# ---------------------------------------------------------------------------
class TestCmdMigrate:
"""Test the migrate command handler."""
def test_error_when_source_missing(self, tmp_path, capsys):
args = Namespace(
source=str(tmp_path / "nonexistent"),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "not found" in captured.out
def test_error_when_script_missing(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_OPENCLAW_SCRIPT", tmp_path / "a.py"),
patch.object(claw_mod, "_OPENCLAW_SCRIPT_INSTALLED", tmp_path / "b.py"),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration script not found" in captured.out
def test_dry_run_succeeds(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
script = tmp_path / "script.py"
script.write_text("# placeholder")
# Build a fake migration module
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value={"soul", "memory"})
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 0, "skipped": 5, "conflict": 0, "error": 0},
"items": [
{"kind": "soul", "status": "skipped", "reason": "Not found"},
],
"preset": "full",
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=script),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=tmp_path / "config.yaml"),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Dry Run Results" in captured.out
assert "5 skipped" in captured.out
def test_execute_with_confirmation(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("agent:\n max_turns: 90\n")
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value={"soul"})
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 2, "skipped": 1, "conflict": 0, "error": 0},
"items": [
{"kind": "soul", "status": "migrated", "destination": str(tmp_path / "SOUL.md")},
{"kind": "memory", "status": "migrated", "destination": str(tmp_path / "memories/MEMORY.md")},
],
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=False, preset="user-data", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=config_path),
patch.object(claw_mod, "prompt_yes_no", return_value=True),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration Results" in captured.out
assert "Migration complete!" in captured.out
def test_execute_cancelled_by_user(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("")
args = Namespace(
source=str(openclaw_dir),
dry_run=False, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "prompt_yes_no", return_value=False),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration cancelled" in captured.out
def test_execute_with_yes_skips_confirmation(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("")
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value=set())
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 0, "skipped": 0, "conflict": 0, "error": 0},
"items": [],
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=False, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=True,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=config_path),
patch.object(claw_mod, "prompt_yes_no") as mock_prompt,
):
claw_mod._cmd_migrate(args)
mock_prompt.assert_not_called()
def test_handles_migration_error(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("")
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", side_effect=RuntimeError("boom")),
patch.object(claw_mod, "get_config_path", return_value=config_path),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration failed" in captured.out
def test_full_preset_enables_secrets(self, tmp_path, capsys):
"""The 'full' preset should set migrate_secrets=True automatically."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value=set())
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 0, "skipped": 0, "conflict": 0, "error": 0},
"items": [],
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, # Not explicitly set by user
workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=tmp_path / "config.yaml"),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
):
claw_mod._cmd_migrate(args)
# Migrator should have been called with migrate_secrets=True
call_kwargs = fake_mod.Migrator.call_args[1]
assert call_kwargs["migrate_secrets"] is True
# ---------------------------------------------------------------------------
# _print_migration_report
# ---------------------------------------------------------------------------
class TestPrintMigrationReport:
"""Test the report formatting function."""
def test_dry_run_report(self, capsys):
report = {
"summary": {"migrated": 2, "skipped": 1, "conflict": 1, "error": 0},
"items": [
{"kind": "soul", "status": "migrated", "destination": "/home/user/.hermes/SOUL.md"},
{"kind": "memory", "status": "migrated", "destination": "/home/user/.hermes/memories/MEMORY.md"},
{"kind": "skills", "status": "conflict", "reason": "already exists"},
{"kind": "tts-assets", "status": "skipped", "reason": "not found"},
],
"preset": "full",
}
claw_mod._print_migration_report(report, dry_run=True)
captured = capsys.readouterr()
assert "Dry Run Results" in captured.out
assert "Would migrate" in captured.out
assert "2 would migrate" in captured.out
assert "--dry-run" in captured.out
def test_execute_report(self, capsys):
report = {
"summary": {"migrated": 3, "skipped": 0, "conflict": 0, "error": 0},
"items": [
{"kind": "soul", "status": "migrated", "destination": "/home/user/.hermes/SOUL.md"},
],
"output_dir": "/home/user/.hermes/migration/openclaw/20250312T120000",
}
claw_mod._print_migration_report(report, dry_run=False)
captured = capsys.readouterr()
assert "Migration Results" in captured.out
assert "Migrated" in captured.out
assert "Full report saved to" in captured.out
def test_empty_report(self, capsys):
report = {
"summary": {"migrated": 0, "skipped": 0, "conflict": 0, "error": 0},
"items": [],
}
claw_mod._print_migration_report(report, dry_run=False)
captured = capsys.readouterr()
assert "Nothing to migrate" in captured.out

View file

@ -160,7 +160,8 @@ class TestValidateFormatChecks:
def test_no_slash_model_rejected_if_not_in_api(self): def test_no_slash_model_rejected_if_not_in_api(self):
result = _validate("gpt-5.4", api_models=["openai/gpt-5.4"]) result = _validate("gpt-5.4", api_models=["openai/gpt-5.4"])
assert result["accepted"] is False assert result["accepted"] is True
assert "not found" in result["message"]
# -- validate — API found ---------------------------------------------------- # -- validate — API found ----------------------------------------------------
@ -184,37 +185,39 @@ class TestValidateApiFound:
# -- validate — API not found ------------------------------------------------ # -- validate — API not found ------------------------------------------------
class TestValidateApiNotFound: class TestValidateApiNotFound:
def test_model_not_in_api_rejected(self): def test_model_not_in_api_accepted_with_warning(self):
result = _validate("anthropic/claude-nonexistent") result = _validate("anthropic/claude-nonexistent")
assert result["accepted"] is False assert result["accepted"] is True
assert "not a valid model" in result["message"] assert result["persist"] is True
assert "not found" in result["message"]
def test_rejection_includes_suggestions(self): def test_warning_includes_suggestions(self):
result = _validate("anthropic/claude-opus-4.5") result = _validate("anthropic/claude-opus-4.5")
assert result["accepted"] is False assert result["accepted"] is True
assert "Did you mean" in result["message"] assert "Similar models" in result["message"]
# -- validate — API unreachable (fallback) ----------------------------------- # -- validate — API unreachable — accept and persist everything ----------------
class TestValidateApiFallback: class TestValidateApiFallback:
def test_known_catalog_model_accepted_when_api_down(self): def test_any_model_accepted_when_api_down(self):
result = _validate("anthropic/claude-opus-4.6", api_models=None) result = _validate("anthropic/claude-opus-4.6", api_models=None)
assert result["accepted"] is True assert result["accepted"] is True
assert result["persist"] is True assert result["persist"] is True
def test_unknown_model_session_only_when_api_down(self): def test_unknown_model_also_accepted_when_api_down(self):
"""No hardcoded catalog gatekeeping — accept, persist, and warn."""
result = _validate("anthropic/claude-next-gen", api_models=None) result = _validate("anthropic/claude-next-gen", api_models=None)
assert result["accepted"] is True assert result["accepted"] is True
assert result["persist"] is False assert result["persist"] is True
assert "session only" in result["message"].lower() assert "could not reach" in result["message"].lower()
def test_zai_known_model_accepted_when_api_down(self): def test_zai_model_accepted_when_api_down(self):
result = _validate("glm-5", provider="zai", api_models=None) result = _validate("glm-5", provider="zai", api_models=None)
assert result["accepted"] is True assert result["accepted"] is True
assert result["persist"] is True assert result["persist"] is True
def test_unknown_provider_session_only_when_api_down(self): def test_unknown_provider_accepted_when_api_down(self):
result = _validate("some-model", provider="totally-unknown", api_models=None) result = _validate("some-model", provider="totally-unknown", api_models=None)
assert result["accepted"] is True assert result["accepted"] is True
assert result["persist"] is False assert result["persist"] is True

View file

@ -0,0 +1,284 @@
"""Tests for OpenClaw migration integration in the setup wizard."""
from argparse import Namespace
from types import ModuleType
from unittest.mock import MagicMock, patch
from hermes_cli import setup as setup_mod
# ---------------------------------------------------------------------------
# _offer_openclaw_migration — unit tests
# ---------------------------------------------------------------------------
class TestOfferOpenclawMigration:
    """Test the _offer_openclaw_migration helper in isolation."""
    def test_skips_when_no_openclaw_dir(self, tmp_path):
        """Should return False immediately when ~/.openclaw does not exist."""
        # Point Path.home() at an empty tmp dir so no ~/.openclaw is detected.
        with patch("hermes_cli.setup.Path.home", return_value=tmp_path):
            assert setup_mod._offer_openclaw_migration(tmp_path / ".hermes") is False
    def test_skips_when_migration_script_missing(self, tmp_path):
        """Should return False when the migration script file is absent."""
        openclaw_dir = tmp_path / ".openclaw"
        openclaw_dir.mkdir()
        with (
            patch("hermes_cli.setup.Path.home", return_value=tmp_path),
            # Redirect the module-level script path to a file that does not exist.
            patch.object(setup_mod, "_OPENCLAW_SCRIPT", tmp_path / "nonexistent.py"),
        ):
            assert setup_mod._offer_openclaw_migration(tmp_path / ".hermes") is False
    def test_skips_when_user_declines(self, tmp_path):
        """Should return False when user declines the migration prompt."""
        openclaw_dir = tmp_path / ".openclaw"
        openclaw_dir.mkdir()
        script = tmp_path / "openclaw_to_hermes.py"
        script.write_text("# placeholder")
        with (
            patch("hermes_cli.setup.Path.home", return_value=tmp_path),
            patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
            # Simulate the user answering "no" at the confirmation prompt.
            patch.object(setup_mod, "prompt_yes_no", return_value=False),
        ):
            assert setup_mod._offer_openclaw_migration(tmp_path / ".hermes") is False
    def test_runs_migration_when_user_accepts(self, tmp_path):
        """Should dynamically load the script and run the Migrator."""
        openclaw_dir = tmp_path / ".openclaw"
        openclaw_dir.mkdir()
        # Create a fake hermes home with config
        hermes_home = tmp_path / ".hermes"
        hermes_home.mkdir()
        config_path = hermes_home / "config.yaml"
        config_path.write_text("agent:\n max_turns: 90\n")
        # Build a fake migration module
        fake_mod = ModuleType("openclaw_to_hermes")
        fake_mod.resolve_selected_options = MagicMock(return_value={"soul", "memory"})
        fake_migrator = MagicMock()
        fake_migrator.migrate.return_value = {
            "summary": {"migrated": 3, "skipped": 1, "conflict": 0, "error": 0},
            "output_dir": str(hermes_home / "migration"),
        }
        fake_mod.Migrator = MagicMock(return_value=fake_migrator)
        script = tmp_path / "openclaw_to_hermes.py"
        script.write_text("# placeholder")
        with (
            patch("hermes_cli.setup.Path.home", return_value=tmp_path),
            patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
            patch.object(setup_mod, "prompt_yes_no", return_value=True),
            patch.object(setup_mod, "get_config_path", return_value=config_path),
            patch("importlib.util.spec_from_file_location") as mock_spec_fn,
        ):
            # Wire up the fake module loading
            mock_spec = MagicMock()
            mock_spec.loader = MagicMock()
            mock_spec_fn.return_value = mock_spec
            def exec_module(mod):
                # Inject the fake module's attributes when the loader "executes" it.
                mod.resolve_selected_options = fake_mod.resolve_selected_options
                mod.Migrator = fake_mod.Migrator
            mock_spec.loader.exec_module = exec_module
            result = setup_mod._offer_openclaw_migration(hermes_home)
        assert result is True
        # Setup-wizard path should request the "full" preset with no CLI filters.
        fake_mod.resolve_selected_options.assert_called_once_with(
            None, None, preset="full"
        )
        fake_mod.Migrator.assert_called_once()
        call_kwargs = fake_mod.Migrator.call_args[1]
        assert call_kwargs["execute"] is True
        assert call_kwargs["overwrite"] is False
        assert call_kwargs["migrate_secrets"] is True
        assert call_kwargs["preset_name"] == "full"
        fake_migrator.migrate.assert_called_once()
    def test_handles_migration_error_gracefully(self, tmp_path):
        """Should catch exceptions and return False."""
        openclaw_dir = tmp_path / ".openclaw"
        openclaw_dir.mkdir()
        hermes_home = tmp_path / ".hermes"
        hermes_home.mkdir()
        config_path = hermes_home / "config.yaml"
        config_path.write_text("")
        script = tmp_path / "openclaw_to_hermes.py"
        script.write_text("# placeholder")
        with (
            patch("hermes_cli.setup.Path.home", return_value=tmp_path),
            patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
            patch.object(setup_mod, "prompt_yes_no", return_value=True),
            patch.object(setup_mod, "get_config_path", return_value=config_path),
            # Blow up inside the dynamic-import machinery.
            patch(
                "importlib.util.spec_from_file_location",
                side_effect=RuntimeError("boom"),
            ),
        ):
            result = setup_mod._offer_openclaw_migration(hermes_home)
        assert result is False
    def test_creates_config_if_missing(self, tmp_path):
        """Should bootstrap config.yaml before running migration."""
        openclaw_dir = tmp_path / ".openclaw"
        openclaw_dir.mkdir()
        hermes_home = tmp_path / ".hermes"
        hermes_home.mkdir()
        config_path = hermes_home / "config.yaml"
        # config does NOT exist yet
        script = tmp_path / "openclaw_to_hermes.py"
        script.write_text("# placeholder")
        with (
            patch("hermes_cli.setup.Path.home", return_value=tmp_path),
            patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
            patch.object(setup_mod, "prompt_yes_no", return_value=True),
            patch.object(setup_mod, "get_config_path", return_value=config_path),
            patch.object(setup_mod, "load_config", return_value={"agent": {}}),
            patch.object(setup_mod, "save_config") as mock_save,
            # Abort right after the bootstrap step so only save_config is observed.
            patch(
                "importlib.util.spec_from_file_location",
                side_effect=RuntimeError("stop early"),
            ),
        ):
            setup_mod._offer_openclaw_migration(hermes_home)
        # save_config should have been called to bootstrap the file
        mock_save.assert_called_once_with({"agent": {}})
# ---------------------------------------------------------------------------
# Integration with run_setup_wizard — first-time flow
# ---------------------------------------------------------------------------
def _first_time_args() -> Namespace:
return Namespace(
section=None,
non_interactive=False,
reset=False,
)
class TestSetupWizardOpenclawIntegration:
    """Verify _offer_openclaw_migration is called during first-time setup."""
    def test_migration_offered_during_first_time_setup(self, tmp_path):
        """On first-time setup, _offer_openclaw_migration should be called."""
        args = _first_time_args()
        with (
            patch.object(setup_mod, "ensure_hermes_home"),
            patch.object(setup_mod, "load_config", return_value={}),
            patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
            patch.object(setup_mod, "get_env_value", return_value=""),
            patch("hermes_cli.auth.get_active_provider", return_value=None),
            # User presses Enter to start
            patch("builtins.input", return_value=""),
            # Mock the migration offer
            patch.object(
                setup_mod, "_offer_openclaw_migration", return_value=False
            ) as mock_migration,
            # Mock the actual setup sections so they don't run
            patch.object(setup_mod, "setup_model_provider"),
            patch.object(setup_mod, "setup_terminal_backend"),
            patch.object(setup_mod, "setup_agent_settings"),
            patch.object(setup_mod, "setup_gateway"),
            patch.object(setup_mod, "setup_tools"),
            patch.object(setup_mod, "save_config"),
            patch.object(setup_mod, "_print_setup_summary"),
        ):
            setup_mod.run_setup_wizard(args)
        mock_migration.assert_called_once_with(tmp_path)
    def test_migration_reloads_config_on_success(self, tmp_path):
        """When migration returns True, config should be reloaded."""
        args = _first_time_args()
        call_order = []
        def tracking_load_config():
            # Record each load_config invocation so the call count can be asserted.
            call_order.append("load_config")
            return {}
        with (
            patch.object(setup_mod, "ensure_hermes_home"),
            patch.object(setup_mod, "load_config", side_effect=tracking_load_config),
            patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
            patch.object(setup_mod, "get_env_value", return_value=""),
            patch("hermes_cli.auth.get_active_provider", return_value=None),
            patch("builtins.input", return_value=""),
            patch.object(setup_mod, "_offer_openclaw_migration", return_value=True),
            patch.object(setup_mod, "setup_model_provider"),
            patch.object(setup_mod, "setup_terminal_backend"),
            patch.object(setup_mod, "setup_agent_settings"),
            patch.object(setup_mod, "setup_gateway"),
            patch.object(setup_mod, "setup_tools"),
            patch.object(setup_mod, "save_config"),
            patch.object(setup_mod, "_print_setup_summary"),
        ):
            setup_mod.run_setup_wizard(args)
        # load_config called twice: once at start, once after migration
        assert call_order.count("load_config") == 2
    def test_reloaded_config_flows_into_remaining_setup_sections(self, tmp_path):
        """Config reloaded after migration should be passed to the setup sections."""
        args = _first_time_args()
        initial_config = {}
        reloaded_config = {"model": {"provider": "openrouter"}}
        with (
            patch.object(setup_mod, "ensure_hermes_home"),
            # First call returns the pristine config, second the migrated one.
            patch.object(
                setup_mod,
                "load_config",
                side_effect=[initial_config, reloaded_config],
            ),
            patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
            patch.object(setup_mod, "get_env_value", return_value=""),
            patch("hermes_cli.auth.get_active_provider", return_value=None),
            patch("builtins.input", return_value=""),
            patch.object(setup_mod, "_offer_openclaw_migration", return_value=True),
            patch.object(setup_mod, "setup_model_provider") as setup_model_provider,
            patch.object(setup_mod, "setup_terminal_backend"),
            patch.object(setup_mod, "setup_agent_settings"),
            patch.object(setup_mod, "setup_gateway"),
            patch.object(setup_mod, "setup_tools"),
            patch.object(setup_mod, "save_config"),
            patch.object(setup_mod, "_print_setup_summary"),
        ):
            setup_mod.run_setup_wizard(args)
        setup_model_provider.assert_called_once_with(reloaded_config)
    def test_migration_not_offered_for_existing_install(self, tmp_path):
        """Returning users should not see the migration prompt."""
        args = _first_time_args()
        with (
            patch.object(setup_mod, "ensure_hermes_home"),
            patch.object(setup_mod, "load_config", return_value={}),
            patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
            # An existing API key marks this as a returning (configured) install.
            patch.object(
                setup_mod,
                "get_env_value",
                side_effect=lambda k: "sk-xxx" if k == "OPENROUTER_API_KEY" else "",
            ),
            patch("hermes_cli.auth.get_active_provider", return_value=None),
            # Returning user picks "Exit"
            patch.object(setup_mod, "prompt_choice", return_value=9),
            patch.object(
                setup_mod, "_offer_openclaw_migration", return_value=False
            ) as mock_migration,
        ):
            setup_mod.run_setup_wizard(args)
        mock_migration.assert_not_called()

141
tests/run_interrupt_test.py Normal file
View file

@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""Run a real interrupt test with actual AIAgent + delegate child.

Not a pytest test; runs directly as a script for live testing.
"""
import threading
import time
import sys
import os
# Make the repo root importable when this file is run directly from tests/.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unittest.mock import MagicMock, patch
from run_agent import AIAgent, IterationBudget
from tools.delegate_tool import _run_single_child
from tools.interrupt import set_interrupt, is_interrupted
# Start from a clean global interrupt state.
set_interrupt(False)
# Create parent agent (minimal)
# __new__ bypasses __init__; only the attributes read by the delegate path
# are hand-set below.
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
parent.model = "test/model"
parent.base_url = "http://localhost:1"
parent.api_key = "test"
parent.provider = "test"
parent.api_mode = "chat_completions"
parent.platform = "cli"
parent.enabled_toolsets = ["terminal", "file"]
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent.max_tokens = None
parent.reasoning_config = None
parent.prefill_messages = None
parent._session_db = None
parent._delegate_depth = 0
parent._delegate_spinner = None
parent.tool_progress_callback = None
parent.iteration_budget = IterationBudget(max_total=100)
parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
child_started = threading.Event()
result_holder = [None]
def run_delegate():
    # Worker-thread body: run the delegate with the OpenAI client mocked out.
    with patch("run_agent.OpenAI") as MockOpenAI:
        mock_client = MagicMock()
        def slow_create(**kwargs):
            # Simulate a slow LLM round-trip so the interrupt has time to land.
            time.sleep(3)
            resp = MagicMock()
            resp.choices = [MagicMock()]
            resp.choices[0].message.content = "Done"
            resp.choices[0].message.tool_calls = None
            resp.choices[0].message.refusal = None
            resp.choices[0].finish_reason = "stop"
            resp.usage.prompt_tokens = 100
            resp.usage.completion_tokens = 10
            resp.usage.total_tokens = 110
            resp.usage.prompt_tokens_details = None
            return resp
        mock_client.chat.completions.create = slow_create
        mock_client.close = MagicMock()
        MockOpenAI.return_value = mock_client
        original_init = AIAgent.__init__
        def patched_init(self_agent, *a, **kw):
            # Signal the main thread as soon as the child agent is constructed.
            original_init(self_agent, *a, **kw)
            child_started.set()
        with patch.object(AIAgent, "__init__", patched_init):
            try:
                result = _run_single_child(
                    task_index=0,
                    goal="Test slow task",
                    context=None,
                    toolsets=["terminal"],
                    model="test/model",
                    max_iterations=5,
                    parent_agent=parent,
                    task_count=1,
                    override_provider="test",
                    override_base_url="http://localhost:1",
                    override_api_key="test",
                    override_api_mode="chat_completions",
                )
                result_holder[0] = result
            except Exception as e:
                print(f"ERROR in delegate: {e}")
                import traceback
                traceback.print_exc()
print("Starting agent thread...")
agent_thread = threading.Thread(target=run_delegate, daemon=True)
agent_thread.start()
started = child_started.wait(timeout=10)
if not started:
    print("ERROR: Child never started")
    sys.exit(1)
# Give the child a moment to enter its loop before interrupting.
time.sleep(0.5)
print(f"Active children: {len(parent._active_children)}")
for i, c in enumerate(parent._active_children):
    print(f" Child {i}: _interrupt_requested={c._interrupt_requested}")
t0 = time.monotonic()
# Simulate the user typing a new message mid-run.
parent.interrupt("User typed a new message")
print(f"Called parent.interrupt()")
for i, c in enumerate(parent._active_children):
    print(f" Child {i} after interrupt: _interrupt_requested={c._interrupt_requested}")
print(f"Global is_interrupted: {is_interrupted()}")
agent_thread.join(timeout=10)
elapsed = time.monotonic() - t0
print(f"Agent thread finished in {elapsed:.2f}s")
result = result_holder[0]
if result:
    print(f"Status: {result['status']}")
    print(f"Duration: {result['duration_seconds']}s")
    # Pass criterion: the delegate returned well before the 3s mock API call.
    if elapsed < 2.0:
        print("✅ PASS: Interrupt detected quickly!")
    else:
        print(f"❌ FAIL: Took {elapsed:.2f}s — interrupt was too slow or not detected")
else:
    print("❌ FAIL: No result!")
# Leave the global interrupt flag clean for whatever runs next.
set_interrupt(False)

View file

@ -0,0 +1,171 @@
"""End-to-end test simulating CLI interrupt during subagent execution.
Reproduces the exact scenario:
1. Parent agent calls delegate_task
2. Child agent is running (simulated with a slow tool)
3. User "types a message" (simulated by calling parent.interrupt from another thread)
4. Child should detect the interrupt and stop
This tests the COMPLETE path including _run_single_child, _active_children
registration, interrupt propagation, and child detection.
"""
import json
import os
import queue
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from tools.interrupt import set_interrupt, is_interrupted
class TestCLISubagentInterrupt(unittest.TestCase):
    """Simulate exact CLI scenario."""
    def setUp(self):
        # Ensure the process-wide interrupt flag starts clear.
        set_interrupt(False)
    def tearDown(self):
        # Never leak an interrupt into other tests.
        set_interrupt(False)
    def test_full_delegate_interrupt_flow(self):
        """Full integration: parent runs delegate_task, main thread interrupts."""
        from run_agent import AIAgent
        interrupt_detected = threading.Event()
        child_started = threading.Event()
        child_api_call_count = 0
        # Create a real-enough parent agent
        # (__new__ skips __init__; only attributes the delegate path reads are set)
        parent = AIAgent.__new__(AIAgent)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent.quiet_mode = True
        parent.model = "test/model"
        parent.base_url = "http://localhost:1"
        parent.api_key = "test"
        parent.provider = "test"
        parent.api_mode = "chat_completions"
        parent.platform = "cli"
        parent.enabled_toolsets = ["terminal", "file"]
        parent.providers_allowed = None
        parent.providers_ignored = None
        parent.providers_order = None
        parent.provider_sort = None
        parent.max_tokens = None
        parent.reasoning_config = None
        parent.prefill_messages = None
        parent._session_db = None
        parent._delegate_depth = 0
        parent._delegate_spinner = None
        parent.tool_progress_callback = None
        # We'll track what happens with _active_children
        original_children = parent._active_children
        # Mock the child's run_conversation to simulate a slow operation
        # that checks _interrupt_requested like the real one does
        def mock_child_run_conversation(user_message, **kwargs):
            child_started.set()
            # Find the child in parent._active_children
            child = parent._active_children[-1] if parent._active_children else None
            # Simulate the agent loop: poll _interrupt_requested like run_conversation does
            for i in range(100):  # Up to 10 seconds (100 * 0.1s)
                if child and child._interrupt_requested:
                    interrupt_detected.set()
                    return {
                        "final_response": "Interrupted!",
                        "messages": [],
                        "api_calls": 1,
                        "completed": False,
                        "interrupted": True,
                        "interrupt_message": child._interrupt_message,
                    }
                time.sleep(0.1)
            return {
                "final_response": "Finished without interrupt",
                "messages": [],
                "api_calls": 5,
                "completed": True,
                "interrupted": False,
            }
        # Patch AIAgent to use our mock
        from tools.delegate_tool import _run_single_child
        from run_agent import IterationBudget
        parent.iteration_budget = IterationBudget(max_total=100)
        # Run delegate in a thread (simulates agent_thread)
        delegate_result = [None]
        delegate_error = [None]
        def run_delegate():
            try:
                with patch('run_agent.AIAgent') as MockAgent:
                    mock_instance = MagicMock()
                    mock_instance._interrupt_requested = False
                    mock_instance._interrupt_message = None
                    mock_instance._active_children = []
                    mock_instance.quiet_mode = True
                    mock_instance.run_conversation = mock_child_run_conversation
                    # interrupt() mirrors the real agent: set the flag, then stash the message.
                    mock_instance.interrupt = lambda msg=None: setattr(mock_instance, '_interrupt_requested', True) or setattr(mock_instance, '_interrupt_message', msg)
                    mock_instance.tools = []
                    MockAgent.return_value = mock_instance
                    result = _run_single_child(
                        task_index=0,
                        goal="Do something slow",
                        context=None,
                        toolsets=["terminal"],
                        model=None,
                        max_iterations=50,
                        parent_agent=parent,
                        task_count=1,
                    )
                    delegate_result[0] = result
            except Exception as e:
                # Re-raised on the main thread after join.
                delegate_error[0] = e
        agent_thread = threading.Thread(target=run_delegate, daemon=True)
        agent_thread.start()
        # Wait for child to start
        assert child_started.wait(timeout=5), "Child never started!"
        # Now simulate user interrupt (from main/process thread)
        time.sleep(0.2)  # Give child a moment to be in its loop
        print(f"Parent has {len(parent._active_children)} active children")
        assert len(parent._active_children) >= 1, f"Expected child in _active_children, got {len(parent._active_children)}"
        # This is what the CLI does:
        parent.interrupt("Hey stop that")
        print(f"Parent._interrupt_requested: {parent._interrupt_requested}")
        for i, child in enumerate(parent._active_children):
            print(f"Child {i}._interrupt_requested: {child._interrupt_requested}")
        # Wait for child to detect interrupt
        detected = interrupt_detected.wait(timeout=3.0)
        # Wait for delegate to finish
        agent_thread.join(timeout=5)
        if delegate_error[0]:
            raise delegate_error[0]
        assert detected, "Child never detected the interrupt!"
        result = delegate_result[0]
        assert result is not None, "Delegate returned no result"
        assert result["status"] == "interrupted", f"Expected 'interrupted', got '{result['status']}'"
        print(f"✓ Interrupt detected! Result: {result}")
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

View file

@ -31,7 +31,7 @@ class TestModelCommand:
assert cli_obj.model == "anthropic/claude-sonnet-4.5" assert cli_obj.model == "anthropic/claude-sonnet-4.5"
save_mock.assert_called_once_with("model.default", "anthropic/claude-sonnet-4.5") save_mock.assert_called_once_with("model.default", "anthropic/claude-sonnet-4.5")
def test_invalid_model_from_api_is_rejected(self, capsys): def test_unlisted_model_accepted_with_warning(self, capsys):
cli_obj = self._make_cli() cli_obj = self._make_cli()
with patch("hermes_cli.models.fetch_api_models", with patch("hermes_cli.models.fetch_api_models",
@ -40,12 +40,10 @@ class TestModelCommand:
cli_obj.process_command("/model anthropic/fake-model") cli_obj.process_command("/model anthropic/fake-model")
output = capsys.readouterr().out output = capsys.readouterr().out
assert "not a valid model" in output assert "not found" in output or "Model changed" in output
assert "Model unchanged" in output assert cli_obj.model == "anthropic/fake-model" # accepted
assert cli_obj.model == "anthropic/claude-opus-4.6"
save_mock.assert_not_called()
def test_api_unreachable_falls_back_session_only(self, capsys): def test_api_unreachable_accepts_and_persists(self, capsys):
cli_obj = self._make_cli() cli_obj = self._make_cli()
with patch("hermes_cli.models.fetch_api_models", return_value=None), \ with patch("hermes_cli.models.fetch_api_models", return_value=None), \
@ -53,12 +51,11 @@ class TestModelCommand:
cli_obj.process_command("/model anthropic/claude-sonnet-next") cli_obj.process_command("/model anthropic/claude-sonnet-next")
output = capsys.readouterr().out output = capsys.readouterr().out
assert "session only" in output assert "saved to config" in output
assert "will revert on restart" in output
assert cli_obj.model == "anthropic/claude-sonnet-next" assert cli_obj.model == "anthropic/claude-sonnet-next"
save_mock.assert_not_called() save_mock.assert_called_once()
def test_no_slash_model_probes_api_and_rejects(self, capsys): def test_no_slash_model_accepted_with_warning(self, capsys):
cli_obj = self._make_cli() cli_obj = self._make_cli()
with patch("hermes_cli.models.fetch_api_models", with patch("hermes_cli.models.fetch_api_models",
@ -67,11 +64,8 @@ class TestModelCommand:
cli_obj.process_command("/model gpt-5.4") cli_obj.process_command("/model gpt-5.4")
output = capsys.readouterr().out output = capsys.readouterr().out
assert "not a valid model" in output # Model is accepted (with warning) even if not in API listing
assert "Model unchanged" in output assert cli_obj.model == "gpt-5.4"
assert cli_obj.model == "anthropic/claude-opus-4.6" # unchanged
assert cli_obj.agent is not None # not reset
save_mock.assert_not_called()
def test_validation_crash_falls_back_to_save(self, capsys): def test_validation_crash_falls_back_to_save(self, capsys):
cli_obj = self._make_cli() cli_obj = self._make_cli()

View file

@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""Interactive interrupt test that mimics the exact CLI flow.

Starts an agent in a thread with a mock delegate_task that takes a while,
then simulates the user typing a message via _interrupt_queue.
Logs every step to stderr (which isn't affected by redirect_stdout)
so we can see exactly where the interrupt gets lost.
"""
import contextlib
import io
import json
import logging
import queue
import sys
import threading
import time
import os
# Force stderr logging so redirect_stdout doesn't swallow it
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr,
                    format="%(asctime)s [%(threadName)s] %(message)s")
log = logging.getLogger("interrupt_test")
# Make the repo root importable when run directly from tests/.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unittest.mock import MagicMock, patch
from run_agent import AIAgent, IterationBudget
from tools.interrupt import set_interrupt, is_interrupted
set_interrupt(False)
# ─── Create parent agent ───
# __new__ bypasses __init__; only attributes read by the delegate path are set.
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
parent.model = "test/model"
parent.base_url = "http://localhost:1"
parent.api_key = "test"
parent.provider = "test"
parent.api_mode = "chat_completions"
parent.platform = "cli"
parent.enabled_toolsets = ["terminal", "file"]
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent.max_tokens = None
parent.reasoning_config = None
parent.prefill_messages = None
parent._session_db = None
parent._delegate_depth = 0
parent._delegate_spinner = None
parent.tool_progress_callback = None
parent.iteration_budget = IterationBudget(max_total=100)
parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
# Monkey-patch parent.interrupt to log
_original_interrupt = AIAgent.interrupt
def logged_interrupt(self, message=None):
    # Wraps the real interrupt() with before/after state logging.
    log.info(f"🔴 parent.interrupt() called with: {message!r}")
    log.info(f" _active_children count: {len(self._active_children)}")
    _original_interrupt(self, message)
    log.info(f" After interrupt: _interrupt_requested={self._interrupt_requested}")
    for i, c in enumerate(self._active_children):
        log.info(f" Child {i}._interrupt_requested={c._interrupt_requested}")
parent.interrupt = lambda msg=None: logged_interrupt(parent, msg)
# ─── Simulate the exact CLI flow ───
interrupt_queue = queue.Queue()
child_running = threading.Event()
agent_result = [None]
def make_slow_response(delay=2.0):
    """API response that takes a while."""
    def create(**kwargs):
        log.info(f" 🌐 Mock API call starting (will take {delay}s)...")
        time.sleep(delay)
        log.info(f" 🌐 Mock API call completed")
        resp = MagicMock()
        resp.choices = [MagicMock()]
        resp.choices[0].message.content = "Done with the task"
        resp.choices[0].message.tool_calls = None
        resp.choices[0].message.refusal = None
        resp.choices[0].finish_reason = "stop"
        resp.usage.prompt_tokens = 100
        resp.usage.completion_tokens = 10
        resp.usage.total_tokens = 110
        resp.usage.prompt_tokens_details = None
        return resp
    return create
def agent_thread_func():
    """Simulates the agent_thread in cli.py's chat() method."""
    log.info("🟢 agent_thread starting")
    with patch("run_agent.OpenAI") as MockOpenAI:
        mock_client = MagicMock()
        mock_client.chat.completions.create = make_slow_response(delay=3.0)
        mock_client.close = MagicMock()
        MockOpenAI.return_value = mock_client
        from tools.delegate_tool import _run_single_child
        # Signal that child is about to start
        original_init = AIAgent.__init__
        def patched_init(self_agent, *a, **kw):
            log.info("🟡 Child AIAgent.__init__ called")
            original_init(self_agent, *a, **kw)
            child_running.set()
            log.info(f"🟡 Child started, parent._active_children = {len(parent._active_children)}")
        with patch.object(AIAgent, "__init__", patched_init):
            result = _run_single_child(
                task_index=0,
                goal="Do a slow thing",
                context=None,
                toolsets=["terminal"],
                model="test/model",
                max_iterations=3,
                parent_agent=parent,
                task_count=1,
                override_provider="test",
                override_base_url="http://localhost:1",
                override_api_key="test",
                override_api_mode="chat_completions",
            )
            agent_result[0] = result
            log.info(f"🟢 agent_thread finished. Result status: {result.get('status')}")
# ─── Start agent thread (like chat() does) ───
agent_thread = threading.Thread(target=agent_thread_func, name="agent_thread", daemon=True)
agent_thread.start()
# ─── Wait for child to start ───
if not child_running.wait(timeout=10):
    print("FAIL: Child never started", file=sys.stderr)
    sys.exit(1)
# Give child time to enter its main loop and start API call
time.sleep(1.0)
# ─── Simulate user typing a message (like handle_enter does) ───
log.info("📝 Simulating user typing 'Hey stop that'")
interrupt_queue.put("Hey stop that")
# ─── Simulate chat() polling loop (like the real chat() method) ───
log.info("📡 Starting interrupt queue polling (like chat())")
interrupt_msg = None
poll_count = 0
while agent_thread.is_alive():
    try:
        interrupt_msg = interrupt_queue.get(timeout=0.1)
        if interrupt_msg:
            log.info(f"📨 Got interrupt message from queue: {interrupt_msg!r}")
            log.info(f" Calling parent.interrupt()...")
            parent.interrupt(interrupt_msg)
            log.info(f" parent.interrupt() returned. Breaking poll loop.")
            break
    except queue.Empty:
        poll_count += 1
        if poll_count % 20 == 0:  # Log every 2s
            log.info(f" Still polling ({poll_count} iterations)...")
# ─── Wait for agent to finish ───
log.info("⏳ Waiting for agent_thread to join...")
t0 = time.monotonic()
agent_thread.join(timeout=10)
elapsed = time.monotonic() - t0
log.info(f"✅ agent_thread joined after {elapsed:.2f}s")
# ─── Check results ───
result = agent_result[0]
if result:
    log.info(f"Result status: {result['status']}")
    log.info(f"Result duration: {result['duration_seconds']}s")
    # Pass criterion: interrupted status and return well before the 3s mock call.
    if result["status"] == "interrupted" and elapsed < 2.0:
        print("✅ PASS: Interrupt worked correctly!", file=sys.stderr)
    else:
        print(f"❌ FAIL: status={result['status']}, elapsed={elapsed:.2f}s", file=sys.stderr)
else:
    print("❌ FAIL: No result returned", file=sys.stderr)
# Leave the global interrupt flag clean for whatever runs next.
set_interrupt(False)

View file

@ -0,0 +1,155 @@
"""Test interrupt propagation from parent to child agents.
Reproduces the CLI scenario: user sends a message while delegate_task is
running, main thread calls parent.interrupt(), child should stop.
"""
import json
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from tools.interrupt import set_interrupt, is_interrupted, _interrupt_event
class TestInterruptPropagationToChild(unittest.TestCase):
    """Verify interrupt propagates from parent to child agent."""
    def setUp(self):
        # Always start with the global interrupt flag clear.
        set_interrupt(False)
    def tearDown(self):
        set_interrupt(False)
    def test_parent_interrupt_sets_child_flag(self):
        """When parent.interrupt() is called, child._interrupt_requested should be set."""
        from run_agent import AIAgent
        # __new__ bypasses __init__; only the interrupt-related state is needed.
        parent = AIAgent.__new__(AIAgent)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent.quiet_mode = True
        child = AIAgent.__new__(AIAgent)
        child._interrupt_requested = False
        child._interrupt_message = None
        child._active_children = []
        child.quiet_mode = True
        parent._active_children.append(child)
        parent.interrupt("new user message")
        assert parent._interrupt_requested is True
        assert child._interrupt_requested is True
        assert child._interrupt_message == "new user message"
        assert is_interrupted() is True
    def test_child_clear_interrupt_at_start_clears_global(self):
        """child.clear_interrupt() at start of run_conversation clears the GLOBAL event.

        This is the intended behavior at startup, but verify it doesn't
        accidentally clear an interrupt intended for a running child.
        """
        from run_agent import AIAgent
        child = AIAgent.__new__(AIAgent)
        child._interrupt_requested = True
        child._interrupt_message = "msg"
        child.quiet_mode = True
        child._active_children = []
        # Global is set
        set_interrupt(True)
        assert is_interrupted() is True
        # child.clear_interrupt() clears both
        child.clear_interrupt()
        assert child._interrupt_requested is False
        assert is_interrupted() is False
    def test_interrupt_during_child_api_call_detected(self):
        """Interrupt set during _interruptible_api_call is detected within 0.5s."""
        from run_agent import AIAgent
        child = AIAgent.__new__(AIAgent)
        child._interrupt_requested = False
        child._interrupt_message = None
        child._active_children = []
        child.quiet_mode = True
        child.api_mode = "chat_completions"
        child.log_prefix = ""
        child._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1234"}
        # Mock a slow API call
        mock_client = MagicMock()
        def slow_api_call(**kwargs):
            time.sleep(5)  # Would take 5s normally
            return MagicMock()
        mock_client.chat.completions.create = slow_api_call
        mock_client.close = MagicMock()
        child.client = mock_client
        # Set interrupt after 0.2s from another thread
        def set_interrupt_later():
            time.sleep(0.2)
            child.interrupt("stop!")
        t = threading.Thread(target=set_interrupt_later, daemon=True)
        t.start()
        start = time.monotonic()
        try:
            child._interruptible_api_call({"model": "test", "messages": []})
            self.fail("Should have raised InterruptedError")
        except InterruptedError:
            elapsed = time.monotonic() - start
            # Should detect within ~0.5s (0.2s delay + 0.3s poll interval)
            assert elapsed < 1.0, f"Took {elapsed:.2f}s to detect interrupt (expected < 1.0s)"
        finally:
            t.join(timeout=2)
            set_interrupt(False)
    def test_concurrent_interrupt_propagation(self):
        """Simulates exact CLI flow: parent runs delegate in thread, main thread interrupts."""
        from run_agent import AIAgent
        parent = AIAgent.__new__(AIAgent)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent.quiet_mode = True
        child = AIAgent.__new__(AIAgent)
        child._interrupt_requested = False
        child._interrupt_message = None
        child._active_children = []
        child.quiet_mode = True
        # Register child (simulating what _run_single_child does)
        parent._active_children.append(child)
        # Simulate child running (checking flag in a loop)
        child_detected = threading.Event()
        def simulate_child_loop():
            while not child._interrupt_requested:
                time.sleep(0.05)
            child_detected.set()
        child_thread = threading.Thread(target=simulate_child_loop, daemon=True)
        child_thread.start()
        # Small delay, then interrupt from "main thread"
        time.sleep(0.1)
        parent.interrupt("user typed something new")
        # Child should detect within 200ms
        detected = child_detected.wait(timeout=1.0)
        assert detected, "Child never detected the interrupt!"
        child_thread.join(timeout=1)
        set_interrupt(False)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

View file

@ -0,0 +1,176 @@
"""Test real interrupt propagation through delegate_task with actual AIAgent.
This uses a real AIAgent with mocked HTTP responses to test the complete
interrupt flow through _run_single_child child.run_conversation().
"""
import json
import os
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from tools.interrupt import set_interrupt, is_interrupted
def _make_slow_api_response(delay=5.0):
"""Create a mock that simulates a slow API response (like a real LLM call)."""
def slow_create(**kwargs):
# Simulate a slow API call
time.sleep(delay)
# Return a simple text response (no tool calls)
resp = MagicMock()
resp.choices = [MagicMock()]
resp.choices[0].message = MagicMock()
resp.choices[0].message.content = "Done"
resp.choices[0].message.tool_calls = None
resp.choices[0].message.refusal = None
resp.choices[0].finish_reason = "stop"
resp.usage = MagicMock()
resp.usage.prompt_tokens = 100
resp.usage.completion_tokens = 10
resp.usage.total_tokens = 110
resp.usage.prompt_tokens_details = None
return resp
return slow_create
class TestRealSubagentInterrupt(unittest.TestCase):
    """Test interrupt with real AIAgent child through delegate_tool."""

    def setUp(self):
        # Start each test with the global interrupt flag cleared; provide a
        # dummy API key so AIAgent construction does not fail on missing env.
        set_interrupt(False)
        os.environ.setdefault("OPENAI_API_KEY", "test-key")

    def tearDown(self):
        # Clear the global interrupt flag so later tests are unaffected.
        set_interrupt(False)

    def test_interrupt_child_during_api_call(self):
        """Real AIAgent child interrupted while making API call."""
        from run_agent import AIAgent, IterationBudget
        # Create a real parent agent (just enough to be a parent).
        # __new__ bypasses __init__, so every attribute the delegate path
        # reads is set by hand below.
        parent = AIAgent.__new__(AIAgent)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent.quiet_mode = True
        parent.model = "test/model"
        # base_url points at an unroutable local port — the HTTP client is
        # mocked, so no real connection is ever attempted.
        parent.base_url = "http://localhost:1"
        parent.api_key = "test"
        parent.provider = "test"
        parent.api_mode = "chat_completions"
        parent.platform = "cli"
        parent.enabled_toolsets = ["terminal", "file"]
        parent.providers_allowed = None
        parent.providers_ignored = None
        parent.providers_order = None
        parent.provider_sort = None
        parent.max_tokens = None
        parent.reasoning_config = None
        parent.prefill_messages = None
        parent._session_db = None
        parent._delegate_depth = 0
        parent._delegate_spinner = None
        parent.tool_progress_callback = None
        parent.iteration_budget = IterationBudget(max_total=100)
        parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
        from tools.delegate_tool import _run_single_child
        # Cross-thread signalling: the delegate runs in a worker thread and
        # reports back through these holders.
        child_started = threading.Event()
        result_holder = [None]
        error_holder = [None]

        def run_delegate():
            try:
                # Patch the OpenAI client creation inside AIAgent.__init__
                with patch('run_agent.OpenAI') as MockOpenAI:
                    mock_client = MagicMock()
                    # API call takes 5 seconds — should be interrupted before that
                    mock_client.chat.completions.create = _make_slow_api_response(delay=5.0)
                    mock_client.close = MagicMock()
                    MockOpenAI.return_value = mock_client
                    # Also need to patch the system prompt builder
                    with patch('run_agent.build_system_prompt', return_value="You are a test agent"):
                        # Signal when child starts
                        original_run = AIAgent.run_conversation
                        def patched_run(self_agent, *args, **kwargs):
                            child_started.set()
                            return original_run(self_agent, *args, **kwargs)
                        with patch.object(AIAgent, 'run_conversation', patched_run):
                            result = _run_single_child(
                                task_index=0,
                                goal="Test task",
                                context=None,
                                toolsets=["terminal"],
                                model="test/model",
                                max_iterations=5,
                                parent_agent=parent,
                                task_count=1,
                                override_provider="test",
                                override_base_url="http://localhost:1",
                                override_api_key="test",
                                override_api_mode="chat_completions",
                            )
                            result_holder[0] = result
            except Exception as e:
                import traceback
                traceback.print_exc()
                error_holder[0] = e

        agent_thread = threading.Thread(target=run_delegate, daemon=True)
        agent_thread.start()
        # Wait for child to start run_conversation
        started = child_started.wait(timeout=10)
        if not started:
            # Surface the worker's exception (if any) rather than a vague failure.
            agent_thread.join(timeout=1)
            if error_holder[0]:
                raise error_holder[0]
            self.fail("Child never started run_conversation")
        # Give child time to enter main loop and start API call
        time.sleep(0.5)
        # Verify child is registered
        print(f"Active children: {len(parent._active_children)}")
        self.assertGreaterEqual(len(parent._active_children), 1,
            "Child not registered in _active_children")
        # Interrupt! (simulating what CLI does)
        start = time.monotonic()
        parent.interrupt("User typed a new message")
        # Check propagation
        child = parent._active_children[0] if parent._active_children else None
        if child:
            print(f"Child._interrupt_requested after parent.interrupt(): {child._interrupt_requested}")
            self.assertTrue(child._interrupt_requested,
                "Interrupt did not propagate to child!")
        # Wait for delegate to finish (should be fast since interrupted)
        agent_thread.join(timeout=5)
        elapsed = time.monotonic() - start
        if error_holder[0]:
            raise error_holder[0]
        result = result_holder[0]
        self.assertIsNotNone(result, "Delegate returned no result")
        print(f"Result status: {result['status']}, elapsed: {elapsed:.2f}s")
        print(f"Full result: {result}")
        # The child should have been interrupted, not completed the full 5s API call
        self.assertLess(elapsed, 3.0,
            f"Took {elapsed:.2f}s — interrupt was not detected quickly enough")
        self.assertEqual(result["status"], "interrupted",
            f"Expected 'interrupted', got '{result['status']}'")
# Allow running this test module directly with `python <file>`.
if __name__ == "__main__":
    unittest.main()

View file

@ -0,0 +1,54 @@
"""Verify that redirect_stdout in _run_single_child is process-wide.
This demonstrates that contextlib.redirect_stdout changes sys.stdout
for ALL threads, not just the current one. This means during subagent
execution, all output from other threads (including the CLI's process_thread)
is swallowed.
"""
import contextlib
import io
import sys
import threading
import time
import unittest
class TestRedirectStdoutIsProcessWide(unittest.TestCase):
    """Demonstrate that contextlib.redirect_stdout swaps sys.stdout globally.

    ``redirect_stdout`` rebinds the process-wide ``sys.stdout`` attribute, so
    the redirection is observable from every thread, not just the one that
    entered the context manager.
    """

    def test_redirect_stdout_affects_other_threads(self):
        """contextlib.redirect_stdout changes sys.stdout for ALL threads."""
        real_stdout = sys.stdout
        other_thread_saw_devnull = threading.Event()

        def other_thread_work():
            """Runs in a different thread and inspects sys.stdout."""
            time.sleep(0.2)  # Let redirect_stdout take effect in the main thread
            # If the redirect is process-wide, this thread observes the
            # replacement object instead of the original stdout.
            if sys.stdout is not real_stdout:
                other_thread_saw_devnull.set()

        t = threading.Thread(target=other_thread_work, daemon=True)
        t.start()
        # redirect_stdout in main thread while the worker is checking.
        devnull = io.StringIO()
        with contextlib.redirect_stdout(devnull):
            time.sleep(0.5)  # Let the other thread check during redirect
        t.join(timeout=2)
        # The other thread should have seen devnull, NOT the real stdout
        self.assertTrue(
            other_thread_saw_devnull.is_set(),
            "redirect_stdout was NOT process-wide — other thread still saw real stdout. "
            "This test's premise is wrong."
        )
        print("Confirmed: redirect_stdout IS process-wide — affects all threads")
# Allow running this test module directly with `python <file>`.
if __name__ == "__main__":
    unittest.main()

View file

@ -572,14 +572,23 @@ class ClawHubSource(SkillSource):
logger.warning("ClawHub fetch failed for %s: could not resolve latest version", slug) logger.warning("ClawHub fetch failed for %s: could not resolve latest version", slug)
return None return None
version_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions/{latest_version}") # Primary method: download the skill as a ZIP bundle from /download
if not isinstance(version_data, dict): files = self._download_zip(slug, latest_version)
return None
# Fallback: try the version metadata endpoint for inline/raw content
if "SKILL.md" not in files:
version_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions/{latest_version}")
if isinstance(version_data, dict):
# Files may be nested under version_data["version"]["files"]
files = self._extract_files(version_data) or files
if "SKILL.md" not in files:
nested = version_data.get("version", {})
if isinstance(nested, dict):
files = self._extract_files(nested) or files
files = self._extract_files(version_data)
if "SKILL.md" not in files: if "SKILL.md" not in files:
logger.warning( logger.warning(
"ClawHub fetch for %s resolved version %s but no inline/raw file content was available", "ClawHub fetch for %s resolved version %s but could not retrieve file content",
slug, slug,
latest_version, latest_version,
) )
@ -674,6 +683,65 @@ class ClawHubSource(SkillSource):
return files return files
def _download_zip(self, slug: str, version: str) -> Dict[str, str]:
    """Download skill as a ZIP bundle from the /download endpoint and extract text files.

    Args:
        slug: Skill identifier on ClawHub.
        version: Resolved version string to download.

    Returns:
        Mapping of archive-relative path -> decoded UTF-8 content. Returns an
        empty mapping on any download or extraction failure so callers can
        fall back to the version-metadata endpoint.
    """
    import io
    import zipfile
    files: Dict[str, str] = {}
    max_retries = 3
    for attempt in range(max_retries):
        try:
            resp = httpx.get(
                f"{self.BASE_URL}/download",
                params={"slug": slug, "version": version},
                timeout=30,
                follow_redirects=True,
            )
            if resp.status_code == 429:
                # Retry-After may be an HTTP-date rather than seconds
                # (RFC 9110); fall back to 5s instead of crashing on int().
                try:
                    retry_after = int(resp.headers.get("retry-after", "5"))
                except ValueError:
                    retry_after = 5
                # Clamp to [0, 15] — caps wait time and guards against a
                # negative value making time.sleep() raise.
                retry_after = min(max(retry_after, 0), 15)
                logger.debug(
                    "ClawHub download rate-limited for %s, retrying in %ds (attempt %d/%d)",
                    slug, retry_after, attempt + 1, max_retries,
                )
                time.sleep(retry_after)
                continue
            if resp.status_code != 200:
                logger.debug("ClawHub ZIP download for %s v%s returned %s", slug, version, resp.status_code)
                return files
            with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
                for info in zf.infolist():
                    if info.is_dir():
                        continue
                    # Sanitize path — strip leading slashes; reject traversal
                    # components and names that are empty after stripping.
                    name = info.filename.lstrip("/")
                    if not name or ".." in name:
                        continue
                    # Only extract text-sized files (skip large binaries)
                    if info.file_size > 500_000:
                        logger.debug("Skipping large file in ZIP: %s (%d bytes)", name, info.file_size)
                        continue
                    try:
                        raw = zf.read(info.filename)
                        files[name] = raw.decode("utf-8")
                    except (UnicodeDecodeError, KeyError):
                        # Binary content or a member that cannot be read —
                        # skip it rather than failing the whole bundle.
                        logger.debug("Skipping non-text file in ZIP: %s", name)
                        continue
            return files
        except zipfile.BadZipFile:
            logger.warning("ClawHub returned invalid ZIP for %s v%s", slug, version)
            return files
        except httpx.HTTPError as exc:
            logger.debug("ClawHub ZIP download failed for %s v%s: %s", slug, version, exc)
            return files
    logger.debug("ClawHub ZIP download exhausted retries for %s v%s", slug, version)
    return files
def _fetch_text(self, url: str) -> Optional[str]: def _fetch_text(self, url: str) -> Optional[str]:
try: try:
resp = httpx.get(url, timeout=20) resp = httpx.get(url, timeout=20)