BrowserUse_and_ComputerUse_.../tools/voice_mode.py
0xbyt4 32b033c11c feat: add silence filter, hallucination guard, and continuous mode control
- Skip silent recordings before STT call (RMS check in AudioRecorder.stop)
- Filter known Whisper hallucinations ("Thank you.", "Bye." etc.)
- Continuous mode: Ctrl+R starts loop, Ctrl+R during recording exits it
- Wait for TTS to finish before auto-restart to avoid recording speaker
- Silence timeout increased to 3s for natural pauses
- Tests: hallucination filter, silent recording skip, real speech passthrough
2026-03-14 14:25:28 +03:00

470 lines
16 KiB
Python

"""Voice Mode -- Push-to-talk audio recording and playback for the CLI.
Provides audio capture via sounddevice, WAV encoding via stdlib wave,
STT dispatch via tools.transcription_tools, and TTS playback via
sounddevice or system audio players.
Dependencies (optional):
pip install sounddevice numpy
or: pip install hermes-agent[voice]
"""
import logging
import os
import platform
import shutil
import subprocess
import tempfile
import threading
import time
import wave
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Optional imports with graceful degradation
# ---------------------------------------------------------------------------
# sounddevice/numpy are optional. When absent, audio features degrade:
# play_beep() becomes a no-op and AudioRecorder.start() raises a RuntimeError
# with install instructions; _HAS_AUDIO gates every use of sd/np.
try:
    import sounddevice as sd
    import numpy as np
    _HAS_AUDIO = True
except ImportError:
    sd = None  # type: ignore[assignment]
    np = None  # type: ignore[assignment]
    _HAS_AUDIO = False
# ---------------------------------------------------------------------------
# Recording parameters
# ---------------------------------------------------------------------------
SAMPLE_RATE = 16000  # Whisper native rate (Hz)
CHANNELS = 1  # Mono
DTYPE = "int16"  # 16-bit PCM
SAMPLE_WIDTH = 2  # bytes per sample (int16)
MAX_RECORDING_SECONDS = 120  # Safety cap
# NOTE(review): MAX_RECORDING_SECONDS is not referenced anywhere in this
# module — confirm that callers actually enforce the cap.
# Silence detection defaults
SILENCE_RMS_THRESHOLD = 200  # RMS below this = silence (int16 range 0-32767)
SILENCE_DURATION_SECONDS = 3.0  # Seconds of continuous silence before auto-stop
# Temp directory for voice recordings; old files are pruned by
# cleanup_temp_recordings() below.
_TEMP_DIR = os.path.join(tempfile.gettempdir(), "hermes_voice")
# ============================================================================
# Audio cues (beep tones)
# ============================================================================
def play_beep(frequency: int = 880, duration: float = 0.12, count: int = 1) -> None:
    """Play a short beep tone using numpy + sounddevice.

    No-op when the optional audio stack is missing; any playback error is
    logged at DEBUG level and never raised to the caller.

    Args:
        frequency: Tone frequency in Hz (default 880 = A5).
        duration: Duration of each beep in seconds.
        count: Number of beeps to play (with short gap between).
    """
    if not _HAS_AUDIO:
        return
    try:
        gap = 0.06  # seconds of silence between consecutive beeps
        samples_per_beep = int(SAMPLE_RATE * duration)
        samples_per_gap = int(SAMPLE_RATE * gap)
        if samples_per_beep <= 0:
            return  # non-positive duration: nothing to synthesize
        # The tone is identical for every beep, so synthesize it once
        # instead of once per loop iteration.
        t = np.linspace(0, duration, samples_per_beep, endpoint=False)
        tone = np.sin(2 * np.pi * frequency * t)
        # Apply fade in/out to avoid click artifacts.
        fade_len = min(int(SAMPLE_RATE * 0.01), samples_per_beep // 4)
        if fade_len > 0:
            # Guard: with fade_len == 0, tone[-0:] would select the WHOLE
            # array and the multiply with an empty ramp would raise.
            tone[:fade_len] *= np.linspace(0, 1, fade_len)
            tone[-fade_len:] *= np.linspace(1, 0, fade_len)
        beep = (tone * 0.3 * 32767).astype(np.int16)  # 0.3 = comfortable volume
        silence = np.zeros(samples_per_gap, dtype=np.int16)
        parts = []
        for i in range(count):
            parts.append(beep)
            if i < count - 1:
                parts.append(silence)
        audio = np.concatenate(parts)
        sd.play(audio, samplerate=SAMPLE_RATE)
        sd.wait()  # block until playback finishes
    except Exception as e:
        logger.debug("Beep playback failed: %s", e)
# ============================================================================
# AudioRecorder
# ============================================================================
class AudioRecorder:
    """Thread-safe audio recorder using sounddevice.InputStream.

    Usage::

        recorder = AudioRecorder()
        recorder.start(on_silence_stop=my_callback)
        # ... user speaks ...
        wav_path = recorder.stop()  # returns path to WAV file
        # or
        recorder.cancel()  # discard without saving

    If ``on_silence_stop`` is provided, recording automatically stops when
    the user is silent for ``silence_duration`` seconds and calls the callback.

    Locking discipline: start/stop/cancel are serialized by ``self._lock``.
    The audio callback runs on sounddevice's capture thread and appends to
    ``self._frames`` without taking the lock.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()  # serializes start/stop/cancel
        self._stream: Any = None  # active sd.InputStream, or None when idle
        self._frames: List[Any] = []  # int16 chunks appended by the callback
        self._recording: bool = False
        self._start_time: float = 0.0  # time.monotonic() at start of capture
        # Silence detection state
        self._has_spoken: bool = False  # True once a chunk's RMS exceeded threshold
        self._silence_start: float = 0.0  # monotonic time silence began; 0.0 = not in a silent run
        self._on_silence_stop: Optional[Any] = None  # zero-arg callable, fired at most once
        self._silence_threshold: int = SILENCE_RMS_THRESHOLD
        self._silence_duration: float = SILENCE_DURATION_SECONDS

    # -- public properties ---------------------------------------------------
    @property
    def is_recording(self) -> bool:
        # True while a capture stream is active.
        return self._recording

    @property
    def elapsed_seconds(self) -> float:
        # Seconds since start(); 0.0 when not recording.
        if not self._recording:
            return 0.0
        return time.monotonic() - self._start_time

    # -- public methods ------------------------------------------------------
    def start(self, on_silence_stop=None) -> None:
        """Start capturing audio from the default input device.

        Silently returns if a recording is already in progress.

        Args:
            on_silence_stop: Optional callback invoked (in a daemon thread) when
                silence is detected after speech. The callback receives no arguments.
                Use this to auto-stop recording and trigger transcription.

        Raises:
            RuntimeError: if sounddevice/numpy are not installed.
        """
        if not _HAS_AUDIO:
            raise RuntimeError(
                "Voice mode requires sounddevice and numpy.\n"
                "Install with: pip install sounddevice numpy\n"
                "Or: pip install hermes-agent[voice]"
            )
        with self._lock:
            if self._recording:
                return  # already recording
            self._frames = []
            self._start_time = time.monotonic()
            self._has_spoken = False
            self._silence_start = 0.0
            self._on_silence_stop = on_silence_stop

            def _callback(indata, frames, time_info, status):  # noqa: ARG001
                # Runs on sounddevice's capture thread for every audio chunk.
                if status:
                    logger.debug("sounddevice status: %s", status)
                self._frames.append(indata.copy())
                # Silence detection: compute RMS of this chunk
                if self._on_silence_stop is not None and self._recording:
                    rms = int(np.sqrt(np.mean(indata.astype(np.float64) ** 2)))
                    now = time.monotonic()
                    if rms > self._silence_threshold:
                        # Loud chunk: user is (still) speaking; reset the timer.
                        self._has_spoken = True
                        self._silence_start = 0.0
                    elif self._has_spoken:
                        # User was speaking and now is silent
                        if self._silence_start == 0.0:
                            self._silence_start = now
                        elif now - self._silence_start >= self._silence_duration:
                            logger.info("Silence detected (%.1fs), auto-stopping",
                                        self._silence_duration)
                            cb = self._on_silence_stop
                            self._on_silence_stop = None  # fire only once
                            if cb:
                                # Fire on a daemon thread so this audio callback
                                # returns promptly; cb is expected to call stop(),
                                # which takes self._lock.
                                threading.Thread(target=cb, daemon=True).start()

            self._stream = sd.InputStream(
                samplerate=SAMPLE_RATE,
                channels=CHANNELS,
                dtype=DTYPE,
                callback=_callback,
            )
            self._stream.start()
            self._recording = True
            logger.info("Voice recording started (rate=%d, channels=%d)", SAMPLE_RATE, CHANNELS)

    def stop(self) -> Optional[str]:
        """Stop recording and write captured audio to a WAV file.

        Returns:
            Path to the WAV file, or ``None`` if no audio was captured,
            the recording was too short (< 0.3s), or the audio was silent.
        """
        with self._lock:
            if not self._recording:
                return None
            self._recording = False
            if self._stream is not None:
                try:
                    self._stream.stop()
                    self._stream.close()
                except Exception:
                    # Best-effort teardown; the stream may already be closed.
                    pass
                self._stream = None
            if not self._frames:
                return None
            # Concatenate frames and write WAV
            audio_data = np.concatenate(self._frames, axis=0)
            self._frames = []
            elapsed = time.monotonic() - self._start_time
            logger.info("Voice recording stopped (%.1fs, %d samples)", elapsed, len(audio_data))
            # Skip very short recordings (< 0.3s of audio)
            min_samples = int(SAMPLE_RATE * 0.3)
            if len(audio_data) < min_samples:
                logger.debug("Recording too short (%d samples), discarding", len(audio_data))
                return None
            # Skip silent recordings (RMS below threshold = no real speech)
            rms = int(np.sqrt(np.mean(audio_data.astype(np.float64) ** 2)))
            if rms < SILENCE_RMS_THRESHOLD:
                logger.info("Recording too quiet (RMS=%d < %d), discarding", rms, SILENCE_RMS_THRESHOLD)
                return None
            return self._write_wav(audio_data)

    def cancel(self) -> None:
        """Stop recording and discard all captured audio."""
        with self._lock:
            self._recording = False
            self._frames = []
            if self._stream is not None:
                try:
                    self._stream.stop()
                    self._stream.close()
                except Exception:
                    # Best-effort teardown; the stream may already be closed.
                    pass
                self._stream = None
            logger.info("Voice recording cancelled")

    # -- private helpers -----------------------------------------------------
    @staticmethod
    def _write_wav(audio_data) -> str:
        """Write numpy int16 audio data to a WAV file.

        Returns the file path.
        """
        os.makedirs(_TEMP_DIR, exist_ok=True)
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        # NOTE(review): second-resolution names can collide if two recordings
        # finish within the same second — confirm this cannot happen in practice.
        wav_path = os.path.join(_TEMP_DIR, f"recording_{timestamp}.wav")
        with wave.open(wav_path, "wb") as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(SAMPLE_WIDTH)
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(audio_data.tobytes())
        file_size = os.path.getsize(wav_path)
        logger.info("WAV written: %s (%d bytes)", wav_path, file_size)
        return wav_path
# ============================================================================
# Whisper hallucination filter
# ============================================================================
# Whisper commonly hallucinates these stock phrases on silent or near-silent
# audio. Every phrase is matched with and without a trailing period; the bare
# word "you" is a separate single-word artifact.
_HALLUCINATION_PHRASES = (
    "thank you",
    "thanks for watching",
    "subscribe to my channel",
    "like and subscribe",
    "please subscribe",
    "thank you for watching",
    "bye",
    "the end",
)
WHISPER_HALLUCINATIONS = {
    variant
    for phrase in _HALLUCINATION_PHRASES
    for variant in (phrase, phrase + ".")
} | {"you"}


def is_whisper_hallucination(transcript: str) -> bool:
    """Check if a transcript is a known Whisper hallucination on silence."""
    normalized = transcript.strip().lower()
    return normalized in WHISPER_HALLUCINATIONS
# ============================================================================
# STT dispatch
# ============================================================================
def transcribe_recording(wav_path: str, model: Optional[str] = None) -> Dict[str, Any]:
    """Transcribe a WAV recording using the existing Whisper pipeline.

    Delegates to ``tools.transcription_tools.transcribe_audio()``.
    Filters out known Whisper hallucinations on silent audio.

    Args:
        wav_path: Path to the WAV file.
        model: Whisper model name (default: from config or ``whisper-1``).

    Returns:
        Dict with ``success``, ``transcript``, and optionally ``error``.
    """
    # Deferred import: the transcription stack is only needed when a
    # recording is actually transcribed.
    from tools.transcription_tools import transcribe_audio

    result = transcribe_audio(wav_path, model=model)
    if not result.get("success"):
        return result

    transcript = result.get("transcript", "")
    # Whisper tends to invent stock phrases ("Thank you." etc.) on
    # silent/near-silent audio; suppress them instead of surfacing noise.
    if is_whisper_hallucination(transcript):
        logger.info("Filtered Whisper hallucination: %r", transcript)
        return {"success": True, "transcript": "", "filtered": True}
    return result
# ============================================================================
# Audio playback
# ============================================================================
def play_audio_file(file_path: str) -> bool:
    """Play an audio file through the default output device.

    Strategy:
        1. WAV files via ``sounddevice.play()`` when available.
        2. System commands: ``afplay`` (macOS), ``ffplay`` (cross-platform),
           ``aplay`` (Linux ALSA).

    Args:
        file_path: Path to the audio file.

    Returns:
        ``True`` if playback succeeded, ``False`` otherwise.
    """
    if not os.path.isfile(file_path):
        logger.warning("Audio file not found: %s", file_path)
        return False
    # Try sounddevice for WAV files
    if _HAS_AUDIO and file_path.endswith(".wav"):
        try:
            with wave.open(file_path, "rb") as wf:
                frames = wf.readframes(wf.getnframes())
                audio_data = np.frombuffer(frames, dtype=np.int16)
                sample_rate = wf.getframerate()
            sd.play(audio_data, samplerate=sample_rate)
            sd.wait()
            return True
        except Exception as e:
            logger.debug("sounddevice playback failed: %s", e)
    # Fall back to system audio players
    system = platform.system()
    players: List[List[str]] = []
    if system == "Darwin":
        players.append(["afplay", file_path])
    players.append(["ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet", file_path])
    if system == "Linux":
        players.append(["aplay", "-q", file_path])
    for cmd in players:
        exe = shutil.which(cmd[0])
        if exe:
            try:
                proc = subprocess.run(cmd, capture_output=True, timeout=300, check=False)
                # Fix: previously any run (even a failing one) reported success.
                # Only a zero exit status counts; otherwise try the next player.
                if proc.returncode == 0:
                    return True
                logger.debug("System player %s exited with %d", cmd[0], proc.returncode)
            except Exception as e:
                logger.debug("System player %s failed: %s", cmd[0], e)
    logger.warning("No audio player available for %s", file_path)
    return False
# ============================================================================
# Requirements check
# ============================================================================
def check_voice_requirements() -> Dict[str, Any]:
    """Check if all voice mode requirements are met.

    Returns:
        Dict with ``available``, ``audio_available``, ``stt_key_set``,
        ``missing_packages``, and ``details``.
    """
    has_openai_key = bool(os.getenv("VOICE_TOOLS_OPENAI_KEY"))
    has_groq_key = bool(os.getenv("GROQ_API_KEY"))
    stt_key_set = has_openai_key or has_groq_key

    # Both packages are imported together, so they go missing together.
    missing_packages: List[str] = [] if _HAS_AUDIO else ["sounddevice", "numpy"]

    detail_lines = [
        "Audio capture: OK"
        if _HAS_AUDIO
        else "Audio capture: MISSING (pip install sounddevice numpy)"
    ]
    if has_openai_key:
        detail_lines.append("STT API key: OK (OpenAI)")
    elif has_groq_key:
        detail_lines.append("STT API key: OK (Groq)")
    else:
        detail_lines.append("STT API key: MISSING (set GROQ_API_KEY or VOICE_TOOLS_OPENAI_KEY)")

    return {
        "available": _HAS_AUDIO and stt_key_set,
        "audio_available": _HAS_AUDIO,
        "stt_key_set": stt_key_set,
        "missing_packages": missing_packages,
        "details": "\n".join(detail_lines),
    }
# ============================================================================
# Temp file cleanup
# ============================================================================
def cleanup_temp_recordings(max_age_seconds: int = 3600) -> int:
    """Remove old temporary voice recording files.

    Args:
        max_age_seconds: Delete files older than this (default: 1 hour).

    Returns:
        Number of files deleted.
    """
    if not os.path.isdir(_TEMP_DIR):
        return 0
    deleted = 0
    now = time.time()
    # Fix: os.scandir() returns an iterator that holds an open directory
    # handle; use it as a context manager so the handle is closed promptly
    # rather than waiting for garbage collection.
    with os.scandir(_TEMP_DIR) as entries:
        for entry in entries:
            is_recording_wav = (
                entry.is_file()
                and entry.name.startswith("recording_")
                and entry.name.endswith(".wav")
            )
            if not is_recording_wav:
                continue
            try:
                if now - entry.stat().st_mtime > max_age_seconds:
                    os.unlink(entry.path)
                    deleted += 1
            except OSError:
                # Best-effort cleanup: the file may have been removed by
                # another process, or deletion may be denied.
                pass
    if deleted:
        logger.debug("Cleaned up %d old voice recordings", deleted)
    return deleted