Spaces:
Sleeping
Sleeping
| import requests | |
| import os | |
| import re | |
| from urllib.parse import quote | |
# API credentials are read from the environment once, at import time.
# os.getenv returns None for an unset variable; analyze_urls only runs the
# Google Safe Browsing / AlienVault OTX lookup when the matching key is truthy.
| SAFE_BROWSING_API_KEY = os.getenv("SAFE_BROWSING_API_KEY") | |
| OTX_API_KEY = os.getenv("OTX_API_KEY") | |
def _check_safe_browsing(url):
    """Look up *url* in Google Safe Browsing (v4 ``threatMatches:find``).

    Returns a ``(messages, points)`` pair. Returns ``([], 0)`` when
    ``SAFE_BROWSING_API_KEY`` is not configured.
    """
    if not SAFE_BROWSING_API_KEY:
        return [], 0

    payload = {
        "client": {"clientId": "email-analysis-tool", "clientVersion": "1.0"},
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE"],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        res = requests.post(
            "https://safebrowsing.googleapis.com/v4/threatMatches:find",
            # Passed via params so the key is URL-encoded by requests rather
            # than interpolated into the URL string by hand.
            params={"key": SAFE_BROWSING_API_KEY},
            json=payload,
            timeout=15,
        )
        # An error response (bad key, quota, ...) carries no "matches" key;
        # without this check it would be reported as "not flagged".
        if res.status_code != 200:
            return [f"URL: {url} Safe Browsing lookup returned {res.status_code}"], 0
        data = res.json()
    except Exception:
        # Deliberate best-effort: one failing provider must not abort the scan.
        return [f"URL: {url} check failed (Google Safe Browsing)"], 0

    if isinstance(data, dict) and "matches" in data:
        return [f"URL: {url} flagged by Google Safe Browsing"], 40
    return [f"URL: {url} not flagged (Google Safe Browsing)"], 0


def _check_otx(url):
    """Look up *url* in AlienVault OTX (``indicators/url/.../general``).

    Returns a ``(messages, points)`` pair. Returns ``([], 0)`` when
    ``OTX_API_KEY`` is not configured.
    """
    if not OTX_API_KEY:
        return [], 0

    try:
        res = requests.get(
            # The whole URL is one path segment, so every reserved character
            # (including "/") must be percent-encoded.
            f"https://otx.alienvault.com/api/v1/indicators/url/{quote(url, safe='')}/general",
            headers={"X-OTX-API-KEY": OTX_API_KEY},
            timeout=15,
        )
        if res.status_code != 200:
            return [f"URL: {url} OTX lookup returned {res.status_code}"], 0
        data = res.json()
        # Tolerate a null "pulse_info" / "count" instead of raising.
        pulse_count = (data.get("pulse_info") or {}).get("count") or 0
        flagged = pulse_count > 0
    except Exception:
        # Deliberate best-effort: one failing provider must not abort the scan.
        return [f"URL: {url} check failed (AlienVault OTX)"], 0

    if flagged:
        return [f"URL: {url} flagged in AlienVault OTX"], 30
    return [f"URL: {url} not found in AlienVault OTX"], 0


def _check_urlhaus(url):
    """Look up *url* in URLHaus and return a ``(messages, points)`` pair."""
    try:
        res = requests.post(
            "https://urlhaus-api.abuse.ch/v1/url/",
            data={"url": url},
            timeout=15,
        )
        data = res.json()
        query_status = data.get("query_status")
        url_status = data.get("url_status", "malicious/suspicious")
    except Exception:
        # Deliberate best-effort: one failing provider must not abort the scan.
        return [f"URL: {url} check failed (URLHaus)"], 0

    if query_status == "ok":
        return [f"URL: {url} flagged as {url_status} (URLHaus)"], 30
    if query_status == "no_results":
        return [f"URL: {url} not found in URLHaus"], 0
    # Any other status (invalid URL, auth error, ...) is a failed lookup,
    # not evidence that the URL is clean.
    return [f"URL: {url} URLHaus lookup returned {query_status or res.status_code}"], 0


def _heuristic_findings(url):
    """Apply offline heuristics to *url* and return ``(messages, points)``."""
    messages = []
    points = 0

    # Stop at "/", "?" or "#" so a path-less URL such as
    # "http://a.com?x=..." does not count its query string as the domain.
    host_match = re.search(r"https?://([^/?#]+)", url)
    if host_match:
        domain = host_match.group(1)
        first_label = domain.split(".")[0]
        if len(domain) > 25 or any(ch.isdigit() for ch in first_label):
            messages.append(f"URL: {url} has suspicious-looking domain")
            points += 15

    _, separator, query = url.partition("?")
    if separator and len(query) > 50:
        messages.append(f"URL: {url} has obfuscated query string")
        points += 15

    return messages, points


def analyze_urls(urls):
    """Score a collection of URLs against threat feeds and simple heuristics.

    Each usable URL is checked, in order, against Google Safe Browsing (only
    if ``SAFE_BROWSING_API_KEY`` is set), AlienVault OTX (only if
    ``OTX_API_KEY`` is set), URLHaus, and two offline heuristics (domain
    shape and query-string length).

    Args:
        urls: Iterable of URL strings, or ``None``. Entries that are not
            strings, or are blank, are skipped.

    Returns:
        A ``(findings, score)`` tuple: ``findings`` is a list of
        human-readable result lines and ``score`` is the sum of the points
        awarded (40 Safe Browsing, 30 OTX, 30 URLHaus, 15 per heuristic).
        When nothing was analysed, returns
        ``(["No URLs found in email."], 0)``.
    """
    findings = []
    score = 0
    checks = (_check_safe_browsing, _check_otx, _check_urlhaus, _heuristic_findings)

    for url in urls or []:
        # Non-string or blank entries cannot be looked up and would make the
        # regex heuristic raise TypeError.
        if not isinstance(url, str) or not url.strip():
            continue
        for check in checks:
            messages, points = check(url)
            findings.extend(messages)
            score += points

    if not findings:
        return ["No URLs found in email."], 0
    return findings, score