from __future__ import annotations

import mimetypes
import re
from pathlib import Path, PurePosixPath

from core.protocol import Attachment

# ASCII control characters plus the characters <>:"/\|?* ; every run of them
# is collapsed into a single underscore by _sanitize_filename.
_UNSAFE_FILENAME_CHARS = re.compile(r"[\x00-\x1f\x7f<>:\"/\\|?*]+")

# Name used when sanitising leaves nothing usable.
_FALLBACK_FILENAME = "attachment.bin"

# Base name for attachments that arrive without a filename, keyed by
# Attachment.type.
_BASE_NAME_BY_TYPE = {
    "image": "image",
    "audio": "audio",
    "video": "video",
    "document": "attachment",
}

# Matrix msgtype keyed by Attachment.type; anything else is sent as "m.file".
_MSGTYPE_BY_TYPE = {
    "image": "m.image",
    "audio": "m.audio",
    "video": "m.video",
}


def _sanitize_filename(value: str) -> str:
    """Reduce *value* to a single file-name component without unsafe characters.

    Backslashes are treated as path separators, only the final path component
    is kept, runs of unsafe characters become ``_`` and leading/trailing
    spaces and dots are removed. Returns ``"attachment.bin"`` when nothing
    is left.
    """
    filename = PurePosixPath(str(value).replace("\\", "/")).name.strip()
    cleaned = _UNSAFE_FILENAME_CHARS.sub("_", filename).strip(" .")
    return cleaned or _FALLBACK_FILENAME


def _default_filename(attachment: Attachment) -> str:
    """Return the attachment's own filename, or build one from its type.

    When ``attachment.filename`` is empty the name is ``<base><extension>``,
    where the base comes from ``attachment.type`` and the extension is guessed
    from ``attachment.mime_type`` (empty if it cannot be guessed).
    """
    if attachment.filename:
        return attachment.filename
    # guess_extension() does not understand MIME parameters, so
    # "text/plain; charset=utf-8" must be cut down to "text/plain" first.
    mime_type = (attachment.mime_type or "").split(";", 1)[0].strip()
    extension = mimetypes.guess_extension(mime_type) or ""
    base = _BASE_NAME_BY_TYPE.get(attachment.type, "attachment")
    return f"{base}{extension}"


def _with_copy_index(filename: str, index: int) -> str:
    """Insert `` (index)`` before the suffix: ``a.txt`` -> ``a (1).txt``.

    Names without a suffix get the marker appended to the whole name.
    """
    path = Path(filename)
    suffix = path.suffix
    stem = path.stem if suffix else filename
    return f"{stem} ({index}){suffix}"


def _unique_workspace_relative_path(workspace_root: Path, filename: str) -> tuple[str, Path]:
    """Find a name under *workspace_root* that does not exist yet.

    The sanitised *filename* is tried first, then ``name (1).ext``,
    ``name (2).ext`` and so on.

    Returns:
        ``(relative_name, absolute_path)`` for the first free candidate.
        Nothing is created on disk, so the name is only free at the moment
        of the check.
    """
    safe_name = _sanitize_filename(filename)
    candidate_name = safe_name
    index = 0
    while (workspace_root / candidate_name).exists():
        index += 1
        candidate_name = _with_copy_index(safe_name, index)
    return candidate_name, workspace_root / candidate_name


def build_agent_workspace_path(
    *,
    workspace_root: Path,
    filename: str,
) -> tuple[str, Path]:
    """Choose where a user file is stored: ``{workspace_root}/{filename}``.

    The filename is sanitised and de-duplicated against existing files; this
    function itself does not write anything.

    Returns:
        ``(relative_path, absolute_path)``. The relative path is what gets
        passed to ``agent.send_message(attachments=[...])``.
    """
    return _unique_workspace_relative_path(workspace_root, filename)


async def download_matrix_attachment(
    *,
    client,
    workspace_root: Path,
    matrix_user_id: str,
    room_id: str,
    attachment: Attachment,
    timestamp: str | None = None,
) -> Attachment:
    """Download *attachment* into the workspace and return an updated copy.

    Args:
        client: Object exposing an awaitable ``download(url)`` whose result
            carries the payload in a ``body`` attribute.
        workspace_root: Directory the file is written into (created if
            missing).
        matrix_user_id: Accepted for interface compatibility; unused.
        room_id: Accepted for interface compatibility; unused.
        attachment: Attachment to fetch. Returned unchanged when it has no
            ``url``.
        timestamp: Accepted for interface compatibility; unused.

    Returns:
        A new ``Attachment`` with ``workspace_path`` set to the path of the
        stored file relative to *workspace_root*.

    Raises:
        RuntimeError: If the download result has no ``body``.
    """
    if not attachment.url:
        return attachment

    filename = _default_filename(attachment)
    del matrix_user_id, room_id, timestamp

    response = await client.download(attachment.url)
    body = getattr(response, "body", None)
    if body is None:
        raise RuntimeError(f"Matrix download response for {attachment.url} has no body")

    # Pick the destination only after the download has finished. Choosing it
    # before the await let two concurrent downloads with the same filename
    # both see the name as free and overwrite each other. There is no await
    # between the existence check and the write below.
    relative_path, absolute_path = build_agent_workspace_path(
        workspace_root=workspace_root,
        filename=filename,
    )
    absolute_path.parent.mkdir(parents=True, exist_ok=True)
    absolute_path.write_bytes(body)

    return Attachment(
        type=attachment.type,
        url=attachment.url,
        filename=filename,
        mime_type=attachment.mime_type,
        workspace_path=relative_path,
    )


def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path:
    """Turn *workspace_path* into a filesystem path.

    Absolute paths are returned as-is; relative paths are joined onto
    *workspace_root*.
    """
    # NOTE(review): there is no check that the result stays inside
    # workspace_root (absolute paths and ".." segments pass through) --
    # confirm callers only hand in trusted paths.
    path = Path(workspace_path)
    if path.is_absolute():
        return path
    return workspace_root / path


def matrix_msgtype_for_attachment(attachment: Attachment) -> str:
    """Map ``attachment.type`` to a Matrix msgtype, defaulting to ``m.file``."""
    return _MSGTYPE_BY_TYPE.get(attachment.type, "m.file")