Merge pull request #1341 from NousResearch/hermes/hermes-2f2b4807

fix(gateway): buffer Telegram media groups to prevent self-interruption
This commit is contained in:
Teknium 2026-03-14 18:03:24 -07:00 committed by GitHub
commit dc44e183e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 112 additions and 5 deletions

View file

@ -105,11 +105,14 @@ class TelegramAdapter(BasePlatformAdapter):
# Telegram message limits
MAX_MESSAGE_LENGTH = 4096
MEDIA_GROUP_WAIT_SECONDS = 0.8
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.TELEGRAM)
self._app: Optional[Application] = None
self._bot: Optional[Bot] = None
self._media_group_events: Dict[str, MessageEvent] = {}
self._media_group_tasks: Dict[str, asyncio.Task] = {}
self._token_lock_identity: Optional[str] = None
self._polling_error_task: Optional[asyncio.Task] = None
@ -261,7 +264,15 @@ class TelegramAdapter(BasePlatformAdapter):
return False
async def disconnect(self) -> None:
"""Stop polling and disconnect."""
"""Stop polling, cancel pending album flushes, and disconnect."""
pending_media_group_tasks = list(self._media_group_tasks.values())
for task in pending_media_group_tasks:
task.cancel()
if pending_media_group_tasks:
await asyncio.gather(*pending_media_group_tasks, return_exceptions=True)
self._media_group_tasks.clear()
self._media_group_events.clear()
if self._app:
try:
await self._app.updater.stop()
@ -943,8 +954,53 @@ class TelegramAdapter(BasePlatformAdapter):
except Exception as e:
logger.warning("[Telegram] Failed to cache document: %s", e, exc_info=True)
media_group_id = getattr(msg, "media_group_id", None)
if media_group_id:
await self._queue_media_group_event(str(media_group_id), event)
return
await self.handle_message(event)
async def _queue_media_group_event(self, media_group_id: str, event: MessageEvent) -> None:
"""Buffer Telegram media-group items so albums arrive as one logical event.
Telegram delivers albums as multiple updates with a shared media_group_id.
If we forward each item immediately, the gateway thinks the second image is a
new user message and interrupts the first. We debounce briefly and merge the
attachments into a single MessageEvent.
"""
existing = self._media_group_events.get(media_group_id)
if existing is None:
self._media_group_events[media_group_id] = event
else:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
if event.text:
if existing.text:
if event.text not in existing.text.split("\n\n"):
existing.text = f"{existing.text}\n\n{event.text}"
else:
existing.text = event.text
prior_task = self._media_group_tasks.get(media_group_id)
if prior_task:
prior_task.cancel()
self._media_group_tasks[media_group_id] = asyncio.create_task(
self._flush_media_group_event(media_group_id)
)
async def _flush_media_group_event(self, media_group_id: str) -> None:
try:
await asyncio.sleep(self.MEDIA_GROUP_WAIT_SECONDS)
event = self._media_group_events.pop(media_group_id, None)
if event is not None:
await self.handle_message(event)
except asyncio.CancelledError:
return
finally:
self._media_group_tasks.pop(media_group_id, None)
async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
"""
Describe a Telegram sticker via vision analysis, with caching.

View file

@ -81,20 +81,21 @@ def _make_document(
return doc
def _make_message(document=None, caption=None):
"""Build a mock Telegram Message with the given document."""
def _make_message(document=None, caption=None, media_group_id=None, photo=None):
"""Build a mock Telegram Message with the given document/photo."""
msg = MagicMock()
msg.message_id = 42
msg.text = caption or ""
msg.caption = caption
msg.date = None
# Media flags — all None except document
msg.photo = None
# Media flags — all None except explicit payload
msg.photo = photo
msg.video = None
msg.audio = None
msg.voice = None
msg.sticker = None
msg.document = document
msg.media_group_id = media_group_id
# Chat / user
msg.chat = MagicMock()
msg.chat.id = 100
@ -165,6 +166,12 @@ class TestDocumentTypeDetection:
# TestDocumentDownloadBlock
# ---------------------------------------------------------------------------
def _make_photo(file_obj=None):
photo = MagicMock()
photo.get_file = AsyncMock(return_value=file_obj or _make_file_obj(b"photo-bytes"))
return photo
class TestDocumentDownloadBlock:
@pytest.mark.asyncio
async def test_supported_pdf_is_cached(self, adapter):
@ -339,6 +346,50 @@ class TestDocumentDownloadBlock:
adapter.handle_message.assert_called_once()
# ---------------------------------------------------------------------------
# TestMediaGroups — media group (album) buffering
# ---------------------------------------------------------------------------
class TestMediaGroups:
@pytest.mark.asyncio
async def test_photo_album_is_buffered_and_combined(self, adapter):
first_photo = _make_photo(_make_file_obj(b"first"))
second_photo = _make_photo(_make_file_obj(b"second"))
msg1 = _make_message(caption="two images", media_group_id="album-1", photo=[first_photo])
msg2 = _make_message(media_group_id="album-1", photo=[second_photo])
with patch("gateway.platforms.telegram.cache_image_from_bytes", side_effect=["/tmp/one.jpg", "/tmp/two.jpg"]):
await adapter._handle_media_message(_make_update(msg1), MagicMock())
await adapter._handle_media_message(_make_update(msg2), MagicMock())
assert adapter.handle_message.await_count == 0
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.call_args[0][0]
assert event.text == "two images"
assert event.media_urls == ["/tmp/one.jpg", "/tmp/two.jpg"]
assert len(event.media_types) == 2
@pytest.mark.asyncio
async def test_disconnect_cancels_pending_media_group_flush(self, adapter):
first_photo = _make_photo(_make_file_obj(b"first"))
msg = _make_message(caption="two images", media_group_id="album-2", photo=[first_photo])
with patch("gateway.platforms.telegram.cache_image_from_bytes", return_value="/tmp/one.jpg"):
await adapter._handle_media_message(_make_update(msg), MagicMock())
assert "album-2" in adapter._media_group_events
assert "album-2" in adapter._media_group_tasks
await adapter.disconnect()
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
assert adapter._media_group_events == {}
assert adapter._media_group_tasks == {}
adapter.handle_message.assert_not_awaited()
# ---------------------------------------------------------------------------
# TestSendDocument — outbound file attachment delivery
# ---------------------------------------------------------------------------