"""Score e-mail body text with keyword heuristics and optional Hugging Face models."""

import logging
import os
import re

try:
    import requests
except ImportError:
    # requests is only needed for the optional Hugging Face calls; the
    # regex heuristics keep working without it.
    requests = None

logger = logging.getLogger(__name__)

HF_API_KEY = os.getenv("HF_API_KEY")
HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}

MODELS = {
    "ai_detector": "roberta-base-openai-detector",
    "sentiment": "finiteautomata/bertweet-base-sentiment-analysis",
    "spam": "mrm8488/bert-tiny-finetuned-sms-spam-detection",
}

# Regular expressions matched against the lower-cased body.
SUSPICIOUS_PATTERNS = [
    r"verify your account",
    r"urgent action",
    r"click here",
    r"reset (your )?password",
    r"confirm (your )?identity",
    r"bank account",
    r"invoice",
    r"payment (required|overdue|failed|method expired)",
    r"unauthorized login",
    r"compromised",
    r"final reminder",
    r"account (suspended|deactivated|locked)",
    r"update your (information|details|billing)",
    r"legal action",
    r"free trial",
    r"limited time offer",
    r"click below",
    r"winner",
    r"congratulations",
    r"urgent response",
    r"claim your prize",
    r"act now",
    r"unsubscribe",
    r"lottery",
    r"risk-free",
]


def query_hf(model, text):
    """POST *text* to the hosted inference endpoint for *model*.

    Only the first 1000 characters of *text* are sent.

    Returns:
        The decoded JSON response, or ``None`` when no API key is
        configured, ``requests`` is unavailable, or the call fails.
    """
    if not HF_API_KEY or requests is None:
        return None
    try:
        res = requests.post(
            f"https://api-inference.huggingface.co/models/{model}",
            headers=HF_HEADERS,
            json={"inputs": text[:1000]},
            timeout=15,
        )
        return res.json()
    except Exception as exc:
        # Deliberate best-effort boundary: model output is optional, so a
        # failed call must never break the analysis. Leave a trace, though.
        logger.debug("Hugging Face query for %s failed: %s", model, exc)
        return None


def parse_hf_result(result):
    """Extract the first ``(label, score)`` pair from an inference response.

    Recognised shapes:
        * ``[{"label": ..., "score": ...}, ...]``
        * ``[[{"label": ..., "score": ...}, ...]]`` (one list per input)
        * ``{"labels": [...], "scores": [...]}``

    Returns:
        ``(label, score)`` of the first entry, or ``(None, None)`` when
        *result* is empty or has none of the shapes above.
    """
    if not result:
        return None, None

    if isinstance(result, list):
        candidates = result
        # Unwrap the per-input nesting so the inner list is inspected.
        if isinstance(candidates[0], list):
            candidates = candidates[0]
        if candidates and isinstance(candidates[0], dict):
            top = candidates[0]
            if "label" in top and "score" in top:
                return top["label"], top["score"]
        return None, None

    if isinstance(result, dict):
        labels = result.get("labels") or []
        scores = result.get("scores") or []
        if labels and scores:
            return labels[0], scores[0]

    return None, None


def _highlight(body, fragment):
    """Replace every case-insensitive occurrence of *fragment* in *body*."""
    # A callable replacement keeps backslashes in *fragment* literal; a plain
    # replacement string would be parsed as a template and can raise re.error.
    # NOTE(review): the replacement carries no markup and lower-cases the
    # match; wrapper markup may have been lost from this file -- confirm.
    return re.sub(
        re.escape(fragment),
        lambda _match: fragment,
        body,
        flags=re.IGNORECASE,
    )


def _report_model(model_key, title, text, findings):
    """Run one model, append its finding if it answered, and return the label."""
    label, confidence = parse_hf_result(query_hf(MODELS[model_key], text))
    if label:
        findings.append(f"Body: {title} → {label} (confidence {confidence:.2f})")
    return label


def analyze_body(text):
    """Analyse an e-mail body and rate how suspicious it looks.

    Scoring: +15 per suspicious phrase, +10 per URL, +10 when the sentiment
    model answers "negative", +25 when the spam model answers "spam".

    Args:
        text: The body text; ``None`` is treated as an empty string.

    Returns:
        A tuple ``(findings, score, highlighted_body, verdict)`` where
        *findings* is a list of messages, *score* the accumulated integer
        score, *highlighted_body* the body after the highlight substitution,
        and *verdict* one of ``"Malicious / Spam"`` (score >= 50),
        ``"Suspicious"`` (score >= 20) or ``"Safe"``.
    """
    findings = []
    score = 0
    body = text or ""
    body_lower = body.lower()
    highlighted_body = body

    # 1) Suspicious phrases
    for pattern in SUSPICIOUS_PATTERNS:
        # finditer + group(0) yields the whole phrase; findall would return
        # only the capture group for patterns such as "reset (your )?password".
        for match in re.finditer(pattern, body_lower):
            display = match.group(0)
            if not display:
                continue
            findings.append(f'Suspicious phrase detected: "{display}"')
            score += 15  # tuned down to reduce instant Malicious
            highlighted_body = _highlight(highlighted_body, display)

    # 2) URLs
    for url in re.findall(r'https?://[^\s]+', body_lower):
        findings.append(f"Suspicious URL detected: {url}")
        score += 10
        highlighted_body = _highlight(highlighted_body, url)

    # 3) AI text detector (informational only, does not affect the score)
    _report_model("ai_detector", "AI Detector", body, findings)

    # 4) Sentiment
    # NOTE(review): confirm the sentiment model really emits "negative";
    # some checkpoints use abbreviated label names.
    label = _report_model("sentiment", "Sentiment", body, findings)
    if label and label.lower() == "negative":
        score += 10

    # 5) Spam detector
    # NOTE(review): confirm the spam model emits "spam" rather than generic
    # label ids.
    label = _report_model("spam", "Spam Detector", body, findings)
    if label and label.lower() == "spam":
        score += 25

    # 6) Verdict
    if score >= 50:
        verdict = "Malicious / Spam"
    elif score >= 20:
        verdict = "Suspicious"
    else:
        verdict = "Safe"
        # Only claim a clean body when nothing contributed to the score;
        # otherwise this would contradict the findings listed above.
        if score == 0:
            findings.append("No suspicious content detected in body.")

    return findings, score, highlighted_body, verdict