baba521's picture
test
f71610e
import json
import requests
import subprocess
import os
from typing import Dict, Any, Generator, Optional
import time
class MCP_Client:
"""MCP客户端,支持本地命令行和HTTP连接方式"""
def __init__(self, config: Dict[str, Any]):
"""
初始化MCP客户端
Args:
config: MCP服务器配置
"""
self.config = config
self.server_name = list(config["mcpServers"].keys())[0]
self.server_config = config["mcpServers"][self.server_name]
# 根据配置类型初始化不同的连接方式
if "command" in self.server_config:
# 本地命令行方式
self.connection_type = "local"
self.process: Optional[subprocess.Popen] = None
else:
# HTTP方式
self.connection_type = "http"
self.url = self.server_config.get("url")
self.headers = self.server_config.get("headers", {})
# 初始化状态
self.initialized = False
def initialize(self) -> bool:
"""
初始化MCP会话
Returns:
bool: 初始化是否成功
"""
if self.initialized:
return True
try:
if self.connection_type == "local":
# 启动本地MCP进程
command = self.server_config["command"]
args = self.server_config.get("args", [])
env = self.server_config.get("env", {})
# 合并环境变量
process_env = os.environ.copy()
process_env.update(env)
self.process = subprocess.Popen(
[command] + args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=process_env,
text=True,
bufsize=1
)
# 发送初始化请求到子进程
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {
"experimental": {},
"sampling": {},
"elicitations": {},
"roots": {}
},
"clientInfo": {
"name": "mcp-client",
"version": "1.0.0"
}
}
}
if self.process.stdin is not None:
self.process.stdin.write(json.dumps(init_request) + "\n")
self.process.stdin.flush()
# 读取响应
if self.process.stdout is not None:
response_line = self.process.stdout.readline()
response_data = json.loads(response_line.strip())
else:
raise Exception("无法读取进程输出")
if "result" in response_data:
# 发送初始化完成通知
initialized_notification = {
"jsonrpc": "2.0",
"method": "notifications/initialized"
}
if self.process.stdin is not None:
self.process.stdin.write(json.dumps(initialized_notification) + "\n")
self.process.stdin.flush()
self.initialized = True
return True
else:
# HTTP方式初始化
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {
"experimental": {},
"sampling": {},
"elicitations": {},
"roots": {}
},
"clientInfo": {
"name": "mcp-http-client",
"version": "1.0.0"
}
}
}
init_response = requests.post(
self.url,
json=init_request,
headers={**self.headers, "Content-Type": "application/json"},
timeout=10
)
if init_response.status_code == 200:
response_data = init_response.json()
if "result" in response_data:
# 发送初始化完成通知
initialized_notification = {
"jsonrpc": "2.0",
"method": "notifications/initialized"
}
requests.post(
self.url,
json=initialized_notification,
headers={**self.headers, "Content-Type": "application/json"}
)
self.initialized = True
return True
print(f"MCP初始化失败")
return False
except Exception as e:
print(f"MCP初始化错误: {e}")
return False
def call_tool(self, tool_name: str, arguments: Dict[str, Any], timeout: int = 90) -> Dict[str, Any]:
"""
调用MCP工具
Args:
tool_name: 工具名称
arguments: 工具参数
timeout: 超时时间(秒)
Returns:
Dict[str, Any]: 工具执行结果
"""
# 确保已初始化
if not self.initialized:
if not self.initialize():
raise Exception("MCP会话初始化失败")
if self.connection_type == "local":
# 本地命令行方式调用工具
tool_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
}
}
if self.process is not None and self.process.stdin is not None:
self.process.stdin.write(json.dumps(tool_request) + "\n")
self.process.stdin.flush()
# 读取响应
if self.process is not None and self.process.stdout is not None:
response_line = self.process.stdout.readline()
response_data = json.loads(response_line.strip())
else:
raise Exception("无法读取进程输出")
# 检查是否有错误
if "error" in response_data:
error = response_data["error"]
raise Exception(f"MCP错误 {error.get('code', 'unknown')}: {error.get('message', 'Unknown error')}")
# 返回结果
return response_data.get("result", {})
else:
# HTTP方式调用工具
tool_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
}
}
tool_response = requests.post(
self.url,
json=tool_request,
headers={**self.headers, "Content-Type": "application/json"},
timeout=timeout
)
if tool_response.status_code != 200:
raise Exception(f"工具调用请求失败: HTTP {tool_response.status_code}")
response_data = tool_response.json()
# 检查是否有错误
if "error" in response_data:
error = response_data["error"]
raise Exception(f"MCP错误 {error.get('code', 'unknown')}: {error.get('message', 'Unknown error')}")
# 返回结果
return response_data.get("result", {})
def close(self) -> None:
"""关闭MCP客户端连接"""
if self.connection_type == "local" and self.process:
self.process.terminate()
self.process.wait()
# 对于HTTP客户端,不需要特殊清理
pass
# 使用示例
if __name__ == "__main__":
# Alpha Vantage MCP配置(本地命令行方式)
mcp_config = {
"mcpServers": {
"alpha-vantage": {
"command": "npx",
"args": ["alpha-ventage-mcp"],
"env": {
"ALPHA_VANTAGE_API_KEY": "97Q9TT7I6J9ZOLDS"
}
}
}
}
# 创建MCP客户端
client = MCP_Client(mcp_config)
try:
# 初始化MCP会话
print("正在初始化MCP会话...")
if client.initialize():
print("MCP会话初始化成功")
else:
print("MCP会话初始化失败")
exit(1)
# 示例:调用工具(请根据实际情况替换工具名称和参数)
print("MCP客户端已准备就绪,可以调用工具")
except Exception as e:
print(f"发生错误: {e}")
finally:
# 关闭连接
client.close()
print("MCP客户端连接已关闭")