"""Tests for the Matrix attachment path builders and the download helper."""

from __future__ import annotations

from pathlib import Path
from types import SimpleNamespace

from adapter.matrix.files import (
    build_agent_incoming_path,
    build_workspace_attachment_path,
    download_matrix_attachment,
)
from core.protocol import Attachment

# NOTE(review): the async tests below carry no explicit marker; presumably the
# project enables an automatic async test mode -- confirm against the pytest
# configuration.

# Values shared by the download tests.
_MXC_URL = "mxc://server/id"
_PDF_PAYLOAD = b"%PDF-1.7"


def _stub_matrix_client() -> SimpleNamespace:
    """Return a stand-in client whose ``download`` yields the fixed PDF payload."""

    async def download(url: str):
        # The helper under test is expected to request exactly the attachment URL.
        assert url == _MXC_URL
        return SimpleNamespace(body=_PDF_PAYLOAD)

    return SimpleNamespace(download=download)


def _pdf_attachment() -> Attachment:
    """Build the document attachment used by the download tests."""
    return Attachment(
        type="document",
        url=_MXC_URL,
        filename="report.pdf",
        mime_type="application/pdf",
    )


def test_build_workspace_attachment_path_scopes_by_surface_user_and_room(tmp_path: Path):
    """The relative path is nested under surface, user and room, ending in ``inbox``."""
    expected_relative = (
        "surfaces/matrix/alice_example.org/room_example.org/inbox/20260420-153000-report.pdf"
    )

    relative, absolute = build_workspace_attachment_path(
        workspace_root=tmp_path,
        matrix_user_id="@alice:example.org",
        room_id="!room:example.org",
        filename="report.pdf",
        timestamp="20260420-153000",
    )

    assert relative == expected_relative
    assert absolute == tmp_path / relative


async def test_download_matrix_attachment_persists_file_and_returns_workspace_path(tmp_path: Path):
    """Downloading writes the payload to disk and reports a workspace-relative path."""
    fake_client = _stub_matrix_client()
    document = _pdf_attachment()

    result = await download_matrix_attachment(
        client=fake_client,
        workspace_root=tmp_path,
        matrix_user_id="@alice:example.org",
        room_id="!room:example.org",
        attachment=document,
        timestamp="20260420-153000",
    )

    stored_at = result.workspace_path
    assert stored_at is not None
    assert stored_at.endswith("20260420-153000-report.pdf")
    assert (tmp_path / stored_at).read_bytes() == _PDF_PAYLOAD


def test_build_workspace_attachment_path_keeps_room_safe_agents_relative_contract(tmp_path: Path):
    """Special characters are expected to be sanitised and the path kept relative."""
    agent_root = tmp_path / "agents" / "7"

    relative, absolute = build_workspace_attachment_path(
        workspace_root=agent_root,
        matrix_user_id="@alice+bob:example.org",
        room_id="!room/ops:example.org",
        filename="quarterly status (final).pdf",
        timestamp="20260420-153000",
    )

    assert relative == (
        "surfaces/matrix/alice_bob_example.org/room_ops_example.org/inbox/"
        "20260420-153000-quarterly_status_final_.pdf"
    )
    assert not Path(relative).is_absolute()
    assert absolute == agent_root / relative


def test_build_agent_incoming_path_uses_agent_workspace_volume(tmp_path: Path):
    """Agent uploads are expected under ``incoming/`` inside the agent workspace root."""
    agent_root = tmp_path / "agents" / "17"

    relative, absolute = build_agent_incoming_path(
        workspace_root=agent_root,
        filename="quarterly status.pdf",
        timestamp="20260428-110000",
    )

    assert relative == "incoming/20260428-110000-quarterly_status.pdf"
    assert absolute == agent_root / relative


async def test_download_matrix_attachment_uses_agent_workspace_incoming_dir(tmp_path: Path):
    """With an agent workspace root the download is expected in ``incoming/``."""
    agent_root = tmp_path / "agents" / "17"

    result = await download_matrix_attachment(
        client=_stub_matrix_client(),
        workspace_root=agent_root,
        matrix_user_id="@alice:example.org",
        room_id="!room:example.org",
        attachment=_pdf_attachment(),
        timestamp="20260428-110000",
    )

    assert result.workspace_path == "incoming/20260428-110000-report.pdf"
    assert (agent_root / result.workspace_path).read_bytes() == _PDF_PAYLOAD