File size: 1,900 Bytes
85653bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python3
"""
测试币安API连接和数据获取功能
"""

import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from binance.client import Client
import pandas as pd

def test_binance_connection():
    """测试币安连接"""
    try:
        # 初始化客户端(使用公开API)
        client = Client("", "")
        
        # 测试获取服务器时间
        server_time = client.get_server_time()
        print(f"✅ 币安服务器连接成功,服务器时间: {server_time}")
        
        # 测试获取交易对信息
        exchange_info = client.get_exchange_info()
        print(f"✅ 获取交易所信息成功,共有 {len(exchange_info['symbols'])} 个交易对")
        
        # 测试获取K线数据
        klines = client.get_klines(symbol='BTCUSDT', interval='1h', limit=10)
        print(f"✅ 获取BTCUSDT K线数据成功,获取到 {len(klines)} 条数据")
        
        # 转换为DataFrame并显示
        df = pd.DataFrame(klines, columns=[
            'timestamp', 'open', 'high', 'low', 'close', 'volume',
            'close_time', 'quote_asset_volume', 'number_of_trades',
            'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'
        ])
        
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        print("\n最新的5条K线数据:")
        print(df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].tail())
        
        return True
        
    except Exception as e:
        print(f"❌ 币安连接测试失败: {e}")
        return False

if __name__ == "__main__":
    print("🚀 开始测试币安API连接...")
    success = test_binance_connection()
    
    if success:
        print("\n✅ 所有测试通过!币安API集成准备就绪。")
    else:
        print("\n❌ 测试失败,请检查网络连接。")