diff --git a/.env.example b/.env.example index c7edcbc..3af498d 100644 --- a/.env.example +++ b/.env.example @@ -5,13 +5,16 @@ TELEGRAM_BOT_TOKEN=your_bot_token_here MATRIX_HOMESERVER=https://matrix.org MATRIX_USER_ID=@bot:matrix.org MATRIX_PASSWORD=your_password_here - -# Lambda Platform -LAMBDA_PLATFORM_URL=http://localhost:8000 -LAMBDA_SERVICE_TOKEN=your_service_token_here -AGENT_WS_URL=ws://127.0.0.1:8000/agent_ws/ -AGENT_BASE_URL=http://127.0.0.1:8000 MATRIX_PLATFORM_BACKEND=real -# Режим работы: "mock" или "production" -PLATFORM_MODE=mock +# Shared workspace contract +SURFACES_WORKSPACE_DIR=/workspace + +# Compose-local platform-agent route +AGENT_WS_URL=ws://platform-agent:8000/v1/agent_ws/{chat_id}/ +AGENT_BASE_URL=http://platform-agent:8000 + +# platform-agent provider +PROVIDER_MODEL=openai/gpt-4o-mini +PROVIDER_URL=https://openrouter.ai/api/v1 +PROVIDER_API_KEY=sk-or-... diff --git a/README.md b/README.md index a9d7f71..8d95c6b 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,9 @@ Root `docker-compose.yml` теперь является основным лок docker compose up --build ``` -Compose использует локальные директории `external/platform-agent` и `external/platform-agent_api` как источник кода для агента. +Compose собирает `platform-agent` из актуального upstream `external/platform-agent` Dockerfile (`development` target), +монтирует live-код из `external/platform-agent/src` и `external/platform-agent_api`, и подготавливает shared `/workspace` +с правами для agent runtime. Matrix бот подключается к `platform-agent` по service name, а не к отдельно запущенному `localhost`. ### 4.1. Staged attachments в Matrix diff --git a/adapter/matrix/bot.py b/adapter/matrix/bot.py index bd3934a..cf8a74f 100644 --- a/adapter/matrix/bot.py +++ b/adapter/matrix/bot.py @@ -13,7 +13,12 @@ from nio import ( InviteMemberEvent, MatrixRoom, RoomMemberEvent, + RoomMessage, + RoomMessageAudio, + RoomMessageFile, + RoomMessageImage, RoomMessageText, + RoomMessageVideo, ) from nio.responses import SyncResponse @@ -227,19 +232,6 @@ class MatrixBot: incoming, ) await self._stage_attachments(room.room_id, sender, materialized.attachments) - await self._send_all( - room.room_id, - [ - OutgoingMessage( - chat_id=dispatch_chat_id, - text=await self._format_staged_attachments( - room.room_id, - sender, - include_hint=True, - ), - ) - ], - ) return if isinstance(incoming, IncomingMessage) and incoming.attachments: incoming = await self._materialize_incoming_attachments( @@ -276,12 +268,12 @@ class MatrixBot: await self._send_all(room.room_id, outgoing) def _is_file_only_event( - self, event: RoomMessageText, incoming: IncomingMessage | IncomingCommand + self, event: RoomMessage, incoming: IncomingMessage | IncomingCommand ) -> bool: return ( isinstance(incoming, IncomingMessage) and bool(incoming.attachments) - and getattr(event, "msgtype", None) != "m.text" + and not isinstance(event, RoomMessageText) ) async def _stage_attachments( @@ -669,7 +661,16 @@ async def main() -> None: since_token = await prepare_live_sync(client) bot = MatrixBot(client, runtime) - client.add_event_callback(bot.on_room_message, RoomMessageText) + client.add_event_callback( + bot.on_room_message, + ( + RoomMessageText, + RoomMessageFile, + RoomMessageImage, + RoomMessageVideo, + RoomMessageAudio, + ), + ) client.add_event_callback(bot.on_member, (InviteMemberEvent, RoomMemberEvent)) logger.info( diff --git a/adapter/matrix/converter.py b/adapter/matrix/converter.py index f8edd78..a19d8ea 100644 --- a/adapter/matrix/converter.py +++ b/adapter/matrix/converter.py @@ -14,7 +14,8 @@ PLATFORM = "matrix" def extract_attachments(event: Any) -> list[Attachment]: - content = getattr(event, "content", {}) or {} + source = getattr(event, "source", {}) or {} + content = source.get("content", {}) or getattr(event, "content", {}) or {} msgtype = getattr(event, "msgtype", None) if msgtype is None: msgtype = content.get("msgtype") diff --git a/adapter/matrix/files.py b/adapter/matrix/files.py new file mode 100644 index 0000000..52d1a1c --- /dev/null +++ b/adapter/matrix/files.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import mimetypes +import re +from datetime import UTC, datetime +from pathlib import Path + +from core.protocol import Attachment + + +def _sanitize_component(value: str) -> str: + cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value) + cleaned = cleaned.strip("._-") + return cleaned or "unknown" + + +def _default_filename(attachment: Attachment) -> str: + if attachment.filename: + return attachment.filename + + extension = mimetypes.guess_extension(attachment.mime_type or "") or "" + base = { + "image": "image", + "audio": "audio", + "video": "video", + "document": "attachment", + }.get(attachment.type, "attachment") + return f"{base}{extension}" + + +def build_workspace_attachment_path( + *, + workspace_root: Path, + matrix_user_id: str, + room_id: str, + filename: str, + timestamp: str | None = None, +) -> tuple[str, Path]: + stamp = timestamp or datetime.now(UTC).strftime("%Y%m%d-%H%M%S") + safe_user = _sanitize_component(matrix_user_id.lstrip("@")) + safe_room = _sanitize_component(room_id.lstrip("!")) + safe_name = _sanitize_component(filename) or "attachment.bin" + relative_path = ( + Path("surfaces") + / "matrix" + / safe_user + / safe_room + / "inbox" + / f"{stamp}-{safe_name}" + ) + return relative_path.as_posix(), workspace_root / relative_path + + +async def download_matrix_attachment( + *, + client, + workspace_root: Path, + matrix_user_id: str, + room_id: str, + attachment: Attachment, + timestamp: str | None = None, +) -> Attachment: + if not attachment.url: + return attachment + + filename = _default_filename(attachment) + relative_path, absolute_path = build_workspace_attachment_path( + workspace_root=workspace_root, + matrix_user_id=matrix_user_id, + room_id=room_id, + filename=filename, + timestamp=timestamp, + ) + absolute_path.parent.mkdir(parents=True, exist_ok=True) + + response = await client.download(attachment.url) + body = getattr(response, "body", None) + if body is None: + raise RuntimeError(f"Matrix download response for {attachment.url} has no body") + absolute_path.write_bytes(body) + + return Attachment( + type=attachment.type, + url=attachment.url, + filename=filename, + mime_type=attachment.mime_type, + workspace_path=relative_path, + ) + + +def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path: + path = Path(workspace_path) + if path.is_absolute(): + return path + return workspace_root / path + + +def matrix_msgtype_for_attachment(attachment: Attachment) -> str: + return { + "image": "m.image", + "audio": "m.audio", + "video": "m.video", + }.get(attachment.type, "m.file") diff --git a/core/handlers/message.py b/core/handlers/message.py index 2edb87e..d9f91cd 100644 --- a/core/handlers/message.py +++ b/core/handlers/message.py @@ -29,10 +29,15 @@ async def handle_message(event: IncomingMessage, auth_mgr, platform, chat_mgr, s user_id=event.user_id, chat_id=event.chat_id, text=event.text, - attachments=[], + attachments=event.attachments, ) return [ OutgoingTyping(chat_id=event.chat_id, is_typing=False), - OutgoingMessage(chat_id=event.chat_id, text=response.response, parse_mode="markdown"), + OutgoingMessage( + chat_id=event.chat_id, + text=response.response, + parse_mode="markdown", + attachments=list(getattr(response, "attachments", [])), + ), ] diff --git a/core/protocol.py b/core/protocol.py index 02a9f4a..7d6e25f 100644 --- a/core/protocol.py +++ b/core/protocol.py @@ -12,6 +12,7 @@ class Attachment: content: bytes | None = None filename: str | None = None mime_type: str | None = None + workspace_path: str | None = None @dataclass diff --git a/docker-compose.yml b/docker-compose.yml index 480ecad..d6c2e4d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,39 @@ services: + platform-agent: + build: + context: ./external/platform-agent + target: development + additional_contexts: + agent_api: ./external/platform-agent_api + env_file: .env + environment: + PYTHONUNBUFFERED: "1" + volumes: + - ./external/platform-agent/src:/app/src + - ./external/platform-agent_api:/agent_api + - workspace:/workspace + command: > + sh -lc " + mkdir -p /workspace && + chown -R agent:agent /workspace && + exec /app/.venv/bin/uvicorn src.main:app --host 0.0.0.0 --port 8000 + " + ports: + - "8000:8000" + restart: unless-stopped + matrix-bot: build: . env_file: .env + environment: + AGENT_BASE_URL: http://platform-agent:8000 + AGENT_WS_URL: ws://platform-agent:8000/v1/agent_ws/ + SURFACES_WORKSPACE_DIR: /workspace + depends_on: + - platform-agent + volumes: + - workspace:/workspace restart: unless-stopped + +volumes: + workspace: diff --git a/sdk/agent_api_wrapper.py b/sdk/agent_api_wrapper.py index 32f126d..94205ea 100644 --- a/sdk/agent_api_wrapper.py +++ b/sdk/agent_api_wrapper.py @@ -86,6 +86,55 @@ class AgentApiWrapper(AgentApi): **self._init_kwargs, ) + @staticmethod + def _event_kind(event: object) -> str: + raw_kind = getattr(event, "type", None) + if hasattr(raw_kind, "value"): + raw_kind = raw_kind.value + if raw_kind is None: + raw_kind = event.__class__.__name__ + + kind = str(raw_kind).replace("-", "_") + if "_" in kind: + return kind.upper() + + normalized = [] + for index, char in enumerate(kind): + if index and char.isupper() and not kind[index - 1].isupper(): + normalized.append("_") + normalized.append(char) + return "".join(normalized).upper() + + @classmethod + def _is_kind(cls, event: object, *needles: str) -> bool: + kind = cls._event_kind(event) + return any(needle in kind for needle in needles) + + @classmethod + def _is_text_event(cls, event: object) -> bool: + return hasattr(event, "text") or cls._is_kind(event, "TEXT_CHUNK") + + @classmethod + def _is_end_event(cls, event: object) -> bool: + kind = cls._event_kind(event) + return kind == "END" or kind.endswith("_END") + + @classmethod + def _is_send_file_event(cls, event: object) -> bool: + return "SEND_FILE" in cls._event_kind(event) + + async def _publish_event(self, event: object, *, queue_event: object | None = None) -> None: + if self.callback: + self.callback(event) + if self._current_queue: + await self._current_queue.put(queue_event if queue_event is not None else event) + + async def _publish_error(self, event: object) -> None: + if self.callback: + self.callback(event) + if self._current_queue and hasattr(event, "code") and hasattr(event, "details"): + await self._current_queue.put(AgentException(getattr(event, "code"), getattr(event, "details"))) + async def _listen(self): try: async for msg in self._ws: @@ -93,7 +142,7 @@ class AgentApiWrapper(AgentApi): try: outgoing_msg = ServerMessage.validate_json(msg.data) - if isinstance(outgoing_msg, MsgEventTextChunk): + if self._is_text_event(outgoing_msg): if self._current_queue: await self._current_queue.put(outgoing_msg) elif self.callback: @@ -101,29 +150,22 @@ class AgentApiWrapper(AgentApi): else: logger.warning("[%s] AgentEvent without active request", self.id) - elif isinstance(outgoing_msg, MsgEventEnd): + elif self._is_end_event(outgoing_msg): self.last_tokens_used = outgoing_msg.tokens_used - if self._current_queue: - await self._current_queue.put(outgoing_msg) + await self._publish_event(outgoing_msg) - elif isinstance(outgoing_msg, MsgError): - if self.callback: - self.callback(outgoing_msg) + elif self._is_kind(outgoing_msg, "ERROR"): error = AgentException(outgoing_msg.code, outgoing_msg.details) logger.error("[%s] Agent error: %s", self.id, error) - if self._current_queue: - await self._current_queue.put(error) + await self._publish_error(outgoing_msg) - elif isinstance(outgoing_msg, MsgGracefulDisconnect): - if self.callback: - self.callback(outgoing_msg) + elif self._is_kind(outgoing_msg, "GRACEFUL_DISCONNECT"): + await self._publish_event(outgoing_msg) logger.info("[%s] Gracefully disconnecting", self.id) break else: - logger.warning("[%s] Unknown message type: %s", self.id, outgoing_msg.type) - if self.callback: - self.callback(outgoing_msg) + await self._publish_event(outgoing_msg) except Exception as exc: logger.error("[%s] Failed to deserialize message: %s", self.id, exc) diff --git a/sdk/interface.py b/sdk/interface.py index e1ff12e..c885867 100644 --- a/sdk/interface.py +++ b/sdk/interface.py @@ -4,7 +4,7 @@ from __future__ import annotations from datetime import datetime from typing import Any, AsyncIterator, Literal, Protocol -from pydantic import BaseModel +from pydantic import BaseModel, Field class User(BaseModel): @@ -17,10 +17,11 @@ class User(BaseModel): class Attachment(BaseModel): - url: str - mime_type: str + url: str | None = None + mime_type: str | None = None size: int | None = None filename: str | None = None + workspace_path: str | None = None class MessageResponse(BaseModel): @@ -28,6 +29,7 @@ class MessageResponse(BaseModel): response: str tokens_used: int finished: bool + attachments: list[Attachment] = Field(default_factory=list) class MessageChunk(BaseModel): diff --git a/sdk/real.py b/sdk/real.py index f6e40ed..71803f4 100644 --- a/sdk/real.py +++ b/sdk/real.py @@ -1,6 +1,8 @@ from __future__ import annotations import asyncio +import inspect +from pathlib import Path from typing import AsyncIterator from sdk.agent_api_wrapper import AgentApiWrapper @@ -71,21 +73,43 @@ class RealPlatformClient(PlatformClient): ) -> MessageResponse: response_parts: list[str] = [] tokens_used = 0 + sent_attachments: list[Attachment] = [] message_id = user_id + saw_end_event = False - async for chunk in self.stream_message(user_id, chat_id, text, attachments=attachments): - message_id = chunk.message_id - if chunk.delta: - response_parts.append(chunk.delta) - if chunk.finished: - tokens_used = chunk.tokens_used + lock = self._get_chat_send_lock(chat_id) + async with lock: + chat_api = await self._get_chat_api(chat_id) + if hasattr(chat_api, "last_tokens_used"): + chat_api.last_tokens_used = 0 - return MessageResponse( - message_id=message_id, - response="".join(response_parts), - tokens_used=tokens_used, - finished=True, - ) + async for event in self._stream_agent_events(chat_api, text, attachments=attachments): + message_id = user_id + if self._is_text_event(event): + chunk_text = getattr(event, "text", "") + if chunk_text: + response_parts.append(chunk_text) + elif self._is_end_event(event): + tokens_used = getattr(event, "tokens_used", tokens_used) + saw_end_event = True + elif self._is_send_file_event(event): + attachment = self._attachment_from_send_file_event(event) + if attachment is not None: + sent_attachments.append(attachment) + + if not saw_end_event: + tokens_used = getattr(chat_api, "last_tokens_used", tokens_used) + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + + response_kwargs = { + "message_id": message_id, + "response": "".join(response_parts), + "tokens_used": tokens_used, + "finished": True, + } + if self._message_response_accepts_attachments(): + response_kwargs["attachments"] = sent_attachments + return MessageResponse(**response_kwargs) async def stream_message( self, @@ -99,20 +123,37 @@ class RealPlatformClient(PlatformClient): chat_api = await self._get_chat_api(chat_id) if hasattr(chat_api, "last_tokens_used"): chat_api.last_tokens_used = 0 - async for event in chat_api.send_message(text): + saw_end_event = False + async for event in self._stream_agent_events(chat_api, text, attachments=attachments): + if self._is_text_event(event): + yield MessageChunk( + message_id=user_id, + delta=getattr(event, "text", ""), + finished=False, + ) + elif self._is_end_event(event): + tokens_used = getattr(event, "tokens_used", 0) + saw_end_event = True + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) + yield MessageChunk( + message_id=user_id, + delta="", + finished=True, + tokens_used=tokens_used, + ) + elif self._is_send_file_event(event): + continue + else: + continue + if not saw_end_event: + tokens_used = getattr(chat_api, "last_tokens_used", 0) + await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) yield MessageChunk( message_id=user_id, - delta=event.text, - finished=False, + delta="", + finished=True, + tokens_used=tokens_used, ) - tokens_used = getattr(chat_api, "last_tokens_used", 0) - await self._prototype_state.set_last_tokens_used(str(chat_id), tokens_used) - yield MessageChunk( - message_id=user_id, - delta="", - finished=True, - tokens_used=tokens_used, - ) async def get_settings(self, user_id: str) -> UserSettings: return await self._prototype_state.get_settings(user_id) @@ -140,3 +181,107 @@ class RealPlatformClient(PlatformClient): close = getattr(self._agent_api, "close", None) if callable(close): await close() + + async def _stream_agent_events( + self, + chat_api, + text: str, + attachments: list[Attachment] | None = None, + ) -> AsyncIterator[object]: + send_message = chat_api.send_message + attachment_paths = self._attachment_paths(attachments) + if attachment_paths and self._send_message_accepts_attachments(send_message): + event_stream = send_message(text, attachments=attachment_paths) + else: + event_stream = send_message(text) + async for event in event_stream: + yield event + + @staticmethod + def _attachment_paths(attachments: list[Attachment] | None) -> list[str]: + if not attachments: + return [] + paths = [] + for attachment in attachments: + if attachment.workspace_path: + paths.append(attachment.workspace_path) + return paths + + @staticmethod + def _send_message_accepts_attachments(send_message) -> bool: + try: + parameters = inspect.signature(send_message).parameters + except (TypeError, ValueError): + return False + return "attachments" in parameters or any( + parameter.kind == inspect.Parameter.VAR_KEYWORD for parameter in parameters.values() + ) + + @staticmethod + def _event_kind(event: object) -> str: + raw_kind = getattr(event, "type", None) + if hasattr(raw_kind, "value"): + raw_kind = raw_kind.value + if raw_kind is None: + raw_kind = event.__class__.__name__ + + kind = str(raw_kind).replace("-", "_") + if "_" in kind: + return kind.upper() + normalized = [] + for index, char in enumerate(kind): + if index and char.isupper() and not kind[index - 1].isupper(): + normalized.append("_") + normalized.append(char) + return "".join(normalized).upper() + + @classmethod + def _is_text_event(cls, event: object) -> bool: + return hasattr(event, "text") or "TEXT_CHUNK" in cls._event_kind(event) + + @classmethod + def _is_end_event(cls, event: object) -> bool: + kind = cls._event_kind(event) + return kind == "END" or kind.endswith("_END") + + @classmethod + def _is_send_file_event(cls, event: object) -> bool: + kind = cls._event_kind(event) + return "SEND_FILE" in kind + + @staticmethod + def _attachment_from_send_file_event(event: object) -> Attachment | None: + location = None + for attr in ("url", "workspace_path", "path", "file_path", "uri"): + value = getattr(event, attr, None) + if value: + location = str(value) + break + if location is None: + return None + + mime_type = getattr(event, "mime_type", None) or "application/octet-stream" + filename = getattr(event, "filename", None) or Path(location).name or None + size = getattr(event, "size", None) + workspace_path = location + if workspace_path.startswith("/workspace/"): + workspace_path = workspace_path[len("/workspace/"):] + elif workspace_path == "/workspace": + workspace_path = "" + return Attachment( + url=location, + mime_type=mime_type, + size=size, + filename=filename, + workspace_path=workspace_path or None, + ) + + @staticmethod + def _message_response_accepts_attachments() -> bool: + fields = getattr(MessageResponse, "model_fields", None) + if isinstance(fields, dict): + return "attachments" in fields + try: + return "attachments" in inspect.signature(MessageResponse).parameters + except (TypeError, ValueError): + return False diff --git a/tests/adapter/matrix/test_converter.py b/tests/adapter/matrix/test_converter.py index a6b75fb..3513913 100644 --- a/tests/adapter/matrix/test_converter.py +++ b/tests/adapter/matrix/test_converter.py @@ -53,6 +53,24 @@ def content_file_event(): ) +def source_only_content_file_event(): + return SimpleNamespace( + sender="@a:m.org", + body="doc.pdf", + event_id="$e5", + msgtype=None, + replyto_event_id=None, + source={ + "content": { + "msgtype": "m.file", + "body": "source-only.pdf", + "url": "mxc://x/source-only", + "info": {"mimetype": "application/pdf"}, + } + }, + ) + + def test_plain_text_to_incoming_message(): result = from_room_event(text_event("Hello"), room_id="!r:m.org", chat_id="C1") assert isinstance(result, IncomingMessage) @@ -147,5 +165,15 @@ def test_attachment_falls_back_to_content_payload(): assert a.mime_type == "application/pdf" +def test_attachment_falls_back_to_source_content_payload(): + result = from_room_event(source_only_content_file_event(), room_id="!r:m.org", chat_id="C1") + assert isinstance(result, IncomingMessage) + a = result.attachments[0] + assert a.type == "document" + assert a.url == "mxc://x/source-only" + assert a.filename == "source-only.pdf" + assert a.mime_type == "application/pdf" + + def test_converter_module_does_not_expose_reaction_callbacks(): assert not hasattr(converter, "from_reaction") diff --git a/tests/adapter/matrix/test_dispatcher.py b/tests/adapter/matrix/test_dispatcher.py index b50dfe0..e2cae34 100644 --- a/tests/adapter/matrix/test_dispatcher.py +++ b/tests/adapter/matrix/test_dispatcher.py @@ -5,6 +5,13 @@ from types import SimpleNamespace from unittest.mock import AsyncMock import pytest +from nio import ( + RoomMessageAudio, + RoomMessageFile, + RoomMessageImage, + RoomMessageText, + RoomMessageVideo, +) from nio.api import RoomVisibility from nio.responses import SyncResponse @@ -332,7 +339,7 @@ async def test_bot_downloads_matrix_file_to_workspace_before_staging(tmp_path, m staged = await get_staged_attachments(runtime.store, "!chat1:example.org", "@alice:example.org") assert staged[0]["workspace_path"] is not None assert (tmp_path / staged[0]["workspace_path"]).read_bytes() == b"%PDF-1.7" - bot._send_all.assert_awaited_once() + bot._send_all.assert_not_awaited() async def test_file_only_event_is_staged_and_does_not_dispatch(): @@ -371,10 +378,7 @@ async def test_file_only_event_is_staged_and_does_not_dispatch(): runtime.dispatcher.dispatch.assert_not_awaited() staged = await get_staged_attachments(runtime.store, "!r:example.org", "@alice:example.org") assert [item["filename"] for item in staged] == ["report.pdf"] - client.room_send.assert_awaited_once() - assert ( - "Следующее сообщение отправит файлы агенту." in client.room_send.await_args.args[2]["body"] - ) + client.room_send.assert_not_awaited() async def test_list_command_returns_current_staged_attachments(): @@ -963,3 +967,43 @@ async def test_matrix_main_closes_platform_without_connecting_root_agent(monkeyp agent_connect.assert_not_awaited() platform_close.assert_awaited_once() + + +async def test_matrix_main_registers_media_message_callbacks(monkeypatch): + bot_module = importlib.import_module("adapter.matrix.bot") + + runtime = SimpleNamespace(platform=SimpleNamespace(close=AsyncMock())) + created_clients = [] + + class FakeAsyncClient: + def __init__(self, *args, **kwargs): + self.access_token = None + self.callbacks = [] + self.sync_forever = AsyncMock() + self.close = AsyncMock() + created_clients.append(self) + + async def login(self, *args, **kwargs): + raise AssertionError("login should not be called when access token is provided") + + def add_event_callback(self, callback, event_type): + self.callbacks.append((callback, event_type)) + + monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org") + monkeypatch.setenv("MATRIX_USER_ID", "@bot:example.org") + monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "token") + monkeypatch.setattr(bot_module, "AsyncClient", FakeAsyncClient) + monkeypatch.setattr(bot_module, "build_runtime", lambda **kwargs: runtime) + monkeypatch.setattr(bot_module, "prepare_live_sync", AsyncMock(return_value="s123")) + + await bot_module.main() + + assert len(created_clients) == 1 + registered_types = [event_type for _, event_type in created_clients[0].callbacks] + assert ( + RoomMessageText, + RoomMessageFile, + RoomMessageImage, + RoomMessageVideo, + RoomMessageAudio, + ) in registered_types diff --git a/tests/adapter/matrix/test_files.py b/tests/adapter/matrix/test_files.py new file mode 100644 index 0000000..831ca72 --- /dev/null +++ b/tests/adapter/matrix/test_files.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from pathlib import Path +from types import SimpleNamespace + +from adapter.matrix.files import build_workspace_attachment_path, download_matrix_attachment +from core.protocol import Attachment + + +def test_build_workspace_attachment_path_scopes_by_surface_user_and_room(tmp_path: Path): + rel_path, abs_path = build_workspace_attachment_path( + workspace_root=tmp_path, + matrix_user_id="@alice:example.org", + room_id="!room:example.org", + filename="report.pdf", + timestamp="20260420-153000", + ) + + assert ( + rel_path + == "surfaces/matrix/alice_example.org/room_example.org/inbox/20260420-153000-report.pdf" + ) + assert abs_path == tmp_path / rel_path + + +async def test_download_matrix_attachment_persists_file_and_returns_workspace_path(tmp_path: Path): + async def download(url: str): + assert url == "mxc://server/id" + return SimpleNamespace(body=b"%PDF-1.7") + + client = SimpleNamespace(download=download) + attachment = Attachment( + type="document", + url="mxc://server/id", + filename="report.pdf", + mime_type="application/pdf", + ) + + saved = await download_matrix_attachment( + client=client, + workspace_root=tmp_path, + matrix_user_id="@alice:example.org", + room_id="!room:example.org", + attachment=attachment, + timestamp="20260420-153000", + ) + + assert saved.workspace_path is not None + assert saved.workspace_path.endswith("20260420-153000-report.pdf") + assert (tmp_path / saved.workspace_path).read_bytes() == b"%PDF-1.7" diff --git a/tests/adapter/matrix/test_send_outgoing.py b/tests/adapter/matrix/test_send_outgoing.py index 17eeefa..72b9fa6 100644 --- a/tests/adapter/matrix/test_send_outgoing.py +++ b/tests/adapter/matrix/test_send_outgoing.py @@ -9,7 +9,7 @@ from adapter.matrix.handlers.confirm import make_handle_cancel, make_handle_conf from adapter.matrix.store import get_pending_confirm, set_room_meta from core.auth import AuthManager from core.chat import ChatManager -from core.protocol import OutgoingUI, UIButton +from core.protocol import Attachment, OutgoingMessage, OutgoingUI, UIButton from core.settings import SettingsManager from core.store import InMemoryStore from sdk.mock import MockPlatformClient @@ -156,3 +156,39 @@ async def test_outgoing_ui_no_round_trip_uses_user_and_room_scope(): assert "отменено" in result[0].text.lower() assert await get_pending_confirm(store, "@alice:example.org", "!confirm:example.org") is None assert await get_pending_confirm(store, "@bob:example.org", "!other:example.org") is not None + + +async def test_send_outgoing_uploads_workspace_file_attachment(tmp_path, monkeypatch): + workspace_file = tmp_path / "surfaces" / "matrix" / "alice" / "room" / "inbox" / "result.txt" + workspace_file.parent.mkdir(parents=True, exist_ok=True) + workspace_file.write_text("ready") + monkeypatch.setenv("SURFACES_WORKSPACE_DIR", str(tmp_path)) + + client = SimpleNamespace( + upload=AsyncMock(return_value=(SimpleNamespace(content_uri="mxc://server/file"), {})), + room_send=AsyncMock(), + ) + + await send_outgoing( + client, + "!room:example.org", + OutgoingMessage( + chat_id="!room:example.org", + text="Файл готов", + attachments=[ + Attachment( + type="document", + filename="result.txt", + mime_type="text/plain", + workspace_path="surfaces/matrix/alice/room/inbox/result.txt", + ) + ], + ), + ) + + client.upload.assert_awaited_once() + client.room_send.assert_awaited() + assert client.room_send.await_args_list[0].args[2]["body"] == "Файл готов" + file_call = client.room_send.await_args_list[1] + assert file_call.args[2]["msgtype"] == "m.file" + assert file_call.args[2]["url"] == "mxc://server/file" diff --git a/tests/core/test_dispatcher.py b/tests/core/test_dispatcher.py index eb437d2..fad2a4f 100644 --- a/tests/core/test_dispatcher.py +++ b/tests/core/test_dispatcher.py @@ -75,6 +75,27 @@ async def test_dispatch_routes_audio_before_catchall(dispatcher): assert (await dispatcher.dispatch(text_msg))[0].text == "text" +async def test_dispatch_routes_document_before_catchall(dispatcher): + async def document_handler(event, **kwargs): + return [OutgoingMessage(chat_id=event.chat_id, text="document")] + + async def catch_all(event, **kwargs): + return [OutgoingMessage(chat_id=event.chat_id, text="text")] + + dispatcher.register(IncomingMessage, "document", document_handler) + dispatcher.register(IncomingMessage, "*", catch_all) + + document_msg = IncomingMessage( + user_id="u1", + platform="matrix", + chat_id="C1", + text="", + attachments=[Attachment(type="document", workspace_path="surfaces/matrix/u1/file.pdf")], + ) + + assert (await dispatcher.dispatch(document_msg))[0].text == "document" + + async def test_dispatch_callback_by_action(dispatcher): async def confirm_handler(event, **kwargs): return [OutgoingMessage(chat_id=event.chat_id, text="confirmed")] diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index ab8fc8c..fd7bd2e 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -23,11 +23,11 @@ from core.protocol import ( class FakeAgentApi: def __init__(self) -> None: - self.calls: list[str] = [] + self.calls: list[tuple[str, list[str]]] = [] self.last_tokens_used = 0 - async def send_message(self, text: str): - self.calls.append(text) + async def send_message(self, text: str, attachments: list[str] | None = None): + self.calls.append((text, attachments or [])) yield type("Chunk", (), {"text": f"[REAL] {text}"})() self.last_tokens_used = 5 @@ -130,4 +130,31 @@ async def test_full_flow_with_real_platform_uses_shared_agent_api(real_dispatche texts = [r.text for r in result if isinstance(r, OutgoingMessage)] assert texts == ["[REAL] Привет!"] - assert agent_api.calls == ["Привет!"] + assert agent_api.calls == [("Привет!", [])] + + +async def test_full_flow_with_real_platform_forwards_workspace_attachment(real_dispatcher): + dispatcher, agent_api = real_dispatcher + + start = IncomingCommand(user_id="u1", platform="matrix", chat_id="C1", command="start") + await dispatcher.dispatch(start) + + msg = IncomingMessage( + user_id="u1", + platform="matrix", + chat_id="C1", + text="Посмотри файл", + attachments=[ + Attachment( + type="document", + filename="report.pdf", + mime_type="application/pdf", + workspace_path="surfaces/matrix/u1/room/inbox/report.pdf", + ) + ], + ) + await dispatcher.dispatch(msg) + + assert agent_api.calls == [ + ("Посмотри файл", ["surfaces/matrix/u1/room/inbox/report.pdf"]) + ] diff --git a/tests/platform/test_real.py b/tests/platform/test_real.py index 6edecbd..e5f01e4 100644 --- a/tests/platform/test_real.py +++ b/tests/platform/test_real.py @@ -5,7 +5,7 @@ import pytest from core.protocol import SettingsAction import sdk.agent_api_wrapper as agent_api_wrapper_module from sdk.agent_api_wrapper import AgentApiWrapper -from sdk.interface import MessageChunk, MessageResponse, UserSettings +from sdk.interface import Attachment, MessageChunk, MessageResponse, UserSettings from sdk.prototype_state import PrototypeStateStore from sdk.real import RealPlatformClient @@ -90,6 +90,100 @@ class BlockingChatAgentApi: self.last_tokens_used = len(text) +class AttachmentTrackingChatAgentApi: + def __init__(self, chat_id: str) -> None: + self.chat_id = chat_id + self.calls: list[tuple[str, list[str] | None]] = [] + self.connect_calls = 0 + self.close_calls = 0 + self.last_tokens_used = 0 + + async def connect(self) -> None: + self.connect_calls += 1 + + async def close(self) -> None: + self.close_calls += 1 + + async def send_message(self, text: str, attachments: list[str] | None = None): + self.calls.append((text, attachments)) + yield FakeChunk(text) + self.last_tokens_used = 5 + + +class SendFileEvent: + def __init__(self, *, workspace_path: str, mime_type: str, filename: str, size: int) -> None: + self.type = "AGENT_EVENT_SEND_FILE" + self.workspace_path = workspace_path + self.mime_type = mime_type + self.filename = filename + self.size = size + + +class TextChunkEvent: + def __init__(self, text: str) -> None: + self.type = "AGENT_EVENT_TEXT_CHUNK" + self.text = text + + +class ToolCallChunkEvent: + def __init__(self, payload: str) -> None: + self.type = "AGENT_EVENT_TOOL_CALL_CHUNK" + self.payload = payload + + +class ToolResultEvent: + def __init__(self, payload: str) -> None: + self.type = "AGENT_EVENT_TOOL_RESULT" + self.payload = payload + + +class CustomUpdateEvent: + def __init__(self, payload: str) -> None: + self.type = "AGENT_EVENT_CUSTOM_UPDATE" + self.payload = payload + + +class EndEvent: + def __init__(self, tokens_used: int) -> None: + self.type = "AGENT_EVENT_END" + self.tokens_used = tokens_used + + +class ErrorEvent: + def __init__(self, code: str, details: str) -> None: + self.type = "ERROR" + self.code = code + self.details = details + + +class GracefulDisconnectEvent: + def __init__(self) -> None: + self.type = "GRACEFUL_DISCONNECT" + + +class FakeWSMessage: + def __init__(self, data: str) -> None: + self.type = agent_api_wrapper_module.aiohttp.WSMsgType.TEXT + self.data = data + + +class FakeWebSocket: + def __init__(self, messages: list[FakeWSMessage]) -> None: + self._messages = list(messages) + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._messages: + raise StopAsyncIteration + return self._messages.pop(0) + + +class MessageResponseWithAttachments(MessageResponse): + attachments: list[Attachment] = [] + + def test_agent_api_wrapper_uses_modern_constructor_when_available(monkeypatch): calls: list[dict[str, object]] = [] @@ -219,6 +313,76 @@ async def test_real_platform_client_send_message_uses_chat_bound_client(): assert await prototype_state.get_last_tokens_used_for_context("chat-7") == 3 +@pytest.mark.asyncio +async def test_real_platform_client_forwards_attachments_to_chat_api(): + agent_api = AttachmentTrackingChatAgentApi("chat-7") + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + attachment = Attachment( + workspace_path="surfaces/matrix/alice/room/inbox/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + ) + + result = await client.send_message( + "@alice:example.org", + "chat-7", + "hello", + attachments=[attachment], + ) + + assert agent_api.calls == [("hello", ["surfaces/matrix/alice/room/inbox/report.pdf"])] + assert result.response == "hello" + assert result.tokens_used == 5 + + +@pytest.mark.asyncio +async def test_real_platform_client_preserves_send_file_events_in_sync_result(monkeypatch): + agent_api = AttachmentTrackingChatAgentApi("chat-7") + client = RealPlatformClient( + agent_api=agent_api, + prototype_state=PrototypeStateStore(), + platform="matrix", + ) + + class FileEventAgentApi(AttachmentTrackingChatAgentApi): + async def send_message(self, text: str, attachments: list[str] | None = None): + self.calls.append((text, attachments)) + yield TextChunkEvent("he") + yield SendFileEvent( + workspace_path="/workspace/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + ) + yield TextChunkEvent("llo") + self.last_tokens_used = 9 + + monkeypatch.setattr( + "sdk.real.MessageResponse", + MessageResponseWithAttachments, + ) + client._agent_api = FileEventAgentApi("chat-7") + + result = await client.send_message("@alice:example.org", "chat-7", "hello") + + assert result.response == "hello" + assert result.tokens_used == 9 + assert result.attachments == [ + Attachment( + url="/workspace/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + workspace_path="report.pdf", + ) + ] + + @pytest.mark.asyncio async def test_real_platform_client_works_with_legacy_agent_api_without_for_chat(): legacy_api = LegacyAgentApi() @@ -385,3 +549,85 @@ async def test_real_platform_client_settings_are_local(): assert isinstance(settings, UserSettings) assert settings.skills["browser"] is True assert settings.skills["web-search"] is True + + +@pytest.mark.asyncio +async def test_agent_api_wrapper_transparently_surfaces_modern_events(monkeypatch): + callback_events: list[object] = [] + queue: asyncio.Queue = asyncio.Queue() + event_map = { + "text": TextChunkEvent("he"), + "tool_call": ToolCallChunkEvent("call"), + "tool_result": ToolResultEvent("result"), + "custom_update": CustomUpdateEvent("update"), + "send_file": SendFileEvent( + workspace_path="/workspace/report.pdf", + mime_type="application/pdf", + filename="report.pdf", + size=123, + ), + "end": EndEvent(tokens_used=11), + "error": ErrorEvent(code="BOOM", details="bad things"), + "disconnect": GracefulDisconnectEvent(), + } + + def fake_validate_json(data: str): + return event_map[data] + + monkeypatch.setattr( + agent_api_wrapper_module, + "ServerMessage", + type("FakeServerMessage", (), {"validate_json": staticmethod(fake_validate_json)}), + ) + + async def fake_cleanup(self): + return None + + monkeypatch.setattr(agent_api_wrapper_module.AgentApiWrapper, "_cleanup", fake_cleanup) + monkeypatch.setattr( + agent_api_wrapper_module.AgentApi, + "__init__", + lambda self, agent_id, base_url=None, chat_id=0, **kwargs: setattr(self, "id", agent_id) + or setattr(self, "callback", kwargs.get("callback")) + or setattr(self, "on_disconnect", kwargs.get("on_disconnect")) + or setattr(self, "_current_queue", None), + ) + + wrapper = AgentApiWrapper( + agent_id="agent-1", + base_url="https://agent.example.com/v1/agent_ws", + chat_id="chat-1", + callback=callback_events.append, + ) + wrapper._current_queue = queue + wrapper._ws = FakeWebSocket( + [ + FakeWSMessage("text"), + FakeWSMessage("tool_call"), + FakeWSMessage("tool_result"), + FakeWSMessage("custom_update"), + FakeWSMessage("send_file"), + FakeWSMessage("end"), + FakeWSMessage("error"), + FakeWSMessage("disconnect"), + ] + ) + + await wrapper._listen() + + queue_events = [] + while not queue.empty(): + queue_events.append(await queue.get()) + + assert queue_events[0].text == "he" + assert any(isinstance(event, SendFileEvent) for event in queue_events) + assert any(isinstance(event, EndEvent) for event in queue_events) + assert any(isinstance(event, GracefulDisconnectEvent) for event in queue_events) + assert callback_events[0].payload == "call" + assert callback_events[1].payload == "result" + assert callback_events[2].payload == "update" + assert any(isinstance(event, SendFileEvent) for event in callback_events) + assert any(isinstance(event, EndEvent) for event in callback_events) + assert any(isinstance(event, ErrorEvent) for event in callback_events) + assert any(isinstance(event, GracefulDisconnectEvent) for event in callback_events) + assert wrapper.last_tokens_used == 11