CySecGuardians / body_analyzer.py
princemaxp's picture
Update body_analyzer.py
a235ecf verified
raw
history blame
4.01 kB
import requests
import os
import re
# Hugging Face Inference API token, read from the environment at import time.
# When unset, remote model calls are skipped (see query_hf).
HF_API_KEY = os.getenv("HF_API_KEY")
# Auth header for the HF API; falls back to an empty dict so the module
# still imports and runs in pattern-only mode without a key.
HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
# Hosted model ids used by query_hf(), keyed by their role in the analysis.
MODELS = {
    "ai_detector": "roberta-base-openai-detector",
    "sentiment": "finiteautomata/bertweet-base-sentiment-analysis",
    "spam": "mrm8488/bert-tiny-finetuned-sms-spam-detection",
}
# Regex fragments matched against the lowercased message body; each hit adds
# to the risk score in analyze_body(). NOTE: some patterns contain capture
# groups (e.g. "(your )?"), which changes what re.findall would report —
# callers should match with finditer/group(0) to get the full phrase.
SUSPICIOUS_PATTERNS = [
    r"verify your account",
    r"urgent action",
    r"click here",
    r"reset (your )?password",
    r"confirm (your )?identity",
    r"bank account",
    r"invoice",
    r"payment (required|overdue|failed|method expired)",
    r"unauthorized login",
    r"compromised",
    r"final reminder",
    r"account (suspended|deactivated|locked)",
    r"update your (information|details|billing)",
    r"legal action",
    r"free trial",
    r"limited time offer",
    r"click below",
    r"winner",
    r"congratulations",
    r"urgent response",
    r"claim your prize",
    r"act now",
    r"unsubscribe",
    r"lottery",
    r"risk-free",
]
def query_hf(model, text):
    """Query a Hugging Face Inference API model with the first 1000 chars of text.

    Best-effort: returns the parsed JSON response on success, or None when no
    API key is configured, the request fails, the response is not valid JSON,
    or the API answered with an error status (e.g. 503 while the model loads,
    429 rate-limit, 401 bad token). Previously error payloads were returned
    as if they were model output.
    """
    if not HF_API_KEY:
        return None
    try:
        res = requests.post(
            f"https://api-inference.huggingface.co/models/{model}",
            headers=HF_HEADERS,
            json={"inputs": text[:1000]},  # truncate: API rejects huge inputs
            timeout=15,
        )
        if not res.ok:
            # Error statuses carry {"error": ...} JSON, not predictions.
            return None
        return res.json()
    except Exception:
        # Deliberate best-effort: network/JSON problems degrade to "no result"
        # so the pattern-based analysis still runs.
        return None
def parse_hf_result(result):
    """Extract (label, score) from a Hugging Face inference response.

    Handles the common response shapes:
      - [{"label": ..., "score": ...}, ...]          (flat classification list)
      - [[{"label": ..., "score": ...}, ...]]        (nested, as returned by the
        text-classification pipeline — previously unhandled and dropped)
      - {"labels": [...], "scores": [...]}           (zero-shot style dict)

    Returns (None, None) for None/empty input or any unrecognized shape
    (including {"error": ...} payloads).
    """
    if not result:
        return None, None
    # Unwrap the one level of nesting the HF API often adds.
    if isinstance(result, list) and result and isinstance(result[0], list):
        result = result[0]
    if isinstance(result, list) and result and isinstance(result[0], dict):
        if "label" in result[0] and "score" in result[0]:
            return result[0]["label"], result[0]["score"]
    if isinstance(result, dict):
        labels = result.get("labels") or []
        scores = result.get("scores") or []
        if labels and scores:
            return labels[0], scores[0]
    return None, None
def analyze_body(text):
    """Analyze a message body for phishing/spam indicators.

    Combines local regex heuristics (suspicious phrases, URLs) with three
    optional Hugging Face models (AI-text detector, sentiment, spam). Each
    signal adds to a risk score that maps to a verdict.

    Returns a 4-tuple:
        findings (list[str])     — human-readable findings
        score (int)              — accumulated risk score
        highlighted_body (str)   — body with matches wrapped in <mark> tags
        verdict (str)            — "Safe" | "Suspicious" | "Malicious / Spam"
    """
    findings = []
    score = 0
    body_lower = (text or "").lower()
    highlighted_body = text or ""
    # 1) Suspicious phrases. Use finditer/group(0) so patterns that contain
    #    capture groups (e.g. r"reset (your )?password") report the FULL
    #    matched phrase — re.findall would return only the group text
    #    (e.g. "your "), producing wrong findings and wrong highlights.
    for pattern in SUSPICIOUS_PATTERNS:
        for m in re.finditer(pattern, body_lower):
            phrase = m.group(0)
            if not phrase:
                continue
            findings.append(f'Suspicious phrase detected: "{phrase}"')
            score += 15  # tuned down to reduce instant Malicious
            highlighted_body = re.sub(
                re.escape(phrase),
                f"<mark>{phrase}</mark>",
                highlighted_body,
                flags=re.IGNORECASE,
            )
    # 2) URLs — any URL in a body is treated as mildly suspicious.
    urls = re.findall(r'https?://[^\s]+', body_lower)
    for url in urls:
        findings.append(f"Suspicious URL detected: {url}")
        score += 10
        highlighted_body = re.sub(
            re.escape(url), f"<mark>{url}</mark>", highlighted_body, flags=re.IGNORECASE
        )
    # 3) AI text detector (informational only — does not affect the score).
    label, confidence = parse_hf_result(query_hf(MODELS["ai_detector"], text or ""))
    if label:
        findings.append(f"Body: AI Detector → {label} (confidence {confidence:.2f})")
    # 4) Sentiment — negative tone is a weak phishing signal.
    label, confidence = parse_hf_result(query_hf(MODELS["sentiment"], text or ""))
    if label:
        findings.append(f"Body: Sentiment → {label} (confidence {confidence:.2f})")
        if label.lower() == "negative":
            score += 10
    # 5) Spam detector — a "spam" classification is a strong signal.
    label, confidence = parse_hf_result(query_hf(MODELS["spam"], text or ""))
    if label:
        findings.append(f"Body: Spam Detector → {label} (confidence {confidence:.2f})")
        if label.lower() == "spam":
            score += 25
    # 6) Verdict thresholds (phrase=15, URL=10, negative=10, spam=25).
    if score >= 50:
        verdict = "Malicious / Spam"
    elif score >= 20:
        verdict = "Suspicious"
    else:
        verdict = "Safe"
    # Only report "nothing found" when the scan truly produced no findings —
    # previously this line was appended even alongside real findings.
    if not findings:
        findings.append("No suspicious content detected in body.")
    return findings, score, highlighted_body, verdict