# body_analyzer.py
import os
import re
import requests
import base64
import io
from typing import List

HF_API_KEY = os.getenv("HF_API_KEY")
HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
HF_TIMEOUT = 20  # seconds

# ML model names
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli"  # for intent/behavior

# Suspicious phrase patterns (lowercased when matching)
SUSPICIOUS_PATTERNS = [
    "verify your account",
    "urgent action",
    "click here",
    "reset password",
    "confirm your identity",
    "bank account",
    "invoice",
    "payment required",
    "unauthorized login",
    "compromised",
    "final reminder",
    "account suspended",
    "account deactivated",
    "update your information",
    "legal action",
    "limited time offer",
    "claim your prize",
    "verify immediately",
    "verify now",
    "verify your credentials",
]

# Zero-shot candidate labels for message behavior
BEHAVIOR_LABELS = [
    "credential harvesting",
    "invoice/payment fraud",
    "marketing",
    "benign",
    "malware",
    "account takeover",
]


def _call_hf_text_model(model_name: str, text: str):
    """Call the HF Inference API for a text model. Return raw JSON or None on failure."""
    if not HF_API_KEY:
        return None
    try:
        payload = {"inputs": text}
        # Zero-shot calls that need extra parameters go through _call_hf_zero_shot() below.
        res = requests.post(
            f"https://api-inference.huggingface.co/models/{model_name}",
            headers=HF_HEADERS,
            json=payload,
            timeout=HF_TIMEOUT,
        )
        return res.json()
    except Exception:
        return None


def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
    """Call the zero-shot endpoint (ZERO_SHOT_MODEL) with candidate labels. Return raw JSON or None on failure."""
    if not HF_API_KEY:
        return None
    try:
        payload = {"inputs": text, "parameters": {"candidate_labels": candidate_labels}}
        res = requests.post(
            f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}",
            headers=HF_HEADERS,
            json=payload,
            timeout=HF_TIMEOUT,
        )
        return res.json()
    except Exception:
        return None
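

# Illustrative note: a successful zero-shot call typically returns a dict roughly like
#     {"sequence": "...", "labels": ["credential harvesting", ...], "scores": [0.91, ...]}
# with labels sorted by descending score; analyze_body() below reads labels[0]/scores[0]
# and relies on that ordering.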


def _call_hf_image_ocr(model_name: str, image_bytes: bytes):
    """
    Call an HF image OCR model endpoint. Returns a string or None.
    Uses a raw bytes upload: content-type application/octet-stream body.
    """
    if not HF_API_KEY:
        return None
    try:
        headers = HF_HEADERS.copy()
        headers["Content-Type"] = "application/octet-stream"
        res = requests.post(
            f"https://api-inference.huggingface.co/models/{model_name}",
            headers=headers,
            data=image_bytes,
            timeout=HF_TIMEOUT,
        )
        # Many vision models return {"generated_text": "..."} or a list; attempt to parse common shapes.
        data = res.json()
        # TrOCR-style endpoints may return {"generated_text": "..."}
        if isinstance(data, dict) and "generated_text" in data:
            return data["generated_text"]
        # Some OCR endpoints return a list of dicts; choose a text-like field if present.
        if isinstance(data, list) and data and isinstance(data[0], dict):
            return data[0].get("generated_text") or data[0].get("text") or data[0].get("caption")
        # Fallback: the response may already be a plain string.
        if isinstance(data, str):
            return data
    except Exception:
        pass
    return None


# Local pytesseract fallback (requires the Tesseract binary plus the Pillow and pytesseract packages).
def _ocr_local_pytesseract(image_bytes):
    try:
        from PIL import Image
        import pytesseract

        image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        text = pytesseract.image_to_string(image)
        return text
    except Exception:
        return None


def _parse_hf_phishing_model_output(result):
    """
    The model may return scores in several common shapes; try each in turn.
    Returns: (label: str or None, confidence: float 0..1, all_probs: dict)
    """
    if not result:
        return None, 0.0, {}
    # Text-classification endpoints often wrap results in an extra list: [[{...}, ...]]
    if isinstance(result, list) and len(result) == 1 and isinstance(result[0], list):
        result = result[0]
    # List of dicts with "label" and "score"
    if isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict):
        all_probs = {r.get("label"): float(r.get("score") or 0.0) for r in result if r.get("label")}
        if all_probs:
            label, score = max(all_probs.items(), key=lambda x: x[1])
            return label, score, all_probs
        return None, 0.0, {}
    # Dict with parallel "labels"/"scores" lists
    if isinstance(result, dict):
        labels = result.get("labels") or result.get("label") or []
        scores = result.get("scores") or result.get("score") or []
        if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
            all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)}
            label, score = max(all_probs.items(), key=lambda x: x[1])
            return label, score, all_probs
    return None, 0.0, {}
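

# Illustrative example (hypothetical values): a response such as
#     [[{"label": "phishing", "score": 0.97}, {"label": "benign", "score": 0.03}]]
# would parse to ("phishing", 0.97, {"phishing": 0.97, "benign": 0.03}).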


def analyze_body(subject: str, body: str, urls: list, images: list):
    """
    Inputs:
        subject: email subject (str)
        body: plaintext combined body (str)
        urls: list of URLs extracted from the email
        images: list of image bytes
    Returns:
        findings (list[str]), score (int 0..100), highlighted_body (str), verdict (str)
    """
    findings = []
    score = 0
    highlighted_body = body or ""  # suspicious text/URLs will be highlighted below

    # 1) Basic heuristics on subject + body
    combined_lower = ((subject or "") + "\n" + (body or "")).lower()
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in combined_lower:
            findings.append(f"Suspicious phrase detected: \"{pattern}\"")
            # weight subject phrases more heavily
            if pattern in (subject or "").lower():
                score += 30
            else:
                score += 18
            try:
                highlighted_body = re.sub(re.escape(pattern), f"<mark>{pattern}</mark>", highlighted_body, flags=re.IGNORECASE)
            except Exception:
                pass

    # 2) URL heuristics (every URL provided is reported and scored)
    for u in urls or []:
        findings.append(f"Suspicious URL detected: {u}")
        score += 10
        try:
            highlighted_body = re.sub(re.escape(u), f"<mark>{u}</mark>", highlighted_body, flags=re.IGNORECASE)
        except Exception:
            pass
        # suspicious domain structure bump
        domain_match = re.search(r"https?://([^/]+)/?", u)
        if domain_match:
            domain = domain_match.group(1)
            if len(domain) > 25 or any(ch.isdigit() for ch in domain.split(".")[0]):
                findings.append(f"URL: suspicious-looking domain {domain}")
                score += 10

    # 3) OCR on images
    ocr_texts = []
    if images:
        for img_bytes in images:
            text = None
            # Prefer an HF TrOCR-like endpoint if HF_API_KEY is provided
            if HF_API_KEY:
                # try a well-known OCR-capable model; TrOCR base is a candidate
                ocr_result = _call_hf_image_ocr("microsoft/trocr-base-stage1", img_bytes)
                if ocr_result:
                    text = ocr_result
            if not text:
                # fall back to local pytesseract
                text = _ocr_local_pytesseract(img_bytes)
            if text:
                ocr_texts.append(text)
                findings.append("OCR: extracted text from image.")
                # add a small heuristic score for suspicious phrases in OCR results
                lower = text.lower()
                for pat in SUSPICIOUS_PATTERNS:
                    if pat in lower:
                        findings.append(f"OCR: suspicious phrase in image -> \"{pat}\"")
                        score += 20

    # 4) ML phishing model (Hugging Face)
    ml_label = None
    ml_conf = 0.0
    ml_all = {}
    model_input = "\n".join([subject or "", body or "", "\n".join(urls or []), "\n".join(ocr_texts or [])]).strip()
    if model_input and HF_API_KEY:
        raw = _call_hf_text_model(PHISHING_MODEL, model_input)
        label, conf, allp = _parse_hf_phishing_model_output(raw)
        if label:
            ml_label = label
            ml_conf = conf
            ml_all = allp
            findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
            # confidence scaled into the score (clamped later); slightly reduced to avoid double-counting
            score += int(conf * 100 * 0.9)

    # 5) Zero-shot behavior/intent model (when HF is available)
    behavior = None
    behavior_conf = 0.0
    if HF_API_KEY and model_input:
        zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
        try:
            if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
                best_label = zs["labels"][0]
                best_score = float(zs["scores"][0])
                behavior = best_label
                behavior_conf = best_score
                findings.append(f"Behavior inference → {behavior} (conf {behavior_conf:.2f})")
                # add a modest boost for strong behavior confidence
                if behavior_conf >= 0.7:
                    score += int(behavior_conf * 30)
        except Exception:
            pass

    # 6) Final heuristics / fallbacks
    # If the ML model already strongly flagged phishing, ensure a high score
    if ml_conf >= 0.8 and "phishing" in (ml_label or "").lower():
        score = max(score, 80)

    # clamp to 0..100
    try:
        score = int(max(0, min(score, 100)))
    except Exception:
        score = 0

    # Final verdict mapping (thresholds are tunable)
    if score >= 70:
        verdict = "🚨 Malicious"
    elif 50 <= score < 70:
        verdict = "⚠️ Suspicious"
    elif 30 <= score < 50:
        verdict = "📩 Spam"
    else:
        verdict = "✅ Safe"
        findings.append("No strong phishing signals detected by models/heuristics.")

    # Return findings, score, highlighted body (with possible <mark> tags), verdict
    return findings, score, highlighted_body, verdict
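

# --- Minimal usage sketch ---
# The sample subject, body, and URL below are hypothetical and only illustrate how a
# caller is expected to invoke analyze_body(); with no HF_API_KEY set, only the
# keyword/URL heuristics run and the ML/zero-shot steps are skipped.
if __name__ == "__main__":
    sample_subject = "Urgent action required: verify your account"
    sample_body = (
        "Dear customer, unauthorized login detected. "
        "Click here to verify your credentials: http://login-update123.example.com/verify"
    )
    sample_urls = ["http://login-update123.example.com/verify"]

    findings, score, highlighted, verdict = analyze_body(
        subject=sample_subject,
        body=sample_body,
        urls=sample_urls,
        images=[],  # no image attachments in this sketch
    )

    print(f"Verdict: {verdict} (score {score}/100)")
    for f in findings:
        print(" -", f)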