# Source: CySecGuardians / body_analyzer.py
# Last commit: "Update body_analyzer.py" (3bd60c1, verified) by princemaxp
# body_analyzer.py
import os
import re
import requests
from typing import List
# --- Hugging Face Inference API settings -----------------------------------
# The token comes from the environment. Without it the auth header dict stays
# empty and the model-calling helpers in this module return None right away.
HF_API_KEY = os.getenv("HF_API_KEY")
HF_HEADERS = {}
if HF_API_KEY:
    HF_HEADERS["Authorization"] = f"Bearer {HF_API_KEY}"
HF_TIMEOUT = 20  # per-request timeout, in seconds

# --- Hosted model identifiers ----------------------------------------------
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli"  # used for intent/behavior labels

# --- Heuristic phrase list --------------------------------------------------
# All entries are lower-case; they are matched as substrings of the
# lower-cased subject + body.
SUSPICIOUS_PATTERNS = [
    "verify your account",
    "urgent action",
    "click here",
    "reset password",
    "confirm your identity",
    "bank account",
    "invoice",
    "payment required",
    "unauthorized login",
    "compromised",
    "final reminder",
    "account suspended",
    "account deactivated",
    "update your information",
    "legal action",
    "limited time offer",
    "claim your prize",
    "verify immediately",
    "verify now",
    "verify your credentials",
]

# --- Candidate labels handed to the zero-shot classifier --------------------
BEHAVIOR_LABELS = [
    "credential harvesting",
    "invoice/payment fraud",
    "marketing",
    "benign",
    "malware",
    "account takeover",
]
def _call_hf_text_model(model_name: str, text: str):
    """Send *text* to the hosted Inference API endpoint of *model_name*.

    Args:
        model_name: Hub identifier of the model, e.g. ``"org/model"``.
        text: Raw text to classify; sent as the ``inputs`` field.

    Returns:
        The decoded JSON body on an HTTP success response, or ``None`` when
        no API key is configured, the request fails, the server answers with
        an error status, or the body is not valid JSON.
    """
    if not HF_API_KEY:
        return None
    try:
        res = requests.post(
            f"https://api-inference.huggingface.co/models/{model_name}",
            headers=HF_HEADERS,
            json={"inputs": text},
            timeout=HF_TIMEOUT,
        )
        # Error replies (bad token, rate limit, model still loading) carry a
        # JSON error object rather than predictions; do not pass that on as
        # if it were a model result.
        if not res.ok:
            return None
        return res.json()
    except (requests.RequestException, ValueError):
        # Network/timeout problems or an undecodable body: this call is
        # best-effort, so report "no result" instead of raising.
        return None
def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
    """Run zero-shot classification of *text* against *candidate_labels*.

    The request goes to the hosted Inference API endpoint of
    ``ZERO_SHOT_MODEL``.

    Args:
        text: Raw text to classify; sent as the ``inputs`` field.
        candidate_labels: Labels the model should rank; sent under
            ``parameters.candidate_labels``.

    Returns:
        The decoded JSON body on an HTTP success response, or ``None`` when
        no API key is configured, the request fails, the server answers with
        an error status, or the body is not valid JSON.
    """
    if not HF_API_KEY:
        return None
    payload = {
        "inputs": text,
        "parameters": {"candidate_labels": candidate_labels},
    }
    try:
        res = requests.post(
            f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}",
            headers=HF_HEADERS,
            json=payload,
            timeout=HF_TIMEOUT,
        )
        # Error replies (bad token, rate limit, model still loading) carry a
        # JSON error object rather than predictions; do not pass that on as
        # if it were a model result.
        if not res.ok:
            return None
        return res.json()
    except (requests.RequestException, ValueError):
        # Network/timeout problems or an undecodable body: this call is
        # best-effort, so report "no result" instead of raising.
        return None
def _parse_hf_phishing_model_output(result):
    """Normalise a classification response into ``(label, score, all_probs)``.

    Recognised shapes of *result*:

    * a flat list of ``{"label": ..., "score": ...}`` dicts;
    * the same list wrapped in an outer list (``[[{...}, ...]]``);
    * a dict with parallel ``"labels"`` / ``"scores"`` lists;
    * a single ``{"label": ..., "score": ...}`` dict.

    Returns:
        A 3-tuple: the highest-scoring label, its score as ``float``, and a
        ``{label: score}`` dict of every usable entry. Anything that cannot
        be interpreted (``None``, an error object, malformed entries only)
        yields ``(None, 0.0, {})``.
    """
    empty = (None, 0.0, {})
    if not result:
        return empty

    # Some responses wrap the per-input result list in an outer list; unwrap
    # the first (and, for a single input, only) inner list.
    if isinstance(result, list) and isinstance(result[0], list):
        result = result[0]

    pairs = []
    if isinstance(result, list):
        pairs = [
            (item.get("label"), item.get("score", 0.0))
            for item in result
            if isinstance(item, dict)
        ]
    elif isinstance(result, dict):
        labels = result.get("labels") or result.get("label") or []
        scores = result.get("scores") or result.get("score") or []
        if isinstance(labels, str):
            # Single-prediction dict; read the score directly so that a
            # legitimate 0.0 is not discarded by the `or` fallbacks above.
            labels, scores = [labels], [result.get("score", 0.0)]
        if isinstance(labels, list) and isinstance(scores, list):
            pairs = list(zip(labels, scores))

    all_probs = {}
    for label, raw_score in pairs:
        if label is None:
            continue
        try:
            all_probs[label] = float(raw_score)
        except (TypeError, ValueError):
            # Non-numeric score or unhashable label: skip the entry rather
            # than fail the whole parse.
            continue

    if not all_probs:
        return empty
    # Pick by score instead of trusting the server's ordering.
    best = max(all_probs, key=all_probs.get)
    return best, all_probs[best], all_probs
def analyze_body(subject: str, body: str, urls: list, images: list):
    """Score an email's subject/body for phishing signals.

    Combines three sources: substring matches against
    ``SUSPICIOUS_PATTERNS`` (+18 each), the presence of URLs (+10 each), and,
    when ``HF_API_KEY`` is set, a hosted phishing classifier plus a zero-shot
    behavior classifier.

    Args:
        subject: Email subject; ``None`` is treated as empty.
        body: Email body text; ``None`` is treated as empty.
        urls: URLs extracted from the message; ``None`` is treated as empty.
            Entries are converted with ``str()``.
        images: Accepted for interface compatibility; not used here.

    Returns:
        A 4-tuple ``(findings, score, highlighted_body, verdict)``:
        ``findings`` is a list of human-readable strings, ``score`` an int
        clamped to 0..100, ``highlighted_body`` the body with matched phrases
        and URLs wrapped in ``<mark>`` tags, and ``verdict`` a label chosen
        from the score (>=70 malicious, >=50 suspicious, >=30 spam, else
        safe).
    """
    findings = []
    score = 0
    subject = subject or ""
    body = body or ""
    url_list = [str(u) for u in (urls or [])]

    # NOTE(review): the body is not HTML-escaped before <mark> tags are
    # inserted; confirm how callers render `highlighted_body`.
    highlighted_body = body
    combined_lower = f"{subject}\n{body}".lower()

    def _mark(match):
        # A function replacement keeps the text exactly as it appears in the
        # body (original casing) and avoids backslashes / group references
        # in the matched text being read as regex replacement syntax.
        return f"<mark>{match.group(0)}</mark>"

    # Heuristic phrases
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in combined_lower:
            findings.append(f'Suspicious phrase detected: "{pattern}"')
            score += 18
            highlighted_body = re.sub(
                re.escape(pattern), _mark, highlighted_body, flags=re.IGNORECASE
            )

    # URL checks: every supplied URL counts as a signal.
    for u in url_list:
        findings.append(f"Suspicious URL detected: {u}")
        score += 10
        if u:  # an empty pattern would match between every character
            highlighted_body = re.sub(
                re.escape(u), _mark, highlighted_body, flags=re.IGNORECASE
            )

    # ML phishing model
    ml_label = None
    ml_conf = 0.0
    model_input = "\n".join([subject, body, "\n".join(url_list)]).strip()
    if model_input and HF_API_KEY:
        raw = _call_hf_text_model(PHISHING_MODEL, model_input)
        label, conf, _ = _parse_hf_phishing_model_output(raw)
        if label:
            ml_label = str(label)
            ml_conf = conf
            findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
            # High confidence in a "legitimate"/"benign"/"safe" class must
            # not push the risk score up; other labels keep contributing.
            if not ml_label.lower().startswith(("legit", "benign", "safe")):
                score += int(conf * 100 * 0.9)

    # Zero-shot behavior
    if HF_API_KEY and model_input:
        zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
        zs_labels = zs.get("labels") if isinstance(zs, dict) else None
        zs_scores = zs.get("scores") if isinstance(zs, dict) else None
        if (
            isinstance(zs_labels, list) and zs_labels
            and isinstance(zs_scores, list) and zs_scores
        ):
            try:
                behavior_conf = float(zs_scores[0])
            except (TypeError, ValueError):
                behavior_conf = None
            if behavior_conf is not None:
                behavior = zs_labels[0]
                findings.append(
                    f"Behavior inference → {behavior} (conf {behavior_conf:.2f})"
                )
                # A confident "benign" verdict is not a risk signal.
                if behavior_conf >= 0.7 and behavior != "benign":
                    score += int(behavior_conf * 30)

    # A confident phishing classification sets a floor on the score.
    if ml_conf >= 0.8 and "phishing" in (ml_label or "").lower():
        score = max(score, 80)
    score = int(max(0, min(score, 100)))

    # Verdict
    if score >= 70:
        verdict = "🚨 Malicious"
    elif score >= 50:
        verdict = "⚠️ Suspicious"
    elif score >= 30:
        verdict = "📩 Spam"
    else:
        verdict = "✅ Safe"
        findings.append("No strong phishing signals detected by models/heuristics.")

    # Return exactly 4 values
    return findings, score, highlighted_body, verdict