auto-report-skill/scripts/merge_transcriptions.py

#!/usr/bin/env python3
"""
Merge two transcription sources by timestamps.
Primary source (e.g., lavalier mic / Saramonic) — better quality for main speaker.
Secondary source (e.g., room mic / H2n XY) — captures audience/student voices.
Strategy:
1. Both sources have timestamped segments from Whisper.
2. For each secondary segment, check if primary has a similar segment at the same time.
3. If primary has coverage (overlapping segment exists) — keep primary's version.
4. If primary has NO coverage (gap) — insert secondary segment, tagged as [audience].
5. Time alignment: the two recordings may have different start times.
We detect the offset by cross-correlating the first few segments' text.
Usage:
python3 merge_transcriptions.py <primary.json> <secondary.json> <output_dir> [--offset SECONDS]
Output:
<output_dir>/merged.json — combined segments with source tags
<output_dir>/merged.txt — timestamped text
<output_dir>/merged_plain.txt — plain text for LLM processing
"""

import json
import os
import argparse
from difflib import SequenceMatcher


def load_segments(json_path):
    """Load segments from Whisper JSON output."""
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)
    segments = []
    for seg in data.get("segments", []):
        segments.append({
            "start": seg.get("start", 0),
            "end": seg.get("end", 0),
            "text": seg.get("text", "").strip(),
        })
    # Some Whisper variants omit a top-level "duration"; fall back to the
    # last segment's end time so the stats printed in main() stay meaningful.
    duration = data.get("duration") or (segments[-1]["end"] if segments else 0)
    return segments, duration
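

# load_segments() expects Whisper-style JSON of roughly this shape (illustrative;
# any extra fields are ignored):
#   {"segments": [{"start": 0.0, "end": 4.2, "text": " Hello everyone."}, ...]}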


def estimate_offset(primary_segs, secondary_segs, search_window=120):
    """
    Estimate the time offset between two recordings.

    Returns offset such that: secondary_time + offset ≈ primary_time.
    Uses text similarity of segments within the search window.
    """
    if not primary_segs or not secondary_segs:
        return 0.0
    best_offset = 0.0
    best_score = 0.0
    # Try offsets in 1-second steps within the search window
    for offset_int in range(-search_window, search_window + 1):
        offset = float(offset_int)
        score = 0.0
        comparisons = 0
        for p_seg in primary_segs[:30]:  # check the first 30 primary segments
            p_mid = (p_seg["start"] + p_seg["end"]) / 2
            # Find the closest secondary segment at (p_mid - offset)
            target_time = p_mid - offset
            best_match = None
            best_dist = float("inf")
            for s_seg in secondary_segs[:40]:
                s_mid = (s_seg["start"] + s_seg["end"]) / 2
                dist = abs(s_mid - target_time)
                if dist < best_dist:
                    best_dist = dist
                    best_match = s_seg
            if best_match and best_dist < 15:  # within 15 seconds
                sim = SequenceMatcher(
                    None, p_seg["text"].lower(), best_match["text"].lower()
                ).ratio()
                score += sim
                comparisons += 1
        if comparisons > 0:
            score /= comparisons
            if score > best_score:
                best_score = score
                best_offset = offset
    return best_offset
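

# Sign convention (worked example, hypothetical numbers): if the secondary
# recorder was started 30 s after the primary, a phrase heard at primary_time
# 100 sits at secondary_time 70, so the estimate converges near +30
# (70 + 30 ≈ 100).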


def merge(primary_segs, secondary_segs, offset=0.0, gap_threshold=3.0, sim_threshold=0.3):
    """
    Merge primary and secondary segments.

    Args:
        primary_segs: segments from primary source (lavalier)
        secondary_segs: segments from secondary source (room mic)
        offset: time offset to add to secondary timestamps to align with primary
        gap_threshold: minimum gap (seconds) in primary to consider inserting secondary
        sim_threshold: below this similarity, secondary segment is considered unique content
    """
    merged = []
    # Add source tag to primary segments
    for seg in primary_segs:
        merged.append({
            **seg,
            "source": "primary",
        })
    # Build primary timeline: list of (start, end) intervals
    primary_intervals = [(s["start"], s["end"]) for s in primary_segs]

    def primary_covers(t_start, t_end):
        """Check if primary has any segment overlapping [t_start, t_end]."""
        for p_start, p_end in primary_intervals:
            if p_start <= t_end and p_end >= t_start:
                return True
        return False

    def primary_gap(t_start, t_end):
        """Length of the primary-coverage gap around [t_start, t_end]."""
        prev_end = max((e for _, e in primary_intervals if e <= t_start), default=0.0)
        next_start = min((s for s, _ in primary_intervals if s >= t_end), default=float("inf"))
        return next_start - prev_end

    def find_similar_primary(text, t_start, t_end, window=10):
        """Find the most similar primary segment near the given time."""
        best_sim = 0.0
        for seg in primary_segs:
            if abs(seg["start"] - t_start) > window and abs(seg["end"] - t_end) > window:
                continue
            sim = SequenceMatcher(None, text.lower(), seg["text"].lower()).ratio()
            if sim > best_sim:
                best_sim = sim
        return best_sim

    # Check each secondary segment
    inserted = 0
    for seg in secondary_segs:
        adj_start = seg["start"] + offset
        adj_end = seg["end"] + offset
        text = seg["text"]
        if not text:
            continue
        # Check if primary already covers this time range
        if primary_covers(adj_start, adj_end):
            # Primary has something here; check if it's the same content
            sim = find_similar_primary(text, adj_start, adj_end)
            if sim >= sim_threshold:
                continue  # primary already has this, skip
        elif primary_gap(adj_start, adj_end) < gap_threshold:
            continue  # gap in primary is too small to bother filling
        # This segment is unique to secondary (likely an audience voice)
        merged.append({
            "start": round(adj_start, 2),
            "end": round(adj_end, 2),
            "text": text,
            "source": "secondary",
        })
        inserted += 1
    # Sort by start time
    merged.sort(key=lambda s: s["start"])
    return merged, inserted
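

# Decision sketch per secondary segment (hypothetical, offset-aligned times,
# default gap_threshold=3.0):
#   primary:    [0-5]          [12-20]
#   secondary:  [4-6]   covered by [0-5]; if the text matches, it is dropped
#               [6-11]  lands in the 7 s gap (5..12), inserted as "secondary"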


def format_timestamp(seconds):
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


def main():
    parser = argparse.ArgumentParser(description="Merge two transcription sources")
    parser.add_argument("primary", help="Primary source JSON (lavalier/Saramonic)")
    parser.add_argument("secondary", help="Secondary source JSON (room mic/H2n)")
    parser.add_argument("output_dir", help="Output directory")
    parser.add_argument("--offset", type=float, default=None,
                        help="Time offset (seconds) to add to secondary timestamps. "
                             "Auto-detected if not specified.")
    parser.add_argument("--gap-threshold", type=float, default=3.0,
                        help="Minimum gap in primary to insert secondary (default: 3.0)")
    parser.add_argument("--sim-threshold", type=float, default=0.3,
                        help="Similarity threshold below which secondary is unique (default: 0.3)")
    args = parser.parse_args()

    print(f"Primary:   {args.primary}")
    print(f"Secondary: {args.secondary}")
    primary_segs, primary_dur = load_segments(args.primary)
    secondary_segs, secondary_dur = load_segments(args.secondary)
    print(f"Primary:   {len(primary_segs)} segments, {primary_dur:.0f}s")
    print(f"Secondary: {len(secondary_segs)} segments, {secondary_dur:.0f}s")

    # Estimate or use the provided offset
    if args.offset is not None:
        offset = args.offset
        print(f"Using provided offset: {offset:+.1f}s")
    else:
        print("Estimating time offset...")
        offset = estimate_offset(primary_segs, secondary_segs)
        print(f"Estimated offset: {offset:+.1f}s (secondary is {abs(offset):.0f}s "
              f"{'behind' if offset > 0 else 'ahead of'} primary)")

    # Merge
    merged, inserted = merge(
        primary_segs, secondary_segs,
        offset=offset,
        gap_threshold=args.gap_threshold,
        sim_threshold=args.sim_threshold,
    )
    print(f"Merged: {len(merged)} segments "
          f"({len(primary_segs)} primary + {inserted} from secondary)")

    # Write outputs
    os.makedirs(args.output_dir, exist_ok=True)

    # JSON
    json_path = os.path.join(args.output_dir, "merged.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({"segments": merged, "offset": offset}, f, ensure_ascii=False, indent=2)
    print(f"Written: {json_path}")

    # Timestamped text
    txt_path = os.path.join(args.output_dir, "merged.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        for seg in merged:
            tag = "" if seg["source"] == "primary" else " [audience]"
            f.write(f"[{format_timestamp(seg['start'])}]{tag} {seg['text']}\n")
    print(f"Written: {txt_path}")

    # Plain text for LLM processing
    plain_path = os.path.join(args.output_dir, "merged_plain.txt")
    with open(plain_path, "w", encoding="utf-8") as f:
        f.write(" ".join(seg["text"] for seg in merged))
    print(f"Written: {plain_path}")


if __name__ == "__main__":
    main()