some cleanups

This commit is contained in:
teknium 2025-11-05 03:47:17 +00:00
parent ab5c9fc37b
commit c82741c3d8
15 changed files with 911 additions and 56 deletions

View file

@ -24,7 +24,7 @@ def create_test_dataset():
with open(test_file, 'w') as f:
for prompt in prompts:
f.write(json.dumps(prompt) + "\n")
f.write(json.dumps(prompt, ensure_ascii=False) + "\n")
print(f"✅ Created test dataset: {test_file}")
return test_file

View file

@ -0,0 +1,424 @@
#!/usr/bin/env python3
"""
Test script to verify checkpoint behavior in batch_runner.py
This script simulates batch processing with intentional failures to test:
1. Whether checkpoints are saved incrementally during processing
2. Whether resume functionality works correctly after interruption
3. Whether data integrity is maintained across checkpoint cycles
Usage:
# Test current implementation
python tests/test_checkpoint_resumption.py --test_current
# Test after fix is applied
python tests/test_checkpoint_resumption.py --test_fixed
# Run full comparison
python tests/test_checkpoint_resumption.py --compare
"""
import json
import os
import shutil
import sys
import time
import signal
from pathlib import Path
from typing import List, Dict, Any
import traceback
# Add parent directory to path to import batch_runner
sys.path.insert(0, str(Path(__file__).parent.parent))
def create_test_dataset(num_prompts: int = 20) -> Path:
"""Create a small test dataset for checkpoint testing."""
test_data_dir = Path("tests/test_data")
test_data_dir.mkdir(parents=True, exist_ok=True)
dataset_file = test_data_dir / "checkpoint_test_dataset.jsonl"
with open(dataset_file, 'w', encoding='utf-8') as f:
for i in range(num_prompts):
entry = {
"prompt": f"Test prompt {i}: What is 2+2? Just answer briefly.",
"test_id": i
}
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"✅ Created test dataset: {dataset_file} ({num_prompts} prompts)")
return dataset_file
def monitor_checkpoint_during_run(checkpoint_file: Path, duration: int = 30) -> List[Dict[str, Any]]:
"""
Monitor checkpoint file during a batch run to see when it gets updated.
Args:
checkpoint_file: Path to checkpoint file to monitor
duration: How long to monitor (seconds)
Returns:
List of checkpoint snapshots with timestamps
"""
snapshots = []
start_time = time.time()
last_mtime = None
print(f"\n🔍 Monitoring checkpoint file: {checkpoint_file}")
print(f" Duration: {duration}s")
print("-" * 70)
while time.time() - start_time < duration:
if checkpoint_file.exists():
current_mtime = checkpoint_file.stat().st_mtime
# Check if file was modified
if last_mtime is None or current_mtime != last_mtime:
elapsed = time.time() - start_time
try:
with open(checkpoint_file, 'r') as f:
checkpoint_data = json.load(f)
snapshot = {
"elapsed_seconds": round(elapsed, 2),
"completed_count": len(checkpoint_data.get("completed_prompts", [])),
"completed_prompts": checkpoint_data.get("completed_prompts", [])[:5], # First 5 for display
"timestamp": checkpoint_data.get("last_updated")
}
snapshots.append(snapshot)
print(f"[{elapsed:6.2f}s] Checkpoint updated: {snapshot['completed_count']} prompts completed")
except Exception as e:
print(f"[{elapsed:6.2f}s] Error reading checkpoint: {e}")
last_mtime = current_mtime
else:
if len(snapshots) == 0:
print(f"[{time.time() - start_time:6.2f}s] Checkpoint file not yet created...")
time.sleep(0.5) # Check every 0.5 seconds
return snapshots
def test_current_implementation():
"""Test the current checkpoint implementation."""
print("\n" + "=" * 70)
print("TEST 1: Current Implementation - Checkpoint Timing")
print("=" * 70)
print("\n📝 Testing whether checkpoints are saved incrementally during run...")
# Setup
dataset_file = create_test_dataset(num_prompts=12)
run_name = "checkpoint_test_current"
output_dir = Path("data") / run_name
# Clean up any existing test data
if output_dir.exists():
shutil.rmtree(output_dir)
# Import here to avoid issues if module changes
from batch_runner import BatchRunner
checkpoint_file = output_dir / "checkpoint.json"
# Start monitoring in a separate process would be ideal, but for simplicity
# we'll just check before and after
print(f"\n▶️ Starting batch run...")
print(f" Dataset: {dataset_file}")
print(f" Batch size: 3 (4 batches total)")
print(f" Workers: 2")
print(f" Expected behavior: If incremental, checkpoint should update during run")
start_time = time.time()
try:
runner = BatchRunner(
dataset_file=str(dataset_file),
batch_size=3,
run_name=run_name,
distribution="default",
max_iterations=3, # Keep it short
model="claude-opus-4-20250514",
num_workers=2,
verbose=False
)
# Run with monitoring
import threading
snapshots = []
def monitor():
nonlocal snapshots
snapshots = monitor_checkpoint_during_run(checkpoint_file, duration=60)
monitor_thread = threading.Thread(target=monitor, daemon=True)
monitor_thread.start()
runner.run(resume=False)
monitor_thread.join(timeout=2)
except Exception as e:
print(f"❌ Error during run: {e}")
traceback.print_exc()
return False
elapsed = time.time() - start_time
# Analyze results
print("\n" + "=" * 70)
print("📊 TEST RESULTS")
print("=" * 70)
print(f"Total run time: {elapsed:.2f}s")
print(f"Checkpoint updates observed: {len(snapshots)}")
if len(snapshots) == 0:
print("\n❌ ISSUE: No checkpoint updates observed during run")
print(" This suggests checkpoints are only saved at the end")
return False
elif len(snapshots) == 1:
print("\n⚠️ WARNING: Only 1 checkpoint update (likely at the end)")
print(" This confirms the bug - no incremental checkpointing")
return False
else:
print(f"\n✅ GOOD: Multiple checkpoint updates ({len(snapshots)}) observed")
print(" Checkpointing appears to be incremental")
# Show timeline
print("\n📈 Checkpoint Timeline:")
for i, snapshot in enumerate(snapshots, 1):
print(f" {i}. [{snapshot['elapsed_seconds']:6.2f}s] "
f"{snapshot['completed_count']} prompts completed")
return True
def test_interruption_and_resume():
"""Test that resume actually works after interruption."""
print("\n" + "=" * 70)
print("TEST 2: Interruption and Resume")
print("=" * 70)
print("\n📝 Testing whether resume works after manual interruption...")
# Setup
dataset_file = create_test_dataset(num_prompts=15)
run_name = "checkpoint_test_resume"
output_dir = Path("data") / run_name
# Clean up any existing test data
if output_dir.exists():
shutil.rmtree(output_dir)
from batch_runner import BatchRunner
checkpoint_file = output_dir / "checkpoint.json"
print(f"\n▶️ Starting first run (will process 5 prompts, then simulate interruption)...")
try:
# Create a modified dataset with only first 5 prompts for initial run
temp_dataset = Path("tests/test_data/checkpoint_test_resume_partial.jsonl")
with open(dataset_file, 'r') as f:
lines = f.readlines()[:5]
with open(temp_dataset, 'w') as f:
f.writelines(lines)
runner = BatchRunner(
dataset_file=str(temp_dataset),
batch_size=2,
run_name=run_name,
distribution="default",
max_iterations=3,
model="claude-opus-4-20250514",
num_workers=1,
verbose=False
)
runner.run(resume=False)
# Check checkpoint after first run
if not checkpoint_file.exists():
print("❌ ERROR: Checkpoint file not created after first run")
return False
with open(checkpoint_file, 'r') as f:
checkpoint_data = json.load(f)
initial_completed = len(checkpoint_data.get("completed_prompts", []))
print(f"✅ First run completed: {initial_completed} prompts saved to checkpoint")
# Now try to resume with full dataset
print(f"\n▶️ Starting resume run with full dataset (15 prompts)...")
runner2 = BatchRunner(
dataset_file=str(dataset_file),
batch_size=2,
run_name=run_name,
distribution="default",
max_iterations=3,
model="claude-opus-4-20250514",
num_workers=1,
verbose=False
)
runner2.run(resume=True)
# Check final checkpoint
with open(checkpoint_file, 'r') as f:
final_checkpoint = json.load(f)
final_completed = len(final_checkpoint.get("completed_prompts", []))
print("\n" + "=" * 70)
print("📊 TEST RESULTS")
print("=" * 70)
print(f"Initial completed: {initial_completed}")
print(f"Final completed: {final_completed}")
print(f"Expected: 15")
if final_completed == 15:
print("\n✅ PASS: Resume successfully completed all prompts")
return True
else:
print(f"\n❌ FAIL: Expected 15 completed, got {final_completed}")
return False
except Exception as e:
print(f"❌ Error during test: {e}")
traceback.print_exc()
return False
def test_simulated_crash():
"""Test behavior when process crashes mid-execution."""
print("\n" + "=" * 70)
print("TEST 3: Simulated Crash During Execution")
print("=" * 70)
print("\n📝 This test would require running in a subprocess and killing it...")
print(" Skipping for safety - manual testing recommended")
return None
def print_test_plan():
"""Print the detailed test and fix plan."""
print("\n" + "=" * 70)
print("CHECKPOINT FIX - DETAILED PLAN")
print("=" * 70)
print("""
📋 PROBLEM SUMMARY
------------------
Current implementation uses pool.map() which blocks until ALL batches complete.
Checkpoint is only saved after all batches finish (line 558-559).
If process crashes during batch processing:
- All progress is lost
- Resume does nothing (no incremental checkpoint was saved)
📋 PROPOSED SOLUTION
--------------------
Replace pool.map() with pool.imap_unordered() to get results as they complete.
Save checkpoint after EACH batch completes using a multiprocessing Lock.
Key changes:
1. Use Manager().Lock() for thread-safe checkpoint writes
2. Replace pool.map() with pool.imap_unordered()
3. Update checkpoint after each batch result
4. Maintain backward compatibility with existing checkpoints
📋 IMPLEMENTATION STEPS
-----------------------
1. Add Manager and Lock initialization before Pool creation
2. Pass shared checkpoint data and lock to workers (via Manager)
3. Replace pool.map() with pool.imap_unordered()
4. In result loop: save checkpoint after each batch
5. Add error handling for checkpoint write failures
📋 RISKS & MITIGATIONS
----------------------
Risk: Checkpoint file corruption if two processes write simultaneously
Mitigation: Use multiprocessing.Lock() for exclusive access
Risk: Performance impact from frequent checkpoint writes
Mitigation: Checkpoint writes are fast (small JSON), negligible impact
Risk: Breaking existing runs that are already checkpointed
Mitigation: Maintain checkpoint format, only change timing
Risk: Bugs in multiprocessing lock/manager code
Mitigation: Thorough testing with this test script
📋 TESTING STRATEGY
-------------------
1. Run test_current_implementation() - Confirm bug exists
2. Apply fix to batch_runner.py
3. Run test_current_implementation() again - Should see incremental updates
4. Run test_interruption_and_resume() - Verify resume works
5. Manual test: Start run, kill process mid-batch, resume
📋 ROLLBACK PLAN
----------------
If issues arise:
1. Git revert the changes
2. Original code is working (just missing incremental checkpoint)
3. No data corruption risk - checkpoints are write-only
""")
def main(
test_current: bool = False,
test_resume: bool = False,
test_crash: bool = False,
compare: bool = False,
show_plan: bool = False
):
"""
Run checkpoint behavior tests.
Args:
test_current: Test current implementation checkpoint timing
test_resume: Test interruption and resume functionality
test_crash: Test simulated crash scenario (manual)
compare: Run all tests and compare
show_plan: Show detailed fix plan
"""
if show_plan or (not any([test_current, test_resume, test_crash, compare])):
print_test_plan()
return
results = {}
if test_current or compare:
results['current'] = test_current_implementation()
if test_resume or compare:
results['resume'] = test_interruption_and_resume()
if test_crash or compare:
results['crash'] = test_simulated_crash()
# Summary
if results:
print("\n" + "=" * 70)
print("OVERALL TEST SUMMARY")
print("=" * 70)
for test_name, result in results.items():
if result is None:
status = "⏭️ SKIPPED"
elif result:
status = "✅ PASS"
else:
status = "❌ FAIL"
print(f"{status} - {test_name}")
if __name__ == "__main__":
import fire
fire.Fire(main)

176
tests/test_nous_api_limits.py Executable file
View file

@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
Test script to diagnose Nous API 400 errors with gemini-2.5-flash model.
This tests various content lengths and parameters to identify what causes failures.
"""
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Initialize the Nous API client
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
)
MODEL = "gemini-2.5-flash"
async def test_api_call(test_name: str, content_length: int, **kwargs):
"""Test an API call with specific parameters."""
print(f"\n{'='*60}")
print(f"Test: {test_name}")
print(f"Content length: {content_length:,} characters")
print(f"Additional params: {kwargs}")
print(f"{'='*60}")
# Generate test content
content = "A" * content_length
system_prompt = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk.
Create a well-structured markdown summary that includes:
1. Key excerpts (quotes, code snippets, important facts) in their original format
2. Comprehensive summary of all other important information
3. Proper markdown formatting with headers, bullets, and emphasis
Your goal is to preserve ALL important information while reducing length. Never lose key facts, figures, insights, or actionable information. Make it scannable and well-organized."""
user_prompt = f"""Please process this web content and create a comprehensive markdown summary:
CONTENT TO PROCESS:
{content}
Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights."""
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
**kwargs
)
result = response.choices[0].message.content
print(f"✅ SUCCESS")
print(f" Response length: {len(result)} characters")
print(f" Model used: {response.model}")
print(f" Usage: {response.usage}")
return True
except Exception as e:
print(f"❌ FAILED: {str(e)}")
return False
async def main():
"""Run all tests."""
print("Testing Nous API with gemini-2.5-flash model")
print(f"API Key present: {'Yes' if os.getenv('NOUS_API_KEY') else 'No'}")
results = {}
# Test 1: Small content (should always work)
results['small'] = await test_api_call(
"Small content (5,000 chars)",
5000,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 2: Medium content (around what was failing)
results['medium'] = await test_api_call(
"Medium content (20,000 chars)",
20000,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 3: Large content (79,625 chars like the error)
results['large'] = await test_api_call(
"Large content (79,625 chars)",
79625,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 4: Very large content (100k chars)
results['very_large'] = await test_api_call(
"Very large content (100,000 chars)",
100000,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 5: Same as working case but different max_tokens
results['diff_max_tokens'] = await test_api_call(
"Medium content with higher max_tokens",
20000,
temperature=0.1,
max_tokens=8000
)
await asyncio.sleep(1)
# Test 6: No max_tokens specified
results['no_max_tokens'] = await test_api_call(
"Medium content without max_tokens",
20000,
temperature=0.1
)
await asyncio.sleep(1)
# Test 7: With actual web content (mixed characters)
mixed_content = """
This is a test of web content with various characters:
- Unicode: 你好世界 🌍
- Special chars: <>&"'
- Numbers: 123456789
- Markdown: **bold** _italic_ `code`
- URLs: https://example.com
""" * 1000 # Repeat to make it ~79k chars
print(f"\n{'='*60}")
print(f"Test: Mixed content (real-world scenario)")
print(f"Content length: {len(mixed_content):,} characters")
print(f"{'='*60}")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": "Summarize this content."},
{"role": "user", "content": mixed_content}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
results['mixed_content'] = True
except Exception as e:
print(f"❌ FAILED: {str(e)}")
results['mixed_content'] = False
# Summary
print(f"\n{'='*60}")
print("SUMMARY OF RESULTS:")
print(f"{'='*60}")
for test, passed in results.items():
status = "✅ PASS" if passed else "❌ FAIL"
print(f"{test:20s}: {status}")
passed = sum(results.values())
total = len(results)
print(f"\nTotal: {passed}/{total} tests passed")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Test to understand the pattern of failures - it's not about content length!
"""
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
load_dotenv()
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
)
MODEL = "gemini-2.5-flash"
async def quick_test(description: str, content: str, **kwargs):
"""Quick API test."""
print(f"\n{description} ({len(content):,} chars)...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": "Summarize this."},
{"role": "user", "content": content}
],
**kwargs
)
print(f"✅ SUCCESS")
return True
except Exception as e:
print(f"❌ FAILED: {str(e)[:80]}")
return False
async def main():
print("Testing different content types and parameters...")
# Theory 1: Repeated characters trigger validation
print("\n" + "="*60)
print("THEORY 1: Repeated characters")
print("="*60)
await quick_test("Repeated 'A's (5k)", "A" * 5000, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("Repeated 'A's (79k)", "A" * 79625, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("Varied text (5k)", "Test content. " * 400, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("Varied text (79k)", "Test content with variety. " * 3000, temperature=0.1, max_tokens=4000)
# Theory 2: max_tokens parameter
print("\n" + "="*60)
print("THEORY 2: max_tokens parameter")
print("="*60)
content = "Test " * 4000 # 20k chars
await quick_test("max_tokens=4000", content, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("max_tokens=8000", content, temperature=0.1, max_tokens=8000)
await asyncio.sleep(0.5)
await quick_test("max_tokens=2000", content, temperature=0.1, max_tokens=2000)
await asyncio.sleep(0.5)
await quick_test("No max_tokens", content, temperature=0.1)
# Theory 3: Temperature parameter
print("\n" + "="*60)
print("THEORY 3: Temperature parameter")
print("="*60)
content = "Test " * 4000
await quick_test("temperature=0.1", content, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("temperature=0.0", content, temperature=0.0, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("temperature=0.5", content, temperature=0.5, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("No temperature", content, max_tokens=4000)
# Theory 4: System prompt impact
print("\n" + "="*60)
print("THEORY 4: System prompt length")
print("="*60)
short_system = "Summarize this."
long_system = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk.
Create a well-structured markdown summary that includes:
1. Key excerpts (quotes, code snippets, important facts) in their original format
2. Comprehensive summary of all other important information
3. Proper markdown formatting with headers, bullets, and emphasis
Your goal is to preserve ALL important information while reducing length."""
content = "A" * 5000
print(f"\nShort system prompt...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": short_system},
{"role": "user", "content": content}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
except Exception as e:
print(f"❌ FAILED")
await asyncio.sleep(0.5)
print(f"Long system prompt...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": long_system},
{"role": "user", "content": content}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
except Exception as e:
print(f"❌ FAILED")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
Test to confirm: temperature < 0.3 causes failures on Nous API
"""
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
load_dotenv()
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
)
MODEL = "gemini-2.5-flash"
async def test_temp(temp_value):
"""Test a specific temperature value."""
content = "Test content. " * 1000 # 14k chars
print(f"Testing temperature={temp_value}...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": "Summarize this content."},
{"role": "user", "content": content}
],
temperature=temp_value,
max_tokens=4000
)
print(f"✅ SUCCESS")
return True
except Exception as e:
print(f"❌ FAILED")
return False
async def main():
print("Testing temperature threshold for Nous API...")
print("="*60)
temps = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 1.0]
for temp in temps:
await test_temp(temp)
await asyncio.sleep(0.5)
print("="*60)
print("\nNow testing with ACTUAL web_tools.py content and parameters:")
print("="*60)
# Simulate the actual web_tools.py call
system_prompt = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk.
Create a well-structured markdown summary that includes:
1. Key excerpts (quotes, code snippets, important facts) in their original format
2. Comprehensive summary of all other important information
3. Proper markdown formatting with headers, bullets, and emphasis
Your goal is to preserve ALL important information while reducing length. Never lose key facts, figures, insights, or actionable information. Make it scannable and well-organized."""
content = "Sample web page content. " * 3000 # ~75k chars like the real failures
user_prompt = f"""Please process this web content and create a comprehensive markdown summary:
CONTENT TO PROCESS:
{content}
Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights."""
print(f"\nActual web_tools call (temp=0.1, {len(content):,} chars)...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
except:
print(f"❌ FAILED")
await asyncio.sleep(0.5)
print(f"Same call but with temp=0.3...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.3,
max_tokens=4000
)
print(f"✅ SUCCESS")
except:
print(f"❌ FAILED")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -583,7 +583,7 @@ class WebToolsTester:
try:
with open(filename, 'w') as f:
json.dump(results, f, indent=2)
json.dump(results, f, indent=2, ensure_ascii=False)
print_info(f"Test results saved to: {filename}")
except Exception as e:
print_warning(f"Failed to save results: {e}")