first
This commit is contained in:
commit
57b2bee9f7
4 changed files with 255 additions and 0 deletions
9
.env.example
Normal file
9
.env.example
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
# Публичный ящик – откуда фильтр забирает письма (тот же, что EMAIL_SENDER)
|
||||||
|
PUBLIC_IMAP=imap.yandex.ru
|
||||||
|
PUBLIC_EMAIL=your_public_email
|
||||||
|
PUBLIC_PASSWORD=your_public_app_password
|
||||||
|
# Приватный ящик – куда фильтр пересылает чистые письма (этот читает OpenClaw)
|
||||||
|
PRIVATE_SMTP=smtp.yandex.ru
|
||||||
|
PRIVATE_SMTP_PORT=465
|
||||||
|
TARGET_EMAIL=your_private_email
|
||||||
|
PRIVATE_PASSWORD=your_private_app_password
|
||||||
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
.env
|
||||||
182
email_filter.py
Normal file
182
email_filter.py
Normal file
|
|
@ -0,0 +1,182 @@
|
||||||
|
# email_filter.py
|
||||||
|
import imaplib
|
||||||
|
import smtplib
|
||||||
|
import email
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
from email.policy import default
|
||||||
|
from email.message import EmailMessage
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
import base64
|
||||||
|
import codecs
|
||||||
|
from html import unescape
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
def extract_text_from_html(html_content):
|
||||||
|
# Простейшее удаление тегов
|
||||||
|
text = re.sub(r'<[^>]+>', ' ', html_content)
|
||||||
|
text = unescape(text)
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
# --- Конфигурация публичного ящика (грязный) ---
|
||||||
|
PUBLIC_IMAP = os.getenv("PUBLIC_IMAP", "imap.yandex.ru")
|
||||||
|
PUBLIC_EMAIL = os.getenv("PUBLIC_EMAIL") # например, "filter@yandex.ru"
|
||||||
|
PUBLIC_PASSWORD = os.getenv("PUBLIC_PASSWORD")
|
||||||
|
|
||||||
|
# --- Конфигурация приватного ящика (чистый, для OpenClaw) ---
|
||||||
|
PRIVATE_SMTP = os.getenv("PRIVATE_SMTP", "smtp.yandex.ru")
|
||||||
|
PRIVATE_SMTP_PORT = int(os.getenv("PRIVATE_SMTP_PORT", 465))
|
||||||
|
PRIVATE_EMAIL = os.getenv("PRIVATE_EMAIL") # "openclaw@yandex.ru"
|
||||||
|
PRIVATE_PASSWORD = os.getenv("PRIVATE_PASSWORD")
|
||||||
|
|
||||||
|
INJECTION_PATTERNS = [
|
||||||
|
r"(?i)ignor(?:e)?\s*(?:all|the|any)?\s*(?:previous|above|all)?\s*instructions?", # лучше
|
||||||
|
# system override
|
||||||
|
r"(?i)system\s*(?:override|message)",
|
||||||
|
r"(?i)you\s+(?:must|will|are obliged to)", # принуждение
|
||||||
|
r"(?i)read_file\s*\(", # read_file(
|
||||||
|
r"(?i)web_fetch\s*\(", # web_fetch(
|
||||||
|
r"(?i)curl\s+.*\|\s*bash", # curl | bash
|
||||||
|
r"(?i)\{\{.*WEBHOOK.*\}\}", # плейсхолдер
|
||||||
|
r"(?i)exfiltrat", # exfiltration
|
||||||
|
# base64 decode
|
||||||
|
r"(?i)base64\s*.decode",
|
||||||
|
r"(?i)\|base64", # | base64
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def detect_encoding(text):
|
||||||
|
"""Проверяет, не закодирована ли инструкция в base64 или ROT13."""
|
||||||
|
# base64
|
||||||
|
b64_pattern = re.compile(r'^[A-Za-z0-9+/]+={0,2}$')
|
||||||
|
for line in text.splitlines():
|
||||||
|
clean = line.strip()
|
||||||
|
if b64_pattern.match(clean) and len(clean) > 20:
|
||||||
|
try:
|
||||||
|
decoded = base64.b64decode(clean).decode(
|
||||||
|
'utf-8', errors='ignore')
|
||||||
|
return True, decoded
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
# ROT13
|
||||||
|
rot13_candidates = ["jevgr", "cuvfuvat", "qrp elcg", "rkrphgr"]
|
||||||
|
if any(cand in text.lower() for cand in rot13_candidates):
|
||||||
|
decoded = codecs.decode(text, 'rot_13')
|
||||||
|
return True, decoded
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
|
||||||
|
def is_malicious_email(raw_email_bytes):
|
||||||
|
"""Возвращает True, если письмо содержит промпт-инъекцию."""
|
||||||
|
msg = email.message_from_bytes(raw_email_bytes, policy=default)
|
||||||
|
|
||||||
|
# Проверяем тему
|
||||||
|
subject = msg.get("Subject", "")
|
||||||
|
if any(re.search(p, subject) for p in INJECTION_PATTERNS):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Проверяем тело (plain text)
|
||||||
|
body = ""
|
||||||
|
if msg.is_multipart():
|
||||||
|
for part in msg.walk():
|
||||||
|
if part.get_content_type() == "text/plain":
|
||||||
|
body = part.get_payload(decode=True).decode(
|
||||||
|
'utf-8', errors='ignore')
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
|
||||||
|
if msg.is_multipart():
|
||||||
|
for part in msg.walk():
|
||||||
|
if part.get_content_type() == "text/plain":
|
||||||
|
body = part.get_payload(decode=True).decode(
|
||||||
|
'utf-8', errors='ignore')
|
||||||
|
break
|
||||||
|
elif part.get_content_type() == "text/html":
|
||||||
|
html = part.get_payload(decode=True).decode(
|
||||||
|
'utf-8', errors='ignore')
|
||||||
|
body = extract_text_from_html(html)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if msg.get_content_type() == "text/html":
|
||||||
|
html = msg.get_payload(decode=True).decode(
|
||||||
|
'utf-8', errors='ignore')
|
||||||
|
body = extract_text_from_html(html)
|
||||||
|
else:
|
||||||
|
body = msg.get_payload(decode=True).decode(
|
||||||
|
'utf-8', errors='ignore')
|
||||||
|
# Прямой поиск
|
||||||
|
for pattern in INJECTION_PATTERNS:
|
||||||
|
if re.search(pattern, body):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Проверка на кодирование
|
||||||
|
encoded, decoded = detect_encoding(body)
|
||||||
|
if encoded:
|
||||||
|
for pattern in INJECTION_PATTERNS:
|
||||||
|
if re.search(pattern, decoded):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Здесь можно добавить проверку вложений (PDF, DOCX) — вызывать вашу существующую pdf_has_hidden_text
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def forward_email(raw_email_bytes):
|
||||||
|
"""Пересылает письмо в приватный ящик (почти без изменений)."""
|
||||||
|
original = email.message_from_bytes(raw_email_bytes, policy=default)
|
||||||
|
new_msg = EmailMessage()
|
||||||
|
# Копируем содержимое
|
||||||
|
if original.is_multipart():
|
||||||
|
new_msg.set_content(original.get_body(
|
||||||
|
preferencelist=('plain')).get_content())
|
||||||
|
else:
|
||||||
|
new_msg.set_content(original.get_payload(
|
||||||
|
decode=True).decode('utf-8', errors='ignore'))
|
||||||
|
new_msg['Subject'] = original['Subject']
|
||||||
|
new_msg['From'] = PRIVATE_EMAIL
|
||||||
|
new_msg['To'] = PRIVATE_EMAIL
|
||||||
|
# Можно сохранить оригинального отправителя в поле Reply-To
|
||||||
|
new_msg['Reply-To'] = original['From']
|
||||||
|
|
||||||
|
with smtplib.SMTP_SSL(PRIVATE_SMTP, PRIVATE_SMTP_PORT) as server:
|
||||||
|
server.login(PRIVATE_EMAIL, PRIVATE_PASSWORD)
|
||||||
|
server.send_message(new_msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
print("[🛡️] Email Filter запущен. Проверка входящих писем...")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
mail = imaplib.IMAP4_SSL(PUBLIC_IMAP)
|
||||||
|
mail.login(PUBLIC_EMAIL, PUBLIC_PASSWORD)
|
||||||
|
mail.select("inbox")
|
||||||
|
|
||||||
|
result, data = mail.search(None, "UNSEEN")
|
||||||
|
if result == "OK":
|
||||||
|
for num in data[0].split():
|
||||||
|
_, msg_data = mail.fetch(num, "(RFC822)")
|
||||||
|
raw_email = msg_data[0][1]
|
||||||
|
|
||||||
|
if is_malicious_email(raw_email):
|
||||||
|
print("[⚠️] Опасно! Письмо заблокировано (инъекция).")
|
||||||
|
# Помечаем как прочитанное и удаляем (или перемещаем в папку Spam)
|
||||||
|
mail.store(num, "+FLAGS", "\\Deleted")
|
||||||
|
else:
|
||||||
|
print("[✅] Письмо безопасно. Пересылаю агенту.")
|
||||||
|
forward_email(raw_email)
|
||||||
|
# удаляем из публичного ящика
|
||||||
|
mail.store(num, "+FLAGS", "\\Deleted")
|
||||||
|
mail.expunge()
|
||||||
|
mail.close()
|
||||||
|
mail.logout()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[❌] Ошибка: {e}")
|
||||||
|
time.sleep(30) # пауза 30 секунд
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
63
tool_validator.py
Normal file
63
tool_validator.py
Normal file
|
|
@ -0,0 +1,63 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# tool_validator.py - Плагин для контроля вызовов OpenClaw
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
def validate_tool_call(tool_name: str, tool_args: dict) -> dict:
|
||||||
|
"""Валидация каждого вызова инструмента перед выполнением."""
|
||||||
|
|
||||||
|
# Блокируем опасные вызовы чтения файлов
|
||||||
|
if tool_name == "read_file":
|
||||||
|
target_file = tool_args.get("file_path", "")
|
||||||
|
sensitive_files = [".env", ".ssh/id_rsa", ".npmrc", "IDENTITY.md"]
|
||||||
|
for sensitive in sensitive_files:
|
||||||
|
if sensitive in target_file:
|
||||||
|
return {
|
||||||
|
"blocked": True,
|
||||||
|
"reason": f"Доступ к файлу {target_file} запрещён политикой безопасности"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Блокируем эксфильтрацию через web_fetch
|
||||||
|
if tool_name == "web_fetch":
|
||||||
|
url = tool_args.get("url", "")
|
||||||
|
blocklist_patterns = ["webhook.site", "hookbin.com", "ngrok.io"]
|
||||||
|
for pattern in blocklist_patterns:
|
||||||
|
if pattern in url:
|
||||||
|
return {
|
||||||
|
"blocked": True,
|
||||||
|
"reason": f"Исходящий запрос на {url} запрещён (риск эксфильтрации)"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Блокируем выполнение произвольных команд в bash
|
||||||
|
if tool_name == "execute_command":
|
||||||
|
command = tool_args.get("command", "")
|
||||||
|
if re.search(r'\|\s*(base64|sh|bash)', command) or re.search(r'curl.*\|\s*bash', command):
|
||||||
|
return {
|
||||||
|
"blocked": True,
|
||||||
|
"reason": "Выполнение команд с передачей данных через пайп или bash запрещено"
|
||||||
|
}
|
||||||
|
|
||||||
|
return {"blocked": False}
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) < 3:
|
||||||
|
print(json.dumps({"error": "Недостаточно аргументов"}))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
tool_name = sys.argv[1]
|
||||||
|
try:
|
||||||
|
tool_args = json.loads(sys.argv[2])
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(json.dumps({"error": "Неверный формат JSON для аргументов"}))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
result = validate_tool_call(tool_name, tool_args)
|
||||||
|
print(json.dumps(result))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue