Spaces:
Sleeping
Sleeping
| # analyze_email_main.py | |
| from parse_email import parse_email | |
| from header_analyzer import analyze_headers | |
| from body_analyzer import analyze_body | |
| from url_analyzer import analyze_urls | |
| import re | |
def parse_auth_results(auth_results: str):
    """Extract SPF, DKIM, and DMARC values from an Authentication-Results header.

    Args:
        auth_results: Raw header value. May be empty or ``None``.

    Returns:
        A dict with the keys ``"spf"``, ``"dkim"`` and ``"dmarc"``. Each value
        is the lower-cased token that follows ``<key>=`` in the header (first
        occurrence wins), or ``"unknown"`` when the key is not present.
    """
    results = {"spf": "unknown", "dkim": "unknown", "dmarc": "unknown"}
    if not auth_results:
        return results

    # Matching is done on a lower-cased copy, so results are case-insensitive
    # and always reported in lower case.
    lowered = auth_results.lower()
    for method in results:
        # The lookbehind rejects keys embedded in a longer token such as
        # "x-dkim=" or "foo.spf=", which would otherwise be mistaken for the
        # real method result.
        match = re.search(rf"(?<![\w.-]){method}=([\w-]+)", lowered)
        if match:
            results[method] = match.group(1)
    return results
def analyze(file_path):
    """Run the header, URL and body analyzers on one email and build a report.

    Args:
        file_path: Path of the email file; passed unchanged to ``parse_email``.

    Returns:
        A ``(summary, details)`` tuple of dicts.

        ``summary`` has the keys ``"Final Verdict"``, ``"Attack Type"``,
        ``"Attack Score"`` (combined score clamped to 0-100 and rounded) and
        ``"Main Tags"`` (comma-separated, sorted, or ``"No special tags"``).

        ``details`` has the keys ``"Header Findings"``, ``"Body Findings"``,
        ``"URL Findings"``, ``"Highlighted Body"`` and ``"Auth Results"``.
    """
    headers, subject, body, urls, images = parse_email(file_path)

    header_findings, header_score, auth_summary = analyze_headers(headers or {})
    url_findings, url_score = analyze_urls(urls or [])
    # The body analyzer's own verdict is not used; the final verdict below is
    # derived from the combined score instead.
    body_findings, body_score, highlighted_body, _body_verdict = analyze_body(
        subject, body, urls or [], images or []
    )

    # Normalise findings once so a None (or non-list) result from an analyzer
    # cannot break the iteration and concatenation further down.
    header_findings = list(header_findings or [])
    body_findings = list(body_findings or [])
    url_findings = list(url_findings or [])
    all_findings = header_findings + body_findings + url_findings

    # Combine scores: header and body count as-is, URLs weigh 20% more.
    total_score = (header_score or 0) + (body_score or 0) * 1.0 + (url_score or 0) * 1.2
    try:
        total_score = float(total_score)
    except (TypeError, ValueError, OverflowError):
        total_score = 0.0
    # Clamp to 0-100. With this argument order a NaN total collapses to 0.0.
    total_score = max(0.0, min(total_score, 100.0))
    total_score_rounded = round(total_score)

    # NOTE(review): the symbol prefixes of the verdict strings look
    # mis-encoded (probably emoji originally). They are kept byte-for-byte
    # because callers may compare against them -- confirm and repair at the
    # source encoding level.
    safe_verdict = "β Safe"
    if total_score >= 70:
        verdict = "π¨ Malicious"
    elif total_score >= 50:
        verdict = "β οΈ Suspicious"
    elif total_score >= 30:
        verdict = "π© Spam"
    else:
        verdict = safe_verdict

    # Attack-type heuristics: the first matching rule wins.
    # NOTE(review): the "benign" label is only reached when no keyword or
    # finding rule matched, so a Safe email mentioning e.g. "payment" is still
    # labelled as fraud -- confirm this ordering is intended.
    combined_text_lower = ((subject or "") + "\n" + (body or "")).lower()
    payment_keywords = ("invoice", "payment", "wire transfer", "bank details")
    credential_keywords = ("password", "verify", "account", "login", "credentials")
    if any(word in combined_text_lower for word in payment_keywords):
        attack_type = "Invoice/Payment Fraud (BEC)"
    elif any(word in combined_text_lower for word in credential_keywords):
        attack_type = "Credential Harvesting (Phishing)"
    elif any("reply-to domain mismatch" in f.lower() for f in header_findings):
        attack_type = "Business Email Compromise (BEC)"
    elif any("spam" in f.lower() for f in body_findings + url_findings):
        attack_type = "Spam / Marketing"
    elif verdict == safe_verdict:
        attack_type = "Benign / Normal Email"
    else:
        attack_type = "General Phishing"

    # Tags: a finding earns a tag when it contains any of the rule's
    # substrings (case-insensitive). A set keeps each tag at most once.
    tag_rules = (
        (("domain",), "Suspicious Sender Domain"),
        (("phishing", "malicious url", "urlhaus"), "Phishing / Malicious URL"),
        (("urgent", "suspicious phrase"), "Urgent Language"),
        (("spam", "marketing"), "Spam Tone"),
        (("spf", "dkim", "dmarc"), "Auth Failures (SPF/DKIM/DMARC)"),
        (("ocr", "extracted text"), "Image-based content detected"),
    )
    tags = set()
    for finding in all_findings:
        lowered = finding.lower()
        for needles, tag in tag_rules:
            if any(needle in lowered for needle in needles):
                tags.add(tag)

    summary = {
        "Final Verdict": verdict,
        "Attack Type": attack_type,
        "Attack Score": total_score_rounded,
        "Main Tags": ", ".join(sorted(tags)) if tags else "No special tags",
    }
    details = {
        "Header Findings": header_findings,
        "Body Findings": body_findings,
        "URL Findings": url_findings,
        "Highlighted Body": highlighted_body or "",
        # Third value returned by analyze_headers (SPF/DKIM/DMARC summary).
        "Auth Results": auth_summary or {},
    }
    return summary, details
if __name__ == "__main__":
    # Manual smoke run: analyze "sample.eml" (relative path) and dump both
    # result dicts to stdout.
    report_summary, report_details = analyze("sample.eml")
    print("SUMMARY:", report_summary)
    print("DETAILS:", report_details)