CySecGuardians / parse_email.py
princemaxp's picture
Update parse_email.py
f88bfb7 verified
# parse_email.py
import email
from email import policy
from bs4 import BeautifulSoup
import re
import base64
import io
def _extract_inline_images_from_html(html):
images = []
soup = BeautifulSoup(html or "", "html.parser")
for img in soup.find_all("img"):
src = img.get("src", "")
if src.startswith("...
try:
header, b64 = src.split(",", 1)
data = base64.b64decode(b64)
images.append(data)
except Exception:
continue
return images
def parse_email(file_path):
"""
Returns: headers(dict), subject(str), body(str), urls(list), images(list of bytes)
"""
with open(file_path, "rb") as f:
msg = email.message_from_binary_file(f, policy=policy.default)
headers = dict(msg.items())
subject = headers.get("Subject", "") or ""
body = ""
images = []
# Walk parts - handle multipart and attachments
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
disp = str(part.get("Content-Disposition") or "").lower()
# attachments that are images
if ctype.startswith("image/"):
try:
data = part.get_payload(decode=True)
if data:
images.append(data)
except Exception:
pass
# text/plain
if ctype == "text/plain" and "attachment" not in disp:
try:
body += part.get_content()
except Exception:
pass
# text/html
if ctype == "text/html" and "attachment" not in disp:
try:
html_body = part.get_content()
# extract inline images from this html (data URIs)
images += _extract_inline_images_from_html(html_body)
# convert html to text
soup = BeautifulSoup(html_body, "html.parser")
body += soup.get_text(" ", strip=True)
except Exception:
pass
else:
# not multipart
try:
if msg.get_content_type() == "text/html":
html_body = msg.get_content()
images += _extract_inline_images_from_html(html_body)
soup = BeautifulSoup(html_body, "html.parser")
body = soup.get_text(" ", strip=True)
else:
body = msg.get_content()
except Exception:
body = ""
# URL extraction (from combined body)
urls = set()
try:
urls.update(re.findall(r"https?://[^\s\"'<>]+", body))
except Exception:
pass
# Also try to find URLs in headers (e.g., List-Unsubscribe) or other parts
for k, v in headers.items():
try:
urls.update(re.findall(r"https?://[^\s\"'<>]+", str(v)))
except Exception:
pass
return headers, subject, body, list(urls), images