b2b_assistants/main.py

226 lines
No EOL
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import asyncio
import os
import tempfile
import time
from typing import Dict, Optional, Tuple
from dotenv import load_dotenv
from nio import (
AsyncClient,
RoomMessageText,
RoomMessageImage,
RoomMessageAudio,
LoginResponse,
AsyncClientConfig,
ErrorResponse,
)
# Load variables from a local .env file (if present) into os.environ before
# any of the settings below are read.
load_dotenv()

# --- Configuration (environment variables) --------------------------------
# Matrix homeserver URL; defaults to the public matrix.org server.
HOMESERVER = os.getenv("HOMESERVER", "https://matrix.org")
# Bot account passed to AsyncClient(user=...).
USERNAME = os.getenv("MATRIX_USERNAME")
# Password for login; when unset, main() falls back to ACCESS_TOKEN.
PASSWORD = os.getenv("PASSWORD")
# Comma-separated room IDs the bot reacts to. Surrounding whitespace and empty
# entries are ignored; an empty set makes every handler ignore every room.
ALLOWED_ROOMS = {
    room.strip()
    for room in os.getenv("ALLOWED_ROOMS", "").split(",")
    if room.strip()
}
# System temporary directory (not used by the code visible in this chunk).
TEMP_DIR = tempfile.gettempdir()
# Seconds without a new message from a sender before the collected
# text/images/audio are processed as one group. Overridable via the
# GROUPING_TIMEOUT environment variable; defaults to 15 seconds.
GROUPING_TIMEOUT = float(os.getenv("GROUPING_TIMEOUT", "15.0"))

# --- Runtime state ----------------------------------------------------------
# Matrix client; None until main() creates it.
client: Optional[AsyncClient] = None
# Open message groups keyed by (room_id, sender).
pending_by_conversation: Dict[Tuple[str, str], Dict] = {}
# The same open groups, indexed by an originating/related event ID.
pending_by_event_id: Dict[str, Dict] = {}
async def process_audio(audio_bytes: bytes) -> str:
    """Stub audio processor: log the payload size and return a placeholder.

    Args:
        audio_bytes: Raw content of the downloaded audio file.

    Returns:
        The constant string "Placeholder" until real processing exists.
    """
    byte_count = len(audio_bytes)
    print(f"[AUDIO] Получено {byte_count} байт аудио")
    return "Placeholder"
async def process_image(image_bytes: bytes) -> str:
    """Stub image processor: log the payload size and return a placeholder.

    Args:
        image_bytes: Raw content of the downloaded image file.

    Returns:
        The constant string "Placeholder" until real image description exists.
    """
    byte_count = len(image_bytes)
    print(f"[IMAGE] Получено {byte_count} байт изображения")
    return "Placeholder"
async def generate_report(text: str, image_descriptions: list, audio_texts: list) -> str:
    """Stub report builder: log what it received and return a placeholder.

    Args:
        text: Text collected for the message group.
        image_descriptions: One description per image in the group.
        audio_texts: One transcription per audio clip in the group.

    Returns:
        The constant string "Placeholder" until real reporting exists.
    """
    n_images, n_audio = len(image_descriptions), len(audio_texts)
    print(f"[REPORT] text: {text}, images: {n_images}, audio: {n_audio}")
    # TODO: whisper + report
    return "Placeholder"
async def send_error_message(room_id: str, error_text: str):
    """Post a user-visible error notice to a room via the global client.

    The notice is a plain ``m.text`` message whose body is ``error_text``
    preceded by a cross-mark emoji and the Russian word for "Error".

    Args:
        room_id: Target room ID.
        error_text: Human-readable description of what went wrong.
    """
    content = {"msgtype": "m.text", "body": f"❌ Ошибка: {error_text}"}
    await client.room_send(room_id, "m.room.message", content)
async def process_complete_message(data: Dict):
    """Turn one finished message group into a report and post it to its room.

    The group is detached from both pending indexes *before* the first await.
    Messages arriving while the report is being built therefore start a fresh
    group through ``get_or_create_pending``. They are not appended to this
    group after its content has been read. Removal is identity-checked, so a
    newer group registered under the same key is never dropped.

    Failures during processing or sending are printed and reported to the
    room. They are not left as an unretrieved exception in a background task.

    Args:
        data: Pending group created by ``get_or_create_pending``. Reads the
            ``room_id``, ``sender``, ``text``, ``images``, ``audio`` and
            optional ``event_id`` keys.
    """
    room_id = data["room_id"]

    # Detach first, so nothing new can be attached to a group being processed.
    key = (room_id, data["sender"])
    if pending_by_conversation.get(key) is data:
        del pending_by_conversation[key]
    event_id = data.get("event_id")
    if event_id is not None and pending_by_event_id.get(event_id) is data:
        del pending_by_event_id[event_id]

    try:
        # Sequential on purpose, preserving the order the media arrived in.
        image_descriptions = [await process_image(img) for img in data.get("images", [])]
        audio_texts = [await process_audio(aud) for aud in data.get("audio", [])]
        # "text" is initialised to None, so .get("text", "") would pass None on.
        report = await generate_report(data.get("text") or "", image_descriptions, audio_texts)
        await client.room_send(
            room_id,
            "m.room.message",
            {"msgtype": "m.text", "body": report},
        )
    except asyncio.CancelledError:
        # Cancellation must propagate (it subclasses Exception before Python 3.8).
        raise
    except Exception as exc:
        print(f"[ERROR] Не удалось обработать сообщение от {data['sender']}: {exc!r}")
        try:
            await send_error_message(room_id, "Не удалось обработать сообщение.")
        except Exception as send_exc:
            print(f"[ERROR] Не удалось отправить сообщение об ошибке: {send_exc!r}")
async def delayed_processing(data: Dict):
    """Wait out the grouping window, then process the group if it is still current.

    After sleeping GROUPING_TIMEOUT seconds, the group is processed only if it
    is still the exact object registered for its (room_id, sender) pair. A
    group that was removed or replaced in the meantime is left alone.

    Args:
        data: Pending group dict with at least ``room_id`` and ``sender``.
    """
    await asyncio.sleep(GROUPING_TIMEOUT)
    current = pending_by_conversation.get((data["room_id"], data["sender"]))
    if current is not data:
        return
    await process_complete_message(data)
def get_or_create_pending(room_id: str, sender: str, event_id: Optional[str] = None) -> Dict:
    """Return the pending message group an incoming event belongs to.

    Lookup order:
      1. a group already indexed under ``event_id``, whoever its sender is;
      2. the open group for ``(room_id, sender)``;
      3. otherwise a new, empty group registered in both indexes.

    An existing ``(room_id, sender)`` group that has no event ID yet adopts
    ``event_id``. This covers a group opened by an unrelated image with the
    text arriving second. Later events related to that ID then find the group,
    whichever message type happened to open it. Only one ID is kept per group
    because cleanup removes just ``data["event_id"]``.

    Args:
        room_id: Room the event was posted in.
        sender: Matrix user ID of the event's author.
        event_id: ID to index the group by. This is the text event's own ID or
            the ID a media event relates to; None when there is none.

    Returns:
        The mutable group dict with keys ``room_id``, ``sender``, ``text``
        (None until text arrives), ``images``, ``audio``, ``timestamp``,
        ``task`` and optionally ``event_id``.
    """
    if event_id and event_id in pending_by_event_id:
        return pending_by_event_id[event_id]

    key = (room_id, sender)
    data = pending_by_conversation.get(key)
    if data is not None:
        if event_id and "event_id" not in data:
            data["event_id"] = event_id
            pending_by_event_id[event_id] = data
        return data

    data = {
        "room_id": room_id,
        "sender": sender,
        "text": None,
        "images": [],
        "audio": [],
        "timestamp": time.time(),
        "task": None,
    }
    pending_by_conversation[key] = data
    if event_id:
        data["event_id"] = event_id
        pending_by_event_id[event_id] = data
    return data
def reset_timer(data: Dict):
    """(Re)start the grouping countdown for a pending group.

    Cancels the group's previous countdown task if it has not finished yet,
    stamps the group with the current time and stores a freshly scheduled
    ``delayed_processing`` task in ``data["task"]``. Needs a running event
    loop because it uses ``asyncio.create_task``.

    Args:
        data: Pending group dict holding a ``task`` entry (a Task or None).
    """
    previous = data["task"]
    if previous is not None and not previous.done():
        previous.cancel()
    data["timestamp"] = time.time()
    data["task"] = asyncio.create_task(delayed_processing(data))
async def on_text_message(room, event: RoomMessageText):
    """Add a text message to its pending group and restart the countdown.

    The bot's own messages and rooms outside ALLOWED_ROOMS are ignored.
    Several texts sent within one grouping window are joined with newlines,
    so a later text does not overwrite an earlier one.

    Args:
        room: Room the event arrived in (``room_id`` is read).
        event: Incoming text event (``sender``, ``event_id``, ``body`` are read).
    """
    if event.sender == client.user_id or room.room_id not in ALLOWED_ROOMS:
        return
    data = get_or_create_pending(room.room_id, event.sender, event.event_id)
    # "text" is None for a freshly created group.
    if data["text"]:
        data["text"] = f"{data['text']}\n{event.body}"
    else:
        data["text"] = event.body
    reset_timer(data)
    print(f"[TEXT] Добавлен текст в сообщение от {event.sender}: {event.body}")
def _get_related_event_id(event) -> Optional[str]:
    """Return the event ID a media event's ``m.relates_to`` points at, or None."""
    # Two shapes are recognised. A direct "event_id" key (e.g. thread
    # relations) is checked first. The rich-reply form
    # {"m.in_reply_to": {"event_id": ...}} is how Matrix clients mark an
    # ordinary reply.
    source = getattr(event, "source", None)
    content = source.get("content") if isinstance(source, dict) else None
    relates_to = content.get("m.relates_to") if isinstance(content, dict) else None
    if not isinstance(relates_to, dict):
        return None
    if "event_id" in relates_to:
        return relates_to["event_id"]
    in_reply_to = relates_to.get("m.in_reply_to")
    if isinstance(in_reply_to, dict):
        return in_reply_to.get("event_id")
    return None


async def _download_media(room_id: str, url: str, log_message: str, user_message: str) -> Optional[bytes]:
    """Download media via the global client; on ErrorResponse log, notify the room, return None."""
    result = await client.download(url)
    if isinstance(result, ErrorResponse):
        print(f"{log_message}: {result.status_code}")
        await send_error_message(room_id, user_message)
        return None
    return result.body


async def on_image_message(room, event: RoomMessageImage):
    """Download an image and add it to the matching pending group.

    The bot's own events and rooms outside ALLOWED_ROOMS are ignored. The
    download happens *before* the pending group is looked up. The group could
    otherwise be processed and discarded while the download is awaited, and
    the image would land in a dead group. A failed download also creates no
    group at all.

    Args:
        room: Room the event arrived in (``room_id`` is read).
        event: Incoming image event (``sender``, ``url``, ``source`` are read).
    """
    if event.sender == client.user_id or room.room_id not in ALLOWED_ROOMS:
        return
    related_event_id = _get_related_event_id(event)
    image_bytes = await _download_media(
        room.room_id,
        event.url,
        "Ошибка скачивания изображения",
        "Не удалось загрузить изображение.",
    )
    if image_bytes is None:
        return
    data = get_or_create_pending(room.room_id, event.sender, related_event_id)
    data["images"].append(image_bytes)
    reset_timer(data)
    print(f"[IMAGE] Добавлено изображение в сообщение от {event.sender}")


async def on_audio_message(room, event: RoomMessageAudio):
    """Download an audio clip and add it to the matching pending group.

    Same flow as ``on_image_message``. It downloads first, then attaches the
    clip to the group and restarts the countdown.

    Args:
        room: Room the event arrived in (``room_id`` is read).
        event: Incoming audio event (``sender``, ``url``, ``source`` are read).
    """
    if event.sender == client.user_id or room.room_id not in ALLOWED_ROOMS:
        return
    related_event_id = _get_related_event_id(event)
    audio_bytes = await _download_media(
        room.room_id,
        event.url,
        "Ошибка скачивания аудио",
        "Не удалось загрузить аудио.",
    )
    if audio_bytes is None:
        return
    data = get_or_create_pending(room.room_id, event.sender, related_event_id)
    data["audio"].append(audio_bytes)
    reset_timer(data)
    print(f"[AUDIO] Добавлено аудио в сообщение от {event.sender}")
async def _login() -> bool:
    """Authenticate the global client; print the outcome and return True on success."""
    token = os.environ.get("ACCESS_TOKEN", "")
    if not PASSWORD and not token:
        print("Ошибка авторизации: не заданы ни PASSWORD, ни ACCESS_TOKEN")
        return False
    try:
        if PASSWORD:
            response = await client.login(PASSWORD)
        else:
            # NOTE(review): nio's login(token=...) performs an m.login.token
            # login, which expects a short-lived *login* token rather than a
            # long-lived access token. Confirm what ACCESS_TOKEN holds.
            response = await client.login(token=token)
    except Exception as e:
        print(f"Исключение при авторизации: {e}")
        return False
    if not isinstance(response, LoginResponse):
        print(f"Ошибка авторизации: {response}")
        return False
    # Deliberately no longer prints client.access_token: it is a credential.
    print(f"Бот {USERNAME} успешно авторизован на {HOMESERVER}")
    return True


async def main():
    """Run the bot: create the client, log in, register handlers and sync forever.

    The client is closed on every exit path, including a failed login.
    """
    global client
    config = AsyncClientConfig(
        max_timeouts=10,
        store_sync_tokens=True,
        encryption_enabled=False,
    )
    client = AsyncClient(
        homeserver=HOMESERVER,
        user=USERNAME,
        device_id=None,
        config=config,
    )
    try:
        if not await _login():
            return
        # Catch up once before registering callbacks. Without a stored sync
        # token, the first sync returns recent room history, and handling it
        # would make the bot answer old messages again on every restart.
        await client.sync(timeout=30000)
        client.add_event_callback(on_text_message, RoomMessageText)
        client.add_event_callback(on_image_message, RoomMessageImage)
        client.add_event_callback(on_audio_message, RoomMessageAudio)
        print("Бот запущен, ожидание событий...")
        await client.sync_forever(timeout=30000)
    except KeyboardInterrupt:
        print("Бот остановлен пользователем")
    finally:
        await client.close()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C surfaces here, out of asyncio.run(), rather than inside main().
        # Since Python 3.11 the main task is cancelled first, so main()'s
        # finally block still closes the client before we get here.
        print("Бот остановлен пользователем")