b2b_assistants/main.py

377 lines
No EOL
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import asyncio
import os
import tempfile
import time
import shutil
import subprocess
from typing import Dict, Optional, Tuple
from dotenv import load_dotenv
from nio import (
AsyncClient,
RoomMessageText,
RoomMessageImage,
RoomMessageAudio,
LoginResponse,
AsyncClientConfig,
ErrorResponse,
)
from faster_whisper import WhisperModel
# Pull variables from a local .env file into the environment before reading them.
load_dotenv()

# --- Matrix connection settings -------------------------------------------
HOMESERVER = os.getenv("HOMESERVER", "https://matrix.org")
USERNAME = os.getenv("MATRIX_USERNAME")
PASSWORD = os.getenv("PASSWORD")

# Comma-separated list of room IDs the handlers react to; blank items are dropped.
ALLOWED_ROOMS = {
    room_id
    for room_id in (raw.strip() for raw in os.getenv("ALLOWED_ROOMS", "").split(","))
    if room_id
}

# --- Speech recognition settings ------------------------------------------
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "ru")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small")

TEMP_DIR = tempfile.gettempdir()

# Seconds of silence from a sender before their pending messages are processed.
GROUPING_TIMEOUT = 15.0

# --- Mutable module state --------------------------------------------------
# Matrix client; assigned in main().
client: AsyncClient = None
# Pending message groups keyed by (room_id, sender).
pending_by_conversation: Dict[Tuple[str, str], Dict] = {}
# The same pending groups, keyed by the event ID they were created for.
pending_by_event_id: Dict[str, Dict] = {}
# Lazily created Whisper model (see get_whisper_model).
whisper_model = None
def get_whisper_model():
    """Return the shared Whisper model, creating it on first use.

    The model is built from ``WHISPER_MODEL`` with ``device="cpu"`` and
    ``compute_type="int8"`` and stored in the module-level ``whisper_model``
    so that later calls reuse the same instance.
    """
    global whisper_model
    if whisper_model is not None:
        return whisper_model
    whisper_model = WhisperModel(WHISPER_MODEL, device="cpu", compute_type="int8")
    print(f"Whisper модель {WHISPER_MODEL} загружена (faster-whisper).")
    return whisper_model
def ffmpeg_available() -> bool:
    """Report whether an ``ffmpeg`` executable can be found on ``PATH``.

    Prints an error message when it is missing.

    Returns:
        ``True`` if ``shutil.which("ffmpeg")`` finds an executable, else ``False``.
    """
    if shutil.which("ffmpeg") is None:
        print("[ERROR] ffmpeg не найден в системе. Установите ffmpeg и добавьте в PATH.")
        return False
    return True
def get_file_extension(mimetype: str) -> str:
    """Map an audio MIME type to a file suffix for the temporary input file.

    The lookup ignores letter case, surrounding whitespace and any MIME
    parameters, so ``"audio/ogg; codecs=opus"`` is treated as ``"audio/ogg"``.

    Args:
        mimetype: MIME type reported for the audio; may be empty or ``None``.

    Returns:
        The matching suffix including the leading dot, or ``".tmp"`` when the
        type is missing or not recognised.
    """
    ext_map = {
        "audio/ogg": ".ogg",
        "audio/opus": ".opus",
        "audio/mpeg": ".mp3",
        "audio/mp4": ".m4a",
        "audio/x-m4a": ".m4a",
        "audio/aac": ".aac",
        "audio/flac": ".flac",
        "audio/wav": ".wav",
        "audio/x-wav": ".wav",
        "audio/wave": ".wav",
        "audio/webm": ".webm",
    }
    if not mimetype:
        return ".tmp"
    # Keep only the "type/subtype" part; parameters follow the first ';'.
    essence = mimetype.split(";", 1)[0].strip().lower()
    return ext_map.get(essence, ".tmp")
async def convert_to_wav(input_path: str) -> Optional[str]:
    """Convert an audio file to 16 kHz mono 16-bit PCM WAV using ffmpeg.

    ffmpeg is run in the default executor so the event loop is not blocked
    while it works.

    Args:
        input_path: Path of the audio file to convert.

    Returns:
        Path of a newly created temporary ``.wav`` file (the caller is
        responsible for deleting it), or ``None`` if ffmpeg failed or could
        not be started.
    """
    output_fd, output_path = tempfile.mkstemp(suffix=".wav")
    os.close(output_fd)
    cmd = [
        "ffmpeg", "-i", input_path,
        "-map", "0:a:0",
        "-map_metadata", "-1",
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        "-y",
        output_path,
    ]

    def _run_ffmpeg() -> None:
        # stdin is detached so ffmpeg never waits on / consumes the bot's stdin.
        subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, check=True)

    converted = False
    try:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _run_ffmpeg)
        converted = True
        return output_path
    except subprocess.CalledProcessError as e:
        # stderr is arbitrary bytes; never let decoding raise a second error.
        stderr_text = (e.stderr or b"").decode(errors="replace")
        print(f"[AUDIO] Ошибка конвертации ffmpeg: {stderr_text}")
        return None
    except OSError as e:
        # ffmpeg missing or not executable: subprocess.run raises instead of
        # returning a non-zero exit status.
        print(f"[AUDIO] Ошибка конвертации ffmpeg: {e}")
        return None
    finally:
        # Remove the placeholder output on every failure path, including
        # unexpected exceptions, so temp files do not accumulate.
        if not converted and os.path.exists(output_path):
            os.unlink(output_path)
async def transcribe_audio(audio_bytes: bytes, mimetype: str) -> Optional[str]:
    """Transcribe raw audio bytes to text with the Whisper model.

    The bytes are written to a temporary file, converted to WAV via
    ``convert_to_wav`` and then decoded. Both temporary files are removed
    before returning.

    Args:
        audio_bytes: Encoded audio payload.
        mimetype: MIME type of the payload; used only to pick the temp-file suffix.

    Returns:
        The recognised text with surrounding whitespace stripped, or ``None``
        if ffmpeg is unavailable, conversion failed, or recognition raised.
    """
    if not ffmpeg_available():
        print("[AUDIO] Ошибка: ffmpeg не установлен.")
        return None
    ext = get_file_extension(mimetype)
    loop = asyncio.get_running_loop()
    input_path = None
    wav_path = None
    try:
        # Created inside the try so the file is cleaned up even if the write fails.
        with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp:
            input_path = tmp.name
            tmp.write(audio_bytes)
        wav_path = await convert_to_wav(input_path)
        if not wav_path:
            print("[AUDIO] Конвертация в WAV не удалась.")
            return None
        # The model is obtained on the calling thread; only decoding is off-loaded.
        model = get_whisper_model()

        def _transcribe() -> str:
            # faster-whisper returns the segments as a lazy generator: the
            # actual decoding happens while iterating, so the join must run
            # in the worker thread too, not on the event loop.
            segments, _info = model.transcribe(
                wav_path,
                beam_size=5,
                # An empty language setting means "auto-detect".
                language=WHISPER_LANGUAGE or None,
            )
            return " ".join(segment.text for segment in segments)

        text = await loop.run_in_executor(None, _transcribe)
        return text.strip()
    except Exception as e:
        print(f"[AUDIO] Ошибка при распознавании: {e}")
        return None
    finally:
        if input_path and os.path.exists(input_path):
            os.unlink(input_path)
        if wav_path and os.path.exists(wav_path):
            os.unlink(wav_path)
async def process_audio(audio_data: Dict) -> str:
    """Transcribe one stored audio attachment.

    Args:
        audio_data: Mapping with the payload under ``"bytes"`` and an optional
            ``"mimetype"`` (defaults to ``"audio/ogg"``).

    Returns:
        The recognised text, or an empty string when transcription failed.
    """
    payload = audio_data["bytes"]
    content_type = audio_data.get("mimetype", "audio/ogg")
    print(f"[AUDIO] Получено {len(payload)} байт аудио, тип: {content_type}")

    recognized = await transcribe_audio(payload, content_type)
    if recognized is not None:
        return recognized

    print("[AUDIO] Распознавание не удалось.")
    return ""
async def process_image(image_data: Dict) -> str:
    """Log the size of an image attachment and return a placeholder description.

    Args:
        image_data: Mapping with the image payload under ``"bytes"``.

    Returns:
        A fixed placeholder string; the image content is not analysed.
    """
    size = len(image_data["bytes"])
    print(f"[IMAGE] Получено {size} байт изображения")
    return "[Описание изображения будет добавлено позже]"
async def generate_report(text: str, images_data: list, audios_data: list) -> str:
    """Build the reply text for one grouped message.

    Audio attachments are transcribed first, then images are described, each
    one after the other. Non-empty results are assembled into sections
    separated by blank lines.

    Args:
        text: Combined text of the group (may be empty).
        images_data: Image attachments as passed to ``process_image``.
        audios_data: Audio attachments as passed to ``process_audio``.

    Returns:
        The assembled report, or a fixed failure message when no section
        could be produced.
    """
    # Awaited one at a time, in order: audio first, then images.
    recognized = [await process_audio(clip) for clip in audios_data]
    transcripts = [item for item in recognized if item]
    described = [await process_image(picture) for picture in images_data]
    descriptions = [item for item in described if item]

    # (presence flag, rendered section) in the order they appear in the report.
    candidates = [
        (text, f"**Текст сообщения:**\n{text}"),
        (transcripts, "**Распознанный текст из аудио:**\n" + "\n\n".join(transcripts)),
        (descriptions, "**Описания изображений:**\n" + "\n".join(descriptions)),
    ]
    sections = [section for present, section in candidates if present]

    if sections:
        print(f"[REPORT] text: {text}, images: {len(descriptions)}, audio: {len(transcripts)}")
        return "\n\n".join(sections)
    return "Не удалось обработать сообщение (нет текста, не распознано аудио или ошибка)."
async def send_error_message(room_id: str, error_text: str):
    """Send ``error_text`` to the room as a plain ``m.text`` message.

    Args:
        room_id: Target room.
        error_text: Text placed in the message body.
    """
    content = {"msgtype": "m.text", "body": f"{error_text}"}
    await client.room_send(room_id, "m.room.message", content)
async def process_complete_message(data: Dict):
    """Generate and send the report for a finished message group.

    After the report has been sent, or if generating/sending it fails, the
    group is removed from ``pending_by_event_id`` and
    ``pending_by_conversation``. If the coroutine is cancelled the group is
    left registered: ``reset_timer`` cancels the running task when another
    message joins the group, and the restarted timer needs to find it again.

    Args:
        data: Pending group as created by ``get_or_create_pending``.

    Raises:
        Whatever ``generate_report`` or ``client.room_send`` raise; the group
        is still unregistered in that case.
    """
    room_id = data["room_id"]
    # Merge every text message collected for this group.
    text_parts = data.get("text", [])
    text = "\n".join(text_parts) if text_parts else ""
    images_data = data.get("images", [])
    audios_data = data.get("audio", [])

    cancelled = False
    try:
        report = await generate_report(text, images_data, audios_data)
        await client.room_send(
            room_id,
            "m.room.message",
            {"msgtype": "m.text", "body": report},
        )
    except asyncio.CancelledError:
        cancelled = True
        raise
    finally:
        if not cancelled:
            # Runs on success and on failure, so a failed report cannot leave
            # the group (and its downloaded bytes) registered forever.
            # Only entries that still point at *this* group are removed.
            event_id = data.get("event_id") if "event_id" in data else None
            if "event_id" in data and pending_by_event_id.get(event_id) is data:
                del pending_by_event_id[event_id]
            key = (room_id, data["sender"])
            if pending_by_conversation.get(key) is data:
                del pending_by_conversation[key]
async def delayed_processing(data: Dict):
    """Wait ``GROUPING_TIMEOUT`` seconds, then process ``data`` if still pending.

    Nothing happens when, after the wait, ``pending_by_conversation`` no
    longer maps this room/sender pair to the very same ``data`` object.
    """
    await asyncio.sleep(GROUPING_TIMEOUT)
    conversation = (data["room_id"], data["sender"])
    current = pending_by_conversation.get(conversation)
    if current is not data:
        return
    await process_complete_message(data)
def get_or_create_pending(room_id: str, sender: str, event_id: Optional[str] = None) -> Dict:
    """Return the pending group for a message, creating it when necessary.

    Lookup order: by ``event_id`` (when given), then by ``(room_id, sender)``.
    A newly created group is registered under the conversation key and, when
    ``event_id`` is given, under that event ID as well.

    Args:
        room_id: Room the message arrived in.
        sender: User ID of the sender.
        event_id: Event ID to look up / register the group under, if any.

    Returns:
        The existing or newly created group dictionary.
    """
    if event_id:
        known = pending_by_event_id.get(event_id)
        if known is not None:
            return known

    conversation = (room_id, sender)
    try:
        return pending_by_conversation[conversation]
    except KeyError:
        pass

    entry = {
        "room_id": room_id,
        "sender": sender,
        "text": [],  # list of strings, not a single string
        "images": [],
        "audio": [],
        "timestamp": time.time(),
        "task": None,
    }
    pending_by_conversation[conversation] = entry
    if event_id:
        entry["event_id"] = event_id
        pending_by_event_id[event_id] = entry
    return entry
def reset_timer(data: Dict):
    """Restart the grouping timer of a pending group.

    Cancels the group's current task if it has not finished, refreshes the
    group's timestamp and schedules a new ``delayed_processing`` task.
    """
    running = data["task"]
    if running and not running.done():
        running.cancel()
    data["timestamp"] = time.time()
    data["task"] = asyncio.create_task(delayed_processing(data))
async def on_text_message(room, event: RoomMessageText):
    """Collect an incoming text message into the sender's pending group.

    Messages sent by the bot itself and messages from rooms outside
    ``ALLOWED_ROOMS`` are ignored.
    """
    ignored = event.sender == client.user_id or room.room_id not in ALLOWED_ROOMS
    if ignored:
        return

    pending = get_or_create_pending(room.room_id, event.sender, event.event_id)
    # Append to the list of texts instead of replacing the previous one.
    pending["text"].append(event.body)
    reset_timer(pending)
    print(f"[TEXT] Добавлен текст в сообщение от {event.sender}: {event.body}")
async def on_image_message(room, event: RoomMessageImage):
    """Download an incoming image and add it to the sender's pending group.

    Messages sent by the bot itself and messages from rooms outside
    ``ALLOWED_ROOMS`` are ignored. On a failed download an error message is
    sent to the room and no group is created or modified.
    """
    if event.sender == client.user_id:
        return
    if room.room_id not in ALLOWED_ROOMS:
        return

    # Raw event content; used for the relation and as a MIME-type fallback.
    source = getattr(event, "source", None)
    content = source.get("content") if isinstance(source, dict) else None
    if not isinstance(content, dict):
        content = {}
    relation = content.get("m.relates_to")
    related_event_id = relation.get("event_id") if isinstance(relation, dict) else None

    download_result = await client.download(event.url)
    if isinstance(download_result, ErrorResponse):
        print(f"[IMAGE] Ошибка скачивания: {download_result.status_code} - {download_result.message}")
        await send_error_message(room.room_id, "Не удалось загрузить изображение.")
        return

    mimetype = getattr(event, "mimetype", None)
    if not mimetype:
        info = getattr(event, "info", None)
        if not isinstance(info, dict):
            # Fall back to the "info" object of the raw event content.
            info = content.get("info")
        if isinstance(info, dict):
            mimetype = info.get("mimetype")
    if not mimetype:
        mimetype = "image/jpeg"

    # The group is looked up only after the download, with no await between
    # lookup, append and timer reset. Otherwise the grouping timer could fire
    # during the download and the image would be added to a group that has
    # already been processed and dropped.
    data = get_or_create_pending(room.room_id, event.sender, related_event_id)
    data["images"].append({
        "bytes": download_result.body,
        "mimetype": mimetype,
    })
    reset_timer(data)
    print(f"[IMAGE] Добавлено изображение в сообщение от {event.sender}")
async def on_audio_message(room, event: RoomMessageAudio):
    """Download an incoming audio message and add it to the sender's pending group.

    Messages sent by the bot itself and messages from rooms outside
    ``ALLOWED_ROOMS`` are ignored. On a failed download an error message is
    sent to the room and no group is created or modified.
    """
    if event.sender == client.user_id:
        return
    if room.room_id not in ALLOWED_ROOMS:
        return

    # Raw event content; used for the relation and as a MIME-type fallback.
    source = getattr(event, "source", None)
    content = source.get("content") if isinstance(source, dict) else None
    if not isinstance(content, dict):
        content = {}
    relation = content.get("m.relates_to")
    related_event_id = relation.get("event_id") if isinstance(relation, dict) else None

    download_result = await client.download(event.url)
    if isinstance(download_result, ErrorResponse):
        print(f"[AUDIO] Ошибка скачивания: {download_result.status_code} - {download_result.message}")
        await send_error_message(room.room_id, "Не удалось загрузить аудио.")
        return

    info = getattr(event, "info", None)
    if not isinstance(info, dict):
        # Fall back to the "info" object of the raw event content.
        info = content.get("info")
    mimetype = info.get("mimetype") if isinstance(info, dict) else None
    if not mimetype:
        mimetype = "audio/ogg"

    # The group is looked up only after the download, with no await between
    # lookup, append and timer reset. Otherwise the grouping timer could fire
    # during the download and the audio would be added to a group that has
    # already been processed and dropped.
    data = get_or_create_pending(room.room_id, event.sender, related_event_id)
    data["audio"].append({
        "bytes": download_result.body,
        "mimetype": mimetype,
    })
    reset_timer(data)
    print(f"[AUDIO] Добавлено аудио в сообщение от {event.sender}")
async def main():
    """Log in to the homeserver, register the message handlers and sync forever.

    Uses ``PASSWORD`` when set, otherwise the ``ACCESS_TOKEN`` environment
    variable. The client is closed on every exit path, including failed
    logins.
    """
    global client
    config = AsyncClientConfig(
        max_timeouts=10,
        store_sync_tokens=True,
        encryption_enabled=False,
    )
    client = AsyncClient(
        homeserver=HOMESERVER,
        user=USERNAME,
        device_id=None,
        config=config,
    )
    try:
        try:
            if PASSWORD:
                response = await client.login(PASSWORD)
            else:
                # NOTE(review): confirm that ACCESS_TOKEN holds a value accepted
                # by nio's token login rather than a long-lived access token.
                response = await client.login(token=os.environ.get("ACCESS_TOKEN", ""))
            if isinstance(response, LoginResponse):
                # The access token is a credential and is deliberately not
                # printed, so it cannot end up in logs.
                print(f"Бот {USERNAME} успешно авторизован на {HOMESERVER}")
            else:
                print(f"Ошибка авторизации: {response}")
                return
        except Exception as e:
            print(f"Исключение при авторизации: {e}")
            return

        if not ffmpeg_available():
            print("ВНИМАНИЕ: ffmpeg не найден. Бот не сможет распознавать аудио.")
            print("Установите ffmpeg (https://ffmpeg.org/download.html) и добавьте в PATH.")
        else:
            print("ffmpeg найден, аудио будет обрабатываться.")
        if WHISPER_LANGUAGE:
            print(f"Язык распознавания: {WHISPER_LANGUAGE}")

        client.add_event_callback(on_text_message, RoomMessageText)
        client.add_event_callback(on_image_message, RoomMessageImage)
        client.add_event_callback(on_audio_message, RoomMessageAudio)
        print("Бот запущен, ожидание событий...")
        try:
            await client.sync_forever(timeout=30000)
        except KeyboardInterrupt:
            print("Бот остановлен пользователем")
    finally:
        # Covers the early returns above as well, so the client is always closed.
        await client.close()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C surfaces here, outside the coroutine: exit without a traceback.
        print("Бот остановлен пользователем")