diff --git a/gateway/platforms/matrix.py b/gateway/platforms/matrix.py index 77a2f240..dbdd8702 100644 --- a/gateway/platforms/matrix.py +++ b/gateway/platforms/matrix.py @@ -103,6 +103,23 @@ class MatrixAdapter(BasePlatformAdapter): self._dm_rooms: Dict[str, bool] = {} # Set of room IDs we've joined self._joined_rooms: Set[str] = set() + # Event deduplication (bounded deque keeps newest entries) + from collections import deque + self._processed_events: deque = deque(maxlen=1000) + self._processed_events_set: set = set() + + def _is_duplicate_event(self, event_id) -> bool: + """Return True if this event was already processed. Tracks the ID otherwise.""" + if not event_id: + return False + if event_id in self._processed_events_set: + return True + if len(self._processed_events) == self._processed_events.maxlen: + evicted = self._processed_events[0] + self._processed_events_set.discard(evicted) + self._processed_events.append(event_id) + self._processed_events_set.add(event_id) + return False # ------------------------------------------------------------------ # Required overrides @@ -188,7 +205,6 @@ class MatrixAdapter(BasePlatformAdapter): # Register event callbacks. client.add_event_callback(self._on_room_message, nio.RoomMessageText) - client.add_event_callback(self._on_room_message_media, nio.RoomMessageMedia) client.add_event_callback(self._on_room_message_media, nio.RoomMessageImage) client.add_event_callback(self._on_room_message_media, nio.RoomMessageAudio) client.add_event_callback(self._on_room_message_media, nio.RoomMessageVideo) @@ -559,6 +575,10 @@ class MatrixAdapter(BasePlatformAdapter): if event.sender == self._user_id: return + # Deduplicate by event ID (nio can fire the same event more than once). + if self._is_duplicate_event(getattr(event, "event_id", None)): + return + # Startup grace: ignore old messages from initial sync. event_ts = getattr(event, "server_timestamp", 0) / 1000.0 if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: @@ -648,6 +668,10 @@ class MatrixAdapter(BasePlatformAdapter): if event.sender == self._user_id: return + # Deduplicate by event ID. + if self._is_duplicate_event(getattr(event, "event_id", None)): + return + # Startup grace. event_ts = getattr(event, "server_timestamp", 0) / 1000.0 if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS: @@ -681,6 +705,24 @@ class MatrixAdapter(BasePlatformAdapter): elif event_mimetype: media_type = event_mimetype + # For images, download and cache locally so vision tools can access them. + # Matrix MXC URLs require authentication, so direct URL access fails. + cached_path = None + if msg_type == MessageType.PHOTO and url: + try: + ext_map = { + "image/jpeg": ".jpg", "image/png": ".png", + "image/gif": ".gif", "image/webp": ".webp", + } + ext = ext_map.get(event_mimetype, ".jpg") + download_resp = await self._client.download(url) + if isinstance(download_resp, nio.DownloadResponse): + from gateway.platforms.base import cache_image_from_bytes + cached_path = cache_image_from_bytes(download_resp.body, ext=ext) + logger.info("[Matrix] Cached user image at %s", cached_path) + except Exception as e: + logger.warning("[Matrix] Failed to cache image: %s", e) + is_dm = self._dm_rooms.get(room.room_id, False) if not is_dm and room.member_count == 2: is_dm = True @@ -701,14 +743,18 @@ class MatrixAdapter(BasePlatformAdapter): thread_id=thread_id, ) + # Use cached local path for images, HTTP URL for other media types + media_urls = [cached_path] if cached_path else ([http_url] if http_url else None) + media_types = [media_type] if media_urls else None + msg_event = MessageEvent( text=body, message_type=msg_type, source=source, raw_message=getattr(event, "source", {}), message_id=event.event_id, - media_urls=[http_url] if http_url else None, - media_types=[media_type] if http_url else None, + media_urls=media_urls, + media_types=media_types, ) await self.handle_message(msg_event)