# body_analyzer.py import os import re import requests from typing import List HF_API_KEY = os.getenv("HF_API_KEY") HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {} HF_TIMEOUT = 20 # seconds # ML model names PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1" ZERO_SHOT_MODEL = "facebook/bart-large-mnli" # for intent/behavior # Suspicious phrase patterns SUSPICIOUS_PATTERNS = [ "verify your account", "urgent action", "click here", "reset password", "confirm your identity", "bank account", "invoice", "payment required", "unauthorized login", "compromised", "final reminder", "account suspended", "account deactivated", "update your information", "legal action", "limited time offer", "claim your prize", "verify immediately", "verify now", "verify your credentials", ] # Zero-shot candidate labels for intent/behavior BEHAVIOR_LABELS = [ "credential harvesting", "invoice/payment fraud", "marketing", "benign", "malware", "account takeover", ] def _call_hf_text_model(model_name: str, text: str): if not HF_API_KEY: return None try: payload = {"inputs": text} res = requests.post( f"https://api-inference.huggingface.co/models/{model_name}", headers=HF_HEADERS, json=payload, timeout=HF_TIMEOUT, ) return res.json() except Exception: return None def _call_hf_zero_shot(text: str, candidate_labels: List[str]): if not HF_API_KEY: return None try: payload = {"inputs": text, "parameters": {"candidate_labels": candidate_labels}} res = requests.post( f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}", headers=HF_HEADERS, json=payload, timeout=HF_TIMEOUT, ) return res.json() except Exception: return None def _parse_hf_phishing_model_output(result): if not result: return None, 0.0, {} if isinstance(result, list) and result and isinstance(result[0], dict): r0 = result[0] label = r0.get("label") score = r0.get("score", 0.0) return label, float(score), {label: float(score)} if isinstance(result, dict): labels = result.get("labels") or result.get("label") or [] scores = result.get("scores") or result.get("score") or [] if isinstance(labels, list) and isinstance(scores, list) and labels and scores: all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)} max_lab = max(all_probs.items(), key=lambda x: x[1]) return max_lab[0], float(max_lab[1]), all_probs return None, 0.0, {} def analyze_body(subject: str, body: str, urls: list, images: list): findings = [] score = 0 highlighted_body = (body or "") combined_lower = ((subject or "") + "\n" + (body or "")).lower() for pattern in SUSPICIOUS_PATTERNS: if pattern in combined_lower: findings.append(f"Suspicious phrase detected: \"{pattern}\"") score += 18 try: highlighted_body = re.sub(re.escape(pattern), f"{pattern}", highlighted_body, flags=re.IGNORECASE) except Exception: pass # URL checks for u in urls or []: findings.append(f"Suspicious URL detected: {u}") score += 10 try: highlighted_body = re.sub(re.escape(u), f"{u}", highlighted_body, flags=re.IGNORECASE) except Exception: pass # ML phishing model ml_label = None ml_conf = 0.0 model_input = "\n".join([subject or "", body or "", "\n".join(urls or [])]).strip() if model_input and HF_API_KEY: raw = _call_hf_text_model(PHISHING_MODEL, model_input) label, conf, _ = _parse_hf_phishing_model_output(raw) if label: ml_label = label ml_conf = conf findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})") score += int(conf * 100 * 0.9) # Zero-shot behavior behavior = None behavior_conf = 0.0 if HF_API_KEY and model_input: zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS) try: if isinstance(zs, dict) and "labels" in zs and "scores" in zs: behavior = zs["labels"][0] behavior_conf = float(zs["scores"][0]) findings.append(f"Behavior inference → {behavior} (conf {behavior_conf:.2f})") if behavior_conf >= 0.7: score += int(behavior_conf * 30) except Exception: pass if ml_conf >= 0.8 and ("phishing" in (ml_label or "").lower()): score = max(score, 80) score = int(max(0, min(score, 100))) # Verdict if score >= 70: verdict = "🚨 Malicious" elif 50 <= score < 70: verdict = "⚠️ Suspicious" elif 30 <= score < 50: verdict = "📩 Spam" else: verdict = "✅ Safe" findings.append("No strong phishing signals detected by models/heuristics.") # Return exactly 4 values return findings, score, highlighted_body, verdict