File size: 4,931 Bytes
d455ad5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
import re
import dns.resolver
import smtplib
import requests
import threading
import queue
import dns.reversename
CACHE_TTL = 600
# Initialize a DNS resolver with caching enabled
resolver = dns.resolver.Resolver(configure=False)
resolver.nameservers = ['8.8.8.8']
resolver.cache = dns.resolver.Cache()
# def is_valid_email(email):
# # Check if "@" is present in the email
# if "@" not in email:
# return False
# local_part, domain_part = email.split('@')
# # Check for consecutive dots, hyphens, or underscores in the local part
# if re.search(r'\.{2}|-{2}|_{2}', local_part):
# return False
# # Check for consecutive dots, hyphens, or underscores in the domain part
# if re.search(r'\.{2}|-{2}|_{2}', domain_part):
# return False
# # Check for two consecutive dots, hyphens, or underscores anywhere in the email
# if re.search(r'\.\-|\-\.|\.\.|\_\-|\-\_|\_\_|\.\.|--', email):
# return False
# # Validate email syntax
# pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
# return re.match(pattern, email) is not None
def is_valid_email(email):
# Comprehensive regex for email validation
pattern = r'''
^ # Start of string
(?!.*[._%+-]{2}) # No consecutive special characters
[a-zA-Z0-9._%+-]{1,64} # Local part: allowed characters and length limit
(?<![._%+-]) # No special characters at the end of local part
@ # "@" symbol
[a-zA-Z0-9.-]+ # Domain part: allowed characters
(?<![.-]) # No special characters at the end of domain
\.[a-zA-Z]{2,}$ # Top-level domain with minimum 2 characters
'''
# Match the entire email against the pattern
return re.match(pattern, email, re.VERBOSE) is not None
# mx record validation
# Set the cache TTL (in seconds)
def query_dns(record_type, domain):
try:
# Try to resolve the record from cache first
record_name = domain if record_type == 'MX' else f'{domain}.'
cache_result = resolver.cache.get((record_name, record_type))
if cache_result is not None and (dns.resolver.mtime() - cache_result.time) < CACHE_TTL:
return True
# Otherwise, perform a fresh DNS query
resolver.timeout = 2
resolver.lifetime = 2
resolver.resolve(record_name, record_type)
return True
except dns.resolver.NXDOMAIN:
# The domain does not exist
return False
except dns.resolver.NoAnswer:
# No record of the requested type was found
return False
except dns.resolver.Timeout:
# The query timed out
return False
except:
# An unexpected error occurred
return False
def has_valid_mx_record(domain):
# Define a function to handle each DNS query in a separate thread
def query_mx(results_queue):
results_queue.put(query_dns('MX', domain))
def query_a(results_queue):
results_queue.put(query_dns('A', domain))
# Start multiple threads to query the MX and A records simultaneously
mx_queue = queue.Queue()
a_queue = queue.Queue()
mx_thread = threading.Thread(target=query_mx, args=(mx_queue,))
a_thread = threading.Thread(target=query_a, args=(a_queue,))
mx_thread.start()
a_thread.start()
# Wait for both threads to finish and retrieve the results from the queues
mx_thread.join()
a_thread.join()
mx_result = mx_queue.get()
a_result = a_queue.get()
return mx_result or a_result
# smtp connection
def verify_email(email):
# Split the email address into username and domain parts
domain = email.split('@')[1]
# Check the domain MX records
try:
mx_records = dns.resolver.resolve(domain, 'MX')
except dns.resolver.NoAnswer:
return False
# Connect to the SMTP server and perform the email verification
for mx in mx_records:
try:
smtp_server = smtplib.SMTP(str(mx.exchange))
smtp_server.ehlo()
smtp_server.mail('')
code, message = smtp_server.rcpt(str(email))
smtp_server.quit()
if code == 250:
return True
except:
pass
return False
# temporary domain
def is_disposable(domain):
blacklists = [
'https://raw.githubusercontent.com/andreis/disposable-email-domains/master/domains.txt',
'https://raw.githubusercontent.com/wesbos/burner-email-providers/master/emails.txt'
]
for blacklist_url in blacklists:
try:
blacklist = set(requests.get(blacklist_url).text.strip().split('\n'))
if domain in blacklist:
return True
except Exception as e:
print(f'Error loading blacklist {blacklist_url}: {e}')
return False
|