#!/usr/bin/env python3
# Standard library
import asyncio
import io
import json
import os
import shutil
import subprocess
import tempfile
import time
from typing import Dict, Optional, Tuple

# Third-party
import aiofiles
import aiohttp
from dotenv import load_dotenv
from faster_whisper import WhisperModel
from nio import (
    AsyncClient,
    AsyncClientConfig,
    ErrorResponse,
    LoginResponse,
    RoomMessageAudio,
    RoomMessageImage,
    RoomMessageText,
    UploadError,
    UploadResponse,
)
from weasyprint import HTML
load_dotenv()

# Matrix connection settings.
HOMESERVER = os.getenv("HOMESERVER", "https://matrix.org")
USERNAME = os.getenv("MATRIX_USERNAME")
PASSWORD = os.getenv("PASSWORD")
# Comma-separated whitelist of room ids the bot is allowed to serve.
ALLOWED_ROOMS = set(room.strip() for room in os.getenv("ALLOWED_ROOMS", "").split(",") if room.strip())

# Speech-recognition settings (faster-whisper).
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "ru")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small")

# Qwen API
QWEN_API_KEY = os.getenv("QWEN_API_KEY")
QWEN_ENDPOINT = os.getenv("QWEN_ENDPOINT", "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions")
QWEN_MODEL = os.getenv("QWEN_MODEL", "qwen3.5-122b")

# Prompt templates, loaded once at import time.  The encoding is pinned to
# UTF-8 so the (Russian) prompt files decode identically on every platform;
# relying on the locale default could raise UnicodeDecodeError on Windows.
with open("base_prompt.txt", "r", encoding="utf-8") as f:
    QWEN_PROMPT_TEMPLATE = f.read()

with open("quality_prompt.txt", "r", encoding="utf-8") as f:
    QWEN_QUALITY_PROMPT_TEMPLATE = f.read()

TEMP_DIR = tempfile.gettempdir()
# Seconds to wait for follow-up messages before a conversation is processed.
GROUPING_TIMEOUT = 15.0

# Global Matrix client; created and logged in inside main().
client: Optional[AsyncClient] = None
# Pending (grouped) messages, indexed by (room_id, sender) and by event id.
pending_by_conversation: Dict[Tuple[str, str], Dict] = {}
pending_by_event_id: Dict[str, Dict] = {}

# Lazily-initialised Whisper model — see get_whisper_model().
whisper_model = None
def get_whisper_model():
    """Return the shared faster-whisper model, loading it on first use."""
    global whisper_model
    if whisper_model is not None:
        return whisper_model
    # CPU with int8 quantisation keeps memory modest on servers without a GPU.
    whisper_model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8")
    print(f"Whisper модель {WHISPER_MODEL} загружена (faster-whisper).")
    return whisper_model
def ffmpeg_available() -> bool:
    """Report whether the ffmpeg binary can be located on PATH."""
    if shutil.which("ffmpeg") is None:
        print("[ERROR] ffmpeg не найден в системе. Установите ffmpeg и добавьте в PATH.")
        return False
    return True
def get_file_extension(mimetype: str) -> str:
    """Map an audio MIME type onto a file extension (".tmp" when unknown)."""
    for known, extension in (
        ("audio/ogg", ".ogg"),
        ("audio/mpeg", ".mp3"),
        ("audio/mp4", ".m4a"),
        ("audio/x-m4a", ".m4a"),
        ("audio/wav", ".wav"),
        ("audio/webm", ".webm"),
    ):
        if mimetype == known:
            return extension
    return ".tmp"
async def convert_to_wav(input_path: str) -> Optional[str]:
    """Convert an arbitrary audio file to 16 kHz mono PCM WAV via ffmpeg.

    Returns the path of the created WAV file, or None on failure.  On any
    failure the temporary output file is removed (the original version
    leaked it when the ffmpeg binary itself was missing, because
    FileNotFoundError escaped the CalledProcessError handler).
    """
    output_fd, output_path = tempfile.mkstemp(suffix=".wav")
    os.close(output_fd)
    cmd = [
        "ffmpeg", "-i", input_path,
        "-map", "0:a:0",        # first audio stream only
        "-map_metadata", "-1",  # strip metadata
        "-vn",                  # drop any video stream
        "-acodec", "pcm_s16le",
        "-ar", "16000",         # 16 kHz — the rate Whisper expects
        "-ac", "1",             # mono
        "-y",
        output_path
    ]
    try:
        loop = asyncio.get_running_loop()
        # subprocess.run blocks, so push it onto the default executor.
        await loop.run_in_executor(None, lambda: subprocess.run(cmd, capture_output=True, check=True))
        return output_path
    except subprocess.CalledProcessError as e:
        stderr = e.stderr.decode() if e.stderr else ""
        print(f"[AUDIO] Ошибка конвертации ffmpeg: {stderr}")
    except FileNotFoundError as e:
        # ffmpeg binary not found on PATH.
        print(f"[AUDIO] ffmpeg не запустился: {e}")
    # Failure path: clean up the (empty/partial) output file.
    if os.path.exists(output_path):
        os.unlink(output_path)
    return None
async def transcribe_audio(audio_bytes: bytes, mimetype: str) -> Optional[str]:
    """Transcribe raw audio bytes with Whisper.

    The bytes are written to a temp file, converted to WAV via ffmpeg and
    fed to the shared Whisper model.  Returns the recognised text, or None
    when ffmpeg is unavailable, conversion fails, or recognition raises.
    Both temp files are always removed.
    """
    if not ffmpeg_available():
        print("[AUDIO] Ошибка: ffmpeg не установлен.")
        return None

    suffix = get_file_extension(mimetype)
    loop = asyncio.get_running_loop()
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as src:
        src.write(audio_bytes)
        input_path = src.name

    wav_path = None
    try:
        wav_path = await convert_to_wav(input_path)
        if not wav_path:
            print("[AUDIO] Конвертация в WAV не удалась.")
            return None

        model = get_whisper_model()
        # transcribe() is CPU-bound — run it off the event loop.
        segments, info = await loop.run_in_executor(
            None,
            lambda: model.transcribe(wav_path, beam_size=5, language=WHISPER_LANGUAGE)
        )
        return " ".join(seg.text for seg in segments).strip()
    except Exception as e:
        print(f"[AUDIO] Ошибка при распознавании: {e}")
        return None
    finally:
        for path in (input_path, wav_path):
            if path and os.path.exists(path):
                os.unlink(path)
async def call_qwen_api(prompt: str) -> str:
    """Asynchronously call the Qwen chat-completions API.

    Returns the model's reply text on success, or one of the
    human-readable (Russian) error strings when the API key is missing,
    the HTTP call fails, times out, or the response has an unexpected
    shape.  The error strings are part of the contract: generate_report
    inspects them to decide whether a PDF should be produced.
    """
    if not QWEN_API_KEY:
        print("[QWEN] API ключ не задан, возвращаем заглушку.")
        return "API ключ Qwen не настроен. Отчёт не может быть сгенерирован."

    headers = {
        "Authorization": f"Bearer {QWEN_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": QWEN_MODEL,
        "messages": [
            {"role": "user", "content": prompt}
        ]
    }

    # Bound the whole request: without a timeout a stalled connection would
    # block report generation forever.  A timeout raises inside the session
    # and is handled by the except below.
    timeout = aiohttp.ClientTimeout(total=300)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(QWEN_ENDPOINT, headers=headers, json=payload) as resp:
                if resp.status != 200:
                    text = await resp.text()
                    print(f"[QWEN] Ошибка API: {resp.status} - {text}")
                    return f"Ошибка при обращении к Qwen API (HTTP {resp.status})."
                data = await resp.json()
                if "choices" in data and len(data["choices"]) > 0:
                    content = data["choices"][0]["message"]["content"]
                    return content.strip()
                print(f"[QWEN] Неожиданный формат ответа: {data}")
                return "Ошибка: не удалось извлечь ответ из API."
    except Exception as e:
        print(f"[QWEN] Исключение: {e}")
        return "Не удалось соединиться с Qwen API."
async def generate_report(text: str, images_data: list, audios_data: list) -> Optional[bytes]:
    """Build a PDF report from grouped text, images and audio.

    Audio attachments are transcribed, the combined material is sent to the
    Qwen API, a second call scores the report's quality, and the returned
    HTML is rendered to PDF.  Returns the PDF bytes, or None when there is
    no usable input, the API reported an error, or PDF rendering failed.
    """
    # Collect audio transcriptions first.
    audio_texts = []
    for audio in audios_data:
        audio_text = await transcribe_audio(audio["bytes"], audio.get("mimetype", "audio/ogg"))
        if audio_text:
            audio_texts.append(audio_text)

    # Assemble the full source text for the report.
    parts = []
    if text:
        parts.append(f"Текстовые сообщения:\n{text}")
    if audio_texts:
        parts.append("Расшифровка аудио:\n" + "\n\n".join(audio_texts))
    if images_data:
        parts.append(f"Количество изображений: {len(images_data)} (анализ не выполнен)")

    full_text = "\n\n".join(parts)
    if not full_text.strip():
        return None

    prompt = f"{QWEN_PROMPT_TEMPLATE}\n Текст: {full_text}"
    print("[QWEN] Отправка запроса...")
    report = await call_qwen_api(prompt)
    print(f"[QWEN] Получен ответ: {report[:200]}...")

    # call_qwen_api signals failure by returning one of its Russian error
    # strings.  The old check matched only "Ошибка:", which missed
    # "Ошибка при обращении...", "Не удалось соединиться..." and the
    # missing-key message — those errors were being rendered into the PDF.
    error_prefixes = (
        "Ошибка:",
        "Ошибка при обращении к Qwen API",
        "Не удалось соединиться с Qwen API",
        "API ключ Qwen не настроен",
    )
    if report.startswith(error_prefixes):
        print(f"[QWEN] Ошибка API: {report}")
        return None

    quality_prompt = f"{QWEN_QUALITY_PROMPT_TEMPLATE}\n Исходные данные: {full_text} Отчёт: {report}"
    quality_report = await call_qwen_api(quality_prompt)
    print(f"[QWEN] Оценка качества: {quality_report}")

    # Strip markdown code fences the model sometimes wraps the HTML in.
    report = report.replace('```html', '')
    report = report.replace('```', '')

    try:
        pdf_bytes = HTML(string=report).write_pdf()
        return pdf_bytes
    except Exception as e:
        print(f"[PDF] Ошибка генерации: {e}")
        return None
async def send_error_message(room_id: str, error_text: str):
    """Post a plain-text error notice (prefixed with ❌) to the given room."""
    content = {"msgtype": "m.text", "body": f"❌ {error_text}"}
    await client.room_send(room_id, "m.room.message", content)
async def process_audio(audio_data: Dict) -> str:
    """Transcribe one downloaded audio attachment.

    Returns the recognised text, or "" when recognition fails.
    """
    payload = audio_data["bytes"]
    mimetype = audio_data.get("mimetype", "audio/ogg")
    print(f"[AUDIO] Получено {len(payload)} байт аудио, тип: {mimetype}")

    recognized = await transcribe_audio(payload, mimetype)
    if recognized is None:
        print("[AUDIO] Распознавание не удалось.")
        return ""

    print(f"[AUDIO] Распознанный текст: {recognized[:300]}...")
    return recognized
async def process_image(image_data: Dict) -> str:
    """Placeholder image handler: log the payload size, return a stub caption."""
    size = len(image_data["bytes"])
    print(f"[IMAGE] Получено {size} байт изображения")
    return "[Описание изображения будет добавлено позже]"
async def process_complete_message(data: Dict):
    """Generate the PDF report for a grouped conversation and deliver it.

    ``data`` is a pending-conversation dict (see get_or_create_pending)
    holding the collected text parts, image and audio attachments for one
    (room_id, sender) pair.  On success the PDF is uploaded to the
    homeserver and posted as an m.file message; on failure a plain-text
    notice is posted instead.  Finally the pending entry is removed from
    both lookup tables.
    """
    room_id = data["room_id"]
    text_parts = data.get("text", [])
    text = "\n".join(text_parts) if text_parts else ""
    images_data = data.get("images", [])
    audios_data = data.get("audio", [])

    pdf_bytes = await generate_report(text, images_data, audios_data)

    if pdf_bytes is None:
        # Nothing usable (no text, unrecognised audio) or generation failed.
        await client.room_send(
            room_id,
            "m.room.message",
            {"msgtype": "m.text",
             "body": "Не удалось обработать сообщение (нет текста, не распознано аудио или ошибка)."}
        )
    else:
        print("[FILE] Загрузка файла на сервер...")
        # Wrap the PDF bytes in a file-like object for client.upload().
        file_like = io.BytesIO(pdf_bytes)
        upload_result = await client.upload(
            file_like,
            content_type="application/pdf",
            filename="report.pdf",
            filesize=len(pdf_bytes)  # the size must be given explicitly
        )

        # The result may be a (UploadError, None) tuple or an UploadResponse.
        if isinstance(upload_result, tuple) and len(upload_result) > 0:
            result_obj = upload_result[0]
        else:
            result_obj = upload_result

        if isinstance(result_obj, UploadError):
            print(f"[FILE] Ошибка загрузки: {result_obj.status_code} - {result_obj.message}")
            await client.room_send(
                room_id,
                "m.room.message",
                {"msgtype": "m.text", "body": "❌ Не удалось загрузить отчёт на сервер."}
            )
        elif isinstance(result_obj, UploadResponse):
            # Upload succeeded — post the mxc:// URI as a file message.
            mxc_url = result_obj.content_uri
            await client.room_send(
                room_id,
                "m.room.message",
                {
                    "msgtype": "m.file",
                    "body": "report.pdf",
                    "url": mxc_url,
                    "filename": "report.pdf",
                    "info": {
                        "mimetype": "application/pdf",
                        "size": len(pdf_bytes)
                    }
                }
            )
            print("[FILE] PDF отправлен")
        else:
            print(f"[FILE] Неизвестный тип ответа: {result_obj}")
            await client.room_send(
                room_id,
                "m.room.message",
                {"msgtype": "m.text", "body": "❌ Ошибка при загрузке отчёта (неизвестный ответ сервера)."}
            )

    # Drop the pending entry from both lookup tables.
    if "event_id" in data:
        pending_by_event_id.pop(data["event_id"], None)
    pending_by_conversation.pop((room_id, data["sender"]), None)
async def delayed_processing(data: Dict):
    """Wait out the grouping window, then process ``data`` if still current.

    If a newer pending entry has replaced this one in the meantime
    (identity check), the stale task does nothing.
    """
    await asyncio.sleep(GROUPING_TIMEOUT)
    conversation_key = (data["room_id"], data["sender"])
    still_current = pending_by_conversation.get(conversation_key) is data
    if still_current:
        await process_complete_message(data)
def get_or_create_pending(room_id: str, sender: str, event_id: Optional[str] = None) -> Dict:
    """Look up — or create — the pending-message accumulator for a conversation.

    Lookup order: by event id first (reply threading), then by
    (room_id, sender).  A freshly created entry is registered in the
    conversation index, and in the event-id index when an id is given.
    """
    if event_id and event_id in pending_by_event_id:
        return pending_by_event_id[event_id]

    conversation_key = (room_id, sender)
    existing = pending_by_conversation.get(conversation_key)
    if existing is not None:
        return existing

    entry = {
        "room_id": room_id,
        "sender": sender,
        "text": [],
        "images": [],
        "audio": [],
        "timestamp": time.time(),
        "task": None,
    }
    if event_id:
        entry["event_id"] = event_id
        pending_by_event_id[event_id] = entry
    pending_by_conversation[conversation_key] = entry
    return entry
def reset_timer(data: Dict):
    """(Re)start the grouping countdown for a pending conversation entry."""
    running = data["task"]
    if running and not running.done():
        running.cancel()
    data["timestamp"] = time.time()
    data["task"] = asyncio.create_task(delayed_processing(data))
async def on_text_message(room, event: RoomMessageText):
    """Accumulate an incoming text message into the sender's pending group."""
    # Ignore our own echoes and rooms outside the whitelist.
    if event.sender == client.user_id or room.room_id not in ALLOWED_ROOMS:
        return

    pending = get_or_create_pending(room.room_id, event.sender, event.event_id)
    pending["text"].append(event.body)
    reset_timer(pending)
    print(f"[TEXT] Добавлен текст в сообщение от {event.sender}: {event.body}")
async def on_image_message(room, event: RoomMessageImage):
    """Download an incoming image and attach it to the sender's pending group."""
    # Ignore our own echoes and rooms outside the whitelist.
    if event.sender == client.user_id or room.room_id not in ALLOWED_ROOMS:
        return

    # When the image is posted as a relation/reply, group it with the
    # referenced event instead of starting a new conversation entry.
    related_event_id = None
    if hasattr(event, "source") and "content" in event.source:
        content = event.source["content"]
        if "m.relates_to" in content and "event_id" in content["m.relates_to"]:
            related_event_id = content["m.relates_to"]["event_id"]

    pending = get_or_create_pending(room.room_id, event.sender, related_event_id)

    response = await client.download(event.url)
    if isinstance(response, ErrorResponse):
        print(f"[IMAGE] Ошибка скачивания: {response.status_code} - {response.message}")
        await send_error_message(room.room_id, "Не удалось загрузить изображение.")
        return

    # Resolve the mimetype: event attribute → info dict → JPEG fallback.
    mimetype = getattr(event, "mimetype", None)
    if not mimetype and hasattr(event, "info") and isinstance(event.info, dict):
        mimetype = event.info.get("mimetype")

    pending["images"].append({
        "bytes": response.body,
        "mimetype": mimetype or "image/jpeg",
    })
    reset_timer(pending)
    print(f"[IMAGE] Добавлено изображение в сообщение от {event.sender}")
async def on_audio_message(room, event: RoomMessageAudio):
    """Download an incoming audio message and attach it to the pending group."""
    # Ignore our own echoes and rooms outside the whitelist.
    if event.sender == client.user_id or room.room_id not in ALLOWED_ROOMS:
        return

    # When the audio is posted as a relation/reply, group it with the
    # referenced event instead of starting a new conversation entry.
    related_event_id = None
    if hasattr(event, "source") and "content" in event.source:
        content = event.source["content"]
        if "m.relates_to" in content and "event_id" in content["m.relates_to"]:
            related_event_id = content["m.relates_to"]["event_id"]

    pending = get_or_create_pending(room.room_id, event.sender, related_event_id)

    response = await client.download(event.url)
    if isinstance(response, ErrorResponse):
        print(f"[AUDIO] Ошибка скачивания: {response.status_code} - {response.message}")
        await send_error_message(room.room_id, "Не удалось загрузить аудио.")
        return

    # Resolve the mimetype from the event info; Matrix voice notes are Ogg.
    mimetype = None
    if hasattr(event, "info") and isinstance(event.info, dict):
        mimetype = event.info.get("mimetype")

    pending["audio"].append({
        "bytes": response.body,
        "mimetype": mimetype or "audio/ogg",
    })
    reset_timer(pending)
    print(f"[AUDIO] Добавлено аудио в сообщение от {event.sender}")
async def main():
    """Entry point: create the client, log in, register handlers, sync forever.

    The whole post-construction flow is wrapped in try/finally so the
    client's underlying aiohttp session is always closed — previously the
    early returns on login failure leaked it.
    """
    global client

    config = AsyncClientConfig(
        max_timeouts=10,
        store_sync_tokens=True,
        encryption_enabled=False,
    )
    client = AsyncClient(
        homeserver=HOMESERVER,
        user=USERNAME,
        device_id=None,
        config=config,
    )

    try:
        # Authenticate: prefer the password, fall back to an access token.
        try:
            if PASSWORD:
                response = await client.login(PASSWORD)
            else:
                response = await client.login(token=os.environ.get("ACCESS_TOKEN", ""))

            if isinstance(response, LoginResponse):
                print(f"Бот {USERNAME} успешно авторизован на {HOMESERVER}")
            else:
                print(f"Ошибка авторизации: {response}")
                return
        except Exception as e:
            print(f"Исключение при авторизации: {e}")
            return

        # Startup diagnostics for the operator.
        if not ffmpeg_available():
            print("ВНИМАНИЕ: ffmpeg не найден. Бот не сможет распознавать аудио.")
            print("Установите ffmpeg (https://ffmpeg.org/download.html) и добавьте в PATH.")
        else:
            print("ffmpeg найден, аудио будет обрабатываться.")

        if WHISPER_LANGUAGE:
            print(f"Язык распознавания: {WHISPER_LANGUAGE}")

        if not QWEN_API_KEY:
            print("ВНИМАНИЕ: QWEN_API_KEY не задан. Генерация отчётов будет недоступна.")

        client.add_event_callback(on_text_message, RoomMessageText)
        client.add_event_callback(on_image_message, RoomMessageImage)
        client.add_event_callback(on_audio_message, RoomMessageAudio)

        print("Бот запущен, ожидание событий...")
        try:
            await client.sync_forever(timeout=30000)
        except KeyboardInterrupt:
            print("Бот остановлен пользователем")
    finally:
        # Always release the client (and its aiohttp session).
        await client.close()
if __name__ == "__main__":
|
||
asyncio.run(main())
|