Merge pull request #46 from rsavitt/fix/docker-backend-macos
Fix Docker backend on macOS and subagent auth for Nous Portal
This commit is contained in:
commit
8463b7ea59
8 changed files with 282 additions and 10 deletions
|
|
@ -12,6 +12,50 @@ from typing import Optional
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Context file scanning — detect prompt injection in AGENTS.md, .cursorrules,
|
||||
# SOUL.md before they get injected into the system prompt.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_CONTEXT_THREAT_PATTERNS = [
|
||||
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
|
||||
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
|
||||
(r'system\s+prompt\s+override', "sys_prompt_override"),
|
||||
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
|
||||
(r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
|
||||
(r'<!--[^>]*(?:ignore|override|system|secret|hidden)[^>]*-->', "html_comment_injection"),
|
||||
(r'<\s*div\s+style\s*=\s*["\'].*display\s*:\s*none', "hidden_div"),
|
||||
(r'translate\s+.*\s+into\s+.*\s+and\s+(execute|run|eval)', "translate_execute"),
|
||||
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
|
||||
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"),
|
||||
]
|
||||
|
||||
_CONTEXT_INVISIBLE_CHARS = {
|
||||
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
|
||||
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
|
||||
}
|
||||
|
||||
|
||||
def _scan_context_content(content: str, filename: str) -> str:
|
||||
"""Scan context file content for injection. Returns sanitized content."""
|
||||
findings = []
|
||||
|
||||
# Check invisible unicode
|
||||
for char in _CONTEXT_INVISIBLE_CHARS:
|
||||
if char in content:
|
||||
findings.append(f"invisible unicode U+{ord(char):04X}")
|
||||
|
||||
# Check threat patterns
|
||||
for pattern, pid in _CONTEXT_THREAT_PATTERNS:
|
||||
if re.search(pattern, content, re.IGNORECASE):
|
||||
findings.append(pid)
|
||||
|
||||
if findings:
|
||||
logger.warning("Context file %s blocked: %s", filename, ", ".join(findings))
|
||||
return f"[BLOCKED: {filename} contained potential prompt injection ({', '.join(findings)}). Content not loaded.]"
|
||||
|
||||
return content
|
||||
|
||||
# =========================================================================
|
||||
# Constants
|
||||
# =========================================================================
|
||||
|
|
@ -215,6 +259,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
|
|||
content = agents_path.read_text(encoding="utf-8").strip()
|
||||
if content:
|
||||
rel_path = agents_path.relative_to(cwd_path)
|
||||
content = _scan_context_content(content, str(rel_path))
|
||||
total_agents_content += f"## {rel_path}\n\n{content}\n\n"
|
||||
except Exception as e:
|
||||
logger.debug("Could not read %s: %s", agents_path, e)
|
||||
|
|
@ -230,6 +275,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
|
|||
try:
|
||||
content = cursorrules_file.read_text(encoding="utf-8").strip()
|
||||
if content:
|
||||
content = _scan_context_content(content, ".cursorrules")
|
||||
cursorrules_content += f"## .cursorrules\n\n{content}\n\n"
|
||||
except Exception as e:
|
||||
logger.debug("Could not read .cursorrules: %s", e)
|
||||
|
|
@ -241,6 +287,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
|
|||
try:
|
||||
content = mdc_file.read_text(encoding="utf-8").strip()
|
||||
if content:
|
||||
content = _scan_context_content(content, f".cursor/rules/{mdc_file.name}")
|
||||
cursorrules_content += f"## .cursor/rules/{mdc_file.name}\n\n{content}\n\n"
|
||||
except Exception as e:
|
||||
logger.debug("Could not read %s: %s", mdc_file, e)
|
||||
|
|
@ -265,6 +312,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
|
|||
try:
|
||||
content = soul_path.read_text(encoding="utf-8").strip()
|
||||
if content:
|
||||
content = _scan_context_content(content, "SOUL.md")
|
||||
content = _truncate_content(content, "SOUL.md")
|
||||
sections.append(
|
||||
f"## SOUL.md\n\nIf SOUL.md is present, embody its persona and tone. "
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ The prompt must contain ALL necessary information.
|
|||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
# Import from cron module (will be available when properly installed)
|
||||
|
|
@ -20,6 +21,41 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
|
|||
from cron.jobs import create_job, get_job, list_jobs, remove_job
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cron prompt scanning — critical-severity patterns only, since cron prompts
|
||||
# run in fresh sessions with full tool access.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_CRON_THREAT_PATTERNS = [
|
||||
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
|
||||
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
|
||||
(r'system\s+prompt\s+override', "sys_prompt_override"),
|
||||
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
|
||||
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
|
||||
(r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"),
|
||||
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"),
|
||||
(r'authorized_keys', "ssh_backdoor"),
|
||||
(r'/etc/sudoers|visudo', "sudoers_mod"),
|
||||
(r'rm\s+-rf\s+/', "destructive_root_rm"),
|
||||
]
|
||||
|
||||
_CRON_INVISIBLE_CHARS = {
|
||||
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
|
||||
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
|
||||
}
|
||||
|
||||
|
||||
def _scan_cron_prompt(prompt: str) -> str:
|
||||
"""Scan a cron prompt for critical threats. Returns error string if blocked, else empty."""
|
||||
for char in _CRON_INVISIBLE_CHARS:
|
||||
if char in prompt:
|
||||
return f"Blocked: prompt contains invisible unicode U+{ord(char):04X} (possible injection)."
|
||||
for pattern, pid in _CRON_THREAT_PATTERNS:
|
||||
if re.search(pattern, prompt, re.IGNORECASE):
|
||||
return f"Blocked: prompt matches threat pattern '{pid}'. Cron prompts must not contain injection or exfiltration payloads."
|
||||
return ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tool: schedule_cronjob
|
||||
# =============================================================================
|
||||
|
|
@ -71,6 +107,11 @@ def schedule_cronjob(
|
|||
Returns:
|
||||
JSON with job_id, next_run time, and confirmation
|
||||
"""
|
||||
# Scan prompt for critical threats before scheduling
|
||||
scan_error = _scan_cron_prompt(prompt)
|
||||
if scan_error:
|
||||
return json.dumps({"success": False, "error": scan_error}, indent=2)
|
||||
|
||||
# Get origin info from environment if available
|
||||
origin = None
|
||||
origin_platform = os.getenv("HERMES_SESSION_PLATFORM")
|
||||
|
|
|
|||
|
|
@ -99,8 +99,14 @@ def _run_single_child(
|
|||
child_prompt = _build_child_system_prompt(goal, context)
|
||||
|
||||
try:
|
||||
# Extract parent's API key so subagents inherit auth (e.g. Nous Portal)
|
||||
parent_api_key = None
|
||||
if hasattr(parent_agent, '_client_kwargs'):
|
||||
parent_api_key = parent_agent._client_kwargs.get("api_key")
|
||||
|
||||
child = AIAgent(
|
||||
base_url=parent_agent.base_url,
|
||||
api_key=parent_api_key,
|
||||
model=model or parent_agent.model,
|
||||
max_iterations=max_iterations,
|
||||
enabled_toolsets=child_toolsets,
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ and optional filesystem persistence via `docker commit`/`docker create --image`.
|
|||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional
|
||||
|
|
@ -44,7 +45,7 @@ class DockerEnvironment(BaseEnvironment):
|
|||
def __init__(
|
||||
self,
|
||||
image: str,
|
||||
cwd: str = "~",
|
||||
cwd: str = "/root",
|
||||
timeout: int = 60,
|
||||
cpu: float = 0,
|
||||
memory: int = 0,
|
||||
|
|
@ -53,6 +54,8 @@ class DockerEnvironment(BaseEnvironment):
|
|||
task_id: str = "default",
|
||||
network: bool = True,
|
||||
):
|
||||
if cwd == "~":
|
||||
cwd = "/root"
|
||||
super().__init__(cwd=cwd, timeout=timeout)
|
||||
self._base_image = image
|
||||
self._persistent = persistent_filesystem
|
||||
|
|
@ -67,7 +70,7 @@ class DockerEnvironment(BaseEnvironment):
|
|||
resource_args.extend(["--cpus", str(cpu)])
|
||||
if memory > 0:
|
||||
resource_args.extend(["--memory", f"{memory}m"])
|
||||
if disk > 0:
|
||||
if disk > 0 and sys.platform != "darwin":
|
||||
resource_args.extend(["--storage-opt", f"size={disk}m"])
|
||||
if not network:
|
||||
resource_args.append("--network=none")
|
||||
|
|
@ -102,7 +105,7 @@ class DockerEnvironment(BaseEnvironment):
|
|||
all_run_args = list(_SECURITY_ARGS) + writable_args + resource_args
|
||||
|
||||
self._inner = _Docker(
|
||||
image=effective_image, cwd=cwd, timeout=timeout,
|
||||
image=image, cwd=cwd, timeout=timeout,
|
||||
run_args=all_run_args,
|
||||
)
|
||||
self._container_id = self._inner.container_id
|
||||
|
|
|
|||
|
|
@ -35,6 +35,53 @@ from typing import Optional, List, Dict, Any, Tuple
|
|||
from pathlib import Path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Write-path deny list — blocks writes to sensitive system/credential files
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_HOME = str(Path.home())
|
||||
|
||||
WRITE_DENIED_PATHS = {
|
||||
os.path.join(_HOME, ".ssh", "authorized_keys"),
|
||||
os.path.join(_HOME, ".ssh", "id_rsa"),
|
||||
os.path.join(_HOME, ".ssh", "id_ed25519"),
|
||||
os.path.join(_HOME, ".ssh", "config"),
|
||||
os.path.join(_HOME, ".hermes", ".env"),
|
||||
os.path.join(_HOME, ".bashrc"),
|
||||
os.path.join(_HOME, ".zshrc"),
|
||||
os.path.join(_HOME, ".profile"),
|
||||
os.path.join(_HOME, ".bash_profile"),
|
||||
os.path.join(_HOME, ".zprofile"),
|
||||
os.path.join(_HOME, ".netrc"),
|
||||
os.path.join(_HOME, ".pgpass"),
|
||||
os.path.join(_HOME, ".npmrc"),
|
||||
os.path.join(_HOME, ".pypirc"),
|
||||
"/etc/sudoers",
|
||||
"/etc/passwd",
|
||||
"/etc/shadow",
|
||||
}
|
||||
|
||||
WRITE_DENIED_PREFIXES = [
|
||||
os.path.join(_HOME, ".ssh") + os.sep,
|
||||
os.path.join(_HOME, ".aws") + os.sep,
|
||||
os.path.join(_HOME, ".gnupg") + os.sep,
|
||||
os.path.join(_HOME, ".kube") + os.sep,
|
||||
"/etc/sudoers.d" + os.sep,
|
||||
"/etc/systemd" + os.sep,
|
||||
]
|
||||
|
||||
|
||||
def _is_write_denied(path: str) -> bool:
|
||||
"""Return True if path is on the write deny list."""
|
||||
resolved = os.path.realpath(os.path.expanduser(path))
|
||||
if resolved in WRITE_DENIED_PATHS:
|
||||
return True
|
||||
for prefix in WRITE_DENIED_PREFIXES:
|
||||
if resolved.startswith(prefix):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Result Data Classes
|
||||
# =============================================================================
|
||||
|
|
@ -564,21 +611,25 @@ class ShellFileOperations(FileOperations):
|
|||
def write_file(self, path: str, content: str) -> WriteResult:
|
||||
"""
|
||||
Write content to a file, creating parent directories as needed.
|
||||
|
||||
|
||||
Pipes content through stdin to avoid OS ARG_MAX limits on large
|
||||
files. The content never appears in the shell command string —
|
||||
only the file path does.
|
||||
|
||||
|
||||
Args:
|
||||
path: File path to write
|
||||
content: Content to write
|
||||
|
||||
|
||||
Returns:
|
||||
WriteResult with bytes written or error
|
||||
"""
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
|
||||
# Block writes to sensitive paths
|
||||
if _is_write_denied(path):
|
||||
return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")
|
||||
|
||||
# Create parent directories
|
||||
parent = os.path.dirname(path)
|
||||
dirs_created = False
|
||||
|
|
@ -619,19 +670,23 @@ class ShellFileOperations(FileOperations):
|
|||
replace_all: bool = False) -> PatchResult:
|
||||
"""
|
||||
Replace text in a file using fuzzy matching.
|
||||
|
||||
|
||||
Args:
|
||||
path: File path to modify
|
||||
old_string: Text to find (must be unique unless replace_all=True)
|
||||
new_string: Replacement text
|
||||
replace_all: If True, replace all occurrences
|
||||
|
||||
|
||||
Returns:
|
||||
PatchResult with diff and lint results
|
||||
"""
|
||||
# Expand ~ and other shell paths
|
||||
path = self._expand_path(path)
|
||||
|
||||
|
||||
# Block writes to sensitive paths
|
||||
if _is_write_denied(path):
|
||||
return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.")
|
||||
|
||||
# Read current content
|
||||
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
|
||||
read_result = self._exec(read_cmd)
|
||||
|
|
|
|||
|
|
@ -24,17 +24,66 @@ Design:
|
|||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Where memory files live
|
||||
MEMORY_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "memories"
|
||||
|
||||
ENTRY_DELIMITER = "\n§\n"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Memory content scanning — lightweight check for injection/exfiltration
|
||||
# in content that gets injected into the system prompt.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_MEMORY_THREAT_PATTERNS = [
|
||||
# Prompt injection
|
||||
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
|
||||
(r'you\s+are\s+now\s+', "role_hijack"),
|
||||
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
|
||||
(r'system\s+prompt\s+override', "sys_prompt_override"),
|
||||
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
|
||||
(r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
|
||||
# Exfiltration via curl/wget with secrets
|
||||
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
|
||||
(r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"),
|
||||
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass|\.npmrc|\.pypirc)', "read_secrets"),
|
||||
# Persistence via shell rc
|
||||
(r'authorized_keys', "ssh_backdoor"),
|
||||
(r'\$HOME/\.ssh|\~/\.ssh', "ssh_access"),
|
||||
(r'\$HOME/\.hermes/\.env|\~/\.hermes/\.env', "hermes_env"),
|
||||
]
|
||||
|
||||
# Subset of invisible chars for injection detection
|
||||
_INVISIBLE_CHARS = {
|
||||
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
|
||||
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
|
||||
}
|
||||
|
||||
|
||||
def _scan_memory_content(content: str) -> Optional[str]:
|
||||
"""Scan memory content for injection/exfil patterns. Returns error string if blocked."""
|
||||
# Check invisible unicode
|
||||
for char in _INVISIBLE_CHARS:
|
||||
if char in content:
|
||||
return f"Blocked: content contains invisible unicode character U+{ord(char):04X} (possible injection)."
|
||||
|
||||
# Check threat patterns
|
||||
for pattern, pid in _MEMORY_THREAT_PATTERNS:
|
||||
if re.search(pattern, content, re.IGNORECASE):
|
||||
return f"Blocked: content matches threat pattern '{pid}'. Memory entries are injected into the system prompt and must not contain injection or exfiltration payloads."
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class MemoryStore:
|
||||
"""
|
||||
Bounded curated memory with file persistence. One instance per AIAgent.
|
||||
|
|
@ -108,6 +157,11 @@ class MemoryStore:
|
|||
if not content:
|
||||
return {"success": False, "error": "Content cannot be empty."}
|
||||
|
||||
# Scan for injection/exfiltration before accepting
|
||||
scan_error = _scan_memory_content(content)
|
||||
if scan_error:
|
||||
return {"success": False, "error": scan_error}
|
||||
|
||||
entries = self._entries_for(target)
|
||||
limit = self._char_limit(target)
|
||||
|
||||
|
|
@ -147,6 +201,11 @@ class MemoryStore:
|
|||
if not new_content:
|
||||
return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."}
|
||||
|
||||
# Scan replacement content for injection/exfiltration
|
||||
scan_error = _scan_memory_content(new_content)
|
||||
if scan_error:
|
||||
return {"success": False, "error": scan_error}
|
||||
|
||||
entries = self._entries_for(target)
|
||||
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
|
||||
|
||||
|
|
|
|||
|
|
@ -33,12 +33,38 @@ Directory layout for user skills:
|
|||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Import security scanner — agent-created skills get the same scrutiny as
|
||||
# community hub installs.
|
||||
try:
|
||||
from tools.skills_guard import scan_skill, should_allow_install, format_scan_report
|
||||
_GUARD_AVAILABLE = True
|
||||
except ImportError:
|
||||
_GUARD_AVAILABLE = False
|
||||
|
||||
|
||||
def _security_scan_skill(skill_dir: Path) -> Optional[str]:
|
||||
"""Scan a skill directory after write. Returns error string if blocked, else None."""
|
||||
if not _GUARD_AVAILABLE:
|
||||
return None
|
||||
try:
|
||||
result = scan_skill(skill_dir, source="agent-created")
|
||||
allowed, reason = should_allow_install(result)
|
||||
if not allowed:
|
||||
report = format_scan_report(result)
|
||||
return f"Security scan blocked this skill ({reason}):\n{report}"
|
||||
except Exception as e:
|
||||
logger.warning("Security scan failed for %s: %s", skill_dir, e)
|
||||
return None
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
|
|
@ -196,6 +222,12 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
|
|||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(content, encoding="utf-8")
|
||||
|
||||
# Security scan — roll back on block
|
||||
scan_error = _security_scan_skill(skill_dir)
|
||||
if scan_error:
|
||||
shutil.rmtree(skill_dir, ignore_errors=True)
|
||||
return {"success": False, "error": scan_error}
|
||||
|
||||
result = {
|
||||
"success": True,
|
||||
"message": f"Skill '{name}' created.",
|
||||
|
|
@ -222,8 +254,17 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
|
|||
return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}
|
||||
|
||||
skill_md = existing["path"] / "SKILL.md"
|
||||
# Back up original content for rollback
|
||||
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
|
||||
skill_md.write_text(content, encoding="utf-8")
|
||||
|
||||
# Security scan — roll back on block
|
||||
scan_error = _security_scan_skill(existing["path"])
|
||||
if scan_error:
|
||||
if original_content is not None:
|
||||
skill_md.write_text(original_content, encoding="utf-8")
|
||||
return {"success": False, "error": scan_error}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Skill '{name}' updated.",
|
||||
|
|
@ -300,8 +341,15 @@ def _patch_skill(
|
|||
"error": f"Patch would break SKILL.md structure: {err}",
|
||||
}
|
||||
|
||||
original_content = content # for rollback
|
||||
target.write_text(new_content, encoding="utf-8")
|
||||
|
||||
# Security scan — roll back on block
|
||||
scan_error = _security_scan_skill(skill_dir)
|
||||
if scan_error:
|
||||
target.write_text(original_content, encoding="utf-8")
|
||||
return {"success": False, "error": scan_error}
|
||||
|
||||
replacements = count if replace_all else 1
|
||||
return {
|
||||
"success": True,
|
||||
|
|
@ -344,8 +392,19 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
|
|||
|
||||
target = existing["path"] / file_path
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Back up for rollback
|
||||
original_content = target.read_text(encoding="utf-8") if target.exists() else None
|
||||
target.write_text(file_content, encoding="utf-8")
|
||||
|
||||
# Security scan — roll back on block
|
||||
scan_error = _security_scan_skill(existing["path"])
|
||||
if scan_error:
|
||||
if original_content is not None:
|
||||
target.write_text(original_content, encoding="utf-8")
|
||||
else:
|
||||
target.unlink(missing_ok=True)
|
||||
return {"success": False, "error": scan_error}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"File '{file_path}' written to skill '{name}'.",
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ INSTALL_POLICY = {
|
|||
"builtin": ("allow", "allow", "allow"),
|
||||
"trusted": ("allow", "allow", "block"),
|
||||
"community": ("allow", "block", "block"),
|
||||
"agent-created": ("allow", "block", "block"),
|
||||
}
|
||||
|
||||
VERDICT_INDEX = {"safe": 0, "caution": 1, "dangerous": 2}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue