from __future__ import annotations import mimetypes import re from datetime import UTC, datetime from pathlib import Path from core.protocol import Attachment def _sanitize_component(value: str) -> str: cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value) cleaned = cleaned.strip("._-") return cleaned or "unknown" def _default_filename(attachment: Attachment) -> str: if attachment.filename: return attachment.filename extension = mimetypes.guess_extension(attachment.mime_type or "") or "" base = { "image": "image", "audio": "audio", "video": "video", "document": "attachment", }.get(attachment.type, "attachment") return f"{base}{extension}" def build_workspace_attachment_path( *, workspace_root: Path, matrix_user_id: str, room_id: str, filename: str, timestamp: str | None = None, ) -> tuple[str, Path]: stamp = timestamp or datetime.now(UTC).strftime("%Y%m%d-%H%M%S") safe_user = _sanitize_component(matrix_user_id.lstrip("@")) safe_room = _sanitize_component(room_id.lstrip("!")) safe_name = _sanitize_component(filename) or "attachment.bin" relative_path = ( Path("surfaces") / "matrix" / safe_user / safe_room / "inbox" / f"{stamp}-{safe_name}" ) return relative_path.as_posix(), workspace_root / relative_path async def download_matrix_attachment( *, client, workspace_root: Path, matrix_user_id: str, room_id: str, attachment: Attachment, timestamp: str | None = None, ) -> Attachment: if not attachment.url: return attachment filename = _default_filename(attachment) relative_path, absolute_path = build_workspace_attachment_path( workspace_root=workspace_root, matrix_user_id=matrix_user_id, room_id=room_id, filename=filename, timestamp=timestamp, ) absolute_path.parent.mkdir(parents=True, exist_ok=True) response = await client.download(attachment.url) body = getattr(response, "body", None) if body is None: raise RuntimeError(f"Matrix download response for {attachment.url} has no body") absolute_path.write_bytes(body) return Attachment( type=attachment.type, url=attachment.url, filename=filename, mime_type=attachment.mime_type, workspace_path=relative_path, ) def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path: path = Path(workspace_path) if path.is_absolute(): return path return workspace_root / path def matrix_msgtype_for_attachment(attachment: Attachment) -> str: return { "image": "m.image", "audio": "m.audio", "video": "m.video", }.get(attachment.type, "m.file")