fix: eliminate shell noise from terminal output with fence markers
- Wrap commands with unique fence markers (printf FENCE; cmd; printf FENCE) to isolate real output from shell init/exit noise (oh-my-zsh, macOS session restore/save, docker plugin errors, etc.) - Expand _clean_shell_noise to cover zsh/macOS patterns and strip from both beginning and end (fallback when fences are missing) - Fix BSD find compatibility: fallback to simple find when -printf produces empty output (macOS) - Fix test_terminal_disk_usage: use sys.modules to get the real module instead of the shadowed function from tools/__init__.py - Add 13 new unit tests for fence extraction and zsh noise patterns
This commit is contained in:
parent
3c13feed4c
commit
11615014a4
4 changed files with 168 additions and 21 deletions
|
|
@ -17,7 +17,13 @@ import pytest
|
||||||
|
|
||||||
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
|
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
|
||||||
|
|
||||||
from tools.environments.local import LocalEnvironment, _clean_shell_noise, _SHELL_NOISE_SUBSTRINGS
|
from tools.environments.local import (
|
||||||
|
LocalEnvironment,
|
||||||
|
_clean_shell_noise,
|
||||||
|
_extract_fenced_output,
|
||||||
|
_OUTPUT_FENCE,
|
||||||
|
_SHELL_NOISE_SUBSTRINGS,
|
||||||
|
)
|
||||||
from tools.file_operations import ShellFileOperations
|
from tools.file_operations import ShellFileOperations
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -28,6 +34,7 @@ from tools.file_operations import ShellFileOperations
|
||||||
_ALL_NOISE_PATTERNS = list(_SHELL_NOISE_SUBSTRINGS) + [
|
_ALL_NOISE_PATTERNS = list(_SHELL_NOISE_SUBSTRINGS) + [
|
||||||
"bash: ",
|
"bash: ",
|
||||||
"Inappropriate ioctl",
|
"Inappropriate ioctl",
|
||||||
|
"Auto-suggestions:",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -125,11 +132,84 @@ class TestCleanShellNoise:
|
||||||
_assert_clean(result)
|
_assert_clean(result)
|
||||||
|
|
||||||
def test_noise_in_middle_not_stripped(self):
|
def test_noise_in_middle_not_stripped(self):
|
||||||
"""Only LEADING noise is stripped -- noise in the middle is real output."""
|
"""Noise in the middle is real output and should be preserved."""
|
||||||
output = "real\nbash: no job control in this shell\nmore real\n"
|
output = "real\nbash: no job control in this shell\nmore real\n"
|
||||||
result = _clean_shell_noise(output)
|
result = _clean_shell_noise(output)
|
||||||
assert result == output
|
assert result == output
|
||||||
|
|
||||||
|
def test_zsh_restored_session(self):
|
||||||
|
output = "Restored session: Mon Mar 2 22:16:54 +03 2026\nhello\n"
|
||||||
|
result = _clean_shell_noise(output)
|
||||||
|
assert result == "hello\n"
|
||||||
|
|
||||||
|
def test_zsh_saving_session_trailing(self):
|
||||||
|
output = "hello\nSaving session...completed.\n"
|
||||||
|
result = _clean_shell_noise(output)
|
||||||
|
assert result == "hello\n"
|
||||||
|
|
||||||
|
def test_zsh_oh_my_zsh_banner(self):
|
||||||
|
output = "Oh My Zsh on! | Auto-suggestions: press right\nhello\n"
|
||||||
|
result = _clean_shell_noise(output)
|
||||||
|
assert result == "hello\n"
|
||||||
|
|
||||||
|
def test_zsh_full_noise_sandwich(self):
|
||||||
|
"""Both leading and trailing zsh noise stripped."""
|
||||||
|
output = (
|
||||||
|
"Restored session: Mon Mar 2\n"
|
||||||
|
"command not found: docker\n"
|
||||||
|
"Oh My Zsh on!\n"
|
||||||
|
"actual output\n"
|
||||||
|
"Saving session...completed.\n"
|
||||||
|
)
|
||||||
|
result = _clean_shell_noise(output)
|
||||||
|
assert result == "actual output\n"
|
||||||
|
|
||||||
|
def test_last_login_stripped(self):
|
||||||
|
output = "Last login: Mon Mar 2 22:00:00 on ttys001\nhello\n"
|
||||||
|
result = _clean_shell_noise(output)
|
||||||
|
assert result == "hello\n"
|
||||||
|
|
||||||
|
|
||||||
|
# ── _extract_fenced_output unit tests ────────────────────────────────────
|
||||||
|
|
||||||
|
class TestExtractFencedOutput:
|
||||||
|
def test_normal_fenced_output(self):
|
||||||
|
raw = f"noise\n{_OUTPUT_FENCE}hello world\n{_OUTPUT_FENCE}more noise\n"
|
||||||
|
assert _extract_fenced_output(raw) == "hello world\n"
|
||||||
|
|
||||||
|
def test_no_trailing_newline(self):
|
||||||
|
"""printf output with no trailing newline is preserved."""
|
||||||
|
raw = f"noise{_OUTPUT_FENCE}exact{_OUTPUT_FENCE}noise"
|
||||||
|
assert _extract_fenced_output(raw) == "exact"
|
||||||
|
|
||||||
|
def test_no_fences_falls_back(self):
|
||||||
|
"""Without fences, falls back to pattern-based cleaning."""
|
||||||
|
raw = "bash: no job control in this shell\nhello\n"
|
||||||
|
result = _extract_fenced_output(raw)
|
||||||
|
assert result == "hello\n"
|
||||||
|
|
||||||
|
def test_only_start_fence(self):
|
||||||
|
"""Only start fence (e.g. user command called exit)."""
|
||||||
|
raw = f"noise{_OUTPUT_FENCE}hello\nSaving session...\n"
|
||||||
|
result = _extract_fenced_output(raw)
|
||||||
|
assert result == "hello\n"
|
||||||
|
|
||||||
|
def test_user_outputs_fence_string(self):
|
||||||
|
"""If user command outputs the fence marker, it is preserved."""
|
||||||
|
raw = f"noise{_OUTPUT_FENCE}{_OUTPUT_FENCE}real\n{_OUTPUT_FENCE}noise"
|
||||||
|
result = _extract_fenced_output(raw)
|
||||||
|
# first fence -> last fence captures the middle including user's fence
|
||||||
|
assert _OUTPUT_FENCE in result
|
||||||
|
assert "real\n" in result
|
||||||
|
|
||||||
|
def test_empty_command_output(self):
|
||||||
|
raw = f"noise{_OUTPUT_FENCE}{_OUTPUT_FENCE}noise"
|
||||||
|
assert _extract_fenced_output(raw) == ""
|
||||||
|
|
||||||
|
def test_multiline_output(self):
|
||||||
|
raw = f"noise\n{_OUTPUT_FENCE}line1\nline2\nline3\n{_OUTPUT_FENCE}noise\n"
|
||||||
|
assert _extract_fenced_output(raw) == "line1\nline2\nline3\n"
|
||||||
|
|
||||||
|
|
||||||
# ── LocalEnvironment.execute() ───────────────────────────────────────────
|
# ── LocalEnvironment.execute() ───────────────────────────────────────────
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,12 @@ from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
# tools/__init__.py re-exports a *function* called ``terminal_tool`` which
|
||||||
|
# shadows the module of the same name. Use sys.modules to get the real module
|
||||||
|
# so patch.object works correctly.
|
||||||
|
import sys
|
||||||
|
import tools.terminal_tool # noqa: F401 -- ensure module is loaded
|
||||||
|
_tt_mod = sys.modules["tools.terminal_tool"]
|
||||||
from tools.terminal_tool import get_active_environments_info
|
from tools.terminal_tool import get_active_environments_info
|
||||||
|
|
||||||
# 1 MiB of data so the rounded MB value is clearly distinguishable
|
# 1 MiB of data so the rounded MB value is clearly distinguishable
|
||||||
|
|
@ -34,10 +40,8 @@ class TestDiskUsageGlob:
|
||||||
"aaaaaaaa-1111-2222-3333-444444444444": MagicMock(),
|
"aaaaaaaa-1111-2222-3333-444444444444": MagicMock(),
|
||||||
}
|
}
|
||||||
|
|
||||||
with (
|
with patch.object(_tt_mod, "_active_environments", fake_envs), \
|
||||||
patch("tools.terminal_tool._active_environments", fake_envs),
|
patch.object(_tt_mod, "_get_scratch_dir", return_value=fake_scratch):
|
||||||
patch("tools.terminal_tool._get_scratch_dir", return_value=fake_scratch),
|
|
||||||
):
|
|
||||||
info = get_active_environments_info()
|
info = get_active_environments_info()
|
||||||
|
|
||||||
# Task A only: ~1.0 MB. With the bug (hardcoded hermes-*),
|
# Task A only: ~1.0 MB. With the bug (hardcoded hermes-*),
|
||||||
|
|
@ -51,10 +55,8 @@ class TestDiskUsageGlob:
|
||||||
"bbbbbbbb-5555-6666-7777-888888888888": MagicMock(),
|
"bbbbbbbb-5555-6666-7777-888888888888": MagicMock(),
|
||||||
}
|
}
|
||||||
|
|
||||||
with (
|
with patch.object(_tt_mod, "_active_environments", fake_envs), \
|
||||||
patch("tools.terminal_tool._active_environments", fake_envs),
|
patch.object(_tt_mod, "_get_scratch_dir", return_value=fake_scratch):
|
||||||
patch("tools.terminal_tool._get_scratch_dir", return_value=fake_scratch),
|
|
||||||
):
|
|
||||||
info = get_active_environments_info()
|
info = get_active_environments_info()
|
||||||
|
|
||||||
# Should be ~2.0 MB total (1 MB per task).
|
# Should be ~2.0 MB total (1 MB per task).
|
||||||
|
|
|
||||||
|
|
@ -9,28 +9,84 @@ import time
|
||||||
|
|
||||||
from tools.environments.base import BaseEnvironment
|
from tools.environments.base import BaseEnvironment
|
||||||
|
|
||||||
|
# Unique marker to isolate real command output from shell init/exit noise.
|
||||||
|
# printf (no trailing newline) keeps the boundaries clean for splitting.
|
||||||
|
_OUTPUT_FENCE = "__HERMES_FENCE_a9f7b3__"
|
||||||
|
|
||||||
# Noise lines emitted by interactive shells when stdin is not a terminal.
|
# Noise lines emitted by interactive shells when stdin is not a terminal.
|
||||||
# Filtered from output to keep tool results clean.
|
# Used as a fallback when output fence markers are missing.
|
||||||
_SHELL_NOISE_SUBSTRINGS = (
|
_SHELL_NOISE_SUBSTRINGS = (
|
||||||
|
# bash
|
||||||
"bash: cannot set terminal process group",
|
"bash: cannot set terminal process group",
|
||||||
"bash: no job control in this shell",
|
"bash: no job control in this shell",
|
||||||
"no job control in this shell",
|
"no job control in this shell",
|
||||||
"cannot set terminal process group",
|
"cannot set terminal process group",
|
||||||
"tcsetattr: Inappropriate ioctl for device",
|
"tcsetattr: Inappropriate ioctl for device",
|
||||||
|
# zsh / oh-my-zsh / macOS terminal session
|
||||||
|
"Restored session:",
|
||||||
|
"Saving session...",
|
||||||
|
"Last login:",
|
||||||
|
"command not found:",
|
||||||
|
"Oh My Zsh",
|
||||||
|
"compinit:",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _clean_shell_noise(output: str) -> str:
|
def _clean_shell_noise(output: str) -> str:
|
||||||
"""Strip shell startup warnings that leak when using -i without a TTY.
|
"""Strip shell startup/exit warnings that leak when using -i without a TTY.
|
||||||
|
|
||||||
Removes all leading lines that match known noise patterns, not just the first.
|
Removes lines matching known noise patterns from both the beginning
|
||||||
Some environments emit multiple noise lines (e.g. Docker, non-TTY sessions).
|
and end of the output. Lines in the middle are left untouched.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def _is_noise(line: str) -> bool:
|
||||||
|
return any(noise in line for noise in _SHELL_NOISE_SUBSTRINGS)
|
||||||
|
|
||||||
lines = output.split("\n")
|
lines = output.split("\n")
|
||||||
# Strip all leading noise lines
|
|
||||||
while lines and any(noise in lines[0] for noise in _SHELL_NOISE_SUBSTRINGS):
|
# Strip leading noise
|
||||||
|
while lines and _is_noise(lines[0]):
|
||||||
lines.pop(0)
|
lines.pop(0)
|
||||||
return "\n".join(lines)
|
|
||||||
|
# Strip trailing noise (walk backwards, skip empty lines from split)
|
||||||
|
end = len(lines) - 1
|
||||||
|
while end >= 0 and (not lines[end] or _is_noise(lines[end])):
|
||||||
|
end -= 1
|
||||||
|
|
||||||
|
if end < 0:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
cleaned = lines[: end + 1]
|
||||||
|
result = "\n".join(cleaned)
|
||||||
|
|
||||||
|
# Preserve trailing newline if original had one
|
||||||
|
if output.endswith("\n") and result and not result.endswith("\n"):
|
||||||
|
result += "\n"
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_fenced_output(raw: str) -> str:
|
||||||
|
"""Extract real command output from between fence markers.
|
||||||
|
|
||||||
|
The execute() method wraps each command with printf(FENCE) markers.
|
||||||
|
This function finds the first and last fence and returns only the
|
||||||
|
content between them, which is the actual command output free of
|
||||||
|
any shell init/exit noise.
|
||||||
|
|
||||||
|
Falls back to pattern-based _clean_shell_noise if fences are missing.
|
||||||
|
"""
|
||||||
|
first = raw.find(_OUTPUT_FENCE)
|
||||||
|
if first == -1:
|
||||||
|
return _clean_shell_noise(raw)
|
||||||
|
|
||||||
|
start = first + len(_OUTPUT_FENCE)
|
||||||
|
last = raw.rfind(_OUTPUT_FENCE)
|
||||||
|
|
||||||
|
if last <= first:
|
||||||
|
# Only start fence found (e.g. user command called `exit`)
|
||||||
|
return _clean_shell_noise(raw[start:])
|
||||||
|
|
||||||
|
return raw[start:last]
|
||||||
|
|
||||||
|
|
||||||
class LocalEnvironment(BaseEnvironment):
|
class LocalEnvironment(BaseEnvironment):
|
||||||
|
|
@ -64,8 +120,17 @@ class LocalEnvironment(BaseEnvironment):
|
||||||
# -l alone isn't enough: .profile sources .bashrc, but the guard
|
# -l alone isn't enough: .profile sources .bashrc, but the guard
|
||||||
# returns early because the shell isn't interactive.
|
# returns early because the shell isn't interactive.
|
||||||
user_shell = os.environ.get("SHELL") or shutil.which("bash") or "/bin/bash"
|
user_shell = os.environ.get("SHELL") or shutil.which("bash") or "/bin/bash"
|
||||||
|
# Wrap with output fences so we can later extract the real
|
||||||
|
# command output and discard shell init/exit noise.
|
||||||
|
fenced_cmd = (
|
||||||
|
f"printf '{_OUTPUT_FENCE}';"
|
||||||
|
f" {exec_command};"
|
||||||
|
f" __hermes_rc=$?;"
|
||||||
|
f" printf '{_OUTPUT_FENCE}';"
|
||||||
|
f" exit $__hermes_rc"
|
||||||
|
)
|
||||||
proc = subprocess.Popen(
|
proc = subprocess.Popen(
|
||||||
[user_shell, "-lic", exec_command],
|
[user_shell, "-lic", fenced_cmd],
|
||||||
text=True,
|
text=True,
|
||||||
cwd=work_dir,
|
cwd=work_dir,
|
||||||
env=os.environ | self.env,
|
env=os.environ | self.env,
|
||||||
|
|
@ -130,7 +195,7 @@ class LocalEnvironment(BaseEnvironment):
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
||||||
reader.join(timeout=5)
|
reader.join(timeout=5)
|
||||||
output = _clean_shell_noise("".join(_output_chunks))
|
output = _extract_fenced_output("".join(_output_chunks))
|
||||||
return {"output": output, "returncode": proc.returncode}
|
return {"output": output, "returncode": proc.returncode}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
|
|
@ -848,8 +848,8 @@ class ShellFileOperations(FileOperations):
|
||||||
|
|
||||||
result = self._exec(cmd, timeout=60)
|
result = self._exec(cmd, timeout=60)
|
||||||
|
|
||||||
if result.exit_code != 0 and not result.stdout.strip():
|
if not result.stdout.strip():
|
||||||
# Try without -printf (BSD find compatibility)
|
# Try without -printf (BSD find compatibility -- macOS)
|
||||||
cmd_simple = f"find {self._escape_shell_arg(path)} -type f -name {self._escape_shell_arg(search_pattern)} " \
|
cmd_simple = f"find {self._escape_shell_arg(path)} -type f -name {self._escape_shell_arg(search_pattern)} " \
|
||||||
f"2>/dev/null | head -n {limit + offset} | tail -n +{offset + 1}"
|
f"2>/dev/null | head -n {limit + offset} | tail -n +{offset + 1}"
|
||||||
result = self._exec(cmd_simple, timeout=60)
|
result = self._exec(cmd_simple, timeout=60)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue