Spaces:
Running
Running
| """ | |
| LifeFlow AI - Multi-User Safe with Real Streaming | |
| 支持多用戶並發 + 真正的串流輸出 + 安全的設定隔離 | |
| """ | |
| import sys | |
| from pathlib import Path | |
| import gradio as gr | |
| from datetime import datetime | |
| import time as time_module | |
| import json | |
| import uuid | |
| from typing import Dict, Any, Optional | |
| # ===== 導入配置 ===== | |
| from config import DEFAULT_SETTINGS, APP_TITLE | |
| # ===== 導入 UI 組件 ===== | |
| from ui.theme import get_enhanced_css | |
| from ui.components.header import create_header, create_top_controls | |
| from ui.components.input_form import create_input_form, toggle_location_inputs | |
| from ui.components.confirmation import create_confirmation_area, create_exit_button | |
| from ui.components.results import create_team_area, create_result_area, create_tabs | |
| from ui.components.modals import create_settings_modal, create_doc_modal | |
| # ===== 導入核心工具 ===== | |
| from core.utils import ( | |
| create_agent_stream_output, create_agent_card_enhanced, | |
| create_task_card, create_summary_card, create_animated_map, | |
| get_reasoning_html_reversed, create_celebration_animation, | |
| create_result_visualization | |
| ) | |
| # ===== 導入 Agno Agent 組件 ===== | |
| from agno.models.google import Gemini | |
| from agno.agent import RunEvent | |
| from agno.run.team import TeamRunEvent | |
| # ===== 導入 Agent 系統 ===== | |
| from src.agent.base import UserState, Location, get_context | |
| from src.agent.planner import create_planner_agent | |
| from src.agent.core_team import create_core_team | |
| from src.infra.context import set_session_id, get_session_id | |
| from src.infra.poi_repository import poi_repo | |
| from src.tools import ( | |
| ScoutToolkit, OptimizationToolkit, | |
| NavigationToolkit, WeatherToolkit, ReaderToolkit | |
| ) | |
| from src.infra.config import get_settings | |
| from src.infra.logger import get_logger | |
| logger = get_logger(__name__) | |
| class UserSession: | |
| """ | |
| 單個用戶的會話數據 | |
| 每個用戶有獨立的實例,確保資料隔離 | |
| """ | |
| def __init__(self): | |
| self.session_id: Optional[str] = None | |
| self.planner_agent = None | |
| self.core_team = None | |
| self.user_state: Optional[UserState] = None | |
| self.task_list: list = [] | |
| self.reasoning_messages: list = [] | |
| self.chat_history: list = [] | |
| self.planning_completed: bool = False | |
| # 存儲位置信息(用於重新初始化 Agent) | |
| self.lat: Optional[float] = None | |
| self.lng: Optional[float] = None | |
| # [Security Fix] 用戶個人的自定義設定 (API Keys, Model Choice) | |
| self.custom_settings: Dict[str, Any] = {} | |
| # 系統預設設定 (從環境變數讀取) | |
| self.agno_settings = get_settings() | |
| def to_dict(self) -> Dict[str, Any]: | |
| """序列化為字典(用於 Gradio State)""" | |
| return { | |
| 'session_id': self.session_id, | |
| 'task_list': self.task_list, | |
| 'reasoning_messages': self.reasoning_messages, | |
| 'chat_history': self.chat_history, | |
| 'planning_completed': self.planning_completed, | |
| 'lat': self.lat, | |
| 'lng': self.lng, | |
| # [Security Fix] 保存用戶自定義設定 | |
| 'custom_settings': self.custom_settings | |
| } | |
| def from_dict(cls, data: Dict[str, Any]) -> 'UserSession': | |
| """從字典恢復(用於 Gradio State)""" | |
| session = cls() | |
| session.session_id = data.get('session_id') | |
| session.task_list = data.get('task_list', []) | |
| session.reasoning_messages = data.get('reasoning_messages', []) | |
| session.chat_history = data.get('chat_history', []) | |
| session.planning_completed = data.get('planning_completed', False) | |
| session.lat = data.get('lat') | |
| session.lng = data.get('lng') | |
| # [Security Fix] 恢復用戶自定義設定 | |
| session.custom_settings = data.get('custom_settings', {}) | |
| return session | |
| class LifeFlowAI: | |
| """LifeFlow AI - Multi-User Safe Version""" | |
| def __init__(self): | |
| # [Security Fix] 移除全域 settings,避免用戶間資料汙染 | |
| pass | |
| def _initialize_agents(self, session: UserSession, lat: float, lng: float): | |
| """初始化 Agents(每個用戶獨立,並應用用戶設定)""" | |
| session.lat = lat | |
| session.lng = lng | |
| if session.planner_agent is not None: | |
| logger.debug(f"Agents already initialized for session {session.session_id}") | |
| return | |
| # 生成 Session ID | |
| if session.session_id is None: | |
| session.session_id = str(uuid.uuid4()) | |
| token = set_session_id(session.session_id) | |
| logger.info(f"🆔 New Session: {session.session_id}") | |
| else: | |
| set_session_id(session.session_id) | |
| logger.info(f"🔄 Restoring Session: {session.session_id}") | |
| # 設定用戶狀態 | |
| session.user_state = UserState(location=Location(lat=lat, lng=lng)) | |
| # [Security Fix] 讀取用戶選擇的模型,如果沒有則使用預設 | |
| selected_model_id = session.custom_settings.get('model', 'gemini-2.5-flash') | |
| # [Security Fix] 優先使用用戶提供的 API Key (這裡以 Gemini 為例,若支援其他模型需擴充邏輯) | |
| # 注意:實際應用中需根據選擇的 Model ID (Claude/Gemini) 來決定使用哪個 Key | |
| gemini_key = session.custom_settings.get('gemini_api_key') or session.agno_settings.gemini_api_key | |
| # 初始化模型 (應用設定) | |
| planner_model = Gemini( | |
| id=selected_model_id, | |
| thinking_budget=2048, | |
| api_key=gemini_key | |
| ) | |
| main_model = Gemini( | |
| id=selected_model_id, | |
| thinking_budget=1024, | |
| api_key=gemini_key | |
| ) | |
| lite_model = Gemini( | |
| id="gemini-2.5-flash-lite", # 輕量級模型通常固定或由次要選項決定 | |
| api_key=gemini_key | |
| ) | |
| # 配置模型和工具 | |
| models_dict = { | |
| "team": main_model, | |
| "scout": main_model, | |
| "optimizer": lite_model, | |
| "navigator": lite_model, | |
| "weatherman": lite_model, | |
| "presenter": main_model, | |
| } | |
| # [Note] 如果 Toolkit 支援傳入 API Key,應在此處從 session.custom_settings 傳入 | |
| tools_dict = { | |
| "scout": [ScoutToolkit()], | |
| "optimizer": [OptimizationToolkit()], | |
| "navigator": [NavigationToolkit()], | |
| "weatherman": [WeatherToolkit()], | |
| "presenter": [ReaderToolkit()], | |
| } | |
| planner_kwargs = { | |
| "additional_context": get_context(session.user_state), | |
| "timezone_identifier": session.user_state.utc_offset, | |
| "debug_mode": False, | |
| } | |
| team_kwargs = { | |
| "timezone_identifier": session.user_state.utc_offset, | |
| } | |
| # 創建 Agents | |
| session.planner_agent = create_planner_agent( | |
| planner_model, | |
| planner_kwargs, | |
| session_id=session.session_id | |
| ) | |
| session.core_team = create_core_team( | |
| models_dict, | |
| team_kwargs, | |
| tools_dict, | |
| session_id=session.session_id | |
| ) | |
| logger.info(f"✅ Agents initialized for session {session.session_id} using model {selected_model_id}") | |
| def step1_analyze_tasks(self, user_input: str, auto_location: bool, | |
| lat: float, lon: float, session: UserSession): | |
| """Step 1: 真正的串流分析""" | |
| if not user_input.strip(): | |
| yield from self._empty_step1_outputs(session) | |
| return | |
| if auto_location: | |
| lat, lon = 25.033, 121.565 | |
| try: | |
| self._initialize_agents(session, lat, lon) | |
| self._add_reasoning(session, "planner", "🚀 Starting analysis...") | |
| yield self._create_step1_outputs( | |
| stream_text="🤔 Analyzing your request with AI...", | |
| session=session, | |
| agent_status=("planner", "working", "Initializing...") | |
| ) | |
| time_module.sleep(0.3) | |
| self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...") | |
| yield self._create_step1_outputs( | |
| stream_text="🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...", | |
| session=session, | |
| agent_status=("planner", "working", "Extracting tasks...") | |
| ) | |
| planner_stream = session.planner_agent.run( | |
| f"help user to update the task_list, user's message: {user_input}", | |
| stream=True, | |
| stream_events=True | |
| ) | |
| accumulated_response = "" | |
| displayed_text = "🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...\n\n" | |
| show_content = True | |
| for chunk in planner_stream: | |
| if chunk.event == RunEvent.run_content: | |
| content = chunk.content | |
| accumulated_response += content | |
| if show_content: | |
| if "@@@" in accumulated_response: | |
| show_content = False | |
| remaining = content.split("@@@")[0] | |
| if remaining: | |
| displayed_text += remaining | |
| else: | |
| displayed_text += content | |
| yield self._create_step1_outputs( | |
| stream_text=displayed_text, | |
| session=session, | |
| agent_status=("planner", "working", "Processing...") | |
| ) | |
| json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] | |
| json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") | |
| session.planner_agent.update_session_state( | |
| session_id=session.session_id, | |
| session_state_updates={"task_list": json_data} | |
| ) | |
| try: | |
| task_list_data = json.loads(json_data) | |
| session.task_list = self._convert_task_list_to_ui_format(task_list_data) | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Failed to parse task_list: {e}") | |
| session.task_list = [] | |
| self._add_reasoning(session, "planner", f"✅ Extracted {len(session.task_list)} tasks") | |
| high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH") | |
| total_time = sum( | |
| int(t.get("duration", "0").split()[0]) | |
| for t in session.task_list | |
| if t.get("duration") | |
| ) | |
| final_text = displayed_text + f"\n✅ Analysis complete! Found {len(session.task_list)} tasks." | |
| yield self._create_step1_complete_outputs( | |
| stream_text=final_text, | |
| session=session, | |
| high_priority=high_priority, | |
| total_time=total_time | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in step1: {e}", exc_info=True) | |
| yield self._create_error_outputs(str(e), session) | |
| def modify_task_chat(self, user_message: str, session: UserSession): | |
| """修改任務(帶真正的串流)""" | |
| if not user_message.strip(): | |
| chat_html = self._generate_chat_history_html(session) | |
| task_html = self._generate_task_list_html(session) | |
| yield chat_html, task_html, session.to_dict() | |
| return | |
| session.chat_history.append({ | |
| "role": "user", | |
| "message": user_message, | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| }) | |
| yield ( | |
| self._generate_chat_history_html(session), | |
| self._generate_task_list_html(session), | |
| session.to_dict() | |
| ) | |
| try: | |
| if session.planner_agent is None: | |
| if session.lat is not None and session.lng is not None: | |
| session.chat_history.append({ | |
| "role": "assistant", | |
| "message": "🔄 Restoring AI system...", | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| }) | |
| yield ( | |
| self._generate_chat_history_html(session), | |
| self._generate_task_list_html(session), | |
| session.to_dict() | |
| ) | |
| self._initialize_agents(session, session.lat, session.lng) | |
| session.chat_history.pop() | |
| else: | |
| session.chat_history.append({ | |
| "role": "assistant", | |
| "message": "❌ Error: Please restart the planning process.", | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| }) | |
| yield ( | |
| self._generate_chat_history_html(session), | |
| self._generate_task_list_html(session), | |
| session.to_dict() | |
| ) | |
| return | |
| session.chat_history.append({ | |
| "role": "assistant", | |
| "message": "🤔 AI is thinking...", | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| }) | |
| yield ( | |
| self._generate_chat_history_html(session), | |
| self._generate_task_list_html(session), | |
| session.to_dict() | |
| ) | |
| planner_stream = session.planner_agent.run( | |
| f"help user to update the task_list, user's message: {user_message}", | |
| stream=True, | |
| stream_events=True | |
| ) | |
| accumulated_response = "" | |
| displayed_thinking = "🤔 AI is thinking...\n\n" | |
| show_content = True | |
| for chunk in planner_stream: | |
| if chunk.event == RunEvent.run_content: | |
| content = chunk.content | |
| accumulated_response += content | |
| if show_content: | |
| if "@@@" in accumulated_response: | |
| show_content = False | |
| content = content.split("@@@")[0] | |
| if content: | |
| displayed_thinking += content | |
| session.chat_history[-1] = { | |
| "role": "assistant", | |
| "message": displayed_thinking, | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| } | |
| yield ( | |
| self._generate_chat_history_html(session), | |
| self._generate_task_list_html(session), | |
| session.to_dict() | |
| ) | |
| json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] | |
| json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") | |
| session.planner_agent.update_session_state( | |
| session_id=session.session_id, | |
| session_state_updates={"task_list": json_data} | |
| ) | |
| task_list_data = json.loads(json_data) | |
| session.task_list = self._convert_task_list_to_ui_format(task_list_data) | |
| session.chat_history[-1] = { | |
| "role": "assistant", | |
| "message": "✅ Tasks updated based on your request", | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| } | |
| self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...") | |
| yield ( | |
| self._generate_chat_history_html(session), | |
| self._generate_task_list_html(session), | |
| session.to_dict() | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in modify_task_chat: {e}", exc_info=True) | |
| if session.chat_history and "thinking" in session.chat_history[-1].get("message", "").lower(): | |
| session.chat_history.pop() | |
| session.chat_history.append({ | |
| "role": "assistant", | |
| "message": f"❌ Error: {str(e)}", | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| }) | |
| yield ( | |
| self._generate_chat_history_html(session), | |
| self._generate_task_list_html(session), | |
| session.to_dict() | |
| ) | |
| def step2_search_pois(self, session: UserSession): | |
| """Step 2: Scout 開始工作""" | |
| self._add_reasoning(session, "team", "🚀 Core Team activated") | |
| self._add_reasoning(session, "scout", "Searching for POIs...") | |
| agent_updates = [ | |
| create_agent_card_enhanced("planner", "complete", "Tasks ready"), | |
| create_agent_card_enhanced("scout", "working", "Searching..."), | |
| *[create_agent_card_enhanced(k, "idle", "On standby") | |
| for k in ["optimizer", "validator", "weather", "traffic"]] | |
| ] | |
| return ( | |
| get_reasoning_html_reversed(session.reasoning_messages), | |
| "🗺️ Scout is searching...", | |
| *agent_updates, | |
| session.to_dict() | |
| ) | |
| def step3_run_core_team(self, session: UserSession): | |
| """Step 3: 運行 Core Team(帶串流)""" | |
| try: | |
| if session.core_team is None or session.planner_agent is None: | |
| if session.lat is not None and session.lng is not None: | |
| self._initialize_agents(session, session.lat, session.lng) | |
| else: | |
| raise ValueError("Cannot restore agents: location not available") | |
| set_session_id(session.session_id) | |
| task_list_input = session.planner_agent.get_session_state().get("task_list") | |
| if not task_list_input: | |
| raise ValueError("No task list available") | |
| if isinstance(task_list_input, str): | |
| task_list_str = task_list_input | |
| else: | |
| task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False) | |
| self._add_reasoning(session, "team", "🎯 Multi-agent collaboration started") | |
| team_stream = session.core_team.run( | |
| f"Plan this trip: {task_list_str}", | |
| stream=True, | |
| stream_events=True, | |
| session_id=session.session_id | |
| ) | |
| report_content = "" | |
| for event in team_stream: | |
| if event.event in [TeamRunEvent.run_content]: | |
| report_content += event.content | |
| elif event.event == "tool_call": | |
| tool_name = event.tool_call.get('function', {}).get('name', 'unknown') | |
| self._add_reasoning(session, "team", f"🔧 Tool: {tool_name}") | |
| elif event.event == TeamRunEvent.run_completed: | |
| self._add_reasoning(session, "team", "🎉 Completed") | |
| report_html = f"## 🎯 Planning Complete\n\n{report_content}..." | |
| return report_html, session.to_dict() | |
| except Exception as e: | |
| logger.error(f"Error in step3: {e}") | |
| return f"## ❌ Error\n\n{str(e)}", session.to_dict() | |
| def step4_finalize(self, session: UserSession): | |
| """Step 4: 完成""" | |
| try: | |
| final_ref_id = poi_repo.get_last_id_by_session(session.session_id) | |
| if not final_ref_id: | |
| raise ValueError(f"No final result found for session {session.session_id}") | |
| structured_data = poi_repo.load(final_ref_id) | |
| timeline = structured_data.get("timeline", []) | |
| metrics = structured_data.get("metrics", {}) | |
| traffic_summary = structured_data.get("traffic_summary", {}) | |
| timeline_html = self._generate_timeline_html(timeline) | |
| metrics_html = self._generate_metrics_html(metrics, traffic_summary) | |
| safe_task_list = session.task_list if session.task_list else [] | |
| result_viz = create_result_visualization(safe_task_list, structured_data) | |
| map_fig = self._generate_map_from_data(structured_data) | |
| agent_updates = [ | |
| create_agent_card_enhanced(k, "complete", "✓ Complete") | |
| for k in ["planner", "scout", "optimizer", "validator", "weather", "traffic"] | |
| ] | |
| self._add_reasoning(session, "team", "🎉 All completed") | |
| session.planning_completed = True | |
| return ( | |
| timeline_html, metrics_html, result_viz, map_fig, | |
| gr.update(visible=True), gr.update(visible=False), | |
| "🎉 Planning completed!", | |
| *agent_updates, | |
| session.to_dict() | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in step4: {e}", exc_info=True) | |
| error_html = f"<div style='color:red'>Error: {str(e)}</div>" | |
| default_map = create_animated_map() | |
| agent_updates = [ | |
| create_agent_card_enhanced(k, "idle", "Waiting...") | |
| for k in ["planner", "scout", "optimizer", "validator", "weather", "traffic"] | |
| ] | |
| return ( | |
| error_html, error_html, error_html, default_map, | |
| gr.update(visible=True), gr.update(visible=False), | |
| f"Error: {str(e)}", | |
| *agent_updates, | |
| session.to_dict() | |
| ) | |
| # ===== 輔助方法 ===== | |
| # [Security Fix] 新增:保存設定到 User Session | |
| def save_settings(self, google_key, weather_key, anthropic_key, model, session_data): | |
| """保存設定到用戶 Session""" | |
| session = UserSession.from_dict(session_data) | |
| session.custom_settings['google_maps_api_key'] = google_key | |
| session.custom_settings['openweather_api_key'] = weather_key | |
| session.custom_settings['anthropic_api_key'] = anthropic_key | |
| session.custom_settings['model'] = model | |
| return "✅ Settings saved locally!", session.to_dict() | |
| def _add_reasoning(self, session: UserSession, agent: str, message: str): | |
| session.reasoning_messages.append({ | |
| "agent": agent, | |
| "message": message, | |
| "time": datetime.now().strftime("%H:%M:%S") | |
| }) | |
| def _create_step1_outputs(self, stream_text: str, session: UserSession, agent_status: tuple): | |
| agent_key, status, message = agent_status | |
| return ( | |
| self._create_stream_html(stream_text), "", "", | |
| get_reasoning_html_reversed(session.reasoning_messages), | |
| gr.update(visible=False), gr.update(visible=False), "", message, | |
| create_agent_card_enhanced(agent_key, status, message), | |
| create_agent_card_enhanced("scout", "idle", "On standby"), | |
| create_agent_card_enhanced("optimizer", "idle", "On standby"), | |
| create_agent_card_enhanced("validator", "idle", "On standby"), | |
| create_agent_card_enhanced("weather", "idle", "On standby"), | |
| create_agent_card_enhanced("traffic", "idle", "On standby"), | |
| session.to_dict() | |
| ) | |
| def _create_step1_complete_outputs(self, stream_text: str, session: UserSession, | |
| high_priority: int, total_time: int): | |
| summary_html = create_summary_card(len(session.task_list), high_priority, total_time) | |
| task_html = self._generate_task_list_html(session) | |
| return ( | |
| self._create_stream_html(stream_text), summary_html, task_html, | |
| get_reasoning_html_reversed(session.reasoning_messages), | |
| gr.update(visible=True), gr.update(visible=False), | |
| self._generate_chat_welcome_html(), "✓ Tasks extracted", | |
| create_agent_card_enhanced("planner", "complete", "Tasks ready"), | |
| create_agent_card_enhanced("scout", "idle", "On standby"), | |
| create_agent_card_enhanced("optimizer", "idle", "On standby"), | |
| create_agent_card_enhanced("validator", "idle", "On standby"), | |
| create_agent_card_enhanced("weather", "idle", "On standby"), | |
| create_agent_card_enhanced("traffic", "idle", "On standby"), | |
| session.to_dict() | |
| ) | |
| def _empty_step1_outputs(self, session: UserSession): | |
| yield ( | |
| create_agent_stream_output(), "", "", | |
| get_reasoning_html_reversed(), | |
| gr.update(visible=False), gr.update(visible=False), "", | |
| "Waiting for input...", | |
| create_agent_card_enhanced("planner", "idle", "On standby"), | |
| create_agent_card_enhanced("scout", "idle", "On standby"), | |
| create_agent_card_enhanced("optimizer", "idle", "On standby"), | |
| create_agent_card_enhanced("validator", "idle", "On standby"), | |
| create_agent_card_enhanced("weather", "idle", "On standby"), | |
| create_agent_card_enhanced("traffic", "idle", "On standby"), | |
| session.to_dict() | |
| ) | |
| def _create_error_outputs(self, error: str, session: UserSession): | |
| return ( | |
| self._create_stream_html(f"❌ Error: {error}"), "", "", | |
| get_reasoning_html_reversed(session.reasoning_messages), | |
| gr.update(visible=False), gr.update(visible=False), "", | |
| f"Error: {error}", | |
| create_agent_card_enhanced("planner", "idle", "Error occurred"), | |
| create_agent_card_enhanced("scout", "idle", "On standby"), | |
| create_agent_card_enhanced("optimizer", "idle", "On standby"), | |
| create_agent_card_enhanced("validator", "idle", "On standby"), | |
| create_agent_card_enhanced("weather", "idle", "On standby"), | |
| create_agent_card_enhanced("traffic", "idle", "On standby"), | |
| session.to_dict() | |
| ) | |
| def _create_stream_html(self, text: str) -> str: | |
| return f"""<div class="stream-container"><div class="stream-text">{text}<span class="stream-cursor"></span></div></div>""" | |
| def _convert_task_list_to_ui_format(self, task_list_data): | |
| ui_tasks = [] | |
| if isinstance(task_list_data, dict): | |
| tasks = task_list_data.get("tasks", []) | |
| elif isinstance(task_list_data, list): | |
| tasks = task_list_data | |
| else: | |
| return [] | |
| for i, task in enumerate(tasks, 1): | |
| ui_task = { | |
| "id": i, | |
| "title": task.get("description", "Task"), | |
| "priority": task.get("priority", "MEDIUM"), | |
| "time": task.get("time_window", "Anytime"), | |
| "duration": f"{task.get('duration_minutes', 30)} minutes", | |
| "location": task.get("location_hint", "To be determined"), | |
| "icon": self._get_task_icon(task.get("category", "other")) | |
| } | |
| ui_tasks.append(ui_task) | |
| return ui_tasks | |
| def _get_task_icon(self, category: str) -> str: | |
| icons = { | |
| "medical": "🏥", "shopping": "🛒", "postal": "📮", | |
| "food": "🍽️", "entertainment": "🎭", "transportation": "🚗", | |
| "other": "📋" | |
| } | |
| return icons.get(category.lower(), "📋") | |
| def _generate_task_list_html(self, session: UserSession) -> str: | |
| if not session.task_list: | |
| return "<p>No tasks available</p>" | |
| html = "" | |
| for task in session.task_list: | |
| html += create_task_card( | |
| task["id"], task["title"], task["priority"], | |
| task["time"], task["duration"], task["location"], | |
| task.get("icon", "📋") | |
| ) | |
| return html | |
| def _generate_chat_history_html(self, session: UserSession) -> str: | |
| if not session.chat_history: | |
| return self._generate_chat_welcome_html() | |
| html = '<div class="chat-history">' | |
| for msg in session.chat_history: | |
| role_class = "user" if msg["role"] == "user" else "assistant" | |
| icon = "👤" if msg["role"] == "user" else "🤖" | |
| html += f""" | |
| <div class="chat-message {role_class}"> | |
| <span class="chat-icon">{icon}</span> | |
| <div class="chat-content"> | |
| <div class="chat-text">{msg["message"]}</div> | |
| <div class="chat-time">{msg["time"]}</div> | |
| </div> | |
| </div> | |
| """ | |
| html += '</div>' | |
| return html | |
| def _generate_chat_welcome_html(self) -> str: | |
| return """ | |
| <div class="chat-welcome"> | |
| <h3>💬 Chat with LifeFlow AI</h3> | |
| <p>Modify your tasks by chatting here.</p> | |
| </div> | |
| """ | |
| def _generate_timeline_html(self, timeline): | |
| """ | |
| 生成右側 Timeline Tab 的 HTML | |
| 使用 structured_data['timeline'] 中的豐富數據 | |
| """ | |
| if not timeline: | |
| return "<div>No timeline data</div>" | |
| html = '<div class="timeline-container">' | |
| for i, stop in enumerate(timeline): | |
| time = stop.get("time", "N/A") | |
| location = stop.get("location", "Unknown Location") | |
| weather = stop.get("weather", "") | |
| aqi = stop.get("aqi", {}).get("label", "") | |
| travel_time = stop.get("travel_time_from_prev", "") | |
| # 決定標記樣式 | |
| marker_class = "start" if i == 0 else "stop" | |
| html += f""" | |
| <div class="timeline-item"> | |
| <div class="timeline-marker {marker_class}">{i}</div> | |
| <div class="timeline-content"> | |
| <div class="timeline-header"> | |
| <span class="time-badge">{time}</span> | |
| <h4>{location}</h4> | |
| </div> | |
| <div class="timeline-details"> | |
| {f'<span class="weather-tag">🌤️ {weather}</span>' if weather else ''} | |
| {f'<span class="aqi-tag">{aqi}</span>' if aqi else ''} | |
| </div> | |
| {f'<p class="travel-info">🚗 Drive: {travel_time}</p>' if i > 0 else ''} | |
| </div> | |
| </div> | |
| """ | |
| html += '</div>' | |
| # 補充少許 CSS 以確保美觀 | |
| html += """ | |
| <style> | |
| .timeline-header { display: flex; align-items: center; gap: 10px; margin-bottom: 5px; } | |
| .time-badge { background: #e3f2fd; color: #1976d2; padding: 2px 8px; border-radius: 4px; font-weight: bold; font-size: 12px; } | |
| .timeline-details { display: flex; gap: 8px; flex-wrap: wrap; margin-bottom: 5px; } | |
| .weather-tag, .aqi-tag { font-size: 11px; padding: 2px 6px; background: #f5f5f5; border-radius: 4px; border: 1px solid #e0e0e0; } | |
| .travel-info { font-size: 12px; color: #666; margin: 0; display: flex; align-items: center; gap: 5px; } | |
| </style> | |
| """ | |
| return html | |
| def _generate_metrics_html(self, metrics, traffic_summary): | |
| """生成左側的簡單 Metrics""" | |
| if traffic_summary is None: traffic_summary = {} | |
| if metrics is None: metrics = {} | |
| total_distance = traffic_summary.get("total_distance_km", 0) | |
| total_duration = traffic_summary.get("total_duration_min", 0) | |
| efficiency = metrics.get("route_efficiency_pct", 90) | |
| # 根據效率改變顏色 | |
| eff_color = "#50C878" if efficiency >= 80 else "#F5A623" | |
| return f""" | |
| <div class="metrics-container"> | |
| <div class="metric-card"> | |
| <h3>📏 Distance</h3> | |
| <p class="metric-value">{total_distance:.1f} km</p> | |
| </div> | |
| <div class="metric-card"> | |
| <h3>⏱️ Duration</h3> | |
| <p class="metric-value">{total_duration:.0f} min</p> | |
| </div> | |
| <div class="metric-card" style="border-bottom: 3px solid {eff_color}"> | |
| <h3>⚡ Efficiency</h3> | |
| <p class="metric-value" style="color: {eff_color}">{efficiency:.0f}%</p> | |
| </div> | |
| </div> | |
| """ | |
| def _generate_map_from_data(self, structured_data): | |
| """ | |
| 生成 Plotly 地圖,包含真實道路軌跡 (Polyline) | |
| """ | |
| import plotly.graph_objects as go | |
| from core.utils import decode_polyline # 確保導入了解碼函數 | |
| try: | |
| timeline = structured_data.get("timeline", []) | |
| precise_result = structured_data.get("precise_traffic_result", {}) | |
| legs = precise_result.get("legs", []) | |
| if not timeline: | |
| return create_animated_map() | |
| fig = go.Figure() | |
| # 1. 繪製路線 (Polyline) - 藍色路徑 | |
| # 從 legs 中提取 polyline 並解碼 | |
| route_lats = [] | |
| route_lons = [] | |
| for leg in legs: | |
| poly_str = leg.get("polyline") | |
| if poly_str: | |
| decoded = decode_polyline(poly_str) | |
| # 解碼後是 list of (lat, lng) | |
| leg_lats = [coord[0] for coord in decoded] | |
| leg_lons = [coord[1] for coord in decoded] | |
| # 為了讓線段連續,我們可以將其串接 (plotly 支援 nan 分隔,或直接單一 trace) | |
| # 這裡簡單起見,我們將每個 leg 分開加,或者合併。合併效能較好。 | |
| route_lats.extend(leg_lats) | |
| route_lons.extend(leg_lons) | |
| if route_lats: | |
| fig.add_trace(go.Scattermapbox( | |
| lat=route_lats, | |
| lon=route_lons, | |
| mode='lines', | |
| line=dict(width=4, color='#4A90E2'), | |
| name='Route', | |
| hoverinfo='none' # 路徑本身不需 hover | |
| )) | |
| # 2. 繪製站點 (Markers) | |
| lats, lons, hover_texts, colors, sizes = [], [], [], [], [] | |
| for i, stop in enumerate(timeline): | |
| coords = stop.get("coordinates", {}) | |
| lat = coords.get("lat") | |
| lng = coords.get("lng") | |
| if lat and lng: | |
| lats.append(lat) | |
| lons.append(lng) | |
| # 構建 Hover Text | |
| name = stop.get("location", "Stop") | |
| time = stop.get("time", "") | |
| weather = stop.get("weather", "") | |
| text = f"<b>{i}. {name}</b><br>🕒 {time}<br>🌤️ {weather}" | |
| hover_texts.append(text) | |
| # 樣式:起點綠色,終點紅色,中間藍色 | |
| if i == 0: | |
| colors.append('#50C878') # Start | |
| sizes.append(15) | |
| elif i == len(timeline) - 1: | |
| colors.append('#FF6B6B') # End | |
| sizes.append(15) | |
| else: | |
| colors.append('#F5A623') # Middle | |
| sizes.append(12) | |
| fig.add_trace(go.Scattermapbox( | |
| lat=lats, lon=lons, | |
| mode='markers+text', | |
| marker=dict(size=sizes, color=colors, allowoverlap=True), | |
| text=[str(i) for i in range(len(lats))], # 顯示序號 | |
| textposition="top center", | |
| textfont=dict(size=14, color='black', family="Arial Black"), | |
| hovertext=hover_texts, | |
| hoverinfo='text', | |
| name='Stops' | |
| )) | |
| # 設定地圖中心與縮放 | |
| if lats: | |
| center_lat = sum(lats) / len(lats) | |
| center_lon = sum(lons) / len(lons) | |
| else: | |
| center_lat, center_lon = 25.033, 121.565 | |
| fig.update_layout( | |
| mapbox=dict( | |
| style='open-street-map', # 或 'carto-positron' 看起來更乾淨 | |
| center=dict(lat=center_lat, lon=center_lon), | |
| zoom=12 | |
| ), | |
| margin=dict(l=0, r=0, t=0, b=0), | |
| height=500, | |
| showlegend=False | |
| ) | |
| return fig | |
| except Exception as e: | |
| logger.error(f"Error generating map: {e}", exc_info=True) | |
| return create_animated_map() | |
| def build_interface(self): | |
| with gr.Blocks(title=APP_TITLE) as demo: | |
| gr.HTML(get_enhanced_css()) | |
| create_header() | |
| theme_btn, settings_btn, doc_btn = create_top_controls() | |
| # 🔥 每個 Browser Tab 都有獨立的 UserSession | |
| session_state = gr.State(value=UserSession().to_dict()) | |
| with gr.Row(): | |
| with gr.Column(scale=2, min_width=400): | |
| (input_area, agent_stream_output, user_input, auto_location, | |
| location_inputs, lat_input, lon_input, analyze_btn) = create_input_form( | |
| create_agent_stream_output() | |
| ) | |
| (task_confirm_area, task_summary_display, | |
| task_list_display, exit_btn_inline, ready_plan_btn) = create_confirmation_area() | |
| team_area, agent_displays = create_team_area(create_agent_card_enhanced) | |
| (result_area, result_display, timeline_display, metrics_display) = create_result_area( | |
| create_animated_map) | |
| with gr.Column(scale=3, min_width=500): | |
| status_bar = gr.Textbox(label="📊 Status", value="Waiting for input...", interactive=False, | |
| max_lines=1) | |
| (tabs, report_tab, map_tab, report_output, map_output, reasoning_output, | |
| chat_input_area, chat_history_output, chat_input, chat_send) = create_tabs( | |
| create_animated_map, get_reasoning_html_reversed() | |
| ) | |
| (settings_modal, google_maps_key, openweather_key, anthropic_key, | |
| model_choice, close_settings_btn, save_settings_btn, | |
| settings_status) = create_settings_modal() | |
| doc_modal, close_doc_btn = create_doc_modal() | |
| # ===== Event Handlers ===== | |
| auto_location.change(fn=toggle_location_inputs, inputs=[auto_location], outputs=[location_inputs]) | |
| def analyze_wrapper(ui, al, lat, lon, s): | |
| session = UserSession.from_dict(s) | |
| yield from self.step1_analyze_tasks(ui, al, lat, lon, session) | |
| analyze_btn.click( | |
| fn=analyze_wrapper, | |
| inputs=[user_input, auto_location, lat_input, lon_input, session_state], | |
| outputs=[ | |
| agent_stream_output, task_summary_display, task_list_display, | |
| reasoning_output, task_confirm_area, chat_input_area, | |
| chat_history_output, status_bar, *agent_displays, session_state | |
| ] | |
| ).then( | |
| fn=lambda: (gr.update(visible=False), gr.update(visible=True), gr.update(visible=True)), | |
| outputs=[input_area, task_confirm_area, chat_input_area] | |
| ) | |
| def chat_wrapper(msg, s): | |
| session = UserSession.from_dict(s) | |
| yield from self.modify_task_chat(msg, session) | |
| chat_send.click( | |
| fn=chat_wrapper, | |
| inputs=[chat_input, session_state], | |
| outputs=[chat_history_output, task_list_display, session_state] | |
| ).then(fn=lambda: "", outputs=[chat_input]) | |
| exit_btn_inline.click( | |
| fn=lambda: ( | |
| gr.update(visible=True), gr.update(visible=False), | |
| gr.update(visible=False), gr.update(visible=False), | |
| gr.update(visible=False), gr.update(visible=False), | |
| gr.update(visible=False), "", | |
| create_agent_stream_output(), | |
| self._generate_chat_welcome_html(), | |
| "Ready...", | |
| UserSession().to_dict() | |
| ), | |
| outputs=[ | |
| input_area, task_confirm_area, chat_input_area, result_area, | |
| team_area, report_tab, map_tab, user_input, | |
| agent_stream_output, chat_history_output, status_bar, session_state | |
| ] | |
| ) | |
| ready_plan_btn.click( | |
| fn=lambda: (gr.update(visible=False), gr.update(visible=False), | |
| gr.update(visible=True), gr.update(selected="ai_conversation_tab")), | |
| outputs=[task_confirm_area, chat_input_area, team_area, tabs] | |
| ).then( | |
| fn=lambda s: self.step2_search_pois(UserSession.from_dict(s)), | |
| inputs=[session_state], | |
| outputs=[reasoning_output, status_bar, *agent_displays, session_state] | |
| ).then( | |
| fn=lambda s: self.step3_run_core_team(UserSession.from_dict(s)), | |
| inputs=[session_state], | |
| outputs=[report_output, session_state] | |
| ).then( | |
| fn=lambda: (gr.update(visible=True), gr.update(visible=True), gr.update(selected="report_tab")), | |
| outputs=[report_tab, map_tab, tabs] | |
| ).then( | |
| fn=lambda s: self.step4_finalize(UserSession.from_dict(s)), | |
| inputs=[session_state], | |
| outputs=[ | |
| timeline_display, metrics_display, result_display, | |
| map_output, map_tab, team_area, status_bar, *agent_displays, session_state | |
| ] | |
| ).then(fn=lambda: gr.update(visible=True), outputs=[result_area]) | |
| # Settings | |
| settings_btn.click(fn=lambda: gr.update(visible=True), outputs=[settings_modal]) | |
| close_settings_btn.click(fn=lambda: gr.update(visible=False), outputs=[settings_modal]) | |
| # [Security Fix] 儲存設定到 session_state | |
| save_settings_btn.click( | |
| fn=self.save_settings, | |
| inputs=[google_maps_key, openweather_key, anthropic_key, model_choice, session_state], | |
| outputs=[settings_status, session_state] | |
| ) | |
| # Theme | |
| theme_btn.click(fn=None, js=""" | |
| () => { | |
| const container = document.querySelector('.gradio-container'); | |
| if (container) { | |
| container.classList.toggle('theme-dark'); | |
| const isDark = container.classList.contains('theme-dark'); | |
| localStorage.setItem('lifeflow-theme', isDark ? 'dark' : 'light'); | |
| } | |
| } | |
| """) | |
| doc_btn.click(fn=lambda: gr.update(visible=True), outputs=[doc_modal]) | |
| close_doc_btn.click(fn=lambda: gr.update(visible=False), outputs=[doc_modal]) | |
| return demo | |
| def main(): | |
| app = LifeFlowAI() | |
| demo = app.build_interface() | |
| demo.launch(server_name="0.0.0.0", server_port=7860, share=True, show_error=True) | |
| if __name__ == "__main__": | |
| main() |