from __future__ import annotations import time import structlog from aiogram import F, Router from aiogram.exceptions import TelegramBadRequest from aiogram.types import Message from adapter.telegram import converter, db from core.handler import EventDispatcher logger = structlog.get_logger(__name__) router = Router(name="message") STREAM_EDIT_INTERVAL = 1.5 STREAM_MIN_DELTA = 100 TELEGRAM_MAX_LEN = 4096 @router.message(F.text & F.message_thread_id) async def handle_topic_message(message: Message, dispatcher: EventDispatcher) -> None: """Route a text message in a topic to the platform and stream the response.""" user_id = message.from_user.id thread_id = message.message_thread_id chat = db.get_chat(user_id=user_id, thread_id=thread_id) if chat is None or chat["archived_at"] is not None: return incoming = converter.from_message(message) if incoming is None: return platform_user = await dispatcher._platform.get_or_create_user( external_id=str(user_id), platform="telegram", display_name=message.from_user.full_name, ) placeholder = await message.reply("...") accumulated = "" last_edit_time = 0.0 last_edit_len = 0 try: async for chunk in dispatcher._platform.stream_message( user_id=platform_user.user_id, chat_id=str(thread_id), text=incoming.text, attachments=None, ): accumulated += chunk.delta now = time.monotonic() delta = len(accumulated) - last_edit_len if delta >= STREAM_MIN_DELTA and (now - last_edit_time) >= STREAM_EDIT_INTERVAL: await _safe_edit(placeholder, accumulated) last_edit_time = now last_edit_len = len(accumulated) await _safe_edit(placeholder, accumulated or "...") except TelegramBadRequest as e: if "thread not found" in str(e).lower(): db.archive_chat(user_id=user_id, thread_id=thread_id) logger.warning("topic_deleted_during_message", thread_id=thread_id) else: logger.error("telegram_error", error=str(e)) await _safe_edit(placeholder, "Ошибка отправки, попробуй ещё раз") except Exception: logger.exception("platform_error", user_id=user_id, thread_id=thread_id) await _safe_edit(placeholder, "Сервис временно недоступен, попробуй позже") async def _safe_edit(message: Message, text: str) -> None: try: await message.edit_text(text[:TELEGRAM_MAX_LEN]) except TelegramBadRequest as e: if "not modified" not in str(e).lower(): raise