# LifeFlow-AI/src/optimization/solver/solution_extractor.py
# Commit b7d08cf ("buildup agent system", author: Marco310), 16 kB.
"""
結果提取器 - 從 OR-Tools 結果提取路線和任務詳情
"""
from typing import List, Dict, Tuple, Any, Set, Optional
from datetime import datetime, timedelta
import numpy as np
from ortools.constraint_solver import pywrapcp
from src.infra.logger import get_logger
from ..models.internal_models import (
_Task,
_Graph,
_TSPTWResult,
_RouteStep,
_TaskDetail,
_AlternativePOI,
_OptimizationMetrics, # ✅ 確保有 import 這個
)
from ..graph.time_window_handler import TimeWindowHandler
logger = get_logger(__name__)
class SolutionExtractor:
    """Turn a solved OR-Tools routing assignment into a ``_TSPTWResult``.

    Responsibilities:
    - walk the assignment to recover the visiting order;
    - total up travel time and distance along that order;
    - build per-task details, including alternative POIs;
    - compute the baseline-vs-optimized comparison metrics.
    """

    # Fallback service duration (minutes) used when a node's ``task_idx`` has
    # no entry in the task list.
    _DEFAULT_SERVICE_MIN = 5

    def __init__(self):
        # Used by _find_alternative_pois to look up a candidate's time window.
        self.tw_handler = TimeWindowHandler()

    def extract(
        self,
        routing: pywrapcp.RoutingModel,
        manager: pywrapcp.RoutingIndexManager,
        solution: pywrapcp.Assignment,
        time_dimension: pywrapcp.RoutingDimension,
        start_time: datetime,
        graph: _Graph,
        tasks: List[_Task],
        alt_k: int,
        return_to_start: bool,
    ) -> _TSPTWResult:
        """Extract the complete result from a solved routing model.

        Args:
            routing: The routing model; only vehicle 0 is read.
            manager: Index manager used to map routing indices to node indices.
            solution: The assignment to read next-pointers and cumul values from.
            time_dimension: Dimension whose cumul values are added to
                ``start_time`` as seconds to obtain arrival times.
            start_time: Wall-clock origin for arrival/departure timestamps.
            graph: Provides ``duration_matrix``, ``distance_matrix``,
                ``node_meta`` and ``locations``.
            tasks: All tasks, indexed consistently with ``node_meta[...]["task_idx"]``.
            alt_k: Maximum number of alternative POIs reported per task.
            return_to_start: When false, a trailing depot node is dropped from
                the reported route and from the travel totals.

        Returns:
            A ``_TSPTWResult`` with status ``"OK"``, the route steps, travel
            totals, metrics, visited/skipped task ids and per-task details.
        """
        duration_matrix = graph.duration_matrix
        distance_matrix = graph.distance_matrix
        node_meta = graph.node_meta
        locations = graph.locations

        # 1. Recover the visiting order.
        sequence_indices, sequence_nodes = self._extract_visit_sequence(
            routing, manager, solution, node_meta, return_to_start
        )

        # 2. Map each node to its (previous, next) neighbours in the route;
        #    the first/last node use themselves as the missing neighbour.
        #    A node that occurs twice (e.g. the depot) keeps its last entry.
        last_pos = len(sequence_nodes) - 1
        node_prev_next: Dict[int, Tuple[int, int]] = {}
        for pos, node in enumerate(sequence_nodes):
            prev_node = sequence_nodes[pos - 1] if pos > 0 else sequence_nodes[0]
            next_node = sequence_nodes[pos + 1] if pos < last_pos else sequence_nodes[-1]
            node_prev_next[node] = (prev_node, next_node)

        route: List[_RouteStep] = []
        visited_task_ids: Set[str] = set()
        task_nodes: Dict[str, List[int]] = {}
        chosen_node_per_task: Dict[str, int] = {}
        arrival_sec_map: Dict[int, int] = {}
        task_service_duration_map = {
            i: task.service_duration_min for i, task in enumerate(tasks)
        }

        # 3. Build one route step per visited node.
        for step_idx, (routing_index, node) in enumerate(
            zip(sequence_indices, sequence_nodes)
        ):
            meta = node_meta[node]
            arr_sec = solution.Value(time_dimension.CumulVar(routing_index))
            arr_dt = start_time + timedelta(seconds=int(arr_sec))
            arrival_sec_map[node] = arr_sec

            if meta["type"] == "depot":
                # The depot has no service time: departure equals arrival.
                step = _RouteStep(
                    step=step_idx,
                    node_index=node,
                    arrival_time=arr_dt.isoformat(),
                    departure_time=arr_dt.isoformat(),
                    type="depot",
                )
            else:
                task_id = meta["task_id"]
                service_duration_min = task_service_duration_map.get(
                    meta["task_idx"], self._DEFAULT_SERVICE_MIN
                )
                dep_dt = arr_dt + timedelta(seconds=service_duration_min * 60)

                visited_task_ids.add(task_id)
                task_nodes.setdefault(task_id, []).append(node)
                # The first visited node of a task is its chosen POI.
                chosen_node_per_task.setdefault(task_id, node)

                step = _RouteStep(
                    step=step_idx,
                    node_index=node,
                    arrival_time=arr_dt.isoformat(),
                    departure_time=dep_dt.isoformat(),
                    type="task_poi",
                    task_id=task_id,
                    poi_id=meta["poi_id"],
                    service_duration_min=service_duration_min,
                )
            route.append(step)

        # 4. Add the unvisited POI nodes to task_nodes so they can be offered
        #    as alternatives. Node 0 is skipped, as in the original loop.
        for node in range(1, len(node_meta)):
            meta = node_meta[node]
            if meta["type"] != "poi":
                continue
            nodes_for_task = task_nodes.setdefault(meta["task_id"], [])
            if node not in nodes_for_task:
                nodes_for_task.append(node)

        # 5. Pure travel totals along the reported sequence.
        total_travel_time = 0
        total_travel_distance = 0
        for n1, n2 in zip(sequence_nodes, sequence_nodes[1:]):
            total_travel_distance += distance_matrix[n1, n2]
            total_travel_time += duration_matrix[n1, n2]

        # 6. Metrics (baseline vs optimized).
        # 6.1 Total service time over the visited POI nodes.
        total_service_time_sec = sum(
            tasks[node_meta[node]["task_idx"]].service_duration_sec
            for node in sequence_nodes
            if node_meta[node]["type"] == "poi"
        )
        # 6.2 Optimized duration = travel + service; waiting time is excluded.
        optimized_pure_duration = total_travel_time + total_service_time_sec
        # 6.3 Compute.
        metrics = self._calculate_metrics(
            graph=graph,
            tasks=tasks,
            visited_task_ids=visited_task_ids,
            optimized_dist=total_travel_distance,
            optimized_duration=optimized_pure_duration,
            total_service_time=total_service_time_sec,
            return_to_start=return_to_start,
        )

        # 7. Per-task details.
        tasks_detail = self._build_tasks_detail(
            tasks=tasks,
            visited_task_ids=visited_task_ids,
            chosen_node_per_task=chosen_node_per_task,
            task_nodes=task_nodes,
            node_meta=node_meta,
            locations=locations,
            node_prev_next=node_prev_next,
            arrival_sec_map=arrival_sec_map,
            duration_matrix=duration_matrix,
            distance_matrix=distance_matrix,
            start_time=start_time,
            alt_k=alt_k,
        )

        skipped_tasks = sorted({t.task_id for t in tasks} - visited_task_ids)

        return _TSPTWResult(
            status="OK",
            total_travel_time_min=int(total_travel_time // 60),
            total_travel_distance_m=int(total_travel_distance),
            metrics=metrics,
            route=route,
            visited_tasks=sorted(visited_task_ids),
            skipped_tasks=skipped_tasks,
            tasks_detail=tasks_detail,
        )

    @staticmethod
    def _extract_visit_sequence(
        routing: pywrapcp.RoutingModel,
        manager: pywrapcp.RoutingIndexManager,
        solution: pywrapcp.Assignment,
        node_meta: List[Dict[str, Any]],
        return_to_start: bool,
    ) -> Tuple[List[int], List[int]]:
        """Walk vehicle 0's route and return ``(routing_indices, node_indices)``.

        The end index is included. When ``return_to_start`` is false and the
        final node is a depot (and is not the only node), it is dropped.
        """
        sequence_indices: List[int] = []
        sequence_nodes: List[int] = []

        idx = routing.Start(0)
        while not routing.IsEnd(idx):
            sequence_indices.append(idx)
            sequence_nodes.append(manager.IndexToNode(idx))
            idx = solution.Value(routing.NextVar(idx))
        # The loop stops on the end index without recording it.
        sequence_indices.append(idx)
        sequence_nodes.append(manager.IndexToNode(idx))

        if (
            not return_to_start
            and len(sequence_nodes) >= 2
            and node_meta[sequence_nodes[-1]]["type"] == "depot"
        ):
            sequence_indices.pop()
            sequence_nodes.pop()

        return sequence_indices, sequence_nodes

    def _calculate_metrics(
        self,
        graph: _Graph,
        tasks: List[_Task],
        visited_task_ids: Set[str],
        optimized_dist: float,
        optimized_duration: int,
        total_service_time: int,
        return_to_start: bool
    ) -> _OptimizationMetrics:
        """Compare the optimized route with a naive baseline.

        The baseline starts at node 0, visits candidate 0 of every task in
        list order, and returns to node 0 when ``return_to_start`` is true.

        Args:
            graph: Provides the distance/duration matrices and ``node_meta``.
            tasks: All tasks, in baseline visiting order.
            visited_task_ids: Task ids the optimized route visits; used for
                the completion rate.
            optimized_dist: Travel distance of the optimized route.
            optimized_duration: Travel plus service time of the optimized route.
            total_service_time: Service time included in ``optimized_duration``;
                also added to the baseline travel time.
            return_to_start: Whether the baseline returns to node 0.

        Returns:
            An ``_OptimizationMetrics``. Percentages are rounded to one
            decimal and are 0.0 when their denominator is not positive.
        """
        dist_matrix = graph.distance_matrix
        dur_matrix = graph.duration_matrix
        node_meta = graph.node_meta

        # 1. (task_idx, candidate_idx) -> node index, for POI nodes only.
        task_cand_to_node = {
            (meta["task_idx"], meta["candidate_idx"]): idx
            for idx, meta in enumerate(node_meta)
            if meta["type"] == "poi"
        }

        # 2. Naive baseline: tasks in given order, always candidate 0.
        # NOTE(review): the baseline visits every task, while the optimized
        # figures and total_service_time cover only visited tasks. When tasks
        # are skipped the improvement figures are not like-for-like - confirm
        # this is intended.
        baseline_nodes = [0]
        for task_idx in range(len(tasks)):
            target_node = task_cand_to_node.get((task_idx, 0))
            if target_node is not None:
                baseline_nodes.append(target_node)
        if return_to_start:
            baseline_nodes.append(0)

        # 3. Baseline cost over consecutive node pairs.
        base_dist = 0
        base_travel_time = 0
        for u, v in zip(baseline_nodes, baseline_nodes[1:]):
            base_dist += dist_matrix[u, v]
            base_travel_time += dur_matrix[u, v]
        # Baseline duration = travel + service; waiting is not modelled.
        base_total_duration = base_travel_time + total_service_time

        # 4. Percentages, guarding every division against a zero denominator.
        dist_imp_pct = 0.0
        if base_dist > 0:
            dist_imp_pct = ((base_dist - optimized_dist) / base_dist) * 100

        time_imp_pct = 0.0
        if base_total_duration > 0:
            time_imp_pct = (
                (base_total_duration - optimized_duration) / base_total_duration
            ) * 100

        efficiency_pct = 0.0
        if optimized_duration > 0:
            efficiency_pct = (total_service_time / optimized_duration) * 100

        total_task_count = len(tasks)
        completed_count = len(visited_task_ids)
        completion_rate = (
            (completed_count / total_task_count * 100) if total_task_count > 0 else 0.0
        )

        return _OptimizationMetrics(
            total_tasks=total_task_count,
            completed_tasks=completed_count,
            completion_rate_pct=round(completion_rate, 1),
            original_distance_m=int(base_dist),
            optimized_distance_m=int(optimized_dist),
            distance_saved_m=int(base_dist - optimized_dist),
            distance_improvement_pct=round(dist_imp_pct, 1),
            original_duration_min=int(base_total_duration // 60),
            optimized_duration_min=int(optimized_duration // 60),
            time_saved_min=int((base_total_duration - optimized_duration) // 60),
            time_improvement_pct=round(time_imp_pct, 1),
            route_efficiency_pct=round(efficiency_pct, 1)
        )

    def _build_tasks_detail(
        self,
        tasks: List[_Task],
        visited_task_ids: Set[str],
        chosen_node_per_task: Dict[str, int],
        task_nodes: Dict[str, List[int]],
        node_meta: List[Dict[str, Any]],
        locations: List[Dict[str, float]],
        node_prev_next: Dict[int, Tuple[int, int]],
        arrival_sec_map: Dict[int, int],
        duration_matrix: np.ndarray,
        distance_matrix: np.ndarray,
        start_time: datetime,
        alt_k: int,
    ) -> List[_TaskDetail]:
        """Build one ``_TaskDetail`` per task, in the order of ``tasks``.

        Visited tasks carry their chosen POI and, when the task has more than
        one candidate node, up to ``alt_k`` alternative POIs. Unvisited tasks
        get ``chosen_poi=None`` and an empty alternatives list.
        """
        tasks_detail: List[_TaskDetail] = []

        for task in tasks:
            task_id = task.task_id
            visited = task_id in visited_task_ids
            chosen_node = chosen_node_per_task.get(task_id)
            all_nodes_for_task = task_nodes.get(task_id, [])
            has_choice = visited and chosen_node is not None

            chosen_poi_info = None
            if has_choice:
                meta = node_meta[chosen_node]
                loc = locations[chosen_node]
                chosen_poi_info = {
                    "node_index": chosen_node,
                    "poi_id": meta["poi_id"],
                    "lat": loc["lat"],
                    "lng": loc["lng"],
                    "interval_idx": meta.get("interval_idx"),
                }

            alternative_pois: List[_AlternativePOI] = []
            if has_choice and len(all_nodes_for_task) > 1:
                alternative_pois = self._find_alternative_pois(
                    chosen_node=chosen_node,
                    all_nodes_for_task=all_nodes_for_task,
                    node_meta=node_meta,
                    locations=locations,
                    node_prev_next=node_prev_next,
                    arrival_sec_map=arrival_sec_map,
                    duration_matrix=duration_matrix,
                    distance_matrix=distance_matrix,
                    tasks=tasks,
                    start_time=start_time,
                    alt_k=alt_k,
                )

            tasks_detail.append(
                _TaskDetail(
                    task_id=task_id,
                    priority=task.priority,
                    visited=visited,
                    chosen_poi=chosen_poi_info,
                    alternative_pois=alternative_pois,
                )
            )

        return tasks_detail

    def _find_alternative_pois(
        self,
        chosen_node: int,
        all_nodes_for_task: List[int],
        node_meta: List[Dict[str, Any]],
        locations: List[Dict[str, float]],
        node_prev_next: Dict[int, Tuple[int, int]],
        arrival_sec_map: Dict[int, int],
        duration_matrix: np.ndarray,
        distance_matrix: np.ndarray,
        tasks: List[_Task],
        start_time: datetime,
        alt_k: int,
    ) -> List[_AlternativePOI]:
        """Return up to ``alt_k`` alternative POIs for a visited task.

        A candidate qualifies when it is a different node with a different
        ``poi_id`` than the chosen one, and the chosen node's arrival time
        falls inside the candidate's time window. Each candidate is scored by
        the change in travel time and distance of the detour
        ``prev -> candidate -> next`` relative to ``prev -> chosen -> next``.

        Results are ordered by the travel-time delta in seconds, smallest
        first. A non-positive ``alt_k`` yields an empty list.
        """
        prev_node, next_node = node_prev_next[chosen_node]
        base_time = (
            duration_matrix[prev_node, chosen_node]
            + duration_matrix[chosen_node, next_node]
        )
        base_dist = (
            distance_matrix[prev_node, chosen_node]
            + distance_matrix[chosen_node, next_node]
        )
        chosen_poi_id = node_meta[chosen_node]["poi_id"]
        chosen_arr_sec = arrival_sec_map.get(chosen_node, 0)

        # (delta seconds, alternative) pairs; the seconds are kept so that
        # ranking is not limited to whole-minute resolution.
        ranked: List[Tuple[int, _AlternativePOI]] = []

        for cand_node in all_nodes_for_task:
            if cand_node == chosen_node:
                continue
            meta_c = node_meta[cand_node]
            if meta_c["poi_id"] == chosen_poi_id:
                continue

            cand_start_sec, cand_end_sec = self.tw_handler.get_node_time_window_sec(
                meta_c, tasks, start_time
            )
            # The chosen node's arrival time stands in for the candidate's.
            if not (cand_start_sec <= chosen_arr_sec <= cand_end_sec):
                continue

            detour_time = (
                duration_matrix[prev_node, cand_node]
                + duration_matrix[cand_node, next_node]
            )
            detour_dist = (
                distance_matrix[prev_node, cand_node]
                + distance_matrix[cand_node, next_node]
            )
            delta_time_sec = int(detour_time - base_time)
            delta_dist = int(detour_dist - base_dist)

            loc_c = locations[cand_node]
            ranked.append(
                (
                    delta_time_sec,
                    _AlternativePOI(
                        node_index=cand_node,
                        poi_id=meta_c["poi_id"],
                        lat=loc_c["lat"],
                        lng=loc_c["lng"],
                        interval_idx=meta_c.get("interval_idx"),
                        # Floor division: a negative delta rounds down.
                        delta_travel_time_min=delta_time_sec // 60,
                        delta_travel_distance_m=delta_dist,
                    ),
                )
            )

        # Stable sort on seconds. Floor division is monotonic, so the result
        # is also ordered by delta_travel_time_min.
        ranked.sort(key=lambda pair: pair[0])
        # Clamp so that a negative alt_k cannot slice from the end.
        return [alt for _, alt in ranked[:max(alt_k, 0)]]