File size: 3,630 Bytes
917dcb0
 
3d7d6cb
b1aff46
917dcb0
 
3d7d6cb
917dcb0
 
 
3d7d6cb
b1aff46
917dcb0
 
b1aff46
917dcb0
 
 
 
 
 
 
 
 
 
 
 
 
 
b1aff46
917dcb0
 
b1aff46
917dcb0
3d7d6cb
917dcb0
 
b1aff46
3d7d6cb
917dcb0
b1aff46
3d7d6cb
ac7f1fe
3d7d6cb
b1aff46
 
 
 
 
 
ac7f1fe
 
3d7d6cb
 
 
ac7f1fe
3d7d6cb
b1aff46
 
 
3d7d6cb
 
b1aff46
3d7d6cb
b1aff46
3d7d6cb
 
b1aff46
 
3d7d6cb
 
 
b1aff46
3d7d6cb
 
b1aff46
3d7d6cb
 
 
 
 
 
b1aff46
3d7d6cb
 
 
 
 
ac7f1fe
3d7d6cb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
import re
from urllib.parse import quote, urlsplit

import requests

# Provider credentials, read from the environment once at import time.
# Either value is None when its variable is not set.
SAFE_BROWSING_API_KEY = os.environ.get("SAFE_BROWSING_API_KEY")
OTX_API_KEY = os.environ.get("OTX_API_KEY")

# Points added to the overall score for each kind of hit.
_SAFE_BROWSING_POINTS = 40
_OTX_POINTS = 30
_URLHAUS_POINTS = 30
_HEURISTIC_POINTS = 15

# Timeout, in seconds, passed to every provider request.
_LOOKUP_TIMEOUT = 15


def _check_safe_browsing(url):
    """Query Google Safe Browsing for *url*; return a ``(finding, points)`` pair."""
    payload = {
        "client": {"clientId": "email-analysis-tool", "clientVersion": "1.0"},
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE"],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        res = requests.post(
            "https://safebrowsing.googleapis.com/v4/threatMatches:find",
            params={"key": SAFE_BROWSING_API_KEY},
            json=payload,
            timeout=_LOOKUP_TIMEOUT,
        )
        # Without this, an HTTP error whose JSON body lacks "matches" would
        # fall through to the "not flagged" branch and hide the failure.
        res.raise_for_status()
        data = res.json()
    except Exception:
        # Best-effort lookup: a provider failure must not abort the analysis.
        return f"URL: {url} check failed (Google Safe Browsing)", 0

    if isinstance(data, dict) and data.get("matches"):
        return f"URL: {url} flagged by Google Safe Browsing", _SAFE_BROWSING_POINTS
    return f"URL: {url} not flagged (Google Safe Browsing)", 0


def _check_otx(url):
    """Query AlienVault OTX for *url*; return a ``(finding, points)`` pair."""
    try:
        # The URL is embedded as a path segment, so every reserved character
        # (including "/") has to be percent-encoded.
        encoded = quote(url, safe="")
        res = requests.get(
            f"https://otx.alienvault.com/api/v1/indicators/url/{encoded}/general",
            headers={"X-OTX-API-KEY": OTX_API_KEY},
            timeout=_LOOKUP_TIMEOUT,
        )
        if res.status_code != 200:
            return f"URL: {url} OTX lookup returned {res.status_code}", 0
        data = res.json()
        flagged = data.get("pulse_info", {}).get("count", 0) > 0
    except Exception:
        # Best-effort lookup: a provider failure must not abort the analysis.
        return f"URL: {url} check failed (AlienVault OTX)", 0

    if flagged:
        return f"URL: {url} flagged in AlienVault OTX", _OTX_POINTS
    return f"URL: {url} not found in AlienVault OTX", 0


def _check_urlhaus(url):
    """Query URLHaus for *url*; return a ``(finding, points)`` pair."""
    try:
        res = requests.post(
            "https://urlhaus-api.abuse.ch/v1/url/",
            data={"url": url},
            timeout=_LOOKUP_TIMEOUT,
        )
        # Without this, an HTTP error whose JSON body lacks
        # query_status == "ok" would be reported as "not found in URLHaus".
        res.raise_for_status()
        data = res.json()
        listed = data.get("query_status") == "ok"
        status = data.get("url_status", "malicious/suspicious")
    except Exception:
        # Best-effort lookup: a provider failure must not abort the analysis.
        return f"URL: {url} check failed (URLHaus)", 0

    if listed:
        return f"URL: {url} flagged as {status} (URLHaus)", _URLHAUS_POINTS
    return f"URL: {url} not found in URLHaus", 0


def _check_heuristics(url):
    """Apply the offline heuristics to *url*; return ``(finding, points)`` pairs."""
    hits = []

    # Take the host from a real URL parse so that userinfo, port and (for URLs
    # without a path) the query string are not counted as part of the domain,
    # and so that the scheme is matched case-insensitively.
    try:
        parts = urlsplit(url.strip())
        host = parts.hostname if parts.scheme in ("http", "https") else None
    except ValueError:
        # urlsplit rejects malformed bracketed hosts; treat as "no host".
        host = None

    if host:
        first_label = host.split(".")[0]
        if len(host) > 25 or any(ch.isdigit() for ch in first_label):
            hits.append((f"URL: {url} has suspicious-looking domain", _HEURISTIC_POINTS))

    if "?" in url and len(url.split("?", 1)[1]) > 50:
        hits.append((f"URL: {url} has obfuscated query string", _HEURISTIC_POINTS))

    return hits


def analyze_urls(urls):
    """Check each URL against reputation services and simple heuristics.

    For every URL, in order: Google Safe Browsing (only when
    ``SAFE_BROWSING_API_KEY`` is set), AlienVault OTX (only when
    ``OTX_API_KEY`` is set), URLHaus, then two offline heuristics (domain
    shape and query-string length). A failing provider lookup is recorded as a
    "check failed" finding and does not stop the remaining checks.

    Args:
        urls: Iterable of URL strings; ``None`` or an empty value means there
            are no URLs.

    Returns:
        A ``(findings, score)`` tuple. ``findings`` is a list of
        human-readable strings in check order and ``score`` is the sum of the
        points of every hit. When nothing was recorded the result is
        ``(["No URLs found in email."], 0)``.
    """
    findings = []
    score = 0

    for url in urls or []:
        results = []
        if SAFE_BROWSING_API_KEY:
            results.append(_check_safe_browsing(url))
        if OTX_API_KEY:
            results.append(_check_otx(url))
        results.append(_check_urlhaus(url))
        results.extend(_check_heuristics(url))

        for finding, points in results:
            findings.append(finding)
            score += points

    # URLHaus always records a finding per URL, so an empty list means there
    # was nothing to iterate over.
    if not findings:
        return ["No URLs found in email."], 0

    return findings, score