fix(gateway): buffer Telegram media groups to prevent self-interruption
Telegram albums arrive as multiple updates with a shared media_group_id. Previously each image triggered a separate MessageEvent, causing the agent to interrupt itself when describing the first image. - Add 0.8s debounce window for media group items - Merge attachments into single MessageEvent - Add regression test for photo album buffering
This commit is contained in:
parent
c207a6b302
commit
8fb618234f
2 changed files with 85 additions and 4 deletions
|
|
@ -105,11 +105,14 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
|
|
||||||
# Telegram message limits
|
# Telegram message limits
|
||||||
MAX_MESSAGE_LENGTH = 4096
|
MAX_MESSAGE_LENGTH = 4096
|
||||||
|
MEDIA_GROUP_WAIT_SECONDS = 0.8
|
||||||
|
|
||||||
def __init__(self, config: PlatformConfig):
|
def __init__(self, config: PlatformConfig):
|
||||||
super().__init__(config, Platform.TELEGRAM)
|
super().__init__(config, Platform.TELEGRAM)
|
||||||
self._app: Optional[Application] = None
|
self._app: Optional[Application] = None
|
||||||
self._bot: Optional[Bot] = None
|
self._bot: Optional[Bot] = None
|
||||||
|
self._media_group_events: Dict[str, MessageEvent] = {}
|
||||||
|
self._media_group_tasks: Dict[str, asyncio.Task] = {}
|
||||||
|
|
||||||
async def connect(self) -> bool:
|
async def connect(self) -> bool:
|
||||||
"""Connect to Telegram and start polling for updates."""
|
"""Connect to Telegram and start polling for updates."""
|
||||||
|
|
@ -872,8 +875,53 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[Telegram] Failed to cache document: %s", e, exc_info=True)
|
logger.warning("[Telegram] Failed to cache document: %s", e, exc_info=True)
|
||||||
|
|
||||||
|
media_group_id = getattr(msg, "media_group_id", None)
|
||||||
|
if media_group_id:
|
||||||
|
await self._queue_media_group_event(str(media_group_id), event)
|
||||||
|
return
|
||||||
|
|
||||||
await self.handle_message(event)
|
await self.handle_message(event)
|
||||||
|
|
||||||
|
async def _queue_media_group_event(self, media_group_id: str, event: MessageEvent) -> None:
|
||||||
|
"""Buffer Telegram media-group items so albums arrive as one logical event.
|
||||||
|
|
||||||
|
Telegram delivers albums as multiple updates with a shared media_group_id.
|
||||||
|
If we forward each item immediately, the gateway thinks the second image is a
|
||||||
|
new user message and interrupts the first. We debounce briefly and merge the
|
||||||
|
attachments into a single MessageEvent.
|
||||||
|
"""
|
||||||
|
existing = self._media_group_events.get(media_group_id)
|
||||||
|
if existing is None:
|
||||||
|
self._media_group_events[media_group_id] = event
|
||||||
|
else:
|
||||||
|
existing.media_urls.extend(event.media_urls)
|
||||||
|
existing.media_types.extend(event.media_types)
|
||||||
|
if event.text:
|
||||||
|
if existing.text:
|
||||||
|
if event.text not in existing.text.split("\n\n"):
|
||||||
|
existing.text = f"{existing.text}\n\n{event.text}"
|
||||||
|
else:
|
||||||
|
existing.text = event.text
|
||||||
|
|
||||||
|
prior_task = self._media_group_tasks.get(media_group_id)
|
||||||
|
if prior_task:
|
||||||
|
prior_task.cancel()
|
||||||
|
|
||||||
|
self._media_group_tasks[media_group_id] = asyncio.create_task(
|
||||||
|
self._flush_media_group_event(media_group_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _flush_media_group_event(self, media_group_id: str) -> None:
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(self.MEDIA_GROUP_WAIT_SECONDS)
|
||||||
|
event = self._media_group_events.pop(media_group_id, None)
|
||||||
|
if event is not None:
|
||||||
|
await self.handle_message(event)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
return
|
||||||
|
finally:
|
||||||
|
self._media_group_tasks.pop(media_group_id, None)
|
||||||
|
|
||||||
async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
|
async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
|
||||||
"""
|
"""
|
||||||
Describe a Telegram sticker via vision analysis, with caching.
|
Describe a Telegram sticker via vision analysis, with caching.
|
||||||
|
|
|
||||||
|
|
@ -81,20 +81,21 @@ def _make_document(
|
||||||
return doc
|
return doc
|
||||||
|
|
||||||
|
|
||||||
def _make_message(document=None, caption=None):
|
def _make_message(document=None, caption=None, media_group_id=None, photo=None):
|
||||||
"""Build a mock Telegram Message with the given document."""
|
"""Build a mock Telegram Message with the given document/photo."""
|
||||||
msg = MagicMock()
|
msg = MagicMock()
|
||||||
msg.message_id = 42
|
msg.message_id = 42
|
||||||
msg.text = caption or ""
|
msg.text = caption or ""
|
||||||
msg.caption = caption
|
msg.caption = caption
|
||||||
msg.date = None
|
msg.date = None
|
||||||
# Media flags — all None except document
|
# Media flags — all None except explicit payload
|
||||||
msg.photo = None
|
msg.photo = photo
|
||||||
msg.video = None
|
msg.video = None
|
||||||
msg.audio = None
|
msg.audio = None
|
||||||
msg.voice = None
|
msg.voice = None
|
||||||
msg.sticker = None
|
msg.sticker = None
|
||||||
msg.document = document
|
msg.document = document
|
||||||
|
msg.media_group_id = media_group_id
|
||||||
# Chat / user
|
# Chat / user
|
||||||
msg.chat = MagicMock()
|
msg.chat = MagicMock()
|
||||||
msg.chat.id = 100
|
msg.chat.id = 100
|
||||||
|
|
@ -165,6 +166,12 @@ class TestDocumentTypeDetection:
|
||||||
# TestDocumentDownloadBlock
|
# TestDocumentDownloadBlock
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _make_photo(file_obj=None):
|
||||||
|
photo = MagicMock()
|
||||||
|
photo.get_file = AsyncMock(return_value=file_obj or _make_file_obj(b"photo-bytes"))
|
||||||
|
return photo
|
||||||
|
|
||||||
|
|
||||||
class TestDocumentDownloadBlock:
|
class TestDocumentDownloadBlock:
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_supported_pdf_is_cached(self, adapter):
|
async def test_supported_pdf_is_cached(self, adapter):
|
||||||
|
|
@ -339,6 +346,32 @@ class TestDocumentDownloadBlock:
|
||||||
adapter.handle_message.assert_called_once()
|
adapter.handle_message.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestMediaGroups — media group (album) buffering
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestMediaGroups:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_photo_album_is_buffered_and_combined(self, adapter):
|
||||||
|
first_photo = _make_photo(_make_file_obj(b"first"))
|
||||||
|
second_photo = _make_photo(_make_file_obj(b"second"))
|
||||||
|
|
||||||
|
msg1 = _make_message(caption="two images", media_group_id="album-1", photo=[first_photo])
|
||||||
|
msg2 = _make_message(media_group_id="album-1", photo=[second_photo])
|
||||||
|
|
||||||
|
with patch("gateway.platforms.telegram.cache_image_from_bytes", side_effect=["/tmp/one.jpg", "/tmp/two.jpg"]):
|
||||||
|
await adapter._handle_media_message(_make_update(msg1), MagicMock())
|
||||||
|
await adapter._handle_media_message(_make_update(msg2), MagicMock())
|
||||||
|
assert adapter.handle_message.await_count == 0
|
||||||
|
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
|
||||||
|
|
||||||
|
adapter.handle_message.assert_awaited_once()
|
||||||
|
event = adapter.handle_message.call_args[0][0]
|
||||||
|
assert event.text == "two images"
|
||||||
|
assert event.media_urls == ["/tmp/one.jpg", "/tmp/two.jpg"]
|
||||||
|
assert len(event.media_types) == 2
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# TestSendDocument — outbound file attachment delivery
|
# TestSendDocument — outbound file attachment delivery
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue