# CySecGuardians / analyze_email_main.py
# princemaxp's picture
# Update analyze_email_main.py
# da51597 verified
# analyze_email_main.py
from parse_email import parse_email
from header_analyzer import analyze_headers
from body_analyzer import analyze_body
from url_analyzer import analyze_urls
import re
def parse_auth_results(auth_results: str):
    """Extract SPF, DKIM, and DMARC values from an Authentication-Results header.

    Args:
        auth_results: Raw header value, for example
            ``"mx.example.com; spf=pass smtp.mailfrom=a@b.c; dkim=fail"``.
            ``None`` and the empty string are accepted.

    Returns:
        A dict with exactly the keys ``"spf"``, ``"dkim"`` and ``"dmarc"``.
        Each value is the lower-cased token found after ``<method>=``, or
        ``"unknown"`` when that method does not appear in the header.
    """
    results = {"spf": "unknown", "dkim": "unknown", "dmarc": "unknown"}
    if not auth_results:
        return results

    # Lower-case once so method names and result tokens are both normalised.
    # str() tolerates header values that are not plain ``str`` instances.
    text = str(auth_results).lower()

    for method in results:
        # (?<![\w.-]) keeps the method name from matching as the tail of a
        # longer token (e.g. "x-spf=..."); \s* on both sides of "=" accepts
        # the optional whitespace the header grammar allows there.
        match = re.search(rf"(?<![\w.-]){method}\s*=\s*([\w-]+)", text)
        if match:
            # Only the first occurrence of each method is reported.
            results[method] = match.group(1)
    return results
# Verdict labels. The emoji are spelled as escape sequences so the labels stay
# intact regardless of how this file is stored or re-encoded; the previous
# literals contained mis-decoded (mojibake) emoji bytes.
_VERDICT_MALICIOUS = "\U0001F6A8 Malicious"      # police car light
_VERDICT_SUSPICIOUS = "\u26A0\uFE0F Suspicious"  # warning sign
_VERDICT_SPAM = "\U0001F4E9 Spam"                # envelope with arrow
_VERDICT_SAFE = "\u2705 Safe"                    # check mark button

# URL findings weigh slightly more than header/body findings.
_URL_SCORE_WEIGHT = 1.2

# (substrings to look for in a lower-cased finding, tag to emit)
_TAG_RULES = (
    (("domain",), "Suspicious Sender Domain"),
    (("phishing", "malicious url", "urlhaus"), "Phishing / Malicious URL"),
    (("urgent", "suspicious phrase"), "Urgent Language"),
    (("spam", "marketing"), "Spam Tone"),
    (("spf", "dkim", "dmarc"), "Auth Failures (SPF/DKIM/DMARC)"),
    (("ocr", "extracted text"), "Image-based content detected"),
)


def _as_score(value):
    """Coerce one analyzer score to float; None or unusable values count as 0."""
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


def _verdict_for(score):
    """Map a clamped 0-100 score to its verdict label."""
    if score >= 70:
        return _VERDICT_MALICIOUS
    if score >= 50:
        return _VERDICT_SUSPICIOUS
    if score >= 30:
        return _VERDICT_SPAM
    return _VERDICT_SAFE


def _attack_type_for(text_lower, header_findings, content_findings, verdict):
    """Pick an attack-type label from keywords, findings and the verdict."""
    # NOTE(review): keyword branches are checked before the verdict, so a
    # Safe email mentioning e.g. "payment" is still labelled as BEC and
    # "Benign / Normal Email" is reached only when nothing else matched.
    # Precedence is kept as in the original; confirm whether it is intended.
    if any(k in text_lower for k in ("invoice", "payment", "wire transfer", "bank details")):
        return "Invoice/Payment Fraud (BEC)"
    if any(k in text_lower for k in ("password", "verify", "account", "login", "credentials")):
        return "Credential Harvesting (Phishing)"
    if any("reply-to domain mismatch" in f.lower() for f in header_findings):
        return "Business Email Compromise (BEC)"
    if any("spam" in f.lower() for f in content_findings):
        return "Spam / Marketing"
    if verdict == _VERDICT_SAFE:
        return "Benign / Normal Email"
    return "General Phishing"


def _tags_for(findings):
    """Return the set of summary tags triggered by the given findings."""
    tags = set()
    for finding in findings:
        lowered = finding.lower()
        for needles, tag in _TAG_RULES:
            if any(needle in lowered for needle in needles):
                tags.add(tag)
    return tags


def analyze(file_path):
    """Run header, URL and body analysis on one email file and summarise it.

    Args:
        file_path: Path handed to ``parse_email``.

    Returns:
        A ``(summary, details)`` tuple of dicts.

        ``summary`` has the keys ``"Final Verdict"``, ``"Attack Type"``,
        ``"Attack Score"`` (combined score clamped to 0-100 and rounded) and
        ``"Main Tags"`` (sorted, comma-separated tags, or
        ``"No special tags"``).

        ``details`` has the keys ``"Header Findings"``, ``"Body Findings"``,
        ``"URL Findings"``, ``"Highlighted Body"`` and ``"Auth Results"``.
    """
    headers, subject, body, urls, images = parse_email(file_path)

    header_findings, header_score, auth_summary = analyze_headers(headers or {})
    url_findings, url_score = analyze_urls(urls or [])
    # The body analyzer also returns its own verdict; it is not used here.
    body_findings, body_score, highlighted_body, _body_verdict = analyze_body(
        subject, body, urls or [], images or []
    )

    # The analyzers may hand back None instead of a list; normalise once so
    # every later use (attack type, tags, details) sees an iterable.
    header_findings = header_findings or []
    body_findings = body_findings or []
    url_findings = url_findings or []

    # Combine the per-analyzer scores and clamp to the 0-100 range.
    total_score = (
        _as_score(header_score)
        + _as_score(body_score)
        + _as_score(url_score) * _URL_SCORE_WEIGHT
    )
    total_score = max(0.0, min(total_score, 100.0))

    # NOTE(review): the verdict uses the unrounded score while the summary
    # shows the rounded one, so e.g. 69.6 reads "70" but is "Suspicious".
    verdict = _verdict_for(total_score)

    combined_text_lower = ((subject or "") + "\n" + (body or "")).lower()
    attack_type = _attack_type_for(
        combined_text_lower,
        header_findings,
        [*body_findings, *url_findings],
        verdict,
    )

    tags = _tags_for([*header_findings, *body_findings, *url_findings])

    summary = {
        "Final Verdict": verdict,
        "Attack Type": attack_type,
        "Attack Score": round(total_score),
        "Main Tags": ", ".join(sorted(tags)) if tags else "No special tags",
    }
    details = {
        "Header Findings": header_findings,
        "Body Findings": body_findings,
        "URL Findings": url_findings,
        "Highlighted Body": highlighted_body or "",
        # Whatever analyze_headers returned as its third value (SPF/DKIM/DMARC).
        "Auth Results": auth_summary or {},
    }
    return summary, details
if __name__ == "__main__":
    import sys

    # Analyse the path given as the first command-line argument; with no
    # argument, fall back to "sample.eml" so a bare run behaves as before.
    email_path = sys.argv[1] if len(sys.argv) > 1 else "sample.eml"
    summary, details = analyze(email_path)
    print("SUMMARY:", summary)
    print("DETAILS:", details)