114 lines
3.1 KiB
Python
114 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
import mimetypes
|
|
import re
|
|
from pathlib import Path, PurePosixPath
|
|
|
|
from core.protocol import Attachment
|
|
|
|
|
|
def _sanitize_filename(value: str) -> str:
    """Reduce *value* to a single filename component that is safe to store.

    Backslashes are treated as path separators, so only the last path
    component survives. Runs of control characters and of characters that
    are reserved in common filesystems are collapsed into one underscore,
    and surrounding spaces and dots are removed.

    Returns:
        The cleaned name, or ``"attachment.bin"`` when nothing usable is left.
    """
    # Normalise Windows-style separators so the basename logic sees one kind of slash.
    normalized = str(value).replace("\\", "/")
    basename = PurePosixPath(normalized).name.strip()

    scrubbed = re.sub(r"[\x00-\x1f\x7f<>:\"/\\|?*]+", "_", basename).strip(" .")
    if not scrubbed:
        return "attachment.bin"
    return scrubbed
|
|
|
|
|
|
def _default_filename(attachment: Attachment) -> str:
    """Return the filename to use for *attachment*.

    A non-empty ``attachment.filename`` is returned unchanged. Otherwise a
    name is synthesised from a stem chosen by ``attachment.type`` plus the
    extension ``mimetypes`` guesses for ``attachment.mime_type`` (no
    extension when the MIME type is missing or unknown).
    """
    explicit_name = attachment.filename
    if explicit_name:
        return explicit_name

    guessed_suffix = mimetypes.guess_extension(attachment.mime_type or "")
    if not guessed_suffix:
        guessed_suffix = ""

    # "document" and any unrecognised type share the generic stem.
    stems_by_type = {
        "image": "image",
        "audio": "audio",
        "video": "video",
        "document": "attachment",
    }
    stem = stems_by_type.get(attachment.type, "attachment")
    return f"{stem}{guessed_suffix}"
|
|
|
|
|
|
def _with_copy_index(filename: str, index: int) -> str:
    """Insert a ``" (index)"`` marker into *filename*, ahead of its extension.

    Only the final suffix counts as the extension, so ``"a.tar.gz"`` becomes
    ``"a.tar (1).gz"``. A name without a suffix gets the marker appended.
    """
    parsed = Path(filename)
    extension = parsed.suffix
    if extension:
        base = parsed.stem
    else:
        # No suffix: keep the name exactly as given.
        base = filename
    return f"{base} ({index}){extension}"
|
|
|
|
|
|
def _unique_workspace_relative_path(workspace_root: Path, filename: str) -> tuple[str, Path]:
    """Find a name under *workspace_root* that does not exist yet.

    *filename* is sanitised first. If that name is taken, ``" (1)"``,
    ``" (2)"``, ... variants are probed in order until a free one is found.
    Nothing is created on disk; the function only checks for existence.

    Returns:
        ``(name, path)`` where *name* is relative to *workspace_root* and
        *path* is ``workspace_root / name``.
    """
    safe_name = _sanitize_filename(filename)

    chosen_name = safe_name
    copy_number = 0
    while (workspace_root / chosen_name).exists():
        copy_number += 1
        chosen_name = _with_copy_index(safe_name, copy_number)

    return chosen_name, workspace_root / chosen_name
|
|
|
|
|
|
def build_agent_workspace_path(
    *,
    workspace_root: Path,
    filename: str,
) -> tuple[str, Path]:
    """Choose where a user file goes: directly at {workspace_root}/{filename}.

    The name is sanitised and made unique within the workspace root; no file
    is written here.

    Returns:
        ``(relative_path, absolute_path)``. The relative path is what gets
        passed to agent.send_message(attachments=[...]).
    """
    relative_name, absolute_location = _unique_workspace_relative_path(workspace_root, filename)
    return relative_name, absolute_location
|
|
|
|
|
|
async def download_matrix_attachment(
    *,
    client,
    workspace_root: Path,
    matrix_user_id: str,
    room_id: str,
    attachment: Attachment,
    timestamp: str | None = None,
) -> Attachment:
    """Download *attachment* into the workspace and return an updated copy.

    Args:
        client: Object with an awaitable ``download(url)`` whose result
            exposes the payload as a ``body`` attribute (presumably a
            matrix-nio ``AsyncClient`` -- confirm against the caller).
        workspace_root: Directory the file is stored directly under.
        matrix_user_id: Unused; accepted for interface compatibility.
        room_id: Unused; accepted for interface compatibility.
        attachment: Attachment to fetch. Returned unchanged if it has no URL.
        timestamp: Unused; accepted for interface compatibility.

    Returns:
        A new ``Attachment`` carrying the original type, URL and MIME type,
        the resolved filename, and ``workspace_path`` set to the stored
        file's path relative to *workspace_root*.

    Raises:
        RuntimeError: If the download response has no ``body``.
    """
    if not attachment.url:
        return attachment

    # These parameters do not influence where the file is stored.
    del matrix_user_id, room_id, timestamp

    filename = _default_filename(attachment)

    # Fetch and validate before touching the filesystem, so a failed download
    # leaves no directory behind and no filename is claimed for nothing.
    response = await client.download(attachment.url)
    body = getattr(response, "body", None)
    if body is None:
        raise RuntimeError(f"Matrix download response for {attachment.url} has no body")

    # The free name is picked only now, with no await between the existence
    # check and the write. Choosing it before the download let two concurrent
    # downloads of same-named files select the same path and overwrite each
    # other.
    relative_path, absolute_path = build_agent_workspace_path(
        workspace_root=workspace_root,
        filename=filename,
    )
    absolute_path.parent.mkdir(parents=True, exist_ok=True)
    absolute_path.write_bytes(body)

    return Attachment(
        type=attachment.type,
        url=attachment.url,
        filename=filename,
        mime_type=attachment.mime_type,
        workspace_path=relative_path,
    )
|
|
|
|
|
|
def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path:
    """Turn a stored *workspace_path* into a filesystem path.

    Absolute paths are returned as-is; relative ones are joined onto
    *workspace_root*. The result is not normalised and is not checked to lie
    inside the workspace.
    """
    requested = Path(workspace_path)
    return requested if requested.is_absolute() else workspace_root / requested
|
|
|
|
|
|
def matrix_msgtype_for_attachment(attachment: Attachment) -> str:
    """Return the Matrix ``msgtype`` string matching ``attachment.type``.

    Image, audio and video attachments map to ``m.image``, ``m.audio`` and
    ``m.video``; every other type is sent as ``m.file``.
    """
    media_msgtypes = {
        "image": "m.image",
        "audio": "m.audio",
        "video": "m.video",
    }
    kind = attachment.type
    if kind in media_msgtypes:
        return media_msgtypes[kind]
    return "m.file"
|