from __future__ import annotations

import asyncio
import time

import structlog
from aiogram import F, Router
from aiogram.exceptions import TelegramBadRequest
from aiogram.types import Message

from adapter.telegram import converter, db
from core.handler import EventDispatcher

logger = structlog.get_logger(__name__)

router = Router(name="message")

# Streaming throttle: the placeholder is re-edited only when at least
# STREAM_MIN_DELTA new characters have accumulated AND at least
# STREAM_EDIT_INTERVAL seconds have passed since the previous edit.
STREAM_EDIT_INTERVAL = 1.5
STREAM_MIN_DELTA = 100

# Text passed to ``_safe_edit`` is truncated to this many characters.
TELEGRAM_MAX_LEN = 4096

# Upper bound, in seconds, on consuming the whole platform response stream.
PLATFORM_TIMEOUT = 30


@router.message(F.text & F.message_thread_id)
async def handle_topic_message(message: Message, dispatcher: EventDispatcher) -> None:
    """Route a text message in a topic to the platform and stream the response.

    The handler replies with a ``"..."`` placeholder and then edits that
    placeholder as response chunks arrive, throttled by ``STREAM_MIN_DELTA``
    and ``STREAM_EDIT_INTERVAL``. When the stream ends, the placeholder is
    edited one last time with the full accumulated text.

    The message is silently ignored when:

    * it carries no sender (``message.from_user`` is ``None``);
    * ``db.get_chat`` finds no chat for the (user, thread) pair, or the chat
      has a non-null ``archived_at``;
    * ``converter.from_message`` returns ``None``.

    Failure handling while streaming:

    * ``TimeoutError`` (stream not finished within ``PLATFORM_TIMEOUT``):
      logged, placeholder replaced with a "service not responding" notice.
    * ``TelegramBadRequest`` mentioning "thread not found": the chat is
      archived via ``db.archive_chat`` and no further edit is attempted.
    * Any other ``TelegramBadRequest`` or exception: logged, placeholder
      replaced with an error notice.

    Args:
        message: Incoming Telegram message (text, inside a topic thread).
        dispatcher: Event dispatcher whose platform client is used to resolve
            the user and stream the response.
    """
    sender = message.from_user
    if sender is None:
        # aiogram types ``from_user`` as optional; without a sender there is
        # no user to look the chat up for, so there is nothing to route.
        return

    user_id = sender.id
    thread_id = message.message_thread_id

    chat = db.get_chat(user_id=user_id, thread_id=thread_id)
    if chat is None or chat["archived_at"] is not None:
        return

    incoming = converter.from_message(message)
    if incoming is None:
        return

    platform_user = await dispatcher._platform.get_or_create_user(
        external_id=str(user_id),
        platform="telegram",
        display_name=sender.full_name,
    )

    placeholder = await message.reply("...")

    accumulated = ""
    last_edit_time = 0.0
    last_edit_len = 0

    try:
        async with asyncio.timeout(PLATFORM_TIMEOUT):
            async for chunk in dispatcher._platform.stream_message(
                user_id=platform_user.user_id,
                chat_id=str(thread_id),
                text=incoming.text,
                attachments=None,
            ):
                accumulated += chunk.delta

                now = time.monotonic()
                enough_text = len(accumulated) - last_edit_len >= STREAM_MIN_DELTA
                enough_time = now - last_edit_time >= STREAM_EDIT_INTERVAL
                if enough_text and enough_time:
                    await _safe_edit(placeholder, accumulated)
                    last_edit_time = now
                    last_edit_len = len(accumulated)

        # Final flush with everything received; fall back to the placeholder
        # text so an empty response never produces an empty edit.
        await _safe_edit(placeholder, accumulated or "...")
    except TimeoutError:
        logger.warning("platform_timeout", user_id=user_id, thread_id=thread_id)
        await _notify_failure(placeholder, "Сервис не отвечает, попробуй позже")
    except TelegramBadRequest as e:
        if "thread not found" in str(e).lower():
            # The topic is gone: archive the chat and do not try to edit
            # a message inside it.
            db.archive_chat(user_id=user_id, thread_id=thread_id)
            logger.warning("topic_deleted_during_message", thread_id=thread_id)
        else:
            logger.error(
                "telegram_error",
                error=str(e),
                user_id=user_id,
                thread_id=thread_id,
            )
            await _notify_failure(placeholder, "Ошибка отправки, попробуй ещё раз")
    except Exception:
        # Top-level boundary for the handler: log with traceback and tell
        # the user something went wrong.
        logger.exception("platform_error", user_id=user_id, thread_id=thread_id)
        await _notify_failure(placeholder, "Сервис временно недоступен, попробуй позже")


async def _notify_failure(placeholder: Message, text: str) -> None:
    """Show a failure notice in *placeholder* on a best-effort basis.

    This runs inside the handler's ``except`` branches. A
    ``TelegramBadRequest`` raised while delivering the notice is logged
    rather than propagated, so it cannot escape the handler on top of the
    error that is already being handled.
    """
    try:
        await _safe_edit(placeholder, text)
    except TelegramBadRequest as exc:
        logger.warning("failure_notice_not_delivered", error=str(exc))


async def _safe_edit(message: Message, text: str) -> None:
    """Edit *message* so it shows *text*, truncated to ``TELEGRAM_MAX_LEN``.

    A ``TelegramBadRequest`` whose description contains "not modified" is
    ignored; the streaming loop may legitimately request an edit that does
    not change the visible text.

    Raises:
        TelegramBadRequest: For any other bad-request error from Telegram.
    """
    try:
        await message.edit_text(text[:TELEGRAM_MAX_LEN])
    except TelegramBadRequest as e:
        if "not modified" not in str(e).lower():
            raise