"""
Gradio MCP Client for Remote MCP Server - With File Upload
"""

import asyncio
import json
import os
import shutil
import tempfile
import warnings
from contextlib import asynccontextmanager
from urllib.parse import quote

import gradio as gr
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport
from openai import OpenAI

# Suppress deprecation warnings
warnings.filterwarnings(
    "ignore", category=DeprecationWarning, module="websockets.legacy"
)
warnings.filterwarnings(
    "ignore", category=DeprecationWarning, module="uvicorn.protocols.websockets"
)

# Import orchestrator functions (if available)
try:
    from orchestrator import run_orchestrated_chat, run_orchestrated_chat_stream
except ImportError:
    # Fallback if orchestrator module not found
    run_orchestrated_chat = None
    run_orchestrated_chat_stream = None

# Configuration
MCP_SERVER_URL = "https://mcp-1st-birthday-auto-deployer.hf.space/gradio_api/mcp/"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL = "gpt-5-mini"

# Will be set when app launches
APP_URL = None

# Prefix the model sometimes prepends to file_path arguments; stripped before
# the tool call (see SYSTEM_PROMPT, which also tells the model not to add it).
_FILE_PATH_PREFIX = "Gradio File Input - "

# Root under which uploads are copied; must stay inside launch(allowed_paths=...).
_UPLOAD_ROOT = "/tmp"


class MCPClientManager:
    """Open a fresh MCP client connection to ``server_url`` for each operation."""

    def __init__(self, server_url: str):
        self.server_url = server_url

    @asynccontextmanager
    async def get_client(self):
        """Yield a connected ``Client`` over a streamable-HTTP transport."""
        transport = StreamableHttpTransport(self.server_url)
        async with Client(transport) as client:
            yield client

    async def get_tools(self) -> list:
        """Return the tool list reported by the MCP server."""
        async with self.get_client() as client:
            return await client.list_tools()

    async def call_tool(self, tool_name: str, arguments: dict) -> str:
        """Call ``tool_name`` with ``arguments`` and flatten the result to text.

        A list-valued ``result.content`` is joined line by line, using each
        item's ``text`` attribute when it has one; anything else is ``str()``-ed.
        """
        async with self.get_client() as client:
            result = await client.call_tool(tool_name, arguments)
            if not hasattr(result, "content"):
                return str(result)
            if isinstance(result.content, list):
                return "\n".join(
                    str(item.text) if hasattr(item, "text") else str(item)
                    for item in result.content
                )
            return str(result.content)

    def to_openai_tools(self, tools: list) -> list:
        """Convert MCP tool descriptors into OpenAI function-tool definitions.

        Only the ``properties`` and ``required`` entries of each tool's
        ``inputSchema`` are forwarded; a missing/empty schema yields an empty
        object schema.
        """
        openai_tools = []
        for tool in tools:
            schema = tool.inputSchema or {}
            openai_tools.append(
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description or "",
                        "parameters": {
                            "type": "object",
                            "properties": schema.get("properties", {}),
                            "required": schema.get("required", []),
                        },
                    },
                }
            )
        return openai_tools


mcp = MCPClientManager(MCP_SERVER_URL)
openai_client = OpenAI(api_key=OPENAI_API_KEY)

SYSTEM_PROMPT = """You are a helpful ML assistant with access to Auto Deployer tools.

IMPORTANT: When calling tools with file_path parameter:
- Use the provided file URL directly
- Pass ONLY the raw URL (e.g., "https://...")
- Never add prefixes like "Gradio File Input - "

Always pass URLs directly without any prefix."""


def _history_to_messages(history) -> list:
    """Convert chat history into OpenAI ``role``/``content`` messages.

    Accepts both formats this file produces: ``(user, assistant)`` pairs and
    ``{"role": ..., "content": ...}`` dicts. Only the ``role`` and ``content``
    keys of dict entries are forwarded, so extra UI-side keys never reach the
    API. Entries of any other shape are ignored.
    """
    converted = []
    for item in history or []:
        if isinstance(item, dict):
            role = item.get("role")
            content = item.get("content")
            if role in ("user", "assistant") and content:
                converted.append({"role": role, "content": content})
        elif isinstance(item, (list, tuple)) and len(item) == 2:
            user_msg, assistant_msg = item
            converted.append({"role": "user", "content": user_msg})
            if assistant_msg:
                converted.append({"role": "assistant", "content": assistant_msg})
    return converted


def _parse_tool_arguments(raw_arguments) -> dict:
    """Decode a tool call's JSON arguments and clean ``file_path``.

    Raises:
        ValueError: if the payload is not valid JSON or is not a JSON object
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    arguments = json.loads(raw_arguments) if raw_arguments else {}
    if not isinstance(arguments, dict):
        raise ValueError("tool arguments must be a JSON object")

    file_path = arguments.get("file_path")
    # Guard on str: the model may emit null or a non-string here.
    if isinstance(file_path, str) and file_path.startswith(_FILE_PATH_PREFIX):
        arguments["file_path"] = file_path.replace(_FILE_PATH_PREFIX, "")
    return arguments


def _complete_with_tools(messages: list, openai_tools: list):
    """Request one non-streaming completion and return its assistant message.

    ``tools``/``tool_choice`` are only sent when at least one tool exists, so a
    server that exposes no tools does not produce an empty ``tools`` array.
    """
    request = {"model": MODEL, "messages": messages}
    if openai_tools:
        request["tools"] = openai_tools
        request["tool_choice"] = "auto"
    response = openai_client.chat.completions.create(**request)
    return response.choices[0].message


async def chat(message: str, history: list, file_url: str):
    """Process chat with optional file URL.

    Async generator. Yields status strings while tools run, then yields the
    cumulative text of the final streamed answer (each yield is the full text
    so far, not a delta).

    Args:
        message: the current user message.
        history: prior turns, as ``(user, assistant)`` pairs or role/content
            dicts; must NOT include ``message`` itself.
        file_url: URL of the uploaded file, prepended to the user message
            when truthy.
    """
    # NOTE(review): the OpenAI client used here is synchronous, so these calls
    # block the event loop while they run.
    tools = await mcp.get_tools()
    openai_tools = mcp.to_openai_tools(tools)

    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(_history_to_messages(history))

    # Add file context if available
    user_content = message
    if file_url:
        user_content = f"[Uploaded CSV file URL: {file_url}]\n\n{message}"
    messages.append({"role": "user", "content": user_content})

    # First call
    assistant_message = _complete_with_tools(messages, openai_tools)

    # Handle tool calls until the model stops requesting them
    while assistant_message.tool_calls:
        messages.append(assistant_message)
        yield "🔧 Calling tools...\n\n"

        for tool_call in assistant_message.tool_calls:
            tool_name = tool_call.function.name
            yield f"⚙️ Running `{tool_name}`...\n\n"

            # Argument parsing lives inside the try so a malformed payload is
            # reported back to the model as the tool result; every
            # tool_call_id must receive a "tool" message either way.
            try:
                arguments = _parse_tool_arguments(tool_call.function.arguments)
                tool_result = await mcp.call_tool(tool_name, arguments)
            except Exception as e:
                tool_result = f"Error: {e}"

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_result,
                }
            )

        assistant_message = _complete_with_tools(messages, openai_tools)

    # Stream final response
    stream = openai_client.chat.completions.create(
        model=MODEL,
        messages=messages,
        stream=True,
    )

    partial_response = ""
    for chunk in stream:
        # Some stream chunks carry no choices; skip them instead of indexing.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            partial_response += delta
            yield partial_response


def handle_upload(file_obj, request: gr.Request):
    """Turn an uploaded file into a URL served by this Gradio app.

    1) Take the uploaded file (a path string or an object with ``.name``)
    2) Copy it into a fresh directory under /tmp for a stable, unique path
    3) Build a gradio file URL from the request's base URL

    Returns:
        ``(public_url, public_url)``, or ``(None, None)`` when no file is given.
    """
    if file_obj is None:
        return None, None

    # Local path where Gradio stored the file; a plain str has no ``.name``.
    local_path = str(getattr(file_obj, "name", file_obj))

    # Copy into a per-upload directory so two uploads sharing a filename
    # cannot overwrite each other. Best effort: on failure keep Gradio's path.
    try:
        upload_dir = tempfile.mkdtemp(prefix="upload_", dir=_UPLOAD_ROOT)
    except OSError:
        upload_dir = None

    if upload_dir is not None:
        stable_path = os.path.join(upload_dir, os.path.basename(local_path))
        try:
            shutil.copy(local_path, stable_path)
            local_path = stable_path
        except OSError:
            # Copy failed: drop the empty directory and use the original path.
            shutil.rmtree(upload_dir, ignore_errors=True)

    # Use Gradio's internal file URL format; percent-encode the path so names
    # with spaces or other reserved characters still form a valid URL.
    base_url = str(request.base_url).rstrip("/")
    public_url = f"{base_url}/gradio_api/file={quote(local_path)}"

    return public_url, public_url


def _upload_to_url(file_obj, request: gr.Request):
    """Event handler: return only the URL string for the single State output.

    ``handle_upload`` returns a pair; bound directly to one output, the whole
    tuple would be stored, and a tuple is truthy even when it is
    ``(None, None)``.
    """
    public_url, _ = handle_upload(file_obj, request)
    return public_url


async def chat_send_stream(user_msg, history, file_url):
    """
    Streaming chat function that yields updates including tool invocations.

    - history: list of message dictionaries with 'role' and 'content' keys
      (two-item user/assistant pairs are also accepted and converted)
    - file_url: required HTTP URL to the uploaded file

    Yields the full message list after every update; the last entry is the
    assistant reply being built.
    """
    if history is None:
        history = []

    # Ensure history is in proper dict format
    messages = []
    for item in history:
        if isinstance(item, dict) and "role" in item and "content" in item:
            messages.append(item)
        elif isinstance(item, (list, tuple)) and len(item) == 2:
            user_msg_item, assistant_msg_item = item
            messages.append({"role": "user", "content": str(user_msg_item)})
            if assistant_msg_item:
                messages.append(
                    {"role": "assistant", "content": str(assistant_msg_item)}
                )

    # Add current user message, then the thinking placeholder
    messages.append({"role": "user", "content": user_msg})
    messages.append({"role": "assistant", "content": "🤔 Thinking..."})

    # If no file, respond with error
    if not file_url:
        messages[-1] = {"role": "assistant", "content": "Upload a file first."}
        yield messages
        return

    # Show the user's message and the placeholder right away, before the
    # first (possibly slow) chunk arrives.
    yield messages

    # Use orchestrator if available
    if run_orchestrated_chat_stream:
        # Convert to tuple format for orchestrator (excluding current thinking message)
        # NOTE(review): this still includes the current user message as a
        # trailing (user_msg, "") pair while user_msg is also passed
        # separately - confirm the orchestrator expects that.
        history_tuples = []
        for item in messages[:-1]:
            if item["role"] == "user":
                history_tuples.append((item.get("content", ""), ""))
            elif item["role"] == "assistant" and history_tuples:
                history_tuples[-1] = (
                    history_tuples[-1][0],
                    item.get("content", ""),
                )

        # Stream the response using async generator
        async for chunk in run_orchestrated_chat_stream(
            user_msg, history_tuples, file_url
        ):
            chunk_type = chunk.get("type", "")
            chunk_content = chunk.get("content", "")

            if chunk_type in ("thinking", "final", "error"):
                # These chunk types replace the assistant message outright.
                content = chunk_content
            elif chunk_type in ("tool", "result"):
                # Tool activity is appended below what is already shown.
                content = messages[-1]["content"] + f"\n{chunk_content}"
            else:
                continue

            messages[-1] = {"role": "assistant", "content": content}
            yield messages
    else:
        # Fallback: use the existing chat function with streaming.
        # Drop the last two entries (current user message + placeholder):
        # chat() appends the current message itself.
        prior_turns = [
            item for item in messages[:-2] if item["role"] in ["user", "assistant"]
        ]

        async for chunk in chat(user_msg, prior_turns, file_url):
            messages[-1] = {"role": "assistant", "content": chunk}
            yield messages


with gr.Blocks(title="MCP + GPT-5 mini - Streaming Chat") as demo:
    gr.Markdown(
        """
        # AI-Driven MLOps Agent 🤖
        - **Upload a CSV file** (required)
        - Real-time streaming with live tool invocations
        - Get intelligent insights, training, or deployment based on your needs
        """
    )

    uploader = gr.File(
        label="Required CSV file upload",
        file_count="single",
        type="filepath",
        file_types=[".csv"],  # Restrict to CSV files only
    )

    # Internal file URL storage (hidden from UI)
    file_url_state = gr.State(value=None)

    # Use message format for better streaming support
    # NOTE(review): chat_send_stream yields role/content dicts; on Gradio
    # versions where Chatbot defaults to the tuple format this would need
    # type="messages" - confirm against the pinned Gradio version.
    chatbot = gr.Chatbot(
        label="Chat",
        avatar_images=(
            None,
            "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png",
        ),
    )

    msg = gr.Textbox(label="Message", interactive=True)
    send = gr.Button("Send", interactive=True)

    # When file changes, generate URL and update state
    uploader.change(
        _upload_to_url,
        inputs=[uploader],
        outputs=[file_url_state],
    )

    # Send button (streaming) - update chatbot and clear input
    send.click(
        chat_send_stream,
        inputs=[msg, chatbot, file_url_state],
        outputs=[chatbot],
    ).then(lambda: "", outputs=[msg])

    # Press Enter to send (streaming) - update chatbot and clear input
    msg.submit(
        chat_send_stream,
        inputs=[msg, chatbot, file_url_state],
        outputs=[chatbot],
    ).then(lambda: "", outputs=[msg])


async def test_mcp_connection():
    """Test MCP connection on startup; return True on success, False on failure."""
    try:
        print("Testing MCP server connection...")
        tools = await mcp.get_tools()
        print(f"✅ Connected to MCP server. Found {len(tools)} tools.")
        return True
    except Exception as e:
        print(f"❌ Failed to connect to MCP server: {e}")
        return False


if __name__ == "__main__":
    # Suppress all warnings for cleaner output
    warnings.filterwarnings("ignore")

    # Test MCP connection on startup
    try:
        print(f"Attempting to connect to MCP server: {MCP_SERVER_URL}")
        asyncio.run(test_mcp_connection())
    except Exception as e:
        print(f"MCP connection test failed: {e}")
        print("Continuing anyway - connection will be retried during chat...")

    # Launch the app
    demo.queue().launch(
        allowed_paths=["/tmp"],
        ssr_mode=False,
        show_error=True,
        quiet=True,
    )