Spaces:
Sleeping
Sleeping
| # body_analyzer.py | |
| import os | |
| import re | |
| import requests | |
| from typing import List | |
# --- Hugging Face Inference API settings ------------------------------------
# The token is read from the environment; when it is missing the helpers in
# this module skip every remote call.
HF_API_KEY = os.environ.get("HF_API_KEY")
HF_HEADERS = {}
if HF_API_KEY:
    HF_HEADERS["Authorization"] = f"Bearer {HF_API_KEY}"
HF_TIMEOUT = 20  # seconds, passed as the per-request timeout

# --- Hosted model identifiers -----------------------------------------------
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli"  # used for intent/behavior labels

# --- Heuristic phrase list ---------------------------------------------------
# Lower-case phrases that are searched for in the lower-cased subject + body.
SUSPICIOUS_PATTERNS = [
    "verify your account",
    "urgent action",
    "click here",
    "reset password",
    "confirm your identity",
    "bank account",
    "invoice",
    "payment required",
    "unauthorized login",
    "compromised",
    "final reminder",
    "account suspended",
    "account deactivated",
    "update your information",
    "legal action",
    "limited time offer",
    "claim your prize",
    "verify immediately",
    "verify now",
    "verify your credentials",
]

# --- Zero-shot candidate labels ---------------------------------------------
# Sent as `candidate_labels` to the zero-shot model to infer intent/behavior.
BEHAVIOR_LABELS = [
    "credential harvesting",
    "invoice/payment fraud",
    "marketing",
    "benign",
    "malware",
    "account takeover",
]
def _call_hf_text_model(model_name: str, text: str):
    """POST *text* to the hosted inference endpoint of *model_name*.

    Returns the decoded JSON response body, or ``None`` when ``HF_API_KEY``
    is not set or when the request / JSON decoding raises.
    """
    if HF_API_KEY:
        try:
            endpoint = f"https://api-inference.huggingface.co/models/{model_name}"
            response = requests.post(
                endpoint,
                headers=HF_HEADERS,
                json={"inputs": text},
                timeout=HF_TIMEOUT,
            )
            return response.json()
        except Exception:
            # Best-effort call: any failure is reported as "no result".
            pass
    return None
def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
    """Run zero-shot classification of *text* against *candidate_labels*.

    The request goes to the hosted endpoint of ``ZERO_SHOT_MODEL``. Returns
    the decoded JSON response body, or ``None`` when ``HF_API_KEY`` is not
    set or when the request / JSON decoding raises.
    """
    if HF_API_KEY:
        try:
            request_body = {
                "inputs": text,
                "parameters": {"candidate_labels": candidate_labels},
            }
            endpoint = f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}"
            response = requests.post(
                endpoint,
                headers=HF_HEADERS,
                json=request_body,
                timeout=HF_TIMEOUT,
            )
            return response.json()
        except Exception:
            # Best-effort call: any failure is reported as "no result".
            pass
    return None
def _parse_hf_phishing_model_output(result):
    """Extract the top label from a classification response.

    Accepted shapes of *result*:

    * a flat list of ``{"label": ..., "score": ...}`` dicts;
    * the same list wrapped in one extra list (``[[{...}, ...]]``);
    * a dict with parallel ``"labels"`` / ``"scores"`` lists;
    * a single ``{"label": str, "score": number}`` dict.

    Anything else (``None``, empty containers, error payloads without
    labels) is treated as "no prediction".

    Returns:
        ``(label, score, all_probs)`` where *label* is the highest-scoring
        label, *score* its float score and *all_probs* maps every parsed
        label to its score. ``(None, 0.0, {})`` when nothing could be parsed.
    """
    no_prediction = (None, 0.0, {})
    if not result:
        return no_prediction

    def _as_float(value):
        # A null or malformed score must not abort parsing of the response.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    if isinstance(result, list):
        # Unwrap the per-input nesting when present.
        entries = result[0] if isinstance(result[0], list) else result
        all_probs = {
            entry["label"]: _as_float(entry.get("score", 0.0))
            for entry in entries
            if isinstance(entry, dict) and entry.get("label") is not None
        }
        if not all_probs:
            return no_prediction
        # max() keeps the first entry on ties, so an already-sorted response
        # still yields its first element.
        top_label = max(all_probs, key=all_probs.get)
        return top_label, all_probs[top_label], all_probs

    if isinstance(result, dict):
        labels = result.get("labels") or result.get("label") or []
        scores = result.get("scores") or result.get("score") or []
        if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
            all_probs = {lab: _as_float(sc) for lab, sc in zip(labels, scores)}
            top_label = max(all_probs, key=all_probs.get)
            return top_label, all_probs[top_label], all_probs

        single_score = result.get("score")
        if isinstance(labels, str) and isinstance(single_score, (int, float)):
            score = float(single_score)
            return labels, score, {labels: score}

    return no_prediction
def _mark_occurrences(text: str, needle: str) -> str:
    """Wrap each case-insensitive occurrence of *needle* in *text* in <mark> tags."""
    if not needle:
        # An empty pattern matches between every character and would flood
        # the text with empty <mark></mark> pairs.
        return text
    # A callable replacement re-emits the matched text itself, so the
    # original casing is kept and backslashes in the needle are never
    # interpreted as a replacement template.
    return re.sub(
        re.escape(needle),
        lambda match: f"<mark>{match.group(0)}</mark>",
        text,
        flags=re.IGNORECASE,
    )


def analyze_body(subject: str, body: str, urls: list, images: list):
    """Score an email's subject/body for phishing signals.

    Combines three sources of evidence:

    1. phrase heuristics (``SUSPICIOUS_PATTERNS``), +18 per phrase found;
    2. every entry of *urls*, +10 each;
    3. when ``HF_API_KEY`` is set, the hosted phishing classifier and the
       zero-shot behavior classifier.

    Args:
        subject: Email subject; ``None`` is treated as empty.
        body: Email body; ``None`` is treated as empty.
        urls: URLs extracted from the email; ``None`` is treated as empty.
        images: Accepted for interface compatibility; not used here.

    Returns:
        ``(findings, score, highlighted_body, verdict)`` where *findings* is
        a list of human-readable strings, *score* an int clamped to 0..100,
        *highlighted_body* the body with matches wrapped in ``<mark>`` tags,
        and *verdict* the label derived from *score*.
    """
    # NOTE(review): several string literals below (the arrow in the two model
    # findings and the glyphs prefixing each verdict) look mis-encoded in this
    # copy of the file; they are reproduced as found. Confirm the intended
    # characters against the original source.
    # NOTE(review): highlighted_body mixes the raw email body with <mark>
    # tags without HTML-escaping. If callers render it as HTML, confirm the
    # body is sanitised first.
    findings = []
    score = 0
    subject_text = subject or ""
    body_text = body or ""
    url_list = [str(u) for u in (urls or [])]
    highlighted_body = body_text
    combined_lower = (subject_text + "\n" + body_text).lower()

    # Phrase heuristics
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in combined_lower:
            findings.append(f"Suspicious phrase detected: \"{pattern}\"")
            score += 18
            highlighted_body = _mark_occurrences(highlighted_body, pattern)

    # URL checks
    for url in url_list:
        findings.append(f"Suspicious URL detected: {url}")
        score += 10
        highlighted_body = _mark_occurrences(highlighted_body, url)

    # ML phishing model
    ml_label = None
    ml_conf = 0.0
    model_input = "\n".join([subject_text, body_text, "\n".join(url_list)]).strip()
    if model_input and HF_API_KEY:
        raw = _call_hf_text_model(PHISHING_MODEL, model_input)
        label, conf, _ = _parse_hf_phishing_model_output(raw)
        if label:
            ml_label = label
            ml_conf = conf
            findings.append(f"HuggingFace phishing model β {label} (conf {conf:.2f})")
            # A confident "legitimate"/"benign" prediction is evidence of
            # safety and must not raise the risk score. Labels that match
            # neither marker keep contributing as before.
            # NOTE(review): the marker list assumes the model's label names;
            # confirm against the model's actual output labels.
            label_lower = str(label).lower()
            if not any(marker in label_lower for marker in ("legitimate", "benign")):
                score += int(conf * 100 * 0.9)

    # Zero-shot behavior
    if HF_API_KEY and model_input:
        zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
        if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
            try:
                behavior = zs["labels"][0]
                behavior_conf = float(zs["scores"][0])
            except (IndexError, KeyError, TypeError, ValueError):
                # Malformed response: skip the behavior signal.
                pass
            else:
                findings.append(f"Behavior inference β {behavior} (conf {behavior_conf:.2f})")
                # "benign" is one of the candidate labels; a confident benign
                # verdict must not increase the risk score.
                if behavior_conf >= 0.7 and behavior != "benign":
                    score += int(behavior_conf * 30)

    # A confident phishing label sets a floor of 80 on the score.
    if ml_conf >= 0.8 and ("phishing" in (ml_label or "").lower()):
        score = max(score, 80)
    score = int(max(0, min(score, 100)))

    # Verdict
    if score >= 70:
        verdict = "π¨ Malicious"
    elif score >= 50:
        verdict = "β οΈ Suspicious"
    elif score >= 30:
        verdict = "π© Spam"
    else:
        verdict = "β Safe"
        findings.append("No strong phishing signals detected by models/heuristics.")

    # Return exactly 4 values
    return findings, score, highlighted_body, verdict