diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..9175207 --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +HOMESERVER=https://matrix.org + +# Bot's Matrix username (full MXID) +MATRIX_USERNAME=@your_bot:matrix.org + +# Either use password OR access token +PASSWORD= +ACCESS_TOKEN=syt_... + +# Allowed rooms (comma-separated, no spaces) +ALLOWED_ROOMS=!roomid1:matrix.org,!roomid2:matrix.org \ No newline at end of file diff --git a/README.md b/README.md index 9611f7f..84cef0a 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ -# b2b_assistants +# Бот для автоматической генерации отчётов -Репозиторий для разработки B to B рушений \ No newline at end of file +Принимает фото/аудио файлы и текстовые сообщения, когда в течение 15 секунд нет новых сообщений, +формирует отчёт по отправленному. \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..005b623 --- /dev/null +++ b/main.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +import asyncio +import os +import tempfile +import time +from typing import Dict, Optional, Tuple +from dotenv import load_dotenv + +from nio import ( + AsyncClient, + RoomMessageText, + RoomMessageImage, + RoomMessageAudio, + LoginResponse, + AsyncClientConfig, + ErrorResponse, +) + +load_dotenv() + +HOMESERVER = os.getenv("HOMESERVER", "https://matrix.org") +USERNAME = os.getenv("MATRIX_USERNAME") +PASSWORD = os.getenv("PASSWORD") +ALLOWED_ROOMS = set(room.strip() for room in os.getenv("ALLOWED_ROOMS", "").split(",") if room.strip()) + +TEMP_DIR = tempfile.gettempdir() +GROUPING_TIMEOUT = 15.0 + +client: AsyncClient = None + +pending_by_conversation: Dict[Tuple[str, str], Dict] = {} +pending_by_event_id: Dict[str, Dict] = {} + + +async def process_audio(audio_bytes: bytes) -> str: + print(f"[AUDIO] Получено {len(audio_bytes)} байт аудио") + return "Placeholder" + + +async def process_image(image_bytes: bytes) -> str: + print(f"[IMAGE] Получено {len(image_bytes)} байт изображения") + return "Placeholder" + + +async def generate_report(text: str, image_descriptions: list, audio_texts: list) -> str: + print(f"[REPORT] text: {text}, images: {len(image_descriptions)}, audio: {len(audio_texts)}") + #TODO whisper + отчёт + return "Placeholder" + + +async def send_error_message(room_id: str, error_text: str): + await client.room_send( + room_id, + "m.room.message", + {"msgtype": "m.text", "body": f"❌ Ошибка: {error_text}"} + ) + + +async def process_complete_message(data: Dict): + room_id = data["room_id"] + image_descriptions = [] + for img_bytes in data.get("images", []): + desc = await process_image(img_bytes) + image_descriptions.append(desc) + audio_texts = [] + for aud_bytes in data.get("audio", []): + text = await process_audio(aud_bytes) + audio_texts.append(text) + report = await generate_report(data.get("text", ""), image_descriptions, audio_texts) + await client.room_send( + room_id, + "m.room.message", + {"msgtype": "m.text", "body": report} + ) + if "event_id" in data: + pending_by_event_id.pop(data["event_id"], None) + pending_by_conversation.pop((room_id, data["sender"]), None) + + +async def delayed_processing(data: Dict): + await asyncio.sleep(GROUPING_TIMEOUT) + key = (data["room_id"], data["sender"]) + if pending_by_conversation.get(key) is data: + await process_complete_message(data) + + +def get_or_create_pending(room_id: str, sender: str, event_id: Optional[str] = None) -> Dict: + if event_id and event_id in pending_by_event_id: + return pending_by_event_id[event_id] + + key = (room_id, sender) + if key in pending_by_conversation: + return pending_by_conversation[key] + + data = { + "room_id": room_id, + "sender": sender, + "text": None, + "images": [], + "audio": [], + "timestamp": time.time(), + "task": None, + } + if event_id: + data["event_id"] = event_id + pending_by_conversation[key] = data + if event_id: + pending_by_event_id[event_id] = data + return data + + +def reset_timer(data: Dict): + if data["task"] and not data["task"].done(): + data["task"].cancel() + data["timestamp"] = time.time() + data["task"] = asyncio.create_task(delayed_processing(data)) + + +async def on_text_message(room, event: RoomMessageText): + if event.sender == client.user_id: + return + if room.room_id not in ALLOWED_ROOMS: + return + + event_id = event.event_id + data = get_or_create_pending(room.room_id, event.sender, event_id) + data["text"] = event.body + reset_timer(data) + print(f"[TEXT] Добавлен текст в сообщение от {event.sender}: {event.body}") + + +async def on_image_message(room, event: RoomMessageImage): + if event.sender == client.user_id: + return + if room.room_id not in ALLOWED_ROOMS: + return + + related_event_id = None + if hasattr(event, "source") and "content" in event.source: + content = event.source["content"] + if "m.relates_to" in content and "event_id" in content["m.relates_to"]: + related_event_id = content["m.relates_to"]["event_id"] + + data = get_or_create_pending(room.room_id, event.sender, related_event_id) + + download_result = await client.download(event.url) + if isinstance(download_result, ErrorResponse): + print(f"Ошибка скачивания изображения: {download_result.status_code}") + await send_error_message(room.room_id, "Не удалось загрузить изображение.") + return + + data["images"].append(download_result.body) + reset_timer(data) + print(f"[IMAGE] Добавлено изображение в сообщение от {event.sender}") + + +async def on_audio_message(room, event: RoomMessageAudio): + if event.sender == client.user_id: + return + if room.room_id not in ALLOWED_ROOMS: + return + + related_event_id = None + if hasattr(event, "source") and "content" in event.source: + content = event.source["content"] + if "m.relates_to" in content and "event_id" in content["m.relates_to"]: + related_event_id = content["m.relates_to"]["event_id"] + + data = get_or_create_pending(room.room_id, event.sender, related_event_id) + + download_result = await client.download(event.url) + if isinstance(download_result, ErrorResponse): + print(f"Ошибка скачивания аудио: {download_result.status_code}") + await send_error_message(room.room_id, "Не удалось загрузить аудио.") + return + + data["audio"].append(download_result.body) + reset_timer(data) + print(f"[AUDIO] Добавлено аудио в сообщение от {event.sender}") + + +async def main(): + global client + + config = AsyncClientConfig( + max_timeouts=10, + store_sync_tokens=True, + encryption_enabled=False, + ) + client = AsyncClient( + homeserver=HOMESERVER, + user=USERNAME, + device_id=None, + config=config, + ) + + try: + if PASSWORD: + response = await client.login(PASSWORD) + else: + response = await client.login(token=os.environ.get("ACCESS_TOKEN", "")) + + if isinstance(response, LoginResponse): + print(f"Бот {USERNAME} успешно авторизован на {HOMESERVER}") + print(f"Access token: {client.access_token}") + else: + print(f"Ошибка авторизации: {response}") + return + except Exception as e: + print(f"Исключение при авторизации: {e}") + return + + client.add_event_callback(on_text_message, RoomMessageText) + client.add_event_callback(on_image_message, RoomMessageImage) + client.add_event_callback(on_audio_message, RoomMessageAudio) + + print("Бот запущен, ожидание событий...") + try: + await client.sync_forever(timeout=30000) + except KeyboardInterrupt: + print("Бот остановлен пользователем") + finally: + await client.close() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d845631 Binary files /dev/null and b/requirements.txt differ