test(gateway): cover photo burst interrupt regressions

Add regression coverage for non-album Telegram photo burst batching, photo follow-ups that should queue without interrupting active runs, and the gateway priority-interrupt path for photo events.
This commit is contained in:
teknium1 2026-03-15 03:50:45 -07:00
parent 4ae1334287
commit fef710aca8
3 changed files with 96 additions and 1 deletions

View file

@ -11,7 +11,7 @@ import asyncio
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType, SendResult
from gateway.session import SessionSource, build_session_key
@ -122,3 +122,29 @@ class TestInterruptKeyConsistency:
# Interrupt event was set
assert adapter._active_sessions[session_key].is_set()
@pytest.mark.asyncio
async def test_photo_followup_is_queued_without_interrupt(self):
"""Photo follow-ups should queue behind the active run instead of interrupting it."""
adapter = StubAdapter()
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
source = _source("-1001234", "group")
session_key = build_session_key(source)
interrupt_event = asyncio.Event()
adapter._active_sessions[session_key] = interrupt_event
event = MessageEvent(
text="caption",
source=source,
message_type=MessageType.PHOTO,
message_id="2",
media_urls=["/tmp/photo-a.jpg"],
media_types=["image/jpeg"],
)
await adapter.handle_message(event)
queued = adapter._pending_messages[session_key]
assert queued is event
assert queued.media_urls == ["/tmp/photo-a.jpg"]
assert interrupt_event.is_set() is False