"""EDGAR API Client Module with Performance Optimization"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import urllib3
try:
from sec_edgar_api.EdgarClient import EdgarClient
except ImportError:
EdgarClient = None
import json
import time
import threading
from functools import lru_cache
from datetime import datetime, timedelta
import re
import difflib
# Silence urllib3's InsecureRequestWarning (suppresses the warning only; certificate verification behavior is unchanged)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class EdgarDataClient:
# Class-level cache for company_tickers.json (shared across instances)
_company_tickers_cache = None
_company_tickers_cache_time = None
_company_tickers_cache_ttl = 3600 # 1 hour TTL
_cache_lock = threading.Lock()
# Class-level rate limiter (SEC requires max 10 requests per second)
_last_request_time = 0
_rate_limit_lock = threading.Lock()
_min_request_interval = 0.11 # 110ms between requests (9 req/sec, safe margin)
    # Company index (speeds up search; avoids scanning the full dataset on every query)
    _by_ticker = None  # ticker -> company info
    _by_title = None  # title (lowercase) -> company info
    _by_title_norm = None  # normalized title -> company info
    _all_keys = None  # list of all keys used for fuzzy matching
    _index_built_time = None
    _index_ttl = 3600  # 1 hour
    # Common alias map for frequent nicknames (makes name search smarter)
_alias_map = {
"google": "alphabet inc",
"alphabet": "alphabet inc",
"facebook": "meta platforms, inc.",
"meta": "meta platforms, inc.",
"amazon": "amazon.com, inc.",
"apple": "apple inc.",
"microsoft": "microsoft corporation",
"netflix": "netflix, inc.",
"nvidia": "nvidia corporation",
"tesla": "tesla, inc.",
"adobe": "adobe inc.",
"oracle": "oracle corporation",
"ibm": "international business machines corporation",
"paypal": "paypal holdings, inc.",
"shopify": "shopify inc.",
}
def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
"""Initialize EDGAR client with connection pooling and timeout"""
self.user_agent = user_agent
        # Instance-level search cache (further reduces repeated-search overhead)
self._search_cache = {}
# Configure requests session with connection pooling
self.session = requests.Session()
# Configure retry strategy with enhanced retries for stability
retry_strategy = Retry(
total=5, # Increased from 3 to 5 for better reliability
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS"]
)
adapter = HTTPAdapter(
pool_connections=10,
pool_maxsize=20,
max_retries=retry_strategy,
pool_block=False
)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# Set default timeout with connection and read timeouts
self.timeout = (10, 30) # (connect timeout, read timeout)
# Initialize sec_edgar_api client with timeout wrapper
if EdgarClient:
self.edgar = EdgarClient(user_agent=user_agent)
# Monkey patch to add timeout
self._patch_edgar_client_timeout()
else:
self.edgar = None
    def _patch_edgar_client_timeout(self):
        """Monkey patch sec_edgar_api methods to add timeout support"""
        if not self.edgar:
            return

        def wrap_with_timeout(func):
            """Build a thread-based timeout wrapper (Gradio compatible): run the
            call in a daemon thread and give up once the read timeout elapses."""
            def wrapped(cik):
                result = [None]
                exception = [None]
                def worker():
                    try:
                        result[0] = func(cik)
                    except Exception as e:
                        exception[0] = e
                thread = threading.Thread(target=worker, daemon=True)
                thread.start()
                # Use the read timeout value (second element of the timeout tuple)
                timeout_seconds = self.timeout[1] if isinstance(self.timeout, tuple) else self.timeout
                thread.join(timeout=timeout_seconds)
                if thread.is_alive():
                    raise TimeoutError(f"SEC API request timeout ({timeout_seconds}s)")
                if exception[0]:
                    raise exception[0]
                return result[0]
            return wrapped

        # Wrap get_submissions and get_company_facts with the same timeout logic
        self.edgar.get_submissions = wrap_with_timeout(self.edgar.get_submissions)
        self.edgar.get_company_facts = wrap_with_timeout(self.edgar.get_company_facts)
def _rate_limit(self):
"""Thread-safe rate limiting to comply with SEC requirements"""
with self._rate_limit_lock:
current_time = time.time()
time_since_last = current_time - EdgarDataClient._last_request_time
if time_since_last < self._min_request_interval:
sleep_time = self._min_request_interval - time_since_last
time.sleep(sleep_time)
EdgarDataClient._last_request_time = time.time()
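        # Illustrative timing (derived from the logic above): with
        # _min_request_interval = 0.11s, a second back-to-back caller sleeps for
        # the remainder of the 110ms window, capping sustained throughput near
        # 9 requests/second, within the SEC's 10 req/s fair-access limit.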
def _normalize_text(self, s: str) -> str:
"""规范化文本:用于提升匹配准确度"""
if not s:
return ""
s = s.lower().strip()
s = s.replace("&", " and ")
s = re.sub(r"[.,()\-_/]", " ", s)
s = re.sub(r"\s+", " ", s)
        # Remove common corporate suffix words (punctuation is already stripped
        # above, so only bare tokens like "inc" / "corp" can appear here)
        stopwords = {"inc", "incorporated", "corp", "corporation", "co", "company", "plc", "ltd", "llc", "the"}
tokens = [t for t in s.split() if t not in stopwords]
return " ".join(tokens).strip()
def _ensure_company_index(self):
"""确保公司索引已构建(按需构建或过期重建)"""
with self._cache_lock:
current_time = time.time()
            # Refresh the company_tickers cache first if it is missing or expired
            if (EdgarDataClient._company_tickers_cache is None or
                EdgarDataClient._company_tickers_cache_time is None or
                current_time - EdgarDataClient._company_tickers_cache_time >= self._company_tickers_cache_ttl):
                # Fetch and update the company_tickers cache
self._rate_limit()
url = "https://www.sec.gov/files/company_tickers.json"
headers = {"User-Agent": self.user_agent}
response = self.session.get(url, headers=headers, timeout=self.timeout)
response.raise_for_status()
companies = response.json()
EdgarDataClient._company_tickers_cache = companies
EdgarDataClient._company_tickers_cache_time = current_time
else:
companies = EdgarDataClient._company_tickers_cache
            # Rebuild the index if it is missing or expired
if (EdgarDataClient._by_ticker is None or
EdgarDataClient._by_title is None or
EdgarDataClient._by_title_norm is None or
EdgarDataClient._all_keys is None or
EdgarDataClient._index_built_time is None or
current_time - EdgarDataClient._index_built_time >= EdgarDataClient._index_ttl):
by_ticker = {}
by_title = {}
by_title_norm = {}
all_keys = []
for _, company in companies.items():
title = company.get("title", "")
ticker = company.get("ticker", "")
cik_str = str(company.get("cik_str", "")).zfill(10)
title_lower = title.lower()
ticker_lower = ticker.lower()
title_norm = self._normalize_text(title)
                # Build the indexes: ticker, title, normalized title
if ticker_lower:
by_ticker[ticker_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
all_keys.append(ticker_lower)
if title_lower:
by_title[title_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
if title_norm:
by_title_norm[title_norm] = {"cik": cik_str, "name": title, "ticker": ticker}
all_keys.append(title_norm)
EdgarDataClient._by_ticker = by_ticker
EdgarDataClient._by_title = by_title
EdgarDataClient._by_title_norm = by_title_norm
EdgarDataClient._all_keys = all_keys
EdgarDataClient._index_built_time = current_time
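    # After _ensure_company_index() returns, lookups are O(1) dict hits, e.g.
    #   _by_ticker["aapl"] -> {"cik": "0000320193", "name": "Apple Inc.", "ticker": "AAPL"}
    # (shape taken from the index-building code above; values from the SEC's
    # company_tickers.json).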
def search_company_by_name(self, company_name):
"""Search company CIK by company name with caching and optimized ticker matching"""
try:
            # Instance-level cache check (keyed by the normalized query)
norm_query = self._normalize_text(company_name)
cache_hit = self._search_cache.get(norm_query)
if cache_hit:
return cache_hit
            # Ensure the index is built (first call, or rebuilt after expiry)
            self._ensure_company_index()
            # Grab index references (fully built under the lock above)
by_ticker = EdgarDataClient._by_ticker
by_title = EdgarDataClient._by_title
by_title_norm = EdgarDataClient._by_title_norm
all_keys = EdgarDataClient._all_keys
            # ✅ OPTIMIZATION 1: Ticker-first matching (per project convention)
            raw = company_name.strip().lower()
            raw_compact = re.sub(r"[^a-z0-9]", "", raw)
            is_ticker_like = 1 <= len(raw_compact) <= 5
if is_ticker_like and raw_compact in by_ticker:
result = by_ticker[raw_compact]
self._search_cache[norm_query] = result
return result
            # ✅ OPTIMIZATION 2: Alias mapping (e.g., 'google' -> 'alphabet inc')
alias_target = EdgarDataClient._alias_map.get(norm_query)
if alias_target:
alias_norm = self._normalize_text(alias_target)
                # Try the normalized title first
if alias_norm in by_title_norm:
result = by_title_norm[alias_norm]
self._search_cache[norm_query] = result
return result
                # Then try the raw title
alias_lower = alias_target.lower()
if alias_lower in by_title:
result = by_title[alias_lower]
self._search_cache[norm_query] = result
return result
                # Finally try as a ticker (some aliases may actually be tickers)
alias_ticker = re.sub(r"[^a-z0-9]", "", alias_lower)
if alias_ticker in by_ticker:
result = by_ticker[alias_ticker]
self._search_cache[norm_query] = result
return result
            # ✅ OPTIMIZATION 3: Exact match (raw title)
title_lower = company_name.lower().strip()
if title_lower in by_title:
result = by_title[title_lower]
self._search_cache[norm_query] = result
return result
            # ✅ OPTIMIZATION 4: Exact match (normalized title)
if norm_query in by_title_norm:
result = by_title_norm[norm_query]
self._search_cache[norm_query] = result
return result
            # ✅ OPTIMIZATION 5: Exact match (ticker; retry the raw input)
if raw_compact in by_ticker:
result = by_ticker[raw_compact]
self._search_cache[norm_query] = result
return result
            # ✅ OPTIMIZATION 6: Partial (substring) matching
            partial_matches = [key for key in by_title_norm if norm_query in key]
            if not partial_matches:
                partial_matches = [t for t in by_ticker if norm_query in t]
if partial_matches:
best_key = max(
partial_matches,
key=lambda k: difflib.SequenceMatcher(None, norm_query, k).ratio()
)
result = by_title_norm.get(best_key) or by_ticker.get(best_key)
if result:
self._search_cache[norm_query] = result
return result
            # ✅ OPTIMIZATION 7: Fuzzy matching (difflib, for near-misspellings)
close = difflib.get_close_matches(norm_query, all_keys, n=1, cutoff=0.78)
if close:
best = close[0]
result = by_title_norm.get(best) or by_ticker.get(best)
if result:
self._search_cache[norm_query] = result
return result
            # Not found
return None
except TimeoutError as e:
print(f"Timeout searching company: {e}")
return None
except Exception as e:
print(f"Error searching company: {e}")
return None
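    # Illustrative usage (return shape from the lookups above; the CIK shown is
    # Apple's real zero-padded CIK):
    #   EdgarDataClient().search_company_by_name("AAPL")
    #   -> {"cik": "0000320193", "name": "Apple Inc.", "ticker": "AAPL"}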
@lru_cache(maxsize=128)
def get_company_info(self, cik):
"""
Get basic company information (cached)
Args:
cik (str): Company CIK code
Returns:
dict: Dictionary containing company information
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return None
try:
self._rate_limit()
# Get company submissions (now has timeout protection)
submissions = self.edgar.get_submissions(cik=cik)
return {
"cik": cik,
"name": submissions.get("name", ""),
"tickers": submissions.get("tickers", []),
"sic": submissions.get("sic", ""),
"sic_description": submissions.get("sicDescription", "")
}
except TimeoutError as e:
print(f"Timeout getting company info for CIK {cik}: {e}")
return None
except Exception as e:
print(f"Error getting company info: {e}")
return None
    def get_company_filings(self, cik, form_types=None):
        """
        Get all company filing documents (cached)
        Args:
            cik (str): Company CIK code
            form_types (tuple): Tuple of form types, e.g., ('10-K', '10-Q'); None for all types
        Returns:
            list: List of filing documents
        """
        # Convert a list to a tuple BEFORE the cached call: lru_cache hashes its
        # arguments up front, so an unhashable list would raise TypeError before
        # any conversion inside the cached function could run.
        if form_types and isinstance(form_types, list):
            form_types = tuple(form_types)
        return self._get_company_filings_cached(cik, form_types)

    @lru_cache(maxsize=128)
    def _get_company_filings_cached(self, cik, form_types):
        """Cached worker for get_company_filings"""
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return []
try:
self._rate_limit()
# Get company submissions (now has timeout protection)
submissions = self.edgar.get_submissions(cik=cik)
# Extract filing information
filings = []
recent = submissions.get("filings", {}).get("recent", {})
# Get data from each field
form_types_list = recent.get("form", [])
filing_dates = recent.get("filingDate", [])
accession_numbers = recent.get("accessionNumber", [])
primary_documents = recent.get("primaryDocument", [])
# Iterate through all filings
for i in range(len(form_types_list)):
form_type = form_types_list[i]
                # ✅ Normalize the form type: "10-K/A" -> "10-K", "20-F/A" -> "20-F",
                # so amended annual reports are recognized and used as well
                normalized_form_type = form_type.split('/')[0]
                # Filter by form type if specified (using the normalized type)
if form_types and normalized_form_type not in form_types:
continue
filing_date = filing_dates[i] if i < len(filing_dates) else ""
accession_number = accession_numbers[i] if i < len(accession_numbers) else ""
primary_document = primary_documents[i] if i < len(primary_documents) else ""
filing = {
"form_type": form_type, # 保留原始form_type供参考
"filing_date": filing_date,
"accession_number": accession_number,
"primary_document": primary_document
}
filings.append(filing)
return filings
except TimeoutError as e:
print(f"Timeout getting company filings for CIK {cik}: {e}")
return []
except Exception as e:
print(f"Error getting company filings: {e}")
return []
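    # Illustrative return element (field names from the loop above; values
    # hypothetical):
    #   {"form_type": "10-K", "filing_date": "2024-11-01",
    #    "accession_number": "0000320193-24-000123",
    #    "primary_document": "aapl-20240928.htm"}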
@lru_cache(maxsize=128)
def get_company_facts(self, cik):
"""
Get all company financial facts data (cached)
Args:
cik (str): Company CIK code
Returns:
dict: Company financial facts data
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return {}
try:
self._rate_limit()
# Now has timeout protection via monkey patch
facts = self.edgar.get_company_facts(cik=cik)
return facts
except TimeoutError as e:
print(f"Timeout getting company facts for CIK {cik}: {e}")
return {}
except Exception as e:
print(f"Error getting company facts: {e}")
return {}
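    # The returned companyfacts JSON nests tagged values by taxonomy and unit, e.g.
    #   facts["facts"]["us-gaap"]["Revenues"]["units"]["USD"]
    # is a list of entries carrying "val", "form", "fy", "fp", "start"/"end",
    # "filed", and "accn" fields, which get_financial_data_for_period() below
    # relies on when matching periods.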
def get_financial_data_for_period(self, cik, period):
"""
Get financial data for a specific period (supports annual and quarterly) - Cached
Args:
cik (str): Company CIK code
period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')
Returns:
dict: Financial data dictionary
"""
if not self.edgar:
print("sec_edgar_api library not installed")
return {}
        # Instance-level cache (avoids recomputing the same period)
cache_key = f"period_{cik}_{period}"
if hasattr(self, '_period_cache') and cache_key in self._period_cache:
return self._period_cache[cache_key]
if not hasattr(self, '_period_cache'):
self._period_cache = {}
try:
# Get company financial facts
facts = self.get_company_facts(cik)
if not facts:
return {}
# Extract us-gaap and ifrs-full financial data (20-F may use IFRS)
us_gaap = facts.get("facts", {}).get("us-gaap", {})
ifrs_full = facts.get("facts", {}).get("ifrs-full", {})
# Define financial metrics and their XBRL tags
# Include multiple possible tags to improve match rate (including US-GAAP and IFRS tags)
financial_metrics = {
"total_revenue": ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "RevenueFromContractWithCustomerIncludingAssessedTax", "SalesRevenueNet", "RevenueFromContractWithCustomer", "Revenue"],
"net_income": ["NetIncomeLoss", "ProfitLoss", "NetIncome", "ProfitLossAttributableToOwnersOfParent"],
"earnings_per_share": ["EarningsPerShareBasic", "EarningsPerShare", "BasicEarningsPerShare", "BasicEarningsLossPerShare"],
"operating_expenses": ["OperatingExpenses", "OperatingCostsAndExpenses", "OperatingExpensesExcludingDepreciationAndAmortization", "CostsAndExpenses", "GeneralAndAdministrativeExpense", "CostOfRevenue", "ResearchAndDevelopmentExpense", "SellingAndMarketingExpense"],
"operating_cash_flow": ["NetCashProvidedByUsedInOperatingActivities", "NetCashProvidedUsedInOperatingActivities", "NetCashFlowsFromUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"],
}
# Store result with new optimized structure
result = {
"period": period,
"_metadata": {},
"metrics": {}
}
# Determine target form types to search
if 'Q' in period:
# Quarterly data, mainly search 10-Q (20-F usually doesn't have quarterly reports)
target_forms = ("10-Q",) # Use tuple for caching
target_forms_annual = ("10-K", "20-F") # for fallback
year = int(period.split('Q')[0])
quarter = period.split('Q')[1]
else:
# Annual data, search 10-K and 20-F annual forms
target_forms = ("10-K", "20-F") # Use tuple for caching
target_forms_annual = target_forms
year = int(period)
quarter = None
# Get company filings to find accession number and primary document
filings = self.get_company_filings(cik, form_types=target_forms)
filings_map = {} # Map: form -> {accession_number, primary_document, filing_date}
# Build filing map for quick lookup
for filing in filings:
form_type = filing.get("form_type", "")
filing_date = filing.get("filing_date", "")
accession_number = filing.get("accession_number", "")
primary_document = filing.get("primary_document", "")
if filing_date and accession_number:
# Extract year from filing_date (format: YYYY-MM-DD)
file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0
                    # ✅ Normalize the form type: "10-K/A" -> "10-K", "20-F/A" -> "20-F".
                    # The key is built from the normalized type so that "10-K" entries
                    # in the facts data can also match a "10-K/A" filing.
                    normalized_form_type = form_type.split('/')[0]
                    # ✅ FIXED: Remove year filter to keep all filings
                    # 20-F forms are often filed in the year after the fiscal year
                    # We'll match them later using fiscal year (fy) and filed date
                    key = f"{normalized_form_type}_{file_year}"  # key built from the normalized type
if key not in filings_map:
filings_map[key] = {
"accession_number": accession_number,
"primary_document": primary_document,
"form_type": form_type, # 保留原始 form_type
"filing_date": filing_date
}
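            # Illustrative map entry after the loop (hypothetical values):
            #   filings_map["10-K_2024"] == {
            #       "accession_number": "0000320193-24-000123",
            #       "primary_document": "aapl-20240928.htm",
            #       "form_type": "10-K", "filing_date": "2024-11-01"}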
# Iterate through each financial metric
for metric_key, metric_tags in financial_metrics.items():
# Support multiple possible tags
for metric_tag in metric_tags:
# Search both US-GAAP and IFRS tags
metric_data = None
data_source = None
if metric_tag in us_gaap:
metric_data = us_gaap[metric_tag]
data_source = "us-gaap"
elif metric_tag in ifrs_full:
metric_data = ifrs_full[metric_tag]
data_source = "ifrs-full"
if metric_data:
units = metric_data.get("units", {})
# Find USD unit data (supports USD and USD/shares)
usd_data = None
if "USD" in units:
usd_data = units["USD"]
elif "USD/shares" in units and metric_key == "earnings_per_share":
# EPS uses USD/shares unit
usd_data = units["USD/shares"]
if usd_data:
# Try exact match first, then loose match
matched_entry = None
# Search for data in the specified period
for entry in usd_data:
form = entry.get("form", "")
fy = entry.get("fy", 0)
fp = entry.get("fp", "")
end_date = entry.get("end", "")
if not end_date or len(end_date) < 4:
continue
entry_year = int(end_date[:4])
# Check if form type matches
if form in target_forms:
if quarter:
# Quarterly data match
if entry_year == year and fp == f"Q{quarter}":
# If already matched, compare end date, choose the latest
if matched_entry:
if entry.get("end", "") > matched_entry.get("end", ""):
matched_entry = entry
else:
matched_entry = entry
else:
# Annual data match - prioritize fiscal year (fy) field
# Strategy 1: Exact match by fiscal year
if fy == year and (fp == "FY" or fp == "" or not fp):
# If already matched, compare end date, choose the latest
if matched_entry:
if entry.get("end", "") > matched_entry.get("end", ""):
matched_entry = entry
else:
matched_entry = entry
# Strategy 2: Match by end date year (when fy not available or doesn't match)
elif not matched_entry and entry_year == year and (fp == "FY" or fp == "" or not fp):
matched_entry = entry
# Strategy 3: Allow fy to differ by 1 year (fiscal year vs calendar year mismatch)
elif not matched_entry and fy > 0 and abs(fy - year) <= 1 and (fp == "FY" or fp == "" or not fp):
matched_entry = entry
# Strategy 4: Match by frame field for 20-F
elif not matched_entry and form == "20-F" and "frame" in entry:
frame = entry.get("frame", "")
if f"CY{year}" in frame or str(year) in end_date:
matched_entry = entry
# If quarterly data not found, try finding from annual report (fallback strategy)
if not matched_entry and quarter and target_forms_annual:
for entry in usd_data:
form = entry.get("form", "")
end_date = entry.get("end", "")
fp = entry.get("fp", "")
if form in target_forms_annual and end_date:
# Check if end date is within this quarter range
if str(year) in end_date and f"Q{quarter}" in fp:
matched_entry = entry
break
# Apply matched data
if matched_entry:
# Store metric value and tag
result["metrics"][metric_key] = {
"value": matched_entry.get("val", 0),
"tag": metric_tag
}
# Get form and accession info - only populate metadata once
if not result["_metadata"]:
form_type = matched_entry.get("form", "")
accn_from_facts = matched_entry.get('accn', '').replace('-', '')
filed_date = matched_entry.get('filed', '')
# Multi-strategy filing lookup for 20-F and cross-year submissions
filing_info = None
# Strategy 1: Try matching by fiscal year
filing_key = f"{form_type}_{year}"
filing_info = filings_map.get(filing_key)
# Strategy 2: Try matching by filed year (for 20-F filed in next year)
if not filing_info and filed_date:
filed_year = int(filed_date[:4]) if len(filed_date) >= 4 else 0
if filed_year > 0:
filing_key = f"{form_type}_{filed_year}"
filing_info = filings_map.get(filing_key)
# Strategy 3: Try fiscal year + 1 (common for 20-F)
if not filing_info:
filing_key = f"{form_type}_{year + 1}"
filing_info = filings_map.get(filing_key)
# Strategy 4: Search all filings with matching form type and accession
if not filing_info and accn_from_facts:
for key, finfo in filings_map.items():
if finfo["form_type"] == form_type:
filing_accn = finfo["accession_number"].replace('-', '')
if filing_accn == accn_from_facts:
filing_info = finfo
break
# Generate source URL
source_url = ""
if filing_info:
accession_number = filing_info["accession_number"].replace('-', '')
primary_document = filing_info["primary_document"]
if primary_document:
source_url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
else:
source_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
else:
source_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
# Populate metadata (only once per period)
result["_metadata"] = {
"form": matched_entry.get("form", ""),
"fiscal_year": matched_entry.get("fy", 0),
"fiscal_period": matched_entry.get("fp", ""),
"start_date": matched_entry.get("start", ""),
"end_date": matched_entry.get("end", ""),
"filed_date": matched_entry.get("filed", ""),
"source_url": source_url,
"data_source": data_source
}
# If data is found, break out of tag loop
if metric_key in result["metrics"]:
break
            # Cache the result
if result and "period" in result:
self._period_cache[cache_key] = result
return result
except Exception as e:
print(f"Error getting financial data for period {period}: {e}")
return {}
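    # End-to-end sketch (illustrative; assumes network access and the
    # sec_edgar_api package; "apple" resolves via the alias/index lookups above):
    #   client = EdgarDataClient()
    #   hit = client.search_company_by_name("apple")   # -> CIK "0000320193"
    #   data = client.get_financial_data_for_period(hit["cik"], "2023")
    #   revenue = data["metrics"]["total_revenue"]["value"]  # annual USD figure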