refactor: extract clipboard methods + comprehensive tests (37 tests)
Refactored image paste internals for testability:
- Extracted _try_attach_clipboard_image() method (clipboard → state)
- Extracted _build_multimodal_content() method (images → OpenAI format)
- chat() now delegates to these instead of inline logic
Tests organized in 4 levels:
Level 1 (19 tests): Clipboard module — every platform path with
realistic subprocess simulation (tools writing files, timeouts,
empty files, cleanup on failure)
Level 2 (8 tests): _build_multimodal_content — base64 encoding,
MIME types (png/jpg/webp/unknown), missing files, multiple images,
default question for empty text
Level 3 (5 tests): _try_attach_clipboard_image — state management,
counter increment/rollback, naming convention, mixed success/failure
Level 4 (5 tests): Queue routing — tuple unpacking, command detection,
images-only payloads, text-only payloads
This commit is contained in:
parent
ffc752a79e
commit
e2a834578d
3 changed files with 636 additions and 162 deletions
88
cli.py
88
cli.py
|
|
@ -1113,6 +1113,52 @@ class HermesCLI:
|
||||||
|
|
||||||
self.console.print()
|
self.console.print()
|
||||||
|
|
||||||
|
def _try_attach_clipboard_image(self) -> bool:
    """Check the clipboard for an image and attach it if one is found.

    Builds a destination path under ``~/.hermes/images/`` named
    ``clip_<timestamp>_<n>.png`` and asks ``save_clipboard_image`` to write
    the clipboard image there. On success the path is appended to
    ``_attached_images`` and ``_image_counter`` is advanced to ``<n>``.

    Returns:
        True if an image was saved and attached, False otherwise. On
        failure neither ``_attached_images`` nor ``_image_counter`` changes.
    """
    from hermes_cli.clipboard import save_clipboard_image

    # Reserve the next index but commit it only after a successful save, so
    # a False result -- or an exception raised by the save -- can never
    # leave the counter incremented without a matching attachment.
    next_index = self._image_counter + 1
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    img_path = Path.home() / ".hermes" / "images" / f"clip_{ts}_{next_index}.png"

    if not save_clipboard_image(img_path):
        return False

    self._image_counter = next_index
    self._attached_images.append(img_path)
    return True
|
||||||
|
|
||||||
|
def _build_multimodal_content(self, text: str, images: list) -> list:
|
||||||
|
"""Convert text + image paths into OpenAI vision multimodal content.
|
||||||
|
|
||||||
|
Returns a list of content parts suitable for the ``content`` field
|
||||||
|
of a ``user`` message.
|
||||||
|
"""
|
||||||
|
import base64 as _b64
|
||||||
|
|
||||||
|
content_parts = []
|
||||||
|
text_part = text if isinstance(text, str) and text else "What do you see in this image?"
|
||||||
|
content_parts.append({"type": "text", "text": text_part})
|
||||||
|
|
||||||
|
_MIME = {
|
||||||
|
"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg",
|
||||||
|
"gif": "image/gif", "webp": "image/webp",
|
||||||
|
}
|
||||||
|
for img_path in images:
|
||||||
|
if img_path.exists():
|
||||||
|
data = _b64.b64encode(img_path.read_bytes()).decode()
|
||||||
|
ext = img_path.suffix.lower().lstrip(".")
|
||||||
|
mime = _MIME.get(ext, "image/png")
|
||||||
|
content_parts.append({
|
||||||
|
"type": "image_url",
|
||||||
|
"image_url": {"url": f"data:{mime};base64,{data}"}
|
||||||
|
})
|
||||||
|
return content_parts
|
||||||
|
|
||||||
def _show_tool_availability_warnings(self):
|
def _show_tool_availability_warnings(self):
|
||||||
"""Show warnings about disabled tools due to missing API keys."""
|
"""Show warnings about disabled tools due to missing API keys."""
|
||||||
try:
|
try:
|
||||||
|
|
@ -2164,25 +2210,12 @@ class HermesCLI:
|
||||||
|
|
||||||
# Convert attached images to OpenAI vision multimodal content
|
# Convert attached images to OpenAI vision multimodal content
|
||||||
if images:
|
if images:
|
||||||
import base64 as _b64
|
message = self._build_multimodal_content(
|
||||||
content_parts = []
|
message if isinstance(message, str) else "", images
|
||||||
text_part = message if isinstance(message, str) else ""
|
)
|
||||||
if not text_part:
|
|
||||||
text_part = "What do you see in this image?"
|
|
||||||
content_parts.append({"type": "text", "text": text_part})
|
|
||||||
for img_path in images:
|
for img_path in images:
|
||||||
if img_path.exists():
|
if img_path.exists():
|
||||||
data = _b64.b64encode(img_path.read_bytes()).decode()
|
|
||||||
ext = img_path.suffix.lower().lstrip(".")
|
|
||||||
mime = {"png": "image/png", "jpg": "image/jpeg",
|
|
||||||
"jpeg": "image/jpeg", "gif": "image/gif",
|
|
||||||
"webp": "image/webp"}.get(ext, "image/png")
|
|
||||||
content_parts.append({
|
|
||||||
"type": "image_url",
|
|
||||||
"image_url": {"url": f"data:{mime};base64,{data}"}
|
|
||||||
})
|
|
||||||
_cprint(f" {_DIM}📎 attached {img_path.name} ({img_path.stat().st_size // 1024}KB){_RST}")
|
_cprint(f" {_DIM}📎 attached {img_path.name} ({img_path.stat().st_size // 1024}KB){_RST}")
|
||||||
message = content_parts
|
|
||||||
|
|
||||||
# Add user message to history
|
# Add user message to history
|
||||||
self.conversation_history.append({"role": "user", "content": message})
|
self.conversation_history.append({"role": "user", "content": message})
|
||||||
|
|
@ -2565,29 +2598,10 @@ class HermesCLI:
|
||||||
|
|
||||||
@kb.add(Keys.BracketedPaste, eager=True)
|
@kb.add(Keys.BracketedPaste, eager=True)
|
||||||
def handle_paste(event):
|
def handle_paste(event):
|
||||||
"""Handle Cmd+V / Ctrl+V paste — detect clipboard images.
|
"""Handle Cmd+V / Ctrl+V paste — detect clipboard images."""
|
||||||
|
|
||||||
On every paste event, check the system clipboard for image data.
|
|
||||||
If found, save to ~/.hermes/images/ and attach it to the next
|
|
||||||
message. Any pasted text is inserted into the buffer normally.
|
|
||||||
"""
|
|
||||||
from hermes_cli.clipboard import save_clipboard_image
|
|
||||||
|
|
||||||
pasted_text = event.data or ""
|
pasted_text = event.data or ""
|
||||||
|
if self._try_attach_clipboard_image():
|
||||||
# Check clipboard for image
|
|
||||||
img_dir = Path.home() / ".hermes" / "images"
|
|
||||||
self._image_counter += 1
|
|
||||||
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
||||||
img_path = img_dir / f"clip_{ts}_{self._image_counter}.png"
|
|
||||||
|
|
||||||
if save_clipboard_image(img_path):
|
|
||||||
self._attached_images.append(img_path)
|
|
||||||
event.app.invalidate()
|
event.app.invalidate()
|
||||||
else:
|
|
||||||
self._image_counter -= 1
|
|
||||||
|
|
||||||
# Insert any pasted text normally
|
|
||||||
if pasted_text:
|
if pasted_text:
|
||||||
event.current_buffer.insert_text(pasted_text)
|
event.current_buffer.insert_text(pasted_text)
|
||||||
|
|
||||||
|
|
|
||||||
344
docs/send_file_integration_map.md
Normal file
344
docs/send_file_integration_map.md
Normal file
|
|
@ -0,0 +1,344 @@
|
||||||
|
# send_file Integration Map — Hermes Agent Codebase Deep Dive
|
||||||
|
|
||||||
|
## 1. environments/tool_context.py — Base64 File Transfer Implementation
|
||||||
|
|
||||||
|
### upload_file() (lines 153-205)
|
||||||
|
- Reads local file as raw bytes, base64-encodes to ASCII string
|
||||||
|
- Creates parent dirs in sandbox via `self.terminal(f"mkdir -p {parent}")`
|
||||||
|
- **Chunk size:** 60,000 chars (~60KB per shell command)
|
||||||
|
- **Small files (<=60KB b64):** Single `printf '%s' '{b64}' | base64 -d > {remote_path}`
|
||||||
|
- **Large files:** Writes chunks to `/tmp/_hermes_upload.b64` via `printf >> append`, then `base64 -d` to target
|
||||||
|
- **Error handling:** Checks local file exists; returns `{exit_code, output}`
|
||||||
|
- **Size limits:** No explicit limit. Files larger than ~45KB raw (60,000 base64 chars) are sent in chunks, which keeps each shell command far below the ~2MB shell argument limit
|
||||||
|
- **No theoretical max** — but very large files would be slow (many terminal round trips)
|
||||||
|
|
||||||
|
### download_file() (lines 234-278)
|
||||||
|
- Runs `base64 {remote_path}` inside sandbox, captures stdout
|
||||||
|
- Strips output, base64-decodes to raw bytes
|
||||||
|
- Writes to host filesystem with parent dir creation
|
||||||
|
- **Error handling:** Checks exit code, empty output, decode errors
|
||||||
|
- Returns `{success: bool, bytes: int}` or `{success: false, error: str}`
|
||||||
|
- **Size limit:** Bounded by terminal output buffer (practical limit ~few MB via base64 terminal output)
|
||||||
|
|
||||||
|
### Promotion potential:
|
||||||
|
- These methods work via `self.terminal()` — they're environment-agnostic
|
||||||
|
- Could be directly lifted into a new tool that operates on the agent's current sandbox
|
||||||
|
- For send_file, this `download_file()` pattern is the key: it extracts files from sandbox → host
|
||||||
|
|
||||||
|
## 2. tools/environments/base.py — BaseEnvironment Interface
|
||||||
|
|
||||||
|
### Current methods:
|
||||||
|
- `execute(command, cwd, timeout, stdin_data)` → `{output, returncode}`
|
||||||
|
- `cleanup()` — release resources
|
||||||
|
- `stop()` — alias for cleanup
|
||||||
|
- `_prepare_command()` — sudo transformation
|
||||||
|
- `_build_run_kwargs()` — subprocess kwargs
|
||||||
|
- `_timeout_result()` — standard timeout dict
|
||||||
|
|
||||||
|
### What would need to be added for file transfer:
|
||||||
|
- **Nothing required at this level.** File transfer can be implemented via `execute()` (base64 over terminal, like ToolContext does) or via environment-specific methods.
|
||||||
|
- Optional: `upload_file(local_path, remote_path)` and `download_file(remote_path, local_path)` methods could be added to BaseEnvironment for optimized per-backend transfers, but the base64-over-terminal approach already works universally.
|
||||||
|
|
||||||
|
## 3. tools/environments/docker.py — Docker Container Details
|
||||||
|
|
||||||
|
### Container ID tracking:
|
||||||
|
- `self._container_id` stored at init from `self._inner.container_id`
|
||||||
|
- Inner is `minisweagent.environments.docker.DockerEnvironment`
|
||||||
|
- Container ID is a standard Docker container hash
|
||||||
|
|
||||||
|
### docker cp feasibility:
|
||||||
|
- **YES**, `docker cp` could be used for optimized file transfer:
|
||||||
|
- `docker cp {container_id}:{remote_path} {local_path}` (download)
|
||||||
|
- `docker cp {local_path} {container_id}:{remote_path}` (upload)
|
||||||
|
- Much faster than base64-over-terminal for large files
|
||||||
|
- Container ID is directly accessible via `env._container_id` or `env._inner.container_id`
|
||||||
|
|
||||||
|
### Volumes mounted:
|
||||||
|
- **Persistent mode:** Bind mounts at `~/.hermes/sandboxes/docker/{task_id}/workspace` → `/workspace` and `.../home` → `/root`
|
||||||
|
- **Ephemeral mode:** tmpfs at `/workspace` (10GB), `/home` (1GB), `/root` (1GB)
|
||||||
|
- **User volumes:** From `config.yaml docker_volumes` (arbitrary `-v` mounts)
|
||||||
|
- **Security tmpfs:** `/tmp` (512MB), `/var/tmp` (256MB), `/run` (64MB)
|
||||||
|
|
||||||
|
### Direct host access for persistent mode:
|
||||||
|
- If persistent, files at `/workspace/foo.txt` are just `~/.hermes/sandboxes/docker/{task_id}/workspace/foo.txt` on host — no transfer needed!
|
||||||
|
|
||||||
|
## 4. tools/environments/ssh.py — SSH Connection Management
|
||||||
|
|
||||||
|
### Connection management:
|
||||||
|
- Uses SSH ControlMaster for persistent connection
|
||||||
|
- Control socket at `/tmp/hermes-ssh/{user}@{host}:{port}.sock`
|
||||||
|
- ControlPersist=300 (5 min keepalive)
|
||||||
|
- BatchMode=yes (non-interactive)
|
||||||
|
- Stores: `self.host`, `self.user`, `self.port`, `self.key_path`
|
||||||
|
|
||||||
|
### SCP/SFTP feasibility:
|
||||||
|
- **YES**, SCP can piggyback on the ControlMaster socket:
|
||||||
|
- `scp -o ControlPath={socket} {user}@{host}:{remote} {local}` (download)
|
||||||
|
- `scp -o ControlPath={socket} {local} {user}@{host}:{remote}` (upload)
|
||||||
|
- Same SSH key and connection reuse — zero additional auth
|
||||||
|
- Would be much faster than base64-over-terminal for large files
|
||||||
|
|
||||||
|
## 5. tools/environments/modal.py — Modal Sandbox Filesystem
|
||||||
|
|
||||||
|
### Filesystem API exposure:
|
||||||
|
- **Not directly.** The inner `SwerexModalEnvironment` wraps Modal's sandbox
|
||||||
|
- The sandbox object is accessible at: `env._inner.deployment._sandbox`
|
||||||
|
- Modal's Python SDK exposes `sandbox.open()` for file I/O — but only via async API
|
||||||
|
- Currently only used for `snapshot_filesystem()` during cleanup
|
||||||
|
- **Could use:** `sandbox.open(path, "rb")` to read files or `sandbox.open(path, "wb")` to write
|
||||||
|
- **Alternative:** Base64-over-terminal already works via `execute()` — simpler, no SDK dependency
|
||||||
|
|
||||||
|
## 6. gateway/platforms/base.py — MEDIA: Tag Flow (Complete)
|
||||||
|
|
||||||
|
### extract_media() (lines 587-620):
|
||||||
|
- **Pattern:** `MEDIA:\S+` — extracts file paths after MEDIA: prefix
|
||||||
|
- **Voice flag:** `[[audio_as_voice]]` global directive sets `is_voice=True` for all media in message
|
||||||
|
- Returns `List[Tuple[str, bool]]` (path, is_voice) and cleaned content
|
||||||
|
|
||||||
|
### _process_message_background() media routing (lines 752-786):
|
||||||
|
- After extracting MEDIA tags, routes by file extension:
|
||||||
|
- `.ogg .opus .mp3 .wav .m4a` → `send_voice()`
|
||||||
|
- `.mp4 .mov .avi .mkv .3gp` → `send_video()`
|
||||||
|
- `.jpg .jpeg .png .webp .gif` → `send_image_file()`
|
||||||
|
- **Everything else** → `send_document()`
|
||||||
|
- This routing already supports arbitrary files!
|
||||||
|
|
||||||
|
### send_* method inventory (base class):
|
||||||
|
- `send(chat_id, content, reply_to, metadata)` — ABSTRACT, text
|
||||||
|
- `send_image(chat_id, image_url, caption, reply_to)` — URL-based images
|
||||||
|
- `send_animation(chat_id, animation_url, caption, reply_to)` — GIF animations
|
||||||
|
- `send_voice(chat_id, audio_path, caption, reply_to)` — voice messages
|
||||||
|
- `send_video(chat_id, video_path, caption, reply_to)` — video files
|
||||||
|
- `send_document(chat_id, file_path, caption, file_name, reply_to)` — generic files
|
||||||
|
- `send_image_file(chat_id, image_path, caption, reply_to)` — local image files
|
||||||
|
- `send_typing(chat_id)` — typing indicator
|
||||||
|
- `edit_message(chat_id, message_id, content)` — edit sent messages
|
||||||
|
|
||||||
|
### What's missing:
|
||||||
|
- **Telegram:** No override for `send_document`, `send_image_file`, or `send_video` (see section 7) — falls back to text!
|
||||||
|
- **Discord:** No override for `send_document`, `send_image_file`, or `send_video` (see section 8) — falls back to text!
|
||||||
|
- **WhatsApp:** Has `send_document` and `send_image_file` via bridge — COMPLETE.
|
||||||
|
- The base class defaults just send "📎 File: /path" as text — useless for actual file delivery.
|
||||||
|
|
||||||
|
## 7. gateway/platforms/telegram.py — Send Method Analysis
|
||||||
|
|
||||||
|
### Implemented send methods:
|
||||||
|
- `send()` — MarkdownV2 text with fallback to plain
|
||||||
|
- `send_voice()` — `.ogg`/`.opus` as `send_voice()`, others as `send_audio()`
|
||||||
|
- `send_image()` — URL-based via `send_photo()`
|
||||||
|
- `send_animation()` — GIF via `send_animation()`
|
||||||
|
- `send_typing()` — "typing" chat action
|
||||||
|
- `edit_message()` — edit text messages
|
||||||
|
|
||||||
|
### MISSING:
|
||||||
|
- **`send_document()` NOT overridden** — Need to add `self._bot.send_document(chat_id, document=open(file_path, 'rb'), ...)`
|
||||||
|
- **`send_image_file()` NOT overridden** — Need to add `self._bot.send_photo(chat_id, photo=open(path, 'rb'), ...)`
|
||||||
|
- **`send_video()` NOT overridden** — Need to add `self._bot.send_video(...)`
|
||||||
|
|
||||||
|
## 8. gateway/platforms/discord.py — Send Method Analysis
|
||||||
|
|
||||||
|
### Implemented send methods:
|
||||||
|
- `send()` — text messages with chunking
|
||||||
|
- `send_voice()` — discord.File attachment
|
||||||
|
- `send_image()` — downloads URL, creates discord.File attachment
|
||||||
|
- `send_typing()` — channel.typing()
|
||||||
|
- `edit_message()` — edit text messages
|
||||||
|
|
||||||
|
### MISSING:
|
||||||
|
- **`send_document()` NOT overridden** — Need to add discord.File attachment
|
||||||
|
- **`send_image_file()` NOT overridden** — Need to add discord.File from local path
|
||||||
|
- **`send_video()` NOT overridden** — Need to add discord.File attachment
|
||||||
|
|
||||||
|
## 9. gateway/run.py — User File Attachment Handling
|
||||||
|
|
||||||
|
### Current attachment flow:
|
||||||
|
1. **Telegram photos** (line 509-529): Download via `photo.get_file()` → `cache_image_from_bytes()` → vision auto-analysis
|
||||||
|
2. **Telegram voice** (line 532-541): Download → `cache_audio_from_bytes()` → STT transcription
|
||||||
|
3. **Telegram audio** (line 542-551): Same pattern
|
||||||
|
4. **Telegram documents** (line 553-617): Extension validation against `SUPPORTED_DOCUMENT_TYPES`, 20MB limit, content injection for text files
|
||||||
|
5. **Discord attachments** (line 717-751): Content-type detection, image/audio caching, URL fallback for other types
|
||||||
|
6. **Gateway run.py** (lines 818-883): Auto-analyzes images with vision, transcribes audio, enriches document messages with context notes
|
||||||
|
|
||||||
|
### Key insight: Files are always cached to host filesystem first, then processed. The agent sees local file paths.
|
||||||
|
|
||||||
|
## 10. tools/terminal_tool.py — Terminal Tool & Environment Interaction
|
||||||
|
|
||||||
|
### How it manages environments:
|
||||||
|
- Global dict `_active_environments: Dict[str, Any]` keyed by task_id
|
||||||
|
- Per-task creation locks prevent duplicate sandbox creation
|
||||||
|
- Auto-cleanup thread kills idle environments after `TERMINAL_LIFETIME_SECONDS`
|
||||||
|
- `_get_env_config()` reads all TERMINAL_* env vars for backend selection
|
||||||
|
- `_create_environment()` factory creates the right backend type
|
||||||
|
|
||||||
|
### Could send_file piggyback?
|
||||||
|
- **YES.** send_file needs access to the same environment to extract files from sandboxes.
|
||||||
|
- It can reuse `_active_environments[task_id]` to get the environment, then:
|
||||||
|
- Docker: Use `docker cp` via `env._container_id`
|
||||||
|
- SSH: Use `scp` via `env.control_socket`
|
||||||
|
- Local: Just read the file directly
|
||||||
|
- Modal: Use base64-over-terminal via `env.execute()`
|
||||||
|
- The file_tools.py module already does this with `ShellFileOperations` — read_file/write_file/search/patch all share the same env instance.
|
||||||
|
|
||||||
|
## 11. tools/tts_tool.py — Working Example of File Delivery
|
||||||
|
|
||||||
|
### Flow:
|
||||||
|
1. Generate audio file to `~/.hermes/audio_cache/tts_TIMESTAMP.{ogg,mp3}`
|
||||||
|
2. Return JSON with `media_tag: "MEDIA:/path/to/file"`
|
||||||
|
3. For Telegram voice: prepend `[[audio_as_voice]]` directive
|
||||||
|
4. The LLM includes the MEDIA tag in its response text
|
||||||
|
5. `BasePlatformAdapter._process_message_background()` calls `extract_media()` to find the tag
|
||||||
|
6. Routes by extension → `send_voice()` for audio files
|
||||||
|
7. Platform adapter sends the file natively
|
||||||
|
|
||||||
|
### Key pattern: Tool saves file to host → returns MEDIA: path → LLM echoes it → gateway extracts → platform delivers
|
||||||
|
|
||||||
|
## 12. tools/image_generation_tool.py — Working Example of Image Delivery
|
||||||
|
|
||||||
|
### Flow:
|
||||||
|
1. Call FAL.ai API → get image URL
|
||||||
|
2. Return JSON with `image: "https://fal.media/..."` URL
|
||||||
|
3. The LLM includes the URL in markdown image syntax: `![description](https://fal.media/...)`
|
||||||
|
4. `BasePlatformAdapter.extract_images()` finds `![alt](url)` patterns
|
||||||
|
5. Routes through `send_image()` (URL) or `send_animation()` (GIF)
|
||||||
|
6. Platform downloads and sends natively
|
||||||
|
|
||||||
|
### Key difference from TTS: Images are URL-based, not local files. The gateway downloads at send time.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
# INTEGRATION MAP: Where send_file Hooks In
|
||||||
|
|
||||||
|
## Architecture Decision: MEDIA: Tag Protocol vs. New Tool
|
||||||
|
|
||||||
|
The MEDIA: tag protocol is already the established pattern for file delivery. Two options:
|
||||||
|
|
||||||
|
### Option A: Pure MEDIA: Tag (Minimal Change)
|
||||||
|
- No new tool needed
|
||||||
|
- Agent downloads file from sandbox to host using terminal (base64)
|
||||||
|
- Saves to known location (e.g., `~/.hermes/file_cache/`)
|
||||||
|
- Includes `MEDIA:/path` in response text
|
||||||
|
- Existing routing in `_process_message_background()` handles delivery
|
||||||
|
- **Problem:** Agent has to manually do base64 dance + know about MEDIA: convention
|
||||||
|
|
||||||
|
### Option B: Dedicated send_file Tool (Recommended)
|
||||||
|
- New tool that the agent calls with `(file_path, caption?)`
|
||||||
|
- Tool handles the sandbox → host extraction automatically
|
||||||
|
- Returns MEDIA: tag that gets routed through existing pipeline
|
||||||
|
- Much cleaner agent experience
|
||||||
|
|
||||||
|
## Implementation Plan for Option B
|
||||||
|
|
||||||
|
### Files to CREATE:
|
||||||
|
|
||||||
|
1. **`tools/send_file_tool.py`** — The new tool
|
||||||
|
- Accepts: `file_path` (path in sandbox), `caption` (optional)
|
||||||
|
- Detects environment backend from `_active_environments`
|
||||||
|
- Extracts file from sandbox:
|
||||||
|
- **local:** `shutil.copy()` or direct path
|
||||||
|
- **docker:** `docker cp {container_id}:{path} {local_cache}/`
|
||||||
|
- **ssh:** `scp -o ControlPath=... {user}@{host}:{path} {local_cache}/`
|
||||||
|
- **modal:** base64-over-terminal via `env.execute("base64 {path}")`
|
||||||
|
- Saves to `~/.hermes/file_cache/{uuid}_{filename}`
|
||||||
|
- Returns: `MEDIA:/cached/path` in response for gateway to pick up
|
||||||
|
- Register with `registry.register(name="send_file", toolset="file", ...)`
|
||||||
|
|
||||||
|
### Files to MODIFY:
|
||||||
|
|
||||||
|
2. **`gateway/platforms/telegram.py`** — Add missing send methods:
|
||||||
|
```python
|
||||||
|
async def send_document(self, chat_id, file_path, caption=None, file_name=None, reply_to=None):
|
||||||
|
with open(file_path, "rb") as f:
|
||||||
|
msg = await self._bot.send_document(
|
||||||
|
chat_id=int(chat_id), document=f,
|
||||||
|
caption=caption, filename=file_name or os.path.basename(file_path))
|
||||||
|
return SendResult(success=True, message_id=str(msg.message_id))
|
||||||
|
|
||||||
|
async def send_image_file(self, chat_id, image_path, caption=None, reply_to=None):
|
||||||
|
with open(image_path, "rb") as f:
|
||||||
|
msg = await self._bot.send_photo(chat_id=int(chat_id), photo=f, caption=caption)
|
||||||
|
return SendResult(success=True, message_id=str(msg.message_id))
|
||||||
|
|
||||||
|
async def send_video(self, chat_id, video_path, caption=None, reply_to=None):
|
||||||
|
with open(video_path, "rb") as f:
|
||||||
|
msg = await self._bot.send_video(chat_id=int(chat_id), video=f, caption=caption)
|
||||||
|
return SendResult(success=True, message_id=str(msg.message_id))
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **`gateway/platforms/discord.py`** — Add missing send methods:
|
||||||
|
```python
|
||||||
|
async def send_document(self, chat_id, file_path, caption=None, file_name=None, reply_to=None):
|
||||||
|
channel = self._client.get_channel(int(chat_id)) or await self._client.fetch_channel(int(chat_id))
|
||||||
|
with open(file_path, "rb") as f:
|
||||||
|
file = discord.File(io.BytesIO(f.read()), filename=file_name or os.path.basename(file_path))
|
||||||
|
msg = await channel.send(content=caption, file=file)
|
||||||
|
return SendResult(success=True, message_id=str(msg.id))
|
||||||
|
|
||||||
|
async def send_image_file(self, chat_id, image_path, caption=None, reply_to=None):
|
||||||
|
# Same pattern as send_document with image filename
|
||||||
|
|
||||||
|
async def send_video(self, chat_id, video_path, caption=None, reply_to=None):
|
||||||
|
# Same pattern, discord renders video attachments inline
|
||||||
|
```
|
||||||
|
|
||||||
|
4. **`toolsets.py`** — Add `"send_file"` to `_HERMES_CORE_TOOLS` list
|
||||||
|
|
||||||
|
5. **`agent/prompt_builder.py`** — Update platform hints to mention send_file tool
|
||||||
|
|
||||||
|
### Code that can be REUSED (zero rewrite):
|
||||||
|
|
||||||
|
- `BasePlatformAdapter.extract_media()` — Already extracts MEDIA: tags
|
||||||
|
- `BasePlatformAdapter._process_message_background()` — Already routes by extension
|
||||||
|
- `ToolContext.download_file()` — Base64-over-terminal extraction pattern
|
||||||
|
- `tools/terminal_tool.py` _active_environments dict — Environment access
|
||||||
|
- `tools/registry.py` — Tool registration infrastructure
|
||||||
|
- `gateway/platforms/base.py` send_document/send_image_file/send_video signatures — Already defined
|
||||||
|
|
||||||
|
### Code that needs to be WRITTEN from scratch:
|
||||||
|
|
||||||
|
1. `tools/send_file_tool.py` (~150 lines):
|
||||||
|
- File extraction from each environment backend type
|
||||||
|
- Local file cache management
|
||||||
|
- Registry registration
|
||||||
|
|
||||||
|
2. Telegram `send_document` + `send_image_file` + `send_video` overrides (~40 lines)
|
||||||
|
3. Discord `send_document` + `send_image_file` + `send_video` overrides (~50 lines)
|
||||||
|
|
||||||
|
### Total effort: ~240 lines of new code, ~5 lines of config changes
|
||||||
|
|
||||||
|
## Key Environment-Specific Extract Strategies
|
||||||
|
|
||||||
|
| Backend | Extract Method | Speed | Complexity |
|
||||||
|
|------------|-------------------------------|----------|------------|
|
||||||
|
| local | shutil.copy / direct path | Instant | None |
|
||||||
|
| docker | `docker cp container:path .` | Fast | Low |
|
||||||
|
| docker+vol | Direct host path access | Instant | None |
|
||||||
|
| ssh | `scp -o ControlPath=...` | Fast | Low |
|
||||||
|
| modal | base64-over-terminal | Moderate | Medium |
|
||||||
|
| singularity| Direct path (overlay mount) | Fast | Low |
|
||||||
|
|
||||||
|
## Data Flow Summary
|
||||||
|
|
||||||
|
```
|
||||||
|
Agent calls send_file(file_path="/workspace/output.pdf", caption="Here's the report")
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
send_file_tool.py:
|
||||||
|
1. Get environment from _active_environments[task_id]
|
||||||
|
2. Detect backend type (docker/ssh/modal/local)
|
||||||
|
3. Extract file to ~/.hermes/file_cache/{uuid}_{filename}
|
||||||
|
4. Return: '{"success": true, "media_tag": "MEDIA:/home/user/.hermes/file_cache/abc123_output.pdf"}'
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
LLM includes MEDIA: tag in its response text
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
BasePlatformAdapter._process_message_background():
|
||||||
|
1. extract_media(response) → finds MEDIA:/path
|
||||||
|
2. Checks extension: .pdf → send_document()
|
||||||
|
3. Calls platform-specific send_document(chat_id, file_path, caption)
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
TelegramAdapter.send_document() / DiscordAdapter.send_document():
|
||||||
|
Opens file, sends via platform API as native document attachment
|
||||||
|
User receives downloadable file in chat
|
||||||
|
```
|
||||||
|
|
@ -1,15 +1,18 @@
|
||||||
"""Tests for hermes_cli/clipboard.py — clipboard image extraction.
|
"""Tests for clipboard image paste — clipboard extraction, multimodal conversion,
|
||||||
|
and CLI integration.
|
||||||
|
|
||||||
Tests clipboard image extraction across platforms, and the CLI-level
|
Coverage:
|
||||||
multimodal content conversion that turns attached images into OpenAI
|
hermes_cli/clipboard.py — platform-specific image extraction
|
||||||
vision API format.
|
cli.py — _try_attach_clipboard_image, _build_multimodal_content,
|
||||||
|
image attachment state, queue tuple routing
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
|
import queue
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch, MagicMock, call
|
from unittest.mock import patch, MagicMock, PropertyMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
@ -20,8 +23,12 @@ from hermes_cli.clipboard import (
|
||||||
_macos_osascript,
|
_macos_osascript,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
FAKE_PNG = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
|
||||||
|
|
||||||
# ── Platform dispatch ────────────────────────────────────────────────────
|
|
||||||
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
|
# Level 1: Clipboard module — platform dispatch + tool interactions
|
||||||
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
class TestSaveClipboardImage:
|
class TestSaveClipboardImage:
|
||||||
def test_dispatches_to_macos_on_darwin(self, tmp_path):
|
def test_dispatches_to_macos_on_darwin(self, tmp_path):
|
||||||
|
|
@ -49,21 +56,15 @@ class TestSaveClipboardImage:
|
||||||
assert dest.parent.exists()
|
assert dest.parent.exists()
|
||||||
|
|
||||||
|
|
||||||
# ── macOS pngpaste ───────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestMacosPngpaste:
|
class TestMacosPngpaste:
|
||||||
def test_success_writes_file(self, tmp_path):
|
def test_success_writes_file(self, tmp_path):
|
||||||
"""pngpaste writes the file on success — verify we detect it."""
|
|
||||||
dest = tmp_path / "out.png"
|
dest = tmp_path / "out.png"
|
||||||
|
|
||||||
def fake_run(cmd, **kw):
|
def fake_run(cmd, **kw):
|
||||||
# Simulate pngpaste writing the file
|
dest.write_bytes(FAKE_PNG)
|
||||||
dest.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100)
|
|
||||||
return MagicMock(returncode=0)
|
return MagicMock(returncode=0)
|
||||||
|
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
assert _macos_pngpaste(dest) is True
|
assert _macos_pngpaste(dest) is True
|
||||||
assert dest.stat().st_size > 0
|
assert dest.stat().st_size == len(FAKE_PNG)
|
||||||
|
|
||||||
def test_not_installed(self, tmp_path):
|
def test_not_installed(self, tmp_path):
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=FileNotFoundError):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=FileNotFoundError):
|
||||||
|
|
@ -77,18 +78,19 @@ class TestMacosPngpaste:
|
||||||
assert not dest.exists()
|
assert not dest.exists()
|
||||||
|
|
||||||
def test_empty_file_rejected(self, tmp_path):
|
def test_empty_file_rejected(self, tmp_path):
|
||||||
"""pngpaste exits 0 but writes an empty file — should return False."""
|
|
||||||
dest = tmp_path / "out.png"
|
dest = tmp_path / "out.png"
|
||||||
|
|
||||||
def fake_run(cmd, **kw):
|
def fake_run(cmd, **kw):
|
||||||
dest.write_bytes(b"") # empty
|
dest.write_bytes(b"")
|
||||||
return MagicMock(returncode=0)
|
return MagicMock(returncode=0)
|
||||||
|
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
assert _macos_pngpaste(dest) is False
|
assert _macos_pngpaste(dest) is False
|
||||||
|
|
||||||
|
def test_timeout_returns_false(self, tmp_path):
|
||||||
|
dest = tmp_path / "out.png"
|
||||||
|
with patch("hermes_cli.clipboard.subprocess.run",
|
||||||
|
side_effect=subprocess.TimeoutExpired("pngpaste", 3)):
|
||||||
|
assert _macos_pngpaste(dest) is False
|
||||||
|
|
||||||
# ── macOS osascript ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestMacosOsascript:
|
class TestMacosOsascript:
|
||||||
def test_no_image_type_in_clipboard(self, tmp_path):
|
def test_no_image_type_in_clipboard(self, tmp_path):
|
||||||
|
|
@ -103,57 +105,53 @@ class TestMacosOsascript:
|
||||||
assert _macos_osascript(tmp_path / "out.png") is False
|
assert _macos_osascript(tmp_path / "out.png") is False
|
||||||
|
|
||||||
def test_success_with_png(self, tmp_path):
|
def test_success_with_png(self, tmp_path):
|
||||||
"""clipboard has PNGf, osascript extracts it successfully."""
|
|
||||||
dest = tmp_path / "out.png"
|
dest = tmp_path / "out.png"
|
||||||
call_count = [0]
|
calls = []
|
||||||
|
|
||||||
def fake_run(cmd, **kw):
|
def fake_run(cmd, **kw):
|
||||||
call_count[0] += 1
|
calls.append(cmd)
|
||||||
if call_count[0] == 1:
|
if len(calls) == 1:
|
||||||
# clipboard info check
|
|
||||||
return MagicMock(stdout="«class PNGf», «class ut16»", returncode=0)
|
return MagicMock(stdout="«class PNGf», «class ut16»", returncode=0)
|
||||||
else:
|
dest.write_bytes(FAKE_PNG)
|
||||||
# extraction — simulate writing the file
|
return MagicMock(stdout="", returncode=0)
|
||||||
dest.write_bytes(b"\x89PNG" + b"\x00" * 50)
|
|
||||||
return MagicMock(stdout="", returncode=0)
|
|
||||||
|
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
assert _macos_osascript(dest) is True
|
assert _macos_osascript(dest) is True
|
||||||
assert dest.stat().st_size > 0
|
assert dest.stat().st_size > 0
|
||||||
|
|
||||||
def test_success_with_tiff(self, tmp_path):
|
def test_success_with_tiff(self, tmp_path):
|
||||||
"""clipboard has TIFF type — should still attempt extraction."""
|
|
||||||
dest = tmp_path / "out.png"
|
dest = tmp_path / "out.png"
|
||||||
call_count = [0]
|
calls = []
|
||||||
|
|
||||||
def fake_run(cmd, **kw):
|
def fake_run(cmd, **kw):
|
||||||
call_count[0] += 1
|
calls.append(cmd)
|
||||||
if call_count[0] == 1:
|
if len(calls) == 1:
|
||||||
return MagicMock(stdout="«class TIFF»", returncode=0)
|
return MagicMock(stdout="«class TIFF»", returncode=0)
|
||||||
else:
|
dest.write_bytes(FAKE_PNG)
|
||||||
dest.write_bytes(b"\x89PNG" + b"\x00" * 50)
|
return MagicMock(stdout="", returncode=0)
|
||||||
return MagicMock(stdout="", returncode=0)
|
|
||||||
|
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
assert _macos_osascript(dest) is True
|
assert _macos_osascript(dest) is True
|
||||||
|
|
||||||
def test_extraction_returns_fail(self, tmp_path):
|
def test_extraction_returns_fail(self, tmp_path):
|
||||||
"""clipboard info says image but extraction script returns 'fail'."""
|
|
||||||
dest = tmp_path / "out.png"
|
dest = tmp_path / "out.png"
|
||||||
call_count = [0]
|
calls = []
|
||||||
|
|
||||||
def fake_run(cmd, **kw):
|
def fake_run(cmd, **kw):
|
||||||
call_count[0] += 1
|
calls.append(cmd)
|
||||||
if call_count[0] == 1:
|
if len(calls) == 1:
|
||||||
return MagicMock(stdout="«class PNGf»", returncode=0)
|
return MagicMock(stdout="«class PNGf»", returncode=0)
|
||||||
else:
|
return MagicMock(stdout="fail", returncode=0)
|
||||||
return MagicMock(stdout="fail", returncode=0)
|
|
||||||
|
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
assert _macos_osascript(dest) is False
|
assert _macos_osascript(dest) is False
|
||||||
|
|
||||||
|
def test_extraction_writes_empty_file(self, tmp_path):
|
||||||
|
dest = tmp_path / "out.png"
|
||||||
|
calls = []
|
||||||
|
def fake_run(cmd, **kw):
|
||||||
|
calls.append(cmd)
|
||||||
|
if len(calls) == 1:
|
||||||
|
return MagicMock(stdout="«class PNGf»", returncode=0)
|
||||||
|
dest.write_bytes(b"")
|
||||||
|
return MagicMock(stdout="", returncode=0)
|
||||||
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
|
assert _macos_osascript(dest) is False
|
||||||
|
|
||||||
# ── Linux xclip ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestLinuxSave:
|
class TestLinuxSave:
|
||||||
def test_no_xclip_installed(self, tmp_path):
|
def test_no_xclip_installed(self, tmp_path):
|
||||||
|
|
@ -166,116 +164,234 @@ class TestLinuxSave:
|
||||||
assert _linux_save(tmp_path / "out.png") is False
|
assert _linux_save(tmp_path / "out.png") is False
|
||||||
|
|
||||||
def test_image_extraction_success(self, tmp_path):
|
def test_image_extraction_success(self, tmp_path):
|
||||||
"""xclip reports image/png in targets, then pipes PNG data."""
|
|
||||||
dest = tmp_path / "out.png"
|
dest = tmp_path / "out.png"
|
||||||
call_count = [0]
|
|
||||||
|
|
||||||
def fake_run(cmd, **kw):
|
def fake_run(cmd, **kw):
|
||||||
call_count[0] += 1
|
|
||||||
if "TARGETS" in cmd:
|
if "TARGETS" in cmd:
|
||||||
return MagicMock(stdout="image/png\ntext/plain\n", returncode=0)
|
return MagicMock(stdout="image/png\ntext/plain\n", returncode=0)
|
||||||
# Extract call — write via the stdout file handle
|
|
||||||
if "stdout" in kw and hasattr(kw["stdout"], "write"):
|
if "stdout" in kw and hasattr(kw["stdout"], "write"):
|
||||||
kw["stdout"].write(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100)
|
kw["stdout"].write(FAKE_PNG)
|
||||||
return MagicMock(returncode=0)
|
return MagicMock(returncode=0)
|
||||||
|
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
assert _linux_save(dest) is True
|
assert _linux_save(dest) is True
|
||||||
assert dest.stat().st_size > 0
|
assert dest.stat().st_size > 0
|
||||||
|
|
||||||
def test_extraction_fails_cleans_up(self, tmp_path):
|
def test_extraction_fails_cleans_up(self, tmp_path):
|
||||||
"""If xclip extraction fails, any partial file is cleaned up."""
|
|
||||||
dest = tmp_path / "out.png"
|
dest = tmp_path / "out.png"
|
||||||
call_count = [0]
|
|
||||||
|
|
||||||
def fake_run(cmd, **kw):
|
def fake_run(cmd, **kw):
|
||||||
call_count[0] += 1
|
|
||||||
if "TARGETS" in cmd:
|
if "TARGETS" in cmd:
|
||||||
return MagicMock(stdout="image/png\n", returncode=0)
|
return MagicMock(stdout="image/png\n", returncode=0)
|
||||||
raise subprocess.SubprocessError("pipe broke")
|
raise subprocess.SubprocessError("pipe broke")
|
||||||
|
|
||||||
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run):
|
||||||
assert _linux_save(dest) is False
|
assert _linux_save(dest) is False
|
||||||
assert not dest.exists()
|
assert not dest.exists()
|
||||||
|
|
||||||
|
def test_targets_check_timeout(self, tmp_path):
|
||||||
|
with patch("hermes_cli.clipboard.subprocess.run",
|
||||||
|
side_effect=subprocess.TimeoutExpired("xclip", 3)):
|
||||||
|
assert _linux_save(tmp_path / "out.png") is False
|
||||||
|
|
||||||
# ── Multimodal content conversion (CLI-level) ────────────────────────────
|
|
||||||
|
|
||||||
class TestMultimodalConversion:
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
"""Test the image → OpenAI vision content conversion in chat()."""
|
# Level 2: _build_multimodal_content — image → OpenAI vision format
|
||||||
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
def _make_fake_image(self, tmp_path, name="test.png", size=64):
|
class TestBuildMultimodalContent:
|
||||||
"""Create a small fake PNG file."""
|
"""Test the extracted _build_multimodal_content method directly."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def cli(self):
|
||||||
|
"""Minimal HermesCLI with mocked internals."""
|
||||||
|
with patch("cli.load_cli_config") as mock_cfg:
|
||||||
|
mock_cfg.return_value = {
|
||||||
|
"model": {"default": "test/model", "base_url": "http://x", "provider": "auto"},
|
||||||
|
"terminal": {"timeout": 60},
|
||||||
|
"browser": {},
|
||||||
|
"compression": {"enabled": True},
|
||||||
|
"agent": {"max_turns": 10},
|
||||||
|
"display": {"compact": True},
|
||||||
|
"clarify": {},
|
||||||
|
"code_execution": {},
|
||||||
|
"delegation": {},
|
||||||
|
}
|
||||||
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
||||||
|
with patch("cli.CLI_CONFIG", mock_cfg.return_value):
|
||||||
|
from cli import HermesCLI
|
||||||
|
cli_obj = HermesCLI.__new__(HermesCLI)
|
||||||
|
# Manually init just enough state
|
||||||
|
cli_obj._attached_images = []
|
||||||
|
cli_obj._image_counter = 0
|
||||||
|
return cli_obj
|
||||||
|
|
||||||
|
def _make_image(self, tmp_path, name="test.png", content=FAKE_PNG):
|
||||||
img = tmp_path / name
|
img = tmp_path / name
|
||||||
img.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * size)
|
img.write_bytes(content)
|
||||||
return img
|
return img
|
||||||
|
|
||||||
def test_single_image_with_text(self, tmp_path):
|
def test_single_image_with_text(self, cli, tmp_path):
|
||||||
"""One image + text → multimodal content array."""
|
img = self._make_image(tmp_path)
|
||||||
img = self._make_fake_image(tmp_path)
|
result = cli._build_multimodal_content("Describe this", [img])
|
||||||
raw_bytes = img.read_bytes()
|
|
||||||
expected_b64 = base64.b64encode(raw_bytes).decode()
|
|
||||||
|
|
||||||
# Simulate what chat() does with images
|
assert len(result) == 2
|
||||||
message = "What's in this image?"
|
assert result[0] == {"type": "text", "text": "Describe this"}
|
||||||
images = [img]
|
assert result[1]["type"] == "image_url"
|
||||||
|
url = result[1]["image_url"]["url"]
|
||||||
|
assert url.startswith("data:image/png;base64,")
|
||||||
|
# Verify the base64 actually decodes to our image
|
||||||
|
b64_data = url.split(",", 1)[1]
|
||||||
|
assert base64.b64decode(b64_data) == FAKE_PNG
|
||||||
|
|
||||||
content_parts = []
|
def test_multiple_images(self, cli, tmp_path):
|
||||||
content_parts.append({"type": "text", "text": message})
|
imgs = [self._make_image(tmp_path, f"img{i}.png") for i in range(3)]
|
||||||
for img_path in images:
|
result = cli._build_multimodal_content("Compare", imgs)
|
||||||
data = base64.b64encode(img_path.read_bytes()).decode()
|
assert len(result) == 4 # 1 text + 3 images
|
||||||
ext = img_path.suffix.lower().lstrip(".")
|
assert all(r["type"] == "image_url" for r in result[1:])
|
||||||
mime = {"png": "image/png", "jpg": "image/jpeg"}.get(ext, "image/png")
|
|
||||||
content_parts.append({
|
|
||||||
"type": "image_url",
|
|
||||||
"image_url": {"url": f"data:{mime};base64,{data}"}
|
|
||||||
})
|
|
||||||
|
|
||||||
assert len(content_parts) == 2
|
def test_empty_text_gets_default_question(self, cli, tmp_path):
|
||||||
assert content_parts[0]["type"] == "text"
|
img = self._make_image(tmp_path)
|
||||||
assert content_parts[0]["text"] == "What's in this image?"
|
result = cli._build_multimodal_content("", [img])
|
||||||
assert content_parts[1]["type"] == "image_url"
|
assert result[0]["text"] == "What do you see in this image?"
|
||||||
assert content_parts[1]["image_url"]["url"].startswith("data:image/png;base64,")
|
|
||||||
assert expected_b64 in content_parts[1]["image_url"]["url"]
|
|
||||||
|
|
||||||
def test_multiple_images(self, tmp_path):
|
def test_jpeg_mime_type(self, cli, tmp_path):
|
||||||
"""Multiple images → all included in content array."""
|
img = self._make_image(tmp_path, "photo.jpg", b"\xff\xd8\xff\x00" * 20)
|
||||||
imgs = [self._make_fake_image(tmp_path, f"img{i}.png") for i in range(3)]
|
result = cli._build_multimodal_content("test", [img])
|
||||||
|
assert "image/jpeg" in result[1]["image_url"]["url"]
|
||||||
|
|
||||||
content_parts = [{"type": "text", "text": "Compare these"}]
|
def test_webp_mime_type(self, cli, tmp_path):
|
||||||
for img_path in imgs:
|
img = self._make_image(tmp_path, "img.webp", b"RIFF\x00\x00" * 10)
|
||||||
data = base64.b64encode(img_path.read_bytes()).decode()
|
result = cli._build_multimodal_content("test", [img])
|
||||||
content_parts.append({
|
assert "image/webp" in result[1]["image_url"]["url"]
|
||||||
"type": "image_url",
|
|
||||||
"image_url": {"url": f"data:image/png;base64,{data}"}
|
|
||||||
})
|
|
||||||
|
|
||||||
assert len(content_parts) == 4 # 1 text + 3 images
|
def test_unknown_extension_defaults_to_png(self, cli, tmp_path):
|
||||||
|
img = self._make_image(tmp_path, "data.bmp", b"\x00" * 50)
|
||||||
|
result = cli._build_multimodal_content("test", [img])
|
||||||
|
assert "image/png" in result[1]["image_url"]["url"]
|
||||||
|
|
||||||
def test_no_text_gets_default(self):
|
def test_missing_image_skipped(self, cli, tmp_path):
|
||||||
"""Empty text with image → default question added."""
|
missing = tmp_path / "gone.png"
|
||||||
text = ""
|
result = cli._build_multimodal_content("test", [missing])
|
||||||
if not text:
|
assert len(result) == 1 # only text
|
||||||
text = "What do you see in this image?"
|
|
||||||
assert text == "What do you see in this image?"
|
|
||||||
|
|
||||||
def test_jpeg_mime_type(self, tmp_path):
|
def test_mix_of_existing_and_missing(self, cli, tmp_path):
|
||||||
"""JPEG files get the correct MIME type."""
|
real = self._make_image(tmp_path, "real.png")
|
||||||
img = tmp_path / "photo.jpg"
|
missing = tmp_path / "gone.png"
|
||||||
img.write_bytes(b"\xff\xd8\xff" + b"\x00" * 50)
|
result = cli._build_multimodal_content("test", [real, missing])
|
||||||
|
assert len(result) == 2 # text + 1 real image
|
||||||
|
|
||||||
ext = img.suffix.lower().lstrip(".")
|
|
||||||
mime = {"png": "image/png", "jpg": "image/jpeg",
|
|
||||||
"jpeg": "image/jpeg", "gif": "image/gif",
|
|
||||||
"webp": "image/webp"}.get(ext, "image/png")
|
|
||||||
assert mime == "image/jpeg"
|
|
||||||
|
|
||||||
def test_missing_image_skipped(self, tmp_path):
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
"""Non-existent image path is silently skipped."""
|
# Level 3: _try_attach_clipboard_image — state management
|
||||||
missing = tmp_path / "does_not_exist.png"
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
images = [missing]
|
|
||||||
content_parts = [{"type": "text", "text": "test"}]
|
class TestTryAttachClipboardImage:
|
||||||
for img_path in images:
|
"""Test the clipboard → state flow."""
|
||||||
if img_path.exists():
|
|
||||||
content_parts.append({"type": "image_url"})
|
@pytest.fixture
|
||||||
assert len(content_parts) == 1 # only text, no image
|
def cli(self):
|
||||||
|
from cli import HermesCLI
|
||||||
|
cli_obj = HermesCLI.__new__(HermesCLI)
|
||||||
|
cli_obj._attached_images = []
|
||||||
|
cli_obj._image_counter = 0
|
||||||
|
return cli_obj
|
||||||
|
|
||||||
|
def test_image_found_attaches(self, cli):
|
||||||
|
with patch("hermes_cli.clipboard.save_clipboard_image", return_value=True):
|
||||||
|
result = cli._try_attach_clipboard_image()
|
||||||
|
assert result is True
|
||||||
|
assert len(cli._attached_images) == 1
|
||||||
|
assert cli._image_counter == 1
|
||||||
|
|
||||||
|
def test_no_image_doesnt_attach(self, cli):
|
||||||
|
with patch("hermes_cli.clipboard.save_clipboard_image", return_value=False):
|
||||||
|
result = cli._try_attach_clipboard_image()
|
||||||
|
assert result is False
|
||||||
|
assert len(cli._attached_images) == 0
|
||||||
|
assert cli._image_counter == 0 # rolled back
|
||||||
|
|
||||||
|
def test_multiple_attaches_increment_counter(self, cli):
|
||||||
|
with patch("hermes_cli.clipboard.save_clipboard_image", return_value=True):
|
||||||
|
cli._try_attach_clipboard_image()
|
||||||
|
cli._try_attach_clipboard_image()
|
||||||
|
cli._try_attach_clipboard_image()
|
||||||
|
assert len(cli._attached_images) == 3
|
||||||
|
assert cli._image_counter == 3
|
||||||
|
|
||||||
|
def test_mixed_success_and_failure(self, cli):
|
||||||
|
results = [True, False, True]
|
||||||
|
with patch("hermes_cli.clipboard.save_clipboard_image", side_effect=results):
|
||||||
|
cli._try_attach_clipboard_image()
|
||||||
|
cli._try_attach_clipboard_image()
|
||||||
|
cli._try_attach_clipboard_image()
|
||||||
|
assert len(cli._attached_images) == 2
|
||||||
|
assert cli._image_counter == 2 # 3 attempts, 1 rolled back
|
||||||
|
|
||||||
|
def test_image_path_follows_naming_convention(self, cli):
|
||||||
|
with patch("hermes_cli.clipboard.save_clipboard_image", return_value=True):
|
||||||
|
cli._try_attach_clipboard_image()
|
||||||
|
path = cli._attached_images[0]
|
||||||
|
assert path.parent == Path.home() / ".hermes" / "images"
|
||||||
|
assert path.name.startswith("clip_")
|
||||||
|
assert path.suffix == ".png"
|
||||||
|
|
||||||
|
|
||||||
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
|
# Level 4: Queue routing — tuple unpacking in process_loop
|
||||||
|
# ═════════════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class TestQueueRouting:
|
||||||
|
"""Test that (text, images) tuples are correctly unpacked and routed."""
|
||||||
|
|
||||||
|
def test_plain_string_stays_string(self):
|
||||||
|
"""Regular text input has no images."""
|
||||||
|
user_input = "hello world"
|
||||||
|
submit_images = []
|
||||||
|
if isinstance(user_input, tuple):
|
||||||
|
user_input, submit_images = user_input
|
||||||
|
assert user_input == "hello world"
|
||||||
|
assert submit_images == []
|
||||||
|
|
||||||
|
def test_tuple_unpacks_text_and_images(self, tmp_path):
|
||||||
|
"""(text, images) tuple is correctly split."""
|
||||||
|
img = tmp_path / "test.png"
|
||||||
|
img.write_bytes(FAKE_PNG)
|
||||||
|
user_input = ("describe this", [img])
|
||||||
|
|
||||||
|
submit_images = []
|
||||||
|
if isinstance(user_input, tuple):
|
||||||
|
user_input, submit_images = user_input
|
||||||
|
assert user_input == "describe this"
|
||||||
|
assert len(submit_images) == 1
|
||||||
|
assert submit_images[0] == img
|
||||||
|
|
||||||
|
def test_empty_text_with_images(self, tmp_path):
|
||||||
|
"""Images without text — text should be empty string."""
|
||||||
|
img = tmp_path / "test.png"
|
||||||
|
img.write_bytes(FAKE_PNG)
|
||||||
|
user_input = ("", [img])
|
||||||
|
|
||||||
|
submit_images = []
|
||||||
|
if isinstance(user_input, tuple):
|
||||||
|
user_input, submit_images = user_input
|
||||||
|
assert user_input == ""
|
||||||
|
assert len(submit_images) == 1
|
||||||
|
|
||||||
|
def test_command_with_images_not_treated_as_command(self):
|
||||||
|
"""Text starting with / in a tuple should still be a command."""
|
||||||
|
user_input = "/help"
|
||||||
|
submit_images = []
|
||||||
|
if isinstance(user_input, tuple):
|
||||||
|
user_input, submit_images = user_input
|
||||||
|
is_command = isinstance(user_input, str) and user_input.startswith("/")
|
||||||
|
assert is_command is True
|
||||||
|
|
||||||
|
def test_images_only_not_treated_as_command(self, tmp_path):
|
||||||
|
"""Empty text + images should not be treated as a command."""
|
||||||
|
img = tmp_path / "test.png"
|
||||||
|
img.write_bytes(FAKE_PNG)
|
||||||
|
user_input = ("", [img])
|
||||||
|
|
||||||
|
submit_images = []
|
||||||
|
if isinstance(user_input, tuple):
|
||||||
|
user_input, submit_images = user_input
|
||||||
|
is_command = isinstance(user_input, str) and user_input.startswith("/")
|
||||||
|
assert is_command is False
|
||||||
|
assert len(submit_images) == 1
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue