baba521's picture
测试集成mcp
716f1cd
import gradio as gr
import requests
import json
import os
# Identifier of the MCP server deployment (presumably a Hugging Face Space id;
# it is not referenced by the code visible in this file).
MCP_SPACE = "JC321/EasyReportsMCPServer"

# Base URL of the MCP server; tool calls are POSTed to f"{MCP_URL}/message".
MCP_URL = "https://jc321-easyreportsmcpserver.hf.space"

# HTTP headers sent with every request to the MCP server.
HEADERS = {
    "Content-Type": "application/json",
    "User-Agent": "SEC-Query-Assistant/1.0 (jtyxabc@gmail.com)",
}
# Numeric display formatting
def format_value(value, value_type="money"):
    """
    Render a number for display, using "N/A" for missing or zero values.

    value: the number to render; None and 0 both produce "N/A".
    value_type: "money" -> "$<x.xx>B", "eps" -> "$<x.xx>", anything else
        (e.g. "number") -> "<x.xx>".
    """
    if value is None or value == 0:
        return "N/A"

    # Every variant shares the same two-decimal rendering; only the
    # currency prefix and the "B" suffix differ.
    amount = f"{value:.2f}"
    if value_type == "money":
        return "$" + amount + "B"
    if value_type == "eps":
        return "$" + amount
    return amount
def normalize_cik(cik):
    """
    Normalize a CIK to the standard zero-padded 10-digit string.

    cik: an int, an integral float (as a JSON decoder may produce), or a
        string that may contain separators or other non-digit characters.

    Returns the digits left-padded with zeros to 10 characters, or None when
    the input is empty/falsy, contains no ASCII digits, or is a non-integral
    float.
    """
    if not cik:
        return None

    if isinstance(cik, float):
        # str(320193.0) is "320193.0"; keeping its digits would append a
        # bogus trailing zero, so convert integral floats to int first and
        # reject values (including NaN/inf) that are not whole numbers.
        if not cik.is_integer():
            return None
        cik = int(cik)

    # Keep ASCII digits only: str.isdigit() would also accept characters
    # such as superscripts that are not valid in a CIK.
    digits = ''.join(ch for ch in str(cik) if '0' <= ch <= '9')

    # Left-pad with zeros to 10 characters.
    return digits.zfill(10) if digits else None
def _decode_mcp_content(content):
    """
    Decode the first item of an MCP ``content`` list.

    Returns the JSON-decoded ``text`` of the first item, the raw text when it
    is not valid JSON, a non-string ``text`` value unchanged, or {} when the
    list is empty/malformed or the text is missing or null.
    """
    if not isinstance(content, (list, tuple)) or not content:
        return {}

    first_item = content[0]
    if not isinstance(first_item, dict):
        return {}

    text_content = first_item.get("text", "{}")
    if text_content is None:
        # A null text field carries no payload; treat it like a missing one.
        return {}
    if not isinstance(text_content, str):
        # Already-structured payload: nothing to decode.
        return text_content

    try:
        return json.loads(text_content)
    except json.JSONDecodeError:
        # Plain-text payload: hand it back unchanged.
        return text_content


def parse_mcp_response(response_data):
    """
    Parse an MCP protocol response.

    Supported shapes:
    1. {"result": {"content": [{"type": "text", "text": "{...}"}]}}
    2. {"content": [{"type": "text", "text": "{...}"}]}
    3. Plain JSON data, returned unchanged.

    Non-dict input is returned unchanged. For shapes 1 and 2 the ``text`` of
    the first content item is JSON-decoded; if it is not valid JSON the raw
    text is returned, and an empty or malformed content list yields {}.
    """
    if not isinstance(response_data, dict):
        return response_data

    # Shape 1: {"result": {"content": [...]}}. The isinstance guard keeps a
    # null or string "result" from raising or being substring-matched.
    result = response_data.get("result")
    if isinstance(result, dict) and "content" in result:
        return _decode_mcp_content(result["content"])

    # Shape 2: {"content": [...]}
    if "content" in response_data:
        return _decode_mcp_content(response_data["content"])

    # Shape 3: already the payload.
    return response_data
# MCP tool definitions
def create_mcp_tools():
    """Build the list of MCP tool descriptors (a fresh list on every call)."""
    company_name_schema = {
        "type": "string",
        "description": "Company name or stock symbol (e.g., Apple, NVIDIA, AAPL)",
    }
    query_type_schema = {
        "type": "string",
        "enum": ["Latest Financial Data", "3-Year Trends", "5-Year Trends"],
        "description": "Type of financial query",
    }
    parameters = {
        "type": "object",
        "properties": {
            "company_name": company_name_schema,
            "query_type": query_type_schema,
        },
        "required": ["company_name", "query_type"],
    }
    financial_query_tool = {
        "name": "query_financial_data",
        "description": "Query SEC financial data for US listed companies",
        "parameters": parameters,
    }
    return [financial_query_tool]
# Tool dispatcher
def execute_tool(tool_name, **kwargs):
    """
    Run the MCP tool called ``tool_name`` with the given keyword arguments.

    Only "query_financial_data" is recognised; it receives the
    ``company_name`` and ``query_type`` keyword arguments (None when absent).
    Any other name yields an "Unknown tool: ..." string.
    """
    if tool_name != "query_financial_data":
        return f"Unknown tool: {tool_name}"

    company = kwargs.get("company_name")
    kind = kwargs.get("query_type")
    return query_financial_data(company, kind)
# Hyperlink helper
def create_source_link(source_form, source_url=None):
    """
    Turn a source-form label into a Markdown link when a URL is available.

    Returns "[source_form](source_url)" when both the label and the URL are
    present and neither is the placeholder 'N/A'; otherwise returns
    ``source_form`` unchanged (including None or an empty string).
    """
    has_form = bool(source_form) and source_form != 'N/A'
    has_url = bool(source_url) and source_url != 'N/A'
    if has_form and has_url:
        return f"[{source_form}]({source_url})"
    # No usable label or no usable URL: show the label as plain text.
    return source_form
def _call_mcp_tool(tool_name, arguments, timeout):
    """
    POST a ``tools/call`` request for ``tool_name`` to ``{MCP_URL}/message``.

    Returns the ``requests`` response object as-is; status checking and body
    decoding are left to the caller.
    """
    return requests.post(
        f"{MCP_URL}/message",
        json={
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments},
        },
        headers=HEADERS,
        timeout=timeout,
    )


def _period_sort_key(record):
    """
    Sort key that yields newest-first order under ``reverse=True``.

    Target order: FY2024 -> 2024Q3 -> 2024Q2 -> 2024Q1 -> FY2023.
    The key is (year, is_annual, quarter), so a descending sort puts the
    latest year first, the annual row before that year's quarters, and later
    quarters before earlier ones.
    """
    period = str(record.get('period') or '')

    # The year is the first run of at least four digits, wherever it sits
    # ("2024Q3" and "FY2024" both give 2024).
    digit_runs = ''.join(ch if '0' <= ch <= '9' else ' ' for ch in period).split()
    year = next((int(run[:4]) for run in digit_runs if len(run) >= 4), 0)

    q_pos = period.find('Q')
    if q_pos == -1:
        return (year, 1, 0)

    # The character after 'Q' is the quarter number; anything else counts as 0.
    q_char = period[q_pos + 1:q_pos + 2]
    quarter = int(q_char) if q_char.isdecimal() else 0
    return (year, 0, quarter)


def _dedupe_and_sort_periods(records):
    """Drop duplicate (period, source_form) rows, then sort newest-first."""
    seen = set()
    unique_records = []
    for record in records:
        if not isinstance(record, dict):
            continue  # malformed entry; nothing to render
        key = (record.get('period', 'N/A'), record.get('source_form', 'N/A'))
        if key not in seen:
            seen.add(key)
            unique_records.append(record)
    return sorted(unique_records, key=_period_sort_key, reverse=True)


def _fetch_trend_records(cik, years, result, with_heading=False):
    """
    Fetch ``years`` years of metrics through ``extract_financial_metrics``.

    ``result`` is the text rendered so far; debug lines describing the call
    are appended to it. Returns ``(text, records)``: on success ``records`` is
    the de-duplicated, newest-first list and ``text`` the accumulated output;
    on failure ``records`` is None and ``text`` is the complete error message.
    """
    label = f"{years}-Year"
    metrics_resp = _call_mcp_tool(
        "extract_financial_metrics", {"cik": cik, "years": years}, timeout=60
    )

    # Debug: HTTP status (kept in the rendered text).
    result += f"\n**Debug Info ({label})**:\n- HTTP Status: {metrics_resp.status_code}\n"

    if metrics_resp.status_code != 200:
        return result + f"❌ Server Error: HTTP {metrics_resp.status_code}\n\n{metrics_resp.text[:500]}", None

    try:
        metrics_result = metrics_resp.json()

        # Debug: raw response size and preview.
        result += f"- Raw Response Length: {len(metrics_resp.text)} chars\n"
        result += f"- Response Preview: {metrics_resp.text[:200]}...\n\n"

        metrics = parse_mcp_response(metrics_result)

        # Debug: type and shape of the parsed payload.
        result += f"- Parsed Type: {type(metrics).__name__}\n"
        if isinstance(metrics, dict):
            result += f"- Parsed Keys: {list(metrics.keys())}\n"
            result += f"- Periods: {metrics.get('periods', 'N/A')}\n"
            result += f"- Data Length: {len(metrics.get('data') or [])}\n\n"
    except (ValueError, KeyError, json.JSONDecodeError) as e:
        return result + f"❌ JSON Parse Error: {str(e)}\n\nResponse: {metrics_resp.text[:500]}", None

    if isinstance(metrics, dict) and metrics.get("error"):
        return result + f"❌ {metrics['error']}", None

    if not isinstance(metrics, dict):
        return result + f"❌ Invalid response format\n\nDebug: {str(metrics)[:500]}", None

    if with_heading:
        result += f"## {label} Financial Trends ({metrics.get('periods', 0)} periods)\n\n"

    # The server returns both annual and quarterly rows under 'data'.
    all_data = metrics.get('data') or []
    if not isinstance(all_data, list) or not all_data:
        return result + f"❌ No data returned from MCP Server\n\nDebug: metrics keys = {list(metrics.keys())}\n\nFull response: {json.dumps(metrics, indent=2, ensure_ascii=False)[:1000]}", None

    return result, _dedupe_and_sort_periods(all_data)


def _trend_table(records):
    """Render trend records as a Markdown table, one row per period."""
    lines = [
        "| Period | Revenue (B) | Net Income (B) | EPS | Operating Expenses (B) | Operating Cash Flow (B) | Source Form |\n",
        "|--------|-------------|----------------|-----|------------------------|-------------------------|-------------|\n",
    ]
    for m in records:
        period = str(m.get('period') or 'N/A')
        rev = (m.get('total_revenue') or 0) / 1e9
        inc = (m.get('net_income') or 0) / 1e9
        eps_val = m.get('earnings_per_share') or 0
        opex = (m.get('operating_expenses') or 0) / 1e9
        ocf = (m.get('operating_cash_flow') or 0) / 1e9
        source_link = create_source_link(m.get('source_form', 'N/A'), m.get('source_url', None))

        # Quarterly periods are shown as-is; annual ones get an "FY" prefix
        # unless they already carry it. A missing period stays "N/A".
        if 'Q' in period or period == 'N/A' or period.startswith('FY'):
            display_period = period
        else:
            display_period = f"FY{period}"

        lines.append(
            f"| {display_period} | {format_value(rev)} | {format_value(inc)} | {format_value(eps_val, 'eps')} | {format_value(opex)} | {format_value(ocf)} | {source_link} |\n"
        )
    return ''.join(lines)


def _latest_financials_section(cik, result):
    """Fetch the latest financial data for ``cik`` and append it to ``result``."""
    data_resp = _call_mcp_tool("get_latest_financial_data", {"cik": cik}, timeout=30)

    if data_resp.status_code != 200:
        return result + f"❌ Server Error: HTTP {data_resp.status_code}\n\n{data_resp.text[:500]}"

    try:
        data = parse_mcp_response(data_resp.json())
    except (ValueError, KeyError, json.JSONDecodeError) as e:
        return result + f"❌ JSON Parse Error: {str(e)}\n\n{data_resp.text[:500]}"

    if isinstance(data, dict) and data.get("error"):
        return result + f"❌ {data['error']}"

    if not isinstance(data, dict):
        # e.g. a plain-text payload that could not be decoded as JSON
        return result + f"❌ Invalid response format\n\nDebug: {str(data)[:500]}"

    result += f"## Fiscal Year {data.get('period', 'N/A')}\n\n"

    # Monetary amounts are divided by 1e9 for display in billions; missing
    # values become 0, which format_value() shows as "N/A".
    total_revenue = (data.get('total_revenue') or 0) / 1e9
    net_income = (data.get('net_income') or 0) / 1e9
    eps = data.get('earnings_per_share') or 0
    opex = (data.get('operating_expenses') or 0) / 1e9
    ocf = (data.get('operating_cash_flow') or 0) / 1e9

    result += f"- **Total Revenue**: {format_value(total_revenue)}\n"
    result += f"- **Net Income**: {format_value(net_income)}\n"
    result += f"- **Earnings Per Share**: {format_value(eps, 'eps')}\n"
    result += f"- **Operating Expenses**: {format_value(opex)}\n"
    result += f"- **Operating Cash Flow**: {format_value(ocf)}\n"

    # Link the source form using the URL supplied by the backend, if any.
    source_form = data.get('source_form', 'N/A')
    source_url = data.get('source_url', None)
    result += f"- **Source Form**: {create_source_link(source_form, source_url)}\n"
    return result


def _filings_section(cik, result):
    """Fetch up to 50 filings for ``cik`` and append them as a Markdown table."""
    filings_resp = _call_mcp_tool("get_company_filings", {"cik": cik, "limit": 50}, timeout=60)

    if filings_resp.status_code != 200:
        return result + f"❌ Server Error: HTTP {filings_resp.status_code}\n\n{filings_resp.text[:500]}"

    try:
        filings_data = parse_mcp_response(filings_resp.json())
    except (ValueError, KeyError, json.JSONDecodeError) as e:
        return result + f"❌ JSON Parse Error: {str(e)}\n\n{filings_resp.text[:500]}"

    if isinstance(filings_data, dict) and filings_data.get("error"):
        return result + f"❌ {filings_data['error']}"

    filings = filings_data.get('filings') or [] if isinstance(filings_data, dict) else filings_data
    if not isinstance(filings, list):
        return result + f"❌ Invalid response format\n\nDebug: {str(filings)[:500]}"

    result += f"## Company Filings ({len(filings)} records)\n\n"
    result += "| Form Type | Filing Date | Accession Number | Primary Document |\n"
    result += "|-----------|-------------|------------------|------------------|\n"

    for filing in filings:
        if not isinstance(filing, dict):
            continue  # malformed entry; nothing to render
        form_type = filing.get('form_type', 'N/A')
        filing_date = filing.get('filing_date', 'N/A')
        accession_num = filing.get('accession_number', 'N/A')
        primary_doc = filing.get('primary_document', 'N/A')
        filing_url = filing.get('filing_url', None)

        # Link both the form type and the document name to the backend URL.
        if filing_url and filing_url != 'N/A':
            form_link = f"[{form_type}]({filing_url})"
            primary_doc_link = f"[{primary_doc}]({filing_url})"
        else:
            form_link = form_type
            primary_doc_link = primary_doc

        result += f"| {form_link} | {filing_date} | {accession_num} | {primary_doc_link} |\n"
    return result


def query_financial_data(company_name, query_type):
    """
    Main entry point for querying financial data through the MCP server.

    company_name: company name or stock symbol passed to the
        ``advanced_search_company`` tool.
    query_type: "Latest", "3-Year", "5-Year" or "Filings"; the labels
        advertised by create_mcp_tools() ("Latest Financial Data",
        "3-Year Trends", "5-Year Trends") and the internal Chinese names are
        accepted as well.

    Returns a Markdown string for the latest-data, 3-year and filings queries.
    A successful 5-year query returns the de-duplicated, newest-first list of
    raw record dicts instead of rendered text. Every failure is reported as a
    string starting with "❌"; no exception is propagated to the caller.
    """
    if not company_name:
        return "Please enter a company name or stock symbol"

    # Map user-facing query types to the internal (Chinese) branch names.
    query_type_mapping = {
        "Latest": "最新财务数据",
        "3-Year": "3年趋势",
        "5-Year": "5年趋势",
        "Filings": "公司报表列表",
        # Labels published in the create_mcp_tools() enum.
        "Latest Financial Data": "最新财务数据",
        "3-Year Trends": "3年趋势",
        "5-Year Trends": "5年趋势",
    }
    internal_query_type = query_type_mapping.get(query_type, query_type)

    try:
        # Step 1: resolve the company through advanced_search_company.
        search_resp = _call_mcp_tool(
            "advanced_search_company", {"company_input": company_name}, timeout=30
        )
        print(f"搜索公司:{company_name},search_resp.status_code: {search_resp.status_code}\nSearch Response: {search_resp.text}")

        if search_resp.status_code != 200:
            # Report the failure as text, like every other error path here.
            error_message = f"❌ Server Error: HTTP {search_resp.status_code}\n\nResponse: {search_resp.text[:500]}"
            print(error_message)
            return error_message

        try:
            company = parse_mcp_response(search_resp.json())
        except (ValueError, KeyError, json.JSONDecodeError) as e:
            return f"❌ JSON Parse Error: {str(e)}\n\nResponse: {search_resp.text[:500]}"

        if isinstance(company, dict) and company.get("error"):
            return f"❌ Error: {company['error']}"

        if not isinstance(company, dict):
            return f"❌ Invalid response format\n\nDebug: {str(company)[:500]}"

        # The search result is read through the keys cik, name and ticker.
        display_name = company.get('name', 'Unknown')
        ticker = company.get('ticker', 'N/A')

        result = f"# {display_name}\n\n"
        result += f"**Stock Symbol**: {ticker}\n"
        result += "\n---\n\n"

        # Step 2: normalize the CIK to its 10-digit form for the data tools.
        cik = normalize_cik(company.get('cik'))
        if not cik:
            return result + f"❌ Error: Invalid CIK from company search\n\nDebug: company data = {json.dumps(company, indent=2)}"

        # Step 3: run the requested query.
        if internal_query_type == "最新财务数据":
            return _latest_financials_section(cik, result)

        if internal_query_type == "3年趋势":
            result, records = _fetch_trend_records(cik, 3, result, with_heading=True)
            if records is None:
                return result
            return result + _trend_table(records)

        if internal_query_type == "5年趋势":
            result, records = _fetch_trend_records(cik, 5, result)
            if records is None:
                return result
            print(f'5年数据::{records}')
            # NOTE(review): this branch hands back the raw records rather than
            # rendered Markdown; kept because callers may rely on the list.
            return records

        if internal_query_type == "公司报表列表":
            return _filings_section(cik, result)

        return result + f"❌ Unsupported query type: {query_type}"

    except requests.exceptions.RequestException as e:
        return f"❌ Network Error: {str(e)}\n\nMCP Server: {MCP_URL}"
    except Exception as e:
        # Top-level boundary: surface the failure as text instead of raising.
        import traceback
        return f"❌ Unexpected Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"