diff --git a/recognition.py b/recognition.py new file mode 100644 index 0000000..646ca40 --- /dev/null +++ b/recognition.py @@ -0,0 +1,503 @@ +""" +recognition.py — модуль распознавания аудиофайла с ответами ученика ЕГЭ (аудирование, английский язык). + +Зависимости: + pip install faster-whisper + +Использование: + from recognition import transcribe, extract_answers + + # Полный pipeline: аудио -> транскрипт -> структурированные ответы + result = transcribe("student_answers.mp3") + answers = extract_answers(result.text) + print(answers) +""" + +from __future__ import annotations + +import re +import logging +from dataclasses import dataclass, field +from pathlib import Path + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Константы +# --------------------------------------------------------------------------- + +# Модели faster-whisper по убыванию скорости / возрастанию качества: +# tiny, base, small, medium, large-v2, large-v3 +DEFAULT_MODEL = "medium" + +# Подсказка для Whisper — описывает формат ответов ученика. +# Критически важна для правильного распознавания "one/two/three" как цифр +# и "A equals 3" как ответа на задание 1. +WHISPER_PROMPT = ( + "Student answers to EGE English listening exam. " + "Task one matching: speaker A answer three, speaker B answer one, " + "speaker C answer five, speaker D answer seven, speaker E answer two, speaker F answer four. " + "Tasks two through nine True False Not Stated: " + "task two true, task three false, task four not stated. " + "Tasks ten through eighteen multiple choice one two or three: " + "task ten two, task eleven one, task twelve three." +) + + +# --------------------------------------------------------------------------- +# Структуры данных +# --------------------------------------------------------------------------- + +@dataclass +class TranscriptResult: + """Результат транскрипции аудиофайла.""" + text: str # Полный текст транскрипта + language: str # Определённый язык ("en") + duration_seconds: float # Длительность аудио в секундах + segments: list[dict] = field(default_factory=list) # Детальные сегменты с таймкодами + model_used: str = DEFAULT_MODEL + + +@dataclass +class StudentAnswers: + """Структурированные ответы ученика, извлечённые из транскрипта.""" + + # Задание 1: соответствие A–F → цифра 1–7 + task1: dict[str, str] = field(default_factory=dict) # {"A": "3", "B": "1", ...} + + # Задания 2–9: True(1) / False(2) / Not Stated(3) + task2_9: dict[int, str] = field(default_factory=dict) # {2: "1", 3: "2", ...} + + # Задания 10–18: выбор из вариантов 1/2/3 + task10_18: dict[int, str] = field(default_factory=dict) # {10: "2", 11: "1", ...} + + # Задания, которые не удалось распознать + unrecognized: list[str] = field(default_factory=list) + + def to_dict(self) -> dict: + return { + "task1": self.task1, + "task2_9": {str(k): v for k, v in self.task2_9.items()}, + "task10_18": {str(k): v for k, v in self.task10_18.items()}, + "unrecognized": self.unrecognized, + } + + def summary(self) -> str: + """Читаемое представление для вывода агенту / в лог.""" + lines = ["=== Распознанные ответы ученика ==="] + + if self.task1: + lines.append("\nЗадание 1 (соответствие):") + for letter in "ABCDEF": + ans = self.task1.get(letter, "?") + lines.append(f" {letter} → {ans}") + + if self.task2_9: + lines.append("\nЗадания 2–9 (True/False/Not Stated):") + labels = {"1": "True", "2": "False", "3": "Not Stated"} + for task_num in range(2, 10): + ans = self.task2_9.get(task_num, "?") + label = labels.get(ans, ans) + lines.append(f" Задание {task_num}: {ans} ({label})") + + if self.task10_18: + lines.append("\nЗадания 10–18 (выбор):") + for task_num in range(10, 19): + ans = self.task10_18.get(task_num, "?") + lines.append(f" Задание {task_num}: {ans}") + + if self.unrecognized: + lines.append(f"\nНе распознано: {', '.join(self.unrecognized)}") + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Транскрипция +# --------------------------------------------------------------------------- + +def transcribe( + audio_path: str | Path, + model_size: str = DEFAULT_MODEL, + device: str = "auto", + compute_type: str = "auto", + language: str = "en", + beam_size: int = 5, +) -> TranscriptResult: + """ + Транскрибирует аудиофайл с ответами ученика. + + Args: + audio_path: Путь к аудиофайлу (MP3, WAV, M4A, OGG, WEBM, FLAC). + model_size: Размер модели Whisper: tiny/base/small/medium/large-v2/large-v3. + medium — хороший баланс скорость/качество для ЕГЭ. + large-v3 — максимальное качество, медленнее. + device: "auto" | "cpu" | "cuda". "auto" выберет GPU если доступен. + compute_type: "auto" | "int8" | "float16" | "float32". + "auto" подберёт оптимальный тип для устройства. + language: Язык аудио. "en" для ответов на английском. + beam_size: Ширина луча beam search. 5 — стандарт, выше = точнее но медленнее. + + Returns: + TranscriptResult с текстом, языком, длительностью и сегментами. + + Raises: + FileNotFoundError: Если аудиофайл не найден. + RuntimeError: Если faster-whisper не установлен. + """ + try: + from faster_whisper import WhisperModel + except ImportError: + raise RuntimeError( + "faster-whisper не установлен. Установите: pip install faster-whisper" + ) + + audio_path = Path(audio_path) + if not audio_path.exists(): + raise FileNotFoundError(f"Аудиофайл не найден: {audio_path}") + + # Автовыбор устройства и типа вычислений + resolved_device, resolved_compute = _resolve_device(device, compute_type) + + logger.info( + "Загрузка модели %s на %s (%s)...", + model_size, resolved_device, resolved_compute + ) + + model = WhisperModel( + model_size, + device=resolved_device, + compute_type=resolved_compute, + ) + + logger.info("Транскрибирую: %s", audio_path.name) + + segments_gen, info = model.transcribe( + str(audio_path), + language=language, + beam_size=beam_size, + initial_prompt=WHISPER_PROMPT, + word_timestamps=False, + vad_filter=True, # Фильтрация тишины — полезно для записей с паузами + vad_parameters={ + "min_silence_duration_ms": 500, # Паузы >0.5с считаются тишиной + "speech_pad_ms": 200, + }, + ) + + # Материализуем генератор сегментов + segments = [] + full_text_parts = [] + + for seg in segments_gen: + segments.append({ + "start": round(seg.start, 2), + "end": round(seg.end, 2), + "text": seg.text.strip(), + }) + full_text_parts.append(seg.text.strip()) + + full_text = " ".join(full_text_parts) + + logger.info( + "Транскрипция завершена. Длительность: %.1f сек, слов ~%d", + info.duration, len(full_text.split()) + ) + + return TranscriptResult( + text=full_text, + language=info.language, + duration_seconds=round(info.duration, 1), + segments=segments, + model_used=model_size, + ) + + +def _resolve_device(device: str, compute_type: str) -> tuple[str, str]: + """Определяет оптимальное устройство и тип вычислений.""" + if device != "auto" and compute_type != "auto": + return device, compute_type + + # Проверяем наличие CUDA + try: + from torch import cuda + has_cuda = cuda.is_available() + except ImportError: + has_cuda = False + + if device == "auto": + device = "cuda" if has_cuda else "cpu" + + if compute_type == "auto": + if device == "cuda": + compute_type = "float16" # GPU: float16 быстрее и точнее чем int8 + else: + compute_type = "int8" # CPU: int8 значительно быстрее float32 + + return device, compute_type + + +# --------------------------------------------------------------------------- +# Извлечение ответов из транскрипта +# --------------------------------------------------------------------------- + +def extract_answers(transcript_text: str) -> StudentAnswers: + """ + Извлекает структурированные ответы из текста транскрипта. + + Обрабатывает разные форматы речи ученика: + - "A three", "speaker A answer three", "A equals 3", "A — три" + - "task two true", "number two true", "two — true", "2 true" + - "task ten two", "ten — 2", "question ten answer two" + + Args: + transcript_text: Текст транскрипта от Whisper. + + Returns: + StudentAnswers со структурированными ответами. + """ + answers = StudentAnswers() + text = transcript_text.lower().strip() + + _extract_task1(text, answers) + _extract_task2_9(text, answers) + _extract_task10_18(text, answers) + + logger.debug("Извлечено ответов: task1=%d, task2_9=%d, task10_18=%d, нераспознано=%d", + len(answers.task1), len(answers.task2_9), + len(answers.task10_18), len(answers.unrecognized)) + + return answers + + +def _extract_task1(text: str, answers: StudentAnswers) -> None: + """ + Задание 1: соответствие A–F → цифра 1–7. + Примеры: "A three", "speaker A answer 3", "A equals three", "A — 3" + """ + # Числа словами → цифры + word_to_digit = { + "one": "1", "two": "2", "three": "3", "four": "4", + "five": "5", "six": "6", "seven": "7", + # На случай русских ответов + "один": "1", "два": "2", "три": "3", "четыре": "4", + "пять": "5", "шесть": "6", "семь": "7", + } + + for letter in "abcdef": + # Паттерн: буква, затем необязательный разделитель, затем цифра/слово + pattern = ( + rf"(?:speaker\s+)?{letter}" + rf"(?:\s+(?:answer|equals|is|—|-|:))?" + rf"\s+" + rf"({_digit_or_word_pattern(1, 7)})" + ) + match = re.search(pattern, text) + if match: + raw = match.group(1).strip() + digit = word_to_digit.get(raw, raw) if not raw.isdigit() else raw + if digit in [str(i) for i in range(1, 8)]: + answers.task1[letter.upper()] = digit + else: + answers.unrecognized.append(f"1{letter.upper()}") + else: + answers.unrecognized.append(f"1{letter.upper()}") + + +def _extract_task2_9(text: str, answers: StudentAnswers) -> None: + """ + Задания 2–9: True(1) / False(2) / Not Stated(3). + Примеры: "task two true", "number 3 false", "four not stated", "5 — 2" + """ + tfs_map = { + "true": "1", "1": "1", + "false": "2", "2": "2", + "not stated": "3", "not_stated": "3", "3": "3", + } + + num_words = { + "two": 2, "three": 3, "four": 4, "five": 5, + "six": 6, "seven": 7, "eight": 8, "nine": 9, + "2": 2, "3": 3, "4": 4, "5": 5, + "6": 6, "7": 7, "8": 8, "9": 9, + } + + for word, num in num_words.items(): + pattern = ( + rf"(?:task|number|question|задание)?\s*{re.escape(word)}" + rf"(?:\s+(?:is|answer|—|-|:))?" + rf"\s+" + rf"(true|false|not\s+stated|not_stated|[123])" + ) + match = re.search(pattern, text) + if match: + raw = match.group(1).strip().replace(" ", "_") + digit = tfs_map.get(raw) or tfs_map.get(raw.replace("_", " ")) + if digit: + answers.task2_9[num] = digit + else: + answers.unrecognized.append(str(num)) + else: + answers.unrecognized.append(str(num)) + + +def _extract_task10_18(text: str, answers: StudentAnswers) -> None: + """ + Задания 10–18: выбор из вариантов 1/2/3. + Примеры: "task ten two", "eleven — 1", "question 12 answer three" + """ + word_to_digit_choice = { + "one": "1", "two": "2", "three": "3", + "1": "1", "2": "2", "3": "3", + } + + num_words = { + "ten": 10, "eleven": 11, "twelve": 12, "thirteen": 13, + "fourteen": 14, "fifteen": 15, "sixteen": 16, + "seventeen": 17, "eighteen": 18, + "10": 10, "11": 11, "12": 12, "13": 13, + "14": 14, "15": 15, "16": 16, "17": 17, "18": 18, + } + + for word, num in num_words.items(): + pattern = ( + rf"(?:task|number|question|задание)?\s*{re.escape(word)}" + rf"(?:\s+(?:is|answer|—|-|:))?" + rf"\s+" + rf"({_digit_or_word_pattern(1, 3)})" + ) + match = re.search(pattern, text) + if match: + raw = match.group(1).strip() + digit = word_to_digit_choice.get(raw) + if digit: + answers.task10_18[num] = digit + else: + answers.unrecognized.append(str(num)) + else: + answers.unrecognized.append(str(num)) + + +def _digit_or_word_pattern(min_val: int, max_val: int) -> str: + """Строит regex-паттерн для диапазона цифр и их словесных форм.""" + digits = [str(i) for i in range(min_val, max_val + 1)] + words = { + 1: "one", 2: "two", 3: "three", 4: "four", 5: "five", + 6: "six", 7: "seven", 8: "eight", 9: "nine", 10: "ten", + 11: "eleven", 12: "twelve", 13: "thirteen", 14: "fourteen", + 15: "fifteen", 16: "sixteen", 17: "seventeen", 18: "eighteen", + } + options = digits + [words[i] for i in range(min_val, max_val + 1) if i in words] + return "(" + "|".join(sorted(options, key=len, reverse=True)) + ")" + + +# --------------------------------------------------------------------------- +# Удобный pipeline +# --------------------------------------------------------------------------- + +def process_audio( + audio_path: str | Path, + model_size: str = DEFAULT_MODEL, + device: str = "auto", + verbose: bool = False, +) -> tuple[TranscriptResult, StudentAnswers]: + """ + Полный pipeline: аудиофайл → транскрипт → структурированные ответы. + + Args: + audio_path: Путь к аудиофайлу. + model_size: Размер модели Whisper (medium по умолчанию). + device: "auto" | "cpu" | "cuda". + verbose: Если True — выводить промежуточные результаты в stdout. + + Returns: + Кортеж (TranscriptResult, StudentAnswers). + + Example: + result, answers = process_audio("student.mp3") + print(answers.summary()) + # Передать answers.to_dict() агенту для сверки с ключами + """ + if verbose: + print(f"[1/2] Транскрибирую {Path(audio_path).name} (модель: {model_size})...") + + transcript = transcribe(audio_path, model_size=model_size, device=device) + + if verbose: + print(f" Длительность: {transcript.duration_seconds} сек") + print(f" Транскрипт: {transcript.text[:120]}...") + print("[2/2] Извлекаю ответы...") + + answers = extract_answers(transcript.text) + + if verbose: + print(answers.summary()) + + return transcript, answers + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + import argparse + import json + import sys + + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + + parser = argparse.ArgumentParser( + description="Распознавание аудиоответов ЕГЭ (аудирование, английский язык)" + ) + parser.add_argument("audio", help="Путь к аудиофайлу") + parser.add_argument( + "--model", default=DEFAULT_MODEL, + choices=["tiny", "base", "small", "medium", "large-v2", "large-v3"], + help=f"Размер модели Whisper (по умолчанию: {DEFAULT_MODEL})" + ) + parser.add_argument( + "--device", default="auto", + choices=["auto", "cpu", "cuda"], + help="Устройство для инференса (по умолчанию: auto)" + ) + parser.add_argument( + "--output", choices=["summary", "json", "transcript"], + default="summary", + help="Формат вывода: summary (читаемый), json (машинный), transcript (сырой текст)" + ) + parser.add_argument( + "--transcript-only", action="store_true", + help="Только транскрипция без извлечения ответов" + ) + + args = parser.parse_args() + + try: + if args.transcript_only or args.output == "transcript": + result = transcribe(args.audio, model_size=args.model, device=args.device) + print(result.text) + else: + result, answers = process_audio( + args.audio, + model_size=args.model, + device=args.device, + verbose=(args.output == "summary"), + ) + + if args.output == "json": + output = { + "transcript": result.text, + "language": result.language, + "duration_seconds": result.duration_seconds, + "model_used": result.model_used, + "answers": answers.to_dict(), + } + print(json.dumps(output, ensure_ascii=False, indent=2)) + elif args.output == "summary": + # verbose=True уже вывел всё в process_audio + pass + + except (FileNotFoundError, RuntimeError) as e: + print(f"Ошибка: {e}", file=sys.stderr) + sys.exit(1)