работает в телеге

This commit is contained in:
Пьянзин Михаил 2026-04-02 13:37:20 +03:00
parent 96858d6364
commit 75704f6930
3 changed files with 301 additions and 262 deletions

View file

@ -1,69 +1,47 @@
# Matrix Image Recognition Bot
# Telegram Image Description Bot
Бот для Matrix, который распознаёт изображения с помощью Qwen-VL.
## Описание
Этот бот подключается к Matrix серверу, слушает изображения в разрешённых комнатах и отправляет их в Qwen-VL API для анализа. Бот возвращает описание изображения обратно в чат.
Бот для Telegram, который описывает изображения с помощью Qwen-VL API.
## Требования
- Python 3.10+
- Доступ к Matrix серверу
- API ключ Qwen-VL
- Токен Telegram бота (получить у [@BotFather](https://t.me/BotFather))
- Ключ доступа к Qwen API
## Установка
1. Клонируйте репозиторий:
```bash
git clone <repository-url>
cd b2b_assistants
```
2. Создайте виртуальное окружение:
```bash
python -m venv venv
source venv/bin/activate # Linux/Mac
# или
venv\Scripts\activate # Windows
```
3. Установите зависимости:
1. Установите зависимости:
```bash
pip install -r requirements.txt
```
4. Настройте переменные окружения в файле `.env`:
2. Настройте переменные окружения в файле `.env`:
```env
# Matrix подключение
HOMESERVER=https://matrix.lambda.coredump.ru
MATRIX_USERNAME=@image_bot:matrix.lambda.coredump.ru
PASSWORD=ваш_пароль_бота
ALLOWED_ROOMS=!QcPkdLDWqDegdtDnpP:matrix.lambda.coredump.ru
# Telegram Bot Token (получите у @BotFather)
TELEGRAM_BOT_TOKEN=ваш_токен_бота
# Qwen API
QWEN_API_KEY=sk-L6oRP0m15Z9YquluktS6w
# Qwen API настройки (уже заполнены)
QWEN_API_KEY=sk-L6oRP0mP15Z9YquluktS6w
QWEN_ENDPOINT=https://llm.lambda.coredump.ru/v1
QWEN_VL_MODEL=qwen-vl-plus
```
## Запуск
3. Запустите бота:
```bash
python src/image_bot.py
```
## Команды бота
- `/help` - показать справку по командам
- `/status` - показать статус бота
## Использование
1. Пригласите бота в комнату Matrix
2. Отправьте изображение в чат
3. Бот автоматически проанализирует изображение и вернёт описание
1. Найдите вашего бота в Telegram и нажмите `/start`
2. Отправьте боту изображение
3. Бот вернёт описание изображения на русском языке
## Команды
- `/start` - начать работу с ботом
- `/help` - показать справку
- `/settoken <token>` - установить токен API (временное решение)
## Структура проекта
@ -76,6 +54,8 @@ b2b_assistants/
└── image_bot.py # Основной код бота
```
## Лицензия
## Примечания
MIT
- Бот использует base64 кодирование для отправки изображений в Qwen-VL API
- Для ограничения доступа используйте переменную `ALLOWED_USERS` в `.env`
- Время обработки изображения может составлять до 2 минут для больших файлов

View file

@ -1,3 +1,3 @@
matrix-nio>=0.25.0
httpx>=0.24.0
python-dotenv>=1.0.0
python-telegram-bot==20.7
python-dotenv==1.0.0
requests==2.31.0

View file

@ -1,241 +1,300 @@
#!/usr/bin/env python3
import asyncio
"""
Telegram Bot that describes images using Qwen-VL API.
"""
import os
import base64
import logging
from typing import Optional
from dataclasses import dataclass
import tempfile
from pathlib import Path
import httpx
from nio import (
AsyncClient,
RoomMessageImage,
RoomMessageText,
LoginResponse,
JoinResponse,
)
import requests
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Qwen API configuration
QWEN_API_KEY = os.getenv("QWEN_API_KEY")
QWEN_ENDPOINT = os.getenv("QWEN_ENDPOINT")
QWEN_MODEL = os.getenv("QWEN_VL_MODEL", "qwen-vl-plus")
@dataclass
class BotConfig:
homeserver: str
username: str
password: str
allowed_rooms: list[str]
qwen_api_key: str
qwen_endpoint: str
qwen_model: str
# Telegram bot token
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Allowed user IDs (empty list means all users allowed)
ALLOWED_USERS = [int(user.strip()) for user in os.getenv("ALLOWED_USERS", "").split(",") if user.strip()]
def load_config() -> BotConfig:
load_dotenv()
allowed_rooms_str = os.getenv("ALLOWED_ROOMS", "")
allowed_rooms = [r.strip() for r in allowed_rooms_str.split(",") if r.strip()]
return BotConfig(
homeserver=os.getenv("HOMESERVER", "https://matrix.lambda.coredump.ru"),
username=os.getenv("MATRIX_USERNAME", ""),
password=os.getenv("PASSWORD", ""),
allowed_rooms=allowed_rooms,
qwen_api_key=os.getenv("QWEN_API_KEY", ""),
qwen_endpoint=os.getenv("QWEN_ENDPOINT", "https://llm.lambda.coredump.ru/v1"),
qwen_model=os.getenv("QWEN_VL_MODEL", "qwen-vl-plus"),
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /start is issued."""
user = update.effective_user
await update.message.reply_text(
f"Привет, {user.first_name}!\n\n"
"Я бот, который описывает и генерирует изображения.\n\n"
"📸 Отправь мне картинку - я опишу что на ней изображено\n"
"🎨 Используй /draw <описание> - я сгенерирую изображение\n\n"
"Доступные команды:\n"
"/start - показать это сообщение\n"
"/help - показать справку\n"
"/draw <описание> - сгенерировать изображение"
)
class QwenVLClient:
def __init__(self, api_key: str, endpoint: str, model: str):
self.api_key = api_key
self.endpoint = endpoint
self.model = model
self.client = httpx.AsyncClient(timeout=120.0)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
help_text = (
"📸 **Бот описания и генерации изображений**\n\n"
"Этот бот использует Qwen-VL для анализа изображений и генерирует картинки по запросу.\n\n"
"📋 **Команды:**\n"
"/start - начать работу\n"
"/help - показать эту справку\n"
"/draw <описание> - сгенерировать изображение по описанию\n\n"
"💡 **Как использовать:**\n"
"• Отправьте боту изображение для получения описания\n"
"• Используйте /draw для генерации изображения"
)
await update.message.reply_text(help_text, parse_mode="Markdown")
async def analyze_image(self, image_data: bytes, prompt: str = "Опиши это изображение подробно:") -> str:
image_base64 = base64.b64encode(image_data).decode("utf-8")
image_url = f"data:image/jpeg;base64,{image_base64}"
async def describe_image(image_url: str) -> str:
"""Send image to Qwen-VL API and get description."""
headers = {
"Authorization": f"Bearer {QWEN_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"model": QWEN_MODEL,
"messages": [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": prompt}
]
{"type": "text", "text": "Кратко опиши изображение на русском языке. Перечисли основные детали, каждая в 1-2 предложениях. Будь лаконичен."},
],
}
]
],
}
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
api_url = f"{QWEN_ENDPOINT}/chat/completions"
try:
response = await self.client.post(
f"{self.endpoint}/chat/completions",
json=payload,
headers=headers
)
response.raise_for_status()
response = requests.post(api_url, headers=headers, json=payload, timeout=60)
if response.status_code != 200:
logger.error(f"API error: {response.status_code} - {response.text}")
return f"Ошибка API: {response.status_code}. {response.text[:200]}"
result = response.json()
description = result["choices"][0]["message"]["content"]
return description
except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {e}")
return f"Ошибка при запросе к API: {str(e)}"
except (KeyError, IndexError) as e:
logger.error(f"Unexpected response format: {e}")
return "Ошибка при обработке ответа от API"
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle photo messages."""
# Check if user is allowed
if ALLOWED_USERS and update.effective_user.id not in ALLOWED_USERS:
await update.message.reply_text("У вас нет доступа к этому боту.")
return
photo = update.message.photo[-1] # Get the highest resolution photo
user = update.effective_user
await update.message.reply_text("🔍 Анализирую изображение...")
try:
# Download the photo
file = await context.bot.get_file(photo.file_id)
# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_file:
tmp_path = tmp_file.name
await file.download_to_drive(tmp_path)
# For Qwen API, we need a public URL
# Since we can't easily host the image, we'll use a different approach
# We'll read the image and send it as base64 if the API supports it
try:
description = await describe_image_with_base64(tmp_path)
except Exception as e:
logger.warning(f"Base64 approach failed: {e}, trying URL approach")
# Fallback: try with a placeholder URL approach
description = await describe_image("placeholder")
# Clean up temp file
Path(tmp_path).unlink(missing_ok=True)
await update.message.reply_text(f"📝 Описание:\n\n{description}")
except Exception as e:
logger.error(f"Error processing photo: {e}")
await update.message.reply_text(f"❌ Произошла ошибка при обработке изображения: {str(e)}")
async def describe_image_with_base64(image_path: str) -> str:
"""Send image to Qwen-VL API using base64 encoding."""
import base64
headers = {
"Authorization": f"Bearer {QWEN_API_KEY}",
"Content-Type": "application/json",
}
# Read and encode image
with open(image_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode("utf-8")
# Create data URI
image_uri = f"data:image/jpeg;base64,{image_data}"
payload = {
"model": QWEN_MODEL,
"messages": [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_uri}},
{"type": "text", "text": "Кратко опиши изображение на русском языке. Перечисли основные детали, каждая в 1-2 предложениях. Будь лаконичен."},
],
}
],
}
api_url = f"{QWEN_ENDPOINT}/chat/completions"
response = requests.post(api_url, headers=headers, json=payload, timeout=120)
if response.status_code != 200:
logger.error(f"API error: {response.status_code} - {response.text}")
raise Exception(f"API error: {response.status_code} - {response.text}")
result = response.json()
return result["choices"][0]["message"]["content"]
except httpx.HTTPError as e:
logger.error(f"Error calling Qwen-VL API: {e}")
if e.response:
logger.error(f"Response: {e.response.text}")
return f"Ошибка при анализе изображения: {str(e)}"
async def close(self):
await self.client.aclose()
class ImageBot:
def __init__(self, config: BotConfig):
self.config = config
self.client = AsyncClient(config.homeserver, config.username) # исправлено
self.qwen_client = QwenVLClient(
config.qwen_api_key,
config.qwen_endpoint,
config.qwen_model
async def draw_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /draw command to generate images."""
if not context.args:
await update.message.reply_text(
"Пожалуйста, укажите описание изображения:\n/draw <описание того что хотите увидеть>"
)
self.default_prompt = "Опиши это изображение подробно."
return
prompt = " ".join(context.args)
await update.message.reply_text("🎨 Генерирую изображение... Это может занять до 2 минут.")
async def login(self) -> bool:
try:
response = await self.client.login(
password=self.config.password,
device_name="image_recognition_bot"
)
if isinstance(response, LoginResponse):
logger.info(f"Logged in as {self.config.username}")
return True
image_url = await generate_image(prompt)
if image_url:
await context.bot.send_photo(chat_id=update.effective_chat.id, photo=image_url, caption=f"{prompt}")
else:
logger.error(f"Login failed: {response}")
return False
await update.message.reply_text("Не удалось сгенерировать изображение. API не поддерживает генерацию картинок.")
except Exception as e:
logger.error(f"Login error: {e}")
return False
logger.error(f"Image generation error: {e}")
await update.message.reply_text(f"❌ Ошибка при генерации: {str(e)}")
async def generate_image(prompt: str) -> str | None:
"""Generate image using Qwen API or alternative services."""
headers = {
"Authorization": f"Bearer {QWEN_API_KEY}",
"Content-Type": "application/json",
}
# Try Qwen image generation endpoint first
# API expects prompt as a direct parameter
payload = {
"model": "mai",
"prompt": prompt,
"n": 1,
"size": "1024x1024"
}
api_url = f"{QWEN_ENDPOINT}/images/generations"
async def join_rooms(self) -> None:
for room_id in self.config.allowed_rooms:
try:
response = await self.client.join(room_id)
if isinstance(response, JoinResponse):
logger.info(f"Joined room: {room_id}")
response = requests.post(api_url, headers=headers, json=payload, timeout=120)
logger.info(f"Image API response: {response.status_code}")
if response.status_code == 200:
result = response.json()
logger.info(f"Image result: {result}")
# Try different response formats
if "data" in result and len(result["data"]) > 0:
return result["data"][0].get("url") or result["data"][0].get("image_url")
elif "image_url" in result:
return result["image_url"]
elif "output" in result and "url" in result["output"]:
return result["output"]["url"]
else:
logger.warning(f"Could not join room {room_id}: {response}")
logger.warning(f"Image API failed: {response.status_code} - {response.text}")
except Exception as e:
logger.error(f"Error joining room {room_id}: {e}")
logger.warning(f"Image generation failed: {e}")
async def download_image(self, url: str) -> Optional[bytes]:
try:
logger.info(f"Downloading image from {url}")
response = await self.client.download(url) # правильный метод
if response and hasattr(response, 'body'):
logger.info(f"Downloaded {len(response.body)} bytes")
return response.body
return None
except Exception as e:
logger.error(f"Download error: {e}")
# Fallback: return None and inform user
return None
async def send_response(self, room_id: str, message: str) -> None:
try:
await self.client.room_send(
room_id=room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": message}
)
except Exception as e:
logger.error(f"Error sending response: {e}")
async def handle_image_message(self, room_id: str, event: RoomMessageImage) -> None:
if room_id not in self.config.allowed_rooms:
logger.warning(f"Ignoring image from non-allowed room {room_id}")
def main() -> None:
"""Start the bot."""
if not TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN not set. Please set it in .env file.")
print("❌ Ошибка: TELEGRAM_BOT_TOKEN не установлен.")
print("\nПолучите токен у @BotFather в Telegram:")
print("1. Напишите @BotFather")
print("2. Отправьте команду /newbot")
print("3. Следуйте инструкциям и скопируйте токен")
print("\nДобавьте токен в файл .env:")
print("TELEGRAM_BOT_TOKEN=ваш_токен_от_BotFather")
return
logger.info(f"Processing image in {room_id}")
image_url = event.url # прямой доступ к URL
if not image_url:
await self.send_response(room_id, "Не удалось получить изображение.")
return
# Create application with increased timeout configuration
application = Application.builder() \
.token(TELEGRAM_BOT_TOKEN) \
.connect_timeout(60) \
.read_timeout(60) \
.write_timeout(60) \
.pool_timeout(60) \
.build()
image_data = await self.download_image(image_url)
if not image_data:
await self.send_response(room_id, "Не удалось загрузить изображение.")
return
try:
await self.client.typing(room_id, timeout=4000)
except Exception:
pass
await self.send_response(room_id, "🔍 Анализирую изображение...")
result = await self.qwen_client.analyze_image(image_data, self.default_prompt)
await self.send_response(room_id, f"📸 *Результат анализа:*\n\n{result}")
async def handle_text_message(self, room_id: str, event: RoomMessageText) -> None:
if room_id not in self.config.allowed_rooms:
logger.warning(f"Ignoring text from non-allowed room {room_id}")
return
body = event.body.strip().lower()
if body == "/help":
await self.send_response(room_id, "🤖 *Commands:*\n/help - справка\n/status - статус бота")
elif body == "/status":
await self.send_response(
room_id,
f"✅ *Статус:*\nПользователь: {self.config.username}\nМодель: {self.config.qwen_model}\nЭндпоинт: {self.config.qwen_endpoint}"
# Add handlers
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("draw", draw_image))
application.add_handler(
MessageHandler(filters.PHOTO, handle_photo),
)
async def register_callbacks(self) -> None:
self.client.add_event_callback(self.image_callback, RoomMessageImage)
self.client.add_event_callback(self.text_callback, RoomMessageText)
async def image_callback(self, room: RoomMessageImage, event: RoomMessageImage) -> None:
await self.handle_image_message(room.room_id, event)
async def text_callback(self, room: RoomMessageText, event: RoomMessageText) -> None:
await self.handle_text_message(room.room_id, event)
async def sync_loop(self) -> None:
logger.info("Starting sync loop...")
try:
await self.client.sync_forever(timeout=30000, full_state=True)
except Exception as e:
logger.error(f"Sync error: {e}")
raise
async def close(self) -> None:
await self.client.close()
await self.qwen_client.close()
async def main():
config = load_config()
if not config.password or not config.qwen_api_key or not config.allowed_rooms:
logger.error("Missing required config")
return
bot = ImageBot(config)
try:
if not await bot.login():
return
await bot.join_rooms()
await bot.register_callbacks()
await bot.sync_loop()
except KeyboardInterrupt:
logger.info("Bot stopped")
except Exception as e:
logger.error(f"Bot error: {e}")
finally:
await bot.close()
# Start the bot
logger.info("Бот запущен...")
print("🤖 Бот запущен! Ожидание сообщений...")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
asyncio.run(main())
main()