не работает пока(

This commit is contained in:
Пьянзин Михаил 2026-04-01 00:37:54 +03:00
parent 55cec3b876
commit 96858d6364
3 changed files with 324 additions and 2 deletions

View file

@ -1,3 +1,81 @@
# b2b_assistants # Matrix Image Recognition Bot
Репозиторий для разработки B to B рушений Бот для Matrix, который распознаёт изображения с помощью Qwen-VL.
## Описание
Этот бот подключается к Matrix серверу, слушает изображения в разрешённых комнатах и отправляет их в Qwen-VL API для анализа. Бот возвращает описание изображения обратно в чат.
## Требования
- Python 3.10+
- Доступ к Matrix серверу
- API ключ Qwen-VL
## Установка
1. Клонируйте репозиторий:
```bash
git clone <repository-url>
cd b2b_assistants
```
2. Создайте виртуальное окружение:
```bash
python -m venv venv
source venv/bin/activate # Linux/Mac
# или
venv\Scripts\activate # Windows
```
3. Установите зависимости:
```bash
pip install -r requirements.txt
```
4. Настройте переменные окружения в файле `.env`:
```env
# Matrix подключение
HOMESERVER=https://matrix.lambda.coredump.ru
MATRIX_USERNAME=@image_bot:matrix.lambda.coredump.ru
PASSWORD=ваш_пароль_бота
ALLOWED_ROOMS=!QcPkdLDWqDegdtDnpP:matrix.lambda.coredump.ru
# Qwen API
QWEN_API_KEY=sk-L6oRP0m15Z9YquluktS6w
QWEN_ENDPOINT=https://llm.lambda.coredump.ru/v1
QWEN_VL_MODEL=qwen-vl-plus
```
## Запуск
```bash
python src/image_bot.py
```
## Команды бота
- `/help` - показать справку по командам
- `/status` - показать статус бота
## Использование
1. Пригласите бота в комнату Matrix
2. Отправьте изображение в чат
3. Бот автоматически проанализирует изображение и вернёт описание
## Структура проекта
```
b2b_assistants/
├── .env # Переменные окружения
├── requirements.txt # Зависимости Python
├── README.md # Документация
└── src/
└── image_bot.py # Основной код бота
```
## Лицензия
MIT

3
requirements.txt Normal file
View file

@ -0,0 +1,3 @@
matrix-nio>=0.25.0
httpx>=0.24.0
python-dotenv>=1.0.0

241
src/image_bot.py Normal file
View file

@ -0,0 +1,241 @@
#!/usr/bin/env python3
import asyncio
import os
import base64
import logging
from typing import Optional
from dataclasses import dataclass
import httpx
from nio import (
AsyncClient,
RoomMessageImage,
RoomMessageText,
LoginResponse,
JoinResponse,
)
from dotenv import load_dotenv
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
@dataclass
class BotConfig:
homeserver: str
username: str
password: str
allowed_rooms: list[str]
qwen_api_key: str
qwen_endpoint: str
qwen_model: str
def load_config() -> BotConfig:
load_dotenv()
allowed_rooms_str = os.getenv("ALLOWED_ROOMS", "")
allowed_rooms = [r.strip() for r in allowed_rooms_str.split(",") if r.strip()]
return BotConfig(
homeserver=os.getenv("HOMESERVER", "https://matrix.lambda.coredump.ru"),
username=os.getenv("MATRIX_USERNAME", ""),
password=os.getenv("PASSWORD", ""),
allowed_rooms=allowed_rooms,
qwen_api_key=os.getenv("QWEN_API_KEY", ""),
qwen_endpoint=os.getenv("QWEN_ENDPOINT", "https://llm.lambda.coredump.ru/v1"),
qwen_model=os.getenv("QWEN_VL_MODEL", "qwen-vl-plus"),
)
class QwenVLClient:
def __init__(self, api_key: str, endpoint: str, model: str):
self.api_key = api_key
self.endpoint = endpoint
self.model = model
self.client = httpx.AsyncClient(timeout=120.0)
async def analyze_image(self, image_data: bytes, prompt: str = "Опиши это изображение подробно:") -> str:
image_base64 = base64.b64encode(image_data).decode("utf-8")
image_url = f"data:image/jpeg;base64,{image_base64}"
payload = {
"model": self.model,
"messages": [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": prompt}
]
}
]
}
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
try:
response = await self.client.post(
f"{self.endpoint}/chat/completions",
json=payload,
headers=headers
)
response.raise_for_status()
result = response.json()
return result["choices"][0]["message"]["content"]
except httpx.HTTPError as e:
logger.error(f"Error calling Qwen-VL API: {e}")
if e.response:
logger.error(f"Response: {e.response.text}")
return f"Ошибка при анализе изображения: {str(e)}"
async def close(self):
await self.client.aclose()
class ImageBot:
def __init__(self, config: BotConfig):
self.config = config
self.client = AsyncClient(config.homeserver, config.username) # исправлено
self.qwen_client = QwenVLClient(
config.qwen_api_key,
config.qwen_endpoint,
config.qwen_model
)
self.default_prompt = "Опиши это изображение подробно."
async def login(self) -> bool:
try:
response = await self.client.login(
password=self.config.password,
device_name="image_recognition_bot"
)
if isinstance(response, LoginResponse):
logger.info(f"Logged in as {self.config.username}")
return True
else:
logger.error(f"Login failed: {response}")
return False
except Exception as e:
logger.error(f"Login error: {e}")
return False
async def join_rooms(self) -> None:
for room_id in self.config.allowed_rooms:
try:
response = await self.client.join(room_id)
if isinstance(response, JoinResponse):
logger.info(f"Joined room: {room_id}")
else:
logger.warning(f"Could not join room {room_id}: {response}")
except Exception as e:
logger.error(f"Error joining room {room_id}: {e}")
async def download_image(self, url: str) -> Optional[bytes]:
try:
logger.info(f"Downloading image from {url}")
response = await self.client.download(url) # правильный метод
if response and hasattr(response, 'body'):
logger.info(f"Downloaded {len(response.body)} bytes")
return response.body
return None
except Exception as e:
logger.error(f"Download error: {e}")
return None
async def send_response(self, room_id: str, message: str) -> None:
try:
await self.client.room_send(
room_id=room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": message}
)
except Exception as e:
logger.error(f"Error sending response: {e}")
async def handle_image_message(self, room_id: str, event: RoomMessageImage) -> None:
if room_id not in self.config.allowed_rooms:
logger.warning(f"Ignoring image from non-allowed room {room_id}")
return
logger.info(f"Processing image in {room_id}")
image_url = event.url # прямой доступ к URL
if not image_url:
await self.send_response(room_id, "Не удалось получить изображение.")
return
image_data = await self.download_image(image_url)
if not image_data:
await self.send_response(room_id, "Не удалось загрузить изображение.")
return
try:
await self.client.typing(room_id, timeout=4000)
except Exception:
pass
await self.send_response(room_id, "🔍 Анализирую изображение...")
result = await self.qwen_client.analyze_image(image_data, self.default_prompt)
await self.send_response(room_id, f"📸 *Результат анализа:*\n\n{result}")
async def handle_text_message(self, room_id: str, event: RoomMessageText) -> None:
if room_id not in self.config.allowed_rooms:
logger.warning(f"Ignoring text from non-allowed room {room_id}")
return
body = event.body.strip().lower()
if body == "/help":
await self.send_response(room_id, "🤖 *Commands:*\n/help - справка\n/status - статус бота")
elif body == "/status":
await self.send_response(
room_id,
f"✅ *Статус:*\nПользователь: {self.config.username}\nМодель: {self.config.qwen_model}\nЭндпоинт: {self.config.qwen_endpoint}"
)
async def register_callbacks(self) -> None:
self.client.add_event_callback(self.image_callback, RoomMessageImage)
self.client.add_event_callback(self.text_callback, RoomMessageText)
async def image_callback(self, room: RoomMessageImage, event: RoomMessageImage) -> None:
await self.handle_image_message(room.room_id, event)
async def text_callback(self, room: RoomMessageText, event: RoomMessageText) -> None:
await self.handle_text_message(room.room_id, event)
async def sync_loop(self) -> None:
logger.info("Starting sync loop...")
try:
await self.client.sync_forever(timeout=30000, full_state=True)
except Exception as e:
logger.error(f"Sync error: {e}")
raise
async def close(self) -> None:
await self.client.close()
await self.qwen_client.close()
async def main():
config = load_config()
if not config.password or not config.qwen_api_key or not config.allowed_rooms:
logger.error("Missing required config")
return
bot = ImageBot(config)
try:
if not await bot.login():
return
await bot.join_rooms()
await bot.register_callbacks()
await bot.sync_loop()
except KeyboardInterrupt:
logger.info("Bot stopped")
except Exception as e:
logger.error(f"Bot error: {e}")
finally:
await bot.close()
if __name__ == "__main__":
asyncio.run(main())