From 5e5ad634a1df2b6ac417e5d2f1f8e4c5798cb987 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 22 Mar 2026 09:27:25 -0700 Subject: [PATCH] fix(matrix): duplicate messages, image caching for vision support (#2520) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes for the Matrix adapter: 1. Remove RoomMessageMedia callback registration — RoomMessageImage inherits from it, causing images to be processed twice. 2. Add event ID deduplication to both text and media handlers. nio can fire the same event more than once; bounded deque+set tracks the last 1000 events. 3. Cache images locally via Matrix client download. MXC URLs require authentication, so the vision pipeline couldn't access them. Images are now downloaded via the authenticated client and saved to the local cache (same pattern as Telegram/Discord). Cherry-picked from PR #2353 by williamtwomey. Co-authored-by: williamtwomey --- gateway/platforms/matrix.py | 52 ++++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py index 77a2f240..dbdd8702 100644 --- a/gateway/platforms/matrix.py +++ b/gateway/platforms/matrix.py @@ -103,6 +103,23 @@ class MatrixAdapter(BasePlatformAdapter): self._dm_rooms: Dict[str, bool] = {} # Set of room IDs we've joined self._joined_rooms: Set[str] = set() + # Event deduplication (bounded deque keeps newest entries) + from collections import deque + self._processed_events: deque = deque(maxlen=1000) + self._processed_events_set: set = set() + + def _is_duplicate_event(self, event_id) -> bool: + """Return True if this event was already processed. Tracks the ID otherwise.""" + if not event_id: + return False + if event_id in self._processed_events_set: + return True + if len(self._processed_events) == self._processed_events.maxlen: + evicted = self._processed_events[0] + self._processed_events_set.discard(evicted) + self._processed_events.append(event_id) + self._processed_events_set.add(event_id) + return False # ------------------------------------------------------------------ # Required overrides @@ -188,7 +205,6 @@ class MatrixAdapter(BasePlatformAdapter): # Register event callbacks. client.add_event_callback(self._on_room_message, nio.RoomMessageText) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageMedia) client.add_event_callback(self._on_room_message_media, nio.RoomMessageImage) client.add_event_callback(self._on_room_message_media, nio.RoomMessageAudio) client.add_event_callback(self._on_room_message_media, nio.RoomMessageVideo) @@ -559,6 +575,10 @@ class MatrixAdapter(BasePlatformAdapter): if event.sender == self._user_id: return + # Deduplicate by event ID (nio can fire the same event more than once). + if self._is_duplicate_event(getattr(event, "event_id", None)): + return + # Startup grace: ignore old messages from initial sync. event_ts = getattr(event, "server_timestamp", 0) / 1000.0 if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: @@ -648,6 +668,10 @@ class MatrixAdapter(BasePlatformAdapter): if event.sender == self._user_id: return + # Deduplicate by event ID. + if self._is_duplicate_event(getattr(event, "event_id", None)): + return + # Startup grace. event_ts = getattr(event, "server_timestamp", 0) / 1000.0 if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: @@ -681,6 +705,24 @@ class MatrixAdapter(BasePlatformAdapter): elif event_mimetype: media_type = event_mimetype + # For images, download and cache locally so vision tools can access them. + # Matrix MXC URLs require authentication, so direct URL access fails. + cached_path = None + if msg_type == MessageType.PHOTO and url: + try: + ext_map = { + "image/jpeg": ".jpg", "image/png": ".png", + "image/gif": ".gif", "image/webp": ".webp", + } + ext = ext_map.get(event_mimetype, ".jpg") + download_resp = await self._client.download(url) + if isinstance(download_resp, nio.DownloadResponse): + from gateway.platforms.base import cache_image_from_bytes + cached_path = cache_image_from_bytes(download_resp.body, ext=ext) + logger.info("[Matrix] Cached user image at %s", cached_path) + except Exception as e: + logger.warning("[Matrix] Failed to cache image: %s", e) + is_dm = self._dm_rooms.get(room.room_id, False) if not is_dm and room.member_count == 2: is_dm = True @@ -701,14 +743,18 @@ class MatrixAdapter(BasePlatformAdapter): thread_id=thread_id, ) + # Use cached local path for images, HTTP URL for other media types + media_urls = [cached_path] if cached_path else ([http_url] if http_url else None) + media_types = [media_type] if media_urls else None + msg_event = MessageEvent( text=body, message_type=msg_type, source=source, raw_message=getattr(event, "source", {}), message_id=event.event_id, - media_urls=[http_url] if http_url else None, - media_types=[media_type] if http_url else None, + media_urls=media_urls, + media_types=media_types, ) await self.handle_message(msg_event)