LifeFlow-AI / app.py
Marco310's picture
buildup gradio app
3b3daa9
raw
history blame
43.4 kB
"""
LifeFlow AI - Multi-User Safe with Real Streaming
支持多用戶並發 + 真正的串流輸出 + 安全的設定隔離
"""
import sys
from pathlib import Path
import gradio as gr
from datetime import datetime
import time as time_module
import json
import uuid
from typing import Dict, Any, Optional
# ===== 導入配置 =====
from config import DEFAULT_SETTINGS, APP_TITLE
# ===== 導入 UI 組件 =====
from ui.theme import get_enhanced_css
from ui.components.header import create_header, create_top_controls
from ui.components.input_form import create_input_form, toggle_location_inputs
from ui.components.confirmation import create_confirmation_area, create_exit_button
from ui.components.results import create_team_area, create_result_area, create_tabs
from ui.components.modals import create_settings_modal, create_doc_modal
# ===== 導入核心工具 =====
from core.utils import (
create_agent_stream_output, create_agent_card_enhanced,
create_task_card, create_summary_card, create_animated_map,
get_reasoning_html_reversed, create_celebration_animation,
create_result_visualization
)
# ===== 導入 Agno Agent 組件 =====
from agno.models.google import Gemini
from agno.agent import RunEvent
from agno.run.team import TeamRunEvent
# ===== 導入 Agent 系統 =====
from src.agent.base import UserState, Location, get_context
from src.agent.planner import create_planner_agent
from src.agent.core_team import create_core_team
from src.infra.context import set_session_id, get_session_id
from src.infra.poi_repository import poi_repo
from src.tools import (
ScoutToolkit, OptimizationToolkit,
NavigationToolkit, WeatherToolkit, ReaderToolkit
)
from src.infra.config import get_settings
from src.infra.logger import get_logger
logger = get_logger(__name__)
class UserSession:
    """
    Session data belonging to a single user.

    Each user gets an independent instance so that data stays isolated
    between users.
    """

    def __init__(self):
        # Identity and live agent handles (the agent handles are not part of
        # the dictionary produced by to_dict()).
        self.session_id: Optional[str] = None
        self.planner_agent = None
        self.core_team = None
        self.user_state: Optional[UserState] = None

        # Conversation / planning progress.
        self.task_list: list = []
        self.reasoning_messages: list = []
        self.chat_history: list = []
        self.planning_completed: bool = False

        # Location is kept so the agents can be re-initialised later.
        self.lat: Optional[float] = None
        self.lng: Optional[float] = None

        # [Security Fix] Per-user custom settings (API keys, model choice).
        self.custom_settings: Dict[str, Any] = {}

        # System default settings returned by get_settings().
        self.agno_settings = get_settings()

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the persistable fields to a dict (for Gradio State)."""
        persisted_fields = (
            'session_id',
            'task_list',
            'reasoning_messages',
            'chat_history',
            'planning_completed',
            'lat',
            'lng',
            # [Security Fix] keep the user's custom settings with the session
            'custom_settings',
        )
        return {field: getattr(self, field) for field in persisted_fields}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'UserSession':
        """Rebuild a session from a dict produced by to_dict() (for Gradio State)."""
        restored = cls()
        # Built on every call so the list/dict fallbacks are never shared
        # between two restored sessions.
        fallbacks = {
            'session_id': None,
            'task_list': [],
            'reasoning_messages': [],
            'chat_history': [],
            'planning_completed': False,
            'lat': None,
            'lng': None,
            # [Security Fix] restore the user's custom settings
            'custom_settings': {},
        }
        for field, fallback in fallbacks.items():
            setattr(restored, field, data.get(field, fallback))
        return restored
class LifeFlowAI:
"""LifeFlow AI - Multi-User Safe Version"""
def __init__(self):
# [Security Fix] 移除全域 settings,避免用戶間資料汙染
pass
def _initialize_agents(self, session: UserSession, lat: float, lng: float):
    """Initialise the planner agent and core team for one user session.

    Each user gets independent agents, built with that user's settings.
    The location is always recorded on the session; the agents are only
    created when ``session.planner_agent`` is still ``None``.

    Args:
        session: Session whose ``planner_agent``, ``core_team``,
            ``user_state`` and (if missing) ``session_id`` are filled in.
        lat: Latitude stored on the session and used for the user state.
        lng: Longitude stored on the session and used for the user state.
    """
    session.lat = lat
    session.lng = lng

    if session.planner_agent is not None:
        logger.debug(f"Agents already initialized for session {session.session_id}")
        return

    # Generate a session id on first use, otherwise keep the existing one.
    # Either way the id is published through set_session_id() exactly once.
    is_new_session = session.session_id is None
    if is_new_session:
        session.session_id = str(uuid.uuid4())
    set_session_id(session.session_id)
    if is_new_session:
        logger.info(f"🆔 New Session: {session.session_id}")
    else:
        logger.info(f"🔄 Restoring Session: {session.session_id}")

    # User state for this session.
    session.user_state = UserState(location=Location(lat=lat, lng=lng))

    # [Security Fix] Use the model the user selected, else the default.
    selected_model_id = session.custom_settings.get('model', 'gemini-2.5-flash')

    # [Security Fix] Prefer a user-supplied API key over the system key.
    # NOTE(review): save_settings() in this file stores 'google_maps_api_key',
    # 'openweather_api_key', 'anthropic_api_key' and 'model' but never
    # 'gemini_api_key', so this lookup presumably always falls back to the
    # system key - confirm the intended key name.
    # NOTE(review): every model below is a Gemini instance; a non-Gemini
    # model id chosen in the settings would need extra selection logic.
    gemini_key = session.custom_settings.get('gemini_api_key') or session.agno_settings.gemini_api_key

    # Models (with the settings applied).
    planner_model = Gemini(
        id=selected_model_id,
        thinking_budget=2048,
        api_key=gemini_key
    )
    main_model = Gemini(
        id=selected_model_id,
        thinking_budget=1024,
        api_key=gemini_key
    )
    lite_model = Gemini(
        id="gemini-2.5-flash-lite",  # the lightweight model id is fixed here
        api_key=gemini_key
    )

    # Which model each team role uses.
    models_dict = {
        "team": main_model,
        "scout": main_model,
        "optimizer": lite_model,
        "navigator": lite_model,
        "weatherman": lite_model,
        "presenter": main_model,
    }

    # [Note] If a toolkit accepts an API key, it should be passed in here
    # from session.custom_settings.
    tools_dict = {
        "scout": [ScoutToolkit()],
        "optimizer": [OptimizationToolkit()],
        "navigator": [NavigationToolkit()],
        "weatherman": [WeatherToolkit()],
        "presenter": [ReaderToolkit()],
    }

    planner_kwargs = {
        "additional_context": get_context(session.user_state),
        "timezone_identifier": session.user_state.utc_offset,
        "debug_mode": False,
    }
    team_kwargs = {
        "timezone_identifier": session.user_state.utc_offset,
    }

    # Create the agents.
    session.planner_agent = create_planner_agent(
        planner_model,
        planner_kwargs,
        session_id=session.session_id
    )
    session.core_team = create_core_team(
        models_dict,
        team_kwargs,
        tools_dict,
        session_id=session.session_id
    )
    logger.info(f"✅ Agents initialized for session {session.session_id} using model {selected_model_id}")
def step1_analyze_tasks(self, user_input: str, auto_location: bool,
                        lat: float, lon: float, session: UserSession):
    """Step 1: analyse the request with the planner agent, streaming progress.

    This is a generator. Every yielded value is one full tuple of UI outputs
    built by the ``_create_step1_*`` / ``_create_error_outputs`` helpers.

    Args:
        user_input: Free-text request; blank or missing input yields the
            empty-state outputs and stops.
        auto_location: When true, ``lat``/``lon`` are replaced by fixed
            default coordinates.
        lat: Latitude used when ``auto_location`` is false.
        lon: Longitude used when ``auto_location`` is false.
        session: Session that receives the task list and reasoning log.
    """
    # A missing value is treated the same as blank text.
    if not (user_input or "").strip():
        yield from self._empty_step1_outputs(session)
        return

    if auto_location:
        # Hard-coded default coordinates used when auto-location is enabled.
        lat, lon = 25.033, 121.565

    try:
        self._initialize_agents(session, lat, lon)
        self._add_reasoning(session, "planner", "🚀 Starting analysis...")
        yield self._create_step1_outputs(
            stream_text="🤔 Analyzing your request with AI...",
            session=session,
            agent_status=("planner", "working", "Initializing...")
        )
        time_module.sleep(0.3)

        self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...")
        yield self._create_step1_outputs(
            stream_text="🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...",
            session=session,
            agent_status=("planner", "working", "Extracting tasks...")
        )

        planner_stream = session.planner_agent.run(
            f"help user to update the task_list, user's message: {user_input}",
            stream=True,
            stream_events=True
        )

        stream_prefix = "🤔 Analyzing your request with AI...\n📋 AI is extracting tasks...\n\n"
        accumulated_response = ""
        displayed_text = stream_prefix
        for chunk in planner_stream:
            # Content events may carry no text; skip those instead of
            # failing on `str + None`.
            if chunk.event == RunEvent.run_content and chunk.content:
                accumulated_response += chunk.content
                # Only the text before the "@@@" marker is shown. The visible
                # part is recomputed from the whole response so a marker that
                # is split across two chunks is still honoured.
                displayed_text = stream_prefix + accumulated_response.split("@@@", 1)[0]
            yield self._create_step1_outputs(
                stream_text=displayed_text,
                session=session,
                agent_status=("planner", "working", "Processing...")
            )

        # Everything from the first "{" on is taken as the JSON payload, then
        # stripped of backticks, "@", backslashes and newlines.
        json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1]
        json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ")
        session.planner_agent.update_session_state(
            session_id=session.session_id,
            session_state_updates={"task_list": json_data}
        )

        try:
            task_list_data = json.loads(json_data)
            session.task_list = self._convert_task_list_to_ui_format(task_list_data)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse task_list: {e}")
            session.task_list = []

        self._add_reasoning(session, "planner", f"✅ Extracted {len(session.task_list)} tasks")

        high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH")

        # Durations look like "<number> minutes"; a missing or non-numeric
        # duration is skipped rather than aborting the whole step.
        total_time = 0
        for task in session.task_list:
            tokens = str(task.get("duration") or "").split()
            try:
                total_time += int(float(tokens[0]))
            except (IndexError, ValueError, OverflowError):
                continue

        final_text = displayed_text + f"\n✅ Analysis complete! Found {len(session.task_list)} tasks."
        yield self._create_step1_complete_outputs(
            stream_text=final_text,
            session=session,
            high_priority=high_priority,
            total_time=total_time
        )
    except Exception as e:
        logger.error(f"Error in step1: {e}", exc_info=True)
        yield self._create_error_outputs(str(e), session)
def modify_task_chat(self, user_message: str, session: UserSession):
    """Modify the task list through chat, streaming the planner's reply.

    This is a generator. Every yielded value is the tuple
    ``(chat_history_html, task_list_html, session_dict)``.

    Args:
        user_message: Chat text from the user; blank or missing text just
            re-renders the current state.
        session: Session whose chat history and task list are updated.
    """

    def _timestamp() -> str:
        return datetime.now().strftime("%H:%M:%S")

    def _snapshot():
        # Current chat + task HTML and the serialised session.
        return (
            self._generate_chat_history_html(session),
            self._generate_task_list_html(session),
            session.to_dict()
        )

    # A missing value is treated the same as blank text.
    if not (user_message or "").strip():
        yield _snapshot()
        return

    session.chat_history.append({
        "role": "user",
        "message": user_message,
        "time": _timestamp()
    })
    yield _snapshot()

    # True while the last chat entry is a temporary assistant placeholder
    # ("Restoring..." / "thinking..."). Tracked explicitly so error cleanup
    # does not depend on the wording of the message.
    placeholder_active = False
    try:
        if session.planner_agent is None:
            if session.lat is not None and session.lng is not None:
                session.chat_history.append({
                    "role": "assistant",
                    "message": "🔄 Restoring AI system...",
                    "time": _timestamp()
                })
                placeholder_active = True
                yield _snapshot()
                self._initialize_agents(session, session.lat, session.lng)
                session.chat_history.pop()
                placeholder_active = False
            else:
                session.chat_history.append({
                    "role": "assistant",
                    "message": "❌ Error: Please restart the planning process.",
                    "time": _timestamp()
                })
                yield _snapshot()
                return

        session.chat_history.append({
            "role": "assistant",
            "message": "🤔 AI is thinking...",
            "time": _timestamp()
        })
        placeholder_active = True
        yield _snapshot()

        planner_stream = session.planner_agent.run(
            f"help user to update the task_list, user's message: {user_message}",
            stream=True,
            stream_events=True
        )

        thinking_prefix = "🤔 AI is thinking...\n\n"
        accumulated_response = ""
        displayed_thinking = thinking_prefix
        for chunk in planner_stream:
            # Content events may carry no text; skip those.
            if chunk.event == RunEvent.run_content and chunk.content:
                accumulated_response += chunk.content
                # Show only the text before the "@@@" marker, recomputed from
                # the whole response so a marker split across chunks is
                # still honoured.
                visible = thinking_prefix + accumulated_response.split("@@@", 1)[0]
                if visible != displayed_thinking:
                    displayed_thinking = visible
                    session.chat_history[-1] = {
                        "role": "assistant",
                        "message": displayed_thinking,
                        "time": _timestamp()
                    }
            yield _snapshot()

        # Everything from the first "{" on is taken as the JSON payload, then
        # stripped of backticks, "@", backslashes and newlines.
        json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1]
        json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ")
        session.planner_agent.update_session_state(
            session_id=session.session_id,
            session_state_updates={"task_list": json_data}
        )
        # A parse failure here is reported through the handler below.
        task_list_data = json.loads(json_data)
        session.task_list = self._convert_task_list_to_ui_format(task_list_data)

        session.chat_history[-1] = {
            "role": "assistant",
            "message": "✅ Tasks updated based on your request",
            "time": _timestamp()
        }
        placeholder_active = False
        self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...")
        yield _snapshot()
    except Exception as e:
        logger.error(f"Error in modify_task_chat: {e}", exc_info=True)
        # Replace a pending placeholder with the error message.
        if placeholder_active and session.chat_history:
            session.chat_history.pop()
        session.chat_history.append({
            "role": "assistant",
            "message": f"❌ Error: {str(e)}",
            "time": _timestamp()
        })
        yield _snapshot()
def step2_search_pois(self, session: UserSession):
    """Step 2: mark the scout as working.

    Logs two reasoning entries and returns the reasoning HTML, a status
    line, six agent cards and the serialised session.
    """
    for agent, note in (
        ("team", "🚀 Core Team activated"),
        ("scout", "Searching for POIs..."),
    ):
        self._add_reasoning(session, agent, note)

    cards = [
        create_agent_card_enhanced("planner", "complete", "Tasks ready"),
        create_agent_card_enhanced("scout", "working", "Searching..."),
    ]
    # The remaining agents are shown as idle.
    for standby_agent in ("optimizer", "validator", "weather", "traffic"):
        cards.append(create_agent_card_enhanced(standby_agent, "idle", "On standby"))

    reasoning_html = get_reasoning_html_reversed(session.reasoning_messages)
    return (reasoning_html, "🗺️ Scout is searching...", *cards, session.to_dict())
def step3_run_core_team(self, session: UserSession):
    """Step 3: run the core team on the stored task list.

    The team run is consumed as an event stream; content events are
    collected into the report and other events are logged as reasoning.

    Returns:
        ``(report_markdown, session_dict)``. On any failure the report is
        an error message instead; the exception is logged, not re-raised.
    """
    try:
        # Agents are rebuilt when missing, which needs a stored location.
        if session.core_team is None or session.planner_agent is None:
            if session.lat is not None and session.lng is not None:
                self._initialize_agents(session, session.lat, session.lng)
            else:
                raise ValueError("Cannot restore agents: location not available")

        set_session_id(session.session_id)

        task_list_input = session.planner_agent.get_session_state().get("task_list")
        if not task_list_input:
            raise ValueError("No task list available")
        if isinstance(task_list_input, str):
            task_list_str = task_list_input
        else:
            task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False)

        self._add_reasoning(session, "team", "🎯 Multi-agent collaboration started")
        team_stream = session.core_team.run(
            f"Plan this trip: {task_list_str}",
            stream=True,
            stream_events=True,
            session_id=session.session_id
        )

        report_parts = []
        for event in team_stream:
            if event.event == TeamRunEvent.run_content:
                # Content events may carry no text; skip those instead of
                # failing on `str + None`.
                if event.content:
                    report_parts.append(str(event.content))
            elif event.event == "tool_call":
                # NOTE(review): this compares against the literal "tool_call"
                # while the other branches use TeamRunEvent members - confirm
                # this is the value the stream actually emits.
                tool_call = getattr(event, "tool_call", None)
                if not isinstance(tool_call, dict):
                    tool_call = {}
                tool_name = (tool_call.get('function') or {}).get('name', 'unknown')
                self._add_reasoning(session, "team", f"🔧 Tool: {tool_name}")
            elif event.event == TeamRunEvent.run_completed:
                self._add_reasoning(session, "team", "🎉 Completed")

        report_content = "".join(report_parts)
        report_html = f"## 🎯 Planning Complete\n\n{report_content}..."
        return report_html, session.to_dict()
    except Exception as e:
        # Log the traceback, matching the other step handlers.
        logger.error(f"Error in step3: {e}", exc_info=True)
        return f"## ❌ Error\n\n{str(e)}", session.to_dict()
def step4_finalize(self, session: UserSession):
    """Step 4: load the stored result and build the final UI outputs.

    Returns a tuple of timeline HTML, metrics HTML, result visualisation,
    map figure, two visibility updates, a status line, six agent cards and
    the serialised session. On failure the same shape is returned with
    error HTML and a default map.
    """
    agent_keys = ("planner", "scout", "optimizer", "validator", "weather", "traffic")
    try:
        final_ref_id = poi_repo.get_last_id_by_session(session.session_id)
        if not final_ref_id:
            raise ValueError(f"No final result found for session {session.session_id}")

        structured_data = poi_repo.load(final_ref_id)
        timeline = structured_data.get("timeline", [])
        metrics = structured_data.get("metrics", {})
        traffic_summary = structured_data.get("traffic_summary", {})

        timeline_html = self._generate_timeline_html(timeline)
        metrics_html = self._generate_metrics_html(metrics, traffic_summary)
        result_viz = create_result_visualization(session.task_list or [], structured_data)
        map_fig = self._generate_map_from_data(structured_data)

        done_cards = []
        for key in agent_keys:
            done_cards.append(create_agent_card_enhanced(key, "complete", "✓ Complete"))

        self._add_reasoning(session, "team", "🎉 All completed")
        session.planning_completed = True

        return (
            timeline_html,
            metrics_html,
            result_viz,
            map_fig,
            gr.update(visible=True),
            gr.update(visible=False),
            "🎉 Planning completed!",
            *done_cards,
            session.to_dict(),
        )
    except Exception as e:
        logger.error(f"Error in step4: {e}", exc_info=True)
        error_html = f"<div style='color:red'>Error: {str(e)}</div>"
        default_map = create_animated_map()

        waiting_cards = []
        for key in agent_keys:
            waiting_cards.append(create_agent_card_enhanced(key, "idle", "Waiting..."))

        return (
            error_html,
            error_html,
            error_html,
            default_map,
            gr.update(visible=True),
            gr.update(visible=False),
            f"Error: {str(e)}",
            *waiting_cards,
            session.to_dict(),
        )
# ===== 輔助方法 =====
# [Security Fix] 新增:保存設定到 User Session
def save_settings(self, google_key, weather_key, anthropic_key, model, session_data):
    """Store the submitted settings in the user's own session.

    Returns a status message and the serialised session.
    """
    user_session = UserSession.from_dict(session_data)
    user_session.custom_settings.update({
        'google_maps_api_key': google_key,
        'openweather_api_key': weather_key,
        'anthropic_api_key': anthropic_key,
        'model': model,
    })
    status = "✅ Settings saved locally!"
    return status, user_session.to_dict()
def _add_reasoning(self, session: UserSession, agent: str, message: str):
session.reasoning_messages.append({
"agent": agent,
"message": message,
"time": datetime.now().strftime("%H:%M:%S")
})
def _create_step1_outputs(self, stream_text: str, session: UserSession, agent_status: tuple):
    """Build the in-progress output tuple for step 1.

    ``agent_status`` is ``(agent_key, status, message)``; the message is
    also used as the status-bar text. The five other agents are rendered
    as idle.
    """
    active_agent, state, note = agent_status
    standby_agents = ("scout", "optimizer", "validator", "weather", "traffic")
    return (
        self._create_stream_html(stream_text),
        "",
        "",
        get_reasoning_html_reversed(session.reasoning_messages),
        gr.update(visible=False),
        gr.update(visible=False),
        "",
        note,
        create_agent_card_enhanced(active_agent, state, note),
        *(create_agent_card_enhanced(name, "idle", "On standby") for name in standby_agents),
        session.to_dict(),
    )
def _create_step1_complete_outputs(self, stream_text: str, session: UserSession,
                                   high_priority: int, total_time: int):
    """Build the output tuple shown once step 1 has extracted the tasks.

    Includes the summary card, the task list, a visible confirmation area
    and the chat welcome panel.
    """
    task_count = len(session.task_list)
    summary_card = create_summary_card(task_count, high_priority, total_time)
    tasks_markup = self._generate_task_list_html(session)
    standby_agents = ("scout", "optimizer", "validator", "weather", "traffic")
    return (
        self._create_stream_html(stream_text),
        summary_card,
        tasks_markup,
        get_reasoning_html_reversed(session.reasoning_messages),
        gr.update(visible=True),
        gr.update(visible=False),
        self._generate_chat_welcome_html(),
        "✓ Tasks extracted",
        create_agent_card_enhanced("planner", "complete", "Tasks ready"),
        *(create_agent_card_enhanced(name, "idle", "On standby") for name in standby_agents),
        session.to_dict(),
    )
def _empty_step1_outputs(self, session: UserSession):
    """Yield the single idle-state output tuple used when there is no input."""
    all_agents = ("planner", "scout", "optimizer", "validator", "weather", "traffic")
    idle_state = (
        create_agent_stream_output(),
        "",
        "",
        get_reasoning_html_reversed(),
        gr.update(visible=False),
        gr.update(visible=False),
        "",
        "Waiting for input...",
        *(create_agent_card_enhanced(name, "idle", "On standby") for name in all_agents),
        session.to_dict(),
    )
    yield idle_state
def _create_error_outputs(self, error: str, session: UserSession):
    """Build the step 1 output tuple that reports ``error`` to the user."""
    standby_agents = ("scout", "optimizer", "validator", "weather", "traffic")
    stream_message = f"❌ Error: {error}"
    status_message = f"Error: {error}"
    return (
        self._create_stream_html(stream_message),
        "",
        "",
        get_reasoning_html_reversed(session.reasoning_messages),
        gr.update(visible=False),
        gr.update(visible=False),
        "",
        status_message,
        create_agent_card_enhanced("planner", "idle", "Error occurred"),
        *(create_agent_card_enhanced(name, "idle", "On standby") for name in standby_agents),
        session.to_dict(),
    )
def _create_stream_html(self, text: str) -> str:
return f"""<div class="stream-container"><div class="stream-text">{text}<span class="stream-cursor"></span></div></div>"""
def _convert_task_list_to_ui_format(self, task_list_data):
ui_tasks = []
if isinstance(task_list_data, dict):
tasks = task_list_data.get("tasks", [])
elif isinstance(task_list_data, list):
tasks = task_list_data
else:
return []
for i, task in enumerate(tasks, 1):
ui_task = {
"id": i,
"title": task.get("description", "Task"),
"priority": task.get("priority", "MEDIUM"),
"time": task.get("time_window", "Anytime"),
"duration": f"{task.get('duration_minutes', 30)} minutes",
"location": task.get("location_hint", "To be determined"),
"icon": self._get_task_icon(task.get("category", "other"))
}
ui_tasks.append(ui_task)
return ui_tasks
def _get_task_icon(self, category: str) -> str:
icons = {
"medical": "🏥", "shopping": "🛒", "postal": "📮",
"food": "🍽️", "entertainment": "🎭", "transportation": "🚗",
"other": "📋"
}
return icons.get(category.lower(), "📋")
def _generate_task_list_html(self, session: UserSession) -> str:
if not session.task_list:
return "<p>No tasks available</p>"
html = ""
for task in session.task_list:
html += create_task_card(
task["id"], task["title"], task["priority"],
task["time"], task["duration"], task["location"],
task.get("icon", "📋")
)
return html
def _generate_chat_history_html(self, session: UserSession) -> str:
if not session.chat_history:
return self._generate_chat_welcome_html()
html = '<div class="chat-history">'
for msg in session.chat_history:
role_class = "user" if msg["role"] == "user" else "assistant"
icon = "👤" if msg["role"] == "user" else "🤖"
html += f"""
<div class="chat-message {role_class}">
<span class="chat-icon">{icon}</span>
<div class="chat-content">
<div class="chat-text">{msg["message"]}</div>
<div class="chat-time">{msg["time"]}</div>
</div>
</div>
"""
html += '</div>'
return html
def _generate_chat_welcome_html(self) -> str:
return """
<div class="chat-welcome">
<h3>💬 Chat with LifeFlow AI</h3>
<p>Modify your tasks by chatting here.</p>
</div>
"""
def _generate_timeline_html(self, timeline):
"""
生成右側 Timeline Tab 的 HTML
使用 structured_data['timeline'] 中的豐富數據
"""
if not timeline:
return "<div>No timeline data</div>"
html = '<div class="timeline-container">'
for i, stop in enumerate(timeline):
time = stop.get("time", "N/A")
location = stop.get("location", "Unknown Location")
weather = stop.get("weather", "")
aqi = stop.get("aqi", {}).get("label", "")
travel_time = stop.get("travel_time_from_prev", "")
# 決定標記樣式
marker_class = "start" if i == 0 else "stop"
html += f"""
<div class="timeline-item">
<div class="timeline-marker {marker_class}">{i}</div>
<div class="timeline-content">
<div class="timeline-header">
<span class="time-badge">{time}</span>
<h4>{location}</h4>
</div>
<div class="timeline-details">
{f'<span class="weather-tag">🌤️ {weather}</span>' if weather else ''}
{f'<span class="aqi-tag">{aqi}</span>' if aqi else ''}
</div>
{f'<p class="travel-info">🚗 Drive: {travel_time}</p>' if i > 0 else ''}
</div>
</div>
"""
html += '</div>'
# 補充少許 CSS 以確保美觀
html += """
<style>
.timeline-header { display: flex; align-items: center; gap: 10px; margin-bottom: 5px; }
.time-badge { background: #e3f2fd; color: #1976d2; padding: 2px 8px; border-radius: 4px; font-weight: bold; font-size: 12px; }
.timeline-details { display: flex; gap: 8px; flex-wrap: wrap; margin-bottom: 5px; }
.weather-tag, .aqi-tag { font-size: 11px; padding: 2px 6px; background: #f5f5f5; border-radius: 4px; border: 1px solid #e0e0e0; }
.travel-info { font-size: 12px; color: #666; margin: 0; display: flex; align-items: center; gap: 5px; }
</style>
"""
return html
def _generate_metrics_html(self, metrics, traffic_summary):
"""生成左側的簡單 Metrics"""
if traffic_summary is None: traffic_summary = {}
if metrics is None: metrics = {}
total_distance = traffic_summary.get("total_distance_km", 0)
total_duration = traffic_summary.get("total_duration_min", 0)
efficiency = metrics.get("route_efficiency_pct", 90)
# 根據效率改變顏色
eff_color = "#50C878" if efficiency >= 80 else "#F5A623"
return f"""
<div class="metrics-container">
<div class="metric-card">
<h3>📏 Distance</h3>
<p class="metric-value">{total_distance:.1f} km</p>
</div>
<div class="metric-card">
<h3>⏱️ Duration</h3>
<p class="metric-value">{total_duration:.0f} min</p>
</div>
<div class="metric-card" style="border-bottom: 3px solid {eff_color}">
<h3>⚡ Efficiency</h3>
<p class="metric-value" style="color: {eff_color}">{efficiency:.0f}%</p>
</div>
</div>
"""
def _generate_map_from_data(self, structured_data):
    """Generate the Plotly map: decoded route polyline plus stop markers.

    Args:
        structured_data: Result dict. ``timeline`` supplies the stops (each
            with ``coordinates`` holding ``lat``/``lng``) and
            ``precise_traffic_result["legs"]`` supplies encoded polylines.

    Returns:
        A Plotly figure, or the default animated map when there is no
        timeline or when building the figure fails.
    """
    import plotly.graph_objects as go
    from core.utils import decode_polyline  # polyline decoding helper

    try:
        timeline = structured_data.get("timeline", [])
        # These keys may be present but null.
        precise_result = structured_data.get("precise_traffic_result") or {}
        legs = precise_result.get("legs") or []

        if not timeline:
            return create_animated_map()

        fig = go.Figure()

        # 1. Route: decode every leg's polyline and merge the points into a
        #    single line trace.
        route_lats = []
        route_lons = []
        for leg in legs:
            poly_str = leg.get("polyline")
            if poly_str:
                # Each decoded item is indexed as (lat, lng).
                for coord in decode_polyline(poly_str):
                    route_lats.append(coord[0])
                    route_lons.append(coord[1])

        if route_lats:
            fig.add_trace(go.Scattermapbox(
                lat=route_lats,
                lon=route_lons,
                mode='lines',
                line=dict(width=4, color='#4A90E2'),
                name='Route',
                hoverinfo='none'  # no hover on the route itself
            ))

        # 2. Stops.
        lats, lons, labels, hover_texts, colors, sizes = [], [], [], [], [], []
        last_index = len(timeline) - 1
        for i, stop in enumerate(timeline):
            coords = stop.get("coordinates") or {}
            lat = coords.get("lat")
            lng = coords.get("lng")
            # Compare with None explicitly: 0.0 is a valid coordinate.
            if lat is None or lng is None:
                continue

            lats.append(lat)
            lons.append(lng)
            # The label uses the timeline index so it matches the hover text
            # even when an earlier stop had no coordinates.
            labels.append(str(i))

            name = stop.get("location", "Stop")
            time = stop.get("time", "")
            weather = stop.get("weather", "")
            hover_texts.append(f"<b>{i}. {name}</b><br>🕒 {time}<br>🌤️ {weather}")

            # Style: first stop green, last stop red, others orange.
            if i == 0:
                colors.append('#50C878')  # Start
                sizes.append(15)
            elif i == last_index:
                colors.append('#FF6B6B')  # End
                sizes.append(15)
            else:
                colors.append('#F5A623')  # Middle
                sizes.append(12)

        fig.add_trace(go.Scattermapbox(
            lat=lats, lon=lons,
            mode='markers+text',
            marker=dict(size=sizes, color=colors, allowoverlap=True),
            text=labels,  # stop sequence numbers
            textposition="top center",
            textfont=dict(size=14, color='black', family="Arial Black"),
            hovertext=hover_texts,
            hoverinfo='text',
            name='Stops'
        ))

        # Centre on the mean of the plotted stops, else a fixed default.
        if lats:
            center_lat = sum(lats) / len(lats)
            center_lon = sum(lons) / len(lons)
        else:
            center_lat, center_lon = 25.033, 121.565

        fig.update_layout(
            mapbox=dict(
                style='open-street-map',  # 'carto-positron' is a cleaner-looking alternative
                center=dict(lat=center_lat, lon=center_lon),
                zoom=12
            ),
            margin=dict(l=0, r=0, t=0, b=0),
            height=500,
            showlegend=False
        )
        return fig
    except Exception as e:
        logger.error(f"Error generating map: {e}", exc_info=True)
        return create_animated_map()
def build_interface(self):
    """Build the Gradio Blocks UI, wire every event handler, and return it.

    The session is kept in a ``gr.State`` as a plain dict; each handler
    rebuilds a ``UserSession`` from that dict and returns the updated dict
    as its last output.
    """
    with gr.Blocks(title=APP_TITLE) as demo:
        gr.HTML(get_enhanced_css())
        create_header()
        theme_btn, settings_btn, doc_btn = create_top_controls()
        # 🔥 Every browser tab gets its own independent UserSession
        session_state = gr.State(value=UserSession().to_dict())
        with gr.Row():
            with gr.Column(scale=2, min_width=400):
                (input_area, agent_stream_output, user_input, auto_location,
                 location_inputs, lat_input, lon_input, analyze_btn) = create_input_form(
                    create_agent_stream_output()
                )
                (task_confirm_area, task_summary_display,
                 task_list_display, exit_btn_inline, ready_plan_btn) = create_confirmation_area()
                team_area, agent_displays = create_team_area(create_agent_card_enhanced)
                (result_area, result_display, timeline_display, metrics_display) = create_result_area(
                    create_animated_map)
            with gr.Column(scale=3, min_width=500):
                status_bar = gr.Textbox(label="📊 Status", value="Waiting for input...", interactive=False,
                                        max_lines=1)
                (tabs, report_tab, map_tab, report_output, map_output, reasoning_output,
                 chat_input_area, chat_history_output, chat_input, chat_send) = create_tabs(
                    create_animated_map, get_reasoning_html_reversed()
                )
        (settings_modal, google_maps_key, openweather_key, anthropic_key,
         model_choice, close_settings_btn, save_settings_btn,
         settings_status) = create_settings_modal()
        doc_modal, close_doc_btn = create_doc_modal()
        # ===== Event Handlers =====
        auto_location.change(fn=toggle_location_inputs, inputs=[auto_location], outputs=[location_inputs])

        # Step 1: rebuild the session from state, then stream the analysis.
        def analyze_wrapper(ui, al, lat, lon, s):
            session = UserSession.from_dict(s)
            yield from self.step1_analyze_tasks(ui, al, lat, lon, session)

        analyze_btn.click(
            fn=analyze_wrapper,
            inputs=[user_input, auto_location, lat_input, lon_input, session_state],
            outputs=[
                agent_stream_output, task_summary_display, task_list_display,
                reasoning_output, task_confirm_area, chat_input_area,
                chat_history_output, status_bar, *agent_displays, session_state
            ]
        ).then(
            # NOTE(review): this follow-up takes no inputs, so it switches to
            # the confirmation view regardless of what the analysis yielded -
            # confirm that is intended for blank input and error results.
            fn=lambda: (gr.update(visible=False), gr.update(visible=True), gr.update(visible=True)),
            outputs=[input_area, task_confirm_area, chat_input_area]
        )

        # Chat: rebuild the session from state, then stream the modification.
        def chat_wrapper(msg, s):
            session = UserSession.from_dict(s)
            yield from self.modify_task_chat(msg, session)

        chat_send.click(
            fn=chat_wrapper,
            inputs=[chat_input, session_state],
            outputs=[chat_history_output, task_list_display, session_state]
        ).then(fn=lambda: "", outputs=[chat_input])  # clear the chat box afterwards

        # Exit: show the input form again, hide everything else and replace
        # the stored session with a fresh one.
        exit_btn_inline.click(
            fn=lambda: (
                gr.update(visible=True), gr.update(visible=False),
                gr.update(visible=False), gr.update(visible=False),
                gr.update(visible=False), gr.update(visible=False),
                gr.update(visible=False), "",
                create_agent_stream_output(),
                self._generate_chat_welcome_html(),
                "Ready...",
                UserSession().to_dict()
            ),
            outputs=[
                input_area, task_confirm_area, chat_input_area, result_area,
                team_area, report_tab, map_tab, user_input,
                agent_stream_output, chat_history_output, status_bar, session_state
            ]
        )

        # Planning pipeline: switch views, then run steps 2, 3 and 4 in
        # order; each step reads the session dict and writes it back.
        ready_plan_btn.click(
            fn=lambda: (gr.update(visible=False), gr.update(visible=False),
                        gr.update(visible=True), gr.update(selected="ai_conversation_tab")),
            outputs=[task_confirm_area, chat_input_area, team_area, tabs]
        ).then(
            fn=lambda s: self.step2_search_pois(UserSession.from_dict(s)),
            inputs=[session_state],
            outputs=[reasoning_output, status_bar, *agent_displays, session_state]
        ).then(
            fn=lambda s: self.step3_run_core_team(UserSession.from_dict(s)),
            inputs=[session_state],
            outputs=[report_output, session_state]
        ).then(
            fn=lambda: (gr.update(visible=True), gr.update(visible=True), gr.update(selected="report_tab")),
            outputs=[report_tab, map_tab, tabs]
        ).then(
            fn=lambda s: self.step4_finalize(UserSession.from_dict(s)),
            inputs=[session_state],
            outputs=[
                timeline_display, metrics_display, result_display,
                map_output, map_tab, team_area, status_bar, *agent_displays, session_state
            ]
        ).then(fn=lambda: gr.update(visible=True), outputs=[result_area])
        # Settings
        settings_btn.click(fn=lambda: gr.update(visible=True), outputs=[settings_modal])
        close_settings_btn.click(fn=lambda: gr.update(visible=False), outputs=[settings_modal])
        # [Security Fix] Save the settings into session_state
        save_settings_btn.click(
            fn=self.save_settings,
            inputs=[google_maps_key, openweather_key, anthropic_key, model_choice, session_state],
            outputs=[settings_status, session_state]
        )
        # Theme: toggled client-side only; the choice is written to localStorage.
        theme_btn.click(fn=None, js="""
() => {
const container = document.querySelector('.gradio-container');
if (container) {
container.classList.toggle('theme-dark');
const isDark = container.classList.contains('theme-dark');
localStorage.setItem('lifeflow-theme', isDark ? 'dark' : 'light');
}
}
""")
        doc_btn.click(fn=lambda: gr.update(visible=True), outputs=[doc_modal])
        close_doc_btn.click(fn=lambda: gr.update(visible=False), outputs=[doc_modal])
    return demo
def main():
    """Build the LifeFlow AI interface and launch the Gradio server."""
    demo = LifeFlowAI().build_interface()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "show_error": True,
    }
    demo.launch(**launch_options)


if __name__ == "__main__":
    main()