"""
OmniMind Orchestrator - Main Gradio Application

The World's First Self-Evolving Multi-Agent MCP Ecosystem

Competition Entry for MCP's 1st Birthday Hackathon
Track 2: MCP in Action (Enterprise Category)

Sponsor Integrations:
- Google Gemini: Multi-model routing with Gemini 2.0 Flash
- Modal: Dynamic MCP deployment
- LlamaIndex: Enterprise knowledge RAG
- ElevenLabs: Voice-first interface
- Blaxel: Agent visualization
"""

import os
import sys
import json
import zipfile
from itertools import islice
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, AsyncGenerator
from datetime import datetime

import gradio as gr
import plotly.graph_objects as go
import networkx as nx
from dotenv import load_dotenv

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))

# Load .env BEFORE importing the project singletons below, in case they read
# API keys at import time (NOTE(review): those modules are not visible from
# this file - confirm). load_dotenv() never overrides variables already set.
load_dotenv()

from core.model_router import router, TaskType  # noqa: E402
from mcp_gen.generator import generator  # noqa: E402
from deployments.modal_deployer import deployer  # noqa: E402
from core.knowledge_engine import knowledge  # noqa: E402
from ui.voice_interface import voice  # noqa: E402


# ============================================================================
# Helpers
# ============================================================================

# Number of lines of generated code shown in the Markdown preview.
_PREVIEW_LINES = 30


def to_jsonable(obj: Any) -> Any:
    """Recursively convert objects to JSON-serializable equivalents.

    ``Path`` becomes ``str``, ``datetime`` becomes an ISO-8601 string, dict
    values are converted recursively (keys are left untouched) and
    lists/tuples/sets become lists. Anything else is returned unchanged.
    """
    if isinstance(obj, Path):
        return str(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, dict):
        return {key: to_jsonable(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [to_jsonable(item) for item in obj]
    return obj


def create_download_zip(server_metadata: Dict[str, Any]) -> Optional[str]:
    """
    Create a ZIP file of the generated MCP server for download.

    Reads ``server_metadata["directory"]`` and ``server_metadata["server_id"]``
    and writes ``<server_id>.zip`` next to the server directory. Archive
    member names are relative to the directory's parent, so the archive
    unpacks into a single top-level folder.

    Returns:
        Path to the ZIP file as a string, or None if creation fails.
    """
    try:
        server_dir = Path(server_metadata["directory"])
        server_id = server_metadata["server_id"]
        archive_root = server_dir.parent
        zip_path = archive_root / f"{server_id}.zip"
        archive_root.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for file_path in server_dir.rglob("*"):
                if file_path.is_file():
                    zipf.write(file_path, file_path.relative_to(archive_root))

        print(f"[ZIP] Created MCP archive at {zip_path}")
        return str(zip_path)
    except Exception as e:
        # Deliberately best-effort: a failed archive must not abort the run,
        # the UI simply keeps the download button hidden.
        print(f"[ERROR] Failed to create ZIP: {e}")
        return None


def push_zip_to_space_repo(zip_path: Path) -> Optional[str]:
    """
    Stub for Hub upload.

    We intentionally DO NOT commit to the Space repository from inside the
    running app, because that triggers an automatic redeploy and causes the
    UI to refresh mid-run.

    Workflow for the hackathon:
    - Use the **Download Generated MCP Server** button.
    - Then manually upload the ZIP to generated_mcps/ in the Files tab
      if you want it stored on the Hub.

    Returns:
        Always None (no automatic Hub URL).
    """
    print(
        "[HF] Auto-upload to Space repo is disabled to avoid self-redeploy.\n"
        " Use the download button, then upload the ZIP manually to "
        "generated_mcps/ in the Files tab if you want it on the Hub."
    )
    return None


def _parse_analysis(raw_response: Any, user_request: str) -> Dict[str, Any]:
    """Turn the planner's reply into an analysis dict that always has every key.

    The outermost ``{...}`` span is extracted first so replies wrapped in a
    Markdown code fence or surrounded by prose still parse. Keys missing from
    the reply are filled from the same defaults that are used when the reply
    is not valid JSON at all.
    """
    defaults: Dict[str, Any] = {
        "needs_custom_mcp": True,
        "mcp_description": user_request,
        "complexity": "medium",
        "estimated_tools_needed": 1,
        "approach": "Generate custom MCP for this task",
    }
    if not isinstance(raw_response, str):
        return defaults

    start = raw_response.find("{")
    end = raw_response.rfind("}")
    if start == -1 or end <= start:
        return defaults

    try:
        parsed = json.loads(raw_response[start : end + 1])
    except json.JSONDecodeError:
        return defaults
    if not isinstance(parsed, dict):
        return defaults

    merged = {**defaults, **parsed}
    # An empty/null description would give the generator nothing to work on.
    if not merged.get("mcp_description"):
        merged["mcp_description"] = user_request
    return merged


# ============================================================================
# Agent Visualization (Blaxel Integration)
# ============================================================================


def create_agent_graph(agent_state: Dict[str, Any]) -> go.Figure:
    """
    Create real-time agent decision graph using Plotly.

    Args:
        agent_state: Dict with optional ``"nodes"`` (dicts with ``id``,
            ``label`` and optional ``type``) and ``"edges"`` (dicts with
            ``from``, ``to`` and optional ``label``).

    Returns:
        A Plotly figure with one line trace for the edges and one
        marker+text trace for the nodes, coloured by node ``type``.
    """
    G = nx.DiGraph()

    for node in agent_state.get("nodes", []):
        G.add_node(node["id"], label=node["label"], type=node.get("type", "default"))
    for edge in agent_state.get("edges", []):
        G.add_edge(edge["from"], edge["to"], label=edge.get("label", ""))

    # Fixed seed: the figure is rebuilt on every streamed update, and an
    # unseeded spring layout would shuffle the nodes each time.
    pos = nx.spring_layout(G, k=2, iterations=50, seed=42)

    # Each edge is a (start, end, None) triple; None breaks the line so that
    # all edges can live in a single Scatter trace.
    edge_x, edge_y = [], []
    for src, dst in G.edges():
        x0, y0 = pos[src]
        x1, y1 = pos[dst]
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])

    edge_trace = go.Scatter(
        x=edge_x,
        y=edge_y,
        line=dict(width=2, color="#888"),
        hoverinfo="none",
        mode="lines",
    )

    color_map = {
        "planning": "#3B82F6",
        "generating": "#10B981",
        "deploying": "#F59E0B",
        "executing": "#8B5CF6",
        "completed": "#6B7280",
    }

    node_x, node_y, node_text, node_colors = [], [], [], []
    for node_id, attrs in G.nodes(data=True):
        x, y = pos[node_id]
        node_x.append(x)
        node_y.append(y)
        # Nodes created implicitly by add_edge have no label/type attributes.
        node_text.append(attrs.get("label", node_id))
        node_colors.append(color_map.get(attrs.get("type", "default"), "#6B7280"))

    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode="markers+text",
        hoverinfo="text",
        text=node_text,
        textposition="top center",
        marker=dict(size=30, color=node_colors, line=dict(width=2, color="white")),
    )

    return go.Figure(
        data=[edge_trace, node_trace],
        layout=go.Layout(
            title=dict(text="🧠 Agent Decision Graph (Real-Time)", font=dict(size=16)),
            showlegend=False,
            hovermode="closest",
            margin=dict(b=0, l=0, r=0, t=40),
            xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
            yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
            plot_bgcolor="rgba(0,0,0,0)",
            height=400,
        ),
    )


# ============================================================================
# Core Agent Orchestration
# ============================================================================


async def orchestrate_task(
    user_request: str,
    use_voice: bool = False,
    use_knowledge_base: bool = False,
) -> AsyncGenerator[Tuple[str, Optional[go.Figure], Dict[str, Any], Optional[str]], None]:
    """
    Main orchestration function - the brain of OmniMind.

    Streams progress while it (1) analyses the request, (2) optionally
    queries the knowledge base, (3) generates a custom MCP server and
    (4) deploys it when the analysis asks for one, then (5) writes the
    final answer followed by model usage statistics.

    Args:
        user_request: Free-text task description from the user.
        use_voice: When true and the voice client is configured, a
            voice-generation notice is appended to the output.
        use_knowledge_base: When true, the knowledge engine is queried for
            context that is forwarded to the MCP generator.

    Yields:
        (status_text, agent_graph, metadata, zip_path_for_download)
    """
    output = "# 🤖 OmniMind Orchestrator\n\n"
    output += f"**Request:** {user_request}\n\n"
    output += "---\n\n"

    agent_state: Dict[str, Any] = {
        "nodes": [{"id": "start", "label": "User Request", "type": "planning"}],
        "edges": [],
    }

    yield (output, create_agent_graph(agent_state), {}, None)

    # ------------------------------------------------------------------
    # Step 1: Analyze request
    # ------------------------------------------------------------------
    output += "## 🧠 Step 1: Analyzing Request\n\n"
    yield (output, create_agent_graph(agent_state), {}, None)

    analysis_prompt = (
        "Analyze this user request and determine what needs to be done:\n"
        "\n"
        f"Request: {user_request}\n"
        "\n"
        "Determine:\n"
        "1. Can this be done with existing general capabilities? (yes/no)\n"
        "2. Do we need to generate a custom MCP server? (yes/no)\n"
        "3. If yes, what should the MCP do?\n"
        "4. What data sources or APIs are needed?\n"
        "\n"
        "Respond in JSON:\n"
        "{\n"
        '    "needs_custom_mcp": true/false,\n'
        '    "mcp_description": "what the MCP should do",\n'
        '    "complexity": "simple|medium|complex",\n'
        '    "estimated_tools_needed": 2,\n'
        '    "approach": "high-level approach to solve this"\n'
        "}\n"
    )

    analysis = await router.generate(
        analysis_prompt,
        task_type=TaskType.PLANNING,
        temperature=0.3,
    )
    analysis_data = _parse_analysis(analysis["response"], user_request)

    output += f"**Analysis:** {analysis_data['approach']}\n\n"
    output += f"**Needs Custom MCP:** {analysis_data['needs_custom_mcp']}\n\n"

    agent_state["nodes"].append(
        {"id": "analyze", "label": "Analysis", "type": "completed"}
    )
    agent_state["edges"].append({"from": "start", "to": "analyze"})
    yield (output, create_agent_graph(agent_state), to_jsonable(analysis_data), None)

    # ------------------------------------------------------------------
    # Step 2: Knowledge base
    # ------------------------------------------------------------------
    context = None
    if use_knowledge_base:
        output += "## 📚 Step 2: Querying Knowledge Base\n\n"
        agent_state["nodes"].append(
            {"id": "knowledge", "label": "Knowledge", "type": "executing"}
        )
        agent_state["edges"].append({"from": "analyze", "to": "knowledge"})
        yield (output, create_agent_graph(agent_state), {}, None)

        context = await knowledge.get_context_for_mcp_generation(user_request)

        if context:
            output += f"**Found relevant context:** {context[:200]}...\n\n"
        else:
            output += "**No relevant context found**\n\n"

        agent_state["nodes"][-1]["type"] = "completed"
        yield (
            output,
            create_agent_graph(agent_state),
            {"has_context": bool(context)},
            None,
        )

    # ------------------------------------------------------------------
    # Step 3: Generate MCP
    # ------------------------------------------------------------------
    server_metadata: Optional[Dict[str, Any]] = None
    zip_path: Optional[str] = None

    if analysis_data.get("needs_custom_mcp", False):
        output += "## ⚙️ Step 3: Generating Custom MCP Server\n\n"
        agent_state["nodes"].append(
            {"id": "generate", "label": "Generate MCP", "type": "generating"}
        )
        agent_state["edges"].append({"from": "analyze", "to": "generate"})
        yield (output, create_agent_graph(agent_state), {}, None)

        output += f"**Task:** {analysis_data['mcp_description']}\n\n"
        output += "🔨 Using Claude Sonnet for code generation...\n\n"

        server_metadata = await generator.generate_mcp_server(
            task_description=analysis_data["mcp_description"],
            context={"user_context": context} if context else None,
        )

        tool_names = ", ".join(t["name"] for t in server_metadata["tools"])
        output += f"✅ **Generated:** {server_metadata['server_name']}\n"
        output += f"**Tools:** {tool_names}\n"
        output += f"**Location:** `{server_metadata['directory']}`\n\n"

        # Code preview
        output += "### 📄 Generated Code Preview\n\n```python\n"
        try:
            app_file = server_metadata["files"]["app"]
            with open(app_file, "r", encoding="utf-8") as f:
                # Read one extra line so "truncated" is only reported when
                # the file really is longer than the preview.
                head = list(islice(f, _PREVIEW_LINES + 1))
            output += "".join(head[:_PREVIEW_LINES])
            if len(head) > _PREVIEW_LINES:
                output += "\n... (truncated - full code saved locally)\n"
        except Exception as e:
            # The preview is cosmetic; never fail the run because of it.
            output += f"# Code preview unavailable: {e}\n"
        output += "```\n\n"

        output += f"**Files saved to:** `{server_metadata['directory']}`\n\n"

        # ZIP + (disabled) Hub upload
        zip_path = create_download_zip(server_metadata)
        if zip_path:
            server_metadata["zip_path"] = zip_path
            output += "📦 **Download button updated below!**\n\n"

            hub_url = push_zip_to_space_repo(Path(zip_path))
            if hub_url:
                server_metadata["hub_url"] = hub_url
                output += f"🔗 **Saved to Hub:** {hub_url}\n\n"
            else:
                output += (
                    "ℹ️ Auto-upload to the Hub repo is disabled.\n"
                    " Use the download button, then upload the ZIP manually\n"
                    " to `generated_mcps/` in the Files tab if you want it stored.\n\n"
                )

        agent_state["nodes"][-1]["type"] = "completed"
        yield (
            output,
            create_agent_graph(agent_state),
            to_jsonable(server_metadata),
            zip_path,
        )

        # --------------------------------------------------------------
        # Step 4: Deploy to Modal (only when a server was generated)
        # --------------------------------------------------------------
        output += "## 🚀 Step 4: Deploying to Modal\n\n"
        agent_state["nodes"].append(
            {"id": "deploy", "label": "Deploy", "type": "deploying"}
        )
        agent_state["edges"].append({"from": "generate", "to": "deploy"})
        yield (output, create_agent_graph(agent_state), {}, zip_path)

        deployment = await deployer.deploy_mcp_server(server_metadata)

        if deployment.get("simulated"):
            output += (
                "⚠️ **Simulated deployment** (configure MODAL_TOKEN for real deployment)\n"
            )

        if deployment.get("status") == "failed":
            output += (
                f"⚠️ **Deployment skipped:** "
                f"{deployment.get('error', 'Unknown error')}\n\n"
            )
        else:
            output += f"**URL:** {deployment.get('modal_url', 'N/A')}\n"
            output += f"**Status:** {deployment.get('status', 'unknown')}\n\n"

        agent_state["nodes"][-1]["type"] = "completed"
        yield (
            output,
            create_agent_graph(agent_state),
            to_jsonable(deployment),
            zip_path,
        )

    # ------------------------------------------------------------------
    # Step 5: Final response
    # ------------------------------------------------------------------
    output += "## ✨ Step 5: Generating Response\n\n"
    agent_state["nodes"].append(
        {"id": "respond", "label": "Response", "type": "executing"}
    )
    # The response hangs off "deploy" when an MCP was built, else off "analyze".
    previous_step = "deploy" if server_metadata else "analyze"
    agent_state["edges"].append({"from": previous_step, "to": "respond"})
    yield (output, create_agent_graph(agent_state), {}, zip_path)

    if server_metadata:
        mcp_summary = f"Generated MCP: {server_metadata['server_name']}"
    else:
        mcp_summary = "No custom MCP needed"

    response_prompt = (
        "Based on the work done, provide a clear, professional response to the user.\n"
        "\n"
        f"Original request: {user_request}\n"
        "\n"
        "What was done:\n"
        f"{json.dumps(analysis_data, indent=2)}\n"
        "\n"
        f"{mcp_summary}\n"
        "\n"
        "Provide a helpful response explaining what was accomplished "
        "and how the user can use it.\n"
    )

    final_response = await router.generate(
        response_prompt,
        task_type=TaskType.REASONING,
        temperature=0.7,
    )

    output += final_response["response"] + "\n\n"
    agent_state["nodes"][-1]["type"] = "completed"
    yield (output, create_agent_graph(agent_state), {}, zip_path)

    if use_voice and getattr(voice, "client", None):
        output += "\n🔊 **Generating voice response...**\n"
        yield (output, create_agent_graph(agent_state), {}, zip_path)

    # Usage summary. Missing counters fall back to 0 so that a model which
    # was never called cannot crash the final update.
    stats = router.get_usage_stats()
    by_model = stats.get("by_model", {})
    output += "\n---\n\n"
    output += "**Model Usage:**\n"
    output += f"- Total Requests: {stats.get('total_requests', 0)}\n"
    output += f"- Total Cost: ${stats.get('total_cost', 0)}\n"
    for label, model_key in (("Claude", "claude"), ("Gemini", "gemini"), ("GPT-4", "gpt4")):
        output += f"- {label}: {by_model.get(model_key, {}).get('requests', 0)}\n"

    yield (output, create_agent_graph(agent_state), to_jsonable(stats), zip_path)


# ============================================================================
# Gradio UI
# ============================================================================


def build_ui() -> gr.Blocks:
    """Build the Gradio interface.

    Lays out the request form (left column), the streamed results, agent
    graph, download file and metadata (right column), plus static info
    panels, and wires the submit button to ``orchestrate_task``.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    custom_css = """
    .gradio-container {
        font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
    }
    .main-header {
        text-align: center;
        padding: 2rem 0;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        border-radius: 10px;
        margin-bottom: 2rem;
    }
    """

    with gr.Blocks(title="OmniMind Orchestrator - MCP Hackathon") as app:
        # Inject the stylesheet; without this custom_css is never applied.
        gr.HTML(f"<style>{custom_css}</style>")

        # The wrapper carries the .main-header class styled above.
        gr.HTML(
            """
            <div class="main-header">
                <h1>🧠 OmniMind Orchestrator</h1>
                <p>The World's First Self-Evolving Multi-Agent MCP Ecosystem</p>
                <p>Track 2 Submission - MCP's 1st Birthday Hackathon</p>
            </div>
            """
        )

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(
                    """
                    ### 🎯 What is OmniMind?

                    OmniMind is the **first AI agent that creates other AI agents**.

                    It:
                    1. 🧠 Analyzes your request
                    2. ⚙️ Generates custom MCP servers
                    3. 🚀 Deploys them to Modal
                    4. ✅ Executes your task
                    """
                )

                user_input = gr.Textbox(
                    label="What do you need?",
                    placeholder="Example: Create a tool that monitors my competitor's pricing every hour",
                    lines=3,
                )

                with gr.Row():
                    use_voice = gr.Checkbox(label="🔊 Voice Output", value=False)
                    use_kb = gr.Checkbox(label="📚 Use Knowledge Base", value=False)

                submit_btn = gr.Button(
                    "🚀 Let OmniMind Handle It", variant="primary"
                )

                gr.Markdown(
                    """
                    ### 💡 Try These Examples:

                    - "Create a tool that scrapes product prices from Amazon"
                    - "Build an API integration for Salesforce"
                    - "Generate a data analyzer for CSV files"
                    - "Make a tool that monitors website uptime"
                    """
                )

            with gr.Column(scale=2):
                output_md = gr.Markdown(
                    value="**Results will appear here**", label="Agent Output"
                )
                agent_graph = gr.Plot(label="🧠 Agent Brain (Real-Time)")
                # Hidden until a run actually produces an archive.
                download_file = gr.File(
                    label="📦 Download Generated MCP Server", visible=False
                )
                with gr.Accordion("📊 Detailed Metadata", open=False):
                    metadata_json = gr.JSON(label="Execution Metadata")

        with gr.Row():
            with gr.Column():
                gr.Markdown(
                    """
                    ### 🏆 Sponsor Integrations

                    - **Anthropic Claude**: Core reasoning engine
                    - **Google Gemini**: Multimodal capabilities
                    - **OpenAI GPT-4**: Planning and routing
                    - **Modal**: Serverless MCP deployment
                    - **LlamaIndex**: Enterprise knowledge RAG
                    - **ElevenLabs**: Voice interface
                    - **Blaxel**: Agent visualization
                    """
                )
            with gr.Column():
                gr.Markdown(
                    """
                    ### ✨ Innovation Highlights

                    1. **Self-Evolving Agent** – creates its own tools
                    2. **Multi-Model Intelligence** – best model for each task
                    3. **Infinite Extensibility** – not limited by fixed tool sets
                    4. **Enterprise-Ready** – clean, production-grade architecture
                    5. **Voice-First UX** – ideal for executives and operators
                    """
                )

        with gr.Accordion("ℹ️ About This Project", open=False):
            gr.Markdown(
                """
                ## OmniMind Orchestrator

                Track 2: MCP in Action (Enterprise Category)

                This project demonstrates an agent that **generates and deploys its own
                MCP servers on-demand** using Anthropic, OpenAI, Gemini, Modal,
                LlamaIndex, ElevenLabs and more.
                """
            )

        async def handle_submit(request, voice_enabled, kb_enabled):
            """Relay orchestrator updates, showing the download only once a ZIP exists."""
            async for out_text, graph, meta, zip_path in orchestrate_task(
                request, voice_enabled, kb_enabled
            ):
                yield (
                    out_text,
                    graph,
                    meta,
                    gr.update(value=zip_path or None, visible=bool(zip_path)),
                )

        submit_btn.click(
            fn=handle_submit,
            inputs=[user_input, use_voice, use_kb],
            outputs=[output_md, agent_graph, metadata_json, download_file],
        )

        gr.Markdown(
            """
            ---

            <div style="text-align: center;">
            🎉 Built for MCP's 1st Birthday Hackathon | Hosted by Anthropic & Gradio
            </div>
            """
        )

    return app


def _print_key_status(heading: str, keys: Dict[str, str], missing_mark: str) -> None:
    """Print one line per environment variable showing whether it is set."""
    print(heading)
    for key, desc in keys.items():
        status = "[CHECK]" if os.getenv(key) else missing_mark
        print(f" {status} {key} - {desc}")
    print()


# ============================================================================
# Main Execution
# ============================================================================

if __name__ == "__main__":
    print("=" * 60)
    print("[AI] OmniMind Orchestrator")
    print("=" * 60)
    print()
    print("[START] Starting Gradio application...")
    print()

    required_keys = {
        "ANTHROPIC_API_KEY": "Claude Sonnet (required)",
        "OPENAI_API_KEY": "GPT-4 & embeddings (required)",
        "GOOGLE_API_KEY": "Gemini 2.0 (for $10K prize)",
    }
    optional_keys = {
        "MODAL_TOKEN": "Modal deployment ($2.5K prize)",
        "ELEVENLABS_API_KEY": "Voice interface ($2K + AirPods)",
        "LLAMAINDEX_API_KEY": "LlamaIndex cloud ($1K prize)",
    }

    # Report only; a missing key does not stop the app from starting.
    _print_key_status("[OK] Required API Keys:", required_keys, "[X]")
    _print_key_status(
        "[BONUS] Optional API Keys (for bonus prizes):", optional_keys, "[O]"
    )

    print("=" * 60)
    print()

    app = build_ui()
    app.queue()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )