From f111ed334888e758244056c503c1de07c66232ec Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Mon, 20 Apr 2026 21:37:12 +0300 Subject: [PATCH] feat: add matrix staging list and remove flow --- adapter/matrix/bot.py | 221 ++++++++++++++++- tests/adapter/matrix/test_dispatcher.py | 309 +++++++++++++++++++++++- 2 files changed, 510 insertions(+), 20 deletions(-) diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index 44d7c95..39c1c77 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from pathlib import Path import structlog +from dotenv import load_dotenv from nio import ( AsyncClient, AsyncClientConfig, @@ -15,28 +16,38 @@ from nio import ( RoomMessageText, ) from nio.responses import SyncResponse -from dotenv import load_dotenv from adapter.matrix.converter import from_room_event +from adapter.matrix.files import ( + download_matrix_attachment, + matrix_msgtype_for_attachment, + resolve_workspace_attachment_path, +) from adapter.matrix.handlers import register_matrix_handlers +from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.handlers.context_commands import ( LOAD_PROMPT, ) -from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.room_router import resolve_chat_id from adapter.matrix.store import ( + add_staged_attachment, clear_load_pending, + clear_staged_attachments, get_load_pending, get_room_meta, + get_staged_attachments, + remove_staged_attachment_at, + set_pending_confirm, set_platform_chat_id, set_room_meta, - set_pending_confirm, ) from core.auth import AuthManager from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import ( + IncomingCommand, + IncomingMessage, OutgoingEvent, OutgoingMessage, OutgoingNotification, @@ -197,6 +208,44 @@ class MatrixBot: incoming = from_room_event(event, room_id=room.room_id, chat_id=dispatch_chat_id) if incoming is None: return + if isinstance(incoming, IncomingCommand) and incoming.command in { + "matrix_list_attachments", + "matrix_remove_attachment", + }: + outgoing = await self._handle_staged_attachment_command( + room.room_id, + sender, + incoming, + ) + await self._send_all(room.room_id, outgoing) + return + if self._is_file_only_event(event, incoming): + materialized = await self._materialize_incoming_attachments( + room.room_id, + sender, + incoming, + ) + await self._stage_attachments(room.room_id, sender, materialized.attachments) + await self._send_all( + room.room_id, + [ + OutgoingMessage( + chat_id=dispatch_chat_id, + text=await self._format_staged_attachments( + room.room_id, + sender, + include_hint=True, + ), + ) + ], + ) + return + if isinstance(incoming, IncomingMessage) and incoming.attachments: + incoming = await self._materialize_incoming_attachments( + room.room_id, + sender, + incoming, + ) try: outgoing = await self.runtime.dispatcher.dispatch(incoming) except PlatformError as exc: @@ -210,11 +259,125 @@ class MatrixBot: outgoing = [ OutgoingMessage( chat_id=dispatch_chat_id, - text="Сервис временно недоступен. Попробуйте ещё раз позже." + text="Сервис временно недоступен. Попробуйте ещё раз позже.", ) ] await self._send_all(room.room_id, outgoing) + def _is_file_only_event( + self, event: RoomMessageText, incoming: IncomingMessage | IncomingCommand + ) -> bool: + return ( + isinstance(incoming, IncomingMessage) + and bool(incoming.attachments) + and getattr(event, "msgtype", None) != "m.text" + ) + + async def _stage_attachments( + self, + room_id: str, + user_id: str, + attachments: list, + ) -> None: + for attachment in attachments: + await add_staged_attachment( + self.runtime.store, + room_id, + user_id, + { + "type": attachment.type, + "url": attachment.url, + "filename": attachment.filename, + "mime_type": attachment.mime_type, + "workspace_path": attachment.workspace_path, + }, + ) + + async def _format_staged_attachments( + self, + room_id: str, + user_id: str, + *, + include_hint: bool = False, + ) -> str: + attachments = await get_staged_attachments(self.runtime.store, room_id, user_id) + if not attachments: + return "Нет сохраненных вложений." + + lines = ["Вложения в очереди:"] + for index, attachment in enumerate(attachments, start=1): + lines.append(f"{index}. {attachment.get('filename') or 'attachment'}") + if include_hint: + lines.extend( + [ + "", + "Следующее сообщение отправит файлы агенту.", + "Команды: !list, !remove , !remove all", + ] + ) + return "\n".join(lines) + + async def _handle_staged_attachment_command( + self, + room_id: str, + user_id: str, + incoming: IncomingCommand, + ) -> list[OutgoingEvent]: + if incoming.command == "matrix_list_attachments": + return [ + OutgoingMessage( + chat_id=incoming.chat_id, + text=await self._format_staged_attachments(room_id, user_id), + ) + ] + + arg = incoming.args[0] if incoming.args else "" + if arg == "all": + await clear_staged_attachments(self.runtime.store, room_id, user_id) + return [OutgoingMessage(chat_id=incoming.chat_id, text="Все вложения удалены.")] + + try: + index = int(arg) - 1 + except ValueError: + return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")] + + removed = await remove_staged_attachment_at(self.runtime.store, room_id, user_id, index) + if removed is None: + return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")] + return [ + OutgoingMessage( + chat_id=incoming.chat_id, + text=await self._format_staged_attachments(room_id, user_id), + ) + ] + + async def _materialize_incoming_attachments( + self, + room_id: str, + matrix_user_id: str, + incoming: IncomingMessage, + ) -> IncomingMessage: + workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")) + materialized = [] + for attachment in incoming.attachments: + materialized.append( + await download_matrix_attachment( + client=self.client, + workspace_root=workspace_root, + matrix_user_id=matrix_user_id, + room_id=room_id, + attachment=attachment, + ) + ) + return IncomingMessage( + user_id=incoming.user_id, + platform=incoming.platform, + chat_id=incoming.chat_id, + text=incoming.text, + attachments=materialized, + reply_to=incoming.reply_to, + ) + async def _bootstrap_unregistered_room( self, room: MatrixRoom, @@ -251,11 +414,6 @@ class MatrixBot: f"Привет, {created['user'].display_name or sender}! Пиши — я здесь.\n\n" "Команды: !new · !chats · !rename · !archive · !context · !save · !load · !help" ) - await self.client.room_send( - created["chat_room_id"], - "m.room.message", - {"msgtype": "m.text", "body": welcome}, - ) await set_room_meta( self.runtime.store, room.room_id, @@ -265,12 +423,18 @@ class MatrixBot: "redirect_chat_id": created["chat_id"], }, ) + await self.client.room_send( + created["chat_room_id"], + "m.room.message", + {"msgtype": "m.text", "body": welcome}, + ) return [ OutgoingMessage( chat_id=room.room_id, text=( f"Создал рабочий чат {created['room_name']} ({created['chat_id']}) " - "и добавил его в пространство Lambda. Открой приглашённую комнату для продолжения." + "и добавил его в пространство Lambda. " + "Открой приглашённую комнату для продолжения." ), ) ] @@ -323,7 +487,9 @@ class MatrixBot: except Exception as exc: logger.warning("load_agent_call_failed", error=str(exc)) return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")] - return [OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")] + return [ + OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}") + ] async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None: if getattr(event, "sender", None) == self.client.user_id: @@ -351,6 +517,7 @@ async def prepare_live_sync(client: AsyncClient) -> str | None: return response.next_batch return None + async def send_outgoing( client: AsyncClient, room_id: str, @@ -365,7 +532,37 @@ async def send_outgoing( await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body}) return if isinstance(event, OutgoingMessage): - await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": event.text}) + if event.text: + await client.room_send( + room_id, "m.room.message", {"msgtype": "m.text", "body": event.text} + ) + if event.attachments: + workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")) + for attachment in event.attachments: + if not attachment.workspace_path: + continue + file_path = resolve_workspace_attachment_path( + workspace_root, attachment.workspace_path + ) + with file_path.open("rb") as handle: + upload_response, _ = await client.upload( + handle, + content_type=attachment.mime_type or "application/octet-stream", + filename=attachment.filename or file_path.name, + filesize=file_path.stat().st_size, + ) + content_uri = getattr(upload_response, "content_uri", None) + if not content_uri: + raise RuntimeError(f"Matrix upload failed for {file_path}") + await client.room_send( + room_id, + "m.room.message", + { + "msgtype": matrix_msgtype_for_attachment(attachment), + "body": attachment.filename or file_path.name, + "url": content_uri, + }, + ) return if isinstance(event, OutgoingUI): lines = [event.text] diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index 10a4f36..0c92686 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -4,20 +4,29 @@ import importlib from types import SimpleNamespace from unittest.mock import AsyncMock +import pytest from nio.api import RoomVisibility from nio.responses import SyncResponse from adapter.matrix.bot import MatrixBot, build_runtime, prepare_live_sync from adapter.matrix.handlers.auth import handle_invite from adapter.matrix.store import ( + add_staged_attachment, get_platform_chat_id, get_room_meta, + get_staged_attachments, get_user_meta, set_load_pending, set_room_meta, set_user_meta, ) -from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage +from core.protocol import ( + Attachment, + IncomingCallback, + IncomingCommand, + IncomingMessage, + OutgoingMessage, +) from sdk.interface import PlatformError from sdk.mock import MockPlatformClient from sdk.real import RealPlatformClient @@ -27,7 +36,9 @@ async def test_matrix_dispatcher_registers_custom_handlers(): runtime = build_runtime(platform=MockPlatformClient()) current_chat_id = "C9" - start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start") + start = IncomingCommand( + user_id="u1", platform="matrix", chat_id=current_chat_id, command="start" + ) await runtime.dispatcher.dispatch(start) new = IncomingCommand( @@ -93,7 +104,9 @@ async def test_new_chat_creates_real_matrix_room_when_client_available(): ) client.room_put_state.assert_awaited_once() put_call = client.room_put_state.call_args - assert put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example" + assert ( + put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example" + ) chats = await runtime.chat_mgr.list_active("u1") assert [c.chat_id for c in chats] == ["C7"] assert [c.surface_ref for c in chats] == ["!r2:example"] @@ -139,7 +152,10 @@ async def test_invite_event_creates_space_and_chat_room(): client.room_put_state.assert_awaited_once() put_state_call = client.room_put_state.call_args - assert put_state_call.kwargs.get("event_type") == "m.space.child" or put_state_call.args[1] == "m.space.child" + assert ( + put_state_call.kwargs.get("event_type") == "m.space.child" + or put_state_call.args[1] == "m.space.child" + ) user_meta = await get_user_meta(runtime.store, "@alice:example.org") assert user_meta is not None @@ -249,7 +265,10 @@ async def test_bot_assigns_platform_chat_id_for_existing_managed_room(): await bot.on_room_message(room, event) - assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org" + assert ( + await get_platform_chat_id(runtime.store, "!chat1:example.org") + == "matrix:!chat1:example.org" + ) runtime.dispatcher.dispatch.assert_awaited_once() @@ -278,6 +297,236 @@ async def test_bot_routes_plain_messages_via_platform_chat_id(): assert dispatched.text == "hello" +async def test_bot_downloads_matrix_file_to_workspace_before_staging(tmp_path, monkeypatch): + monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path)) + runtime = build_runtime(platform=MockPlatformClient()) + await set_room_meta( + runtime.store, + "!chat1:example.org", + { + "chat_id": "C1", + "matrix_user_id": "@alice:example.org", + "platform_chat_id": "matrix:ctx-1", + }, + ) + client = SimpleNamespace( + user_id="@bot:example.org", + download=AsyncMock(return_value=SimpleNamespace(body=b"%PDF-1.7")), + ) + bot = MatrixBot(client, runtime) + bot._send_all = AsyncMock() + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!chat1:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="report.pdf", + msgtype="m.file", + replyto_event_id=None, + url="mxc://server/id", + mimetype="application/pdf", + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + staged = await get_staged_attachments(runtime.store, "!chat1:example.org", "@alice:example.org") + assert staged[0]["workspace_path"] is not None + assert (tmp_path / staged[0]["workspace_path"]).read_bytes() == b"%PDF-1.7" + bot._send_all.assert_awaited_once() + + +async def test_file_only_event_is_staged_and_does_not_dispatch(): + runtime = build_runtime(platform=MockPlatformClient()) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + bot._materialize_incoming_attachments = AsyncMock( + return_value=IncomingMessage( + user_id="@alice:example.org", + platform="matrix", + chat_id="!r:example.org", + text="", + attachments=[ + Attachment( + type="document", + filename="report.pdf", + workspace_path="surfaces/matrix/alice/r/inbox/report.pdf", + mime_type="application/pdf", + ) + ], + ) + ) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="report.pdf", + msgtype="m.file", + url="mxc://hs/id", + mimetype="application/pdf", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") + assert [item["filename"] for item in staged] == ["report.pdf"] + client.room_send.assert_awaited_once() + assert ( + "Следующее сообщение отправит файлы агенту." in client.room_send.await_args.args[2]["body"] + ) + + +async def test_list_command_returns_current_staged_attachments(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "b.pdf", "workspace_path": "b.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", body="!list", msgtype="m.text", replyto_event_id=None + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + body = client.room_send.await_args.args[2]["body"] + assert "1. a.pdf" in body + assert "2. b.pdf" in body + + +async def test_remove_invalid_index_returns_short_error(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", body="!remove 9", msgtype="m.text", replyto_event_id=None + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + assert client.room_send.await_args.args[2]["body"] == "Нет такого вложения." + + +async def test_remove_attachment_updates_list_and_state(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "b.pdf", "workspace_path": "b.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", body="!remove 1", msgtype="m.text", replyto_event_id=None + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") + assert [item["filename"] for item in staged] == ["b.pdf"] + body = client.room_send.await_args.args[2]["body"] + assert "1. b.pdf" in body + assert "a.pdf" not in body + + +async def test_remove_all_clears_state(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="!remove all", + msgtype="m.text", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + assert await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") == [] + assert client.room_send.await_args.args[2]["body"] == "Все вложения удалены." + + +async def test_staged_attachment_commands_are_scoped_by_room_and_user(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r-one:example.org", + "@alice:example.org", + {"filename": "alice-room-one.pdf", "workspace_path": "alice-room-one.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r-two:example.org", + "@alice:example.org", + {"filename": "alice-room-two.pdf", "workspace_path": "alice-room-two.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r-one:example.org", + "@bob:example.org", + {"filename": "bob-room-one.pdf", "workspace_path": "bob-room-one.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r-one:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="!list", + msgtype="m.text", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + body = client.room_send.await_args.args[2]["body"] + assert "alice-room-one.pdf" in body + assert "alice-room-two.pdf" not in body + assert "bob-room-one.pdf" not in body + + async def test_bot_keeps_commands_on_local_chat_id(): runtime = build_runtime(platform=MockPlatformClient()) await set_room_meta( @@ -350,7 +599,10 @@ async def test_bot_assigns_platform_chat_id_before_load_selection(): await bot.on_room_message(room, event) - assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org" + assert ( + await get_platform_chat_id(runtime.store, "!chat1:example.org") + == "matrix:!chat1:example.org" + ) client.room_send.assert_awaited_once_with( "!chat1:example.org", "m.room.message", @@ -415,7 +667,9 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap(): room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry") await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello")) - await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello again")) + await bot.on_room_message( + room, SimpleNamespace(sender="@alice:example.org", body="hello again") + ) assert client.room_create.await_count == 2 room_send_calls = client.room_send.await_args_list @@ -430,6 +684,43 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap(): assert "platform_chat_id" not in entry_meta +async def test_unregistered_room_welcome_send_failure_does_not_repeat_bootstrap(): + runtime = build_runtime(platform=MockPlatformClient()) + await set_user_meta(runtime.store, "@alice:example.org", {"next_chat_index": 1}) + space_resp = SimpleNamespace(room_id="!space:example.org") + chat_resp = SimpleNamespace(room_id="!chat1:example.org") + client = SimpleNamespace( + user_id="@bot:example.org", + room_create=AsyncMock(side_effect=[space_resp, chat_resp]), + room_put_state=AsyncMock(), + room_send=AsyncMock(side_effect=[RuntimeError("welcome failed"), None]), + ) + bot = MatrixBot(client, runtime) + room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry") + + with pytest.raises(RuntimeError, match="welcome failed"): + await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello")) + + entry_meta = await get_room_meta(runtime.store, "!entry:example.org") + assert entry_meta == { + "matrix_user_id": "@alice:example.org", + "redirect_room_id": "!chat1:example.org", + "redirect_chat_id": "C1", + } + + await bot.on_room_message( + room, SimpleNamespace(sender="@alice:example.org", body="hello again") + ) + + assert client.room_create.await_count == 2 + room_send_calls = client.room_send.await_args_list + assert any( + call.args[0] == "!entry:example.org" + and "Рабочий чат уже создан: C1" in call.args[2]["body"] + for call in room_send_calls + ) + + async def test_unregistered_room_creates_new_chat_in_existing_space(): runtime = build_runtime(platform=MockPlatformClient()) await set_user_meta( @@ -466,7 +757,9 @@ async def test_mat11_settings_returns_mvp_unavailable_message(): runtime = build_runtime(platform=MockPlatformClient()) current_chat_id = "C9" - start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start") + start = IncomingCommand( + user_id="u1", platform="matrix", chat_id=current_chat_id, command="start" + ) await runtime.dispatcher.dispatch(start) settings_cmd = IncomingCommand(