test: reorganize test structure and add missing unit tests

Reorganize flat tests/ directory to mirror source code structure
(tools/, gateway/, hermes_cli/, integration/). Add 11 new test files
covering previously untested modules: registry, patch_parser,
fuzzy_match, todo_tool, approval, file_tools, gateway session/config/
delivery, and hermes_cli config/models. Total: 147 unit tests passing,
9 integration tests gated behind pytest marker.
This commit is contained in:
0xbyt4 2026-02-26 03:20:08 +03:00
parent 3c5bf5b9d8
commit 8fc28c34ce
24 changed files with 1066 additions and 16 deletions

View file

View file

@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""
Test script for batch runner
This script tests the batch runner with a small sample dataset
to verify functionality before running large batches.
"""
import pytest
pytestmark = pytest.mark.integration
import json
import shutil
from pathlib import Path
def create_test_dataset():
"""Create a small test dataset."""
test_file = Path("tests/test_dataset.jsonl")
test_file.parent.mkdir(exist_ok=True)
prompts = [
{"prompt": "What is 2 + 2?"},
{"prompt": "What is the capital of France?"},
{"prompt": "Explain what Python is in one sentence."},
]
with open(test_file, 'w') as f:
for prompt in prompts:
f.write(json.dumps(prompt, ensure_ascii=False) + "\n")
print(f"✅ Created test dataset: {test_file}")
return test_file
def cleanup_test_run(run_name):
"""Clean up test run output."""
output_dir = Path("data") / run_name
if output_dir.exists():
shutil.rmtree(output_dir)
print(f"🗑️ Cleaned up test output: {output_dir}")
def verify_output(run_name):
"""Verify that output files were created correctly."""
output_dir = Path("data") / run_name
# Check directory exists
if not output_dir.exists():
print(f"❌ Output directory not found: {output_dir}")
return False
# Check for checkpoint
checkpoint_file = output_dir / "checkpoint.json"
if not checkpoint_file.exists():
print(f"❌ Checkpoint file not found: {checkpoint_file}")
return False
# Check for statistics
stats_file = output_dir / "statistics.json"
if not stats_file.exists():
print(f"❌ Statistics file not found: {stats_file}")
return False
# Check for batch files
batch_files = list(output_dir.glob("batch_*.jsonl"))
if not batch_files:
print(f"❌ No batch files found in: {output_dir}")
return False
print(f"✅ Output verification passed:")
print(f" - Checkpoint: {checkpoint_file}")
print(f" - Statistics: {stats_file}")
print(f" - Batch files: {len(batch_files)}")
# Load and display statistics
with open(stats_file) as f:
stats = json.load(f)
print(f"\n📊 Statistics Summary:")
print(f" - Total prompts: {stats['total_prompts']}")
print(f" - Total batches: {stats['total_batches']}")
print(f" - Duration: {stats['duration_seconds']}s")
if stats.get('tool_statistics'):
print(f" - Tool calls:")
for tool, tool_stats in stats['tool_statistics'].items():
print(f"{tool}: {tool_stats['count']} calls, {tool_stats['success_rate']:.1f}% success")
return True
def main():
"""Run the test."""
print("🧪 Batch Runner Test")
print("=" * 60)
run_name = "test_run"
# Clean up any previous test run
cleanup_test_run(run_name)
# Create test dataset
test_file = create_test_dataset()
print(f"\n📝 To run the test manually:")
print(f" python batch_runner.py \\")
print(f" --dataset_file={test_file} \\")
print(f" --batch_size=2 \\")
print(f" --run_name={run_name} \\")
print(f" --distribution=minimal \\")
print(f" --num_workers=2")
print(f"\n💡 Or test with different distributions:")
print(f" python batch_runner.py --list_distributions")
print(f"\n🔍 After running, you can verify output with:")
print(f" python tests/test_batch_runner.py --verify")
# Note: We don't actually run the batch runner here to avoid API calls during testing
# Users should run it manually with their API keys configured
if __name__ == "__main__":
import sys
if "--verify" in sys.argv:
run_name = "test_run"
verify_output(run_name)
else:
main()

View file

@ -0,0 +1,440 @@
#!/usr/bin/env python3
"""
Test script to verify checkpoint behavior in batch_runner.py
This script simulates batch processing with intentional failures to test:
1. Whether checkpoints are saved incrementally during processing
2. Whether resume functionality works correctly after interruption
3. Whether data integrity is maintained across checkpoint cycles
Usage:
# Test current implementation
python tests/test_checkpoint_resumption.py --test_current
# Test after fix is applied
python tests/test_checkpoint_resumption.py --test_fixed
# Run full comparison
python tests/test_checkpoint_resumption.py --compare
"""
import pytest
pytestmark = pytest.mark.integration
import json
import os
import shutil
import sys
import time
from pathlib import Path
from typing import List, Dict, Any
import traceback
# Add project root to path to import batch_runner
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
def create_test_dataset(num_prompts: int = 20) -> Path:
"""Create a small test dataset for checkpoint testing."""
test_data_dir = Path("tests/test_data")
test_data_dir.mkdir(parents=True, exist_ok=True)
dataset_file = test_data_dir / "checkpoint_test_dataset.jsonl"
with open(dataset_file, 'w', encoding='utf-8') as f:
for i in range(num_prompts):
entry = {
"prompt": f"Test prompt {i}: What is 2+2? Just answer briefly.",
"test_id": i
}
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"✅ Created test dataset: {dataset_file} ({num_prompts} prompts)")
return dataset_file
def monitor_checkpoint_during_run(checkpoint_file: Path, duration: int = 30) -> List[Dict[str, Any]]:
"""
Monitor checkpoint file during a batch run to see when it gets updated.
Args:
checkpoint_file: Path to checkpoint file to monitor
duration: How long to monitor (seconds)
Returns:
List of checkpoint snapshots with timestamps
"""
snapshots = []
start_time = time.time()
last_mtime = None
print(f"\n🔍 Monitoring checkpoint file: {checkpoint_file}")
print(f" Duration: {duration}s")
print("-" * 70)
while time.time() - start_time < duration:
if checkpoint_file.exists():
current_mtime = checkpoint_file.stat().st_mtime
# Check if file was modified
if last_mtime is None or current_mtime != last_mtime:
elapsed = time.time() - start_time
try:
with open(checkpoint_file, 'r') as f:
checkpoint_data = json.load(f)
snapshot = {
"elapsed_seconds": round(elapsed, 2),
"completed_count": len(checkpoint_data.get("completed_prompts", [])),
"completed_prompts": checkpoint_data.get("completed_prompts", [])[:5], # First 5 for display
"timestamp": checkpoint_data.get("last_updated")
}
snapshots.append(snapshot)
print(f"[{elapsed:6.2f}s] Checkpoint updated: {snapshot['completed_count']} prompts completed")
except Exception as e:
print(f"[{elapsed:6.2f}s] Error reading checkpoint: {e}")
last_mtime = current_mtime
else:
if len(snapshots) == 0:
print(f"[{time.time() - start_time:6.2f}s] Checkpoint file not yet created...")
time.sleep(0.5) # Check every 0.5 seconds
return snapshots
def _cleanup_test_artifacts(*paths):
"""Remove test-generated files and directories."""
for p in paths:
p = Path(p)
if p.is_dir():
shutil.rmtree(p, ignore_errors=True)
elif p.is_file():
p.unlink(missing_ok=True)
def test_current_implementation():
"""Test the current checkpoint implementation."""
print("\n" + "=" * 70)
print("TEST 1: Current Implementation - Checkpoint Timing")
print("=" * 70)
print("\n📝 Testing whether checkpoints are saved incrementally during run...")
# Setup
dataset_file = create_test_dataset(num_prompts=12)
run_name = "checkpoint_test_current"
output_dir = Path("data") / run_name
# Clean up any existing test data
if output_dir.exists():
shutil.rmtree(output_dir)
# Import here to avoid issues if module changes
from batch_runner import BatchRunner
checkpoint_file = output_dir / "checkpoint.json"
# Start monitoring in a separate process would be ideal, but for simplicity
# we'll just check before and after
print(f"\n▶️ Starting batch run...")
print(f" Dataset: {dataset_file}")
print(f" Batch size: 3 (4 batches total)")
print(f" Workers: 2")
print(f" Expected behavior: If incremental, checkpoint should update during run")
start_time = time.time()
try:
runner = BatchRunner(
dataset_file=str(dataset_file),
batch_size=3,
run_name=run_name,
distribution="default",
max_iterations=3, # Keep it short
model="claude-opus-4-20250514",
num_workers=2,
verbose=False
)
# Run with monitoring
import threading
snapshots = []
def monitor():
nonlocal snapshots
snapshots = monitor_checkpoint_during_run(checkpoint_file, duration=60)
monitor_thread = threading.Thread(target=monitor, daemon=True)
monitor_thread.start()
runner.run(resume=False)
monitor_thread.join(timeout=2)
except Exception as e:
print(f"❌ Error during run: {e}")
traceback.print_exc()
return False
finally:
_cleanup_test_artifacts(dataset_file, output_dir)
elapsed = time.time() - start_time
# Analyze results
print("\n" + "=" * 70)
print("📊 TEST RESULTS")
print("=" * 70)
print(f"Total run time: {elapsed:.2f}s")
print(f"Checkpoint updates observed: {len(snapshots)}")
if len(snapshots) == 0:
print("\n❌ ISSUE: No checkpoint updates observed during run")
print(" This suggests checkpoints are only saved at the end")
return False
elif len(snapshots) == 1:
print("\n⚠️ WARNING: Only 1 checkpoint update (likely at the end)")
print(" This confirms the bug - no incremental checkpointing")
return False
else:
print(f"\n✅ GOOD: Multiple checkpoint updates ({len(snapshots)}) observed")
print(" Checkpointing appears to be incremental")
# Show timeline
print("\n📈 Checkpoint Timeline:")
for i, snapshot in enumerate(snapshots, 1):
print(f" {i}. [{snapshot['elapsed_seconds']:6.2f}s] "
f"{snapshot['completed_count']} prompts completed")
return True
def test_interruption_and_resume():
"""Test that resume actually works after interruption."""
print("\n" + "=" * 70)
print("TEST 2: Interruption and Resume")
print("=" * 70)
print("\n📝 Testing whether resume works after manual interruption...")
# Setup
dataset_file = create_test_dataset(num_prompts=15)
run_name = "checkpoint_test_resume"
output_dir = Path("data") / run_name
# Clean up any existing test data
if output_dir.exists():
shutil.rmtree(output_dir)
from batch_runner import BatchRunner
checkpoint_file = output_dir / "checkpoint.json"
print(f"\n▶️ Starting first run (will process 5 prompts, then simulate interruption)...")
temp_dataset = Path("tests/test_data/checkpoint_test_resume_partial.jsonl")
try:
# Create a modified dataset with only first 5 prompts for initial run
with open(dataset_file, 'r') as f:
lines = f.readlines()[:5]
with open(temp_dataset, 'w') as f:
f.writelines(lines)
runner = BatchRunner(
dataset_file=str(temp_dataset),
batch_size=2,
run_name=run_name,
distribution="default",
max_iterations=3,
model="claude-opus-4-20250514",
num_workers=1,
verbose=False
)
runner.run(resume=False)
# Check checkpoint after first run
if not checkpoint_file.exists():
print("❌ ERROR: Checkpoint file not created after first run")
return False
with open(checkpoint_file, 'r') as f:
checkpoint_data = json.load(f)
initial_completed = len(checkpoint_data.get("completed_prompts", []))
print(f"✅ First run completed: {initial_completed} prompts saved to checkpoint")
# Now try to resume with full dataset
print(f"\n▶️ Starting resume run with full dataset (15 prompts)...")
runner2 = BatchRunner(
dataset_file=str(dataset_file),
batch_size=2,
run_name=run_name,
distribution="default",
max_iterations=3,
model="claude-opus-4-20250514",
num_workers=1,
verbose=False
)
runner2.run(resume=True)
# Check final checkpoint
with open(checkpoint_file, 'r') as f:
final_checkpoint = json.load(f)
final_completed = len(final_checkpoint.get("completed_prompts", []))
print("\n" + "=" * 70)
print("📊 TEST RESULTS")
print("=" * 70)
print(f"Initial completed: {initial_completed}")
print(f"Final completed: {final_completed}")
print(f"Expected: 15")
if final_completed == 15:
print("\n✅ PASS: Resume successfully completed all prompts")
return True
else:
print(f"\n❌ FAIL: Expected 15 completed, got {final_completed}")
return False
except Exception as e:
print(f"❌ Error during test: {e}")
traceback.print_exc()
return False
finally:
_cleanup_test_artifacts(dataset_file, temp_dataset, output_dir)
def test_simulated_crash():
"""Test behavior when process crashes mid-execution."""
print("\n" + "=" * 70)
print("TEST 3: Simulated Crash During Execution")
print("=" * 70)
print("\n📝 This test would require running in a subprocess and killing it...")
print(" Skipping for safety - manual testing recommended")
return None
def print_test_plan():
"""Print the detailed test and fix plan."""
print("\n" + "=" * 70)
print("CHECKPOINT FIX - DETAILED PLAN")
print("=" * 70)
print("""
📋 PROBLEM SUMMARY
------------------
Current implementation uses pool.map() which blocks until ALL batches complete.
Checkpoint is only saved after all batches finish (line 558-559).
If process crashes during batch processing:
- All progress is lost
- Resume does nothing (no incremental checkpoint was saved)
📋 PROPOSED SOLUTION
--------------------
Replace pool.map() with pool.imap_unordered() to get results as they complete.
Save checkpoint after EACH batch completes using a multiprocessing Lock.
Key changes:
1. Use Manager().Lock() for thread-safe checkpoint writes
2. Replace pool.map() with pool.imap_unordered()
3. Update checkpoint after each batch result
4. Maintain backward compatibility with existing checkpoints
📋 IMPLEMENTATION STEPS
-----------------------
1. Add Manager and Lock initialization before Pool creation
2. Pass shared checkpoint data and lock to workers (via Manager)
3. Replace pool.map() with pool.imap_unordered()
4. In result loop: save checkpoint after each batch
5. Add error handling for checkpoint write failures
📋 RISKS & MITIGATIONS
----------------------
Risk: Checkpoint file corruption if two processes write simultaneously
Mitigation: Use multiprocessing.Lock() for exclusive access
Risk: Performance impact from frequent checkpoint writes
Mitigation: Checkpoint writes are fast (small JSON), negligible impact
Risk: Breaking existing runs that are already checkpointed
Mitigation: Maintain checkpoint format, only change timing
Risk: Bugs in multiprocessing lock/manager code
Mitigation: Thorough testing with this test script
📋 TESTING STRATEGY
-------------------
1. Run test_current_implementation() - Confirm bug exists
2. Apply fix to batch_runner.py
3. Run test_current_implementation() again - Should see incremental updates
4. Run test_interruption_and_resume() - Verify resume works
5. Manual test: Start run, kill process mid-batch, resume
📋 ROLLBACK PLAN
----------------
If issues arise:
1. Git revert the changes
2. Original code is working (just missing incremental checkpoint)
3. No data corruption risk - checkpoints are write-only
""")
def main(
test_current: bool = False,
test_resume: bool = False,
test_crash: bool = False,
compare: bool = False,
show_plan: bool = False
):
"""
Run checkpoint behavior tests.
Args:
test_current: Test current implementation checkpoint timing
test_resume: Test interruption and resume functionality
test_crash: Test simulated crash scenario (manual)
compare: Run all tests and compare
show_plan: Show detailed fix plan
"""
if show_plan or (not any([test_current, test_resume, test_crash, compare])):
print_test_plan()
return
results = {}
if test_current or compare:
results['current'] = test_current_implementation()
if test_resume or compare:
results['resume'] = test_interruption_and_resume()
if test_crash or compare:
results['crash'] = test_simulated_crash()
# Summary
if results:
print("\n" + "=" * 70)
print("OVERALL TEST SUMMARY")
print("=" * 70)
for test_name, result in results.items():
if result is None:
status = "⏭️ SKIPPED"
elif result:
status = "✅ PASS"
else:
status = "❌ FAIL"
print(f"{status} - {test_name}")
if __name__ == "__main__":
import fire
fire.Fire(main)

View file

@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""
Test Modal Terminal Tool
This script tests that the Modal terminal backend is correctly configured
and can execute commands in Modal sandboxes.
Usage:
# Run with Modal backend
TERMINAL_ENV=modal python tests/test_modal_terminal.py
# Or run directly (will use whatever TERMINAL_ENV is set in .env)
python tests/test_modal_terminal.py
"""
import pytest
pytestmark = pytest.mark.integration
import os
import sys
import json
from pathlib import Path
# Try to load .env file if python-dotenv is available
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
# Manually load .env if dotenv not available
env_file = Path(__file__).parent.parent.parent / ".env"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
# Remove quotes if present
value = value.strip().strip('"').strip("'")
os.environ.setdefault(key.strip(), value)
# Add project root to path for imports
parent_dir = Path(__file__).parent.parent.parent
sys.path.insert(0, str(parent_dir))
sys.path.insert(0, str(parent_dir / "mini-swe-agent" / "src"))
# Import terminal_tool module directly using importlib to avoid tools/__init__.py
import importlib.util
terminal_tool_path = parent_dir / "tools" / "terminal_tool.py"
spec = importlib.util.spec_from_file_location("terminal_tool", terminal_tool_path)
terminal_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(terminal_module)
terminal_tool = terminal_module.terminal_tool
check_terminal_requirements = terminal_module.check_terminal_requirements
_get_env_config = terminal_module._get_env_config
cleanup_vm = terminal_module.cleanup_vm
get_active_environments_info = terminal_module.get_active_environments_info
def test_modal_requirements():
"""Test that Modal requirements are met."""
print("\n" + "=" * 60)
print("TEST 1: Modal Requirements Check")
print("=" * 60)
config = _get_env_config()
print(f"Current TERMINAL_ENV: {config['env_type']}")
print(f"Modal image: {config['modal_image']}")
# Check for Modal authentication
modal_token = os.getenv("MODAL_TOKEN_ID")
modal_toml = Path.home() / ".modal.toml"
print(f"\nModal authentication:")
print(f" MODAL_TOKEN_ID env var: {'✅ Set' if modal_token else '❌ Not set'}")
print(f" ~/.modal.toml file: {'✅ Exists' if modal_toml.exists() else '❌ Not found'}")
if config['env_type'] != 'modal':
print(f"\n⚠️ TERMINAL_ENV is '{config['env_type']}', not 'modal'")
print(" Set TERMINAL_ENV=modal in .env or export it to test Modal backend")
return False
requirements_met = check_terminal_requirements()
print(f"\nRequirements check: {'✅ Passed' if requirements_met else '❌ Failed'}")
return requirements_met
def test_simple_command():
"""Test executing a simple command."""
print("\n" + "=" * 60)
print("TEST 2: Simple Command Execution")
print("=" * 60)
test_task_id = "modal_test_simple"
print("Executing: echo 'Hello from Modal!'")
result = terminal_tool("echo 'Hello from Modal!'", task_id=test_task_id)
result_json = json.loads(result)
print(f"\nResult:")
print(f" Output: {result_json.get('output', '')[:200]}")
print(f" Exit code: {result_json.get('exit_code')}")
print(f" Error: {result_json.get('error')}")
success = result_json.get('exit_code') == 0 and 'Hello from Modal!' in result_json.get('output', '')
print(f"\nTest: {'✅ Passed' if success else '❌ Failed'}")
# Cleanup
cleanup_vm(test_task_id)
return success
def test_python_execution():
"""Test executing Python code in Modal."""
print("\n" + "=" * 60)
print("TEST 3: Python Execution")
print("=" * 60)
test_task_id = "modal_test_python"
python_cmd = 'python3 -c "import sys; print(f\'Python {sys.version}\')"'
print(f"Executing: {python_cmd}")
result = terminal_tool(python_cmd, task_id=test_task_id)
result_json = json.loads(result)
print(f"\nResult:")
print(f" Output: {result_json.get('output', '')[:200]}")
print(f" Exit code: {result_json.get('exit_code')}")
print(f" Error: {result_json.get('error')}")
success = result_json.get('exit_code') == 0 and 'Python' in result_json.get('output', '')
print(f"\nTest: {'✅ Passed' if success else '❌ Failed'}")
# Cleanup
cleanup_vm(test_task_id)
return success
def test_pip_install():
"""Test installing a package with pip in Modal."""
print("\n" + "=" * 60)
print("TEST 4: Pip Install Test")
print("=" * 60)
test_task_id = "modal_test_pip"
# Install a small package and verify
print("Executing: pip install --break-system-packages cowsay && python3 -c \"import cowsay; cowsay.cow('Modal works!')\"")
result = terminal_tool(
"pip install --break-system-packages cowsay && python3 -c \"import cowsay; cowsay.cow('Modal works!')\"",
task_id=test_task_id,
timeout=120
)
result_json = json.loads(result)
print(f"\nResult:")
output = result_json.get('output', '')
print(f" Output (last 500 chars): ...{output[-500:] if len(output) > 500 else output}")
print(f" Exit code: {result_json.get('exit_code')}")
print(f" Error: {result_json.get('error')}")
success = result_json.get('exit_code') == 0 and 'Modal works!' in result_json.get('output', '')
print(f"\nTest: {'✅ Passed' if success else '❌ Failed'}")
# Cleanup
cleanup_vm(test_task_id)
return success
def test_filesystem_persistence():
"""Test that filesystem persists between commands in the same task."""
print("\n" + "=" * 60)
print("TEST 5: Filesystem Persistence")
print("=" * 60)
test_task_id = "modal_test_persist"
# Create a file
print("Step 1: Creating test file...")
result1 = terminal_tool("echo 'persistence test' > /tmp/modal_test.txt", task_id=test_task_id)
result1_json = json.loads(result1)
print(f" Exit code: {result1_json.get('exit_code')}")
# Read the file back
print("Step 2: Reading test file...")
result2 = terminal_tool("cat /tmp/modal_test.txt", task_id=test_task_id)
result2_json = json.loads(result2)
print(f" Output: {result2_json.get('output', '')}")
print(f" Exit code: {result2_json.get('exit_code')}")
success = (
result1_json.get('exit_code') == 0 and
result2_json.get('exit_code') == 0 and
'persistence test' in result2_json.get('output', '')
)
print(f"\nTest: {'✅ Passed' if success else '❌ Failed'}")
# Cleanup
cleanup_vm(test_task_id)
return success
def test_environment_isolation():
"""Test that different task_ids get isolated environments."""
print("\n" + "=" * 60)
print("TEST 6: Environment Isolation")
print("=" * 60)
task1 = "modal_test_iso_1"
task2 = "modal_test_iso_2"
# Create file in task1
print("Step 1: Creating file in task1...")
result1 = terminal_tool("echo 'task1 data' > /tmp/isolated.txt", task_id=task1)
# Try to read from task2 (should not exist)
print("Step 2: Trying to read file from task2 (should not exist)...")
result2 = terminal_tool("cat /tmp/isolated.txt 2>&1 || echo 'FILE_NOT_FOUND'", task_id=task2)
result2_json = json.loads(result2)
# The file should either not exist or be empty in task2
output = result2_json.get('output', '')
isolated = 'task1 data' not in output or 'FILE_NOT_FOUND' in output or 'No such file' in output
print(f" Task2 output: {output[:200]}")
print(f"\nTest: {'✅ Passed (environments isolated)' if isolated else '❌ Failed (environments NOT isolated)'}")
# Cleanup
cleanup_vm(task1)
cleanup_vm(task2)
return isolated
def main():
"""Run all Modal terminal tests."""
print("🧪 Modal Terminal Tool Test Suite")
print("=" * 60)
# Check current config
config = _get_env_config()
print(f"\nCurrent configuration:")
print(f" TERMINAL_ENV: {config['env_type']}")
print(f" TERMINAL_MODAL_IMAGE: {config['modal_image']}")
print(f" TERMINAL_TIMEOUT: {config['timeout']}s")
if config['env_type'] != 'modal':
print(f"\n⚠️ WARNING: TERMINAL_ENV is set to '{config['env_type']}', not 'modal'")
print(" To test Modal specifically, set TERMINAL_ENV=modal")
response = input("\n Continue testing with current backend? (y/n): ")
if response.lower() != 'y':
print("Aborting.")
return
results = {}
# Run tests
results['requirements'] = test_modal_requirements()
if not results['requirements']:
print("\n❌ Requirements not met. Cannot continue with other tests.")
return
results['simple_command'] = test_simple_command()
results['python_execution'] = test_python_execution()
results['pip_install'] = test_pip_install()
results['filesystem_persistence'] = test_filesystem_persistence()
results['environment_isolation'] = test_environment_isolation()
# Summary
print("\n" + "=" * 60)
print("TEST SUMMARY")
print("=" * 60)
passed = sum(1 for v in results.values() if v)
total = len(results)
for test_name, passed_test in results.items():
status = "✅ PASSED" if passed_test else "❌ FAILED"
print(f" {test_name}: {status}")
print(f"\nTotal: {passed}/{total} tests passed")
# Show active environments
env_info = get_active_environments_info()
print(f"\nActive environments after tests: {env_info['count']}")
if env_info['count'] > 0:
print(f" Task IDs: {env_info['task_ids']}")
return passed == total
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View file

@ -0,0 +1,623 @@
#!/usr/bin/env python3
"""
Comprehensive Test Suite for Web Tools Module
This script tests all web tools functionality to ensure they work correctly.
Run this after any updates to the web_tools.py module or Firecrawl library.
Usage:
python test_web_tools.py # Run all tests
python test_web_tools.py --no-llm # Skip LLM processing tests
python test_web_tools.py --verbose # Show detailed output
Requirements:
- FIRECRAWL_API_KEY environment variable must be set
- NOUS_API_KEY environment variable (optional, for LLM tests)
"""
import pytest
pytestmark = pytest.mark.integration
import json
import asyncio
import sys
import os
import argparse
from datetime import datetime
from typing import List
# Import the web tools to test (updated path after moving tools/)
from tools.web_tools import (
web_search_tool,
web_extract_tool,
web_crawl_tool,
check_firecrawl_api_key,
check_auxiliary_model,
get_debug_session_info
)
class Colors:
"""ANSI color codes for terminal output"""
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def print_header(text: str):
"""Print a formatted header"""
print(f"\n{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}{Colors.BOLD}{text}{Colors.ENDC}")
print(f"{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}")
def print_section(text: str):
"""Print a formatted section header"""
print(f"\n{Colors.CYAN}{Colors.BOLD}📌 {text}{Colors.ENDC}")
print(f"{Colors.CYAN}{'-'*50}{Colors.ENDC}")
def print_success(text: str):
"""Print success message"""
print(f"{Colors.GREEN}{text}{Colors.ENDC}")
def print_error(text: str):
"""Print error message"""
print(f"{Colors.FAIL}{text}{Colors.ENDC}")
def print_warning(text: str):
"""Print warning message"""
print(f"{Colors.WARNING}⚠️ {text}{Colors.ENDC}")
def print_info(text: str, indent: int = 0):
"""Print info message"""
indent_str = " " * indent
print(f"{indent_str}{Colors.BLUE} {text}{Colors.ENDC}")
class WebToolsTester:
"""Test suite for web tools"""
def __init__(self, verbose: bool = False, test_llm: bool = True):
self.verbose = verbose
self.test_llm = test_llm
self.test_results = {
"passed": [],
"failed": [],
"skipped": []
}
self.start_time = None
self.end_time = None
def log_result(self, test_name: str, status: str, details: str = ""):
"""Log test result"""
result = {
"test": test_name,
"status": status,
"details": details,
"timestamp": datetime.now().isoformat()
}
if status == "passed":
self.test_results["passed"].append(result)
print_success(f"{test_name}: {details}" if details else test_name)
elif status == "failed":
self.test_results["failed"].append(result)
print_error(f"{test_name}: {details}" if details else test_name)
elif status == "skipped":
self.test_results["skipped"].append(result)
print_warning(f"{test_name} skipped: {details}" if details else f"{test_name} skipped")
def test_environment(self) -> bool:
"""Test environment setup and API keys"""
print_section("Environment Check")
# Check Firecrawl API key
if not check_firecrawl_api_key():
self.log_result("Firecrawl API Key", "failed", "FIRECRAWL_API_KEY not set")
return False
else:
self.log_result("Firecrawl API Key", "passed", "Found")
# Check Nous API key (optional)
if not check_auxiliary_model():
self.log_result("Nous API Key", "skipped", "NOUS_API_KEY not set (LLM tests will be skipped)")
self.test_llm = False
else:
self.log_result("Nous API Key", "passed", "Found")
# Check debug mode
debug_info = get_debug_session_info()
if debug_info["enabled"]:
print_info(f"Debug mode enabled - Session: {debug_info['session_id']}")
print_info(f"Debug log: {debug_info['log_path']}")
return True
def test_web_search(self) -> List[str]:
"""Test web search functionality"""
print_section("Test 1: Web Search")
test_queries = [
("Python web scraping tutorial", 5),
("Firecrawl API documentation", 3),
("inflammatory arthritis symptoms treatment", 8) # Test medical query from your example
]
extracted_urls = []
for query, limit in test_queries:
try:
print(f"\n Testing search: '{query}' (limit={limit})")
if self.verbose:
print(f" Calling web_search_tool(query='{query}', limit={limit})")
# Perform search
result = web_search_tool(query, limit)
# Parse result
try:
data = json.loads(result)
except json.JSONDecodeError as e:
self.log_result(f"Search: {query[:30]}...", "failed", f"Invalid JSON: {e}")
if self.verbose:
print(f" Raw response (first 500 chars): {result[:500]}...")
continue
if "error" in data:
self.log_result(f"Search: {query[:30]}...", "failed", f"API error: {data['error']}")
continue
# Check structure
if "success" not in data or "data" not in data:
self.log_result(f"Search: {query[:30]}...", "failed", "Missing success or data fields")
if self.verbose:
print(f" Response keys: {list(data.keys())}")
continue
web_results = data.get("data", {}).get("web", [])
if not web_results:
self.log_result(f"Search: {query[:30]}...", "failed", "Empty web results array")
if self.verbose:
print(f" data.web content: {data.get('data', {}).get('web')}")
continue
# Validate each result
valid_results = 0
missing_fields = []
for i, result in enumerate(web_results):
required_fields = ["url", "title", "description"]
has_all_fields = all(key in result for key in required_fields)
if has_all_fields:
valid_results += 1
# Collect URLs for extraction test
if len(extracted_urls) < 3:
extracted_urls.append(result["url"])
if self.verbose:
print(f" Result {i+1}: ✓ {result['title'][:50]}...")
print(f" URL: {result['url'][:60]}...")
else:
missing = [f for f in required_fields if f not in result]
missing_fields.append(f"Result {i+1} missing: {missing}")
if self.verbose:
print(f" Result {i+1}: ✗ Missing fields: {missing}")
# Log results
if valid_results == len(web_results):
self.log_result(
f"Search: {query[:30]}...",
"passed",
f"All {valid_results} results valid"
)
else:
self.log_result(
f"Search: {query[:30]}...",
"failed",
f"Only {valid_results}/{len(web_results)} valid. Issues: {'; '.join(missing_fields[:3])}"
)
except Exception as e:
self.log_result(f"Search: {query[:30]}...", "failed", f"Exception: {type(e).__name__}: {str(e)}")
if self.verbose:
import traceback
print(f" Traceback: {traceback.format_exc()}")
if self.verbose and extracted_urls:
print(f"\n URLs collected for extraction test: {len(extracted_urls)}")
for url in extracted_urls:
print(f" - {url}")
return extracted_urls
async def test_web_extract(self, urls: List[str] = None):
"""Test web content extraction"""
print_section("Test 2: Web Extract (without LLM)")
# Use provided URLs or defaults
if not urls:
urls = [
"https://docs.firecrawl.dev/introduction",
"https://www.python.org/about/"
]
print(f" Using default URLs for testing")
else:
print(f" Using {len(urls)} URLs from search results")
# Test extraction
if urls:
try:
test_urls = urls[:2] # Test with max 2 URLs
print(f"\n Extracting content from {len(test_urls)} URL(s)...")
for url in test_urls:
print(f" - {url}")
if self.verbose:
print(f" Calling web_extract_tool(urls={test_urls}, format='markdown', use_llm_processing=False)")
result = await web_extract_tool(
test_urls,
format="markdown",
use_llm_processing=False
)
# Parse result
try:
data = json.loads(result)
except json.JSONDecodeError as e:
self.log_result("Extract (no LLM)", "failed", f"Invalid JSON: {e}")
if self.verbose:
print(f" Raw response (first 500 chars): {result[:500]}...")
return
if "error" in data:
self.log_result("Extract (no LLM)", "failed", f"API error: {data['error']}")
return
results = data.get("results", [])
if not results:
self.log_result("Extract (no LLM)", "failed", "No results in response")
if self.verbose:
print(f" Response keys: {list(data.keys())}")
return
# Validate each result
valid_results = 0
failed_results = 0
total_content_length = 0
extraction_details = []
for i, result in enumerate(results):
title = result.get("title", "No title")
content = result.get("content", "")
error = result.get("error")
if error:
failed_results += 1
extraction_details.append(f"Page {i+1}: ERROR - {error}")
if self.verbose:
print(f" Page {i+1}: ✗ Error - {error}")
elif content:
content_len = len(content)
total_content_length += content_len
valid_results += 1
extraction_details.append(f"Page {i+1}: {title[:40]}... ({content_len} chars)")
if self.verbose:
print(f" Page {i+1}: ✓ {title[:50]}... - {content_len} characters")
print(f" First 100 chars: {content[:100]}...")
else:
extraction_details.append(f"Page {i+1}: {title[:40]}... (EMPTY)")
if self.verbose:
print(f" Page {i+1}: ⚠ {title[:50]}... - Empty content")
# Log results
if valid_results > 0:
self.log_result(
"Extract (no LLM)",
"passed",
f"{valid_results}/{len(results)} pages extracted, {total_content_length} total chars"
)
else:
self.log_result(
"Extract (no LLM)",
"failed",
f"No valid content. {failed_results} errors, {len(results) - failed_results} empty"
)
if self.verbose:
print(f"\n Extraction details:")
for detail in extraction_details:
print(f" {detail}")
except Exception as e:
self.log_result("Extract (no LLM)", "failed", f"Exception: {type(e).__name__}: {str(e)}")
if self.verbose:
import traceback
print(f" Traceback: {traceback.format_exc()}")
async def test_web_extract_with_llm(self, urls: List[str] = None):
"""Test web extraction with LLM processing"""
print_section("Test 3: Web Extract (with Gemini LLM)")
if not self.test_llm:
self.log_result("Extract (with LLM)", "skipped", "LLM testing disabled")
return
# Use a URL likely to have substantial content
test_url = urls[0] if urls else "https://docs.firecrawl.dev/features/scrape"
try:
print(f"\n Extracting and processing: {test_url}")
result = await web_extract_tool(
[test_url],
format="markdown",
use_llm_processing=True,
min_length=1000 # Lower threshold for testing
)
data = json.loads(result)
if "error" in data:
self.log_result("Extract (with LLM)", "failed", data["error"])
return
results = data.get("results", [])
if not results:
self.log_result("Extract (with LLM)", "failed", "No results returned")
return
result = results[0]
content = result.get("content", "")
if content:
content_len = len(content)
# Check if content was actually processed (should be shorter than typical raw content)
if content_len > 0:
self.log_result(
"Extract (with LLM)",
"passed",
f"Content processed: {content_len} chars"
)
if self.verbose:
print(f"\n First 300 chars of processed content:")
print(f" {content[:300]}...")
else:
self.log_result("Extract (with LLM)", "failed", "No content after processing")
else:
self.log_result("Extract (with LLM)", "failed", "No content field in result")
except json.JSONDecodeError as e:
self.log_result("Extract (with LLM)", "failed", f"Invalid JSON: {e}")
except Exception as e:
self.log_result("Extract (with LLM)", "failed", str(e))
async def test_web_crawl(self):
"""Test web crawling functionality"""
print_section("Test 4: Web Crawl")
test_sites = [
("https://docs.firecrawl.dev", None, 2), # Test docs site
("https://firecrawl.dev", None, 3), # Test main site
]
for url, instructions, expected_min_pages in test_sites:
try:
print(f"\n Testing crawl of: {url}")
if instructions:
print(f" Instructions: {instructions}")
else:
print(f" No instructions (general crawl)")
print(f" Expected minimum pages: {expected_min_pages}")
# Show what's being called
if self.verbose:
print(f" Calling web_crawl_tool(url='{url}', instructions={instructions}, use_llm_processing=False)")
result = await web_crawl_tool(
url,
instructions=instructions,
use_llm_processing=False # Disable LLM for faster testing
)
# Check if result is valid JSON
try:
data = json.loads(result)
except json.JSONDecodeError as e:
self.log_result(f"Crawl: {url}", "failed", f"Invalid JSON response: {e}")
if self.verbose:
print(f" Raw response (first 500 chars): {result[:500]}...")
continue
# Check for errors
if "error" in data:
self.log_result(f"Crawl: {url}", "failed", f"API error: {data['error']}")
continue
# Get results
results = data.get("results", [])
if not results:
self.log_result(f"Crawl: {url}", "failed", "No pages in results array")
if self.verbose:
print(f" Full response: {json.dumps(data, indent=2)[:1000]}...")
continue
# Analyze pages
valid_pages = 0
empty_pages = 0
total_content = 0
page_details = []
for i, page in enumerate(results):
content = page.get("content", "")
title = page.get("title", "Untitled")
error = page.get("error")
if error:
page_details.append(f"Page {i+1}: ERROR - {error}")
elif content:
valid_pages += 1
content_len = len(content)
total_content += content_len
page_details.append(f"Page {i+1}: {title[:40]}... ({content_len} chars)")
else:
empty_pages += 1
page_details.append(f"Page {i+1}: {title[:40]}... (EMPTY)")
# Show detailed results if verbose
if self.verbose:
print(f"\n Crawl Results:")
print(f" Total pages returned: {len(results)}")
print(f" Valid pages (with content): {valid_pages}")
print(f" Empty pages: {empty_pages}")
print(f" Total content size: {total_content} characters")
print(f"\n Page Details:")
for detail in page_details[:10]: # Show first 10 pages
print(f" - {detail}")
if len(page_details) > 10:
print(f" ... and {len(page_details) - 10} more pages")
# Determine pass/fail
if valid_pages >= expected_min_pages:
self.log_result(
f"Crawl: {url}",
"passed",
f"{valid_pages}/{len(results)} valid pages, {total_content} chars total"
)
else:
self.log_result(
f"Crawl: {url}",
"failed",
f"Only {valid_pages} valid pages (expected >= {expected_min_pages}), {empty_pages} empty, {len(results)} total"
)
except Exception as e:
self.log_result(f"Crawl: {url}", "failed", f"Exception: {type(e).__name__}: {str(e)}")
if self.verbose:
import traceback
print(f" Traceback:")
print(" " + "\n ".join(traceback.format_exc().split("\n")))
async def run_all_tests(self):
"""Run all tests"""
self.start_time = datetime.now()
print_header("WEB TOOLS TEST SUITE")
print(f"Started at: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
# Test environment
if not self.test_environment():
print_error("\nCannot proceed without required API keys!")
return False
# Test search and collect URLs
urls = self.test_web_search()
# Test extraction
await self.test_web_extract(urls if urls else None)
# Test extraction with LLM
if self.test_llm:
await self.test_web_extract_with_llm(urls if urls else None)
# Test crawling
await self.test_web_crawl()
# Print summary
self.end_time = datetime.now()
duration = (self.end_time - self.start_time).total_seconds()
print_header("TEST SUMMARY")
print(f"Duration: {duration:.2f} seconds")
print(f"\n{Colors.GREEN}Passed: {len(self.test_results['passed'])}{Colors.ENDC}")
print(f"{Colors.FAIL}Failed: {len(self.test_results['failed'])}{Colors.ENDC}")
print(f"{Colors.WARNING}Skipped: {len(self.test_results['skipped'])}{Colors.ENDC}")
# List failed tests
if self.test_results["failed"]:
print(f"\n{Colors.FAIL}{Colors.BOLD}Failed Tests:{Colors.ENDC}")
for test in self.test_results["failed"]:
print(f" - {test['test']}: {test['details']}")
# Save results to file
self.save_results()
return len(self.test_results["failed"]) == 0
def save_results(self):
"""Save test results to a JSON file"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"test_results_web_tools_{timestamp}.json"
results = {
"test_suite": "Web Tools",
"start_time": self.start_time.isoformat() if self.start_time else None,
"end_time": self.end_time.isoformat() if self.end_time else None,
"duration_seconds": (self.end_time - self.start_time).total_seconds() if self.start_time and self.end_time else None,
"summary": {
"passed": len(self.test_results["passed"]),
"failed": len(self.test_results["failed"]),
"skipped": len(self.test_results["skipped"])
},
"results": self.test_results,
"environment": {
"firecrawl_api_key": check_firecrawl_api_key(),
"nous_api_key": check_auxiliary_model(),
"debug_mode": get_debug_session_info()["enabled"]
}
}
try:
with open(filename, 'w') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print_info(f"Test results saved to: {filename}")
except Exception as e:
print_warning(f"Failed to save results: {e}")
async def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="Test Web Tools Module")
parser.add_argument("--no-llm", action="store_true", help="Skip LLM processing tests")
parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
parser.add_argument("--debug", action="store_true", help="Enable debug mode for web tools")
args = parser.parse_args()
# Set debug mode if requested
if args.debug:
os.environ["WEB_TOOLS_DEBUG"] = "true"
print_info("Debug mode enabled for web tools")
# Create tester
tester = WebToolsTester(
verbose=args.verbose,
test_llm=not args.no_llm
)
# Run tests
success = await tester.run_all_tests()
# Exit with appropriate code
sys.exit(0 if success else 1)
if __name__ == "__main__":
asyncio.run(main())