ege-skill/check-with-source.py

203 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import base64
import asyncio
import time
import io
from dotenv import load_dotenv
from openai import AsyncOpenAI
import httpx
from PIL import Image
load_dotenv()
MODEL_NAME = "qwen3.5-122b"
# Таймаут уменьшен до 300 секунд (5 минут)
client = AsyncOpenAI(
api_key=os.getenv("QWEN_API_KEY"),
base_url=os.getenv("QWEN_BASE_URL"),
http_client=httpx.AsyncClient(timeout=httpx.Timeout(300.0, connect=60.0))
)
def encode_image(image_path):
"""Оптимизированная функция с агрессивным сжатием для TIFF"""
with Image.open(image_path) as img:
# Конвертация CMYK в RGB (часто в TIFF)
if img.mode == 'CMYK':
img = img.convert('RGB')
elif img.mode not in ('RGB', 'L'):
img = img.convert('RGB')
# Уменьшаем до 800px (достаточно для OCR)
img.thumbnail((1600, 1600))
buffer = io.BytesIO()
# Агрессивное сжатие для скорости
if image_path.lower().endswith(('.tif', '.tiff')):
# Для TIFF - максимальное сжатие
img.save(buffer, format='JPEG', quality=45,
optimize=True, progressive=False)
else:
# Для остальных - умеренное сжатие
img.save(buffer, format='JPEG', quality=80, optimize=True)
# Отладочная информация
size_mb = len(buffer.getvalue()) / (1024 * 1024)
print(f" [SIZE] {os.path.basename(image_path)}: {size_mb:.2f} MB")
return base64.b64encode(buffer.getvalue()).decode('utf-8')
def get_instructions():
with open("SKILL.md", "r", encoding="utf-8") as f:
skill = f.read()
c_path = os.path.join("references", "russian-essay-criteria.md")
with open(c_path, "r", encoding="utf-8") as f:
criteria = f.read()
return f"{skill}\n\n{criteria}"
async def process_priority_student(student, student_path, instructions):
source_path = os.path.join(student_path, "source.txt")
safe_model_name = MODEL_NAME.replace("/", "_")
output_filename = f"REPORT_{safe_model_name}_WITH_SOURCE.md"
output_path = os.path.join(student_path, output_filename)
if not os.path.exists(source_path):
print(f" [-] {student}: source.txt не найден. Пропуск.")
return False
# ✅ ПРОВЕРКА СУЩЕСТВУЮЩИХ ОТЧЕТОВ
existing_reports = []
for f in os.listdir(student_path):
if f.startswith("REPORT_") and safe_model_name in f:
existing_reports.append(f)
if existing_reports:
print(
f" [-] {student}: Отчет уже существует ({existing_reports[0]}). Пропуск.")
return False
allowed_ext = ('.jpg', '.jpeg', '.png', '.tif', '.tiff')
photos = sorted([f for f in os.listdir(student_path)
if f.lower().endswith(allowed_ext)])
if not photos:
print(f" [-] {student}: Нет фото. Пропуск.")
return False
print(f"\n{'-'*40}")
print(f"[START] {student.upper()} | Фото: {len(photos)} шт.")
print(f"{'-'*40}")
with open(source_path, "r", encoding="utf-8") as f:
source_text = f.read()
# Оригинальный промпт без изменений
prompt = (
"Распознай текст и проверь сочинение строго по критериям ФИПИ.\n"
"Используй предоставленный ИСХОДНЫЙ ТЕКСТ для сверки фактов и К1-К2.\n\n"
f"ИСХОДНЫЙ ТЕКСТ:\n{source_text}"
)
message_content = [{"type": "text", "text": prompt}]
# Ограничиваем количество фото до 5 для скорости
max_photos = 8
if len(photos) > max_photos:
print(f" [WARN] Много фото ({len(photos)}). Беру первые {max_photos}")
photos = photos[:max_photos]
for p in photos:
print(f" [LOG] Кодирую {p}...")
try:
encoded = encode_image(os.path.join(student_path, p))
message_content.append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{encoded}"}
})
except Exception as e:
print(f" [ERR] Ошибка при кодировании {p}: {e}")
continue
try:
print(f" [WAIT] Запрос отправлен. Ждем полный разбор...")
start_api = time.time()
response = await client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": instructions},
{"role": "user", "content": message_content}
],
temperature=0.1,
)
res_text = response.choices[0].message.content
api_duration = round(time.time() - start_api, 1)
if not res_text or res_text.strip() == "":
print(
f" [!!!] Сервер вернул пустой ответ спустя {api_duration}с.")
res_text = f"ОШИБКА: Сервер прервал генерацию спустя {api_duration} секунд."
print(
f" [TIME] Получено! Время: {api_duration}с | Символов: {len(res_text)}")
with open(output_path, "w", encoding="utf-8") as f:
f.write(f"""---
**Ученик:** {student}
**Время API:** {api_duration}с
**Фото:** {len(photos)} шт.
---
{res_text}""")
print(f"[OK] Отчет готов.")
return True # ✅ Был запрос к API
except asyncio.TimeoutError:
print(f" [ERR] Таймаут запроса (>300 секунд)")
return True # ✅ Ошибка тоже считается за обработку
except Exception as e:
print(f" [ERR] Произошла ошибка: {str(e)}")
return True # ✅ Ошибка тоже считается за обработку
async def main():
base_dir = "photo"
if not os.path.exists(base_dir):
print(f"Ошибка: папка '{base_dir}' не найдена!")
return
instructions = get_instructions()
students = [d for d in os.listdir(base_dir)
if os.path.isdir(os.path.join(base_dir, d))]
if not students:
print(f"Нет папок с учениками в '{base_dir}'")
return
print(f"\nНайдено учеников: {len(students)}")
print("="*50)
for i, student in enumerate(students, 1):
print(f"\n[{i}/{len(students)}]")
was_processed = await process_priority_student(student, os.path.join(base_dir, student), instructions)
# ✅ ПАУЗА ТОЛЬКО ЕСЛИ БЫЛ ЗАПРОС К API
if i < len(students):
if was_processed:
print(f"\n[PAUSE] Жду 3 секунды...")
await asyncio.sleep(3)
else:
print(f" [SKIP] Пропущен, пауза не нужна")
print("\n" + "="*50)
print("ГОТОВО!")
if __name__ == "__main__":
asyncio.run(main())