|
|
import json |
|
|
|
|
|
def extract_last_three_with_fallback(data_list, years=(2025, 2024, 2023)):
    """Select one reporting level and return its cleaned records.

    Reporting levels are tried in priority order: FY, Q4, Q3, Q2, Q1.
    Period keys are ``FY<year>`` for full years and ``<year>Q<n>`` for
    quarters. A record is usable only when it exists and its
    ``total_revenue`` is not ``None``.

    Selection rules:
      1. The first level with a usable record for every requested year is
         returned in full.
      2. Otherwise the first level with at least one usable record is
         returned (partial data).
      3. Otherwise an empty list is returned.

    Args:
        data_list: Iterable of dicts carrying a ``"period"`` key. Entries
            that are not dicts or have no ``"period"`` are ignored; when a
            period occurs more than once the last entry wins. ``None`` is
            treated as an empty input.
        years: Fiscal years to extract, in output order. Defaults to the
            previously hard-coded ``(2025, 2024, 2023)``.

    Returns:
        A list of dicts with the keys ``period``, ``fiscal_year``, ``level``,
        ``total_revenue``, ``net_income``, ``earnings_per_share``,
        ``operating_expenses``, ``operating_cash_flow`` and ``source_url``.
        Optional source fields that are missing come back as ``None``.
    """
    years = tuple(years)
    level_templates = (
        ("FY", "FY{year}"),
        ("Q4", "{year}Q4"),
        ("Q3", "{year}Q3"),
        ("Q2", "{year}Q2"),
        ("Q1", "{year}Q1"),
    )

    by_period = {
        item["period"]: item
        for item in (data_list or ())
        if isinstance(item, dict) and "period" in item
    }

    def collect(level_name, template):
        """Return the usable records of one level, in ``years`` order."""
        records = []
        for year in years:
            period = template.format(year=year)
            item = by_period.get(period)
            if item is None or item.get("total_revenue") is None:
                continue
            records.append({
                "period": period,
                "fiscal_year": int(year),
                "level": level_name,
                "total_revenue": item["total_revenue"],
                "net_income": item.get("net_income"),
                "earnings_per_share": item.get("earnings_per_share"),
                "operating_expenses": item.get("operating_expenses"),
                "operating_cash_flow": item.get("operating_cash_flow"),
                "source_url": item.get("source_url"),
            })
        return records

    candidates = [collect(name, template) for name, template in level_templates]

    # A complete level always beats a partial one, even a partial level of
    # higher priority, so look for completeness across all levels first.
    for records in candidates:
        if len(records) == len(years):
            return records

    # No level is complete: fall back to the highest-priority level that has
    # any usable record at all.
    for records in candidates:
        if records:
            return records

    return []
|
|
|
|
|
|
|
|
def format_number(value):
    """Format a dollar amount compactly as ``$X.XXB``, ``$X.XM`` or ``$X,XXX``.

    Billions use two decimals and millions use one; a fractional part that
    is all zeros is dropped (``$2B``, ``$3M``). Smaller amounts are shown as
    whole dollars with thousands separators.

    The scale is chosen from the magnitude, so negative amounts (losses,
    negative cash flow) are scaled like positive ones and rendered with a
    leading minus sign, e.g. ``-$2.50B``.

    Args:
        value: A real number (int or float).

    Returns:
        The formatted string.
    """
    sign = "-" if value < 0 else ""
    magnitude = abs(value)

    if magnitude >= 1_000_000_000:
        text = f"{magnitude / 1_000_000_000:.2f}B".replace(".00B", "B")
    elif magnitude >= 1_000_000:
        text = f"{magnitude / 1_000_000:.1f}M".replace(".0M", "M")
    else:
        text = f"{magnitude:,.0f}"

    return f"{sign}${text}"
|
|
|
|
|
def format_eps(value):
    """Render an earnings-per-share figure with a dollar sign and two decimals."""
    amount = format(value, ".2f")
    return "$" + amount
|
|
|
|
|
def safe_int(val):
    """Convert ``val`` to ``int`` without raising; fall back to 0.

    Accepts ints, floats and numeric strings (including decimal strings such
    as ``"12.9"``, which are truncated toward zero). ``None``, non-numeric
    input, NaN and infinities all yield 0.

    Args:
        val: The value to convert.

    Returns:
        The integer value, or 0 when no finite integer can be derived.
    """
    if val is None:
        return 0
    if isinstance(val, int):
        # Keep ints exact: routing them through float() loses precision
        # above 2**53 and overflows for very large values.
        return int(val)
    try:
        return int(float(val))
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text or NaN; OverflowError: +/- infinity.
        return 0
|
|
|
|
|
def calculate_change(current, previous):
    """Return the percentage change from ``previous`` to ``current``.

    The change is measured against the absolute value of ``previous`` so the
    sign reflects the direction of movement even for a negative baseline.

    Args:
        current: The newer value.
        previous: The baseline value.

    Returns:
        A string such as ``'+12.4%'`` or ``'-3.2%'``. When the baseline is
        zero the change is undefined: ``'+0.0%'`` is returned if ``current``
        is also zero (nothing moved), otherwise ``'N/A'``.
    """
    if previous == 0:
        # A percentage against a zero baseline does not exist; reporting
        # 0.0% for a real movement would be misleading.
        return "+0.0%" if current == 0 else "N/A"

    change = (current - previous) / abs(previous) * 100
    sign = "+" if change >= 0 else "-"
    return f"{sign}{abs(change):.1f}%"
|
|
|
|
|
def build_financial_metrics_three_year_data(three_year_data):
    """Build the five headline metric cards from per-year records.

    Records are ordered by ``fiscal_year``, newest first. The newest record
    supplies the displayed values and the second newest is the comparison
    baseline; any further records are ignored.

    Args:
        three_year_data: Iterable of dicts, each with a ``"fiscal_year"`` key
            and optionally ``total_revenue``, ``net_income``,
            ``earnings_per_share``, ``operating_expenses`` and
            ``operating_cash_flow``.

    Returns:
        A list of five dicts (Total Revenue, Net Income, Earnings Per Share,
        Operating Expenses, Cash Flow), each with the keys ``label``,
        ``value``, ``change`` and ``color``. ``color`` is ``"green"`` when
        the newest value is greater than or equal to the baseline, otherwise
        ``"red"``.

    Raises:
        ValueError: If fewer than two records are supplied.
    """
    by_recency = list(three_year_data)
    by_recency.sort(key=lambda rec: rec["fiscal_year"], reverse=True)
    if len(by_recency) < 2:
        raise ValueError("至少需要两年数据来计算同比变化")

    newest, baseline = by_recency[0], by_recency[1]

    def whole_amounts(field):
        """Read ``field`` from both records as integers (newest, baseline)."""
        return safe_int(newest.get(field)), safe_int(baseline.get(field))

    def per_share(record):
        """Read earnings per share as a float; missing or falsy becomes 0.0."""
        return float(record.get("earnings_per_share", 0) or 0)

    # (label, value formatter, (newest value, baseline value)) in card order.
    rows = [
        ("Total Revenue", format_number, whole_amounts("total_revenue")),
        ("Net Income", format_number, whole_amounts("net_income")),
        ("Earnings Per Share", format_eps, (per_share(newest), per_share(baseline))),
        ("Operating Expenses", format_number, whole_amounts("operating_expenses")),
        ("Cash Flow", format_number, whole_amounts("operating_cash_flow")),
    ]

    return [
        {
            "label": label,
            "value": render(now),
            "change": calculate_change(now, before),
            "color": "green" if now >= before else "red",
        }
        for label, render, (now, before) in rows
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from collections import defaultdict |
|
|
import re |
|
|
|
|
|
def parse_period(period):
    """Parse a period string into ``(year, type, quarter)``.

    Supported formats (surrounding whitespace is ignored):
      - ``FY2025``   -> ``(2025, 'FY', None)``
      - ``Q1-2025``  -> ``(2025, 'Q', 1)``
      - ``2025Q1``   -> ``(2025, 'Q', 1)``

    Args:
        period: The period string.

    Returns:
        A ``(year, type, quarter)`` tuple where ``type`` is ``'FY'`` or
        ``'Q'`` and ``quarter`` is ``None`` for full-year periods.

    Raises:
        ValueError: If ``period`` matches none of the supported formats.
    """
    text = period.strip()

    if text.startswith('FY'):
        try:
            return int(text[2:]), 'FY', None
        except ValueError as err:
            raise ValueError(f"Unknown period format: {period}") from err

    # fullmatch, not match: trailing text such as "Q1-2025-extra" must be
    # rejected instead of being half-parsed.
    dashed = re.fullmatch(r'Q([1-4])-(\d{4})', text)
    if dashed:
        return int(dashed.group(2)), 'Q', int(dashed.group(1))

    compact = re.fullmatch(r'(\d{4})Q([1-4])', text)
    if compact:
        return int(compact.group(1)), 'Q', int(compact.group(2))

    raise ValueError(f"Unknown period format: {period}")
|
|
|
|
|
def get_best_value_for_year(year_data, key):
    """Return the best available value for one fiscal year.

    ``year_data`` maps report slots to values, e.g.
    ``{'FY': value, 'Q1': value, 'Q2': value, ...}``. The full-year value is
    preferred, then Q4, Q3, Q2 and Q1; the first slot whose value is not
    ``None`` wins.

    Args:
        year_data: Mapping of slot name to value.
        key: Accepted for interface compatibility; not used by this function.

    Returns:
        The first non-``None`` value in priority order, or ``None`` when
        every slot is missing or ``None``.
    """
    for slot in ('FY', 'Q4', 'Q3', 'Q2', 'Q1'):
        candidate = year_data.get(slot)
        if candidate is not None:
            return candidate
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re |
|
|
|
|
|
def parse_period_year_data(period):
    """Parse a period string into ``(year, type, quarter)`` without raising.

    Supported formats (surrounding whitespace is ignored):
      - ``FY2025``
      - ``Q1-2025``
      - ``2025Q1``

    The whole string must match one of the formats; text with trailing
    characters (for example ``2025Q12`` or ``Q1-20255``) is rejected rather
    than being partially parsed.

    Args:
        period: The period value; anything that is not a string is rejected.

    Returns:
        ``(year, 'FY', None)`` for a full year, ``(year, 'Q', quarter)`` for
        a quarter, or ``(None, None, None)`` when the value is not
        recognised.
    """
    unknown = (None, None, None)
    if not isinstance(period, str):
        return unknown

    text = period.strip()

    if text.startswith('FY'):
        try:
            return int(text[2:]), 'FY', None
        except ValueError:
            # Malformed year after "FY": fall through; the quarter patterns
            # below cannot match it either, so the result is "unknown".
            pass

    dashed = re.fullmatch(r'Q([1-4])-(\d{4})', text)
    if dashed:
        return int(dashed.group(2)), 'Q', int(dashed.group(1))

    compact = re.fullmatch(r'(\d{4})Q([1-4])', text)
    if compact:
        return int(compact.group(1)), 'Q', int(compact.group(2))

    return unknown
|
|
def get_yearly_data(data_json):
    """Describe the last recognisable reporting period in ``data_json``.

    Walks ``data_json['metrics']`` in list order and remembers the
    description of every entry whose ``period`` can be parsed, so the value
    returned belongs to the last such entry in the list.

    NOTE(review): "last" means list order, not chronological order; this is
    only the latest period if the caller supplies the metrics oldest-first.
    Confirm against the caller.

    Args:
        data_json: Mapping with an optional ``'metrics'`` list of dicts.

    Returns:
        ``"<year> FY"`` or ``"<year> Q<n>"`` for the last parsable entry, or
        ``"N/A"`` when no entry has a parsable period.
    """
    label = "N/A"
    for entry in data_json.get('metrics', []):
        raw_period = entry.get('period')
        if not raw_period:
            continue

        parsed_year, kind, quarter_no = parse_period_year_data(raw_period)
        if parsed_year is None:
            continue

        label = f"{parsed_year} FY" if kind == 'FY' else f"{parsed_year} Q{quarter_no}"

    return label
|
|
def parse_period_yoy(period):
    """Parse a period into ``(year, type, quarter)`` without raising.

    Supported formats (surrounding whitespace is ignored):
      - ``FY2025``   -> ``(2025, 'FY', None)``
      - ``Q1-2025``  -> ``(2025, 'Q', 1)``
      - ``2025Q1``   -> ``(2025, 'Q', 1)``

    Args:
        period: The period value; anything that is not a string is rejected.

    Returns:
        The parsed tuple, or ``(None, None, None)`` when the value is not a
        string or matches none of the supported formats (including a
        malformed year after ``FY``).
    """
    unknown = (None, None, None)
    if not isinstance(period, str):
        return unknown

    text = period.strip()

    if text.startswith('FY'):
        try:
            return int(text[2:]), 'FY', None
        except ValueError:
            return unknown

    # fullmatch so that trailing text ("Q1-2025-extra") is reported as
    # unknown instead of blowing up while unpacking a split.
    dashed = re.fullmatch(r'Q([1-4])-(\d{4})', text)
    if dashed:
        return int(dashed.group(2)), 'Q', int(dashed.group(1))

    compact = re.fullmatch(r'(\d{4})Q([1-4])', text)
    if compact:
        return int(compact.group(1)), 'Q', int(compact.group(2))

    return unknown
|
|
|
|
|
def get_best_value_for_year_yoy(values_dict, key):
    """Return the best available value of one indicator for a fiscal year.

    ``values_dict`` maps report slots to metric dicts, e.g.
    ``{'FY': {...}, 'Q1': {...}, ...}``. Slots are consulted in the order
    FY, Q4, Q3, Q2, Q1; the first slot that holds a dict with a non-``None``
    value under ``key`` wins.

    Args:
        values_dict: Mapping of slot name to metric dict.
        key: Name of the indicator to read from each metric dict.

    Returns:
        The first non-``None`` value found, or ``None`` if there is none.
    """
    for slot in ('FY', 'Q4', 'Q3', 'Q2', 'Q1'):
        report = values_dict.get(slot)
        if not isinstance(report, dict):
            continue
        found = report.get(key)
        if found is not None:
            return found
    return None
|
|
import json |
|
|
def calculate_yoy_comparison(data_json):
    """Compare the two most recent fiscal years for five indicators.

    Metrics are grouped per fiscal year and report slot (FY, Q1..Q4). For
    each indicator the best available value of the latest year is compared
    with the best available value of the year before it.

    Args:
        data_json: Mapping with a ``'metrics'`` list. Each element is a dict
            with a ``'period'`` key; elements after the first may also be
            JSON strings encoding such a dict.

    Returns:
        ``[]`` when ``'metrics'`` is missing, empty, not a list, or its
        first element is not a dict. Otherwise a list of five dicts with the
        keys ``label``, ``value``, ``change`` and ``color``. ``change`` is a
        signed percentage with one decimal, ``"0.0%"`` for no change, or
        ``"N/A"`` when either value is missing, non-numeric or non-finite,
        or the baseline is zero. ``color`` is ``"green"``, ``"red"`` or
        ``"N/A"``.

    Raises:
        ValueError: If a string element is not valid JSON, or if fewer than
            two fiscal years are present.
        TypeError: If an element is neither a dict nor a JSON-encoded dict.
    """
    metrics_list = data_json.get('metrics', [])
    if not metrics_list or not isinstance(metrics_list, list):
        return []
    if not isinstance(metrics_list[0], dict):
        return []

    cleaned_metrics = []
    for i, metric in enumerate(metrics_list):
        if isinstance(metric, str):
            try:
                metric = json.loads(metric)
            except ValueError as e:  # json.JSONDecodeError is a ValueError
                raise ValueError(f"Failed to parse metrics[{i}] as JSON string: {metric}") from e
        if not isinstance(metric, dict):
            raise TypeError(f"metrics[{i}] is not a dictionary or valid JSON string. Type: {type(metric)}")
        cleaned_metrics.append(metric)

    # year -> {'FY': metric, 'Q1': metric, ...}
    yearly_data = defaultdict(dict)
    for metric in cleaned_metrics:
        period = metric.get('period')
        if not period:
            continue

        year, ptype, quarter = parse_period_yoy(period)
        if year is None:
            continue

        if ptype == 'FY':
            yearly_data[year]['FY'] = metric
        elif ptype == 'Q':
            yearly_data[year][f'Q{quarter}'] = metric

    years = sorted(yearly_data.keys(), reverse=True)
    if len(years) < 2:
        raise ValueError("至少需要两个财年的数据")

    latest_year, prev_year = years[0], years[1]

    indicators = [
        ("Total Revenue", "total_revenue"),
        ("Net Income", "net_income"),
        ("Earnings Per Share", "earnings_per_share"),
        ("Operating Expenses", "operating_expenses"),
        ("Cash Flow", "operating_cash_flow"),
    ]

    def to_number(raw):
        """Return ``raw`` as a finite float, or ``None`` if that is impossible."""
        if raw is None:
            return None
        try:
            number = float(raw)
        except (TypeError, ValueError):
            return None
        if number != number or abs(number) == float("inf"):  # NaN / infinity
            return None
        return number

    def format_value(raw):
        """Format a value as ``$X.XXB`` / ``$X.XM`` / ``$X.XK`` / ``$X.XX``."""
        val = to_number(raw)
        if val is None:
            return "N/A"
        abs_val = abs(val)
        if abs_val >= 1e9:
            return f"${val / 1e9:.2f}B"
        if abs_val >= 1e6:
            return f"${val / 1e6:.1f}M"
        if abs_val >= 1e3:
            return f"${val / 1e3:.1f}K"
        return f"${val:.2f}"

    result = []
    for label, key in indicators:
        current_raw = get_best_value_for_year_yoy(yearly_data[latest_year], key)
        prev_raw = get_best_value_for_year_yoy(yearly_data[prev_year], key)

        current_num = to_number(current_raw)
        prev_num = to_number(prev_raw)

        # The zero test must run on the converted number: a baseline given
        # as the string "0" would otherwise reach the division below.
        if current_num is None or prev_num is None or prev_num == 0:
            change_str = "N/A"
            color = "N/A"
        else:
            change_pct = (current_num - prev_num) / abs(prev_num) * 100
            if change_pct > 0:
                change_str = f"+{change_pct:.1f}%"
                color = "green"
            elif change_pct < 0:
                change_str = f"{change_pct:.1f}%"
                color = "red"
            else:
                change_str = "0.0%"
                color = "N/A"

        result.append({
            "label": label,
            "value": format_value(current_raw),
            "change": change_str,
            "color": color,
        })

    return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re |
|
|
import json |
|
|
from collections import defaultdict |
|
|
|
|
|
def parse_period_three_year(period):
    """Parse a period into ``(year, type, quarter)`` without raising.

    Supported formats (surrounding whitespace is ignored):
      - ``FY2025``   -> ``(2025, 'FY', None)``
      - ``Q1-2025``  -> ``(2025, 'Q', 1)``
      - ``2025Q1``   -> ``(2025, 'Q', 1)``

    Args:
        period: The period value; anything that is not a string is rejected.

    Returns:
        The parsed tuple, or ``(None, None, None)`` when the value is not a
        string or matches none of the supported formats (including a
        malformed year after ``FY``).
    """
    unknown = (None, None, None)
    if not isinstance(period, str):
        return unknown

    text = period.strip()

    if text.startswith('FY'):
        try:
            return int(text[2:]), 'FY', None
        except ValueError:
            return unknown

    # fullmatch so that trailing text ("Q1-2025-extra") is reported as
    # unknown instead of blowing up while unpacking a split.
    dashed = re.fullmatch(r'Q([1-4])-(\d{4})', text)
    if dashed:
        return int(dashed.group(2)), 'Q', int(dashed.group(1))

    compact = re.fullmatch(r'(\d{4})Q([1-4])', text)
    if compact:
        return int(compact.group(1)), 'Q', int(compact.group(2))

    return unknown
|
|
|
|
|
def extract_financial_table(data_json):
    """Build a value table and a year-over-year table for up to three years.

    Metrics are grouped per fiscal year and report slot (FY, Q1..Q4). For
    every indicator the best available value of a year is the FY value if
    present, otherwise the latest quarter that has one (Q4 -> Q1).

    NOTE(review): when fewer than three fiscal years are available, the
    ``list_data`` header lists only the available years while every data row
    is still padded to three value cells with "N/A". That shape is kept
    unchanged here; confirm which shape consumers expect.

    Args:
        data_json: Mapping with a ``'metrics'`` list. Each element is a dict
            with a ``'period'`` key; elements after the first may also be
            JSON strings encoding such a dict.

    Returns:
        ``[]`` when ``'metrics'`` is missing, empty, not a list, or its
        first element is not a dict. Otherwise a dict with:
          - ``"list_data"``: header row plus one row per indicator. Amounts
            are shown in millions (``"123M"`` / ``"12.3M"``), EPS with two
            decimals, and ``"N/A"`` where no finite number is available.
          - ``"yoy_rates"``: header row plus one row per indicator holding
            the signed percentage change (two decimals) for each pair of
            consecutive years, or ``"N/A"`` when a value is missing,
            non-numeric, non-finite, or the baseline is zero.

    Raises:
        ValueError: If a string element is not valid JSON, or if no element
            has a recognisable period.
        TypeError: If an element is neither a dict nor a JSON-encoded dict.
    """
    metrics_list = data_json.get('metrics', [])
    if not metrics_list or not isinstance(metrics_list, list):
        return []
    if not isinstance(metrics_list[0], dict):
        return []

    cleaned_metrics = []
    for i, metric in enumerate(metrics_list):
        if isinstance(metric, str):
            try:
                metric = json.loads(metric)
            except ValueError as e:  # json.JSONDecodeError is a ValueError
                raise ValueError(f"Failed to parse metrics[{i}] as JSON string: {metric}") from e
        if not isinstance(metric, dict):
            raise TypeError(f"metrics[{i}] is not a dictionary or valid JSON string. Type: {type(metric)}")
        cleaned_metrics.append(metric)

    # year -> {'FY': metric, 'Q1': metric, ...}
    yearly_reports = defaultdict(dict)
    all_years = set()

    for metric in cleaned_metrics:
        period = metric.get('period')
        if not period:
            continue

        year, ptype, quarter = parse_period_three_year(period)
        if year is None:
            continue

        all_years.add(year)
        if ptype == 'FY':
            yearly_reports[year]['FY'] = metric
        elif ptype == 'Q':
            yearly_reports[year][f'Q{quarter}'] = metric

    if not all_years:
        raise ValueError("未找到任何有效报告期")

    # Newest three years, padded with None so value rows always have three
    # cells.
    sorted_years = sorted(all_years, reverse=True)[:3]
    sorted_years += [None] * (3 - len(sorted_years))

    def get_best_value(year, key):
        """Return the FY value of ``key`` for ``year``, else the latest quarter's."""
        if year is None:
            return None
        reports = yearly_reports.get(year, {})
        for slot in ('FY', 'Q4', 'Q3', 'Q2', 'Q1'):
            report = reports.get(slot)
            if isinstance(report, dict):
                found = report.get(key)
                if found is not None:
                    return found
        return None

    def to_number(raw):
        """Return ``raw`` as a finite float, or ``None`` if that is impossible."""
        if raw is None:
            return None
        try:
            number = float(raw)
        except (TypeError, ValueError):
            return None
        if number != number or abs(number) == float("inf"):  # NaN / infinity
            return None
        return number

    def format_to_m(value):
        """Format an amount in millions, dropping the decimal for whole numbers."""
        val = to_number(value)
        if val is None:
            return "N/A"
        val_in_m = val / 1e6
        if abs(val_in_m - round(val_in_m)) < 1e-6:
            return f"{int(round(val_in_m))}M"
        return f"{val_in_m:.1f}M"

    def format_eps_cell(value):
        """Format an EPS value with two decimals."""
        val = to_number(value)
        if val is None:
            return "N/A"
        return f"{val:.2f}"

    indicators = [
        ("Total", "total_revenue"),
        ("Net Income", "net_income"),
        ("Earnings Per Share", "earnings_per_share"),
        ("Operating Expenses", "operating_expenses"),
        ("Cash Flow", "operating_cash_flow"),
    ]

    valid_years = [y for y in sorted_years if y is not None]

    # ---- value table -----------------------------------------------------
    header = ["Category"] + [f"{year}/FY" for year in valid_years]
    list_data = [header]

    for label, key in indicators:
        row = [label]
        for year in sorted_years:
            if year is None:
                row.append("N/A")
                continue
            val = get_best_value(year, key)
            if label == "Earnings Per Share":
                row.append(format_eps_cell(val))
            else:
                row.append(format_to_m(val))
        list_data.append(row)

    # ---- year-over-year table -------------------------------------------
    yoy_header = ["Category"]
    yoy_pairs = []

    if len(valid_years) >= 2:
        yoy_header.append(f"{valid_years[0]}/FY")
        yoy_pairs.append((valid_years[0], valid_years[1]))
    if len(valid_years) >= 3:
        yoy_header.append(f"{valid_years[1]}/FY")
        yoy_pairs.append((valid_years[1], valid_years[2]))

    yoy_rates = [yoy_header]

    for label, key in indicators:
        row = [label]
        for curr_y, prev_y in yoy_pairs:
            curr_num = to_number(get_best_value(curr_y, key))
            prev_num = to_number(get_best_value(prev_y, key))

            # The zero test must run on the converted number: a baseline
            # given as the string "0" would otherwise reach the division.
            if curr_num is None or prev_num is None or prev_num == 0:
                row.append("N/A")
                continue

            pct = (curr_num - prev_num) / abs(prev_num) * 100
            if pct >= 0:
                row.append(f"+{pct:.2f}%")
            else:
                row.append(f"{pct:.2f}%")
        yoy_rates.append(row)

    return {
        "list_data": list_data,
        "yoy_rates": yoy_rates,
    }