"""
OmniMind Orchestrator - Main Gradio Application
The World's First Self-Evolving Multi-Agent MCP Ecosystem
Competition Entry for MCP's 1st Birthday Hackathon
Track 2: MCP in Action (Enterprise Category)
Sponsor Integrations:
- Google Gemini: Multi-model routing with Gemini 2.0 Flash
- Modal: Dynamic MCP deployment
- LlamaIndex: Enterprise knowledge RAG
- ElevenLabs: Voice-first interface
- Blaxel: Agent visualization
"""
import os
import sys
import json
import zipfile
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, AsyncGenerator
from datetime import datetime
import gradio as gr
import plotly.graph_objects as go
import networkx as nx
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from core.model_router import router, TaskType
from mcp_gen.generator import generator
from deployments.modal_deployer import deployer
from core.knowledge_engine import knowledge
from ui.voice_interface import voice
from dotenv import load_dotenv
load_dotenv()
# ============================================================================
# Helpers
# ============================================================================
def to_jsonable(obj: Any) -> Any:
"""Recursively convert objects to JSON-serializable equivalents."""
if isinstance(obj, Path):
return str(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, dict):
return {k: to_jsonable(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple, set)):
return [to_jsonable(v) for v in obj]
return obj
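# Illustrative example (hypothetical values, not executed):
#     to_jsonable({"dir": Path("/tmp"), "when": datetime(2024, 11, 25)})
#     -> {"dir": "/tmp", "when": "2024-11-25T00:00:00"}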
def create_download_zip(server_metadata: Dict[str, Any]) -> Optional[str]:
"""
Create a ZIP file of the generated MCP server for download.
Returns:
Path to the ZIP file as a string, or None if creation fails.
"""
try:
server_dir = Path(server_metadata["directory"])
server_id = server_metadata["server_id"]
zip_path = server_dir.parent / f"{server_id}.zip"
zip_path.parent.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for file_path in server_dir.rglob("*"):
if file_path.is_file():
arcname = file_path.relative_to(server_dir.parent)
zipf.write(file_path, arcname)
print(f"[ZIP] Created MCP archive at {zip_path}")
return str(zip_path)
except Exception as e:
print(f"[ERROR] Failed to create ZIP: {e}")
return None
def push_zip_to_space_repo(zip_path: Path) -> Optional[str]:
"""
Stub for Hub upload.
We intentionally DO NOT commit to the Space repository from inside
the running app, because that triggers an automatic redeploy and
causes the UI to refresh mid-run.
Workflow for the hackathon:
- Use the **Download Generated MCP Server** button.
- Then manually upload the ZIP to generated_mcps/ in the Files tab
if you want it stored on the Hub.
Returns:
Always None (no automatic Hub URL).
"""
print(
"[HF] Auto-upload to Space repo is disabled to avoid self-redeploy.\n"
" Use the download button, then upload the ZIP manually to "
"generated_mcps/ in the Files tab if you want it on the Hub."
)
return None
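# For reference, a hypothetical real upload would look roughly like the sketch
# below. It is deliberately kept out of the live code path so the Space never
# redeploys itself; it assumes `huggingface_hub` is installed, HF_TOKEN is set,
# and that you push to a separate *dataset* repo (repo_id is a placeholder)
# rather than this Space repo:
#
#     from huggingface_hub import HfApi
#
#     def upload_zip_to_dataset(zip_path: Path, repo_id: str) -> str:
#         api = HfApi(token=os.getenv("HF_TOKEN"))
#         api.upload_file(
#             path_or_fileobj=str(zip_path),
#             path_in_repo=f"generated_mcps/{zip_path.name}",
#             repo_id=repo_id,
#             repo_type="dataset",
#         )
#         return f"https://huggingface.co/datasets/{repo_id}"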
# ============================================================================
# Agent Visualization (Blaxel Integration)
# ============================================================================
def create_agent_graph(agent_state: Dict[str, Any]) -> go.Figure:
"""
Create real-time agent decision graph using Plotly.
"""
G = nx.DiGraph()
nodes = agent_state.get("nodes", [])
edges = agent_state.get("edges", [])
for node in nodes:
G.add_node(node["id"], label=node["label"], type=node.get("type", "default"))
for edge in edges:
G.add_edge(edge["from"], edge["to"], label=edge.get("label", ""))
pos = nx.spring_layout(G, k=2, iterations=50)
edge_x, edge_y = [], []
for e in G.edges():
x0, y0 = pos[e[0]]
x1, y1 = pos[e[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
edge_trace = go.Scatter(
x=edge_x,
y=edge_y,
line=dict(width=2, color="#888"),
hoverinfo="none",
mode="lines",
)
node_x, node_y, node_text, node_colors = [], [], [], []
color_map = {
"planning": "#3B82F6",
"generating": "#10B981",
"deploying": "#F59E0B",
"executing": "#8B5CF6",
"completed": "#6B7280",
}
for n in G.nodes():
x, y = pos[n]
node_x.append(x)
node_y.append(y)
node_text.append(G.nodes[n].get("label", n))
node_type = G.nodes[n].get("type", "default")
node_colors.append(color_map.get(node_type, "#6B7280"))
node_trace = go.Scatter(
x=node_x,
y=node_y,
mode="markers+text",
hoverinfo="text",
text=node_text,
textposition="top center",
marker=dict(size=30, color=node_colors, line=dict(width=2, color="white")),
)
fig = go.Figure(
data=[edge_trace, node_trace],
layout=go.Layout(
title=dict(text="🧠 Agent Decision Graph (Real-Time)", font=dict(size=16)),
showlegend=False,
hovermode="closest",
margin=dict(b=0, l=0, r=0, t=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor="rgba(0,0,0,0)",
height=400,
),
)
return fig
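# Minimal illustrative call, mirroring the state shape that orchestrate_task
# builds below (hypothetical data, not executed):
#
#     fig = create_agent_graph({
#         "nodes": [
#             {"id": "start", "label": "User Request", "type": "planning"},
#             {"id": "analyze", "label": "Analysis", "type": "completed"},
#         ],
#         "edges": [{"from": "start", "to": "analyze"}],
#     })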
# ============================================================================
# Core Agent Orchestration
# ============================================================================
async def orchestrate_task(
user_request: str,
use_voice: bool = False,
use_knowledge_base: bool = False,
) -> AsyncGenerator[Tuple[str, Optional[go.Figure], Dict[str, Any], Optional[str]], None]:
"""
Main orchestration function - the brain of OmniMind.
Yields:
(status_text, agent_graph, metadata, zip_path_for_download)
"""
output = "# πŸ€– OmniMind Orchestrator\n\n"
output += f"**Request:** {user_request}\n\n"
output += "---\n\n"
agent_state = {
"nodes": [{"id": "start", "label": "User Request", "type": "planning"}],
"edges": [],
}
yield (output, create_agent_graph(agent_state), {}, None)
# Step 1: Analyze request
output += "## 🧠 Step 1: Analyzing Request\n\n"
yield (output, create_agent_graph(agent_state), {}, None)
analysis_prompt = f"""Analyze this user request and determine what needs to be done:
Request: {user_request}
Determine:
1. Can this be done with existing general capabilities? (yes/no)
2. Do we need to generate a custom MCP server? (yes/no)
3. If yes, what should the MCP do?
4. What data sources or APIs are needed?
Respond in JSON:
{{
"needs_custom_mcp": true/false,
"mcp_description": "what the MCP should do",
"complexity": "simple|medium|complex",
"estimated_tools_needed": 2,
"approach": "high-level approach to solve this"
}}
"""
analysis = await router.generate(
analysis_prompt,
task_type=TaskType.PLANNING,
temperature=0.3,
)
    try:
        raw = analysis["response"].strip()
        # Models often wrap JSON in ```json fences; strip them before parsing.
        if raw.startswith("```"):
            raw = raw.split("\n", 1)[-1].rsplit("```", 1)[0]
        analysis_data = json.loads(raw)
except Exception:
analysis_data = {
"needs_custom_mcp": True,
"mcp_description": user_request,
"complexity": "medium",
"estimated_tools_needed": 1,
"approach": "Generate custom MCP for this task",
}
output += f"**Analysis:** {analysis_data['approach']}\n\n"
output += f"**Needs Custom MCP:** {analysis_data['needs_custom_mcp']}\n\n"
agent_state["nodes"].append(
{"id": "analyze", "label": "Analysis", "type": "completed"}
)
agent_state["edges"].append({"from": "start", "to": "analyze"})
yield (output, create_agent_graph(agent_state), to_jsonable(analysis_data), None)
# Step 2: Knowledge base
context = None
if use_knowledge_base:
output += "## πŸ“š Step 2: Querying Knowledge Base\n\n"
agent_state["nodes"].append(
{"id": "knowledge", "label": "Knowledge", "type": "executing"}
)
agent_state["edges"].append({"from": "analyze", "to": "knowledge"})
yield (output, create_agent_graph(agent_state), {}, None)
context = await knowledge.get_context_for_mcp_generation(user_request)
if context:
output += f"**Found relevant context:** {context[:200]}...\n\n"
else:
output += "**No relevant context found**\n\n"
agent_state["nodes"][-1]["type"] = "completed"
yield (
output,
create_agent_graph(agent_state),
{"has_context": bool(context)},
None,
)
# Step 3: Generate MCP
server_metadata: Optional[Dict[str, Any]] = None
zip_path: Optional[str] = None
if analysis_data.get("needs_custom_mcp", False):
output += "## βš™οΈ Step 3: Generating Custom MCP Server\n\n"
agent_state["nodes"].append(
{"id": "generate", "label": "Generate MCP", "type": "generating"}
)
agent_state["edges"].append({"from": "analyze", "to": "generate"})
yield (output, create_agent_graph(agent_state), {}, None)
output += f"**Task:** {analysis_data['mcp_description']}\n\n"
output += "πŸ”¨ Using Claude Sonnet for code generation...\n\n"
server_metadata = await generator.generate_mcp_server(
task_description=analysis_data["mcp_description"],
context={"user_context": context} if context else None,
)
output += f"βœ… **Generated:** {server_metadata['server_name']}\n"
output += (
f"**Tools:** {', '.join([t['name'] for t in server_metadata['tools']])}\n"
)
output += f"**Location:** `{server_metadata['directory']}`\n\n"
# Code preview
output += "### πŸ“„ Generated Code Preview\n\n```python\n"
try:
app_file = server_metadata["files"]["app"]
with open(app_file, "r", encoding="utf-8") as f:
lines = f.readlines()[:30]
output += "".join(lines)
if len(lines) >= 30:
output += "\n... (truncated - full code saved locally)\n"
except Exception as e:
output += f"# Code preview unavailable: {e}\n"
output += "```\n\n"
output += f"**Files saved to:** `{server_metadata['directory']}`\n\n"
# ZIP + (disabled) Hub upload
zip_path = create_download_zip(server_metadata)
if zip_path:
server_metadata["zip_path"] = zip_path
output += "πŸ“¦ **Download button updated below!**\n\n"
hub_url = push_zip_to_space_repo(Path(zip_path))
if hub_url:
server_metadata["hub_url"] = hub_url
output += f"πŸ”— **Saved to Hub:** {hub_url}\n\n"
else:
output += (
"ℹ️ Auto-upload to the Hub repo is disabled.\n"
" Use the download button, then upload the ZIP manually\n"
" to `generated_mcps/` in the Files tab if you want it stored.\n\n"
)
agent_state["nodes"][-1]["type"] = "completed"
yield (
output,
create_agent_graph(agent_state),
to_jsonable(server_metadata),
zip_path,
)
# Step 4: Deploy to Modal
output += "## πŸš€ Step 4: Deploying to Modal\n\n"
agent_state["nodes"].append(
{"id": "deploy", "label": "Deploy", "type": "deploying"}
)
agent_state["edges"].append({"from": "generate", "to": "deploy"})
yield (output, create_agent_graph(agent_state), {}, zip_path)
deployment = await deployer.deploy_mcp_server(server_metadata)
if deployment.get("simulated"):
output += (
"⚠️ **Simulated deployment** (configure MODAL_TOKEN for real deployment)\n"
)
if deployment.get("status") == "failed":
output += (
f"⚠️ **Deployment skipped:** "
f"{deployment.get('error', 'Unknown error')}\n\n"
)
else:
output += f"**URL:** {deployment.get('modal_url', 'N/A')}\n"
output += f"**Status:** {deployment.get('status', 'unknown')}\n\n"
agent_state["nodes"][-1]["type"] = "completed"
yield (
output,
create_agent_graph(agent_state),
to_jsonable(deployment),
zip_path,
)
# Step 5: Final response
output += "## ✨ Step 5: Generating Response\n\n"
agent_state["nodes"].append(
{"id": "respond", "label": "Response", "type": "executing"}
)
if server_metadata:
agent_state["edges"].append({"from": "deploy", "to": "respond"})
else:
agent_state["edges"].append({"from": "analyze", "to": "respond"})
yield (output, create_agent_graph(agent_state), {}, zip_path)
response_prompt = f"""Based on the work done, provide a clear, professional response to the user.
Original request: {user_request}
What was done:
{json.dumps(analysis_data, indent=2)}
{f"Generated MCP: {server_metadata['server_name']}" if server_metadata else "No custom MCP needed"}
Provide a helpful response explaining what was accomplished and how the user can use it.
"""
final_response = await router.generate(
response_prompt,
task_type=TaskType.REASONING,
temperature=0.7,
)
output += final_response["response"] + "\n\n"
agent_state["nodes"][-1]["type"] = "completed"
yield (output, create_agent_graph(agent_state), {}, zip_path)
if use_voice and voice.client:
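        # NOTE: audio synthesis itself is not wired into this generator; this
        # branch only surfaces a status line when a voice client is configured.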
output += "\nπŸ”Š **Generating voice response...**\n"
yield (output, create_agent_graph(agent_state), {}, zip_path)
output += "\n---\n\n"
output += "**Model Usage:**\n"
stats = router.get_usage_stats()
output += f"- Total Requests: {stats['total_requests']}\n"
output += f"- Total Cost: ${stats['total_cost']}\n"
output += f"- Claude: {stats['by_model']['claude']['requests']}\n"
output += f"- Gemini: {stats['by_model']['gemini']['requests']}\n"
output += f"- GPT-4: {stats['by_model']['gpt4']['requests']}\n"
yield (output, create_agent_graph(agent_state), to_jsonable(stats), zip_path)
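# Ad-hoc smoke test for the orchestration loop (assumes API keys are
# configured; kept commented out so importing this module has no side effects):
#
#     import asyncio
#
#     async def _demo():
#         async for text, _fig, _meta, _zip in orchestrate_task("Build a CSV analyzer"):
#             pass
#         print(text[-500:])
#
#     asyncio.run(_demo())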
# ============================================================================
# Gradio UI
# ============================================================================
def build_ui() -> gr.Blocks:
"""Build the Gradio interface."""
custom_css = """
.gradio-container {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
}
.main-header {
text-align: center;
padding: 2rem 0;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border-radius: 10px;
margin-bottom: 2rem;
}
"""
with gr.Blocks(title="OmniMind Orchestrator - MCP Hackathon") as app:
gr.HTML(f"<style>{custom_css}</style>")
gr.HTML(
"""
<div class="main-header">
<h1>🧠 OmniMind Orchestrator</h1>
<p>The World's First Self-Evolving Multi-Agent MCP Ecosystem</p>
<p style="font-size: 0.9em; opacity: 0.9;">
Track 2 Submission - MCP's 1st Birthday Hackathon
</p>
</div>
"""
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown(
"""
### 🎯 What is OmniMind?
OmniMind is the **first AI agent that creates other AI agents**.
It:
1. 🧠 Analyzes your request
2. βš™οΈ Generates custom MCP servers
3. πŸš€ Deploys them to Modal
4. βœ… Executes your task
"""
)
user_input = gr.Textbox(
label="What do you need?",
placeholder="Example: Create a tool that monitors my competitor's pricing every hour",
lines=3,
)
with gr.Row():
use_voice = gr.Checkbox(label="πŸ”Š Voice Output", value=False)
use_kb = gr.Checkbox(label="πŸ“š Use Knowledge Base", value=False)
submit_btn = gr.Button(
"πŸš€ Let OmniMind Handle It", variant="primary"
)
gr.Markdown(
"""
### πŸ’‘ Try These Examples:
- "Create a tool that scrapes product prices from Amazon"
- "Build an API integration for Salesforce"
- "Generate a data analyzer for CSV files"
- "Make a tool that monitors website uptime"
"""
)
with gr.Column(scale=2):
output_md = gr.Markdown(
value="**Results will appear here**", label="Agent Output"
)
agent_graph = gr.Plot(label="🧠 Agent Brain (Real-Time)")
download_file = gr.File(
label="πŸ“¦ Download Generated MCP Server", visible=False
)
with gr.Accordion("πŸ“Š Detailed Metadata", open=False):
metadata_json = gr.JSON(label="Execution Metadata")
with gr.Row():
with gr.Column():
gr.Markdown(
"""
### πŸ† Sponsor Integrations
- **Anthropic Claude**: Core reasoning engine
- **Google Gemini**: Multimodal capabilities
- **OpenAI GPT-4**: Planning and routing
- **Modal**: Serverless MCP deployment
- **LlamaIndex**: Enterprise knowledge RAG
- **ElevenLabs**: Voice interface
- **Blaxel**: Agent visualization
"""
)
with gr.Column():
gr.Markdown(
"""
### ✨ Innovation Highlights
1. **Self-Evolving Agent** – creates its own tools
2. **Multi-Model Intelligence** – best model for each task
3. **Infinite Extensibility** – not limited by fixed tool sets
4. **Enterprise-Ready** – clean, production-grade architecture
5. **Voice-First UX** – ideal for executives and operators
"""
)
with gr.Accordion("ℹ️ About This Project", open=False):
gr.Markdown(
"""
## OmniMind Orchestrator
Track 2: MCP in Action (Enterprise Category)
This project demonstrates an agent that **generates and deploys its own MCP
servers on-demand** using Anthropic, OpenAI, Gemini, Modal, LlamaIndex,
ElevenLabs and more.
"""
)
async def handle_submit(request, voice_enabled, kb_enabled):
async for out_text, graph, meta, zip_path in orchestrate_task(
request, voice_enabled, kb_enabled
):
if zip_path:
yield (
out_text,
graph,
meta,
gr.update(value=zip_path, visible=True),
)
else:
yield (
out_text,
graph,
meta,
gr.update(value=None, visible=False),
)
submit_btn.click(
fn=handle_submit,
inputs=[user_input, use_voice, use_kb],
outputs=[output_md, agent_graph, metadata_json, download_file],
)
gr.Markdown(
"""
---
<div style="text-align: center; padding: 1rem; color: #666;">
πŸŽ‰ Built for MCP's 1st Birthday Hackathon | Hosted by Anthropic &amp; Gradio
</div>
"""
)
return app
# ============================================================================
# Main Execution
# ============================================================================
if __name__ == "__main__":
print("=" * 60)
print("[AI] OmniMind Orchestrator")
print("=" * 60)
print()
print("[START] Starting Gradio application...")
print()
required_keys = {
"ANTHROPIC_API_KEY": "Claude Sonnet (required)",
"OPENAI_API_KEY": "GPT-4 & embeddings (required)",
"GOOGLE_API_KEY": "Gemini 2.0 (for $10K prize)",
}
optional_keys = {
"MODAL_TOKEN": "Modal deployment ($2.5K prize)",
"ELEVENLABS_API_KEY": "Voice interface ($2K + AirPods)",
"LLAMAINDEX_API_KEY": "LlamaIndex cloud ($1K prize)",
}
print("[OK] Required API Keys:")
for key, desc in required_keys.items():
status = "[CHECK]" if os.getenv(key) else "[X]"
print(f" {status} {key} - {desc}")
print()
print("[BONUS] Optional API Keys (for bonus prizes):")
for key, desc in optional_keys.items():
status = "[CHECK]" if os.getenv(key) else "[O]"
print(f" {status} {key} - {desc}")
print()
print("=" * 60)
print()
app = build_ui()
app.queue()
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
)