|
|
import json |
|
|
import os |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from pathlib import Path |
|
|
from typing import Any |
|
|
|
|
|
import cachetools |
|
|
import gradio as gr |
|
|
import requests |
|
|
import urllib3 |
|
|
from dns import message |
|
|
|
|
|
|
|
|
# DNS-over-HTTPS endpoint that receives the wire-format DNS queries.
_DNS_SERVER = "https://dns.google/dns-query"

# Record types looked up for a domain; the list order is the lookup order.
_DNS_RECORD_TYPES = [
    "A", "AAAA", "CNAME", "MX", "NS", "SOA", "TXT",
    "RP", "LOC", "CAA", "SPF", "SRV", "NSEC", "RRSIG",
]

# Text file holding one candidate subdomain prefix per line
# (relative path, so it is resolved against the working directory).
_COMMON_SUBDOMAINS_TXT_PATH = Path("./subdomains/subdomains.txt")

# Settings shared by the TTL caches that wrap the lookup functions.
_CACHE_MAX_SIZE = 4096
_CACHE_TTL_SECONDS = 3600
|
|
|
|
|
|
|
|
@cachetools.cached(
    cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
)
def get_geolocation(ip: str) -> dict[str, Any] | str:
    """Look up location information for an ip address.

    The address is sent to the geolocation-db.com JSON endpoint and the
    decoded JSON reply is returned. As shown in the example below, the
    reply carries the following fields:
        1. IPv4
        2. city
        3. country_code
        4. country_name
        5. latitude
        6. longitude
        7. postal
        8. state

    Whatever this function returns - error strings included - is cached
    per ``ip`` argument for ``_CACHE_TTL_SECONDS`` seconds.

    Example:
        >>> from pprint import pprint
        >>> pprint(get_geolocation("103.100.104.0"))
        {'IPv4': '103.100.104.0',
         'city': None,
         'country_code': 'NZ',
         'country_name': 'New Zealand',
         'latitude': -41,
         'longitude': 174,
         'postal': None,
         'state': None}

    Args:
        ip: ip address; surrounding whitespace is ignored.

    Returns:
        Location information on the ip address, or the text of the
        exception when the request or the JSON decoding fails.
    """
    try:
        endpoint = f"https://geolocation-db.com/json/{ip.strip()}"
        reply = requests.get(endpoint, timeout=1)
        return reply.json()
    except Exception as error:
        # Best effort: failures are reported to the caller as plain text.
        return str(error)
|
|
|
|
|
|
|
|
def _request_dns_record(
    domain: str,
    record_type: str,
    timeout: float = 0.5,
) -> list[str]:
    """Query one DNS record type for a domain over DNS-over-HTTPS.

    The query is sent in DNS wire format as an HTTPS POST to
    ``_DNS_SERVER``, so resolving does not use port 53.

    Args:
        domain: domain to investigate
        record_type: record type, e.g. ``"A"`` or ``"MX"``
        timeout: timeout in seconds handed to ``requests.post``

    Returns:
        The records of the requested type found in the answer section,
        rendered as strings. When the answer section holds no record of
        that type (for instance only a CNAME), the records of its first
        RRset are returned instead. An empty list is returned when the
        answer section is empty.

    Raises:
        requests.HTTPError: if the resolver replies with an HTTP error
            status.
        Exception: other errors from ``requests`` or from building and
            parsing the DNS messages are propagated unchanged.
    """
    query = message.make_query(domain, record_type)
    response = requests.post(
        _DNS_SERVER,
        headers={
            "Content-Type": "application/dns-message",
            "Accept": "application/dns-message",
        },
        data=query.to_wire(),
        verify=True,
        timeout=timeout,
    )
    # Surface HTTP failures as such instead of feeding an error page to
    # the DNS wire-format parser.
    response.raise_for_status()

    answer = message.from_wire(response.content).answer
    if not answer:
        return []

    # For an aliased name the answer section starts with the CNAME RRset
    # and the records that were actually asked for follow it, so pick the
    # RRsets by type rather than by position.
    wanted_type = query.question[0].rdtype
    matching = [
        str(rdata)
        for rrset in answer
        if rrset.rdtype == wanted_type
        for rdata in rrset
    ]
    if matching:
        return matching

    # Nothing of the requested type: keep reporting the first RRset so a
    # name that only answers with an alias still yields a non-empty result.
    return [str(rdata) for rdata in answer[0]]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cachetools.cached(
    cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
)
def enumerate_dns(domain_name: str) -> dict[str, Any] | None:
    """Enumerates information about a specific domain's DNS configuration.

    One lookup is made per entry of ``_DNS_RECORD_TYPES``:
        1. A records: the IPv4 associated with the domain
        2. AAAA records: the IPv6 associated with the domain
        3. CAA records: used by owners to specify which Certificate
            Authorities are authorized to issue SSL/TLS certificates
            for their domains.
        4. CNAME records: alias of one name to another - the DNS lookup
            will continue by retrying the lookup with the new name.
        5. LOC records: geographic location associated with a domain name.
        6. MX records: associated email servers to the domain.
        7. NS records: DNS servers that are authoritative for a
            particular domain. These may be used to inquire information
            about the domain.
        8. SOA records: defines authoritative information about a DNS
            zone, including zone transfers and cache expiration.
        9. TXT records: used for domain verification and email security.
        10. RP records: the responsible person for a domain.
        11. SPF records: defines authorized email servers.
        12. SRV records: specifies location of specific services
            (port and host) for the domain.
        13. NSEC records: proves non-existence of DNS records
            and prevents zone enumeration.
        14. RRSIG records: contains cryptographic signatures for
            DNSSEC-signed records, providing authentication and integrity.

    The result - including any error text it contains - is cached per
    ``domain_name`` argument for ``_CACHE_TTL_SECONDS`` seconds.

    Example:
        Illustrative shape only; the values depend on live DNS data.

        >>> enumerate_dns("youtube.com")
        {'A': ['142.250.200.142'],
         'MX': ['0 smtp.google.com.'],
         'NS': ['ns1.google.com.', 'ns2.google.com.']}

    Args:
        domain_name: domain name for which to
            enumerate the DNS configuration.

    Returns:
        A dictionary keyed by record type. Each value is the list of
        record strings found for that type, or a one-element list with
        the error text when that lookup raised. Record types without any
        record are left out. ``None`` is returned when the dictionary
        would be empty.
    """
    records: dict[str, Any] = {}
    for rtype in _DNS_RECORD_TYPES:
        try:
            found = _request_dns_record(domain_name.strip(), rtype, timeout=1)
        except Exception as error:
            # A failed lookup is reported in place of the records.
            records[rtype] = [str(error)]
            continue
        if found:
            records[rtype] = found
    return records or None
|
|
|
|
|
|
|
|
def resolve_subdomain_ipv4(domain: str) -> str | None: |
|
|
"""Resolve the IPv4 address of a domain. |
|
|
|
|
|
Args: |
|
|
domain: domain name |
|
|
|
|
|
Returns: |
|
|
The domain is returned provided |
|
|
it was resolved. Otherwise nothing |
|
|
is returned. |
|
|
""" |
|
|
try: |
|
|
ipv4 = _request_dns_record(domain, "A", timeout=0.6) |
|
|
if ipv4: |
|
|
return domain |
|
|
msg = "Cannot resolve it: it is likely non-existing" |
|
|
raise Exception(msg) |
|
|
except Exception: |
|
|
return None |
|
|
|
|
|
|
|
|
@cachetools.cached(
    cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
)
def scrap_subdomains_for_domain(domain_name: str) -> list[str]:
    """Retrieves subdomains associated to a domain if any.

    Candidate names are built by prefixing the domain with every entry
    of the word list stored at ``_COMMON_SUBDOMAINS_TXT_PATH`` (the top
    1000 subdomain prefixes as indicated by
    https://github.com/rbsec/dnscan/tree/master). Each candidate is then
    checked concurrently with ``resolve_subdomain_ipv4``.

    Importantly, it finds subdomains only if their prefixes
    are among the entries of that word list. Hence, it may not
    yield all the subdomains associated to the domain.

    The result is cached per ``domain_name`` argument for
    ``_CACHE_TTL_SECONDS`` seconds.

    Example:
        Illustrative output; the result depends on live DNS data.

        >>> scrap_subdomains_for_domain("github.com")
        ['www.github.com', 'smtp.github.com', 'ns1.github.com',
         'ns2.github.com', 'autodiscover.github.com', 'test.github.com',
         'blog.github.com', 'admin.github.com', 'support.github.com',
         'docs.github.com', 'shop.github.com', 'wiki.github.com',
         'api.github.com', 'live.github.com', 'help.github.com',
         'jobs.github.com', 'services.github.com', 'de.github.com',
         'cs.github.com', 'fr.github.com', 'ssh.github.com',
         'partner.github.com', 'community.github.com',
         'mailer.github.com', 'training.github.com', ...]

    Args:
        domain_name: domain name for which to retrieve a
            list of subdomains; surrounding whitespace is ignored.

    Returns:
        List of subdomains if any, in word-list order. An empty list is
        returned when the domain is blank or the word list file is
        missing.
    """
    domain = domain_name.strip()
    if not domain:
        # A blank domain would only produce bare labels such as "www.",
        # so skip the whole batch of lookups.
        return []

    try:
        with open(_COMMON_SUBDOMAINS_TXT_PATH, encoding="utf-8") as file:
            prefixes = [prefix for prefix in map(str.strip, file) if prefix]
    except FileNotFoundError:
        return []

    candidates = [f"{prefix}.{domain}" for prefix in prefixes]
    with ThreadPoolExecutor(max_workers=None) as executor:
        # ``map`` yields results in candidate order; unresolved names
        # come back as ``None`` and are filtered out.
        resolved = executor.map(resolve_subdomain_ipv4, candidates)
        return [name for name in resolved if name]
|
|
|
|
|
|
|
|
@cachetools.cached(
    cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
)
def retrieve_ioc_from_threatfox(potentially_ioc: str) -> str:
    r"""Retrieves information about a potential IoC from ThreatFox.

    It may be used to retrieve information of indicators of compromise
    (IOCs) associated with malware, with the infosec community, AV
    vendors and cyber threat intelligence providers.

    The search term is posted as a ``search_ioc`` query to the ThreatFox
    API, authenticated with the key read from the ``THREATFOX_APIKEY``
    environment variable. The returned text - error text included - is
    cached per ``potentially_ioc`` argument for ``_CACHE_TTL_SECONDS``
    seconds.

    Examples:
        >>> retrieve_ioc_from_threatfox("139.180.203.104")
        {
            "query_status": "ok",
            "data": [
                {
                    "id": "12",
                    "ioc": "139.180.203.104:443",
                    "threat_type": "botnet_cc",
                    "threat_type_desc": "Indicator that identifies a botnet command&control...",
                    "ioc_type": "ip:port",
                    "ioc_type_desc": "ip:port combination that is used for botnet Command&...",
                    "malware": "win.cobalt_strike",
                    "malware_printable": "Cobalt Strike",
                    "malware_alias": "Agentemis,BEACON,CobaltStrike",
                    "malware_malpedia": "https:\/\/malpedia.caad.fkie.fraunhofer.de\/...",
                    "confidence_level": 75,
                    "first_seen": "2020-12-06 09:10:23 UTC",
                    "last_seen": null,
                    "reference": null,
                    "reporter": "abuse_ch",
                    "tags": null,
                    "malware_samples": [
                        {
                            "time_stamp": "2021-03-23 08:18:06 UTC",
                            "md5_hash": "5b7e82e051ade4b14d163eea2a17bf8b",
                            "sha256_hash": "b325c92fa540edeb89b95dbfd4400c1cb33599c66859....",
                            "malware_bazaar": "https:\/\/bazaar.abuse.ch\/sample\/b325c...\/"
                        },
                    ]
                }
            ]
        }

    Args:
        potentially_ioc: this can be a url, a domain, a hash,
            or any other type of IoC.

    Returns:
        Information of the input as an IoC: threat type, malware type and
        samples, confidence level, first/last seen dates, and more IoC
        information. This is the response body decoded as UTF-8 text; when
        the request fails, the text of the exception is returned instead.

    Raises:
        KeyError: if the ``THREATFOX_APIKEY`` environment variable is
            not set.
    """
    headers = {"Auth-Key": os.environ["THREATFOX_APIKEY"]}
    payload = json.dumps(
        {
            "query": "search_ioc",
            "search_term": potentially_ioc.strip(),
        },
    )
    pool = urllib3.HTTPSConnectionPool(
        "threatfox-api.abuse.ch",
        port=443,
        maxsize=50,
        headers=headers,
        timeout=5,
    )
    try:
        response = pool.request("POST", "/api/v1/", body=payload)
        return response.data.decode("utf-8", "ignore")
    except Exception as e:
        # Best effort: failures are reported to the caller as plain text.
        return str(e)
    finally:
        # The pool is created per call, so release its connections here
        # instead of leaving them open until garbage collection.
        pool.close()
|
|
|
|
|
|
|
|
# Gradio interface around ``get_geolocation``: text box in, JSON out.
geo_location_tool = gr.Interface(
    title="Domain Associated Geolocation Finder",
    description="Retrieves the geolocation associated to an input ip address",
    fn=get_geolocation,
    inputs=gr.Textbox(label="ip"),
    outputs=gr.JSON(label="Geolocation of IP"),
    examples=["1.0.3.255", "59.34.7.3"],
    theme="default",
)
|
|
|
|
|
# Gradio interface around ``enumerate_dns``: text box in, JSON out.
dns_enumeration_tool = gr.Interface(
    title="DNS record enumerator of domains",
    description="Retrieves several dns record types for the input domain names",
    fn=enumerate_dns,
    inputs=gr.Textbox(label="domain"),
    outputs=gr.JSON(label="DNS records"),
    examples=["owasp.org", "nist.gov"],
    theme="default",
)
|
|
|
|
|
# Gradio interface around ``scrap_subdomains_for_domain``: text box in, JSON out.
scrap_subdomains_tool = gr.Interface(
    title="Subdomains Extractor of domains",
    description="Retrieves the subdomains for the input domain if they are common",
    fn=scrap_subdomains_for_domain,
    inputs=gr.Textbox(label="domain"),
    outputs=gr.JSON(label="Subdomains managed by domain"),
    examples=["github.com", "netacea.com"],
    theme="default",
)
|
|
|
|
|
# Gradio interface around ``retrieve_ioc_from_threatfox``: text box in, text out.
extractor_of_ioc_from_threatfox_tool = gr.Interface(
    title="IoC information extractor associated to particular entities",
    description=(
        "If information as an Indicator of Compromise (IoC) exists "
        "for the input url, domain or hash, it retrieves it"
    ),
    fn=retrieve_ioc_from_threatfox,
    inputs=gr.Textbox(label="IoC - url, domains or hash"),
    outputs=gr.Text(label="Entity information as an IoC"),
    examples=["advertipros.com", "dev.couplesparks.com"],
    example_labels=["👾 IoC 1", "👾 IoC 2"],
    theme="default",
)
|
|
|