|
|
import gradio as gr |
|
|
import requests |
|
|
import json |
|
|
import os |
|
|
|
|
|
# Identifier of the Hugging Face Space that hosts the MCP server.
MCP_SPACE = "JC321/EasyReportsMCPServer"

# Base URL every MCP request in this module is sent to.
MCP_URL = "https://jc321-easyreportsmcpserver.hf.space"

# Client identification string (application name/version plus a contact address).
_USER_AGENT = "SEC-Query-Assistant/1.0 (jtyxabc@gmail.com)"

# Default HTTP headers attached to every request to the MCP server.
HEADERS = {
    "Content-Type": "application/json",
    "User-Agent": _USER_AGENT,
}
|
|
|
|
|
|
|
|
def format_value(value, value_type="money"):
    """Render a numeric value for display.

    ``None`` and zero are shown as ``"N/A"``; every other value is rendered
    with two decimal places and decorated according to ``value_type``:

    * ``"money"`` -> ``"$<value>B"`` (dollar prefix, ``B`` suffix)
    * ``"eps"``   -> ``"$<value>"`` (dollar prefix only)
    * anything else -> the bare two-decimal number
    """
    if value is None or value == 0:
        return "N/A"

    # All variants share the same two-decimal rendering; only the
    # surrounding decoration differs.
    amount = f"{value:.2f}"

    if value_type == "money":
        return f"${amount}B"
    if value_type == "eps":
        return f"${amount}"
    return amount
|
|
|
|
|
def normalize_cik(cik):
    """Normalize a CIK to a zero-padded string of at least 10 digits.

    The input is converted to text and every non-digit character (dashes,
    spaces, letters, ...) is discarded. Returns ``None`` when the input is
    falsy or contains no digits at all.
    """
    if not cik:
        return None

    # Keeping only digit characters also removes '-' and ' ' separators.
    digits = "".join(filter(str.isdigit, str(cik)))
    if not digits:
        return None
    return digits.zfill(10)
|
|
|
|
|
def _decode_mcp_content(content):
    """Decode the first entry of an MCP ``content`` list.

    Returns the JSON-decoded ``text`` of the first entry, the raw text when
    it is not valid JSON, or ``{}`` when there is no usable text payload
    (empty/non-list content, a non-dict first entry, or non-string text).
    """
    if not content or not isinstance(content, (list, tuple)):
        return {}

    first = content[0]
    if not isinstance(first, dict):
        return {}

    # A missing "text" key is treated as an empty JSON object.
    text_content = first.get("text", "{}")
    if not isinstance(text_content, str):
        # e.g. "text": null -- json.loads would raise TypeError on it.
        return {}

    try:
        return json.loads(text_content)
    except json.JSONDecodeError:
        # Plain (non-JSON) text is handed back unchanged.
        return text_content


def parse_mcp_response(response_data):
    """Unwrap the payload of an MCP protocol response.

    Supported shapes:

    1. ``{"result": {"content": [{"type": "text", "text": "{...}"}]}}``
    2. ``{"content": [{"type": "text", "text": "{...}"}]}``
    3. Any other JSON value, which is returned unchanged.

    For shapes 1 and 2 only the first content entry is inspected: its
    ``text`` is JSON-decoded when possible, returned as a plain string when
    it is not valid JSON, and ``{}`` is returned when the content list is
    empty or carries no usable text.

    A ``result`` value that is not a dict (``None``, a string, a list, ...)
    is not treated as an envelope; the function then falls back to shape 2
    or 3 instead of raising.
    """
    if not isinstance(response_data, dict):
        return response_data

    # Shape 1: payload wrapped in a "result" envelope.
    result = response_data.get("result")
    if isinstance(result, dict) and "content" in result:
        return _decode_mcp_content(result["content"])

    # Shape 2: bare "content" list at the top level.
    if "content" in response_data:
        return _decode_mcp_content(response_data["content"])

    # Shape 3: already plain data.
    return response_data
|
|
|
|
|
|
|
|
def create_mcp_tools():
    """Build the list of MCP tool descriptors exposed by this module.

    Returns a fresh list containing a single descriptor for the
    ``query_financial_data`` tool, including the schema of its two
    required string parameters.
    """
    company_name_schema = {
        "type": "string",
        "description": "Company name or stock symbol (e.g., Apple, NVIDIA, AAPL)",
    }
    query_type_schema = {
        "type": "string",
        "enum": ["Latest Financial Data", "3-Year Trends", "5-Year Trends"],
        "description": "Type of financial query",
    }
    parameters_schema = {
        "type": "object",
        "properties": {
            "company_name": company_name_schema,
            "query_type": query_type_schema,
        },
        "required": ["company_name", "query_type"],
    }
    tool = {
        "name": "query_financial_data",
        "description": "Query SEC financial data for US listed companies",
        "parameters": parameters_schema,
    }
    return [tool]
|
|
|
|
|
|
|
|
def execute_tool(tool_name, **kwargs):
    """Dispatch an MCP tool invocation by name.

    Only ``query_financial_data`` is recognized; it is called with the
    ``company_name`` and ``query_type`` keyword arguments (``None`` when
    absent). Any other name yields an ``"Unknown tool: ..."`` string.
    """
    if tool_name != "query_financial_data":
        return f"Unknown tool: {tool_name}"

    company = kwargs.get("company_name")
    kind = kwargs.get("query_type")
    return query_financial_data(company, kind)
|
|
|
|
|
def create_source_link(source_form, source_url=None):
    """Turn a source form name into a Markdown link when a URL is available.

    ``source_form`` is returned untouched when it is empty or ``'N/A'``, or
    when ``source_url`` is empty or ``'N/A'``. Otherwise the result is
    ``"[<source_form>](<source_url>)"``.
    """
    form_is_usable = bool(source_form) and source_form != 'N/A'
    url_is_usable = bool(source_url) and source_url != 'N/A'

    if form_is_usable and url_is_usable:
        return f"[{source_form}]({source_url})"
    return source_form
|
|
|
|
|
def _call_mcp_tool(tool_name, arguments, timeout=30):
    """POST a ``tools/call`` request for ``tool_name`` to the MCP server."""
    return requests.post(
        f"{MCP_URL}/message",
        json={
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments,
            },
        },
        headers=HEADERS,
        timeout=timeout,
    )


def _period_sort_key(record):
    """Sort key for a metrics record, derived from its ``period`` label.

    The first four characters are taken as the year. Labels containing
    ``'Q'`` are ranked by the character following it; a missing or
    non-numeric quarter counts as 0 instead of raising.
    """
    period = str(record.get('period') or '0000')
    year = period[:4] if len(period) >= 4 else '0000'

    if 'Q' not in period:
        return (year, 0, 0)

    try:
        quarter = int(period[period.index('Q') + 1])
    except (IndexError, ValueError):
        quarter = 0
    return (year, 1, 4 - quarter)


def _dedupe_and_sort_periods(records):
    """Drop records repeating a (period, source_form) pair, then sort.

    The first occurrence of each pair is kept. With the reversed sort the
    newest year comes first; within a year, quarterly rows precede the
    annual row and run Q1..Q4.
    """
    seen = set()
    unique = []
    for record in records:
        key = (record.get('period', 'N/A'), record.get('source_form', 'N/A'))
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return sorted(unique, key=_period_sort_key, reverse=True)


def _fetch_trend_metrics(cik, years, result):
    """Call ``extract_financial_metrics`` and validate the reply.

    Debug lines describing the exchange are appended to ``result``.

    Returns ``(result, metrics, error)``: on success ``metrics`` is the
    parsed dict and ``error`` is ``None``; on failure ``metrics`` is
    ``None`` and ``error`` is the complete message to return to the caller.
    """
    metrics_resp = _call_mcp_tool(
        "extract_financial_metrics", {"cik": cik, "years": years}, timeout=60
    )

    result += f"\n**Debug Info ({years}-Year)**:\n- HTTP Status: {metrics_resp.status_code}\n"

    if metrics_resp.status_code != 200:
        return result, None, result + f"❌ Server Error: HTTP {metrics_resp.status_code}\n\n{metrics_resp.text[:500]}"

    try:
        metrics_result = metrics_resp.json()

        result += f"- Raw Response Length: {len(metrics_resp.text)} chars\n"
        result += f"- Response Preview: {metrics_resp.text[:200]}...\n\n"

        metrics = parse_mcp_response(metrics_result)

        result += f"- Parsed Type: {type(metrics).__name__}\n"
        if isinstance(metrics, dict):
            result += f"- Parsed Keys: {list(metrics.keys())}\n"
            result += f"- Periods: {metrics.get('periods', 'N/A')}\n"
            result += f"- Data Length: {len(metrics.get('data', []))}\n\n"
    except (ValueError, KeyError, json.JSONDecodeError) as e:
        return result, None, result + f"❌ JSON Parse Error: {str(e)}\n\nResponse: {metrics_resp.text[:500]}"

    if isinstance(metrics, dict) and metrics.get("error"):
        return result, None, result + f"❌ {metrics['error']}"

    if not isinstance(metrics, dict):
        return result, None, result + f"❌ Invalid response format\n\nDebug: {str(metrics)[:500]}"

    return result, metrics, None


def _no_trend_data_message(metrics):
    """Build the error text used when a metrics reply has no ``data`` rows."""
    return (
        f"❌ No data returned from MCP Server\n\nDebug: metrics keys = {list(metrics.keys())}"
        f"\n\nFull response: {json.dumps(metrics, indent=2, ensure_ascii=False)[:1000]}"
    )


def query_financial_data(company_name, query_type):
    """Look up a company on the MCP server and report its financial data.

    Args:
        company_name: Company name or stock symbol to search for.
        query_type: One of ``"Latest"``, ``"3-Year"``, ``"5-Year"``,
            ``"Filings"``, the labels advertised by ``create_mcp_tools``
            (``"Latest Financial Data"``, ``"3-Year Trends"``,
            ``"5-Year Trends"``), or one of the internal query names
            directly. Unrecognized values produce only the company header.

    Returns:
        * A Markdown string (company header plus data, or an error message
          prefixed with ``❌``) for the latest, 3-year and filings queries.
        * A list of de-duplicated, sorted metric records for the 5-year
          query (errors in that branch are still returned as strings).
        * ``[]`` when the company search itself answers with a non-200
          HTTP status.

    NOTE(review): the mixed ``str`` / ``list`` return shape is kept as-is
    because callers outside this block may rely on it -- confirm.

    Network failures and unexpected exceptions are caught and reported as
    strings rather than raised.
    """
    if not company_name:
        return "Please enter a company name or stock symbol"

    query_type_mapping = {
        "Latest": "最新财务数据",
        "3-Year": "3年趋势",
        "5-Year": "5年趋势",
        "Filings": "公司报表列表",
        # Labels from the create_mcp_tools() enum; without these a tool call
        # would match no branch below and return the bare header.
        "Latest Financial Data": "最新财务数据",
        "3-Year Trends": "3年趋势",
        "5-Year Trends": "5年趋势",
    }
    internal_query_type = query_type_mapping.get(query_type, query_type)

    try:
        # Step 1: resolve the user input to a company record.
        search_resp = _call_mcp_tool(
            "advanced_search_company", {"company_input": company_name}, timeout=30
        )

        print(f"搜索公司:{company_name},search_resp.status_code: {search_resp.status_code}\nSearch Response: {search_resp.text}")

        if search_resp.status_code != 200:
            print(f"❌ Server Error: HTTP {search_resp.status_code}\n\nResponse: {search_resp.text[:500]}")
            return []

        try:
            company = parse_mcp_response(search_resp.json())
        except (ValueError, KeyError, json.JSONDecodeError) as e:
            return f"❌ JSON Parse Error: {str(e)}\n\nResponse: {search_resp.text[:500]}"

        if isinstance(company, dict) and company.get("error"):
            return f"❌ Error: {company['error']}"

        # A plain-text or list payload has no company fields to read.
        if not isinstance(company, dict):
            return f"❌ Invalid response format\n\nDebug: {str(company)[:500]}"

        display_name = company.get('name', 'Unknown')
        ticker = company.get('ticker', 'N/A')

        result = f"# {display_name}\n\n"
        result += f"**Stock Symbol**: {ticker}\n"
        result += "\n---\n\n"

        cik = normalize_cik(company.get('cik'))
        if not cik:
            return result + f"❌ Error: Invalid CIK from company search\n\nDebug: company data = {json.dumps(company, indent=2)}"

        # Step 2: run the requested query against the resolved CIK.
        if internal_query_type == "最新财务数据":
            data_resp = _call_mcp_tool(
                "get_latest_financial_data", {"cik": cik}, timeout=30
            )

            if data_resp.status_code != 200:
                return result + f"❌ Server Error: HTTP {data_resp.status_code}\n\n{data_resp.text[:500]}"

            try:
                data = parse_mcp_response(data_resp.json())
            except (ValueError, KeyError, json.JSONDecodeError) as e:
                return result + f"❌ JSON Parse Error: {str(e)}\n\n{data_resp.text[:500]}"

            if isinstance(data, dict) and data.get("error"):
                return result + f"❌ {data['error']}"

            if not isinstance(data, dict):
                return result + f"❌ Invalid response format\n\nDebug: {str(data)[:500]}"

            result += f"## Fiscal Year {data.get('period', 'N/A')}\n\n"

            # Monetary figures are divided by 1e9 before display; missing or
            # zero values end up rendered as "N/A" by format_value.
            total_revenue = (data.get('total_revenue') or 0) / 1e9
            net_income = (data.get('net_income') or 0) / 1e9
            eps = data.get('earnings_per_share') or 0
            opex = (data.get('operating_expenses') or 0) / 1e9
            ocf = (data.get('operating_cash_flow') or 0) / 1e9

            result += f"- **Total Revenue**: {format_value(total_revenue)}\n"
            result += f"- **Net Income**: {format_value(net_income)}\n"
            result += f"- **Earnings Per Share**: {format_value(eps, 'eps')}\n"
            result += f"- **Operating Expenses**: {format_value(opex)}\n"
            result += f"- **Operating Cash Flow**: {format_value(ocf)}\n"

            source_form = data.get('source_form', 'N/A')
            source_url = data.get('source_url', None)
            result += f"- **Source Form**: {create_source_link(source_form, source_url)}\n"

        elif internal_query_type == "3年趋势":
            result, metrics, error = _fetch_trend_metrics(cik, 3, result)
            if error is not None:
                return error

            result += f"## 3-Year Financial Trends ({metrics.get('periods', 0)} periods)\n\n"

            all_data = metrics.get('data', [])
            if not all_data:
                return result + _no_trend_data_message(metrics)

            unique_data = _dedupe_and_sort_periods(all_data)

            result += "| Period | Revenue (B) | Net Income (B) | EPS | Operating Expenses (B) | Operating Cash Flow (B) | Source Form |\n"
            result += "|--------|-------------|----------------|-----|------------------------|-------------------------|-------------|\n"

            for m in unique_data:
                period = str(m.get('period') or 'N/A')
                rev = (m.get('total_revenue') or 0) / 1e9
                inc = (m.get('net_income') or 0) / 1e9
                eps_val = m.get('earnings_per_share') or 0
                opex = (m.get('operating_expenses') or 0) / 1e9
                ocf = (m.get('operating_cash_flow') or 0) / 1e9
                source_form = m.get('source_form', 'N/A')
                source_url = m.get('source_url', None)

                # Quarterly labels and labels already starting with "FY" are
                # shown as-is; anything else gets an "FY" prefix.
                if 'Q' in period or period.startswith('FY'):
                    display_period = period
                else:
                    display_period = f"FY{period}"

                source_link = create_source_link(source_form, source_url)

                result += f"| {display_period} | {format_value(rev)} | {format_value(inc)} | {format_value(eps_val, 'eps')} | {format_value(opex)} | {format_value(ocf)} | {source_link} |\n"

        elif internal_query_type == "5年趋势":
            result, metrics, error = _fetch_trend_metrics(cik, 5, result)
            if error is not None:
                return error

            all_data = metrics.get('data', [])
            if not all_data:
                return result + _no_trend_data_message(metrics)

            unique_data = _dedupe_and_sort_periods(all_data)
            print(f'5年数据::{unique_data}')

            # This branch hands back the raw records, not Markdown.
            result = unique_data

        elif internal_query_type == "公司报表列表":
            filings_resp = _call_mcp_tool(
                "get_company_filings", {"cik": cik, "limit": 50}, timeout=60
            )

            if filings_resp.status_code != 200:
                return result + f"❌ Server Error: HTTP {filings_resp.status_code}\n\n{filings_resp.text[:500]}"

            try:
                filings_data = parse_mcp_response(filings_resp.json())
            except (ValueError, KeyError, json.JSONDecodeError) as e:
                return result + f"❌ JSON Parse Error: {str(e)}\n\n{filings_resp.text[:500]}"

            if isinstance(filings_data, dict) and filings_data.get("error"):
                return result + f"❌ {filings_data['error']}"

            filings = filings_data.get('filings', []) if isinstance(filings_data, dict) else filings_data

            result += f"## Company Filings ({len(filings)} records)\n\n"
            result += "| Form Type | Filing Date | Accession Number | Primary Document |\n"
            result += "|-----------|-------------|------------------|------------------|\n"

            for filing in filings:
                form_type = filing.get('form_type', 'N/A')
                filing_date = filing.get('filing_date', 'N/A')
                accession_num = filing.get('accession_number', 'N/A')
                primary_doc = filing.get('primary_document', 'N/A')
                filing_url = filing.get('filing_url', None)

                # Both the form type and the document name link to the
                # same filing URL when one is provided.
                if filing_url and filing_url != 'N/A':
                    form_link = f"[{form_type}]({filing_url})"
                    primary_doc_link = f"[{primary_doc}]({filing_url})"
                else:
                    form_link = form_type
                    primary_doc_link = primary_doc

                result += f"| {form_link} | {filing_date} | {accession_num} | {primary_doc_link} |\n"

        return result

    except requests.exceptions.RequestException as e:
        return f"❌ Network Error: {str(e)}\n\nMCP Server: {MCP_URL}"
    except Exception as e:
        # Last-resort boundary: report the failure to the user instead of
        # letting it propagate.
        import traceback
        return f"❌ Unexpected Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
|
|
|