# Easy-Financial-Report / service / report_tools.py
# baba521's picture
# test
# abf5292
import json
def _collect_level_records(data_map, level_name, years):
    """Build the cleaned records available at one reporting level.

    Returns ``(records, complete)`` where ``complete`` is True only when
    every requested year has a report with a non-None ``total_revenue``.
    """
    records = []
    for year in years:
        # Annual reports are keyed "FY2025"; quarterly ones "2025Q4".
        period = f"FY{year}" if level_name == "FY" else f"{year}{level_name}"
        item = data_map.get(period)
        if item is None or item.get("total_revenue") is None:
            continue
        records.append({
            "period": period,
            "fiscal_year": int(year),
            "level": level_name,
            "total_revenue": item["total_revenue"],
            "net_income": item.get("net_income"),
            "earnings_per_share": item.get("earnings_per_share"),
            "operating_expenses": item.get("operating_expenses"),
            "operating_cash_flow": item.get("operating_cash_flow"),
            "source_url": item.get("source_url"),
        })
    return records, len(records) == len(years)


def extract_last_three_with_fallback(data_list, years=None):
    """Pick one consistent reporting level covering the requested years.

    Levels are tried in priority order FY, Q4, Q3, Q2, Q1. The first level
    that has a report with a non-None ``total_revenue`` for *every* year is
    returned. If no level is complete, the first level (in the same
    priority order) that has at least one usable report is returned, so
    the result may then contain fewer records than years.

    Args:
        data_list: iterable of report dicts; entries that are not dicts or
            have no ``"period"`` key are ignored.
        years: fiscal years to extract, newest first. Defaults to
            ``[2025, 2024, 2023]`` (the previously hard-coded window).

    Returns:
        A list of cleaned record dicts in the order of ``years``, or ``[]``
        when nothing usable is found.
    """
    years = [2025, 2024, 2023] if years is None else list(years)

    # Index by period for O(1) lookup; skip malformed entries instead of
    # crashing on them.
    data_map = {
        item["period"]: item
        for item in data_list
        if isinstance(item, dict) and "period" in item
    }

    per_level = [
        _collect_level_records(data_map, level_name, years)
        for level_name in ("FY", "Q4", "Q3", "Q2", "Q1")
    ]

    # First choice: a level with data for every requested year.
    for records, complete in per_level:
        if complete:
            return records

    # Conservative fallback: highest-priority level with any data at all.
    for records, _ in per_level:
        if records:
            return records

    return []
def format_number(value):
    """Format a dollar amount compactly as ``$X.XXB``, ``$X.XM`` or ``$X,XXX``.

    The unit is chosen from the magnitude, so negative amounts (e.g. a net
    loss) are scaled the same way as positive ones and rendered with a
    leading minus sign (``-$1.5M``). Trailing ``.00``/``.0`` is dropped
    for whole billions/millions. ``None`` is rendered as ``"N/A"``.
    """
    if value is None:
        return "N/A"

    sign = "-" if value < 0 else ""
    magnitude = abs(value)

    if magnitude >= 1_000_000_000:
        text = f"{magnitude / 1_000_000_000:.2f}B".replace(".00B", "B")
    elif magnitude >= 1_000_000:
        text = f"{magnitude / 1_000_000:.1f}M".replace(".0M", "M")
    else:
        text = f"{magnitude:,.0f}"
    return f"{sign}${text}"
def format_eps(value):
    """Format earnings per share as dollars with two decimals.

    Negative values are rendered with the sign before the currency symbol
    (``-$1.50``). Values that cannot be converted to a number, including
    ``None``, are rendered as ``"N/A"`` instead of raising.
    """
    try:
        amount = float(value)
    except (TypeError, ValueError):
        return "N/A"
    sign = "-" if amount < 0 else ""
    return f"{sign}${abs(amount):.2f}"
def safe_int(val):
    """Convert ``val`` to an int, returning 0 when that is not possible.

    Accepts ints, floats and numeric strings (``"12"``, ``"12.7"``);
    fractional parts are truncated. ``None``, unparseable input, NaN and
    infinities all yield 0.
    """
    if val is None:
        return 0
    if isinstance(val, int):
        # Already integral: skip the float round-trip, which would lose
        # precision (or overflow) for very large values.
        return int(val)
    try:
        if isinstance(val, str):
            try:
                return int(val)
            except ValueError:
                pass  # not a plain integer literal; fall back to float parsing
        return int(float(val))
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float("inf")), which the float path raises.
        return 0
def calculate_change(current, previous):
    """Return the change from ``previous`` to ``current`` as a signed percent.

    The result looks like ``'+12.4%'`` or ``'-3.2%'``; the change is measured
    relative to ``abs(previous)`` so a move from a negative baseline keeps
    the intuitive sign. When the baseline is zero or either value is
    ``None`` the percentage is undefined and ``'N/A'`` is returned rather
    than a misleading ``'+0.0%'``.
    """
    if current is None or previous is None or previous == 0:
        return "N/A"
    change = (current - previous) / abs(previous) * 100
    sign = "+" if change >= 0 else "-"
    return f"{sign}{abs(change):.1f}%"
def build_financial_metrics_three_year_data(three_year_data):
    """Build the headline metric cards comparing the two newest fiscal years.

    Records are ranked by ``fiscal_year`` (newest first); the newest one is
    compared against the one immediately before it. Each card carries a
    formatted current value, a change string from ``calculate_change`` and
    a colour that is green when the current value is not below the
    previous one.

    Raises:
        ValueError: when fewer than two records are supplied.
    """
    ranked = sorted(three_year_data, key=lambda rec: rec["fiscal_year"], reverse=True)
    if len(ranked) < 2:
        raise ValueError("至少需要两年数据来计算同比变化")
    newest, prior = ranked[0], ranked[1]

    def int_field(name):
        # Reader that pulls one field from a record as an int.
        return lambda record: safe_int(record.get(name))

    def eps_field(record):
        # EPS stays fractional; missing or falsy values count as 0.
        return float(record.get("earnings_per_share", 0) or 0)

    # (card label, value reader, value formatter), in display order.
    layout = (
        ("Total Revenue", int_field("total_revenue"), format_number),
        ("Net Income", int_field("net_income"), format_number),
        ("Earnings Per Share", eps_field, format_eps),
        ("Operating Expenses", int_field("operating_expenses"), format_number),
        ("Cash Flow", int_field("operating_cash_flow"), format_number),
    )

    # Read every value up front, then render the cards.
    readings = [(read(newest), read(prior)) for _, read, _ in layout]

    cards = []
    for (label, _, render), (now, before) in zip(layout, readings):
        cards.append({
            "label": label,
            "value": render(now),
            "change": calculate_change(now, before),
            "color": "green" if now >= before else "red",
        })
    return cards
# Example usage, assuming the raw data is held in `raw_data` (the full list of period records):
# raw_data = [ {...}, ... ]
# Run the extraction:
# result = extract_last_three_with_fallback(raw_data)
# Print the result as JSON:
# json_output = json.dumps(result, indent=2)
# print(json_output)
# ==========
from collections import defaultdict
import re
def parse_period(period):
    """Parse a period label into ``(year, type, quarter)``.

    Recognised forms:
      - ``FY2025``  -> ``(2025, 'FY', None)``
      - ``Q1-2025`` -> ``(2025, 'Q', 1)``
      - ``2025Q1``  -> ``(2025, 'Q', 1)``

    Raises:
        ValueError: for anything else, always with the same
            ``Unknown period format`` message (including a malformed
            ``FY...`` label, trailing text, or a non-string value).
    """
    if isinstance(period, str):
        if period.startswith('FY'):
            try:
                return int(period[2:]), 'FY', None
            except ValueError:
                pass  # reported below with the uniform message
        else:
            # fullmatch: trailing text must not be silently accepted.
            match = re.fullmatch(r'Q([1-4])-(\d{4})', period)
            if match:
                return int(match.group(2)), 'Q', int(match.group(1))
            match = re.fullmatch(r'(\d{4})Q([1-4])', period)
            if match:
                return int(match.group(1)), 'Q', int(match.group(2))
    raise ValueError(f"Unknown period format: {period}")
def get_best_value_for_year(year_data, key):
    """Return the best available value for one fiscal year.

    ``year_data`` maps a report slot (``'FY'``, ``'Q1'`` .. ``'Q4'``) directly
    to a value. The annual figure wins; otherwise the latest quarter with a
    non-None value is used (Q4, then Q3, Q2, Q1). Returns ``None`` when no
    slot has a value. ``key`` is accepted but not used by the lookup.
    """
    for slot in ('FY', 'Q4', 'Q3', 'Q2', 'Q1'):
        candidate = year_data.get(slot)
        if candidate is not None:
            return candidate
    return None
# def get_yearly_data(data_json):
# metrics_list = data_json['metrics']
# # 按年份组织数据:year -> { 'FY': {...}, 'Q1': {...}, ... }
# yearly_data = "N/A"
# for metric in metrics_list:
# period = metric['period']
# year, ptype, quarter = parse_period(period)
# if ptype == 'FY':
# yearly_data = f"{year} {ptype}"
# else:
# yearly_data = f"{year} {ptype} Q{quarter}"
# return yearly_data
import re
def parse_period_year_data(period):
    """Parse a period label into ``(year, type, quarter)`` without raising.

    Supported forms:
      - ``FY2025``
      - ``Q1-2025``
      - ``2025Q1``

    Anything else, including non-string input, gives ``(None, None, None)``.
    """
    unparsed = (None, None, None)
    if not isinstance(period, str):
        return unparsed

    # Annual form: everything after "FY" must be an integer.
    if period.startswith('FY'):
        try:
            fiscal_year = int(period[2:])
        except ValueError:
            fiscal_year = None
        if fiscal_year is not None:
            return fiscal_year, 'FY', None

    # Quarterly forms: (pattern, group holding the year, group holding the quarter).
    quarter_layouts = (
        (r'Q([1-4])-(\d{4})', 2, 1),
        (r'(\d{4})Q([1-4])', 1, 2),
    )
    for pattern, year_group, quarter_group in quarter_layouts:
        found = re.match(pattern, period)
        if found:
            return int(found.group(year_group)), 'Q', int(found.group(quarter_group))

    return unparsed
def get_yearly_data(data_json):
    """Describe the period of the last parseable entry in ``data_json['metrics']``.

    Entries are scanned in list order and the last one whose ``period``
    parses wins, i.e. the list is simply taken to be in chronological
    order. The description is ``"<year> FY"`` for annual periods and
    ``"<year> Q<n>"`` for quarterly ones; ``"N/A"`` is returned when no
    entry has a parseable period.
    """
    description = "N/A"
    for entry in data_json.get('metrics', []):
        raw_period = entry.get('period')
        if raw_period:
            year, kind, quarter = parse_period_year_data(raw_period)
            if year is not None:
                # Later entries overwrite earlier ones.
                description = f"{year} FY" if kind == 'FY' else f"{year} Q{quarter}"
    return description
def parse_period_yoy(period):
    """Parse a period label into ``(year, type, quarter)``, never raising.

    Recognised forms:
      - ``FY2025``  -> ``(2025, 'FY', None)``
      - ``Q1-2025`` -> ``(2025, 'Q', 1)``
      - ``2025Q1``  -> ``(2025, 'Q', 1)``

    Every other input (malformed ``FY...`` labels, trailing text,
    non-strings) yields ``(None, None, None)`` so callers can skip it.
    """
    unparsed = (None, None, None)
    if not isinstance(period, str):
        return unparsed

    if period.startswith('FY'):
        try:
            return int(period[2:]), 'FY', None
        except ValueError:
            return unparsed

    # fullmatch: a label with trailing text is unparseable, not a crash.
    match = re.fullmatch(r'Q([1-4])-(\d{4})', period)
    if match:
        return int(match.group(2)), 'Q', int(match.group(1))
    match = re.fullmatch(r'(\d{4})Q([1-4])', period)
    if match:
        return int(match.group(1)), 'Q', int(match.group(2))
    return unparsed
def get_best_value_for_year_yoy(values_dict, key):
    """Return the best available value of ``key`` within one fiscal year.

    ``values_dict`` maps a report slot (``'FY'``, ``'Q1'`` .. ``'Q4'``) to a
    metric dict. Slots are tried in the order FY, Q4, Q3, Q2, Q1 and the
    first non-None value found under ``key`` is returned. Slots whose entry
    is not a dict are skipped. Returns ``None`` when nothing is found.
    """
    for slot in ('FY', 'Q4', 'Q3', 'Q2', 'Q1'):
        report = values_dict.get(slot)
        if not isinstance(report, dict):
            continue
        candidate = report.get(key)
        if candidate is not None:
            return candidate
    return None
import json
def calculate_yoy_comparison(data_json):
    """Compare the two most recent fiscal years for each headline indicator.

    Reports in ``data_json['metrics']`` are grouped by fiscal year; for each
    indicator the best value of a year is its FY figure, else the latest
    quarter that has one. Each result entry has ``label``, ``value``
    (formatted current value), ``change`` (signed percent or ``"N/A"``) and
    ``color`` (``"green"``, ``"red"`` or ``"N/A"``).

    Returns:
        ``[]`` when ``metrics`` is missing, empty, not a list, or its first
        element is not a dict; otherwise a list with one dict per indicator.

    Raises:
        ValueError: if an entry is a string that is not valid JSON, or if
            fewer than two fiscal years can be identified.
        TypeError: if an entry is neither a dict nor a JSON-encoded dict.
    """
    metrics_list = data_json.get('metrics', [])
    if not metrics_list or not isinstance(metrics_list, list):
        return []
    if not isinstance(metrics_list[0], dict):
        return []

    # Guard against double-encoded JSON: entries may arrive as JSON strings.
    cleaned_metrics = []
    for i, metric in enumerate(metrics_list):
        if isinstance(metric, str):
            try:
                metric = json.loads(metric)
            except ValueError as e:
                raise ValueError(f"Failed to parse metrics[{i}] as JSON string: {metric}") from e
        if not isinstance(metric, dict):
            raise TypeError(f"metrics[{i}] is not a dictionary or valid JSON string. Type: {type(metric)}")
        cleaned_metrics.append(metric)

    # year -> {'FY': metric, 'Q1': metric, ...}
    yearly_data = defaultdict(dict)
    for metric in cleaned_metrics:
        period = metric.get('period')
        if not period:
            continue  # entries without a period cannot be placed in a year
        year, ptype, quarter = parse_period_yoy(period)
        if year is None:
            continue  # unparseable period
        if ptype == 'FY':
            yearly_data[year]['FY'] = metric
        elif ptype == 'Q':
            yearly_data[year][f'Q{quarter}'] = metric

    years = sorted(yearly_data, reverse=True)
    if len(years) < 2:
        raise ValueError("至少需要两个财年的数据")
    latest_reports = yearly_data[years[0]]
    prev_reports = yearly_data[years[1]]

    indicators = [
        ("Total Revenue", "total_revenue"),
        ("Net Income", "net_income"),
        ("Earnings Per Share", "earnings_per_share"),
        ("Operating Expenses", "operating_expenses"),
        ("Cash Flow", "operating_cash_flow"),
    ]

    def to_number(raw):
        # Float value of ``raw``, or None when it is missing or non-numeric.
        if raw is None:
            return None
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    def format_value(raw):
        # Compact dollar rendering: $x.xxB / $x.xM / $x.xK / $x.xx.
        val = to_number(raw)
        if val is None:
            return "N/A"
        abs_val = abs(val)
        if abs_val >= 1e9:
            return f"${val / 1e9:.2f}B"
        if abs_val >= 1e6:
            return f"${val / 1e6:.1f}M"
        if abs_val >= 1e3:
            return f"${val / 1e3:.1f}K"
        return f"${val:.2f}"

    result = []
    for label, key in indicators:
        current_raw = get_best_value_for_year_yoy(latest_reports, key)
        prev_raw = get_best_value_for_year_yoy(prev_reports, key)

        # Convert BEFORE testing for a zero baseline: a previous value of
        # "0" (string) is not == 0 and used to reach the division below.
        current_num = to_number(current_raw)
        prev_num = to_number(prev_raw)

        change_str = "N/A"
        color = "N/A"
        if current_num is not None and prev_num is not None and prev_num != 0:
            change_pct = (current_num - prev_num) / abs(prev_num) * 100
            if change_pct > 0:
                change_str = f"+{change_pct:.1f}%"
                color = "green"
            elif change_pct < 0:
                change_str = f"{change_pct:.1f}%"
                color = "red"
            else:
                change_str = "0.0%"

        result.append({
            "label": label,
            "value": format_value(current_raw),
            "change": change_str,
            "color": color,
        })
    return result
# def parse_period_yoy(period):
# """解析 period 为 (year, type, quarter)"""
# if period.startswith('FY'):
# year = int(period[2:])
# return year, 'FY', None
# elif re.match(r'Q[1-4]-\d{4}', period):
# q_part, year_str = period.split('-')
# return int(year_str), 'Q', int(q_part[1])
# else:
# # 忽略无法解析的 period
# return None, None, None
# def calculate_yoy_comparison(data_json):
# metrics_list = data_json['metrics']
# # 按年份组织数据:year -> { 'FY': {...}, 'Q1': {...}, ... }
# yearly_data = defaultdict(lambda: defaultdict(dict))
# for metric in metrics_list:
# period = metric['period']
# year, ptype, quarter = parse_period_yoy(period)
# if ptype == 'FY':
# yearly_data[year]['FY'] = metric
# else:
# yearly_data[year][f'Q{quarter}'] = metric
# # 获取所有年份并排序(最新在前)
# years = sorted(yearly_data.keys(), reverse=True)
# if len(years) < 2:
# raise ValueError("至少需要两个财年的数据")
# latest_year = years[0]
# prev_year = years[1]
# result = []
# indicators = [
# ("Total Revenue", "total_revenue"),
# ("Net Income", "net_income"),
# ("Earnings Per Share", "earnings_per_share"),
# ("Operating Expenses", "operating_expenses"),
# ("Cash Flow", "operating_cash_flow")
# ]
# def format_value(val):
# if val is None:
# return "N/A"
# abs_val = abs(val)
# if abs_val >= 1e9:
# return f"${val / 1e9:.2f}B"
# elif abs_val >= 1e6:
# return f"${val / 1e6:.1f}M"
# elif abs_val >= 1e3:
# return f"${val / 1e3:.1f}K"
# else:
# return f"${val:.2f}"
# for label, key in indicators:
# # 获取本财年最佳值
# current_val = get_best_value_for_year(
# {k: v.get(key) for k, v in yearly_data[latest_year].items()},
# key
# )
# # 获取去年财年最佳值
# prev_val = get_best_value_for_year(
# {k: v.get(key) for k, v in yearly_data[prev_year].items()},
# key
# )
# if current_val is None or prev_val is None or prev_val == 0:
# change_str = "N/A"
# color = "N/A"
# else:
# change_pct = (current_val - prev_val) / abs(prev_val) * 100
# if change_pct > 0:
# change_str = f"+{change_pct:.1f}%"
# color = "green"
# elif change_pct < 0:
# change_str = f"{change_pct:.1f}%"
# color = "red"
# else:
# change_str = "0.0%"
# color = "N/A"
# formatted_value = format_value(current_val)
# result.append({
# "label": label,
# "value": formatted_value,
# "change": change_str,
# "color": color
# })
# return result
import re
import json
from collections import defaultdict
def parse_period_three_year(period):
    """Parse a period label into ``(year, type, quarter)``, never raising.

    Recognised forms:
      - ``FY2025``  -> ``(2025, 'FY', None)``
      - ``Q1-2025`` -> ``(2025, 'Q', 1)``
      - ``2025Q1``  -> ``(2025, 'Q', 1)``

    Every other input (malformed ``FY...`` labels, trailing text,
    non-strings) yields ``(None, None, None)`` so callers can skip it.
    """
    unparsed = (None, None, None)
    if not isinstance(period, str):
        return unparsed

    if period.startswith('FY'):
        try:
            return int(period[2:]), 'FY', None
        except ValueError:
            return unparsed

    # fullmatch: a label with trailing text is unparseable, not a crash.
    match = re.fullmatch(r'Q([1-4])-(\d{4})', period)
    if match:
        return int(match.group(2)), 'Q', int(match.group(1))
    match = re.fullmatch(r'(\d{4})Q([1-4])', period)
    if match:
        return int(match.group(1)), 'Q', int(match.group(2))
    return unparsed
def extract_financial_table(data_json):
    """Build a values table and a year-over-year table for up to three years.

    Reports in ``data_json['metrics']`` are grouped by fiscal year and the
    three most recent years are used. For each indicator the best value of
    a year is its FY figure, else the latest quarter that has one.

    Returns:
        ``[]`` when ``metrics`` is missing, empty, not a list, or its first
        element is not a dict. Otherwise a dict with:
          - ``list_data``: header row plus one row per indicator, amounts
            rendered in millions (``"123.4M"``) and EPS with two decimals;
          - ``yoy_rates``: header row plus one row per indicator with the
            signed percent change between consecutive years, or ``"N/A"``.

    Raises:
        ValueError: if an entry is a string that is not valid JSON, or if
            no entry has a parseable period.
        TypeError: if an entry is neither a dict nor a JSON-encoded dict.
    """
    metrics_list = data_json.get('metrics', [])
    if not metrics_list or not isinstance(metrics_list, list):
        return []
    if not isinstance(metrics_list[0], dict):
        return []

    # Guard against double-encoded JSON: entries may arrive as JSON strings.
    cleaned_metrics = []
    for i, metric in enumerate(metrics_list):
        if isinstance(metric, str):
            try:
                metric = json.loads(metric)
            except ValueError as e:
                raise ValueError(f"Failed to parse metrics[{i}] as JSON string: {metric}") from e
        if not isinstance(metric, dict):
            raise TypeError(f"metrics[{i}] is not a dictionary or valid JSON string. Type: {type(metric)}")
        cleaned_metrics.append(metric)

    # year -> {'FY': metric, 'Q1': metric, ...}
    yearly_reports = defaultdict(dict)
    for metric in cleaned_metrics:
        period = metric.get('period')
        if not period:
            continue  # entries without a period cannot be placed in a year
        year, ptype, quarter = parse_period_three_year(period)
        if year is None:
            continue
        # Touch the year even if ptype is unexpected, so it still counts
        # as a known year.
        reports = yearly_reports[year]
        if ptype == 'FY':
            reports['FY'] = metric
        elif ptype == 'Q':
            reports[f'Q{quarter}'] = metric

    if not yearly_reports:
        raise ValueError("未找到任何有效报告期")

    # Most recent three fiscal years, newest first, padded with None.
    sorted_years = sorted(yearly_reports, reverse=True)[:3]
    sorted_years += [None] * (3 - len(sorted_years))
    valid_years = [y for y in sorted_years if y is not None]

    def get_best_value(year, key):
        # Prefer the annual report, then the latest quarter that has ``key``.
        if year is None:
            return None
        reports = yearly_reports.get(year, {})
        for slot in ('FY', 'Q4', 'Q3', 'Q2', 'Q1'):
            report = reports.get(slot)
            if report and isinstance(report, dict):
                value = report.get(key)
                if value is not None:
                    return value
        return None

    def to_number(raw):
        # Float value of ``raw``, or None when it is missing or non-numeric.
        if raw is None:
            return None
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    def format_to_m(value):
        # Render in millions; whole numbers drop the decimal.
        val = to_number(value)
        if val is None:
            return "N/A"
        val_in_m = val / 1e6
        if abs(val_in_m - round(val_in_m)) < 1e-6:
            return f"{int(round(val_in_m))}M"
        return f"{val_in_m:.1f}M"

    def format_eps(value):
        val = to_number(value)
        if val is None:
            return "N/A"
        return f"{val:.2f}"

    indicators = [
        ("Total", "total_revenue"),
        ("Net Income", "net_income"),
        ("Earnings Per Share", "earnings_per_share"),
        ("Operating Expenses", "operating_expenses"),
        ("Cash Flow", "operating_cash_flow"),
    ]

    # --- values table ---
    # NOTE(review): the header lists only real years, while each row is
    # padded with "N/A" up to three columns, so rows are wider than the
    # header when fewer than three years exist. Kept as-is; confirm what
    # the consumer of list_data expects before changing the shape.
    header = ["Category"] + [f"{year}/FY" for year in valid_years]
    list_data = [header]
    for label, key in indicators:
        render = format_eps if label == "Earnings Per Share" else format_to_m
        row = [label]
        for year in sorted_years:
            row.append("N/A" if year is None else render(get_best_value(year, key)))
        list_data.append(row)

    # --- year-over-year table: (current, previous) pairs, newest first ---
    yoy_pairs = list(zip(valid_years, valid_years[1:]))
    yoy_rates = [["Category"] + [f"{curr_y}/FY" for curr_y, _ in yoy_pairs]]
    for label, key in indicators:
        row = [label]
        for curr_y, prev_y in yoy_pairs:
            # Convert BEFORE testing for a zero baseline: a previous value
            # of "0" (string) is not == 0 and used to reach the division.
            curr_val = to_number(get_best_value(curr_y, key))
            prev_val = to_number(get_best_value(prev_y, key))
            if curr_val is None or prev_val is None or prev_val == 0:
                row.append("N/A")
                continue
            pct = (curr_val - prev_val) / abs(prev_val) * 100
            row.append(f"+{pct:.2f}%" if pct >= 0 else f"{pct:.2f}%")
        yoy_rates.append(row)

    return {
        "list_data": list_data,
        "yoy_rates": yoy_rates,
    }