#!/usr/bin/env python3
"""
Merge two transcription sources by timestamps.

Primary source (e.g., lavalier mic / Saramonic) — better quality for the main
speaker.  Secondary source (e.g., room mic / H2n XY) — captures audience and
student voices.

Strategy:
1. Both sources have timestamped segments from Whisper.
2. For each secondary segment, check if primary has a similar segment at the
   same time.
3. If primary has coverage (overlapping segment exists) — keep primary's version.
4. If primary has NO coverage (gap) — insert the secondary segment, tagged as
   [audience].
5. Time alignment: the two recordings may have different start times.  We detect
   the offset by cross-correlating the first few segments' text.

Usage:
    python3 merge_transcriptions.py <primary.json> <secondary.json> <output_dir> [--offset SECONDS]

Output:
    <output_dir>/merged.json       — combined segments with source tags
    <output_dir>/merged.txt        — timestamped text
    <output_dir>/merged_plain.txt  — plain text for LLM processing
"""

import argparse
import json
import os
import sys
from difflib import SequenceMatcher


def load_segments(json_path):
    """Load segments from Whisper JSON output.

    Args:
        json_path: path to a JSON file with a top-level "segments" list.

    Returns:
        (segments, duration): segments is a list of dicts with "start",
        "end" (seconds, default 0) and whitespace-stripped "text";
        duration is taken from the file's "duration" key (0 if absent).
    """
    # Explicit UTF-8: Whisper output is UTF-8 regardless of the host locale.
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)
    segments = [
        {
            "start": seg.get("start", 0),
            "end": seg.get("end", 0),
            "text": seg.get("text", "").strip(),
        }
        for seg in data.get("segments", [])
    ]
    return segments, data.get("duration", 0)


def estimate_offset(primary_segs, secondary_segs, search_window=120):
    """Estimate the time offset between two recordings.

    Returns offset (float seconds) such that:
        secondary_time + offset ≈ primary_time

    Tries integer offsets in [-search_window, +search_window] and scores
    each by text similarity between the first primary segments and the
    closest secondary segment at the shifted time.  Similarity is weighted
    by temporal proximity so a matching pair yields a unique best offset
    rather than a ±15 s plateau of tied scores (the unweighted version made
    the result drift to the first — most negative — tied offset).
    """
    if not primary_segs or not secondary_segs:
        return 0.0

    best_offset = 0.0
    best_score = 0.0
    # Try offsets in 1-second steps within the search window.
    for offset_int in range(-search_window, search_window + 1):
        offset = float(offset_int)
        score = 0.0
        comparisons = 0
        for p_seg in primary_segs[:30]:  # check the first 30 primary segments
            p_mid = (p_seg["start"] + p_seg["end"]) / 2
            # Find the secondary segment whose midpoint is closest to the
            # primary midpoint shifted back by the candidate offset.
            target_time = p_mid - offset
            best_match = None
            best_dist = float("inf")
            for s_seg in secondary_segs[:40]:
                s_mid = (s_seg["start"] + s_seg["end"]) / 2
                dist = abs(s_mid - target_time)
                if dist < best_dist:
                    best_dist = dist
                    best_match = s_seg
            if best_match and best_dist < 15:  # within 15 seconds
                sim = SequenceMatcher(
                    None, p_seg["text"].lower(), best_match["text"].lower()
                ).ratio()
                # Weight by proximity: a perfect text match 14 s away must
                # score lower than the same match at the true offset.
                score += sim * (1.0 - best_dist / 15.0)
                comparisons += 1
        if comparisons > 0:
            score /= comparisons
        # Strictly better score wins; on an exact tie prefer the smaller
        # |offset| (no shift is the more plausible hypothesis).
        if score > best_score or (
            score == best_score and abs(offset) < abs(best_offset)
        ):
            best_score = score
            best_offset = offset
    return best_offset


def merge(primary_segs, secondary_segs, offset=0.0, gap_threshold=3.0,
          sim_threshold=0.3):
    """Merge primary and secondary segments.

    Every primary segment is kept (tagged source="primary").  A secondary
    segment is inserted (tagged source="secondary") when primary has no
    overlapping segment at that time, or has one whose text is dissimilar —
    i.e. content unique to the room mic, typically an audience voice.

    Args:
        primary_segs: segments from the primary source (lavalier).
        secondary_segs: segments from the secondary source (room mic).
        offset: seconds added to secondary timestamps to align with primary.
        gap_threshold: accepted for interface compatibility; currently
            unused.  NOTE(review): the documented "minimum gap" gating was
            never implemented — confirm whether insertion should require it.
        sim_threshold: below this similarity, an overlapped secondary
            segment is considered unique content and still inserted.

    Returns:
        (merged, inserted): merged segments sorted by start time, and the
        number of secondary segments inserted.
    """
    # Tag all primary segments; they are always kept.
    merged = [{**seg, "source": "primary"} for seg in primary_segs]

    # Primary timeline as (start, end) intervals for overlap checks.
    primary_intervals = [(s["start"], s["end"]) for s in primary_segs]

    def primary_covers(t_start, t_end):
        """True if any primary segment overlaps [t_start, t_end]."""
        return any(
            p_start <= t_end and p_end >= t_start
            for p_start, p_end in primary_intervals
        )

    def find_similar_primary(text, t_start, t_end, window=10):
        """Best text similarity against primary segments near this time."""
        best_sim = 0.0
        lowered = text.lower()  # hoisted out of the comparison loop
        for seg in primary_segs:
            # Skip segments that are far away on both edges.
            if (abs(seg["start"] - t_start) > window
                    and abs(seg["end"] - t_end) > window):
                continue
            sim = SequenceMatcher(None, lowered, seg["text"].lower()).ratio()
            best_sim = max(best_sim, sim)
        return best_sim

    inserted = 0
    for seg in secondary_segs:
        adj_start = seg["start"] + offset
        adj_end = seg["end"] + offset
        text = seg["text"]
        if not text:
            continue
        if primary_covers(adj_start, adj_end):
            # Primary has something here — skip if it is the same content.
            if find_similar_primary(text, adj_start, adj_end) >= sim_threshold:
                continue
        # Unique to secondary (likely an audience voice).
        merged.append({
            "start": round(adj_start, 2),
            "end": round(adj_end, 2),
            "text": text,
            "source": "secondary",
        })
        inserted += 1

    merged.sort(key=lambda s: s["start"])
    return merged, inserted


def format_timestamp(seconds):
    """Format a second count as HH:MM:SS (fractions truncated)."""
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = int(seconds % 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


def main():
    parser = argparse.ArgumentParser(description="Merge two transcription sources")
    parser.add_argument("primary", help="Primary source JSON (lavalier/Saramonic)")
    parser.add_argument("secondary", help="Secondary source JSON (room mic/H2n)")
    parser.add_argument("output_dir", help="Output directory")
    parser.add_argument("--offset", type=float, default=None,
                        help="Time offset (seconds) to add to secondary timestamps. "
                             "Auto-detected if not specified.")
    parser.add_argument("--gap-threshold", type=float, default=3.0,
                        help="Minimum gap in primary to insert secondary (default: 3.0)")
    parser.add_argument("--sim-threshold", type=float, default=0.3,
                        help="Similarity threshold below which secondary is unique (default: 0.3)")
    args = parser.parse_args()

    print(f"Primary: {args.primary}")
    print(f"Secondary: {args.secondary}")

    primary_segs, primary_dur = load_segments(args.primary)
    secondary_segs, secondary_dur = load_segments(args.secondary)
    print(f"Primary: {len(primary_segs)} segments, {primary_dur:.0f}s")
    print(f"Secondary: {len(secondary_segs)} segments, {secondary_dur:.0f}s")

    # Use the provided offset, or estimate one from the text alignment.
    if args.offset is not None:
        offset = args.offset
        print(f"Using provided offset: {offset:+.1f}s")
    else:
        print("Estimating time offset...")
        offset = estimate_offset(primary_segs, secondary_segs)
        print(f"Estimated offset: {offset:+.1f}s (secondary is {abs(offset):.0f}s "
              f"{'behind' if offset > 0 else 'ahead of'} primary)")

    merged, inserted = merge(
        primary_segs,
        secondary_segs,
        offset=offset,
        gap_threshold=args.gap_threshold,
        sim_threshold=args.sim_threshold,
    )
    print(f"Merged: {len(merged)} segments ({len(primary_segs)} primary + {inserted} from secondary)")

    os.makedirs(args.output_dir, exist_ok=True)

    # JSON — explicit UTF-8 so output is locale-independent (the text
    # contains non-ASCII; ensure_ascii=False writes it verbatim).
    json_path = os.path.join(args.output_dir, "merged.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({"segments": merged, "offset": offset}, f,
                  ensure_ascii=False, indent=2)
    print(f"Written: {json_path}")

    # Timestamped text; secondary segments carry the audience tag.
    txt_path = os.path.join(args.output_dir, "merged.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        for seg in merged:
            tag = "" if seg["source"] == "primary" else " [аудитория]"
            f.write(f"[{format_timestamp(seg['start'])}]{tag} {seg['text']}\n")
    print(f"Written: {txt_path}")

    # Plain text for LLM processing.
    plain_path = os.path.join(args.output_dir, "merged_plain.txt")
    with open(plain_path, "w", encoding="utf-8") as f:
        f.write(" ".join(seg["text"] for seg in merged))
    print(f"Written: {plain_path}")


if __name__ == "__main__":
    main()