commit 57b2bee9f78c0b64a875febd0991a86bbea371d3 Author: dropboy27 Date: Fri Apr 24 14:25:23 2026 +0300 first diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..c789ff1 --- /dev/null +++ b/.env.example @@ -0,0 +1,9 @@ +# Публичный ящик – откуда фильтр забирает письма (тот же, что EMAIL_SENDER) +PUBLIC_IMAP=imap.yandex.ru +PUBLIC_EMAIL=your_public_email +PUBLIC_PASSWORD=your_public_app_password +# Приватный ящик – куда фильтр пересылает чистые письма (этот читает OpenClaw) +PRIVATE_SMTP=smtp.yandex.ru +PRIVATE_SMTP_PORT=465 +TARGET_EMAIL=your_private_email +PRIVATE_PASSWORD=your_private_app_password \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2eea525 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env \ No newline at end of file diff --git a/email_filter.py b/email_filter.py new file mode 100644 index 0000000..865770f --- /dev/null +++ b/email_filter.py @@ -0,0 +1,182 @@ +# email_filter.py +import imaplib +import smtplib +import email +import re +import time +import os +from email.policy import default +from email.message import EmailMessage +from dotenv import load_dotenv +import base64 +import codecs +from html import unescape +import re + + +def extract_text_from_html(html_content): + # Простейшее удаление тегов + text = re.sub(r'<[^>]+>', ' ', html_content) + text = unescape(text) + return text + + +load_dotenv() + +# --- Конфигурация публичного ящика (грязный) --- +PUBLIC_IMAP = os.getenv("PUBLIC_IMAP", "imap.yandex.ru") +PUBLIC_EMAIL = os.getenv("PUBLIC_EMAIL") # например, "filter@yandex.ru" +PUBLIC_PASSWORD = os.getenv("PUBLIC_PASSWORD") + +# --- Конфигурация приватного ящика (чистый, для OpenClaw) --- +PRIVATE_SMTP = os.getenv("PRIVATE_SMTP", "smtp.yandex.ru") +PRIVATE_SMTP_PORT = int(os.getenv("PRIVATE_SMTP_PORT", 465)) +PRIVATE_EMAIL = os.getenv("PRIVATE_EMAIL") # "openclaw@yandex.ru" +PRIVATE_PASSWORD = os.getenv("PRIVATE_PASSWORD") + +INJECTION_PATTERNS = [ + r"(?i)ignor(?:e)?\s*(?:all|the|any)?\s*(?:previous|above|all)?\s*instructions?", # лучше + # system override + r"(?i)system\s*(?:override|message)", + r"(?i)you\s+(?:must|will|are obliged to)", # принуждение + r"(?i)read_file\s*\(", # read_file( + r"(?i)web_fetch\s*\(", # web_fetch( + r"(?i)curl\s+.*\|\s*bash", # curl | bash + r"(?i)\{\{.*WEBHOOK.*\}\}", # плейсхолдер + r"(?i)exfiltrat", # exfiltration + # base64 decode + r"(?i)base64\s*.decode", + r"(?i)\|base64", # | base64 +] + + +def detect_encoding(text): + """Проверяет, не закодирована ли инструкция в base64 или ROT13.""" + # base64 + b64_pattern = re.compile(r'^[A-Za-z0-9+/]+={0,2}$') + for line in text.splitlines(): + clean = line.strip() + if b64_pattern.match(clean) and len(clean) > 20: + try: + decoded = base64.b64decode(clean).decode( + 'utf-8', errors='ignore') + return True, decoded + except: + pass + # ROT13 + rot13_candidates = ["jevgr", "cuvfuvat", "qrp elcg", "rkrphgr"] + if any(cand in text.lower() for cand in rot13_candidates): + decoded = codecs.decode(text, 'rot_13') + return True, decoded + return False, None + + +def is_malicious_email(raw_email_bytes): + """Возвращает True, если письмо содержит промпт-инъекцию.""" + msg = email.message_from_bytes(raw_email_bytes, policy=default) + + # Проверяем тему + subject = msg.get("Subject", "") + if any(re.search(p, subject) for p in INJECTION_PATTERNS): + return True + + # Проверяем тело (plain text) + body = "" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + body = part.get_payload(decode=True).decode( + 'utf-8', errors='ignore') + break + else: + body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + body = part.get_payload(decode=True).decode( + 'utf-8', errors='ignore') + break + elif part.get_content_type() == "text/html": + html = part.get_payload(decode=True).decode( + 'utf-8', errors='ignore') + body = extract_text_from_html(html) + break + else: + if msg.get_content_type() == "text/html": + html = msg.get_payload(decode=True).decode( + 'utf-8', errors='ignore') + body = extract_text_from_html(html) + else: + body = msg.get_payload(decode=True).decode( + 'utf-8', errors='ignore') + # Прямой поиск + for pattern in INJECTION_PATTERNS: + if re.search(pattern, body): + return True + + # Проверка на кодирование + encoded, decoded = detect_encoding(body) + if encoded: + for pattern in INJECTION_PATTERNS: + if re.search(pattern, decoded): + return True + + # Здесь можно добавить проверку вложений (PDF, DOCX) — вызывать вашу существующую pdf_has_hidden_text + return False + + +def forward_email(raw_email_bytes): + """Пересылает письмо в приватный ящик (почти без изменений).""" + original = email.message_from_bytes(raw_email_bytes, policy=default) + new_msg = EmailMessage() + # Копируем содержимое + if original.is_multipart(): + new_msg.set_content(original.get_body( + preferencelist=('plain')).get_content()) + else: + new_msg.set_content(original.get_payload( + decode=True).decode('utf-8', errors='ignore')) + new_msg['Subject'] = original['Subject'] + new_msg['From'] = PRIVATE_EMAIL + new_msg['To'] = PRIVATE_EMAIL + # Можно сохранить оригинального отправителя в поле Reply-To + new_msg['Reply-To'] = original['From'] + + with smtplib.SMTP_SSL(PRIVATE_SMTP, PRIVATE_SMTP_PORT) as server: + server.login(PRIVATE_EMAIL, PRIVATE_PASSWORD) + server.send_message(new_msg) + + +def main(): + print("[🛡️] Email Filter запущен. Проверка входящих писем...") + while True: + try: + mail = imaplib.IMAP4_SSL(PUBLIC_IMAP) + mail.login(PUBLIC_EMAIL, PUBLIC_PASSWORD) + mail.select("inbox") + + result, data = mail.search(None, "UNSEEN") + if result == "OK": + for num in data[0].split(): + _, msg_data = mail.fetch(num, "(RFC822)") + raw_email = msg_data[0][1] + + if is_malicious_email(raw_email): + print("[⚠️] Опасно! Письмо заблокировано (инъекция).") + # Помечаем как прочитанное и удаляем (или перемещаем в папку Spam) + mail.store(num, "+FLAGS", "\\Deleted") + else: + print("[✅] Письмо безопасно. Пересылаю агенту.") + forward_email(raw_email) + # удаляем из публичного ящика + mail.store(num, "+FLAGS", "\\Deleted") + mail.expunge() + mail.close() + mail.logout() + except Exception as e: + print(f"[❌] Ошибка: {e}") + time.sleep(30) # пауза 30 секунд + + +if __name__ == "__main__": + main() diff --git a/tool_validator.py b/tool_validator.py new file mode 100644 index 0000000..4dc6ee4 --- /dev/null +++ b/tool_validator.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# tool_validator.py - Плагин для контроля вызовов OpenClaw + +import sys +import json +import re + + +def validate_tool_call(tool_name: str, tool_args: dict) -> dict: + """Валидация каждого вызова инструмента перед выполнением.""" + + # Блокируем опасные вызовы чтения файлов + if tool_name == "read_file": + target_file = tool_args.get("file_path", "") + sensitive_files = [".env", ".ssh/id_rsa", ".npmrc", "IDENTITY.md"] + for sensitive in sensitive_files: + if sensitive in target_file: + return { + "blocked": True, + "reason": f"Доступ к файлу {target_file} запрещён политикой безопасности" + } + + # Блокируем эксфильтрацию через web_fetch + if tool_name == "web_fetch": + url = tool_args.get("url", "") + blocklist_patterns = ["webhook.site", "hookbin.com", "ngrok.io"] + for pattern in blocklist_patterns: + if pattern in url: + return { + "blocked": True, + "reason": f"Исходящий запрос на {url} запрещён (риск эксфильтрации)" + } + + # Блокируем выполнение произвольных команд в bash + if tool_name == "execute_command": + command = tool_args.get("command", "") + if re.search(r'\|\s*(base64|sh|bash)', command) or re.search(r'curl.*\|\s*bash', command): + return { + "blocked": True, + "reason": "Выполнение команд с передачей данных через пайп или bash запрещено" + } + + return {"blocked": False} + + +def main(): + if len(sys.argv) < 3: + print(json.dumps({"error": "Недостаточно аргументов"})) + sys.exit(1) + + tool_name = sys.argv[1] + try: + tool_args = json.loads(sys.argv[2]) + except json.JSONDecodeError: + print(json.dumps({"error": "Неверный формат JSON для аргументов"})) + sys.exit(1) + + result = validate_tool_call(tool_name, tool_args) + print(json.dumps(result)) + + +if __name__ == "__main__": + main()