import json
import time
from .mcp_client import MCP_Client
BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
MODEL = "qwen3-max"
TOKEN = "sk-ef26097310ec45c184e8d84b31ea9356"
# Alpha Vantage MCP配置
MCP_CONFIG = {
"mcpServers": {
"alpha-vantage": {
"command": "npx",
"args": ["alpha-ventage-mcp"],
"env": {
"ALPHA_VANTAGE_API_KEY": "97Q9TT7I6J9ZOLDS"
}
}
}
}
def extract_stock_symbol(user_input: str) -> str:
"""
从用户输入中提取股票代码
支持多种格式:
- "查询阿里巴巴股票" -> BABA
- "查询AAPL股票" -> AAPL
- "我想了解MSFT的股价" -> MSFT
- "BABA股价如何" -> BABA
"""
# 预定义一些常见股票代码的映射
stock_mapping = {
"阿里巴巴": "BABA",
"阿里": "BABA",
"腾讯": "TCEHY",
"微软": "MSFT",
"苹果": "AAPL",
"谷歌": "GOOGL",
"亚马逊": "AMZN",
"特斯拉": "TSLA",
"英伟达": "NVDA",
"百度": "BIDU",
"京东": "JD",
"拼多多": "PDD"
}
# 检查预定义映射
for chinese_name, symbol in stock_mapping.items():
if chinese_name in user_input:
return symbol
# 尝试直接匹配常见的股票代码格式 (如 AAPL, MSFT 等)
import re
# 匹配常见的股票代码格式
patterns = [
r'\b([A-Z]{1,5})\b', # 大写字母组成的股票代码
r'股票代码[是为]?([A-Z]{1,5})',
r'([A-Z]{1,5})股票'
]
for pattern in patterns:
match = re.search(pattern, user_input)
if match:
symbol = match.group(1)
# 验证是否为有效的股票代码格式
if 1 <= len(symbol) <= 5 and symbol.isalpha():
return symbol
# 默认返回阿里巴巴
return "BABA"
def get_stock_price(symbol: str):
"""获取股票价格"""
client = MCP_Client(MCP_CONFIG)
if not client.initialize():
raise Exception("MCP初始化失败")
try:
result = client.call_tool("get_stock_price", {"symbol": symbol})
client.close()
return result
except Exception as e:
client.close()
raise e
def get_company_overview(symbol: str):
"""获取公司概况"""
client = MCP_Client(MCP_CONFIG)
if not client.initialize():
raise Exception("MCP初始化失败")
try:
result = client.call_tool("get_company_overview", {"symbol": symbol})
client.close()
return result
except Exception as e:
client.close()
raise e
def get_daily_time_series(symbol: str):
"""获取每日时间序列数据"""
client = MCP_Client(MCP_CONFIG)
if not client.initialize():
raise Exception("MCP初始化失败")
try:
result = client.call_tool("get_daily_time_series", {"symbol": symbol})
client.close()
return result
except Exception as e:
client.close()
raise e
def get_forex_rate(from_currency: str = "USD", to_currency: str = "CNY"):
"""获取外汇汇率"""
client = MCP_Client(MCP_CONFIG)
if not client.initialize():
raise Exception("MCP初始化失败")
try:
result = client.call_tool("get_forex_rate", {
"from_currency": from_currency,
"to_currency": to_currency
})
client.close()
return result
except Exception as e:
client.close()
raise e
def get_tools_for_stock(symbol: str):
"""根据股票代码生成工具列表"""
return [
{
"name": "get_stock_price",
"description": "获取股票价格",
"params": {"symbol": symbol},
},
{
"name": "get_company_overview",
"description": "获取公司概况",
"params": {"symbol": symbol},
},
{
"name": "get_daily_time_series",
"description": "获取每日时间序列数据",
"params": {"symbol": symbol},
},
{
"name": "get_forex_rate",
"description": "获取外汇汇率",
"params": {"from_currency": "USD", "to_currency": "CNY"},
}
]
def call_llm(prompt: str):
"""模拟调用大模型的函数(你可以替换为真实请求)"""
import requests
headers = {
"Authorization": f"Bearer {TOKEN}",
"Content-Type": "application/json"
}
data = {
"model": MODEL,
"messages": [{"role": "user", "content": prompt}],
"stream": True # 启用流式响应
}
response = requests.post(f"{BASE_URL}/chat/completions", headers=headers, json=data, stream=True)
for line in response.iter_lines():
if line:
yield line.decode('utf-8')
def get_stock_data_example(symbol: str):
"""示例函数:获取股票数据的简化调用方式"""
try:
# 获取股票价格
price_data = get_stock_price(symbol)
print(f"股票价格数据: {price_data}")
# 获取公司概况
overview_data = get_company_overview(symbol)
print(f"公司概况数据: {overview_data}")
# 获取每日时间序列
time_series_data = get_daily_time_series(symbol)
print(f"时间序列数据: {time_series_data}")
# 获取外汇汇率
forex_data = get_forex_rate()
print(f"外汇汇率数据: {forex_data}")
return {
"price": price_data,
"overview": overview_data,
"time_series": time_series_data,
"forex": forex_data
}
except Exception as e:
print(f"获取股票数据时出错: {str(e)}")
raise e
def process_tool_analysis(user_input: str, history: list):
"""
处理工具分析的主要逻辑
- user_input: str
- history: list of {"role": ..., "content": ...}
Returns generator for streaming.
"""
if not user_input.strip():
yield "", history
return
# 添加用户消息到历史
history.append({"role": "user", "content": user_input})
try:
# 从用户输入中提取股票代码
stock_symbol = extract_stock_symbol(user_input)
# 根据股票代码生成工具列表
tools_to_test = get_tools_for_stock(stock_symbol)
# 创建MCP客户端
client = MCP_Client(MCP_CONFIG)
# 初始化MCP会话
if not client.initialize():
error_msg = "❌ MCP初始化失败"
history.append({"role": "assistant", "content": error_msg})
yield "", history
return
# 收集所有工具的查询结果
all_results = []
# 依次调用每个工具
for i, tool in enumerate(tools_to_test, 1):
# 添加工具标题(使用HTML实现可折叠效果,默认折叠)
tool_content = f''' 分析结果: {result_msg}
[MCP] 🔧 工具 {i}/{len(tools_to_test)}: {tool['name']} ({tool['description']})
') + len('
')
if start_idx != -1 and end_idx != -1:
summary_content = history[-1]["content"][start_idx:end_idx]
# 更新工具标题的类名和状态文本(添加过渡效果)
updated_summary = summary_content.replace('tool-header querying',
'tool-header analyzing').replace(
'status-tag querying', 'status-tag analyzing').replace('查询中', '分析中')
history[-1]["content"] = history[-1]["content"].replace(summary_content, updated_summary)
yield "", history
# 流式显示模型处理结果(在可折叠面板内部)
model_summary = ""
# 移除所有冗余提示文本,直接显示分析结果区域
history[-1]["content"] += "') + len('
')
if start_idx != -1 and end_idx != -1:
summary_start = history[-1]["content"][start_idx:end_idx]
updated_summary = summary_start.replace('tool-header analyzing',
'tool-header completed').replace(
'status-tag analyzing', 'status-tag completed').replace('分析中', '已完成')
history[-1]["content"] = history[-1]["content"].replace(summary_start, updated_summary)
# 关闭可折叠面板
history[-1]["content"] += "