|
|
import json |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_int(val, default=0): |
|
|
if val is None: |
|
|
return default |
|
|
try: |
|
|
return int(float(val)) |
|
|
except (ValueError, TypeError): |
|
|
return default |
|
|
|
|
|
def safe_float(val, default=0.0): |
|
|
if val is None: |
|
|
return default |
|
|
try: |
|
|
return float(val) |
|
|
except (ValueError, TypeError): |
|
|
return default |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_last_three_with_fallback(data_list): |
|
|
""" |
|
|
从数据列表中提取最新的5年数据(最新1年 + 历史4年) |
|
|
优先级: FY > Q4 > Q3 > Q2 > Q1 |
|
|
返回: 最新的5个period的数据 |
|
|
""" |
|
|
years = [2025, 2024, 2023, 2022, 2021] |
|
|
priority_levels = [ |
|
|
("FY", [f"FY{y}" for y in years]), |
|
|
("Q4", [f"{y}Q4" for y in years]), |
|
|
("Q3", [f"{y}Q3" for y in years]), |
|
|
("Q2", [f"{y}Q2" for y in years]), |
|
|
("Q1", [f"{y}Q1" for y in years]), |
|
|
] |
|
|
|
|
|
data_map = {item["period"]: item for item in data_list if isinstance(item, dict) and "period" in item} |
|
|
|
|
|
|
|
|
for level_name, periods in priority_levels: |
|
|
records = [] |
|
|
valid = True |
|
|
for period in periods: |
|
|
item = data_map.get(period) |
|
|
if not isinstance(item, dict) or item.get("total_revenue") is None: |
|
|
valid = False |
|
|
break |
|
|
|
|
|
if level_name == "FY": |
|
|
fiscal_year = int(period[2:]) |
|
|
else: |
|
|
fiscal_year = int(period[:4]) |
|
|
|
|
|
records.append({ |
|
|
"period": period, |
|
|
"fiscal_year": fiscal_year, |
|
|
"level": level_name, |
|
|
"total_revenue": item.get("total_revenue"), |
|
|
"net_income": item.get("net_income"), |
|
|
"earnings_per_share": item.get("earnings_per_share"), |
|
|
"operating_expenses": item.get("operating_expenses"), |
|
|
"operating_cash_flow": item.get("operating_cash_flow") |
|
|
}) |
|
|
if valid: |
|
|
return records |
|
|
|
|
|
|
|
|
for level_name, periods in priority_levels: |
|
|
records = [] |
|
|
for period in periods: |
|
|
item = data_map.get(period) |
|
|
if isinstance(item, dict) and item.get("total_revenue") is not None: |
|
|
if level_name == "FY": |
|
|
fiscal_year = int(period[2:]) |
|
|
else: |
|
|
fiscal_year = int(period[:4]) |
|
|
|
|
|
records.append({ |
|
|
"period": period, |
|
|
"fiscal_year": fiscal_year, |
|
|
"level": level_name, |
|
|
"total_revenue": item.get("total_revenue"), |
|
|
"net_income": item.get("net_income"), |
|
|
"earnings_per_share": item.get("earnings_per_share"), |
|
|
"operating_expenses": item.get("operating_expenses"), |
|
|
"operating_cash_flow": item.get("operating_cash_flow") |
|
|
}) |
|
|
if records: |
|
|
return records |
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_number(value): |
|
|
if value >= 1_000_000_000: |
|
|
num = value / 1_000_000_000 |
|
|
if num == int(num): |
|
|
return f"${int(num)}B" |
|
|
else: |
|
|
return f"${num:.2f}B".rstrip('0').rstrip('.') |
|
|
elif value >= 1_000_000: |
|
|
num = value / 1_000_000 |
|
|
if num == int(num): |
|
|
return f"${int(num)}M" |
|
|
else: |
|
|
return f"${num:.1f}M".rstrip('0').rstrip('.') |
|
|
elif value >= 1_000: |
|
|
return f"${value:,.0f}" |
|
|
else: |
|
|
return f"${value}" |
|
|
|
|
|
def format_eps(value): |
|
|
return f"${value:.2f}" |
|
|
|
|
|
def calculate_change(current, previous): |
|
|
if previous == 0: |
|
|
return "+0.0%" if current >= 0 else "-0.0%" |
|
|
change = (current - previous) / abs(previous) * 100 |
|
|
sign = "+" if change >= 0 else "-" |
|
|
return f"{sign}{abs(change):.1f}%" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_financial_metrics(three_year_data): |
|
|
if len(three_year_data) < 2: |
|
|
raise ValueError("至少需要两年数据来计算同比变化") |
|
|
|
|
|
sorted_data = sorted(three_year_data, key=lambda x: x["fiscal_year"], reverse=True) |
|
|
latest = sorted_data[0] |
|
|
previous = sorted_data[1] |
|
|
|
|
|
rev_curr = safe_int(latest["total_revenue"]) |
|
|
rev_prev = safe_int(previous["total_revenue"]) |
|
|
|
|
|
net_curr = safe_int(latest["net_income"]) |
|
|
net_prev = safe_int(previous["net_income"]) |
|
|
|
|
|
eps_curr = safe_float(latest["earnings_per_share"]) |
|
|
eps_prev = safe_float(previous["earnings_per_share"]) |
|
|
|
|
|
opex_curr = safe_int(latest["operating_expenses"]) |
|
|
opex_prev = safe_int(previous["operating_expenses"]) |
|
|
|
|
|
cash_curr = safe_int(latest["operating_cash_flow"]) |
|
|
cash_prev = safe_int(previous["operating_cash_flow"]) |
|
|
|
|
|
return [ |
|
|
{ |
|
|
"label": "Total Revenue", |
|
|
"value": format_number(rev_curr), |
|
|
"change": calculate_change(rev_curr, rev_prev), |
|
|
"color": "green" if rev_curr >= rev_prev else "red" |
|
|
}, |
|
|
{ |
|
|
"label": "Net Income", |
|
|
"value": format_number(net_curr), |
|
|
"change": calculate_change(net_curr, net_prev), |
|
|
"color": "green" if net_curr >= net_prev else "red" |
|
|
}, |
|
|
{ |
|
|
"label": "Earnings Per Share", |
|
|
"value": format_eps(eps_curr), |
|
|
"change": calculate_change(eps_curr, eps_prev), |
|
|
"color": "green" if eps_curr >= eps_prev else "red" |
|
|
}, |
|
|
{ |
|
|
"label": "Operating Expenses", |
|
|
"value": format_number(opex_curr), |
|
|
"change": calculate_change(opex_curr, opex_prev), |
|
|
"color": "green" if opex_curr >= opex_prev else "red" |
|
|
}, |
|
|
{ |
|
|
"label": "Operating Cash Flow", |
|
|
"value": format_number(cash_curr), |
|
|
"change": calculate_change(cash_curr, cash_prev), |
|
|
"color": "green" if cash_curr >= cash_prev else "red" |
|
|
} |
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_financial_data_with_metadata(raw_data): |
|
|
""" |
|
|
返回包含 financial_metrics + year_data + three_year_data 的完整结果 |
|
|
financial_metrics: 最新1年的指标(与前一年对比) |
|
|
three_year_data: 最新的前3年数据(排除最新年,用于Latest 3 Years表格) |
|
|
""" |
|
|
return_value = {"financial_metrics": [], "year_data": "N/A", "three_year_data": []} |
|
|
if not raw_data: |
|
|
return return_value |
|
|
if not isinstance(raw_data, list): |
|
|
return return_value |
|
|
if not isinstance(raw_data[0], dict): |
|
|
return {"financial_metrics": [], "year_data": "N/A", "three_year_data": []} |
|
|
|
|
|
|
|
|
if isinstance(raw_data, str): |
|
|
raw_data = json.loads(raw_data) |
|
|
if not isinstance(raw_data, list): |
|
|
raise TypeError("raw_data 必须是列表或 JSON 字符串") |
|
|
|
|
|
|
|
|
five_years = extract_last_three_with_fallback(raw_data) |
|
|
if not five_years: |
|
|
print("无法提取有效的财务数据") |
|
|
return return_value |
|
|
|
|
|
|
|
|
five_years_sorted = sorted(five_years, key=lambda x: x["fiscal_year"], reverse=True) |
|
|
|
|
|
|
|
|
if len(five_years_sorted) < 2: |
|
|
print("数据不足,至少需要2年数据") |
|
|
return return_value |
|
|
|
|
|
|
|
|
latest_year = five_years_sorted[0] |
|
|
previous_year = five_years_sorted[1] |
|
|
|
|
|
|
|
|
|
|
|
three_years_for_table = five_years_sorted[1:4] if len(five_years_sorted) >= 4 else five_years_sorted[1:] |
|
|
|
|
|
|
|
|
year = latest_year["fiscal_year"] |
|
|
level = latest_year["level"] |
|
|
|
|
|
if level == "FY": |
|
|
year_data = f"FY {year}" |
|
|
else: |
|
|
year_data = f"{year} {level}" |
|
|
|
|
|
|
|
|
financial_metrics = build_financial_metrics([latest_year, previous_year]) |
|
|
|
|
|
|
|
|
return { |
|
|
"financial_metrics": financial_metrics, |
|
|
"year_data": year_data, |
|
|
"three_year_data": three_years_for_table |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|