- MAX API integration (long polling) - MockPlatformClient for agent simulation - File download & workspace storage - Basic commands: /help, /start - Attachment queue: add works, list/remove need testing [WIP: attachment queue commands] [MOCK-ONLY: requires real agent for production]
149 lines
No EOL
4.7 KiB
Python
149 lines
No EOL
4.7 KiB
Python
"""Incoming / outgoing file helpers for MAX (aligned with Matrix workspace layout)."""
|
||
from __future__ import annotations
|
||
|
||
import mimetypes
|
||
import re
|
||
from pathlib import Path, PurePosixPath
|
||
|
||
import httpx
|
||
from adapter.max.api_client import MaxBotApi
|
||
|
||
|
||
def _sanitize_filename(value: str) -> str:
    """Return a safe base file name; Cyrillic and other non-ASCII letters are kept.

    Backslashes are treated as path separators, only the final path component
    is used, and each run of control characters or ``<>"|?*`` becomes a single
    underscore. Leading/trailing spaces and dots are removed. When nothing
    remains, ``"attachment.bin"`` is returned.
    """
    # Normalise Windows-style separators so the last component is found reliably.
    normalized = str(value).replace("\\", "/")
    base_name = PurePosixPath(normalized).name.strip()
    scrubbed = re.sub(r'[\x00-\x1f\x7f<>"|?*]+', "_", base_name).strip(" .")
    if not scrubbed:
        return "attachment.bin"
    return scrubbed
|
||
|
||
|
||
def _with_copy_index(filename: str, index: int) -> str:
    """Insert a copy counter before the extension: ``file.txt`` -> ``file (1).txt``.

    Names without an extension get the counter appended to the whole name.
    """
    parsed = Path(filename)
    extension = parsed.suffix
    if extension:
        base = parsed.stem
    else:
        # No extension: keep the name exactly as given.
        base = filename
    return f"{base} ({index}){extension}"
|
||
|
||
|
||
def _unique_workspace_relative_path(workspace_root: Path, filename: str) -> tuple[str, Path]:
    """Choose a name under ``workspace_root`` that does not exist yet.

    The sanitized name is tried first; if it is taken, ``name (1).ext``,
    ``name (2).ext``, ... are probed in order.

    Returns:
        ``(name, path)`` where ``name`` is relative to ``workspace_root`` and
        ``path`` is ``workspace_root / name``.
    """
    base_name = _sanitize_filename(filename)
    chosen_name = base_name
    target = workspace_root / chosen_name
    attempt = 0
    while target.exists():
        attempt += 1
        chosen_name = _with_copy_index(base_name, attempt)
        target = workspace_root / chosen_name
    return chosen_name, target
|
||
|
||
|
||
def build_agent_workspace_path(*, workspace_root: Path, filename: str) -> tuple[str, Path]:
    """Compute where a user file should be stored directly under ``workspace_root``.

    Nothing is written here; the function only picks a sanitized,
    not-yet-existing name.

    Returns:
        ``(relative_name, absolute_path)`` for the chosen location.
    """
    relative_name, absolute_path = _unique_workspace_relative_path(workspace_root, filename)
    return relative_name, absolute_path
|
||
|
||
|
||
def resolve_workspace_attachment_path(workspace_root: Path, workspace_path: str) -> Path:
    """Turn a workspace path into a ``Path`` anchored at ``workspace_root``.

    Absolute inputs are returned unchanged; relative inputs are joined onto
    ``workspace_root``. No check is made that the result stays inside
    ``workspace_root``.
    """
    candidate = Path(workspace_path)
    return candidate if candidate.is_absolute() else workspace_root / candidate
|
||
|
||
# === End of helper functions ===
|
||
|
||
|
||
def guess_upload_type(mime_type: str | None, *, attachment_type: str) -> str:
    """Pick the upload category for an attachment.

    An ``attachment_type`` of ``"image"``, ``"video"`` or ``"audio"`` takes
    priority. Otherwise the category is derived from the prefix of
    ``mime_type``, falling back to ``"file"``.
    """
    media_prefixes = (
        ("image", "image/"),
        ("video", "video/"),
        ("audio", "audio/"),
    )
    # The explicit attachment type wins over whatever the MIME type says.
    for category, _ in media_prefixes:
        if attachment_type == category:
            return category

    declared_mime = mime_type or ""
    for category, prefix in media_prefixes:
        if declared_mime.startswith(prefix):
            return category
    return "file"
|
||
|
||
|
||
async def save_incoming_from_url(
    *,
    api: MaxBotApi,
    workspace_root: Path,
    filename: str,
    url: str,
) -> str:
    """Download a file by URL and store it in the workspace.

    The download happens first, so a failed download creates no directory.
    The target name is sanitized and de-duplicated before writing.

    Returns:
        The stored file's name relative to ``workspace_root``.
    """
    # presumably bytes — it is passed to Path.write_bytes below; confirm against MaxBotApi.
    downloaded = await api.download_file(url)
    workspace_root.mkdir(parents=True, exist_ok=True)

    stored_name, destination = build_agent_workspace_path(
        filename=filename,
        workspace_root=workspace_root,
    )

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(downloaded)
    return stored_name
|
||
|
||
|
||
async def upload_file_as_attachment(
    api: MaxBotApi,
    *,
    filename: str,
    content: bytes,
    upload_type: str,
) -> dict:
    """Upload a file to MAX so it can be sent to the user.

    Requests upload metadata via ``api.get_upload_url(upload_type)``, POSTs
    ``content`` as multipart field ``data`` to the returned URL, and builds an
    attachment descriptor.

    Returns:
        ``{"type": <kind>, "payload": {...}}`` where ``<kind>`` is
        ``upload_type`` if it is one of ``image``/``file``/``video``/``audio``
        and ``"file"`` otherwise. ``payload`` carries ``token`` only when the
        upload metadata provided a truthy one.

    Raises:
        RuntimeError: if the upload metadata has no non-empty string ``url``.
        httpx.HTTPStatusError: if the upload POST returns an error status.
    """
    upload_meta = await api.get_upload_url(upload_type)
    target_url = upload_meta.get("url")
    attachment_token = upload_meta.get("token")
    if not (isinstance(target_url, str) and target_url):
        raise RuntimeError("MAX uploads response missing url")

    async with httpx.AsyncClient(timeout=httpx.Timeout(120.0)) as http:
        multipart = {"data": (filename, content, guess_mimetype(filename))}
        upload_response = await http.post(target_url, files=multipart)
        # NOTE(review): only the status is checked; the response body is discarded.
        # Confirm whether MAX returns the attachment token in that body for some upload types.
        upload_response.raise_for_status()

    attachment_payload: dict = {"token": attachment_token} if attachment_token else {}
    kind_by_upload_type = {
        "image": "image",
        "file": "file",
        "video": "video",
        "audio": "audio",
    }
    # Unknown upload types are sent as a generic file.
    attachment_kind = kind_by_upload_type.get(upload_type, "file")
    return {"type": attachment_kind, "payload": attachment_payload}
|
||
|
||
|
||
def guess_mimetype(filename: str) -> str:
    """Guess a MIME type from the file name, defaulting to ``application/octet-stream``."""
    guessed = mimetypes.guess_type(filename)[0]
    if not guessed:
        return "application/octet-stream"
    return guessed
|
||
|
||
|
||
def read_workspace_bytes(workspace_path: str | Path, *, agent_workspace: str) -> bytes:
    """Read a file's bytes, resolving ``workspace_path`` against ``agent_workspace``.

    Path resolution is delegated to ``resolve_workspace_attachment_path``.
    Errors raised by ``Path.read_bytes`` (for example a missing file)
    propagate to the caller.
    """
    workspace_root = Path(agent_workspace)
    target = resolve_workspace_attachment_path(workspace_root, str(workspace_path))
    return target.read_bytes()