"""Tests for CLI voice mode integration -- command parsing, markdown stripping, state management, streaming TTS activation, voice message prefix, _vprint.""" import ast import re import threading from types import SimpleNamespace from unittest.mock import MagicMock, patch import pytest # ============================================================================ # Markdown stripping (same logic as _voice_speak_response) # ============================================================================ def _strip_markdown_for_tts(text: str) -> str: """Replicate the markdown stripping logic from cli._voice_speak_response.""" tts_text = text[:4000] if len(text) > 4000 else text tts_text = re.sub(r'```[\s\S]*?```', ' ', tts_text) # fenced code blocks tts_text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', tts_text) # [text](url) -> text tts_text = re.sub(r'https?://\S+', '', tts_text) # URLs tts_text = re.sub(r'\*\*(.+?)\*\*', r'\1', tts_text) # bold tts_text = re.sub(r'\*(.+?)\*', r'\1', tts_text) # italic tts_text = re.sub(r'`(.+?)`', r'\1', tts_text) # inline code tts_text = re.sub(r'^#+\s*', '', tts_text, flags=re.MULTILINE) # headers tts_text = re.sub(r'^\s*[-*]\s+', '', tts_text, flags=re.MULTILINE) # list items tts_text = re.sub(r'---+', '', tts_text) # horizontal rules tts_text = re.sub(r'\n{3,}', '\n\n', tts_text) # excessive newlines return tts_text.strip() class TestMarkdownStripping: def test_strips_bold(self): assert _strip_markdown_for_tts("This is **bold** text") == "This is bold text" def test_strips_italic(self): assert _strip_markdown_for_tts("This is *italic* text") == "This is italic text" def test_strips_inline_code(self): assert _strip_markdown_for_tts("Run `pip install foo`") == "Run pip install foo" def test_strips_fenced_code_blocks(self): text = "Here is code:\n```python\nprint('hello')\n```\nDone." result = _strip_markdown_for_tts(text) assert "print" not in result assert "Done." in result def test_strips_headers(self): assert _strip_markdown_for_tts("## Summary\nSome text") == "Summary\nSome text" def test_strips_list_markers(self): text = "- item one\n- item two\n* item three" result = _strip_markdown_for_tts(text) assert "item one" in result assert "- " not in result assert "* " not in result def test_strips_urls(self): text = "Visit https://example.com for details" result = _strip_markdown_for_tts(text) assert "https://" not in result assert "Visit" in result def test_strips_markdown_links(self): text = "See [the docs](https://example.com/docs) for info" result = _strip_markdown_for_tts(text) assert "the docs" in result assert "https://" not in result assert "[" not in result def test_strips_horizontal_rules(self): text = "Part one\n---\nPart two" result = _strip_markdown_for_tts(text) assert "---" not in result assert "Part one" in result assert "Part two" in result def test_empty_after_stripping_returns_empty(self): text = "```python\nprint('hello')\n```" result = _strip_markdown_for_tts(text) assert result == "" def test_truncates_long_text(self): text = "a" * 5000 result = _strip_markdown_for_tts(text) assert len(result) <= 4000 def test_complex_response(self): text = ( "## Answer\n\n" "Here's how to do it:\n\n" "```python\ndef hello():\n print('hi')\n```\n\n" "Run it with `python main.py`. " "See [docs](https://example.com) for more.\n\n" "- Step one\n- Step two\n\n" "---\n\n" "**Good luck!**" ) result = _strip_markdown_for_tts(text) assert "```" not in result assert "https://" not in result assert "**" not in result assert "---" not in result assert "Answer" in result assert "Good luck!" in result assert "docs" in result # ============================================================================ # Voice command parsing # ============================================================================ class TestVoiceCommandParsing: """Test _handle_voice_command logic without full CLI setup.""" def test_parse_subcommands(self): """Verify subcommand extraction from /voice commands.""" test_cases = [ ("/voice on", "on"), ("/voice off", "off"), ("/voice tts", "tts"), ("/voice status", "status"), ("/voice", ""), ("/voice ON ", "on"), ] for command, expected in test_cases: parts = command.strip().split(maxsplit=1) subcommand = parts[1].lower().strip() if len(parts) > 1 else "" assert subcommand == expected, f"Failed for {command!r}: got {subcommand!r}" # ============================================================================ # Voice state thread safety # ============================================================================ class TestVoiceStateLock: def test_lock_protects_state(self): """Verify that concurrent state changes don't corrupt state.""" lock = threading.Lock() state = {"recording": False, "count": 0} def toggle_many(n): for _ in range(n): with lock: state["recording"] = not state["recording"] state["count"] += 1 threads = [threading.Thread(target=toggle_many, args=(1000,)) for _ in range(4)] for t in threads: t.start() for t in threads: t.join() assert state["count"] == 4000 # ============================================================================ # Streaming TTS lazy import activation (Bug A fix) # ============================================================================ class TestStreamingTTSActivation: """Verify streaming TTS uses lazy imports to check availability.""" def test_activates_when_elevenlabs_and_sounddevice_available(self): """use_streaming_tts should be True when provider is elevenlabs and both lazy imports succeed.""" use_streaming_tts = False try: from tools.tts_tool import ( _load_tts_config as _load_tts_cfg, _get_provider as _get_prov, _import_elevenlabs, _import_sounddevice, ) assert callable(_import_elevenlabs) assert callable(_import_sounddevice) except ImportError: pytest.skip("tools.tts_tool not available") with patch("tools.tts_tool._load_tts_config") as mock_cfg, \ patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \ patch("tools.tts_tool._import_elevenlabs") as mock_el, \ patch("tools.tts_tool._import_sounddevice") as mock_sd: mock_cfg.return_value = {"provider": "elevenlabs"} mock_el.return_value = MagicMock() mock_sd.return_value = MagicMock() from tools.tts_tool import ( _load_tts_config as load_cfg, _get_provider as get_prov, _import_elevenlabs as import_el, _import_sounddevice as import_sd, ) cfg = load_cfg() if get_prov(cfg) == "elevenlabs": import_el() import_sd() use_streaming_tts = True assert use_streaming_tts is True def test_does_not_activate_when_elevenlabs_missing(self): """use_streaming_tts stays False when elevenlabs import fails.""" use_streaming_tts = False with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \ patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \ patch("tools.tts_tool._import_elevenlabs", side_effect=ImportError("no elevenlabs")): try: from tools.tts_tool import ( _load_tts_config as load_cfg, _get_provider as get_prov, _import_elevenlabs as import_el, _import_sounddevice as import_sd, ) cfg = load_cfg() if get_prov(cfg) == "elevenlabs": import_el() import_sd() use_streaming_tts = True except (ImportError, OSError): pass assert use_streaming_tts is False def test_does_not_activate_when_sounddevice_missing(self): """use_streaming_tts stays False when sounddevice import fails.""" use_streaming_tts = False with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \ patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \ patch("tools.tts_tool._import_elevenlabs", return_value=MagicMock()), \ patch("tools.tts_tool._import_sounddevice", side_effect=OSError("no PortAudio")): try: from tools.tts_tool import ( _load_tts_config as load_cfg, _get_provider as get_prov, _import_elevenlabs as import_el, _import_sounddevice as import_sd, ) cfg = load_cfg() if get_prov(cfg) == "elevenlabs": import_el() import_sd() use_streaming_tts = True except (ImportError, OSError): pass assert use_streaming_tts is False def test_does_not_activate_for_non_elevenlabs_provider(self): """use_streaming_tts stays False when provider is not elevenlabs.""" use_streaming_tts = False with patch("tools.tts_tool._load_tts_config", return_value={"provider": "edge"}), \ patch("tools.tts_tool._get_provider", return_value="edge"): try: from tools.tts_tool import ( _load_tts_config as load_cfg, _get_provider as get_prov, _import_elevenlabs as import_el, _import_sounddevice as import_sd, ) cfg = load_cfg() if get_prov(cfg) == "elevenlabs": import_el() import_sd() use_streaming_tts = True except (ImportError, OSError): pass assert use_streaming_tts is False def test_stale_boolean_imports_no_longer_exist(self): """Confirm _HAS_ELEVENLABS and _HAS_AUDIO are not in tts_tool module.""" import tools.tts_tool as tts_mod assert not hasattr(tts_mod, "_HAS_ELEVENLABS"), \ "_HAS_ELEVENLABS should not exist -- lazy imports replaced it" assert not hasattr(tts_mod, "_HAS_AUDIO"), \ "_HAS_AUDIO should not exist -- lazy imports replaced it" # ============================================================================ # Voice mode user message prefix (Bug B fix) # ============================================================================ class TestVoiceMessagePrefix: """Voice mode should inject instruction via user message prefix, not by modifying the system prompt (which breaks prompt cache).""" def test_prefix_added_when_voice_mode_active(self): """When voice mode is active and message is str, agent_message should have the voice instruction prefix.""" voice_mode = True message = "What's the weather like?" agent_message = message if voice_mode and isinstance(message, str): agent_message = ( "[Voice input — respond concisely and conversationally, " "2-3 sentences max. No code blocks or markdown.] " + message ) assert agent_message.startswith("[Voice input") assert "What's the weather like?" in agent_message def test_no_prefix_when_voice_mode_inactive(self): """When voice mode is off, message passes through unchanged.""" voice_mode = False message = "What's the weather like?" agent_message = message if voice_mode and isinstance(message, str): agent_message = ( "[Voice input — respond concisely and conversationally, " "2-3 sentences max. No code blocks or markdown.] " + message ) assert agent_message == message def test_no_prefix_for_multimodal_content(self): """When message is a list (multimodal), no prefix is added.""" voice_mode = True message = [{"type": "text", "text": "describe this"}, {"type": "image_url"}] agent_message = message if voice_mode and isinstance(message, str): agent_message = ( "[Voice input — respond concisely and conversationally, " "2-3 sentences max. No code blocks or markdown.] " + message ) assert agent_message is message def test_history_stays_clean(self): """conversation_history should contain the original message, not the prefixed version.""" voice_mode = True message = "Hello there" conversation_history = [] conversation_history.append({"role": "user", "content": message}) agent_message = message if voice_mode and isinstance(message, str): agent_message = ( "[Voice input — respond concisely and conversationally, " "2-3 sentences max. No code blocks or markdown.] " + message ) assert conversation_history[-1]["content"] == "Hello there" assert agent_message.startswith("[Voice input") assert agent_message != conversation_history[-1]["content"] def test_enable_voice_mode_does_not_modify_system_prompt(self): """_enable_voice_mode should NOT modify self.system_prompt or agent.ephemeral_system_prompt -- the system prompt must stay stable to preserve prompt cache.""" cli = SimpleNamespace( _voice_mode=False, _voice_tts=False, _voice_lock=threading.Lock(), system_prompt="You are helpful", agent=SimpleNamespace(ephemeral_system_prompt="You are helpful"), ) original_system = cli.system_prompt original_ephemeral = cli.agent.ephemeral_system_prompt cli._voice_mode = True assert cli.system_prompt == original_system assert cli.agent.ephemeral_system_prompt == original_ephemeral # ============================================================================ # _vprint force parameter (Minor fix) # ============================================================================ class TestVprintForceParameter: """_vprint should suppress output during streaming TTS unless force=True.""" def _make_agent_with_stream(self, stream_active: bool): """Create a minimal agent-like object with _vprint.""" agent = SimpleNamespace( _stream_callback=MagicMock() if stream_active else None, ) def _vprint(*args, force=False, **kwargs): if not force and getattr(agent, "_stream_callback", None) is not None: return print(*args, **kwargs) agent._vprint = _vprint return agent def test_suppressed_during_streaming(self, capsys): """Normal _vprint output is suppressed when streaming TTS is active.""" agent = self._make_agent_with_stream(stream_active=True) agent._vprint("should be hidden") captured = capsys.readouterr() assert captured.out == "" def test_shown_when_not_streaming(self, capsys): """Normal _vprint output is shown when streaming is not active.""" agent = self._make_agent_with_stream(stream_active=False) agent._vprint("should be shown") captured = capsys.readouterr() assert "should be shown" in captured.out def test_force_shown_during_streaming(self, capsys): """force=True bypasses the streaming suppression.""" agent = self._make_agent_with_stream(stream_active=True) agent._vprint("critical error!", force=True) captured = capsys.readouterr() assert "critical error!" in captured.out def test_force_shown_when_not_streaming(self, capsys): """force=True works normally when not streaming (no regression).""" agent = self._make_agent_with_stream(stream_active=False) agent._vprint("normal message", force=True) captured = capsys.readouterr() assert "normal message" in captured.out def test_error_messages_use_force_in_run_agent(self): """Verify that critical error _vprint calls in run_agent.py include force=True.""" with open("run_agent.py", "r") as f: source = f.read() tree = ast.parse(source) forced_error_count = 0 unforced_error_count = 0 for node in ast.walk(tree): if not isinstance(node, ast.Call): continue func = node.func if not (isinstance(func, ast.Attribute) and func.attr == "_vprint"): continue has_fatal = False for arg in node.args: if isinstance(arg, ast.JoinedStr): for val in arg.values: if isinstance(val, ast.Constant) and isinstance(val.value, str): if "\u274c" in val.value: has_fatal = True break if not has_fatal: continue has_force = any( kw.arg == "force" and isinstance(kw.value, ast.Constant) and kw.value.value is True for kw in node.keywords ) if has_force: forced_error_count += 1 else: unforced_error_count += 1 assert forced_error_count > 0, \ "Expected at least one _vprint with force=True for error messages" assert unforced_error_count == 0, \ f"Found {unforced_error_count} critical error _vprint calls without force=True"