Compare commits

...

7 commits

22 changed files with 2316 additions and 118 deletions

View file

@ -5,13 +5,16 @@ TELEGRAM_BOT_TOKEN=your_bot_token_here
MATRIX_HOMESERVER=https://matrix.org
MATRIX_USER_ID=@bot:matrix.org
MATRIX_PASSWORD=your_password_here
# Lambda Platform
LAMBDA_PLATFORM_URL=http://localhost:8000
LAMBDA_SERVICE_TOKEN=your_service_token_here
AGENT_WS_URL=ws://127.0.0.1:8000/agent_ws/
AGENT_BASE_URL=http://127.0.0.1:8000
MATRIX_PLATFORM_BACKEND=real
# Режим работы: "mock" или "production"
PLATFORM_MODE=mock
# Shared workspace contract
SURFACES_WORKSPACE_DIR=/workspace
# Compose-local platform-agent route
AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/{chat_id}/
AGENT_BASE_URL=http://platform-agent:8000
# platform-agent provider
PROVIDER_MODEL=openai/gpt-4o-mini
PROVIDER_URL=https://openrouter.ai/api/v1
PROVIDER_API_KEY=sk-or-...

View file

@ -7,7 +7,7 @@
| Поверхность | Статус |
|---|---|
| Telegram | 🔨 В разработке, отдельный worktree `feat/telegram-adapter` |
| Matrix | ✅ Рабочий прототип, подключается к реальному агенту |
| Matrix | ✅ Рабочий прототип, запускается через root `docker compose` вместе с `platform-agent` |
---
@ -69,8 +69,8 @@ surfaces-bot/
- **Диалог** — сообщения, вложения, подтверждения `!yes` / `!no` и routing через `EventDispatcher`
- **Стабильность** — перед `sync_forever()` бот делает bootstrap sync и стартует с `since`, чтобы не переигрывать старую timeline после рестарта
- **Текущее ограничение** — encrypted DM пока не поддержан; ручное тестирование Matrix ведётся в незашифрованных комнатах и зависит от локального state-store бота
- **Backend selection**`MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` требует `AGENT_WS_URL=ws://host:port/agent_ws/`
- **Ограничения real backend**пока это текстовый direct-agent прототип без вложений и без асинхронных callbacks; локальные настройки и user-state хранятся в `PrototypeStateStore`
- **Backend selection**`MATRIX_PLATFORM_BACKEND=mock` остаётся значением по умолчанию; `MATRIX_PLATFORM_BACKEND=real` использует `platform-agent` из compose и WebSocket contract `/v1/agent_ws/{chat_id}/`
- **Ограничения real backend**локальный runtime использует shared `/workspace`, а файлы передаются как относительные пути в `attachments`
---
@ -90,6 +90,7 @@ class PlatformClient(Protocol):
Бот передаёт `user_id` + `chat_id` + сообщение; платформа сама решает нужно ли поднять контейнер.
Сейчас: `MockPlatformClient` в `sdk/mock.py`, а Matrix real backend собирается через `sdk/real.py` при `MATRIX_PLATFORM_BACKEND=real`.
Файловый контракт уже path-based: бот пишет файлы в shared `/workspace` и передаёт платформе относительные пути в `attachments`.
Когда SDK готов: добавляем `SdkPlatformClient`, меняем одну строку в DI. Адаптеры и ядро не трогаем.
---
@ -120,32 +121,40 @@ MATRIX_PASSWORD=... # или MATRIX_ACCESS_TOKEN=...
# Выбор backend: mock (по умолчанию) или real (подключение к platform-agent)
MATRIX_PLATFORM_BACKEND=real
# URL WebSocket endpoint platform-agent (только при MATRIX_PLATFORM_BACKEND=real)
AGENT_WS_URL=ws://127.0.0.1:8000/agent_ws/
AGENT_BASE_URL=http://127.0.0.1:8000
# compose runtime: platform-agent service name + shared /workspace
AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/
AGENT_BASE_URL=http://platform-agent:8000
SURFACES_WORKSPACE_DIR=/workspace
```
### 3. Запуск platform-agent (для real backend)
### 3. Compose runtime
platform-agent — отдельный репозиторий, сейчас клонируется в `external/platform-agent`.
Root `docker-compose.yml` теперь является основным локальным runtime для Matrix и platform-agent.
Он поднимает `matrix-bot`, `platform-agent` и общий volume `/workspace`.
```bash
cd external/platform-agent
# Создать .env с параметрами LLM провайдера
cat > .env <<EOF
PROVIDER_MODEL=openai/gpt-4o-mini
PROVIDER_URL=https://openrouter.ai/api/v1
PROVIDER_API_KEY=sk-or-...
EOF
# Запустить
uv run uvicorn src.main:app --host 127.0.0.1 --port 8000
docker compose up --build
```
Проверить что работает: `curl http://127.0.0.1:8000/agent_ws/` должен вернуть ответ об апгрейде до WebSocket.
Compose собирает `platform-agent` из актуального upstream `external/platform-agent` Dockerfile (`development` target),
монтирует live-код из `external/platform-agent/src` и `external/platform-agent_api`, и подготавливает shared `/workspace`
с правами для agent runtime.
Matrix бот подключается к `platform-agent` по service name, а не к отдельно запущенному `localhost`.
### 4. Запуск бота
### 4.1. Staged attachments в Matrix
Если Matrix-клиент отправляет файлы отдельными media events, бот не вызывает агента сразу.
Вместо этого он сохраняет файлы в shared `/workspace`, ставит их в очередь для конкретного чата и пользователя, и ждёт следующего обычного сообщения.
Команды:
- `!list` — показать staged вложения
- `!remove <n>` — удалить вложение по номеру
- `!remove all` — очистить все staged вложения
Следующее обычное сообщение пользователя уходит агенту вместе со всеми staged файлами.
### 4. Запуск бота вручную
```bash
# Первый запуск или сброс состояния
@ -184,6 +193,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot
| Состояние контекста | `!context` | Текущая сессия и список сохранений |
| Справка | `!help` | |
| Подтверждения | `!yes` / `!no` | Для опасных действий |
| Staged вложения | `!list`, `!remove <n>`, `!remove all` | Файлы без текстовой инструкции ставятся в очередь до следующего сообщения |
### Не работает — блокеры на стороне platform-agent
@ -192,7 +202,6 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot
| `!load` в другом чате | platform-agent использует `StateBackend` — файлы живут в памяти отдельно для каждого `thread_id`. Файл, сохранённый в чате A, не виден в чате B. Фикс: переключить platform-agent на `FilesystemBackend` с общим хранилищем. |
| Счётчик токенов в `!context` | platform-agent отдаёт `tokens_used=0` хардкодом в `MsgEventEnd`. Наш код перехватывает значение корректно. |
| `!reset` | platform-agent не имеет endpoint `/reset`. Задокументировано в ТЗ к платформе. |
| Файловые вложения | Нет API загрузки файлов в область видимости агента. ТЗ передано платформе. |
| Персистентность между рестартами | platform-agent использует `MemorySaver` (in-memory). Все разговоры теряются при рестарте процесса. |
| E2EE комнаты | `python-olm` не собирается на macOS/ARM. Ограничение инфраструктуры. |
@ -201,7 +210,7 @@ PYTHONPATH=. uv run python -m adapter.matrix.bot
| Функция | Статус |
|---|---|
| `!settings`, `!skills`, `!soul`, `!safety` | Заглушки MVP. Требуют готового SDK платформы. |
| Вложения (изображения, документы) | Только текстовые сообщения в текущем MVP. |
| Вложения без текстовой инструкции | Поддержан staged UX только для Matrix. Для других поверхностей ещё не перенесено. |
---

View file

@ -6,37 +6,54 @@ from dataclasses import dataclass
from pathlib import Path
import structlog
from dotenv import load_dotenv
from nio import (
AsyncClient,
AsyncClientConfig,
InviteMemberEvent,
MatrixRoom,
RoomMemberEvent,
RoomMessage,
RoomMessageAudio,
RoomMessageFile,
RoomMessageImage,
RoomMessageText,
RoomMessageVideo,
)
from nio.responses import SyncResponse
from dotenv import load_dotenv
from adapter.matrix.converter import from_room_event
from adapter.matrix.files import (
download_matrix_attachment,
matrix_msgtype_for_attachment,
resolve_workspace_attachment_path,
)
from adapter.matrix.handlers import register_matrix_handlers
from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat
from adapter.matrix.handlers.context_commands import (
LOAD_PROMPT,
)
from adapter.matrix.handlers.auth import handle_invite, provision_workspace_chat
from adapter.matrix.room_router import resolve_chat_id
from adapter.matrix.store import (
add_staged_attachment,
clear_load_pending,
clear_staged_attachments,
get_load_pending,
get_room_meta,
get_staged_attachments,
remove_staged_attachment_at,
set_pending_confirm,
set_platform_chat_id,
set_room_meta,
set_pending_confirm,
)
from core.auth import AuthManager
from core.chat import ChatManager
from core.handler import EventDispatcher
from core.handlers import register_all
from core.protocol import (
Attachment,
IncomingCommand,
IncomingMessage,
OutgoingEvent,
OutgoingMessage,
OutgoingNotification,
@ -197,6 +214,38 @@ class MatrixBot:
incoming = from_room_event(event, room_id=room.room_id, chat_id=dispatch_chat_id)
if incoming is None:
return
if isinstance(incoming, IncomingCommand) and incoming.command in {
"matrix_list_attachments",
"matrix_remove_attachment",
}:
outgoing = await self._handle_staged_attachment_command(
room.room_id,
sender,
incoming,
)
await self._send_all(room.room_id, outgoing)
return
if self._is_file_only_event(event, incoming):
materialized = await self._materialize_incoming_attachments(
room.room_id,
sender,
incoming,
)
await self._stage_attachments(room.room_id, sender, materialized.attachments)
return
if isinstance(incoming, IncomingMessage) and incoming.attachments:
incoming = await self._materialize_incoming_attachments(
room.room_id,
sender,
incoming,
)
clear_staged_after_dispatch = False
if isinstance(incoming, IncomingMessage) and incoming.text:
incoming, clear_staged_after_dispatch = await self._merge_staged_attachments(
room.room_id,
sender,
incoming,
)
try:
outgoing = await self.runtime.dispatcher.dispatch(incoming)
except PlatformError as exc:
@ -210,11 +259,159 @@ class MatrixBot:
outgoing = [
OutgoingMessage(
chat_id=dispatch_chat_id,
text="Сервис временно недоступен. Попробуйте ещё раз позже."
text="Сервис временно недоступен. Попробуйте ещё раз позже.",
)
]
else:
if clear_staged_after_dispatch:
await clear_staged_attachments(self.runtime.store, room.room_id, sender)
await self._send_all(room.room_id, outgoing)
def _is_file_only_event(
self, event: RoomMessage, incoming: IncomingMessage | IncomingCommand
) -> bool:
return (
isinstance(incoming, IncomingMessage)
and bool(incoming.attachments)
and not isinstance(event, RoomMessageText)
)
async def _stage_attachments(
self,
room_id: str,
user_id: str,
attachments: list,
) -> None:
for attachment in attachments:
await add_staged_attachment(
self.runtime.store,
room_id,
user_id,
{
"type": attachment.type,
"url": attachment.url,
"filename": attachment.filename,
"mime_type": attachment.mime_type,
"workspace_path": attachment.workspace_path,
},
)
async def _format_staged_attachments(
self,
room_id: str,
user_id: str,
*,
include_hint: bool = False,
) -> str:
attachments = await get_staged_attachments(self.runtime.store, room_id, user_id)
if not attachments:
return "Нет сохраненных вложений."
lines = ["Вложения в очереди:"]
for index, attachment in enumerate(attachments, start=1):
lines.append(f"{index}. {attachment.get('filename') or 'attachment'}")
if include_hint:
lines.extend(
[
"",
"Следующее сообщение отправит файлы агенту.",
"Команды: !list, !remove <n>, !remove all",
]
)
return "\n".join(lines)
async def _handle_staged_attachment_command(
self,
room_id: str,
user_id: str,
incoming: IncomingCommand,
) -> list[OutgoingEvent]:
if incoming.command == "matrix_list_attachments":
return [
OutgoingMessage(
chat_id=incoming.chat_id,
text=await self._format_staged_attachments(room_id, user_id),
)
]
arg = incoming.args[0] if incoming.args else ""
if arg == "all":
await clear_staged_attachments(self.runtime.store, room_id, user_id)
return [OutgoingMessage(chat_id=incoming.chat_id, text="Все вложения удалены.")]
try:
index = int(arg) - 1
except ValueError:
return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")]
removed = await remove_staged_attachment_at(self.runtime.store, room_id, user_id, index)
if removed is None:
return [OutgoingMessage(chat_id=incoming.chat_id, text="Нет такого вложения.")]
return [
OutgoingMessage(
chat_id=incoming.chat_id,
text=await self._format_staged_attachments(room_id, user_id),
)
]
async def _merge_staged_attachments(
self,
room_id: str,
user_id: str,
incoming: IncomingMessage,
) -> tuple[IncomingMessage, bool]:
staged = await get_staged_attachments(self.runtime.store, room_id, user_id)
if not staged:
return incoming, False
attachments = [
Attachment(
type=item.get("type", "document"),
url=item.get("url"),
filename=item.get("filename"),
mime_type=item.get("mime_type"),
workspace_path=item.get("workspace_path"),
)
for item in staged
]
return (
IncomingMessage(
user_id=incoming.user_id,
platform=incoming.platform,
chat_id=incoming.chat_id,
text=incoming.text,
attachments=attachments,
reply_to=incoming.reply_to,
),
True,
)
async def _materialize_incoming_attachments(
self,
room_id: str,
matrix_user_id: str,
incoming: IncomingMessage,
) -> IncomingMessage:
workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace"))
materialized = []
for attachment in incoming.attachments:
materialized.append(
await download_matrix_attachment(
client=self.client,
workspace_root=workspace_root,
matrix_user_id=matrix_user_id,
room_id=room_id,
attachment=attachment,
)
)
return IncomingMessage(
user_id=incoming.user_id,
platform=incoming.platform,
chat_id=incoming.chat_id,
text=incoming.text,
attachments=materialized,
reply_to=incoming.reply_to,
)
async def _bootstrap_unregistered_room(
self,
room: MatrixRoom,
@ -251,11 +448,6 @@ class MatrixBot:
f"Привет, {created['user'].display_name or sender}! Пиши — я здесь.\n\n"
"Команды: !new · !chats · !rename · !archive · !context · !save · !load · !help"
)
await self.client.room_send(
created["chat_room_id"],
"m.room.message",
{"msgtype": "m.text", "body": welcome},
)
await set_room_meta(
self.runtime.store,
room.room_id,
@ -265,12 +457,18 @@ class MatrixBot:
"redirect_chat_id": created["chat_id"],
},
)
await self.client.room_send(
created["chat_room_id"],
"m.room.message",
{"msgtype": "m.text", "body": welcome},
)
return [
OutgoingMessage(
chat_id=room.room_id,
text=(
f"Создал рабочий чат {created['room_name']} ({created['chat_id']}) "
"и добавил его в пространство Lambda. Открой приглашённую комнату для продолжения."
"и добавил его в пространство Lambda. "
"Открой приглашённую комнату для продолжения."
),
)
]
@ -323,7 +521,9 @@ class MatrixBot:
except Exception as exc:
logger.warning("load_agent_call_failed", error=str(exc))
return [OutgoingMessage(chat_id=room_id, text=f"Ошибка при загрузке: {exc}")]
return [OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")]
return [
OutgoingMessage(chat_id=room_id, text=f"Запрос на загрузку отправлен агенту: {name}")
]
async def on_member(self, room: MatrixRoom, event: RoomMemberEvent) -> None:
if getattr(event, "sender", None) == self.client.user_id:
@ -351,6 +551,7 @@ async def prepare_live_sync(client: AsyncClient) -> str | None:
return response.next_batch
return None
async def send_outgoing(
client: AsyncClient,
room_id: str,
@ -365,7 +566,37 @@ async def send_outgoing(
await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": body})
return
if isinstance(event, OutgoingMessage):
await client.room_send(room_id, "m.room.message", {"msgtype": "m.text", "body": event.text})
if event.text:
await client.room_send(
room_id, "m.room.message", {"msgtype": "m.text", "body": event.text}
)
if event.attachments:
workspace_root = Path(os.environ.get("SURFACES_WORKSPACE_DIR", "/workspace"))
for attachment in event.attachments:
if not attachment.workspace_path:
continue
file_path = resolve_workspace_attachment_path(
workspace_root, attachment.workspace_path
)
with file_path.open("rb") as handle:
upload_response, _ = await client.upload(
handle,
content_type=attachment.mime_type or "application/octet-stream",
filename=attachment.filename or file_path.name,
filesize=file_path.stat().st_size,
)
content_uri = getattr(upload_response, "content_uri", None)
if not content_uri:
raise RuntimeError(f"Matrix upload failed for {file_path}")
await client.room_send(
room_id,
"m.room.message",
{
"msgtype": matrix_msgtype_for_attachment(attachment),
"body": attachment.filename or file_path.name,
"url": content_uri,
},
)
return
if isinstance(event, OutgoingUI):
lines = [event.text]
@ -430,7 +661,16 @@ async def main() -> None:
since_token = await prepare_live_sync(client)
bot = MatrixBot(client, runtime)
client.add_event_callback(bot.on_room_message, RoomMessageText)
client.add_event_callback(
bot.on_room_message,
(
RoomMessageText,
RoomMessageFile,
RoomMessageImage,
RoomMessageVideo,
RoomMessageAudio,
),
)
client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent))
logger.info(

View file

@ -14,42 +14,53 @@ PLATFORM = "matrix"
def extract_attachments(event: Any) -> list[Attachment]:
source = getattr(event, "source", {}) or {}
content = source.get("content", {}) or getattr(event, "content", {}) or {}
msgtype = getattr(event, "msgtype", None)
if msgtype is None:
content = getattr(event, "content", {}) or {}
msgtype = content.get("msgtype")
url = content.get("url") or getattr(event, "url", None)
filename = content.get("body") or getattr(event, "body", None)
mime_type = content.get("mimetype") or getattr(event, "mimetype", None)
if mime_type is None:
info = content.get("info") or {}
if isinstance(info, dict):
mime_type = info.get("mimetype")
if msgtype == "m.image":
return [
Attachment(
type="image",
url=getattr(event, "url", None),
mime_type=getattr(event, "mimetype", None),
url=url,
filename=filename,
mime_type=mime_type,
)
]
if msgtype == "m.file":
return [
Attachment(
type="document",
url=getattr(event, "url", None),
filename=getattr(event, "body", None),
mime_type=getattr(event, "mimetype", None),
url=url,
filename=filename,
mime_type=mime_type,
)
]
if msgtype == "m.audio":
return [
Attachment(
type="audio",
url=getattr(event, "url", None),
mime_type=getattr(event, "mimetype", None),
url=url,
filename=filename,
mime_type=mime_type,
)
]
if msgtype == "m.video":
return [
Attachment(
type="video",
url=getattr(event, "url", None),
mime_type=getattr(event, "mimetype", None),
url=url,
filename=filename,
mime_type=mime_type,
)
]
return []
@ -75,6 +86,24 @@ def from_command(body: str, sender: str, chat_id: str, room_id: str | None = Non
},
)
if command == "list" and not args:
return IncomingCommand(
user_id=sender,
platform=PLATFORM,
chat_id=chat_id,
command="matrix_list_attachments",
args=[],
)
if command == "remove" and len(args) == 1:
return IncomingCommand(
user_id=sender,
platform=PLATFORM,
chat_id=chat_id,
command="matrix_remove_attachment",
args=[args[0]],
)
aliases = {
"skills": "settings_skills",
"connectors": "settings_connectors",

103
adapter/matrix/files.py Normal file
View file

@ -0,0 +1,103 @@
from __future__ import annotations
import mimetypes
import re
from datetime import UTC, datetime
from pathlib import Path
from core.protocol import Attachment
def _sanitize_component(value: str) -> str:
cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value)
cleaned = cleaned.strip("._-")
return cleaned or "unknown"
def _default_filename(attachment: Attachment) -> str:
if attachment.filename:
return attachment.filename
extension = mimetypes.guess_extension(attachment.mime_type or "") or ""
base = {
"image": "image",
"audio": "audio",
"video": "video",
"document": "attachment",
}.get(attachment.type, "attachment")
return f"{base}{extension}"
def build_workspace_attachment_path(
*,
workspace_root: Path,
matrix_user_id: str,
room_id: str,
filename: str,
timestamp: str | None = None,
) -> tuple[str, Path]:
stamp = timestamp or datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
safe_user = _sanitize_component(matrix_user_id.lstrip("@"))
safe_room = _sanitize_component(room_id.lstrip("!"))
safe_name = _sanitize_component(filename) or "attachment.bin"
relative_path = (
Path("surfaces")
/ "matrix"
/ safe_user
/ safe_room
/ "inbox"
/ f"{stamp}-{safe_name}"
)
return relative_path.as_posix(), workspace_root / relative_path
async def download_matrix_attachment(
*,
client,
workspace_root: Path,
matrix_user_id: str,
room_id: str,
attachment: Attachment,
timestamp: str | None = None,
) -> Attachment:
if not attachment.url:
return attachment
filename = _default_filename(attachment)
relative_path, absolute_path = build_workspace_attachment_path(
workspace_root=workspace_root,
matrix_user_id=matrix_user_id,
room_id=room_id,
filename=filename,
timestamp=timestamp,
)
absolute_path.parent.mkdir(parents=True, exist_ok=True)
response = await client.download(attachment.url)
body = getattr(response, "body", None)
if body is None:
raise RuntimeError(f"Matrix download response for {attachment.url} has no body")
absolute_path.write_bytes(body)
return Attachment(
type=attachment.type,
url=attachment.url,
filename=filename,
mime_type=attachment.mime_type,
workspace_path=relative_path,
)
def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path:
path = Path(workspace_path)
if path.is_absolute():
return path
return workspace_root / path
def matrix_msgtype_for_attachment(attachment: Attachment) -> str:
return {
"image": "m.image",
"audio": "m.audio",
"video": "m.video",
}.get(attachment.type, "m.file")

View file

@ -1,5 +1,8 @@
from __future__ import annotations
import asyncio
from weakref import WeakValueDictionary
from core.store import StateStore
ROOM_META_PREFIX = "matrix_room:"
@ -9,6 +12,8 @@ SKILLS_MSG_PREFIX = "matrix_skills_msg:"
PENDING_CONFIRM_PREFIX = "matrix_pending_confirm:"
LOAD_PENDING_PREFIX = "matrix_load_pending:"
RESET_PENDING_PREFIX = "matrix_reset_pending:"
STAGED_ATTACHMENTS_PREFIX = "matrix_staged_attachments:"
_STAGED_ATTACHMENTS_LOCKS: WeakValueDictionary[str, asyncio.Lock] = WeakValueDictionary()
async def get_room_meta(store: StateStore, room_id: str) -> dict | None:
@ -126,3 +131,66 @@ async def set_reset_pending(
async def clear_reset_pending(store: StateStore, user_id: str, room_id: str) -> None:
await store.delete(_reset_pending_key(user_id, room_id))
def _staged_attachments_key(room_id: str, user_id: str) -> str:
return f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}"
def _staged_attachments_lock(room_id: str, user_id: str) -> asyncio.Lock:
key = _staged_attachments_key(room_id, user_id)
lock = _STAGED_ATTACHMENTS_LOCKS.get(key)
if lock is None:
lock = asyncio.Lock()
_STAGED_ATTACHMENTS_LOCKS[key] = lock
return lock
async def get_staged_attachments(
store: StateStore, room_id: str, user_id: str
) -> list[dict]:
data = await store.get(_staged_attachments_key(room_id, user_id))
if not isinstance(data, dict):
return []
attachments = data.get("attachments")
if not isinstance(attachments, list):
return []
return [attachment for attachment in attachments if isinstance(attachment, dict)]
async def add_staged_attachment(
store: StateStore, room_id: str, user_id: str, attachment: dict
) -> None:
async with _staged_attachments_lock(room_id, user_id):
attachments = await get_staged_attachments(store, room_id, user_id)
attachments.append(attachment)
await store.set(
_staged_attachments_key(room_id, user_id), {"attachments": attachments}
)
async def remove_staged_attachment_at(
store: StateStore, room_id: str, user_id: str, index: int
) -> dict | None:
async with _staged_attachments_lock(room_id, user_id):
attachments = await get_staged_attachments(store, room_id, user_id)
if index < 0 or index >= len(attachments):
return None
removed = attachments.pop(index)
if attachments:
await store.set(
_staged_attachments_key(room_id, user_id), {"attachments": attachments}
)
else:
await store.delete(_staged_attachments_key(room_id, user_id))
return removed
async def clear_staged_attachments(
store: StateStore, room_id: str, user_id: str
) -> None:
async with _staged_attachments_lock(room_id, user_id):
await store.delete(_staged_attachments_key(room_id, user_id))

View file

@ -29,10 +29,15 @@ async def handle_message(event: IncomingMessage, auth_mgr, platform, chat_mgr, s
user_id=event.user_id,
chat_id=event.chat_id,
text=event.text,
attachments=[],
attachments=event.attachments,
)
return [
OutgoingTyping(chat_id=event.chat_id, is_typing=False),
OutgoingMessage(chat_id=event.chat_id, text=response.response, parse_mode="markdown"),
OutgoingMessage(
chat_id=event.chat_id,
text=response.response,
parse_mode="markdown",
attachments=list(getattr(response, "attachments", [])),
),
]

View file

@ -12,6 +12,7 @@ class Attachment:
content: bytes | None = None
filename: str | None = None
mime_type: str | None = None
workspace_path: str | None = None
@dataclass

View file

@ -1,5 +1,39 @@
services:
platform-agent:
build:
context: ./external/platform-agent
target: development
additional_contexts:
agent_api: ./external/platform-agent_api
env_file: .env
environment:
PYTHONUNBUFFERED: "1"
volumes:
- ./external/platform-agent/src:/app/src
- ./external/platform-agent_api:/agent_api
- workspace:/workspace
command: >
sh -lc "
mkdir -p /workspace &&
chown -R agent:agent /workspace &&
exec /app/.venv/bin/uvicorn src.main:app --host 0.0.0.0 --port 8000
"
ports:
- "8000:8000"
restart: unless-stopped
matrix-bot:
build: .
env_file: .env
environment:
AGENT_BASE_URL: http://platform-agent:8000
AGENT_WS_URL: ws://platform-agent:8000/v1/agent_ws/
SURFACES_WORKSPACE_DIR: /workspace
depends_on:
- platform-agent
volumes:
- workspace:/workspace
restart: unless-stopped
volumes:
workspace:

View file

@ -0,0 +1,252 @@
# Matrix Shared Workspace File Flow Design
## Goal
Bring the Matrix surface and `platform-agent` to a single file-handling model that matches the current platform runtime contract as closely as possible.
The result should be:
- Matrix receives user files and makes them visible to the agent through a shared `/workspace`
- `platform-agent` receives attachment paths, not ad hoc summaries or inline payloads
- the agent can send files back to the user through the surface via `send_file`
- local development and the default deployment path use the same storage contract
## Core Decision
The selected architecture is:
`Matrix surface <-> shared /workspace <-> platform-agent`
This means:
- the Matrix bot is responsible for downloading incoming Matrix media
- downloaded files are written into the same filesystem mounted into `platform-agent`
- the surface passes relative workspace paths to the agent as `attachments`
- the agent returns files to the user by emitting `MsgEventSendFile(path=...)`
This is the current platform-native direction and does not require new platform endpoints.
## Why This Decision
The current upstream platform changes already define the file contract:
- `MsgUserMessage.attachments` is `list[str]`
- each attachment is a path relative to `/workspace`
- the agent validates those paths against its configured backend root
- the agent can emit `send_file(path)` back to the client
That is not an upload API and not a remote blob contract. It is an explicit shared-workspace contract.
Trying to preserve the current separate-process launch model would force the surface to fake production behavior with inline text extraction, out-of-band path rewriting, or a future upload API that does not exist yet. That would increase the gap between our runtime and the platform runtime instead of reducing it.
## Scope
This design covers:
- shared workspace runtime for Matrix bot and `platform-agent`
- incoming Matrix file handling into shared storage
- attachment path propagation to `RealPlatformClient` and `AgentApi`
- outbound file delivery from agent to Matrix user
- local compose/dev workflow and README updates
This design does not cover:
- Telegram file flow
- encrypted Matrix media handling
- upload APIs on the platform side
- OCR, PDF parsing, or content extraction pipelines
- long-term object storage or file lifecycle policies beyond basic cleanup boundaries
## Runtime Contract
### Shared filesystem
Both containers must mount the same directory at `/workspace`.
Requirements:
- the Matrix bot can create files under `/workspace`
- `platform-agent` sees the same files at the same relative paths
- agent-originated files written under `/workspace` are readable by the Matrix bot
The contract is path-based, not URL-based.
### Attachment path format
The surface sends attachments to the agent as relative workspace paths, for example:
- `surfaces/matrix/<matrix_user_id>/<room_id>/inbox/20260420-153000-report.pdf`
- `surfaces/matrix/<matrix_user_id>/<room_id>/inbox/20260420-153200-photo.jpg`
Rules:
- paths must be relative to `/workspace`
- paths must be normalized before sending to the agent
- surface-owned uploads must live under a dedicated namespace to avoid collisions with agent-created files
## Data Flow
### Incoming file from Matrix user
1. Matrix receives `m.file`, `m.image`, `m.audio`, or `m.video`.
2. The Matrix bot resolves the target room and platform chat context as usual.
3. The Matrix bot downloads the media from Matrix.
4. The file is stored under `/workspace/surfaces/matrix/.../inbox/...`.
5. The outgoing platform call includes:
- original user text
- `attachments=[relative_path_1, ...]`
6. `platform-agent` validates that those files exist and exposes them to the agent through the upstream attachment mechanism.
Important detail:
- the surface should not rewrite the user message into a synthetic file summary unless the message body is empty
- when body is empty, the surface may send a minimal synthetic text such as `User sent one or more attachments.`
### Outbound file from agent to Matrix user
1. The agent uses `send_file(path)`.
2. `platform-agent` emits `MsgEventSendFile(path=...)`.
3. The Matrix integration catches that event.
4. The Matrix bot resolves the file inside shared `/workspace`.
5. The Matrix bot uploads the file to Matrix and sends the appropriate media message to the room.
Surface behavior:
- if MIME type and extension are known, send the closest native Matrix media type
- otherwise send as `m.file`
- user-visible failures must be explicit if the referenced file does not exist or cannot be uploaded
## Filesystem Layout
The Matrix surface owns a dedicated subtree:
```text
/workspace/
surfaces/
matrix/
<sanitized-user-id>/
<sanitized-room-id>/
inbox/
20260420-153000-report.pdf
```
Design constraints:
- sanitize user ids and room ids before using them as path components
- preserve the original filename in the final basename where possible
- prefix filenames with a timestamp or unique id to avoid collisions
This layout is intentionally surface-scoped. The agent may read these files, but the surface remains the owner of how inbound messenger files are organized.
## Components
### Matrix attachment storage helper
Add a focused helper module responsible for:
- building stable workspace-relative paths
- sanitizing path components
- downloading Matrix media into `/workspace`
- returning attachment metadata needed by the platform layer
This helper should not know about agent transport details beyond the final relative path output.
### Real platform client
`RealPlatformClient` must pass attachment relative paths through to `AgentApi.send_message(...)`.
It must also surface non-text agent events needed by the Matrix adapter, especially `MsgEventSendFile`.
### Agent API wrapper
`AgentApiWrapper` must be compatible with the modern upstream protocol:
- `/v1/agent_ws/{chat_id}/`
- `attachments` on outgoing user messages
- `MsgEventToolCallChunk`
- `MsgEventToolResult`
- `MsgEventCustomUpdate`
- `MsgEventSendFile`
- `MsgEventEnd`
### Matrix bot outbound renderer
The Matrix adapter must support sending files back to the room.
At minimum it needs:
- path resolution inside shared workspace
- Matrix upload of the local file
- send of an `m.file` or native media event with filename and MIME type
## Deployment Changes
### Compose
The repository root `docker-compose.yml` becomes the primary prod-like local runtime.
It should define at least:
- `matrix-bot`
- `platform-agent`
- one shared volume mounted as `/workspace` into both services
The default developer workflow should stop describing `platform-agent` as a separately started side process.
### Environment
The Matrix bot must connect to the in-compose `platform-agent` service by service name, not by assuming a separately launched localhost process.
The agent WebSocket configuration in docs and examples must match the modern upstream route.
## Error Handling
### Incoming files
If the Matrix bot cannot download or persist the file:
- do not send a broken attachment path to the agent
- return a user-visible error in the room
- log the Matrix event id, room id, and failure reason
### Outbound files
If the agent asks to send a missing file:
- log a structured warning with the requested path
- send a user-visible message that the file could not be delivered
### Shared workspace mismatch
If the runtime is misconfigured and `/workspace` is not actually shared:
- inbound attachments will fail agent-side path validation
- outbound `send_file` will fail surface-side file resolution
The implementation should make such failures obvious in logs rather than silently degrading to text-only behavior.
## Testing
The implementation must cover:
- Matrix media download writes into the expected workspace-relative path
- `RealPlatformClient` forwards attachment relative paths to the agent API
- Matrix plain messages with attachments preserve the original text while adding attachment paths
- empty-body attachment-only messages produce the synthetic text fallback
- `AgentApiWrapper` accepts `MsgEventSendFile` without treating it as unknown
- Matrix outbound file handling converts `MsgEventSendFile` into a Matrix upload/send call
- compose configuration mounts the same workspace into both containers
## Non-Goals
- no inline text extraction MVP
- no temporary URL-passing contract to the agent
- no fake “prod” mode with separate local filesystems
- no platform API additions in this phase
## Success Criteria
- the default local runtime uses a shared `/workspace`
- a user can send a file in Matrix and the agent receives it through upstream `attachments`
- the agent can emit `send_file(path)` and the Matrix user receives the file in the same room
- our runtime behavior matches the current platform contract closely enough that moving from local compose to production does not require redesigning file flow

View file

@ -0,0 +1,262 @@
# Matrix Staged Attachments Design
## Goal
Make file sending in the Matrix surface usable for an AI agent despite current Matrix client behavior, especially in Element where media is often sent immediately as separate events without a shared text composer.
The result should be:
- files can arrive before the user writes the actual instruction
- the surface stages those files instead of immediately sending them to the agent
- the next normal user message in the same chat commits all staged files as one agent turn
- the user can inspect and remove staged files with short chat commands
## Core Decision
The selected UX model is:
`incoming Matrix media -> staged attachments for (chat_id, user_id) -> next normal message commits them`
This means:
- attachment-only events do not immediately invoke the agent
- the bot acknowledges staged files with a service message
- the next normal user message sends text plus all currently staged files to the agent
- staged files are then cleared
## Why This Decision
Matrix natively models messages as separate events, and common clients do not provide a reliable "one text message with many attachments" composer flow.
In practice this causes two UX failures for an AI bot:
- users may send files first and only then write the task
- users may send multiple files as multiple independent Matrix events
If the surface treats each incoming file as a full agent turn, the bot becomes noisy and context-fragmented. If it ignores file-only messages, file handling feels broken.
Staging is the smallest surface-side abstraction that fixes both problems without fighting the Matrix event model.
## Scope
This design covers:
- staging inbound Matrix attachments before agent submission
- per-chat attachment state for a specific user
- user-facing service messages for staged attachments
- short commands for listing and removing staged files
- commit behavior on the next normal message
This design does not cover:
- edits or redactions of original Matrix media events as attachment controls
- cross-surface shared staging
- thread-aware staging beyond the existing `chat_id` boundary
- changes to the platform attachment contract
## State Model
### Staging key
Staged attachments are isolated by:
- `chat_id`
- `user_id`
This means:
- files staged by a user in one chat never appear in another chat
- files staged by one user do not mix with another user's files in the same room
### Staged attachment record
Each staged attachment must track at least:
- stable internal id
- display filename
- workspace-relative path
- MIME type if known
- created timestamp
User-visible commands operate on the current ordered list, not on internal ids.
### Lifecycle
A staged attachment is in exactly one of these states:
1. `staged`
2. `committed`
3. `removed`
Rules:
- only `staged` attachments appear in `!list`
- `committed` attachments are no longer user-removable
- `removed` attachments are excluded from future commits
## Inbound Behavior
### Attachment-only event
If the Matrix surface receives one or more file/media events from a user without a normal text message to commit them:
1. download each file into shared `/workspace`
2. add each file to the staged set for `(chat_id, user_id)`
3. do not call the agent yet
4. send a service acknowledgment message
### Service acknowledgment
The service message must communicate:
- the current staged attachment list with indices
- that the next normal message will be sent to the agent together with those files
- available commands: `!list`, `!remove <n>`, `!remove all`
Example shape:
```text
Staged attachments:
1. screenshot.png
2. invoice.pdf
Your next message will be sent to the agent with these files.
Commands: !list, !remove <n>, !remove all
```
### Burst handling
Matrix clients may send multiple files as separate consecutive events.
To avoid bot spam, service acknowledgments should be debounced over a short window and aggregated into one reply where feasible.
The acknowledgment must reflect the full current staged set, not only the most recently received file.
## Commit Behavior
### Commit trigger
The commit trigger is:
- the next normal user message in the same `(chat_id, user_id)` scope
Normal user message means:
- not a staging control command
- not a pure attachment event being staged
### Commit action
When a commit-triggering message arrives:
1. collect all currently staged attachments for `(chat_id, user_id)`
2. send the user text plus those attachments to the agent as one turn
3. mark all included staged attachments as `committed`
4. clear the staged set
After commit:
- the just-sent attachments must no longer appear in `!list`
- a later file upload starts a new staged set
## Commands
### `!list`
Shows the current staged attachment list for the user in the current chat.
If the list is empty, the response should be short and explicit.
### `!remove <n>`
Removes the staged attachment at the current 1-based index.
Behavior:
- if the index is valid, remove that staged attachment and return the updated staged list
- if the index is invalid, return a short error without repeating the list
### `!remove all`
Clears the entire staged set for the user in the current chat.
The response should be short and explicit.
## Ordering Rules
The staged list is ordered by staging time.
User-facing indices:
- are 1-based
- are recalculated from the current staged set
- may change after removals
Therefore:
- `!list` always shows the current authoritative numbering
- after a successful `!remove <n>`, the bot should reply with the refreshed list
## Error Handling
### Download failure
If a file cannot be downloaded or stored:
- do not add it to the staged set
- do not pretend it will be sent later
- send a short user-visible failure message
### Invalid command
If the command is malformed or uses an invalid index:
- return a short error
- do not commit staged attachments
- do not clear the staged set
### Agent submission failure
If commit fails when sending the text plus staged files to the agent:
- staged attachments must remain available for retry unless the failure is known to be irreversible
- the user-visible error should make it clear that the files were not consumed
This prevents silent loss of staged context.
## Interaction with Shared Workspace Design
This design assumes the shared-workspace contract defined in
[2026-04-20-matrix-shared-workspace-file-flow-design.md](/Users/a/MAI/sem2/lambda/surfaces-bot/docs/superpowers/specs/2026-04-20-matrix-shared-workspace-file-flow-design.md).
Specifically:
- staged files are stored in shared `/workspace`
- the final commit still passes workspace-relative paths to `platform-agent`
- staging changes only when the surface chooses to invoke the agent, not how attachments are represented
## Testing
The implementation must cover:
- file-only Matrix events are staged and do not immediately invoke the agent
- service acknowledgment includes staged filenames and command hints
- `!list` returns the current staged set for the correct `(chat_id, user_id)`
- `!remove <n>` removes the correct staged attachment and refreshes numbering
- `!remove all` clears the staged set
- invalid `!remove <n>` returns a short error and keeps state unchanged
- the next normal message commits all staged attachments with the text as one agent turn
- committed attachments disappear from staging after success
- failed commits preserve staged attachments
- staging in one chat does not leak into another chat
- staging for one user does not leak to another user in the same room
## Non-Goals
This design intentionally does not attempt to:
- emulate Telegram-style albums in Matrix
- rely on special support from Element or other Matrix clients
- introduce a rich interactive attachment management UI
The goal is a reliable chat-native workflow that works within Matrix's actual event model.

View file

@ -86,6 +86,55 @@ class AgentApiWrapper(AgentApi):
**self._init_kwargs,
)
@staticmethod
def _event_kind(event: object) -> str:
raw_kind = getattr(event, "type", None)
if hasattr(raw_kind, "value"):
raw_kind = raw_kind.value
if raw_kind is None:
raw_kind = event.__class__.__name__
kind = str(raw_kind).replace("-", "_")
if "_" in kind:
return kind.upper()
normalized = []
for index, char in enumerate(kind):
if index and char.isupper() and not kind[index - 1].isupper():
normalized.append("_")
normalized.append(char)
return "".join(normalized).upper()
@classmethod
def _is_kind(cls, event: object, *needles: str) -> bool:
kind = cls._event_kind(event)
return any(needle in kind for needle in needles)
@classmethod
def _is_text_event(cls, event: object) -> bool:
return hasattr(event, "text") or cls._is_kind(event, "TEXT_CHUNK")
@classmethod
def _is_end_event(cls, event: object) -> bool:
kind = cls._event_kind(event)
return kind == "END" or kind.endswith("_END")
@classmethod
def _is_send_file_event(cls, event: object) -> bool:
return "SEND_FILE" in cls._event_kind(event)
async def _publish_event(self, event: object, *, queue_event: object | None = None) -> None:
if self.callback:
self.callback(event)
if self._current_queue:
await self._current_queue.put(queue_event if queue_event is not None else event)
async def _publish_error(self, event: object) -> None:
if self.callback:
self.callback(event)
if self._current_queue and hasattr(event, "code") and hasattr(event, "details"):
await self._current_queue.put(AgentException(getattr(event, "code"), getattr(event, "details")))
async def _listen(self):
try:
async for msg in self._ws:
@ -93,7 +142,7 @@ class AgentApiWrapper(AgentApi):
try:
outgoing_msg = ServerMessage.validate_json(msg.data)
if isinstance(outgoing_msg, MsgEventTextChunk):
if self._is_text_event(outgoing_msg):
if self._current_queue:
await self._current_queue.put(outgoing_msg)
elif self.callback:
@ -101,29 +150,22 @@ class AgentApiWrapper(AgentApi):
else:
logger.warning("[%s] AgentEvent without active request", self.id)
elif isinstance(outgoing_msg, MsgEventEnd):
elif self._is_end_event(outgoing_msg):
self.last_tokens_used = outgoing_msg.tokens_used
if self._current_queue:
await self._current_queue.put(outgoing_msg)
await self._publish_event(outgoing_msg)
elif isinstance(outgoing_msg, MsgError):
if self.callback:
self.callback(outgoing_msg)
elif self._is_kind(outgoing_msg, "ERROR"):
error = AgentException(outgoing_msg.code, outgoing_msg.details)
logger.error("[%s] Agent error: %s", self.id, error)
if self._current_queue:
await self._current_queue.put(error)
await self._publish_error(outgoing_msg)
elif isinstance(outgoing_msg, MsgGracefulDisconnect):
if self.callback:
self.callback(outgoing_msg)
elif self._is_kind(outgoing_msg, "GRACEFUL_DISCONNECT"):
await self._publish_event(outgoing_msg)
logger.info("[%s] Gracefully disconnecting", self.id)
break
else:
logger.warning("[%s] Unknown message type: %s", self.id, outgoing_msg.type)
if self.callback:
self.callback(outgoing_msg)
await self._publish_event(outgoing_msg)
except Exception as exc:
logger.error("[%s] Failed to deserialize message: %s", self.id, exc)

View file

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import datetime
from typing import Any, AsyncIterator, Literal, Protocol
from pydantic import BaseModel
from pydantic import BaseModel, Field
class User(BaseModel):
@ -17,10 +17,11 @@ class User(BaseModel):
class Attachment(BaseModel):
url: str
mime_type: str
url: str | None = None
mime_type: str | None = None
size: int | None = None
filename: str | None = None
workspace_path: str | None = None
class MessageResponse(BaseModel):
@ -28,6 +29,7 @@ class MessageResponse(BaseModel):
response: str
tokens_used: int
finished: bool
attachments: list[Attachment] = Field(default_factory=list)
class MessageChunk(BaseModel):

View file

@ -1,6 +1,8 @@
from __future__ import annotations
import asyncio
import inspect
from pathlib import Path
from typing import AsyncIterator
from sdk.agent_api_wrapper import AgentApiWrapper
@ -71,21 +73,43 @@ class RealPlatformClient(PlatformClient):
) -> MessageResponse:
response_parts: list[str] = []
tokens_used = 0
sent_attachments: list[Attachment] = []
message_id = user_id
saw_end_event = False
async for chunk in self.stream_message(user_id, chat_id, text, attachments=attachments):
message_id = chunk.message_id
if chunk.delta:
response_parts.append(chunk.delta)
if chunk.finished:
tokens_used = chunk.tokens_used
lock = self._get_chat_send_lock(chat_id)
async with lock:
chat_api = await self._get_chat_api(chat_id)
if hasattr(chat_api, "last_tokens_used"):
chat_api.last_tokens_used = 0
return MessageResponse(
message_id=message_id,
response="".join(response_parts),
tokens_used=tokens_used,
finished=True,
)
async for event in self._stream_agent_events(chat_api, text, attachments=attachments):
message_id = user_id
if self._is_text_event(event):
chunk_text = getattr(event, "text", "")
if chunk_text:
response_parts.append(chunk_text)
elif self._is_end_event(event):
tokens_used = getattr(event, "tokens_used", tokens_used)
saw_end_event = True
elif self._is_send_file_event(event):
attachment = self._attachment_from_send_file_event(event)
if attachment is not None:
sent_attachments.append(attachment)
if not saw_end_event:
tokens_used = getattr(chat_api, "last_tokens_used", tokens_used)
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
response_kwargs = {
"message_id": message_id,
"response": "".join(response_parts),
"tokens_used": tokens_used,
"finished": True,
}
if self._message_response_accepts_attachments():
response_kwargs["attachments"] = sent_attachments
return MessageResponse(**response_kwargs)
async def stream_message(
self,
@ -99,20 +123,37 @@ class RealPlatformClient(PlatformClient):
chat_api = await self._get_chat_api(chat_id)
if hasattr(chat_api, "last_tokens_used"):
chat_api.last_tokens_used = 0
async for event in chat_api.send_message(text):
saw_end_event = False
async for event in self._stream_agent_events(chat_api, text, attachments=attachments):
if self._is_text_event(event):
yield MessageChunk(
message_id=user_id,
delta=getattr(event, "text", ""),
finished=False,
)
elif self._is_end_event(event):
tokens_used = getattr(event, "tokens_used", 0)
saw_end_event = True
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
yield MessageChunk(
message_id=user_id,
delta="",
finished=True,
tokens_used=tokens_used,
)
elif self._is_send_file_event(event):
continue
else:
continue
if not saw_end_event:
tokens_used = getattr(chat_api, "last_tokens_used", 0)
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
yield MessageChunk(
message_id=user_id,
delta=event.text,
finished=False,
delta="",
finished=True,
tokens_used=tokens_used,
)
tokens_used = getattr(chat_api, "last_tokens_used", 0)
await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used)
yield MessageChunk(
message_id=user_id,
delta="",
finished=True,
tokens_used=tokens_used,
)
async def get_settings(self, user_id: str) -> UserSettings:
return await self._prototype_state.get_settings(user_id)
@ -140,3 +181,107 @@ class RealPlatformClient(PlatformClient):
close = getattr(self._agent_api, "close", None)
if callable(close):
await close()
async def _stream_agent_events(
self,
chat_api,
text: str,
attachments: list[Attachment] | None = None,
) -> AsyncIterator[object]:
send_message = chat_api.send_message
attachment_paths = self._attachment_paths(attachments)
if attachment_paths and self._send_message_accepts_attachments(send_message):
event_stream = send_message(text, attachments=attachment_paths)
else:
event_stream = send_message(text)
async for event in event_stream:
yield event
@staticmethod
def _attachment_paths(attachments: list[Attachment] | None) -> list[str]:
if not attachments:
return []
paths = []
for attachment in attachments:
if attachment.workspace_path:
paths.append(attachment.workspace_path)
return paths
@staticmethod
def _send_message_accepts_attachments(send_message) -> bool:
try:
parameters = inspect.signature(send_message).parameters
except (TypeError, ValueError):
return False
return "attachments" in parameters or any(
parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters.values()
)
@staticmethod
def _event_kind(event: object) -> str:
raw_kind = getattr(event, "type", None)
if hasattr(raw_kind, "value"):
raw_kind = raw_kind.value
if raw_kind is None:
raw_kind = event.__class__.__name__
kind = str(raw_kind).replace("-", "_")
if "_" in kind:
return kind.upper()
normalized = []
for index, char in enumerate(kind):
if index and char.isupper() and not kind[index - 1].isupper():
normalized.append("_")
normalized.append(char)
return "".join(normalized).upper()
@classmethod
def _is_text_event(cls, event: object) -> bool:
return hasattr(event, "text") or "TEXT_CHUNK" in cls._event_kind(event)
@classmethod
def _is_end_event(cls, event: object) -> bool:
kind = cls._event_kind(event)
return kind == "END" or kind.endswith("_END")
@classmethod
def _is_send_file_event(cls, event: object) -> bool:
kind = cls._event_kind(event)
return "SEND_FILE" in kind
@staticmethod
def _attachment_from_send_file_event(event: object) -> Attachment | None:
location = None
for attr in ("url", "workspace_path", "path", "file_path", "uri"):
value = getattr(event, attr, None)
if value:
location = str(value)
break
if location is None:
return None
mime_type = getattr(event, "mime_type", None) or "application/octet-stream"
filename = getattr(event, "filename", None) or Path(location).name or None
size = getattr(event, "size", None)
workspace_path = location
if workspace_path.startswith("/workspace/"):
workspace_path = workspace_path[len("/workspace/"):]
elif workspace_path == "/workspace":
workspace_path = ""
return Attachment(
url=location,
mime_type=mime_type,
size=size,
filename=filename,
workspace_path=workspace_path or None,
)
@staticmethod
def _message_response_accepts_attachments() -> bool:
fields = getattr(MessageResponse, "model_fields", None)
if isinstance(fields, dict):
return "attachments" in fields
try:
return "attachments" in inspect.signature(MessageResponse).parameters
except (TypeError, ValueError):
return False

View file

@ -37,7 +37,41 @@ def image_event(url: str = "mxc://x/img", mime: str = "image/jpeg"):
)
async def test_plain_text_to_incoming_message():
def content_file_event():
return SimpleNamespace(
sender="@a:m.org",
body="doc.pdf",
event_id="$e4",
msgtype=None,
replyto_event_id=None,
content={
"msgtype": "m.file",
"body": "nested.pdf",
"url": "mxc://x/nested",
"info": {"mimetype": "application/pdf"},
},
)
def source_only_content_file_event():
return SimpleNamespace(
sender="@a:m.org",
body="doc.pdf",
event_id="$e5",
msgtype=None,
replyto_event_id=None,
source={
"content": {
"msgtype": "m.file",
"body": "source-only.pdf",
"url": "mxc://x/source-only",
"info": {"mimetype": "application/pdf"},
}
},
)
def test_plain_text_to_incoming_message():
result = from_room_event(text_event("Hello"), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingMessage)
assert result.text == "Hello"
@ -46,20 +80,48 @@ async def test_plain_text_to_incoming_message():
assert result.attachments == []
async def test_bang_command_to_incoming_command():
def test_bang_command_to_incoming_command():
result = from_room_event(text_event("!new Analysis"), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingCommand)
assert result.command == "new"
assert result.args == ["Analysis"]
async def test_skills_alias_to_settings_command():
def test_list_command_maps_to_matrix_list_attachments():
result = from_room_event(text_event("!list"), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingCommand)
assert result.command == "matrix_list_attachments"
assert result.args == []
def test_remove_all_maps_to_matrix_remove_attachment():
result = from_room_event(text_event("!remove all"), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingCommand)
assert result.command == "matrix_remove_attachment"
assert result.args == ["all"]
def test_remove_index_maps_to_matrix_remove_attachment():
result = from_room_event(text_event("!remove 2"), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingCommand)
assert result.command == "matrix_remove_attachment"
assert result.args == ["2"]
def test_remove_arbitrary_index_maps_to_matrix_remove_attachment():
result = from_room_event(text_event("!remove 99"), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingCommand)
assert result.command == "matrix_remove_attachment"
assert result.args == ["99"]
def test_skills_alias_to_settings_command():
result = from_command("!skills", sender="@a:m.org", chat_id="C1")
assert isinstance(result, IncomingCommand)
assert result.command == "settings_skills"
async def test_yes_to_callback():
def test_yes_to_callback():
result = from_room_event(text_event("!yes"), room_id="!room:example.org", chat_id="C7")
assert isinstance(result, IncomingCallback)
assert result.action == "confirm"
@ -67,7 +129,7 @@ async def test_yes_to_callback():
assert result.payload["room_id"] == "!room:example.org"
async def test_no_to_callback():
def test_no_to_callback():
result = from_room_event(text_event("!no"), room_id="!room:example.org", chat_id="C7")
assert isinstance(result, IncomingCallback)
assert result.action == "cancel"
@ -75,7 +137,7 @@ async def test_no_to_callback():
assert result.payload["room_id"] == "!room:example.org"
async def test_file_attachment():
def test_file_attachment():
result = from_room_event(file_event(), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingMessage)
assert len(result.attachments) == 1
@ -86,11 +148,32 @@ async def test_file_attachment():
assert a.mime_type == "application/pdf"
async def test_image_attachment():
def test_image_attachment():
result = from_room_event(image_event(), room_id="!r:m.org", chat_id="C1")
assert result.attachments[0].type == "image"
assert result.attachments[0].filename == "img.jpg"
assert result.attachments[0].mime_type == "image/jpeg"
def test_attachment_falls_back_to_content_payload():
result = from_room_event(content_file_event(), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingMessage)
a = result.attachments[0]
assert a.type == "document"
assert a.url == "mxc://x/nested"
assert a.filename == "nested.pdf"
assert a.mime_type == "application/pdf"
def test_attachment_falls_back_to_source_content_payload():
result = from_room_event(source_only_content_file_event(), room_id="!r:m.org", chat_id="C1")
assert isinstance(result, IncomingMessage)
a = result.attachments[0]
assert a.type == "document"
assert a.url == "mxc://x/source-only"
assert a.filename == "source-only.pdf"
assert a.mime_type == "application/pdf"
def test_converter_module_does_not_expose_reaction_callbacks():
assert not hasattr(converter, "from_reaction")

View file

@ -4,20 +4,36 @@ import importlib
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from nio import (
RoomMessageAudio,
RoomMessageFile,
RoomMessageImage,
RoomMessageText,
RoomMessageVideo,
)
from nio.api import RoomVisibility
from nio.responses import SyncResponse
from adapter.matrix.bot import MatrixBot, build_runtime, prepare_live_sync
from adapter.matrix.handlers.auth import handle_invite
from adapter.matrix.store import (
add_staged_attachment,
get_platform_chat_id,
get_room_meta,
get_staged_attachments,
get_user_meta,
set_load_pending,
set_room_meta,
set_user_meta,
)
from core.protocol import IncomingCallback, IncomingCommand, OutgoingMessage
from core.protocol import (
Attachment,
IncomingCallback,
IncomingCommand,
IncomingMessage,
OutgoingMessage,
)
from sdk.interface import PlatformError
from sdk.mock import MockPlatformClient
from sdk.real import RealPlatformClient
@ -27,7 +43,9 @@ async def test_matrix_dispatcher_registers_custom_handlers():
runtime = build_runtime(platform=MockPlatformClient())
current_chat_id = "C9"
start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start")
start = IncomingCommand(
user_id="u1", platform="matrix", chat_id=current_chat_id, command="start"
)
await runtime.dispatcher.dispatch(start)
new = IncomingCommand(
@ -93,7 +111,9 @@ async def test_new_chat_creates_real_matrix_room_when_client_available():
)
client.room_put_state.assert_awaited_once()
put_call = client.room_put_state.call_args
assert put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example"
assert (
put_call.kwargs.get("room_id") == "!space:example" or put_call.args[0] == "!space:example"
)
chats = await runtime.chat_mgr.list_active("u1")
assert [c.chat_id for c in chats] == ["C7"]
assert [c.surface_ref for c in chats] == ["!r2:example"]
@ -139,7 +159,10 @@ async def test_invite_event_creates_space_and_chat_room():
client.room_put_state.assert_awaited_once()
put_state_call = client.room_put_state.call_args
assert put_state_call.kwargs.get("event_type") == "m.space.child" or put_state_call.args[1] == "m.space.child"
assert (
put_state_call.kwargs.get("event_type") == "m.space.child"
or put_state_call.args[1] == "m.space.child"
)
user_meta = await get_user_meta(runtime.store, "@alice:example.org")
assert user_meta is not None
@ -249,7 +272,10 @@ async def test_bot_assigns_platform_chat_id_for_existing_managed_room():
await bot.on_room_message(room, event)
assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org"
assert (
await get_platform_chat_id(runtime.store, "!chat1:example.org")
== "matrix:!chat1:example.org"
)
runtime.dispatcher.dispatch.assert_awaited_once()
@ -278,6 +304,316 @@ async def test_bot_routes_plain_messages_via_platform_chat_id():
assert dispatched.text == "hello"
async def test_bot_downloads_matrix_file_to_workspace_before_staging(tmp_path, monkeypatch):
monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path))
runtime = build_runtime(platform=MockPlatformClient())
await set_room_meta(
runtime.store,
"!chat1:example.org",
{
"chat_id": "C1",
"matrix_user_id": "@alice:example.org",
"platform_chat_id": "matrix:ctx-1",
},
)
client = SimpleNamespace(
user_id="@bot:example.org",
download=AsyncMock(return_value=SimpleNamespace(body=b"%PDF-1.7")),
)
bot = MatrixBot(client, runtime)
bot._send_all = AsyncMock()
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
room = SimpleNamespace(room_id="!chat1:example.org")
event = SimpleNamespace(
sender="@alice:example.org",
body="report.pdf",
msgtype="m.file",
replyto_event_id=None,
url="mxc://server/id",
mimetype="application/pdf",
)
await bot.on_room_message(room, event)
runtime.dispatcher.dispatch.assert_not_awaited()
staged = await get_staged_attachments(runtime.store, "!chat1:example.org", "@alice:example.org")
assert staged[0]["workspace_path"] is not None
assert (tmp_path / staged[0]["workspace_path"]).read_bytes() == b"%PDF-1.7"
bot._send_all.assert_not_awaited()
async def test_file_only_event_is_staged_and_does_not_dispatch():
runtime = build_runtime(platform=MockPlatformClient())
client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock())
bot = MatrixBot(client, runtime)
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
bot._materialize_incoming_attachments = AsyncMock(
return_value=IncomingMessage(
user_id="@alice:example.org",
platform="matrix",
chat_id="!r:example.org",
text="",
attachments=[
Attachment(
type="document",
filename="report.pdf",
workspace_path="surfaces/matrix/alice/r/inbox/report.pdf",
mime_type="application/pdf",
)
],
)
)
room = SimpleNamespace(room_id="!r:example.org")
event = SimpleNamespace(
sender="@alice:example.org",
body="report.pdf",
msgtype="m.file",
url="mxc://hs/id",
mimetype="application/pdf",
replyto_event_id=None,
)
await bot.on_room_message(room, event)
runtime.dispatcher.dispatch.assert_not_awaited()
staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org")
assert [item["filename"] for item in staged] == ["report.pdf"]
client.room_send.assert_not_awaited()
async def test_list_command_returns_current_staged_attachments():
runtime = build_runtime(platform=MockPlatformClient())
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{"filename": "a.pdf", "workspace_path": "a.pdf"},
)
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{"filename": "b.pdf", "workspace_path": "b.pdf"},
)
client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock())
bot = MatrixBot(client, runtime)
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
room = SimpleNamespace(room_id="!r:example.org")
event = SimpleNamespace(
sender="@alice:example.org", body="!list", msgtype="m.text", replyto_event_id=None
)
await bot.on_room_message(room, event)
runtime.dispatcher.dispatch.assert_not_awaited()
body = client.room_send.await_args.args[2]["body"]
assert "1. a.pdf" in body
assert "2. b.pdf" in body
async def test_remove_invalid_index_returns_short_error():
runtime = build_runtime(platform=MockPlatformClient())
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{"filename": "a.pdf", "workspace_path": "a.pdf"},
)
client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock())
bot = MatrixBot(client, runtime)
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
room = SimpleNamespace(room_id="!r:example.org")
event = SimpleNamespace(
sender="@alice:example.org", body="!remove 9", msgtype="m.text", replyto_event_id=None
)
await bot.on_room_message(room, event)
runtime.dispatcher.dispatch.assert_not_awaited()
assert client.room_send.await_args.args[2]["body"] == "Нет такого вложения."
async def test_remove_attachment_updates_list_and_state():
runtime = build_runtime(platform=MockPlatformClient())
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{"filename": "a.pdf", "workspace_path": "a.pdf"},
)
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{"filename": "b.pdf", "workspace_path": "b.pdf"},
)
client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock())
bot = MatrixBot(client, runtime)
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
room = SimpleNamespace(room_id="!r:example.org")
event = SimpleNamespace(
sender="@alice:example.org", body="!remove 1", msgtype="m.text", replyto_event_id=None
)
await bot.on_room_message(room, event)
runtime.dispatcher.dispatch.assert_not_awaited()
staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org")
assert [item["filename"] for item in staged] == ["b.pdf"]
body = client.room_send.await_args.args[2]["body"]
assert "1. b.pdf" in body
assert "a.pdf" not in body
async def test_remove_all_clears_state():
runtime = build_runtime(platform=MockPlatformClient())
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{"filename": "a.pdf", "workspace_path": "a.pdf"},
)
client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock())
bot = MatrixBot(client, runtime)
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
room = SimpleNamespace(room_id="!r:example.org")
event = SimpleNamespace(
sender="@alice:example.org",
body="!remove all",
msgtype="m.text",
replyto_event_id=None,
)
await bot.on_room_message(room, event)
runtime.dispatcher.dispatch.assert_not_awaited()
assert await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") == []
assert client.room_send.await_args.args[2]["body"] == "Все вложения удалены."
async def test_staged_attachment_commands_are_scoped_by_room_and_user():
runtime = build_runtime(platform=MockPlatformClient())
await add_staged_attachment(
runtime.store,
"!r-one:example.org",
"@alice:example.org",
{"filename": "alice-room-one.pdf", "workspace_path": "alice-room-one.pdf"},
)
await add_staged_attachment(
runtime.store,
"!r-two:example.org",
"@alice:example.org",
{"filename": "alice-room-two.pdf", "workspace_path": "alice-room-two.pdf"},
)
await add_staged_attachment(
runtime.store,
"!r-one:example.org",
"@bob:example.org",
{"filename": "bob-room-one.pdf", "workspace_path": "bob-room-one.pdf"},
)
client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock())
bot = MatrixBot(client, runtime)
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
room = SimpleNamespace(room_id="!r-one:example.org")
event = SimpleNamespace(
sender="@alice:example.org",
body="!list",
msgtype="m.text",
replyto_event_id=None,
)
await bot.on_room_message(room, event)
runtime.dispatcher.dispatch.assert_not_awaited()
body = client.room_send.await_args.args[2]["body"]
assert "alice-room-one.pdf" in body
assert "alice-room-two.pdf" not in body
assert "bob-room-one.pdf" not in body
async def test_next_normal_message_commits_staged_attachments():
runtime = build_runtime(platform=MockPlatformClient())
await set_room_meta(
runtime.store,
"!r:example.org",
{
"chat_id": "C1",
"matrix_user_id": "@alice:example.org",
"platform_chat_id": "matrix:ctx-1",
},
)
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{
"type": "document",
"filename": "report.pdf",
"workspace_path": "surfaces/matrix/alice/r/inbox/report.pdf",
"mime_type": "application/pdf",
},
)
client = SimpleNamespace(user_id="@bot:example.org")
bot = MatrixBot(client, runtime)
bot._send_all = AsyncMock()
runtime.dispatcher.dispatch = AsyncMock(return_value=[])
room = SimpleNamespace(room_id="!r:example.org")
event = SimpleNamespace(
sender="@alice:example.org",
body="Проанализируй",
msgtype="m.text",
replyto_event_id=None,
)
await bot.on_room_message(room, event)
dispatched = runtime.dispatcher.dispatch.await_args.args[0]
assert isinstance(dispatched, IncomingMessage)
assert dispatched.text == "Проанализируй"
assert [a.workspace_path for a in dispatched.attachments] == [
"surfaces/matrix/alice/r/inbox/report.pdf"
]
assert await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") == []
async def test_failed_commit_preserves_staged_attachments():
runtime = build_runtime(platform=MockPlatformClient())
await set_room_meta(
runtime.store,
"!r:example.org",
{
"chat_id": "C1",
"matrix_user_id": "@alice:example.org",
"platform_chat_id": "matrix:ctx-1",
},
)
await add_staged_attachment(
runtime.store,
"!r:example.org",
"@alice:example.org",
{
"type": "document",
"filename": "report.pdf",
"workspace_path": "surfaces/matrix/alice/r/inbox/report.pdf",
},
)
client = SimpleNamespace(user_id="@bot:example.org", room_send=AsyncMock())
bot = MatrixBot(client, runtime)
runtime.dispatcher.dispatch = AsyncMock(side_effect=PlatformError("boom"))
room = SimpleNamespace(room_id="!r:example.org")
event = SimpleNamespace(
sender="@alice:example.org",
body="Проанализируй",
msgtype="m.text",
replyto_event_id=None,
)
await bot.on_room_message(room, event)
staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org")
assert [item["filename"] for item in staged] == ["report.pdf"]
async def test_bot_keeps_commands_on_local_chat_id():
runtime = build_runtime(platform=MockPlatformClient())
await set_room_meta(
@ -350,7 +686,10 @@ async def test_bot_assigns_platform_chat_id_before_load_selection():
await bot.on_room_message(room, event)
assert await get_platform_chat_id(runtime.store, "!chat1:example.org") == "matrix:!chat1:example.org"
assert (
await get_platform_chat_id(runtime.store, "!chat1:example.org")
== "matrix:!chat1:example.org"
)
client.room_send.assert_awaited_once_with(
"!chat1:example.org",
"m.room.message",
@ -415,7 +754,9 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap():
room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry")
await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello"))
await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello again"))
await bot.on_room_message(
room, SimpleNamespace(sender="@alice:example.org", body="hello again")
)
assert client.room_create.await_count == 2
room_send_calls = client.room_send.await_args_list
@ -430,6 +771,43 @@ async def test_unregistered_room_second_message_reuses_existing_bootstrap():
assert "platform_chat_id" not in entry_meta
async def test_unregistered_room_welcome_send_failure_does_not_repeat_bootstrap():
runtime = build_runtime(platform=MockPlatformClient())
await set_user_meta(runtime.store, "@alice:example.org", {"next_chat_index": 1})
space_resp = SimpleNamespace(room_id="!space:example.org")
chat_resp = SimpleNamespace(room_id="!chat1:example.org")
client = SimpleNamespace(
user_id="@bot:example.org",
room_create=AsyncMock(side_effect=[space_resp, chat_resp]),
room_put_state=AsyncMock(),
room_send=AsyncMock(side_effect=[RuntimeError("welcome failed"), None]),
)
bot = MatrixBot(client, runtime)
room = SimpleNamespace(room_id="!entry:example.org", display_name="Entry")
with pytest.raises(RuntimeError, match="welcome failed"):
await bot.on_room_message(room, SimpleNamespace(sender="@alice:example.org", body="hello"))
entry_meta = await get_room_meta(runtime.store, "!entry:example.org")
assert entry_meta == {
"matrix_user_id": "@alice:example.org",
"redirect_room_id": "!chat1:example.org",
"redirect_chat_id": "C1",
}
await bot.on_room_message(
room, SimpleNamespace(sender="@alice:example.org", body="hello again")
)
assert client.room_create.await_count == 2
room_send_calls = client.room_send.await_args_list
assert any(
call.args[0] == "!entry:example.org"
and "Рабочий чат уже создан: C1" in call.args[2]["body"]
for call in room_send_calls
)
async def test_unregistered_room_creates_new_chat_in_existing_space():
runtime = build_runtime(platform=MockPlatformClient())
await set_user_meta(
@ -466,7 +844,9 @@ async def test_mat11_settings_returns_mvp_unavailable_message():
runtime = build_runtime(platform=MockPlatformClient())
current_chat_id = "C9"
start = IncomingCommand(user_id="u1", platform="matrix", chat_id=current_chat_id, command="start")
start = IncomingCommand(
user_id="u1", platform="matrix", chat_id=current_chat_id, command="start"
)
await runtime.dispatcher.dispatch(start)
settings_cmd = IncomingCommand(
@ -587,3 +967,43 @@ async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeyp
agent_connect.assert_not_awaited()
platform_close.assert_awaited_once()
async def test_matrix_main_registers_media_message_callbacks(monkeypatch):
bot_module = importlib.import_module("adapter.matrix.bot")
runtime = SimpleNamespace(platform=SimpleNamespace(close=AsyncMock()))
created_clients = []
class FakeAsyncClient:
def __init__(self, *args, **kwargs):
self.access_token = None
self.callbacks = []
self.sync_forever = AsyncMock()
self.close = AsyncMock()
created_clients.append(self)
async def login(self, *args, **kwargs):
raise AssertionError("login should not be called when access token is provided")
def add_event_callback(self, callback, event_type):
self.callbacks.append((callback, event_type))
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_USER_ID", "@bot:example.org")
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "token")
monkeypatch.setattr(bot_module, "AsyncClient", FakeAsyncClient)
monkeypatch.setattr(bot_module, "build_runtime", lambda **kwargs: runtime)
monkeypatch.setattr(bot_module, "prepare_live_sync", AsyncMock(return_value="s123"))
await bot_module.main()
assert len(created_clients) == 1
registered_types = [event_type for _, event_type in created_clients[0].callbacks]
assert (
RoomMessageText,
RoomMessageFile,
RoomMessageImage,
RoomMessageVideo,
RoomMessageAudio,
) in registered_types

View file

@ -0,0 +1,50 @@
from __future__ import annotations
from pathlib import Path
from types import SimpleNamespace
from adapter.matrix.files import build_workspace_attachment_path, download_matrix_attachment
from core.protocol import Attachment
def test_build_workspace_attachment_path_scopes_by_surface_user_and_room(tmp_path: Path):
rel_path, abs_path = build_workspace_attachment_path(
workspace_root=tmp_path,
matrix_user_id="@alice:example.org",
room_id="!room:example.org",
filename="report.pdf",
timestamp="20260420-153000",
)
assert (
rel_path
== "surfaces/matrix/alice_example.org/room_example.org/inbox/20260420-153000-report.pdf"
)
assert abs_path == tmp_path / rel_path
async def test_download_matrix_attachment_persists_file_and_returns_workspace_path(tmp_path: Path):
async def download(url: str):
assert url == "mxc://server/id"
return SimpleNamespace(body=b"%PDF-1.7")
client = SimpleNamespace(download=download)
attachment = Attachment(
type="document",
url="mxc://server/id",
filename="report.pdf",
mime_type="application/pdf",
)
saved = await download_matrix_attachment(
client=client,
workspace_root=tmp_path,
matrix_user_id="@alice:example.org",
room_id="!room:example.org",
attachment=attachment,
timestamp="20260420-153000",
)
assert saved.workspace_path is not None
assert saved.workspace_path.endswith("20260420-153000-report.pdf")
assert (tmp_path / saved.workspace_path).read_bytes() == b"%PDF-1.7"

View file

@ -9,7 +9,7 @@ from adapter.matrix.handlers.confirm import make_handle_cancel, make_handle_conf
from adapter.matrix.store import get_pending_confirm, set_room_meta
from core.auth import AuthManager
from core.chat import ChatManager
from core.protocol import OutgoingUI, UIButton
from core.protocol import Attachment, OutgoingMessage, OutgoingUI, UIButton
from core.settings import SettingsManager
from core.store import InMemoryStore
from sdk.mock import MockPlatformClient
@ -156,3 +156,39 @@ async def test_outgoing_ui_no_round_trip_uses_user_and_room_scope():
assert "отменено" in result[0].text.lower()
assert await get_pending_confirm(store, "@alice:example.org", "!confirm:example.org") is None
assert await get_pending_confirm(store, "@bob:example.org", "!other:example.org") is not None
async def test_send_outgoing_uploads_workspace_file_attachment(tmp_path, monkeypatch):
workspace_file = tmp_path / "surfaces" / "matrix" / "alice" / "room" / "inbox" / "result.txt"
workspace_file.parent.mkdir(parents=True, exist_ok=True)
workspace_file.write_text("ready")
monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path))
client = SimpleNamespace(
upload=AsyncMock(return_value=(SimpleNamespace(content_uri="mxc://server/file"), {})),
room_send=AsyncMock(),
)
await send_outgoing(
client,
"!room:example.org",
OutgoingMessage(
chat_id="!room:example.org",
text="Файл готов",
attachments=[
Attachment(
type="document",
filename="result.txt",
mime_type="text/plain",
workspace_path="surfaces/matrix/alice/room/inbox/result.txt",
)
],
),
)
client.upload.assert_awaited_once()
client.room_send.assert_awaited()
assert client.room_send.await_args_list[0].args[2]["body"] == "Файл готов"
file_call = client.room_send.await_args_list[1]
assert file_call.args[2]["msgtype"] == "m.file"
assert file_call.args[2]["url"] == "mxc://server/file"

View file

@ -3,14 +3,19 @@ from __future__ import annotations
import pytest
from adapter.matrix.store import (
STAGED_ATTACHMENTS_PREFIX,
add_staged_attachment,
clear_pending_confirm,
clear_staged_attachments,
get_pending_confirm,
get_platform_chat_id,
get_room_meta,
get_room_state,
get_skills_message_id,
get_staged_attachments,
get_user_meta,
next_chat_id,
remove_staged_attachment_at,
set_pending_confirm,
set_platform_chat_id,
set_room_meta,
@ -116,3 +121,118 @@ async def test_pending_confirm_roundtrip(store: InMemoryStore):
await clear_pending_confirm(store, "!room:m.org")
assert await get_pending_confirm(store, "!room:m.org") is None
async def test_staged_attachments_roundtrip(store: InMemoryStore):
room_id = "!room:m.org"
user_id = "@alice:m.org"
assert await get_staged_attachments(store, room_id, user_id) == []
first = {"id": "att-1", "name": "screenshot.png"}
second = {"id": "att-2", "name": "invoice.pdf"}
await add_staged_attachment(store, room_id, user_id, first)
await add_staged_attachment(store, room_id, user_id, second)
assert await get_staged_attachments(store, room_id, user_id) == [
first,
second,
]
@pytest.mark.parametrize(
"stored_value",
[
None,
"not-a-dict",
[],
123,
],
)
async def test_staged_attachments_invalid_container_state_returns_empty_list(
store: InMemoryStore, stored_value,
):
room_id = "!room:m.org"
user_id = "@alice:m.org"
await store.set(f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}", stored_value)
assert await get_staged_attachments(store, room_id, user_id) == []
async def test_staged_attachments_filters_invalid_entries(store: InMemoryStore):
room_id = "!room:m.org"
user_id = "@alice:m.org"
valid_one = {"id": "att-1", "name": "alpha.png"}
valid_two = {"id": "att-2", "name": "beta.pdf"}
await store.set(
f"{STAGED_ATTACHMENTS_PREFIX}{room_id}:{user_id}",
{
"attachments": [
valid_one,
"bad-entry",
None,
{"id": "ignored"},
valid_two,
]
},
)
assert await get_staged_attachments(store, room_id, user_id) == [
valid_one,
{"id": "ignored"},
valid_two,
]
async def test_staged_attachments_are_scoped_by_room_and_user(store: InMemoryStore):
room_a = "!room-a:m.org"
room_b = "!room-b:m.org"
user_a = "@alice:m.org"
user_b = "@bob:m.org"
attachment_a = {"id": "att-a", "name": "alpha.png"}
attachment_b = {"id": "att-b", "name": "beta.png"}
attachment_c = {"id": "att-c", "name": "gamma.png"}
await add_staged_attachment(store, room_a, user_a, attachment_a)
await add_staged_attachment(store, room_a, user_b, attachment_b)
await add_staged_attachment(store, room_b, user_a, attachment_c)
assert await get_staged_attachments(store, room_a, user_a) == [attachment_a]
assert await get_staged_attachments(store, room_a, user_b) == [attachment_b]
assert await get_staged_attachments(store, room_b, user_a) == [attachment_c]
assert await get_staged_attachments(store, room_b, user_b) == []
async def test_remove_staged_attachment_at_by_zero_based_index(
store: InMemoryStore,
):
room_id = "!room:m.org"
user_id = "@alice:m.org"
first = {"id": "att-1", "name": "first.png"}
second = {"id": "att-2", "name": "second.png"}
third = {"id": "att-3", "name": "third.png"}
await add_staged_attachment(store, room_id, user_id, first)
await add_staged_attachment(store, room_id, user_id, second)
await add_staged_attachment(store, room_id, user_id, third)
assert await remove_staged_attachment_at(store, room_id, user_id, 1) == second
assert await get_staged_attachments(store, room_id, user_id) == [first, third]
assert await remove_staged_attachment_at(store, room_id, user_id, 99) is None
assert await remove_staged_attachment_at(store, room_id, user_id, -1) is None
async def test_clear_staged_attachments(store: InMemoryStore):
room_id = "!room:m.org"
user_id = "@alice:m.org"
await add_staged_attachment(store, room_id, user_id, {"id": "att-1"})
await add_staged_attachment(store, room_id, user_id, {"id": "att-2"})
await clear_staged_attachments(store, room_id, user_id)
assert await get_staged_attachments(store, room_id, user_id) == []

View file

@ -75,6 +75,27 @@ async def test_dispatch_routes_audio_before_catchall(dispatcher):
assert (await dispatcher.dispatch(text_msg))[0].text == "text"
async def test_dispatch_routes_document_before_catchall(dispatcher):
async def document_handler(event, **kwargs):
return [OutgoingMessage(chat_id=event.chat_id, text="document")]
async def catch_all(event, **kwargs):
return [OutgoingMessage(chat_id=event.chat_id, text="text")]
dispatcher.register(IncomingMessage, "document", document_handler)
dispatcher.register(IncomingMessage, "*", catch_all)
document_msg = IncomingMessage(
user_id="u1",
platform="matrix",
chat_id="C1",
text="",
attachments=[Attachment(type="document", workspace_path="surfaces/matrix/u1/file.pdf")],
)
assert (await dispatcher.dispatch(document_msg))[0].text == "document"
async def test_dispatch_callback_by_action(dispatcher):
async def confirm_handler(event, **kwargs):
return [OutgoingMessage(chat_id=event.chat_id, text="confirmed")]

View file

@ -23,11 +23,11 @@ from core.protocol import (
class FakeAgentApi:
def __init__(self) -> None:
self.calls: list[str] = []
self.calls: list[tuple[str, list[str]]] = []
self.last_tokens_used = 0
async def send_message(self, text: str):
self.calls.append(text)
async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append((text, attachments or []))
yield type("Chunk", (), {"text": f"[REAL] {text}"})()
self.last_tokens_used = 5
@ -130,4 +130,31 @@ async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatche
texts = [r.text for r in result if isinstance(r, OutgoingMessage)]
assert texts == ["[REAL] Привет!"]
assert agent_api.calls == ["Привет!"]
assert agent_api.calls == [("Привет!", [])]
async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher):
dispatcher, agent_api = real_dispatcher
start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start")
await dispatcher.dispatch(start)
msg = IncomingMessage(
user_id="u1",
platform="matrix",
chat_id="C1",
text="Посмотри файл",
attachments=[
Attachment(
type="document",
filename="report.pdf",
mime_type="application/pdf",
workspace_path="surfaces/matrix/u1/room/inbox/report.pdf",
)
],
)
await dispatcher.dispatch(msg)
assert agent_api.calls == [
("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"])
]

View file

@ -5,7 +5,7 @@ import pytest
from core.protocol import SettingsAction
import sdk.agent_api_wrapper as agent_api_wrapper_module
from sdk.agent_api_wrapper import AgentApiWrapper
from sdk.interface import MessageChunk, MessageResponse, UserSettings
from sdk.interface import Attachment, MessageChunk, MessageResponse, UserSettings
from sdk.prototype_state import PrototypeStateStore
from sdk.real import RealPlatformClient
@ -90,6 +90,100 @@ class BlockingChatAgentApi:
self.last_tokens_used = len(text)
class AttachmentTrackingChatAgentApi:
def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id
self.calls: list[tuple[str, list[str] | None]] = []
self.connect_calls = 0
self.close_calls = 0
self.last_tokens_used = 0
async def connect(self) -> None:
self.connect_calls += 1
async def close(self) -> None:
self.close_calls += 1
async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append((text, attachments))
yield FakeChunk(text)
self.last_tokens_used = 5
class SendFileEvent:
def __init__(self, *, workspace_path: str, mime_type: str, filename: str, size: int) -> None:
self.type = "AGENT_EVENT_SEND_FILE"
self.workspace_path = workspace_path
self.mime_type = mime_type
self.filename = filename
self.size = size
class TextChunkEvent:
def __init__(self, text: str) -> None:
self.type = "AGENT_EVENT_TEXT_CHUNK"
self.text = text
class ToolCallChunkEvent:
def __init__(self, payload: str) -> None:
self.type = "AGENT_EVENT_TOOL_CALL_CHUNK"
self.payload = payload
class ToolResultEvent:
def __init__(self, payload: str) -> None:
self.type = "AGENT_EVENT_TOOL_RESULT"
self.payload = payload
class CustomUpdateEvent:
def __init__(self, payload: str) -> None:
self.type = "AGENT_EVENT_CUSTOM_UPDATE"
self.payload = payload
class EndEvent:
def __init__(self, tokens_used: int) -> None:
self.type = "AGENT_EVENT_END"
self.tokens_used = tokens_used
class ErrorEvent:
def __init__(self, code: str, details: str) -> None:
self.type = "ERROR"
self.code = code
self.details = details
class GracefulDisconnectEvent:
def __init__(self) -> None:
self.type = "GRACEFUL_DISCONNECT"
class FakeWSMessage:
def __init__(self, data: str) -> None:
self.type = agent_api_wrapper_module.aiohttp.WSMsgType.TEXT
self.data = data
class FakeWebSocket:
def __init__(self, messages: list[FakeWSMessage]) -> None:
self._messages = list(messages)
def __aiter__(self):
return self
async def __anext__(self):
if not self._messages:
raise StopAsyncIteration
return self._messages.pop(0)
class MessageResponseWithAttachments(MessageResponse):
attachments: list[Attachment] = []
def test_agent_api_wrapper_uses_modern_constructor_when_available(monkeypatch):
calls: list[dict[str, object]] = []
@ -219,6 +313,76 @@ async def test_real_platform_client_send_message_uses_chat_bound_client():
assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 3
@pytest.mark.asyncio
async def test_real_platform_client_forwards_attachments_to_chat_api():
agent_api = AttachmentTrackingChatAgentApi("chat-7")
client = RealPlatformClient(
agent_api=agent_api,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
attachment = Attachment(
workspace_path="surfaces/matrix/alice/room/inbox/report.pdf",
mime_type="application/pdf",
filename="report.pdf",
size=123,
)
result = await client.send_message(
"@alice:example.org",
"chat-7",
"hello",
attachments=[attachment],
)
assert agent_api.calls == [("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])]
assert result.response == "hello"
assert result.tokens_used == 5
@pytest.mark.asyncio
async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch):
agent_api = AttachmentTrackingChatAgentApi("chat-7")
client = RealPlatformClient(
agent_api=agent_api,
prototype_state=PrototypeStateStore(),
platform="matrix",
)
class FileEventAgentApi(AttachmentTrackingChatAgentApi):
async def send_message(self, text: str, attachments: list[str] | None = None):
self.calls.append((text, attachments))
yield TextChunkEvent("he")
yield SendFileEvent(
workspace_path="/workspace/report.pdf",
mime_type="application/pdf",
filename="report.pdf",
size=123,
)
yield TextChunkEvent("llo")
self.last_tokens_used = 9
monkeypatch.setattr(
"sdk.real.MessageResponse",
MessageResponseWithAttachments,
)
client._agent_api = FileEventAgentApi("chat-7")
result = await client.send_message("@alice:example.org", "chat-7", "hello")
assert result.response == "hello"
assert result.tokens_used == 9
assert result.attachments == [
Attachment(
url="/workspace/report.pdf",
mime_type="application/pdf",
filename="report.pdf",
size=123,
workspace_path="report.pdf",
)
]
@pytest.mark.asyncio
async def test_real_platform_client_works_with_legacy_agent_api_without_for_chat():
legacy_api = LegacyAgentApi()
@ -385,3 +549,85 @@ async def test_real_platform_client_settings_are_local():
assert isinstance(settings, UserSettings)
assert settings.skills["browser"] is True
assert settings.skills["web-search"] is True
@pytest.mark.asyncio
async def test_agent_api_wrapper_transparently_surfaces_modern_events(monkeypatch):
callback_events: list[object] = []
queue: asyncio.Queue = asyncio.Queue()
event_map = {
"text": TextChunkEvent("he"),
"tool_call": ToolCallChunkEvent("call"),
"tool_result": ToolResultEvent("result"),
"custom_update": CustomUpdateEvent("update"),
"send_file": SendFileEvent(
workspace_path="/workspace/report.pdf",
mime_type="application/pdf",
filename="report.pdf",
size=123,
),
"end": EndEvent(tokens_used=11),
"error": ErrorEvent(code="BOOM", details="bad things"),
"disconnect": GracefulDisconnectEvent(),
}
def fake_validate_json(data: str):
return event_map[data]
monkeypatch.setattr(
agent_api_wrapper_module,
"ServerMessage",
type("FakeServerMessage", (), {"validate_json": staticmethod(fake_validate_json)}),
)
async def fake_cleanup(self):
return None
monkeypatch.setattr(agent_api_wrapper_module.AgentApiWrapper, "_cleanup", fake_cleanup)
monkeypatch.setattr(
agent_api_wrapper_module.AgentApi,
"__init__",
lambda self, agent_id, base_url=None, chat_id=0, **kwargs: setattr(self, "id", agent_id)
or setattr(self, "callback", kwargs.get("callback"))
or setattr(self, "on_disconnect", kwargs.get("on_disconnect"))
or setattr(self, "_current_queue", None),
)
wrapper = AgentApiWrapper(
agent_id="agent-1",
base_url="https://agent.example.com/v1/agent_ws",
chat_id="chat-1",
callback=callback_events.append,
)
wrapper._current_queue = queue
wrapper._ws = FakeWebSocket(
[
FakeWSMessage("text"),
FakeWSMessage("tool_call"),
FakeWSMessage("tool_result"),
FakeWSMessage("custom_update"),
FakeWSMessage("send_file"),
FakeWSMessage("end"),
FakeWSMessage("error"),
FakeWSMessage("disconnect"),
]
)
await wrapper._listen()
queue_events = []
while not queue.empty():
queue_events.append(await queue.get())
assert queue_events[0].text == "he"
assert any(isinstance(event, SendFileEvent) for event in queue_events)
assert any(isinstance(event, EndEvent) for event in queue_events)
assert any(isinstance(event, GracefulDisconnectEvent) for event in queue_events)
assert callback_events[0].payload == "call"
assert callback_events[1].payload == "result"
assert callback_events[2].payload == "update"
assert any(isinstance(event, SendFileEvent) for event in callback_events)
assert any(isinstance(event, EndEvent) for event in callback_events)
assert any(isinstance(event, ErrorEvent) for event in callback_events)
assert any(isinstance(event, GracefulDisconnectEvent) for event in callback_events)
assert wrapper.last_tokens_used == 11