File size: 5,317 Bytes
3bd60c1
d9f5d53
bedf234
9b5168a
 
d9f5d53
4fb4f18
 
9b5168a
d9f5d53
3bd60c1
 
9b5168a
d9f5d53
089b374
bedf234
3bd60c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90ca431
 
3bd60c1
9b5168a
3bd60c1
 
 
 
 
 
9b5168a
 
 
4fb4f18
 
bedf234
9b5168a
4fb4f18
9b5168a
4fb4f18
9b5168a
 
4fb4f18
bedf234
 
 
d9f5d53
9b5168a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3bd60c1
a235ecf
3bd60c1
 
 
 
 
 
 
 
 
 
 
 
 
 
a235ecf
9b5168a
d9f5d53
bedf234
3bd60c1
d9f5d53
3bd60c1
bedf234
3bd60c1
9b5168a
3bd60c1
9b5168a
 
 
 
 
3bd60c1
9b5168a
 
90ca431
9b5168a
 
 
 
3bd60c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b5168a
 
 
3bd60c1
 
 
 
 
 
 
 
 
 
 
 
9b5168a
3bd60c1
9b5168a
3bd60c1
9b5168a
 
 
 
 
 
90ca431
9b5168a
 
d9f5d53
3bd60c1
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# body_analyzer.py
import os
import re
import requests
from typing import List

# Hugging Face Inference API credentials, read from the environment once at
# import time. When HF_API_KEY is unset the HF helper functions in this module
# return None without issuing a request.
HF_API_KEY = os.getenv("HF_API_KEY")
# Bearer-token header sent with every inference request; empty without a key.
HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
HF_TIMEOUT = 20  # seconds; passed as `timeout=` to requests.post

# Hosted model identifiers (interpolated into the inference endpoint URL).
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli"  # zero-shot intent/behavior classification

# Suspicious phrases searched for as plain substrings.
# Entries must be lowercase: analyze_body compares them against the
# lowercased subject + body text.
SUSPICIOUS_PATTERNS = [
    "verify your account",
    "urgent action",
    "click here",
    "reset password",
    "confirm your identity",
    "bank account",
    "invoice",
    "payment required",
    "unauthorized login",
    "compromised",
    "final reminder",
    "account suspended",
    "account deactivated",
    "update your information",
    "legal action",
    "limited time offer",
    "claim your prize",
    "verify immediately",
    "verify now",
    "verify your credentials",
]

# Candidate labels handed to the zero-shot model for intent/behavior inference.
BEHAVIOR_LABELS = [
    "credential harvesting",
    "invoice/payment fraud",
    "marketing",
    "benign",
    "malware",
    "account takeover",
]

def _call_hf_text_model(model_name: str, text: str):
    """Send *text* to the hosted inference endpoint of *model_name*.

    Returns the decoded JSON body of the response. Returns ``None`` when no
    API key is configured, or when the request or the JSON decoding raises
    (this helper is deliberately best-effort and never propagates errors).
    """
    decoded = None
    if HF_API_KEY:
        try:
            response = requests.post(
                f"https://api-inference.huggingface.co/models/{model_name}",
                headers=HF_HEADERS,
                json={"inputs": text},
                timeout=HF_TIMEOUT,
            )
            decoded = response.json()
        except Exception:
            # Best-effort: network, HTTP-layer and decoding errors all map to None.
            decoded = None
    return decoded

def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
    """Run zero-shot classification of *text* against *candidate_labels*.

    Posts to the inference endpoint of ``ZERO_SHOT_MODEL`` and returns the
    decoded JSON body. Returns ``None`` when no API key is configured, or
    when the request or the JSON decoding raises.
    """
    if not HF_API_KEY:
        return None

    try:
        request_body = {
            "inputs": text,
            "parameters": {"candidate_labels": candidate_labels},
        }
        endpoint = f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}"
        reply = requests.post(
            endpoint,
            headers=HF_HEADERS,
            json=request_body,
            timeout=HF_TIMEOUT,
        )
        return reply.json()
    except Exception:
        # Best-effort: any failure is reported to the caller as "no result".
        return None

def _parse_hf_phishing_model_output(result):
    if not result:
        return None, 0.0, {}
    if isinstance(result, list) and result and isinstance(result[0], dict):
        r0 = result[0]
        label = r0.get("label")
        score = r0.get("score", 0.0)
        return label, float(score), {label: float(score)}
    if isinstance(result, dict):
        labels = result.get("labels") or result.get("label") or []
        scores = result.get("scores") or result.get("score") or []
        if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
            all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)}
            max_lab = max(all_probs.items(), key=lambda x: x[1])
            return max_lab[0], float(max_lab[1]), all_probs
    return None, 0.0, {}

# Label classification for the phishing model's top prediction.
# NOTE(review): the LABEL_n ids follow the class order shown on the
# PHISHING_MODEL model card (0 legitimate_email, 1 phishing_url,
# 2 legitimate_url, 3 phishing_url_alt) - confirm against the deployed model.
_PHISHING_LABEL_IDS = frozenset({"label_1", "label_3"})
_BENIGN_LABEL_IDS = frozenset({"label_0", "label_2"})
_PHISHING_LABEL_HINTS = ("phish", "malicious", "scam", "fraud", "unsafe")
_BENIGN_LABEL_HINTS = ("legit", "benign", "safe", "ham")


def _is_phishing_label(label) -> bool:
    """Return True when *label* names a phishing/malicious class."""
    name = str(label or "").strip().lower()
    return name in _PHISHING_LABEL_IDS or any(h in name for h in _PHISHING_LABEL_HINTS)


def _is_benign_label(label) -> bool:
    """Return True when *label* clearly names a legitimate/benign class."""
    name = str(label or "").strip().lower()
    if _is_phishing_label(name):
        return False
    return name in _BENIGN_LABEL_IDS or any(h in name for h in _BENIGN_LABEL_HINTS)


def _mark_terms(text: str, terms) -> str:
    """Wrap each case-insensitive occurrence of any of *terms* in ``<mark>``.

    All terms are matched in a single pass so that one match is never wrapped
    twice and a URL containing a flagged phrase is highlighted as a whole.
    """
    # Longest first so a longer term wins over a shorter one starting at the
    # same position; empty terms would match everywhere and are dropped.
    unique = sorted({t for t in terms if t}, key=lambda t: (-len(t), t))
    if not text or not unique:
        return text
    matcher = re.compile("|".join(re.escape(t) for t in unique), re.IGNORECASE)
    # A callable replacement keeps the text exactly as it appears in the body
    # (original casing) and avoids backslash-escape processing of the term.
    return matcher.sub(lambda m: f"<mark>{m.group(0)}</mark>", text)


def analyze_body(subject: str, body: str, urls: list, images: list):
    """Score an email's subject/body for phishing signals.

    Combines three sources: substring hits from ``SUSPICIOUS_PATTERNS``
    (+18 each), every URL supplied in *urls* (+10 each), and, when
    ``HF_API_KEY`` is set, the phishing classifier and the zero-shot behavior
    model. The total is clamped to 0..100.

    Args:
        subject: Email subject; ``None`` is treated as empty.
        body: Email body; ``None`` is treated as empty.
        urls: URLs extracted by the caller. Every non-empty string entry is
            reported as suspicious; other entries are ignored.
        images: Accepted for interface compatibility; not used here.

    Returns:
        ``(findings, score, highlighted_body, verdict)`` where *findings* is a
        list of human-readable strings, *score* an int in 0..100,
        *highlighted_body* the body with matches wrapped in ``<mark>`` tags,
        and *verdict* one of the Malicious (>= 70) / Suspicious (>= 50) /
        Spam (>= 30) / Safe strings.
    """
    subject = subject or ""
    body = body or ""
    # Empty or non-string entries cannot be highlighted or joined; skip them.
    url_list = [u for u in (urls or []) if isinstance(u, str) and u]

    findings = []
    score = 0
    highlight_terms = []

    combined_lower = f"{subject}\n{body}".lower()
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in combined_lower:
            findings.append(f"Suspicious phrase detected: \"{pattern}\"")
            score += 18
            highlight_terms.append(pattern)

    # URL checks: every URL handed in by the caller counts as a signal.
    for u in url_list:
        findings.append(f"Suspicious URL detected: {u}")
        score += 10
        highlight_terms.append(u)

    # NOTE(review): the body is not HTML-escaped before the <mark> tags are
    # inserted; if the result is rendered as HTML the caller presumably has to
    # sanitize it - confirm.
    highlighted_body = _mark_terms(body, highlight_terms)

    # ML phishing model
    ml_label = None
    ml_conf = 0.0
    model_input = "\n".join([subject, body, "\n".join(url_list)]).strip()
    if model_input and HF_API_KEY:
        raw = _call_hf_text_model(PHISHING_MODEL, model_input)
        label, conf, _ = _parse_hf_phishing_model_output(raw)
        if label:
            ml_label, ml_conf = label, conf
            findings.append(f"HuggingFace phishing model \u2192 {label} (conf {conf:.2f})")
            # A confident "legitimate" prediction must not raise the risk
            # score; unrecognized labels keep contributing as before.
            if not _is_benign_label(label):
                score += int(conf * 100 * 0.9)

    # Zero-shot behavior
    if model_input and HF_API_KEY:
        zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
        if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
            try:
                behavior = zs["labels"][0]
                behavior_conf = float(zs["scores"][0])
            except (KeyError, IndexError, TypeError, ValueError):
                # Malformed response: treat as "no behavior inferred".
                pass
            else:
                findings.append(f"Behavior inference \u2192 {behavior} (conf {behavior_conf:.2f})")
                # Inferring "benign" is not a risk signal.
                if behavior_conf >= 0.7 and str(behavior).lower() != "benign":
                    score += int(behavior_conf * 30)

    # A confident phishing classification forces at least a Malicious score.
    if ml_conf >= 0.8 and _is_phishing_label(ml_label):
        score = max(score, 80)

    score = int(max(0, min(score, 100)))

    # Verdict (emoji written as escapes so the file stays encoding-safe)
    if score >= 70:
        verdict = "\U0001F6A8 Malicious"  # police-light emoji
    elif score >= 50:
        verdict = "\u26A0\uFE0F Suspicious"  # warning sign emoji
    elif score >= 30:
        verdict = "\U0001F4E9 Spam"  # envelope-with-arrow emoji
    else:
        verdict = "\u2705 Safe"  # check-mark emoji
        findings.append("No strong phishing signals detected by models/heuristics.")

    # Return exactly 4 values
    return findings, score, highlighted_body, verdict