diff --git a/src/core.py b/src/core.py index a21937d..cdd9807 100644 --- a/src/core.py +++ b/src/core.py @@ -2,73 +2,47 @@ import json import os import smtplib from email.message import EmailMessage - from loguru import logger - class AttackManager: - def __init__(self): - # Загружаем настройки и сразу проверяем их наличие + def __init__(self, vesion: int): + self.dir_version = f"v{vesion}" + self.webhook_url = os.getenv("WEBHOOK_URL") self.sender_email = os.getenv("EMAIL_SENDER") self.sender_password = os.getenv("EMAIL_PASSWORD") + self.target_email = os.getenv("TARGET_EMAIL") self.smtp_server = os.getenv("SMTP_SERVER", "smtp.yandex.ru") self.smtp_port = int(os.getenv("SMTP_PORT", 465)) - # ПРОВЕРКА: Если чего-то не хватает, бросаем ошибку - # Это уберет ошибку Pylance, так как после этого блока переменные точно str - if not all([self.webhook_url, self.sender_email, self.sender_password]): - logger.critical( - "❌ Ошибка: Проверьте файл .env! Отсутствуют обязательные параметры." - ) + if not all([self.webhook_url, self.sender_email, self.sender_password, self.target_email]): + logger.critical("❌ Ошибка: Проверьте .env!") raise ValueError("Missing environment variables") - self.payloads_path = os.path.join("data", "payloads.json") + # Пути к двум файлам + self.benign_path = os.path.join("experiments", self.dir_version, "benign.json") + self.malicious_path = os.path.join("experiments", self.dir_version, "malicious.json") - def get_payloads(self) -> list[dict]: - """Загружает все доступные атаки из JSON.""" - with open(self.payloads_path, "r", encoding="utf-8") as f: + def load_payloads(self, path) -> list[dict]: + with open(path, "r", encoding="utf-8") as f: return json.load(f) - def prepare_message(self, target_email: str, attack: dict) -> EmailMessage: - """Подготавливает письмо на основе шаблона атаки.""" - # Заменяем плейсхолдер на реальный URL - # Мы уверены, что self.webhook_url это str благодаря проверке в __init__ - final_body = attack["body"].replace("{{WEBHOOK_URL}}", str(self.webhook_url)) - - msg = EmailMessage() - msg.set_content(final_body) - msg["Subject"] = attack["subject"] - msg["From"] = self.sender_email - msg["To"] = target_email - - return msg - - def execute_attacks(self, target_email: str, payload_ids: list[str] | None = None): - """ - Отправляет выбранные атаки (или все сразу, если payload_ids не указан). - """ - payloads = self.get_payloads() - - # Если список ID не передан, берем все атаки из файла - to_send = [p for p in payloads if payload_ids is None or p["id"] in payload_ids] - - if not to_send: - logger.warning("Нет атак для отправки. Проверьте ID.") - return - + def send_payload(self, payload: dict): + """Логика отправки одного письма.""" try: - logger.info(f"Connecting to {self.smtp_server}...") + # Заменяем плейсхолдер на реальный Webhook URL + body = payload["body"].replace("{{WEBHOOK_URL}}", str(self.webhook_url)) + + msg = EmailMessage() + msg.set_content(body) + msg["Subject"] = payload["subject"] + msg["From"] = self.sender_email + msg["To"] = self.target_email + with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: - # Теперь Pylance спокоен, так как мы гарантировали, что это str server.login(str(self.sender_email), str(self.sender_password)) - - for attack in to_send: - msg = self.prepare_message(target_email, attack) - server.send_message(msg) - logger.success(f"✅ Attack '{attack['id']}' sent to {target_email}") - - logger.info(f"All attacks completed. Monitor Webhook: {self.webhook_url}") - + server.send_message(msg) + return True except Exception as e: - logger.error(f"Failed to execute attacks: {e}") + logger.error(f"Ошибка отправки: {e}") + return False \ No newline at end of file diff --git a/src/main.py b/src/main.py index 0ba3bd1..9b9d977 100644 --- a/src/main.py +++ b/src/main.py @@ -1,25 +1,70 @@ +import os +import sys from dotenv import load_dotenv from loguru import logger - from core import AttackManager +VERSION = 2 + load_dotenv() - -def run(): - manager = AttackManager() - TARGET = "ПОЧТА-АГЕНТА@yandex.ru" - - logger.info("--- Lambda Red Team Toolkit ---") - - # Просто закомментируй/раскомментируй нужную строку перед запуском: +def main(): + manager = AttackManager(VERSION) - # 1. Тестируем всё сразу: - manager.execute_attacks(TARGET) + # Загружаем списки + benign_list = manager.load_payloads(manager.benign_path) + malicious_list = manager.load_payloads(manager.malicious_path) - # 2. Или тестируем только конкретную (например, новую про лицензии): - # manager.execute_attacks(TARGET, payload_ids=["pirated_software_keys"]) + while True: + print("\n" + "="*50) + print("🚀 LAMBDA RED TEAM TOOLKIT: INTERACTIVE CONSOLE") + print("="*50) + print(f"Цель: {manager.target_email}") + print(f"Отправитель: {manager.sender_email}") + print("-" * 50) + print("\n🟢 ОБЫЧНЫЕ ПИСЬМА (BENIGN):") + for i, p in enumerate(benign_list): + print(f" [B{i}] {p['name']}") + + print("\n🔴 ИНЪЕКЦИИ (MALICIOUS):") + for i, p in enumerate(malicious_list): + print(f" [M{i}] {p['name']}") + + print("\n[Q] Выход") + + choice = input("\nВыберите ID письма (например, B0 или M0): ").strip().upper() + + if choice == 'Q': + logger.info("Выход из системы.") + break + + # Логика выбора из нужного списка + target_list = None + idx = -1 + + try: + if choice.startswith('B'): + target_list = benign_list + idx = int(choice[1:]) + elif choice.startswith('M'): + target_list = malicious_list + idx = int(choice[1:]) + + if target_list and 0 <= idx < len(target_list): + selected = target_list[idx] + print(f"\nВыбрано: {selected['name']}") + confirm = input("Отправить письмо? (y/n): ").lower() + + if confirm == 'y': + if manager.send_payload(selected): + logger.success(f"Успешно отправлено: {selected['subject']}") + else: + logger.error("Не удалось отправить письмо.") + else: + print("❌ Ошибка: Неверный ID письма.") + except ValueError: + print("❌ Ошибка: Введите корректный ID (буква + число).") if __name__ == "__main__": - run() + main() \ No newline at end of file