""" recognition.py — модуль распознавания аудиофайла с ответами ученика ЕГЭ (аудирование, английский язык). Зависимости: pip install faster-whisper Использование: from recognition import transcribe, extract_answers # Полный pipeline: аудио -> транскрипт -> структурированные ответы result = transcribe("student_answers.mp3") answers = extract_answers(result.text) print(answers) """ from __future__ import annotations import re import logging from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Константы # --------------------------------------------------------------------------- # Модели faster-whisper по убыванию скорости / возрастанию качества: # tiny, base, small, medium, large-v2, large-v3 DEFAULT_MODEL = "medium" # Подсказка для Whisper — описывает формат ответов ученика. # Критически важна для правильного распознавания "one/two/three" как цифр # и "A equals 3" как ответа на задание 1. WHISPER_PROMPT = ( "Student answers to EGE English listening exam. " "Task one matching: speaker A answer three, speaker B answer one, " "speaker C answer five, speaker D answer seven, speaker E answer two, speaker F answer four. " "Tasks two through nine True False Not Stated: " "task two true, task three false, task four not stated. " "Tasks ten through eighteen multiple choice one two or three: " "task ten two, task eleven one, task twelve three." ) # --------------------------------------------------------------------------- # Структуры данных # --------------------------------------------------------------------------- @dataclass class TranscriptResult: """Результат транскрипции аудиофайла.""" text: str # Полный текст транскрипта language: str # Определённый язык ("en") duration_seconds: float # Длительность аудио в секундах segments: list[dict] = field(default_factory=list) # Детальные сегменты с таймкодами model_used: str = DEFAULT_MODEL @dataclass class StudentAnswers: """Структурированные ответы ученика, извлечённые из транскрипта.""" # Задание 1: соответствие A–F → цифра 1–7 task1: dict[str, str] = field(default_factory=dict) # {"A": "3", "B": "1", ...} # Задания 2–9: True(1) / False(2) / Not Stated(3) task2_9: dict[int, str] = field(default_factory=dict) # {2: "1", 3: "2", ...} # Задания 10–18: выбор из вариантов 1/2/3 task10_18: dict[int, str] = field(default_factory=dict) # {10: "2", 11: "1", ...} # Задания, которые не удалось распознать unrecognized: list[str] = field(default_factory=list) def to_dict(self) -> dict: return { "task1": self.task1, "task2_9": {str(k): v for k, v in self.task2_9.items()}, "task10_18": {str(k): v for k, v in self.task10_18.items()}, "unrecognized": self.unrecognized, } def summary(self) -> str: """Читаемое представление для вывода агенту / в лог.""" lines = ["=== Распознанные ответы ученика ==="] if self.task1: lines.append("\nЗадание 1 (соответствие):") for letter in "ABCDEF": ans = self.task1.get(letter, "?") lines.append(f" {letter} → {ans}") if self.task2_9: lines.append("\nЗадания 2–9 (True/False/Not Stated):") labels = {"1": "True", "2": "False", "3": "Not Stated"} for task_num in range(2, 10): ans = self.task2_9.get(task_num, "?") label = labels.get(ans, ans) lines.append(f" Задание {task_num}: {ans} ({label})") if self.task10_18: lines.append("\nЗадания 10–18 (выбор):") for task_num in range(10, 19): ans = self.task10_18.get(task_num, "?") lines.append(f" Задание {task_num}: {ans}") if self.unrecognized: lines.append(f"\nНе распознано: {', '.join(self.unrecognized)}") return "\n".join(lines) # --------------------------------------------------------------------------- # Транскрипция # --------------------------------------------------------------------------- def transcribe( audio_path: str | Path, model_size: str = DEFAULT_MODEL, device: str = "auto", compute_type: str = "auto", language: str = "en", beam_size: int = 5, ) -> TranscriptResult: """ Транскрибирует аудиофайл с ответами ученика. Args: audio_path: Путь к аудиофайлу (MP3, WAV, M4A, OGG, WEBM, FLAC). model_size: Размер модели Whisper: tiny/base/small/medium/large-v2/large-v3. medium — хороший баланс скорость/качество для ЕГЭ. large-v3 — максимальное качество, медленнее. device: "auto" | "cpu" | "cuda". "auto" выберет GPU если доступен. compute_type: "auto" | "int8" | "float16" | "float32". "auto" подберёт оптимальный тип для устройства. language: Язык аудио. "en" для ответов на английском. beam_size: Ширина луча beam search. 5 — стандарт, выше = точнее но медленнее. Returns: TranscriptResult с текстом, языком, длительностью и сегментами. Raises: FileNotFoundError: Если аудиофайл не найден. RuntimeError: Если faster-whisper не установлен. """ try: from faster_whisper import WhisperModel except ImportError: raise RuntimeError( "faster-whisper не установлен. Установите: pip install faster-whisper" ) audio_path = Path(audio_path) if not audio_path.exists(): raise FileNotFoundError(f"Аудиофайл не найден: {audio_path}") # Автовыбор устройства и типа вычислений resolved_device, resolved_compute = _resolve_device(device, compute_type) logger.info( "Загрузка модели %s на %s (%s)...", model_size, resolved_device, resolved_compute ) model = WhisperModel( model_size, device=resolved_device, compute_type=resolved_compute, ) logger.info("Транскрибирую: %s", audio_path.name) segments_gen, info = model.transcribe( str(audio_path), language=language, beam_size=beam_size, initial_prompt=WHISPER_PROMPT, word_timestamps=False, vad_filter=True, # Фильтрация тишины — полезно для записей с паузами vad_parameters={ "min_silence_duration_ms": 500, # Паузы >0.5с считаются тишиной "speech_pad_ms": 200, }, ) # Материализуем генератор сегментов segments = [] full_text_parts = [] for seg in segments_gen: segments.append({ "start": round(seg.start, 2), "end": round(seg.end, 2), "text": seg.text.strip(), }) full_text_parts.append(seg.text.strip()) full_text = " ".join(full_text_parts) logger.info( "Транскрипция завершена. Длительность: %.1f сек, слов ~%d", info.duration, len(full_text.split()) ) return TranscriptResult( text=full_text, language=info.language, duration_seconds=round(info.duration, 1), segments=segments, model_used=model_size, ) def _resolve_device(device: str, compute_type: str) -> tuple[str, str]: """Определяет оптимальное устройство и тип вычислений.""" if device != "auto" and compute_type != "auto": return device, compute_type # Проверяем наличие CUDA try: from torch import cuda has_cuda = cuda.is_available() except ImportError: has_cuda = False if device == "auto": device = "cuda" if has_cuda else "cpu" if compute_type == "auto": if device == "cuda": compute_type = "float16" # GPU: float16 быстрее и точнее чем int8 else: compute_type = "int8" # CPU: int8 значительно быстрее float32 return device, compute_type # --------------------------------------------------------------------------- # Извлечение ответов из транскрипта # --------------------------------------------------------------------------- def extract_answers(transcript_text: str) -> StudentAnswers: """ Извлекает структурированные ответы из текста транскрипта. Обрабатывает разные форматы речи ученика: - "A three", "speaker A answer three", "A equals 3", "A — три" - "task two true", "number two true", "two — true", "2 true" - "task ten two", "ten — 2", "question ten answer two" Args: transcript_text: Текст транскрипта от Whisper. Returns: StudentAnswers со структурированными ответами. """ answers = StudentAnswers() text = transcript_text.lower().strip() _extract_task1(text, answers) _extract_task2_9(text, answers) _extract_task10_18(text, answers) logger.debug("Извлечено ответов: task1=%d, task2_9=%d, task10_18=%d, нераспознано=%d", len(answers.task1), len(answers.task2_9), len(answers.task10_18), len(answers.unrecognized)) return answers def _extract_task1(text: str, answers: StudentAnswers) -> None: """ Задание 1: соответствие A–F → цифра 1–7. Примеры: "A three", "speaker A answer 3", "A equals three", "A — 3" """ # Числа словами → цифры word_to_digit = { "one": "1", "two": "2", "three": "3", "four": "4", "five": "5", "six": "6", "seven": "7", # На случай русских ответов "один": "1", "два": "2", "три": "3", "четыре": "4", "пять": "5", "шесть": "6", "семь": "7", } for letter in "abcdef": # Паттерн: буква, затем необязательный разделитель, затем цифра/слово pattern = ( rf"(?:speaker\s+)?{letter}" rf"(?:\s+(?:answer|equals|is|—|-|:))?" rf"\s+" rf"({_digit_or_word_pattern(1, 7)})" ) match = re.search(pattern, text) if match: raw = match.group(1).strip() digit = word_to_digit.get(raw, raw) if not raw.isdigit() else raw if digit in [str(i) for i in range(1, 8)]: answers.task1[letter.upper()] = digit else: answers.unrecognized.append(f"1{letter.upper()}") else: answers.unrecognized.append(f"1{letter.upper()}") def _extract_task2_9(text: str, answers: StudentAnswers) -> None: """ Задания 2–9: True(1) / False(2) / Not Stated(3). Примеры: "task two true", "number 3 false", "four not stated", "5 — 2" """ tfs_map = { "true": "1", "1": "1", "false": "2", "2": "2", "not stated": "3", "not_stated": "3", "3": "3", } num_words = { "two": 2, "three": 3, "four": 4, "five": 5, "six": 6, "seven": 7, "eight": 8, "nine": 9, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6, "7": 7, "8": 8, "9": 9, } for word, num in num_words.items(): pattern = ( rf"(?:task|number|question|задание)?\s*{re.escape(word)}" rf"(?:\s+(?:is|answer|—|-|:))?" rf"\s+" rf"(true|false|not\s+stated|not_stated|[123])" ) match = re.search(pattern, text) if match: raw = match.group(1).strip().replace(" ", "_") digit = tfs_map.get(raw) or tfs_map.get(raw.replace("_", " ")) if digit: answers.task2_9[num] = digit else: answers.unrecognized.append(str(num)) else: answers.unrecognized.append(str(num)) def _extract_task10_18(text: str, answers: StudentAnswers) -> None: """ Задания 10–18: выбор из вариантов 1/2/3. Примеры: "task ten two", "eleven — 1", "question 12 answer three" """ word_to_digit_choice = { "one": "1", "two": "2", "three": "3", "1": "1", "2": "2", "3": "3", } num_words = { "ten": 10, "eleven": 11, "twelve": 12, "thirteen": 13, "fourteen": 14, "fifteen": 15, "sixteen": 16, "seventeen": 17, "eighteen": 18, "10": 10, "11": 11, "12": 12, "13": 13, "14": 14, "15": 15, "16": 16, "17": 17, "18": 18, } for word, num in num_words.items(): pattern = ( rf"(?:task|number|question|задание)?\s*{re.escape(word)}" rf"(?:\s+(?:is|answer|—|-|:))?" rf"\s+" rf"({_digit_or_word_pattern(1, 3)})" ) match = re.search(pattern, text) if match: raw = match.group(1).strip() digit = word_to_digit_choice.get(raw) if digit: answers.task10_18[num] = digit else: answers.unrecognized.append(str(num)) else: answers.unrecognized.append(str(num)) def _digit_or_word_pattern(min_val: int, max_val: int) -> str: """Строит regex-паттерн для диапазона цифр и их словесных форм.""" digits = [str(i) for i in range(min_val, max_val + 1)] words = { 1: "one", 2: "two", 3: "three", 4: "four", 5: "five", 6: "six", 7: "seven", 8: "eight", 9: "nine", 10: "ten", 11: "eleven", 12: "twelve", 13: "thirteen", 14: "fourteen", 15: "fifteen", 16: "sixteen", 17: "seventeen", 18: "eighteen", } options = digits + [words[i] for i in range(min_val, max_val + 1) if i in words] return "(" + "|".join(sorted(options, key=len, reverse=True)) + ")" # --------------------------------------------------------------------------- # Удобный pipeline # --------------------------------------------------------------------------- def process_audio( audio_path: str | Path, model_size: str = DEFAULT_MODEL, device: str = "auto", verbose: bool = False, ) -> tuple[TranscriptResult, StudentAnswers]: """ Полный pipeline: аудиофайл → транскрипт → структурированные ответы. Args: audio_path: Путь к аудиофайлу. model_size: Размер модели Whisper (medium по умолчанию). device: "auto" | "cpu" | "cuda". verbose: Если True — выводить промежуточные результаты в stdout. Returns: Кортеж (TranscriptResult, StudentAnswers). Example: result, answers = process_audio("student.mp3") print(answers.summary()) # Передать answers.to_dict() агенту для сверки с ключами """ if verbose: print(f"[1/2] Транскрибирую {Path(audio_path).name} (модель: {model_size})...") transcript = transcribe(audio_path, model_size=model_size, device=device) if verbose: print(f" Длительность: {transcript.duration_seconds} сек") print(f" Транскрипт: {transcript.text[:120]}...") print("[2/2] Извлекаю ответы...") answers = extract_answers(transcript.text) if verbose: print(answers.summary()) return transcript, answers # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse import json import sys logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") parser = argparse.ArgumentParser( description="Распознавание аудиоответов ЕГЭ (аудирование, английский язык)" ) parser.add_argument("audio", help="Путь к аудиофайлу") parser.add_argument( "--model", default=DEFAULT_MODEL, choices=["tiny", "base", "small", "medium", "large-v2", "large-v3"], help=f"Размер модели Whisper (по умолчанию: {DEFAULT_MODEL})" ) parser.add_argument( "--device", default="auto", choices=["auto", "cpu", "cuda"], help="Устройство для инференса (по умолчанию: auto)" ) parser.add_argument( "--output", choices=["summary", "json", "transcript"], default="summary", help="Формат вывода: summary (читаемый), json (машинный), transcript (сырой текст)" ) parser.add_argument( "--transcript-only", action="store_true", help="Только транскрипция без извлечения ответов" ) args = parser.parse_args() try: if args.transcript_only or args.output == "transcript": result = transcribe(args.audio, model_size=args.model, device=args.device) print(result.text) else: result, answers = process_audio( args.audio, model_size=args.model, device=args.device, verbose=(args.output == "summary"), ) if args.output == "json": output = { "transcript": result.text, "language": result.language, "duration_seconds": result.duration_seconds, "model_used": result.model_used, "answers": answers.to_dict(), } print(json.dumps(output, ensure_ascii=False, indent=2)) elif args.output == "summary": # verbose=True уже вывел всё в process_audio pass except (FileNotFoundError, RuntimeError) as e: print(f"Ошибка: {e}", file=sys.stderr) sys.exit(1)