# LifeFlow-AI / app.py
# Marco310's picture
# feat: improve UI/UX
# 0fe3818
"""
LifeFlow AI - Main Application (Fixed for Exclusive Tools Dict)
✅ 修正錯誤: 確保傳遞給 Service的是 Dictionary 而不是單一 Object
✅ 實現多通道隔離 (Scout 只能用 Scout 的工具)
"""
import gradio as gr
import asyncio
import os
import sys
import traceback
from contextlib import asynccontextmanager
from datetime import datetime
# Agno / MCP Imports
from agno.tools.mcp import MCPTools
# UI Imports
from ui.theme import get_enhanced_css
from ui.components.header import create_header
from ui.components.progress_stepper import create_progress_stepper, update_stepper
from ui.components.input_form import create_input_form, toggle_location_inputs
from ui.components.modals import create_settings_modal, create_doc_modal
from ui.renderers import create_agent_dashboard, create_summary_card
# Core Imports
from core.session import UserSession
from services.planner_service import PlannerService
from services.validator import APIValidator
from config import MODEL_OPTIONS
from src.infra.client_context import client_session_ctx
# ==================== 1. Tool Isolation Config ====================
# Exclusive tool assigned to each agent. The values correspond to the
# @mcp.tool names declared in mcp_server_lifeflow.py.
AGENT_TOOL_MAP = dict(
    scout="search_and_offload",
    optimizer="optimize_from_ref",
    navigator="calculate_traffic_and_timing",
    weatherman="check_weather_for_timeline",
    presenter="read_final_itinerary",
)

# Module-wide registry: agent name -> MCP toolkit instance.
GLOBAL_TOOLKITS_DICT = dict()
# ==================== 2. MCP Lifecycle Manager ====================
def inject_context_into_tools(toolkit):
    """Patch every tool of *toolkit* in place.

    Two things are done to each tool found in ``toolkit.functions``:

    1. Context injection: the tool's ``entrypoint`` is replaced by an async
       wrapper that reads ``client_session_ctx`` and, when it holds a truthy
       value, passes it to the original entrypoint as ``session_id``.
    2. Renaming: a leading underscore is stripped from ``tool.name`` (and from
       the matching key when ``functions`` is a dict) so the tool can be
       looked up under its plain name.

    Args:
        toolkit: Object exposing a ``functions`` attribute that is either a
            dict ``{name: tool}`` or a list of tools. Each tool must have
            ``entrypoint`` and ``name`` attributes.

    Returns:
        The number of tools whose entrypoint was wrapped; 0 when
        ``functions`` is neither a list nor a dict.
    """
    funcs = getattr(toolkit, "functions", {})

    if isinstance(funcs, list):
        print("⚠️ Warning: toolkit.functions is a list. Renaming might be tricky.")
        tools = funcs
    elif isinstance(funcs, dict):
        # Snapshot the values: the dict is re-keyed while we iterate.
        tools = list(funcs.values())
    else:
        print(f"⚠️ Warning: Unknown functions type: {type(funcs)}")
        return 0

    def make_context_aware(entrypoint):
        # A factory is required so each wrapper binds its OWN entrypoint.
        # Defining the wrapper directly in the loop would make every wrapper
        # share the loop variable and call the last tool's entrypoint.
        is_async = asyncio.iscoroutinefunction(entrypoint)

        async def context_aware_wrapper(*args, **kwargs):
            current_sid = client_session_ctx.get()
            if current_sid:
                kwargs['session_id'] = current_sid
            if is_async:
                return await entrypoint(*args, **kwargs)
            return entrypoint(*args, **kwargs)

        return context_aware_wrapper

    count = 0
    renamed_count = 0
    for tool in tools:
        # --- A. Context injection ---
        tool.entrypoint = make_context_aware(tool.entrypoint)
        count += 1

        # --- B. Strip the leading underscore (e.g. _search_and_offload) ---
        if tool.name.startswith("_"):
            old_name = tool.name
            new_name = old_name[1:]
            tool.name = new_name
            if isinstance(funcs, dict):
                # Keep the lookup table in sync with the new tool name.
                funcs[new_name] = tool
                funcs.pop(old_name, None)
            renamed_count += 1

    print(f" 🛡️ Injection: {count} tools patched, {renamed_count} tools renamed.")
    return count
@asynccontextmanager
async def lifespan_manager():
    """Manage the MCP connections for the lifetime of the application.

    On entry, one MCP client is created per entry of ``AGENT_TOOL_MAP``;
    each client is restricted to that agent's tool via ``include_tools``,
    patched with ``inject_context_into_tools`` and registered in
    ``GLOBAL_TOOLKITS_DICT`` under the agent name.

    A failure while connecting is logged and the context is still entered,
    with whatever toolkits were registered before the failure. Exceptions
    raised by the ``async with`` body propagate to the caller after cleanup.

    On exit, every toolkit that was connected is closed and the registry is
    emptied.
    """
    print("🚀 [System] Initializing Exclusive MCP Connections...")
    server_script = "mcp_server_lifeflow.py"
    env_vars = os.environ.copy()
    env_vars["PYTHONUNBUFFERED"] = "1"

    # (agent_name, toolkit) for every client whose connect() succeeded.
    started_toolkits = []
    try:
        # The connection phase has its own try so that an exception thrown
        # into the generator at the yield is never mistaken for a connection
        # failure (which would make the generator yield a second time).
        try:
            for agent_name, tool_name in AGENT_TOOL_MAP.items():
                print(f" ⚙️ Connecting {agent_name} -> tool: {tool_name}...")
                tool = MCPTools(
                    command=f"{sys.executable} {server_script}",
                    env=env_vars,
                    # Only expose this agent's own tool.
                    include_tools=[tool_name]
                )
                await tool.connect()
                # Track it as soon as it is connected so it is closed even
                # if the patching below fails.
                started_toolkits.append((agent_name, tool))
                inject_context_into_tools(tool)
                GLOBAL_TOOLKITS_DICT[agent_name] = tool
            print(f"✅ [System] All {len(started_toolkits)} MCP Channels Ready!")
        except Exception as e:
            print(f"❌ [System] MCP Connection Failed: {e}")
            traceback.print_exc()
        yield
    finally:
        print("🔻 [System] Closing All MCP Connections...")
        for name, tool in started_toolkits:
            try:
                await tool.close()
            except Exception as e:
                print(f"⚠️ Error closing {name}: {e}")
        # Do not leave references to closed toolkits in the registry.
        GLOBAL_TOOLKITS_DICT.clear()
        print("🏁 [System] Shutdown Complete.")
# ==================== 3. Main Application Class ====================
class LifeFlowAI:
    """Gradio front end of LifeFlow AI.

    All planning work is delegated to a ``PlannerService`` instance; the
    methods here translate service events into Gradio component updates.
    """

    def __init__(self):
        self.service = PlannerService()

    def inject_tools(self, toolkits_dict):
        """Hand the agent-name -> toolkit dictionary to the service.

        Anything that is not a non-empty dict is rejected with a warning and
        the service is left untouched.
        """
        if not isinstance(toolkits_dict, dict):
            print(f"⚠️ [App] Warning: Invalid toolkits format. Expected dict, got {type(toolkits_dict)}")
            return
        if not toolkits_dict:
            # Right type but nothing inside: report that instead of a
            # misleading "expected dict" message.
            print("⚠️ [App] Warning: Toolkits dictionary is empty; no tools injected.")
            return
        self.service.set_global_tools(toolkits_dict)
        print("💉 [App] Tools Dictionary injected into Service.")

    def cancel_wrapper(self, session_data):
        """Ask the service to cancel the session described by *session_data*."""
        session = UserSession.from_dict(session_data)
        if session.session_id:
            self.service.cancel_session(session.session_id)

    def _get_dashboard_html(self, active_agent: "str | None" = None, status: str = "idle", message: str = "Waiting") -> str:
        """Render the agent dashboard with *active_agent* highlighted.

        Every agent starts as idle/"Standby". When a non-team agent is
        ``working``, the team entry is shown as monitoring it.
        """
        agents = ['team', 'scout', 'optimizer', 'navigator', 'weatherman', 'presenter']
        status_dict = {a: {'status': 'idle', 'message': 'Standby'} for a in agents}
        if active_agent in status_dict:
            status_dict[active_agent] = {'status': status, 'message': message}
            if active_agent != 'team' and status == 'working':
                status_dict['team'] = {'status': 'working', 'message': f'Monitoring {active_agent}...'}
        return create_agent_dashboard(status_dict)

    def _check_api_status(self, session_data):
        """Return a status line telling whether any model API key is available."""
        session = UserSession.from_dict(session_data)
        has_key = bool(
            session.custom_settings.get("model_api_key")
            or os.environ.get("OPENAI_API_KEY")
            or os.environ.get("GOOGLE_API_KEY")
        )
        return "✅ System Ready (Exclusive Mode)" if has_key else "⚠️ Check API Keys"

    def _get_gradio_chat_history(self, session):
        """Convert the session chat history to Gradio "messages" format."""
        return [{"role": msg["role"], "content": msg["message"]} for msg in session.chat_history]

    # --- Wrappers ---
    def analyze_wrapper(self, user_input, auto_loc, lat, lon, session_data):
        """Stream step-1 analysis events to the UI.

        Yields 8-tuples matching the outputs wired to the analyze button:
        step 1/2/3 container visibility, stream output, task list, task
        summary, chatbot history and the session dict.
        """
        import html

        session = UserSession.from_dict(session_data)
        iterator = self.service.run_step1_analysis(user_input, auto_loc, lat, lon, session)
        for event in iterator:
            evt_type = event.get("type")
            current_session = event.get("session", session)
            if evt_type == "error":
                message = event.get('message')
                gr.Warning(message)
                # Escape the message: it is arbitrary text placed into HTML.
                error_box = gr.HTML(f"<div style='color:red'>{html.escape(str(message))}</div>")
                yield (
                    gr.update(visible=True), gr.update(visible=False), gr.update(visible=False),
                    error_box, gr.update(), gr.update(), gr.update(),
                    current_session.to_dict(),
                )
                return
            if evt_type == "stream":
                yield (
                    gr.update(visible=True), gr.update(visible=False), gr.update(visible=False),
                    event.get("stream_text", ""), gr.update(), gr.update(), gr.update(),
                    current_session.to_dict(),
                )
            elif evt_type == "complete":
                tasks_html = self.service.generate_task_list_html(current_session)
                date_str = event.get("start_time", "N/A")
                summary_html = create_summary_card(
                    len(current_session.task_list),
                    event.get("high_priority", 0),
                    event.get("total_time", 0),
                    location=event.get("start_location", "N/A"),
                    date=date_str,
                )
                chat_history = self._get_gradio_chat_history(current_session)
                if not chat_history:
                    chat_history = [{"role": "assistant", "content": event.get('stream_text', "")}]
                    current_session.chat_history.append({"role": "assistant", "message": "Hi! I'm LifeFlow...", "time": ""})
                yield (
                    gr.update(visible=False), gr.update(visible=True), gr.update(visible=False),
                    "", gr.HTML(tasks_html), gr.HTML(summary_html), chat_history,
                    current_session.to_dict(),
                )

    def chat_wrapper(self, msg, session_data):
        """Stream task-modification chat events to the step-2 widgets."""
        session = UserSession.from_dict(session_data)
        iterator = self.service.modify_task_chat(msg, session)
        for event in iterator:
            sess = event.get("session", session)
            tasks_html = self.service.generate_task_list_html(sess)
            summary_html = event.get("summary_html", gr.HTML())
            gradio_history = self._get_gradio_chat_history(sess)
            yield (gradio_history, tasks_html, summary_html, sess.to_dict())

    async def step3_wrapper(self, session_data):
        """Run the agent team and stream dashboard, log and report updates.

        Yields 5-tuples: dashboard HTML, activity-log HTML, report content,
        task-list HTML and the session dict. Any exception (including an
        ``error`` event from the service) ends the stream with an error view.
        """
        import html

        session = UserSession.from_dict(session_data)
        # Most recent session seen in an event; also used on the error path
        # so that state produced before a failure is not thrown away.
        latest_session = session
        log_content, report_content = "", ""
        tasks_html = self.service.generate_task_list_html(session)
        init_dashboard = self._get_dashboard_html('team', 'working', 'Initializing...')
        loading_html = '<div style="text-align: center; padding: 60px;">🧠 Analyzing...</div>'
        init_log = '<div style="padding: 10px; color: #94a3b8;">Waiting for agents...</div>'
        yield (init_dashboard, init_log, loading_html, tasks_html, session.to_dict())
        try:
            async for event in self.service.run_step3_team(session):
                latest_session = event.get("session", latest_session)
                evt_type = event.get("type")
                if evt_type == "error":
                    raise Exception(event.get("message"))
                if evt_type == "report_stream":
                    report_content = event.get("content", "")
                if evt_type == "reasoning_update":
                    agent, status, msg = event.get("agent_status")
                    time_str = datetime.now().strftime('%H:%M:%S')
                    log_entry = f'<div style="margin-bottom: 8px; border-left: 3px solid #6366f1; padding-left: 10px;"><div style="font-size: 0.75rem; color: #94a3b8;">{time_str}{agent.upper()}</div><div style="color: #334155; font-size: 0.9rem;">{msg}</div></div>'
                    # Newest entry first.
                    log_content = log_entry + log_content
                    dashboard_html = self._get_dashboard_html(agent, status, msg)
                    log_html = f'<div style="height: 500px; overflow-y: auto; padding: 10px; background: #fff;">{log_content}</div>'
                    current_report = report_content + "\n\n" if report_content else loading_html
                    yield (dashboard_html, log_html, current_report, tasks_html, latest_session.to_dict())
                if evt_type == "complete":
                    final_report = event.get("report_html", report_content)
                    final_log = f'<div style="margin-bottom: 8px; border-left: 3px solid #10b981; padding-left: 10px; background: #ecfdf5; padding: 10px;"><div style="font-weight: bold; color: #059669;">✅ Planning Completed</div></div>{log_content}'
                    final_log_html = f'<div style="height: 500px; overflow-y: auto; padding: 10px; background: #fff;">{final_log}</div>'
                    yield (
                        self._get_dashboard_html('team', 'complete', 'Planning Finished'),
                        final_log_html, final_report, tasks_html, latest_session.to_dict(),
                    )
        except Exception as e:
            # Escape the text: exception messages are not HTML.
            error_html = f"<div style='color:red'>Error: {html.escape(str(e))}</div>"
            yield (
                self._get_dashboard_html('team', 'error', 'Failed'),
                error_html, "", tasks_html, latest_session.to_dict(),
            )

    def step4_wrapper(self, session_data):
        """Finalize the plan and switch from the step-3 to the step-4 view.

        On a non-success result the step-3 view stays visible and the
        step-4 outputs are cleared.
        """
        session = UserSession.from_dict(session_data)
        result = self.service.run_step4_finalize(session)
        if result['type'] == 'success':
            return (
                gr.update(visible=False), gr.update(visible=True),
                result['summary_tab_html'], result['report_md'],
                result['task_list_html'], result['map_fig'],
                session.to_dict(),
            )
        return (gr.update(visible=True), gr.update(visible=False), "", "", "", None, session.to_dict())

    def save_settings(self, g, w, prov, m_key, m_sel, fast, g_key_in, f_sel, s_data):
        """Store the settings-modal values in the session and close the modal."""
        sess = UserSession.from_dict(s_data)
        sess.custom_settings.update({
            'google_maps_api_key': g,
            'openweather_api_key': w,
            'llm_provider': prov,
            'model_api_key': m_key,
            'model': m_sel,
            'enable_fast_mode': fast,
            'groq_fast_model': f_sel,
            'groq_api_key': g_key_in,
        })
        return gr.update(visible=False), sess.to_dict(), "✅ Configuration Saved"

    # ==================== 4. UI Builder ====================
    def build_interface(self):
        """Build the Gradio Blocks app, wire all events and return it."""
        # NOTE(review): the source this was restored from had lost its
        # indentation; the nesting of the layout `with` blocks below is a
        # best-effort reconstruction - confirm against the rendered UI.
        container_css = """
.gradio-container { max-width: 100% !important; padding: 0; height: 100vh !important; overflow-y: auto !important; }
"""
        with gr.Blocks(title="LifeFlow AI (MCP)", css=container_css) as demo:
            gr.HTML(get_enhanced_css())
            session_state = gr.State(UserSession().to_dict())

            with gr.Column(elem_classes="step-container"):
                home_btn, settings_btn, doc_btn = create_header()
                stepper = create_progress_stepper(1)
                status_bar = gr.Markdown("Initializing MCP...", visible=False)

                # --- Step 1: input form ---
                with gr.Group(visible=True, elem_classes="step-container centered-input-container") as step1_container:
                    (input_area, s1_stream_output, user_input, auto_loc, loc_group, lat_in, lon_in, analyze_btn) = create_input_form("")

                # --- Step 2: review tasks / chat ---
                with gr.Group(visible=False, elem_classes="step-container") as step2_container:
                    gr.Markdown("### ✅ Review & Refine Tasks")
                    with gr.Row(elem_classes="step2-split-view"):
                        with gr.Column(scale=1):
                            with gr.Group(elem_classes="panel-container"):
                                gr.HTML('<div class="panel-header">📋 Your Tasks</div>')
                                with gr.Group(elem_classes="scrollable-content"):
                                    task_summary_box = gr.HTML()
                                    task_list_box = gr.HTML()
                        with gr.Column(scale=1):
                            with gr.Group(elem_classes="chat-panel-native"):
                                chatbot = gr.Chatbot(
                                    label="AI Assistant",
                                    type="messages",
                                    height=575,
                                    elem_classes="native-chatbot",
                                    bubble_full_width=False
                                )
                                with gr.Row(elem_classes="chat-input-row"):
                                    chat_input = gr.Textbox(show_label=False, placeholder="Type to modify tasks...", container=False, scale=5, autofocus=True)
                                    chat_send = gr.Button("➤", variant="primary", scale=1, min_width=50)
                    with gr.Row(elem_classes="action-footer"):
                        back_btn = gr.Button("← Back", variant="secondary", scale=1)
                        plan_btn = gr.Button("🚀 Start Planning", variant="primary", scale=2)

                # --- Step 3: agent team at work ---
                with gr.Group(visible=False, elem_classes="step-container") as step3_container:
                    gr.Markdown("### 🤖 AI Team Operations")
                    with gr.Group(elem_classes="agent-dashboard-container"):
                        agent_dashboard = gr.HTML(value=self._get_dashboard_html())
                    with gr.Row():
                        with gr.Column(scale=3):
                            with gr.Tabs():
                                with gr.Tab("📝 Full Report"):
                                    with gr.Group(elem_classes="live-report-wrapper"):
                                        live_report_md = gr.Markdown("🧠 Analyzing your tasks...")
                                with gr.Tab("📋 Task List"):
                                    with gr.Group(elem_classes="panel-container"):
                                        with gr.Group(elem_classes="scrollable-content"):
                                            task_list_s3 = gr.HTML()
                        with gr.Column(scale=1):
                            cancel_plan_btn = gr.Button("🛑 Stop & Back to Edit", variant="stop")
                            gr.Markdown("### ⚡ Activity Log")
                            with gr.Group(elem_classes="panel-container"):
                                planning_log = gr.HTML(value="Waiting...")

                # --- Step 4: results ---
                with gr.Group(visible=False, elem_classes="step-container") as step4_container:
                    # Drawer state: open by default.
                    drawer_state = gr.State(True)
                    with gr.Tabs():
                        # === Tab 1: map ===
                        with gr.Tab("🗺️ Route Map", id="tab_map"):
                            with gr.Group(elem_classes="map-container-relative"):
                                # 1. Map (HTML)
                                map_view = gr.HTML(label="Route Map")
                                # 2. Floating drawer (overlay)
                                with gr.Group(visible=True, elem_classes="map-overlay-drawer") as summary_drawer:
                                    with gr.Row(elem_classes="drawer-header"):
                                        gr.Markdown("### 📊 Trip Summary", elem_classes="drawer-title")
                                        close_drawer_btn = gr.Button("✕", elem_classes="drawer-close-btn", size="sm")
                                    with gr.Group(elem_classes="drawer-content"):
                                        with gr.Tabs(elem_classes="drawer-tabs"):
                                            with gr.Tab("Overview"):
                                                summary_tab_output = gr.HTML()
                                            with gr.Tab("Tasks"):
                                                task_list_tab_output = gr.HTML()
                                # 3. "Open" button (floating, hidden by default);
                                #    variant='primary' keeps it eye-catching.
                                open_drawer_btn = gr.Button("📊 Show Summary", visible=False, variant="primary",
                                                            elem_classes="map-overlay-btn")
                        # === Tab 2: full report ===
                        with gr.Tab("📝 Full Report", id="tab_report"):
                            # live-report-wrapper gives the white-paper background.
                            with gr.Group(elem_classes="live-report-wrapper"):
                                report_tab_output = gr.Markdown()

                    # === Drawer events ===
                    # Close: drawer disappears, button appears.
                    close_drawer_btn.click(
                        fn=lambda: (gr.update(visible=False), gr.update(visible=True)),
                        outputs=[summary_drawer, open_drawer_btn]
                    )
                    # Open: drawer appears, button disappears.
                    open_drawer_btn.click(
                        fn=lambda: (gr.update(visible=True), gr.update(visible=False)),
                        outputs=[summary_drawer, open_drawer_btn]
                    )

            # --- Modals & Events ---
            (settings_modal, g_key, g_stat, w_key, w_stat, llm_provider, main_key, main_stat, model_sel,
             fast_mode_chk, groq_model_sel, groq_key, groq_stat, close_set, save_set, set_stat) = create_settings_modal()

            # Validators: show a "checking" hint on blur, then run the check.
            g_key.blur(
                fn=lambda k: "⏳ Checking..." if k else "", inputs=[g_key], outputs=[g_stat]
            ).then(fn=APIValidator.test_google_maps, inputs=[g_key], outputs=[g_stat])
            w_key.blur(
                fn=lambda k: "⏳ Checking..." if k else "", inputs=[w_key], outputs=[w_stat]
            ).then(fn=APIValidator.test_openweather, inputs=[w_key], outputs=[w_stat])
            main_key.blur(
                fn=lambda k: "⏳ Checking..." if k else "", inputs=[main_key], outputs=[main_stat]
            ).then(fn=APIValidator.test_llm, inputs=[llm_provider, main_key, model_sel], outputs=[main_stat])

            def resolve_groq_visibility(fast_mode, provider):
                # Returns visibility updates for (groq model, groq key, groq status).
                if not fast_mode:
                    return (gr.update(visible=False), gr.update(visible=False), gr.update(visible=False))
                model_vis = gr.update(visible=True)
                if provider == "Groq":
                    return (model_vis, gr.update(visible=False), gr.update(visible=False))
                return (model_vis, gr.update(visible=True), gr.update(visible=True))

            fast_mode_chk.change(fn=resolve_groq_visibility, inputs=[fast_mode_chk, llm_provider], outputs=[groq_model_sel, groq_key, groq_stat])

            def on_provider_change(provider, fast_mode):
                # Swap the model choices and clear the key/status fields.
                new_choices = MODEL_OPTIONS.get(provider, [])
                default_val = new_choices[0][1] if new_choices else ""
                model_update = gr.Dropdown(choices=new_choices, value=default_val)
                g_model_vis, g_key_vis, g_stat_vis = resolve_groq_visibility(fast_mode, provider)
                return (model_update, gr.Textbox(value=""), gr.Markdown(value=""), g_model_vis, g_key_vis, g_stat_vis)

            llm_provider.change(fn=on_provider_change, inputs=[llm_provider, fast_mode_chk], outputs=[model_sel, main_key, main_stat, groq_model_sel, groq_key, groq_stat])

            doc_modal, close_doc_btn = create_doc_modal()
            settings_btn.click(fn=lambda: gr.update(visible=True), outputs=[settings_modal])
            close_set.click(fn=lambda: gr.update(visible=False), outputs=[settings_modal])
            doc_btn.click(fn=lambda: gr.update(visible=True), outputs=[doc_modal])
            close_doc_btn.click(fn=lambda: gr.update(visible=False), outputs=[doc_modal])

            # Actions
            analyze_btn.click(
                fn=self.analyze_wrapper,
                inputs=[user_input, auto_loc, lat_in, lon_in, session_state],
                outputs=[step1_container, step2_container, step3_container, s1_stream_output, task_list_box, task_summary_box, chatbot, session_state]
            ).then(fn=lambda: update_stepper(2), outputs=[stepper])
            chat_send.click(
                fn=self.chat_wrapper,
                inputs=[chat_input, session_state],
                outputs=[chatbot, task_list_box, task_summary_box, session_state]
            ).then(fn=lambda: "", outputs=[chat_input])
            chat_input.submit(
                fn=self.chat_wrapper,
                inputs=[chat_input, session_state],
                outputs=[chatbot, task_list_box, task_summary_box, session_state]
            ).then(fn=lambda: "", outputs=[chat_input])

            # Step 3 start: switch views, then stream the team run.
            step3_start = plan_btn.click(
                fn=lambda: (gr.update(visible=False), gr.update(visible=True), update_stepper(3)),
                outputs=[step2_container, step3_container, stepper]
            ).then(
                fn=self.step3_wrapper,
                inputs=[session_state],
                outputs=[agent_dashboard, planning_log, live_report_md, task_list_s3, session_state],
                show_progress="hidden"
            )
            cancel_plan_btn.click(
                fn=self.cancel_wrapper, inputs=[session_state], outputs=None
            ).then(
                fn=lambda: (gr.update(visible=True), gr.update(visible=False), update_stepper(2)),
                inputs=None,
                outputs=[step2_container, step3_container, stepper]
            )
            step3_start.then(
                fn=self.step4_wrapper,
                inputs=[session_state],
                outputs=[step3_container, step4_container, summary_tab_output, report_tab_output, task_list_tab_output, map_view, session_state]
            ).then(fn=lambda: update_stepper(4), outputs=[stepper])

            def reset_all(session_data):
                # Start a fresh session but keep the user's saved settings.
                old_s = UserSession.from_dict(session_data)
                new_s = UserSession()
                new_s.custom_settings = old_s.custom_settings
                return (gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), update_stepper(1), new_s.to_dict(), "")

            home_btn.click(
                fn=self.cancel_wrapper, inputs=[session_state], outputs=None
            ).then(
                fn=reset_all,
                inputs=[session_state],
                outputs=[step1_container, step2_container, step3_container, step4_container, stepper, session_state, user_input]
            )
            back_btn.click(
                fn=self.cancel_wrapper, inputs=[session_state], outputs=None
            ).then(
                fn=reset_all,
                inputs=[session_state],
                outputs=[step1_container, step2_container, step3_container, step4_container, stepper, session_state, user_input]
            )
            save_set.click(
                fn=self.save_settings,
                inputs=[g_key, w_key, llm_provider, main_key, model_sel, fast_mode_chk, groq_key, groq_model_sel, session_state],
                outputs=[settings_modal, session_state, status_bar]
            )
            auto_loc.change(fn=toggle_location_inputs, inputs=auto_loc, outputs=loc_group)
            demo.load(fn=self._check_api_status, inputs=[session_state], outputs=[status_bar])
        return demo
# ==================== 5. Main ====================
def main():
    """Start the MCP connections and the Gradio app, then block until stopped.

    The process is always terminated with ``os._exit``: status 0 after a
    normal stop or Ctrl+C, status 1 when startup/runtime raised an exception.
    """
    exit_code = 0
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def run_app_lifecycle():
        async with lifespan_manager():
            app = LifeFlowAI()
            # Inject the agent-name -> toolkit dictionary.
            app.inject_tools(GLOBAL_TOOLKITS_DICT)
            demo = app.build_interface()
            demo.launch(
                server_name="0.0.0.0",
                server_port=7860,
                share=True,
                show_error=True,
                prevent_thread_lock=True
            )
            print("✨ App is running. Press Ctrl+C to stop.")
            try:
                # launch() does not block (prevent_thread_lock=True), so keep
                # the lifespan context open until this task is cancelled.
                while True:
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                pass

    lifecycle = loop.create_task(run_app_lifecycle())
    try:
        loop.run_until_complete(lifecycle)
    except KeyboardInterrupt:
        # Ctrl+C only interrupts the loop; the task is still pending. Cancel
        # it and drive it to completion so lifespan_manager can close the
        # MCP connections before the hard exit below.
        if not lifecycle.done():
            lifecycle.cancel()
            try:
                loop.run_until_complete(lifecycle)
            except (asyncio.CancelledError, KeyboardInterrupt):
                pass
            except Exception:
                traceback.print_exc()
    except Exception as e:
        print("\n❌ CRITICAL ERROR STARTUP FAILED ❌")
        traceback.print_exc()
        exit_code = 1
    finally:
        print("🛑 Shutting down process...")
        os._exit(exit_code)
if __name__ == "__main__":
main()