From 179d9e1a22709a6475d931cb4827abc97bd6ca02 Mon Sep 17 00:00:00 2001
From: 0xbyt4 <35742124+0xbyt4@users.noreply.github.com>
Date: Tue, 3 Mar 2026 23:03:42 +0300
Subject: [PATCH] feat: add streaming sentence-by-sentence TTS via ElevenLabs
Stream audio to speaker as the agent generates tokens instead of
waiting for the full response. First sentence plays within ~1-2s
of agent starting to respond.
- run_agent: add stream_callback to run_conversation/chat, streaming
path in _interruptible_api_call accumulates chunks into mock
ChatCompletion while forwarding content deltas to callback
- tts_tool: add stream_tts_to_speaker() with sentence buffering,
think block filtering, markdown stripping, ElevenLabs pcm_24000
streaming to sounddevice OutputStream
- cli: wire up streaming TTS pipeline in chat(), detect elevenlabs
provider + sounddevice availability, skip batch TTS when streaming
is active, signal stop on interrupt
Falls back to batch TTS for Edge/OpenAI providers or when
elevenlabs/sounddevice are not available. Zero impact on non-voice
mode (callback defaults to None).
---
cli.py | 70 ++++++++++++--
run_agent.py | 127 +++++++++++++++++++++++--
tools/tts_tool.py | 231 ++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 410 insertions(+), 18 deletions(-)
diff --git a/cli.py b/cli.py
index 5e4f5c7d..230d1e9f 100755
--- a/cli.py
+++ b/cli.py
@@ -4093,19 +4093,60 @@ class HermesCLI:
try:
# Run the conversation with interrupt monitoring
result = None
-
+
+ # --- Streaming TTS setup ---
+ # When ElevenLabs is the TTS provider and sounddevice is available,
+ # we stream audio sentence-by-sentence as the agent generates tokens
+ # instead of waiting for the full response.
+ use_streaming_tts = False
+ text_queue = None
+ tts_thread = None
+ stream_callback = None
+ stop_event = None
+
+ if self._voice_tts:
+ try:
+ from tools.tts_tool import (
+ _load_tts_config as _load_tts_cfg,
+ _get_provider as _get_prov,
+ _HAS_ELEVENLABS as _el_ok,
+ _HAS_AUDIO as _audio_ok,
+ stream_tts_to_speaker,
+ )
+ _tts_cfg = _load_tts_cfg()
+ if (_get_prov(_tts_cfg) == "elevenlabs" and _el_ok and _audio_ok):
+ use_streaming_tts = True
+ except Exception:
+ pass
+
+ if use_streaming_tts:
+ text_queue = queue.Queue()
+ stop_event = threading.Event()
+
+ tts_thread = threading.Thread(
+ target=stream_tts_to_speaker,
+ args=(text_queue, stop_event, self._voice_tts_done),
+ daemon=True,
+ )
+ tts_thread.start()
+
+ def stream_callback(delta: str):
+ if text_queue is not None:
+ text_queue.put(delta)
+
def run_agent():
nonlocal result
result = self.agent.run_conversation(
user_message=message,
conversation_history=self.conversation_history[:-1], # Exclude the message we just added
+ stream_callback=stream_callback,
task_id=self.session_id,
)
-
+
# Start agent in background thread
agent_thread = threading.Thread(target=run_agent)
agent_thread.start()
-
+
# Monitor the dedicated interrupt queue while the agent runs.
# _interrupt_queue is separate from _pending_input, so process_loop
# and chat() never compete for the same queue.
@@ -4124,6 +4165,9 @@ class HermesCLI:
if self._clarify_state or self._clarify_freetext:
continue
print(f"\n⚡ New message detected, interrupting...")
+ # Signal TTS to stop on interrupt
+ if stop_event is not None:
+ stop_event.set()
self.agent.interrupt(interrupt_msg)
# Debug: log to file (stdout may be devnull from redirect_stdout)
try:
@@ -4143,9 +4187,15 @@ class HermesCLI:
else:
# Fallback for non-interactive mode (e.g., single-query)
agent_thread.join(0.1)
-
+
agent_thread.join() # Ensure agent thread completes
+ # Signal end-of-text to TTS consumer and wait for it to finish
+ if use_streaming_tts and text_queue is not None:
+ text_queue.put(None) # sentinel
+ if tts_thread is not None:
+ tts_thread.join(timeout=120)
+
# Drain any remaining agent output still in the StdoutProxy
# buffer so tool/status lines render ABOVE our response box.
# The flush pushes data into the renderer queue; the short
@@ -4156,15 +4206,15 @@ class HermesCLI:
# Update history with full conversation
self.conversation_history = result.get("messages", self.conversation_history) if result else self.conversation_history
-
+
# Get the final response
response = result.get("final_response", "") if result else ""
-
+
# Handle failed results (e.g., non-retryable errors like invalid model)
if result and result.get("failed") and not response:
error_detail = result.get("error", "Unknown error")
response = f"Error: {error_detail}"
-
+
# Handle interrupt - check if we were interrupted
pending_message = None
if result and result.get("interrupted"):
@@ -4172,8 +4222,9 @@ class HermesCLI:
# Add indicator that we were interrupted
if response and pending_message:
response = response + "\n\n---\n_[Interrupted - processing new message]_"
-
+
response_previewed = result.get("response_previewed", False) if result else False
+
# Display reasoning (thinking) box if enabled and available
if self.show_reasoning and result:
reasoning = result.get("last_reasoning")
@@ -4226,7 +4277,8 @@ class HermesCLI:
sys.stdout.flush()
# Speak response aloud if voice TTS is enabled
- if self._voice_tts and response:
+ # Skip batch TTS when streaming TTS already handled it
+ if self._voice_tts and response and not use_streaming_tts:
threading.Thread(
target=self._voice_speak_response,
args=(response,),
diff --git a/run_agent.py b/run_agent.py
index ba214b71..6dd08436 100644
--- a/run_agent.py
+++ b/run_agent.py
@@ -2576,10 +2576,16 @@ class AIAgent:
"""
Run the API call in a background thread so the main conversation loop
can detect interrupts without waiting for the full HTTP round-trip.
-
+
On interrupt, closes the HTTP client to cancel the in-flight request
(stops token generation and avoids wasting money), then rebuilds the
client for future calls.
+
+ When ``self._stream_callback`` is set (streaming TTS mode), the call
+ uses ``stream=True`` and iterates over chunks inside the background
+ thread. Content deltas are forwarded to the callback in real-time
+ while the full response is accumulated and returned as a
+ ``SimpleNamespace`` that mimics a normal ``ChatCompletion``.
"""
result = {"response": None, "error": None}
@@ -2587,10 +2593,103 @@ class AIAgent:
try:
if self.api_mode == "codex_responses":
result["response"] = self._run_codex_stream(api_kwargs)
+ return
elif self.api_mode == "anthropic_messages":
result["response"] = self._anthropic_client.messages.create(**api_kwargs)
- else:
+ return
+
+ cb = getattr(self, "_stream_callback", None)
+ if cb is None:
+ # Non-streaming path (default)
result["response"] = self.client.chat.completions.create(**api_kwargs)
+ return
+
+ # --- Streaming path for TTS pipeline ---
+ stream_kwargs = {**api_kwargs, "stream": True}
+ stream = self.client.chat.completions.create(**stream_kwargs)
+
+ content_parts: list[str] = []
+ tool_calls_acc: dict[int, dict] = {} # index -> {id, type, function:{name, arguments}}
+ finish_reason = None
+ model_name = None
+ role = "assistant"
+
+ for chunk in stream:
+ if not chunk.choices:
+ # Usage-only or empty chunk
+ if hasattr(chunk, "model") and chunk.model:
+ model_name = chunk.model
+ continue
+
+ delta = chunk.choices[0].delta
+ if hasattr(chunk, "model") and chunk.model:
+ model_name = chunk.model
+
+ # Content delta
+ if delta and delta.content:
+ content_parts.append(delta.content)
+ try:
+ cb(delta.content)
+ except Exception:
+ pass
+
+ # Tool call deltas
+ if delta and delta.tool_calls:
+ for tc_delta in delta.tool_calls:
+ idx = tc_delta.index
+ if idx not in tool_calls_acc:
+ tool_calls_acc[idx] = {
+ "id": tc_delta.id or "",
+ "type": "function",
+ "function": {"name": "", "arguments": ""},
+ }
+ entry = tool_calls_acc[idx]
+ if tc_delta.id:
+ entry["id"] = tc_delta.id
+ if tc_delta.function:
+ if tc_delta.function.name:
+ entry["function"]["name"] += tc_delta.function.name
+ if tc_delta.function.arguments:
+ entry["function"]["arguments"] += tc_delta.function.arguments
+
+ if chunk.choices[0].finish_reason:
+ finish_reason = chunk.choices[0].finish_reason
+
+ # Build a mock ChatCompletion matching the non-streaming interface
+ full_content = "".join(content_parts) or None
+ mock_tool_calls = None
+ if tool_calls_acc:
+ mock_tool_calls = []
+ for idx in sorted(tool_calls_acc):
+ tc = tool_calls_acc[idx]
+ mock_tool_calls.append(SimpleNamespace(
+ id=tc["id"],
+ type=tc["type"],
+ function=SimpleNamespace(
+ name=tc["function"]["name"],
+ arguments=tc["function"]["arguments"],
+ ),
+ ))
+
+ mock_message = SimpleNamespace(
+ role=role,
+ content=full_content,
+ tool_calls=mock_tool_calls,
+ reasoning_content=None,
+ )
+ mock_choice = SimpleNamespace(
+ index=0,
+ message=mock_message,
+ finish_reason=finish_reason or "stop",
+ )
+ mock_response = SimpleNamespace(
+ id="stream-" + str(uuid.uuid4()),
+ model=model_name,
+ choices=[mock_choice],
+ usage=None,
+ )
+ result["response"] = mock_response
+
except Exception as e:
result["error"] = e
@@ -3915,7 +4014,8 @@ class AIAgent:
user_message: str,
system_message: str = None,
conversation_history: List[Dict[str, Any]] = None,
- task_id: str = None
+ task_id: str = None,
+ stream_callback: Optional[callable] = None,
) -> Dict[str, Any]:
"""
Run a complete conversation with tool calling until completion.
@@ -3925,6 +4025,9 @@ class AIAgent:
system_message (str): Custom system message (optional, overrides ephemeral_system_prompt if provided)
conversation_history (List[Dict]): Previous conversation messages (optional)
task_id (str): Unique identifier for this task to isolate VMs between concurrent tasks (optional, auto-generated if not provided)
+ stream_callback: Optional callback invoked with each text delta during streaming.
+ Used by the TTS pipeline to start audio generation before the full response.
+ When None (default), API calls use the standard non-streaming path.
Returns:
Dict: Complete conversation result with final response and message history
@@ -3933,6 +4036,8 @@ class AIAgent:
# Installed once, transparent when streams are healthy, prevents crash on write.
_install_safe_stdio()
+ # Store stream callback for _interruptible_api_call to pick up
+ self._stream_callback = stream_callback
# Generate unique task_id if not provided to isolate VMs between concurrent tasks
effective_task_id = task_id or str(uuid.uuid4())
@@ -5377,20 +5482,24 @@ class AIAgent:
# Clear interrupt state after handling
self.clear_interrupt()
-
+
+ # Clear stream callback so it doesn't leak into future calls
+ self._stream_callback = None
+
return result
-
- def chat(self, message: str) -> str:
+
+ def chat(self, message: str, stream_callback: Optional[callable] = None) -> str:
"""
Simple chat interface that returns just the final response.
-
+
Args:
message (str): User message
-
+ stream_callback: Optional callback invoked with each text delta during streaming.
+
Returns:
str: Final assistant response
"""
- result = self.run_conversation(message)
+ result = self.run_conversation(message, stream_callback=stream_callback)
return result["final_response"]
diff --git a/tools/tts_tool.py b/tools/tts_tool.py
index 3544b20f..358bd6f1 100644
--- a/tools/tts_tool.py
+++ b/tools/tts_tool.py
@@ -25,9 +25,12 @@ import datetime
import json
import logging
import os
+import queue
+import re
import shutil
import subprocess
import tempfile
+import threading
from pathlib import Path
from typing import Dict, Any, Optional
@@ -55,6 +58,13 @@ try:
except ImportError:
_HAS_OPENAI = False
+try:
+ import sounddevice as sd
+ _HAS_AUDIO = True
+except ImportError:
+ sd = None # type: ignore[assignment]
+ _HAS_AUDIO = False
+
# ===========================================================================
# Defaults
@@ -63,6 +73,7 @@ DEFAULT_PROVIDER = "edge"
DEFAULT_EDGE_VOICE = "en-US-AriaNeural"
DEFAULT_ELEVENLABS_VOICE_ID = "pNInz6obpgDQGcFmaJgB" # Adam
DEFAULT_ELEVENLABS_MODEL_ID = "eleven_multilingual_v2"
+DEFAULT_ELEVENLABS_STREAMING_MODEL_ID = "eleven_flash_v2_5"
DEFAULT_OPENAI_MODEL = "gpt-4o-mini-tts"
DEFAULT_OPENAI_VOICE = "alloy"
DEFAULT_OUTPUT_DIR = str(Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "audio_cache")
@@ -420,6 +431,226 @@ def check_tts_requirements() -> bool:
return False
+# ===========================================================================
+# Streaming TTS: sentence-by-sentence pipeline for ElevenLabs
+# ===========================================================================
+# Sentence boundary pattern: punctuation followed by space or newline
+_SENTENCE_BOUNDARY_RE = re.compile(r'(?<=[.!?])(?:\s|\n)|(?:\n\n)')
+
+# Markdown stripping patterns (same as cli.py _voice_speak_response)
+_MD_CODE_BLOCK = re.compile(r'```[\s\S]*?```')
+_MD_LINK = re.compile(r'\[([^\]]+)\]\([^)]+\)')
+_MD_URL = re.compile(r'https?://\S+')
+_MD_BOLD = re.compile(r'\*\*(.+?)\*\*')
+_MD_ITALIC = re.compile(r'\*(.+?)\*')
+_MD_INLINE_CODE = re.compile(r'`(.+?)`')
+_MD_HEADER = re.compile(r'^#+\s*', flags=re.MULTILINE)
+_MD_LIST_ITEM = re.compile(r'^\s*[-*]\s+', flags=re.MULTILINE)
+_MD_HR = re.compile(r'---+')
+_MD_EXCESS_NL = re.compile(r'\n{3,}')
+
+
+def _strip_markdown_for_tts(text: str) -> str:
+ """Remove markdown formatting that shouldn't be spoken aloud."""
+ text = _MD_CODE_BLOCK.sub(' ', text)
+ text = _MD_LINK.sub(r'\1', text)
+ text = _MD_URL.sub('', text)
+ text = _MD_BOLD.sub(r'\1', text)
+ text = _MD_ITALIC.sub(r'\1', text)
+ text = _MD_INLINE_CODE.sub(r'\1', text)
+ text = _MD_HEADER.sub('', text)
+ text = _MD_LIST_ITEM.sub('', text)
+ text = _MD_HR.sub('', text)
+ text = _MD_EXCESS_NL.sub('\n\n', text)
+ return text.strip()
+
+
+def stream_tts_to_speaker(
+ text_queue: queue.Queue,
+ stop_event: threading.Event,
+ tts_done_event: threading.Event,
+):
+ """Consume text deltas from *text_queue*, buffer them into sentences,
+ and stream each sentence through ElevenLabs TTS to the speaker in
+ real-time.
+
+ Protocol:
+ * The producer puts ``str`` deltas onto *text_queue*.
+ * A ``None`` sentinel signals end-of-text (flush remaining buffer).
+ * *stop_event* can be set to abort early (e.g. user interrupt).
+ * *tts_done_event* is **set** in the ``finally`` block so callers
+ waiting on it (continuous voice mode) know playback is finished.
+ """
+ tts_done_event.clear()
+
+ try:
+ tts_config = _load_tts_config()
+ el_config = tts_config.get("elevenlabs", {})
+ voice_id = el_config.get("voice_id", DEFAULT_ELEVENLABS_VOICE_ID)
+ model_id = el_config.get("streaming_model_id",
+ el_config.get("model_id", DEFAULT_ELEVENLABS_STREAMING_MODEL_ID))
+
+ api_key = os.getenv("ELEVENLABS_API_KEY", "")
+ if not api_key:
+ logger.warning("ELEVENLABS_API_KEY not set; streaming TTS disabled")
+ return
+
+ client = ElevenLabs(api_key=api_key)
+
+ # Open a single sounddevice output stream for the lifetime of
+ # this function. ElevenLabs pcm_24000 produces signed 16-bit
+ # little-endian mono PCM at 24 kHz.
+ use_sd = _HAS_AUDIO and sd is not None
+ output_stream = None
+ if use_sd:
+ try:
+ import numpy as _np
+ output_stream = sd.OutputStream(
+ samplerate=24000, channels=1, dtype="int16",
+ )
+ output_stream.start()
+ except Exception as exc:
+ logger.warning("sounddevice OutputStream failed: %s", exc)
+ output_stream = None
+
+ sentence_buf = ""
+ in_think = False # track <think>...</think> blocks
+ min_sentence_len = 20
+ long_flush_len = 100
+ queue_timeout = 0.5
+
+ def _speak_sentence(sentence: str):
+ """Generate and play audio for a single sentence."""
+ if stop_event.is_set():
+ return
+ cleaned = _strip_markdown_for_tts(sentence).strip()
+ if not cleaned:
+ return
+ # Truncate very long sentences
+ if len(cleaned) > MAX_TEXT_LENGTH:
+ cleaned = cleaned[:MAX_TEXT_LENGTH]
+ try:
+ audio_iter = client.text_to_speech.convert(
+ text=cleaned,
+ voice_id=voice_id,
+ model_id=model_id,
+ output_format="pcm_24000",
+ )
+ if output_stream is not None:
+ for chunk in audio_iter:
+ if stop_event.is_set():
+ break
+ import numpy as _np
+ audio_array = _np.frombuffer(chunk, dtype=_np.int16)
+ output_stream.write(audio_array.reshape(-1, 1))
+ else:
+ # Fallback: write chunks to temp file and play via system player
+ _play_via_tempfile(audio_iter, stop_event)
+ except Exception as exc:
+ logger.warning("Streaming TTS sentence failed: %s", exc)
+
+ def _play_via_tempfile(audio_iter, stop_evt):
+ """Write PCM chunks to a temp WAV file and play it."""
+ try:
+ import wave
+ tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
+ tmp_path = tmp.name
+ with wave.open(tmp, "wb") as wf:
+ wf.setnchannels(1)
+ wf.setsampwidth(2) # 16-bit
+ wf.setframerate(24000)
+ for chunk in audio_iter:
+ if stop_evt.is_set():
+ break
+ wf.writeframes(chunk)
+ from tools.voice_mode import play_audio_file
+ play_audio_file(tmp_path)
+ os.unlink(tmp_path)
+ except Exception as exc:
+ logger.warning("Temp-file TTS fallback failed: %s", exc)
+
+ while not stop_event.is_set():
+ # Read next delta from queue
+ try:
+ delta = text_queue.get(timeout=queue_timeout)
+ except queue.Empty:
+ # Timeout: if we have accumulated a long buffer, flush it
+ if len(sentence_buf) > long_flush_len:
+ _speak_sentence(sentence_buf)
+ sentence_buf = ""
+ continue
+
+ if delta is None:
+ # End-of-text sentinel: flush remaining buffer
+ if sentence_buf.strip():
+ _speak_sentence(sentence_buf)
+ break
+
+ # --- Think block filtering ---
+ # Process delta character by character for think tags
+ i = 0
+ filtered_delta = []
+ while i < len(delta):
+ # Check for opening <think> tag
+ if delta[i:].startswith("<think>"):
+ in_think = True
+ i += len("<think>")
+ continue
+ # Inside a think block: skip ahead past the closing tag
+ if in_think:
+ end = delta.find("</think>", i)
+ if end != -1:
+ in_think = False
+ i = end + len("</think>")
+ else:
+ i = len(delta)
+ continue
+ # Check for closing tag (stray close without a matching open)
+ if delta[i:].startswith("</think>"):
+ in_think = False
+ i += len("</think>")
+ continue
+ if not in_think:
+ filtered_delta.append(delta[i])
+ i += 1
+
+ text = "".join(filtered_delta)
+ if not text:
+ continue
+
+ sentence_buf += text
+
+ # Check for sentence boundaries
+ while True:
+ m = _SENTENCE_BOUNDARY_RE.search(sentence_buf)
+ if m is None:
+ break
+ end_pos = m.end()
+ sentence = sentence_buf[:end_pos]
+ sentence_buf = sentence_buf[end_pos:]
+ # Merge short fragments into the next sentence
+ if len(sentence.strip()) < min_sentence_len:
+ sentence_buf = sentence + sentence_buf
+ break
+ _speak_sentence(sentence)
+
+ # Drain any remaining items from the queue
+ while True:
+ try:
+ text_queue.get_nowait()
+ except queue.Empty:
+ break
+
+ # Close the audio output stream
+ if output_stream is not None:
+ try:
+ output_stream.stop()
+ output_stream.close()
+ except Exception:
+ pass
+
+ except Exception as exc:
+ logger.warning("Streaming TTS pipeline error: %s", exc)
+ finally:
+ tts_done_event.set()
+
+
# ===========================================================================
# Main -- quick diagnostics
# ===========================================================================