From 8fb618234f3edfff57b5511b13082158bbccdf4a Mon Sep 17 00:00:00 2001
From: capybaraonchain
Date: Thu, 12 Mar 2026 14:33:03 +0100
Subject: [PATCH 1/2] fix(gateway): buffer Telegram media groups to prevent self-interruption

Telegram albums arrive as multiple updates with a shared media_group_id.
Previously each image triggered a separate MessageEvent, causing the
agent to interrupt itself when describing the first image.

- Add 0.8s debounce window for media group items
- Merge attachments into single MessageEvent
- Let a flush task unregister only itself, so a cancelled flush cannot
  drop the replacement task that restarted the debounce window
- Add regression test for photo album buffering
---
 gateway/platforms/telegram.py            | 52 ++++++++++++++++++++++++++
 tests/gateway/test_telegram_documents.py | 41 ++++++++++++++++++--
 2 files changed, 89 insertions(+), 4 deletions(-)

diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py
index df44733e..aae0cce7 100644
--- a/gateway/platforms/telegram.py
+++ b/gateway/platforms/telegram.py
@@ -105,11 +105,14 @@ class TelegramAdapter(BasePlatformAdapter):
 
     # Telegram message limits
     MAX_MESSAGE_LENGTH = 4096
+    MEDIA_GROUP_WAIT_SECONDS = 0.8
 
     def __init__(self, config: PlatformConfig):
         super().__init__(config, Platform.TELEGRAM)
         self._app: Optional[Application] = None
         self._bot: Optional[Bot] = None
+        self._media_group_events: Dict[str, MessageEvent] = {}
+        self._media_group_tasks: Dict[str, asyncio.Task] = {}
 
     async def connect(self) -> bool:
         """Connect to Telegram and start polling for updates."""
@@ -872,8 +875,57 @@ class TelegramAdapter(BasePlatformAdapter):
             except Exception as e:
                 logger.warning("[Telegram] Failed to cache document: %s", e, exc_info=True)
 
+        media_group_id = getattr(msg, "media_group_id", None)
+        if media_group_id:
+            await self._queue_media_group_event(str(media_group_id), event)
+            return
+
         await self.handle_message(event)
 
+    async def _queue_media_group_event(self, media_group_id: str, event: MessageEvent) -> None:
+        """Buffer Telegram media-group items so albums arrive as one logical event.
+
+        Telegram delivers albums as multiple updates with a shared media_group_id.
+        If we forward each item immediately, the gateway thinks the second image is a
+        new user message and interrupts the first. We debounce briefly and merge the
+        attachments into a single MessageEvent.
+        """
+        existing = self._media_group_events.get(media_group_id)
+        if existing is None:
+            self._media_group_events[media_group_id] = event
+        else:
+            existing.media_urls.extend(event.media_urls)
+            existing.media_types.extend(event.media_types)
+            if event.text:
+                if existing.text:
+                    if event.text not in existing.text.split("\n\n"):
+                        existing.text = f"{existing.text}\n\n{event.text}"
+                else:
+                    existing.text = event.text
+
+        prior_task = self._media_group_tasks.get(media_group_id)
+        if prior_task:
+            prior_task.cancel()
+
+        self._media_group_tasks[media_group_id] = asyncio.create_task(
+            self._flush_media_group_event(media_group_id)
+        )
+
+    async def _flush_media_group_event(self, media_group_id: str) -> None:
+        """Dispatch a buffered album once its debounce window has elapsed."""
+        try:
+            await asyncio.sleep(self.MEDIA_GROUP_WAIT_SECONDS)
+            event = self._media_group_events.pop(media_group_id, None)
+            if event is not None:
+                await self.handle_message(event)
+        except asyncio.CancelledError:
+            return
+        finally:
+            # A cancelled flush runs this block after its replacement has been
+            # registered under the same id; only unregister our own entry.
+            if self._media_group_tasks.get(media_group_id) is asyncio.current_task():
+                self._media_group_tasks.pop(media_group_id, None)
+
     async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
         """
         Describe a Telegram sticker via vision analysis, with caching.
diff --git a/tests/gateway/test_telegram_documents.py b/tests/gateway/test_telegram_documents.py
index 7a76625f..25cfc249 100644
--- a/tests/gateway/test_telegram_documents.py
+++ b/tests/gateway/test_telegram_documents.py
@@ -81,20 +81,21 @@ def _make_document(
     return doc
 
 
-def _make_message(document=None, caption=None):
-    """Build a mock Telegram Message with the given document."""
+def _make_message(document=None, caption=None, media_group_id=None, photo=None):
+    """Build a mock Telegram Message with the given document/photo."""
     msg = MagicMock()
     msg.message_id = 42
     msg.text = caption or ""
     msg.caption = caption
     msg.date = None
-    # Media flags — all None except document
-    msg.photo = None
+    # Media flags — all None except explicit payload
+    msg.photo = photo
     msg.video = None
     msg.audio = None
     msg.voice = None
     msg.sticker = None
     msg.document = document
+    msg.media_group_id = media_group_id
     # Chat / user
     msg.chat = MagicMock()
     msg.chat.id = 100
@@ -165,6 +166,12 @@ class TestDocumentTypeDetection:
 # TestDocumentDownloadBlock
 # ---------------------------------------------------------------------------
 
+def _make_photo(file_obj=None):
+    photo = MagicMock()
+    photo.get_file = AsyncMock(return_value=file_obj or _make_file_obj(b"photo-bytes"))
+    return photo
+
+
 class TestDocumentDownloadBlock:
     @pytest.mark.asyncio
     async def test_supported_pdf_is_cached(self, adapter):
@@ -339,6 +346,32 @@ class TestDocumentDownloadBlock:
         adapter.handle_message.assert_called_once()
 
 
+# ---------------------------------------------------------------------------
+# TestMediaGroups — media group (album) buffering
+# ---------------------------------------------------------------------------
+
+class TestMediaGroups:
+    @pytest.mark.asyncio
+    async def test_photo_album_is_buffered_and_combined(self, adapter):
+        first_photo = _make_photo(_make_file_obj(b"first"))
+        second_photo = _make_photo(_make_file_obj(b"second"))
+
+        msg1 = _make_message(caption="two images", media_group_id="album-1", photo=[first_photo])
+        msg2 = _make_message(media_group_id="album-1", photo=[second_photo])
+
+        with patch("gateway.platforms.telegram.cache_image_from_bytes", side_effect=["/tmp/one.jpg", "/tmp/two.jpg"]):
+            await adapter._handle_media_message(_make_update(msg1), MagicMock())
+            await adapter._handle_media_message(_make_update(msg2), MagicMock())
+            assert adapter.handle_message.await_count == 0
+            await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
+
+        adapter.handle_message.assert_awaited_once()
+        event = adapter.handle_message.call_args[0][0]
+        assert event.text == "two images"
+        assert event.media_urls == ["/tmp/one.jpg", "/tmp/two.jpg"]
+        assert len(event.media_types) == 2
+
+
 # ---------------------------------------------------------------------------
 # TestSendDocument — outbound file attachment delivery
 # ---------------------------------------------------------------------------

From 3fab72f1e17f33bc7328219fde4c39b054051e17 Mon Sep 17 00:00:00 2001
From: teknium1
Date: Sat, 14 Mar 2026 12:18:24 -0700
Subject: [PATCH 2/2] fix(gateway): clean up pending Telegram media groups on disconnect

Cancel any queued media-group flush tasks during Telegram adapter
disconnect and clear the buffered events map so shutdown can't leave a
pending album flush behind.

Add a regression test covering disconnect before the debounce window
expires.
---
 gateway/platforms/telegram.py            | 10 +++++++++-
 tests/gateway/test_telegram_documents.py | 18 ++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py
index aae0cce7..7496a071 100644
--- a/gateway/platforms/telegram.py
+++ b/gateway/platforms/telegram.py
@@ -200,7 +200,15 @@ class TelegramAdapter(BasePlatformAdapter):
             return False
 
     async def disconnect(self) -> None:
-        """Stop polling and disconnect."""
+        """Stop polling, cancel pending album flushes, and disconnect."""
+        pending_media_group_tasks = list(self._media_group_tasks.values())
+        for task in pending_media_group_tasks:
+            task.cancel()
+        if pending_media_group_tasks:
+            await asyncio.gather(*pending_media_group_tasks, return_exceptions=True)
+        self._media_group_tasks.clear()
+        self._media_group_events.clear()
+
         if self._app:
             try:
                 await self._app.updater.stop()
diff --git a/tests/gateway/test_telegram_documents.py b/tests/gateway/test_telegram_documents.py
index 25cfc249..5e3e6f94 100644
--- a/tests/gateway/test_telegram_documents.py
+++ b/tests/gateway/test_telegram_documents.py
@@ -371,6 +371,24 @@ class TestMediaGroups:
         assert event.media_urls == ["/tmp/one.jpg", "/tmp/two.jpg"]
         assert len(event.media_types) == 2
 
+    @pytest.mark.asyncio
+    async def test_disconnect_cancels_pending_media_group_flush(self, adapter):
+        first_photo = _make_photo(_make_file_obj(b"first"))
+        msg = _make_message(caption="two images", media_group_id="album-2", photo=[first_photo])
+
+        with patch("gateway.platforms.telegram.cache_image_from_bytes", return_value="/tmp/one.jpg"):
+            await adapter._handle_media_message(_make_update(msg), MagicMock())
+
+        assert "album-2" in adapter._media_group_events
+        assert "album-2" in adapter._media_group_tasks
+
+        await adapter.disconnect()
+        await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
+
+        assert adapter._media_group_events == {}
+        assert adapter._media_group_tasks == {}
+        adapter.handle_message.assert_not_awaited()
+
 
 # ---------------------------------------------------------------------------
 # TestSendDocument — outbound file attachment delivery