princemaxp committed on
Commit
9b5168a
·
verified ·
1 Parent(s): da36e3d

Update body_analyzer.py

Browse files
Files changed (1) hide show
  1. body_analyzer.py +249 -96
body_analyzer.py CHANGED
@@ -1,127 +1,280 @@
1
- import requests
2
  import os
3
  import re
 
 
 
 
4
 
5
  HF_API_KEY = os.getenv("HF_API_KEY")
6
  HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
 
7
 
8
- MODELS = {
9
- "ai_detector": "roberta-base-openai-detector",
10
- "sentiment": "finiteautomata/bertweet-base-sentiment-analysis",
11
- "spam": "mrm8488/bert-tiny-finetuned-sms-spam-detection",
12
- }
13
 
 
14
  SUSPICIOUS_PATTERNS = [
15
- r"verify your account",
16
- r"urgent action",
17
- r"click here",
18
- r"reset (your )?password",
19
- r"confirm (your )?identity",
20
- r"bank account",
21
- r"invoice",
22
- r"payment (required|overdue|failed|method expired)",
23
- r"unauthorized login",
24
- r"compromised",
25
- r"final reminder",
26
- r"account (suspended|deactivated|locked)",
27
- r"update your (information|details|billing)",
28
- r"legal action",
29
- r"free trial",
30
- r"limited time offer",
31
- r"click below",
32
- r"winner",
33
- r"congratulations",
34
- r"urgent response",
35
- r"claim your prize",
36
- r"act now",
37
- r"unsubscribe",
38
- r"lottery",
39
- r"risk-free",
40
  ]
41
 
42
- def query_hf(model, text):
 
 
 
 
 
 
 
 
 
 
 
43
  if not HF_API_KEY:
44
  return None
45
  try:
 
 
46
  res = requests.post(
47
- f"https://api-inference.huggingface.co/models/{model}",
48
  headers=HF_HEADERS,
49
- json={"inputs": text[:1000]},
50
- timeout=15,
51
  )
52
  return res.json()
53
  except Exception:
54
  return None
55
 
56
- def parse_hf_result(result):
57
- # Common shapes: [{"label": "...", "score": ...}] or {"labels":[...], "scores":[...]}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  if not result:
59
- return None, None
60
- if isinstance(result, list) and result and isinstance(result[0], dict):
61
- if "label" in result[0] and "score" in result[0]:
62
- return result[0]["label"], result[0]["score"]
 
 
 
 
63
  if isinstance(result, dict):
64
- labels = result.get("labels") or []
65
- scores = result.get("scores") or []
66
- if labels and scores:
67
- return labels[0], scores[0]
68
- return None, None
 
 
 
 
69
 
70
- def analyze_body(text):
 
 
 
 
 
 
 
 
 
71
  findings = []
72
  score = 0
73
- body_lower = (text or "").lower()
74
- highlighted_body = text or ""
75
 
76
- # 1) Suspicious phrases
 
77
  for pattern in SUSPICIOUS_PATTERNS:
78
- matches = re.findall(pattern, body_lower)
79
- for match in matches:
80
- display = match if isinstance(match, str) else (match[0] if match else "")
81
- if not display:
82
- continue
83
- findings.append(f'Suspicious phrase detected: "{display}"')
84
- score += 15 # tuned down to reduce instant Malicious
85
- highlighted_body = re.sub(
86
- re.escape(display),
87
- f"<mark>{display}</mark>",
88
- highlighted_body,
89
- flags=re.IGNORECASE,
90
- )
91
-
92
- # 2) URLs
93
- urls = re.findall(r'https?://[^\s]+', body_lower)
94
- for url in urls:
95
- findings.append(f"Suspicious URL detected: {url}")
96
  score += 10
97
- highlighted_body = re.sub(re.escape(url), f"<mark>{url}</mark>", highlighted_body, flags=re.IGNORECASE)
98
-
99
- # 3) AI text detector
100
- label, confidence = parse_hf_result(query_hf(MODELS["ai_detector"], text or ""))
101
- if label:
102
- findings.append(f"Body: AI Detector → {label} (confidence {confidence:.2f})")
103
-
104
- # 4) Sentiment
105
- label, confidence = parse_hf_result(query_hf(MODELS["sentiment"], text or ""))
106
- if label:
107
- findings.append(f"Body: Sentiment → {label} (confidence {confidence:.2f})")
108
- if label.lower() == "negative":
109
- score += 10
110
-
111
- # 5) Spam detector
112
- label, confidence = parse_hf_result(query_hf(MODELS["spam"], text or ""))
113
- if label:
114
- findings.append(f"Body: Spam Detector → {label} (confidence {confidence:.2f})")
115
- if label.lower() == "spam":
116
- score += 25
117
-
118
- # 6) Verdict
119
- if score >= 50:
120
- verdict = "Malicious / Spam"
121
- elif score >= 20:
122
- verdict = "Suspicious"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  else:
124
- verdict = "Safe"
125
- findings.append("No suspicious content detected in body.")
126
 
 
127
  return findings, score, highlighted_body, verdict
 
1
+ # body_analyzer.py
2
  import os
3
  import re
4
+ import requests
5
+ import base64
6
+ import io
7
+ from typing import List
8
 
9
  HF_API_KEY = os.getenv("HF_API_KEY")
10
  HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
11
+ HF_TIMEOUT = 20 # seconds
12
 
13
+ # ML model names
14
+ PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
15
+ ZERO_SHOT_MODEL = "facebook/bart-large-mnli" # for intent/behavior
 
 
16
 
17
+ # Suspicious phrase patterns (lowercased when matching)
18
  SUSPICIOUS_PATTERNS = [
19
+ "verify your account",
20
+ "urgent action",
21
+ "click here",
22
+ "reset password",
23
+ "confirm your identity",
24
+ "bank account",
25
+ "invoice",
26
+ "payment required",
27
+ "unauthorized login",
28
+ "compromised",
29
+ "final reminder",
30
+ "account suspended",
31
+ "account deactivated",
32
+ "update your information",
33
+ "legal action",
34
+ "limited time offer",
35
+ "claim your prize",
36
+ "verify immediately",
37
+ "verify now",
38
+ "verify your credentials",
 
 
 
 
 
39
  ]
40
 
41
+ # zero-shot candidate labels for message behavior
42
+ BEHAVIOR_LABELS = [
43
+ "credential harvesting",
44
+ "invoice/payment fraud",
45
+ "marketing",
46
+ "benign",
47
+ "malware",
48
+ "account takeover",
49
+ ]
50
+
51
+ def _call_hf_text_model(model_name: str, text: str):
52
+ """Call HF Inference API for text. Return raw JSON or None on failure."""
53
  if not HF_API_KEY:
54
  return None
55
  try:
56
+ payload = {"inputs": text}
57
+ # For zero-shot, caller will pass parameters in payload if needed
58
  res = requests.post(
59
+ f"https://api-inference.huggingface.co/models/{model_name}",
60
  headers=HF_HEADERS,
61
+ json=payload,
62
+ timeout=HF_TIMEOUT,
63
  )
64
  return res.json()
65
  except Exception:
66
  return None
67
 
68
+ def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
69
+ if not HF_API_KEY:
70
+ return None
71
+ try:
72
+ payload = {"inputs": text, "parameters": {"candidate_labels": candidate_labels}}
73
+ res = requests.post(
74
+ f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}",
75
+ headers=HF_HEADERS,
76
+ json=payload,
77
+ timeout=HF_TIMEOUT,
78
+ )
79
+ return res.json()
80
+ except Exception:
81
+ return None
82
+
83
+ def _call_hf_image_ocr(model_name: str, image_bytes: bytes):
84
+ """
85
+ Call HF image OCR model endpoint. Returns string or None.
86
+ Uses raw bytes upload: content-type application/octet-stream body.
87
+ """
88
+ if not HF_API_KEY:
89
+ return None
90
+ try:
91
+ headers = HF_HEADERS.copy()
92
+ headers["Content-Type"] = "application/octet-stream"
93
+ res = requests.post(
94
+ f"https://api-inference.huggingface.co/models/{model_name}",
95
+ headers=headers,
96
+ data=image_bytes,
97
+ timeout=HF_TIMEOUT,
98
+ )
99
+ # Many vision models return {"generated_text": "..."} or list; attempt to parse common shapes
100
+ data = res.json()
101
+ if isinstance(data, dict):
102
+ # TrOCR-style may return {"generated_text": "..."}
103
+ if "generated_text" in data:
104
+ return data["generated_text"]
105
+ # Some OCR endpoints may return list of dicts
106
+ if isinstance(data, list) and data and isinstance(data[0], dict):
107
+ # choose text-like fields if present
108
+ candidate = data[0].get("generated_text") or data[0].get("text") or data[0].get("caption")
109
+ return candidate
110
+ # fallback: try string concatenation if possible
111
+ if isinstance(data, str):
112
+ return data
113
+ except Exception:
114
+ pass
115
+ return None
116
+
117
+ # local pytesseract fallback
118
+ def _ocr_local_pytesseract(image_bytes):
119
+ try:
120
+ from PIL import Image
121
+ import pytesseract
122
+ import io
123
+ image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
124
+ text = pytesseract.image_to_string(image)
125
+ return text
126
+ except Exception:
127
+ return None
128
+
129
+ def _parse_hf_phishing_model_output(result):
130
+ """
131
+ Expected: model may return list of logits/probs. Try common shapes.
132
+ Returns: label:str, confidence:float (0..1), all_probs:dict
133
+ """
134
  if not result:
135
+ return None, 0.0, {}
136
+ # if list of dicts with label & score
137
+ if isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict):
138
+ r0 = result[0]
139
+ label = r0.get("label")
140
+ score = r0.get("score", 0.0)
141
+ return label, float(score or 0.0), {label: float(score or 0.0)}
142
+ # if dict with labels & scores
143
  if isinstance(result, dict):
144
+ # sometimes returns {'labels': [...], 'scores': [...]}
145
+ labels = result.get("labels") or result.get("label") or []
146
+ scores = result.get("scores") or result.get("score") or []
147
+ if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
148
+ all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)}
149
+ # pick max
150
+ max_lab = max(all_probs.items(), key=lambda x: x[1])
151
+ return max_lab[0], float(max_lab[1]), all_probs
152
+ return None, 0.0, {}
153
 
154
+ def analyze_body(subject: str, body: str, urls: list, images: list):
155
+ """
156
+ Inputs:
157
+ subject: email subject (str)
158
+ body: plaintext combined body (str)
159
+ urls: list of urls
160
+ images: list of image bytes
161
+ Returns:
162
+ findings (list[str]), score (int 0..100), highlighted_body (str), verdict (str)
163
+ """
164
  findings = []
165
  score = 0
166
+ highlighted_body = (body or "") # will attempt to highlight suspicious text/URLs
 
167
 
168
+ # 1) Basic heuristics on subject + body
169
+ combined_lower = ((subject or "") + "\n" + (body or "")).lower()
170
  for pattern in SUSPICIOUS_PATTERNS:
171
+ if pattern in combined_lower:
172
+ findings.append(f"Suspicious phrase detected: \"{pattern}\"")
173
+ # weight subject phrases more heavily
174
+ if pattern in (subject or "").lower():
175
+ score += 30
176
+ else:
177
+ score += 18
178
+ try:
179
+ highlighted_body = re.sub(re.escape(pattern), f"<mark>{pattern}</mark>", highlighted_body, flags=re.IGNORECASE)
180
+ except Exception:
181
+ pass
182
+
183
+ # 2) URL heuristics (always include)
184
+ for u in urls or []:
185
+ findings.append(f"Suspicious URL detected: {u}")
 
 
 
186
  score += 10
187
+ try:
188
+ highlighted_body = re.sub(re.escape(u), f"<mark>{u}</mark>", highlighted_body, flags=re.IGNORECASE)
189
+ except Exception:
190
+ pass
191
+ # suspicious domain structure bump
192
+ domain_match = re.search(r"https?://([^/]+)/?", u)
193
+ if domain_match:
194
+ domain = domain_match.group(1)
195
+ if len(domain) > 25 or any(ch.isdigit() for ch in domain.split(".")[0]):
196
+ findings.append(f"URL: suspicious-looking domain {domain}")
197
+ score += 10
198
+
199
+ # 3) OCR images
200
+ ocr_texts = []
201
+ if images:
202
+ for img_bytes in images:
203
+ text = None
204
+ # Prefer HF TrOCR-like endpoint if HF_API_KEY provided
205
+ if HF_API_KEY:
206
+ # try a well-known OCR-capable model; TrOCR base is a candidate
207
+ ocr_result = _call_hf_image_ocr("microsoft/trocr-base-stage1", img_bytes)
208
+ if ocr_result:
209
+ text = ocr_result
210
+ if not text:
211
+ # fallback to local pytesseract
212
+ text = _ocr_local_pytesseract(img_bytes)
213
+ if text:
214
+ ocr_texts.append(text)
215
+ findings.append("OCR: extracted text from image.")
216
+ # add small heuristic score for OCR results
217
+ lower = text.lower()
218
+ for pat in SUSPICIOUS_PATTERNS:
219
+ if pat in lower:
220
+ findings.append(f"OCR: suspicious phrase in image -> \"{pat}\"")
221
+ score += 20
222
+
223
+ # 4) ML phishing model (Hugging Face)
224
+ ml_label = None
225
+ ml_conf = 0.0
226
+ ml_all = {}
227
+ model_input = "\n".join([subject or "", body or "", "\n".join(urls or []), "\n".join(ocr_texts or [])]).strip()
228
+ if model_input and HF_API_KEY:
229
+ raw = _call_hf_text_model(PHISHING_MODEL, model_input)
230
+ label, conf, allp = _parse_hf_phishing_model_output(raw)
231
+ if label:
232
+ ml_label = label
233
+ ml_conf = conf
234
+ ml_all = allp
235
+ findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
236
+ # confidence scaled to score (but cap)
237
+ score += int(conf * 100 * 0.9) # slightly reduce to avoid double-counting
238
+
239
+ # 5) Zero-shot behavior intent model (when HF available)
240
+ behavior = None
241
+ behavior_conf = 0.0
242
+ if HF_API_KEY and model_input:
243
+ zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
244
+ try:
245
+ if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
246
+ best_label = zs["labels"][0]
247
+ best_score = float(zs["scores"][0])
248
+ behavior = best_label
249
+ behavior_conf = best_score
250
+ findings.append(f"Behavior inference → {behavior} (conf {behavior_conf:.2f})")
251
+ # add modest boost for strong behavior confidence
252
+ if behavior_conf >= 0.7:
253
+ score += int(behavior_conf * 30)
254
+ except Exception:
255
+ pass
256
+
257
+ # 6) Final heuristics fallbacks
258
+ # If ML already strongly flagged phishing, ensure high score
259
+ if ml_conf >= 0.8 and ("phishing" in (ml_label or "").lower()):
260
+ score = max(score, 80)
261
+
262
+ # clamp
263
+ try:
264
+ score = int(max(0, min(score, 100)))
265
+ except Exception:
266
+ score = 0
267
+
268
+ # Final verdict mapping (tunable)
269
+ if score >= 70:
270
+ verdict = "🚨 Malicious"
271
+ elif 50 <= score < 70:
272
+ verdict = "⚠️ Suspicious"
273
+ elif 30 <= score < 50:
274
+ verdict = "📩 Spam"
275
  else:
276
+ verdict = "✅ Safe"
277
+ findings.append("No strong phishing signals detected by models/heuristics.")
278
 
279
+ # Return findings, score, highlighted body (with possible <mark> tags), verdict
280
  return findings, score, highlighted_body, verdict