|
|
"""Financial Data Analysis Module"""
|
|
|
|
|
|
from EasyReportDataMCP.edgar_client import EdgarDataClient
|
|
|
from datetime import datetime
|
|
|
from functools import lru_cache
|
|
|
import json
|
|
|
|
|
|
|
|
|
class FinancialAnalyzer:
    """Facade over ``EdgarDataClient`` for company lookup and financial metrics.

    Successful company lookups and non-empty metric extractions are memoised
    per instance in plain dictionaries (``_search_cache`` and
    ``_extract_metrics_cache``). Nothing evicts those entries, so a long-lived
    instance keeps every successful result it has produced.
    """

    # Annual report forms: 10-K and 20-F.
    _ANNUAL_FORMS = ("10-K", "20-F")

    # Revenue concepts probed, in priority order, to learn which fiscal year a
    # filing made in a given calendar year reports on.
    _REVENUE_TAGS = (
        "Revenues",
        "RevenueFromContractWithCustomerExcludingAssessedTax",
        "Revenue",
        "RevenueFromContractWithCustomer",
    )

    # Keys that every formatted record is guaranteed to contain (None if absent).
    _KEY_FIELDS = (
        'total_revenue', 'net_income', 'earnings_per_share',
        'operating_expenses', 'operating_cash_flow', 'source_url', 'source_form',
    )

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """
        Initialize financial analyzer

        Args:
            user_agent (str): User agent string for identifying request source;
                passed straight to ``EdgarDataClient``.
        """
        self.edgar_client = EdgarDataClient(user_agent)

        # query string -> company info dict (successful lookups only)
        self._search_cache = {}
        # "<cik>_<years>" -> list of financial data dicts (non-empty only)
        self._extract_metrics_cache = {}

    def search_company(self, company_input):
        """
        Search company information (by name or CIK)

        An all-digit input of at least 8 characters is treated as a CIK and
        resolved with ``get_company_info``; anything else is resolved with
        ``search_company_by_name``. Only successful results are cached.

        Args:
            company_input (str): Company name or CIK. Surrounding whitespace
                is ignored; ``None`` or a blank string yields an error dict.

        Returns:
            dict: Company information, or ``{"error": <message>}`` when
            nothing matched.
        """
        if company_input is None:
            return {"error": "No matching company found"}
        query = str(company_input).strip()
        if not query:
            return {"error": "No matching company found"}

        # Only truthy results are ever stored, so a None lookup means "miss".
        cached = self._search_cache.get(query)
        if cached is not None:
            return cached

        if query.isdigit() and len(query) >= 8:
            company_info = self.edgar_client.get_company_info(query)
            if not company_info:
                return {"error": "Company not found for specified CIK"}
            self._search_cache[query] = company_info
            return company_info

        company = self.edgar_client.search_company_by_name(query)
        if not company:
            return {"error": "No matching company found"}

        result = {
            "cik": company['cik'],
            "name": company['name'],
            "tickers": [company['ticker']] if company.get('ticker') else [],
            "_source": "company_tickers_cache",
        }
        self._search_cache[query] = result
        return result

    def get_company_filings_list(self, cik, form_types=('10-K', '10-Q')):
        """
        Get company filings list

        Args:
            cik (str): Company CIK
            form_types (list or tuple): Form types to include. A list is
                converted to a tuple before it is forwarded, matching the
                tuples every other call in this class passes to
                ``get_company_filings``.
                NOTE(review): presumably the client memoises on its
                arguments and needs them hashable - confirm.

        Returns:
            list: Filings list as returned by the client
        """
        if isinstance(form_types, list):
            form_types = tuple(form_types)
        return self.edgar_client.get_company_filings(cik, form_types)

    @staticmethod
    def _filing_year(filing):
        """Return the 4-digit year prefix of ``filing['filing_date']`` as int, or None."""
        filing_date = filing.get('filing_date')
        if not isinstance(filing_date, str) or len(filing_date) < 4:
            return None
        try:
            return int(filing_date[:4])
        except ValueError:
            return None

    def _fetch_annual_filings(self, cik):
        """Return ``(filings_10k, filings_20f)`` as lists; a falsy client reply becomes []."""
        filings_10k = list(self.edgar_client.get_company_filings(cik, ('10-K',)) or ())
        filings_20f = list(self.edgar_client.get_company_filings(cik, ('20-F',)) or ())
        return filings_10k, filings_20f

    def _map_filing_year_to_fiscal_year(self, facts):
        """Build ``{filing calendar year: fiscal year}`` from annual revenue facts.

        For each of the ``us-gaap`` and ``ifrs-full`` taxonomies, the first
        revenue tag that carries a USD series is scanned. The first fiscal
        year seen for a filing year wins.
        """
        mapping = {}
        taxonomies = facts.get("facts") or {}
        for taxonomy in ("us-gaap", "ifrs-full"):
            concepts = taxonomies.get(taxonomy)
            if not concepts:
                continue
            for tag in self._REVENUE_TAGS:
                if tag not in concepts:
                    continue
                units = concepts[tag].get("units") or {}
                if "USD" not in units:
                    continue
                for entry in units["USD"] or ():
                    if entry.get("form") not in self._ANNUAL_FORMS:
                        continue
                    fiscal_year = entry.get("fy")
                    # "fy" may be present but null; comparing None with 0
                    # would raise TypeError, so check the type first.
                    if not isinstance(fiscal_year, (int, float)) or fiscal_year <= 0:
                        continue
                    fiscal_period = entry.get("fp")
                    if fiscal_period and fiscal_period != "FY":
                        continue
                    filed = entry.get("filed")
                    # The original required at least 10 characters here.
                    if not isinstance(filed, str) or len(filed) < 10:
                        continue
                    try:
                        filed_year = int(filed[:4])
                    except ValueError:
                        continue
                    mapping.setdefault(filed_year, fiscal_year)
                # A revenue tag with USD data was scanned; skip the remaining
                # fallback tags for this taxonomy.
                break
        return mapping

    @staticmethod
    def _build_periods(target_years, filing_to_fiscal_year, include_quarters):
        """List period descriptors: each annual period, then optionally Q4..Q1."""
        periods = []
        for file_year in target_years:
            # Fall back to the filing year when no fiscal-year mapping exists.
            fiscal_year = filing_to_fiscal_year.get(file_year, file_year)
            periods.append({
                'period': str(fiscal_year),
                'type': 'annual',
                'fiscal_year': fiscal_year,
                'filing_year': file_year,
            })
            if include_quarters:
                for quarter in range(4, 0, -1):
                    periods.append({
                        'period': f"{fiscal_year}Q{quarter}",
                        'type': 'quarterly',
                        'fiscal_year': fiscal_year,
                        'filing_year': file_year,
                    })
        return periods

    def extract_financial_metrics(self, cik, years=3):
        """
        Extract financial metrics for specified number of years

        The most recent ``years`` calendar years that have a 10-K or 20-F
        filing are selected. Each is translated to the fiscal year reported
        in the company's revenue facts (falling back to the filing year), and
        ``get_financial_data_for_period`` is queried for the annual period
        and - unless the company files only 20-F - for Q4 down to Q1 of that
        fiscal year. Non-empty results are cached per ``(cik, years)``.

        Args:
            cik (str): Company CIK
            years (int): Number of years to extract, default is 3 years.
                Values <= 0 yield an empty list; ``None`` selects every year.

        Returns:
            list: Financial data dicts in newest-first order. Each is a
            shallow copy of the client's dict with ``_sequence`` set to its
            position in the request order, and with ``period`` rewritten to
            ``"FY<fiscal_year>"`` for annual records.
        """
        cache_key = f"{cik}_{years}"
        cached = self._extract_metrics_cache.get(cache_key)
        if cached is not None:
            return cached

        # A negative count would otherwise slice "all but the oldest N" years.
        if years is not None and years <= 0:
            return []

        facts = self.edgar_client.get_company_facts(cik)
        if not facts:
            return []

        filings_10k, filings_20f = self._fetch_annual_filings(cik)
        annual_filings = filings_10k + filings_20f
        if not annual_filings:
            return []

        # A company with 20-F filings and no 10-K is not asked for quarters.
        is_20f_filer = bool(filings_20f) and not filings_10k

        filing_years = {
            year
            for year in map(self._filing_year, annual_filings)
            if year is not None
        }
        if not filing_years:
            return []
        target_years = sorted(filing_years, reverse=True)[:years]

        filing_to_fiscal_year = self._map_filing_year_to_fiscal_year(facts)
        periods = self._build_periods(
            target_years, filing_to_fiscal_year, include_quarters=not is_20f_filer
        )

        financial_data = []
        for idx, period_info in enumerate(periods):
            data = self.edgar_client.get_financial_data_for_period(
                cik, period_info['period']
            )
            if not data or "period" not in data:
                continue
            # Copy before annotating so an object the client may still hold
            # is not mutated behind its back.
            record = dict(data)
            if period_info['type'] == 'annual':
                record["period"] = f"FY{period_info['fiscal_year']}"
            record["_sequence"] = idx
            financial_data.append(record)

        if financial_data:
            self._extract_metrics_cache[cache_key] = financial_data
        return financial_data

    def get_latest_financial_data(self, cik):
        """
        Get latest financial data

        Finds the most recent calendar year in which a 10-K or 20-F was filed
        and requests the financial data for the period named by that year.

        Args:
            cik (str): Company CIK

        Returns:
            dict: Whatever the client returns for that period, or ``{}`` when
            no annual filing with a parseable filing date exists.
        """
        filings_10k, filings_20f = self._fetch_annual_filings(cik)
        filing_years = [
            year
            for year in map(self._filing_year, filings_10k + filings_20f)
            if year is not None
        ]
        if not filing_years:
            return {}
        return self.edgar_client.get_financial_data_for_period(
            cik, str(max(filing_years))
        )

    def format_financial_data(self, financial_data):
        """
        Format financial data for display

        Args:
            financial_data (dict or list): Financial data

        Returns:
            dict or list: Formatted financial data. A list is returned sorted
            by ``_sequence``; records whose ``_sequence`` is missing or
            ``None`` sort last, keeping their relative order.
        """
        if not isinstance(financial_data, list):
            return self._format_single_financial_data(financial_data)

        def sequence_of(record):
            sequence = record.get("_sequence")
            # Formatted records carry "_sequence": None when the input had
            # none; None cannot be ordered against ints.
            return 999 if sequence is None else sequence

        return [
            self._format_single_financial_data(record)
            for record in sorted(financial_data, key=sequence_of)
        ]

    def _format_single_financial_data(self, data):
        """
        Format single financial data entry

        Args:
            data (dict): Financial data, either in the nested structure
                (``metrics`` dict plus optional ``_metadata``) or as a flat
                dict of fields.

        Returns:
            dict: Flat record containing ``period``, ``_sequence``, one key
            per metric, source fields, and every key in ``_KEY_FIELDS``
            (``None`` when absent). ``earnings_per_share`` is rounded to two
            decimals when numeric.
        """
        formatted = {
            "period": data.get("period"),
            "_sequence": data.get("_sequence"),
        }

        metrics = data.get("metrics")
        if "metrics" in data and isinstance(metrics, dict):
            for metric_key, metric_data in metrics.items():
                # Nested metrics expose their number under "value"; anything
                # else is passed through unchanged.
                if isinstance(metric_data, dict):
                    formatted[metric_key] = metric_data.get("value")
                else:
                    formatted[metric_key] = metric_data

            if "_metadata" in data:
                metadata = data["_metadata"]
                formatted["source_url"] = metadata.get("source_url")
                formatted["source_form"] = metadata.get("form")
                formatted["data_source"] = metadata.get("data_source")
        else:
            # Flat record: copy every field through as-is.
            formatted.update(data)

        for key in self._KEY_FIELDS:
            formatted.setdefault(key, None)

        eps = formatted['earnings_per_share']
        if isinstance(eps, (int, float)):
            formatted['earnings_per_share'] = round(eps, 2)

        return formatted
|
|
|
|
|
|
|
|
|
|