refactor(domain-intel): streamline documentation and add CLI tool for domain intelligence operations
parent 924570c5be
commit 0862fa96fd
2 changed files with 481 additions and 380 deletions

@@ -1,392 +1,96 @@
---
name: domain-intel
description: Passive domain reconnaissance using Python stdlib. Subdomain discovery, SSL certificate inspection, WHOIS lookups, DNS records, domain availability checks, and bulk multi-domain analysis. No API keys required.
---
# Domain Intelligence — Passive OSINT

Passive domain reconnaissance using only Python stdlib.

**Zero dependencies. Zero API keys. Works on Linux, macOS, and Windows.**
## Helper script

This skill includes `scripts/domain_intel.py` — a complete CLI tool for all domain intelligence operations.
```bash
# Subdomain discovery via Certificate Transparency logs
python3 SKILL_DIR/scripts/domain_intel.py subdomains example.com

# SSL certificate inspection (expiry, cipher, SANs, issuer)
python3 SKILL_DIR/scripts/domain_intel.py ssl example.com

# WHOIS lookup (registrar, dates, name servers — 100+ TLDs)
python3 SKILL_DIR/scripts/domain_intel.py whois example.com

# DNS records (A, AAAA, MX, NS, TXT, CNAME)
python3 SKILL_DIR/scripts/domain_intel.py dns example.com

# Domain availability check (passive: DNS + WHOIS + SSL signals)
python3 SKILL_DIR/scripts/domain_intel.py available coolstartup.io

# Bulk analysis — multiple domains, multiple checks in parallel
python3 SKILL_DIR/scripts/domain_intel.py bulk example.com github.com google.com
python3 SKILL_DIR/scripts/domain_intel.py bulk example.com github.com --checks ssl,dns
```

`SKILL_DIR` is the directory containing this SKILL.md file. All output is structured JSON.
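Because every subcommand writes a single JSON document to stdout, the CLI can also be driven programmatically. A minimal sketch — the `run_check` helper and its argument names are illustrative, not part of the skill:

```python
import json
import subprocess
import sys

def run_check(script_path, command, domain):
    # Run one CLI subcommand and parse its JSON output.
    proc = subprocess.run(
        [sys.executable, script_path, command, domain],
        capture_output=True, text=True, check=True,
    )
    return json.loads(proc.stdout)
```

For example, `run_check("scripts/domain_intel.py", "dns", "example.com")` would return the parsed record dict.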
## Available commands

| Command | What it does | Data source |
|---------|-------------|-------------|
| `subdomains` | Find subdomains from certificate logs | crt.sh (HTTPS) |
| `ssl` | Inspect TLS certificate details | Direct TCP:443 to target |
| `whois` | Registration info, registrar, dates | WHOIS servers (TCP:43) |
| `dns` | A, AAAA, MX, NS, TXT, CNAME records | System DNS + Google DoH |
| `available` | Check if domain is registered | DNS + WHOIS + SSL signals |
| `bulk` | Run multiple checks on multiple domains | All of the above |
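Every command returns structured JSON with a predictable shape, so results are easy to post-process. A sketch using a hand-written sample payload (illustrative of the `subdomains` output shape, not captured output):

```python
import json

# Illustrative sample of the `subdomains` command's output shape.
sample = """{
  "domain": "example.com",
  "count": 2,
  "subdomains": [
    {"subdomain": "mail.example.com", "issuer": "", "not_after": ""},
    {"subdomain": "www.example.com", "issuer": "", "not_after": ""}
  ]
}"""

data = json.loads(sample)
names = [s["subdomain"] for s in data["subdomains"]]
summary = f"{data['domain']}: {data['count']} subdomains"
print(summary)  # → example.com: 2 subdomains
```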
## When to use this vs built-in tools

- **Use this skill** for infrastructure questions: subdomains, SSL certs, WHOIS, DNS records, availability
- **Use `web_search`** for general research about what a domain/company does
- **Use `web_extract`** to get the actual content of a webpage
- **Use `terminal` with `curl -I`** for a simple "is this URL reachable" check

| Task | Better tool | Why |
|------|-------------|-----|
| "What does example.com do?" | `web_extract` | Gets page content, not DNS/WHOIS data |
| "Find info about a company" | `web_search` | General research, not domain-specific |
| "Is this website safe?" | `web_search` | Reputation checks need web context |
| "Check if a URL is reachable" | `terminal` with `curl -I` | Simple HTTP check |
| "Find subdomains of X" | **This skill** | Only passive source for this |
| "When does the SSL cert expire?" | **This skill** | Built-in tools can't inspect TLS |
| "Who registered this domain?" | **This skill** | WHOIS data not in web search |
| "Is coolstartup.io available?" | **This skill** | Passive availability via DNS+WHOIS+SSL |
## Platform compatibility

Pure Python stdlib (`socket`, `ssl`, `urllib`, `json`, `concurrent.futures`).
Works identically on Linux, macOS, and Windows with no dependencies.

- **crt.sh queries** use HTTPS (port 443) — works behind most firewalls
- **WHOIS queries** use TCP port 43 — may be blocked on restrictive networks
- **DNS queries** use Google DoH (HTTPS) for MX/NS/TXT — firewall-friendly
- **SSL checks** connect to the target on port 443 — the only "active" operation

## Data sources

All queries are **passive** — no port scanning, no vulnerability testing:

- **crt.sh** — Certificate Transparency logs (subdomain discovery, HTTPS only)
- **WHOIS servers** — Direct TCP to 100+ authoritative TLD registrars
- **Google DNS-over-HTTPS** — MX, NS, TXT, CNAME resolution (firewall-friendly)
- **System DNS** — A/AAAA record resolution
- **SSL check** is the only "active" operation (TCP connection to target:443)
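The availability verdict combines the three passive signals roughly as follows — a simplified sketch of the heuristic, with an illustrative function name and argument names:

```python
def availability_verdict(dns_exists, whois_available, ssl_reachable):
    # whois_available: True = registry reports "not found",
    # False = registration data present, None = inconclusive/unsupported TLD.
    if not dns_exists and whois_available is True:
        return ("LIKELY AVAILABLE", "high")
    if dns_exists or whois_available is False or ssl_reachable:
        return ("REGISTERED / IN USE", "high")
    if not dns_exists and whois_available is None:
        return ("POSSIBLY AVAILABLE", "medium")
    return ("UNCERTAIN", "low")
```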
## Notes

- WHOIS queries use TCP port 43 — may be blocked on restrictive networks
- Some WHOIS servers redact registrant info (GDPR) — mention this to the user
- crt.sh can be slow for very popular domains (thousands of certs) — set reasonable expectations
- The availability check is heuristic-based (3 passive signals) — not authoritative like a registrar API

---

*Contributed by [@FurkanL0](https://github.com/FurkanL0)*
skills/domain/domain-intel/scripts/domain_intel.py (new file, 397 lines)

@@ -0,0 +1,397 @@
#!/usr/bin/env python3
"""
Domain Intelligence — Passive OSINT via Python stdlib.

Usage:
    python domain_intel.py subdomains example.com
    python domain_intel.py ssl example.com
    python domain_intel.py whois example.com
    python domain_intel.py dns example.com
    python domain_intel.py available example.com
    python domain_intel.py bulk example.com github.com google.com --checks ssl,dns

All output is structured JSON. No dependencies beyond Python stdlib.
Works on Linux, macOS, and Windows.
"""
import json
import re
import socket
import ssl
import sys
import urllib.request
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
# ─── Subdomain Discovery (crt.sh) ──────────────────────────────────────────

def subdomains(domain, include_expired=False, limit=200):
    """Find subdomains via Certificate Transparency logs."""
    url = f"https://crt.sh/?q=%25.{urllib.parse.quote(domain)}&output=json"
    req = urllib.request.Request(url, headers={
        "User-Agent": "domain-intel-skill/1.0", "Accept": "application/json",
    })
    with urllib.request.urlopen(req, timeout=15) as r:
        entries = json.loads(r.read().decode())

    seen, results = set(), []
    now = datetime.now(timezone.utc)
    for e in entries:
        not_after = e.get("not_after", "")
        if not include_expired and not_after:
            try:
                dt = datetime.strptime(not_after[:19], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
                if dt <= now:
                    continue
            except ValueError:
                pass
        for name in e.get("name_value", "").splitlines():
            name = name.strip().lower()
            if name and name not in seen:
                seen.add(name)
                results.append({
                    "subdomain": name,
                    "issuer": e.get("issuer_name", ""),
                    "not_after": not_after,
                })

    results.sort(key=lambda r: (r["subdomain"].startswith("*"), r["subdomain"]))
    return {"domain": domain, "count": min(len(results), limit), "subdomains": results[:limit]}
# ─── SSL Certificate Inspection ────────────────────────────────────────────

def check_ssl(host, port=443, timeout=10):
    """Inspect the TLS certificate of a host."""
    def flat(rdns):
        r = {}
        for rdn in rdns:
            for item in rdn:
                if isinstance(item, (list, tuple)) and len(item) == 2:
                    r[item[0]] = item[1]
        return r

    def parse_date(s):
        # getpeercert() dates look like "Jun  1 12:00:00 2026 GMT".
        try:
            return datetime.strptime(s, "%b %d %H:%M:%S %Y %Z").replace(tzinfo=timezone.utc)
        except ValueError:
            return None

    warning = None
    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with ctx.wrap_socket(sock, server_hostname=host) as s:
                cert, cipher, proto = s.getpeercert(), s.cipher(), s.version()
    except ssl.SSLCertVerificationError as e:
        # Retry without verification so details of invalid certs can still be
        # reported. Note: with verification disabled, getpeercert() may return
        # only a minimal (or empty) dict.
        warning = str(e)
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with ctx.wrap_socket(sock, server_hostname=host) as s:
                cert, cipher, proto = s.getpeercert(), s.cipher(), s.version()

    not_before = parse_date(cert.get("notBefore", ""))
    not_after = parse_date(cert.get("notAfter", ""))
    now = datetime.now(timezone.utc)
    days = (not_after - now).days if not_after else None
    is_expired = days is not None and days < 0

    if is_expired:
        status = f"EXPIRED ({abs(days)} days ago)"
    elif days is not None and days <= 14:
        status = f"CRITICAL — {days} day(s) left"
    elif days is not None and days <= 30:
        status = f"WARNING — {days} day(s) left"
    else:
        status = f"OK — {days} day(s) remaining" if days is not None else "unknown"

    return {
        "host": host, "port": port,
        "subject": flat(cert.get("subject", [])),
        "issuer": flat(cert.get("issuer", [])),
        "subject_alt_names": [f"{t}:{v}" for t, v in cert.get("subjectAltName", [])],
        "not_before": not_before.isoformat() if not_before else "",
        "not_after": not_after.isoformat() if not_after else "",
        "days_remaining": days, "is_expired": is_expired, "expiry_status": status,
        "tls_version": proto,
        "cipher_suite": cipher[0] if cipher else None,
        "serial_number": cert.get("serialNumber", ""),
        "verification_warning": warning,
    }
# ─── WHOIS Lookup ──────────────────────────────────────────────────────────

WHOIS_SERVERS = {
    "com": "whois.verisign-grs.com", "net": "whois.verisign-grs.com",
    "org": "whois.pir.org", "io": "whois.nic.io", "co": "whois.nic.co",
    "ai": "whois.nic.ai", "dev": "whois.nic.google", "app": "whois.nic.google",
    "tech": "whois.nic.tech", "shop": "whois.nic.shop", "store": "whois.nic.store",
    "online": "whois.nic.online", "site": "whois.nic.site", "cloud": "whois.nic.cloud",
    "digital": "whois.nic.digital", "media": "whois.nic.media", "blog": "whois.nic.blog",
    "info": "whois.afilias.net", "biz": "whois.biz", "me": "whois.nic.me",
    "tv": "whois.nic.tv", "cc": "whois.nic.cc", "ws": "whois.website.ws",
    "uk": "whois.nic.uk", "co.uk": "whois.nic.uk", "de": "whois.denic.de",
    "nl": "whois.domain-registry.nl", "fr": "whois.nic.fr", "it": "whois.nic.it",
    "es": "whois.nic.es", "pl": "whois.dns.pl", "ru": "whois.tcinet.ru",
    "se": "whois.iis.se", "no": "whois.norid.no", "fi": "whois.fi",
    "ch": "whois.nic.ch", "at": "whois.nic.at", "be": "whois.dns.be",
    "cz": "whois.nic.cz", "br": "whois.registro.br", "ca": "whois.cira.ca",
    "mx": "whois.mx", "au": "whois.auda.org.au", "jp": "whois.jprs.jp",
    "cn": "whois.cnnic.cn", "in": "whois.inregistry.net", "kr": "whois.kr",
    "sg": "whois.sgnic.sg", "hk": "whois.hkirc.hk", "tr": "whois.nic.tr",
    "ae": "whois.aeda.net.ae", "za": "whois.registry.net.za",
    "space": "whois.nic.space", "zone": "whois.nic.zone", "ninja": "whois.nic.ninja",
    "guru": "whois.nic.guru", "rocks": "whois.nic.rocks", "live": "whois.nic.live",
    "game": "whois.nic.game", "games": "whois.nic.games",
}
def whois_lookup(domain):
    """Query WHOIS servers for domain registration info."""
    parts = domain.split(".")
    # Prefer a two-label match (e.g. "co.uk") over the bare TLD.
    server = WHOIS_SERVERS.get(".".join(parts[-2:])) or WHOIS_SERVERS.get(parts[-1])
    if not server:
        return {"error": f"No WHOIS server for .{parts[-1]}"}

    try:
        with socket.create_connection((server, 43), timeout=10) as s:
            s.sendall((domain + "\r\n").encode())
            chunks = []
            while True:
                c = s.recv(4096)
                if not c:
                    break
                chunks.append(c)
        raw = b"".join(chunks).decode("utf-8", errors="replace")
    except Exception as e:
        return {"error": str(e)}

    patterns = {
        "registrar": r"(?:Registrar|registrar):\s*(.+)",
        "creation_date": r"(?:Creation Date|Created|created):\s*(.+)",
        "expiration_date": r"(?:Registry Expiry Date|Expiration Date|Expiry Date):\s*(.+)",
        "updated_date": r"(?:Updated Date|Last Modified):\s*(.+)",
        "name_servers": r"(?:Name Server|nserver):\s*(.+)",
        "status": r"(?:Domain Status|status):\s*(.+)",
        "dnssec": r"DNSSEC:\s*(.+)",
    }
    result = {"domain": domain, "whois_server": server}
    for key, pat in patterns.items():
        matches = re.findall(pat, raw, re.IGNORECASE)
        if matches:
            if key in ("name_servers", "status"):
                result[key] = list(dict.fromkeys(m.strip().lower() for m in matches))
            else:
                result[key] = matches[0].strip()

    for field in ("creation_date", "expiration_date", "updated_date"):
        if field in result:
            # Values are truncated to 19 chars, so a trailing "Z" never
            # reaches strptime and needs no format of its own.
            for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
                try:
                    dt = datetime.strptime(result[field][:19], fmt).replace(tzinfo=timezone.utc)
                    result[field] = dt.isoformat()
                    if field == "expiration_date":
                        days = (dt - datetime.now(timezone.utc)).days
                        result["expiration_days_remaining"] = days
                        result["is_expired"] = days < 0
                    break
                except ValueError:
                    pass
    return result
# ─── DNS Records ───────────────────────────────────────────────────────────

def dns_records(domain, types=None):
    """Resolve DNS records using system DNS (A/AAAA) + Google DoH for the rest."""
    if not types:
        types = ["A", "AAAA", "MX", "NS", "TXT", "CNAME"]
    records = {}

    for qtype in types:
        if qtype == "A":
            try:
                records["A"] = list(dict.fromkeys(
                    i[4][0] for i in socket.getaddrinfo(domain, None, socket.AF_INET)
                ))
            except Exception:
                records["A"] = []
        elif qtype == "AAAA":
            try:
                records["AAAA"] = list(dict.fromkeys(
                    i[4][0] for i in socket.getaddrinfo(domain, None, socket.AF_INET6)
                ))
            except Exception:
                records["AAAA"] = []
        else:
            # Other record types go through Google's DNS-over-HTTPS JSON API.
            url = f"https://dns.google/resolve?name={urllib.parse.quote(domain)}&type={qtype}"
            try:
                req = urllib.request.Request(url, headers={"User-Agent": "domain-intel-skill/1.0"})
                with urllib.request.urlopen(req, timeout=10) as r:
                    data = json.loads(r.read())
                records[qtype] = [
                    a.get("data", "").strip().rstrip(".")
                    for a in data.get("Answer", []) if a.get("data")
                ]
            except Exception:
                records[qtype] = []

    return {"domain": domain, "records": records}


# ─── Domain Availability Check ─────────────────────────────────────────────

def check_available(domain):
    """Check domain availability using passive signals (DNS + WHOIS + SSL)."""
    signals = {}

    # DNS: any A or NS records strongly suggest the domain is registered.
    try:
        a = [i[4][0] for i in socket.getaddrinfo(domain, None, socket.AF_INET)]
    except Exception:
        a = []

    try:
        ns_url = f"https://dns.google/resolve?name={urllib.parse.quote(domain)}&type=NS"
        req = urllib.request.Request(ns_url, headers={"User-Agent": "domain-intel-skill/1.0"})
        with urllib.request.urlopen(req, timeout=10) as r:
            ns = [x.get("data", "") for x in json.loads(r.read()).get("Answer", [])]
    except Exception:
        ns = []

    signals["dns_a"] = a
    signals["dns_ns"] = ns
    dns_exists = bool(a or ns)

    # SSL: a listening HTTPS endpoint implies the domain is in use.
    ssl_up = False
    try:
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.create_connection((domain, 443), timeout=3) as s:
            with ctx.wrap_socket(s, server_hostname=domain):
                ssl_up = True
    except Exception:
        pass
    signals["ssl_reachable"] = ssl_up

    # WHOIS: quick check for a "not found" vs. "registered" response.
    tld = domain.rsplit(".", 1)[-1]
    server = WHOIS_SERVERS.get(tld)
    whois_avail = None
    whois_note = ""
    if server:
        try:
            with socket.create_connection((server, 43), timeout=10) as s:
                s.sendall((domain + "\r\n").encode())
                raw = b""
                while True:
                    c = s.recv(4096)
                    if not c:
                        break
                    raw += c
            raw = raw.decode("utf-8", errors="replace").lower()
            if any(p in raw for p in ["no match", "not found", "no data found", "status: free"]):
                whois_avail = True
                whois_note = "WHOIS: not found"
            elif "registrar:" in raw or "creation date:" in raw:
                whois_avail = False
                whois_note = "WHOIS: registered"
            else:
                whois_note = "WHOIS: inconclusive"
        except Exception as e:
            whois_note = f"WHOIS error: {e}"

    signals["whois_available"] = whois_avail
    signals["whois_note"] = whois_note

    # Combine signals into a verdict with a rough confidence level.
    if not dns_exists and whois_avail is True:
        verdict, conf = "LIKELY AVAILABLE", "high"
    elif dns_exists or whois_avail is False or ssl_up:
        verdict, conf = "REGISTERED / IN USE", "high"
    elif not dns_exists and whois_avail is None:
        verdict, conf = "POSSIBLY AVAILABLE", "medium"
    else:
        verdict, conf = "UNCERTAIN", "low"

    return {"domain": domain, "verdict": verdict, "confidence": conf, "signals": signals}


# ─── Bulk Analysis ─────────────────────────────────────────────────────────

COMMAND_MAP = {
    "subdomains": subdomains,
    "ssl": check_ssl,
    "whois": whois_lookup,
    "dns": dns_records,
    "available": check_available,
}


def bulk_check(domains, checks=None, max_workers=5):
    """Run multiple checks across multiple domains in parallel."""
    if not checks:
        checks = ["ssl", "whois", "dns"]

    def run_one(d):
        entry = {"domain": d}
        for check in checks:
            fn = COMMAND_MAP.get(check)
            if fn:
                try:
                    entry[check] = fn(d)
                except Exception as e:
                    entry[check] = {"error": str(e)}
        return entry

    results = []
    # Cap both the worker pool and the domain list to stay polite to remote servers.
    with ThreadPoolExecutor(max_workers=min(max_workers, 10)) as ex:
        futures = {ex.submit(run_one, d): d for d in domains[:20]}
        for f in as_completed(futures):
            results.append(f.result())

    return {"total": len(results), "checks": checks, "results": results}


# ─── CLI Entry Point ───────────────────────────────────────────────────────

def main():
    if len(sys.argv) < 3:
        print(__doc__)
        sys.exit(1)

    command = sys.argv[1].lower()
    args = sys.argv[2:]

    if command == "bulk":
        # Parse the optional --checks flag; every other argument is a domain.
        checks = None
        domains = []
        i = 0
        while i < len(args):
            if args[i] == "--checks" and i + 1 < len(args):
                checks = [c.strip() for c in args[i + 1].split(",")]
                i += 2
            else:
                domains.append(args[i])
                i += 1
        result = bulk_check(domains, checks)
    elif command in COMMAND_MAP:
        result = COMMAND_MAP[command](args[0])
    else:
        print(f"Unknown command: {command}")
        print(f"Available: {', '.join(COMMAND_MAP.keys())}, bulk")
        sys.exit(1)

    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()
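
# Example invocations (a sketch — the exact path to this script depends on
# where the skill is installed; all output is JSON on stdout):
#
#   python3 domain_intel.py whois example.com
#   python3 domain_intel.py available example.com
#   python3 domain_intel.py dns example.com
#   python3 domain_intel.py bulk example.com example.org --checks ssl,dns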