surfaces/tests/adapter/telegram/test_message.py
Mikhail Putilovskij 8901e60f6a fix(tg): reviewer fixes — error handling, timeouts, db index
- commands.py: try/except TelegramBadRequest around all Bot API calls (#2);
  /new handles "topics limit" with user-friendly message (#4)
- start.py: isolate _check_and_prune_stale_topics with try/except Exception (#3)
- message.py: asyncio.timeout(30) around stream_message; handle TimeoutError (#6)
- db.py: add idx_chats_user_id index in init_db() (#7)
- settings.py: remove dead active_chat_id variable (#8)
- tests: add test_message.py (stream error/success); add 2 tests in test_commands.py
  (topics limit, /archive in General topic)
2026-04-02 13:44:59 +03:00

87 lines
2.7 KiB
Python

from __future__ import annotations
import importlib
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Provide each test with a freshly initialised database module.

    ``DB_PATH`` is pointed at ``test.db`` inside pytest's per-test
    ``tmp_path`` before ``adapter.telegram.db`` is reloaded and its
    ``init_db()`` is called. The module object itself is returned so tests
    can call its helpers directly.
    """
    db_file = tmp_path / "test.db"
    monkeypatch.setenv("DB_PATH", str(db_file))

    # Imported only after the environment override is in place.
    import adapter.telegram.db as telegram_db

    # NOTE(review): presumably the module reads DB_PATH at import time, which
    # is why it is reloaded here — confirm against db.py.
    importlib.reload(telegram_db)
    telegram_db.init_db()
    return telegram_db
def make_message(
    *,
    user_id=1,
    thread_id=42,
    chat_id=100,
    text="Hello",
    full_name="Alice",
):
    """Build a stand-in for an incoming message together with its reply placeholder.

    All arguments are keyword-only.

    Args:
        user_id: Value exposed as ``message.from_user.id``.
        thread_id: Value exposed as ``message.message_thread_id``.
        chat_id: Value exposed as ``message.chat.id``.
        text: Value exposed as ``message.text`` (defaults to ``"Hello"``).
        full_name: Value exposed as ``message.from_user.full_name``
            (defaults to ``"Alice"``).

    Returns:
        A ``(message, placeholder)`` tuple. ``message`` is a
        ``SimpleNamespace`` whose media attributes (``photo``, ``document``,
        ``voice``, ``video``, ``sticker``) are all ``None``, whose ``answer``
        and ``reply`` are ``AsyncMock`` objects and whose ``bot`` is a
        ``MagicMock``. ``placeholder`` is the object that awaiting
        ``message.reply(...)`` resolves to; its ``edit_text`` is an
        ``AsyncMock`` so tests can inspect how it was called.
    """
    placeholder = MagicMock()
    placeholder.edit_text = AsyncMock()

    message = SimpleNamespace(
        from_user=SimpleNamespace(id=user_id, full_name=full_name),
        message_thread_id=thread_id,
        chat=SimpleNamespace(id=chat_id),
        text=text,
        # Plain text message: no media attachments of any kind.
        photo=None,
        document=None,
        voice=None,
        video=None,
        sticker=None,
        answer=AsyncMock(),
        reply=AsyncMock(return_value=placeholder),
        bot=MagicMock(),
    )
    return message, placeholder
def make_dispatcher(chunks=None, raise_exc=None):
    """Build a mock EventDispatcher with configurable stream_message behaviour.

    Args:
        chunks: Items the fake stream yields, in order. ``None`` (or any
            empty iterable) produces an empty stream.
        raise_exc: When not ``None``, this exception is raised as soon as the
            stream is first iterated; nothing is yielded and ``chunks`` is
            ignored.

    Returns:
        A ``MagicMock`` whose ``_platform`` attribute carries:

        * ``get_or_create_user`` — an ``AsyncMock`` resolving to an object
          with ``user_id == "uid-1"``;
        * ``stream_message`` — a ``MagicMock`` that, when called, returns the
          async generator described above and records the call so tests can
          assert on the arguments it received.
    """

    async def _stream(*args, **kwargs):
        # Arguments are accepted and ignored; only the configured outcome matters.
        if raise_exc is not None:
            raise raise_exc
        for chunk in chunks or ():
            yield chunk

    platform = MagicMock()
    platform.get_or_create_user = AsyncMock(
        return_value=SimpleNamespace(user_id="uid-1")
    )
    # side_effect forwards the call to _stream and returns its async generator
    # unchanged, while the MagicMock wrapper keeps a record of the call.
    platform.stream_message = MagicMock(side_effect=_stream)

    dispatcher = MagicMock()
    dispatcher._platform = platform
    return dispatcher
async def test_stream_exception_shows_error(fresh_db):
    """When stream_message raises, the placeholder is updated with an error message."""
    # 1 and 42 mirror make_message's default user_id / thread_id.
    # NOTE(review): presumably create_chat takes (user_id, thread_id, title) —
    # confirm against db.py.
    fresh_db.create_chat(1, 42, "Чат #1")

    import adapter.telegram.handlers.message as mod

    # NOTE(review): reloaded presumably so the handler re-binds whatever it
    # imported from the db module that fresh_db just reloaded — confirm.
    importlib.reload(mod)

    msg, placeholder = make_message()
    dispatcher = make_dispatcher(raise_exc=RuntimeError("boom"))

    await mod.handle_topic_message(msg, dispatcher)

    placeholder.edit_text.assert_called()
    # Accept the text whether the handler passed it positionally or as text=...
    last_call = placeholder.edit_text.call_args
    last_call_text = last_call.args[0] if last_call.args else last_call.kwargs["text"]

    # Compare case-insensitively for both keywords so a capitalised
    # "Недоступен" / "Ошибка" at the start of the message still matches.
    lowered = last_call_text.lower()
    assert "недоступен" in lowered or "ошибка" in lowered, last_call_text
async def test_stream_success_edits_placeholder(fresh_db):
    """When stream_message succeeds, the placeholder is updated with the response."""
    # 1 and 42 mirror make_message's default user_id / thread_id.
    # NOTE(review): presumably create_chat takes (user_id, thread_id, title) —
    # confirm against db.py.
    fresh_db.create_chat(1, 42, "Чат #1")

    import adapter.telegram.handlers.message as mod

    # NOTE(review): reloaded presumably so the handler re-binds whatever it
    # imported from the db module that fresh_db just reloaded — confirm.
    importlib.reload(mod)

    # Two deltas that should be concatenated into "Hello world".
    chunks = [SimpleNamespace(delta="Hello "), SimpleNamespace(delta="world")]
    msg, placeholder = make_message()
    dispatcher = make_dispatcher(chunks=chunks)

    await mod.handle_topic_message(msg, dispatcher)

    placeholder.edit_text.assert_called()
    # Accept the text whether the handler passed it positionally or as text=...
    last_call = placeholder.edit_text.call_args
    last_call_text = last_call.args[0] if last_call.args else last_call.kwargs["text"]
    assert "Hello world" in last_call_text