""" LifeFlow AI - Main Application (Fixed for Exclusive Tools Dict) ✅ 修正錯誤: 確保傳遞給 Service的是 Dictionary 而不是單一 Object ✅ 實現多通道隔離 (Scout 只能用 Scout 的工具) """ import gradio as gr import asyncio import os import sys import traceback from contextlib import asynccontextmanager from datetime import datetime # Agno / MCP Imports from agno.tools.mcp import MCPTools # UI Imports from ui.theme import get_enhanced_css from ui.components.header import create_header from ui.components.progress_stepper import create_progress_stepper, update_stepper from ui.components.input_form import create_input_form, toggle_location_inputs from ui.components.modals import create_settings_modal, create_doc_modal from ui.renderers import create_agent_dashboard, create_summary_card # Core Imports from core.session import UserSession from services.planner_service import PlannerService from services.validator import APIValidator from config import MODEL_OPTIONS from src.infra.client_context import client_session_ctx # ==================== 1. Tool Isolation Config ==================== # 定義每個 Agent 專屬的工具名稱 (對應 mcp_server_lifeflow.py 裡的 @mcp.tool 名稱) AGENT_TOOL_MAP = { "scout": "search_and_offload", "optimizer": "optimize_from_ref", "navigator": "calculate_traffic_and_timing", "weatherman": "check_weather_for_timeline", "presenter": "read_final_itinerary", } # 全域變數:存放 "Agent Name" -> "MCPToolkit Instance" 的對照表 GLOBAL_TOOLKITS_DICT = {} # ==================== 2. MCP Lifecycle Manager ==================== def inject_context_into_tools(toolkit): """ 手術刀函式: 1. [Context] 為每個工具安裝攔截器,自動注入 session_id。 2. [Rename] (新增) 移除 Agno 自動加上的底線前綴,讓 Agent 能找到工具。 """ # 嘗試取得工具列表 funcs = getattr(toolkit, "functions", {}) # 如果是 List,轉成 Dict 方便我們操作名稱 # (Agno 的 MCPToolkit 通常是用 Dict 存的: {name: function}) if isinstance(funcs, list): print("⚠️ Warning: toolkit.functions is a list. Renaming might be tricky.") tool_iterator = funcs elif isinstance(funcs, dict): tool_iterator = list(funcs.values()) # 轉成 List 以便安全迭代 else: print(f"⚠️ Warning: Unknown functions type: {type(funcs)}") return 0 count = 0 renamed_count = 0 for tool in tool_iterator: # --- A. 執行 Context 注入 (原本的邏輯) --- original_entrypoint = tool.entrypoint async def context_aware_wrapper(*args, **kwargs): current_sid = client_session_ctx.get() if current_sid: kwargs['session_id'] = current_sid if asyncio.iscoroutinefunction(original_entrypoint): return await original_entrypoint(*args, **kwargs) else: return original_entrypoint(*args, **kwargs) tool.entrypoint = context_aware_wrapper count += 1 # --- B. 執行更名手術 (新增邏輯) --- # 如果名字是以 "_" 開頭 (例如 _search_and_offload) if tool.name.startswith("_"): old_name = tool.name new_name = old_name[1:] # 去掉第一個字元 # 1. 改工具物件本身的名字 tool.name = new_name # 2. 如果 funcs 是字典,也要更新 Key,不然 Agent 查表會查不到 if isinstance(funcs, dict): # 建立新 Key,移除舊 Key funcs[new_name] = tool if old_name in funcs: del funcs[old_name] renamed_count += 1 # print(f" 🔧 Renamed '{old_name}' -> '{new_name}'") print(f" 🛡️ Injection: {count} tools patched, {renamed_count} tools renamed.") return count @asynccontextmanager async def lifespan_manager(): """ MCP 生命週期管理器 (多通道模式): 迴圈建立 5 個獨立的 MCP Client,每個只包含該 Agent 需要的工具。 """ global GLOBAL_TOOLKITS_DICT print("🚀 [System] Initializing Exclusive MCP Connections...") server_script = "mcp_server_lifeflow.py" env_vars = os.environ.copy() env_vars["PYTHONUNBUFFERED"] = "1" started_toolkits = [] try: # 🔥 迴圈建立隔離的工具箱 for agent_name, tool_name in AGENT_TOOL_MAP.items(): print(f" ⚙️ Connecting {agent_name} -> tool: {tool_name}...") # 建立專屬連線 tool = MCPTools( command=f"{sys.executable} {server_script}", env=env_vars, # 🔥 關鍵:只允許看見這個工具 include_tools=[tool_name] ) # 啟動連線 await tool.connect() inject_context_into_tools(tool) GLOBAL_TOOLKITS_DICT[agent_name] = tool # 存入字典 started_toolkits.append(tool) print(f"✅ [System] All {len(started_toolkits)} MCP Channels Ready!") yield except Exception as e: print(f"❌ [System] MCP Connection Failed: {e}") traceback.print_exc() yield finally: print("🔻 [System] Closing All MCP Connections...") for name, tool in GLOBAL_TOOLKITS_DICT.items(): try: await tool.close() except Exception as e: print(f"⚠️ Error closing {name}: {e}") print("🏁 [System] Shutdown Complete.") # ==================== 3. Main Application Class ==================== class LifeFlowAI: def __init__(self): self.service = PlannerService() def inject_tools(self, toolkits_dict): """ 依賴注入:接收工具字典 """ # 這裡會檢查傳進來的是不是字典,避免你剛才遇到的錯誤 if isinstance(toolkits_dict, dict) and toolkits_dict: self.service.set_global_tools(toolkits_dict) print("💉 [App] Tools Dictionary injected into Service.") else: print(f"⚠️ [App] Warning: Invalid toolkits format. Expected dict, got {type(toolkits_dict)}") # ... (Event Wrappers 保持不變,因為它們只呼叫 self.service) ... def cancel_wrapper(self, session_data): session = UserSession.from_dict(session_data) if session.session_id: self.service.cancel_session(session.session_id) def _get_dashboard_html(self, active_agent: str = None, status: str = "idle", message: str = "Waiting") -> str: agents = ['team', 'scout', 'optimizer', 'navigator', 'weatherman', 'presenter'] status_dict = {a: {'status': 'idle', 'message': 'Standby'} for a in agents} if active_agent in status_dict: status_dict[active_agent] = {'status': status, 'message': message} if active_agent != 'team' and status == 'working': status_dict['team'] = {'status': 'working', 'message': f'Monitoring {active_agent}...'} return create_agent_dashboard(status_dict) def _check_api_status(self, session_data): session = UserSession.from_dict(session_data) has_key = bool(session.custom_settings.get("model_api_key")) or bool(os.environ.get("OPENAI_API_KEY")) or bool(os.environ.get("GOOGLE_API_KEY")) return "✅ System Ready (Exclusive Mode)" if has_key else "⚠️ Check API Keys" def _get_gradio_chat_history(self, session): return [{"role": msg["role"], "content": msg["message"]} for msg in session.chat_history] # --- Wrappers --- def analyze_wrapper(self, user_input, auto_loc, lat, lon, session_data): session = UserSession.from_dict(session_data) iterator = self.service.run_step1_analysis(user_input, auto_loc, lat, lon, session) for event in iterator: evt_type = event.get("type") current_session = event.get("session", session) if evt_type == "error": gr.Warning(event.get('message')) yield (gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.HTML(f"
{event.get('message')}
"), gr.update(), gr.update(), gr.update(), current_session.to_dict()) return if evt_type == "stream": yield (gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), event.get("stream_text", ""), gr.update(), gr.update(), gr.update(), current_session.to_dict()) elif evt_type == "complete": tasks_html = self.service.generate_task_list_html(current_session) date_str = event.get("start_time", "N/A") summary_html = create_summary_card(len(current_session.task_list), event.get("high_priority", 0), event.get("total_time", 0), location=event.get("start_location", "N/A"), date=date_str) chat_history = self._get_gradio_chat_history(current_session) if not chat_history: chat_history = [{"role": "assistant", "content": event.get('stream_text', "")}] current_session.chat_history.append({"role": "assistant", "message": "Hi! I'm LifeFlow...", "time": ""}) yield (gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), "", gr.HTML(tasks_html), gr.HTML(summary_html), chat_history, current_session.to_dict()) def chat_wrapper(self, msg, session_data): session = UserSession.from_dict(session_data) iterator = self.service.modify_task_chat(msg, session) for event in iterator: sess = event.get("session", session) tasks_html = self.service.generate_task_list_html(sess) summary_html = event.get("summary_html", gr.HTML()) gradio_history = self._get_gradio_chat_history(sess) yield (gradio_history, tasks_html, summary_html, sess.to_dict()) # 🔥 記得改成 async def async def step3_wrapper(self, session_data): session = UserSession.from_dict(session_data) log_content, report_content = "", "" tasks_html = self.service.generate_task_list_html(session) init_dashboard = self._get_dashboard_html('team', 'working', 'Initializing...') loading_html = '
🧠 Analyzing...
' init_log = '
Waiting for agents...
' yield (init_dashboard, init_log, loading_html, tasks_html, session.to_dict()) try: # 🔥 使用 async for async for event in self.service.run_step3_team(session): sess = event.get("session", session) evt_type = event.get("type") if evt_type == "error": raise Exception(event.get("message")) if evt_type == "report_stream": report_content = event.get("content", "") if evt_type == "reasoning_update": agent, status, msg = event.get("agent_status") time_str = datetime.now().strftime('%H:%M:%S') log_entry = f'
{time_str} • {agent.upper()}
{msg}
' log_content = log_entry + log_content dashboard_html = self._get_dashboard_html(agent, status, msg) log_html = f'
{log_content}
' current_report = report_content + "\n\n" if report_content else loading_html yield (dashboard_html, log_html, current_report, tasks_html, sess.to_dict()) if evt_type == "complete": final_report = event.get("report_html", report_content) final_log = f'
✅ Planning Completed
{log_content}' final_log_html = f'
{final_log}
' yield (self._get_dashboard_html('team', 'complete', 'Planning Finished'), final_log_html, final_report, tasks_html, sess.to_dict()) except Exception as e: error_html = f"
Error: {str(e)}
" yield (self._get_dashboard_html('team', 'error', 'Failed'), error_html, "", tasks_html, session.to_dict()) def step4_wrapper(self, session_data): session = UserSession.from_dict(session_data) result = self.service.run_step4_finalize(session) if result['type'] == 'success': return (gr.update(visible=False), gr.update(visible=True), result['summary_tab_html'], result['report_md'], result['task_list_html'], result['map_fig'], session.to_dict()) else: return (gr.update(visible=True), gr.update(visible=False), "", "", "", None, session.to_dict()) def save_settings(self, g, w, prov, m_key, m_sel, fast, g_key_in, f_sel, s_data): sess = UserSession.from_dict(s_data) sess.custom_settings.update({'google_maps_api_key': g, 'openweather_api_key': w, 'llm_provider': prov, 'model_api_key': m_key, 'model': m_sel, 'enable_fast_mode': fast, 'groq_fast_model': f_sel, 'groq_api_key': g_key_in}) return gr.update(visible=False), sess.to_dict(), "✅ Configuration Saved" # ==================== 4. UI Builder ==================== def build_interface(self): container_css = """ .gradio-container { max-width: 100% !important; padding: 0; height: 100vh !important; overflow-y: auto !important; } """ with gr.Blocks(title="LifeFlow AI (MCP)", css=container_css) as demo: gr.HTML(get_enhanced_css()) session_state = gr.State(UserSession().to_dict()) with gr.Column(elem_classes="step-container"): home_btn, settings_btn, doc_btn = create_header() stepper = create_progress_stepper(1) status_bar = gr.Markdown("Initializing MCP...", visible=False) # --- Steps UI (Copy from previous) --- # 為節省篇幅,這裡省略中間 UI 宣告代碼,它們與之前完全相同 # 請確保這裡是完整的 create_input_form, step2, step3, step4 定義 # ... with gr.Group(visible=True, elem_classes="step-container centered-input-container") as step1_container: (input_area, s1_stream_output, user_input, auto_loc, loc_group, lat_in, lon_in, analyze_btn) = create_input_form("") with gr.Group(visible=False, elem_classes="step-container") as step2_container: gr.Markdown("### ✅ Review & Refine Tasks") with gr.Row(elem_classes="step2-split-view"): with gr.Column(scale=1): with gr.Group(elem_classes="panel-container"): gr.HTML('
📋 Your Tasks
') with gr.Group(elem_classes="scrollable-content"): task_summary_box = gr.HTML() task_list_box = gr.HTML() with gr.Column(scale=1): with gr.Group(elem_classes="chat-panel-native"): chatbot = gr.Chatbot( label="AI Assistant", type="messages", height=575, elem_classes="native-chatbot", bubble_full_width=False ) with gr.Row(elem_classes="chat-input-row"): chat_input = gr.Textbox(show_label=False, placeholder="Type to modify tasks...", container=False, scale=5, autofocus=True) chat_send = gr.Button("➤", variant="primary", scale=1, min_width=50) with gr.Row(elem_classes="action-footer"): back_btn = gr.Button("← Back", variant="secondary", scale=1) plan_btn = gr.Button("🚀 Start Planning", variant="primary", scale=2) with gr.Group(visible=False, elem_classes="step-container") as step3_container: gr.Markdown("### 🤖 AI Team Operations") with gr.Group(elem_classes="agent-dashboard-container"): agent_dashboard = gr.HTML(value=self._get_dashboard_html()) with gr.Row(): with gr.Column(scale=3): with gr.Tabs(): with gr.Tab("📝 Full Report"): with gr.Group(elem_classes="live-report-wrapper"): live_report_md = gr.Markdown("🧠 Analyzing your tasks...") with gr.Tab("📋 Task List"): with gr.Group(elem_classes="panel-container"): with gr.Group(elem_classes="scrollable-content"): task_list_s3 = gr.HTML() with gr.Column(scale=1): cancel_plan_btn = gr.Button("🛑 Stop & Back to Edit", variant="stop") gr.Markdown("### ⚡ Activity Log") with gr.Group(elem_classes="panel-container"): planning_log = gr.HTML(value="Waiting...") with gr.Group(visible=False, elem_classes="step-container") as step4_container: # 定義狀態:抽屜預設開啟 drawer_state = gr.State(True) with gr.Tabs(): # === Tab 1: 地圖 === with gr.Tab("🗺️ Route Map", id="tab_map"): with gr.Group(elem_classes="map-container-relative"): # 1. 地圖 (HTML) # 注意:確保這裡是 map_view map_view = gr.HTML(label="Route Map") # 2. 浮動抽屜 (Overlay) with gr.Group(visible=True, elem_classes="map-overlay-drawer") as summary_drawer: with gr.Row(elem_classes="drawer-header"): gr.Markdown("### 📊 Trip Summary", elem_classes="drawer-title") close_drawer_btn = gr.Button("✕", elem_classes="drawer-close-btn", size="sm") with gr.Group(elem_classes="drawer-content"): with gr.Tabs(elem_classes="drawer-tabs"): with gr.Tab("Overview"): summary_tab_output = gr.HTML() with gr.Tab("Tasks"): task_list_tab_output = gr.HTML() # 3. 開啟按鈕 (浮動,預設隱藏) # 加上 variant='primary' 確保它醒目 open_drawer_btn = gr.Button("📊 Show Summary", visible=False, variant="primary", elem_classes="map-overlay-btn") # === Tab 2: 完整報告 === with gr.Tab("📝 Full Report", id="tab_report"): # 使用 live-report-wrapper 確保有白紙背景效果 with gr.Group(elem_classes="live-report-wrapper"): report_tab_output = gr.Markdown() # === 事件綁定 (控制抽屜) === # 關閉:抽屜消失,按鈕出現 close_drawer_btn.click( fn=lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[summary_drawer, open_drawer_btn] ) # 開啟:抽屜出現,按鈕消失 open_drawer_btn.click( fn=lambda: (gr.update(visible=True), gr.update(visible=False)), outputs=[summary_drawer, open_drawer_btn] ) # --- Modals & Events --- (settings_modal, g_key, g_stat, w_key, w_stat, llm_provider, main_key, main_stat, model_sel, fast_mode_chk, groq_model_sel, groq_key, groq_stat, close_set, save_set, set_stat) = create_settings_modal() # Validators g_key.blur(fn=lambda k: "⏳ Checking..." if k else "", inputs=[g_key], outputs=[g_stat]).then(fn=APIValidator.test_google_maps, inputs=[g_key], outputs=[g_stat]) w_key.blur(fn=lambda k: "⏳ Checking..." if k else "", inputs=[w_key], outputs=[w_stat]).then(fn=APIValidator.test_openweather, inputs=[w_key], outputs=[w_stat]) main_key.blur(fn=lambda k: "⏳ Checking..." if k else "", inputs=[main_key], outputs=[main_stat]).then(fn=APIValidator.test_llm, inputs=[llm_provider, main_key, model_sel], outputs=[main_stat]) def resolve_groq_visibility(fast_mode, provider): if not fast_mode: return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)) model_vis = gr.update(visible=True) if provider == "Groq": return (model_vis, gr.update(visible=False), gr.update(visible=False)) else: return (model_vis, gr.update(visible=True), gr.update(visible=True)) fast_mode_chk.change(fn=resolve_groq_visibility, inputs=[fast_mode_chk, llm_provider], outputs=[groq_model_sel, groq_key, groq_stat]) def on_provider_change(provider, fast_mode): new_choices = MODEL_OPTIONS.get(provider, []) default_val = new_choices[0][1] if new_choices else "" model_update = gr.Dropdown(choices=new_choices, value=default_val) g_model_vis, g_key_vis, g_stat_vis = resolve_groq_visibility(fast_mode, provider) return (model_update, gr.Textbox(value=""), gr.Markdown(value=""), g_model_vis, g_key_vis, g_stat_vis) llm_provider.change(fn=on_provider_change, inputs=[llm_provider, fast_mode_chk], outputs=[model_sel, main_key, main_stat, groq_model_sel, groq_key, groq_stat]) doc_modal, close_doc_btn = create_doc_modal() settings_btn.click(fn=lambda: gr.update(visible=True), outputs=[settings_modal]) close_set.click(fn=lambda: gr.update(visible=False), outputs=[settings_modal]) doc_btn.click(fn=lambda: gr.update(visible=True), outputs=[doc_modal]) close_doc_btn.click(fn=lambda: gr.update(visible=False), outputs=[doc_modal]) # Actions analyze_btn.click(fn=self.analyze_wrapper, inputs=[user_input, auto_loc, lat_in, lon_in, session_state], outputs=[step1_container, step2_container, step3_container, s1_stream_output, task_list_box, task_summary_box, chatbot, session_state]).then(fn=lambda: update_stepper(2), outputs=[stepper]) chat_send.click(fn=self.chat_wrapper, inputs=[chat_input, session_state], outputs=[chatbot, task_list_box, task_summary_box, session_state]).then(fn=lambda: "", outputs=[chat_input]) chat_input.submit(fn=self.chat_wrapper, inputs=[chat_input, session_state], outputs=[chatbot, task_list_box, task_summary_box, session_state]).then(fn=lambda: "", outputs=[chat_input]) # 🔥 Step 3 Start step3_start = plan_btn.click(fn=lambda: (gr.update(visible=False), gr.update(visible=True), update_stepper(3)), outputs=[step2_container, step3_container, stepper]).then(fn=self.step3_wrapper, inputs=[session_state], outputs=[agent_dashboard, planning_log, live_report_md, task_list_s3, session_state], show_progress="hidden") cancel_plan_btn.click(fn=self.cancel_wrapper, inputs=[session_state], outputs=None).then(fn=lambda: (gr.update(visible=True), gr.update(visible=False), update_stepper(2)), inputs=None, outputs=[step2_container, step3_container, stepper]) step3_start.then(fn=self.step4_wrapper, inputs=[session_state], outputs=[step3_container, step4_container, summary_tab_output, report_tab_output, task_list_tab_output, map_view, session_state]).then(fn=lambda: update_stepper(4), outputs=[stepper]) def reset_all(session_data): old_s = UserSession.from_dict(session_data) new_s = UserSession() new_s.custom_settings = old_s.custom_settings return (gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), update_stepper(1), new_s.to_dict(), "") home_btn.click(fn=self.cancel_wrapper, inputs=[session_state], outputs=None).then(fn=reset_all, inputs=[session_state], outputs=[step1_container, step2_container, step3_container, step4_container, stepper, session_state, user_input]) back_btn.click(fn=self.cancel_wrapper, inputs=[session_state], outputs=None).then(fn=reset_all, inputs=[session_state], outputs=[step1_container, step2_container, step3_container, step4_container, stepper, session_state, user_input]) save_set.click(fn=self.save_settings, inputs=[g_key, w_key, llm_provider, main_key, model_sel, fast_mode_chk, groq_key, groq_model_sel, session_state], outputs=[settings_modal, session_state, status_bar]) auto_loc.change(fn=toggle_location_inputs, inputs=auto_loc, outputs=loc_group) demo.load(fn=self._check_api_status, inputs=[session_state], outputs=[status_bar]) return demo # ==================== 5. Main ==================== def main(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def run_app_lifecycle(): async with lifespan_manager(): app = LifeFlowAI() # 🔥 注入工具字典 (Dictionary) app.inject_tools(GLOBAL_TOOLKITS_DICT) demo = app.build_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=True, show_error=True, prevent_thread_lock=True ) print("✨ App is running. Press Ctrl+C to stop.") try: while True: await asyncio.sleep(1) except asyncio.CancelledError: pass try: loop.run_until_complete(run_app_lifecycle()) except KeyboardInterrupt: pass except Exception as e: print("\n❌ CRITICAL ERROR STARTUP FAILED ❌") traceback.print_exc() finally: print("🛑 Shutting down process...") import os os._exit(0) if __name__ == "__main__": main()