b2b_assistants/main.py

496 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import asyncio
import os
import tempfile
import aiofiles
import time
import shutil
import subprocess
from weasyprint import HTML
import io
import json
from typing import Dict, Optional, Tuple
from dotenv import load_dotenv
import aiohttp
from nio import (
AsyncClient,
RoomMessageText,
RoomMessageImage,
RoomMessageAudio,
LoginResponse,
AsyncClientConfig,
ErrorResponse,
UploadResponse,
UploadError,
)
from faster_whisper import WhisperModel
load_dotenv()
HOMESERVER = os.getenv("HOMESERVER", "https://matrix.org")
USERNAME = os.getenv("MATRIX_USERNAME")
PASSWORD = os.getenv("PASSWORD")
ALLOWED_ROOMS = set(room.strip() for room in os.getenv("ALLOWED_ROOMS", "").split(",") if room.strip())
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "ru")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small")
# Qwen API
QWEN_API_KEY = os.getenv("QWEN_API_KEY")
QWEN_ENDPOINT = os.getenv("QWEN_ENDPOINT", "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions")
QWEN_MODEL = os.getenv("QWEN_MODEL", "qwen3.5-122b")
QWEN_PROMPT_TEMPLATE = ""
with open("base_prompt.txt", "r") as f:
QWEN_PROMPT_TEMPLATE += f.read()
TEMP_DIR = tempfile.gettempdir()
GROUPING_TIMEOUT = 15.0
client: AsyncClient = None
pending_by_conversation: Dict[Tuple[str, str], Dict] = {}
pending_by_event_id: Dict[str, Dict] = {}
whisper_model = None
def get_whisper_model():
global whisper_model
if whisper_model is None:
whisper_model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8")
print(f"Whisper модель {WHISPER_MODEL} загружена (faster-whisper).")
return whisper_model
def ffmpeg_available() -> bool:
found = shutil.which("ffmpeg") is not None
if not found:
print("[ERROR] ffmpeg не найден в системе. Установите ffmpeg и добавьте в PATH.")
return found
def get_file_extension(mimetype: str) -> str:
ext_map = {
"audio/ogg": ".ogg",
"audio/mpeg": ".mp3",
"audio/mp4": ".m4a",
"audio/x-m4a": ".m4a",
"audio/wav": ".wav",
"audio/webm": ".webm",
}
return ext_map.get(mimetype, ".tmp")
async def convert_to_wav(input_path: str) -> Optional[str]:
output_fd, output_path = tempfile.mkstemp(suffix=".wav")
os.close(output_fd)
cmd = [
"ffmpeg", "-i", input_path,
"-map", "0:a:0",
"-map_metadata", "-1",
"-vn",
"-acodec", "pcm_s16le",
"-ar", "16000",
"-ac", "1",
"-y",
output_path
]
try:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, lambda: subprocess.run(cmd, capture_output=True, check=True))
return output_path
except subprocess.CalledProcessError as e:
print(f"[AUDIO] Ошибка конвертации ffmpeg: {e.stderr.decode()}")
if os.path.exists(output_path):
os.unlink(output_path)
return None
async def transcribe_audio(audio_bytes: bytes, mimetype: str) -> Optional[str]:
if not ffmpeg_available():
print("[AUDIO] Ошибка: ffmpeg не установлен.")
return None
ext = get_file_extension(mimetype)
loop = asyncio.get_running_loop()
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp:
tmp.write(audio_bytes)
input_path = tmp.name
wav_path = None
try:
wav_path = await convert_to_wav(input_path)
if not wav_path:
print("[AUDIO] Конвертация в WAV не удалась.")
return None
model = get_whisper_model()
segments, info = await loop.run_in_executor(
None,
lambda: model.transcribe(wav_path, beam_size=5, language=WHISPER_LANGUAGE)
)
text = " ".join([segment.text for segment in segments])
return text.strip()
except Exception as e:
print(f"[AUDIO] Ошибка при распознавании: {e}")
return None
finally:
if os.path.exists(input_path):
os.unlink(input_path)
if wav_path and os.path.exists(wav_path):
os.unlink(wav_path)
async def call_qwen_api(prompt: str) -> str:
"""
Асинхронный вызов Qwen API для генерации отчёта.
Возвращает текст ответа или сообщение об ошибке.
"""
if not QWEN_API_KEY:
print("[QWEN] API ключ не задан, возвращаем заглушку.")
return "API ключ Qwen не настроен. Отчёт не может быть сгенерирован."
headers = {
"Authorization": f"Bearer {QWEN_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": QWEN_MODEL,
"messages": [
{"role": "user", "content": prompt}
]
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(QWEN_ENDPOINT, headers=headers, json=payload) as resp:
if resp.status == 200:
data = await resp.json()
if "choices" in data and len(data["choices"]) > 0:
content = data["choices"][0]["message"]["content"]
return content.strip()
else:
print(f"[QWEN] Неожиданный формат ответа: {data}")
return "Ошибка: не удалось извлечь ответ из API."
else:
text = await resp.text()
print(f"[QWEN] Ошибка API: {resp.status} - {text}")
return f"Ошибка при обращении к Qwen API (HTTP {resp.status})."
except Exception as e:
print(f"[QWEN] Исключение: {e}")
return "Не удалось соединиться с Qwen API."
async def generate_report(text: str, images_data: list, audios_data: list) -> Optional[bytes]:
# Собираем транскрипции аудио
audio_texts = []
for audio in audios_data:
audio_text = await transcribe_audio(audio["bytes"], audio.get("mimetype", "audio/ogg"))
if audio_text:
audio_texts.append(audio_text)
# Формируем полный текст для отчёта
parts = []
if text:
parts.append(f"Текстовые сообщения:\n{text}")
if audio_texts:
parts.append("Расшифровка аудио:\n" + "\n\n".join(audio_texts))
if images_data:
parts.append(f"Количество изображений: {len(images_data)} (анализ не выполнен)")
full_text = "\n\n".join(parts)
if not full_text.strip():
return None
prompt = f"{QWEN_PROMPT_TEMPLATE}\n Текст: {full_text}"
print("[QWEN] Отправка запроса...")
report = await call_qwen_api(prompt)
print(f"[QWEN] Получен ответ: {report[:200]}...")
# Если API вернул ошибку, не генерируем PDF
if report.startswith("Ошибка:"):
print(f"[QWEN] Ошибка API: {report}")
return None
report = report.replace('```html', '')
report = report.replace('```', '')
try:
pdf_bytes = HTML(string=report).write_pdf()
return pdf_bytes
except Exception as e:
print(f"[PDF] Ошибка генерации: {e}")
return None
async def send_error_message(room_id: str, error_text: str):
await client.room_send(
room_id,
"m.room.message",
{"msgtype": "m.text", "body": f"{error_text}"}
)
async def process_audio(audio_data: Dict) -> str:
audio_bytes = audio_data["bytes"]
mimetype = audio_data.get("mimetype", "audio/ogg")
print(f"[AUDIO] Получено {len(audio_bytes)} байт аудио, тип: {mimetype}")
text = await transcribe_audio(audio_bytes, mimetype)
if text is None:
print("[AUDIO] Распознавание не удалось.")
return ""
return text
async def process_image(image_data: Dict) -> str:
print(f"[IMAGE] Получено {len(image_data['bytes'])} байт изображения")
return "[Описание изображения будет добавлено позже]"
async def process_complete_message(data: Dict):
room_id = data["room_id"]
text_parts = data.get("text", [])
text = "\n".join(text_parts) if text_parts else ""
images_data = data.get("images", [])
audios_data = data.get("audio", [])
pdf_bytes = await generate_report(text, images_data, audios_data)
if pdf_bytes is None:
await client.room_send(
room_id,
"m.room.message",
{"msgtype": "m.text",
"body": "Не удалось обработать сообщение (нет текста, не распознано аудио или ошибка)."}
)
else:
print("[FILE] Загрузка файла на сервер...")
# Создаём файловый объект из байтов
file_like = io.BytesIO(pdf_bytes)
upload_result = await client.upload(
file_like,
content_type="application/pdf",
filename="report.pdf",
filesize=len(pdf_bytes) # обязательно указываем размер
)
# Результат может быть кортежем (UploadError, None) или объектом UploadResponse
if isinstance(upload_result, tuple) and len(upload_result) > 0:
result_obj = upload_result[0]
else:
result_obj = upload_result
if isinstance(result_obj, UploadError):
print(f"[FILE] Ошибка загрузки: {result_obj.status_code} - {result_obj.message}")
await client.room_send(
room_id,
"m.room.message",
{"msgtype": "m.text", "body": "Не удалось загрузить отчёт на сервер."}
)
elif isinstance(result_obj, UploadResponse):
mxc_url = result_obj.content_uri
await client.room_send(
room_id,
"m.room.message",
{
"msgtype": "m.file",
"body": "report.pdf",
"url": mxc_url,
"filename": "report.pdf",
"info": {
"mimetype": "application/pdf",
"size": len(pdf_bytes)
}
}
)
print("[FILE] PDF отправлен")
else:
print(f"[FILE] Неизвестный тип ответа: {result_obj}")
await client.room_send(
room_id,
"m.room.message",
{"msgtype": "m.text", "body": "❌ Ошибка при загрузке отчёта (неизвестный ответ сервера)."}
)
# Очистка данных
if "event_id" in data:
pending_by_event_id.pop(data["event_id"], None)
pending_by_conversation.pop((room_id, data["sender"]), None)
async def delayed_processing(data: Dict):
await asyncio.sleep(GROUPING_TIMEOUT)
key = (data["room_id"], data["sender"])
if pending_by_conversation.get(key) is data:
await process_complete_message(data)
def get_or_create_pending(room_id: str, sender: str, event_id: Optional[str] = None) -> Dict:
if event_id and event_id in pending_by_event_id:
return pending_by_event_id[event_id]
key = (room_id, sender)
if key in pending_by_conversation:
return pending_by_conversation[key]
data = {
"room_id": room_id,
"sender": sender,
"text": [],
"images": [],
"audio": [],
"timestamp": time.time(),
"task": None,
}
if event_id:
data["event_id"] = event_id
pending_by_conversation[key] = data
if event_id:
pending_by_event_id[event_id] = data
return data
def reset_timer(data: Dict):
if data["task"] and not data["task"].done():
data["task"].cancel()
data["timestamp"] = time.time()
data["task"] = asyncio.create_task(delayed_processing(data))
async def on_text_message(room, event: RoomMessageText):
if event.sender == client.user_id:
return
if room.room_id not in ALLOWED_ROOMS:
return
event_id = event.event_id
data = get_or_create_pending(room.room_id, event.sender, event_id)
data["text"].append(event.body)
reset_timer(data)
print(f"[TEXT] Добавлен текст в сообщение от {event.sender}: {event.body}")
async def on_image_message(room, event: RoomMessageImage):
if event.sender == client.user_id:
return
if room.room_id not in ALLOWED_ROOMS:
return
related_event_id = None
if hasattr(event, "source") and "content" in event.source:
content = event.source["content"]
if "m.relates_to" in content and "event_id" in content["m.relates_to"]:
related_event_id = content["m.relates_to"]["event_id"]
data = get_or_create_pending(room.room_id, event.sender, related_event_id)
download_result = await client.download(event.url)
if isinstance(download_result, ErrorResponse):
print(f"[IMAGE] Ошибка скачивания: {download_result.status_code} - {download_result.message}")
await send_error_message(room.room_id, "Не удалось загрузить изображение.")
return
mimetype = getattr(event, "mimetype", None)
if not mimetype and hasattr(event, "info") and isinstance(event.info, dict):
mimetype = event.info.get("mimetype")
if not mimetype:
mimetype = "image/jpeg"
data["images"].append({
"bytes": download_result.body,
"mimetype": mimetype,
})
reset_timer(data)
print(f"[IMAGE] Добавлено изображение в сообщение от {event.sender}")
async def on_audio_message(room, event: RoomMessageAudio):
if event.sender == client.user_id:
return
if room.room_id not in ALLOWED_ROOMS:
return
related_event_id = None
if hasattr(event, "source") and "content" in event.source:
content = event.source["content"]
if "m.relates_to" in content and "event_id" in content["m.relates_to"]:
related_event_id = content["m.relates_to"]["event_id"]
data = get_or_create_pending(room.room_id, event.sender, related_event_id)
download_result = await client.download(event.url)
if isinstance(download_result, ErrorResponse):
print(f"[AUDIO] Ошибка скачивания: {download_result.status_code} - {download_result.message}")
await send_error_message(room.room_id, "Не удалось загрузить аудио.")
return
mimetype = None
if hasattr(event, "info") and isinstance(event.info, dict):
mimetype = event.info.get("mimetype")
if not mimetype:
mimetype = "audio/ogg"
data["audio"].append({
"bytes": download_result.body,
"mimetype": mimetype,
})
reset_timer(data)
print(f"[AUDIO] Добавлено аудио в сообщение от {event.sender}")
async def main():
global client
config = AsyncClientConfig(
max_timeouts=10,
store_sync_tokens=True,
encryption_enabled=False,
)
client = AsyncClient(
homeserver=HOMESERVER,
user=USERNAME,
device_id=None,
config=config,
)
try:
if PASSWORD:
response = await client.login(PASSWORD)
else:
response = await client.login(token=os.environ.get("ACCESS_TOKEN", ""))
if isinstance(response, LoginResponse):
print(f"Бот {USERNAME} успешно авторизован на {HOMESERVER}")
else:
print(f"Ошибка авторизации: {response}")
return
except Exception as e:
print(f"Исключение при авторизации: {e}")
return
if not ffmpeg_available():
print("ВНИМАНИЕ: ffmpeg не найден. Бот не сможет распознавать аудио.")
print("Установите ffmpeg (https://ffmpeg.org/download.html) и добавьте в PATH.")
else:
print("ffmpeg найден, аудио будет обрабатываться.")
if WHISPER_LANGUAGE:
print(f"Язык распознавания: {WHISPER_LANGUAGE}")
if not QWEN_API_KEY:
print("ВНИМАНИЕ: QWEN_API_KEY не задан. Генерация отчётов будет недоступна.")
client.add_event_callback(on_text_message, RoomMessageText)
client.add_event_callback(on_image_message, RoomMessageImage)
client.add_event_callback(on_audio_message, RoomMessageAudio)
print("Бот запущен, ожидание событий...")
try:
await client.sync_forever(timeout=30000)
except KeyboardInterrupt:
print("Бот остановлен пользователем")
finally:
await client.close()
if __name__ == "__main__":
asyncio.run(main())