Abid Ali Awan
refactor: Remove debug print statements and temporary textbox from file upload handling in the Gradio application, streamlining the output structure and enhancing clarity in error messaging.
45bd7ce
| """ | |
| Gradio MCP Client for Remote MCP Server - With File Upload | |
| """ | |
import json
import os
import shutil
import warnings
from contextlib import asynccontextmanager

import gradio as gr
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport
from openai import OpenAI

# Suppress deprecation warnings
warnings.filterwarnings(
    "ignore", category=DeprecationWarning, module="websockets.legacy"
)
warnings.filterwarnings(
    "ignore", category=DeprecationWarning, module="uvicorn.protocols.websockets"
)
# Import orchestrator functions (if available)
try:
    from orchestrator import run_orchestrated_chat, run_orchestrated_chat_stream
except ImportError:
    # Fall back gracefully if the orchestrator module is not found
    run_orchestrated_chat = None
    run_orchestrated_chat_stream = None
# Configuration
MCP_SERVER_URL = "https://mcp-1st-birthday-auto-deployer.hf.space/gradio_api/mcp/"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL = "gpt-5-mini"

# Will be set when the app launches
APP_URL = None
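# NOTE (assumption): OPENAI_API_KEY must be provided via the environment
# (e.g. a Space secret or `export OPENAI_API_KEY=...` locally); the OpenAI
# client below cannot authenticate without it.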
class MCPClientManager:
    def __init__(self, server_url: str):
        self.server_url = server_url

    # get_client must be an async context manager so it can be used with
    # `async with self.get_client() as client:` below.
    @asynccontextmanager
    async def get_client(self):
        transport = StreamableHttpTransport(self.server_url)
        async with Client(transport) as client:
            yield client

    async def get_tools(self) -> list:
        async with self.get_client() as client:
            return await client.list_tools()
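    # call_tool flattens the MCP result for the LLM: FastMCP tool results may
    # carry a list of content items, so their text parts are joined into a
    # single string (based on the handling below).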
    async def call_tool(self, tool_name: str, arguments: dict) -> str:
        async with self.get_client() as client:
            result = await client.call_tool(tool_name, arguments)
            if hasattr(result, "content"):
                if isinstance(result.content, list):
                    return "\n".join(
                        str(item.text) if hasattr(item, "text") else str(item)
                        for item in result.content
                    )
                return str(result.content)
            return str(result)
    def to_openai_tools(self, tools: list) -> list:
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description or "",
                    "parameters": {
                        "type": "object",
                        "properties": tool.inputSchema.get("properties", {})
                        if tool.inputSchema
                        else {},
                        "required": tool.inputSchema.get("required", [])
                        if tool.inputSchema
                        else [],
                    },
                },
            }
            for tool in tools
        ]
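# Each MCP tool is mapped to OpenAI's function-calling schema, e.g. (illustrative;
# the tool name is a hypothetical placeholder, not one advertised by the server):
#   {"type": "function",
#    "function": {"name": "train_model", "description": "...",
#                 "parameters": {"type": "object", "properties": {...}, "required": [...]}}}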
mcp = MCPClientManager(MCP_SERVER_URL)
openai_client = OpenAI(api_key=OPENAI_API_KEY)

SYSTEM_PROMPT = """You are a helpful ML assistant with access to Auto Deployer tools.
IMPORTANT: When calling tools with a file_path parameter:
- Use the provided file URL directly
- Pass ONLY the raw URL (e.g., "https://...")
- Never add prefixes like "Gradio File Input - "
Always pass URLs directly without any prefix."""
async def chat(message: str, history: list, file_url: str):
    """Process chat with an optional file URL."""
    tools = await mcp.get_tools()
    openai_tools = mcp.to_openai_tools(tools)

    messages = [{"role": "system", "content": SYSTEM_PROMPT}]

    # Add file context if available
    user_content = message
    if file_url:
        user_content = f"[Uploaded CSV file URL: {file_url}]\n\n{message}"

    # Build history (accept either dict-format or (user, assistant) tuple entries)
    for item in history:
        if isinstance(item, dict) and "role" in item and "content" in item:
            messages.append({"role": item["role"], "content": item["content"]})
        elif isinstance(item, (list, tuple)) and len(item) == 2:
            user_msg, assistant_msg = item
            messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})

    messages.append({"role": "user", "content": user_content})
    # First call
    response = openai_client.chat.completions.create(
        model=MODEL,
        messages=messages,
        tools=openai_tools,
        tool_choice="auto",
    )
    assistant_message = response.choices[0].message

    # Handle tool calls
    while assistant_message.tool_calls:
        messages.append(assistant_message)
        yield "🔧 Calling tools...\n\n"

        for tool_call in assistant_message.tool_calls:
            tool_name = tool_call.function.name
            arguments = json.loads(tool_call.function.arguments)

            # Clean file_path
            if "file_path" in arguments:
                fp = arguments["file_path"]
                if fp.startswith("Gradio File Input - "):
                    arguments["file_path"] = fp.replace("Gradio File Input - ", "")

            yield f"⚙️ Running `{tool_name}`...\n\n"
            try:
                tool_result = await mcp.call_tool(tool_name, arguments)
            except Exception as e:
                tool_result = f"Error: {e}"

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_result,
                }
            )

        response = openai_client.chat.completions.create(
            model=MODEL,
            messages=messages,
            tools=openai_tools,
            tool_choice="auto",
        )
        assistant_message = response.choices[0].message

    # Stream the final response
    stream = openai_client.chat.completions.create(
        model=MODEL,
        messages=messages,
        stream=True,
    )
    partial_response = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            partial_response += chunk.choices[0].delta.content
            yield partial_response
def handle_upload(file_obj, request: gr.Request):
    """
    1) Take the uploaded file
    2) Copy it to /tmp for a stable path
    3) Build a public Gradio file URL
    """
    if file_obj is None:
        return None

    # Local path where Gradio stored the file (with type="filepath" this is a
    # plain string; older Gradio versions pass an object with a .name attribute)
    local_path = file_obj if isinstance(file_obj, str) else file_obj.name

    # Optional: stabilize the path under /tmp
    stable_path = os.path.join("/tmp", os.path.basename(local_path))
    try:
        shutil.copy(local_path, stable_path)
        local_path = stable_path
    except Exception:
        # If the copy fails, keep the original path
        pass

    # Use Gradio's internal file URL format; return a single value because the
    # .change() handler below has only file_url_state as its output
    base_url = str(request.base_url).rstrip("/")
    public_url = f"{base_url}/gradio_api/file={local_path}"
    return public_url
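# Illustrative example (assumed values): for a Space served at
# https://username-auto-deployer.hf.space with a file copied to /tmp/data.csv,
# handle_upload returns "https://username-auto-deployer.hf.space/gradio_api/file=/tmp/data.csv".
# The URL is only fetchable because /tmp is whitelisted via allowed_paths in launch() below.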
async def chat_send_stream(user_msg, history, file_url):
    """
    Streaming chat function that yields updates, including tool invocations.
    - history: list of message dictionaries with 'role' and 'content' keys
    - file_url: required HTTP URL of the uploaded file
    """
    if history is None:
        history = []

    # Ensure history is in the proper dict format
    messages = []
    for item in history:
        if isinstance(item, dict) and "role" in item and "content" in item:
            messages.append(item)
        elif isinstance(item, (list, tuple)) and len(item) == 2:
            user_msg_item, assistant_msg_item = item
            messages.append({"role": "user", "content": str(user_msg_item)})
            if assistant_msg_item:
                messages.append(
                    {"role": "assistant", "content": str(assistant_msg_item)}
                )

    # Add the current user message and a thinking placeholder that later
    # updates overwrite in place
    messages.append({"role": "user", "content": user_msg})
    messages.append({"role": "assistant", "content": "🤔 Thinking..."})

    # A CSV file is required; respond with an error if none was uploaded
    if not file_url:
        messages[-1] = {
            "role": "assistant",
            "content": "Please upload a CSV file first.",
        }
        yield messages
        return
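    # The orchestrator is assumed to yield dicts of the form
    # {"type": <kind>, "content": <text>}, where <kind> is one of "thinking",
    # "tool", "result", "final", or "error" (inferred from the branches below).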
    # Use the orchestrator if available
    if run_orchestrated_chat_stream:
        # Convert to tuple format for the orchestrator (excluding the current
        # thinking placeholder)
        history_tuples = []
        for item in messages[:-1]:
            if item["role"] == "user":
                history_tuples.append((item.get("content", ""), ""))
            elif item["role"] == "assistant":
                if history_tuples:
                    history_tuples[-1] = (
                        history_tuples[-1][0],
                        item.get("content", ""),
                    )

        # Stream the response from the async generator
        async for chunk in run_orchestrated_chat_stream(
            user_msg, history_tuples, file_url
        ):
            chunk_type = chunk.get("type", "")
            chunk_content = chunk.get("content", "")

            if chunk_type in ("tool", "result"):
                # Append progress lines to the current assistant message
                messages[-1] = {
                    "role": "assistant",
                    "content": messages[-1]["content"] + f"\n{chunk_content}",
                }
                yield messages
            elif chunk_type in ("thinking", "final", "error"):
                # Replace the current assistant message outright
                messages[-1] = {"role": "assistant", "content": chunk_content}
                yield messages
    else:
        # Fallback: use the plain chat() function with streaming; exclude the
        # current user message (chat() appends it itself) and the placeholder
        simple_history = [
            item
            for item in messages[:-2]
            if item["role"] in ("user", "assistant")
        ]
        response_text = ""
        async for chunk in chat(user_msg, simple_history, file_url):
            response_text = chunk
            messages[-1] = {"role": "assistant", "content": response_text}
            yield messages
with gr.Blocks(title="MCP + GPT-5 mini - Streaming Chat") as demo:
    gr.Markdown(
        """
# AI-Driven MLOps Agent 🤖
- **Upload a CSV file** (required)
- Real-time streaming with live tool invocations
- Get intelligent insights, training, or deployment based on your needs
"""
    )
    uploader = gr.File(
        label="Required CSV file upload",
        file_count="single",
        type="filepath",
        file_types=[".csv"],  # Restrict to CSV files only
    )

    # Internal file URL storage (hidden from the UI)
    file_url_state = gr.State(value=None)

    # Use the messages format for better streaming support (chat_send_stream
    # yields lists of {"role", "content"} dicts)
    chatbot = gr.Chatbot(
        label="Chat",
        type="messages",
        avatar_images=(
            None,
            "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png",
        ),
    )
    msg = gr.Textbox(label="Message", interactive=True)
    send = gr.Button("Send", interactive=True)

    # When the file changes, generate the URL and update the state
    uploader.change(
        handle_upload,
        inputs=[uploader],
        outputs=[file_url_state],
    )

    # Send button (streaming) - update the chatbot and clear the input
    send.click(
        chat_send_stream,
        inputs=[msg, chatbot, file_url_state],
        outputs=[chatbot],
    ).then(lambda: "", outputs=[msg])

    # Press Enter to send (streaming) - update the chatbot and clear the input
    msg.submit(
        chat_send_stream,
        inputs=[msg, chatbot, file_url_state],
        outputs=[chatbot],
    ).then(lambda: "", outputs=[msg])
async def test_mcp_connection():
    """Test the MCP connection on startup."""
    try:
        print("Testing MCP server connection...")
        tools = await mcp.get_tools()
        print(f"✅ Connected to MCP server. Found {len(tools)} tools.")
        return True
    except Exception as e:
        print(f"❌ Failed to connect to MCP server: {e}")
        return False
if __name__ == "__main__":
    import asyncio

    # Suppress all warnings for cleaner output (warnings is already imported above)
    warnings.filterwarnings("ignore")

    # Test the MCP connection on startup
    try:
        print(f"Attempting to connect to MCP server: {MCP_SERVER_URL}")
        asyncio.run(test_mcp_connection())
    except Exception as e:
        print(f"MCP connection test failed: {e}")
        print("Continuing anyway - the connection will be retried during chat...")

    # Launch the app
    demo.queue().launch(
        allowed_paths=["/tmp"],
        ssr_mode=False,
        show_error=True,
        quiet=True,
    )