File size: 6,570 Bytes
14fecff
4038d7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4b85eb
4038d7a
 
e99affa
4038d7a
 
f4b85eb
 
 
4038d7a
f4b85eb
 
 
 
 
 
 
4038d7a
f4b85eb
 
 
e99affa
4038d7a
e99affa
f4b85eb
4038d7a
 
 
 
 
 
f4b85eb
4038d7a
 
f4b85eb
4038d7a
 
 
 
 
f4b85eb
4038d7a
 
 
 
 
f4b85eb
4038d7a
 
 
 
 
f4b85eb
4038d7a
 
f4b85eb
4038d7a
 
f4b85eb
4038d7a
 
 
 
 
49f1a98
14fecff
4038d7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
292c399
4038d7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d76bad0
4038d7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f4b85eb
4038d7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import re
import difflib
import whois
from datetime import datetime

# Brand keyword -> domains that legitimately belong to that brand.
# Sender domains that contain the keyword, or closely resemble one of the
# listed domains, are compared against these entries. Extend as needed.
BRAND_OFFICIAL = {
    "paypal": [
        "paypal.com",
    ],
    "amazon": [
        "amazon.com",
    ],
    "google": [
        "google.com",
        "gmail.com",
    ],
    "microsoft": [
        "microsoft.com",
        "outlook.com",
        "live.com",
    ],
    "apple": [
        "apple.com",
    ],
    # Deployment-specific example entry.
    "flowtoscale": [
        "flowtoscale.com",
    ],
}

# Top-level domains treated as suspicious (cheap and frequently abused).
SUSPICIOUS_TLDS = {
    "click",
    "info",
    "loan",
    "tk",
    "top",
    "work",
    "xyz",
}


def get_domain_age_days(domain: str):
    """Return the age of *domain* in whole days, or None if it cannot be determined.

    The creation date is taken from a WHOIS lookup. The lookup result may
    expose it as a datetime, an ISO-8601 string, or a list of either (in
    which case the first entry is used).

    Args:
        domain: Domain name to look up.

    Returns:
        Number of days between the creation date and now, or None when the
        lookup fails or no usable creation date is available.
    """
    try:
        record = whois.whois(domain)
        creation_date = record.creation_date

        # A non-empty list of dates: use the first entry.
        if isinstance(creation_date, list) and creation_date:
            creation_date = creation_date[0]

        if isinstance(creation_date, str):
            try:
                creation_date = datetime.fromisoformat(creation_date)
            except ValueError:
                creation_date = None

        if isinstance(creation_date, datetime):
            # Take "now" in the same timezone as the creation date. Mixing a
            # naive now() with a timezone-aware creation date raises
            # TypeError, which the handler below would turn into a silent
            # "age unknown". For a naive creation date tzinfo is None and
            # this is plain datetime.now().
            now = datetime.now(creation_date.tzinfo)
            return (now - creation_date).days
    except Exception as e:
        # Best effort: a failed WHOIS lookup must never crash the analysis.
        print(f"[WHOIS ERROR] Could not fetch age for {domain}: {e}")
        return None
    return None


def parse_auth_results(auth_header: str):
    """
    Summarise an Authentication-Results header as a short readable string.

    The header is matched case-insensitively. For each of SPF, DKIM and
    DMARC a pass or fail verdict is reported when found (a pass wins if
    both appear); DKIM "permerror" counts as a failure.
    """
    text = (auth_header or "").lower()
    if not text:
        return "No Authentication-Results header found"

    # (pass token, fail tokens, message on pass, message on fail)
    mechanisms = (
        ("spf=pass", ("spf=fail",), "SPF passed", "SPF failed"),
        ("dkim=pass", ("dkim=fail", "dkim=permerror"), "DKIM passed", "DKIM failed"),
        ("dmarc=pass", ("dmarc=fail",), "DMARC passed", "DMARC failed"),
    )

    summary = []
    for pass_token, fail_tokens, passed_msg, failed_msg in mechanisms:
        if pass_token in text:
            summary.append(passed_msg)
        elif any(token in text for token in fail_tokens):
            summary.append(failed_msg)

    if summary:
        return ", ".join(summary)
    return "Authentication results unclear or missing"


def _address_domain(header_value):
    """Return the lower-cased domain of the mailbox in an address header ("" if none)."""
    value = header_value or ""
    # Prefer the address inside angle brackets. The display name is
    # attacker-controlled and may itself contain an '@' (for example
    # '"support@paypal.com" <x@evil.xyz>'), so the first '@' in the header
    # is not necessarily the real sender. The last bracketed address wins.
    bracketed = re.findall(r'<[^<>]*@([a-zA-Z0-9.-]+)\s*>', value)
    if bracketed:
        return bracketed[-1].lower()
    bare = re.search(r'@([a-zA-Z0-9.-]+)', value)
    return bare.group(1).lower() if bare else ""


def _matches_official(domain, official_list):
    """Return True if *domain* equals, or is a subdomain of, an entry in *official_list*."""
    return any(domain == off or domain.endswith("." + off) for off in official_list)


def analyze_headers(headers, body=""):
    """
    Score an email's headers (and optionally its body) for phishing signals.

    Input: headers dict, optional body text
    Output: (findings: list[str], score: int, auth_summary: str)

    Each triggered heuristic appends a human-readable finding and adds its
    weight to the cumulative score. When nothing triggers, findings holds a
    single "no issues" message and the score is 0. auth_summary is always
    derived from the Authentication-Results header via parse_auth_results.
    """
    findings = []
    score = 0
    headers = headers or {}

    auth_results = (headers.get("Authentication-Results") or headers.get("Authentication-results") or "").lower()

    # Strict auth failures
    if "dkim=fail" in auth_results or "dkim=permerror" in auth_results:
        findings.append("Header: DKIM check failed")
        score += 30
    if "spf=fail" in auth_results:
        findings.append("Header: SPF check failed")
        score += 30
    if "dmarc=fail" in auth_results:
        findings.append("Header: DMARC check failed")
        score += 30

    # Softer auth problems
    if any(x in auth_results for x in ["spf=softfail", "spf=neutral", "spf=none"]):
        findings.append("Header: SPF not properly aligned")
        score += 10
    if any(x in auth_results for x in ["dmarc=temperror", "dkim=temperror"]):
        findings.append("Header: Temporary auth errors (DKIM/DMARC)")
        score += 5

    # From and Reply-To domain compare (only when both are present)
    from_domain = _address_domain(headers.get("From"))
    reply_domain = _address_domain(headers.get("Reply-To"))
    if from_domain and reply_domain and from_domain != reply_domain:
        findings.append(f"Header: Reply-To domain mismatch (From: {from_domain}, Reply-To: {reply_domain})")
        score += 20

    # Sender domain analysis
    if from_domain:
        parts = from_domain.split('.')
        tld = parts[-1]

        # free provider detection
        if from_domain in ["gmail.com", "yahoo.com", "outlook.com", "hotmail.com"]:
            findings.append(f"Header: Free email provider used ({from_domain})")
            score += 8

        # suspicious domain structure: many labels, or digits in the first label
        if len(parts) > 4 or (parts and any(ch.isdigit() for ch in parts[0])):
            findings.append(f"Header: Suspicious-looking domain structure ({from_domain})")
            score += 15

        # suspicious TLD
        if tld in SUSPICIOUS_TLDS:
            findings.append(f"Header: Suspicious/abused TLD used ({tld})")
            score += 20

        # Domain age check (None means the age is unknown; not penalised)
        age_days = get_domain_age_days(from_domain)
        if age_days is not None and age_days < 90:
            findings.append(f"Header: Domain {from_domain} is very new ({age_days} days old)")
            score += 35

        # A sender that IS an official domain (or a subdomain of one) for any
        # listed brand must not be reported as a look-alike: string
        # similarity alone would flag e.g. mail.paypal.com against
        # paypal.com, or live.com against apple.com.
        sender_is_official = any(
            _matches_official(from_domain, official_list)
            for official_list in BRAND_OFFICIAL.values()
        )

        # brand-squatting / look-alike check
        for brand, official_list in BRAND_OFFICIAL.items():
            if brand in from_domain and not _matches_official(from_domain, official_list):
                findings.append(f"Header: Domain contains brand '{brand}' but is not official ({from_domain})")
                score += 30

            if sender_is_official:
                continue

            # fuzzy look-alike
            for legit in official_list:
                ratio = difflib.SequenceMatcher(None, from_domain, legit).ratio()
                if ratio > 0.7:
                    findings.append(f"Header: Possible look-alike spoofing ({from_domain} vs {legit})")
                    score += 40

        # Content-to-domain mismatch (organization spoofing)
        if body and "ravenmail" in body.lower() and "ravenmail" not in from_domain:
            findings.append("Header/Content: Possible spoofing — mentions RavenMail but sender domain is unrelated")
            score += 40

    # Bcc usage
    if headers.get("Bcc") or headers.get("bcc"):
        findings.append("Header: Email sent with BCC (common in mass phishing)")
        score += 12

    # Always summarise the real header: a clean message whose authentication
    # passed must not be reported as having no Authentication-Results header.
    auth_summary = parse_auth_results(auth_results)

    if not findings:
        return ["No suspicious issues found in headers."], 0, auth_summary

    # Return findings, cumulative score, and parsed authentication summary
    return findings, score, auth_summary