"""EDGAR API Client Module with Performance Optimization"""

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import urllib3

try:
    from sec_edgar_api.EdgarClient import EdgarClient
except ImportError:
    EdgarClient = None

import json
import time
import threading
from functools import lru_cache
from datetime import datetime, timedelta
import re
import difflib

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
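
# Note: SEC's fair-access guidance asks automated clients to send a
# descriptive User-Agent and to stay at or below roughly 10 requests per
# second; the rate limiting, retry, and pooling settings in this client
# are tuned to that guideline.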

class EdgarDataClient:

    # Class-level cache for SEC's company_tickers.json listing, shared
    # across instances and refreshed after the TTL expires
    _company_tickers_cache = None
    _company_tickers_cache_time = None
    _company_tickers_cache_ttl = 3600
    _cache_lock = threading.Lock()

    # Class-level rate-limiting state shared across instances
    _last_request_time = 0
    _rate_limit_lock = threading.Lock()
    _min_request_interval = 0.11

    # Lookup indexes built from company_tickers.json (by ticker, exact
    # title, and normalized title), rebuilt after the TTL expires
    _by_ticker = None
    _by_title = None
    _by_title_norm = None
    _all_keys = None
    _index_built_time = None
    _index_ttl = 3600

    # Common-name aliases mapped to official EDGAR registrant titles
    _alias_map = {
        "google": "alphabet inc",
        "alphabet": "alphabet inc",
        "facebook": "meta platforms, inc.",
        "meta": "meta platforms, inc.",
        "amazon": "amazon.com, inc.",
        "apple": "apple inc.",
        "microsoft": "microsoft corporation",
        "netflix": "netflix, inc.",
        "nvidia": "nvidia corporation",
        "tesla": "tesla, inc.",
        "adobe": "adobe inc.",
        "oracle": "oracle corporation",
        "ibm": "international business machines corporation",
        "paypal": "paypal holdings, inc.",
        "shopify": "shopify inc.",
    }

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """Initialize EDGAR client with connection pooling and timeout"""
        self.user_agent = user_agent

        # Per-instance cache for resolved company searches
        self._search_cache = {}

        # Reuse one session so connections are pooled across requests
        self.session = requests.Session()

        retry_strategy = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )

        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=retry_strategy,
            pool_block=False
        )

        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # (connect timeout, read timeout) in seconds
        self.timeout = (10, 30)

        if EdgarClient:
            self.edgar = EdgarClient(user_agent=user_agent)
            self._patch_edgar_client_timeout()
        else:
            self.edgar = None

    def _patch_edgar_client_timeout(self):
        """Monkey patch sec_edgar_api methods to add timeout support.

        sec_edgar_api does not expose a timeout parameter, so each call is
        run in a daemon thread and abandoned if it exceeds the read timeout.
        Thread-based (rather than signal-based) timeouts stay compatible
        with Gradio and other non-main-thread callers.
        """
        if not self.edgar:
            return

        def wrap_with_timeout(original_func):
            """Wrap an EdgarClient method with a thread-based timeout."""
            def wrapped(cik):
                result = [None]
                exception = [None]

                def worker():
                    try:
                        result[0] = original_func(cik)
                    except Exception as e:
                        exception[0] = e

                thread = threading.Thread(target=worker, daemon=True)
                thread.start()

                timeout_seconds = self.timeout[1] if isinstance(self.timeout, tuple) else self.timeout
                thread.join(timeout=timeout_seconds)

                if thread.is_alive():
                    raise TimeoutError(f"SEC API request timeout ({timeout_seconds}s)")

                if exception[0]:
                    raise exception[0]

                return result[0]

            return wrapped

        self.edgar.get_submissions = wrap_with_timeout(self.edgar.get_submissions)
        self.edgar.get_company_facts = wrap_with_timeout(self.edgar.get_company_facts)

    def _rate_limit(self):
        """Thread-safe rate limiting to comply with SEC requirements"""
        with self._rate_limit_lock:
            current_time = time.time()
            time_since_last = current_time - EdgarDataClient._last_request_time

            # 0.11s between requests keeps us just under SEC's ~10 req/s cap
            if time_since_last < self._min_request_interval:
                sleep_time = self._min_request_interval - time_since_last
                time.sleep(sleep_time)

            EdgarDataClient._last_request_time = time.time()

    def _normalize_text(self, s: str) -> str:
        """Normalize text to improve matching accuracy.

        Lowercases, expands "&", strips punctuation, collapses whitespace,
        and drops corporate-suffix stopwords, e.g. "Amazon.com, Inc." ->
        "amazon com".
        """
        if not s:
            return ""
        s = s.lower().strip()
        s = s.replace("&", " and ")
        s = re.sub(r"[.,()\-_/]", " ", s)
        s = re.sub(r"\s+", " ", s)

        stopwords = {"inc", "inc.", "incorporated", "corp", "corporation", "co", "company", "plc", "ltd", "llc", "the"}
        tokens = [t for t in s.split() if t not in stopwords]
        return " ".join(tokens).strip()

    def _ensure_company_index(self):
        """Ensure the company lookup index is built (built on demand, rebuilt after expiry)."""
        with self._cache_lock:
            current_time = time.time()

            # Refresh the raw company_tickers.json cache when missing or expired
            if (EdgarDataClient._company_tickers_cache is None or
                    EdgarDataClient._company_tickers_cache_time is None or
                    current_time - EdgarDataClient._company_tickers_cache_time >= self._company_tickers_cache_ttl):

                self._rate_limit()
                url = "https://www.sec.gov/files/company_tickers.json"
                headers = {"User-Agent": self.user_agent}
                response = self.session.get(url, headers=headers, timeout=self.timeout)
                response.raise_for_status()
                companies = response.json()
                EdgarDataClient._company_tickers_cache = companies
                EdgarDataClient._company_tickers_cache_time = current_time
            else:
                companies = EdgarDataClient._company_tickers_cache

            # Rebuild the lookup indexes when missing or expired
            if (EdgarDataClient._by_ticker is None or
                    EdgarDataClient._by_title is None or
                    EdgarDataClient._by_title_norm is None or
                    EdgarDataClient._all_keys is None or
                    EdgarDataClient._index_built_time is None or
                    current_time - EdgarDataClient._index_built_time >= EdgarDataClient._index_ttl):

                by_ticker = {}
                by_title = {}
                by_title_norm = {}
                all_keys = []

                for _, company in companies.items():
                    title = company.get("title", "")
                    ticker = company.get("ticker", "")
                    cik_str = str(company.get("cik_str", "")).zfill(10)

                    title_lower = title.lower()
                    ticker_lower = ticker.lower()
                    title_norm = self._normalize_text(title)

                    if ticker_lower:
                        by_ticker[ticker_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
                        all_keys.append(ticker_lower)
                    if title_lower:
                        by_title[title_lower] = {"cik": cik_str, "name": title, "ticker": ticker}
                    if title_norm:
                        by_title_norm[title_norm] = {"cik": cik_str, "name": title, "ticker": ticker}
                        all_keys.append(title_norm)

                EdgarDataClient._by_ticker = by_ticker
                EdgarDataClient._by_title = by_title
                EdgarDataClient._by_title_norm = by_title_norm
                EdgarDataClient._all_keys = all_keys
                EdgarDataClient._index_built_time = current_time

    def search_company_by_name(self, company_name):
        """Search company CIK by company name with caching and optimized ticker matching"""
        try:
            # Serve repeated queries from the per-instance cache
            norm_query = self._normalize_text(company_name)
            cache_hit = self._search_cache.get(norm_query)
            if cache_hit:
                return cache_hit

            self._ensure_company_index()

            by_ticker = EdgarDataClient._by_ticker
            by_title = EdgarDataClient._by_title
            by_title_norm = EdgarDataClient._by_title_norm
            all_keys = EdgarDataClient._all_keys

            # 1. Short alphanumeric queries are most likely tickers
            raw = company_name.strip().lower()
            raw_compact = re.sub(r"[^a-z0-9]", "", raw)
            is_ticker_like = 1 <= len(raw_compact) <= 5

            if is_ticker_like and raw_compact in by_ticker:
                result = by_ticker[raw_compact]
                self._search_cache[norm_query] = result
                return result

            # 2. Resolve common-name aliases (normalized title, then exact
            # title, then ticker)
            alias_target = EdgarDataClient._alias_map.get(norm_query)
            if alias_target:
                alias_norm = self._normalize_text(alias_target)
                if alias_norm in by_title_norm:
                    result = by_title_norm[alias_norm]
                    self._search_cache[norm_query] = result
                    return result

                alias_lower = alias_target.lower()
                if alias_lower in by_title:
                    result = by_title[alias_lower]
                    self._search_cache[norm_query] = result
                    return result

                alias_ticker = re.sub(r"[^a-z0-9]", "", alias_lower)
                if alias_ticker in by_ticker:
                    result = by_ticker[alias_ticker]
                    self._search_cache[norm_query] = result
                    return result

            # 3. Exact title match
            title_lower = company_name.lower().strip()
            if title_lower in by_title:
                result = by_title[title_lower]
                self._search_cache[norm_query] = result
                return result

            # 4. Normalized title match
            if norm_query in by_title_norm:
                result = by_title_norm[norm_query]
                self._search_cache[norm_query] = result
                return result

            # 5. Ticker match for queries that were not ticker-like
            if raw_compact in by_ticker:
                result = by_ticker[raw_compact]
                self._search_cache[norm_query] = result
                return result

            # 6. Partial (substring) match, preferring titles over tickers
            partial_matches = []
            for key in by_title_norm.keys():
                if norm_query in key:
                    partial_matches.append(key)
            if not partial_matches:
                for t in by_ticker.keys():
                    if norm_query in t:
                        partial_matches.append(t)
            if partial_matches:
                best_key = max(
                    partial_matches,
                    key=lambda k: difflib.SequenceMatcher(None, norm_query, k).ratio()
                )
                result = by_title_norm.get(best_key) or by_ticker.get(best_key)
                if result:
                    self._search_cache[norm_query] = result
                    return result

            # 7. Fuzzy match across all keys as a last resort
            close = difflib.get_close_matches(norm_query, all_keys, n=1, cutoff=0.78)
            if close:
                best = close[0]
                result = by_title_norm.get(best) or by_ticker.get(best)
                if result:
                    self._search_cache[norm_query] = result
                    return result

            return None

        except TimeoutError as e:
            print(f"Timeout searching company: {e}")
            return None
        except Exception as e:
            print(f"Error searching company: {e}")
            return None

    @lru_cache(maxsize=128)
    def get_company_info(self, cik):
        """
        Get basic company information (cached)

        Args:
            cik (str): Company CIK code

        Returns:
            dict: Dictionary containing company information
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return None

        try:
            self._rate_limit()

            submissions = self.edgar.get_submissions(cik=cik)

            return {
                "cik": cik,
                "name": submissions.get("name", ""),
                "tickers": submissions.get("tickers", []),
                "sic": submissions.get("sic", ""),
                "sic_description": submissions.get("sicDescription", "")
            }
        except TimeoutError as e:
            print(f"Timeout getting company info for CIK {cik}: {e}")
            return None
        except Exception as e:
            print(f"Error getting company info: {e}")
            return None

    @lru_cache(maxsize=128)
    def get_company_filings(self, cik, form_types=None):
        """
        Get all company filing documents (cached)

        Args:
            cik (str): Company CIK code
            form_types (tuple): Tuple of form types, e.g., ('10-K', '10-Q'), None for all types.
                Must be a tuple, not a list: lru_cache hashes arguments before
                the function body runs, so an unhashable list would raise
                TypeError before any in-body conversion could take effect.

        Returns:
            list: List of filing documents
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return []

        try:
            self._rate_limit()

            submissions = self.edgar.get_submissions(cik=cik)

            filings = []
            recent = submissions.get("filings", {}).get("recent", {})

            # The submissions API returns parallel arrays, one value per filing
            form_types_list = recent.get("form", [])
            filing_dates = recent.get("filingDate", [])
            accession_numbers = recent.get("accessionNumber", [])
            primary_documents = recent.get("primaryDocument", [])

            for i in range(len(form_types_list)):
                form_type = form_types_list[i]

                # Treat amendments like the base form, e.g. "10-K/A" -> "10-K"
                normalized_form_type = form_type.split('/')[0]

                if form_types and normalized_form_type not in form_types:
                    continue

                filing_date = filing_dates[i] if i < len(filing_dates) else ""
                accession_number = accession_numbers[i] if i < len(accession_numbers) else ""
                primary_document = primary_documents[i] if i < len(primary_documents) else ""

                filing = {
                    "form_type": form_type,
                    "filing_date": filing_date,
                    "accession_number": accession_number,
                    "primary_document": primary_document
                }

                filings.append(filing)

            return filings
        except TimeoutError as e:
            print(f"Timeout getting company filings for CIK {cik}: {e}")
            return []
        except Exception as e:
            print(f"Error getting company filings: {e}")
            return []

    @lru_cache(maxsize=128)
    def get_company_facts(self, cik):
        """
        Get all company financial facts data (cached)

        Args:
            cik (str): Company CIK code

        Returns:
            dict: Company financial facts data
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}

        try:
            self._rate_limit()

            facts = self.edgar.get_company_facts(cik=cik)
            return facts
        except TimeoutError as e:
            print(f"Timeout getting company facts for CIK {cik}: {e}")
            return {}
        except Exception as e:
            print(f"Error getting company facts: {e}")
            return {}
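
    # For reference, the companyfacts payload consumed below is shaped
    # roughly like this (abbreviated; these are the fields this module
    # actually reads):
    #
    #   {"facts": {"us-gaap": {"Revenues": {"units": {"USD": [
    #       {"val": 1000, "fy": 2024, "fp": "FY", "form": "10-K",
    #        "start": "...", "end": "2024-12-31", "accn": "...",
    #        "filed": "2025-02-01"}]}}}}}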

    def get_financial_data_for_period(self, cik, period):
        """
        Get financial data for a specific period (supports annual and quarterly) - Cached

        Args:
            cik (str): Company CIK code
            period (str): Period in format 'YYYY' or 'YYYYQX' (e.g., '2025' or '2025Q3')

        Returns:
            dict: Financial data dictionary
        """
        if not self.edgar:
            print("sec_edgar_api library not installed")
            return {}

        # Per-instance cache keyed by CIK and period
        cache_key = f"period_{cik}_{period}"
        if hasattr(self, '_period_cache') and cache_key in self._period_cache:
            return self._period_cache[cache_key]

        if not hasattr(self, '_period_cache'):
            self._period_cache = {}

        try:
            facts = self.get_company_facts(cik)

            if not facts:
                return {}

            # Support both US GAAP and IFRS taxonomies
            us_gaap = facts.get("facts", {}).get("us-gaap", {})
            ifrs_full = facts.get("facts", {}).get("ifrs-full", {})

            # Candidate XBRL tags per metric, tried in order of preference
            financial_metrics = {
                "total_revenue": ["Revenues", "RevenueFromContractWithCustomerExcludingAssessedTax", "RevenueFromContractWithCustomerIncludingAssessedTax", "SalesRevenueNet", "RevenueFromContractWithCustomer", "Revenue"],
                "net_income": ["NetIncomeLoss", "ProfitLoss", "NetIncome", "ProfitLossAttributableToOwnersOfParent"],
                "earnings_per_share": ["EarningsPerShareBasic", "EarningsPerShare", "BasicEarningsPerShare", "BasicEarningsLossPerShare"],
                "operating_expenses": ["OperatingExpenses", "OperatingCostsAndExpenses", "OperatingExpensesExcludingDepreciationAndAmortization", "CostsAndExpenses", "GeneralAndAdministrativeExpense", "CostOfRevenue", "ResearchAndDevelopmentExpense", "SellingAndMarketingExpense"],
                "operating_cash_flow": ["NetCashProvidedByUsedInOperatingActivities", "NetCashProvidedUsedInOperatingActivities", "NetCashFlowsFromUsedInOperatingActivities", "CashFlowsFromUsedInOperatingActivities"],
            }

            result = {
                "period": period,
                "_metadata": {},
                "metrics": {}
            }

            # Parse the requested period into target forms, year, and quarter
            if 'Q' in period:
                target_forms = ("10-Q",)
                target_forms_annual = ("10-K", "20-F")
                year = int(period.split('Q')[0])
                quarter = period.split('Q')[1]
            else:
                target_forms = ("10-K", "20-F")
                target_forms_annual = target_forms
                year = int(period)
                quarter = None

            # Index filings by form type and filing year so matched facts
            # can be linked back to a source document URL
            filings = self.get_company_filings(cik, form_types=target_forms)
            filings_map = {}

            for filing in filings:
                form_type = filing.get("form_type", "")
                filing_date = filing.get("filing_date", "")
                accession_number = filing.get("accession_number", "")
                primary_document = filing.get("primary_document", "")

                if filing_date and accession_number:
                    file_year = int(filing_date[:4]) if len(filing_date) >= 4 else 0

                    # Treat amendments like the base form, e.g. "10-K/A" -> "10-K"
                    normalized_form_type = form_type.split('/')[0]

                    # Keep only the first (most recent) filing per form/year
                    key = f"{normalized_form_type}_{file_year}"
                    if key not in filings_map:
                        filings_map[key] = {
                            "accession_number": accession_number,
                            "primary_document": primary_document,
                            "form_type": form_type,
                            "filing_date": filing_date
                        }

            for metric_key, metric_tags in financial_metrics.items():
                for metric_tag in metric_tags:
                    metric_data = None
                    data_source = None

                    if metric_tag in us_gaap:
                        metric_data = us_gaap[metric_tag]
                        data_source = "us-gaap"
                    elif metric_tag in ifrs_full:
                        metric_data = ifrs_full[metric_tag]
                        data_source = "ifrs-full"

                    if metric_data:
                        units = metric_data.get("units", {})

                        # Prefer USD values; EPS is reported in USD/shares
                        usd_data = None
                        if "USD" in units:
                            usd_data = units["USD"]
                        elif "USD/shares" in units and metric_key == "earnings_per_share":
                            usd_data = units["USD/shares"]

                        if usd_data:
                            matched_entry = None

                            for entry in usd_data:
                                form = entry.get("form", "")
                                fy = entry.get("fy", 0)
                                fp = entry.get("fp", "")
                                end_date = entry.get("end", "")

                                if not end_date or len(end_date) < 4:
                                    continue

                                entry_year = int(end_date[:4])

                                if form in target_forms:
                                    if quarter:
                                        # Quarterly: match period end year and fiscal
                                        # quarter, preferring the latest end date
                                        if entry_year == year and fp == f"Q{quarter}":
                                            if matched_entry:
                                                if entry.get("end", "") > matched_entry.get("end", ""):
                                                    matched_entry = entry
                                            else:
                                                matched_entry = entry
                                    else:
                                        # Annual: match fiscal year with FY (or empty)
                                        # period, preferring the latest end date
                                        if fy == year and (fp == "FY" or not fp):
                                            if matched_entry:
                                                if entry.get("end", "") > matched_entry.get("end", ""):
                                                    matched_entry = entry
                                            else:
                                                matched_entry = entry

                                        # Fallback: period end year matches the request
                                        elif not matched_entry and entry_year == year and (fp == "FY" or not fp):
                                            matched_entry = entry

                                        # Fallback: fiscal year within one year of the request
                                        elif not matched_entry and fy > 0 and abs(fy - year) <= 1 and (fp == "FY" or not fp):
                                            matched_entry = entry

                                        # Fallback for 20-F filers: match via the XBRL frame
                                        elif not matched_entry and form == "20-F" and "frame" in entry:
                                            frame = entry.get("frame", "")
                                            if f"CY{year}" in frame or str(year) in end_date:
                                                matched_entry = entry

                            # Quarterly fallback: some quarterly values only appear
                            # in annual forms
                            if not matched_entry and quarter and target_forms_annual:
                                for entry in usd_data:
                                    form = entry.get("form", "")
                                    end_date = entry.get("end", "")
                                    fp = entry.get("fp", "")

                                    if form in target_forms_annual and end_date:
                                        if str(year) in end_date and f"Q{quarter}" in fp:
                                            matched_entry = entry
                                            break

                            if matched_entry:
                                result["metrics"][metric_key] = {
                                    "value": matched_entry.get("val", 0),
                                    "tag": metric_tag
                                }

                                # Resolve source document metadata once, from the
                                # first matched entry
                                if not result["_metadata"]:
                                    form_type = matched_entry.get("form", "")
                                    accn_from_facts = matched_entry.get('accn', '').replace('-', '')
                                    filed_date = matched_entry.get('filed', '')

                                    filing_info = None

                                    # Try the requested year first
                                    filing_key = f"{form_type}_{year}"
                                    filing_info = filings_map.get(filing_key)

                                    # Then the year the fact was filed
                                    if not filing_info and filed_date:
                                        filed_year = int(filed_date[:4]) if len(filed_date) >= 4 else 0
                                        if filed_year > 0:
                                            filing_key = f"{form_type}_{filed_year}"
                                            filing_info = filings_map.get(filing_key)

                                    # Then the following year (annual reports are
                                    # usually filed the next calendar year)
                                    if not filing_info:
                                        filing_key = f"{form_type}_{year + 1}"
                                        filing_info = filings_map.get(filing_key)

                                    # Finally, match by accession number
                                    if not filing_info and accn_from_facts:
                                        for key, finfo in filings_map.items():
                                            if finfo["form_type"] == form_type:
                                                filing_accn = finfo["accession_number"].replace('-', '')
                                                if filing_accn == accn_from_facts:
                                                    filing_info = finfo
                                                    break

                                    # Build a direct document URL, or fall back to
                                    # the EDGAR browse page
                                    source_url = ""
                                    if filing_info:
                                        accession_number = filing_info["accession_number"].replace('-', '')
                                        primary_document = filing_info["primary_document"]
                                        if primary_document:
                                            source_url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_number}/{primary_document}"
                                        else:
                                            source_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"
                                    else:
                                        source_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={form_type}&dateb=&owner=exclude&count=100"

                                    result["_metadata"] = {
                                        "form": matched_entry.get("form", ""),
                                        "fiscal_year": matched_entry.get("fy", 0),
                                        "fiscal_period": matched_entry.get("fp", ""),
                                        "start_date": matched_entry.get("start", ""),
                                        "end_date": matched_entry.get("end", ""),
                                        "filed_date": matched_entry.get("filed", ""),
                                        "source_url": source_url,
                                        "data_source": data_source
                                    }

                    # Stop trying alternative tags once this metric is filled
                    if metric_key in result["metrics"]:
                        break

            if result and "period" in result:
                self._period_cache[cache_key] = result

            return result
        except Exception as e:
            print(f"Error getting financial data for period {period}: {e}")
            return {}
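

# Illustrative usage sketch (not part of the client): the ticker and period
# below are placeholder examples, and running this hits live SEC endpoints.
if __name__ == "__main__":
    client = EdgarDataClient()
    company = client.search_company_by_name("AAPL")
    if company:
        print(f"Resolved {company['name']} (CIK {company['cik']})")
        data = client.get_financial_data_for_period(company["cik"], "2023")
        for metric, info in data.get("metrics", {}).items():
            print(f"{metric}: {info['value']} ({info['tag']})")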