""" LifeFlow AI - Multi-User Safe with Real Streaming 支持多用戶並發 + 真正的串流輸出 + 安全的設定隔離 """ import sys from pathlib import Path import gradio as gr from datetime import datetime import time as time_module import json import uuid from typing import Dict, Any, Optional # ===== 導入配置 ===== from config import DEFAULT_SETTINGS, APP_TITLE # ===== 導入 UI 組件 ===== from ui.theme import get_enhanced_css from ui.components.header import create_header, create_top_controls from ui.components.input_form import create_input_form, toggle_location_inputs from ui.components.confirmation import create_confirmation_area, create_exit_button from ui.components.results import create_team_area, create_result_area, create_tabs from ui.components.modals import create_settings_modal, create_doc_modal # ===== 導入核心工具 ===== from core.utils import ( create_agent_stream_output, create_agent_card_enhanced, create_task_card, create_summary_card, create_animated_map, get_reasoning_html_reversed, create_celebration_animation, create_result_visualization ) # ===== 導入 Agno Agent 組件 ===== from agno.models.google import Gemini from agno.agent import RunEvent from agno.run.team import TeamRunEvent # ===== 導入 Agent 系統 ===== from src.agent.base import UserState, Location, get_context from src.agent.planner import create_planner_agent from src.agent.core_team import create_core_team from src.infra.context import set_session_id, get_session_id from src.infra.poi_repository import poi_repo from src.tools import ( ScoutToolkit, OptimizationToolkit, NavigationToolkit, WeatherToolkit, ReaderToolkit ) from src.infra.config import get_settings from src.infra.logger import get_logger logger = get_logger(__name__) class UserSession: """ 單個用戶的會話數據 每個用戶有獨立的實例,確保資料隔離 """ def __init__(self): self.session_id: Optional[str] = None self.planner_agent = None self.core_team = None self.user_state: Optional[UserState] = None self.task_list: list = [] self.reasoning_messages: list = [] self.chat_history: list = [] self.planning_completed: bool = False # 存儲位置信息(用於重新初始化 Agent) self.lat: Optional[float] = None self.lng: Optional[float] = None # [Security Fix] 用戶個人的自定義設定 (API Keys, Model Choice) self.custom_settings: Dict[str, Any] = {} # 系統預設設定 (從環境變數讀取) self.agno_settings = get_settings() def to_dict(self) -> Dict[str, Any]: """序列化為字典(用於 Gradio State)""" return { 'session_id': self.session_id, 'task_list': self.task_list, 'reasoning_messages': self.reasoning_messages, 'chat_history': self.chat_history, 'planning_completed': self.planning_completed, 'lat': self.lat, 'lng': self.lng, # [Security Fix] 保存用戶自定義設定 'custom_settings': self.custom_settings } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'UserSession': """從字典恢復(用於 Gradio State)""" session = cls() session.session_id = data.get('session_id') session.task_list = data.get('task_list', []) session.reasoning_messages = data.get('reasoning_messages', []) session.chat_history = data.get('chat_history', []) session.planning_completed = data.get('planning_completed', False) session.lat = data.get('lat') session.lng = data.get('lng') # [Security Fix] 恢復用戶自定義設定 session.custom_settings = data.get('custom_settings', {}) return session class LifeFlowAI: """LifeFlow AI - Multi-User Safe Version""" def __init__(self): # [Security Fix] 移除全域 settings,避免用戶間資料汙染 pass def _initialize_agents(self, session: UserSession, lat: float, lng: float): """初始化 Agents(每個用戶獨立,並應用用戶設定)""" session.lat = lat session.lng = lng if session.planner_agent is not None: logger.debug(f"Agents already initialized for session {session.session_id}") return # 生成 Session ID if session.session_id is None: session.session_id = str(uuid.uuid4()) token = set_session_id(session.session_id) logger.info(f"🆔 New Session: {session.session_id}") else: set_session_id(session.session_id) logger.info(f"🔄 Restoring Session: {session.session_id}") # 設定用戶狀態 session.user_state = UserState(location=Location(lat=lat, lng=lng)) # [Security Fix] 讀取用戶選擇的模型,如果沒有則使用預設 selected_model_id = session.custom_settings.get('model', 'gemini-2.5-flash') # [Security Fix] 優先使用用戶提供的 API Key (這裡以 Gemini 為例,若支援其他模型需擴充邏輯) # 注意:實際應用中需根據選擇的 Model ID (Claude/Gemini) 來決定使用哪個 Key gemini_key = session.custom_settings.get('gemini_api_key') or session.agno_settings.gemini_api_key # 初始化模型 (應用設定) planner_model = Gemini( id=selected_model_id, thinking_budget=2048, api_key=gemini_key ) main_model = Gemini( id=selected_model_id, thinking_budget=1024, api_key=gemini_key ) lite_model = Gemini( id="gemini-2.5-flash-lite", # 輕量級模型通常固定或由次要選項決定 api_key=gemini_key ) # 配置模型和工具 models_dict = { "team": main_model, "scout": main_model, "optimizer": lite_model, "navigator": lite_model, "weatherman": lite_model, "presenter": main_model, } # [Note] 如果 Toolkit 支援傳入 API Key,應在此處從 session.custom_settings 傳入 tools_dict = { "scout": [ScoutToolkit()], "optimizer": [OptimizationToolkit()], "navigator": [NavigationToolkit()], "weatherman": [WeatherToolkit()], "presenter": [ReaderToolkit()], } planner_kwargs = { "additional_context": get_context(session.user_state), "timezone_identifier": session.user_state.utc_offset, "debug_mode": False, } team_kwargs = { "timezone_identifier": session.user_state.utc_offset, } # 創建 Agents session.planner_agent = create_planner_agent( planner_model, planner_kwargs, session_id=session.session_id ) session.core_team = create_core_team( models_dict, team_kwargs, tools_dict, session_id=session.session_id ) logger.info(f"✅ Agents initialized for session {session.session_id} using model {selected_model_id}") def step1_analyze_tasks(self, user_input: str, auto_location: bool, lat: float, lon: float, session: UserSession): """Step 1: 真正的串流分析""" if not user_input.strip(): yield from self._empty_step1_outputs(session) return if auto_location: lat, lon = 25.033, 121.565 try: self._initialize_agents(session, lat, lon) self._add_reasoning(session, "planner", "🚀 Starting analysis...") yield self._create_step1_outputs( stream_text="🤔 Analyzing your request with AI...", session=session, agent_status=("planner", "working", "Initializing...") ) time_module.sleep(0.3) self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...") yield self._create_step1_outputs( stream_text="🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...", session=session, agent_status=("planner", "working", "Extracting tasks...") ) planner_stream = session.planner_agent.run( f"help user to update the task_list, user's message: {user_input}", stream=True, stream_events=True ) accumulated_response = "" displayed_text = "🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...\n\n" show_content = True for chunk in planner_stream: if chunk.event == RunEvent.run_content: content = chunk.content accumulated_response += content if show_content: if "@@@" in accumulated_response: show_content = False remaining = content.split("@@@")[0] if remaining: displayed_text += remaining else: displayed_text += content yield self._create_step1_outputs( stream_text=displayed_text, session=session, agent_status=("planner", "working", "Processing...") ) json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") session.planner_agent.update_session_state( session_id=session.session_id, session_state_updates={"task_list": json_data} ) try: task_list_data = json.loads(json_data) session.task_list = self._convert_task_list_to_ui_format(task_list_data) except json.JSONDecodeError as e: logger.error(f"Failed to parse task_list: {e}") session.task_list = [] self._add_reasoning(session, "planner", f"✅ Extracted {len(session.task_list)} tasks") high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH") total_time = sum( int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration") ) final_text = displayed_text + f"\n✅ Analysis complete! Found {len(session.task_list)} tasks." yield self._create_step1_complete_outputs( stream_text=final_text, session=session, high_priority=high_priority, total_time=total_time ) except Exception as e: logger.error(f"Error in step1: {e}", exc_info=True) yield self._create_error_outputs(str(e), session) def modify_task_chat(self, user_message: str, session: UserSession): """修改任務(帶真正的串流)""" if not user_message.strip(): chat_html = self._generate_chat_history_html(session) task_html = self._generate_task_list_html(session) yield chat_html, task_html, session.to_dict() return session.chat_history.append({ "role": "user", "message": user_message, "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) try: if session.planner_agent is None: if session.lat is not None and session.lng is not None: session.chat_history.append({ "role": "assistant", "message": "🔄 Restoring AI system...", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) self._initialize_agents(session, session.lat, session.lng) session.chat_history.pop() else: session.chat_history.append({ "role": "assistant", "message": "❌ Error: Please restart the planning process.", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) return session.chat_history.append({ "role": "assistant", "message": "🤔 AI is thinking...", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) planner_stream = session.planner_agent.run( f"help user to update the task_list, user's message: {user_message}", stream=True, stream_events=True ) accumulated_response = "" displayed_thinking = "🤔 AI is thinking...\n\n" show_content = True for chunk in planner_stream: if chunk.event == RunEvent.run_content: content = chunk.content accumulated_response += content if show_content: if "@@@" in accumulated_response: show_content = False content = content.split("@@@")[0] if content: displayed_thinking += content session.chat_history[-1] = { "role": "assistant", "message": displayed_thinking, "time": datetime.now().strftime("%H:%M:%S") } yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") session.planner_agent.update_session_state( session_id=session.session_id, session_state_updates={"task_list": json_data} ) task_list_data = json.loads(json_data) session.task_list = self._convert_task_list_to_ui_format(task_list_data) session.chat_history[-1] = { "role": "assistant", "message": "✅ Tasks updated based on your request", "time": datetime.now().strftime("%H:%M:%S") } self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...") yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) except Exception as e: logger.error(f"Error in modify_task_chat: {e}", exc_info=True) if session.chat_history and "thinking" in session.chat_history[-1].get("message", "").lower(): session.chat_history.pop() session.chat_history.append({ "role": "assistant", "message": f"❌ Error: {str(e)}", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) def step2_search_pois(self, session: UserSession): """Step 2: Scout 開始工作""" self._add_reasoning(session, "team", "🚀 Core Team activated") self._add_reasoning(session, "scout", "Searching for POIs...") agent_updates = [ create_agent_card_enhanced("planner", "complete", "Tasks ready"), create_agent_card_enhanced("scout", "working", "Searching..."), *[create_agent_card_enhanced(k, "idle", "On standby") for k in ["optimizer", "validator", "weather", "traffic"]] ] return ( get_reasoning_html_reversed(session.reasoning_messages), "🗺️ Scout is searching...", *agent_updates, session.to_dict() ) def step3_run_core_team(self, session: UserSession): """Step 3: 運行 Core Team(帶串流)""" try: if session.core_team is None or session.planner_agent is None: if session.lat is not None and session.lng is not None: self._initialize_agents(session, session.lat, session.lng) else: raise ValueError("Cannot restore agents: location not available") set_session_id(session.session_id) task_list_input = session.planner_agent.get_session_state().get("task_list") if not task_list_input: raise ValueError("No task list available") if isinstance(task_list_input, str): task_list_str = task_list_input else: task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False) self._add_reasoning(session, "team", "🎯 Multi-agent collaboration started") team_stream = session.core_team.run( f"Plan this trip: {task_list_str}", stream=True, stream_events=True, session_id=session.session_id ) report_content = "" for event in team_stream: if event.event in [TeamRunEvent.run_content]: report_content += event.content elif event.event == "tool_call": tool_name = event.tool_call.get('function', {}).get('name', 'unknown') self._add_reasoning(session, "team", f"🔧 Tool: {tool_name}") elif event.event == TeamRunEvent.run_completed: self._add_reasoning(session, "team", "🎉 Completed") report_html = f"## 🎯 Planning Complete\n\n{report_content}..." return report_html, session.to_dict() except Exception as e: logger.error(f"Error in step3: {e}") return f"## ❌ Error\n\n{str(e)}", session.to_dict() def step4_finalize(self, session: UserSession): """Step 4: 完成""" try: final_ref_id = poi_repo.get_last_id_by_session(session.session_id) if not final_ref_id: raise ValueError(f"No final result found for session {session.session_id}") structured_data = poi_repo.load(final_ref_id) timeline = structured_data.get("timeline", []) metrics = structured_data.get("metrics", {}) traffic_summary = structured_data.get("traffic_summary", {}) timeline_html = self._generate_timeline_html(timeline) metrics_html = self._generate_metrics_html(metrics, traffic_summary) safe_task_list = session.task_list if session.task_list else [] result_viz = create_result_visualization(safe_task_list, structured_data) map_fig = self._generate_map_from_data(structured_data) agent_updates = [ create_agent_card_enhanced(k, "complete", "✓ Complete") for k in ["planner", "scout", "optimizer", "validator", "weather", "traffic"] ] self._add_reasoning(session, "team", "🎉 All completed") session.planning_completed = True return ( timeline_html, metrics_html, result_viz, map_fig, gr.update(visible=True), gr.update(visible=False), "🎉 Planning completed!", *agent_updates, session.to_dict() ) except Exception as e: logger.error(f"Error in step4: {e}", exc_info=True) error_html = f"
Error: {str(e)}
" default_map = create_animated_map() agent_updates = [ create_agent_card_enhanced(k, "idle", "Waiting...") for k in ["planner", "scout", "optimizer", "validator", "weather", "traffic"] ] return ( error_html, error_html, error_html, default_map, gr.update(visible=True), gr.update(visible=False), f"Error: {str(e)}", *agent_updates, session.to_dict() ) # ===== 輔助方法 ===== # [Security Fix] 新增:保存設定到 User Session def save_settings(self, google_key, weather_key, anthropic_key, model, session_data): """保存設定到用戶 Session""" session = UserSession.from_dict(session_data) session.custom_settings['google_maps_api_key'] = google_key session.custom_settings['openweather_api_key'] = weather_key session.custom_settings['anthropic_api_key'] = anthropic_key session.custom_settings['model'] = model return "✅ Settings saved locally!", session.to_dict() def _add_reasoning(self, session: UserSession, agent: str, message: str): session.reasoning_messages.append({ "agent": agent, "message": message, "time": datetime.now().strftime("%H:%M:%S") }) def _create_step1_outputs(self, stream_text: str, session: UserSession, agent_status: tuple): agent_key, status, message = agent_status return ( self._create_stream_html(stream_text), "", "", get_reasoning_html_reversed(session.reasoning_messages), gr.update(visible=False), gr.update(visible=False), "", message, create_agent_card_enhanced(agent_key, status, message), create_agent_card_enhanced("scout", "idle", "On standby"), create_agent_card_enhanced("optimizer", "idle", "On standby"), create_agent_card_enhanced("validator", "idle", "On standby"), create_agent_card_enhanced("weather", "idle", "On standby"), create_agent_card_enhanced("traffic", "idle", "On standby"), session.to_dict() ) def _create_step1_complete_outputs(self, stream_text: str, session: UserSession, high_priority: int, total_time: int): summary_html = create_summary_card(len(session.task_list), high_priority, total_time) task_html = self._generate_task_list_html(session) return ( self._create_stream_html(stream_text), summary_html, task_html, get_reasoning_html_reversed(session.reasoning_messages), gr.update(visible=True), gr.update(visible=False), self._generate_chat_welcome_html(), "✓ Tasks extracted", create_agent_card_enhanced("planner", "complete", "Tasks ready"), create_agent_card_enhanced("scout", "idle", "On standby"), create_agent_card_enhanced("optimizer", "idle", "On standby"), create_agent_card_enhanced("validator", "idle", "On standby"), create_agent_card_enhanced("weather", "idle", "On standby"), create_agent_card_enhanced("traffic", "idle", "On standby"), session.to_dict() ) def _empty_step1_outputs(self, session: UserSession): yield ( create_agent_stream_output(), "", "", get_reasoning_html_reversed(), gr.update(visible=False), gr.update(visible=False), "", "Waiting for input...", create_agent_card_enhanced("planner", "idle", "On standby"), create_agent_card_enhanced("scout", "idle", "On standby"), create_agent_card_enhanced("optimizer", "idle", "On standby"), create_agent_card_enhanced("validator", "idle", "On standby"), create_agent_card_enhanced("weather", "idle", "On standby"), create_agent_card_enhanced("traffic", "idle", "On standby"), session.to_dict() ) def _create_error_outputs(self, error: str, session: UserSession): return ( self._create_stream_html(f"❌ Error: {error}"), "", "", get_reasoning_html_reversed(session.reasoning_messages), gr.update(visible=False), gr.update(visible=False), "", f"Error: {error}", create_agent_card_enhanced("planner", "idle", "Error occurred"), create_agent_card_enhanced("scout", "idle", "On standby"), create_agent_card_enhanced("optimizer", "idle", "On standby"), create_agent_card_enhanced("validator", "idle", "On standby"), create_agent_card_enhanced("weather", "idle", "On standby"), create_agent_card_enhanced("traffic", "idle", "On standby"), session.to_dict() ) def _create_stream_html(self, text: str) -> str: return f"""
{text}
""" def _convert_task_list_to_ui_format(self, task_list_data): ui_tasks = [] if isinstance(task_list_data, dict): tasks = task_list_data.get("tasks", []) elif isinstance(task_list_data, list): tasks = task_list_data else: return [] for i, task in enumerate(tasks, 1): ui_task = { "id": i, "title": task.get("description", "Task"), "priority": task.get("priority", "MEDIUM"), "time": task.get("time_window", "Anytime"), "duration": f"{task.get('duration_minutes', 30)} minutes", "location": task.get("location_hint", "To be determined"), "icon": self._get_task_icon(task.get("category", "other")) } ui_tasks.append(ui_task) return ui_tasks def _get_task_icon(self, category: str) -> str: icons = { "medical": "🏥", "shopping": "🛒", "postal": "📮", "food": "🍽️", "entertainment": "🎭", "transportation": "🚗", "other": "📋" } return icons.get(category.lower(), "📋") def _generate_task_list_html(self, session: UserSession) -> str: if not session.task_list: return "

No tasks available

" html = "" for task in session.task_list: html += create_task_card( task["id"], task["title"], task["priority"], task["time"], task["duration"], task["location"], task.get("icon", "📋") ) return html def _generate_chat_history_html(self, session: UserSession) -> str: if not session.chat_history: return self._generate_chat_welcome_html() html = '
' for msg in session.chat_history: role_class = "user" if msg["role"] == "user" else "assistant" icon = "👤" if msg["role"] == "user" else "🤖" html += f"""
{icon}
{msg["message"]}
{msg["time"]}
""" html += '
' return html def _generate_chat_welcome_html(self) -> str: return """

💬 Chat with LifeFlow AI

Modify your tasks by chatting here.

""" def _generate_timeline_html(self, timeline): """ 生成右側 Timeline Tab 的 HTML 使用 structured_data['timeline'] 中的豐富數據 """ if not timeline: return "
No timeline data
" html = '
' for i, stop in enumerate(timeline): time = stop.get("time", "N/A") location = stop.get("location", "Unknown Location") weather = stop.get("weather", "") aqi = stop.get("aqi", {}).get("label", "") travel_time = stop.get("travel_time_from_prev", "") # 決定標記樣式 marker_class = "start" if i == 0 else "stop" html += f"""
{i}
{time}

{location}

{f'🌤️ {weather}' if weather else ''} {f'{aqi}' if aqi else ''}
{f'

🚗 Drive: {travel_time}

' if i > 0 else ''}
""" html += '
' # 補充少許 CSS 以確保美觀 html += """ """ return html def _generate_metrics_html(self, metrics, traffic_summary): """生成左側的簡單 Metrics""" if traffic_summary is None: traffic_summary = {} if metrics is None: metrics = {} total_distance = traffic_summary.get("total_distance_km", 0) total_duration = traffic_summary.get("total_duration_min", 0) efficiency = metrics.get("route_efficiency_pct", 90) # 根據效率改變顏色 eff_color = "#50C878" if efficiency >= 80 else "#F5A623" return f"""

📏 Distance

{total_distance:.1f} km

⏱️ Duration

{total_duration:.0f} min

⚡ Efficiency

{efficiency:.0f}%

""" def _generate_map_from_data(self, structured_data): """ 生成 Plotly 地圖,包含真實道路軌跡 (Polyline) """ import plotly.graph_objects as go from core.utils import decode_polyline # 確保導入了解碼函數 try: timeline = structured_data.get("timeline", []) precise_result = structured_data.get("precise_traffic_result", {}) legs = precise_result.get("legs", []) if not timeline: return create_animated_map() fig = go.Figure() # 1. 繪製路線 (Polyline) - 藍色路徑 # 從 legs 中提取 polyline 並解碼 route_lats = [] route_lons = [] for leg in legs: poly_str = leg.get("polyline") if poly_str: decoded = decode_polyline(poly_str) # 解碼後是 list of (lat, lng) leg_lats = [coord[0] for coord in decoded] leg_lons = [coord[1] for coord in decoded] # 為了讓線段連續,我們可以將其串接 (plotly 支援 nan 分隔,或直接單一 trace) # 這裡簡單起見,我們將每個 leg 分開加,或者合併。合併效能較好。 route_lats.extend(leg_lats) route_lons.extend(leg_lons) if route_lats: fig.add_trace(go.Scattermapbox( lat=route_lats, lon=route_lons, mode='lines', line=dict(width=4, color='#4A90E2'), name='Route', hoverinfo='none' # 路徑本身不需 hover )) # 2. 繪製站點 (Markers) lats, lons, hover_texts, colors, sizes = [], [], [], [], [] for i, stop in enumerate(timeline): coords = stop.get("coordinates", {}) lat = coords.get("lat") lng = coords.get("lng") if lat and lng: lats.append(lat) lons.append(lng) # 構建 Hover Text name = stop.get("location", "Stop") time = stop.get("time", "") weather = stop.get("weather", "") text = f"{i}. {name}
🕒 {time}
🌤️ {weather}" hover_texts.append(text) # 樣式:起點綠色,終點紅色,中間藍色 if i == 0: colors.append('#50C878') # Start sizes.append(15) elif i == len(timeline) - 1: colors.append('#FF6B6B') # End sizes.append(15) else: colors.append('#F5A623') # Middle sizes.append(12) fig.add_trace(go.Scattermapbox( lat=lats, lon=lons, mode='markers+text', marker=dict(size=sizes, color=colors, allowoverlap=True), text=[str(i) for i in range(len(lats))], # 顯示序號 textposition="top center", textfont=dict(size=14, color='black', family="Arial Black"), hovertext=hover_texts, hoverinfo='text', name='Stops' )) # 設定地圖中心與縮放 if lats: center_lat = sum(lats) / len(lats) center_lon = sum(lons) / len(lons) else: center_lat, center_lon = 25.033, 121.565 fig.update_layout( mapbox=dict( style='open-street-map', # 或 'carto-positron' 看起來更乾淨 center=dict(lat=center_lat, lon=center_lon), zoom=12 ), margin=dict(l=0, r=0, t=0, b=0), height=500, showlegend=False ) return fig except Exception as e: logger.error(f"Error generating map: {e}", exc_info=True) return create_animated_map() def build_interface(self): with gr.Blocks(title=APP_TITLE) as demo: gr.HTML(get_enhanced_css()) create_header() theme_btn, settings_btn, doc_btn = create_top_controls() # 🔥 每個 Browser Tab 都有獨立的 UserSession session_state = gr.State(value=UserSession().to_dict()) with gr.Row(): with gr.Column(scale=2, min_width=400): (input_area, agent_stream_output, user_input, auto_location, location_inputs, lat_input, lon_input, analyze_btn) = create_input_form( create_agent_stream_output() ) (task_confirm_area, task_summary_display, task_list_display, exit_btn_inline, ready_plan_btn) = create_confirmation_area() team_area, agent_displays = create_team_area(create_agent_card_enhanced) (result_area, result_display, timeline_display, metrics_display) = create_result_area( create_animated_map) with gr.Column(scale=3, min_width=500): status_bar = gr.Textbox(label="📊 Status", value="Waiting for input...", interactive=False, max_lines=1) (tabs, report_tab, map_tab, report_output, map_output, reasoning_output, chat_input_area, chat_history_output, chat_input, chat_send) = create_tabs( create_animated_map, get_reasoning_html_reversed() ) (settings_modal, google_maps_key, openweather_key, anthropic_key, model_choice, close_settings_btn, save_settings_btn, settings_status) = create_settings_modal() doc_modal, close_doc_btn = create_doc_modal() # ===== Event Handlers ===== auto_location.change(fn=toggle_location_inputs, inputs=[auto_location], outputs=[location_inputs]) def analyze_wrapper(ui, al, lat, lon, s): session = UserSession.from_dict(s) yield from self.step1_analyze_tasks(ui, al, lat, lon, session) analyze_btn.click( fn=analyze_wrapper, inputs=[user_input, auto_location, lat_input, lon_input, session_state], outputs=[ agent_stream_output, task_summary_display, task_list_display, reasoning_output, task_confirm_area, chat_input_area, chat_history_output, status_bar, *agent_displays, session_state ] ).then( fn=lambda: (gr.update(visible=False), gr.update(visible=True), gr.update(visible=True)), outputs=[input_area, task_confirm_area, chat_input_area] ) def chat_wrapper(msg, s): session = UserSession.from_dict(s) yield from self.modify_task_chat(msg, session) chat_send.click( fn=chat_wrapper, inputs=[chat_input, session_state], outputs=[chat_history_output, task_list_display, session_state] ).then(fn=lambda: "", outputs=[chat_input]) exit_btn_inline.click( fn=lambda: ( gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), "", create_agent_stream_output(), self._generate_chat_welcome_html(), "Ready...", UserSession().to_dict() ), outputs=[ input_area, task_confirm_area, chat_input_area, result_area, team_area, report_tab, map_tab, user_input, agent_stream_output, chat_history_output, status_bar, session_state ] ) ready_plan_btn.click( fn=lambda: (gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(selected="ai_conversation_tab")), outputs=[task_confirm_area, chat_input_area, team_area, tabs] ).then( fn=lambda s: self.step2_search_pois(UserSession.from_dict(s)), inputs=[session_state], outputs=[reasoning_output, status_bar, *agent_displays, session_state] ).then( fn=lambda s: self.step3_run_core_team(UserSession.from_dict(s)), inputs=[session_state], outputs=[report_output, session_state] ).then( fn=lambda: (gr.update(visible=True), gr.update(visible=True), gr.update(selected="report_tab")), outputs=[report_tab, map_tab, tabs] ).then( fn=lambda s: self.step4_finalize(UserSession.from_dict(s)), inputs=[session_state], outputs=[ timeline_display, metrics_display, result_display, map_output, map_tab, team_area, status_bar, *agent_displays, session_state ] ).then(fn=lambda: gr.update(visible=True), outputs=[result_area]) # Settings settings_btn.click(fn=lambda: gr.update(visible=True), outputs=[settings_modal]) close_settings_btn.click(fn=lambda: gr.update(visible=False), outputs=[settings_modal]) # [Security Fix] 儲存設定到 session_state save_settings_btn.click( fn=self.save_settings, inputs=[google_maps_key, openweather_key, anthropic_key, model_choice, session_state], outputs=[settings_status, session_state] ) # Theme theme_btn.click(fn=None, js=""" () => { const container = document.querySelector('.gradio-container'); if (container) { container.classList.toggle('theme-dark'); const isDark = container.classList.contains('theme-dark'); localStorage.setItem('lifeflow-theme', isDark ? 'dark' : 'light'); } } """) doc_btn.click(fn=lambda: gr.update(visible=True), outputs=[doc_modal]) close_doc_btn.click(fn=lambda: gr.update(visible=False), outputs=[doc_modal]) return demo def main(): app = LifeFlowAI() demo = app.build_interface() demo.launch(server_name="0.0.0.0", server_port=7860, share=True, show_error=True) if __name__ == "__main__": main()