"""
LifeFlow AI - Planner Service (Refactored for MCP Architecture)
核心業務邏輯層:負責協調 Agent 運作、狀態更新與資料處理。
✅ 移除本地 Toolkits
✅ 整合全域 MCP Client
✅ 保持業務邏輯不變
"""
import json
import os
import time
import uuid
from datetime import datetime
from typing import Generator, Dict, Any, Tuple, Optional, AsyncGenerator
# Core Imports
from core.session import UserSession
from ui.renderers import (
create_task_card,
create_summary_card,
create_timeline_html_enhanced,
)
from core.visualizers import create_animated_map
from config import AGENTS_INFO
# Models
from agno.models.google import Gemini
from agno.models.openai import OpenAIChat
from agno.models.groq import Groq
# Agno Framework
from agno.agent import RunEvent
from agno.run.team import TeamRunEvent
from src.agent.base import UserState, Location, get_context
from src.agent.planner import create_planner_agent
from src.agent.core_team import create_core_team
from src.infra.context import set_session_id
from src.infra.poi_repository import poi_repo
from src.infra.logger import get_logger
from src.infra.client_context import client_session_ctx
# 🔥🔥🔥 NEW IMPORTS: MCPTools only
from agno.tools.mcp import MCPTools
gemini_safety_settings = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]
logger = get_logger(__name__)
max_retries = 5
class PlannerService:
"""
PlannerService (MCP Version)
"""
    # Active-session cache (stores session objects only, no global resources)
_active_sessions: Dict[str, UserSession] = {}
_cancelled_sessions: set = set()
    # 🔥 Global toolkit references (injected by the app)
_global_toolkits: Dict[str, MCPTools] = {}
def set_global_tools(self, toolkits_dict: Dict[str, MCPTools]):
"""由 app.py 呼叫,注入 MCP Client"""
self._global_toolkits = toolkits_dict
logger.info("💉 MCP Toolkit injected into PlannerService successfully.")
    def _inject_session_id_into_toolkit(self, toolkit: MCPTools, session_id: str):
        """
        Monkey Patch 2.0:
        Intercept the Functions exposed by the toolkit and automatically inject the
        session_id argument when they are executed.
        """
        # 1. Snapshot the toolkit's current tool list.
        #    Agno calls get_tools() to obtain the tools, so that is the hook we wrap.
        original_tools = toolkit.functions

        def get_tools_wrapper():
            # 2. Work on the original tool list (List[Function]).
            tools = original_tools
            for tool in tools:
                # 3. Back up each tool's execution entrypoint.
                original_entrypoint = tool.entrypoint

                # 4. Define the new entrypoint (wrapper). The original entrypoint is bound
                #    as a default argument so every wrapper keeps its own reference
                #    (avoids Python's late-binding closure pitfall inside this loop).
                def entrypoint_wrapper(*args, _original_entrypoint=original_entrypoint, **kwargs):
                    # 🔥 Core trick: quietly add session_id here, so it is filled in
                    # even when the LLM does not pass this parameter.
                    kwargs['session_id'] = session_id
                    # Execute the original MCP call.
                    return _original_entrypoint(*args, **kwargs)

                # 5. Swap in the new entrypoint.
                tool.entrypoint = entrypoint_wrapper
            return tools

        # 6. Replace the toolkit's get_tools with our wrapper.
        #    This is an instance-level patch, so it only affects this agent's toolkit.
        toolkit.get_tools = get_tools_wrapper
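    # Minimal standalone sketch of why the default-argument binding above is needed
    # (illustration only, never executed here):
    #   wrappers = [lambda *a, _f=fn, **kw: _f(*a, **kw) for fn in fns]
    # Without "_f=fn" every wrapper would close over the same loop variable and all of
    # them would end up calling the last function in fns.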
def cancel_session(self, session_id: str):
if session_id:
logger.info(f"🛑 Requesting cancellation for session: {session_id}")
self._cancelled_sessions.add(session_id)
def _get_live_session(self, incoming_session: UserSession) -> UserSession:
sid = incoming_session.session_id
if not sid: return incoming_session
if sid and sid in self._active_sessions:
live_session = self._active_sessions[sid]
live_session.lat = incoming_session.lat
live_session.lng = incoming_session.lng
if incoming_session.custom_settings:
live_session.custom_settings.update(incoming_session.custom_settings)
if len(incoming_session.chat_history) > len(live_session.chat_history):
live_session.chat_history = incoming_session.chat_history
return live_session
self._active_sessions[sid] = incoming_session
return incoming_session
def initialize_agents(self, session: UserSession, lat: float, lng: float) -> UserSession:
if not session.session_id:
session.session_id = str(uuid.uuid4())
logger.info(f"🆔 Generated New Session ID: {session.session_id}")
session = self._get_live_session(session)
session.lat = lat
session.lng = lng
if not session.user_state:
session.user_state = UserState(location=Location(lat=lat, lng=lng))
else:
session.user_state.location = Location(lat=lat, lng=lng)
if session.planner_agent is not None:
return session
        # 1. Configure models
settings = session.custom_settings
provider = settings.get("llm_provider", "Gemini")
main_api_key = settings.get("model_api_key")
selected_model_id = settings.get("model", "gemini-2.5-flash")
helper_model_id = settings.get("groq_fast_model", "openai/gpt-oss-20b")
enable_fast_mode = settings.get("enable_fast_mode", False)
if main_api_key is None:
provider = "Gemini"
main_api_key = os.environ.get("GEMINI_API_KEY")
selected_model_id = "gemini-2.5-flash"
logger.warning("⚠️ Main API key not provided, defaulting to Gemini-2.5-flash with env var.")
groq_api_key = settings["groq_api_key"] if settings.get("groq_api_key") else os.environ.get("GROQ_API_KEY")
        # Initialize the main brain
if provider.lower() == "gemini":
main_brain = Gemini(id=selected_model_id, api_key=main_api_key, thinking_budget=1024,
safety_settings=gemini_safety_settings)
elif provider.lower() == "openai":
main_brain = OpenAIChat(id=selected_model_id, api_key=main_api_key)
elif provider.lower() == "groq":
main_brain = Groq(id=selected_model_id, api_key=main_api_key, temperature=0.1)
else:
main_brain = Gemini(id='gemini-2.5-flash', api_key=main_api_key)
        # Initialize the helper model
if enable_fast_mode and groq_api_key:
helper_model = Groq(id=helper_model_id, api_key=groq_api_key, temperature=0.1)
else:
if provider.lower() == "gemini":
helper_model = Gemini(id="gemini-2.5-flash-lite", api_key=main_api_key,
safety_settings=gemini_safety_settings)
elif provider.lower() == "openai":
helper_model = OpenAIChat(id="gpt-4o-mini", api_key=main_api_key)
else:
helper_model = main_brain
models_dict = {
"team": main_brain,
"presenter": main_brain,
"scout": helper_model,
"optimizer": helper_model,
"navigator": helper_model,
"weatherman": helper_model
}
        # 2. Prepare tools (🔥 core of the MCP refactor)
if not self._global_toolkits:
logger.warning("⚠️ MCP Toolkit is NOT initialized! Agents will have no tools.")
def get_tool_list(agent_name):
toolkit = self._global_toolkits.get(agent_name)
return [toolkit] if toolkit else []
tools_dict = {
"scout": get_tool_list("scout"),
"optimizer": get_tool_list("optimizer"),
"navigator": get_tool_list("navigator"),
"weatherman": get_tool_list("weatherman"),
"presenter": get_tool_list("presenter"),
}
planner_kwargs = {
"additional_context": get_context(session.user_state),
"timezone_identifier": session.user_state.utc_offset,
"debug_mode": False,
}
team_kwargs = {"timezone_identifier": session.user_state.utc_offset}
        # 3. Create agents
session.planner_agent = create_planner_agent(main_brain, planner_kwargs, session_id=session.session_id)
session.core_team = create_core_team(models_dict, team_kwargs, tools_dict, session_id=session.session_id)
self._active_sessions[session.session_id] = session
logger.info(f"✅ Agents initialized (MCP Mode) for session {session.session_id}")
return session
# ================= Step 1: Analyze Tasks =================
def run_step1_analysis(self, user_input: str, auto_location: bool,
lat: float, lng: float, session: UserSession) -> Generator[Dict[str, Any], None, None]:
if not user_input or len(user_input.strip()) == 0:
yield {"type": "error", "message": "⚠️ Please enter your plans first!",
"stream_text": "Waiting for input...", "block_next_step": True}
return
if auto_location and (lat == 0 or lng == 0):
yield {"type": "error", "message": "⚠️ Location detection failed.", "stream_text": "Location Error...",
"block_next_step": True}
return
if not auto_location and (lat is None or lng is None):
yield {"type": "error", "message": "⚠️ Please enter valid Latitude/Longitude.",
"stream_text": "Location Error...", "block_next_step": True}
return
try:
session = self.initialize_agents(session, lat, lng)
self._add_reasoning(session, "planner", "🚀 Starting analysis...")
yield {"type": "stream", "stream_text": "🤔 Analyzing your request with AI...",
"agent_status": ("planner", "working", "Initializing..."), "session": session}
self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...")
current_text = "🤔 Analyzing your request with AI...\n📋 AI is extracting tasks..."
planner_stream = session.planner_agent.run(
f"help user to update the task_list, user's message: {user_input}",
stream=True, stream_events=True
)
accumulated_response, displayed_text = "", current_text + "\n\n"
for chunk in planner_stream:
if chunk.event == RunEvent.run_content:
content = chunk.content
accumulated_response += content
if "@@@" not in accumulated_response:
displayed_text += content
formatted_text = displayed_text.replace("\n", "
")
yield {"type": "stream", "stream_text": formatted_text,
"agent_status": ("planner", "working", "Thinking..."), "session": session}
json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1]
json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ")
try:
task_list_data = json.loads(json_data)
if task_list_data["global_info"]["start_location"].lower() == "user location":
task_list_data["global_info"]["start_location"] = {"lat": lat, "lng": lng}
session.planner_agent.update_session_state(session_id=session.session_id,
session_state_updates={"task_list": task_list_data})
session.task_list = self._convert_task_list_to_ui_format(task_list_data)
except Exception as e:
logger.error(f"Failed to parse task_list: {e}")
session.task_list = []
if not session.task_list:
err_msg = "⚠️ AI couldn't identify any tasks."
self._add_reasoning(session, "planner", "❌ No tasks found")
yield {"type": "error", "message": err_msg, "stream_text": err_msg, "session": session,
"block_next_step": True}
return
if "priority" in session.task_list:
for i in session.task_list:
if not session.task_list[i].get("priority"):
session.task_list[i]["priority"] = "MEDIUM"
else:
session.task_list[i]["priority"] = session.task_list[i]["priority"].upper()
high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH")
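            # duration is assumed to be a string like "30 min"; only the leading number is summed.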
total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration"))
yield {"type": "complete", "stream_text": "Analysis complete!",
"start_location": task_list_data["global_info"].get("start_location", "N/A"),
"high_priority": high_priority, "total_time": total_time,
"start_time": task_list_data["global_info"].get("departure_time", "N/A"), "session": session,
"block_next_step": False}
except Exception as e:
logger.error(f"Error: {e}")
yield {"type": "error", "message": str(e), "session": session, "block_next_step": True}
# ================= Task Modification (Chat) =================
def modify_task_chat(self, user_message: str, session: UserSession) -> Generator[Dict[str, Any], None, None]:
if not user_message or len(user_message.replace(' ', '')) == 0:
yield {"type": "chat_error", "message": "Please enter a message.", "session": session}
return
session = self._get_live_session(session)
session.chat_history.append(
{"role": "user", "message": user_message, "time": datetime.now().strftime("%H:%M:%S")})
yield {"type": "update_history", "session": session}
try:
if session.planner_agent is None:
if session.lat and session.lng:
session = self.initialize_agents(session, session.lat, session.lng)
else:
yield {"type": "chat_error", "message": "Session lost. Please restart.", "session": session}
return
session.chat_history.append(
{"role": "assistant", "message": "🤔 AI is thinking...", "time": datetime.now().strftime("%H:%M:%S")})
yield {"type": "update_history", "session": session}
planner_stream = session.planner_agent.run(
f"help user to update the task_list, user's message: {user_message}",
stream=True, stream_events=True
)
accumulated_response = ""
for chunk in planner_stream:
if chunk.event == RunEvent.run_content:
accumulated_response += chunk.content
json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1]
json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ")
try:
task_list_data = json.loads(json_data)
if isinstance(task_list_data["global_info"]["start_location"], str) and task_list_data["global_info"][
"start_location"].lower() == "user location":
task_list_data["global_info"]["start_location"] = {"lat": session.lat, "lng": session.lng}
session.planner_agent.update_session_state(session_id=session.session_id,
session_state_updates={"task_list": task_list_data})
session.task_list = self._convert_task_list_to_ui_format(task_list_data)
except Exception as e:
logger.error(f"Failed to parse modified task_list: {e}")
raise e
high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH")
total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration"))
start_location = task_list_data["global_info"].get("start_location", "N/A")
date = task_list_data["global_info"].get("departure_time", "N/A")
summary_html = create_summary_card(len(session.task_list), high_priority, total_time, start_location, date)
session.chat_history[-1] = {"role": "assistant", "message": "✅ Tasks updated.",
"time": datetime.now().strftime("%H:%M:%S")}
self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...")
yield {"type": "complete", "summary_html": summary_html, "session": session}
except Exception as e:
logger.error(f"Chat error: {e}")
session.chat_history.append(
{"role": "assistant", "message": f"❌ Error: {str(e)}", "time": datetime.now().strftime("%H:%M:%S")})
yield {"type": "update_history", "session": session}
# ================= Step 3: Run Core Team =================
async def run_step3_team(self, session: UserSession) -> AsyncGenerator[Dict[str, Any], None]:
token = client_session_ctx.set(session.session_id)
attempt = 0
success = False
start_time = time.perf_counter()
try:
session = self._get_live_session(session)
sid = session.session_id
if sid in self._cancelled_sessions: self._cancelled_sessions.remove(sid)
if not session.task_list:
yield {"type": "error", "message": "No tasks to plan.", "session": session}
return
task_list_input = session.planner_agent.get_session_state()["task_list"]
task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False) if isinstance(task_list_input, (
dict, list)) else str(task_list_input)
self._add_reasoning(session, "team", "🎯 Multi-agent collaboration started")
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Analyzing tasks...")}
while attempt < max_retries and not success:
attempt += 1
try:
active_agents = set()
                    # 🔥 Key change 1: use arun (async run).
                    # 🔥 Key change 2: arun itself returns an AsyncGenerator, so iterate it directly.
team_stream = session.core_team.arun(
task_list_str,
stream=True,
stream_events=True,
session_id=session.session_id
)
report_content = ""
start_time = time.perf_counter()
has_content = False
                    # 🔥 Key change 3: iterate with async for.
async for event in team_stream:
if event.event in [RunEvent.run_content, RunEvent.tool_call_started]:
has_content = True
success = True
if sid in self._cancelled_sessions:
logger.warning(f"🛑 Execution terminated by user for session {sid}")
self._cancelled_sessions.remove(sid)
yield {"type": "error", "message": "Plan cancelled by user."}
return
if event.event == RunEvent.run_started:
agent_id = event.agent_id or "team"
active_agents.add(agent_id)
if agent_id == "presenter": report_content = ""
yield {"type": "reasoning_update", "session": session,
"agent_status": (agent_id, "working", "Thinking...")}
elif event.event == RunEvent.run_completed:
agent_id = event.agent_id or "team"
if agent_id == "team":
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Processing...")}
continue
if agent_id in active_agents: active_agents.remove(agent_id)
yield {"type": "reasoning_update", "session": session,
"agent_status": (agent_id, "idle", "Standby")}
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Reviewing results...")}
elif event.event == RunEvent.run_content and event.agent_id == "presenter":
report_content += event.content
yield {"type": "report_stream", "content": report_content, "session": session}
elif event.event == TeamRunEvent.tool_call_started:
tool_name = event.tool.tool_name
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Orchestrating...")}
if "delegate_task_to_member" in tool_name:
member_id = event.tool.tool_args.get("member_id", "unknown")
self._add_reasoning(session, "team", f"👉 Delegating to {member_id}...")
yield {"type": "reasoning_update", "session": session,
"agent_status": (member_id, "working", "Receiving Task...")}
else:
self._add_reasoning(session, "team", f"🔧 Tool: {tool_name}")
elif event.event == RunEvent.tool_call_started:
member_id = event.agent_id
tool_name = event.tool.tool_name
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", f"Monitoring {member_id}...")}
self._add_reasoning(session, member_id, f"Using tool: {tool_name}...")
yield {"type": "reasoning_update", "session": session,
"agent_status": (member_id, "working", f"Running Tool...")}
elif event.event == TeamRunEvent.run_completed:
self._add_reasoning(session, "team", "🎉 Planning process finished")
if hasattr(event, 'metrics'):
logger.info(f"Total tokens: {event.metrics.total_tokens}")
logger.info(f"Input tokens: {event.metrics.input_tokens}")
logger.info(f"Output tokens: {event.metrics.output_tokens}")
if not has_content and attempt < max_retries: continue
break
finally:
logger.info(f"Run time (s): {time.perf_counter() - start_time}")
for agent in ["scout", "optimizer", "navigator", "weatherman", "presenter"]:
yield {"type": "reasoning_update", "session": session, "agent_status": (agent, "idle", "Standby")}
yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "complete", "All Done!")}
session.final_report = report_html = f"## 🎯 Planning Complete\n\n{report_content}"
yield {"type": "complete", "report_html": report_html, "session": session,
"agent_status": ("team", "complete", "Finished")}
except GeneratorExit:
return
except Exception as e:
logger.error(f"Error in attempt {attempt}: {e}")
if attempt >= max_retries: yield {"type": "error", "message": str(e), "session": session}
finally:
client_session_ctx.reset(token)
# ================= Step 4: Finalize =================
def run_step4_finalize(self, session: UserSession) -> Dict[str, Any]:
try:
session = self._get_live_session(session)
final_ref_id = poi_repo.get_last_id_by_session(session.session_id)
if not final_ref_id:
raise ValueError(f"No results found, please Stop & Back to Edit to re-run the planning.")
structured_data = poi_repo.load(final_ref_id)
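            # Rough shape of structured_data, inferred from the reads below (illustrative only):
            #   {"timeline": [...],
            #    "metrics": {"completed_tasks": 4, "total_tasks": 5, "optimized_duration_min": 95, ...},
            #    "traffic_summary": {"total_duration_min": 110},
            #    "global_info": {"start_location": {"name": "..."}, "departure_time": "..."}}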
timeline_html = create_timeline_html_enhanced(structured_data.get("timeline", []))
metrics = structured_data.get("metrics", {})
traffic = structured_data.get("traffic_summary", {})
task_count = f"{metrics['completed_tasks']} / {metrics['total_tasks']}"
high_prio = sum(1 for t in session.task_list if t.get("priority") == "HIGH")
total_time = metrics.get("optimized_duration_min", traffic.get("total_duration_min", 0))
dist_m = metrics.get("optimized_distance_m", 0)
total_dist_km = dist_m / 1000.0
efficiency = metrics.get("route_efficiency_pct", 0)
saved_dist_m = metrics.get("distance_saved_m", 0)
saved_time_min = metrics.get("time_saved_min", 0)
start_location = structured_data.get("global_info", {}).get("start_location", {}).get("name", "N/A")
date = structured_data.get("global_info", {}).get("departure_time", "N/A")
summary_card = create_summary_card(task_count, high_prio, int(total_time), start_location, date)
eff_color = "#047857" if efficiency >= 80 else "#d97706"
eff_bg = "#ecfdf5" if efficiency >= 80 else "#fffbeb"
eff_border = "#a7f3d0" if efficiency >= 80 else "#fde68a"
ai_stats_html = f"""
No tasks available
" html = "" for task in session.task_list: html += create_task_card(task["id"], task["title"], task["priority"], task["time"], task["duration"], task["location"], task.get("icon", "📋")) return html