Spaces:
Sleeping
Sleeping
File size: 3,630 Bytes
917dcb0 3d7d6cb b1aff46 917dcb0 3d7d6cb 917dcb0 3d7d6cb b1aff46 917dcb0 b1aff46 917dcb0 b1aff46 917dcb0 b1aff46 917dcb0 3d7d6cb 917dcb0 b1aff46 3d7d6cb 917dcb0 b1aff46 3d7d6cb ac7f1fe 3d7d6cb b1aff46 ac7f1fe 3d7d6cb ac7f1fe 3d7d6cb b1aff46 3d7d6cb b1aff46 3d7d6cb b1aff46 3d7d6cb b1aff46 3d7d6cb b1aff46 3d7d6cb b1aff46 3d7d6cb b1aff46 3d7d6cb ac7f1fe 3d7d6cb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
import requests
import os
import re
from urllib.parse import quote
SAFE_BROWSING_API_KEY = os.getenv("SAFE_BROWSING_API_KEY")
OTX_API_KEY = os.getenv("OTX_API_KEY")
def analyze_urls(urls):
findings = []
score = 0
urls = urls or []
for url in urls:
# 1) Google Safe Browsing
if SAFE_BROWSING_API_KEY:
try:
payload = {
"client": {"clientId": "email-analysis-tool", "clientVersion": "1.0"},
"threatInfo": {
"threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE"],
"platformTypes": ["ANY_PLATFORM"],
"threatEntryTypes": ["URL"],
"threatEntries": [{"url": url}],
},
}
res = requests.post(
f"https://safebrowsing.googleapis.com/v4/threatMatches:find?key={SAFE_BROWSING_API_KEY}",
json=payload,
timeout=15,
)
data = res.json()
if isinstance(data, dict) and "matches" in data:
findings.append(f"URL: {url} flagged by Google Safe Browsing")
score += 40
else:
findings.append(f"URL: {url} not flagged (Google Safe Browsing)")
except Exception:
findings.append(f"URL: {url} check failed (Google Safe Browsing)")
# 2) AlienVault OTX
if OTX_API_KEY:
try:
headers = {"X-OTX-API-KEY": OTX_API_KEY}
encoded = quote(url, safe="")
res = requests.get(
f"https://otx.alienvault.com/api/v1/indicators/url/{encoded}/general",
headers=headers,
timeout=15,
)
if res.status_code == 200:
data = res.json()
if data.get("pulse_info", {}).get("count", 0) > 0:
findings.append(f"URL: {url} flagged in AlienVault OTX")
score += 30
else:
findings.append(f"URL: {url} not found in AlienVault OTX")
else:
findings.append(f"URL: {url} OTX lookup returned {res.status_code}")
except Exception:
findings.append(f"URL: {url} check failed (AlienVault OTX)")
# 3) URLHaus
try:
res = requests.post("https://urlhaus-api.abuse.ch/v1/url/", data={"url": url}, timeout=15)
data = res.json()
if data.get("query_status") == "ok":
status = data.get("url_status", "malicious/suspicious")
findings.append(f"URL: {url} flagged as {status} (URLHaus)")
score += 30
else:
findings.append(f"URL: {url} not found in URLHaus")
except Exception:
findings.append(f"URL: {url} check failed (URLHaus)")
# 4) Heuristics
domain_match = re.search(r"https?://([^/]+)/?", url)
if domain_match:
domain = domain_match.group(1)
if len(domain) > 25 or any(char.isdigit() for char in domain.split(".")[0]):
findings.append(f"URL: {url} has suspicious-looking domain")
score += 15
if "?" in url and len(url.split("?", 1)[1]) > 50:
findings.append(f"URL: {url} has obfuscated query string")
score += 15
if not findings:
return ["No URLs found in email."], 0
return findings, score
|