surfaces/adapter/telegram/handlers/message.py
Mikhail Putilovskij 8901e60f6a fix(tg): reviewer fixes — error handling, timeouts, db index
- commands.py: try/except TelegramBadRequest around all Bot API calls (#2);
  /new handles "topics limit" with user-friendly message (#4)
- start.py: isolate _check_and_prune_stale_topics with try/except Exception (#3)
- message.py: asyncio.timeout(30) around stream_message; handle TimeoutError (#6)
- db.py: add idx_chats_user_id index in init_db() (#7)
- settings.py: remove dead active_chat_id variable (#8)
- tests: add test_message.py (stream error/success); add 2 tests in test_commands.py
  (topics limit, /archive in General topic)
2026-04-02 13:44:59 +03:00

87 lines
3 KiB
Python

from __future__ import annotations
import asyncio
import time
import structlog
from aiogram import F, Router
from aiogram.exceptions import TelegramBadRequest
from aiogram.types import Message
from adapter.telegram import converter, db
from core.handler import EventDispatcher
logger = structlog.get_logger(__name__)
router = Router(name="message")
STREAM_EDIT_INTERVAL = 1.5
STREAM_MIN_DELTA = 100
TELEGRAM_MAX_LEN = 4096
@router.message(F.text & F.message_thread_id)
async def handle_topic_message(message: Message, dispatcher: EventDispatcher) -> None:
"""Route a text message in a topic to the platform and stream the response."""
user_id = message.from_user.id
thread_id = message.message_thread_id
chat = db.get_chat(user_id=user_id, thread_id=thread_id)
if chat is None or chat["archived_at"] is not None:
return
incoming = converter.from_message(message)
if incoming is None:
return
platform_user = await dispatcher._platform.get_or_create_user(
external_id=str(user_id),
platform="telegram",
display_name=message.from_user.full_name,
)
placeholder = await message.reply("...")
accumulated = ""
last_edit_time = 0.0
last_edit_len = 0
try:
async with asyncio.timeout(30):
async for chunk in dispatcher._platform.stream_message(
user_id=platform_user.user_id,
chat_id=str(thread_id),
text=incoming.text,
attachments=None,
):
accumulated += chunk.delta
now = time.monotonic()
delta = len(accumulated) - last_edit_len
if delta >= STREAM_MIN_DELTA and (now - last_edit_time) >= STREAM_EDIT_INTERVAL:
await _safe_edit(placeholder, accumulated)
last_edit_time = now
last_edit_len = len(accumulated)
await _safe_edit(placeholder, accumulated or "...")
except TimeoutError:
logger.warning("platform_timeout", user_id=user_id, thread_id=thread_id)
await _safe_edit(placeholder, "Сервис не отвечает, попробуй позже")
except TelegramBadRequest as e:
if "thread not found" in str(e).lower():
db.archive_chat(user_id=user_id, thread_id=thread_id)
logger.warning("topic_deleted_during_message", thread_id=thread_id)
else:
logger.error("telegram_error", error=str(e))
await _safe_edit(placeholder, "Ошибка отправки, попробуй ещё раз")
except Exception:
logger.exception("platform_error", user_id=user_id, thread_id=thread_id)
await _safe_edit(placeholder, "Сервис временно недоступен, попробуй позже")
async def _safe_edit(message: Message, text: str) -> None:
try:
await message.edit_text(text[:TELEGRAM_MAX_LEN])
except TelegramBadRequest as e:
if "not modified" not in str(e).lower():
raise