Spaces:
Running
Running
| """ | |
| LifeFlow AI - Main Application (Fixed for Exclusive Tools Dict) | |
| ✅ 修正錯誤: 確保傳遞給 Service的是 Dictionary 而不是單一 Object | |
| ✅ 實現多通道隔離 (Scout 只能用 Scout 的工具) | |
| """ | |
| import gradio as gr | |
| import asyncio | |
| import os | |
| import sys | |
| import traceback | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime | |
| # Agno / MCP Imports | |
| from agno.tools.mcp import MCPTools | |
| # UI Imports | |
| from ui.theme import get_enhanced_css | |
| from ui.components.header import create_header | |
| from ui.components.progress_stepper import create_progress_stepper, update_stepper | |
| from ui.components.input_form import create_input_form, toggle_location_inputs | |
| from ui.components.modals import create_settings_modal, create_doc_modal | |
| from ui.renderers import create_agent_dashboard, create_summary_card | |
| # Core Imports | |
| from core.session import UserSession | |
| from services.planner_service import PlannerService | |
| from services.validator import APIValidator | |
| from config import MODEL_OPTIONS | |
| from src.infra.client_context import client_session_ctx | |
# ==================== 1. Tool Isolation Config ====================
# Maps each agent name to the single MCP tool that agent is allowed to see.
# lifespan_manager() passes each value as the only entry of `include_tools`
# when it creates that agent's MCPTools connection.
# NOTE(review): the names are presumably the @mcp.tool names declared in
# mcp_server_lifeflow.py (per the original note) — confirm against the server.
AGENT_TOOL_MAP = {
    "scout": "search_and_offload",
    "optimizer": "optimize_from_ref",
    "navigator": "calculate_traffic_and_timing",
    "weatherman": "check_weather_for_timeline",
    "presenter": "read_final_itinerary",
}
# Global registry: agent name -> connected MCPTools instance.
# Filled by lifespan_manager() and handed to the application in main().
GLOBAL_TOOLKITS_DICT = {}
| # ==================== 2. MCP Lifecycle Manager ==================== | |
def _bind_session_context(entrypoint):
    """Wrap *entrypoint* so every call carries the current client session id.

    The wrapper is built by a factory (one call per tool) so that it closes
    over its own ``entrypoint``. Defining the wrapper directly inside a loop
    would make every wrapper share the loop variable and call the entrypoint
    of the last tool processed.
    """
    # Decide once whether the wrapped callable has to be awaited.
    is_async = asyncio.iscoroutinefunction(entrypoint)

    async def context_aware_wrapper(*args, **kwargs):
        current_sid = client_session_ctx.get()
        if current_sid:
            # Overrides any session_id supplied by the caller.
            kwargs['session_id'] = current_sid
        if is_async:
            return await entrypoint(*args, **kwargs)
        return entrypoint(*args, **kwargs)

    return context_aware_wrapper


def inject_context_into_tools(toolkit):
    """Patch every tool of *toolkit* in place.

    Two things are done to each tool:

    1. Context: ``tool.entrypoint`` is replaced by an async wrapper that reads
       ``client_session_ctx`` and, when it holds a truthy value, passes it to
       the original entrypoint as the ``session_id`` keyword argument.
    2. Rename: a single leading underscore is stripped from ``tool.name``.
       When ``toolkit.functions`` is a dict, the tool is also re-registered
       under the new name and the old key is removed.

    Args:
        toolkit: Object exposing ``functions`` as a dict ``{name: tool}`` or a
            list of tools; each tool must have ``name`` and ``entrypoint``.

    Returns:
        Number of tools whose entrypoint was wrapped; ``0`` when
        ``toolkit.functions`` is neither a list nor a dict.
    """
    funcs = getattr(toolkit, "functions", {})
    if isinstance(funcs, list):
        print("⚠️ Warning: toolkit.functions is a list. Renaming might be tricky.")
        tools = funcs
    elif isinstance(funcs, dict):
        # Snapshot the values: the dict is mutated while renaming below.
        tools = list(funcs.values())
    else:
        print(f"⚠️ Warning: Unknown functions type: {type(funcs)}")
        return 0

    count = 0
    renamed_count = 0
    for tool in tools:
        # --- A. Context injection ---
        tool.entrypoint = _bind_session_context(tool.entrypoint)
        count += 1

        # --- B. Strip the leading underscore (e.g. _search_and_offload) ---
        if tool.name.startswith("_"):
            old_name = tool.name
            new_name = old_name[1:]
            tool.name = new_name
            if isinstance(funcs, dict):
                # Keep the lookup table in sync with the tool's new name.
                funcs[new_name] = tool
                funcs.pop(old_name, None)
            renamed_count += 1

    print(f" 🛡️ Injection: {count} tools patched, {renamed_count} tools renamed.")
    return count
@asynccontextmanager
async def lifespan_manager():
    """Async context manager owning the per-agent MCP connections.

    On entry, one ``MCPTools`` client is created for every entry of
    ``AGENT_TOOL_MAP``, restricted through ``include_tools`` to that agent's
    single tool, connected, patched with ``inject_context_into_tools`` and
    registered in ``GLOBAL_TOOLKITS_DICT`` under the agent name.

    A failure while connecting is logged and the context is still entered, so
    the application starts with whatever toolkits were registered before the
    failure. Exceptions raised inside the ``async with`` body are NOT handled
    here; they propagate to the caller after the connections are closed.

    On exit, every toolkit that was successfully connected is closed; errors
    raised while closing are logged and do not stop the remaining closes.
    """
    print("🚀 [System] Initializing Exclusive MCP Connections...")
    server_script = "mcp_server_lifeflow.py"
    env_vars = os.environ.copy()
    env_vars["PYTHONUNBUFFERED"] = "1"
    # (agent_name, toolkit) for every client whose connect() succeeded.
    started_toolkits = []
    try:
        # Only the connection phase is guarded. The yield stays outside this
        # inner try so that an error from the body is never mistaken for a
        # connection failure and followed by a second yield.
        try:
            for agent_name, tool_name in AGENT_TOOL_MAP.items():
                print(f" ⚙️ Connecting {agent_name} -> tool: {tool_name}...")
                tool = MCPTools(
                    command=f"{sys.executable} {server_script}",
                    env=env_vars,
                    # Key point: this client may only see its own tool.
                    include_tools=[tool_name],
                )
                await tool.connect()
                # Track right after connect so the client is closed even if
                # patching it fails.
                started_toolkits.append((agent_name, tool))
                inject_context_into_tools(tool)
                GLOBAL_TOOLKITS_DICT[agent_name] = tool
        except Exception as e:
            print(f"❌ [System] MCP Connection Failed: {e}")
            traceback.print_exc()
        else:
            print(f"✅ [System] All {len(started_toolkits)} MCP Channels Ready!")
        yield
    finally:
        print("🔻 [System] Closing All MCP Connections...")
        for name, tool in started_toolkits:
            try:
                await tool.close()
            except Exception as e:
                print(f"⚠️ Error closing {name}: {e}")
        print("🏁 [System] Shutdown Complete.")
| # ==================== 3. Main Application Class ==================== | |
class LifeFlowAI:
    """Gradio application object: event handlers plus the UI builder.

    All planning work is delegated to ``self.service`` (a ``PlannerService``);
    the methods here convert between Gradio component values and
    ``UserSession`` objects and stream UI updates.
    """

    def __init__(self):
        self.service = PlannerService()

    def inject_tools(self, toolkits_dict):
        """Hand the agent-name -> toolkit mapping to the service.

        Only a non-empty ``dict`` is forwarded; anything else is reported with
        a warning and ignored.
        """
        if isinstance(toolkits_dict, dict) and toolkits_dict:
            self.service.set_global_tools(toolkits_dict)
            print("💉 [App] Tools Dictionary injected into Service.")
        else:
            print(f"⚠️ [App] Warning: Invalid toolkits format. Expected dict, got {type(toolkits_dict)}")

    @staticmethod
    def _error_html(message, prefix=""):
        """Return a red ``<div>`` holding *message*, HTML-escaped.

        Error text comes from the service or from exceptions and may contain
        markup characters, so it must not be embedded in the page verbatim.
        """
        from html import escape
        return f"<div style='color:red'>{prefix}{escape(str(message))}</div>"

    def cancel_wrapper(self, session_data):
        """Ask the service to cancel the session stored in *session_data*, if it has an id."""
        session = UserSession.from_dict(session_data)
        if session.session_id:
            self.service.cancel_session(session.session_id)

    def _get_dashboard_html(self, active_agent: str = None, status: str = "idle", message: str = "Waiting") -> str:
        """Render the agent dashboard with one agent highlighted.

        Every agent starts as idle/"Standby". If *active_agent* is a known
        agent it receives *status*/*message*; when that agent is not ``team``
        and is ``working``, ``team`` is shown as monitoring it.
        """
        agents = ['team', 'scout', 'optimizer', 'navigator', 'weatherman', 'presenter']
        status_dict = {a: {'status': 'idle', 'message': 'Standby'} for a in agents}
        if active_agent in status_dict:
            status_dict[active_agent] = {'status': status, 'message': message}
            if active_agent != 'team' and status == 'working':
                status_dict['team'] = {'status': 'working', 'message': f'Monitoring {active_agent}...'}
        return create_agent_dashboard(status_dict)

    def _check_api_status(self, session_data):
        """Return a status line telling whether any LLM API key is available.

        A key counts as available when the session settings hold
        ``model_api_key`` or when ``OPENAI_API_KEY`` / ``GOOGLE_API_KEY`` is
        set in the environment.
        """
        session = UserSession.from_dict(session_data)
        has_key = (
            bool(session.custom_settings.get("model_api_key"))
            or bool(os.environ.get("OPENAI_API_KEY"))
            or bool(os.environ.get("GOOGLE_API_KEY"))
        )
        return "✅ System Ready (Exclusive Mode)" if has_key else "⚠️ Check API Keys"

    def _get_gradio_chat_history(self, session):
        """Convert ``session.chat_history`` to the ``role``/``content`` dicts used by the chatbot."""
        return [{"role": msg["role"], "content": msg["message"]} for msg in session.chat_history]

    # --- Wrappers ---
    def analyze_wrapper(self, user_input, auto_loc, lat, lon, session_data):
        """Stream step 1 (task analysis) to the UI.

        Yields 8-tuples matching the outputs wired in ``build_interface``:
        step1/step2/step3 container updates, the streaming text, the task
        list, the summary card, the chat history and the session dict.
        An ``error`` event shows the (escaped) message and stops the stream.
        """
        session = UserSession.from_dict(session_data)
        iterator = self.service.run_step1_analysis(user_input, auto_loc, lat, lon, session)
        for event in iterator:
            evt_type = event.get("type")
            current_session = event.get("session", session)
            if evt_type == "error":
                gr.Warning(event.get('message'))
                yield (
                    gr.update(visible=True),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.HTML(self._error_html(event.get('message'))),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    current_session.to_dict(),
                )
                return
            if evt_type == "stream":
                yield (
                    gr.update(visible=True),
                    gr.update(visible=False),
                    gr.update(visible=False),
                    event.get("stream_text", ""),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    current_session.to_dict(),
                )
            elif evt_type == "complete":
                tasks_html = self.service.generate_task_list_html(current_session)
                date_str = event.get("start_time", "N/A")
                summary_html = create_summary_card(
                    len(current_session.task_list),
                    event.get("high_priority", 0),
                    event.get("total_time", 0),
                    location=event.get("start_location", "N/A"),
                    date=date_str,
                )
                chat_history = self._get_gradio_chat_history(current_session)
                if not chat_history:
                    # First visit to step 2: show the analysis text and seed
                    # the stored history with a greeting.
                    chat_history = [{"role": "assistant", "content": event.get('stream_text', "")}]
                    current_session.chat_history.append({"role": "assistant", "message": "Hi! I'm LifeFlow...", "time": ""})
                yield (
                    gr.update(visible=False),
                    gr.update(visible=True),
                    gr.update(visible=False),
                    "",
                    gr.HTML(tasks_html),
                    gr.HTML(summary_html),
                    chat_history,
                    current_session.to_dict(),
                )

    def chat_wrapper(self, msg, session_data):
        """Stream the result of a chat request that modifies the task list.

        Yields ``(chat history, task list HTML, summary HTML, session dict)``
        for every event produced by the service.
        """
        session = UserSession.from_dict(session_data)
        iterator = self.service.modify_task_chat(msg, session)
        for event in iterator:
            sess = event.get("session", session)
            tasks_html = self.service.generate_task_list_html(sess)
            summary_html = event.get("summary_html", gr.HTML())
            gradio_history = self._get_gradio_chat_history(sess)
            yield (gradio_history, tasks_html, summary_html, sess.to_dict())

    async def step3_wrapper(self, session_data):
        """Stream step 3 (agent team run) to the UI.

        Async generator yielding ``(dashboard HTML, log HTML, report,
        task list HTML, session dict)``. An ``error`` event or any exception
        ends the stream with an error view; the session yielded there is the
        most recent one seen, so updates made before the failure are kept.
        """
        session = UserSession.from_dict(session_data)
        latest_session = session
        log_content, report_content = "", ""
        tasks_html = self.service.generate_task_list_html(session)
        init_dashboard = self._get_dashboard_html('team', 'working', 'Initializing...')
        loading_html = '<div style="text-align: center; padding: 60px;">🧠 Analyzing...</div>'
        init_log = '<div style="padding: 10px; color: #94a3b8;">Waiting for agents...</div>'
        yield (init_dashboard, init_log, loading_html, tasks_html, session.to_dict())
        try:
            async for event in self.service.run_step3_team(session):
                sess = event.get("session", session)
                latest_session = sess
                evt_type = event.get("type")
                if evt_type == "error":
                    raise Exception(event.get("message"))
                if evt_type == "report_stream":
                    report_content = event.get("content", "")
                # NOTE(review): the UI refresh is kept inside the
                # reasoning_update branch (it needs agent/status/msg) —
                # confirm against the original layout.
                if evt_type == "reasoning_update":
                    agent, status, msg = event.get("agent_status")
                    time_str = datetime.now().strftime('%H:%M:%S')
                    log_entry = f'<div style="margin-bottom: 8px; border-left: 3px solid #6366f1; padding-left: 10px;"><div style="font-size: 0.75rem; color: #94a3b8;">{time_str} • {agent.upper()}</div><div style="color: #334155; font-size: 0.9rem;">{msg}</div></div>'
                    # Newest entry first.
                    log_content = log_entry + log_content
                    dashboard_html = self._get_dashboard_html(agent, status, msg)
                    log_html = f'<div style="height: 500px; overflow-y: auto; padding: 10px; background: #fff;">{log_content}</div>'
                    current_report = report_content + "\n\n" if report_content else loading_html
                    yield (dashboard_html, log_html, current_report, tasks_html, sess.to_dict())
                if evt_type == "complete":
                    final_report = event.get("report_html", report_content)
                    final_log = f'<div style="margin-bottom: 8px; border-left: 3px solid #10b981; padding-left: 10px; background: #ecfdf5; padding: 10px;"><div style="font-weight: bold; color: #059669;">✅ Planning Completed</div></div>{log_content}'
                    final_log_html = f'<div style="height: 500px; overflow-y: auto; padding: 10px; background: #fff;">{final_log}</div>'
                    yield (self._get_dashboard_html('team', 'complete', 'Planning Finished'), final_log_html, final_report, tasks_html, sess.to_dict())
        except Exception as e:
            error_html = self._error_html(e, prefix="Error: ")
            yield (self._get_dashboard_html('team', 'error', 'Failed'), error_html, "", tasks_html, latest_session.to_dict())

    def step4_wrapper(self, session_data):
        """Finalize the plan and switch to the result view.

        On success step 3 is hidden and step 4 shown with the summary, report,
        task list and map from the service result; otherwise step 3 stays
        visible and the outputs are cleared.
        """
        session = UserSession.from_dict(session_data)
        result = self.service.run_step4_finalize(session)
        if result['type'] == 'success':
            return (
                gr.update(visible=False),
                gr.update(visible=True),
                result['summary_tab_html'],
                result['report_md'],
                result['task_list_html'],
                result['map_fig'],
                session.to_dict(),
            )
        return (gr.update(visible=True), gr.update(visible=False), "", "", "", None, session.to_dict())

    def save_settings(self, g, w, prov, m_key, m_sel, fast, g_key_in, f_sel, s_data):
        """Store the settings-modal values in the session and close the modal.

        Returns ``(modal update, session dict, status text)``.
        """
        sess = UserSession.from_dict(s_data)
        sess.custom_settings.update({
            'google_maps_api_key': g,
            'openweather_api_key': w,
            'llm_provider': prov,
            'model_api_key': m_key,
            'model': m_sel,
            'enable_fast_mode': fast,
            'groq_fast_model': f_sel,
            'groq_api_key': g_key_in,
        })
        return gr.update(visible=False), sess.to_dict(), "✅ Configuration Saved"

    # ==================== 4. UI Builder ====================
    def build_interface(self):
        """Build the Gradio Blocks UI and wire every event; return the Blocks.

        NOTE(review): the nesting of the layout containers below was
        reconstructed from a source without indentation — confirm it against
        the rendered UI.
        """
        container_css = """
        .gradio-container { max-width: 100% !important; padding: 0; height: 100vh !important; overflow-y: auto !important; }
        """
        with gr.Blocks(title="LifeFlow AI (MCP)", css=container_css) as demo:
            gr.HTML(get_enhanced_css())
            session_state = gr.State(UserSession().to_dict())
            with gr.Column(elem_classes="step-container"):
                home_btn, settings_btn, doc_btn = create_header()
                stepper = create_progress_stepper(1)
                status_bar = gr.Markdown("Initializing MCP...", visible=False)

                # --- Step 1: input form ---
                with gr.Group(visible=True, elem_classes="step-container centered-input-container") as step1_container:
                    (input_area, s1_stream_output, user_input, auto_loc, loc_group, lat_in, lon_in, analyze_btn) = create_input_form("")

                # --- Step 2: review tasks + chat ---
                with gr.Group(visible=False, elem_classes="step-container") as step2_container:
                    gr.Markdown("### ✅ Review & Refine Tasks")
                    with gr.Row(elem_classes="step2-split-view"):
                        with gr.Column(scale=1):
                            with gr.Group(elem_classes="panel-container"):
                                gr.HTML('<div class="panel-header">📋 Your Tasks</div>')
                                with gr.Group(elem_classes="scrollable-content"):
                                    task_summary_box = gr.HTML()
                                    task_list_box = gr.HTML()
                        with gr.Column(scale=1):
                            with gr.Group(elem_classes="chat-panel-native"):
                                chatbot = gr.Chatbot(
                                    label="AI Assistant",
                                    type="messages",
                                    height=575,
                                    elem_classes="native-chatbot",
                                    bubble_full_width=False
                                )
                                with gr.Row(elem_classes="chat-input-row"):
                                    chat_input = gr.Textbox(show_label=False, placeholder="Type to modify tasks...", container=False, scale=5, autofocus=True)
                                    chat_send = gr.Button("➤", variant="primary", scale=1, min_width=50)
                    with gr.Row(elem_classes="action-footer"):
                        back_btn = gr.Button("← Back", variant="secondary", scale=1)
                        plan_btn = gr.Button("🚀 Start Planning", variant="primary", scale=2)

                # --- Step 3: live agent run ---
                with gr.Group(visible=False, elem_classes="step-container") as step3_container:
                    gr.Markdown("### 🤖 AI Team Operations")
                    with gr.Group(elem_classes="agent-dashboard-container"):
                        agent_dashboard = gr.HTML(value=self._get_dashboard_html())
                    with gr.Row():
                        with gr.Column(scale=3):
                            with gr.Tabs():
                                with gr.Tab("📝 Full Report"):
                                    with gr.Group(elem_classes="live-report-wrapper"):
                                        live_report_md = gr.Markdown("🧠 Analyzing your tasks...")
                                with gr.Tab("📋 Task List"):
                                    with gr.Group(elem_classes="panel-container"):
                                        with gr.Group(elem_classes="scrollable-content"):
                                            task_list_s3 = gr.HTML()
                        with gr.Column(scale=1):
                            cancel_plan_btn = gr.Button("🛑 Stop & Back to Edit", variant="stop")
                            gr.Markdown("### ⚡ Activity Log")
                            with gr.Group(elem_classes="panel-container"):
                                planning_log = gr.HTML(value="Waiting...")

                # --- Step 4: results ---
                with gr.Group(visible=False, elem_classes="step-container") as step4_container:
                    # Drawer state: open by default.
                    drawer_state = gr.State(True)
                    with gr.Tabs():
                        # === Tab 1: map ===
                        with gr.Tab("🗺️ Route Map", id="tab_map"):
                            with gr.Group(elem_classes="map-container-relative"):
                                # 1. Map (HTML)
                                map_view = gr.HTML(label="Route Map")
                                # 2. Floating drawer (overlay)
                                with gr.Group(visible=True, elem_classes="map-overlay-drawer") as summary_drawer:
                                    with gr.Row(elem_classes="drawer-header"):
                                        gr.Markdown("### 📊 Trip Summary", elem_classes="drawer-title")
                                        close_drawer_btn = gr.Button("✕", elem_classes="drawer-close-btn", size="sm")
                                    with gr.Group(elem_classes="drawer-content"):
                                        with gr.Tabs(elem_classes="drawer-tabs"):
                                            with gr.Tab("Overview"):
                                                summary_tab_output = gr.HTML()
                                            with gr.Tab("Tasks"):
                                                task_list_tab_output = gr.HTML()
                                # 3. Floating "open" button, hidden while the drawer is open.
                                open_drawer_btn = gr.Button("📊 Show Summary", visible=False, variant="primary",
                                                            elem_classes="map-overlay-btn")
                        # === Tab 2: full report ===
                        with gr.Tab("📝 Full Report", id="tab_report"):
                            with gr.Group(elem_classes="live-report-wrapper"):
                                report_tab_output = gr.Markdown()
                    # === Drawer events ===
                    # Close: hide the drawer, show the button.
                    close_drawer_btn.click(
                        fn=lambda: (gr.update(visible=False), gr.update(visible=True)),
                        outputs=[summary_drawer, open_drawer_btn]
                    )
                    # Open: show the drawer, hide the button.
                    open_drawer_btn.click(
                        fn=lambda: (gr.update(visible=True), gr.update(visible=False)),
                        outputs=[summary_drawer, open_drawer_btn]
                    )

            # --- Modals & Events ---
            (settings_modal, g_key, g_stat, w_key, w_stat, llm_provider, main_key, main_stat, model_sel,
             fast_mode_chk, groq_model_sel, groq_key, groq_stat, close_set, save_set, set_stat) = create_settings_modal()

            # Validators: show a transient "checking" text, then the real result.
            g_key.blur(fn=lambda k: "⏳ Checking..." if k else "", inputs=[g_key], outputs=[g_stat]).then(fn=APIValidator.test_google_maps, inputs=[g_key], outputs=[g_stat])
            w_key.blur(fn=lambda k: "⏳ Checking..." if k else "", inputs=[w_key], outputs=[w_stat]).then(fn=APIValidator.test_openweather, inputs=[w_key], outputs=[w_stat])
            main_key.blur(fn=lambda k: "⏳ Checking..." if k else "", inputs=[main_key], outputs=[main_stat]).then(fn=APIValidator.test_llm, inputs=[llm_provider, main_key, model_sel], outputs=[main_stat])

            def resolve_groq_visibility(fast_mode, provider):
                """Visibility of (groq model, groq key, groq status) for the given settings."""
                if not fast_mode:
                    return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=False))
                model_vis = gr.update(visible=True)
                if provider == "Groq":
                    # The main key field is already the Groq key in this case.
                    return (model_vis, gr.update(visible=False), gr.update(visible=False))
                return (model_vis, gr.update(visible=True), gr.update(visible=True))

            fast_mode_chk.change(fn=resolve_groq_visibility, inputs=[fast_mode_chk, llm_provider], outputs=[groq_model_sel, groq_key, groq_stat])

            def on_provider_change(provider, fast_mode):
                """Reset model choices, key and status when the LLM provider changes."""
                new_choices = MODEL_OPTIONS.get(provider, [])
                default_val = new_choices[0][1] if new_choices else ""
                model_update = gr.Dropdown(choices=new_choices, value=default_val)
                g_model_vis, g_key_vis, g_stat_vis = resolve_groq_visibility(fast_mode, provider)
                return (model_update, gr.Textbox(value=""), gr.Markdown(value=""), g_model_vis, g_key_vis, g_stat_vis)

            llm_provider.change(fn=on_provider_change, inputs=[llm_provider, fast_mode_chk], outputs=[model_sel, main_key, main_stat, groq_model_sel, groq_key, groq_stat])

            doc_modal, close_doc_btn = create_doc_modal()
            settings_btn.click(fn=lambda: gr.update(visible=True), outputs=[settings_modal])
            close_set.click(fn=lambda: gr.update(visible=False), outputs=[settings_modal])
            doc_btn.click(fn=lambda: gr.update(visible=True), outputs=[doc_modal])
            close_doc_btn.click(fn=lambda: gr.update(visible=False), outputs=[doc_modal])

            # Actions
            analyze_btn.click(
                fn=self.analyze_wrapper,
                inputs=[user_input, auto_loc, lat_in, lon_in, session_state],
                outputs=[step1_container, step2_container, step3_container, s1_stream_output, task_list_box, task_summary_box, chatbot, session_state],
            ).then(fn=lambda: update_stepper(2), outputs=[stepper])
            chat_send.click(
                fn=self.chat_wrapper,
                inputs=[chat_input, session_state],
                outputs=[chatbot, task_list_box, task_summary_box, session_state],
            ).then(fn=lambda: "", outputs=[chat_input])
            chat_input.submit(
                fn=self.chat_wrapper,
                inputs=[chat_input, session_state],
                outputs=[chatbot, task_list_box, task_summary_box, session_state],
            ).then(fn=lambda: "", outputs=[chat_input])

            # Step 3 start: switch view first, then stream the agent run.
            step3_start = plan_btn.click(
                fn=lambda: (gr.update(visible=False), gr.update(visible=True), update_stepper(3)),
                outputs=[step2_container, step3_container, stepper],
            ).then(
                fn=self.step3_wrapper,
                inputs=[session_state],
                outputs=[agent_dashboard, planning_log, live_report_md, task_list_s3, session_state],
                show_progress="hidden",
            )
            cancel_plan_btn.click(
                fn=self.cancel_wrapper, inputs=[session_state], outputs=None,
            ).then(
                fn=lambda: (gr.update(visible=True), gr.update(visible=False), update_stepper(2)),
                inputs=None,
                outputs=[step2_container, step3_container, stepper],
            )
            step3_start.then(
                fn=self.step4_wrapper,
                inputs=[session_state],
                outputs=[step3_container, step4_container, summary_tab_output, report_tab_output, task_list_tab_output, map_view, session_state],
            ).then(fn=lambda: update_stepper(4), outputs=[stepper])

            def reset_all(session_data):
                """Return to step 1 with a fresh session that keeps the user's settings."""
                old_s = UserSession.from_dict(session_data)
                new_s = UserSession()
                new_s.custom_settings = old_s.custom_settings
                return (gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), update_stepper(1), new_s.to_dict(), "")

            home_btn.click(fn=self.cancel_wrapper, inputs=[session_state], outputs=None).then(fn=reset_all, inputs=[session_state], outputs=[step1_container, step2_container, step3_container, step4_container, stepper, session_state, user_input])
            back_btn.click(fn=self.cancel_wrapper, inputs=[session_state], outputs=None).then(fn=reset_all, inputs=[session_state], outputs=[step1_container, step2_container, step3_container, step4_container, stepper, session_state, user_input])
            save_set.click(fn=self.save_settings, inputs=[g_key, w_key, llm_provider, main_key, model_sel, fast_mode_chk, groq_key, groq_model_sel, session_state], outputs=[settings_modal, session_state, status_bar])
            auto_loc.change(fn=toggle_location_inputs, inputs=auto_loc, outputs=loc_group)
            demo.load(fn=self._check_api_status, inputs=[session_state], outputs=[status_bar])
        return demo
| # ==================== 5. Main ==================== | |
def main():
    """Start the MCP connections, launch the Gradio app and block until stopped.

    The whole lifecycle runs on a dedicated event loop. On Ctrl+C the
    lifecycle task is cancelled and awaited, so the ``async with`` exit of
    ``lifespan_manager`` runs before the process is terminated. The process
    always ends through ``os._exit``; the exit status is 1 when startup or
    shutdown failed and 0 otherwise.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    exit_code = 0

    async def run_app_lifecycle():
        async with lifespan_manager():
            app = LifeFlowAI()
            # Inject the agent-name -> toolkit dictionary.
            app.inject_tools(GLOBAL_TOOLKITS_DICT)
            demo = app.build_interface()
            demo.launch(
                server_name="0.0.0.0",
                server_port=7860,
                share=True,
                show_error=True,
                # Return immediately; this coroutine keeps the process alive.
                prevent_thread_lock=True
            )
            print("✨ App is running. Press Ctrl+C to stop.")
            try:
                while True:
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                # Fall through so the context manager exit can run.
                pass

    lifecycle_task = loop.create_task(run_app_lifecycle())
    try:
        loop.run_until_complete(lifecycle_task)
    except KeyboardInterrupt:
        # The interrupt leaves the task suspended; cancel it and drive the
        # loop again so the MCP connections are closed.
        lifecycle_task.cancel()
        try:
            loop.run_until_complete(lifecycle_task)
        except asyncio.CancelledError:
            pass
        except Exception:
            exit_code = 1
            traceback.print_exc()
    except Exception as e:
        exit_code = 1
        print("\n❌ CRITICAL ERROR STARTUP FAILED ❌")
        traceback.print_exc()
    finally:
        print("🛑 Shutting down process...")
        # Hard exit: skips interpreter shutdown and any remaining threads.
        os._exit(exit_code)


if __name__ == "__main__":
    main()