File size: 4,185 Bytes
fae4e5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
Synchronous wrapper for MCP Client
Provides sync interface for Gradio event handlers
"""

import asyncio
from typing import Optional, Dict, Any, List
from .client import MCPClient


class SyncMCPClient:
    """Synchronous wrapper for MCPClient to use in Gradio event handlers"""

    def __init__(self, server_url: Optional[str] = None):
        self.client = MCPClient(server_url)
        self._loop = None

    def _get_or_create_event_loop(self):
        """Get or create an event loop for async operations"""
        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop

    def _run_async(self, coro):
        """Run an async coroutine and return the result"""
        loop = self._get_or_create_event_loop()
        return loop.run_until_complete(coro)

    def initialize(self):
        """Initialize connection to MCP server (sync)"""
        return self._run_async(self.client.initialize())

    def analyze_leaderboard(
        self,
        leaderboard_repo: str = "kshitijthakkar/smoltrace-leaderboard",
        metric_focus: str = "overall",
        time_range: str = "last_week",
        top_n: int = 5,
        hf_token: Optional[str] = None,
        gemini_api_key: Optional[str] = None
    ) -> str:
        """Analyze leaderboard (sync wrapper)"""
        return self._run_async(
            self.client.analyze_leaderboard(
                leaderboard_repo, metric_focus, time_range, top_n, hf_token, gemini_api_key
            )
        )

    def debug_trace(
        self,
        trace_data: Dict[str, Any],
        question: str,
        metrics_data: Optional[Dict[str, Any]] = None,
        hf_token: Optional[str] = None,
        gemini_api_key: Optional[str] = None
    ) -> str:
        """Debug trace (sync wrapper)"""
        return self._run_async(
            self.client.debug_trace(trace_data, question, metrics_data, hf_token, gemini_api_key)
        )

    def estimate_cost(
        self,
        model: str,
        agent_type: str = "both",
        num_tests: int = 100,
        hardware: Optional[str] = None,
        hf_token: Optional[str] = None,
        gemini_api_key: Optional[str] = None
    ) -> str:
        """Estimate cost (sync wrapper)"""
        return self._run_async(
            self.client.estimate_cost(model, agent_type, num_tests, hardware, hf_token, gemini_api_key)
        )

    def compare_runs(
        self,
        run_data_list: List[Dict[str, Any]],
        focus_metrics: Optional[List[str]] = None,
        hf_token: Optional[str] = None,
        gemini_api_key: Optional[str] = None
    ) -> str:
        """Compare runs (sync wrapper)"""
        return self._run_async(
            self.client.compare_runs(run_data_list, focus_metrics, hf_token, gemini_api_key)
        )

    def analyze_results(
        self,
        results_data: List[Dict[str, Any]],
        analysis_focus: str = "optimization",
        hf_token: Optional[str] = None,
        gemini_api_key: Optional[str] = None
    ) -> str:
        """Analyze results (sync wrapper)"""
        return self._run_async(
            self.client.analyze_results(results_data, analysis_focus, hf_token, gemini_api_key)
        )

    def get_dataset_info(
        self,
        dataset_repo: str,
        hf_token: Optional[str] = None,
        gemini_api_key: Optional[str] = None
    ) -> str:
        """Get dataset info (sync wrapper)"""
        return self._run_async(
            self.client.get_dataset_info(dataset_repo, hf_token, gemini_api_key)
        )

    def close(self):
        """Close the MCP client (sync wrapper)"""
        return self._run_async(self.client.close())


# Global instance
_sync_mcp_client: Optional[SyncMCPClient] = None


def get_sync_mcp_client() -> SyncMCPClient:
    """Get or create the global synchronous MCP client"""
    global _sync_mcp_client
    if _sync_mcp_client is None:
        _sync_mcp_client = SyncMCPClient()
    return _sync_mcp_client