""" LifeFlow AI - Planner Service (Refactored for MCP Architecture) 核心業務邏輯層:負責協調 Agent 運作、狀態更新與資料處理。 ✅ 移除本地 Toolkits ✅ 整合全域 MCP Client ✅ 保持業務邏輯不變 """ import json import os import time import uuid from datetime import datetime from typing import Generator, Dict, Any, Tuple, Optional, AsyncGenerator # Core Imports from core.session import UserSession from ui.renderers import ( create_task_card, create_summary_card, create_timeline_html_enhanced, ) from core.visualizers import create_animated_map from config import AGENTS_INFO # Models from agno.models.google import Gemini from agno.models.openai import OpenAIChat from agno.models.groq import Groq # Agno Framework from agno.agent import RunEvent from agno.run.team import TeamRunEvent from src.agent.base import UserState, Location, get_context from src.agent.planner import create_planner_agent from src.agent.core_team import create_core_team from src.infra.context import set_session_id from src.infra.poi_repository import poi_repo from src.infra.logger import get_logger from src.infra.client_context import client_session_ctx # 🔥🔥🔥 NEW IMPORTS: 只使用 MCPTools from agno.tools.mcp import MCPTools gemini_safety_settings = [ {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, ] logger = get_logger(__name__) max_retries = 5 class PlannerService: """ PlannerService (MCP Version) """ # Active Sessions 快取 (僅存 Session 物件,不含全域資源) _active_sessions: Dict[str, UserSession] = {} _cancelled_sessions: set = set() # 🔥 全域工具參照 (由 App 注入) _global_toolkits: Dict[str, MCPTools] = {} def set_global_tools(self, toolkits_dict: Dict[str, MCPTools]): """由 app.py 呼叫,注入 MCP Client""" self._global_toolkits = toolkits_dict logger.info("💉 MCP Toolkit injected into PlannerService successfully.") def _inject_session_id_into_toolkit(self, toolkit: MCPTools, session_id: str): """ Monkey Patch 2.0: 攔截 Toolkit 產生的 Function,並在執行時自動注入 session_id 參數。 """ # 1. 備份原始的 get_tools 方法 # 因為 Agno 是呼叫 get_tools() 來取得工具列表的 original_get_tools = toolkit.functions def get_tools_wrapper(): # 2. 取得原始工具列表 (List[Function]) tools = original_get_tools for tool in tools: # 3. 備份每個工具的執行入口 (entrypoint) original_entrypoint = tool.entrypoint # 4. 定義新的執行入口 (Wrapper) def entrypoint_wrapper(*args, **kwargs): # 🔥 核心魔法:在這裡偷偷塞入 session_id # 這樣 LLM 沒傳這個參數,也會被自動補上 kwargs['session_id'] = session_id # 執行原始 MCP 呼叫 return original_entrypoint(*args, **kwargs) # 5. 替換掉入口 tool.entrypoint = entrypoint_wrapper return tools # 6. 將 Toolkit 的 get_tools 換成我們的 Wrapper # 這是 Instance Level 的修改,只會影響當前這個 Agent 的 Toolkit toolkit.get_tools = get_tools_wrapper def cancel_session(self, session_id: str): if session_id: logger.info(f"🛑 Requesting cancellation for session: {session_id}") self._cancelled_sessions.add(session_id) def _get_live_session(self, incoming_session: UserSession) -> UserSession: sid = incoming_session.session_id if not sid: return incoming_session if sid and sid in self._active_sessions: live_session = self._active_sessions[sid] live_session.lat = incoming_session.lat live_session.lng = incoming_session.lng if incoming_session.custom_settings: live_session.custom_settings.update(incoming_session.custom_settings) if len(incoming_session.chat_history) > len(live_session.chat_history): live_session.chat_history = incoming_session.chat_history return live_session self._active_sessions[sid] = incoming_session return incoming_session def initialize_agents(self, session: UserSession, lat: float, lng: float) -> UserSession: if not session.session_id: session.session_id = str(uuid.uuid4()) logger.info(f"🆔 Generated New Session ID: {session.session_id}") session = self._get_live_session(session) session.lat = lat session.lng = lng if not session.user_state: session.user_state = UserState(location=Location(lat=lat, lng=lng)) else: session.user_state.location = Location(lat=lat, lng=lng) if session.planner_agent is not None: return session # 1. 設定模型 (Models) settings = session.custom_settings provider = settings.get("llm_provider", "Gemini") main_api_key = settings.get("model_api_key") selected_model_id = settings.get("model", "gemini-2.5-flash") helper_model_id = settings.get("groq_fast_model", "openai/gpt-oss-20b") enable_fast_mode = settings.get("enable_fast_mode", False) if main_api_key is None: provider = "Gemini" main_api_key = os.environ.get("GEMINI_API_KEY") selected_model_id = "gemini-2.5-flash" logger.warning("⚠️ Main API key not provided, defaulting to Gemini-2.5-flash with env var.") groq_api_key = settings["groq_api_key"] if settings.get("groq_api_key") else os.environ.get("GROQ_API_KEY") # 初始化 Main Brain if provider.lower() == "gemini": main_brain = Gemini(id=selected_model_id, api_key=main_api_key, thinking_budget=1024, safety_settings=gemini_safety_settings) elif provider.lower() == "openai": main_brain = OpenAIChat(id=selected_model_id, api_key=main_api_key) elif provider.lower() == "groq": main_brain = Groq(id=selected_model_id, api_key=main_api_key, temperature=0.1) else: main_brain = Gemini(id='gemini-2.5-flash', api_key=main_api_key) # 初始化 Helper Model if enable_fast_mode and groq_api_key: helper_model = Groq(id=helper_model_id, api_key=groq_api_key, temperature=0.1) else: if provider.lower() == "gemini": helper_model = Gemini(id="gemini-2.5-flash-lite", api_key=main_api_key, safety_settings=gemini_safety_settings) elif provider.lower() == "openai": helper_model = OpenAIChat(id="gpt-4o-mini", api_key=main_api_key) else: helper_model = main_brain models_dict = { "team": main_brain, "presenter": main_brain, "scout": helper_model, "optimizer": helper_model, "navigator": helper_model, "weatherman": helper_model } # 2. 準備 Tools (🔥 MCP 重構核心) if not self._global_toolkits: logger.warning("⚠️ MCP Toolkit is NOT initialized! Agents will have no tools.") def get_tool_list(agent_name): toolkit = self._global_toolkits.get(agent_name) return [toolkit] if toolkit else [] tools_dict = { "scout": get_tool_list("scout"), "optimizer": get_tool_list("optimizer"), "navigator": get_tool_list("navigator"), "weatherman": get_tool_list("weatherman"), "presenter": get_tool_list("presenter"), } planner_kwargs = { "additional_context": get_context(session.user_state), "timezone_identifier": session.user_state.utc_offset, "debug_mode": False, } team_kwargs = {"timezone_identifier": session.user_state.utc_offset} # 3. 建立 Agents session.planner_agent = create_planner_agent(main_brain, planner_kwargs, session_id=session.session_id) session.core_team = create_core_team(models_dict, team_kwargs, tools_dict, session_id=session.session_id) self._active_sessions[session.session_id] = session logger.info(f"✅ Agents initialized (MCP Mode) for session {session.session_id}") return session # ================= Step 1: Analyze Tasks ================= def run_step1_analysis(self, user_input: str, auto_location: bool, lat: float, lng: float, session: UserSession) -> Generator[Dict[str, Any], None, None]: if not user_input or len(user_input.strip()) == 0: yield {"type": "error", "message": "⚠️ Please enter your plans first!", "stream_text": "Waiting for input...", "block_next_step": True} return if auto_location and (lat == 0 or lng == 0): yield {"type": "error", "message": "⚠️ Location detection failed.", "stream_text": "Location Error...", "block_next_step": True} return if not auto_location and (lat is None or lng is None): yield {"type": "error", "message": "⚠️ Please enter valid Latitude/Longitude.", "stream_text": "Location Error...", "block_next_step": True} return try: session = self.initialize_agents(session, lat, lng) self._add_reasoning(session, "planner", "🚀 Starting analysis...") yield {"type": "stream", "stream_text": "🤔 Analyzing your request with AI...", "agent_status": ("planner", "working", "Initializing..."), "session": session} self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...") current_text = "🤔 Analyzing your request with AI...\n📋 AI is extracting tasks..." planner_stream = session.planner_agent.run( f"help user to update the task_list, user's message: {user_input}", stream=True, stream_events=True ) accumulated_response, displayed_text = "", current_text + "\n\n" for chunk in planner_stream: if chunk.event == RunEvent.run_content: content = chunk.content accumulated_response += content if "@@@" not in accumulated_response: displayed_text += content formatted_text = displayed_text.replace("\n", "
") yield {"type": "stream", "stream_text": formatted_text, "agent_status": ("planner", "working", "Thinking..."), "session": session} json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") try: task_list_data = json.loads(json_data) if task_list_data["global_info"]["start_location"].lower() == "user location": task_list_data["global_info"]["start_location"] = {"lat": lat, "lng": lng} session.planner_agent.update_session_state(session_id=session.session_id, session_state_updates={"task_list": task_list_data}) session.task_list = self._convert_task_list_to_ui_format(task_list_data) except Exception as e: logger.error(f"Failed to parse task_list: {e}") session.task_list = [] if not session.task_list: err_msg = "⚠️ AI couldn't identify any tasks." self._add_reasoning(session, "planner", "❌ No tasks found") yield {"type": "error", "message": err_msg, "stream_text": err_msg, "session": session, "block_next_step": True} return if "priority" in session.task_list: for i in session.task_list: if not session.task_list[i].get("priority"): session.task_list[i]["priority"] = "MEDIUM" else: session.task_list[i]["priority"] = session.task_list[i]["priority"].upper() high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH") total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration")) yield {"type": "complete", "stream_text": "Analysis complete!", "start_location": task_list_data["global_info"].get("start_location", "N/A"), "high_priority": high_priority, "total_time": total_time, "start_time": task_list_data["global_info"].get("departure_time", "N/A"), "session": session, "block_next_step": False} except Exception as e: logger.error(f"Error: {e}") yield {"type": "error", "message": str(e), "session": session, "block_next_step": True} # ================= Task Modification (Chat) ================= def modify_task_chat(self, user_message: str, session: UserSession) -> Generator[Dict[str, Any], None, None]: if not user_message or len(user_message.replace(' ', '')) == 0: yield {"type": "chat_error", "message": "Please enter a message.", "session": session} return session = self._get_live_session(session) session.chat_history.append( {"role": "user", "message": user_message, "time": datetime.now().strftime("%H:%M:%S")}) yield {"type": "update_history", "session": session} try: if session.planner_agent is None: if session.lat and session.lng: session = self.initialize_agents(session, session.lat, session.lng) else: yield {"type": "chat_error", "message": "Session lost. Please restart.", "session": session} return session.chat_history.append( {"role": "assistant", "message": "🤔 AI is thinking...", "time": datetime.now().strftime("%H:%M:%S")}) yield {"type": "update_history", "session": session} planner_stream = session.planner_agent.run( f"help user to update the task_list, user's message: {user_message}", stream=True, stream_events=True ) accumulated_response = "" for chunk in planner_stream: if chunk.event == RunEvent.run_content: accumulated_response += chunk.content json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") try: task_list_data = json.loads(json_data) if isinstance(task_list_data["global_info"]["start_location"], str) and task_list_data["global_info"][ "start_location"].lower() == "user location": task_list_data["global_info"]["start_location"] = {"lat": session.lat, "lng": session.lng} session.planner_agent.update_session_state(session_id=session.session_id, session_state_updates={"task_list": task_list_data}) session.task_list = self._convert_task_list_to_ui_format(task_list_data) except Exception as e: logger.error(f"Failed to parse modified task_list: {e}") raise e high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH") total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration")) start_location = task_list_data["global_info"].get("start_location", "N/A") date = task_list_data["global_info"].get("departure_time", "N/A") summary_html = create_summary_card(len(session.task_list), high_priority, total_time, start_location, date) session.chat_history[-1] = {"role": "assistant", "message": "✅ Tasks updated.", "time": datetime.now().strftime("%H:%M:%S")} self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...") yield {"type": "complete", "summary_html": summary_html, "session": session} except Exception as e: logger.error(f"Chat error: {e}") session.chat_history.append( {"role": "assistant", "message": f"❌ Error: {str(e)}", "time": datetime.now().strftime("%H:%M:%S")}) yield {"type": "update_history", "session": session} # ================= Step 3: Run Core Team ================= async def run_step3_team(self, session: UserSession) -> AsyncGenerator[Dict[str, Any], None]: token = client_session_ctx.set(session.session_id) attempt = 0 success = False start_time = time.perf_counter() try: session = self._get_live_session(session) sid = session.session_id if sid in self._cancelled_sessions: self._cancelled_sessions.remove(sid) if not session.task_list: yield {"type": "error", "message": "No tasks to plan.", "session": session} return task_list_input = session.planner_agent.get_session_state()["task_list"] task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False) if isinstance(task_list_input, ( dict, list)) else str(task_list_input) self._add_reasoning(session, "team", "🎯 Multi-agent collaboration started") yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "working", "Analyzing tasks...")} while attempt < max_retries and not success: attempt += 1 try: active_agents = set() # 🔥 重點修改 1: 使用 arun (Async Run) # 🔥 重點修改 2: 這個方法本身回傳的是一個 AsyncGenerator,所以要直接 iterate team_stream = session.core_team.arun( task_list_str, stream=True, stream_events=True, session_id=session.session_id ) report_content = "" start_time = time.perf_counter() has_content = False # 🔥 重點修改 3: 使用 async for 來迭代 async for event in team_stream: if event.event in [RunEvent.run_content, RunEvent.tool_call_started]: has_content = True success = True if sid in self._cancelled_sessions: logger.warning(f"🛑 Execution terminated by user for session {sid}") self._cancelled_sessions.remove(sid) yield {"type": "error", "message": "Plan cancelled by user."} return if event.event == RunEvent.run_started: agent_id = event.agent_id or "team" active_agents.add(agent_id) if agent_id == "presenter": report_content = "" yield {"type": "reasoning_update", "session": session, "agent_status": (agent_id, "working", "Thinking...")} elif event.event == RunEvent.run_completed: agent_id = event.agent_id or "team" if agent_id == "team": yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "working", "Processing...")} continue if agent_id in active_agents: active_agents.remove(agent_id) yield {"type": "reasoning_update", "session": session, "agent_status": (agent_id, "idle", "Standby")} yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "working", "Reviewing results...")} elif event.event == RunEvent.run_content and event.agent_id == "presenter": report_content += event.content yield {"type": "report_stream", "content": report_content, "session": session} elif event.event == TeamRunEvent.tool_call_started: tool_name = event.tool.tool_name yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "working", "Orchestrating...")} if "delegate_task_to_member" in tool_name: member_id = event.tool.tool_args.get("member_id", "unknown") self._add_reasoning(session, "team", f"👉 Delegating to {member_id}...") yield {"type": "reasoning_update", "session": session, "agent_status": (member_id, "working", "Receiving Task...")} else: self._add_reasoning(session, "team", f"🔧 Tool: {tool_name}") elif event.event == RunEvent.tool_call_started: member_id = event.agent_id tool_name = event.tool.tool_name yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "working", f"Monitoring {member_id}...")} self._add_reasoning(session, member_id, f"Using tool: {tool_name}...") yield {"type": "reasoning_update", "session": session, "agent_status": (member_id, "working", f"Running Tool...")} elif event.event == TeamRunEvent.run_completed: self._add_reasoning(session, "team", "🎉 Planning process finished") if hasattr(event, 'metrics'): logger.info(f"Total tokens: {event.metrics.total_tokens}") logger.info(f"Input tokens: {event.metrics.input_tokens}") logger.info(f"Output tokens: {event.metrics.output_tokens}") if not has_content and attempt < max_retries: continue break finally: logger.info(f"Run time (s): {time.perf_counter() - start_time}") for agent in ["scout", "optimizer", "navigator", "weatherman", "presenter"]: yield {"type": "reasoning_update", "session": session, "agent_status": (agent, "idle", "Standby")} yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "complete", "All Done!")} session.final_report = report_html = f"## 🎯 Planning Complete\n\n{report_content}" yield {"type": "complete", "report_html": report_html, "session": session, "agent_status": ("team", "complete", "Finished")} except GeneratorExit: return except Exception as e: logger.error(f"Error in attempt {attempt}: {e}") if attempt >= max_retries: yield {"type": "error", "message": str(e), "session": session} finally: client_session_ctx.reset(token) # ================= Step 4: Finalize ================= def run_step4_finalize(self, session: UserSession) -> Dict[str, Any]: try: session = self._get_live_session(session) final_ref_id = poi_repo.get_last_id_by_session(session.session_id) if not final_ref_id: raise ValueError(f"No results found, please Stop & Back to Edit to re-run the planning.") structured_data = poi_repo.load(final_ref_id) timeline_html = create_timeline_html_enhanced(structured_data.get("timeline", [])) metrics = structured_data.get("metrics", {}) traffic = structured_data.get("traffic_summary", {}) task_count = f"{metrics['completed_tasks']} / {metrics['total_tasks']}" high_prio = sum(1 for t in session.task_list if t.get("priority") == "HIGH") total_time = metrics.get("optimized_duration_min", traffic.get("total_duration_min", 0)) dist_m = metrics.get("optimized_distance_m", 0) total_dist_km = dist_m / 1000.0 efficiency = metrics.get("route_efficiency_pct", 0) saved_dist_m = metrics.get("distance_saved_m", 0) saved_time_min = metrics.get("time_saved_min", 0) start_location = structured_data.get("global_info", {}).get("start_location", {}).get("name", "N/A") date = structured_data.get("global_info", {}).get("departure_time", "N/A") summary_card = create_summary_card(task_count, high_prio, int(total_time), start_location, date) eff_color = "#047857" if efficiency >= 80 else "#d97706" eff_bg = "#ecfdf5" if efficiency >= 80 else "#fffbeb" eff_border = "#a7f3d0" if efficiency >= 80 else "#fde68a" ai_stats_html = f"""
🚀 AI EFFICIENCY
{efficiency:.1f}%
⚡ Saved {saved_time_min:.0f} mins
🚗 TOTAL DISTANCE
{total_dist_km:.2f} km
📉 Reduced {saved_dist_m} m
""" full_summary_html = f"{summary_card}{ai_stats_html}

📍 Itinerary Timeline

{timeline_html}" map_fig = create_animated_map(structured_data) task_list_html = self.generate_task_list_html(session) session.planning_completed = True return {"type": "success", "summary_tab_html": full_summary_html, "report_md": session.final_report, "task_list_html": task_list_html, "map_fig": map_fig, "session": session} except Exception as e: logger.error(f"Finalize error: {e}", exc_info=True) return {"type": "error", "message": str(e), "session": session} # ================= Helpers ================= def _add_reasoning(self, session: UserSession, agent: str, message: str): session.reasoning_messages.append( {"agent": agent, "message": message, "time": datetime.now().strftime("%H:%M:%S")}) def _convert_task_list_to_ui_format(self, task_list_data): ui_tasks = [] if isinstance(task_list_data, dict): tasks = task_list_data.get("tasks", []) elif isinstance(task_list_data, list): tasks = task_list_data else: return [] for i, task in enumerate(tasks, 1): ui_tasks.append({ "id": i, "title": task.get("description", "Task"), "priority": task.get("priority", "MEDIUM"), "time": task.get("time_window", "Anytime"), "duration": f"{task.get('service_duration_min', 30)} minutes", "location": task.get("location_hint", "To be determined"), "icon": self._get_task_icon(task.get("category", "other")) }) return ui_tasks def _get_task_icon(self, category: str) -> str: icons = {"medical": "🏥", "shopping": "🛒", "postal": "📮", "food": "🍽️", "entertainment": "🎭", "transportation": "🚗", "other": "📋"} return icons.get(category.lower(), "📋") def generate_task_list_html(self, session: UserSession) -> str: if not session.task_list: return "

No tasks available

" html = "" for task in session.task_list: html += create_task_card(task["id"], task["title"], task["priority"], task["time"], task["duration"], task["location"], task.get("icon", "📋")) return html