Update analyze_email_main.py
- analyze_email_main.py +33 -50
analyze_email_main.py
CHANGED
|
@@ -4,23 +4,24 @@ from body_analyzer import analyze_body
|
|
| 4 |
from url_analyzer import analyze_urls
|
| 5 |
|
| 6 |
def analyze(file_path):
|
| 7 |
-
#
|
| 8 |
-
headers, body, urls = parse_email(file_path)
|
| 9 |
|
| 10 |
-
#
|
| 11 |
-
header_findings, header_score = analyze_headers(headers)
|
|
|
|
|
|
|
| 12 |
|
| 13 |
-
#
|
| 14 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 15 |
|
| 16 |
-
#
|
| 17 |
-
url_findings, url_score = analyze_urls(urls)
|
| 18 |
-
|
| 19 |
-
# --- Calculate total score ---
|
| 20 |
-
total_score = header_score + body_score + (url_score * 1.5)
|
| 21 |
-
total_score = min(total_score, 100)
|
| 22 |
-
|
| 23 |
-
# --- Determine final verdict ---
|
| 24 |
if total_score >= 70:
|
| 25 |
verdict = "π¨ Malicious"
|
| 26 |
elif 50 <= total_score < 70:
|
|
@@ -30,9 +31,9 @@ def analyze(file_path):
|
|
| 30 |
else:
|
| 31 |
verdict = "β
Safe"
|
| 32 |
|
| 33 |
-
#
|
| 34 |
-
body_lower = body.lower()
|
| 35 |
-
attack_type = "General Phishing"
|
| 36 |
if any(word in body_lower for word in ["invoice", "payment", "wire transfer", "bank details"]):
|
| 37 |
attack_type = "Invoice/Payment Fraud (BEC)"
|
| 38 |
elif any(word in body_lower for word in ["password", "verify", "account", "login", "credentials"]):
|
|
@@ -44,51 +45,33 @@ def analyze(file_path):
|
|
| 44 |
elif verdict == "β
Safe":
|
| 45 |
attack_type = "Benign / Normal Email"
|
| 46 |
|
| 47 |
-
#
|
| 48 |
tags = []
|
| 49 |
-
for finding in header_findings + body_findings + url_findings:
|
| 50 |
-
|
| 51 |
-
if "domain" in
|
| 52 |
tags.append("Suspicious Sender Domain")
|
| 53 |
-
if "phishing" in
|
| 54 |
tags.append("Phishing / Malicious URL")
|
| 55 |
-
if "urgent" in
|
| 56 |
tags.append("Urgent Language")
|
| 57 |
-
if "spam" in
|
| 58 |
tags.append("Spam Tone")
|
|
|
|
|
|
|
| 59 |
|
| 60 |
-
# --- Summary report ---
|
| 61 |
summary = {
|
| 62 |
"Final Verdict": verdict,
|
| 63 |
"Attack Type": attack_type,
|
| 64 |
-
"Attack Score":
|
| 65 |
-
"Main Tags": ", ".join(sorted(set(tags))) if tags else "No special tags"
|
| 66 |
}
|
| 67 |
|
| 68 |
-
# --- Detailed findings ---
|
| 69 |
details = {
|
| 70 |
-
"Header Findings": header_findings,
|
| 71 |
-
"Body Findings": body_findings,
|
| 72 |
-
"URL Findings": url_findings,
|
| 73 |
-
"Highlighted Body": highlighted_body
|
| 74 |
}
|
| 75 |
|
| 76 |
return summary, details
|
| 77 |
-
|
| 78 |
-
# --- Local testing ---
|
| 79 |
-
if __name__ == "__main__":
|
| 80 |
-
file_path = "sample.eml"
|
| 81 |
-
summary, details = analyze(file_path)
|
| 82 |
-
|
| 83 |
-
print("==== SUMMARY ====")
|
| 84 |
-
for k, v in summary.items():
|
| 85 |
-
print(f"{k}: {v}")
|
| 86 |
-
|
| 87 |
-
print("\n==== DETAILS ====")
|
| 88 |
-
for section, findings in details.items():
|
| 89 |
-
print(f"\n-- {section} --")
|
| 90 |
-
if isinstance(findings, list):
|
| 91 |
-
for f in findings:
|
| 92 |
-
print(f)
|
| 93 |
-
else:
|
| 94 |
-
print(findings)
|
|
|
|
| 4 |
from url_analyzer import analyze_urls
|
| 5 |
|
| 6 |
def analyze(file_path):
|
| 7 |
+
# Parse
|
| 8 |
+
headers, body, urls = parse_email(file_path or "")
|
| 9 |
|
| 10 |
+
# Analyze
|
| 11 |
+
header_findings, header_score = analyze_headers(headers or {})
|
| 12 |
+
body_findings, body_score, highlighted_body, body_verdict = analyze_body(body or "")
|
| 13 |
+
url_findings, url_score = analyze_urls(urls or [])
|
| 14 |
|
| 15 |
+
# Score
|
| 16 |
+
total_score = (header_score or 0) + (body_score or 0) + (url_score or 0) * 1.5
|
| 17 |
+
try:
|
| 18 |
+
total_score = float(total_score)
|
| 19 |
+
except Exception:
|
| 20 |
+
total_score = 0.0
|
| 21 |
+
total_score = max(0.0, min(total_score, 100.0))
|
| 22 |
+
total_score_rounded = round(total_score)
|
| 23 |
|
| 24 |
+
# Verdict
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 25 |
if total_score >= 70:
|
| 26 |
verdict = "π¨ Malicious"
|
| 27 |
elif 50 <= total_score < 70:
|
|
|
|
| 31 |
else:
|
| 32 |
verdict = "β
Safe"
|
| 33 |
|
| 34 |
+
# Attack type
|
| 35 |
+
body_lower = (body or "").lower()
|
| 36 |
+
attack_type = "General Phishing"
|
| 37 |
if any(word in body_lower for word in ["invoice", "payment", "wire transfer", "bank details"]):
|
| 38 |
attack_type = "Invoice/Payment Fraud (BEC)"
|
| 39 |
elif any(word in body_lower for word in ["password", "verify", "account", "login", "credentials"]):
|
|
|
|
| 45 |
elif verdict == "β
Safe":
|
| 46 |
attack_type = "Benign / Normal Email"
|
| 47 |
|
| 48 |
+
# Tags
|
| 49 |
tags = []
|
| 50 |
+
for finding in (header_findings + body_findings + url_findings):
|
| 51 |
+
fl = finding.lower()
|
| 52 |
+
if "domain" in fl:
|
| 53 |
tags.append("Suspicious Sender Domain")
|
| 54 |
+
if "phishing" in fl or "malicious url" in fl or "urlhaus" in fl:
|
| 55 |
tags.append("Phishing / Malicious URL")
|
| 56 |
+
if "urgent" in fl or "suspicious phrase" in fl:
|
| 57 |
tags.append("Urgent Language")
|
| 58 |
+
if "spam" in fl or "marketing" in fl:
|
| 59 |
tags.append("Spam Tone")
|
| 60 |
+
if "spf" in fl or "dkim" in fl or "dmarc" in fl:
|
| 61 |
+
tags.append("Auth Failures (SPF/DKIM/DMARC)")
|
| 62 |
|
|
|
|
| 63 |
summary = {
|
| 64 |
"Final Verdict": verdict,
|
| 65 |
"Attack Type": attack_type,
|
| 66 |
+
"Attack Score": total_score_rounded,
|
| 67 |
+
"Main Tags": ", ".join(sorted(set(tags))) if tags else "No special tags",
|
| 68 |
}
|
| 69 |
|
|
|
|
| 70 |
details = {
|
| 71 |
+
"Header Findings": header_findings or [],
|
| 72 |
+
"Body Findings": body_findings or [],
|
| 73 |
+
"URL Findings": url_findings or [],
|
| 74 |
+
"Highlighted Body": highlighted_body or "",
|
| 75 |
}
|
| 76 |
|
| 77 |
return summary, details
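Review bot: The new scoring block in `analyze` (new lines 16-21) fails open: `except Exception: total_score = 0.0`, together with the `(header_score or 0)` style defaults, turns a missing or unusable analyzer score into 0, which ends in the final `else` verdict (Safe). For a phishing triage tool, a failed analysis should not be reported as a clean result; let the exception propagate, or return a distinct verdict such as `verdict = "Analysis Error"` (constructed label) so the caller can tell "could not score" apart from "scored low". The `try` also wraps only `float(total_score)`, so a non-numeric score would already raise in the addition on the line above it, outside the guard. Separately, the thresholds compare the unrounded `total_score` while the summary reports `total_score_rounded`, so a value such as 69.5 is displayed as 70 without the Malicious verdict; comparing `total_score_rounded >= 70` would keep the two consistent. The verdict branches between new lines 27 and 31 are outside the visible hunks, so the exact fall-through path should be confirmed there.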