Easy-Financial-Report / EasyReportDataMCP /financial_analyzer.py
JC321's picture
Upload financial_analyzer.py
941f17c verified
"""Financial Data Analysis Module"""
from EasyReportDataMCP.edgar_client import EdgarDataClient
from datetime import datetime
from functools import lru_cache
import json
class FinancialAnalyzer:
    """Turn SEC EDGAR data into ordered, display-ready financial metrics.

    All remote access goes through ``self.edgar_client``; this class adds
    per-instance result caches, selection of the reporting periods to fetch,
    and flattening of the fetched records for display.
    """

    # Forms treated as annual reports (10-K for domestic, 20-F for foreign filers).
    _ANNUAL_FORMS = ("10-K", "20-F")
    # Taxonomies probed, in order, when mapping filing years to fiscal years.
    _TAXONOMIES = ("us-gaap", "ifrs-full")
    # Revenue concepts probed, in order, inside each taxonomy.
    _REVENUE_TAGS = (
        "Revenues",
        "RevenueFromContractWithCustomerExcludingAssessedTax",
        "Revenue",
        "RevenueFromContractWithCustomer",
    )
    # Fields that always exist in a formatted record (None when unavailable).
    _KEY_FIELDS = (
        'total_revenue', 'net_income', 'earnings_per_share',
        'operating_expenses', 'operating_cash_flow', 'source_url', 'source_form',
    )
    # Sort position given to records that carry no usable ``_sequence``.
    _UNSEQUENCED = 999

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """
        Initialize financial analyzer

        Args:
            user_agent (str): User agent string for identifying request source
        """
        self.edgar_client = EdgarDataClient(user_agent)
        # Instance-level caches to avoid repeating lookups and computations.
        self._search_cache = {}           # raw search input -> company info
        self._extract_metrics_cache = {}  # "<cik>_<years>" -> extracted records

    def search_company(self, company_input):
        """
        Search company information (by name, ticker or CIK)

        Successful lookups are cached per instance under the raw input
        string; error results are never cached.

        Args:
            company_input (str): Company name, ticker or CIK

        Returns:
            dict: Company information, or ``{"error": ...}`` when nothing matched
        """
        if company_input in self._search_cache:
            return self._search_cache[company_input]

        # All-digit input of at least 8 characters is treated as a CIK.
        # NOTE(review): shorter all-digit input falls through to the
        # name/ticker search below - confirm this threshold is intended.
        if company_input.isdigit() and len(company_input) >= 8:
            company_info = self.edgar_client.get_company_info(company_input)
            if not company_info:
                return {"error": "Company not found for specified CIK"}
            self._search_cache[company_input] = company_info
            return company_info

        company = self.edgar_client.search_company_by_name(company_input)
        if not company:
            return {"error": "No matching company found"}

        # The name search result already carries cik, name and ticker, so it
        # is returned directly instead of making a second get_company_info
        # call (the original author notes that call as slow).
        result = {
            "cik": company['cik'],
            "name": company['name'],
            "tickers": [company['ticker']] if company.get('ticker') else [],
            "_source": "company_tickers_cache"  # Debug info
        }
        self._search_cache[company_input] = result
        return result

    def get_company_filings_list(self, cik, form_types=('10-K', '10-Q')):
        """
        Get company filings list

        Args:
            cik (str): Company CIK
            form_types (list or tuple): Form types to include

        Returns:
            list: Filings list
        """
        # Every other call in this class passes a tuple "for caching", so a
        # list is converted to keep the argument hashable. The immutable
        # default also avoids a mutable default argument.
        if isinstance(form_types, list):
            form_types = tuple(form_types)
        return self.edgar_client.get_company_filings(cik, form_types)

    @staticmethod
    def _filing_years(filings):
        """Return the set of calendar years parsed from ``filing_date`` fields."""
        years = set()
        for filing in filings:
            filing_date = filing.get('filing_date', '')
            # Dates shorter than a 4-digit year cannot be parsed reliably.
            if not filing_date or len(filing_date) < 4:
                continue
            try:
                years.add(int(filing_date[:4]))
            except ValueError:
                continue
        return years

    @classmethod
    def _map_filing_to_fiscal_years(cls, facts):
        """Build a filing-year -> fiscal-year map from annual revenue facts."""
        mapping = {}
        taxonomies = facts.get("facts") or {}
        for taxonomy in cls._TAXONOMIES:
            if taxonomy not in taxonomies:
                continue
            concepts = taxonomies[taxonomy]
            for tag in cls._REVENUE_TAGS:
                if tag not in concepts:
                    continue
                units = concepts[tag].get("units", {})
                if "USD" not in units:
                    continue
                for entry in units["USD"]:
                    fiscal_year = entry.get("fy", 0)
                    filed = entry.get("filed", "")  # expected as YYYY-MM-DD
                    fiscal_period = entry.get("fp", "")
                    if entry.get("form", "") not in cls._ANNUAL_FORMS:
                        continue
                    # "fy" may be null; comparing None with 0 would raise.
                    if not isinstance(fiscal_year, int) or fiscal_year <= 0:
                        continue
                    if not isinstance(filed, str) or len(filed) < 10:
                        continue
                    # Only full-year facts (or facts with no period marker).
                    if fiscal_period and fiscal_period != "FY":
                        continue
                    try:
                        filing_year = int(filed[:4])
                    except ValueError:
                        continue
                    # The first fact seen for a filing year wins.
                    mapping.setdefault(filing_year, fiscal_year)
                # One revenue tag with USD facts per taxonomy is enough.
                break
        return mapping

    def extract_financial_metrics(self, cik, years=3):
        """
        Extract financial metrics for the most recent filing years

        For each selected year the annual period is requested first, followed
        by Q4..Q1 unless the company files only 20-F reports. Non-empty
        results are cached per instance.

        Args:
            cik (str): Company CIK
            years (int): Number of years to extract, default is 3 years

        Returns:
            list: Financial data records in FY, Q4, Q3, Q2, Q1 order per year,
                each tagged with ``_sequence``; empty when nothing is available
        """
        # A negative count would otherwise slice off the oldest year instead
        # of selecting nothing. None keeps its meaning of "all years".
        if years is not None and years <= 0:
            return []

        cache_key = f"{cik}_{years}"
        if cache_key in self._extract_metrics_cache:
            return self._extract_metrics_cache[cache_key]

        facts = self.edgar_client.get_company_facts(cik)
        if not facts:
            return []

        # Tuples are passed for caching compatibility in the client.
        filings_10k = self.edgar_client.get_company_filings(cik, ('10-K',))
        filings_20f = self.edgar_client.get_company_filings(cik, ('20-F',))
        annual_filings = filings_10k + filings_20f
        if not annual_filings:
            return []

        # A company with 20-F filings and no 10-K gets no quarterly periods.
        is_20f_filer = len(filings_20f) > 0 and len(filings_10k) == 0

        filing_years = self._filing_years(annual_filings)
        if not filing_years:
            return []
        target_years = sorted(filing_years, reverse=True)[:years]

        filing_to_fiscal_year = self._map_filing_to_fiscal_years(facts)

        # Period list: annual first, then Q4..Q1, for each target year.
        periods = []
        for filing_year in target_years:
            # Fall back to the filing year when no fiscal-year mapping exists.
            fiscal_year = filing_to_fiscal_year.get(filing_year, filing_year)
            periods.append((str(fiscal_year), 'annual', fiscal_year))
            if not is_20f_filer:
                for quarter in range(4, 0, -1):
                    periods.append((f"{fiscal_year}Q{quarter}", 'quarterly', fiscal_year))

        financial_data = []
        for sequence, (period, period_type, fiscal_year) in enumerate(periods):
            data = self.edgar_client.get_financial_data_for_period(cik, period)
            if not data or "period" not in data:
                continue
            # Work on a copy so the client's (possibly cached) record is not
            # modified by the annotations below.
            record = dict(data)
            if period_type == 'annual':
                record["period"] = f"FY{fiscal_year}"
            # The sequence number preserves the requested order downstream.
            record["_sequence"] = sequence
            financial_data.append(record)

        if financial_data:
            self._extract_metrics_cache[cache_key] = financial_data
        return financial_data

    def get_latest_financial_data(self, cik):
        """
        Get financial data for the most recent annual filing year

        Args:
            cik (str): Company CIK

        Returns:
            dict: Latest financial data, ``{}`` when no dated filing exists
        """
        # Supports both 10-K and 20-F filers; tuples are passed for caching.
        filings_10k = self.edgar_client.get_company_filings(cik, ('10-K',))
        filings_20f = self.edgar_client.get_company_filings(cik, ('20-F',))
        filings = filings_10k + filings_20f
        if not filings:
            return {}

        filing_years = self._filing_years(filings)
        if not filing_years:
            return {}

        # NOTE(review): this queries by filing year, whereas
        # extract_financial_metrics maps filing years to fiscal years first -
        # confirm which one the client expects.
        return self.edgar_client.get_financial_data_for_period(cik, str(max(filing_years)))

    def format_financial_data(self, financial_data):
        """
        Format financial data for display

        Args:
            financial_data (dict or list): Financial data

        Returns:
            dict or list: Formatted financial data; lists are ordered by
                ``_sequence`` with unsequenced records last
        """
        if not isinstance(financial_data, list):
            return self._format_single_financial_data(financial_data)

        def sort_position(record):
            # A missing or None sequence sorts after every numbered record;
            # comparing None with an int would raise TypeError.
            sequence = record.get("_sequence")
            return self._UNSEQUENCED if sequence is None else sequence

        ordered = sorted(financial_data, key=sort_position)
        return [self._format_single_financial_data(record) for record in ordered]

    def _format_single_financial_data(self, data):
        """
        Format single financial data entry

        Records with a ``metrics`` dict are flattened: each metric's
        ``value`` is lifted to the top level and ``_metadata`` fields are
        exposed as ``source_url``, ``source_form`` and ``data_source``.
        Records without ``metrics`` are copied through unchanged.

        Args:
            data (dict): Financial data record

        Returns:
            dict: Formatted financial data containing every key field
        """
        formatted = {
            "period": data.get("period"),
            "_sequence": data.get("_sequence")
        }

        metrics = data.get("metrics")
        if "metrics" in data and isinstance(metrics, dict):
            for metric_key, metric_data in metrics.items():
                if isinstance(metric_data, dict):
                    formatted[metric_key] = metric_data.get("value")
                else:
                    # Old format: the metric is stored as a bare value.
                    formatted[metric_key] = metric_data
            if "_metadata" in data:
                metadata = data["_metadata"]
                formatted["source_url"] = metadata.get("source_url")
                formatted["source_form"] = metadata.get("form")
                formatted["data_source"] = metadata.get("data_source")
        else:
            # Old flat format: keep every field as it is.
            formatted.update(data)

        # Guarantee the key fields exist, even if only as None.
        for key in self._KEY_FIELDS:
            formatted.setdefault(key, None)

        # EPS is reported with two decimal places.
        eps = formatted['earnings_per_share']
        if isinstance(eps, (int, float)):
            formatted['earnings_per_share'] = round(eps, 2)

        return formatted