princemaxp committed on
Commit
4038d7a
·
verified ·
1 Parent(s): e99affa

Update header_analyzer.py

Browse files
Files changed (1) hide show
  1. header_analyzer.py +158 -156
header_analyzer.py CHANGED
@@ -1,165 +1,167 @@
1
- # body_analyzer.py
2
- import os
3
  import re
4
- import requests
5
- from typing import List
6
-
7
# Hugging Face Inference API access. The key is read from the environment;
# when it is missing the auth header dict is left empty.
HF_API_KEY = os.getenv("HF_API_KEY")
HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
HF_TIMEOUT = 20  # seconds

# ML model names
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli"  # for intent/behavior

# Suspicious phrase patterns.
# All entries are lower-case; they are compared against lower-cased text.
SUSPICIOUS_PATTERNS = [
    "verify your account",
    "urgent action",
    "click here",
    "reset password",
    "confirm your identity",
    "bank account",
    "invoice",
    "payment required",
    "unauthorized login",
    "compromised",
    "final reminder",
    "account suspended",
    "account deactivated",
    "update your information",
    "legal action",
    "limited time offer",
    "claim your prize",
    "verify immediately",
    "verify now",
    "verify your credentials",
]

# Zero-shot candidate labels for intent/behavior
BEHAVIOR_LABELS = [
    "credential harvesting",
    "invoice/payment fraud",
    "marketing",
    "benign",
    "malware",
    "account takeover",
]
48
-
49
def _call_hf_text_model(model_name: str, text: str):
    """Send *text* to the hosted inference endpoint of *model_name*.

    Returns the decoded JSON response, or ``None`` when no API key is
    configured or when the request or the JSON decoding raises.
    """
    if HF_API_KEY:
        try:
            response = requests.post(
                f"https://api-inference.huggingface.co/models/{model_name}",
                headers=HF_HEADERS,
                json={"inputs": text},
                timeout=HF_TIMEOUT,
            )
            return response.json()
        except Exception:
            # Best effort: any failure is reported to the caller as "no result".
            return None
    return None
63
-
64
def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
    """Run zero-shot classification of *text* against *candidate_labels*.

    The request goes to the endpoint of ``ZERO_SHOT_MODEL``. Returns the
    decoded JSON response, or ``None`` when no API key is configured or when
    the request or the JSON decoding raises.
    """
    if HF_API_KEY:
        try:
            request_body = {
                "inputs": text,
                "parameters": {"candidate_labels": candidate_labels},
            }
            response = requests.post(
                f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}",
                headers=HF_HEADERS,
                json=request_body,
                timeout=HF_TIMEOUT,
            )
            return response.json()
        except Exception:
            # Best effort: any failure is reported to the caller as "no result".
            return None
    return None
 
78
 
79
- def _parse_hf_phishing_model_output(result):
80
- if not result:
81
- return None, 0.0, {}
82
- if isinstance(result, list) and result and isinstance(result[0], dict):
83
- r0 = result[0]
84
- label = r0.get("label")
85
- score = r0.get("score", 0.0)
86
- return label, float(score), {label: float(score)}
87
- if isinstance(result, dict):
88
- labels = result.get("labels") or result.get("label") or []
89
- scores = result.get("scores") or result.get("score") or []
90
- if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
91
- all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)}
92
- max_lab = max(all_probs.items(), key=lambda x: x[1])
93
- return max_lab[0], float(max_lab[1]), all_probs
94
- return None, 0.0, {}
95
-
96
def _highlight_terms(text, terms):
    """Wrap each case-insensitive occurrence of any term in ``<mark>`` tags.

    All terms are matched in a single pass (longest first), so a phrase that
    sits inside a URL does not break the URL match and marks never nest. The
    matched text is re-emitted as found, so the original casing is preserved.
    """
    unique_terms = sorted({t for t in terms if t}, key=lambda t: (-len(t), t))
    if not text or not unique_terms:
        return text
    matcher = re.compile("|".join(re.escape(t) for t in unique_terms), re.IGNORECASE)
    # A function replacement avoids treating the term as a regex template.
    return matcher.sub(lambda m: f"<mark>{m.group(0)}</mark>", text)


def _is_benign_label(label):
    """Return True when a classifier label explicitly names a harmless class."""
    tokens = re.split(r"[^a-z0-9]+", str(label).lower())
    return any(tok in {"legitimate", "legit", "benign", "safe", "ham"} for tok in tokens)


def analyze_body(subject: str, body: str, urls: list, images: list):
    """Score an e-mail's subject/body for phishing signals.

    Signals, in order:
      * each phrase from ``SUSPICIOUS_PATTERNS`` found in subject+body: +18
      * each non-empty URL supplied in *urls*: +10
      * phishing model result (only when ``HF_API_KEY`` is set):
        +int(conf * 90), unless the label explicitly names a legitimate class
      * zero-shot behaviour label with confidence >= 0.7:
        +int(conf * 30), unless the label is "benign"
      * a confident (>= 0.8) label containing "phishing" forces score >= 80

    Args:
        subject: Message subject; ``None`` is treated as empty.
        body: Message body; ``None`` is treated as empty.
        urls: URLs extracted from the message; falsy entries are ignored and
            other entries are converted with ``str``.
        images: Accepted for interface compatibility; not used here.

    Returns:
        ``(findings, score, highlighted_body, verdict)`` where ``score`` is
        clamped to 0..100 and ``highlighted_body`` is *body* with detected
        phrases and URLs wrapped in ``<mark>`` tags.
    """
    findings = []
    score = 0
    subject = subject or ""
    body = body or ""
    # An empty URL would otherwise match everywhere during highlighting.
    url_list = [str(u) for u in (urls or []) if u]
    highlight_terms = []

    combined_lower = (subject + "\n" + body).lower()
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in combined_lower:
            findings.append(f"Suspicious phrase detected: \"{pattern}\"")
            score += 18
            highlight_terms.append(pattern)

    # URL checks
    for u in url_list:
        findings.append(f"Suspicious URL detected: {u}")
        score += 10
        highlight_terms.append(u)

    highlighted_body = _highlight_terms(body, highlight_terms)

    # ML phishing model
    ml_label = None
    ml_conf = 0.0
    model_input = "\n".join([subject, body, "\n".join(url_list)]).strip()
    if model_input and HF_API_KEY:
        raw = _call_hf_text_model(PHISHING_MODEL, model_input)
        label, conf, _ = _parse_hf_phishing_model_output(raw)
        if label:
            ml_label = label
            ml_conf = conf
            findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
            # A confident "legitimate" prediction must not raise the score.
            # Labels that cannot be classified as benign keep contributing.
            if not _is_benign_label(label):
                score += int(conf * 100 * 0.9)

    # Zero-shot behavior
    if HF_API_KEY and model_input:
        zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
        if isinstance(zs, dict) and zs.get("labels") and zs.get("scores"):
            try:
                behavior = zs["labels"][0]
                behavior_conf = float(zs["scores"][0])
            except (TypeError, ValueError, IndexError, KeyError):
                # Malformed response: ignore the behaviour signal.
                pass
            else:
                findings.append(f"Behavior inference → {behavior} (conf {behavior_conf:.2f})")
                # "benign" is one of the candidate labels; a confident benign
                # result is not evidence of an attack.
                if behavior_conf >= 0.7 and behavior != "benign":
                    score += int(behavior_conf * 30)

    if ml_conf >= 0.8 and ("phishing" in str(ml_label or "").lower()):
        score = max(score, 80)

    score = int(max(0, min(score, 100)))

    # Verdict
    if score >= 70:
        verdict = "🚨 Malicious"
    elif score >= 50:
        verdict = "⚠️ Suspicious"
    elif score >= 30:
        verdict = "📩 Spam"
    else:
        verdict = "✅ Safe"
        findings.append("No strong phishing signals detected by models/heuristics.")

    # Return exactly 4 values
    return findings, score, highlighted_body, verdict
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import re
2
+ import difflib
3
+ import whois
4
+ from datetime import datetime
5
+
6
# Official brand domains (extend as needed).
# Maps a brand keyword to the list of domains treated as genuine for it.
BRAND_OFFICIAL = {
    "paypal": ["paypal.com"],
    "amazon": ["amazon.com"],
    "google": ["google.com", "gmail.com"],
    "microsoft": ["microsoft.com", "outlook.com", "live.com"],
    "apple": ["apple.com"],
    "flowtoscale": ["flowtoscale.com"],  # Example from your case
}

# Suspicious / cheap TLDs often abused (stored without the leading dot).
SUSPICIOUS_TLDS = {"info", "xyz", "top", "click", "work", "loan", "tk"}
18
+
19
def get_domain_age_days(domain: str):
    """Return the age of *domain* in whole days, or ``None`` if unknown.

    The creation date is taken from a WHOIS lookup. ``None`` is returned when
    *domain* is empty, when the lookup raises, or when the record carries no
    usable ``datetime`` creation date.
    """
    if not domain:
        return None

    try:
        record = whois.whois(domain)
        creation_date = record.creation_date
    except Exception:
        # Best effort: the lookup can fail in many ways (network, parsing,
        # unsupported TLD); none of them should abort the header analysis.
        return None

    # Some registries report several dates; use the first real datetime.
    if isinstance(creation_date, (list, tuple)):
        creation_date = next(
            (d for d in creation_date if isinstance(d, datetime)), None
        )
    if not isinstance(creation_date, datetime):
        return None

    # Build "now" with the same tz-awareness as the creation date. Subtracting
    # an aware datetime from a naive one raises TypeError, which previously
    # caused the age to be silently dropped.
    now = datetime.now(creation_date.tzinfo)
    return (now - creation_date).days
31
 
32
def parse_auth_results(auth_header: str):
    """
    Summarise an Authentication-Results header as a short readable string.

    For each of SPF, DKIM and DMARC the summary says "passed" or "failed";
    a pass marker takes precedence over a fail marker, and mechanisms with
    neither marker are left out. Matching is case-insensitive.
    """
    normalized = (auth_header or "").lower()
    if not normalized:
        return "No Authentication-Results header found"

    # (pass markers, pass message, fail markers, fail message)
    checks = (
        (("spf=pass",), "SPF passed", ("spf=fail",), "SPF failed"),
        (("dkim=pass",), "DKIM passed", ("dkim=fail", "dkim=permerror"), "DKIM failed"),
        (("dmarc=pass",), "DMARC passed", ("dmarc=fail",), "DMARC failed"),
    )

    summary = []
    for pass_markers, pass_msg, fail_markers, fail_msg in checks:
        if any(marker in normalized for marker in pass_markers):
            summary.append(pass_msg)
        elif any(marker in normalized for marker in fail_markers):
            summary.append(fail_msg)

    if summary:
        return ", ".join(summary)
    return "Authentication results unclear or missing"
64
+
65
def _header_value(headers, name):
    """Return header *name* as a string, matching the name case-insensitively."""
    direct = headers.get(name)
    if direct:
        return str(direct)
    wanted = name.lower()
    for key, value in headers.items():
        if isinstance(key, str) and key.lower() == wanted and value:
            return str(value)
    return ""


def _address_domain(address_header):
    """Extract the lower-cased domain of the mailbox in an address header.

    The header is parsed as an address first so that an ``@`` inside the
    display name (``"support@paypal.com" <evil@bad.tk>``) cannot masquerade
    as the sender domain. If parsing yields no address, the raw header text
    is scanned instead.
    """
    from email.utils import parseaddr

    raw = address_header or ""
    _, mailbox = parseaddr(raw)
    candidate = mailbox.rpartition("@")[2] if "@" in mailbox else ""
    match = re.match(r'[a-zA-Z0-9.-]+', candidate) if candidate else None
    if match is None:
        fallback = re.search(r'@([a-zA-Z0-9.-]+)', raw)
        return fallback.group(1).lower().rstrip(".") if fallback else ""
    return match.group(0).lower().rstrip(".")


def analyze_headers(headers, body=""):
    """
    Score e-mail headers for spoofing / phishing indicators.

    Input: headers mapping (``None`` is treated as empty), optional body text
    Output: (findings: list[str], score: int, auth_summary: str)

    Scoring (cumulative, not clamped):
      * DKIM / SPF / DMARC hard failure: +30 each
      * SPF softfail/neutral/none: +10; DKIM/DMARC temperror: +5
      * Reply-To domain differs from From domain: +20
      * free mail provider: +8; odd domain structure: +15;
        abused TLD: +20; domain younger than 90 days: +35
      * brand name in a non-official domain: +30;
        domain closely resembling an official one: +40
      * body mentions RavenMail but sender domain does not: +40
      * Bcc header present: +12

    When nothing is found the findings list holds a single "no issues"
    message and the score is 0. ``auth_summary`` always reflects the actual
    Authentication-Results header.
    """
    findings = []
    score = 0
    headers = headers or {}

    auth_results = _header_value(headers, "Authentication-Results").lower()

    # Strict auth failures
    if "dkim=fail" in auth_results or "dkim=permerror" in auth_results:
        findings.append("Header: DKIM check failed")
        score += 30
    if "spf=fail" in auth_results:
        findings.append("Header: SPF check failed")
        score += 30
    if "dmarc=fail" in auth_results:
        findings.append("Header: DMARC check failed")
        score += 30

    # Softer auth problems
    if any(x in auth_results for x in ("spf=softfail", "spf=neutral", "spf=none")):
        findings.append("Header: SPF not properly aligned")
        score += 10
    if any(x in auth_results for x in ("dmarc=temperror", "dkim=temperror")):
        findings.append("Header: Temporary auth errors (DKIM/DMARC)")
        score += 5

    # From and Reply-To domain compare
    from_domain = _address_domain(_header_value(headers, "From"))
    reply_domain = _address_domain(_header_value(headers, "Reply-To"))
    if from_domain and reply_domain and from_domain != reply_domain:
        findings.append(f"Header: Reply-To domain mismatch (From: {from_domain}, Reply-To: {reply_domain})")
        score += 20

    # Sender domain analysis
    if from_domain:
        parts = from_domain.split('.')
        tld = parts[-1]

        # free provider detection
        if from_domain in ("gmail.com", "yahoo.com", "outlook.com", "hotmail.com"):
            findings.append(f"Header: Free email provider used ({from_domain})")
            score += 8

        # suspicious domain structure
        if len(parts) > 4 or any(ch.isdigit() for ch in parts[0]):
            findings.append(f"Header: Suspicious-looking domain structure ({from_domain})")
            score += 15

        # suspicious TLD
        if tld in SUSPICIOUS_TLDS:
            findings.append(f"Header: Suspicious/abused TLD used ({tld})")
            score += 20

        # Domain age check
        age_days = get_domain_age_days(from_domain)
        if age_days is not None and age_days < 90:
            findings.append(f"Header: Domain {from_domain} is very new ({age_days} days old)")
            score += 35

        # brand-squatting / look-alike check
        # A genuine domain (or a subdomain of one) is never a look-alike.
        # Without this guard "mail.paypal.com" scores 0.8 against
        # "paypal.com" and would be reported as spoofing.
        is_known_official = any(
            from_domain == off or from_domain.endswith("." + off)
            for official_list in BRAND_OFFICIAL.values()
            for off in official_list
        )
        if not is_known_official:
            for brand, official_list in BRAND_OFFICIAL.items():
                if brand in from_domain:
                    findings.append(f"Header: Domain contains brand '{brand}' but is not official ({from_domain})")
                    score += 30

                # fuzzy look-alike
                for legit in official_list:
                    ratio = difflib.SequenceMatcher(None, from_domain, legit).ratio()
                    if ratio > 0.7:
                        findings.append(f"Header: Possible look-alike spoofing ({from_domain} vs {legit})")
                        score += 40

    # Content-to-domain mismatch (organization spoofing)
    if body and "ravenmail" in body.lower() and "ravenmail" not in from_domain:
        findings.append("Header/Content: Possible spoofing — mentions RavenMail but sender domain is unrelated")
        score += 40

    # Bcc usage
    if _header_value(headers, "Bcc"):
        findings.append("Header: Email sent with BCC (common in mass phishing)")
        score += 12

    # Summarise the real header even on the clean path, so a message whose
    # checks all passed is not reported as having no header.
    auth_summary = parse_auth_results(auth_results)

    if not findings:
        return ["No suspicious issues found in headers."], 0, auth_summary

    # Return findings, cumulative score, and parsed authentication summary
    return findings, score, auth_summary