diff --git a/email_filter.py b/email_filter.py
index 865770f..93f9be1 100644
--- a/email_filter.py
+++ b/email_filter.py
@@ -35,18 +35,22 @@ PRIVATE_EMAIL = os.getenv("PRIVATE_EMAIL") # "openclaw@yandex.ru"
 PRIVATE_PASSWORD = os.getenv("PRIVATE_PASSWORD")
 INJECTION_PATTERNS = [
- r"(?i)ignor(?:e)?\s*(?:all|the|any)?\s*(?:previous|above|all)?\s*instructions?", # лучше
- # system override
- r"(?i)system\s*(?:override|message)",
- r"(?i)you\s+(?:must|will|are obliged to)", # принуждение
- r"(?i)read_file\s*\(", # read_file(
- r"(?i)web_fetch\s*\(", # web_fetch(
- r"(?i)curl\s+.*\|\s*bash", # curl | bash
- r"(?i)\{\{.*WEBHOOK.*\}\}", # плейсхолдер
- r"(?i)exfiltrat", # exfiltration
- # base64 decode
- r"(?i)base64\s*.decode",
- r"(?i)\|base64", # | base64
+ r"(?i)ignore\s*(all|previous)\s*instructions",
+ r"(?i)system\s*(override|message|directive)",
+ r"(?i)you\s+(must|will|are obliged to)\s+",
+ r"(?i)read_file\s*\(.*\.ssh",
+ r"(?i)web_fetch\s*\(",
+ r"(?i)curl\s+.*\|\s*bash",
+ r"(?i){{.*WEBHOOK.*}}",
+ r"(?i)exfiltrat",
+ r"(?i)base64.*decode",
+ r"(?i)\b(SOULD\.md|IDENTITY\.md|USER\.md|SECRETS\.md)\b",
+ r"(?i)(~/.openclaw/|/home/\w+/\.openclaw/).*(\.md|\.env)\b",
+ r"(?i)(read_file|cat|открой|прочитай|read\s+file)\s*\(?.*\.(md|env)",
+ r"(?i)(read_file.*\.md.*web_fetch|web_fetch.*read_file.*\.md)",
+ r"(?i)(base64\s*encode.*(IDENTITY|SOULD|USER)|(IDENTITY|SOULD|USER).*base64\s*encode)",
+ r"(?i)(\.env\b|dot\s?env)",
+ r"(?i)\|base64",
 ]
diff --git a/tool_validator.py b/tool_validator.py
deleted file mode 100644
index 4dc6ee4..0000000
--- a/tool_validator.py
+++ /dev/null
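Review bot: The rewritten first pattern no longer matches the canonical phrase "ignore all previous instructions" — `ignore\s*(all|previous)\s*instructions` allows exactly one of the two words between `ignore` and `instructions`, while the removed line accepted both (plus "the"/"above" and the singular), so this hunk narrows the filter rather than tightening it; `r"(?i)ignore\s*(?:all|the|any)?\s*(?:previous|above|all)?\s*instructions?"` restores the previous coverage. The `cat` alternative in `(read_file|cat|открой|прочитай|read\s+file)\s*\(?.*\.(md|env)` is unanchored, so ordinary text such as "the application uses config.md" trips it; `\b(read_file|cat|открой|прочитай|read\s+file)\b` avoids that, and `dot\s?env` likewise wants a trailing `\b` so "dot environment" does not match. Two smaller inconsistencies: `~/.openclaw/` leaves the dot unescaped while the sibling alternative writes `\.openclaw`, and `{{.*WEBHOOK.*}}` relies on Python's `re` treating a bare `{` as a literal where the removed line escaped it. `SOULD` appears in two patterns but the deleted tool_validator.py only named `IDENTITY.md`, so confirm the spelling matches the real filename before relying on those alternatives. With tool_validator.py removed in the same commit these regexes appear to be the only remaining check, which makes the narrowing of `read_file\s*\(` to `read_file\s*\(.*\.ssh` worth a second look as well.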
@@ -1,63 +0,0 @@
-#!/usr/bin/env python3
-# tool_validator.py - Плагин для контроля вызовов OpenClaw
-
-import sys
-import json
-import re
-
-
-def validate_tool_call(tool_name: str, tool_args: dict) -> dict:
- """Валидация каждого вызова инструмента перед выполнением."""
-
- # Блокируем опасные вызовы чтения файлов
- if tool_name == "read_file":
- target_file = tool_args.get("file_path", "")
- sensitive_files = [".env", ".ssh/id_rsa", ".npmrc", "IDENTITY.md"]
- for sensitive in sensitive_files:
- if sensitive in target_file:
- return {
- "blocked": True,
- "reason": f"Доступ к файлу {target_file} запрещён политикой безопасности"
- }
-
- # Блокируем эксфильтрацию через web_fetch
- if tool_name == "web_fetch":
- url = tool_args.get("url", "")
- blocklist_patterns = ["webhook.site", "hookbin.com", "ngrok.io"]
- for pattern in blocklist_patterns:
- if pattern in url:
- return {
- "blocked": True,
- "reason": f"Исходящий запрос на {url} запрещён (риск эксфильтрации)"
- }
-
- # Блокируем выполнение произвольных команд в bash
- if tool_name == "execute_command":
- command = tool_args.get("command", "")
- if re.search(r'\|\s*(base64|sh|bash)', command) or re.search(r'curl.*\|\s*bash', command):
- return {
- "blocked": True,
- "reason": "Выполнение команд с передачей данных через пайп или bash запрещено"
- }
-
- return {"blocked": False}
-
-
-def main():
- if len(sys.argv) < 3:
- print(json.dumps({"error": "Недостаточно аргументов"}))
- sys.exit(1)
-
- tool_name = sys.argv[1]
- try:
- tool_args = json.loads(sys.argv[2])
- except json.JSONDecodeError:
- print(json.dumps({"error": "Неверный формат JSON для аргументов"}))
- sys.exit(1)
-
- result = validate_tool_call(tool_name, tool_args)
- print(json.dumps(result))
-
-
-if __name__ == "__main__":
- main()