CySecGuardians / body_analyzer.py
princemaxp's picture
Update body_analyzer.py
9b5168a verified
raw
history blame
10.1 kB
# body_analyzer.py
import os
import re
import requests
import base64
import io
from typing import List
HF_API_KEY = os.getenv("HF_API_KEY")
HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
HF_TIMEOUT = 20 # seconds
# ML model names
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli" # for intent/behavior
# Suspicious phrase patterns (lowercased when matching)
SUSPICIOUS_PATTERNS = [
"verify your account",
"urgent action",
"click here",
"reset password",
"confirm your identity",
"bank account",
"invoice",
"payment required",
"unauthorized login",
"compromised",
"final reminder",
"account suspended",
"account deactivated",
"update your information",
"legal action",
"limited time offer",
"claim your prize",
"verify immediately",
"verify now",
"verify your credentials",
]
# zero-shot candidate labels for message behavior
BEHAVIOR_LABELS = [
"credential harvesting",
"invoice/payment fraud",
"marketing",
"benign",
"malware",
"account takeover",
]
def _call_hf_text_model(model_name: str, text: str):
"""Call HF Inference API for text. Return raw JSON or None on failure."""
if not HF_API_KEY:
return None
try:
payload = {"inputs": text}
# For zero-shot, caller will pass parameters in payload if needed
res = requests.post(
f"https://api-inference.huggingface.co/models/{model_name}",
headers=HF_HEADERS,
json=payload,
timeout=HF_TIMEOUT,
)
return res.json()
except Exception:
return None
def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
if not HF_API_KEY:
return None
try:
payload = {"inputs": text, "parameters": {"candidate_labels": candidate_labels}}
res = requests.post(
f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}",
headers=HF_HEADERS,
json=payload,
timeout=HF_TIMEOUT,
)
return res.json()
except Exception:
return None
def _call_hf_image_ocr(model_name: str, image_bytes: bytes):
"""
Call HF image OCR model endpoint. Returns string or None.
Uses raw bytes upload: content-type application/octet-stream body.
"""
if not HF_API_KEY:
return None
try:
headers = HF_HEADERS.copy()
headers["Content-Type"] = "application/octet-stream"
res = requests.post(
f"https://api-inference.huggingface.co/models/{model_name}",
headers=headers,
data=image_bytes,
timeout=HF_TIMEOUT,
)
# Many vision models return {"generated_text": "..."} or list; attempt to parse common shapes
data = res.json()
if isinstance(data, dict):
# TrOCR-style may return {"generated_text": "..."}
if "generated_text" in data:
return data["generated_text"]
# Some OCR endpoints may return list of dicts
if isinstance(data, list) and data and isinstance(data[0], dict):
# choose text-like fields if present
candidate = data[0].get("generated_text") or data[0].get("text") or data[0].get("caption")
return candidate
# fallback: try string concatenation if possible
if isinstance(data, str):
return data
except Exception:
pass
return None
# local pytesseract fallback
def _ocr_local_pytesseract(image_bytes):
try:
from PIL import Image
import pytesseract
import io
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
text = pytesseract.image_to_string(image)
return text
except Exception:
return None
def _parse_hf_phishing_model_output(result):
"""
Expected: model may return list of logits/probs. Try common shapes.
Returns: label:str, confidence:float (0..1), all_probs:dict
"""
if not result:
return None, 0.0, {}
# if list of dicts with label & score
if isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict):
r0 = result[0]
label = r0.get("label")
score = r0.get("score", 0.0)
return label, float(score or 0.0), {label: float(score or 0.0)}
# if dict with labels & scores
if isinstance(result, dict):
# sometimes returns {'labels': [...], 'scores': [...]}
labels = result.get("labels") or result.get("label") or []
scores = result.get("scores") or result.get("score") or []
if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)}
# pick max
max_lab = max(all_probs.items(), key=lambda x: x[1])
return max_lab[0], float(max_lab[1]), all_probs
return None, 0.0, {}
def analyze_body(subject: str, body: str, urls: list, images: list):
"""
Inputs:
subject: email subject (str)
body: plaintext combined body (str)
urls: list of urls
images: list of image bytes
Returns:
findings (list[str]), score (int 0..100), highlighted_body (str), verdict (str)
"""
findings = []
score = 0
highlighted_body = (body or "") # will attempt to highlight suspicious text/URLs
# 1) Basic heuristics on subject + body
combined_lower = ((subject or "") + "\n" + (body or "")).lower()
for pattern in SUSPICIOUS_PATTERNS:
if pattern in combined_lower:
findings.append(f"Suspicious phrase detected: \"{pattern}\"")
# weight subject phrases more heavily
if pattern in (subject or "").lower():
score += 30
else:
score += 18
try:
highlighted_body = re.sub(re.escape(pattern), f"<mark>{pattern}</mark>", highlighted_body, flags=re.IGNORECASE)
except Exception:
pass
# 2) URL heuristics (always include)
for u in urls or []:
findings.append(f"Suspicious URL detected: {u}")
score += 10
try:
highlighted_body = re.sub(re.escape(u), f"<mark>{u}</mark>", highlighted_body, flags=re.IGNORECASE)
except Exception:
pass
# suspicious domain structure bump
domain_match = re.search(r"https?://([^/]+)/?", u)
if domain_match:
domain = domain_match.group(1)
if len(domain) > 25 or any(ch.isdigit() for ch in domain.split(".")[0]):
findings.append(f"URL: suspicious-looking domain {domain}")
score += 10
# 3) OCR images
ocr_texts = []
if images:
for img_bytes in images:
text = None
# Prefer HF TrOCR-like endpoint if HF_API_KEY provided
if HF_API_KEY:
# try a well-known OCR-capable model; TrOCR base is a candidate
ocr_result = _call_hf_image_ocr("microsoft/trocr-base-stage1", img_bytes)
if ocr_result:
text = ocr_result
if not text:
# fallback to local pytesseract
text = _ocr_local_pytesseract(img_bytes)
if text:
ocr_texts.append(text)
findings.append("OCR: extracted text from image.")
# add small heuristic score for OCR results
lower = text.lower()
for pat in SUSPICIOUS_PATTERNS:
if pat in lower:
findings.append(f"OCR: suspicious phrase in image -> \"{pat}\"")
score += 20
# 4) ML phishing model (Hugging Face)
ml_label = None
ml_conf = 0.0
ml_all = {}
model_input = "\n".join([subject or "", body or "", "\n".join(urls or []), "\n".join(ocr_texts or [])]).strip()
if model_input and HF_API_KEY:
raw = _call_hf_text_model(PHISHING_MODEL, model_input)
label, conf, allp = _parse_hf_phishing_model_output(raw)
if label:
ml_label = label
ml_conf = conf
ml_all = allp
findings.append(f"HuggingFace phishing model β†’ {label} (conf {conf:.2f})")
# confidence scaled to score (but cap)
score += int(conf * 100 * 0.9) # slightly reduce to avoid double-counting
# 5) Zero-shot behavior intent model (when HF available)
behavior = None
behavior_conf = 0.0
if HF_API_KEY and model_input:
zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
try:
if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
best_label = zs["labels"][0]
best_score = float(zs["scores"][0])
behavior = best_label
behavior_conf = best_score
findings.append(f"Behavior inference β†’ {behavior} (conf {behavior_conf:.2f})")
# add modest boost for strong behavior confidence
if behavior_conf >= 0.7:
score += int(behavior_conf * 30)
except Exception:
pass
# 6) Final heuristics fallbacks
# If ML already strongly flagged phishing, ensure high score
if ml_conf >= 0.8 and ("phishing" in (ml_label or "").lower()):
score = max(score, 80)
# clamp
try:
score = int(max(0, min(score, 100)))
except Exception:
score = 0
# Final verdict mapping (tunable)
if score >= 70:
verdict = "🚨 Malicious"
elif 50 <= score < 70:
verdict = "⚠️ Suspicious"
elif 30 <= score < 50:
verdict = "πŸ“© Spam"
else:
verdict = "βœ… Safe"
findings.append("No strong phishing signals detected by models/heuristics.")
# Return findings, score, highlighted body (with possible <mark> tags), verdict
return findings, score, highlighted_body, verdict