Spaces:
Sleeping
Sleeping
File size: 5,317 Bytes
3bd60c1 d9f5d53 bedf234 9b5168a d9f5d53 4fb4f18 9b5168a d9f5d53 3bd60c1 9b5168a d9f5d53 089b374 bedf234 3bd60c1 90ca431 3bd60c1 9b5168a 3bd60c1 9b5168a 4fb4f18 bedf234 9b5168a 4fb4f18 9b5168a 4fb4f18 9b5168a 4fb4f18 bedf234 d9f5d53 9b5168a 3bd60c1 a235ecf 3bd60c1 a235ecf 9b5168a d9f5d53 bedf234 3bd60c1 d9f5d53 3bd60c1 bedf234 3bd60c1 9b5168a 3bd60c1 9b5168a 3bd60c1 9b5168a 90ca431 9b5168a 3bd60c1 9b5168a 3bd60c1 9b5168a 3bd60c1 9b5168a 3bd60c1 9b5168a 90ca431 9b5168a d9f5d53 3bd60c1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# body_analyzer.py
import logging
import os
import re
from typing import List

import requests
# Hugging Face Inference API settings.
# The token comes from the environment; when it is missing or empty, no
# Authorization header is built and HF_HEADERS stays an empty dict.
HF_API_KEY = os.environ.get("HF_API_KEY")
HF_HEADERS = {}
if HF_API_KEY:
    HF_HEADERS["Authorization"] = f"Bearer {HF_API_KEY}"
HF_TIMEOUT = 20  # seconds; passed as the per-request timeout

# Hosted model identifiers.
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli"  # for intent/behavior
# Lower-case phrases treated as phishing indicators. analyze_body looks for
# each one as a plain substring of the lower-cased subject + body.
SUSPICIOUS_PATTERNS = [
    "verify your account",
    "urgent action",
    "click here",
    "reset password",
    "confirm your identity",
    "bank account",
    "invoice",
    "payment required",
    "unauthorized login",
    "compromised",
    "final reminder",
    "account suspended",
    "account deactivated",
    "update your information",
    "legal action",
    "limited time offer",
    "claim your prize",
    "verify immediately",
    "verify now",
    "verify your credentials",
]
# Candidate labels sent to the zero-shot model to infer the email's
# intent/behavior.
BEHAVIOR_LABELS = [
    "credential harvesting",
    "invoice/payment fraud",
    "marketing",
    "benign",
    "malware",
    "account takeover",
]
def _call_hf_text_model(model_name: str, text: str):
    """POST ``text`` to the hosted inference endpoint of ``model_name``.

    Args:
        model_name: Model repository id appended to the inference API URL.
        text: Input sent as the ``inputs`` field of the JSON payload.

    Returns:
        The decoded JSON response body, or ``None`` when no API key is
        configured, the request fails, or the response is not valid JSON.
        The HTTP status code is not inspected, so whatever JSON the service
        returns (including an error payload) is handed back unchanged.
    """
    if not HF_API_KEY:
        return None

    url = f"https://api-inference.huggingface.co/models/{model_name}"
    try:
        res = requests.post(
            url,
            headers=HF_HEADERS,
            json={"inputs": text},
            timeout=HF_TIMEOUT,
        )
        return res.json()
    except (requests.RequestException, ValueError) as exc:
        # Best-effort call: transport problems and undecodable bodies must
        # not break the analysis, but they should leave a trace in the logs.
        # ValueError covers JSON decode errors raised by older requests
        # releases that do not wrap them in RequestException.
        logging.getLogger(__name__).warning(
            "Hugging Face inference call to %s failed: %s", model_name, exc
        )
        return None
def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
    """Run zero-shot classification of ``text`` against ``candidate_labels``.

    The request goes to the hosted inference endpoint of ``ZERO_SHOT_MODEL``.

    Args:
        text: Input sent as the ``inputs`` field of the JSON payload.
        candidate_labels: Labels sent as ``parameters.candidate_labels``.

    Returns:
        The decoded JSON response body, or ``None`` when no API key is
        configured, the request fails, or the response is not valid JSON.
        The HTTP status code is not inspected, so whatever JSON the service
        returns (including an error payload) is handed back unchanged.
    """
    if not HF_API_KEY:
        return None

    url = f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}"
    payload = {
        "inputs": text,
        "parameters": {"candidate_labels": candidate_labels},
    }
    try:
        res = requests.post(
            url,
            headers=HF_HEADERS,
            json=payload,
            timeout=HF_TIMEOUT,
        )
        return res.json()
    except (requests.RequestException, ValueError) as exc:
        # Best-effort call: transport problems and undecodable bodies must
        # not break the analysis, but they should leave a trace in the logs.
        # ValueError covers JSON decode errors raised by older requests
        # releases that do not wrap them in RequestException.
        logging.getLogger(__name__).warning(
            "Hugging Face zero-shot call to %s failed: %s", ZERO_SHOT_MODEL, exc
        )
        return None
def _parse_hf_phishing_model_output(result):
    """Extract the top label, its score and all label scores from a response.

    Accepted shapes:

    * ``[[{"label": ..., "score": ...}, ...]]`` - predictions wrapped in an
      outer list (one inner list per input), as returned by the hosted
      text-classification endpoint;
    * ``[{"label": ..., "score": ...}, ...]`` - a flat list of predictions;
    * ``{"labels": [...], "scores": [...]}`` - parallel lists;
    * ``{"label": "...", "score": 0.9}`` - a single prediction.

    Entries without a label or with a non-numeric score are skipped.

    Args:
        result: Decoded JSON returned by the model call, or ``None``.

    Returns:
        ``(label, score, all_probs)`` where ``label`` is the highest-scoring
        label, ``score`` its float score and ``all_probs`` maps every parsed
        label to its score. ``(None, 0.0, {})`` when nothing usable is found
        (including error payloads such as ``{"error": "..."}``).
    """
    if not result:
        return None, 0.0, {}

    pairs = []
    if isinstance(result, list):
        # Unwrap the per-input outer list when present.
        entries = result[0] if isinstance(result[0], list) else result
        pairs = [
            (entry.get("label"), entry.get("score", 0.0))
            for entry in entries
            if isinstance(entry, dict)
        ]
    elif isinstance(result, dict):
        labels = result.get("labels") or result.get("label") or []
        scores = result.get("scores") or result.get("score") or []
        if isinstance(labels, str) and isinstance(scores, (int, float)):
            # Single prediction given as scalars rather than parallel lists.
            labels, scores = [labels], [scores]
        if isinstance(labels, list) and isinstance(scores, list):
            pairs = list(zip(labels, scores))

    all_probs = {}
    for label, score in pairs:
        if label is None:
            continue
        try:
            all_probs[label] = float(score)
        except (TypeError, ValueError):
            # Non-numeric score or unhashable label: ignore this entry.
            continue

    if not all_probs:
        return None, 0.0, {}

    best = max(all_probs, key=all_probs.get)
    return best, all_probs[best], all_probs
# Label words that mark a classifier result as non-malicious.
_BENIGN_LABEL_TOKENS = frozenset({"benign", "legitimate", "legit", "safe", "ham"})


def _is_benign_label(label) -> bool:
    """Return True when ``label`` contains a whole word naming a benign class."""
    tokens = re.split(r"[^a-z0-9]+", str(label).lower())
    return any(tok in _BENIGN_LABEL_TOKENS for tok in tokens)


def _mark_occurrences(text: str, needle: str) -> str:
    """Wrap every case-insensitive occurrence of ``needle`` in ``<mark>``."""
    # A callable replacement keeps the matched text exactly as written (the
    # original casing) and avoids backslashes in ``needle`` being interpreted
    # as escape sequences in a replacement template.
    return re.sub(
        re.escape(needle),
        lambda m: f"<mark>{m.group(0)}</mark>",
        text,
        flags=re.IGNORECASE,
    )


def analyze_body(subject: str, body: str, urls: list, images: list):
    """Score an email's subject and body for phishing signals.

    Signals and their weights:

    * each phrase from ``SUSPICIOUS_PATTERNS`` found in subject/body: +18;
    * each non-empty entry in ``urls``: +10;
    * phishing model result: ``+int(conf * 90)``, skipped when the label
      names a benign/legitimate class;
    * zero-shot behavior with confidence >= 0.7: ``+int(conf * 30)``,
      skipped when the behavior is benign;
    * a "phishing" model label with confidence >= 0.8 lifts the score to at
      least 80.

    The model steps run only when ``HF_API_KEY`` is set. The final score is
    clamped to 0..100.

    Args:
        subject: Email subject; ``None`` is treated as empty.
        body: Email body; ``None`` is treated as empty.
        urls: URLs extracted from the email; ``None`` is treated as empty and
            empty entries are ignored.
        images: Accepted for interface compatibility; not used here.

    Returns:
        ``(findings, score, highlighted_body, verdict)`` - a list of
        human-readable findings, the integer score, the body with matched
        phrases/URLs wrapped in ``<mark>`` tags, and the verdict string.
    """
    findings = []
    score = 0
    body_text = body or ""
    # NOTE(review): the body is not HTML-escaped before <mark> tags are
    # inserted; if the caller renders this as HTML, confirm it is escaped or
    # sanitized elsewhere.
    highlighted_body = body_text
    combined_lower = ((subject or "") + "\n" + body_text).lower()

    # Phrase heuristics.
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in combined_lower:
            findings.append(f"Suspicious phrase detected: \"{pattern}\"")
            score += 18
            highlighted_body = _mark_occurrences(highlighted_body, pattern)

    # URL checks. Empty entries are dropped: an empty needle would match at
    # every position of the body, and non-string entries would break the
    # join used to build the model input below.
    url_list = [str(u) for u in (urls or []) if u]
    for url in url_list:
        findings.append(f"Suspicious URL detected: {url}")
        score += 10
        highlighted_body = _mark_occurrences(highlighted_body, url)

    # ML phishing model.
    ml_label = None
    ml_conf = 0.0
    model_input = "\n".join([subject or "", body_text, "\n".join(url_list)]).strip()
    if model_input and HF_API_KEY:
        raw = _call_hf_text_model(PHISHING_MODEL, model_input)
        label, conf, _ = _parse_hf_phishing_model_output(raw)
        if label:
            ml_label = label
            ml_conf = conf
            findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
            # A confident "legitimate" classification must not raise the
            # risk score; labels that do not name a benign class still count.
            if not _is_benign_label(label):
                score += int(conf * 100 * 0.9)

    # Zero-shot behavior.
    if HF_API_KEY and model_input:
        zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
        if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
            try:
                behavior = zs["labels"][0]
                behavior_conf = float(zs["scores"][0])
            except (IndexError, KeyError, TypeError, ValueError):
                # Malformed or empty response: skip this signal.
                behavior = None
                behavior_conf = 0.0
            if behavior is not None:
                findings.append(
                    f"Behavior inference → {behavior} (conf {behavior_conf:.2f})"
                )
                if behavior_conf >= 0.7 and not _is_benign_label(behavior):
                    score += int(behavior_conf * 30)

    # A confident phishing classification guarantees a "Malicious" verdict.
    if ml_conf >= 0.8 and "phishing" in str(ml_label or "").lower():
        score = max(score, 80)
    score = int(max(0, min(score, 100)))

    # Verdict
    if score >= 70:
        verdict = "🚨 Malicious"
    elif score >= 50:
        verdict = "⚠️ Suspicious"
    elif score >= 30:
        verdict = "📩 Spam"
    else:
        verdict = "✅ Safe"
        findings.append("No strong phishing signals detected by models/heuristics.")

    # Return exactly 4 values
    return findings, score, highlighted_body, verdict
|