diff --git a/.env.example b/.env.example index 3af498d..c7edcbc 100644 --- a/.env.example +++ b/.env.example @@ -5,16 +5,13 @@ TELEGRAM_BOT_TOKEN=your_bot_token_here MATRIX_HOMESERVER=https://matrix.org MATRIX_USER_ID=@bot:matrix.org MATRIX_PASSWORD=your_password_here + +# Lambda Platform +LAMBDA_PLATFORM_URL=http://localhost:8000 +LAMBDA_SERVICE_TOKEN=your_service_token_here +AGENT_WS_URL=ws://127.0.0.1:8000/agent_ws/ +AGENT_BASE_URL=http://127.0.0.1:8000 MATRIX_PLATFORM_BACKEND=real -# Shared workspace contract -SURFACES_WORKSPACE_DIR=/workspace - -# Compose-local platform-agent route -AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/{chat_id}/ -AGENT_BASE_URL=http://platform-agent:8000 - -# platform-agent provider -PROVIDER_MODEL=openai/gpt-4o-mini -PROVIDER_URL=https://openrouter.ai/api/v1 -PROVIDER_API_KEY=sk-or-... +# Режим работы: "mock" или "production" +PLATFORM_MODE=mock diff --git a/README.md b/README.md index 8d95c6b..82cd55c 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ | Поверхность | Статус | |---|---| | Telegram | 🔨 В разработке, отдельный worktree `feat/telegram-adapter` | -| Matrix | ✅ Рабочий прототип, запускается через root `docker compose` вместе с `platform-agent` | +| Matrix | ✅ Рабочий прототип, подключается к реальному агенту | --- @@ -69,8 +69,8 @@ surfaces-bot/ - **Диалог** — сообщения, вложения, подтверждения `!yes` / `!no` и routing через `EventDispatcher` - **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта - **Текущее ограничение** — encrypted DM пока не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота -- **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` использует `platform-agent` из compose и WebSocket contract `/v1/agent_ws/{chat_id}/` -- **Ограничения real backend** — локальный runtime использует shared `/workspace`, а файлы передаются как относительные пути в `attachments` +- **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` требует `AGENT_WS_URL=ws://host:port/agent_ws/` +- **Ограничения real backend** — пока это текстовый direct-agent прототип без вложений и без асинхронных callbacks; локальные настройки и user-state хранятся в `PrototypeStateStore` --- @@ -90,7 +90,6 @@ class PlatformClient(Protocol): Бот передаёт `user_id` + `chat_id` + сообщение; платформа сама решает нужно ли поднять контейнер. Сейчас: `MockPlatformClient` в `sdk/mock.py`, а Matrix real backend собирается через `sdk/real.py` при `MATRIX_PLATFORM_BACKEND=real`. -Файловый контракт уже path-based: бот пишет файлы в shared `/workspace` и передаёт платформе относительные пути в `attachments`. Когда SDK готов: добавляем `SdkPlatformClient`, меняем одну строку в DI. Адаптеры и ядро не трогаем. --- @@ -121,40 +120,32 @@ MATRIX_PASSWORD=... # или MATRIX_ACCESS_TOKEN=... # Выбор backend: mock (по умолчанию) или real (подключение к platform-agent) MATRIX_PLATFORM_BACKEND=real -# compose runtime: platform-agent service name + shared /workspace -AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/ -AGENT_BASE_URL=http://platform-agent:8000 -SURFACES_WORKSPACE_DIR=/workspace +# URL WebSocket endpoint platform-agent (только при MATRIX_PLATFORM_BACKEND=real) +AGENT_WS_URL=ws://127.0.0.1:8000/agent_ws/ +AGENT_BASE_URL=http://127.0.0.1:8000 ``` -### 3. Compose runtime +### 3. Запуск platform-agent (для real backend) -Root `docker-compose.yml` теперь является основным локальным runtime для Matrix и platform-agent. -Он поднимает `matrix-bot`, `platform-agent` и общий volume `/workspace`. +platform-agent — отдельный репозиторий, сейчас клонируется в `external/platform-agent`. ```bash -docker compose up --build +cd external/platform-agent + +# Создать .env с параметрами LLM провайдера +cat > .env <` — удалить вложение по номеру -- `!remove all` — очистить все staged вложения - -Следующее обычное сообщение пользователя уходит агенту вместе со всеми staged файлами. - -### 4. Запуск бота вручную +### 4. Запуск бота ```bash # Первый запуск или сброс состояния @@ -193,7 +184,6 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | Состояние контекста | `!context` | Текущая сессия и список сохранений | | Справка | `!help` | | | Подтверждения | `!yes` / `!no` | Для опасных действий | -| Staged вложения | `!list`, `!remove `, `!remove all` | Файлы без текстовой инструкции ставятся в очередь до следующего сообщения | ### Не работает — блокеры на стороне platform-agent @@ -202,6 +192,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | `!load` в другом чате | platform-agent использует `StateBackend` — файлы живут в памяти отдельно для каждого `thread_id`. Файл, сохранённый в чате A, не виден в чате B. Фикс: переключить platform-agent на `FilesystemBackend` с общим хранилищем. | | Счётчик токенов в `!context` | platform-agent отдаёт `tokens_used=0` хардкодом в `MsgEventEnd`. Наш код перехватывает значение корректно. | | `!reset` | platform-agent не имеет endpoint `/reset`. Задокументировано в ТЗ к платформе. | +| Файловые вложения | Нет API загрузки файлов в область видимости агента. ТЗ передано платформе. | | Персистентность между рестартами | platform-agent использует `MemorySaver` (in-memory). Все разговоры теряются при рестарте процесса. | | E2EE комнаты | `python-olm` не собирается на macOS/ARM. Ограничение инфраструктуры. | @@ -210,7 +201,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | Функция | Статус | |---|---| | `!settings`, `!skills`, `!soul`, `!safety` | Заглушки MVP. Требуют готового SDK платформы. | -| Вложения без текстовой инструкции | Поддержан staged UX только для Matrix. Для других поверхностей ещё не перенесено. | +| Вложения (изображения, документы) | Только текстовые сообщения в текущем MVP. | --- diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index cf8a74f..44d7c95 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -6,54 +6,37 @@ from dataclasses import dataclass from pathlib import Path import structlog -from dotenv import load_dotenv from nio import ( AsyncClient, AsyncClientConfig, InviteMemberEvent, MatrixRoom, RoomMemberEvent, - RoomMessage, - RoomMessageAudio, - RoomMessageFile, - RoomMessageImage, RoomMessageText, - RoomMessageVideo, ) from nio.responses import SyncResponse +from dotenv import load_dotenv from adapter.matrix.converter import from_room_event -from adapter.matrix.files import ( - download_matrix_attachment, - matrix_msgtype_for_attachment, - resolve_workspace_attachment_path, -) from adapter.matrix.handlers import register_matrix_handlers -from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.handlers.context_commands import ( LOAD_PROMPT, ) +from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.room_router import resolve_chat_id from adapter.matrix.store import ( - add_staged_attachment, clear_load_pending, - clear_staged_attachments, get_load_pending, get_room_meta, - get_staged_attachments, - remove_staged_attachment_at, - set_pending_confirm, set_platform_chat_id, set_room_meta, + set_pending_confirm, ) from core.auth import AuthManager from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import ( - Attachment, - IncomingCommand, - IncomingMessage, OutgoingEvent, OutgoingMessage, OutgoingNotification, @@ -214,38 +197,6 @@ class MatrixBot: incoming = from_room_event(event, room_id=room.room_id, chat_id=dispatch_chat_id) if incoming is None: return - if isinstance(incoming, IncomingCommand) and incoming.command in { - "matrix_list_attachments", - "matrix_remove_attachment", - }: - outgoing = await self._handle_staged_attachment_command( - room.room_id, - sender, - incoming, - ) - await self._send_all(room.room_id, outgoing) - return - if self._is_file_only_event(event, incoming): - materialized = await self._materialize_incoming_attachments( - room.room_id, - sender, - incoming, - ) - await self._stage_attachments(room.room_id, sender, materialized.attachments) - return - if isinstance(incoming, IncomingMessage) and incoming.attachments: - incoming = await self._materialize_incoming_attachments( - room.room_id, - sender, - incoming, - ) - clear_staged_after_dispatch = False - if isinstance(incoming, IncomingMessage) and incoming.text: - incoming, clear_staged_after_dispatch = await self._merge_staged_attachments( - room.room_id, - sender, - incoming, - ) try: outgoing = await self.runtime.dispatcher.dispatch(incoming) except PlatformError as exc: @@ -259,159 +210,11 @@ class MatrixBot: outgoing = [ OutgoingMessage( chat_id=dispatch_chat_id, - text="Сервис временно недоступен. Попробуйте ещё раз позже.", + text="Сервис временно недоступен. Попробуйте ещё раз позже." ) ] - else: - if clear_staged_after_dispatch: - await clear_staged_attachments(self.runtime.store, room.room_id, sender) await self._send_all(room.room_id, outgoing) - def _is_file_only_event( - self, event: RoomMessage, incoming: IncomingMessage | IncomingCommand - ) -> bool: - return ( - isinstance(incoming, IncomingMessage) - and bool(incoming.attachments) - and not isinstance(event, RoomMessageText) - ) - - async def _stage_attachments( - self, - room_id: str, - user_id: str, - attachments: list, - ) -> None: - for attachment in attachments: - await add_staged_attachment( - self.runtime.store, - room_id, - user_id, - { - "type": attachment.type, - "url": attachment.url, - "filename": attachment.filename, - "mime_type": attachment.mime_type, - "workspace_path": attachment.workspace_path, - }, - ) - - async def _format_staged_attachments( - self, - room_id: str, - user_id: str, - *, - include_hint: bool = False, - ) -> str: - attachments = await get_staged_attachments(self.runtime.store, room_id, user_id) - if not attachments: - return "Нет сохраненных вложений." - - lines = ["Вложения в очереди:"] - for index, attachment in enumerate(attachments, start=1): - lines.append(f"{index}. {attachment.get('filename') or 'attachment'}") - if include_hint: - lines.extend( - [ - "", - "Следующее сообщение отправит файлы агенту.", - "Команды: !list, !remove , !remove all", - ] - ) - return "\n".join(lines) - - async def _handle_staged_attachment_command( - self, - room_id: str, - user_id: str, - incoming: IncomingCommand, - ) -> list[OutgoingEvent]: - if incoming.command == "matrix_list_attachments": - return [ - OutgoingMessage( - chat_id=incoming.chat_id, - text=await self._format_staged_attachments(room_id, user_id), - ) - ] - - arg = incoming.args[0] if incoming.args else "" - if arg == "all": - await clear_staged_attachments(self.runtime.store, room_id, user_id) - return [OutgoingMessage(chat_id=incoming.chat_id, text="Все вложения удалены.")] - - try: - index = int(arg) - 1 - except ValueError: - return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")] - - removed = await remove_staged_attachment_at(self.runtime.store, room_id, user_id, index) - if removed is None: - return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")] - return [ - OutgoingMessage( - chat_id=incoming.chat_id, - text=await self._format_staged_attachments(room_id, user_id), - ) - ] - - async def _merge_staged_attachments( - self, - room_id: str, - user_id: str, - incoming: IncomingMessage, - ) -> tuple[IncomingMessage, bool]: - staged = await get_staged_attachments(self.runtime.store, room_id, user_id) - if not staged: - return incoming, False - attachments = [ - Attachment( - type=item.get("type", "document"), - url=item.get("url"), - filename=item.get("filename"), - mime_type=item.get("mime_type"), - workspace_path=item.get("workspace_path"), - ) - for item in staged - ] - return ( - IncomingMessage( - user_id=incoming.user_id, - platform=incoming.platform, - chat_id=incoming.chat_id, - text=incoming.text, - attachments=attachments, - reply_to=incoming.reply_to, - ), - True, - ) - - async def _materialize_incoming_attachments( - self, - room_id: str, - matrix_user_id: str, - incoming: IncomingMessage, - ) -> IncomingMessage: - workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")) - materialized = [] - for attachment in incoming.attachments: - materialized.append( - await download_matrix_attachment( - client=self.client, - workspace_root=workspace_root, - matrix_user_id=matrix_user_id, - room_id=room_id, - attachment=attachment, - ) - ) - return IncomingMessage( - user_id=incoming.user_id, - platform=incoming.platform, - chat_id=incoming.chat_id, - text=incoming.text, - attachments=materialized, - reply_to=incoming.reply_to, - ) - async def _bootstrap_unregistered_room( self, room: MatrixRoom, @@ -448,6 +251,11 @@ class MatrixBot: f"Привет, {created['user'].display_name or sender}! Пиши — я здесь.\n\n" "Команды: !new · !chats · !rename · !archive · !context · !save · !load · !help" ) + await self.client.room_send( + created["chat_room_id"], + "m.room.message", + {"msgtype": "m.text", "body": welcome}, + ) await set_room_meta( self.runtime.store, room.room_id, @@ -457,18 +265,12 @@ class MatrixBot: "redirect_chat_id": created["chat_id"], }, ) - await self.client.room_send( - created["chat_room_id"], - "m.room.message", - {"msgtype": "m.text", "body": welcome}, - ) return [ OutgoingMessage( chat_id=room.room_id, text=( f"Создал рабочий чат {created['room_name']} ({created['chat_id']}) " - "и добавил его в пространство Lambda. " - "Открой приглашённую комнату для продолжения." + "и добавил его в пространство Lambda. Открой приглашённую комнату для продолжения." ), ) ] @@ -521,9 +323,7 @@ class MatrixBot: except Exception as exc: logger.warning("load_agent_call_failed", error=str(exc)) return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")] - return [ - OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}") - ] + return [OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")] async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None: if getattr(event, "sender", None) == self.client.user_id: @@ -551,7 +351,6 @@ async def prepare_live_sync(client: AsyncClient) -> str | None: return response.next_batch return None - async def send_outgoing( client: AsyncClient, room_id: str, @@ -566,37 +365,7 @@ async def send_outgoing( await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body}) return if isinstance(event, OutgoingMessage): - if event.text: - await client.room_send( - room_id, "m.room.message", {"msgtype": "m.text", "body": event.text} - ) - if event.attachments: - workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")) - for attachment in event.attachments: - if not attachment.workspace_path: - continue - file_path = resolve_workspace_attachment_path( - workspace_root, attachment.workspace_path - ) - with file_path.open("rb") as handle: - upload_response, _ = await client.upload( - handle, - content_type=attachment.mime_type or "application/octet-stream", - filename=attachment.filename or file_path.name, - filesize=file_path.stat().st_size, - ) - content_uri = getattr(upload_response, "content_uri", None) - if not content_uri: - raise RuntimeError(f"Matrix upload failed for {file_path}") - await client.room_send( - room_id, - "m.room.message", - { - "msgtype": matrix_msgtype_for_attachment(attachment), - "body": attachment.filename or file_path.name, - "url": content_uri, - }, - ) + await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": event.text}) return if isinstance(event, OutgoingUI): lines = [event.text] @@ -661,16 +430,7 @@ async def main() -> None: since_token = await prepare_live_sync(client) bot = MatrixBot(client, runtime) - client.add_event_callback( - bot.on_room_message, - ( - RoomMessageText, - RoomMessageFile, - RoomMessageImage, - RoomMessageVideo, - RoomMessageAudio, - ), - ) + client.add_event_callback(bot.on_room_message, RoomMessageText) client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent)) logger.info( diff --git a/adapter/matrix/converter.py b/adapter/matrix/converter.py index a19d8ea..00fcdc4 100644 --- a/adapter/matrix/converter.py +++ b/adapter/matrix/converter.py @@ -14,53 +14,42 @@ PLATFORM = "matrix" def extract_attachments(event: Any) -> list[Attachment]: - source = getattr(event, "source", {}) or {} - content = source.get("content", {}) or getattr(event, "content", {}) or {} msgtype = getattr(event, "msgtype", None) if msgtype is None: + content = getattr(event, "content", {}) or {} msgtype = content.get("msgtype") - url = content.get("url") or getattr(event, "url", None) - filename = content.get("body") or getattr(event, "body", None) - mime_type = content.get("mimetype") or getattr(event, "mimetype", None) - if mime_type is None: - info = content.get("info") or {} - if isinstance(info, dict): - mime_type = info.get("mimetype") if msgtype == "m.image": return [ Attachment( type="image", - url=url, - filename=filename, - mime_type=mime_type, + url=getattr(event, "url", None), + mime_type=getattr(event, "mimetype", None), ) ] if msgtype == "m.file": return [ Attachment( type="document", - url=url, - filename=filename, - mime_type=mime_type, + url=getattr(event, "url", None), + filename=getattr(event, "body", None), + mime_type=getattr(event, "mimetype", None), ) ] if msgtype == "m.audio": return [ Attachment( type="audio", - url=url, - filename=filename, - mime_type=mime_type, + url=getattr(event, "url", None), + mime_type=getattr(event, "mimetype", None), ) ] if msgtype == "m.video": return [ Attachment( type="video", - url=url, - filename=filename, - mime_type=mime_type, + url=getattr(event, "url", None), + mime_type=getattr(event, "mimetype", None), ) ] return [] @@ -86,24 +75,6 @@ def from_command(body: str, sender: str, chat_id: str, room_id: str | None = Non }, ) - if command == "list" and not args: - return IncomingCommand( - user_id=sender, - platform=PLATFORM, - chat_id=chat_id, - command="matrix_list_attachments", - args=[], - ) - - if command == "remove" and len(args) == 1: - return IncomingCommand( - user_id=sender, - platform=PLATFORM, - chat_id=chat_id, - command="matrix_remove_attachment", - args=[args[0]], - ) - aliases = { "skills": "settings_skills", "connectors": "settings_connectors", diff --git a/adapter/matrix/files.py b/adapter/matrix/files.py deleted file mode 100644 index 52d1a1c..0000000 --- a/adapter/matrix/files.py +++ /dev/null @@ -1,103 +0,0 @@ -from __future__ import annotations - -import mimetypes -import re -from datetime import UTC, datetime -from pathlib import Path - -from core.protocol import Attachment - - -def _sanitize_component(value: str) -> str: - cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value) - cleaned = cleaned.strip("._-") - return cleaned or "unknown" - - -def _default_filename(attachment: Attachment) -> str: - if attachment.filename: - return attachment.filename - - extension = mimetypes.guess_extension(attachment.mime_type or "") or "" - base = { - "image": "image", - "audio": "audio", - "video": "video", - "document": "attachment", - }.get(attachment.type, "attachment") - return f"{base}{extension}" - - -def build_workspace_attachment_path( - *, - workspace_root: Path, - matrix_user_id: str, - room_id: str, - filename: str, - timestamp: str | None = None, -) -> tuple[str, Path]: - stamp = timestamp or datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - safe_user = _sanitize_component(matrix_user_id.lstrip("@")) - safe_room = _sanitize_component(room_id.lstrip("!")) - safe_name = _sanitize_component(filename) or "attachment.bin" - relative_path = ( - Path("surfaces") - / "matrix" - / safe_user - / safe_room - / "inbox" - / f"{stamp}-{safe_name}" - ) - return relative_path.as_posix(), workspace_root / relative_path - - -async def download_matrix_attachment( - *, - client, - workspace_root: Path, - matrix_user_id: str, - room_id: str, - attachment: Attachment, - timestamp: str | None = None, -) -> Attachment: - if not attachment.url: - return attachment - - filename = _default_filename(attachment) - relative_path, absolute_path = build_workspace_attachment_path( - workspace_root=workspace_root, - matrix_user_id=matrix_user_id, - room_id=room_id, - filename=filename, - timestamp=timestamp, - ) - absolute_path.parent.mkdir(parents=True, exist_ok=True) - - response = await client.download(attachment.url) - body = getattr(response, "body", None) - if body is None: - raise RuntimeError(f"Matrix download response for {attachment.url} has no body") - absolute_path.write_bytes(body) - - return Attachment( - type=attachment.type, - url=attachment.url, - filename=filename, - mime_type=attachment.mime_type, - workspace_path=relative_path, - ) - - -def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path: - path = Path(workspace_path) - if path.is_absolute(): - return path - return workspace_root / path - - -def matrix_msgtype_for_attachment(attachment: Attachment) -> str: - return { - "image": "m.image", - "audio": "m.audio", - "video": "m.video", - }.get(attachment.type, "m.file") diff --git a/adapter/matrix/store.py b/adapter/matrix/store.py index acafa9f..5ebb61a 100644 --- a/adapter/matrix/store.py +++ b/adapter/matrix/store.py @@ -1,8 +1,5 @@ from __future__ import annotations -import asyncio -from weakref import WeakValueDictionary - from core.store import StateStore ROOM_META_PREFIX = "matrix_room:" @@ -12,8 +9,6 @@ SKILLS_MSG_PREFIX = "matrix_skills_msg:" PENDING_CONFIRM_PREFIX = "matrix_pending_confirm:" LOAD_PENDING_PREFIX = "matrix_load_pending:" RESET_PENDING_PREFIX = "matrix_reset_pending:" -STAGED_ATTACHMENTS_PREFIX = "matrix_staged_attachments:" -_STAGED_ATTACHMENTS_LOCKS: WeakValueDictionary[str, asyncio.Lock] = WeakValueDictionary() async def get_room_meta(store: StateStore, room_id: str) -> dict | None: @@ -131,66 +126,3 @@ async def set_reset_pending( async def clear_reset_pending(store: StateStore, user_id: str, room_id: str) -> None: await store.delete(_reset_pending_key(user_id, room_id)) - - -def _staged_attachments_key(room_id: str, user_id: str) -> str: - return f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}" - - -def _staged_attachments_lock(room_id: str, user_id: str) -> asyncio.Lock: - key = _staged_attachments_key(room_id, user_id) - lock = _STAGED_ATTACHMENTS_LOCKS.get(key) - if lock is None: - lock = asyncio.Lock() - _STAGED_ATTACHMENTS_LOCKS[key] = lock - return lock - - -async def get_staged_attachments( - store: StateStore, room_id: str, user_id: str -) -> list[dict]: - data = await store.get(_staged_attachments_key(room_id, user_id)) - if not isinstance(data, dict): - return [] - - attachments = data.get("attachments") - if not isinstance(attachments, list): - return [] - - return [attachment for attachment in attachments if isinstance(attachment, dict)] - - -async def add_staged_attachment( - store: StateStore, room_id: str, user_id: str, attachment: dict -) -> None: - async with _staged_attachments_lock(room_id, user_id): - attachments = await get_staged_attachments(store, room_id, user_id) - attachments.append(attachment) - await store.set( - _staged_attachments_key(room_id, user_id), {"attachments": attachments} - ) - - -async def remove_staged_attachment_at( - store: StateStore, room_id: str, user_id: str, index: int -) -> dict | None: - async with _staged_attachments_lock(room_id, user_id): - attachments = await get_staged_attachments(store, room_id, user_id) - if index < 0 or index >= len(attachments): - return None - - removed = attachments.pop(index) - if attachments: - await store.set( - _staged_attachments_key(room_id, user_id), {"attachments": attachments} - ) - else: - await store.delete(_staged_attachments_key(room_id, user_id)) - return removed - - -async def clear_staged_attachments( - store: StateStore, room_id: str, user_id: str -) -> None: - async with _staged_attachments_lock(room_id, user_id): - await store.delete(_staged_attachments_key(room_id, user_id)) diff --git a/core/handlers/message.py b/core/handlers/message.py index d9f91cd..2edb87e 100644 --- a/core/handlers/message.py +++ b/core/handlers/message.py @@ -29,15 +29,10 @@ async def handle_message(event: IncomingMessage, auth_mgr, platform, chat_mgr, s user_id=event.user_id, chat_id=event.chat_id, text=event.text, - attachments=event.attachments, + attachments=[], ) return [ OutgoingTyping(chat_id=event.chat_id, is_typing=False), - OutgoingMessage( - chat_id=event.chat_id, - text=response.response, - parse_mode="markdown", - attachments=list(getattr(response, "attachments", [])), - ), + OutgoingMessage(chat_id=event.chat_id, text=response.response, parse_mode="markdown"), ] diff --git a/core/protocol.py b/core/protocol.py index 7d6e25f..02a9f4a 100644 --- a/core/protocol.py +++ b/core/protocol.py @@ -12,7 +12,6 @@ class Attachment: content: bytes | None = None filename: str | None = None mime_type: str | None = None - workspace_path: str | None = None @dataclass diff --git a/docker-compose.yml b/docker-compose.yml index d6c2e4d..480ecad 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,39 +1,5 @@ services: - platform-agent: - build: - context: ./external/platform-agent - target: development - additional_contexts: - agent_api: ./external/platform-agent_api - env_file: .env - environment: - PYTHONUNBUFFERED: "1" - volumes: - - ./external/platform-agent/src:/app/src - - ./external/platform-agent_api:/agent_api - - workspace:/workspace - command: > - sh -lc " - mkdir -p /workspace && - chown -R agent:agent /workspace && - exec /app/.venv/bin/uvicorn src.main:app --host 0.0.0.0 --port 8000 - " - ports: - - "8000:8000" - restart: unless-stopped - matrix-bot: build: . env_file: .env - environment: - AGENT_BASE_URL: http://platform-agent:8000 - AGENT_WS_URL: ws://platform-agent:8000/v1/agent_ws/ - SURFACES_WORKSPACE_DIR: /workspace - depends_on: - - platform-agent - volumes: - - workspace:/workspace restart: unless-stopped - -volumes: - workspace: diff --git a/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md b/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md deleted file mode 100644 index feca84c..0000000 --- a/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md +++ /dev/null @@ -1,252 +0,0 @@ -# Matrix Shared Workspace File Flow Design - -## Goal - -Bring the Matrix surface and `platform-agent` to a single file-handling model that matches the current platform runtime contract as closely as possible. - -The result should be: - -- Matrix receives user files and makes them visible to the agent through a shared `/workspace` -- `platform-agent` receives attachment paths, not ad hoc summaries or inline payloads -- the agent can send files back to the user through the surface via `send_file` -- local development and the default deployment path use the same storage contract - -## Core Decision - -The selected architecture is: - -`Matrix surface <-> shared /workspace <-> platform-agent` - -This means: - -- the Matrix bot is responsible for downloading incoming Matrix media -- downloaded files are written into the same filesystem mounted into `platform-agent` -- the surface passes relative workspace paths to the agent as `attachments` -- the agent returns files to the user by emitting `MsgEventSendFile(path=...)` - -This is the current platform-native direction and does not require new platform endpoints. - -## Why This Decision - -The current upstream platform changes already define the file contract: - -- `MsgUserMessage.attachments` is `list[str]` -- each attachment is a path relative to `/workspace` -- the agent validates those paths against its configured backend root -- the agent can emit `send_file(path)` back to the client - -That is not an upload API and not a remote blob contract. It is an explicit shared-workspace contract. - -Trying to preserve the current separate-process launch model would force the surface to fake production behavior with inline text extraction, out-of-band path rewriting, or a future upload API that does not exist yet. That would increase the gap between our runtime and the platform runtime instead of reducing it. - -## Scope - -This design covers: - -- shared workspace runtime for Matrix bot and `platform-agent` -- incoming Matrix file handling into shared storage -- attachment path propagation to `RealPlatformClient` and `AgentApi` -- outbound file delivery from agent to Matrix user -- local compose/dev workflow and README updates - -This design does not cover: - -- Telegram file flow -- encrypted Matrix media handling -- upload APIs on the platform side -- OCR, PDF parsing, or content extraction pipelines -- long-term object storage or file lifecycle policies beyond basic cleanup boundaries - -## Runtime Contract - -### Shared filesystem - -Both containers must mount the same directory at `/workspace`. - -Requirements: - -- the Matrix bot can create files under `/workspace` -- `platform-agent` sees the same files at the same relative paths -- agent-originated files written under `/workspace` are readable by the Matrix bot - -The contract is path-based, not URL-based. - -### Attachment path format - -The surface sends attachments to the agent as relative workspace paths, for example: - -- `surfaces/matrix///inbox/20260420-153000-report.pdf` -- `surfaces/matrix///inbox/20260420-153200-photo.jpg` - -Rules: - -- paths must be relative to `/workspace` -- paths must be normalized before sending to the agent -- surface-owned uploads must live under a dedicated namespace to avoid collisions with agent-created files - -## Data Flow - -### Incoming file from Matrix user - -1. Matrix receives `m.file`, `m.image`, `m.audio`, or `m.video`. -2. The Matrix bot resolves the target room and platform chat context as usual. -3. The Matrix bot downloads the media from Matrix. -4. The file is stored under `/workspace/surfaces/matrix/.../inbox/...`. -5. The outgoing platform call includes: - - original user text - - `attachments=[relative_path_1, ...]` -6. `platform-agent` validates that those files exist and exposes them to the agent through the upstream attachment mechanism. - -Important detail: - -- the surface should not rewrite the user message into a synthetic file summary unless the message body is empty -- when body is empty, the surface may send a minimal synthetic text such as `User sent one or more attachments.` - -### Outbound file from agent to Matrix user - -1. The agent uses `send_file(path)`. -2. `platform-agent` emits `MsgEventSendFile(path=...)`. -3. The Matrix integration catches that event. -4. The Matrix bot resolves the file inside shared `/workspace`. -5. The Matrix bot uploads the file to Matrix and sends the appropriate media message to the room. - -Surface behavior: - -- if MIME type and extension are known, send the closest native Matrix media type -- otherwise send as `m.file` -- user-visible failures must be explicit if the referenced file does not exist or cannot be uploaded - -## Filesystem Layout - -The Matrix surface owns a dedicated subtree: - -```text -/workspace/ - surfaces/ - matrix/ - / - / - inbox/ - 20260420-153000-report.pdf -``` - -Design constraints: - -- sanitize user ids and room ids before using them as path components -- preserve the original filename in the final basename where possible -- prefix filenames with a timestamp or unique id to avoid collisions - -This layout is intentionally surface-scoped. The agent may read these files, but the surface remains the owner of how inbound messenger files are organized. - -## Components - -### Matrix attachment storage helper - -Add a focused helper module responsible for: - -- building stable workspace-relative paths -- sanitizing path components -- downloading Matrix media into `/workspace` -- returning attachment metadata needed by the platform layer - -This helper should not know about agent transport details beyond the final relative path output. - -### Real platform client - -`RealPlatformClient` must pass attachment relative paths through to `AgentApi.send_message(...)`. - -It must also surface non-text agent events needed by the Matrix adapter, especially `MsgEventSendFile`. - -### Agent API wrapper - -`AgentApiWrapper` must be compatible with the modern upstream protocol: - -- `/v1/agent_ws/{chat_id}/` -- `attachments` on outgoing user messages -- `MsgEventToolCallChunk` -- `MsgEventToolResult` -- `MsgEventCustomUpdate` -- `MsgEventSendFile` -- `MsgEventEnd` - -### Matrix bot outbound renderer - -The Matrix adapter must support sending files back to the room. - -At minimum it needs: - -- path resolution inside shared workspace -- Matrix upload of the local file -- send of an `m.file` or native media event with filename and MIME type - -## Deployment Changes - -### Compose - -The repository root `docker-compose.yml` becomes the primary prod-like local runtime. - -It should define at least: - -- `matrix-bot` -- `platform-agent` -- one shared volume mounted as `/workspace` into both services - -The default developer workflow should stop describing `platform-agent` as a separately started side process. - -### Environment - -The Matrix bot must connect to the in-compose `platform-agent` service by service name, not by assuming a separately launched localhost process. - -The agent WebSocket configuration in docs and examples must match the modern upstream route. - -## Error Handling - -### Incoming files - -If the Matrix bot cannot download or persist the file: - -- do not send a broken attachment path to the agent -- return a user-visible error in the room -- log the Matrix event id, room id, and failure reason - -### Outbound files - -If the agent asks to send a missing file: - -- log a structured warning with the requested path -- send a user-visible message that the file could not be delivered - -### Shared workspace mismatch - -If the runtime is misconfigured and `/workspace` is not actually shared: - -- inbound attachments will fail agent-side path validation -- outbound `send_file` will fail surface-side file resolution - -The implementation should make such failures obvious in logs rather than silently degrading to text-only behavior. - -## Testing - -The implementation must cover: - -- Matrix media download writes into the expected workspace-relative path -- `RealPlatformClient` forwards attachment relative paths to the agent API -- Matrix plain messages with attachments preserve the original text while adding attachment paths -- empty-body attachment-only messages produce the synthetic text fallback -- `AgentApiWrapper` accepts `MsgEventSendFile` without treating it as unknown -- Matrix outbound file handling converts `MsgEventSendFile` into a Matrix upload/send call -- compose configuration mounts the same workspace into both containers - -## Non-Goals - -- no inline text extraction MVP -- no temporary URL-passing contract to the agent -- no fake “prod” mode with separate local filesystems -- no platform API additions in this phase - -## Success Criteria - -- the default local runtime uses a shared `/workspace` -- a user can send a file in Matrix and the agent receives it through upstream `attachments` -- the agent can emit `send_file(path)` and the Matrix user receives the file in the same room -- our runtime behavior matches the current platform contract closely enough that moving from local compose to production does not require redesigning file flow diff --git a/docs/superpowers/specs/2026-04-20-matrix-staged-attachments-design.md b/docs/superpowers/specs/2026-04-20-matrix-staged-attachments-design.md deleted file mode 100644 index ae8a11a..0000000 --- a/docs/superpowers/specs/2026-04-20-matrix-staged-attachments-design.md +++ /dev/null @@ -1,262 +0,0 @@ -# Matrix Staged Attachments Design - -## Goal - -Make file sending in the Matrix surface usable for an AI agent despite current Matrix client behavior, especially in Element where media is often sent immediately as separate events without a shared text composer. - -The result should be: - -- files can arrive before the user writes the actual instruction -- the surface stages those files instead of immediately sending them to the agent -- the next normal user message in the same chat commits all staged files as one agent turn -- the user can inspect and remove staged files with short chat commands - -## Core Decision - -The selected UX model is: - -`incoming Matrix media -> staged attachments for (chat_id, user_id) -> next normal message commits them` - -This means: - -- attachment-only events do not immediately invoke the agent -- the bot acknowledges staged files with a service message -- the next normal user message sends text plus all currently staged files to the agent -- staged files are then cleared - -## Why This Decision - -Matrix natively models messages as separate events, and common clients do not provide a reliable "one text message with many attachments" composer flow. - -In practice this causes two UX failures for an AI bot: - -- users may send files first and only then write the task -- users may send multiple files as multiple independent Matrix events - -If the surface treats each incoming file as a full agent turn, the bot becomes noisy and context-fragmented. If it ignores file-only messages, file handling feels broken. - -Staging is the smallest surface-side abstraction that fixes both problems without fighting the Matrix event model. - -## Scope - -This design covers: - -- staging inbound Matrix attachments before agent submission -- per-chat attachment state for a specific user -- user-facing service messages for staged attachments -- short commands for listing and removing staged files -- commit behavior on the next normal message - -This design does not cover: - -- edits or redactions of original Matrix media events as attachment controls -- cross-surface shared staging -- thread-aware staging beyond the existing `chat_id` boundary -- changes to the platform attachment contract - -## State Model - -### Staging key - -Staged attachments are isolated by: - -- `chat_id` -- `user_id` - -This means: - -- files staged by a user in one chat never appear in another chat -- files staged by one user do not mix with another user's files in the same room - -### Staged attachment record - -Each staged attachment must track at least: - -- stable internal id -- display filename -- workspace-relative path -- MIME type if known -- created timestamp - -User-visible commands operate on the current ordered list, not on internal ids. - -### Lifecycle - -A staged attachment is in exactly one of these states: - -1. `staged` -2. `committed` -3. `removed` - -Rules: - -- only `staged` attachments appear in `!list` -- `committed` attachments are no longer user-removable -- `removed` attachments are excluded from future commits - -## Inbound Behavior - -### Attachment-only event - -If the Matrix surface receives one or more file/media events from a user without a normal text message to commit them: - -1. download each file into shared `/workspace` -2. add each file to the staged set for `(chat_id, user_id)` -3. do not call the agent yet -4. send a service acknowledgment message - -### Service acknowledgment - -The service message must communicate: - -- the current staged attachment list with indices -- that the next normal message will be sent to the agent together with those files -- available commands: `!list`, `!remove `, `!remove all` - -Example shape: - -```text -Staged attachments: -1. screenshot.png -2. invoice.pdf - -Your next message will be sent to the agent with these files. -Commands: !list, !remove , !remove all -``` - -### Burst handling - -Matrix clients may send multiple files as separate consecutive events. - -To avoid bot spam, service acknowledgments should be debounced over a short window and aggregated into one reply where feasible. - -The acknowledgment must reflect the full current staged set, not only the most recently received file. - -## Commit Behavior - -### Commit trigger - -The commit trigger is: - -- the next normal user message in the same `(chat_id, user_id)` scope - -Normal user message means: - -- not a staging control command -- not a pure attachment event being staged - -### Commit action - -When a commit-triggering message arrives: - -1. collect all currently staged attachments for `(chat_id, user_id)` -2. send the user text plus those attachments to the agent as one turn -3. mark all included staged attachments as `committed` -4. clear the staged set - -After commit: - -- the just-sent attachments must no longer appear in `!list` -- a later file upload starts a new staged set - -## Commands - -### `!list` - -Shows the current staged attachment list for the user in the current chat. - -If the list is empty, the response should be short and explicit. - -### `!remove ` - -Removes the staged attachment at the current 1-based index. - -Behavior: - -- if the index is valid, remove that staged attachment and return the updated staged list -- if the index is invalid, return a short error without repeating the list - -### `!remove all` - -Clears the entire staged set for the user in the current chat. - -The response should be short and explicit. - -## Ordering Rules - -The staged list is ordered by staging time. - -User-facing indices: - -- are 1-based -- are recalculated from the current staged set -- may change after removals - -Therefore: - -- `!list` always shows the current authoritative numbering -- after a successful `!remove `, the bot should reply with the refreshed list - -## Error Handling - -### Download failure - -If a file cannot be downloaded or stored: - -- do not add it to the staged set -- do not pretend it will be sent later -- send a short user-visible failure message - -### Invalid command - -If the command is malformed or uses an invalid index: - -- return a short error -- do not commit staged attachments -- do not clear the staged set - -### Agent submission failure - -If commit fails when sending the text plus staged files to the agent: - -- staged attachments must remain available for retry unless the failure is known to be irreversible -- the user-visible error should make it clear that the files were not consumed - -This prevents silent loss of staged context. - -## Interaction with Shared Workspace Design - -This design assumes the shared-workspace contract defined in -[2026-04-20-matrix-shared-workspace-file-flow-design.md](/Users/a/MAI/sem2/lambda/surfaces-bot/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md). - -Specifically: - -- staged files are stored in shared `/workspace` -- the final commit still passes workspace-relative paths to `platform-agent` -- staging changes only when the surface chooses to invoke the agent, not how attachments are represented - -## Testing - -The implementation must cover: - -- file-only Matrix events are staged and do not immediately invoke the agent -- service acknowledgment includes staged filenames and command hints -- `!list` returns the current staged set for the correct `(chat_id, user_id)` -- `!remove ` removes the correct staged attachment and refreshes numbering -- `!remove all` clears the staged set -- invalid `!remove ` returns a short error and keeps state unchanged -- the next normal message commits all staged attachments with the text as one agent turn -- committed attachments disappear from staging after success -- failed commits preserve staged attachments -- staging in one chat does not leak into another chat -- staging for one user does not leak to another user in the same room - -## Non-Goals - -This design intentionally does not attempt to: - -- emulate Telegram-style albums in Matrix -- rely on special support from Element or other Matrix clients -- introduce a rich interactive attachment management UI - -The goal is a reliable chat-native workflow that works within Matrix's actual event model. diff --git a/sdk/agent_api_wrapper.py b/sdk/agent_api_wrapper.py index 94205ea..32f126d 100644 --- a/sdk/agent_api_wrapper.py +++ b/sdk/agent_api_wrapper.py @@ -86,55 +86,6 @@ class AgentApiWrapper(AgentApi): **self._init_kwargs, ) - @staticmethod - def _event_kind(event: object) -> str: - raw_kind = getattr(event, "type", None) - if hasattr(raw_kind, "value"): - raw_kind = raw_kind.value - if raw_kind is None: - raw_kind = event.__class__.__name__ - - kind = str(raw_kind).replace("-", "_") - if "_" in kind: - return kind.upper() - - normalized = [] - for index, char in enumerate(kind): - if index and char.isupper() and not kind[index - 1].isupper(): - normalized.append("_") - normalized.append(char) - return "".join(normalized).upper() - - @classmethod - def _is_kind(cls, event: object, *needles: str) -> bool: - kind = cls._event_kind(event) - return any(needle in kind for needle in needles) - - @classmethod - def _is_text_event(cls, event: object) -> bool: - return hasattr(event, "text") or cls._is_kind(event, "TEXT_CHUNK") - - @classmethod - def _is_end_event(cls, event: object) -> bool: - kind = cls._event_kind(event) - return kind == "END" or kind.endswith("_END") - - @classmethod - def _is_send_file_event(cls, event: object) -> bool: - return "SEND_FILE" in cls._event_kind(event) - - async def _publish_event(self, event: object, *, queue_event: object | None = None) -> None: - if self.callback: - self.callback(event) - if self._current_queue: - await self._current_queue.put(queue_event if queue_event is not None else event) - - async def _publish_error(self, event: object) -> None: - if self.callback: - self.callback(event) - if self._current_queue and hasattr(event, "code") and hasattr(event, "details"): - await self._current_queue.put(AgentException(getattr(event, "code"), getattr(event, "details"))) - async def _listen(self): try: async for msg in self._ws: @@ -142,7 +93,7 @@ class AgentApiWrapper(AgentApi): try: outgoing_msg = ServerMessage.validate_json(msg.data) - if self._is_text_event(outgoing_msg): + if isinstance(outgoing_msg, MsgEventTextChunk): if self._current_queue: await self._current_queue.put(outgoing_msg) elif self.callback: @@ -150,22 +101,29 @@ class AgentApiWrapper(AgentApi): else: logger.warning("[%s] AgentEvent without active request", self.id) - elif self._is_end_event(outgoing_msg): + elif isinstance(outgoing_msg, MsgEventEnd): self.last_tokens_used = outgoing_msg.tokens_used - await self._publish_event(outgoing_msg) + if self._current_queue: + await self._current_queue.put(outgoing_msg) - elif self._is_kind(outgoing_msg, "ERROR"): + elif isinstance(outgoing_msg, MsgError): + if self.callback: + self.callback(outgoing_msg) error = AgentException(outgoing_msg.code, outgoing_msg.details) logger.error("[%s] Agent error: %s", self.id, error) - await self._publish_error(outgoing_msg) + if self._current_queue: + await self._current_queue.put(error) - elif self._is_kind(outgoing_msg, "GRACEFUL_DISCONNECT"): - await self._publish_event(outgoing_msg) + elif isinstance(outgoing_msg, MsgGracefulDisconnect): + if self.callback: + self.callback(outgoing_msg) logger.info("[%s] Gracefully disconnecting", self.id) break else: - await self._publish_event(outgoing_msg) + logger.warning("[%s] Unknown message type: %s", self.id, outgoing_msg.type) + if self.callback: + self.callback(outgoing_msg) except Exception as exc: logger.error("[%s] Failed to deserialize message: %s", self.id, exc) diff --git a/sdk/interface.py b/sdk/interface.py index c885867..e1ff12e 100644 --- a/sdk/interface.py +++ b/sdk/interface.py @@ -4,7 +4,7 @@ from __future__ import annotations from datetime import datetime from typing import Any, AsyncIterator, Literal, Protocol -from pydantic import BaseModel, Field +from pydantic import BaseModel class User(BaseModel): @@ -17,11 +17,10 @@ class User(BaseModel): class Attachment(BaseModel): - url: str | None = None - mime_type: str | None = None + url: str + mime_type: str size: int | None = None filename: str | None = None - workspace_path: str | None = None class MessageResponse(BaseModel): @@ -29,7 +28,6 @@ class MessageResponse(BaseModel): response: str tokens_used: int finished: bool - attachments: list[Attachment] = Field(default_factory=list) class MessageChunk(BaseModel): diff --git a/sdk/real.py b/sdk/real.py index 71803f4..f6e40ed 100644 --- a/sdk/real.py +++ b/sdk/real.py @@ -1,8 +1,6 @@ from __future__ import annotations import asyncio -import inspect -from pathlib import Path from typing import AsyncIterator from sdk.agent_api_wrapper import AgentApiWrapper @@ -73,43 +71,21 @@ class RealPlatformClient(PlatformClient): ) -> MessageResponse: response_parts: list[str] = [] tokens_used = 0 - sent_attachments: list[Attachment] = [] message_id = user_id - saw_end_event = False - lock = self._get_chat_send_lock(chat_id) - async with lock: - chat_api = await self._get_chat_api(chat_id) - if hasattr(chat_api, "last_tokens_used"): - chat_api.last_tokens_used = 0 + async for chunk in self.stream_message(user_id, chat_id, text, attachments=attachments): + message_id = chunk.message_id + if chunk.delta: + response_parts.append(chunk.delta) + if chunk.finished: + tokens_used = chunk.tokens_used - async for event in self._stream_agent_events(chat_api, text, attachments=attachments): - message_id = user_id - if self._is_text_event(event): - chunk_text = getattr(event, "text", "") - if chunk_text: - response_parts.append(chunk_text) - elif self._is_end_event(event): - tokens_used = getattr(event, "tokens_used", tokens_used) - saw_end_event = True - elif self._is_send_file_event(event): - attachment = self._attachment_from_send_file_event(event) - if attachment is not None: - sent_attachments.append(attachment) - - if not saw_end_event: - tokens_used = getattr(chat_api, "last_tokens_used", tokens_used) - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) - - response_kwargs = { - "message_id": message_id, - "response": "".join(response_parts), - "tokens_used": tokens_used, - "finished": True, - } - if self._message_response_accepts_attachments(): - response_kwargs["attachments"] = sent_attachments - return MessageResponse(**response_kwargs) + return MessageResponse( + message_id=message_id, + response="".join(response_parts), + tokens_used=tokens_used, + finished=True, + ) async def stream_message( self, @@ -123,37 +99,20 @@ class RealPlatformClient(PlatformClient): chat_api = await self._get_chat_api(chat_id) if hasattr(chat_api, "last_tokens_used"): chat_api.last_tokens_used = 0 - saw_end_event = False - async for event in self._stream_agent_events(chat_api, text, attachments=attachments): - if self._is_text_event(event): - yield MessageChunk( - message_id=user_id, - delta=getattr(event, "text", ""), - finished=False, - ) - elif self._is_end_event(event): - tokens_used = getattr(event, "tokens_used", 0) - saw_end_event = True - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) - yield MessageChunk( - message_id=user_id, - delta="", - finished=True, - tokens_used=tokens_used, - ) - elif self._is_send_file_event(event): - continue - else: - continue - if not saw_end_event: - tokens_used = getattr(chat_api, "last_tokens_used", 0) - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + async for event in chat_api.send_message(text): yield MessageChunk( message_id=user_id, - delta="", - finished=True, - tokens_used=tokens_used, + delta=event.text, + finished=False, ) + tokens_used = getattr(chat_api, "last_tokens_used", 0) + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + yield MessageChunk( + message_id=user_id, + delta="", + finished=True, + tokens_used=tokens_used, + ) async def get_settings(self, user_id: str) -> UserSettings: return await self._prototype_state.get_settings(user_id) @@ -181,107 +140,3 @@ class RealPlatformClient(PlatformClient): close = getattr(self._agent_api, "close", None) if callable(close): await close() - - async def _stream_agent_events( - self, - chat_api, - text: str, - attachments: list[Attachment] | None = None, - ) -> AsyncIterator[object]: - send_message = chat_api.send_message - attachment_paths = self._attachment_paths(attachments) - if attachment_paths and self._send_message_accepts_attachments(send_message): - event_stream = send_message(text, attachments=attachment_paths) - else: - event_stream = send_message(text) - async for event in event_stream: - yield event - - @staticmethod - def _attachment_paths(attachments: list[Attachment] | None) -> list[str]: - if not attachments: - return [] - paths = [] - for attachment in attachments: - if attachment.workspace_path: - paths.append(attachment.workspace_path) - return paths - - @staticmethod - def _send_message_accepts_attachments(send_message) -> bool: - try: - parameters = inspect.signature(send_message).parameters - except (TypeError, ValueError): - return False - return "attachments" in parameters or any( - parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters.values() - ) - - @staticmethod - def _event_kind(event: object) -> str: - raw_kind = getattr(event, "type", None) - if hasattr(raw_kind, "value"): - raw_kind = raw_kind.value - if raw_kind is None: - raw_kind = event.__class__.__name__ - - kind = str(raw_kind).replace("-", "_") - if "_" in kind: - return kind.upper() - normalized = [] - for index, char in enumerate(kind): - if index and char.isupper() and not kind[index - 1].isupper(): - normalized.append("_") - normalized.append(char) - return "".join(normalized).upper() - - @classmethod - def _is_text_event(cls, event: object) -> bool: - return hasattr(event, "text") or "TEXT_CHUNK" in cls._event_kind(event) - - @classmethod - def _is_end_event(cls, event: object) -> bool: - kind = cls._event_kind(event) - return kind == "END" or kind.endswith("_END") - - @classmethod - def _is_send_file_event(cls, event: object) -> bool: - kind = cls._event_kind(event) - return "SEND_FILE" in kind - - @staticmethod - def _attachment_from_send_file_event(event: object) -> Attachment | None: - location = None - for attr in ("url", "workspace_path", "path", "file_path", "uri"): - value = getattr(event, attr, None) - if value: - location = str(value) - break - if location is None: - return None - - mime_type = getattr(event, "mime_type", None) or "application/octet-stream" - filename = getattr(event, "filename", None) or Path(location).name or None - size = getattr(event, "size", None) - workspace_path = location - if workspace_path.startswith("/workspace/"): - workspace_path = workspace_path[len("/workspace/"):] - elif workspace_path == "/workspace": - workspace_path = "" - return Attachment( - url=location, - mime_type=mime_type, - size=size, - filename=filename, - workspace_path=workspace_path or None, - ) - - @staticmethod - def _message_response_accepts_attachments() -> bool: - fields = getattr(MessageResponse, "model_fields", None) - if isinstance(fields, dict): - return "attachments" in fields - try: - return "attachments" in inspect.signature(MessageResponse).parameters - except (TypeError, ValueError): - return False diff --git a/tests/adapter/matrix/test_converter.py b/tests/adapter/matrix/test_converter.py index 3513913..ecaecdc 100644 --- a/tests/adapter/matrix/test_converter.py +++ b/tests/adapter/matrix/test_converter.py @@ -37,41 +37,7 @@ def image_event(url: str = "mxc://x/img", mime: str = "image/jpeg"): ) -def content_file_event(): - return SimpleNamespace( - sender="@a:m.org", - body="doc.pdf", - event_id="$e4", - msgtype=None, - replyto_event_id=None, - content={ - "msgtype": "m.file", - "body": "nested.pdf", - "url": "mxc://x/nested", - "info": {"mimetype": "application/pdf"}, - }, - ) - - -def source_only_content_file_event(): - return SimpleNamespace( - sender="@a:m.org", - body="doc.pdf", - event_id="$e5", - msgtype=None, - replyto_event_id=None, - source={ - "content": { - "msgtype": "m.file", - "body": "source-only.pdf", - "url": "mxc://x/source-only", - "info": {"mimetype": "application/pdf"}, - } - }, - ) - - -def test_plain_text_to_incoming_message(): +async def test_plain_text_to_incoming_message(): result = from_room_event(text_event("Hello"), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingMessage) assert result.text == "Hello" @@ -80,48 +46,20 @@ def test_plain_text_to_incoming_message(): assert result.attachments == [] -def test_bang_command_to_incoming_command(): +async def test_bang_command_to_incoming_command(): result = from_room_event(text_event("!new Analysis"), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingCommand) assert result.command == "new" assert result.args == ["Analysis"] -def test_list_command_maps_to_matrix_list_attachments(): - result = from_room_event(text_event("!list"), room_id="!r:m.org", chat_id="C1") - assert isinstance(result, IncomingCommand) - assert result.command == "matrix_list_attachments" - assert result.args == [] - - -def test_remove_all_maps_to_matrix_remove_attachment(): - result = from_room_event(text_event("!remove all"), room_id="!r:m.org", chat_id="C1") - assert isinstance(result, IncomingCommand) - assert result.command == "matrix_remove_attachment" - assert result.args == ["all"] - - -def test_remove_index_maps_to_matrix_remove_attachment(): - result = from_room_event(text_event("!remove 2"), room_id="!r:m.org", chat_id="C1") - assert isinstance(result, IncomingCommand) - assert result.command == "matrix_remove_attachment" - assert result.args == ["2"] - - -def test_remove_arbitrary_index_maps_to_matrix_remove_attachment(): - result = from_room_event(text_event("!remove 99"), room_id="!r:m.org", chat_id="C1") - assert isinstance(result, IncomingCommand) - assert result.command == "matrix_remove_attachment" - assert result.args == ["99"] - - -def test_skills_alias_to_settings_command(): +async def test_skills_alias_to_settings_command(): result = from_command("!skills", sender="@a:m.org", chat_id="C1") assert isinstance(result, IncomingCommand) assert result.command == "settings_skills" -def test_yes_to_callback(): +async def test_yes_to_callback(): result = from_room_event(text_event("!yes"), room_id="!room:example.org", chat_id="C7") assert isinstance(result, IncomingCallback) assert result.action == "confirm" @@ -129,7 +67,7 @@ def test_yes_to_callback(): assert result.payload["room_id"] == "!room:example.org" -def test_no_to_callback(): +async def test_no_to_callback(): result = from_room_event(text_event("!no"), room_id="!room:example.org", chat_id="C7") assert isinstance(result, IncomingCallback) assert result.action == "cancel" @@ -137,7 +75,7 @@ def test_no_to_callback(): assert result.payload["room_id"] == "!room:example.org" -def test_file_attachment(): +async def test_file_attachment(): result = from_room_event(file_event(), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingMessage) assert len(result.attachments) == 1 @@ -148,32 +86,11 @@ def test_file_attachment(): assert a.mime_type == "application/pdf" -def test_image_attachment(): +async def test_image_attachment(): result = from_room_event(image_event(), room_id="!r:m.org", chat_id="C1") assert result.attachments[0].type == "image" - assert result.attachments[0].filename == "img.jpg" assert result.attachments[0].mime_type == "image/jpeg" -def test_attachment_falls_back_to_content_payload(): - result = from_room_event(content_file_event(), room_id="!r:m.org", chat_id="C1") - assert isinstance(result, IncomingMessage) - a = result.attachments[0] - assert a.type == "document" - assert a.url == "mxc://x/nested" - assert a.filename == "nested.pdf" - assert a.mime_type == "application/pdf" - - -def test_attachment_falls_back_to_source_content_payload(): - result = from_room_event(source_only_content_file_event(), room_id="!r:m.org", chat_id="C1") - assert isinstance(result, IncomingMessage) - a = result.attachments[0] - assert a.type == "document" - assert a.url == "mxc://x/source-only" - assert a.filename == "source-only.pdf" - assert a.mime_type == "application/pdf" - - def test_converter_module_does_not_expose_reaction_callbacks(): assert not hasattr(converter, "from_reaction") diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index e2cae34..10a4f36 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -4,36 +4,20 @@ import importlib from types import SimpleNamespace from unittest.mock import AsyncMock -import pytest -from nio import ( - RoomMessageAudio, - RoomMessageFile, - RoomMessageImage, - RoomMessageText, - RoomMessageVideo, -) from nio.api import RoomVisibility from nio.responses import SyncResponse from adapter.matrix.bot import MatrixBot, build_runtime, prepare_live_sync from adapter.matrix.handlers.auth import handle_invite from adapter.matrix.store import ( - add_staged_attachment, get_platform_chat_id, get_room_meta, - get_staged_attachments, get_user_meta, set_load_pending, set_room_meta, set_user_meta, ) -from core.protocol import ( - Attachment, - IncomingCallback, - IncomingCommand, - IncomingMessage, - OutgoingMessage, -) +from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage from sdk.interface import PlatformError from sdk.mock import MockPlatformClient from sdk.real import RealPlatformClient @@ -43,9 +27,7 @@ async def test_matrix_dispatcher_registers_custom_handlers(): runtime = build_runtime(platform=MockPlatformClient()) current_chat_id = "C9" - start = IncomingCommand( - user_id="u1", platform="matrix", chat_id=current_chat_id, command="start" - ) + start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start") await runtime.dispatcher.dispatch(start) new = IncomingCommand( @@ -111,9 +93,7 @@ async def test_new_chat_creates_real_matrix_room_when_client_available(): ) client.room_put_state.assert_awaited_once() put_call = client.room_put_state.call_args - assert ( - put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example" - ) + assert put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example" chats = await runtime.chat_mgr.list_active("u1") assert [c.chat_id for c in chats] == ["C7"] assert [c.surface_ref for c in chats] == ["!r2:example"] @@ -159,10 +139,7 @@ async def test_invite_event_creates_space_and_chat_room(): client.room_put_state.assert_awaited_once() put_state_call = client.room_put_state.call_args - assert ( - put_state_call.kwargs.get("event_type") == "m.space.child" - or put_state_call.args[1] == "m.space.child" - ) + assert put_state_call.kwargs.get("event_type") == "m.space.child" or put_state_call.args[1] == "m.space.child" user_meta = await get_user_meta(runtime.store, "@alice:example.org") assert user_meta is not None @@ -272,10 +249,7 @@ async def test_bot_assigns_platform_chat_id_for_existing_managed_room(): await bot.on_room_message(room, event) - assert ( - await get_platform_chat_id(runtime.store, "!chat1:example.org") - == "matrix:!chat1:example.org" - ) + assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org" runtime.dispatcher.dispatch.assert_awaited_once() @@ -304,316 +278,6 @@ async def test_bot_routes_plain_messages_via_platform_chat_id(): assert dispatched.text == "hello" -async def test_bot_downloads_matrix_file_to_workspace_before_staging(tmp_path, monkeypatch): - monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path)) - runtime = build_runtime(platform=MockPlatformClient()) - await set_room_meta( - runtime.store, - "!chat1:example.org", - { - "chat_id": "C1", - "matrix_user_id": "@alice:example.org", - "platform_chat_id": "matrix:ctx-1", - }, - ) - client = SimpleNamespace( - user_id="@bot:example.org", - download=AsyncMock(return_value=SimpleNamespace(body=b"%PDF-1.7")), - ) - bot = MatrixBot(client, runtime) - bot._send_all = AsyncMock() - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - room = SimpleNamespace(room_id="!chat1:example.org") - event = SimpleNamespace( - sender="@alice:example.org", - body="report.pdf", - msgtype="m.file", - replyto_event_id=None, - url="mxc://server/id", - mimetype="application/pdf", - ) - - await bot.on_room_message(room, event) - - runtime.dispatcher.dispatch.assert_not_awaited() - staged = await get_staged_attachments(runtime.store, "!chat1:example.org", "@alice:example.org") - assert staged[0]["workspace_path"] is not None - assert (tmp_path / staged[0]["workspace_path"]).read_bytes() == b"%PDF-1.7" - bot._send_all.assert_not_awaited() - - -async def test_file_only_event_is_staged_and_does_not_dispatch(): - runtime = build_runtime(platform=MockPlatformClient()) - client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) - bot = MatrixBot(client, runtime) - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - bot._materialize_incoming_attachments = AsyncMock( - return_value=IncomingMessage( - user_id="@alice:example.org", - platform="matrix", - chat_id="!r:example.org", - text="", - attachments=[ - Attachment( - type="document", - filename="report.pdf", - workspace_path="surfaces/matrix/alice/r/inbox/report.pdf", - mime_type="application/pdf", - ) - ], - ) - ) - room = SimpleNamespace(room_id="!r:example.org") - event = SimpleNamespace( - sender="@alice:example.org", - body="report.pdf", - msgtype="m.file", - url="mxc://hs/id", - mimetype="application/pdf", - replyto_event_id=None, - ) - - await bot.on_room_message(room, event) - - runtime.dispatcher.dispatch.assert_not_awaited() - staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") - assert [item["filename"] for item in staged] == ["report.pdf"] - client.room_send.assert_not_awaited() - - -async def test_list_command_returns_current_staged_attachments(): - runtime = build_runtime(platform=MockPlatformClient()) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - {"filename": "a.pdf", "workspace_path": "a.pdf"}, - ) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - {"filename": "b.pdf", "workspace_path": "b.pdf"}, - ) - client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) - bot = MatrixBot(client, runtime) - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - room = SimpleNamespace(room_id="!r:example.org") - event = SimpleNamespace( - sender="@alice:example.org", body="!list", msgtype="m.text", replyto_event_id=None - ) - - await bot.on_room_message(room, event) - - runtime.dispatcher.dispatch.assert_not_awaited() - body = client.room_send.await_args.args[2]["body"] - assert "1. a.pdf" in body - assert "2. b.pdf" in body - - -async def test_remove_invalid_index_returns_short_error(): - runtime = build_runtime(platform=MockPlatformClient()) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - {"filename": "a.pdf", "workspace_path": "a.pdf"}, - ) - client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) - bot = MatrixBot(client, runtime) - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - room = SimpleNamespace(room_id="!r:example.org") - event = SimpleNamespace( - sender="@alice:example.org", body="!remove 9", msgtype="m.text", replyto_event_id=None - ) - - await bot.on_room_message(room, event) - - runtime.dispatcher.dispatch.assert_not_awaited() - assert client.room_send.await_args.args[2]["body"] == "Нет такого вложения." - - -async def test_remove_attachment_updates_list_and_state(): - runtime = build_runtime(platform=MockPlatformClient()) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - {"filename": "a.pdf", "workspace_path": "a.pdf"}, - ) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - {"filename": "b.pdf", "workspace_path": "b.pdf"}, - ) - client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) - bot = MatrixBot(client, runtime) - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - room = SimpleNamespace(room_id="!r:example.org") - event = SimpleNamespace( - sender="@alice:example.org", body="!remove 1", msgtype="m.text", replyto_event_id=None - ) - - await bot.on_room_message(room, event) - - runtime.dispatcher.dispatch.assert_not_awaited() - staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") - assert [item["filename"] for item in staged] == ["b.pdf"] - body = client.room_send.await_args.args[2]["body"] - assert "1. b.pdf" in body - assert "a.pdf" not in body - - -async def test_remove_all_clears_state(): - runtime = build_runtime(platform=MockPlatformClient()) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - {"filename": "a.pdf", "workspace_path": "a.pdf"}, - ) - client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) - bot = MatrixBot(client, runtime) - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - room = SimpleNamespace(room_id="!r:example.org") - event = SimpleNamespace( - sender="@alice:example.org", - body="!remove all", - msgtype="m.text", - replyto_event_id=None, - ) - - await bot.on_room_message(room, event) - - runtime.dispatcher.dispatch.assert_not_awaited() - assert await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") == [] - assert client.room_send.await_args.args[2]["body"] == "Все вложения удалены." - - -async def test_staged_attachment_commands_are_scoped_by_room_and_user(): - runtime = build_runtime(platform=MockPlatformClient()) - await add_staged_attachment( - runtime.store, - "!r-one:example.org", - "@alice:example.org", - {"filename": "alice-room-one.pdf", "workspace_path": "alice-room-one.pdf"}, - ) - await add_staged_attachment( - runtime.store, - "!r-two:example.org", - "@alice:example.org", - {"filename": "alice-room-two.pdf", "workspace_path": "alice-room-two.pdf"}, - ) - await add_staged_attachment( - runtime.store, - "!r-one:example.org", - "@bob:example.org", - {"filename": "bob-room-one.pdf", "workspace_path": "bob-room-one.pdf"}, - ) - client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) - bot = MatrixBot(client, runtime) - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - room = SimpleNamespace(room_id="!r-one:example.org") - event = SimpleNamespace( - sender="@alice:example.org", - body="!list", - msgtype="m.text", - replyto_event_id=None, - ) - - await bot.on_room_message(room, event) - - runtime.dispatcher.dispatch.assert_not_awaited() - body = client.room_send.await_args.args[2]["body"] - assert "alice-room-one.pdf" in body - assert "alice-room-two.pdf" not in body - assert "bob-room-one.pdf" not in body - - -async def test_next_normal_message_commits_staged_attachments(): - runtime = build_runtime(platform=MockPlatformClient()) - await set_room_meta( - runtime.store, - "!r:example.org", - { - "chat_id": "C1", - "matrix_user_id": "@alice:example.org", - "platform_chat_id": "matrix:ctx-1", - }, - ) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - { - "type": "document", - "filename": "report.pdf", - "workspace_path": "surfaces/matrix/alice/r/inbox/report.pdf", - "mime_type": "application/pdf", - }, - ) - client = SimpleNamespace(user_id="@bot:example.org") - bot = MatrixBot(client, runtime) - bot._send_all = AsyncMock() - runtime.dispatcher.dispatch = AsyncMock(return_value=[]) - room = SimpleNamespace(room_id="!r:example.org") - event = SimpleNamespace( - sender="@alice:example.org", - body="Проанализируй", - msgtype="m.text", - replyto_event_id=None, - ) - - await bot.on_room_message(room, event) - - dispatched = runtime.dispatcher.dispatch.await_args.args[0] - assert isinstance(dispatched, IncomingMessage) - assert dispatched.text == "Проанализируй" - assert [a.workspace_path for a in dispatched.attachments] == [ - "surfaces/matrix/alice/r/inbox/report.pdf" - ] - assert await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") == [] - - -async def test_failed_commit_preserves_staged_attachments(): - runtime = build_runtime(platform=MockPlatformClient()) - await set_room_meta( - runtime.store, - "!r:example.org", - { - "chat_id": "C1", - "matrix_user_id": "@alice:example.org", - "platform_chat_id": "matrix:ctx-1", - }, - ) - await add_staged_attachment( - runtime.store, - "!r:example.org", - "@alice:example.org", - { - "type": "document", - "filename": "report.pdf", - "workspace_path": "surfaces/matrix/alice/r/inbox/report.pdf", - }, - ) - client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) - bot = MatrixBot(client, runtime) - runtime.dispatcher.dispatch = AsyncMock(side_effect=PlatformError("boom")) - room = SimpleNamespace(room_id="!r:example.org") - event = SimpleNamespace( - sender="@alice:example.org", - body="Проанализируй", - msgtype="m.text", - replyto_event_id=None, - ) - - await bot.on_room_message(room, event) - - staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") - assert [item["filename"] for item in staged] == ["report.pdf"] - - async def test_bot_keeps_commands_on_local_chat_id(): runtime = build_runtime(platform=MockPlatformClient()) await set_room_meta( @@ -686,10 +350,7 @@ async def test_bot_assigns_platform_chat_id_before_load_selection(): await bot.on_room_message(room, event) - assert ( - await get_platform_chat_id(runtime.store, "!chat1:example.org") - == "matrix:!chat1:example.org" - ) + assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org" client.room_send.assert_awaited_once_with( "!chat1:example.org", "m.room.message", @@ -754,9 +415,7 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap(): room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry") await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello")) - await bot.on_room_message( - room, SimpleNamespace(sender="@alice:example.org", body="hello again") - ) + await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello again")) assert client.room_create.await_count == 2 room_send_calls = client.room_send.await_args_list @@ -771,43 +430,6 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap(): assert "platform_chat_id" not in entry_meta -async def test_unregistered_room_welcome_send_failure_does_not_repeat_bootstrap(): - runtime = build_runtime(platform=MockPlatformClient()) - await set_user_meta(runtime.store, "@alice:example.org", {"next_chat_index": 1}) - space_resp = SimpleNamespace(room_id="!space:example.org") - chat_resp = SimpleNamespace(room_id="!chat1:example.org") - client = SimpleNamespace( - user_id="@bot:example.org", - room_create=AsyncMock(side_effect=[space_resp, chat_resp]), - room_put_state=AsyncMock(), - room_send=AsyncMock(side_effect=[RuntimeError("welcome failed"), None]), - ) - bot = MatrixBot(client, runtime) - room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry") - - with pytest.raises(RuntimeError, match="welcome failed"): - await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello")) - - entry_meta = await get_room_meta(runtime.store, "!entry:example.org") - assert entry_meta == { - "matrix_user_id": "@alice:example.org", - "redirect_room_id": "!chat1:example.org", - "redirect_chat_id": "C1", - } - - await bot.on_room_message( - room, SimpleNamespace(sender="@alice:example.org", body="hello again") - ) - - assert client.room_create.await_count == 2 - room_send_calls = client.room_send.await_args_list - assert any( - call.args[0] == "!entry:example.org" - and "Рабочий чат уже создан: C1" in call.args[2]["body"] - for call in room_send_calls - ) - - async def test_unregistered_room_creates_new_chat_in_existing_space(): runtime = build_runtime(platform=MockPlatformClient()) await set_user_meta( @@ -844,9 +466,7 @@ async def test_mat11_settings_returns_mvp_unavailable_message(): runtime = build_runtime(platform=MockPlatformClient()) current_chat_id = "C9" - start = IncomingCommand( - user_id="u1", platform="matrix", chat_id=current_chat_id, command="start" - ) + start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start") await runtime.dispatcher.dispatch(start) settings_cmd = IncomingCommand( @@ -967,43 +587,3 @@ async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeyp agent_connect.assert_not_awaited() platform_close.assert_awaited_once() - - -async def test_matrix_main_registers_media_message_callbacks(monkeypatch): - bot_module = importlib.import_module("adapter.matrix.bot") - - runtime = SimpleNamespace(platform=SimpleNamespace(close=AsyncMock())) - created_clients = [] - - class FakeAsyncClient: - def __init__(self, *args, **kwargs): - self.access_token = None - self.callbacks = [] - self.sync_forever = AsyncMock() - self.close = AsyncMock() - created_clients.append(self) - - async def login(self, *args, **kwargs): - raise AssertionError("login should not be called when access token is provided") - - def add_event_callback(self, callback, event_type): - self.callbacks.append((callback, event_type)) - - monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org") - monkeypatch.setenv("MATRIX_USER_ID", "@bot:example.org") - monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "token") - monkeypatch.setattr(bot_module, "AsyncClient", FakeAsyncClient) - monkeypatch.setattr(bot_module, "build_runtime", lambda **kwargs: runtime) - monkeypatch.setattr(bot_module, "prepare_live_sync", AsyncMock(return_value="s123")) - - await bot_module.main() - - assert len(created_clients) == 1 - registered_types = [event_type for _, event_type in created_clients[0].callbacks] - assert ( - RoomMessageText, - RoomMessageFile, - RoomMessageImage, - RoomMessageVideo, - RoomMessageAudio, - ) in registered_types diff --git a/tests/adapter/matrix/test_files.py b/tests/adapter/matrix/test_files.py deleted file mode 100644 index 831ca72..0000000 --- a/tests/adapter/matrix/test_files.py +++ /dev/null @@ -1,50 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from types import SimpleNamespace - -from adapter.matrix.files import build_workspace_attachment_path, download_matrix_attachment -from core.protocol import Attachment - - -def test_build_workspace_attachment_path_scopes_by_surface_user_and_room(tmp_path: Path): - rel_path, abs_path = build_workspace_attachment_path( - workspace_root=tmp_path, - matrix_user_id="@alice:example.org", - room_id="!room:example.org", - filename="report.pdf", - timestamp="20260420-153000", - ) - - assert ( - rel_path - == "surfaces/matrix/alice_example.org/room_example.org/inbox/20260420-153000-report.pdf" - ) - assert abs_path == tmp_path / rel_path - - -async def test_download_matrix_attachment_persists_file_and_returns_workspace_path(tmp_path: Path): - async def download(url: str): - assert url == "mxc://server/id" - return SimpleNamespace(body=b"%PDF-1.7") - - client = SimpleNamespace(download=download) - attachment = Attachment( - type="document", - url="mxc://server/id", - filename="report.pdf", - mime_type="application/pdf", - ) - - saved = await download_matrix_attachment( - client=client, - workspace_root=tmp_path, - matrix_user_id="@alice:example.org", - room_id="!room:example.org", - attachment=attachment, - timestamp="20260420-153000", - ) - - assert saved.workspace_path is not None - assert saved.workspace_path.endswith("20260420-153000-report.pdf") - assert (tmp_path / saved.workspace_path).read_bytes() == b"%PDF-1.7" diff --git a/tests/adapter/matrix/test_send_outgoing.py b/tests/adapter/matrix/test_send_outgoing.py index 72b9fa6..17eeefa 100644 --- a/tests/adapter/matrix/test_send_outgoing.py +++ b/tests/adapter/matrix/test_send_outgoing.py @@ -9,7 +9,7 @@ from adapter.matrix.handlers.confirm import make_handle_cancel, make_handle_conf from adapter.matrix.store import get_pending_confirm, set_room_meta from core.auth import AuthManager from core.chat import ChatManager -from core.protocol import Attachment, OutgoingMessage, OutgoingUI, UIButton +from core.protocol import OutgoingUI, UIButton from core.settings import SettingsManager from core.store import InMemoryStore from sdk.mock import MockPlatformClient @@ -156,39 +156,3 @@ async def test_outgoing_ui_no_round_trip_uses_user_and_room_scope(): assert "отменено" in result[0].text.lower() assert await get_pending_confirm(store, "@alice:example.org", "!confirm:example.org") is None assert await get_pending_confirm(store, "@bob:example.org", "!other:example.org") is not None - - -async def test_send_outgoing_uploads_workspace_file_attachment(tmp_path, monkeypatch): - workspace_file = tmp_path / "surfaces" / "matrix" / "alice" / "room" / "inbox" / "result.txt" - workspace_file.parent.mkdir(parents=True, exist_ok=True) - workspace_file.write_text("ready") - monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path)) - - client = SimpleNamespace( - upload=AsyncMock(return_value=(SimpleNamespace(content_uri="mxc://server/file"), {})), - room_send=AsyncMock(), - ) - - await send_outgoing( - client, - "!room:example.org", - OutgoingMessage( - chat_id="!room:example.org", - text="Файл готов", - attachments=[ - Attachment( - type="document", - filename="result.txt", - mime_type="text/plain", - workspace_path="surfaces/matrix/alice/room/inbox/result.txt", - ) - ], - ), - ) - - client.upload.assert_awaited_once() - client.room_send.assert_awaited() - assert client.room_send.await_args_list[0].args[2]["body"] == "Файл готов" - file_call = client.room_send.await_args_list[1] - assert file_call.args[2]["msgtype"] == "m.file" - assert file_call.args[2]["url"] == "mxc://server/file" diff --git a/tests/adapter/matrix/test_store.py b/tests/adapter/matrix/test_store.py index dfb0379..9fcd2a2 100644 --- a/tests/adapter/matrix/test_store.py +++ b/tests/adapter/matrix/test_store.py @@ -3,19 +3,14 @@ from __future__ import annotations import pytest from adapter.matrix.store import ( - STAGED_ATTACHMENTS_PREFIX, - add_staged_attachment, clear_pending_confirm, - clear_staged_attachments, get_pending_confirm, get_platform_chat_id, get_room_meta, get_room_state, get_skills_message_id, - get_staged_attachments, get_user_meta, next_chat_id, - remove_staged_attachment_at, set_pending_confirm, set_platform_chat_id, set_room_meta, @@ -121,118 +116,3 @@ async def test_pending_confirm_roundtrip(store: InMemoryStore): await clear_pending_confirm(store, "!room:m.org") assert await get_pending_confirm(store, "!room:m.org") is None - - -async def test_staged_attachments_roundtrip(store: InMemoryStore): - room_id = "!room:m.org" - user_id = "@alice:m.org" - - assert await get_staged_attachments(store, room_id, user_id) == [] - - first = {"id": "att-1", "name": "screenshot.png"} - second = {"id": "att-2", "name": "invoice.pdf"} - - await add_staged_attachment(store, room_id, user_id, first) - await add_staged_attachment(store, room_id, user_id, second) - - assert await get_staged_attachments(store, room_id, user_id) == [ - first, - second, - ] - - -@pytest.mark.parametrize( - "stored_value", - [ - None, - "not-a-dict", - [], - 123, - ], -) -async def test_staged_attachments_invalid_container_state_returns_empty_list( - store: InMemoryStore, stored_value, -): - room_id = "!room:m.org" - user_id = "@alice:m.org" - - await store.set(f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}", stored_value) - - assert await get_staged_attachments(store, room_id, user_id) == [] - - -async def test_staged_attachments_filters_invalid_entries(store: InMemoryStore): - room_id = "!room:m.org" - user_id = "@alice:m.org" - valid_one = {"id": "att-1", "name": "alpha.png"} - valid_two = {"id": "att-2", "name": "beta.pdf"} - - await store.set( - f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}", - { - "attachments": [ - valid_one, - "bad-entry", - None, - {"id": "ignored"}, - valid_two, - ] - }, - ) - - assert await get_staged_attachments(store, room_id, user_id) == [ - valid_one, - {"id": "ignored"}, - valid_two, - ] - - -async def test_staged_attachments_are_scoped_by_room_and_user(store: InMemoryStore): - room_a = "!room-a:m.org" - room_b = "!room-b:m.org" - user_a = "@alice:m.org" - user_b = "@bob:m.org" - - attachment_a = {"id": "att-a", "name": "alpha.png"} - attachment_b = {"id": "att-b", "name": "beta.png"} - attachment_c = {"id": "att-c", "name": "gamma.png"} - - await add_staged_attachment(store, room_a, user_a, attachment_a) - await add_staged_attachment(store, room_a, user_b, attachment_b) - await add_staged_attachment(store, room_b, user_a, attachment_c) - - assert await get_staged_attachments(store, room_a, user_a) == [attachment_a] - assert await get_staged_attachments(store, room_a, user_b) == [attachment_b] - assert await get_staged_attachments(store, room_b, user_a) == [attachment_c] - assert await get_staged_attachments(store, room_b, user_b) == [] - - -async def test_remove_staged_attachment_at_by_zero_based_index( - store: InMemoryStore, -): - room_id = "!room:m.org" - user_id = "@alice:m.org" - first = {"id": "att-1", "name": "first.png"} - second = {"id": "att-2", "name": "second.png"} - third = {"id": "att-3", "name": "third.png"} - - await add_staged_attachment(store, room_id, user_id, first) - await add_staged_attachment(store, room_id, user_id, second) - await add_staged_attachment(store, room_id, user_id, third) - - assert await remove_staged_attachment_at(store, room_id, user_id, 1) == second - assert await get_staged_attachments(store, room_id, user_id) == [first, third] - assert await remove_staged_attachment_at(store, room_id, user_id, 99) is None - assert await remove_staged_attachment_at(store, room_id, user_id, -1) is None - - -async def test_clear_staged_attachments(store: InMemoryStore): - room_id = "!room:m.org" - user_id = "@alice:m.org" - - await add_staged_attachment(store, room_id, user_id, {"id": "att-1"}) - await add_staged_attachment(store, room_id, user_id, {"id": "att-2"}) - - await clear_staged_attachments(store, room_id, user_id) - - assert await get_staged_attachments(store, room_id, user_id) == [] diff --git a/tests/core/test_dispatcher.py b/tests/core/test_dispatcher.py index fad2a4f..eb437d2 100644 --- a/tests/core/test_dispatcher.py +++ b/tests/core/test_dispatcher.py @@ -75,27 +75,6 @@ async def test_dispatch_routes_audio_before_catchall(dispatcher): assert (await dispatcher.dispatch(text_msg))[0].text == "text" -async def test_dispatch_routes_document_before_catchall(dispatcher): - async def document_handler(event, **kwargs): - return [OutgoingMessage(chat_id=event.chat_id, text="document")] - - async def catch_all(event, **kwargs): - return [OutgoingMessage(chat_id=event.chat_id, text="text")] - - dispatcher.register(IncomingMessage, "document", document_handler) - dispatcher.register(IncomingMessage, "*", catch_all) - - document_msg = IncomingMessage( - user_id="u1", - platform="matrix", - chat_id="C1", - text="", - attachments=[Attachment(type="document", workspace_path="surfaces/matrix/u1/file.pdf")], - ) - - assert (await dispatcher.dispatch(document_msg))[0].text == "document" - - async def test_dispatch_callback_by_action(dispatcher): async def confirm_handler(event, **kwargs): return [OutgoingMessage(chat_id=event.chat_id, text="confirmed")] diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index fd7bd2e..ab8fc8c 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -23,11 +23,11 @@ from core.protocol import ( class FakeAgentApi: def __init__(self) -> None: - self.calls: list[tuple[str, list[str]]] = [] + self.calls: list[str] = [] self.last_tokens_used = 0 - async def send_message(self, text: str, attachments: list[str] | None = None): - self.calls.append((text, attachments or [])) + async def send_message(self, text: str): + self.calls.append(text) yield type("Chunk", (), {"text": f"[REAL] {text}"})() self.last_tokens_used = 5 @@ -130,31 +130,4 @@ async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatche texts = [r.text for r in result if isinstance(r, OutgoingMessage)] assert texts == ["[REAL] Привет!"] - assert agent_api.calls == [("Привет!", [])] - - -async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher): - dispatcher, agent_api = real_dispatcher - - start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") - await dispatcher.dispatch(start) - - msg = IncomingMessage( - user_id="u1", - platform="matrix", - chat_id="C1", - text="Посмотри файл", - attachments=[ - Attachment( - type="document", - filename="report.pdf", - mime_type="application/pdf", - workspace_path="surfaces/matrix/u1/room/inbox/report.pdf", - ) - ], - ) - await dispatcher.dispatch(msg) - - assert agent_api.calls == [ - ("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"]) - ] + assert agent_api.calls == ["Привет!"] diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py index e5f01e4..6edecbd 100644 --- a/tests/platform/test_real.py +++ b/tests/platform/test_real.py @@ -5,7 +5,7 @@ import pytest from core.protocol import SettingsAction import sdk.agent_api_wrapper as agent_api_wrapper_module from sdk.agent_api_wrapper import AgentApiWrapper -from sdk.interface import Attachment, MessageChunk, MessageResponse, UserSettings +from sdk.interface import MessageChunk, MessageResponse, UserSettings from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient @@ -90,100 +90,6 @@ class BlockingChatAgentApi: self.last_tokens_used = len(text) -class AttachmentTrackingChatAgentApi: - def __init__(self, chat_id: str) -> None: - self.chat_id = chat_id - self.calls: list[tuple[str, list[str] | None]] = [] - self.connect_calls = 0 - self.close_calls = 0 - self.last_tokens_used = 0 - - async def connect(self) -> None: - self.connect_calls += 1 - - async def close(self) -> None: - self.close_calls += 1 - - async def send_message(self, text: str, attachments: list[str] | None = None): - self.calls.append((text, attachments)) - yield FakeChunk(text) - self.last_tokens_used = 5 - - -class SendFileEvent: - def __init__(self, *, workspace_path: str, mime_type: str, filename: str, size: int) -> None: - self.type = "AGENT_EVENT_SEND_FILE" - self.workspace_path = workspace_path - self.mime_type = mime_type - self.filename = filename - self.size = size - - -class TextChunkEvent: - def __init__(self, text: str) -> None: - self.type = "AGENT_EVENT_TEXT_CHUNK" - self.text = text - - -class ToolCallChunkEvent: - def __init__(self, payload: str) -> None: - self.type = "AGENT_EVENT_TOOL_CALL_CHUNK" - self.payload = payload - - -class ToolResultEvent: - def __init__(self, payload: str) -> None: - self.type = "AGENT_EVENT_TOOL_RESULT" - self.payload = payload - - -class CustomUpdateEvent: - def __init__(self, payload: str) -> None: - self.type = "AGENT_EVENT_CUSTOM_UPDATE" - self.payload = payload - - -class EndEvent: - def __init__(self, tokens_used: int) -> None: - self.type = "AGENT_EVENT_END" - self.tokens_used = tokens_used - - -class ErrorEvent: - def __init__(self, code: str, details: str) -> None: - self.type = "ERROR" - self.code = code - self.details = details - - -class GracefulDisconnectEvent: - def __init__(self) -> None: - self.type = "GRACEFUL_DISCONNECT" - - -class FakeWSMessage: - def __init__(self, data: str) -> None: - self.type = agent_api_wrapper_module.aiohttp.WSMsgType.TEXT - self.data = data - - -class FakeWebSocket: - def __init__(self, messages: list[FakeWSMessage]) -> None: - self._messages = list(messages) - - def __aiter__(self): - return self - - async def __anext__(self): - if not self._messages: - raise StopAsyncIteration - return self._messages.pop(0) - - -class MessageResponseWithAttachments(MessageResponse): - attachments: list[Attachment] = [] - - def test_agent_api_wrapper_uses_modern_constructor_when_available(monkeypatch): calls: list[dict[str, object]] = [] @@ -313,76 +219,6 @@ async def test_real_platform_client_send_message_uses_chat_bound_client(): assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 3 -@pytest.mark.asyncio -async def test_real_platform_client_forwards_attachments_to_chat_api(): - agent_api = AttachmentTrackingChatAgentApi("chat-7") - client = RealPlatformClient( - agent_api=agent_api, - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - attachment = Attachment( - workspace_path="surfaces/matrix/alice/room/inbox/report.pdf", - mime_type="application/pdf", - filename="report.pdf", - size=123, - ) - - result = await client.send_message( - "@alice:example.org", - "chat-7", - "hello", - attachments=[attachment], - ) - - assert agent_api.calls == [("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])] - assert result.response == "hello" - assert result.tokens_used == 5 - - -@pytest.mark.asyncio -async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch): - agent_api = AttachmentTrackingChatAgentApi("chat-7") - client = RealPlatformClient( - agent_api=agent_api, - prototype_state=PrototypeStateStore(), - platform="matrix", - ) - - class FileEventAgentApi(AttachmentTrackingChatAgentApi): - async def send_message(self, text: str, attachments: list[str] | None = None): - self.calls.append((text, attachments)) - yield TextChunkEvent("he") - yield SendFileEvent( - workspace_path="/workspace/report.pdf", - mime_type="application/pdf", - filename="report.pdf", - size=123, - ) - yield TextChunkEvent("llo") - self.last_tokens_used = 9 - - monkeypatch.setattr( - "sdk.real.MessageResponse", - MessageResponseWithAttachments, - ) - client._agent_api = FileEventAgentApi("chat-7") - - result = await client.send_message("@alice:example.org", "chat-7", "hello") - - assert result.response == "hello" - assert result.tokens_used == 9 - assert result.attachments == [ - Attachment( - url="/workspace/report.pdf", - mime_type="application/pdf", - filename="report.pdf", - size=123, - workspace_path="report.pdf", - ) - ] - - @pytest.mark.asyncio async def test_real_platform_client_works_with_legacy_agent_api_without_for_chat(): legacy_api = LegacyAgentApi() @@ -549,85 +385,3 @@ async def test_real_platform_client_settings_are_local(): assert isinstance(settings, UserSettings) assert settings.skills["browser"] is True assert settings.skills["web-search"] is True - - -@pytest.mark.asyncio -async def test_agent_api_wrapper_transparently_surfaces_modern_events(monkeypatch): - callback_events: list[object] = [] - queue: asyncio.Queue = asyncio.Queue() - event_map = { - "text": TextChunkEvent("he"), - "tool_call": ToolCallChunkEvent("call"), - "tool_result": ToolResultEvent("result"), - "custom_update": CustomUpdateEvent("update"), - "send_file": SendFileEvent( - workspace_path="/workspace/report.pdf", - mime_type="application/pdf", - filename="report.pdf", - size=123, - ), - "end": EndEvent(tokens_used=11), - "error": ErrorEvent(code="BOOM", details="bad things"), - "disconnect": GracefulDisconnectEvent(), - } - - def fake_validate_json(data: str): - return event_map[data] - - monkeypatch.setattr( - agent_api_wrapper_module, - "ServerMessage", - type("FakeServerMessage", (), {"validate_json": staticmethod(fake_validate_json)}), - ) - - async def fake_cleanup(self): - return None - - monkeypatch.setattr(agent_api_wrapper_module.AgentApiWrapper, "_cleanup", fake_cleanup) - monkeypatch.setattr( - agent_api_wrapper_module.AgentApi, - "__init__", - lambda self, agent_id, base_url=None, chat_id=0, **kwargs: setattr(self, "id", agent_id) - or setattr(self, "callback", kwargs.get("callback")) - or setattr(self, "on_disconnect", kwargs.get("on_disconnect")) - or setattr(self, "_current_queue", None), - ) - - wrapper = AgentApiWrapper( - agent_id="agent-1", - base_url="https://agent.example.com/v1/agent_ws", - chat_id="chat-1", - callback=callback_events.append, - ) - wrapper._current_queue = queue - wrapper._ws = FakeWebSocket( - [ - FakeWSMessage("text"), - FakeWSMessage("tool_call"), - FakeWSMessage("tool_result"), - FakeWSMessage("custom_update"), - FakeWSMessage("send_file"), - FakeWSMessage("end"), - FakeWSMessage("error"), - FakeWSMessage("disconnect"), - ] - ) - - await wrapper._listen() - - queue_events = [] - while not queue.empty(): - queue_events.append(await queue.get()) - - assert queue_events[0].text == "he" - assert any(isinstance(event, SendFileEvent) for event in queue_events) - assert any(isinstance(event, EndEvent) for event in queue_events) - assert any(isinstance(event, GracefulDisconnectEvent) for event in queue_events) - assert callback_events[0].payload == "call" - assert callback_events[1].payload == "result" - assert callback_events[2].payload == "update" - assert any(isinstance(event, SendFileEvent) for event in callback_events) - assert any(isinstance(event, EndEvent) for event in callback_events) - assert any(isinstance(event, ErrorEvent) for event in callback_events) - assert any(isinstance(event, GracefulDisconnectEvent) for event in callback_events) - assert wrapper.last_tokens_used == 11