""" LifeFlow AI - Multi-User Safe with Real Streaming 支持多用戶並發 + 真正的串流輸出 + 安全的設定隔離 """ import sys from pathlib import Path import gradio as gr from datetime import datetime import time as time_module import json import uuid from typing import Dict, Any, Optional # ===== 導入配置 ===== from config import DEFAULT_SETTINGS, APP_TITLE # ===== 導入 UI 組件 ===== from ui.theme import get_enhanced_css from ui.components.header import create_header, create_top_controls from ui.components.input_form import create_input_form, toggle_location_inputs from ui.components.confirmation import create_confirmation_area, create_exit_button from ui.components.results import create_team_area, create_result_area, create_tabs from ui.components.modals import create_settings_modal, create_doc_modal # ===== 導入核心工具 ===== from core.utils import ( create_agent_stream_output, create_agent_card_enhanced, create_task_card, create_summary_card, create_animated_map, get_reasoning_html_reversed, create_celebration_animation, create_result_visualization ) # ===== 導入 Agno Agent 組件 ===== from agno.models.google import Gemini from agno.agent import RunEvent from agno.run.team import TeamRunEvent # ===== 導入 Agent 系統 ===== from src.agent.base import UserState, Location, get_context from src.agent.planner import create_planner_agent from src.agent.core_team import create_core_team from src.infra.context import set_session_id, get_session_id from src.infra.poi_repository import poi_repo from src.tools import ( ScoutToolkit, OptimizationToolkit, NavigationToolkit, WeatherToolkit, ReaderToolkit ) from src.infra.config import get_settings from src.infra.logger import get_logger logger = get_logger(__name__) class UserSession: """ 單個用戶的會話數據 每個用戶有獨立的實例,確保資料隔離 """ def __init__(self): self.session_id: Optional[str] = None self.planner_agent = None self.core_team = None self.user_state: Optional[UserState] = None self.task_list: list = [] self.reasoning_messages: list = [] self.chat_history: list = [] self.planning_completed: bool = False # 存儲位置信息(用於重新初始化 Agent) self.lat: Optional[float] = None self.lng: Optional[float] = None # [Security Fix] 用戶個人的自定義設定 (API Keys, Model Choice) self.custom_settings: Dict[str, Any] = {} # 系統預設設定 (從環境變數讀取) self.agno_settings = get_settings() def to_dict(self) -> Dict[str, Any]: """序列化為字典(用於 Gradio State)""" return { 'session_id': self.session_id, 'task_list': self.task_list, 'reasoning_messages': self.reasoning_messages, 'chat_history': self.chat_history, 'planning_completed': self.planning_completed, 'lat': self.lat, 'lng': self.lng, # [Security Fix] 保存用戶自定義設定 'custom_settings': self.custom_settings } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'UserSession': """從字典恢復(用於 Gradio State)""" session = cls() session.session_id = data.get('session_id') session.task_list = data.get('task_list', []) session.reasoning_messages = data.get('reasoning_messages', []) session.chat_history = data.get('chat_history', []) session.planning_completed = data.get('planning_completed', False) session.lat = data.get('lat') session.lng = data.get('lng') # [Security Fix] 恢復用戶自定義設定 session.custom_settings = data.get('custom_settings', {}) return session class LifeFlowAI: """LifeFlow AI - Multi-User Safe Version""" def __init__(self): # [Security Fix] 移除全域 settings,避免用戶間資料汙染 pass def _initialize_agents(self, session: UserSession, lat: float, lng: float): """初始化 Agents(每個用戶獨立,並應用用戶設定)""" session.lat = lat session.lng = lng if session.planner_agent is not None: logger.debug(f"Agents already initialized for session {session.session_id}") return # 生成 Session ID if session.session_id is None: session.session_id = str(uuid.uuid4()) token = set_session_id(session.session_id) logger.info(f"🆔 New Session: {session.session_id}") else: set_session_id(session.session_id) logger.info(f"🔄 Restoring Session: {session.session_id}") # 設定用戶狀態 session.user_state = UserState(location=Location(lat=lat, lng=lng)) # [Security Fix] 讀取用戶選擇的模型,如果沒有則使用預設 selected_model_id = session.custom_settings.get('model', 'gemini-2.5-flash') # [Security Fix] 優先使用用戶提供的 API Key (這裡以 Gemini 為例,若支援其他模型需擴充邏輯) # 注意:實際應用中需根據選擇的 Model ID (Claude/Gemini) 來決定使用哪個 Key gemini_key = session.custom_settings.get('gemini_api_key') or session.agno_settings.gemini_api_key # 初始化模型 (應用設定) planner_model = Gemini( id=selected_model_id, thinking_budget=2048, api_key=gemini_key ) main_model = Gemini( id=selected_model_id, thinking_budget=1024, api_key=gemini_key ) lite_model = Gemini( id="gemini-2.5-flash-lite", # 輕量級模型通常固定或由次要選項決定 api_key=gemini_key ) # 配置模型和工具 models_dict = { "team": main_model, "scout": main_model, "optimizer": lite_model, "navigator": lite_model, "weatherman": lite_model, "presenter": main_model, } # [Note] 如果 Toolkit 支援傳入 API Key,應在此處從 session.custom_settings 傳入 tools_dict = { "scout": [ScoutToolkit()], "optimizer": [OptimizationToolkit()], "navigator": [NavigationToolkit()], "weatherman": [WeatherToolkit()], "presenter": [ReaderToolkit()], } planner_kwargs = { "additional_context": get_context(session.user_state), "timezone_identifier": session.user_state.utc_offset, "debug_mode": False, } team_kwargs = { "timezone_identifier": session.user_state.utc_offset, } # 創建 Agents session.planner_agent = create_planner_agent( planner_model, planner_kwargs, session_id=session.session_id ) session.core_team = create_core_team( models_dict, team_kwargs, tools_dict, session_id=session.session_id ) logger.info(f"✅ Agents initialized for session {session.session_id} using model {selected_model_id}") def step1_analyze_tasks(self, user_input: str, auto_location: bool, lat: float, lon: float, session: UserSession): """Step 1: 真正的串流分析""" if not user_input.strip(): yield from self._empty_step1_outputs(session) return if auto_location: lat, lon = 25.033, 121.565 try: self._initialize_agents(session, lat, lon) self._add_reasoning(session, "planner", "🚀 Starting analysis...") yield self._create_step1_outputs( stream_text="🤔 Analyzing your request with AI...", session=session, agent_status=("planner", "working", "Initializing...") ) time_module.sleep(0.3) self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...") yield self._create_step1_outputs( stream_text="🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...", session=session, agent_status=("planner", "working", "Extracting tasks...") ) planner_stream = session.planner_agent.run( f"help user to update the task_list, user's message: {user_input}", stream=True, stream_events=True ) accumulated_response = "" displayed_text = "🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...\n\n" show_content = True for chunk in planner_stream: if chunk.event == RunEvent.run_content: content = chunk.content accumulated_response += content if show_content: if "@@@" in accumulated_response: show_content = False remaining = content.split("@@@")[0] if remaining: displayed_text += remaining else: displayed_text += content yield self._create_step1_outputs( stream_text=displayed_text, session=session, agent_status=("planner", "working", "Processing...") ) json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") session.planner_agent.update_session_state( session_id=session.session_id, session_state_updates={"task_list": json_data} ) try: task_list_data = json.loads(json_data) session.task_list = self._convert_task_list_to_ui_format(task_list_data) except json.JSONDecodeError as e: logger.error(f"Failed to parse task_list: {e}") session.task_list = [] self._add_reasoning(session, "planner", f"✅ Extracted {len(session.task_list)} tasks") high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH") total_time = sum( int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration") ) final_text = displayed_text + f"\n✅ Analysis complete! Found {len(session.task_list)} tasks." yield self._create_step1_complete_outputs( stream_text=final_text, session=session, high_priority=high_priority, total_time=total_time ) except Exception as e: logger.error(f"Error in step1: {e}", exc_info=True) yield self._create_error_outputs(str(e), session) def modify_task_chat(self, user_message: str, session: UserSession): """修改任務(帶真正的串流)""" if not user_message.strip(): chat_html = self._generate_chat_history_html(session) task_html = self._generate_task_list_html(session) yield chat_html, task_html, session.to_dict() return session.chat_history.append({ "role": "user", "message": user_message, "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) try: if session.planner_agent is None: if session.lat is not None and session.lng is not None: session.chat_history.append({ "role": "assistant", "message": "🔄 Restoring AI system...", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) self._initialize_agents(session, session.lat, session.lng) session.chat_history.pop() else: session.chat_history.append({ "role": "assistant", "message": "❌ Error: Please restart the planning process.", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) return session.chat_history.append({ "role": "assistant", "message": "🤔 AI is thinking...", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) planner_stream = session.planner_agent.run( f"help user to update the task_list, user's message: {user_message}", stream=True, stream_events=True ) accumulated_response = "" displayed_thinking = "🤔 AI is thinking...\n\n" show_content = True for chunk in planner_stream: if chunk.event == RunEvent.run_content: content = chunk.content accumulated_response += content if show_content: if "@@@" in accumulated_response: show_content = False content = content.split("@@@")[0] if content: displayed_thinking += content session.chat_history[-1] = { "role": "assistant", "message": displayed_thinking, "time": datetime.now().strftime("%H:%M:%S") } yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") session.planner_agent.update_session_state( session_id=session.session_id, session_state_updates={"task_list": json_data} ) task_list_data = json.loads(json_data) session.task_list = self._convert_task_list_to_ui_format(task_list_data) session.chat_history[-1] = { "role": "assistant", "message": "✅ Tasks updated based on your request", "time": datetime.now().strftime("%H:%M:%S") } self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...") yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) except Exception as e: logger.error(f"Error in modify_task_chat: {e}", exc_info=True) if session.chat_history and "thinking" in session.chat_history[-1].get("message", "").lower(): session.chat_history.pop() session.chat_history.append({ "role": "assistant", "message": f"❌ Error: {str(e)}", "time": datetime.now().strftime("%H:%M:%S") }) yield ( self._generate_chat_history_html(session), self._generate_task_list_html(session), session.to_dict() ) def step2_search_pois(self, session: UserSession): """Step 2: Scout 開始工作""" self._add_reasoning(session, "team", "🚀 Core Team activated") self._add_reasoning(session, "scout", "Searching for POIs...") agent_updates = [ create_agent_card_enhanced("planner", "complete", "Tasks ready"), create_agent_card_enhanced("scout", "working", "Searching..."), *[create_agent_card_enhanced(k, "idle", "On standby") for k in ["optimizer", "validator", "weather", "traffic"]] ] return ( get_reasoning_html_reversed(session.reasoning_messages), "🗺️ Scout is searching...", *agent_updates, session.to_dict() ) def step3_run_core_team(self, session: UserSession): """Step 3: 運行 Core Team(帶串流)""" try: if session.core_team is None or session.planner_agent is None: if session.lat is not None and session.lng is not None: self._initialize_agents(session, session.lat, session.lng) else: raise ValueError("Cannot restore agents: location not available") set_session_id(session.session_id) task_list_input = session.planner_agent.get_session_state().get("task_list") if not task_list_input: raise ValueError("No task list available") if isinstance(task_list_input, str): task_list_str = task_list_input else: task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False) self._add_reasoning(session, "team", "🎯 Multi-agent collaboration started") team_stream = session.core_team.run( f"Plan this trip: {task_list_str}", stream=True, stream_events=True, session_id=session.session_id ) report_content = "" for event in team_stream: if event.event in [TeamRunEvent.run_content]: report_content += event.content elif event.event == "tool_call": tool_name = event.tool_call.get('function', {}).get('name', 'unknown') self._add_reasoning(session, "team", f"🔧 Tool: {tool_name}") elif event.event == TeamRunEvent.run_completed: self._add_reasoning(session, "team", "🎉 Completed") report_html = f"## 🎯 Planning Complete\n\n{report_content}..." return report_html, session.to_dict() except Exception as e: logger.error(f"Error in step3: {e}") return f"## ❌ Error\n\n{str(e)}", session.to_dict() def step4_finalize(self, session: UserSession): """Step 4: 完成""" try: final_ref_id = poi_repo.get_last_id_by_session(session.session_id) if not final_ref_id: raise ValueError(f"No final result found for session {session.session_id}") structured_data = poi_repo.load(final_ref_id) timeline = structured_data.get("timeline", []) metrics = structured_data.get("metrics", {}) traffic_summary = structured_data.get("traffic_summary", {}) timeline_html = self._generate_timeline_html(timeline) metrics_html = self._generate_metrics_html(metrics, traffic_summary) safe_task_list = session.task_list if session.task_list else [] result_viz = create_result_visualization(safe_task_list, structured_data) map_fig = self._generate_map_from_data(structured_data) agent_updates = [ create_agent_card_enhanced(k, "complete", "✓ Complete") for k in ["planner", "scout", "optimizer", "validator", "weather", "traffic"] ] self._add_reasoning(session, "team", "🎉 All completed") session.planning_completed = True return ( timeline_html, metrics_html, result_viz, map_fig, gr.update(visible=True), gr.update(visible=False), "🎉 Planning completed!", *agent_updates, session.to_dict() ) except Exception as e: logger.error(f"Error in step4: {e}", exc_info=True) error_html = f"
No tasks available
" html = "" for task in session.task_list: html += create_task_card( task["id"], task["title"], task["priority"], task["time"], task["duration"], task["location"], task.get("icon", "📋") ) return html def _generate_chat_history_html(self, session: UserSession) -> str: if not session.chat_history: return self._generate_chat_welcome_html() html = 'Modify your tasks by chatting here.
🚗 Drive: {travel_time}
' if i > 0 else ''}{total_distance:.1f} km
{total_duration:.0f} min
{efficiency:.0f}%