feat: add streaming sentence-by-sentence TTS via ElevenLabs

Stream audio to speaker as the agent generates tokens instead of
waiting for the full response. First sentence plays within ~1-2s
of agent starting to respond.

- run_agent: add stream_callback to run_conversation/chat, streaming
  path in _interruptible_api_call accumulates chunks into mock
  ChatCompletion while forwarding content deltas to callback
- tts_tool: add stream_tts_to_speaker() with sentence buffering,
  think block filtering, markdown stripping, ElevenLabs pcm_24000
  streaming to sounddevice OutputStream
- cli: wire up streaming TTS pipeline in chat(), detect elevenlabs
  provider + sounddevice availability, skip batch TTS when streaming
  is active, signal stop on interrupt

Falls back to batch TTS for Edge/OpenAI providers or when
elevenlabs/sounddevice are not available. Zero impact on non-voice
mode (callback defaults to None).
This commit is contained in:
0xbyt4 2026-03-03 23:03:42 +03:00
parent d7425343ee
commit 179d9e1a22
3 changed files with 410 additions and 18 deletions

70
cli.py
View file

@ -4093,19 +4093,60 @@ class HermesCLI:
try:
# Run the conversation with interrupt monitoring
result = None
# --- Streaming TTS setup ---
# When ElevenLabs is the TTS provider and sounddevice is available,
# we stream audio sentence-by-sentence as the agent generates tokens
# instead of waiting for the full response.
use_streaming_tts = False
text_queue = None
tts_thread = None
stream_callback = None
stop_event = None
if self._voice_tts:
try:
from tools.tts_tool import (
_load_tts_config as _load_tts_cfg,
_get_provider as _get_prov,
_HAS_ELEVENLABS as _el_ok,
_HAS_AUDIO as _audio_ok,
stream_tts_to_speaker,
)
_tts_cfg = _load_tts_cfg()
if (_get_prov(_tts_cfg) == "elevenlabs" and _el_ok and _audio_ok):
use_streaming_tts = True
except Exception:
pass
if use_streaming_tts:
text_queue = queue.Queue()
stop_event = threading.Event()
tts_thread = threading.Thread(
target=stream_tts_to_speaker,
args=(text_queue, stop_event, self._voice_tts_done),
daemon=True,
)
tts_thread.start()
def stream_callback(delta: str):
if text_queue is not None:
text_queue.put(delta)
def run_agent():
nonlocal result
result = self.agent.run_conversation(
user_message=message,
conversation_history=self.conversation_history[:-1], # Exclude the message we just added
stream_callback=stream_callback,
task_id=self.session_id,
)
# Start agent in background thread
agent_thread = threading.Thread(target=run_agent)
agent_thread.start()
# Monitor the dedicated interrupt queue while the agent runs.
# _interrupt_queue is separate from _pending_input, so process_loop
# and chat() never compete for the same queue.
@ -4124,6 +4165,9 @@ class HermesCLI:
if self._clarify_state or self._clarify_freetext:
continue
print(f"\n⚡ New message detected, interrupting...")
# Signal TTS to stop on interrupt
if stop_event is not None:
stop_event.set()
self.agent.interrupt(interrupt_msg)
# Debug: log to file (stdout may be devnull from redirect_stdout)
try:
@ -4143,9 +4187,15 @@ class HermesCLI:
else:
# Fallback for non-interactive mode (e.g., single-query)
agent_thread.join(0.1)
agent_thread.join() # Ensure agent thread completes
# Signal end-of-text to TTS consumer and wait for it to finish
if use_streaming_tts and text_queue is not None:
text_queue.put(None) # sentinel
if tts_thread is not None:
tts_thread.join(timeout=120)
# Drain any remaining agent output still in the StdoutProxy
# buffer so tool/status lines render ABOVE our response box.
# The flush pushes data into the renderer queue; the short
@ -4156,15 +4206,15 @@ class HermesCLI:
# Update history with full conversation
self.conversation_history = result.get("messages", self.conversation_history) if result else self.conversation_history
# Get the final response
response = result.get("final_response", "") if result else ""
# Handle failed results (e.g., non-retryable errors like invalid model)
if result and result.get("failed") and not response:
error_detail = result.get("error", "Unknown error")
response = f"Error: {error_detail}"
# Handle interrupt - check if we were interrupted
pending_message = None
if result and result.get("interrupted"):
@ -4172,8 +4222,9 @@ class HermesCLI:
# Add indicator that we were interrupted
if response and pending_message:
response = response + "\n\n---\n_[Interrupted - processing new message]_"
response_previewed = result.get("response_previewed", False) if result else False
# Display reasoning (thinking) box if enabled and available
if self.show_reasoning and result:
reasoning = result.get("last_reasoning")
@ -4226,7 +4277,8 @@ class HermesCLI:
sys.stdout.flush()
# Speak response aloud if voice TTS is enabled
if self._voice_tts and response:
# Skip batch TTS when streaming TTS already handled it
if self._voice_tts and response and not use_streaming_tts:
threading.Thread(
target=self._voice_speak_response,
args=(response,),

View file

@ -2576,10 +2576,16 @@ class AIAgent:
"""
Run the API call in a background thread so the main conversation loop
can detect interrupts without waiting for the full HTTP round-trip.
On interrupt, closes the HTTP client to cancel the in-flight request
(stops token generation and avoids wasting money), then rebuilds the
client for future calls.
When ``self._stream_callback`` is set (streaming TTS mode), the call
uses ``stream=True`` and iterates over chunks inside the background
thread. Content deltas are forwarded to the callback in real-time
while the full response is accumulated and returned as a
``SimpleNamespace`` that mimics a normal ``ChatCompletion``.
"""
result = {"response": None, "error": None}
@ -2587,10 +2593,103 @@ class AIAgent:
try:
if self.api_mode == "codex_responses":
result["response"] = self._run_codex_stream(api_kwargs)
return
elif self.api_mode == "anthropic_messages":
result["response"] = self._anthropic_client.messages.create(**api_kwargs)
else:
return
cb = getattr(self, "_stream_callback", None)
if cb is None:
# Non-streaming path (default)
result["response"] = self.client.chat.completions.create(**api_kwargs)
return
# --- Streaming path for TTS pipeline ---
stream_kwargs = {**api_kwargs, "stream": True}
stream = self.client.chat.completions.create(**stream_kwargs)
content_parts: list[str] = []
tool_calls_acc: dict[int, dict] = {} # index -> {id, type, function:{name, arguments}}
finish_reason = None
model_name = None
role = "assistant"
for chunk in stream:
if not chunk.choices:
# Usage-only or empty chunk
if hasattr(chunk, "model") and chunk.model:
model_name = chunk.model
continue
delta = chunk.choices[0].delta
if hasattr(chunk, "model") and chunk.model:
model_name = chunk.model
# Content delta
if delta and delta.content:
content_parts.append(delta.content)
try:
cb(delta.content)
except Exception:
pass
# Tool call deltas
if delta and delta.tool_calls:
for tc_delta in delta.tool_calls:
idx = tc_delta.index
if idx not in tool_calls_acc:
tool_calls_acc[idx] = {
"id": tc_delta.id or "",
"type": "function",
"function": {"name": "", "arguments": ""},
}
entry = tool_calls_acc[idx]
if tc_delta.id:
entry["id"] = tc_delta.id
if tc_delta.function:
if tc_delta.function.name:
entry["function"]["name"] += tc_delta.function.name
if tc_delta.function.arguments:
entry["function"]["arguments"] += tc_delta.function.arguments
if chunk.choices[0].finish_reason:
finish_reason = chunk.choices[0].finish_reason
# Build a mock ChatCompletion matching the non-streaming interface
full_content = "".join(content_parts) or None
mock_tool_calls = None
if tool_calls_acc:
mock_tool_calls = []
for idx in sorted(tool_calls_acc):
tc = tool_calls_acc[idx]
mock_tool_calls.append(SimpleNamespace(
id=tc["id"],
type=tc["type"],
function=SimpleNamespace(
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
),
))
mock_message = SimpleNamespace(
role=role,
content=full_content,
tool_calls=mock_tool_calls,
reasoning_content=None,
)
mock_choice = SimpleNamespace(
index=0,
message=mock_message,
finish_reason=finish_reason or "stop",
)
mock_response = SimpleNamespace(
id="stream-" + str(uuid.uuid4()),
model=model_name,
choices=[mock_choice],
usage=None,
)
result["response"] = mock_response
except Exception as e:
result["error"] = e
@ -3915,7 +4014,8 @@ class AIAgent:
user_message: str,
system_message: str = None,
conversation_history: List[Dict[str, Any]] = None,
task_id: str = None
task_id: str = None,
stream_callback: Optional[callable] = None,
) -> Dict[str, Any]:
"""
Run a complete conversation with tool calling until completion.
@ -3925,6 +4025,9 @@ class AIAgent:
system_message (str): Custom system message (optional, overrides ephemeral_system_prompt if provided)
conversation_history (List[Dict]): Previous conversation messages (optional)
task_id (str): Unique identifier for this task to isolate VMs between concurrent tasks (optional, auto-generated if not provided)
stream_callback: Optional callback invoked with each text delta during streaming.
Used by the TTS pipeline to start audio generation before the full response.
When None (default), API calls use the standard non-streaming path.
Returns:
Dict: Complete conversation result with final response and message history
@ -3933,6 +4036,8 @@ class AIAgent:
# Installed once, transparent when streams are healthy, prevents crash on write.
_install_safe_stdio()
# Store stream callback for _interruptible_api_call to pick up
self._stream_callback = stream_callback
# Generate unique task_id if not provided to isolate VMs between concurrent tasks
effective_task_id = task_id or str(uuid.uuid4())
@ -5377,20 +5482,24 @@ class AIAgent:
# Clear interrupt state after handling
self.clear_interrupt()
# Clear stream callback so it doesn't leak into future calls
self._stream_callback = None
return result
def chat(self, message: str) -> str:
def chat(self, message: str, stream_callback: Optional[callable] = None) -> str:
"""
Simple chat interface that returns just the final response.
Args:
message (str): User message
stream_callback: Optional callback invoked with each text delta during streaming.
Returns:
str: Final assistant response
"""
result = self.run_conversation(message)
result = self.run_conversation(message, stream_callback=stream_callback)
return result["final_response"]

View file

@ -25,9 +25,12 @@ import datetime
import json
import logging
import os
import queue
import re
import shutil
import subprocess
import tempfile
import threading
from pathlib import Path
from typing import Dict, Any, Optional
@ -55,6 +58,13 @@ try:
except ImportError:
_HAS_OPENAI = False
try:
import sounddevice as sd
_HAS_AUDIO = True
except ImportError:
sd = None # type: ignore[assignment]
_HAS_AUDIO = False
# ===========================================================================
# Defaults
@ -63,6 +73,7 @@ DEFAULT_PROVIDER = "edge"
DEFAULT_EDGE_VOICE = "en-US-AriaNeural"
DEFAULT_ELEVENLABS_VOICE_ID = "pNInz6obpgDQGcFmaJgB" # Adam
DEFAULT_ELEVENLABS_MODEL_ID = "eleven_multilingual_v2"
DEFAULT_ELEVENLABS_STREAMING_MODEL_ID = "eleven_flash_v2_5"
DEFAULT_OPENAI_MODEL = "gpt-4o-mini-tts"
DEFAULT_OPENAI_VOICE = "alloy"
DEFAULT_OUTPUT_DIR = str(Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "audio_cache")
@ -420,6 +431,226 @@ def check_tts_requirements() -> bool:
return False
# ===========================================================================
# Streaming TTS: sentence-by-sentence pipeline for ElevenLabs
# ===========================================================================
# Sentence boundary pattern: punctuation followed by space or newline
_SENTENCE_BOUNDARY_RE = re.compile(r'(?<=[.!?])(?:\s|\n)|(?:\n\n)')
# Markdown stripping patterns (same as cli.py _voice_speak_response)
_MD_CODE_BLOCK = re.compile(r'```[\s\S]*?```')
_MD_LINK = re.compile(r'\[([^\]]+)\]\([^)]+\)')
_MD_URL = re.compile(r'https?://\S+')
_MD_BOLD = re.compile(r'\*\*(.+?)\*\*')
_MD_ITALIC = re.compile(r'\*(.+?)\*')
_MD_INLINE_CODE = re.compile(r'`(.+?)`')
_MD_HEADER = re.compile(r'^#+\s*', flags=re.MULTILINE)
_MD_LIST_ITEM = re.compile(r'^\s*[-*]\s+', flags=re.MULTILINE)
_MD_HR = re.compile(r'---+')
_MD_EXCESS_NL = re.compile(r'\n{3,}')
def _strip_markdown_for_tts(text: str) -> str:
"""Remove markdown formatting that shouldn't be spoken aloud."""
text = _MD_CODE_BLOCK.sub(' ', text)
text = _MD_LINK.sub(r'\1', text)
text = _MD_URL.sub('', text)
text = _MD_BOLD.sub(r'\1', text)
text = _MD_ITALIC.sub(r'\1', text)
text = _MD_INLINE_CODE.sub(r'\1', text)
text = _MD_HEADER.sub('', text)
text = _MD_LIST_ITEM.sub('', text)
text = _MD_HR.sub('', text)
text = _MD_EXCESS_NL.sub('\n\n', text)
return text.strip()
def stream_tts_to_speaker(
    text_queue: queue.Queue,
    stop_event: threading.Event,
    tts_done_event: threading.Event,
) -> None:
    """Consume text deltas from *text_queue*, buffer them into sentences,
    and stream each sentence through ElevenLabs TTS to the speaker in
    real-time.

    Protocol:
      * The producer puts ``str`` deltas onto *text_queue*.
      * A ``None`` sentinel signals end-of-text (flush remaining buffer).
      * *stop_event* can be set to abort early (e.g. user interrupt).
      * *tts_done_event* is **set** in the ``finally`` block so callers
        waiting on it (continuous voice mode) know playback is finished.

    Intended to run on its own daemon thread; all exceptions are caught
    and logged rather than propagated so the host process never crashes
    on audio problems.
    """
    # Mark playback as in-progress for the duration of this call.
    tts_done_event.clear()
    try:
        tts_config = _load_tts_config()
        el_config = tts_config.get("elevenlabs", {})
        voice_id = el_config.get("voice_id", DEFAULT_ELEVENLABS_VOICE_ID)
        # Prefer a dedicated low-latency streaming model; fall back to the
        # batch model_id, then to the module default.
        model_id = el_config.get("streaming_model_id",
                                 el_config.get("model_id", DEFAULT_ELEVENLABS_STREAMING_MODEL_ID))
        api_key = os.getenv("ELEVENLABS_API_KEY", "")
        if not api_key:
            # Without a key we bail out quietly; `finally` still signals done.
            logger.warning("ELEVENLABS_API_KEY not set; streaming TTS disabled")
            return
        client = ElevenLabs(api_key=api_key)
        # Open a single sounddevice output stream for the lifetime of
        # this function. ElevenLabs pcm_24000 produces signed 16-bit
        # little-endian mono PCM at 24 kHz.
        use_sd = _HAS_AUDIO and sd is not None
        output_stream = None
        if use_sd:
            try:
                import numpy as _np
                output_stream = sd.OutputStream(
                    samplerate=24000, channels=1, dtype="int16",
                )
                output_stream.start()
            except Exception as exc:
                # Fall through to the temp-file playback path below.
                logger.warning("sounddevice OutputStream failed: %s", exc)
                output_stream = None
        sentence_buf = ""            # text accumulated since the last spoken sentence
        in_think = False             # track <think>...</think> blocks
        min_sentence_len = 20        # fragments shorter than this are merged forward
        long_flush_len = 100         # flush threshold when the producer goes quiet
        queue_timeout = 0.5          # seconds to wait per queue.get before re-checking

        def _speak_sentence(sentence: str) -> None:
            """Generate and play audio for a single sentence."""
            if stop_event.is_set():
                return
            cleaned = _strip_markdown_for_tts(sentence).strip()
            if not cleaned:
                return
            # Truncate very long sentences
            if len(cleaned) > MAX_TEXT_LENGTH:
                cleaned = cleaned[:MAX_TEXT_LENGTH]
            try:
                audio_iter = client.text_to_speech.convert(
                    text=cleaned,
                    voice_id=voice_id,
                    model_id=model_id,
                    output_format="pcm_24000",
                )
                if output_stream is not None:
                    # Play each PCM chunk as it arrives; re-check stop_event
                    # between chunks so an interrupt cuts audio quickly.
                    for chunk in audio_iter:
                        if stop_event.is_set():
                            break
                        # NOTE: numpy was already imported when output_stream
                        # was created, so this re-import is a cheap cache hit.
                        import numpy as _np
                        audio_array = _np.frombuffer(chunk, dtype=_np.int16)
                        # OutputStream expects (frames, channels); mono -> (-1, 1).
                        output_stream.write(audio_array.reshape(-1, 1))
                else:
                    # Fallback: write chunks to temp file and play via system player
                    _play_via_tempfile(audio_iter, stop_event)
            except Exception as exc:
                # One failed sentence must not kill the whole pipeline.
                logger.warning("Streaming TTS sentence failed: %s", exc)

        def _play_via_tempfile(audio_iter, stop_evt) -> None:
            """Write PCM chunks to a temp WAV file and play it."""
            try:
                import wave
                # delete=False so the system player can open the path after
                # we finish writing; we unlink it manually below.
                tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
                tmp_path = tmp.name
                # NOTE(review): wave.open on a file object does not close the
                # underlying handle on exit — presumably fine on POSIX, but
                # may block deletion on Windows; verify if that matters here.
                with wave.open(tmp, "wb") as wf:
                    wf.setnchannels(1)
                    wf.setsampwidth(2)  # 16-bit
                    wf.setframerate(24000)
                    for chunk in audio_iter:
                        if stop_evt.is_set():
                            break
                        wf.writeframes(chunk)
                from tools.voice_mode import play_audio_file
                play_audio_file(tmp_path)
                os.unlink(tmp_path)
            except Exception as exc:
                logger.warning("Temp-file TTS fallback failed: %s", exc)

        # --- Main consume loop: deltas in, sentences out ---
        while not stop_event.is_set():
            # Read next delta from queue
            try:
                delta = text_queue.get(timeout=queue_timeout)
            except queue.Empty:
                # Timeout: if we have accumulated a long buffer, flush it
                # even without a sentence boundary, so slow producers don't
                # leave audio stalled.
                if len(sentence_buf) > long_flush_len:
                    _speak_sentence(sentence_buf)
                    sentence_buf = ""
                continue
            if delta is None:
                # End-of-text sentinel: flush remaining buffer
                if sentence_buf.strip():
                    _speak_sentence(sentence_buf)
                break
            # --- Think block filtering ---
            # Process delta character by character for think tags.
            # NOTE(review): a tag split across two deltas (e.g. "<thi" +
            # "nk>") is not detected by this per-delta scan — confirm the
            # producer never splits tags, or the tag text will be spoken.
            i = 0
            filtered_delta = []
            while i < len(delta):
                # Check for opening <think tag
                if delta[i:].startswith("<think"):
                    in_think = True
                    # Skip past the tag
                    end = delta.find(">", i)
                    if end != -1:
                        i = end + 1
                    else:
                        # Tag not closed within this delta: drop the rest.
                        i = len(delta)
                    continue
                # Check for closing </think> tag
                if delta[i:].startswith("</think>"):
                    in_think = False
                    i += len("</think>")
                    continue
                if not in_think:
                    filtered_delta.append(delta[i])
                i += 1
            text = "".join(filtered_delta)
            if not text:
                continue
            sentence_buf += text
            # Check for sentence boundaries
            while True:
                m = _SENTENCE_BOUNDARY_RE.search(sentence_buf)
                if m is None:
                    break
                end_pos = m.end()
                sentence = sentence_buf[:end_pos]
                sentence_buf = sentence_buf[end_pos:]
                # Merge short fragments into the next sentence
                if len(sentence.strip()) < min_sentence_len:
                    # Put the fragment back and wait for more text.
                    sentence_buf = sentence + sentence_buf
                    break
                _speak_sentence(sentence)
        # Drain any remaining items from the queue so the producer's
        # blocking put() calls (if any) are released after an interrupt.
        while True:
            try:
                text_queue.get_nowait()
            except queue.Empty:
                break
        # Close the audio output stream
        if output_stream is not None:
            try:
                output_stream.stop()
                output_stream.close()
            except Exception:
                pass
    except Exception as exc:
        # Last-resort guard: log and fall through so tts_done_event is set.
        logger.warning("Streaming TTS pipeline error: %s", exc)
    finally:
        # Always signal completion, even on early return or error, so
        # callers blocked on tts_done_event never hang.
        tts_done_event.set()
# ===========================================================================
# Main -- quick diagnostics
# ===========================================================================