š§ OmniMind Orchestrator
The World's First Self-Evolving Multi-Agent MCP Ecosystem
Track 2 Submission - MCP's 1st Birthday Hackathon
""" OmniMind Orchestrator - Main Gradio Application The World's First Self-Evolving Multi-Agent MCP Ecosystem Competition Entry for MCP's 1st Birthday Hackathon Track 2: MCP in Action (Enterprise Category) Sponsor Integrations: - Google Gemini: Multi-model routing with Gemini 2.0 Flash - Modal: Dynamic MCP deployment - LlamaIndex: Enterprise knowledge RAG - ElevenLabs: Voice-first interface - Blaxel: Agent visualization """ import os import sys import json import zipfile from pathlib import Path from typing import Dict, Any, Optional, Tuple, AsyncGenerator from datetime import datetime import gradio as gr import plotly.graph_objects as go import networkx as nx # Add project root to path sys.path.insert(0, str(Path(__file__).parent)) from core.model_router import router, TaskType from mcp_gen.generator import generator from deployments.modal_deployer import deployer from core.knowledge_engine import knowledge from ui.voice_interface import voice from dotenv import load_dotenv load_dotenv() # ============================================================================ # Helpers # ============================================================================ def to_jsonable(obj: Any) -> Any: """Recursively convert objects to JSON-serializable equivalents.""" if isinstance(obj, Path): return str(obj) if isinstance(obj, datetime): return obj.isoformat() if isinstance(obj, dict): return {k: to_jsonable(v) for k, v in obj.items()} if isinstance(obj, (list, tuple, set)): return [to_jsonable(v) for v in obj] return obj def create_download_zip(server_metadata: Dict[str, Any]) -> Optional[str]: """ Create a ZIP file of the generated MCP server for download. Returns: Path to the ZIP file as a string, or None if creation fails. """ try: server_dir = Path(server_metadata["directory"]) server_id = server_metadata["server_id"] zip_path = server_dir.parent / f"{server_id}.zip" zip_path.parent.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: for file_path in server_dir.rglob("*"): if file_path.is_file(): arcname = file_path.relative_to(server_dir.parent) zipf.write(file_path, arcname) print(f"[ZIP] Created MCP archive at {zip_path}") return str(zip_path) except Exception as e: print(f"[ERROR] Failed to create ZIP: {e}") return None def push_zip_to_space_repo(zip_path: Path) -> Optional[str]: """ Stub for Hub upload. We intentionally DO NOT commit to the Space repository from inside the running app, because that triggers an automatic redeploy and causes the UI to refresh mid-run. Workflow for the hackathon: - Use the **Download Generated MCP Server** button. - Then manually upload the ZIP to generated_mcps/ in the Files tab if you want it stored on the Hub. Returns: Always None (no automatic Hub URL). """ print( "[HF] Auto-upload to Space repo is disabled to avoid self-redeploy.\n" " Use the download button, then upload the ZIP manually to " "generated_mcps/ in the Files tab if you want it on the Hub." ) return None # ============================================================================ # Agent Visualization (Blaxel Integration) # ============================================================================ def create_agent_graph(agent_state: Dict[str, Any]) -> go.Figure: """ Create real-time agent decision graph using Plotly. """ G = nx.DiGraph() nodes = agent_state.get("nodes", []) edges = agent_state.get("edges", []) for node in nodes: G.add_node(node["id"], label=node["label"], type=node.get("type", "default")) for edge in edges: G.add_edge(edge["from"], edge["to"], label=edge.get("label", "")) pos = nx.spring_layout(G, k=2, iterations=50) edge_x, edge_y = [], [] for e in G.edges(): x0, y0 = pos[e[0]] x1, y1 = pos[e[1]] edge_x.extend([x0, x1, None]) edge_y.extend([y0, y1, None]) edge_trace = go.Scatter( x=edge_x, y=edge_y, line=dict(width=2, color="#888"), hoverinfo="none", mode="lines", ) node_x, node_y, node_text, node_colors = [], [], [], [] color_map = { "planning": "#3B82F6", "generating": "#10B981", "deploying": "#F59E0B", "executing": "#8B5CF6", "completed": "#6B7280", } for n in G.nodes(): x, y = pos[n] node_x.append(x) node_y.append(y) node_text.append(G.nodes[n].get("label", n)) node_type = G.nodes[n].get("type", "default") node_colors.append(color_map.get(node_type, "#6B7280")) node_trace = go.Scatter( x=node_x, y=node_y, mode="markers+text", hoverinfo="text", text=node_text, textposition="top center", marker=dict(size=30, color=node_colors, line=dict(width=2, color="white")), ) fig = go.Figure( data=[edge_trace, node_trace], layout=go.Layout( title=dict(text="š§ Agent Decision Graph (Real-Time)", font=dict(size=16)), showlegend=False, hovermode="closest", margin=dict(b=0, l=0, r=0, t=40), xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), yaxis=dict(showgrid=False, zeroline=False, showticklabels=False), plot_bgcolor="rgba(0,0,0,0)", height=400, ), ) return fig # ============================================================================ # Core Agent Orchestration # ============================================================================ async def orchestrate_task( user_request: str, use_voice: bool = False, use_knowledge_base: bool = False, ) -> AsyncGenerator[Tuple[str, Optional[go.Figure], Dict[str, Any], Optional[str]], None]: """ Main orchestration function - the brain of OmniMind. Yields: (status_text, agent_graph, metadata, zip_path_for_download) """ output = "# š¤ OmniMind Orchestrator\n\n" output += f"**Request:** {user_request}\n\n" output += "---\n\n" agent_state = { "nodes": [{"id": "start", "label": "User Request", "type": "planning"}], "edges": [], } yield (output, create_agent_graph(agent_state), {}, None) # Step 1: Analyze request output += "## š§ Step 1: Analyzing Request\n\n" yield (output, create_agent_graph(agent_state), {}, None) analysis_prompt = f"""Analyze this user request and determine what needs to be done: Request: {user_request} Determine: 1. Can this be done with existing general capabilities? (yes/no) 2. Do we need to generate a custom MCP server? (yes/no) 3. If yes, what should the MCP do? 4. What data sources or APIs are needed? Respond in JSON: {{ "needs_custom_mcp": true/false, "mcp_description": "what the MCP should do", "complexity": "simple|medium|complex", "estimated_tools_needed": 2, "approach": "high-level approach to solve this" }} """ analysis = await router.generate( analysis_prompt, task_type=TaskType.PLANNING, temperature=0.3, ) try: analysis_data = json.loads(analysis["response"]) except Exception: analysis_data = { "needs_custom_mcp": True, "mcp_description": user_request, "complexity": "medium", "estimated_tools_needed": 1, "approach": "Generate custom MCP for this task", } output += f"**Analysis:** {analysis_data['approach']}\n\n" output += f"**Needs Custom MCP:** {analysis_data['needs_custom_mcp']}\n\n" agent_state["nodes"].append( {"id": "analyze", "label": "Analysis", "type": "completed"} ) agent_state["edges"].append({"from": "start", "to": "analyze"}) yield (output, create_agent_graph(agent_state), to_jsonable(analysis_data), None) # Step 2: Knowledge base context = None if use_knowledge_base: output += "## š Step 2: Querying Knowledge Base\n\n" agent_state["nodes"].append( {"id": "knowledge", "label": "Knowledge", "type": "executing"} ) agent_state["edges"].append({"from": "analyze", "to": "knowledge"}) yield (output, create_agent_graph(agent_state), {}, None) context = await knowledge.get_context_for_mcp_generation(user_request) if context: output += f"**Found relevant context:** {context[:200]}...\n\n" else: output += "**No relevant context found**\n\n" agent_state["nodes"][-1]["type"] = "completed" yield ( output, create_agent_graph(agent_state), {"has_context": bool(context)}, None, ) # Step 3: Generate MCP server_metadata: Optional[Dict[str, Any]] = None zip_path: Optional[str] = None if analysis_data.get("needs_custom_mcp", False): output += "## āļø Step 3: Generating Custom MCP Server\n\n" agent_state["nodes"].append( {"id": "generate", "label": "Generate MCP", "type": "generating"} ) agent_state["edges"].append({"from": "analyze", "to": "generate"}) yield (output, create_agent_graph(agent_state), {}, None) output += f"**Task:** {analysis_data['mcp_description']}\n\n" output += "šØ Using Claude Sonnet for code generation...\n\n" server_metadata = await generator.generate_mcp_server( task_description=analysis_data["mcp_description"], context={"user_context": context} if context else None, ) output += f"ā **Generated:** {server_metadata['server_name']}\n" output += ( f"**Tools:** {', '.join([t['name'] for t in server_metadata['tools']])}\n" ) output += f"**Location:** `{server_metadata['directory']}`\n\n" # Code preview output += "### š Generated Code Preview\n\n```python\n" try: app_file = server_metadata["files"]["app"] with open(app_file, "r", encoding="utf-8") as f: lines = f.readlines()[:30] output += "".join(lines) if len(lines) >= 30: output += "\n... (truncated - full code saved locally)\n" except Exception as e: output += f"# Code preview unavailable: {e}\n" output += "```\n\n" output += f"**Files saved to:** `{server_metadata['directory']}`\n\n" # ZIP + (disabled) Hub upload zip_path = create_download_zip(server_metadata) if zip_path: server_metadata["zip_path"] = zip_path output += "š¦ **Download button updated below!**\n\n" hub_url = push_zip_to_space_repo(Path(zip_path)) if hub_url: server_metadata["hub_url"] = hub_url output += f"š **Saved to Hub:** {hub_url}\n\n" else: output += ( "ā¹ļø Auto-upload to the Hub repo is disabled.\n" " Use the download button, then upload the ZIP manually\n" " to `generated_mcps/` in the Files tab if you want it stored.\n\n" ) agent_state["nodes"][-1]["type"] = "completed" yield ( output, create_agent_graph(agent_state), to_jsonable(server_metadata), zip_path, ) # Step 4: Deploy to Modal output += "## š Step 4: Deploying to Modal\n\n" agent_state["nodes"].append( {"id": "deploy", "label": "Deploy", "type": "deploying"} ) agent_state["edges"].append({"from": "generate", "to": "deploy"}) yield (output, create_agent_graph(agent_state), {}, zip_path) deployment = await deployer.deploy_mcp_server(server_metadata) if deployment.get("simulated"): output += ( "ā ļø **Simulated deployment** (configure MODAL_TOKEN for real deployment)\n" ) if deployment.get("status") == "failed": output += ( f"ā ļø **Deployment skipped:** " f"{deployment.get('error', 'Unknown error')}\n\n" ) else: output += f"**URL:** {deployment.get('modal_url', 'N/A')}\n" output += f"**Status:** {deployment.get('status', 'unknown')}\n\n" agent_state["nodes"][-1]["type"] = "completed" yield ( output, create_agent_graph(agent_state), to_jsonable(deployment), zip_path, ) # Step 5: Final response output += "## ⨠Step 5: Generating Response\n\n" agent_state["nodes"].append( {"id": "respond", "label": "Response", "type": "executing"} ) if server_metadata: agent_state["edges"].append({"from": "deploy", "to": "respond"}) else: agent_state["edges"].append({"from": "analyze", "to": "respond"}) yield (output, create_agent_graph(agent_state), {}, zip_path) response_prompt = f"""Based on the work done, provide a clear, professional response to the user. Original request: {user_request} What was done: {json.dumps(analysis_data, indent=2)} {f"Generated MCP: {server_metadata['server_name']}" if server_metadata else "No custom MCP needed"} Provide a helpful response explaining what was accomplished and how the user can use it. """ final_response = await router.generate( response_prompt, task_type=TaskType.REASONING, temperature=0.7, ) output += final_response["response"] + "\n\n" agent_state["nodes"][-1]["type"] = "completed" yield (output, create_agent_graph(agent_state), {}, zip_path) if use_voice and voice.client: output += "\nš **Generating voice response...**\n" yield (output, create_agent_graph(agent_state), {}, zip_path) output += "\n---\n\n" output += "**Model Usage:**\n" stats = router.get_usage_stats() output += f"- Total Requests: {stats['total_requests']}\n" output += f"- Total Cost: ${stats['total_cost']}\n" output += f"- Claude: {stats['by_model']['claude']['requests']}\n" output += f"- Gemini: {stats['by_model']['gemini']['requests']}\n" output += f"- GPT-4: {stats['by_model']['gpt4']['requests']}\n" yield (output, create_agent_graph(agent_state), to_jsonable(stats), zip_path) # ============================================================================ # Gradio UI # ============================================================================ def build_ui() -> gr.Blocks: """Build the Gradio interface.""" custom_css = """ .gradio-container { font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif; } .main-header { text-align: center; padding: 2rem 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px; margin-bottom: 2rem; } """ with gr.Blocks(title="OmniMind Orchestrator - MCP Hackathon") as app: gr.HTML(f"") gr.HTML( """
The World's First Self-Evolving Multi-Agent MCP Ecosystem
Track 2 Submission - MCP's 1st Birthday Hackathon