From 8b04fcaf77c52571f6ca31dfd76534a48554e034 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Mon, 20 Apr 2026 15:04:20 +0300 Subject: [PATCH 1/7] docs: add matrix shared workspace file flow design --- ...atrix-shared-workspace-file-flow-design.md | 252 ++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md diff --git a/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md b/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md new file mode 100644 index 0000000..feca84c --- /dev/null +++ b/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md @@ -0,0 +1,252 @@ +# Matrix Shared Workspace File Flow Design + +## Goal + +Bring the Matrix surface and `platform-agent` to a single file-handling model that matches the current platform runtime contract as closely as possible. + +The result should be: + +- Matrix receives user files and makes them visible to the agent through a shared `/workspace` +- `platform-agent` receives attachment paths, not ad hoc summaries or inline payloads +- the agent can send files back to the user through the surface via `send_file` +- local development and the default deployment path use the same storage contract + +## Core Decision + +The selected architecture is: + +`Matrix surface <-> shared /workspace <-> platform-agent` + +This means: + +- the Matrix bot is responsible for downloading incoming Matrix media +- downloaded files are written into the same filesystem mounted into `platform-agent` +- the surface passes relative workspace paths to the agent as `attachments` +- the agent returns files to the user by emitting `MsgEventSendFile(path=...)` + +This is the current platform-native direction and does not require new platform endpoints. + +## Why This Decision + +The current upstream platform changes already define the file contract: + +- `MsgUserMessage.attachments` is `list[str]` +- each attachment is a path relative to `/workspace` +- the agent validates those paths against its configured backend root +- the agent can emit `send_file(path)` back to the client + +That is not an upload API and not a remote blob contract. It is an explicit shared-workspace contract. + +Trying to preserve the current separate-process launch model would force the surface to fake production behavior with inline text extraction, out-of-band path rewriting, or a future upload API that does not exist yet. That would increase the gap between our runtime and the platform runtime instead of reducing it. + +## Scope + +This design covers: + +- shared workspace runtime for Matrix bot and `platform-agent` +- incoming Matrix file handling into shared storage +- attachment path propagation to `RealPlatformClient` and `AgentApi` +- outbound file delivery from agent to Matrix user +- local compose/dev workflow and README updates + +This design does not cover: + +- Telegram file flow +- encrypted Matrix media handling +- upload APIs on the platform side +- OCR, PDF parsing, or content extraction pipelines +- long-term object storage or file lifecycle policies beyond basic cleanup boundaries + +## Runtime Contract + +### Shared filesystem + +Both containers must mount the same directory at `/workspace`. + +Requirements: + +- the Matrix bot can create files under `/workspace` +- `platform-agent` sees the same files at the same relative paths +- agent-originated files written under `/workspace` are readable by the Matrix bot + +The contract is path-based, not URL-based. + +### Attachment path format + +The surface sends attachments to the agent as relative workspace paths, for example: + +- `surfaces/matrix///inbox/20260420-153000-report.pdf` +- `surfaces/matrix///inbox/20260420-153200-photo.jpg` + +Rules: + +- paths must be relative to `/workspace` +- paths must be normalized before sending to the agent +- surface-owned uploads must live under a dedicated namespace to avoid collisions with agent-created files + +## Data Flow + +### Incoming file from Matrix user + +1. Matrix receives `m.file`, `m.image`, `m.audio`, or `m.video`. +2. The Matrix bot resolves the target room and platform chat context as usual. +3. The Matrix bot downloads the media from Matrix. +4. The file is stored under `/workspace/surfaces/matrix/.../inbox/...`. +5. The outgoing platform call includes: + - original user text + - `attachments=[relative_path_1, ...]` +6. `platform-agent` validates that those files exist and exposes them to the agent through the upstream attachment mechanism. + +Important detail: + +- the surface should not rewrite the user message into a synthetic file summary unless the message body is empty +- when body is empty, the surface may send a minimal synthetic text such as `User sent one or more attachments.` + +### Outbound file from agent to Matrix user + +1. The agent uses `send_file(path)`. +2. `platform-agent` emits `MsgEventSendFile(path=...)`. +3. The Matrix integration catches that event. +4. The Matrix bot resolves the file inside shared `/workspace`. +5. The Matrix bot uploads the file to Matrix and sends the appropriate media message to the room. + +Surface behavior: + +- if MIME type and extension are known, send the closest native Matrix media type +- otherwise send as `m.file` +- user-visible failures must be explicit if the referenced file does not exist or cannot be uploaded + +## Filesystem Layout + +The Matrix surface owns a dedicated subtree: + +```text +/workspace/ + surfaces/ + matrix/ + / + / + inbox/ + 20260420-153000-report.pdf +``` + +Design constraints: + +- sanitize user ids and room ids before using them as path components +- preserve the original filename in the final basename where possible +- prefix filenames with a timestamp or unique id to avoid collisions + +This layout is intentionally surface-scoped. The agent may read these files, but the surface remains the owner of how inbound messenger files are organized. + +## Components + +### Matrix attachment storage helper + +Add a focused helper module responsible for: + +- building stable workspace-relative paths +- sanitizing path components +- downloading Matrix media into `/workspace` +- returning attachment metadata needed by the platform layer + +This helper should not know about agent transport details beyond the final relative path output. + +### Real platform client + +`RealPlatformClient` must pass attachment relative paths through to `AgentApi.send_message(...)`. + +It must also surface non-text agent events needed by the Matrix adapter, especially `MsgEventSendFile`. + +### Agent API wrapper + +`AgentApiWrapper` must be compatible with the modern upstream protocol: + +- `/v1/agent_ws/{chat_id}/` +- `attachments` on outgoing user messages +- `MsgEventToolCallChunk` +- `MsgEventToolResult` +- `MsgEventCustomUpdate` +- `MsgEventSendFile` +- `MsgEventEnd` + +### Matrix bot outbound renderer + +The Matrix adapter must support sending files back to the room. + +At minimum it needs: + +- path resolution inside shared workspace +- Matrix upload of the local file +- send of an `m.file` or native media event with filename and MIME type + +## Deployment Changes + +### Compose + +The repository root `docker-compose.yml` becomes the primary prod-like local runtime. + +It should define at least: + +- `matrix-bot` +- `platform-agent` +- one shared volume mounted as `/workspace` into both services + +The default developer workflow should stop describing `platform-agent` as a separately started side process. + +### Environment + +The Matrix bot must connect to the in-compose `platform-agent` service by service name, not by assuming a separately launched localhost process. + +The agent WebSocket configuration in docs and examples must match the modern upstream route. + +## Error Handling + +### Incoming files + +If the Matrix bot cannot download or persist the file: + +- do not send a broken attachment path to the agent +- return a user-visible error in the room +- log the Matrix event id, room id, and failure reason + +### Outbound files + +If the agent asks to send a missing file: + +- log a structured warning with the requested path +- send a user-visible message that the file could not be delivered + +### Shared workspace mismatch + +If the runtime is misconfigured and `/workspace` is not actually shared: + +- inbound attachments will fail agent-side path validation +- outbound `send_file` will fail surface-side file resolution + +The implementation should make such failures obvious in logs rather than silently degrading to text-only behavior. + +## Testing + +The implementation must cover: + +- Matrix media download writes into the expected workspace-relative path +- `RealPlatformClient` forwards attachment relative paths to the agent API +- Matrix plain messages with attachments preserve the original text while adding attachment paths +- empty-body attachment-only messages produce the synthetic text fallback +- `AgentApiWrapper` accepts `MsgEventSendFile` without treating it as unknown +- Matrix outbound file handling converts `MsgEventSendFile` into a Matrix upload/send call +- compose configuration mounts the same workspace into both containers + +## Non-Goals + +- no inline text extraction MVP +- no temporary URL-passing contract to the agent +- no fake “prod” mode with separate local filesystems +- no platform API additions in this phase + +## Success Criteria + +- the default local runtime uses a shared `/workspace` +- a user can send a file in Matrix and the agent receives it through upstream `attachments` +- the agent can emit `send_file(path)` and the Matrix user receives the file in the same room +- our runtime behavior matches the current platform contract closely enough that moving from local compose to production does not require redesigning file flow From 105ecc68ed14a84b233f67c4dc14bc95c3a239fd Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Mon, 20 Apr 2026 16:05:28 +0300 Subject: [PATCH 2/7] docs: add matrix staged attachments design --- ...-04-20-matrix-staged-attachments-design.md | 262 ++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-20-matrix-staged-attachments-design.md diff --git a/docs/superpowers/specs/2026-04-20-matrix-staged-attachments-design.md b/docs/superpowers/specs/2026-04-20-matrix-staged-attachments-design.md new file mode 100644 index 0000000..ae8a11a --- /dev/null +++ b/docs/superpowers/specs/2026-04-20-matrix-staged-attachments-design.md @@ -0,0 +1,262 @@ +# Matrix Staged Attachments Design + +## Goal + +Make file sending in the Matrix surface usable for an AI agent despite current Matrix client behavior, especially in Element where media is often sent immediately as separate events without a shared text composer. + +The result should be: + +- files can arrive before the user writes the actual instruction +- the surface stages those files instead of immediately sending them to the agent +- the next normal user message in the same chat commits all staged files as one agent turn +- the user can inspect and remove staged files with short chat commands + +## Core Decision + +The selected UX model is: + +`incoming Matrix media -> staged attachments for (chat_id, user_id) -> next normal message commits them` + +This means: + +- attachment-only events do not immediately invoke the agent +- the bot acknowledges staged files with a service message +- the next normal user message sends text plus all currently staged files to the agent +- staged files are then cleared + +## Why This Decision + +Matrix natively models messages as separate events, and common clients do not provide a reliable "one text message with many attachments" composer flow. + +In practice this causes two UX failures for an AI bot: + +- users may send files first and only then write the task +- users may send multiple files as multiple independent Matrix events + +If the surface treats each incoming file as a full agent turn, the bot becomes noisy and context-fragmented. If it ignores file-only messages, file handling feels broken. + +Staging is the smallest surface-side abstraction that fixes both problems without fighting the Matrix event model. + +## Scope + +This design covers: + +- staging inbound Matrix attachments before agent submission +- per-chat attachment state for a specific user +- user-facing service messages for staged attachments +- short commands for listing and removing staged files +- commit behavior on the next normal message + +This design does not cover: + +- edits or redactions of original Matrix media events as attachment controls +- cross-surface shared staging +- thread-aware staging beyond the existing `chat_id` boundary +- changes to the platform attachment contract + +## State Model + +### Staging key + +Staged attachments are isolated by: + +- `chat_id` +- `user_id` + +This means: + +- files staged by a user in one chat never appear in another chat +- files staged by one user do not mix with another user's files in the same room + +### Staged attachment record + +Each staged attachment must track at least: + +- stable internal id +- display filename +- workspace-relative path +- MIME type if known +- created timestamp + +User-visible commands operate on the current ordered list, not on internal ids. + +### Lifecycle + +A staged attachment is in exactly one of these states: + +1. `staged` +2. `committed` +3. `removed` + +Rules: + +- only `staged` attachments appear in `!list` +- `committed` attachments are no longer user-removable +- `removed` attachments are excluded from future commits + +## Inbound Behavior + +### Attachment-only event + +If the Matrix surface receives one or more file/media events from a user without a normal text message to commit them: + +1. download each file into shared `/workspace` +2. add each file to the staged set for `(chat_id, user_id)` +3. do not call the agent yet +4. send a service acknowledgment message + +### Service acknowledgment + +The service message must communicate: + +- the current staged attachment list with indices +- that the next normal message will be sent to the agent together with those files +- available commands: `!list`, `!remove `, `!remove all` + +Example shape: + +```text +Staged attachments: +1. screenshot.png +2. invoice.pdf + +Your next message will be sent to the agent with these files. +Commands: !list, !remove , !remove all +``` + +### Burst handling + +Matrix clients may send multiple files as separate consecutive events. + +To avoid bot spam, service acknowledgments should be debounced over a short window and aggregated into one reply where feasible. + +The acknowledgment must reflect the full current staged set, not only the most recently received file. + +## Commit Behavior + +### Commit trigger + +The commit trigger is: + +- the next normal user message in the same `(chat_id, user_id)` scope + +Normal user message means: + +- not a staging control command +- not a pure attachment event being staged + +### Commit action + +When a commit-triggering message arrives: + +1. collect all currently staged attachments for `(chat_id, user_id)` +2. send the user text plus those attachments to the agent as one turn +3. mark all included staged attachments as `committed` +4. clear the staged set + +After commit: + +- the just-sent attachments must no longer appear in `!list` +- a later file upload starts a new staged set + +## Commands + +### `!list` + +Shows the current staged attachment list for the user in the current chat. + +If the list is empty, the response should be short and explicit. + +### `!remove ` + +Removes the staged attachment at the current 1-based index. + +Behavior: + +- if the index is valid, remove that staged attachment and return the updated staged list +- if the index is invalid, return a short error without repeating the list + +### `!remove all` + +Clears the entire staged set for the user in the current chat. + +The response should be short and explicit. + +## Ordering Rules + +The staged list is ordered by staging time. + +User-facing indices: + +- are 1-based +- are recalculated from the current staged set +- may change after removals + +Therefore: + +- `!list` always shows the current authoritative numbering +- after a successful `!remove `, the bot should reply with the refreshed list + +## Error Handling + +### Download failure + +If a file cannot be downloaded or stored: + +- do not add it to the staged set +- do not pretend it will be sent later +- send a short user-visible failure message + +### Invalid command + +If the command is malformed or uses an invalid index: + +- return a short error +- do not commit staged attachments +- do not clear the staged set + +### Agent submission failure + +If commit fails when sending the text plus staged files to the agent: + +- staged attachments must remain available for retry unless the failure is known to be irreversible +- the user-visible error should make it clear that the files were not consumed + +This prevents silent loss of staged context. + +## Interaction with Shared Workspace Design + +This design assumes the shared-workspace contract defined in +[2026-04-20-matrix-shared-workspace-file-flow-design.md](/Users/a/MAI/sem2/lambda/surfaces-bot/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md). + +Specifically: + +- staged files are stored in shared `/workspace` +- the final commit still passes workspace-relative paths to `platform-agent` +- staging changes only when the surface chooses to invoke the agent, not how attachments are represented + +## Testing + +The implementation must cover: + +- file-only Matrix events are staged and do not immediately invoke the agent +- service acknowledgment includes staged filenames and command hints +- `!list` returns the current staged set for the correct `(chat_id, user_id)` +- `!remove ` removes the correct staged attachment and refreshes numbering +- `!remove all` clears the staged set +- invalid `!remove ` returns a short error and keeps state unchanged +- the next normal message commits all staged attachments with the text as one agent turn +- committed attachments disappear from staging after success +- failed commits preserve staged attachments +- staging in one chat does not leak into another chat +- staging for one user does not leak to another user in the same room + +## Non-Goals + +This design intentionally does not attempt to: + +- emulate Telegram-style albums in Matrix +- rely on special support from Element or other Matrix clients +- introduce a rich interactive attachment management UI + +The goal is a reliable chat-native workflow that works within Matrix's actual event model. From 0eaf124e21a3ad5a7b80d3931373f1e0e7d77573 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Mon, 20 Apr 2026 16:21:00 +0300 Subject: [PATCH 3/7] feat: add matrix staged attachment state --- adapter/matrix/store.py | 68 ++++++++++++++++ tests/adapter/matrix/test_store.py | 120 +++++++++++++++++++++++++++++ 2 files changed, 188 insertions(+) diff --git a/adapter/matrix/store.py b/adapter/matrix/store.py index 5ebb61a..acafa9f 100644 --- a/adapter/matrix/store.py +++ b/adapter/matrix/store.py @@ -1,5 +1,8 @@ from __future__ import annotations +import asyncio +from weakref import WeakValueDictionary + from core.store import StateStore ROOM_META_PREFIX = "matrix_room:" @@ -9,6 +12,8 @@ SKILLS_MSG_PREFIX = "matrix_skills_msg:" PENDING_CONFIRM_PREFIX = "matrix_pending_confirm:" LOAD_PENDING_PREFIX = "matrix_load_pending:" RESET_PENDING_PREFIX = "matrix_reset_pending:" +STAGED_ATTACHMENTS_PREFIX = "matrix_staged_attachments:" +_STAGED_ATTACHMENTS_LOCKS: WeakValueDictionary[str, asyncio.Lock] = WeakValueDictionary() async def get_room_meta(store: StateStore, room_id: str) -> dict | None: @@ -126,3 +131,66 @@ async def set_reset_pending( async def clear_reset_pending(store: StateStore, user_id: str, room_id: str) -> None: await store.delete(_reset_pending_key(user_id, room_id)) + + +def _staged_attachments_key(room_id: str, user_id: str) -> str: + return f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}" + + +def _staged_attachments_lock(room_id: str, user_id: str) -> asyncio.Lock: + key = _staged_attachments_key(room_id, user_id) + lock = _STAGED_ATTACHMENTS_LOCKS.get(key) + if lock is None: + lock = asyncio.Lock() + _STAGED_ATTACHMENTS_LOCKS[key] = lock + return lock + + +async def get_staged_attachments( + store: StateStore, room_id: str, user_id: str +) -> list[dict]: + data = await store.get(_staged_attachments_key(room_id, user_id)) + if not isinstance(data, dict): + return [] + + attachments = data.get("attachments") + if not isinstance(attachments, list): + return [] + + return [attachment for attachment in attachments if isinstance(attachment, dict)] + + +async def add_staged_attachment( + store: StateStore, room_id: str, user_id: str, attachment: dict +) -> None: + async with _staged_attachments_lock(room_id, user_id): + attachments = await get_staged_attachments(store, room_id, user_id) + attachments.append(attachment) + await store.set( + _staged_attachments_key(room_id, user_id), {"attachments": attachments} + ) + + +async def remove_staged_attachment_at( + store: StateStore, room_id: str, user_id: str, index: int +) -> dict | None: + async with _staged_attachments_lock(room_id, user_id): + attachments = await get_staged_attachments(store, room_id, user_id) + if index < 0 or index >= len(attachments): + return None + + removed = attachments.pop(index) + if attachments: + await store.set( + _staged_attachments_key(room_id, user_id), {"attachments": attachments} + ) + else: + await store.delete(_staged_attachments_key(room_id, user_id)) + return removed + + +async def clear_staged_attachments( + store: StateStore, room_id: str, user_id: str +) -> None: + async with _staged_attachments_lock(room_id, user_id): + await store.delete(_staged_attachments_key(room_id, user_id)) diff --git a/tests/adapter/matrix/test_store.py b/tests/adapter/matrix/test_store.py index 9fcd2a2..dfb0379 100644 --- a/tests/adapter/matrix/test_store.py +++ b/tests/adapter/matrix/test_store.py @@ -3,14 +3,19 @@ from __future__ import annotations import pytest from adapter.matrix.store import ( + STAGED_ATTACHMENTS_PREFIX, + add_staged_attachment, clear_pending_confirm, + clear_staged_attachments, get_pending_confirm, get_platform_chat_id, get_room_meta, get_room_state, get_skills_message_id, + get_staged_attachments, get_user_meta, next_chat_id, + remove_staged_attachment_at, set_pending_confirm, set_platform_chat_id, set_room_meta, @@ -116,3 +121,118 @@ async def test_pending_confirm_roundtrip(store: InMemoryStore): await clear_pending_confirm(store, "!room:m.org") assert await get_pending_confirm(store, "!room:m.org") is None + + +async def test_staged_attachments_roundtrip(store: InMemoryStore): + room_id = "!room:m.org" + user_id = "@alice:m.org" + + assert await get_staged_attachments(store, room_id, user_id) == [] + + first = {"id": "att-1", "name": "screenshot.png"} + second = {"id": "att-2", "name": "invoice.pdf"} + + await add_staged_attachment(store, room_id, user_id, first) + await add_staged_attachment(store, room_id, user_id, second) + + assert await get_staged_attachments(store, room_id, user_id) == [ + first, + second, + ] + + +@pytest.mark.parametrize( + "stored_value", + [ + None, + "not-a-dict", + [], + 123, + ], +) +async def test_staged_attachments_invalid_container_state_returns_empty_list( + store: InMemoryStore, stored_value, +): + room_id = "!room:m.org" + user_id = "@alice:m.org" + + await store.set(f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}", stored_value) + + assert await get_staged_attachments(store, room_id, user_id) == [] + + +async def test_staged_attachments_filters_invalid_entries(store: InMemoryStore): + room_id = "!room:m.org" + user_id = "@alice:m.org" + valid_one = {"id": "att-1", "name": "alpha.png"} + valid_two = {"id": "att-2", "name": "beta.pdf"} + + await store.set( + f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}", + { + "attachments": [ + valid_one, + "bad-entry", + None, + {"id": "ignored"}, + valid_two, + ] + }, + ) + + assert await get_staged_attachments(store, room_id, user_id) == [ + valid_one, + {"id": "ignored"}, + valid_two, + ] + + +async def test_staged_attachments_are_scoped_by_room_and_user(store: InMemoryStore): + room_a = "!room-a:m.org" + room_b = "!room-b:m.org" + user_a = "@alice:m.org" + user_b = "@bob:m.org" + + attachment_a = {"id": "att-a", "name": "alpha.png"} + attachment_b = {"id": "att-b", "name": "beta.png"} + attachment_c = {"id": "att-c", "name": "gamma.png"} + + await add_staged_attachment(store, room_a, user_a, attachment_a) + await add_staged_attachment(store, room_a, user_b, attachment_b) + await add_staged_attachment(store, room_b, user_a, attachment_c) + + assert await get_staged_attachments(store, room_a, user_a) == [attachment_a] + assert await get_staged_attachments(store, room_a, user_b) == [attachment_b] + assert await get_staged_attachments(store, room_b, user_a) == [attachment_c] + assert await get_staged_attachments(store, room_b, user_b) == [] + + +async def test_remove_staged_attachment_at_by_zero_based_index( + store: InMemoryStore, +): + room_id = "!room:m.org" + user_id = "@alice:m.org" + first = {"id": "att-1", "name": "first.png"} + second = {"id": "att-2", "name": "second.png"} + third = {"id": "att-3", "name": "third.png"} + + await add_staged_attachment(store, room_id, user_id, first) + await add_staged_attachment(store, room_id, user_id, second) + await add_staged_attachment(store, room_id, user_id, third) + + assert await remove_staged_attachment_at(store, room_id, user_id, 1) == second + assert await get_staged_attachments(store, room_id, user_id) == [first, third] + assert await remove_staged_attachment_at(store, room_id, user_id, 99) is None + assert await remove_staged_attachment_at(store, room_id, user_id, -1) is None + + +async def test_clear_staged_attachments(store: InMemoryStore): + room_id = "!room:m.org" + user_id = "@alice:m.org" + + await add_staged_attachment(store, room_id, user_id, {"id": "att-1"}) + await add_staged_attachment(store, room_id, user_id, {"id": "att-2"}) + + await clear_staged_attachments(store, room_id, user_id) + + assert await get_staged_attachments(store, room_id, user_id) == [] From 83c9a1513b1b5dc04fd060bd8aa0a385e6516aba Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Mon, 20 Apr 2026 16:26:37 +0300 Subject: [PATCH 4/7] feat: parse matrix staged attachment commands --- adapter/matrix/converter.py | 48 ++++++++++++++---- tests/adapter/matrix/test_converter.py | 69 +++++++++++++++++++++++--- 2 files changed, 100 insertions(+), 17 deletions(-) diff --git a/adapter/matrix/converter.py b/adapter/matrix/converter.py index 00fcdc4..f8edd78 100644 --- a/adapter/matrix/converter.py +++ b/adapter/matrix/converter.py @@ -14,42 +14,52 @@ PLATFORM = "matrix" def extract_attachments(event: Any) -> list[Attachment]: + content = getattr(event, "content", {}) or {} msgtype = getattr(event, "msgtype", None) if msgtype is None: - content = getattr(event, "content", {}) or {} msgtype = content.get("msgtype") + url = content.get("url") or getattr(event, "url", None) + filename = content.get("body") or getattr(event, "body", None) + mime_type = content.get("mimetype") or getattr(event, "mimetype", None) + if mime_type is None: + info = content.get("info") or {} + if isinstance(info, dict): + mime_type = info.get("mimetype") if msgtype == "m.image": return [ Attachment( type="image", - url=getattr(event, "url", None), - mime_type=getattr(event, "mimetype", None), + url=url, + filename=filename, + mime_type=mime_type, ) ] if msgtype == "m.file": return [ Attachment( type="document", - url=getattr(event, "url", None), - filename=getattr(event, "body", None), - mime_type=getattr(event, "mimetype", None), + url=url, + filename=filename, + mime_type=mime_type, ) ] if msgtype == "m.audio": return [ Attachment( type="audio", - url=getattr(event, "url", None), - mime_type=getattr(event, "mimetype", None), + url=url, + filename=filename, + mime_type=mime_type, ) ] if msgtype == "m.video": return [ Attachment( type="video", - url=getattr(event, "url", None), - mime_type=getattr(event, "mimetype", None), + url=url, + filename=filename, + mime_type=mime_type, ) ] return [] @@ -75,6 +85,24 @@ def from_command(body: str, sender: str, chat_id: str, room_id: str | None = Non }, ) + if command == "list" and not args: + return IncomingCommand( + user_id=sender, + platform=PLATFORM, + chat_id=chat_id, + command="matrix_list_attachments", + args=[], + ) + + if command == "remove" and len(args) == 1: + return IncomingCommand( + user_id=sender, + platform=PLATFORM, + chat_id=chat_id, + command="matrix_remove_attachment", + args=[args[0]], + ) + aliases = { "skills": "settings_skills", "connectors": "settings_connectors", diff --git a/tests/adapter/matrix/test_converter.py b/tests/adapter/matrix/test_converter.py index ecaecdc..a6b75fb 100644 --- a/tests/adapter/matrix/test_converter.py +++ b/tests/adapter/matrix/test_converter.py @@ -37,7 +37,23 @@ def image_event(url: str = "mxc://x/img", mime: str = "image/jpeg"): ) -async def test_plain_text_to_incoming_message(): +def content_file_event(): + return SimpleNamespace( + sender="@a:m.org", + body="doc.pdf", + event_id="$e4", + msgtype=None, + replyto_event_id=None, + content={ + "msgtype": "m.file", + "body": "nested.pdf", + "url": "mxc://x/nested", + "info": {"mimetype": "application/pdf"}, + }, + ) + + +def test_plain_text_to_incoming_message(): result = from_room_event(text_event("Hello"), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingMessage) assert result.text == "Hello" @@ -46,20 +62,48 @@ async def test_plain_text_to_incoming_message(): assert result.attachments == [] -async def test_bang_command_to_incoming_command(): +def test_bang_command_to_incoming_command(): result = from_room_event(text_event("!new Analysis"), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingCommand) assert result.command == "new" assert result.args == ["Analysis"] -async def test_skills_alias_to_settings_command(): +def test_list_command_maps_to_matrix_list_attachments(): + result = from_room_event(text_event("!list"), room_id="!r:m.org", chat_id="C1") + assert isinstance(result, IncomingCommand) + assert result.command == "matrix_list_attachments" + assert result.args == [] + + +def test_remove_all_maps_to_matrix_remove_attachment(): + result = from_room_event(text_event("!remove all"), room_id="!r:m.org", chat_id="C1") + assert isinstance(result, IncomingCommand) + assert result.command == "matrix_remove_attachment" + assert result.args == ["all"] + + +def test_remove_index_maps_to_matrix_remove_attachment(): + result = from_room_event(text_event("!remove 2"), room_id="!r:m.org", chat_id="C1") + assert isinstance(result, IncomingCommand) + assert result.command == "matrix_remove_attachment" + assert result.args == ["2"] + + +def test_remove_arbitrary_index_maps_to_matrix_remove_attachment(): + result = from_room_event(text_event("!remove 99"), room_id="!r:m.org", chat_id="C1") + assert isinstance(result, IncomingCommand) + assert result.command == "matrix_remove_attachment" + assert result.args == ["99"] + + +def test_skills_alias_to_settings_command(): result = from_command("!skills", sender="@a:m.org", chat_id="C1") assert isinstance(result, IncomingCommand) assert result.command == "settings_skills" -async def test_yes_to_callback(): +def test_yes_to_callback(): result = from_room_event(text_event("!yes"), room_id="!room:example.org", chat_id="C7") assert isinstance(result, IncomingCallback) assert result.action == "confirm" @@ -67,7 +111,7 @@ async def test_yes_to_callback(): assert result.payload["room_id"] == "!room:example.org" -async def test_no_to_callback(): +def test_no_to_callback(): result = from_room_event(text_event("!no"), room_id="!room:example.org", chat_id="C7") assert isinstance(result, IncomingCallback) assert result.action == "cancel" @@ -75,7 +119,7 @@ async def test_no_to_callback(): assert result.payload["room_id"] == "!room:example.org" -async def test_file_attachment(): +def test_file_attachment(): result = from_room_event(file_event(), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingMessage) assert len(result.attachments) == 1 @@ -86,11 +130,22 @@ async def test_file_attachment(): assert a.mime_type == "application/pdf" -async def test_image_attachment(): +def test_image_attachment(): result = from_room_event(image_event(), room_id="!r:m.org", chat_id="C1") assert result.attachments[0].type == "image" + assert result.attachments[0].filename == "img.jpg" assert result.attachments[0].mime_type == "image/jpeg" +def test_attachment_falls_back_to_content_payload(): + result = from_room_event(content_file_event(), room_id="!r:m.org", chat_id="C1") + assert isinstance(result, IncomingMessage) + a = result.attachments[0] + assert a.type == "document" + assert a.url == "mxc://x/nested" + assert a.filename == "nested.pdf" + assert a.mime_type == "application/pdf" + + def test_converter_module_does_not_expose_reaction_callbacks(): assert not hasattr(converter, "from_reaction") From f111ed334888e758244056c503c1de07c66232ec Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Mon, 20 Apr 2026 21:37:12 +0300 Subject: [PATCH 5/7] feat: add matrix staging list and remove flow --- adapter/matrix/bot.py | 221 ++++++++++++++++- tests/adapter/matrix/test_dispatcher.py | 309 +++++++++++++++++++++++- 2 files changed, 510 insertions(+), 20 deletions(-) diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index 44d7c95..39c1c77 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from pathlib import Path import structlog +from dotenv import load_dotenv from nio import ( AsyncClient, AsyncClientConfig, @@ -15,28 +16,38 @@ from nio import ( RoomMessageText, ) from nio.responses import SyncResponse -from dotenv import load_dotenv from adapter.matrix.converter import from_room_event +from adapter.matrix.files import ( + download_matrix_attachment, + matrix_msgtype_for_attachment, + resolve_workspace_attachment_path, +) from adapter.matrix.handlers import register_matrix_handlers +from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.handlers.context_commands import ( LOAD_PROMPT, ) -from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat from adapter.matrix.room_router import resolve_chat_id from adapter.matrix.store import ( + add_staged_attachment, clear_load_pending, + clear_staged_attachments, get_load_pending, get_room_meta, + get_staged_attachments, + remove_staged_attachment_at, + set_pending_confirm, set_platform_chat_id, set_room_meta, - set_pending_confirm, ) from core.auth import AuthManager from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import ( + IncomingCommand, + IncomingMessage, OutgoingEvent, OutgoingMessage, OutgoingNotification, @@ -197,6 +208,44 @@ class MatrixBot: incoming = from_room_event(event, room_id=room.room_id, chat_id=dispatch_chat_id) if incoming is None: return + if isinstance(incoming, IncomingCommand) and incoming.command in { + "matrix_list_attachments", + "matrix_remove_attachment", + }: + outgoing = await self._handle_staged_attachment_command( + room.room_id, + sender, + incoming, + ) + await self._send_all(room.room_id, outgoing) + return + if self._is_file_only_event(event, incoming): + materialized = await self._materialize_incoming_attachments( + room.room_id, + sender, + incoming, + ) + await self._stage_attachments(room.room_id, sender, materialized.attachments) + await self._send_all( + room.room_id, + [ + OutgoingMessage( + chat_id=dispatch_chat_id, + text=await self._format_staged_attachments( + room.room_id, + sender, + include_hint=True, + ), + ) + ], + ) + return + if isinstance(incoming, IncomingMessage) and incoming.attachments: + incoming = await self._materialize_incoming_attachments( + room.room_id, + sender, + incoming, + ) try: outgoing = await self.runtime.dispatcher.dispatch(incoming) except PlatformError as exc: @@ -210,11 +259,125 @@ class MatrixBot: outgoing = [ OutgoingMessage( chat_id=dispatch_chat_id, - text="Сервис временно недоступен. Попробуйте ещё раз позже." + text="Сервис временно недоступен. Попробуйте ещё раз позже.", ) ] await self._send_all(room.room_id, outgoing) + def _is_file_only_event( + self, event: RoomMessageText, incoming: IncomingMessage | IncomingCommand + ) -> bool: + return ( + isinstance(incoming, IncomingMessage) + and bool(incoming.attachments) + and getattr(event, "msgtype", None) != "m.text" + ) + + async def _stage_attachments( + self, + room_id: str, + user_id: str, + attachments: list, + ) -> None: + for attachment in attachments: + await add_staged_attachment( + self.runtime.store, + room_id, + user_id, + { + "type": attachment.type, + "url": attachment.url, + "filename": attachment.filename, + "mime_type": attachment.mime_type, + "workspace_path": attachment.workspace_path, + }, + ) + + async def _format_staged_attachments( + self, + room_id: str, + user_id: str, + *, + include_hint: bool = False, + ) -> str: + attachments = await get_staged_attachments(self.runtime.store, room_id, user_id) + if not attachments: + return "Нет сохраненных вложений." + + lines = ["Вложения в очереди:"] + for index, attachment in enumerate(attachments, start=1): + lines.append(f"{index}. {attachment.get('filename') or 'attachment'}") + if include_hint: + lines.extend( + [ + "", + "Следующее сообщение отправит файлы агенту.", + "Команды: !list, !remove , !remove all", + ] + ) + return "\n".join(lines) + + async def _handle_staged_attachment_command( + self, + room_id: str, + user_id: str, + incoming: IncomingCommand, + ) -> list[OutgoingEvent]: + if incoming.command == "matrix_list_attachments": + return [ + OutgoingMessage( + chat_id=incoming.chat_id, + text=await self._format_staged_attachments(room_id, user_id), + ) + ] + + arg = incoming.args[0] if incoming.args else "" + if arg == "all": + await clear_staged_attachments(self.runtime.store, room_id, user_id) + return [OutgoingMessage(chat_id=incoming.chat_id, text="Все вложения удалены.")] + + try: + index = int(arg) - 1 + except ValueError: + return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")] + + removed = await remove_staged_attachment_at(self.runtime.store, room_id, user_id, index) + if removed is None: + return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")] + return [ + OutgoingMessage( + chat_id=incoming.chat_id, + text=await self._format_staged_attachments(room_id, user_id), + ) + ] + + async def _materialize_incoming_attachments( + self, + room_id: str, + matrix_user_id: str, + incoming: IncomingMessage, + ) -> IncomingMessage: + workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")) + materialized = [] + for attachment in incoming.attachments: + materialized.append( + await download_matrix_attachment( + client=self.client, + workspace_root=workspace_root, + matrix_user_id=matrix_user_id, + room_id=room_id, + attachment=attachment, + ) + ) + return IncomingMessage( + user_id=incoming.user_id, + platform=incoming.platform, + chat_id=incoming.chat_id, + text=incoming.text, + attachments=materialized, + reply_to=incoming.reply_to, + ) + async def _bootstrap_unregistered_room( self, room: MatrixRoom, @@ -251,11 +414,6 @@ class MatrixBot: f"Привет, {created['user'].display_name or sender}! Пиши — я здесь.\n\n" "Команды: !new · !chats · !rename · !archive · !context · !save · !load · !help" ) - await self.client.room_send( - created["chat_room_id"], - "m.room.message", - {"msgtype": "m.text", "body": welcome}, - ) await set_room_meta( self.runtime.store, room.room_id, @@ -265,12 +423,18 @@ class MatrixBot: "redirect_chat_id": created["chat_id"], }, ) + await self.client.room_send( + created["chat_room_id"], + "m.room.message", + {"msgtype": "m.text", "body": welcome}, + ) return [ OutgoingMessage( chat_id=room.room_id, text=( f"Создал рабочий чат {created['room_name']} ({created['chat_id']}) " - "и добавил его в пространство Lambda. Открой приглашённую комнату для продолжения." + "и добавил его в пространство Lambda. " + "Открой приглашённую комнату для продолжения." ), ) ] @@ -323,7 +487,9 @@ class MatrixBot: except Exception as exc: logger.warning("load_agent_call_failed", error=str(exc)) return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")] - return [OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")] + return [ + OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}") + ] async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None: if getattr(event, "sender", None) == self.client.user_id: @@ -351,6 +517,7 @@ async def prepare_live_sync(client: AsyncClient) -> str | None: return response.next_batch return None + async def send_outgoing( client: AsyncClient, room_id: str, @@ -365,7 +532,37 @@ async def send_outgoing( await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body}) return if isinstance(event, OutgoingMessage): - await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": event.text}) + if event.text: + await client.room_send( + room_id, "m.room.message", {"msgtype": "m.text", "body": event.text} + ) + if event.attachments: + workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace")) + for attachment in event.attachments: + if not attachment.workspace_path: + continue + file_path = resolve_workspace_attachment_path( + workspace_root, attachment.workspace_path + ) + with file_path.open("rb") as handle: + upload_response, _ = await client.upload( + handle, + content_type=attachment.mime_type or "application/octet-stream", + filename=attachment.filename or file_path.name, + filesize=file_path.stat().st_size, + ) + content_uri = getattr(upload_response, "content_uri", None) + if not content_uri: + raise RuntimeError(f"Matrix upload failed for {file_path}") + await client.room_send( + room_id, + "m.room.message", + { + "msgtype": matrix_msgtype_for_attachment(attachment), + "body": attachment.filename or file_path.name, + "url": content_uri, + }, + ) return if isinstance(event, OutgoingUI): lines = [event.text] diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index 10a4f36..0c92686 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -4,20 +4,29 @@ import importlib from types import SimpleNamespace from unittest.mock import AsyncMock +import pytest from nio.api import RoomVisibility from nio.responses import SyncResponse from adapter.matrix.bot import MatrixBot, build_runtime, prepare_live_sync from adapter.matrix.handlers.auth import handle_invite from adapter.matrix.store import ( + add_staged_attachment, get_platform_chat_id, get_room_meta, + get_staged_attachments, get_user_meta, set_load_pending, set_room_meta, set_user_meta, ) -from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage +from core.protocol import ( + Attachment, + IncomingCallback, + IncomingCommand, + IncomingMessage, + OutgoingMessage, +) from sdk.interface import PlatformError from sdk.mock import MockPlatformClient from sdk.real import RealPlatformClient @@ -27,7 +36,9 @@ async def test_matrix_dispatcher_registers_custom_handlers(): runtime = build_runtime(platform=MockPlatformClient()) current_chat_id = "C9" - start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start") + start = IncomingCommand( + user_id="u1", platform="matrix", chat_id=current_chat_id, command="start" + ) await runtime.dispatcher.dispatch(start) new = IncomingCommand( @@ -93,7 +104,9 @@ async def test_new_chat_creates_real_matrix_room_when_client_available(): ) client.room_put_state.assert_awaited_once() put_call = client.room_put_state.call_args - assert put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example" + assert ( + put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example" + ) chats = await runtime.chat_mgr.list_active("u1") assert [c.chat_id for c in chats] == ["C7"] assert [c.surface_ref for c in chats] == ["!r2:example"] @@ -139,7 +152,10 @@ async def test_invite_event_creates_space_and_chat_room(): client.room_put_state.assert_awaited_once() put_state_call = client.room_put_state.call_args - assert put_state_call.kwargs.get("event_type") == "m.space.child" or put_state_call.args[1] == "m.space.child" + assert ( + put_state_call.kwargs.get("event_type") == "m.space.child" + or put_state_call.args[1] == "m.space.child" + ) user_meta = await get_user_meta(runtime.store, "@alice:example.org") assert user_meta is not None @@ -249,7 +265,10 @@ async def test_bot_assigns_platform_chat_id_for_existing_managed_room(): await bot.on_room_message(room, event) - assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org" + assert ( + await get_platform_chat_id(runtime.store, "!chat1:example.org") + == "matrix:!chat1:example.org" + ) runtime.dispatcher.dispatch.assert_awaited_once() @@ -278,6 +297,236 @@ async def test_bot_routes_plain_messages_via_platform_chat_id(): assert dispatched.text == "hello" +async def test_bot_downloads_matrix_file_to_workspace_before_staging(tmp_path, monkeypatch): + monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path)) + runtime = build_runtime(platform=MockPlatformClient()) + await set_room_meta( + runtime.store, + "!chat1:example.org", + { + "chat_id": "C1", + "matrix_user_id": "@alice:example.org", + "platform_chat_id": "matrix:ctx-1", + }, + ) + client = SimpleNamespace( + user_id="@bot:example.org", + download=AsyncMock(return_value=SimpleNamespace(body=b"%PDF-1.7")), + ) + bot = MatrixBot(client, runtime) + bot._send_all = AsyncMock() + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!chat1:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="report.pdf", + msgtype="m.file", + replyto_event_id=None, + url="mxc://server/id", + mimetype="application/pdf", + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + staged = await get_staged_attachments(runtime.store, "!chat1:example.org", "@alice:example.org") + assert staged[0]["workspace_path"] is not None + assert (tmp_path / staged[0]["workspace_path"]).read_bytes() == b"%PDF-1.7" + bot._send_all.assert_awaited_once() + + +async def test_file_only_event_is_staged_and_does_not_dispatch(): + runtime = build_runtime(platform=MockPlatformClient()) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + bot._materialize_incoming_attachments = AsyncMock( + return_value=IncomingMessage( + user_id="@alice:example.org", + platform="matrix", + chat_id="!r:example.org", + text="", + attachments=[ + Attachment( + type="document", + filename="report.pdf", + workspace_path="surfaces/matrix/alice/r/inbox/report.pdf", + mime_type="application/pdf", + ) + ], + ) + ) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="report.pdf", + msgtype="m.file", + url="mxc://hs/id", + mimetype="application/pdf", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") + assert [item["filename"] for item in staged] == ["report.pdf"] + client.room_send.assert_awaited_once() + assert ( + "Следующее сообщение отправит файлы агенту." in client.room_send.await_args.args[2]["body"] + ) + + +async def test_list_command_returns_current_staged_attachments(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "b.pdf", "workspace_path": "b.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", body="!list", msgtype="m.text", replyto_event_id=None + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + body = client.room_send.await_args.args[2]["body"] + assert "1. a.pdf" in body + assert "2. b.pdf" in body + + +async def test_remove_invalid_index_returns_short_error(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", body="!remove 9", msgtype="m.text", replyto_event_id=None + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + assert client.room_send.await_args.args[2]["body"] == "Нет такого вложения." + + +async def test_remove_attachment_updates_list_and_state(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "b.pdf", "workspace_path": "b.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", body="!remove 1", msgtype="m.text", replyto_event_id=None + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") + assert [item["filename"] for item in staged] == ["b.pdf"] + body = client.room_send.await_args.args[2]["body"] + assert "1. b.pdf" in body + assert "a.pdf" not in body + + +async def test_remove_all_clears_state(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + {"filename": "a.pdf", "workspace_path": "a.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="!remove all", + msgtype="m.text", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + assert await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") == [] + assert client.room_send.await_args.args[2]["body"] == "Все вложения удалены." + + +async def test_staged_attachment_commands_are_scoped_by_room_and_user(): + runtime = build_runtime(platform=MockPlatformClient()) + await add_staged_attachment( + runtime.store, + "!r-one:example.org", + "@alice:example.org", + {"filename": "alice-room-one.pdf", "workspace_path": "alice-room-one.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r-two:example.org", + "@alice:example.org", + {"filename": "alice-room-two.pdf", "workspace_path": "alice-room-two.pdf"}, + ) + await add_staged_attachment( + runtime.store, + "!r-one:example.org", + "@bob:example.org", + {"filename": "bob-room-one.pdf", "workspace_path": "bob-room-one.pdf"}, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r-one:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="!list", + msgtype="m.text", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + runtime.dispatcher.dispatch.assert_not_awaited() + body = client.room_send.await_args.args[2]["body"] + assert "alice-room-one.pdf" in body + assert "alice-room-two.pdf" not in body + assert "bob-room-one.pdf" not in body + + async def test_bot_keeps_commands_on_local_chat_id(): runtime = build_runtime(platform=MockPlatformClient()) await set_room_meta( @@ -350,7 +599,10 @@ async def test_bot_assigns_platform_chat_id_before_load_selection(): await bot.on_room_message(room, event) - assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org" + assert ( + await get_platform_chat_id(runtime.store, "!chat1:example.org") + == "matrix:!chat1:example.org" + ) client.room_send.assert_awaited_once_with( "!chat1:example.org", "m.room.message", @@ -415,7 +667,9 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap(): room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry") await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello")) - await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello again")) + await bot.on_room_message( + room, SimpleNamespace(sender="@alice:example.org", body="hello again") + ) assert client.room_create.await_count == 2 room_send_calls = client.room_send.await_args_list @@ -430,6 +684,43 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap(): assert "platform_chat_id" not in entry_meta +async def test_unregistered_room_welcome_send_failure_does_not_repeat_bootstrap(): + runtime = build_runtime(platform=MockPlatformClient()) + await set_user_meta(runtime.store, "@alice:example.org", {"next_chat_index": 1}) + space_resp = SimpleNamespace(room_id="!space:example.org") + chat_resp = SimpleNamespace(room_id="!chat1:example.org") + client = SimpleNamespace( + user_id="@bot:example.org", + room_create=AsyncMock(side_effect=[space_resp, chat_resp]), + room_put_state=AsyncMock(), + room_send=AsyncMock(side_effect=[RuntimeError("welcome failed"), None]), + ) + bot = MatrixBot(client, runtime) + room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry") + + with pytest.raises(RuntimeError, match="welcome failed"): + await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello")) + + entry_meta = await get_room_meta(runtime.store, "!entry:example.org") + assert entry_meta == { + "matrix_user_id": "@alice:example.org", + "redirect_room_id": "!chat1:example.org", + "redirect_chat_id": "C1", + } + + await bot.on_room_message( + room, SimpleNamespace(sender="@alice:example.org", body="hello again") + ) + + assert client.room_create.await_count == 2 + room_send_calls = client.room_send.await_args_list + assert any( + call.args[0] == "!entry:example.org" + and "Рабочий чат уже создан: C1" in call.args[2]["body"] + for call in room_send_calls + ) + + async def test_unregistered_room_creates_new_chat_in_existing_space(): runtime = build_runtime(platform=MockPlatformClient()) await set_user_meta( @@ -466,7 +757,9 @@ async def test_mat11_settings_returns_mvp_unavailable_message(): runtime = build_runtime(platform=MockPlatformClient()) current_chat_id = "C9" - start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start") + start = IncomingCommand( + user_id="u1", platform="matrix", chat_id=current_chat_id, command="start" + ) await runtime.dispatcher.dispatch(start) settings_cmd = IncomingCommand( From 323a6d3144f838b668fc0a4537766bdc9374cb35 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Mon, 20 Apr 2026 21:39:37 +0300 Subject: [PATCH 6/7] feat: commit staged matrix attachments on next message --- README.md | 53 +++++++++------- adapter/matrix/bot.py | 42 +++++++++++++ tests/adapter/matrix/test_dispatcher.py | 83 +++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 82cd55c..a9d7f71 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ | Поверхность | Статус | |---|---| | Telegram | 🔨 В разработке, отдельный worktree `feat/telegram-adapter` | -| Matrix | ✅ Рабочий прототип, подключается к реальному агенту | +| Matrix | ✅ Рабочий прототип, запускается через root `docker compose` вместе с `platform-agent` | --- @@ -69,8 +69,8 @@ surfaces-bot/ - **Диалог** — сообщения, вложения, подтверждения `!yes` / `!no` и routing через `EventDispatcher` - **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта - **Текущее ограничение** — encrypted DM пока не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота -- **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` требует `AGENT_WS_URL=ws://host:port/agent_ws/` -- **Ограничения real backend** — пока это текстовый direct-agent прототип без вложений и без асинхронных callbacks; локальные настройки и user-state хранятся в `PrototypeStateStore` +- **Backend selection** — `MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` использует `platform-agent` из compose и WebSocket contract `/v1/agent_ws/{chat_id}/` +- **Ограничения real backend** — локальный runtime использует shared `/workspace`, а файлы передаются как относительные пути в `attachments` --- @@ -90,6 +90,7 @@ class PlatformClient(Protocol): Бот передаёт `user_id` + `chat_id` + сообщение; платформа сама решает нужно ли поднять контейнер. Сейчас: `MockPlatformClient` в `sdk/mock.py`, а Matrix real backend собирается через `sdk/real.py` при `MATRIX_PLATFORM_BACKEND=real`. +Файловый контракт уже path-based: бот пишет файлы в shared `/workspace` и передаёт платформе относительные пути в `attachments`. Когда SDK готов: добавляем `SdkPlatformClient`, меняем одну строку в DI. Адаптеры и ядро не трогаем. --- @@ -120,32 +121,38 @@ MATRIX_PASSWORD=... # или MATRIX_ACCESS_TOKEN=... # Выбор backend: mock (по умолчанию) или real (подключение к platform-agent) MATRIX_PLATFORM_BACKEND=real -# URL WebSocket endpoint platform-agent (только при MATRIX_PLATFORM_BACKEND=real) -AGENT_WS_URL=ws://127.0.0.1:8000/agent_ws/ -AGENT_BASE_URL=http://127.0.0.1:8000 +# compose runtime: platform-agent service name + shared /workspace +AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/ +AGENT_BASE_URL=http://platform-agent:8000 +SURFACES_WORKSPACE_DIR=/workspace ``` -### 3. Запуск platform-agent (для real backend) +### 3. Compose runtime -platform-agent — отдельный репозиторий, сейчас клонируется в `external/platform-agent`. +Root `docker-compose.yml` теперь является основным локальным runtime для Matrix и platform-agent. +Он поднимает `matrix-bot`, `platform-agent` и общий volume `/workspace`. ```bash -cd external/platform-agent - -# Создать .env с параметрами LLM провайдера -cat > .env <` — удалить вложение по номеру +- `!remove all` — очистить все staged вложения + +Следующее обычное сообщение пользователя уходит агенту вместе со всеми staged файлами. + +### 4. Запуск бота вручную ```bash # Первый запуск или сброс состояния @@ -184,6 +191,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | Состояние контекста | `!context` | Текущая сессия и список сохранений | | Справка | `!help` | | | Подтверждения | `!yes` / `!no` | Для опасных действий | +| Staged вложения | `!list`, `!remove `, `!remove all` | Файлы без текстовой инструкции ставятся в очередь до следующего сообщения | ### Не работает — блокеры на стороне platform-agent @@ -192,7 +200,6 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | `!load` в другом чате | platform-agent использует `StateBackend` — файлы живут в памяти отдельно для каждого `thread_id`. Файл, сохранённый в чате A, не виден в чате B. Фикс: переключить platform-agent на `FilesystemBackend` с общим хранилищем. | | Счётчик токенов в `!context` | platform-agent отдаёт `tokens_used=0` хардкодом в `MsgEventEnd`. Наш код перехватывает значение корректно. | | `!reset` | platform-agent не имеет endpoint `/reset`. Задокументировано в ТЗ к платформе. | -| Файловые вложения | Нет API загрузки файлов в область видимости агента. ТЗ передано платформе. | | Персистентность между рестартами | platform-agent использует `MemorySaver` (in-memory). Все разговоры теряются при рестарте процесса. | | E2EE комнаты | `python-olm` не собирается на macOS/ARM. Ограничение инфраструктуры. | @@ -201,7 +208,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot | Функция | Статус | |---|---| | `!settings`, `!skills`, `!soul`, `!safety` | Заглушки MVP. Требуют готового SDK платформы. | -| Вложения (изображения, документы) | Только текстовые сообщения в текущем MVP. | +| Вложения без текстовой инструкции | Поддержан staged UX только для Matrix. Для других поверхностей ещё не перенесено. | --- diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index 39c1c77..bd3934a 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -46,6 +46,7 @@ from core.chat import ChatManager from core.handler import EventDispatcher from core.handlers import register_all from core.protocol import ( + Attachment, IncomingCommand, IncomingMessage, OutgoingEvent, @@ -246,6 +247,13 @@ class MatrixBot: sender, incoming, ) + clear_staged_after_dispatch = False + if isinstance(incoming, IncomingMessage) and incoming.text: + incoming, clear_staged_after_dispatch = await self._merge_staged_attachments( + room.room_id, + sender, + incoming, + ) try: outgoing = await self.runtime.dispatcher.dispatch(incoming) except PlatformError as exc: @@ -262,6 +270,9 @@ class MatrixBot: text="Сервис временно недоступен. Попробуйте ещё раз позже.", ) ] + else: + if clear_staged_after_dispatch: + await clear_staged_attachments(self.runtime.store, room.room_id, sender) await self._send_all(room.room_id, outgoing) def _is_file_only_event( @@ -351,6 +362,37 @@ class MatrixBot: ) ] + async def _merge_staged_attachments( + self, + room_id: str, + user_id: str, + incoming: IncomingMessage, + ) -> tuple[IncomingMessage, bool]: + staged = await get_staged_attachments(self.runtime.store, room_id, user_id) + if not staged: + return incoming, False + attachments = [ + Attachment( + type=item.get("type", "document"), + url=item.get("url"), + filename=item.get("filename"), + mime_type=item.get("mime_type"), + workspace_path=item.get("workspace_path"), + ) + for item in staged + ] + return ( + IncomingMessage( + user_id=incoming.user_id, + platform=incoming.platform, + chat_id=incoming.chat_id, + text=incoming.text, + attachments=attachments, + reply_to=incoming.reply_to, + ), + True, + ) + async def _materialize_incoming_attachments( self, room_id: str, diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index 0c92686..b50dfe0 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -527,6 +527,89 @@ async def test_staged_attachment_commands_are_scoped_by_room_and_user(): assert "bob-room-one.pdf" not in body +async def test_next_normal_message_commits_staged_attachments(): + runtime = build_runtime(platform=MockPlatformClient()) + await set_room_meta( + runtime.store, + "!r:example.org", + { + "chat_id": "C1", + "matrix_user_id": "@alice:example.org", + "platform_chat_id": "matrix:ctx-1", + }, + ) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + { + "type": "document", + "filename": "report.pdf", + "workspace_path": "surfaces/matrix/alice/r/inbox/report.pdf", + "mime_type": "application/pdf", + }, + ) + client = SimpleNamespace(user_id="@bot:example.org") + bot = MatrixBot(client, runtime) + bot._send_all = AsyncMock() + runtime.dispatcher.dispatch = AsyncMock(return_value=[]) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="Проанализируй", + msgtype="m.text", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + dispatched = runtime.dispatcher.dispatch.await_args.args[0] + assert isinstance(dispatched, IncomingMessage) + assert dispatched.text == "Проанализируй" + assert [a.workspace_path for a in dispatched.attachments] == [ + "surfaces/matrix/alice/r/inbox/report.pdf" + ] + assert await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") == [] + + +async def test_failed_commit_preserves_staged_attachments(): + runtime = build_runtime(platform=MockPlatformClient()) + await set_room_meta( + runtime.store, + "!r:example.org", + { + "chat_id": "C1", + "matrix_user_id": "@alice:example.org", + "platform_chat_id": "matrix:ctx-1", + }, + ) + await add_staged_attachment( + runtime.store, + "!r:example.org", + "@alice:example.org", + { + "type": "document", + "filename": "report.pdf", + "workspace_path": "surfaces/matrix/alice/r/inbox/report.pdf", + }, + ) + client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock()) + bot = MatrixBot(client, runtime) + runtime.dispatcher.dispatch = AsyncMock(side_effect=PlatformError("boom")) + room = SimpleNamespace(room_id="!r:example.org") + event = SimpleNamespace( + sender="@alice:example.org", + body="Проанализируй", + msgtype="m.text", + replyto_event_id=None, + ) + + await bot.on_room_message(room, event) + + staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") + assert [item["filename"] for item in staged] == ["report.pdf"] + + async def test_bot_keeps_commands_on_local_chat_id(): runtime = build_runtime(platform=MockPlatformClient()) await set_room_meta( From 6422c7db5872ef780af587b6cb0bc1cc1026a890 Mon Sep 17 00:00:00 2001 From: Mikhail Putilovskij Date: Tue, 21 Apr 2026 00:26:21 +0300 Subject: [PATCH 7/7] feat: support shared-workspace file flow for matrix --- .env.example | 19 +- README.md | 4 +- adapter/matrix/bot.py | 33 +-- adapter/matrix/converter.py | 3 +- adapter/matrix/files.py | 103 +++++++++ core/handlers/message.py | 9 +- core/protocol.py | 1 + docker-compose.yml | 34 +++ sdk/agent_api_wrapper.py | 72 ++++-- sdk/interface.py | 8 +- sdk/real.py | 191 ++++++++++++++-- tests/adapter/matrix/test_converter.py | 28 +++ tests/adapter/matrix/test_dispatcher.py | 54 ++++- tests/adapter/matrix/test_files.py | 50 +++++ tests/adapter/matrix/test_send_outgoing.py | 38 +++- tests/core/test_dispatcher.py | 21 ++ tests/core/test_integration.py | 35 ++- tests/platform/test_real.py | 248 ++++++++++++++++++++- 18 files changed, 871 insertions(+), 80 deletions(-) create mode 100644 adapter/matrix/files.py create mode 100644 tests/adapter/matrix/test_files.py diff --git a/.env.example b/.env.example index c7edcbc..3af498d 100644 --- a/.env.example +++ b/.env.example @@ -5,13 +5,16 @@ TELEGRAM_BOT_TOKEN=your_bot_token_here MATRIX_HOMESERVER=https://matrix.org MATRIX_USER_ID=@bot:matrix.org MATRIX_PASSWORD=your_password_here - -# Lambda Platform -LAMBDA_PLATFORM_URL=http://localhost:8000 -LAMBDA_SERVICE_TOKEN=your_service_token_here -AGENT_WS_URL=ws://127.0.0.1:8000/agent_ws/ -AGENT_BASE_URL=http://127.0.0.1:8000 MATRIX_PLATFORM_BACKEND=real -# Режим работы: "mock" или "production" -PLATFORM_MODE=mock +# Shared workspace contract +SURFACES_WORKSPACE_DIR=/workspace + +# Compose-local platform-agent route +AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/{chat_id}/ +AGENT_BASE_URL=http://platform-agent:8000 + +# platform-agent provider +PROVIDER_MODEL=openai/gpt-4o-mini +PROVIDER_URL=https://openrouter.ai/api/v1 +PROVIDER_API_KEY=sk-or-... diff --git a/README.md b/README.md index a9d7f71..8d95c6b 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,9 @@ Root `docker-compose.yml` теперь является основным лок docker compose up --build ``` -Compose использует локальные директории `external/platform-agent` и `external/platform-agent_api` как источник кода для агента. +Compose собирает `platform-agent` из актуального upstream `external/platform-agent` Dockerfile (`development` target), +монтирует live-код из `external/platform-agent/src` и `external/platform-agent_api`, и подготавливает shared `/workspace` +с правами для agent runtime. Matrix бот подключается к `platform-agent` по service name, а не к отдельно запущенному `localhost`. ### 4.1. Staged attachments в Matrix diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index bd3934a..cf8a74f 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -13,7 +13,12 @@ from nio import ( InviteMemberEvent, MatrixRoom, RoomMemberEvent, + RoomMessage, + RoomMessageAudio, + RoomMessageFile, + RoomMessageImage, RoomMessageText, + RoomMessageVideo, ) from nio.responses import SyncResponse @@ -227,19 +232,6 @@ class MatrixBot: incoming, ) await self._stage_attachments(room.room_id, sender, materialized.attachments) - await self._send_all( - room.room_id, - [ - OutgoingMessage( - chat_id=dispatch_chat_id, - text=await self._format_staged_attachments( - room.room_id, - sender, - include_hint=True, - ), - ) - ], - ) return if isinstance(incoming, IncomingMessage) and incoming.attachments: incoming = await self._materialize_incoming_attachments( @@ -276,12 +268,12 @@ class MatrixBot: await self._send_all(room.room_id, outgoing) def _is_file_only_event( - self, event: RoomMessageText, incoming: IncomingMessage | IncomingCommand + self, event: RoomMessage, incoming: IncomingMessage | IncomingCommand ) -> bool: return ( isinstance(incoming, IncomingMessage) and bool(incoming.attachments) - and getattr(event, "msgtype", None) != "m.text" + and not isinstance(event, RoomMessageText) ) async def _stage_attachments( @@ -669,7 +661,16 @@ async def main() -> None: since_token = await prepare_live_sync(client) bot = MatrixBot(client, runtime) - client.add_event_callback(bot.on_room_message, RoomMessageText) + client.add_event_callback( + bot.on_room_message, + ( + RoomMessageText, + RoomMessageFile, + RoomMessageImage, + RoomMessageVideo, + RoomMessageAudio, + ), + ) client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent)) logger.info( diff --git a/adapter/matrix/converter.py b/adapter/matrix/converter.py index f8edd78..a19d8ea 100644 --- a/adapter/matrix/converter.py +++ b/adapter/matrix/converter.py @@ -14,7 +14,8 @@ PLATFORM = "matrix" def extract_attachments(event: Any) -> list[Attachment]: - content = getattr(event, "content", {}) or {} + source = getattr(event, "source", {}) or {} + content = source.get("content", {}) or getattr(event, "content", {}) or {} msgtype = getattr(event, "msgtype", None) if msgtype is None: msgtype = content.get("msgtype") diff --git a/adapter/matrix/files.py b/adapter/matrix/files.py new file mode 100644 index 0000000..52d1a1c --- /dev/null +++ b/adapter/matrix/files.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import mimetypes +import re +from datetime import UTC, datetime +from pathlib import Path + +from core.protocol import Attachment + + +def _sanitize_component(value: str) -> str: + cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value) + cleaned = cleaned.strip("._-") + return cleaned or "unknown" + + +def _default_filename(attachment: Attachment) -> str: + if attachment.filename: + return attachment.filename + + extension = mimetypes.guess_extension(attachment.mime_type or "") or "" + base = { + "image": "image", + "audio": "audio", + "video": "video", + "document": "attachment", + }.get(attachment.type, "attachment") + return f"{base}{extension}" + + +def build_workspace_attachment_path( + *, + workspace_root: Path, + matrix_user_id: str, + room_id: str, + filename: str, + timestamp: str | None = None, +) -> tuple[str, Path]: + stamp = timestamp or datetime.now(UTC).strftime("%Y%m%d-%H%M%S") + safe_user = _sanitize_component(matrix_user_id.lstrip("@")) + safe_room = _sanitize_component(room_id.lstrip("!")) + safe_name = _sanitize_component(filename) or "attachment.bin" + relative_path = ( + Path("surfaces") + / "matrix" + / safe_user + / safe_room + / "inbox" + / f"{stamp}-{safe_name}" + ) + return relative_path.as_posix(), workspace_root / relative_path + + +async def download_matrix_attachment( + *, + client, + workspace_root: Path, + matrix_user_id: str, + room_id: str, + attachment: Attachment, + timestamp: str | None = None, +) -> Attachment: + if not attachment.url: + return attachment + + filename = _default_filename(attachment) + relative_path, absolute_path = build_workspace_attachment_path( + workspace_root=workspace_root, + matrix_user_id=matrix_user_id, + room_id=room_id, + filename=filename, + timestamp=timestamp, + ) + absolute_path.parent.mkdir(parents=True, exist_ok=True) + + response = await client.download(attachment.url) + body = getattr(response, "body", None) + if body is None: + raise RuntimeError(f"Matrix download response for {attachment.url} has no body") + absolute_path.write_bytes(body) + + return Attachment( + type=attachment.type, + url=attachment.url, + filename=filename, + mime_type=attachment.mime_type, + workspace_path=relative_path, + ) + + +def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path: + path = Path(workspace_path) + if path.is_absolute(): + return path + return workspace_root / path + + +def matrix_msgtype_for_attachment(attachment: Attachment) -> str: + return { + "image": "m.image", + "audio": "m.audio", + "video": "m.video", + }.get(attachment.type, "m.file") diff --git a/core/handlers/message.py b/core/handlers/message.py index 2edb87e..d9f91cd 100644 --- a/core/handlers/message.py +++ b/core/handlers/message.py @@ -29,10 +29,15 @@ async def handle_message(event: IncomingMessage, auth_mgr, platform, chat_mgr, s user_id=event.user_id, chat_id=event.chat_id, text=event.text, - attachments=[], + attachments=event.attachments, ) return [ OutgoingTyping(chat_id=event.chat_id, is_typing=False), - OutgoingMessage(chat_id=event.chat_id, text=response.response, parse_mode="markdown"), + OutgoingMessage( + chat_id=event.chat_id, + text=response.response, + parse_mode="markdown", + attachments=list(getattr(response, "attachments", [])), + ), ] diff --git a/core/protocol.py b/core/protocol.py index 02a9f4a..7d6e25f 100644 --- a/core/protocol.py +++ b/core/protocol.py @@ -12,6 +12,7 @@ class Attachment: content: bytes | None = None filename: str | None = None mime_type: str | None = None + workspace_path: str | None = None @dataclass diff --git a/docker-compose.yml b/docker-compose.yml index 480ecad..d6c2e4d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,39 @@ services: + platform-agent: + build: + context: ./external/platform-agent + target: development + additional_contexts: + agent_api: ./external/platform-agent_api + env_file: .env + environment: + PYTHONUNBUFFERED: "1" + volumes: + - ./external/platform-agent/src:/app/src + - ./external/platform-agent_api:/agent_api + - workspace:/workspace + command: > + sh -lc " + mkdir -p /workspace && + chown -R agent:agent /workspace && + exec /app/.venv/bin/uvicorn src.main:app --host 0.0.0.0 --port 8000 + " + ports: + - "8000:8000" + restart: unless-stopped + matrix-bot: build: . env_file: .env + environment: + AGENT_BASE_URL: http://platform-agent:8000 + AGENT_WS_URL: ws://platform-agent:8000/v1/agent_ws/ + SURFACES_WORKSPACE_DIR: /workspace + depends_on: + - platform-agent + volumes: + - workspace:/workspace restart: unless-stopped + +volumes: + workspace: diff --git a/sdk/agent_api_wrapper.py b/sdk/agent_api_wrapper.py index 32f126d..94205ea 100644 --- a/sdk/agent_api_wrapper.py +++ b/sdk/agent_api_wrapper.py @@ -86,6 +86,55 @@ class AgentApiWrapper(AgentApi): **self._init_kwargs, ) + @staticmethod + def _event_kind(event: object) -> str: + raw_kind = getattr(event, "type", None) + if hasattr(raw_kind, "value"): + raw_kind = raw_kind.value + if raw_kind is None: + raw_kind = event.__class__.__name__ + + kind = str(raw_kind).replace("-", "_") + if "_" in kind: + return kind.upper() + + normalized = [] + for index, char in enumerate(kind): + if index and char.isupper() and not kind[index - 1].isupper(): + normalized.append("_") + normalized.append(char) + return "".join(normalized).upper() + + @classmethod + def _is_kind(cls, event: object, *needles: str) -> bool: + kind = cls._event_kind(event) + return any(needle in kind for needle in needles) + + @classmethod + def _is_text_event(cls, event: object) -> bool: + return hasattr(event, "text") or cls._is_kind(event, "TEXT_CHUNK") + + @classmethod + def _is_end_event(cls, event: object) -> bool: + kind = cls._event_kind(event) + return kind == "END" or kind.endswith("_END") + + @classmethod + def _is_send_file_event(cls, event: object) -> bool: + return "SEND_FILE" in cls._event_kind(event) + + async def _publish_event(self, event: object, *, queue_event: object | None = None) -> None: + if self.callback: + self.callback(event) + if self._current_queue: + await self._current_queue.put(queue_event if queue_event is not None else event) + + async def _publish_error(self, event: object) -> None: + if self.callback: + self.callback(event) + if self._current_queue and hasattr(event, "code") and hasattr(event, "details"): + await self._current_queue.put(AgentException(getattr(event, "code"), getattr(event, "details"))) + async def _listen(self): try: async for msg in self._ws: @@ -93,7 +142,7 @@ class AgentApiWrapper(AgentApi): try: outgoing_msg = ServerMessage.validate_json(msg.data) - if isinstance(outgoing_msg, MsgEventTextChunk): + if self._is_text_event(outgoing_msg): if self._current_queue: await self._current_queue.put(outgoing_msg) elif self.callback: @@ -101,29 +150,22 @@ class AgentApiWrapper(AgentApi): else: logger.warning("[%s] AgentEvent without active request", self.id) - elif isinstance(outgoing_msg, MsgEventEnd): + elif self._is_end_event(outgoing_msg): self.last_tokens_used = outgoing_msg.tokens_used - if self._current_queue: - await self._current_queue.put(outgoing_msg) + await self._publish_event(outgoing_msg) - elif isinstance(outgoing_msg, MsgError): - if self.callback: - self.callback(outgoing_msg) + elif self._is_kind(outgoing_msg, "ERROR"): error = AgentException(outgoing_msg.code, outgoing_msg.details) logger.error("[%s] Agent error: %s", self.id, error) - if self._current_queue: - await self._current_queue.put(error) + await self._publish_error(outgoing_msg) - elif isinstance(outgoing_msg, MsgGracefulDisconnect): - if self.callback: - self.callback(outgoing_msg) + elif self._is_kind(outgoing_msg, "GRACEFUL_DISCONNECT"): + await self._publish_event(outgoing_msg) logger.info("[%s] Gracefully disconnecting", self.id) break else: - logger.warning("[%s] Unknown message type: %s", self.id, outgoing_msg.type) - if self.callback: - self.callback(outgoing_msg) + await self._publish_event(outgoing_msg) except Exception as exc: logger.error("[%s] Failed to deserialize message: %s", self.id, exc) diff --git a/sdk/interface.py b/sdk/interface.py index e1ff12e..c885867 100644 --- a/sdk/interface.py +++ b/sdk/interface.py @@ -4,7 +4,7 @@ from __future__ import annotations from datetime import datetime from typing import Any, AsyncIterator, Literal, Protocol -from pydantic import BaseModel +from pydantic import BaseModel, Field class User(BaseModel): @@ -17,10 +17,11 @@ class User(BaseModel): class Attachment(BaseModel): - url: str - mime_type: str + url: str | None = None + mime_type: str | None = None size: int | None = None filename: str | None = None + workspace_path: str | None = None class MessageResponse(BaseModel): @@ -28,6 +29,7 @@ class MessageResponse(BaseModel): response: str tokens_used: int finished: bool + attachments: list[Attachment] = Field(default_factory=list) class MessageChunk(BaseModel): diff --git a/sdk/real.py b/sdk/real.py index f6e40ed..71803f4 100644 --- a/sdk/real.py +++ b/sdk/real.py @@ -1,6 +1,8 @@ from __future__ import annotations import asyncio +import inspect +from pathlib import Path from typing import AsyncIterator from sdk.agent_api_wrapper import AgentApiWrapper @@ -71,21 +73,43 @@ class RealPlatformClient(PlatformClient): ) -> MessageResponse: response_parts: list[str] = [] tokens_used = 0 + sent_attachments: list[Attachment] = [] message_id = user_id + saw_end_event = False - async for chunk in self.stream_message(user_id, chat_id, text, attachments=attachments): - message_id = chunk.message_id - if chunk.delta: - response_parts.append(chunk.delta) - if chunk.finished: - tokens_used = chunk.tokens_used + lock = self._get_chat_send_lock(chat_id) + async with lock: + chat_api = await self._get_chat_api(chat_id) + if hasattr(chat_api, "last_tokens_used"): + chat_api.last_tokens_used = 0 - return MessageResponse( - message_id=message_id, - response="".join(response_parts), - tokens_used=tokens_used, - finished=True, - ) + async for event in self._stream_agent_events(chat_api, text, attachments=attachments): + message_id = user_id + if self._is_text_event(event): + chunk_text = getattr(event, "text", "") + if chunk_text: + response_parts.append(chunk_text) + elif self._is_end_event(event): + tokens_used = getattr(event, "tokens_used", tokens_used) + saw_end_event = True + elif self._is_send_file_event(event): + attachment = self._attachment_from_send_file_event(event) + if attachment is not None: + sent_attachments.append(attachment) + + if not saw_end_event: + tokens_used = getattr(chat_api, "last_tokens_used", tokens_used) + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + + response_kwargs = { + "message_id": message_id, + "response": "".join(response_parts), + "tokens_used": tokens_used, + "finished": True, + } + if self._message_response_accepts_attachments(): + response_kwargs["attachments"] = sent_attachments + return MessageResponse(**response_kwargs) async def stream_message( self, @@ -99,20 +123,37 @@ class RealPlatformClient(PlatformClient): chat_api = await self._get_chat_api(chat_id) if hasattr(chat_api, "last_tokens_used"): chat_api.last_tokens_used = 0 - async for event in chat_api.send_message(text): + saw_end_event = False + async for event in self._stream_agent_events(chat_api, text, attachments=attachments): + if self._is_text_event(event): + yield MessageChunk( + message_id=user_id, + delta=getattr(event, "text", ""), + finished=False, + ) + elif self._is_end_event(event): + tokens_used = getattr(event, "tokens_used", 0) + saw_end_event = True + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + yield MessageChunk( + message_id=user_id, + delta="", + finished=True, + tokens_used=tokens_used, + ) + elif self._is_send_file_event(event): + continue + else: + continue + if not saw_end_event: + tokens_used = getattr(chat_api, "last_tokens_used", 0) + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) yield MessageChunk( message_id=user_id, - delta=event.text, - finished=False, + delta="", + finished=True, + tokens_used=tokens_used, ) - tokens_used = getattr(chat_api, "last_tokens_used", 0) - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) - yield MessageChunk( - message_id=user_id, - delta="", - finished=True, - tokens_used=tokens_used, - ) async def get_settings(self, user_id: str) -> UserSettings: return await self._prototype_state.get_settings(user_id) @@ -140,3 +181,107 @@ class RealPlatformClient(PlatformClient): close = getattr(self._agent_api, "close", None) if callable(close): await close() + + async def _stream_agent_events( + self, + chat_api, + text: str, + attachments: list[Attachment] | None = None, + ) -> AsyncIterator[object]: + send_message = chat_api.send_message + attachment_paths = self._attachment_paths(attachments) + if attachment_paths and self._send_message_accepts_attachments(send_message): + event_stream = send_message(text, attachments=attachment_paths) + else: + event_stream = send_message(text) + async for event in event_stream: + yield event + + @staticmethod + def _attachment_paths(attachments: list[Attachment] | None) -> list[str]: + if not attachments: + return [] + paths = [] + for attachment in attachments: + if attachment.workspace_path: + paths.append(attachment.workspace_path) + return paths + + @staticmethod + def _send_message_accepts_attachments(send_message) -> bool: + try: + parameters = inspect.signature(send_message).parameters + except (TypeError, ValueError): + return False + return "attachments" in parameters or any( + parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters.values() + ) + + @staticmethod + def _event_kind(event: object) -> str: + raw_kind = getattr(event, "type", None) + if hasattr(raw_kind, "value"): + raw_kind = raw_kind.value + if raw_kind is None: + raw_kind = event.__class__.__name__ + + kind = str(raw_kind).replace("-", "_") + if "_" in kind: + return kind.upper() + normalized = [] + for index, char in enumerate(kind): + if index and char.isupper() and not kind[index - 1].isupper(): + normalized.append("_") + normalized.append(char) + return "".join(normalized).upper() + + @classmethod + def _is_text_event(cls, event: object) -> bool: + return hasattr(event, "text") or "TEXT_CHUNK" in cls._event_kind(event) + + @classmethod + def _is_end_event(cls, event: object) -> bool: + kind = cls._event_kind(event) + return kind == "END" or kind.endswith("_END") + + @classmethod + def _is_send_file_event(cls, event: object) -> bool: + kind = cls._event_kind(event) + return "SEND_FILE" in kind + + @staticmethod + def _attachment_from_send_file_event(event: object) -> Attachment | None: + location = None + for attr in ("url", "workspace_path", "path", "file_path", "uri"): + value = getattr(event, attr, None) + if value: + location = str(value) + break + if location is None: + return None + + mime_type = getattr(event, "mime_type", None) or "application/octet-stream" + filename = getattr(event, "filename", None) or Path(location).name or None + size = getattr(event, "size", None) + workspace_path = location + if workspace_path.startswith("/workspace/"): + workspace_path = workspace_path[len("/workspace/"):] + elif workspace_path == "/workspace": + workspace_path = "" + return Attachment( + url=location, + mime_type=mime_type, + size=size, + filename=filename, + workspace_path=workspace_path or None, + ) + + @staticmethod + def _message_response_accepts_attachments() -> bool: + fields = getattr(MessageResponse, "model_fields", None) + if isinstance(fields, dict): + return "attachments" in fields + try: + return "attachments" in inspect.signature(MessageResponse).parameters + except (TypeError, ValueError): + return False diff --git a/tests/adapter/matrix/test_converter.py b/tests/adapter/matrix/test_converter.py index a6b75fb..3513913 100644 --- a/tests/adapter/matrix/test_converter.py +++ b/tests/adapter/matrix/test_converter.py @@ -53,6 +53,24 @@ def content_file_event(): ) +def source_only_content_file_event(): + return SimpleNamespace( + sender="@a:m.org", + body="doc.pdf", + event_id="$e5", + msgtype=None, + replyto_event_id=None, + source={ + "content": { + "msgtype": "m.file", + "body": "source-only.pdf", + "url": "mxc://x/source-only", + "info": {"mimetype": "application/pdf"}, + } + }, + ) + + def test_plain_text_to_incoming_message(): result = from_room_event(text_event("Hello"), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingMessage) @@ -147,5 +165,15 @@ def test_attachment_falls_back_to_content_payload(): assert a.mime_type == "application/pdf" +def test_attachment_falls_back_to_source_content_payload(): + result = from_room_event(source_only_content_file_event(), room_id="!r:m.org", chat_id="C1") + assert isinstance(result, IncomingMessage) + a = result.attachments[0] + assert a.type == "document" + assert a.url == "mxc://x/source-only" + assert a.filename == "source-only.pdf" + assert a.mime_type == "application/pdf" + + def test_converter_module_does_not_expose_reaction_callbacks(): assert not hasattr(converter, "from_reaction") diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index b50dfe0..e2cae34 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -5,6 +5,13 @@ from types import SimpleNamespace from unittest.mock import AsyncMock import pytest +from nio import ( + RoomMessageAudio, + RoomMessageFile, + RoomMessageImage, + RoomMessageText, + RoomMessageVideo, +) from nio.api import RoomVisibility from nio.responses import SyncResponse @@ -332,7 +339,7 @@ async def test_bot_downloads_matrix_file_to_workspace_before_staging(tmp_path, m staged = await get_staged_attachments(runtime.store, "!chat1:example.org", "@alice:example.org") assert staged[0]["workspace_path"] is not None assert (tmp_path / staged[0]["workspace_path"]).read_bytes() == b"%PDF-1.7" - bot._send_all.assert_awaited_once() + bot._send_all.assert_not_awaited() async def test_file_only_event_is_staged_and_does_not_dispatch(): @@ -371,10 +378,7 @@ async def test_file_only_event_is_staged_and_does_not_dispatch(): runtime.dispatcher.dispatch.assert_not_awaited() staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") assert [item["filename"] for item in staged] == ["report.pdf"] - client.room_send.assert_awaited_once() - assert ( - "Следующее сообщение отправит файлы агенту." in client.room_send.await_args.args[2]["body"] - ) + client.room_send.assert_not_awaited() async def test_list_command_returns_current_staged_attachments(): @@ -963,3 +967,43 @@ async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeyp agent_connect.assert_not_awaited() platform_close.assert_awaited_once() + + +async def test_matrix_main_registers_media_message_callbacks(monkeypatch): + bot_module = importlib.import_module("adapter.matrix.bot") + + runtime = SimpleNamespace(platform=SimpleNamespace(close=AsyncMock())) + created_clients = [] + + class FakeAsyncClient: + def __init__(self, *args, **kwargs): + self.access_token = None + self.callbacks = [] + self.sync_forever = AsyncMock() + self.close = AsyncMock() + created_clients.append(self) + + async def login(self, *args, **kwargs): + raise AssertionError("login should not be called when access token is provided") + + def add_event_callback(self, callback, event_type): + self.callbacks.append((callback, event_type)) + + monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org") + monkeypatch.setenv("MATRIX_USER_ID", "@bot:example.org") + monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "token") + monkeypatch.setattr(bot_module, "AsyncClient", FakeAsyncClient) + monkeypatch.setattr(bot_module, "build_runtime", lambda **kwargs: runtime) + monkeypatch.setattr(bot_module, "prepare_live_sync", AsyncMock(return_value="s123")) + + await bot_module.main() + + assert len(created_clients) == 1 + registered_types = [event_type for _, event_type in created_clients[0].callbacks] + assert ( + RoomMessageText, + RoomMessageFile, + RoomMessageImage, + RoomMessageVideo, + RoomMessageAudio, + ) in registered_types diff --git a/tests/adapter/matrix/test_files.py b/tests/adapter/matrix/test_files.py new file mode 100644 index 0000000..831ca72 --- /dev/null +++ b/tests/adapter/matrix/test_files.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from pathlib import Path +from types import SimpleNamespace + +from adapter.matrix.files import build_workspace_attachment_path, download_matrix_attachment +from core.protocol import Attachment + + +def test_build_workspace_attachment_path_scopes_by_surface_user_and_room(tmp_path: Path): + rel_path, abs_path = build_workspace_attachment_path( + workspace_root=tmp_path, + matrix_user_id="@alice:example.org", + room_id="!room:example.org", + filename="report.pdf", + timestamp="20260420-153000", + ) + + assert ( + rel_path + == "surfaces/matrix/alice_example.org/room_example.org/inbox/20260420-153000-report.pdf" + ) + assert abs_path == tmp_path / rel_path + + +async def test_download_matrix_attachment_persists_file_and_returns_workspace_path(tmp_path: Path): + async def download(url: str): + assert url == "mxc://server/id" + return SimpleNamespace(body=b"%PDF-1.7") + + client = SimpleNamespace(download=download) + attachment = Attachment( + type="document", + url="mxc://server/id", + filename="report.pdf", + mime_type="application/pdf", + ) + + saved = await download_matrix_attachment( + client=client, + workspace_root=tmp_path, + matrix_user_id="@alice:example.org", + room_id="!room:example.org", + attachment=attachment, + timestamp="20260420-153000", + ) + + assert saved.workspace_path is not None + assert saved.workspace_path.endswith("20260420-153000-report.pdf") + assert (tmp_path / saved.workspace_path).read_bytes() == b"%PDF-1.7" diff --git a/tests/adapter/matrix/test_send_outgoing.py b/tests/adapter/matrix/test_send_outgoing.py index 17eeefa..72b9fa6 100644 --- a/tests/adapter/matrix/test_send_outgoing.py +++ b/tests/adapter/matrix/test_send_outgoing.py @@ -9,7 +9,7 @@ from adapter.matrix.handlers.confirm import make_handle_cancel, make_handle_conf from adapter.matrix.store import get_pending_confirm, set_room_meta from core.auth import AuthManager from core.chat import ChatManager -from core.protocol import OutgoingUI, UIButton +from core.protocol import Attachment, OutgoingMessage, OutgoingUI, UIButton from core.settings import SettingsManager from core.store import InMemoryStore from sdk.mock import MockPlatformClient @@ -156,3 +156,39 @@ async def test_outgoing_ui_no_round_trip_uses_user_and_room_scope(): assert "отменено" in result[0].text.lower() assert await get_pending_confirm(store, "@alice:example.org", "!confirm:example.org") is None assert await get_pending_confirm(store, "@bob:example.org", "!other:example.org") is not None + + +async def test_send_outgoing_uploads_workspace_file_attachment(tmp_path, monkeypatch): + workspace_file = tmp_path / "surfaces" / "matrix" / "alice" / "room" / "inbox" / "result.txt" + workspace_file.parent.mkdir(parents=True, exist_ok=True) + workspace_file.write_text("ready") + monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path)) + + client = SimpleNamespace( + upload=AsyncMock(return_value=(SimpleNamespace(content_uri="mxc://server/file"), {})), + room_send=AsyncMock(), + ) + + await send_outgoing( + client, + "!room:example.org", + OutgoingMessage( + chat_id="!room:example.org", + text="Файл готов", + attachments=[ + Attachment( + type="document", + filename="result.txt", + mime_type="text/plain", + workspace_path="surfaces/matrix/alice/room/inbox/result.txt", + ) + ], + ), + ) + + client.upload.assert_awaited_once() + client.room_send.assert_awaited() + assert client.room_send.await_args_list[0].args[2]["body"] == "Файл готов" + file_call = client.room_send.await_args_list[1] + assert file_call.args[2]["msgtype"] == "m.file" + assert file_call.args[2]["url"] == "mxc://server/file" diff --git a/tests/core/test_dispatcher.py b/tests/core/test_dispatcher.py index eb437d2..fad2a4f 100644 --- a/tests/core/test_dispatcher.py +++ b/tests/core/test_dispatcher.py @@ -75,6 +75,27 @@ async def test_dispatch_routes_audio_before_catchall(dispatcher): assert (await dispatcher.dispatch(text_msg))[0].text == "text" +async def test_dispatch_routes_document_before_catchall(dispatcher): + async def document_handler(event, **kwargs): + return [OutgoingMessage(chat_id=event.chat_id, text="document")] + + async def catch_all(event, **kwargs): + return [OutgoingMessage(chat_id=event.chat_id, text="text")] + + dispatcher.register(IncomingMessage, "document", document_handler) + dispatcher.register(IncomingMessage, "*", catch_all) + + document_msg = IncomingMessage( + user_id="u1", + platform="matrix", + chat_id="C1", + text="", + attachments=[Attachment(type="document", workspace_path="surfaces/matrix/u1/file.pdf")], + ) + + assert (await dispatcher.dispatch(document_msg))[0].text == "document" + + async def test_dispatch_callback_by_action(dispatcher): async def confirm_handler(event, **kwargs): return [OutgoingMessage(chat_id=event.chat_id, text="confirmed")] diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index ab8fc8c..fd7bd2e 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -23,11 +23,11 @@ from core.protocol import ( class FakeAgentApi: def __init__(self) -> None: - self.calls: list[str] = [] + self.calls: list[tuple[str, list[str]]] = [] self.last_tokens_used = 0 - async def send_message(self, text: str): - self.calls.append(text) + async def send_message(self, text: str, attachments: list[str] | None = None): + self.calls.append((text, attachments or [])) yield type("Chunk", (), {"text": f"[REAL] {text}"})() self.last_tokens_used = 5 @@ -130,4 +130,31 @@ async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatche texts = [r.text for r in result if isinstance(r, OutgoingMessage)] assert texts == ["[REAL] Привет!"] - assert agent_api.calls == ["Привет!"] + assert agent_api.calls == [("Привет!", [])] + + +async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher): + dispatcher, agent_api = real_dispatcher + + start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") + await dispatcher.dispatch(start) + + msg = IncomingMessage( + user_id="u1", + platform="matrix", + chat_id="C1", + text="Посмотри файл", + attachments=[ + Attachment( + type="document", + filename="report.pdf", + mime_type="application/pdf", + workspace_path="surfaces/matrix/u1/room/inbox/report.pdf", + ) + ], + ) + await dispatcher.dispatch(msg) + + assert agent_api.calls == [ + ("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"]) + ] diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py index 6edecbd..e5f01e4 100644 --- a/tests/platform/test_real.py +++ b/tests/platform/test_real.py @@ -5,7 +5,7 @@ import pytest from core.protocol import SettingsAction import sdk.agent_api_wrapper as agent_api_wrapper_module from sdk.agent_api_wrapper import AgentApiWrapper -from sdk.interface import MessageChunk, MessageResponse, UserSettings +from sdk.interface import Attachment, MessageChunk, MessageResponse, UserSettings from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient @@ -90,6 +90,100 @@ class BlockingChatAgentApi: self.last_tokens_used = len(text) +class AttachmentTrackingChatAgentApi: + def __init__(self, chat_id: str) -> None: + self.chat_id = chat_id + self.calls: list[tuple[str, list[str] | None]] = [] + self.connect_calls = 0 + self.close_calls = 0 + self.last_tokens_used = 0 + + async def connect(self) -> None: + self.connect_calls += 1 + + async def close(self) -> None: + self.close_calls += 1 + + async def send_message(self, text: str, attachments: list[str] | None = None): + self.calls.append((text, attachments)) + yield FakeChunk(text) + self.last_tokens_used = 5 + + +class SendFileEvent: + def __init__(self, *, workspace_path: str, mime_type: str, filename: str, size: int) -> None: + self.type = "AGENT_EVENT_SEND_FILE" + self.workspace_path = workspace_path + self.mime_type = mime_type + self.filename = filename + self.size = size + + +class TextChunkEvent: + def __init__(self, text: str) -> None: + self.type = "AGENT_EVENT_TEXT_CHUNK" + self.text = text + + +class ToolCallChunkEvent: + def __init__(self, payload: str) -> None: + self.type = "AGENT_EVENT_TOOL_CALL_CHUNK" + self.payload = payload + + +class ToolResultEvent: + def __init__(self, payload: str) -> None: + self.type = "AGENT_EVENT_TOOL_RESULT" + self.payload = payload + + +class CustomUpdateEvent: + def __init__(self, payload: str) -> None: + self.type = "AGENT_EVENT_CUSTOM_UPDATE" + self.payload = payload + + +class EndEvent: + def __init__(self, tokens_used: int) -> None: + self.type = "AGENT_EVENT_END" + self.tokens_used = tokens_used + + +class ErrorEvent: + def __init__(self, code: str, details: str) -> None: + self.type = "ERROR" + self.code = code + self.details = details + + +class GracefulDisconnectEvent: + def __init__(self) -> None: + self.type = "GRACEFUL_DISCONNECT" + + +class FakeWSMessage: + def __init__(self, data: str) -> None: + self.type = agent_api_wrapper_module.aiohttp.WSMsgType.TEXT + self.data = data + + +class FakeWebSocket: + def __init__(self, messages: list[FakeWSMessage]) -> None: + self._messages = list(messages) + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._messages: + raise StopAsyncIteration + return self._messages.pop(0) + + +class MessageResponseWithAttachments(MessageResponse): + attachments: list[Attachment] = [] + + def test_agent_api_wrapper_uses_modern_constructor_when_available(monkeypatch): calls: list[dict[str, object]] = [] @@ -219,6 +313,76 @@ async def test_real_platform_client_send_message_uses_chat_bound_client(): assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 3 +@pytest.mark.asyncio +async def test_real_platform_client_forwards_attachments_to_chat_api(): + agent_api = AttachmentTrackingChatAgentApi("chat-7") + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + attachment = Attachment( + workspace_path="surfaces/matrix/alice/room/inbox/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + ) + + result = await client.send_message( + "@alice:example.org", + "chat-7", + "hello", + attachments=[attachment], + ) + + assert agent_api.calls == [("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])] + assert result.response == "hello" + assert result.tokens_used == 5 + + +@pytest.mark.asyncio +async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch): + agent_api = AttachmentTrackingChatAgentApi("chat-7") + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + + class FileEventAgentApi(AttachmentTrackingChatAgentApi): + async def send_message(self, text: str, attachments: list[str] | None = None): + self.calls.append((text, attachments)) + yield TextChunkEvent("he") + yield SendFileEvent( + workspace_path="/workspace/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + ) + yield TextChunkEvent("llo") + self.last_tokens_used = 9 + + monkeypatch.setattr( + "sdk.real.MessageResponse", + MessageResponseWithAttachments, + ) + client._agent_api = FileEventAgentApi("chat-7") + + result = await client.send_message("@alice:example.org", "chat-7", "hello") + + assert result.response == "hello" + assert result.tokens_used == 9 + assert result.attachments == [ + Attachment( + url="/workspace/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + workspace_path="report.pdf", + ) + ] + + @pytest.mark.asyncio async def test_real_platform_client_works_with_legacy_agent_api_without_for_chat(): legacy_api = LegacyAgentApi() @@ -385,3 +549,85 @@ async def test_real_platform_client_settings_are_local(): assert isinstance(settings, UserSettings) assert settings.skills["browser"] is True assert settings.skills["web-search"] is True + + +@pytest.mark.asyncio +async def test_agent_api_wrapper_transparently_surfaces_modern_events(monkeypatch): + callback_events: list[object] = [] + queue: asyncio.Queue = asyncio.Queue() + event_map = { + "text": TextChunkEvent("he"), + "tool_call": ToolCallChunkEvent("call"), + "tool_result": ToolResultEvent("result"), + "custom_update": CustomUpdateEvent("update"), + "send_file": SendFileEvent( + workspace_path="/workspace/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + ), + "end": EndEvent(tokens_used=11), + "error": ErrorEvent(code="BOOM", details="bad things"), + "disconnect": GracefulDisconnectEvent(), + } + + def fake_validate_json(data: str): + return event_map[data] + + monkeypatch.setattr( + agent_api_wrapper_module, + "ServerMessage", + type("FakeServerMessage", (), {"validate_json": staticmethod(fake_validate_json)}), + ) + + async def fake_cleanup(self): + return None + + monkeypatch.setattr(agent_api_wrapper_module.AgentApiWrapper, "_cleanup", fake_cleanup) + monkeypatch.setattr( + agent_api_wrapper_module.AgentApi, + "__init__", + lambda self, agent_id, base_url=None, chat_id=0, **kwargs: setattr(self, "id", agent_id) + or setattr(self, "callback", kwargs.get("callback")) + or setattr(self, "on_disconnect", kwargs.get("on_disconnect")) + or setattr(self, "_current_queue", None), + ) + + wrapper = AgentApiWrapper( + agent_id="agent-1", + base_url="https://agent.example.com/v1/agent_ws", + chat_id="chat-1", + callback=callback_events.append, + ) + wrapper._current_queue = queue + wrapper._ws = FakeWebSocket( + [ + FakeWSMessage("text"), + FakeWSMessage("tool_call"), + FakeWSMessage("tool_result"), + FakeWSMessage("custom_update"), + FakeWSMessage("send_file"), + FakeWSMessage("end"), + FakeWSMessage("error"), + FakeWSMessage("disconnect"), + ] + ) + + await wrapper._listen() + + queue_events = [] + while not queue.empty(): + queue_events.append(await queue.get()) + + assert queue_events[0].text == "he" + assert any(isinstance(event, SendFileEvent) for event in queue_events) + assert any(isinstance(event, EndEvent) for event in queue_events) + assert any(isinstance(event, GracefulDisconnectEvent) for event in queue_events) + assert callback_events[0].payload == "call" + assert callback_events[1].payload == "result" + assert callback_events[2].payload == "update" + assert any(isinstance(event, SendFileEvent) for event in callback_events) + assert any(isinstance(event, EndEvent) for event in callback_events) + assert any(isinstance(event, ErrorEvent) for event in callback_events) + assert any(isinstance(event, GracefulDisconnectEvent) for event in callback_events) + assert wrapper.last_tokens_used == 11