From 3543b755afbf5e899273bfdf5c9996e443a9658f Mon Sep 17 00:00:00 2001 From: Bartok9 Date: Mon, 16 Mar 2026 03:35:35 -0400 Subject: [PATCH 01/15] fix(docker): auto-mount host CWD to /workspace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #1445 — When using Docker backend, the user's current working directory is now automatically bind-mounted to /workspace inside the container. This allows users to run `cd my-project && hermes` and have their project files accessible to the agent without manual volume config. Changes: - Add host_cwd and auto_mount_cwd parameters to DockerEnvironment - Capture original host CWD in _get_env_config() before container fallback - Pass host_cwd through _create_environment() to Docker backend - Add TERMINAL_DOCKER_NO_AUTO_MOUNT env var to disable if needed - Skip auto-mount when /workspace is already explicitly mounted - Add tests for auto-mount behavior - Add documentation for the new feature The auto-mount is skipped when: 1. TERMINAL_DOCKER_NO_AUTO_MOUNT=true is set 2. User configured docker_volumes with :/workspace 3. persistent_filesystem=true (persistent sandbox mode) This makes the Docker backend behave more intuitively — the agent operates on the user's actual project directory by default. --- tests/tools/test_docker_environment.py | 145 +++++++++++++++++++++++ tools/environments/docker.py | 29 +++++ tools/terminal_tool.py | 14 ++- website/docs/user-guide/configuration.md | 36 ++++++ 4 files changed, 222 insertions(+), 2 deletions(-) diff --git a/tests/tools/test_docker_environment.py b/tests/tools/test_docker_environment.py index ead65528..3ed297b5 100644 --- a/tests/tools/test_docker_environment.py +++ b/tests/tools/test_docker_environment.py @@ -86,3 +86,148 @@ def test_ensure_docker_available_uses_resolved_executable(monkeypatch): }) ] + +def test_auto_mount_host_cwd_adds_volume(monkeypatch, tmp_path): + """When host_cwd is provided, it should be auto-mounted to /workspace.""" + import os + + # Create a temp directory to simulate user's project directory + project_dir = tmp_path / "my-project" + project_dir.mkdir() + + # Mock Docker availability + def _run_docker_version(*args, **kwargs): + return subprocess.CompletedProcess(args[0], 0, stdout="Docker version", stderr="") + + def _run_docker_create(*args, **kwargs): + return subprocess.CompletedProcess(args[0], 1, stdout="", stderr="storage-opt not supported") + + monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker") + monkeypatch.setattr(docker_env.subprocess, "run", _run_docker_version) + + # Mock the inner _Docker class to capture run_args + captured_run_args = [] + + class MockInnerDocker: + container_id = "mock-container-123" + config = type("Config", (), {"executable": "/usr/bin/docker", "forward_env": [], "env": {}})() + + def __init__(self, **kwargs): + captured_run_args.extend(kwargs.get("run_args", [])) + + monkeypatch.setattr( + "minisweagent.environments.docker.DockerEnvironment", + MockInnerDocker, + ) + + # Create environment with host_cwd + env = docker_env.DockerEnvironment( + image="python:3.11", + cwd="/workspace", + timeout=60, + persistent_filesystem=False, # Non-persistent mode uses tmpfs, should be overridden + task_id="test-auto-mount", + volumes=[], + host_cwd=str(project_dir), + auto_mount_cwd=True, + ) + + # Check that the host_cwd was added as a volume mount + volume_mount = f"-v {project_dir}:/workspace" + run_args_str = " ".join(captured_run_args) + assert f"{project_dir}:/workspace" in run_args_str, f"Expected auto-mount in run_args: {run_args_str}" + + +def test_auto_mount_disabled_via_env(monkeypatch, tmp_path): + """Auto-mount should be disabled when TERMINAL_DOCKER_NO_AUTO_MOUNT is set.""" + import os + + project_dir = tmp_path / "my-project" + project_dir.mkdir() + + monkeypatch.setenv("TERMINAL_DOCKER_NO_AUTO_MOUNT", "true") + + def _run_docker_version(*args, **kwargs): + return subprocess.CompletedProcess(args[0], 0, stdout="Docker version", stderr="") + + monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker") + monkeypatch.setattr(docker_env.subprocess, "run", _run_docker_version) + + captured_run_args = [] + + class MockInnerDocker: + container_id = "mock-container-456" + config = type("Config", (), {"executable": "/usr/bin/docker", "forward_env": [], "env": {}})() + + def __init__(self, **kwargs): + captured_run_args.extend(kwargs.get("run_args", [])) + + monkeypatch.setattr( + "minisweagent.environments.docker.DockerEnvironment", + MockInnerDocker, + ) + + env = docker_env.DockerEnvironment( + image="python:3.11", + cwd="/workspace", + timeout=60, + persistent_filesystem=False, + task_id="test-no-auto-mount", + volumes=[], + host_cwd=str(project_dir), + auto_mount_cwd=True, + ) + + # Check that the host_cwd was NOT added (because env var disabled it) + run_args_str = " ".join(captured_run_args) + assert f"{project_dir}:/workspace" not in run_args_str, f"Auto-mount should be disabled: {run_args_str}" + + +def test_auto_mount_skipped_when_workspace_already_mounted(monkeypatch, tmp_path): + """Auto-mount should be skipped if /workspace is already mounted via user volumes.""" + import os + + project_dir = tmp_path / "my-project" + project_dir.mkdir() + other_dir = tmp_path / "other" + other_dir.mkdir() + + def _run_docker_version(*args, **kwargs): + return subprocess.CompletedProcess(args[0], 0, stdout="Docker version", stderr="") + + monkeypatch.setattr(docker_env, "find_docker", lambda: "/usr/bin/docker") + monkeypatch.setattr(docker_env.subprocess, "run", _run_docker_version) + + captured_run_args = [] + + class MockInnerDocker: + container_id = "mock-container-789" + config = type("Config", (), {"executable": "/usr/bin/docker", "forward_env": [], "env": {}})() + + def __init__(self, **kwargs): + captured_run_args.extend(kwargs.get("run_args", [])) + + monkeypatch.setattr( + "minisweagent.environments.docker.DockerEnvironment", + MockInnerDocker, + ) + + # User already configured a volume mount for /workspace + env = docker_env.DockerEnvironment( + image="python:3.11", + cwd="/workspace", + timeout=60, + persistent_filesystem=False, + task_id="test-workspace-exists", + volumes=[f"{other_dir}:/workspace"], # User explicitly mounted something to /workspace + host_cwd=str(project_dir), + auto_mount_cwd=True, + ) + + # The user's explicit mount should be present + run_args_str = " ".join(captured_run_args) + assert f"{other_dir}:/workspace" in run_args_str + + # But the auto-mount should NOT add a duplicate + assert run_args_str.count(":/workspace") == 1, f"Should only have one /workspace mount: {run_args_str}" + diff --git a/tools/environments/docker.py b/tools/environments/docker.py index c04eff8d..1c95f7b3 100644 --- a/tools/environments/docker.py +++ b/tools/environments/docker.py @@ -158,6 +158,10 @@ class DockerEnvironment(BaseEnvironment): Persistence: when enabled, bind mounts preserve /workspace and /root across container restarts. + + Auto-mount: when host_cwd is provided (the user's original working directory), + it is automatically bind-mounted to /workspace unless auto_mount_cwd=False + or the path is already covered by an explicit volume mount. """ def __init__( @@ -172,6 +176,8 @@ class DockerEnvironment(BaseEnvironment): task_id: str = "default", volumes: list = None, network: bool = True, + host_cwd: str = None, + auto_mount_cwd: bool = True, ): if cwd == "~": cwd = "/root" @@ -250,6 +256,29 @@ class DockerEnvironment(BaseEnvironment): else: logger.warning(f"Docker volume '{vol}' missing colon, skipping") + # Auto-mount host CWD to /workspace when enabled (fixes #1445). + # This allows users to run `cd my-project && hermes` and have Docker + # automatically mount their project directory into the container. + # Disabled when: auto_mount_cwd=False, host_cwd is not a valid directory, + # or /workspace is already covered by writable_args or a user volume. + auto_mount_disabled = os.getenv("TERMINAL_DOCKER_NO_AUTO_MOUNT", "").lower() in ("1", "true", "yes") + if host_cwd and auto_mount_cwd and not auto_mount_disabled: + host_cwd_abs = os.path.abspath(os.path.expanduser(host_cwd)) + if os.path.isdir(host_cwd_abs): + # Check if /workspace is already being mounted by persistence or user config + workspace_already_mounted = any( + ":/workspace" in arg for arg in writable_args + ) or any( + ":/workspace" in arg for arg in volume_args + ) + if not workspace_already_mounted: + logger.info(f"Auto-mounting host CWD to /workspace: {host_cwd_abs}") + volume_args.extend(["-v", f"{host_cwd_abs}:/workspace"]) + else: + logger.debug(f"Skipping auto-mount: /workspace already mounted") + else: + logger.debug(f"Skipping auto-mount: host_cwd is not a valid directory: {host_cwd}") + logger.info(f"Docker volume_args: {volume_args}") all_run_args = list(_SECURITY_ARGS) + writable_args + resource_args + volume_args logger.info(f"Docker run_args: {all_run_args}") diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index fc22bf3f..a9326f3e 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -481,7 +481,12 @@ def _get_env_config() -> Dict[str, Any]: # container/sandbox, fall back to the backend's own default. This # catches the case where cli.py (or .env) leaked the host's CWD. # SSH is excluded since /home/ paths are valid on remote machines. - cwd = os.getenv("TERMINAL_CWD", default_cwd) + raw_cwd = os.getenv("TERMINAL_CWD", default_cwd) + cwd = raw_cwd + # Capture original host CWD for auto-mounting into containers (fixes #1445). + # Even when the container's working directory falls back to /root, we still + # want to auto-mount the user's host project directory to /workspace. + host_cwd = raw_cwd if raw_cwd and os.path.isdir(raw_cwd) else os.getcwd() if env_type in ("modal", "docker", "singularity", "daytona") and cwd: # Host paths that won't exist inside containers host_prefixes = ("/Users/", "/home/", "C:\\", "C:/") @@ -498,6 +503,7 @@ def _get_env_config() -> Dict[str, Any]: "modal_image": os.getenv("TERMINAL_MODAL_IMAGE", default_image), "daytona_image": os.getenv("TERMINAL_DAYTONA_IMAGE", default_image), "cwd": cwd, + "host_cwd": host_cwd, # Original host directory for auto-mounting into containers "timeout": _parse_env_var("TERMINAL_TIMEOUT", "180"), "lifetime_seconds": _parse_env_var("TERMINAL_LIFETIME_SECONDS", "300"), # SSH-specific config @@ -525,7 +531,8 @@ def _get_env_config() -> Dict[str, Any]: def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: dict = None, container_config: dict = None, local_config: dict = None, - task_id: str = "default"): + task_id: str = "default", + host_cwd: str = None): """ Create an execution environment from mini-swe-agent. @@ -537,6 +544,7 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: SSH connection config (for env_type="ssh") container_config: Resource config for container backends (cpu, memory, disk, persistent) task_id: Task identifier for environment reuse and snapshot keying + host_cwd: Original host working directory (for auto-mounting into containers) Returns: Environment instance with execute() method @@ -559,6 +567,7 @@ def _create_environment(env_type: str, image: str, cwd: str, timeout: int, cpu=cpu, memory=memory, disk=disk, persistent_filesystem=persistent, task_id=task_id, volumes=volumes, + host_cwd=host_cwd, ) elif env_type == "singularity": @@ -965,6 +974,7 @@ def terminal_tool( container_config=container_config, local_config=local_config, task_id=effective_task_id, + host_cwd=config.get("host_cwd"), ) except ImportError as e: return json.dumps({ diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index 8adec23f..9a673bc7 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -520,6 +520,42 @@ This is useful for: Can also be set via environment variable: `TERMINAL_DOCKER_VOLUMES='["/host:/container"]'` (JSON array). +### Docker Auto-Mount Current Directory + +When using the Docker backend, Hermes **automatically mounts your current working directory** to `/workspace` inside the container. This means you can: + +```bash +cd ~/projects/my-app +hermes +# The agent can now see and edit files in ~/projects/my-app via /workspace +``` + +No manual volume configuration needed — just `cd` to your project and run `hermes`. + +**How it works:** +- If you're in `/home/user/projects/my-app`, that directory is mounted to `/workspace` +- The container's working directory is set to `/workspace` +- Files you edit on the host are immediately visible to the agent, and vice versa + +**Disabling auto-mount:** + +If you prefer the old behavior (empty `/workspace` with tmpfs or persistent sandbox), disable auto-mount: + +```bash +export TERMINAL_DOCKER_NO_AUTO_MOUNT=true +``` + +**Precedence:** + +Auto-mount is skipped when: +1. `TERMINAL_DOCKER_NO_AUTO_MOUNT=true` is set +2. You've explicitly configured a volume mount to `/workspace` in `docker_volumes` +3. `container_persistent: true` is set (persistent sandbox mode uses its own `/workspace`) + +:::tip +Auto-mount is ideal for project-based work where you want the agent to operate on your actual files. For isolated sandboxing where the agent shouldn't access your filesystem, set `TERMINAL_DOCKER_NO_AUTO_MOUNT=true`. +::: + ### Persistent Shell By default, each terminal command runs in its own subprocess — working directory, environment variables, and shell variables reset between commands. When **persistent shell** is enabled, a single long-lived bash process is kept alive across `execute()` calls so that state survives between commands. From c1ac32737d57179373911eb88fe7110fbe9c4dba Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 05:05:45 -0700 Subject: [PATCH 02/15] =?UTF-8?q?feat:=20unified=20streaming=20infrastruct?= =?UTF-8?q?ure=20=E2=80=94=20core=20delta=20callbacks=20for=20all=20provid?= =?UTF-8?q?ers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs #922 (unified streaming), #1312 (gateway consumer), #774 (Telegram streaming), #798 (CLI streaming), #1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis. --- run_agent.py | 333 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 301 insertions(+), 32 deletions(-) diff --git a/run_agent.py b/run_agent.py index 8a4147a8..8a93feee 100644 --- a/run_agent.py +++ b/run_agent.py @@ -296,6 +296,7 @@ class AIAgent: reasoning_callback: callable = None, clarify_callback: callable = None, step_callback: callable = None, + stream_delta_callback: callable = None, max_tokens: int = None, reasoning_config: Dict[str, Any] = None, prefill_messages: List[Dict[str, Any]] = None, @@ -395,6 +396,7 @@ class AIAgent: self.reasoning_callback = reasoning_callback self.clarify_callback = clarify_callback self.step_callback = step_callback + self.stream_delta_callback = stream_delta_callback self._last_reported_tool = None # Track for "new tool" mode # Interrupt mechanism for breaking out of tool loops @@ -856,9 +858,9 @@ class AIAgent: """Verbose print — suppressed when streaming TTS is active. Pass ``force=True`` for error/warning messages that should always be - shown even during streaming TTS playback. + shown even during streaming playback (TTS or display). """ - if not force and getattr(self, "_stream_callback", None) is not None: + if not force and self._has_stream_consumers(): return print(*args, **kwargs) @@ -2606,11 +2608,30 @@ class AIAgent: """Execute one streaming Responses API request and return the final response.""" active_client = client or self._ensure_primary_openai_client(reason="codex_stream_direct") max_stream_retries = 1 + has_tool_calls = False + first_delta_fired = False for attempt in range(max_stream_retries + 1): try: with active_client.responses.stream(**api_kwargs) as stream: - for _ in stream: - pass + for event in stream: + if self._interrupt_requested: + break + event_type = getattr(event, "type", "") + # Fire callbacks on text content deltas (suppress during tool calls) + if "output_text.delta" in event_type or event_type == "response.output_text.delta": + delta_text = getattr(event, "delta", "") + if delta_text and not has_tool_calls: + if not first_delta_fired: + first_delta_fired = True + self._fire_stream_delta(delta_text) + # Track tool calls to suppress text streaming + elif "function_call" in event_type: + has_tool_calls = True + # Fire reasoning callbacks + elif "reasoning" in event_type and "delta" in event_type: + reasoning_text = getattr(event, "delta", "") + if reasoning_text: + self._fire_reasoning_delta(reasoning_text) return stream.get_final_response() except RuntimeError as exc: err_text = str(exc) @@ -2972,6 +2993,265 @@ class AIAgent: raise result["error"] return result["response"] + # ── Unified streaming API call ───────────────────────────────────────── + + def _fire_stream_delta(self, text: str) -> None: + """Fire all registered stream delta callbacks (display + TTS).""" + for cb in (self.stream_delta_callback, self._stream_callback): + if cb is not None: + try: + cb(text) + except Exception: + pass + + def _fire_reasoning_delta(self, text: str) -> None: + """Fire reasoning callback if registered.""" + cb = self.reasoning_callback + if cb is not None: + try: + cb(text) + except Exception: + pass + + def _has_stream_consumers(self) -> bool: + """Return True if any streaming consumer is registered.""" + return ( + self.stream_delta_callback is not None + or getattr(self, "_stream_callback", None) is not None + ) + + def _interruptible_streaming_api_call( + self, api_kwargs: dict, *, on_first_delta: callable = None + ): + """Streaming variant of _interruptible_api_call for real-time token delivery. + + Handles all three api_modes: + - chat_completions: stream=True on OpenAI-compatible endpoints + - anthropic_messages: client.messages.stream() via Anthropic SDK + - codex_responses: delegates to _run_codex_stream (already streaming) + + Fires stream_delta_callback and _stream_callback for each text token. + Tool-call turns suppress the callback — only text-only final responses + stream to the consumer. Returns a SimpleNamespace that mimics the + non-streaming response shape so the rest of the agent loop is unchanged. + + Falls back to _interruptible_api_call on provider errors indicating + streaming is not supported. + """ + if self.api_mode == "codex_responses": + # Codex already streams internally; we just need to pass callbacks + return self._interruptible_api_call(api_kwargs) + + result = {"response": None, "error": None} + request_client_holder = {"client": None} + first_delta_fired = {"done": False} + + def _fire_first_delta(): + if not first_delta_fired["done"] and on_first_delta: + first_delta_fired["done"] = True + try: + on_first_delta() + except Exception: + pass + + def _call_chat_completions(): + """Stream a chat completions response.""" + stream_kwargs = {**api_kwargs, "stream": True, "stream_options": {"include_usage": True}} + request_client_holder["client"] = self._create_request_openai_client( + reason="chat_completion_stream_request" + ) + stream = request_client_holder["client"].chat.completions.create(**stream_kwargs) + + content_parts: list = [] + tool_calls_acc: dict = {} + finish_reason = None + model_name = None + role = "assistant" + reasoning_parts: list = [] + usage_obj = None + + for chunk in stream: + if self._interrupt_requested: + break + + if not chunk.choices: + if hasattr(chunk, "model") and chunk.model: + model_name = chunk.model + # Usage comes in the final chunk with empty choices + if hasattr(chunk, "usage") and chunk.usage: + usage_obj = chunk.usage + continue + + delta = chunk.choices[0].delta + if hasattr(chunk, "model") and chunk.model: + model_name = chunk.model + + # Accumulate reasoning content + reasoning_text = getattr(delta, "reasoning_content", None) or getattr(delta, "reasoning", None) + if reasoning_text: + reasoning_parts.append(reasoning_text) + self._fire_reasoning_delta(reasoning_text) + + # Accumulate text content — fire callback only when no tool calls + if delta and delta.content: + content_parts.append(delta.content) + if not tool_calls_acc: + _fire_first_delta() + self._fire_stream_delta(delta.content) + + # Accumulate tool call deltas (silently, no callback) + if delta and delta.tool_calls: + for tc_delta in delta.tool_calls: + idx = tc_delta.index if tc_delta.index is not None else 0 + if idx not in tool_calls_acc: + tool_calls_acc[idx] = { + "id": tc_delta.id or "", + "type": "function", + "function": {"name": "", "arguments": ""}, + } + entry = tool_calls_acc[idx] + if tc_delta.id: + entry["id"] = tc_delta.id + if tc_delta.function: + if tc_delta.function.name: + entry["function"]["name"] += tc_delta.function.name + if tc_delta.function.arguments: + entry["function"]["arguments"] += tc_delta.function.arguments + + if chunk.choices[0].finish_reason: + finish_reason = chunk.choices[0].finish_reason + + # Usage in the final chunk + if hasattr(chunk, "usage") and chunk.usage: + usage_obj = chunk.usage + + # Build mock response matching non-streaming shape + full_content = "".join(content_parts) or None + mock_tool_calls = None + if tool_calls_acc: + mock_tool_calls = [] + for idx in sorted(tool_calls_acc): + tc = tool_calls_acc[idx] + mock_tool_calls.append(SimpleNamespace( + id=tc["id"], + type=tc["type"], + function=SimpleNamespace( + name=tc["function"]["name"], + arguments=tc["function"]["arguments"], + ), + )) + + full_reasoning = "".join(reasoning_parts) or None + mock_message = SimpleNamespace( + role=role, + content=full_content, + tool_calls=mock_tool_calls, + reasoning_content=full_reasoning, + ) + mock_choice = SimpleNamespace( + index=0, + message=mock_message, + finish_reason=finish_reason or "stop", + ) + return SimpleNamespace( + id="stream-" + str(uuid.uuid4()), + model=model_name, + choices=[mock_choice], + usage=usage_obj, + ) + + def _call_anthropic(): + """Stream an Anthropic Messages API response. + + Fires delta callbacks for real-time token delivery, but returns + the native Anthropic Message object from get_final_message() so + the rest of the agent loop (validation, tool extraction, etc.) + works unchanged. + """ + has_tool_use = False + + # Use the Anthropic SDK's streaming context manager + with self._anthropic_client.messages.stream(**api_kwargs) as stream: + for event in stream: + if self._interrupt_requested: + break + + event_type = getattr(event, "type", None) + + if event_type == "content_block_start": + block = getattr(event, "content_block", None) + if block and getattr(block, "type", None) == "tool_use": + has_tool_use = True + + elif event_type == "content_block_delta": + delta = getattr(event, "delta", None) + if delta: + delta_type = getattr(delta, "type", None) + if delta_type == "text_delta": + text = getattr(delta, "text", "") + if text and not has_tool_use: + _fire_first_delta() + self._fire_stream_delta(text) + elif delta_type == "thinking_delta": + thinking_text = getattr(delta, "thinking", "") + if thinking_text: + self._fire_reasoning_delta(thinking_text) + + # Return the native Anthropic Message for downstream processing + return stream.get_final_message() + + def _call(): + try: + if self.api_mode == "anthropic_messages": + self._try_refresh_anthropic_client_credentials() + result["response"] = _call_anthropic() + else: + result["response"] = _call_chat_completions() + except Exception as e: + err_text = str(e).lower() + # Fall back to non-streaming if provider doesn't support it + stream_unsupported = any( + kw in err_text + for kw in ("stream", "not support", "unsupported", "not available") + ) + if stream_unsupported: + logger.info("Streaming not supported by provider, falling back to non-streaming: %s", e) + try: + result["response"] = self._interruptible_api_call(api_kwargs) + except Exception as fallback_err: + result["error"] = fallback_err + else: + result["error"] = e + finally: + request_client = request_client_holder.get("client") + if request_client is not None: + self._close_request_openai_client(request_client, reason="stream_request_complete") + + t = threading.Thread(target=_call, daemon=True) + t.start() + while t.is_alive(): + t.join(timeout=0.3) + if self._interrupt_requested: + try: + if self.api_mode == "anthropic_messages": + from agent.anthropic_adapter import build_anthropic_client + + self._anthropic_client.close() + self._anthropic_client = build_anthropic_client( + self._anthropic_api_key, + getattr(self, "_anthropic_base_url", None), + ) + else: + request_client = request_client_holder.get("client") + if request_client is not None: + self._close_request_openai_client(request_client, reason="stream_interrupt_abort") + except Exception: + pass + raise InterruptedError("Agent interrupted during streaming API call") + if result["error"] is not None: + raise result["error"] + return result["response"] + # ── Provider fallback ────────────────────────────────────────────────── def _try_activate_fallback(self) -> bool: @@ -4172,7 +4452,7 @@ class AIAgent: spinner.stop(cute_msg) elif self.quiet_mode: self._vprint(f" {cute_msg}") - elif self.quiet_mode and self._stream_callback is None: + elif self.quiet_mode and not self._has_stream_consumers(): face = random.choice(KawaiiSpinner.KAWAII_WAITING) emoji = _get_tool_emoji(function_name) preview = _build_tool_preview(function_name, function_args) or function_name @@ -4807,8 +5087,8 @@ class AIAgent: self._vprint(f"\n{self.log_prefix}🔄 Making API call #{api_call_count}/{self.max_iterations}...") self._vprint(f"{self.log_prefix} 📊 Request size: {len(api_messages)} messages, ~{approx_tokens:,} tokens (~{total_chars:,} chars)") self._vprint(f"{self.log_prefix} 🔧 Available tools: {len(self.tools) if self.tools else 0}") - elif self._stream_callback is None: - # Animated thinking spinner in quiet mode (skip during streaming TTS) + elif not self._has_stream_consumers(): + # Animated thinking spinner in quiet mode (skip during streaming) face = random.choice(KawaiiSpinner.KAWAII_THINKING) verb = random.choice(KawaiiSpinner.THINKING_VERBS) if self.thinking_callback: @@ -4848,33 +5128,22 @@ class AIAgent: if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}: self._dump_api_request_debug(api_kwargs, reason="preflight") - cb = getattr(self, "_stream_callback", None) - if cb is not None and self.api_mode == "chat_completions": - response = self._streaming_api_call(api_kwargs, cb) + if self._has_stream_consumers(): + # Streaming path: fire delta callbacks for real-time + # token delivery to CLI display, gateway, or TTS. + def _stop_spinner(): + nonlocal thinking_spinner + if thinking_spinner: + thinking_spinner.stop("") + thinking_spinner = None + if self.thinking_callback: + self.thinking_callback("") + + response = self._interruptible_streaming_api_call( + api_kwargs, on_first_delta=_stop_spinner + ) else: response = self._interruptible_api_call(api_kwargs) - # Forward full response to TTS callback for non-streaming providers - # (e.g. Anthropic) so voice TTS still works via batch delivery. - if cb is not None and response: - try: - content = None - # Try choices first — _interruptible_api_call converts all - # providers (including Anthropic) to this format. - try: - content = response.choices[0].message.content - except (AttributeError, IndexError): - pass - # Fallback: Anthropic native content blocks - if not content and self.api_mode == "anthropic_messages": - text_parts = [ - block.text for block in getattr(response, "content", []) - if getattr(block, "type", None) == "text" and getattr(block, "text", None) - ] - content = " ".join(text_parts) if text_parts else None - if content: - cb(content) - except Exception: - pass api_duration = time.time() - api_start_time From d23e9a9bed94795e5af7919082ec1ecf897f50cb Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 05:10:15 -0700 Subject: [PATCH 03/15] =?UTF-8?q?feat(cli):=20streaming=20token=20display?= =?UTF-8?q?=20=E2=80=94=20line-buffered=20rendering=20with=20response=20bo?= =?UTF-8?q?x=20framing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 2 of streaming support. CLI now streams tokens in real-time: - _stream_delta(): line-buffered rendering via _cprint (prompt_toolkit safe) - _flush_stream(): emits remaining buffer and closes response box - Response box opens on first token, closes on flush - Skip Rich Panel when streaming already displayed content - Reset streaming state before each agent turn - Compatible with existing TTS streaming (both can fire simultaneously) - Uses skin engine for response label branding Credit: OutThisLife (#798 CLI streaming concept). --- cli.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/cli.py b/cli.py index 47018657..79668876 100755 --- a/cli.py +++ b/cli.py @@ -1017,6 +1017,11 @@ class HermesCLI: self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False) self.verbose = verbose if verbose is not None else (self.tool_progress_mode == "verbose") + # Streaming display state + self._stream_buf = "" # Partial line buffer for line-buffered rendering + self._stream_started = False # True once first delta arrives + self._stream_box_opened = False # True once the response box header is printed + # Configuration - priority: CLI args > env vars > config file # Model comes from: CLI arg or config.yaml (single source of truth). # LLM_MODEL/OPENAI_MODEL env vars are NOT checked — config.yaml is @@ -1403,6 +1408,56 @@ class HermesCLI: self._spinner_text = text or "" self._invalidate() + # ── Streaming display ──────────────────────────────────────────────── + + def _stream_delta(self, text: str) -> None: + """Line-buffered streaming callback for real-time token rendering. + + Receives text deltas from the agent as tokens arrive. Buffers + partial lines and emits complete lines via _cprint to work + reliably with prompt_toolkit's patch_stdout. + """ + if not text: + return + + # Open the response box header on the very first delta + if not self._stream_box_opened: + self._stream_box_opened = True + try: + from hermes_cli.skin_engine import get_active_skin + _skin = get_active_skin() + label = _skin.get_branding("response_label", "⚕ Hermes") + except Exception: + label = "⚕ Hermes" + w = shutil.get_terminal_size().columns + fill = w - 2 - len(label) + _cprint(f"\n{_GOLD}╭─{label}{'─' * max(fill - 1, 0)}╮{_RST}") + + self._stream_started = True + self._stream_buf += text + + # Emit complete lines, keep partial remainder in buffer + while "\n" in self._stream_buf: + line, self._stream_buf = self._stream_buf.split("\n", 1) + _cprint(line) + + def _flush_stream(self) -> None: + """Emit any remaining partial line from the stream buffer and close the box.""" + if self._stream_buf: + _cprint(self._stream_buf) + self._stream_buf = "" + + # Close the response box + if self._stream_box_opened: + w = shutil.get_terminal_size().columns + _cprint(f"{_GOLD}╰{'─' * (w - 2)}╯{_RST}") + + def _reset_stream_state(self) -> None: + """Reset streaming state before each agent invocation.""" + self._stream_buf = "" + self._stream_started = False + self._stream_box_opened = False + def _slow_command_status(self, command: str) -> str: """Return a user-facing status message for slower slash commands.""" cmd_lower = command.lower().strip() @@ -1588,6 +1643,7 @@ class HermesCLI: checkpoint_max_snapshots=self.checkpoint_max_snapshots, pass_session_id=self.pass_session_id, tool_progress_callback=self._on_tool_progress, + stream_delta_callback=self._stream_delta, ) # Apply any pending title now that the session exists in the DB if self._pending_title and self._session_db: @@ -4616,6 +4672,9 @@ class HermesCLI: # Run the conversation with interrupt monitoring result = None + # Reset streaming display state for this turn + self._reset_stream_state() + # --- Streaming TTS setup --- # When ElevenLabs is the TTS provider and sounddevice is available, # we stream audio sentence-by-sentence as the agent generates tokens @@ -4742,6 +4801,9 @@ class HermesCLI: agent_thread.join() # Ensure agent thread completes + # Flush any remaining streamed text and close the box + self._flush_stream() + # Signal end-of-text to TTS consumer and wait for it to finish if use_streaming_tts and text_queue is not None: text_queue.put(None) # sentinel @@ -4816,10 +4878,15 @@ class HermesCLI: _resp_text = "#FFF8DC" is_error_response = result and (result.get("failed") or result.get("partial")) + already_streamed = self._stream_started and self._stream_box_opened and not is_error_response if use_streaming_tts and _streaming_box_opened and not is_error_response: # Text was already printed sentence-by-sentence; just close the box w = shutil.get_terminal_size().columns _cprint(f"\n{_GOLD}╰{'─' * (w - 2)}╯{_RST}") + elif already_streamed: + # Response was already streamed token-by-token with box framing; + # _flush_stream() already closed the box. Skip Rich Panel. + pass else: _chat_console = ChatConsole() _chat_console.print(Panel( From 2219695d92a4d91505a247ead482b5c63c1d51d7 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 05:12:38 -0700 Subject: [PATCH 04/15] =?UTF-8?q?test:=2014-test=20streaming=20suite=20?= =?UTF-8?q?=E2=80=94=20accumulator,=20callbacks,=20fallback,=20reasoning,?= =?UTF-8?q?=20Codex?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests cover: - Text/tool-call/mixed response accumulation into correct shape - Delta callback ordering and on_first_delta firing once - Tool-call suppression (no callbacks during tool turns) - Provider fallback on 'not supported' errors - Reasoning content accumulation and callback - _has_stream_consumers() detection - Codex stream delta callback firing --- tests/test_streaming.py | 524 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 524 insertions(+) create mode 100644 tests/test_streaming.py diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 00000000..3615c2a9 --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,524 @@ +"""Tests for streaming token delivery infrastructure. + +Tests the unified streaming API call, delta callbacks, tool-call +suppression, provider fallback, and CLI streaming display. +""" +import json +import threading +import uuid +from types import SimpleNamespace +from unittest.mock import MagicMock, patch, PropertyMock + +import pytest + + +# ── Helpers ────────────────────────────────────────────────────────────── + + +def _make_stream_chunk( + content=None, tool_calls=None, finish_reason=None, + model=None, reasoning_content=None, usage=None, +): + """Build a mock streaming chunk matching OpenAI's ChatCompletionChunk shape.""" + delta = SimpleNamespace( + content=content, + tool_calls=tool_calls, + reasoning_content=reasoning_content, + reasoning=None, + ) + choice = SimpleNamespace( + index=0, + delta=delta, + finish_reason=finish_reason, + ) + chunk = SimpleNamespace( + choices=[choice], + model=model, + usage=usage, + ) + return chunk + + +def _make_tool_call_delta(index=0, tc_id=None, name=None, arguments=None): + """Build a mock tool call delta.""" + func = SimpleNamespace(name=name, arguments=arguments) + return SimpleNamespace(index=index, id=tc_id, function=func) + + +def _make_empty_chunk(model=None, usage=None): + """Build a chunk with no choices (usage-only final chunk).""" + return SimpleNamespace(choices=[], model=model, usage=usage) + + +# ── Test: Streaming Accumulator ────────────────────────────────────────── + + +class TestStreamingAccumulator: + """Verify that _interruptible_streaming_api_call accumulates content + and tool calls into a response matching the non-streaming shape.""" + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_text_only_response(self, mock_close, mock_create): + """Text-only stream produces correct response shape.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(content="Hello"), + _make_stream_chunk(content=" world"), + _make_stream_chunk(content="!", finish_reason="stop", model="test-model"), + _make_empty_chunk(usage=SimpleNamespace(prompt_tokens=10, completion_tokens=3)), + ] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "Hello world!" + assert response.choices[0].message.tool_calls is None + assert response.choices[0].finish_reason == "stop" + assert response.usage is not None + assert response.usage.completion_tokens == 3 + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_tool_call_response(self, mock_close, mock_create): + """Tool call stream accumulates ID, name, and arguments.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_123", name="terminal") + ]), + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments='{"command":') + ]), + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments=' "ls"}') + ]), + _make_stream_chunk(finish_reason="tool_calls"), + ] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + tc = response.choices[0].message.tool_calls + assert tc is not None + assert len(tc) == 1 + assert tc[0].id == "call_123" + assert tc[0].function.name == "terminal" + assert tc[0].function.arguments == '{"command": "ls"}' + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_mixed_content_and_tool_calls(self, mock_close, mock_create): + """Stream with both text and tool calls accumulates both.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(content="Let me check"), + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_456", name="web_search") + ]), + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments='{"query": "test"}') + ]), + _make_stream_chunk(finish_reason="tool_calls"), + ] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "Let me check" + assert len(response.choices[0].message.tool_calls) == 1 + + +# ── Test: Streaming Callbacks ──────────────────────────────────────────── + + +class TestStreamingCallbacks: + """Verify that delta callbacks fire correctly.""" + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_deltas_fire_in_order(self, mock_close, mock_create): + """Callbacks receive text deltas in order.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(content="a"), + _make_stream_chunk(content="b"), + _make_stream_chunk(content="c"), + _make_stream_chunk(finish_reason="stop"), + ] + + deltas = [] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + stream_delta_callback=lambda t: deltas.append(t), + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + agent._interruptible_streaming_api_call({}) + + assert deltas == ["a", "b", "c"] + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_on_first_delta_fires_once(self, mock_close, mock_create): + """on_first_delta callback fires exactly once.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(content="a"), + _make_stream_chunk(content="b"), + _make_stream_chunk(finish_reason="stop"), + ] + + first_delta_calls = [] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + agent._interruptible_streaming_api_call( + {}, on_first_delta=lambda: first_delta_calls.append(True) + ) + + assert len(first_delta_calls) == 1 + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_tool_only_does_not_fire_callback(self, mock_close, mock_create): + """Tool-call-only stream does not fire the delta callback.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_789", name="terminal") + ]), + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments='{"command": "ls"}') + ]), + _make_stream_chunk(finish_reason="tool_calls"), + ] + + deltas = [] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + stream_delta_callback=lambda t: deltas.append(t), + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + agent._interruptible_streaming_api_call({}) + + assert deltas == [] + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_text_suppressed_when_tool_calls_present(self, mock_close, mock_create): + """Text deltas are suppressed when tool calls are also in the stream.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(content="thinking..."), + _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_abc", name="read_file") + ]), + _make_stream_chunk(content=" more text"), + _make_stream_chunk(finish_reason="tool_calls"), + ] + + deltas = [] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + stream_delta_callback=lambda t: deltas.append(t), + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + # Text before tool call IS fired (we don't know yet it will have tools) + assert "thinking..." in deltas + # Text after tool call is NOT fired + assert " more text" not in deltas + # But content is still accumulated in the response + assert response.choices[0].message.content == "thinking... more text" + + +# ── Test: Streaming Fallback ──────────────────────────────────────────── + + +class TestStreamingFallback: + """Verify fallback to non-streaming on unsupported providers.""" + + @patch("run_agent.AIAgent._interruptible_api_call") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream): + """'not supported' error triggers fallback to non-streaming.""" + from run_agent import AIAgent + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = Exception( + "Streaming is not supported for this model" + ) + mock_create.return_value = mock_client + + fallback_response = SimpleNamespace( + id="fallback", + model="test", + choices=[SimpleNamespace( + index=0, + message=SimpleNamespace( + role="assistant", + content="fallback response", + tool_calls=None, + reasoning_content=None, + ), + finish_reason="stop", + )], + usage=None, + ) + mock_non_stream.return_value = fallback_response + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "fallback response" + mock_non_stream.assert_called_once() + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_non_stream_error_raises(self, mock_close, mock_create): + """Non-streaming errors propagate normally.""" + from run_agent import AIAgent + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = Exception("Rate limit exceeded") + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + with pytest.raises(Exception, match="Rate limit exceeded"): + agent._interruptible_streaming_api_call({}) + + +# ── Test: Reasoning Streaming ──────────────────────────────────────────── + + +class TestReasoningStreaming: + """Verify reasoning content is accumulated and callback fires.""" + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_reasoning_callback_fires(self, mock_close, mock_create): + """Reasoning deltas fire the reasoning_callback.""" + from run_agent import AIAgent + + chunks = [ + _make_stream_chunk(reasoning_content="Let me think"), + _make_stream_chunk(reasoning_content=" about this"), + _make_stream_chunk(content="The answer is 42"), + _make_stream_chunk(finish_reason="stop"), + ] + + reasoning_deltas = [] + text_deltas = [] + + mock_client = MagicMock() + mock_client.chat.completions.create.return_value = iter(chunks) + mock_create.return_value = mock_client + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + stream_delta_callback=lambda t: text_deltas.append(t), + reasoning_callback=lambda t: reasoning_deltas.append(t), + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert reasoning_deltas == ["Let me think", " about this"] + assert text_deltas == ["The answer is 42"] + assert response.choices[0].message.reasoning_content == "Let me think about this" + assert response.choices[0].message.content == "The answer is 42" + + +# ── Test: _has_stream_consumers ────────────────────────────────────────── + + +class TestHasStreamConsumers: + """Verify _has_stream_consumers() detects registered callbacks.""" + + def test_no_consumers(self): + from run_agent import AIAgent + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + assert agent._has_stream_consumers() is False + + def test_delta_callback_set(self): + from run_agent import AIAgent + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + stream_delta_callback=lambda t: None, + ) + assert agent._has_stream_consumers() is True + + def test_stream_callback_set(self): + from run_agent import AIAgent + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent._stream_callback = lambda t: None + assert agent._has_stream_consumers() is True + + +# ── Test: Codex stream fires callbacks ──────────────────────────────── + + +class TestCodexStreamCallbacks: + """Verify _run_codex_stream fires delta callbacks.""" + + def test_codex_text_delta_fires_callback(self): + from run_agent import AIAgent + + deltas = [] + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + stream_delta_callback=lambda t: deltas.append(t), + ) + agent.api_mode = "codex_responses" + agent._interrupt_requested = False + + # Mock the stream context manager + mock_event_text = SimpleNamespace( + type="response.output_text.delta", + delta="Hello from Codex!", + ) + mock_event_done = SimpleNamespace( + type="response.completed", + delta="", + ) + + mock_stream = MagicMock() + mock_stream.__enter__ = MagicMock(return_value=mock_stream) + mock_stream.__exit__ = MagicMock(return_value=False) + mock_stream.__iter__ = MagicMock(return_value=iter([mock_event_text, mock_event_done])) + mock_stream.get_final_response.return_value = SimpleNamespace( + output=[SimpleNamespace( + type="message", + content=[SimpleNamespace(type="output_text", text="Hello from Codex!")], + )], + status="completed", + ) + + mock_client = MagicMock() + mock_client.responses.stream.return_value = mock_stream + + response = agent._run_codex_stream({}, client=mock_client) + assert "Hello from Codex!" in deltas From ac739e485fea9b423181cad8869cb30666060a6a Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 05:28:10 -0700 Subject: [PATCH 05/15] fix(cli): reasoning tag suppression during streaming + fix fallback detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes two issues found during live testing: 1. Reasoning tag suppression: close tags like that arrive split across stream tokens (e.g. '\n\nHello') were being lost because the buffer was discarded. Fix: keep a sliding window of the tail (max close tag length) so partial tags survive across tokens. 2. Streaming fallback detection was too broad — 'stream' matched any error containing that word (including 'stream_options' rejections). Narrowed to specific phrases: 'streaming is not', 'streaming not support', 'does not support stream', 'not available'. Verified with real API calls: streaming works end-to-end with reasoning block suppression, response box framing, and proper fallback to Rich Panel when streaming isn't active. --- cli.py | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++-- run_agent.py | 7 +++-- 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/cli.py b/cli.py index 79668876..6292993e 100755 --- a/cli.py +++ b/cli.py @@ -1416,12 +1416,86 @@ class HermesCLI: Receives text deltas from the agent as tokens arrive. Buffers partial lines and emits complete lines via _cprint to work reliably with prompt_toolkit's patch_stdout. + + Reasoning/thinking blocks (, , etc.) + are suppressed during streaming since they'd display raw XML tags. + The agent strips them from the final response anyway. """ if not text: return - # Open the response box header on the very first delta + self._stream_started = True + + # ── Tag-based reasoning suppression ── + # Track whether we're inside a reasoning/thinking block. + # These tags are model-generated (system prompt tells the model + # to use them) and get stripped from final_response. We must + # suppress them during streaming too. + _OPEN_TAGS = ("", "", "") + _CLOSE_TAGS = ("", "", "") + + # Append to a pre-filter buffer first + self._stream_prefilt = getattr(self, "_stream_prefilt", "") + text + + # Check if we're entering a reasoning block + if not getattr(self, "_in_reasoning_block", False): + for tag in _OPEN_TAGS: + idx = self._stream_prefilt.find(tag) + if idx != -1: + # Emit everything before the tag + before = self._stream_prefilt[:idx] + if before: + self._emit_stream_text(before) + self._in_reasoning_block = True + self._stream_prefilt = self._stream_prefilt[idx + len(tag):] + break + + # Could also be a partial open tag at the end — hold it back + if not getattr(self, "_in_reasoning_block", False): + # Check for partial tag match at the end + safe = self._stream_prefilt + for tag in _OPEN_TAGS: + for i in range(1, len(tag)): + if self._stream_prefilt.endswith(tag[:i]): + safe = self._stream_prefilt[:-i] + break + if safe: + self._emit_stream_text(safe) + self._stream_prefilt = self._stream_prefilt[len(safe):] + return + + # Inside a reasoning block — look for close tag. + # Keep accumulating _stream_prefilt because close tags can arrive + # split across multiple tokens (e.g. "..."). + if getattr(self, "_in_reasoning_block", False): + for tag in _CLOSE_TAGS: + idx = self._stream_prefilt.find(tag) + if idx != -1: + self._in_reasoning_block = False + after = self._stream_prefilt[idx + len(tag):] + self._stream_prefilt = "" + # Process remaining text after close tag + if after: + self._emit_stream_text(after) + return + # Still inside reasoning block — keep only the tail that could + # be a partial close tag prefix (save memory on long blocks). + max_tag_len = max(len(t) for t in _CLOSE_TAGS) + if len(self._stream_prefilt) > max_tag_len: + self._stream_prefilt = self._stream_prefilt[-max_tag_len:] + return + + def _emit_stream_text(self, text: str) -> None: + """Emit filtered text to the streaming display.""" + if not text: + return + + # Open the response box header on the very first visible text if not self._stream_box_opened: + # Strip leading whitespace/newlines before first visible content + text = text.lstrip("\n") + if not text: + return self._stream_box_opened = True try: from hermes_cli.skin_engine import get_active_skin @@ -1433,7 +1507,6 @@ class HermesCLI: fill = w - 2 - len(label) _cprint(f"\n{_GOLD}╭─{label}{'─' * max(fill - 1, 0)}╮{_RST}") - self._stream_started = True self._stream_buf += text # Emit complete lines, keep partial remainder in buffer @@ -1457,6 +1530,8 @@ class HermesCLI: self._stream_buf = "" self._stream_started = False self._stream_box_opened = False + self._stream_prefilt = "" + self._in_reasoning_block = False def _slow_command_status(self, command: str) -> str: """Return a user-facing status message for slower slash commands.""" diff --git a/run_agent.py b/run_agent.py index 8a93feee..c8c471e3 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3209,10 +3209,13 @@ class AIAgent: result["response"] = _call_chat_completions() except Exception as e: err_text = str(e).lower() - # Fall back to non-streaming if provider doesn't support it + # Fall back to non-streaming if provider doesn't support it. + # Be specific in matching — "stream" alone is too broad and + # catches unrelated errors like "stream_options" rejections. stream_unsupported = any( kw in err_text - for kw in ("stream", "not support", "unsupported", "not available") + for kw in ("streaming is not", "streaming not support", + "does not support stream", "not available") ) if stream_unsupported: logger.info("Streaming not supported by provider, falling back to non-streaming: %s", e) From 5479bb0e0cd76a7a4406f7ee90839ac9950046d1 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 05:52:42 -0700 Subject: [PATCH 06/15] =?UTF-8?q?feat(gateway):=20streaming=20token=20deli?= =?UTF-8?q?very=20=E2=80=94=20StreamingConfig,=20GatewayStreamConsumer,=20?= =?UTF-8?q?already=5Fsent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 3 of streaming support. Gateway now streams tokens to messaging platforms: - StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor) on GatewayConfig with from_dict/to_dict serialization - GatewayStreamConsumer: async queue-based consumer that progressively edits a single message on the target platform (edit transport) - on_delta() → queue → run() async task → send_or_edit() with rate limiting - already_sent propagation: when streaming delivered the response, handler returns None so base adapter skips duplicate send() - stream_delta_callback wired into AIAgent constructor in _run_agent - Consumer lifecycle: started as asyncio task, awaited with timeout in finally Config (config.yaml): streaming: enabled: true transport: edit # progressive editMessageText edit_interval: 0.3 # seconds between edits buffer_threshold: 40 # chars before forcing flush cursor: ' ▉' Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697). --- gateway/config.py | 36 ++++++++ gateway/run.py | 62 +++++++++++++ gateway/stream_consumer.py | 172 +++++++++++++++++++++++++++++++++++++ 3 files changed, 270 insertions(+) create mode 100644 gateway/stream_consumer.py diff --git a/gateway/config.py b/gateway/config.py index af399f0f..676b5214 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -146,6 +146,37 @@ class PlatformConfig: ) +@dataclass +class StreamingConfig: + """Configuration for real-time token streaming to messaging platforms.""" + enabled: bool = True + transport: str = "edit" # "edit" (progressive editMessageText) or "off" + edit_interval: float = 0.3 # Seconds between message edits + buffer_threshold: int = 40 # Chars before forcing an edit + cursor: str = " ▉" # Cursor shown during streaming + + def to_dict(self) -> Dict[str, Any]: + return { + "enabled": self.enabled, + "transport": self.transport, + "edit_interval": self.edit_interval, + "buffer_threshold": self.buffer_threshold, + "cursor": self.cursor, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "StreamingConfig": + if not data: + return cls() + return cls( + enabled=data.get("enabled", True), + transport=data.get("transport", "edit"), + edit_interval=float(data.get("edit_interval", 0.3)), + buffer_threshold=int(data.get("buffer_threshold", 40)), + cursor=data.get("cursor", " ▉"), + ) + + @dataclass class GatewayConfig: """ @@ -179,6 +210,9 @@ class GatewayConfig: # Session isolation in shared chats group_sessions_per_user: bool = True # Isolate group/channel sessions per participant when user IDs are available + # Streaming configuration + streaming: StreamingConfig = field(default_factory=StreamingConfig) + def get_connected_platforms(self) -> List[Platform]: """Return list of platforms that are enabled and configured.""" connected = [] @@ -244,6 +278,7 @@ class GatewayConfig: "always_log_local": self.always_log_local, "stt_enabled": self.stt_enabled, "group_sessions_per_user": self.group_sessions_per_user, + "streaming": self.streaming.to_dict(), } @classmethod @@ -297,6 +332,7 @@ class GatewayConfig: always_log_local=data.get("always_log_local", True), stt_enabled=_coerce_bool(stt_enabled, True), group_sessions_per_user=_coerce_bool(group_sessions_per_user, True), + streaming=StreamingConfig.from_dict(data.get("streaming", {})), ) diff --git a/gateway/run.py b/gateway/run.py index a7e637ec..8bc860c3 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1915,6 +1915,11 @@ class GatewayRunner: if self._should_send_voice_reply(event, response, agent_messages): await self._send_voice_reply(event, response) + # If streaming already delivered the response, return None so + # _process_message_background doesn't send it again. + if agent_result.get("already_sent"): + return None + return response except Exception as e: @@ -4080,6 +4085,7 @@ class GatewayRunner: agent_holder = [None] # Mutable container for the agent instance result_holder = [None] # Mutable container for the result tools_holder = [None] # Mutable container for the tool definitions + stream_consumer_holder = [None] # Mutable container for stream consumer # Bridge sync step_callback → async hooks.emit for agent:step events _loop_for_step = asyncio.get_event_loop() @@ -4142,6 +4148,35 @@ class GatewayRunner: honcho_manager, honcho_config = self._get_or_create_gateway_honcho(session_key) reasoning_config = self._load_reasoning_config() self._reasoning_config = reasoning_config + # Set up streaming consumer if enabled + _stream_consumer = None + _stream_delta_cb = None + _scfg = getattr(getattr(self, 'config', None), 'streaming', None) + if _scfg is None: + from gateway.config import StreamingConfig + _scfg = StreamingConfig() + + if _scfg.enabled and _scfg.transport != "off": + try: + from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig + _adapter = self.adapters.get(source.platform) + if _adapter: + _consumer_cfg = StreamConsumerConfig( + edit_interval=_scfg.edit_interval, + buffer_threshold=_scfg.buffer_threshold, + cursor=_scfg.cursor, + ) + _stream_consumer = GatewayStreamConsumer( + adapter=_adapter, + chat_id=source.chat_id, + config=_consumer_cfg, + metadata={"thread_id": source.thread_id} if source.thread_id else None, + ) + _stream_delta_cb = _stream_consumer.on_delta + stream_consumer_holder[0] = _stream_consumer + except Exception as _sc_err: + logger.debug("Could not set up stream consumer: %s", _sc_err) + agent = AIAgent( model=model, **runtime_kwargs, @@ -4161,6 +4196,7 @@ class GatewayRunner: session_id=session_id, tool_progress_callback=progress_callback if tool_progress_enabled else None, step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None, + stream_delta_callback=_stream_delta_cb, platform=platform_key, honcho_session_key=session_key, honcho_manager=honcho_manager, @@ -4231,6 +4267,10 @@ class GatewayRunner: result = agent.run_conversation(message, conversation_history=agent_history, task_id=session_id) result_holder[0] = result + + # Signal the stream consumer that the agent is done + if _stream_consumer is not None: + _stream_consumer.finish() # Return final response, or a message if something went wrong final_response = result.get("final_response") @@ -4330,6 +4370,11 @@ class GatewayRunner: progress_task = None if tool_progress_enabled: progress_task = asyncio.create_task(send_progress_messages()) + + # Start stream consumer task if configured + stream_task = None + if stream_consumer_holder[0] is not None: + stream_task = asyncio.create_task(stream_consumer_holder[0].run()) # Track this agent as running for this session (for interrupt support) # We do this in a callback after the agent is created @@ -4412,6 +4457,17 @@ class GatewayRunner: if progress_task: progress_task.cancel() interrupt_monitor.cancel() + + # Wait for stream consumer to finish its final edit + if stream_task: + try: + await asyncio.wait_for(stream_task, timeout=5.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + stream_task.cancel() + try: + await stream_task + except asyncio.CancelledError: + pass # Clean up tracking tracking_task.cancel() @@ -4425,6 +4481,12 @@ class GatewayRunner: await task except asyncio.CancelledError: pass + + # If streaming already delivered the response, mark it so the + # caller's send() is skipped (avoiding duplicate messages). + _sc = stream_consumer_holder[0] + if _sc and _sc.already_sent and isinstance(response, dict): + response["already_sent"] = True return response diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py new file mode 100644 index 00000000..f6ab004c --- /dev/null +++ b/gateway/stream_consumer.py @@ -0,0 +1,172 @@ +"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery. + +The agent fires stream_delta_callback(text) synchronously from its worker thread. +GatewayStreamConsumer: + 1. Receives deltas via on_delta() (thread-safe, sync) + 2. Queues them to an asyncio task via queue.Queue + 3. The async run() task buffers, rate-limits, and progressively edits + a single message on the target platform + +Design: Uses the edit transport (send initial message, then editMessageText). +This is universally supported across Telegram, Discord, and Slack. + +Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697). +""" + +from __future__ import annotations + +import asyncio +import logging +import queue +import time +from dataclasses import dataclass +from typing import Any, Optional + +logger = logging.getLogger("gateway.stream_consumer") + +# Sentinel to signal the stream is complete +_DONE = object() + + +@dataclass +class StreamConsumerConfig: + """Runtime config for a single stream consumer instance.""" + edit_interval: float = 0.3 + buffer_threshold: int = 40 + cursor: str = " ▉" + + +class GatewayStreamConsumer: + """Async consumer that progressively edits a platform message with streamed tokens. + + Usage:: + + consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata) + # Pass consumer.on_delta as stream_delta_callback to AIAgent + agent = AIAgent(..., stream_delta_callback=consumer.on_delta) + # Start the consumer as an asyncio task + task = asyncio.create_task(consumer.run()) + # ... run agent in thread pool ... + consumer.finish() # signal completion + await task # wait for final edit + """ + + def __init__( + self, + adapter: Any, + chat_id: str, + config: Optional[StreamConsumerConfig] = None, + metadata: Optional[dict] = None, + ): + self.adapter = adapter + self.chat_id = chat_id + self.cfg = config or StreamConsumerConfig() + self.metadata = metadata + self._queue: queue.Queue = queue.Queue() + self._accumulated = "" + self._message_id: Optional[str] = None + self._already_sent = False + self._last_edit_time = 0.0 + + @property + def already_sent(self) -> bool: + """True if at least one message was sent/edited — signals the base + adapter to skip re-sending the final response.""" + return self._already_sent + + def on_delta(self, text: str) -> None: + """Thread-safe callback — called from the agent's worker thread.""" + if text: + self._queue.put(text) + + def finish(self) -> None: + """Signal that the stream is complete.""" + self._queue.put(_DONE) + + async def run(self) -> None: + """Async task that drains the queue and edits the platform message.""" + try: + while True: + # Drain all available items from the queue + got_done = False + while True: + try: + item = self._queue.get_nowait() + if item is _DONE: + got_done = True + break + self._accumulated += item + except queue.Empty: + break + + # Decide whether to flush an edit + now = time.monotonic() + elapsed = now - self._last_edit_time + should_edit = ( + got_done + or (elapsed >= self.cfg.edit_interval + and len(self._accumulated) > 0) + or len(self._accumulated) >= self.cfg.buffer_threshold + ) + + if should_edit and self._accumulated: + display_text = self._accumulated + if not got_done: + display_text += self.cfg.cursor + + await self._send_or_edit(display_text) + self._last_edit_time = time.monotonic() + + if got_done: + # Final edit without cursor + if self._accumulated and self._message_id: + await self._send_or_edit(self._accumulated) + return + + await asyncio.sleep(0.05) # Small yield to not busy-loop + + except asyncio.CancelledError: + # Best-effort final edit on cancellation + if self._accumulated and self._message_id: + try: + await self._send_or_edit(self._accumulated) + except Exception: + pass + except Exception as e: + logger.error("Stream consumer error: %s", e) + + async def _send_or_edit(self, text: str) -> None: + """Send or edit the streaming message.""" + try: + if self._message_id is not None: + # Edit existing message + result = await self.adapter.edit_message( + chat_id=self.chat_id, + message_id=self._message_id, + content=text, + ) + if result.success: + self._already_sent = True + else: + # Edit failed — try sending as new message + logger.debug("Edit failed, sending new message") + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + metadata=self.metadata, + ) + if result.success and result.message_id: + self._message_id = result.message_id + self._already_sent = True + else: + # First message — send new + result = await self.adapter.send( + chat_id=self.chat_id, + content=text, + metadata=self.metadata, + ) + if result.success and result.message_id: + self._message_id = result.message_id + self._already_sent = True + except Exception as e: + logger.error("Stream send/edit error: %s", e) From 99369b926c1b55bc79d69f219975332a43080764 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 06:15:09 -0700 Subject: [PATCH 07/15] fix: always fall back to non-streaming on ANY streaming error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the fallback only triggered on specific error keywords like 'streaming is not supported'. Many third-party providers have partial or broken streaming — rejecting stream=True, crashing on stream_options, dropping connections mid-stream, returning malformed chunks, etc. Now: any exception during the streaming API call triggers an automatic fallback to the standard non-streaming request path. The error is logged at INFO level for diagnostics but never surfaces to the user. If the fallback also fails, THAT error propagates normally. This ensures streaming is additive — it improves UX when it works but never breaks providers that don't support it. Tests: 2 new (any-error fallback, double-failure propagation), 15 total. --- run_agent.py | 28 +++++++++------------ tests/test_streaming.py | 55 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/run_agent.py b/run_agent.py index c8c471e3..459b8d0e 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3208,23 +3208,17 @@ class AIAgent: else: result["response"] = _call_chat_completions() except Exception as e: - err_text = str(e).lower() - # Fall back to non-streaming if provider doesn't support it. - # Be specific in matching — "stream" alone is too broad and - # catches unrelated errors like "stream_options" rejections. - stream_unsupported = any( - kw in err_text - for kw in ("streaming is not", "streaming not support", - "does not support stream", "not available") - ) - if stream_unsupported: - logger.info("Streaming not supported by provider, falling back to non-streaming: %s", e) - try: - result["response"] = self._interruptible_api_call(api_kwargs) - except Exception as fallback_err: - result["error"] = fallback_err - else: - result["error"] = e + # Always fall back to non-streaming on ANY streaming error. + # Many third-party/extrinsic providers have partial or broken + # streaming support — rejecting stream=True, crashing on + # stream_options, dropping connections mid-stream, etc. + # A clean fallback to the standard request path ensures the + # agent still works even if streaming doesn't. + logger.info("Streaming failed, falling back to non-streaming: %s", e) + try: + result["response"] = self._interruptible_api_call(api_kwargs) + except Exception as fallback_err: + result["error"] = fallback_err finally: request_client = request_client_holder.get("client") if request_client is not None: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 3615c2a9..6cc34d97 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -321,7 +321,7 @@ class TestStreamingCallbacks: class TestStreamingFallback: - """Verify fallback to non-streaming on unsupported providers.""" + """Verify fallback to non-streaming on ANY streaming error.""" @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @@ -367,16 +367,63 @@ class TestStreamingFallback: assert response.choices[0].message.content == "fallback response" mock_non_stream.assert_called_once() + @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_non_stream_error_raises(self, mock_close, mock_create): - """Non-streaming errors propagate normally.""" + def test_any_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream): + """ANY streaming error triggers fallback — not just specific messages.""" from run_agent import AIAgent mock_client = MagicMock() - mock_client.chat.completions.create.side_effect = Exception("Rate limit exceeded") + mock_client.chat.completions.create.side_effect = Exception( + "Connection reset by peer" + ) mock_create.return_value = mock_client + fallback_response = SimpleNamespace( + id="fallback", + model="test", + choices=[SimpleNamespace( + index=0, + message=SimpleNamespace( + role="assistant", + content="fallback after connection error", + tool_calls=None, + reasoning_content=None, + ), + finish_reason="stop", + )], + usage=None, + ) + mock_non_stream.return_value = fallback_response + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "fallback after connection error" + mock_non_stream.assert_called_once() + + @patch("run_agent.AIAgent._interruptible_api_call") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_fallback_error_propagates(self, mock_close, mock_create, mock_non_stream): + """When both streaming AND fallback fail, the fallback error propagates.""" + from run_agent import AIAgent + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = Exception("stream broke") + mock_create.return_value = mock_client + + mock_non_stream.side_effect = Exception("Rate limit exceeded") + agent = AIAgent( model="test/model", quiet_mode=True, From 8e07f9ca560fc0c81d91ccd2702ddd8539201e5e Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 06:35:46 -0700 Subject: [PATCH 08/15] =?UTF-8?q?fix:=20audit=20fixes=20=E2=80=94=205=20bu?= =?UTF-8?q?gs=20found=20and=20resolved?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thorough code review found 5 issues across run_agent.py, cli.py, and gateway/: 1. CRITICAL — Gateway stream consumer task never started: stream_consumer_holder was checked BEFORE run_sync populated it. Fixed with async polling pattern (same as track_agent). 2. MEDIUM-HIGH — Streaming fallback after partial delivery caused double-response: if streaming failed after some tokens were delivered, the fallback would re-deliver the full response. Now tracks deltas_were_sent and only falls back when no tokens reached consumers yet. 3. MEDIUM — Codex mode lost on_first_delta spinner callback: _run_codex_stream now accepts on_first_delta parameter, fires it on first text delta. Passed through from _interruptible_streaming_api_call via _codex_on_first_delta instance attribute. 4. MEDIUM — CLI close-tag after-text bypassed tag filtering: text after a reasoning close tag was sent directly to _emit_stream_text, skipping open-tag detection. Now routes through _stream_delta for full filtering. 5. LOW — Removed 140 lines of dead code: old _streaming_api_call method (superseded by _interruptible_streaming_api_call). Updated 13 tests in test_run_agent.py and test_openai_client_lifecycle.py to use the new method name and signature. 4573 tests passing. --- cli.py | 5 +- gateway/run.py | 15 +- run_agent.py | 188 +++++--------------------- tests/test_openai_client_lifecycle.py | 11 +- tests/test_run_agent.py | 32 +++-- 5 files changed, 75 insertions(+), 176 deletions(-) diff --git a/cli.py b/cli.py index 6292993e..b7203e90 100755 --- a/cli.py +++ b/cli.py @@ -1474,9 +1474,10 @@ class HermesCLI: self._in_reasoning_block = False after = self._stream_prefilt[idx + len(tag):] self._stream_prefilt = "" - # Process remaining text after close tag + # Process remaining text after close tag through full + # filtering (it could contain another open tag) if after: - self._emit_stream_text(after) + self._stream_delta(after) return # Still inside reasoning block — keep only the tail that could # be a partial close tag prefix (save memory on long blocks). diff --git a/gateway/run.py b/gateway/run.py index 8bc860c3..71f453d8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4371,10 +4371,19 @@ class GatewayRunner: if tool_progress_enabled: progress_task = asyncio.create_task(send_progress_messages()) - # Start stream consumer task if configured + # Start stream consumer task — polls for consumer creation since it + # happens inside run_sync (thread pool) after the agent is constructed. stream_task = None - if stream_consumer_holder[0] is not None: - stream_task = asyncio.create_task(stream_consumer_holder[0].run()) + + async def _start_stream_consumer(): + """Wait for the stream consumer to be created, then run it.""" + for _ in range(200): # Up to 10s wait + if stream_consumer_holder[0] is not None: + await stream_consumer_holder[0].run() + return + await asyncio.sleep(0.05) + + stream_task = asyncio.create_task(_start_stream_consumer()) # Track this agent as running for this session (for interrupt support) # We do this in a callback after the agent is created diff --git a/run_agent.py b/run_agent.py index 459b8d0e..a9088732 100644 --- a/run_agent.py +++ b/run_agent.py @@ -2604,7 +2604,7 @@ class AIAgent: def _close_request_openai_client(self, client: Any, *, reason: str) -> None: self._close_openai_client(client, reason=reason, shared=False) - def _run_codex_stream(self, api_kwargs: dict, client: Any = None): + def _run_codex_stream(self, api_kwargs: dict, client: Any = None, on_first_delta: callable = None): """Execute one streaming Responses API request and return the final response.""" active_client = client or self._ensure_primary_openai_client(reason="codex_stream_direct") max_stream_retries = 1 @@ -2623,6 +2623,11 @@ class AIAgent: if delta_text and not has_tool_calls: if not first_delta_fired: first_delta_fired = True + if on_first_delta: + try: + on_first_delta() + except Exception: + pass self._fire_stream_delta(delta_text) # Track tool calls to suppress text streaming elif "function_call" in event_type: @@ -2812,6 +2817,7 @@ class AIAgent: result["response"] = self._run_codex_stream( api_kwargs, client=request_client_holder["client"], + on_first_delta=getattr(self, "_codex_on_first_delta", None), ) elif self.api_mode == "anthropic_messages": result["response"] = self._anthropic_messages_create(api_kwargs) @@ -2853,146 +2859,6 @@ class AIAgent: raise result["error"] return result["response"] - def _streaming_api_call(self, api_kwargs: dict, stream_callback): - """Streaming variant of _interruptible_api_call for voice TTS pipeline. - - Uses ``stream=True`` and forwards content deltas to *stream_callback* - in real-time. Returns a ``SimpleNamespace`` that mimics a normal - ``ChatCompletion`` so the rest of the agent loop works unchanged. - - This method is separate from ``_interruptible_api_call`` to keep the - core agent loop untouched for non-voice users. - """ - result = {"response": None, "error": None} - request_client_holder = {"client": None} - - def _call(): - try: - stream_kwargs = {**api_kwargs, "stream": True} - request_client_holder["client"] = self._create_request_openai_client( - reason="chat_completion_stream_request" - ) - stream = request_client_holder["client"].chat.completions.create(**stream_kwargs) - - content_parts: list[str] = [] - tool_calls_acc: dict[int, dict] = {} - finish_reason = None - model_name = None - role = "assistant" - - for chunk in stream: - if not chunk.choices: - if hasattr(chunk, "model") and chunk.model: - model_name = chunk.model - continue - - delta = chunk.choices[0].delta - if hasattr(chunk, "model") and chunk.model: - model_name = chunk.model - - if delta and delta.content: - content_parts.append(delta.content) - try: - stream_callback(delta.content) - except Exception: - pass - - if delta and delta.tool_calls: - for tc_delta in delta.tool_calls: - idx = tc_delta.index if tc_delta.index is not None else 0 - if idx in tool_calls_acc and tc_delta.id and tc_delta.id != tool_calls_acc[idx]["id"]: - matched = False - for eidx, eentry in tool_calls_acc.items(): - if eentry["id"] == tc_delta.id: - idx = eidx - matched = True - break - if not matched: - idx = (max(k for k in tool_calls_acc if isinstance(k, int)) + 1) if tool_calls_acc else 0 - if idx not in tool_calls_acc: - tool_calls_acc[idx] = { - "id": tc_delta.id or "", - "type": "function", - "function": {"name": "", "arguments": ""}, - } - entry = tool_calls_acc[idx] - if tc_delta.id: - entry["id"] = tc_delta.id - if tc_delta.function: - if tc_delta.function.name: - entry["function"]["name"] += tc_delta.function.name - if tc_delta.function.arguments: - entry["function"]["arguments"] += tc_delta.function.arguments - - if chunk.choices[0].finish_reason: - finish_reason = chunk.choices[0].finish_reason - - full_content = "".join(content_parts) or None - mock_tool_calls = None - if tool_calls_acc: - mock_tool_calls = [] - for idx in sorted(tool_calls_acc): - tc = tool_calls_acc[idx] - mock_tool_calls.append(SimpleNamespace( - id=tc["id"], - type=tc["type"], - function=SimpleNamespace( - name=tc["function"]["name"], - arguments=tc["function"]["arguments"], - ), - )) - - mock_message = SimpleNamespace( - role=role, - content=full_content, - tool_calls=mock_tool_calls, - reasoning_content=None, - ) - mock_choice = SimpleNamespace( - index=0, - message=mock_message, - finish_reason=finish_reason or "stop", - ) - mock_response = SimpleNamespace( - id="stream-" + str(uuid.uuid4()), - model=model_name, - choices=[mock_choice], - usage=None, - ) - result["response"] = mock_response - - except Exception as e: - result["error"] = e - finally: - request_client = request_client_holder.get("client") - if request_client is not None: - self._close_request_openai_client(request_client, reason="stream_request_complete") - - t = threading.Thread(target=_call, daemon=True) - t.start() - while t.is_alive(): - t.join(timeout=0.3) - if self._interrupt_requested: - try: - if self.api_mode == "anthropic_messages": - from agent.anthropic_adapter import build_anthropic_client - - self._anthropic_client.close() - self._anthropic_client = build_anthropic_client( - self._anthropic_api_key, - getattr(self, "_anthropic_base_url", None), - ) - else: - request_client = request_client_holder.get("client") - if request_client is not None: - self._close_request_openai_client(request_client, reason="stream_interrupt_abort") - except Exception: - pass - raise InterruptedError("Agent interrupted during API call") - if result["error"] is not None: - raise result["error"] - return result["response"] - # ── Unified streaming API call ───────────────────────────────────────── def _fire_stream_delta(self, text: str) -> None: @@ -3039,12 +2905,20 @@ class AIAgent: streaming is not supported. """ if self.api_mode == "codex_responses": - # Codex already streams internally; we just need to pass callbacks - return self._interruptible_api_call(api_kwargs) + # Codex streams internally via _run_codex_stream. The main dispatch + # in _interruptible_api_call already calls it; we just need to + # ensure on_first_delta reaches it. Store it on the instance + # temporarily so _run_codex_stream can pick it up. + self._codex_on_first_delta = on_first_delta + try: + return self._interruptible_api_call(api_kwargs) + finally: + self._codex_on_first_delta = None result = {"response": None, "error": None} request_client_holder = {"client": None} first_delta_fired = {"done": False} + deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback) def _fire_first_delta(): if not first_delta_fired["done"] and on_first_delta: @@ -3098,6 +2972,7 @@ class AIAgent: if not tool_calls_acc: _fire_first_delta() self._fire_stream_delta(delta.content) + deltas_were_sent["yes"] = True # Accumulate tool call deltas (silently, no callback) if delta and delta.tool_calls: @@ -3208,17 +3083,22 @@ class AIAgent: else: result["response"] = _call_chat_completions() except Exception as e: - # Always fall back to non-streaming on ANY streaming error. - # Many third-party/extrinsic providers have partial or broken - # streaming support — rejecting stream=True, crashing on - # stream_options, dropping connections mid-stream, etc. - # A clean fallback to the standard request path ensures the - # agent still works even if streaming doesn't. - logger.info("Streaming failed, falling back to non-streaming: %s", e) - try: - result["response"] = self._interruptible_api_call(api_kwargs) - except Exception as fallback_err: - result["error"] = fallback_err + if deltas_were_sent["yes"]: + # Streaming failed AFTER some tokens were already delivered + # to consumers. Don't fall back — that would cause + # double-delivery (partial streamed + full non-streamed). + # Let the error propagate; the partial content already + # reached the user via the stream. + logger.warning("Streaming failed after partial delivery, not falling back: %s", e) + result["error"] = e + else: + # Streaming failed before any tokens reached consumers. + # Safe to fall back to the standard non-streaming path. + logger.info("Streaming failed before delivery, falling back to non-streaming: %s", e) + try: + result["response"] = self._interruptible_api_call(api_kwargs) + except Exception as fallback_err: + result["error"] = fallback_err finally: request_client = request_client_holder.get("client") if request_client is not None: diff --git a/tests/test_openai_client_lifecycle.py b/tests/test_openai_client_lifecycle.py index 69573789..72d92fd1 100644 --- a/tests/test_openai_client_lifecycle.py +++ b/tests/test_openai_client_lifecycle.py @@ -59,8 +59,11 @@ def _build_agent(shared_client=None): agent._interrupt_requested = False agent._interrupt_message = None agent._client_lock = threading.RLock() - agent._client_kwargs = {"api_key": "test-key", "base_url": agent.base_url} + agent._client_kwargs = {"api_key": "***", "base_url": agent.base_url} agent.client = shared_client or FakeSharedClient(lambda **kwargs: {"shared": True}) + agent.stream_delta_callback = None + agent._stream_callback = None + agent.reasoning_callback = None return agent @@ -173,7 +176,11 @@ def test_streaming_call_recreates_closed_shared_client_before_request(monkeypatc monkeypatch.setattr(run_agent, "OpenAI", factory) agent = _build_agent(shared_client=stale_shared) - response = agent._streaming_api_call({"model": agent.model, "messages": []}, lambda _delta: None) + agent.stream_delta_callback = lambda _delta: None + # Force chat_completions mode so the streaming path uses + # chat.completions.create(stream=True) instead of Codex responses.stream() + agent.api_mode = "chat_completions" + response = agent._interruptible_streaming_api_call({"model": agent.model, "messages": []}) assert response.choices[0].message.content == "Hello world" assert agent.client is replacement_shared diff --git a/tests/test_run_agent.py b/tests/test_run_agent.py index 2cc37fc5..cfe8bab2 100644 --- a/tests/test_run_agent.py +++ b/tests/test_run_agent.py @@ -2329,8 +2329,9 @@ class TestStreamingApiCall: ] agent.client.chat.completions.create.return_value = iter(chunks) callback = MagicMock() + agent.stream_delta_callback = callback - resp = agent._streaming_api_call({"messages": []}, callback) + resp = agent._interruptible_streaming_api_call({"messages": []}) assert resp.choices[0].message.content == "Hello World" assert resp.choices[0].finish_reason == "stop" @@ -2347,7 +2348,7 @@ class TestStreamingApiCall: ] agent.client.chat.completions.create.return_value = iter(chunks) - resp = agent._streaming_api_call({"messages": []}, MagicMock()) + resp = agent._interruptible_streaming_api_call({"messages": []}) tc = resp.choices[0].message.tool_calls assert len(tc) == 1 @@ -2363,7 +2364,7 @@ class TestStreamingApiCall: ] agent.client.chat.completions.create.return_value = iter(chunks) - resp = agent._streaming_api_call({"messages": []}, MagicMock()) + resp = agent._interruptible_streaming_api_call({"messages": []}) tc = resp.choices[0].message.tool_calls assert len(tc) == 2 @@ -2378,7 +2379,7 @@ class TestStreamingApiCall: ] agent.client.chat.completions.create.return_value = iter(chunks) - resp = agent._streaming_api_call({"messages": []}, MagicMock()) + resp = agent._interruptible_streaming_api_call({"messages": []}) assert resp.choices[0].message.content == "I'll search" assert len(resp.choices[0].message.tool_calls) == 1 @@ -2387,7 +2388,7 @@ class TestStreamingApiCall: chunks = [_make_chunk(finish_reason="stop")] agent.client.chat.completions.create.return_value = iter(chunks) - resp = agent._streaming_api_call({"messages": []}, MagicMock()) + resp = agent._interruptible_streaming_api_call({"messages": []}) assert resp.choices[0].message.content is None assert resp.choices[0].message.tool_calls is None @@ -2399,9 +2400,9 @@ class TestStreamingApiCall: _make_chunk(finish_reason="stop"), ] agent.client.chat.completions.create.return_value = iter(chunks) - callback = MagicMock(side_effect=ValueError("boom")) + agent.stream_delta_callback = MagicMock(side_effect=ValueError("boom")) - resp = agent._streaming_api_call({"messages": []}, callback) + resp = agent._interruptible_streaming_api_call({"messages": []}) assert resp.choices[0].message.content == "Hello World" @@ -2412,7 +2413,7 @@ class TestStreamingApiCall: ] agent.client.chat.completions.create.return_value = iter(chunks) - resp = agent._streaming_api_call({"messages": []}, MagicMock()) + resp = agent._interruptible_streaming_api_call({"messages": []}) assert resp.model == "gpt-4o" @@ -2420,22 +2421,23 @@ class TestStreamingApiCall: chunks = [_make_chunk(content="x"), _make_chunk(finish_reason="stop")] agent.client.chat.completions.create.return_value = iter(chunks) - agent._streaming_api_call({"messages": [], "model": "test"}, MagicMock()) + agent._interruptible_streaming_api_call({"messages": [], "model": "test"}) call_kwargs = agent.client.chat.completions.create.call_args assert call_kwargs[1].get("stream") is True or call_kwargs.kwargs.get("stream") is True - def test_api_exception_propagated(self, agent): + def test_api_exception_falls_back_to_non_streaming(self, agent): + """When streaming fails before any deltas, fallback to non-streaming is attempted.""" agent.client.chat.completions.create.side_effect = ConnectionError("fail") - + # The fallback also uses the same client, so it'll fail too with pytest.raises(ConnectionError, match="fail"): - agent._streaming_api_call({"messages": []}, MagicMock()) + agent._interruptible_streaming_api_call({"messages": []}) def test_response_has_uuid_id(self, agent): chunks = [_make_chunk(content="x"), _make_chunk(finish_reason="stop")] agent.client.chat.completions.create.return_value = iter(chunks) - resp = agent._streaming_api_call({"messages": []}, MagicMock()) + resp = agent._interruptible_streaming_api_call({"messages": []}) assert resp.id.startswith("stream-") assert len(resp.id) > len("stream-") @@ -2449,7 +2451,7 @@ class TestStreamingApiCall: ] agent.client.chat.completions.create.return_value = iter(chunks) - resp = agent._streaming_api_call({"messages": []}, MagicMock()) + resp = agent._interruptible_streaming_api_call({"messages": []}) assert resp.choices[0].message.content == "Hello" assert resp.model == "gpt-4" @@ -2505,7 +2507,7 @@ class TestAnthropicInterruptHandler: def test_streaming_has_anthropic_branch(self): """_streaming_api_call must also handle Anthropic interrupt.""" import inspect - source = inspect.getsource(AIAgent._streaming_api_call) + source = inspect.getsource(AIAgent._interruptible_streaming_api_call) assert "anthropic_messages" in source, \ "_streaming_api_call must handle Anthropic interrupt" From fc4080c58a4deef04e49d5ead4ccffc5d7170a68 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 07:34:29 -0700 Subject: [PATCH 09/15] fix(cli): add to streaming tag suppression list Anthropic native models emit tags in text content (separate from the SDK's thinking_delta events). Without suppression, these tags leak into the streamed CLI output. Found during live provider testing. --- cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cli.py b/cli.py index b7203e90..2aa18179 100755 --- a/cli.py +++ b/cli.py @@ -1431,8 +1431,8 @@ class HermesCLI: # These tags are model-generated (system prompt tells the model # to use them) and get stripped from final_response. We must # suppress them during streaming too. - _OPEN_TAGS = ("", "", "") - _CLOSE_TAGS = ("", "", "") + _OPEN_TAGS = ("", "", "", "") + _CLOSE_TAGS = ("", "", "", "") # Append to a pre-filter buffer first self._stream_prefilt = getattr(self, "_stream_prefilt", "") + text From c0b88018eb8c95d139ac590cf60da5789cb6ca15 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 07:44:42 -0700 Subject: [PATCH 10/15] =?UTF-8?q?feat:=20ship=20streaming=20disabled=20by?= =?UTF-8?q?=20default=20=E2=80=94=20opt-in=20via=20config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streaming is now off by default for both CLI and gateway. Users opt in: CLI (config.yaml): display: streaming: true Gateway (config.yaml): streaming: enabled: true This lets early adopters test streaming while existing users see zero change. Once we have enough field validation, we flip the default to true in a subsequent release. --- cli.py | 5 ++++- gateway/config.py | 4 ++-- hermes_cli/config.py | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cli.py b/cli.py index 2aa18179..51625fcf 100755 --- a/cli.py +++ b/cli.py @@ -1017,6 +1017,9 @@ class HermesCLI: self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False) self.verbose = verbose if verbose is not None else (self.tool_progress_mode == "verbose") + # streaming: stream tokens to the terminal as they arrive (display.streaming in config.yaml) + self.streaming_enabled = CLI_CONFIG["display"].get("streaming", False) + # Streaming display state self._stream_buf = "" # Partial line buffer for line-buffered rendering self._stream_started = False # True once first delta arrives @@ -1719,7 +1722,7 @@ class HermesCLI: checkpoint_max_snapshots=self.checkpoint_max_snapshots, pass_session_id=self.pass_session_id, tool_progress_callback=self._on_tool_progress, - stream_delta_callback=self._stream_delta, + stream_delta_callback=self._stream_delta if self.streaming_enabled else None, ) # Apply any pending title now that the session exists in the DB if self._pending_title and self._session_db: diff --git a/gateway/config.py b/gateway/config.py index 676b5214..0b01ed26 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -149,7 +149,7 @@ class PlatformConfig: @dataclass class StreamingConfig: """Configuration for real-time token streaming to messaging platforms.""" - enabled: bool = True + enabled: bool = False transport: str = "edit" # "edit" (progressive editMessageText) or "off" edit_interval: float = 0.3 # Seconds between message edits buffer_threshold: int = 40 # Chars before forcing an edit @@ -169,7 +169,7 @@ class StreamingConfig: if not data: return cls() return cls( - enabled=data.get("enabled", True), + enabled=data.get("enabled", False), transport=data.get("transport", "edit"), edit_interval=float(data.get("edit_interval", 0.3)), buffer_threshold=int(data.get("buffer_threshold", 40)), diff --git a/hermes_cli/config.py b/hermes_cli/config.py index f7813130..9c07153d 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -202,6 +202,7 @@ DEFAULT_CONFIG = { "resume_display": "full", "bell_on_complete": False, "show_reasoning": False, + "streaming": False, "skin": "default", }, From 23b9d88a763c33c080c404246967c4e1b50b901e Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 07:53:08 -0700 Subject: [PATCH 11/15] docs: add streaming config to cli-config.yaml.example and defaults Documents the new streaming options in the example config: - display.streaming for CLI (under display section) - streaming.enabled + transport/interval/threshold/cursor for gateway - Added streaming: false to load_cli_config() defaults dict --- cli-config.yaml.example | 19 +++++++++++++++++++ cli.py | 1 + 2 files changed, 20 insertions(+) diff --git a/cli-config.yaml.example b/cli-config.yaml.example index 7bc2c490..ea5ba6f8 100644 --- a/cli-config.yaml.example +++ b/cli-config.yaml.example @@ -339,6 +339,19 @@ session_reset: # explicitly want one shared "room brain" per group/channel. group_sessions_per_user: true +# ───────────────────────────────────────────────────────────────────────────── +# Gateway Streaming +# ───────────────────────────────────────────────────────────────────────────── +# Stream tokens to messaging platforms in real-time. The bot sends a message +# on first token, then progressively edits it as more tokens arrive. +# Disabled by default — enable to try the streaming UX on Telegram/Discord/Slack. +streaming: + enabled: false + # transport: edit # "edit" = progressive editMessageText + # edit_interval: 0.3 # seconds between message edits + # buffer_threshold: 40 # chars before forcing an edit flush + # cursor: " ▉" # cursor shown during streaming + # ============================================================================= # Skills Configuration # ============================================================================= @@ -700,6 +713,12 @@ display: # Toggle at runtime with /reasoning show or /reasoning hide. show_reasoning: false + # Stream tokens to the terminal as they arrive instead of waiting for the + # full response. The response box opens on first token and text appears + # line-by-line. Tool calls are still captured silently. + # Disabled by default — enable to try the streaming UX. + streaming: false + # ─────────────────────────────────────────────────────────────────────────── # Skin / Theme # ─────────────────────────────────────────────────────────────────────────── diff --git a/cli.py b/cli.py index 51625fcf..4256ac20 100755 --- a/cli.py +++ b/cli.py @@ -203,6 +203,7 @@ def load_cli_config() -> Dict[str, Any]: "compact": False, "resume_display": "full", "show_reasoning": False, + "streaming": False, "skin": "default", }, "clarify": { From d3687d3e817eaf98f983e8bd6358e825074340c9 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 10:22:44 -0700 Subject: [PATCH 12/15] docs: document planned live reasoning token display as future enhancement The streaming infrastructure already fires reasoning deltas via _fire_reasoning_delta() during streaming. The remaining work is the CLI display layer: a dim reasoning box that opens on first reasoning token, streams live, then transitions to the response box. Reference: PR #1214 (raulvidis) for gateway reasoning visibility. --- cli.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cli.py b/cli.py index 4256ac20..b02221ec 100755 --- a/cli.py +++ b/cli.py @@ -1413,6 +1413,15 @@ class HermesCLI: self._invalidate() # ── Streaming display ──────────────────────────────────────────────── + # + # Future: When display.show_reasoning is also enabled, stream reasoning + # tokens into a dim box above the response (like the existing static + # reasoning display, but live). The infrastructure exists — reasoning + # deltas fire via _fire_reasoning_delta() during streaming. The display + # layer needs: a dim reasoning box that opens on first reasoning token, + # accumulates live, then transitions to the response box when content + # tokens start arriving. See PR #1214 (raulvidis) for gateway-side + # reasoning visibility modes as a reference implementation. def _stream_delta(self, text: str) -> None: """Line-buffered streaming callback for real-time token rendering. From 942950f5b9aaba74a60c7d400b839f70807d3696 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 10:29:55 -0700 Subject: [PATCH 13/15] =?UTF-8?q?feat(cli):=20live=20reasoning=20token=20s?= =?UTF-8?q?treaming=20=E2=80=94=20dim=20box=20above=20response?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When both display.streaming and display.show_reasoning are enabled, reasoning tokens stream in real-time into a dim bordered box. When content tokens start arriving, the reasoning box closes and the response box opens — smooth visual transition. - _stream_reasoning_delta(): line-buffered rendering in dim text - _close_reasoning_box(): flush + close, called on first content token - Reasoning callback routes to streaming version when both flags set - Skips static post-response reasoning display when streamed live - State reset per turn via _reset_stream_state() Works with reasoning_content deltas (OpenRouter reasoning mode) and thinking_delta events (Anthropic extended thinking). --- cli.py | 65 +++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/cli.py b/cli.py index b02221ec..c24b67e0 100755 --- a/cli.py +++ b/cli.py @@ -1413,15 +1413,43 @@ class HermesCLI: self._invalidate() # ── Streaming display ──────────────────────────────────────────────── - # - # Future: When display.show_reasoning is also enabled, stream reasoning - # tokens into a dim box above the response (like the existing static - # reasoning display, but live). The infrastructure exists — reasoning - # deltas fire via _fire_reasoning_delta() during streaming. The display - # layer needs: a dim reasoning box that opens on first reasoning token, - # accumulates live, then transitions to the response box when content - # tokens start arriving. See PR #1214 (raulvidis) for gateway-side - # reasoning visibility modes as a reference implementation. + + def _stream_reasoning_delta(self, text: str) -> None: + """Stream reasoning/thinking tokens into a dim box above the response. + + Opens a dim reasoning box on first token, streams line-by-line. + The box is closed automatically when content tokens start arriving + (via _stream_delta → _emit_stream_text). + """ + if not text: + return + + # Open reasoning box on first reasoning token + if not getattr(self, "_reasoning_box_opened", False): + self._reasoning_box_opened = True + w = shutil.get_terminal_size().columns + r_label = " Reasoning " + r_fill = w - 2 - len(r_label) + _cprint(f"\n{_DIM}┌─{r_label}{'─' * max(r_fill - 1, 0)}┐{_RST}") + + self._reasoning_buf = getattr(self, "_reasoning_buf", "") + text + + # Emit complete lines + while "\n" in self._reasoning_buf: + line, self._reasoning_buf = self._reasoning_buf.split("\n", 1) + _cprint(f"{_DIM}{line}{_RST}") + + def _close_reasoning_box(self) -> None: + """Close the live reasoning box if it's open.""" + if getattr(self, "_reasoning_box_opened", False): + # Flush remaining reasoning buffer + buf = getattr(self, "_reasoning_buf", "") + if buf: + _cprint(f"{_DIM}{buf}{_RST}") + self._reasoning_buf = "" + w = shutil.get_terminal_size().columns + _cprint(f"{_DIM}└{'─' * (w - 2)}┘{_RST}") + self._reasoning_box_opened = False def _stream_delta(self, text: str) -> None: """Line-buffered streaming callback for real-time token rendering. @@ -1504,6 +1532,9 @@ class HermesCLI: if not text: return + # Close the live reasoning box before opening the response box + self._close_reasoning_box() + # Open the response box header on the very first visible text if not self._stream_box_opened: # Strip leading whitespace/newlines before first visible content @@ -1530,6 +1561,9 @@ class HermesCLI: def _flush_stream(self) -> None: """Emit any remaining partial line from the stream buffer and close the box.""" + # Close reasoning box if still open (in case no content tokens arrived) + self._close_reasoning_box() + if self._stream_buf: _cprint(self._stream_buf) self._stream_buf = "" @@ -1546,6 +1580,8 @@ class HermesCLI: self._stream_box_opened = False self._stream_prefilt = "" self._in_reasoning_block = False + self._reasoning_box_opened = False + self._reasoning_buf = "" def _slow_command_status(self, command: str) -> str: """Return a user-facing status message for slower slash commands.""" @@ -1724,7 +1760,11 @@ class HermesCLI: platform="cli", session_db=self._session_db, clarify_callback=self._clarify_callback, - reasoning_callback=self._on_reasoning if (self.show_reasoning or self.verbose) else None, + reasoning_callback=( + self._stream_reasoning_delta if (self.streaming_enabled and self.show_reasoning) + else self._on_reasoning if (self.show_reasoning or self.verbose) + else None + ), honcho_session_key=None, # resolved by run_agent via config sessions map / title fallback_model=self._fallback_model, thinking_callback=self._on_thinking, @@ -4935,8 +4975,9 @@ class HermesCLI: response_previewed = result.get("response_previewed", False) if result else False - # Display reasoning (thinking) box if enabled and available - if self.show_reasoning and result: + # Display reasoning (thinking) box if enabled and available. + # Skip when streaming already showed reasoning live. + if self.show_reasoning and result and not self._stream_started: reasoning = result.get("last_reasoning") if reasoning: w = shutil.get_terminal_size().columns From 25a1f1867fa9fbe6732e8c246934955ca54f8e61 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 12:41:28 -0700 Subject: [PATCH 14/15] fix(gateway): prevent message flooding on adapters without edit support When the stream consumer's first edit_message() call fails (Signal, Email, HomeAssistant don't support editing), it now disables editing for the rest of the stream instead of falling back to sending a new message every 0.3 seconds. The final response is delivered by the normal send path since already_sent stays false. Without this fix, enabling gateway streaming on Signal/Email/HA would flood the chat with dozens of partial messages. --- gateway/stream_consumer.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index f6ab004c..42d9dd70 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -66,6 +66,7 @@ class GatewayStreamConsumer: self._accumulated = "" self._message_id: Optional[str] = None self._already_sent = False + self._edit_supported = True # Disabled on first edit failure (Signal/Email/HA) self._last_edit_time = 0.0 @property @@ -139,25 +140,26 @@ class GatewayStreamConsumer: """Send or edit the streaming message.""" try: if self._message_id is not None: - # Edit existing message - result = await self.adapter.edit_message( - chat_id=self.chat_id, - message_id=self._message_id, - content=text, - ) - if result.success: - self._already_sent = True - else: - # Edit failed — try sending as new message - logger.debug("Edit failed, sending new message") - result = await self.adapter.send( + if self._edit_supported: + # Edit existing message + result = await self.adapter.edit_message( chat_id=self.chat_id, + message_id=self._message_id, content=text, - metadata=self.metadata, ) - if result.success and result.message_id: - self._message_id = result.message_id + if result.success: self._already_sent = True + else: + # Edit not supported by this adapter — stop streaming, + # let the normal send path handle the final response. + # Without this guard, adapters like Signal/Email would + # flood the chat with a new message every edit_interval. + logger.debug("Edit failed, disabling streaming for this adapter") + self._edit_supported = False + else: + # Editing not supported — skip intermediate updates. + # The final response will be sent by the normal path. + pass else: # First message — send new result = await self.adapter.send( @@ -168,5 +170,8 @@ class GatewayStreamConsumer: if result.success and result.message_id: self._message_id = result.message_id self._already_sent = True + else: + # Initial send failed — disable streaming for this session + self._edit_supported = False except Exception as e: logger.error("Stream send/edit error: %s", e) From 8feb9e4656ab12458ad68a0a433b79bac42a6adb Mon Sep 17 00:00:00 2001 From: teknium1 Date: Mon, 16 Mar 2026 12:53:49 -0700 Subject: [PATCH 15/15] docs: add streaming section to configuration guide --- website/docs/user-guide/configuration.md | 31 ++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index 9a673bc7..97cb9f0b 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -832,6 +832,7 @@ display: resume_display: full # full (show previous messages on resume) | minimal (one-liner only) bell_on_complete: false # Play terminal bell when agent finishes (great for long tasks) show_reasoning: false # Show model reasoning/thinking above each response (toggle with /reasoning show|hide) + streaming: false # Stream tokens to terminal as they arrive (real-time output) background_process_notifications: all # all | result | error | off (gateway only) ``` @@ -884,6 +885,36 @@ voice: Use `/voice on` in the CLI to enable microphone mode, `record_key` to start/stop recording, and `/voice tts` to toggle spoken replies. See [Voice Mode](/docs/user-guide/features/voice-mode) for end-to-end setup and platform-specific behavior. +## Streaming + +Stream tokens to the terminal or messaging platforms as they arrive, instead of waiting for the full response. + +### CLI Streaming + +```yaml +display: + streaming: true # Stream tokens to terminal in real-time + show_reasoning: true # Also stream reasoning/thinking tokens (optional) +``` + +When enabled, responses appear token-by-token inside a streaming box. Tool calls are still captured silently. If the provider doesn't support streaming, it falls back to the normal display automatically. + +### Gateway Streaming (Telegram, Discord, Slack) + +```yaml +streaming: + enabled: true # Enable progressive message editing + edit_interval: 0.3 # Seconds between message edits + buffer_threshold: 40 # Characters before forcing an edit flush + cursor: " ▉" # Cursor shown during streaming +``` + +When enabled, the bot sends a message on the first token, then progressively edits it as more tokens arrive. Platforms that don't support message editing (Signal, Email) gracefully skip streaming and deliver the final response normally. + +:::note +Streaming is disabled by default. Enable it in `~/.hermes/config.yaml` to try the streaming UX. +::: + ## Group Chat Session Isolation Control whether shared chats keep one conversation per room or one conversation per participant: