Make scripts better
This commit is contained in:
parent
ba56147e95
commit
e8ad7df469
12 changed files with 614 additions and 432 deletions
|
|
@ -1,25 +1,241 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Объединение транскрипций из нескольких файлов."""
|
||||
"""
|
||||
Merge two transcription sources by timestamps.
|
||||
|
||||
Primary source (e.g., lavalier mic / Saramonic) — better quality for main speaker.
|
||||
Secondary source (e.g., room mic / H2n XY) — captures audience/student voices.
|
||||
|
||||
Strategy:
|
||||
1. Both sources have timestamped segments from Whisper.
|
||||
2. For each secondary segment, check if primary has a similar segment at the same time.
|
||||
3. If primary has coverage (overlapping segment exists) — keep primary's version.
|
||||
4. If primary has NO coverage (gap) — insert secondary segment, tagged as [audience].
|
||||
5. Time alignment: the two recordings may have different start times.
|
||||
We detect the offset by cross-correlating the first few segments' text.
|
||||
|
||||
Usage:
|
||||
python3 merge_transcriptions.py <primary.json> <secondary.json> <output_dir> [--offset SECONDS]
|
||||
|
||||
Output:
|
||||
<output_dir>/merged.json — combined segments with source tags
|
||||
<output_dir>/merged.txt — timestamped text
|
||||
<output_dir>/merged_plain.txt — plain text for LLM processing
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
def merge_transcriptions(timeline_dir, output_path="merged_plain.txt"):
    """Concatenate every .txt file of a directory into a single text file.

    Files whose name contains "merged" are ignored so that a previous output
    is never fed back into a new one. Files that are empty after stripping
    whitespace are skipped.

    Args:
        timeline_dir: Directory that is scanned for ``.txt`` files.
        output_path: Name of the combined file; it is created inside
            ``timeline_dir``.
    """
    txt_files = sorted(
        name
        for name in os.listdir(timeline_dir)
        if name.endswith('.txt') and 'merged' not in name
    )

    sections = []
    for txt_file in txt_files:
        with open(os.path.join(timeline_dir, txt_file), 'r', encoding='utf-8') as f:
            content = f.read().strip()
        if content:
            # Each section is introduced by a header carrying the file name.
            sections.append(f"--- {txt_file} ---\n{content}\n")

    with open(os.path.join(timeline_dir, output_path), 'w', encoding='utf-8') as f:
        f.write('\n\n'.join(sections))

    print(f"Merged {len(txt_files)} files into {output_path}")
|
||||
|
||||
|
||||
def load_segments(json_path):
    """Load timestamped segments from a Whisper JSON transcription file.

    Args:
        json_path: Path to a JSON file with a top-level "segments" list and,
            optionally, a "duration" value.

    Returns:
        A ``(segments, duration)`` tuple. ``segments`` is a list of dicts with
        the keys "start", "end" and "text" (whitespace-stripped); missing
        fields default to 0 and "". ``duration`` defaults to 0 when absent.
    """
    # Transcripts routinely contain non-ASCII text, so read as UTF-8 explicitly
    # instead of relying on the platform's locale encoding.
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)

    segments = [
        {
            "start": seg.get("start", 0),
            "end": seg.get("end", 0),
            "text": seg.get("text", "").strip(),
        }
        for seg in data.get("segments", [])
    ]
    return segments, data.get("duration", 0)
|
||||
|
||||
|
||||
def estimate_offset(primary_segs, secondary_segs, search_window=120):
    """Estimate the time offset between two recordings.

    Candidate offsets are tried in 1-second steps within
    ``[-search_window, +search_window]``. For each candidate, every one of the
    first 30 primary segments is paired with the secondary segment (among the
    first 40) whose midpoint is closest to the shifted primary midpoint; pairs
    further than 15 seconds apart are ignored. The candidate's score is the
    mean text similarity of its pairs.

    Several neighbouring candidates usually produce exactly the same pairing
    and therefore the same score. Among such ties the candidate with the
    smallest mean midpoint distance wins, so the result is not biased towards
    the low end of the search window.

    Args:
        primary_segs: Segment dicts ("start", "end", "text") of the primary source.
        secondary_segs: Segment dicts of the secondary source.
        search_window: Half-width of the searched offset range, in seconds.

    Returns:
        The offset (float, whole seconds) such that
        ``secondary_time + offset ≈ primary_time``; 0.0 when either input is
        empty or no candidate yields a positive similarity.
    """
    if not primary_segs or not secondary_segs:
        return 0.0

    # Midpoints and lowercased texts do not depend on the candidate offset,
    # so compute them once instead of once per candidate.
    primary_head = [
        ((seg["start"] + seg["end"]) / 2, seg["text"].lower())
        for seg in primary_segs[:30]
    ]
    secondary_head = [
        ((seg["start"] + seg["end"]) / 2, seg["text"].lower())
        for seg in secondary_segs[:40]
    ]

    # The same (primary, secondary) pair is compared for many candidates;
    # memoize the comparatively expensive similarity ratio.
    sim_cache = {}

    def similarity(p_idx, s_idx):
        key = (p_idx, s_idx)
        if key not in sim_cache:
            sim_cache[key] = SequenceMatcher(
                None, primary_head[p_idx][1], secondary_head[s_idx][1]
            ).ratio()
        return sim_cache[key]

    best_offset = 0.0
    best_score = 0.0
    best_mean_dist = float("inf")

    for offset_int in range(-search_window, search_window + 1):
        offset = float(offset_int)
        score = 0.0
        total_dist = 0.0
        comparisons = 0

        for p_idx, (p_mid, _) in enumerate(primary_head):
            # Where this primary segment should sit on the secondary timeline.
            target_time = p_mid - offset

            nearest_idx = None
            nearest_dist = float("inf")
            for s_idx, (s_mid, _) in enumerate(secondary_head):
                dist = abs(s_mid - target_time)
                if dist < nearest_dist:
                    nearest_dist = dist
                    nearest_idx = s_idx

            if nearest_idx is not None and nearest_dist < 15:  # within 15 seconds
                score += similarity(p_idx, nearest_idx)
                total_dist += nearest_dist
                comparisons += 1

        if comparisons == 0:
            continue

        score /= comparisons
        mean_dist = total_dist / comparisons

        # Exact equality is intended: candidates on the same plateau add up the
        # same cached similarities in the same order.
        is_tighter_tie = (
            score > 0 and score == best_score and mean_dist < best_mean_dist
        )
        if score > best_score or is_tighter_tie:
            best_score = score
            best_mean_dist = mean_dist
            best_offset = offset

    return best_offset
|
||||
|
||||
|
||||
def merge(primary_segs, secondary_segs, offset=0.0, gap_threshold=3.0, sim_threshold=0.3):
    """Merge primary and secondary segments into one time-ordered list.

    All primary segments are kept and tagged ``"source": "primary"``. A
    secondary segment is added (tagged ``"source": "secondary"``, with its
    timestamps shifted by ``offset`` and rounded to 2 decimals) when either

    * no primary segment overlaps its shifted time range, or
    * a primary segment overlaps, but no nearby primary segment has text
      similarity of at least ``sim_threshold``.

    Secondary segments with empty text are ignored.

    Args:
        primary_segs: Segments from the primary source (lavalier).
        secondary_segs: Segments from the secondary source (room mic).
        offset: Time offset added to secondary timestamps to align them with
            the primary timeline.
        gap_threshold: Minimum gap (seconds) in primary to consider inserting
            secondary. Accepted for interface compatibility; the current
            implementation does not consult it.
        sim_threshold: Below this similarity a secondary segment counts as
            unique content.

    Returns:
        A ``(merged, inserted)`` tuple: the combined segment list sorted by
        "start", and the number of secondary segments that were inserted.
    """
    # Start with every primary segment, tagged with its source.
    merged = [{**seg, "source": "primary"} for seg in primary_segs]

    # Primary timeline as (start, end) intervals.
    primary_intervals = [(s["start"], s["end"]) for s in primary_segs]

    def primary_covers(t_start, t_end):
        """Return True if any primary segment overlaps [t_start, t_end]."""
        return any(
            p_start <= t_end and p_end >= t_start
            for p_start, p_end in primary_intervals
        )

    def find_similar_primary(text, t_start, t_end, window=10):
        """Return the highest similarity of ``text`` to a nearby primary segment."""
        lowered = text.lower()
        best_sim = 0.0
        for seg in primary_segs:
            # A segment is out of range only when BOTH its start and its end
            # are further than ``window`` seconds from the probe's.
            if abs(seg["start"] - t_start) > window and abs(seg["end"] - t_end) > window:
                continue
            sim = SequenceMatcher(None, lowered, seg["text"].lower()).ratio()
            if sim > best_sim:
                best_sim = sim
        return best_sim

    inserted = 0
    for seg in secondary_segs:
        text = seg["text"]
        if not text:
            continue

        adj_start = seg["start"] + offset
        adj_end = seg["end"] + offset

        # If primary already covers this time range with similar content,
        # keep primary's version and drop the secondary one.
        if primary_covers(adj_start, adj_end):
            if find_similar_primary(text, adj_start, adj_end) >= sim_threshold:
                continue

        # Unique to secondary (likely an audience voice).
        merged.append({
            "start": round(adj_start, 2),
            "end": round(adj_end, 2),
            "text": text,
            "source": "secondary",
        })
        inserted += 1

    # Interleave both sources chronologically.
    merged.sort(key=lambda s: s["start"])

    return merged, inserted
|
||||
|
||||
|
||||
def format_timestamp(seconds):
    """Format a time in seconds as ``HH:MM:SS``.

    Fractions of a second are truncated. Negative times (which can arise when
    a negative offset is applied to early segments) are rendered as a minus
    sign followed by the formatted absolute value, e.g. ``-00:00:05``.

    Args:
        seconds: Time in seconds (int or float).

    Returns:
        The formatted timestamp string; hours are zero-padded to at least
        two digits and may grow beyond two.
    """
    whole = int(abs(seconds))
    # Floor-dividing a negative value would yield "-1:59:55"-style output, so
    # format the magnitude and put the sign in front.
    sign = "-" if seconds < 0 and whole > 0 else ""
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}"
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: merge two Whisper transcriptions.

    Parses the primary/secondary JSON paths and the output directory, aligns
    the secondary recording (using ``--offset`` when given, otherwise an
    estimated offset), merges the segments and writes three files into the
    output directory: ``merged.json``, ``merged.txt`` (timestamped lines) and
    ``merged_plain.txt`` (space-joined text).
    """
    parser = argparse.ArgumentParser(description="Merge two transcription sources")
    parser.add_argument("primary", help="Primary source JSON (lavalier/Saramonic)")
    parser.add_argument("secondary", help="Secondary source JSON (room mic/H2n)")
    parser.add_argument("output_dir", help="Output directory")
    parser.add_argument("--offset", type=float, default=None,
                        help="Time offset (seconds) to add to secondary timestamps. "
                             "Auto-detected if not specified.")
    parser.add_argument("--gap-threshold", type=float, default=3.0,
                        help="Minimum gap in primary to insert secondary (default: 3.0)")
    parser.add_argument("--sim-threshold", type=float, default=0.3,
                        help="Similarity threshold below which secondary is unique (default: 0.3)")
    args = parser.parse_args()

    print(f"Primary: {args.primary}")
    print(f"Secondary: {args.secondary}")

    primary_segs, primary_dur = load_segments(args.primary)
    secondary_segs, secondary_dur = load_segments(args.secondary)
    print(f"Primary: {len(primary_segs)} segments, {primary_dur:.0f}s")
    print(f"Secondary: {len(secondary_segs)} segments, {secondary_dur:.0f}s")

    # Estimate or use provided offset
    if args.offset is not None:
        offset = args.offset
        print(f"Using provided offset: {offset:+.1f}s")
    else:
        print("Estimating time offset...")
        offset = estimate_offset(primary_segs, secondary_segs)
        print(f"Estimated offset: {offset:+.1f}s (secondary is {abs(offset):.0f}s "
              f"{'behind' if offset > 0 else 'ahead of'} primary)")

    # Merge
    merged, inserted = merge(
        primary_segs, secondary_segs,
        offset=offset,
        gap_threshold=args.gap_threshold,
        sim_threshold=args.sim_threshold,
    )
    print(f"Merged: {len(merged)} segments ({len(primary_segs)} primary + {inserted} from secondary)")

    # Write outputs
    os.makedirs(args.output_dir, exist_ok=True)

    # All outputs carry non-ASCII text (ensure_ascii=False, the audience tag),
    # so write them as UTF-8 regardless of the platform's locale encoding.

    # JSON
    json_path = os.path.join(args.output_dir, "merged.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({"segments": merged, "offset": offset}, f, ensure_ascii=False, indent=2)
    print(f"Written: {json_path}")

    # Timestamped text; secondary-source lines carry the audience tag.
    txt_path = os.path.join(args.output_dir, "merged.txt")
    with open(txt_path, "w", encoding="utf-8") as f:
        for seg in merged:
            tag = "" if seg["source"] == "primary" else " [аудитория]"
            f.write(f"[{format_timestamp(seg['start'])}]{tag} {seg['text']}\n")
    print(f"Written: {txt_path}")

    # Plain text
    plain_path = os.path.join(args.output_dir, "merged_plain.txt")
    with open(plain_path, "w", encoding="utf-8") as f:
        f.write(" ".join(seg["text"] for seg in merged))
    print(f"Written: {plain_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # The command line is parsed by main(): <primary.json> <secondary.json>
    # <output_dir>. The first argument is a JSON file, so it must not also be
    # handed to a directory-based routine before main() runs.
    main()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue