""" 結果提取器 - 從 OR-Tools 結果提取路線和任務詳情 """ from typing import List, Dict, Tuple, Any, Set, Optional from datetime import datetime, timedelta import numpy as np from ortools.constraint_solver import pywrapcp from src.infra.logger import get_logger from ..models.internal_models import ( _Task, _Graph, _TSPTWResult, _RouteStep, _TaskDetail, _AlternativePOI, _OptimizationMetrics, # ✅ 確保有 import 這個 ) from ..graph.time_window_handler import TimeWindowHandler logger = get_logger(__name__) class SolutionExtractor: """ 結果提取器 職責: - 從 OR-Tools 結果提取路線 - 計算總時間/距離 - 構建任務詳情(包含備選 POI) - ✅ 計算快樂表指標 (Metrics) """ def __init__(self): self.tw_handler = TimeWindowHandler() def extract( self, routing: pywrapcp.RoutingModel, manager: pywrapcp.RoutingIndexManager, solution: pywrapcp.Assignment, time_dimension: pywrapcp.RoutingDimension, start_time: datetime, graph: _Graph, tasks: List[_Task], alt_k: int, return_to_start: bool, ) -> _TSPTWResult: """ 提取完整結果 """ duration_matrix = graph.duration_matrix distance_matrix = graph.distance_matrix node_meta = graph.node_meta locations = graph.locations route: List[_RouteStep] = [] visited_task_ids = set() total_travel_time = 0 total_travel_distance = 0 sequence_nodes: List[int] = [] sequence_indices: List[int] = [] # 1. 提取訪問順序 idx = routing.Start(0) while not routing.IsEnd(idx): node = manager.IndexToNode(idx) sequence_indices.append(idx) sequence_nodes.append(node) idx = solution.Value(routing.NextVar(idx)) end_node = manager.IndexToNode(idx) sequence_indices.append(idx) sequence_nodes.append(end_node) # 如果不要顯示回到出發點,就把最後這個 depot 去掉 if not return_to_start: if len(sequence_nodes) >= 2 and node_meta[sequence_nodes[-1]]["type"] == "depot": sequence_nodes = sequence_nodes[:-1] sequence_indices = sequence_indices[:-1] # 2. 構建前後節點映射(用於計算備選 POI) node_prev_next: Dict[int, Tuple[int, int]] = {} for i, node in enumerate(sequence_nodes): prev_node = sequence_nodes[i - 1] if i > 0 else sequence_nodes[0] next_node = sequence_nodes[i + 1] if i < len(sequence_nodes) - 1 else sequence_nodes[-1] node_prev_next[node] = (prev_node, next_node) task_nodes: Dict[str, List[int]] = {} chosen_node_per_task: Dict[str, int] = {} arrival_sec_map: Dict[int, int] = {} task_service_duration_map = { i: task.service_duration_min for i, task in enumerate(tasks) } # 3. 提取路線和訪問信息 for step_idx, (routing_index, node) in enumerate(zip(sequence_indices, sequence_nodes)): meta = node_meta[node] time_var = time_dimension.CumulVar(routing_index) arr_sec = solution.Value(time_var) arr_dt = start_time + timedelta(seconds=int(arr_sec)) arrival_sec_map[node] = arr_sec if meta["type"] == "depot": step = _RouteStep( step=step_idx, node_index=node, arrival_time=arr_dt.isoformat(), departure_time=arr_dt.isoformat(), type="depot", ) else: task_id = meta["task_id"] task_idx = meta["task_idx"] service_duration_min = task_service_duration_map.get(task_idx, 5) service_duration_sec = service_duration_min * 60 dep_dt = arr_dt + timedelta(seconds=service_duration_sec) visited_task_ids.add(task_id) task_nodes.setdefault(task_id, []) task_nodes[task_id].append(node) if task_id not in chosen_node_per_task: chosen_node_per_task[task_id] = node step = _RouteStep( step=step_idx, node_index=node, arrival_time=arr_dt.isoformat(), departure_time=dep_dt.isoformat(), type="task_poi", task_id=task_id, poi_id=meta["poi_id"], service_duration_min=service_duration_min, # 確保回傳服務時間 ) route.append(step) # 4. 補充未訪問的節點到 task_nodes(用於備選 POI 計算) for node in range(1, len(node_meta)): meta = node_meta[node] if meta["type"] != "poi": continue task_id = meta["task_id"] task_nodes.setdefault(task_id, []) if node not in task_nodes[task_id]: task_nodes[task_id].append(node) # 5. 計算優化後的總距離 (Optimized Distance) for i in range(len(sequence_nodes) - 1): n1 = sequence_nodes[i] n2 = sequence_nodes[i + 1] total_travel_distance += distance_matrix[n1, n2] total_travel_time += duration_matrix[n1, n2] # 這是純交通時間 # 6. 計算快樂表 metrics (Baseline vs Optimized) # 6.1 計算總服務時間 (分子) total_service_time_sec = sum( tasks[node_meta[node]["task_idx"]].service_duration_sec for node in sequence_nodes if node_meta[node]["type"] == "poi" ) # 6.2 優化後的總耗時 (Total Duration = Finish - Start) # 注意: 這是包含 Waiting Time 的總工時 #last_node_idx = sequence_indices[-1] #optimized_total_duration_sec = solution.Value(time_dimension.CumulVar(last_node_idx)) optimized_pure_duration = total_travel_time + total_service_time_sec # 6.3 執行計算 metrics = self._calculate_metrics( graph=graph, tasks=tasks, visited_task_ids=visited_task_ids, optimized_dist=total_travel_distance, optimized_duration=optimized_pure_duration, total_service_time=total_service_time_sec, return_to_start=return_to_start ) # 7. 構建任務詳情 tasks_detail = self._build_tasks_detail( tasks=tasks, visited_task_ids=visited_task_ids, chosen_node_per_task=chosen_node_per_task, task_nodes=task_nodes, node_meta=node_meta, locations=locations, node_prev_next=node_prev_next, arrival_sec_map=arrival_sec_map, duration_matrix=duration_matrix, distance_matrix=distance_matrix, start_time=start_time, alt_k=alt_k, ) skipped_tasks = sorted(set([t.task_id for t in tasks]) - visited_task_ids) return _TSPTWResult( status="OK", total_travel_time_min=int(total_travel_time // 60), total_travel_distance_m=int(total_travel_distance), metrics=metrics, # ✅ 這裡把計算好的 metrics 塞進去 route=route, visited_tasks=sorted(list(visited_task_ids)), skipped_tasks=skipped_tasks, tasks_detail=tasks_detail, ) def _calculate_metrics( self, graph: _Graph, tasks: List[_Task], visited_task_ids: Set[str], optimized_dist: float, optimized_duration: int, total_service_time: int, return_to_start: bool ) -> _OptimizationMetrics: """ 計算優化指標 """ dist_matrix = graph.distance_matrix dur_matrix = graph.duration_matrix node_meta = graph.node_meta # 1. 建立映射: 哪個 Task 的第 0 個 Candidate 對應哪個 Node Index task_cand_to_node = {} for idx, meta in enumerate(node_meta): if meta["type"] == "poi": key = (meta["task_idx"], meta["candidate_idx"]) task_cand_to_node[key] = idx # 2. 模擬 Baseline (笨方法): 照順序跑,每個都選第一個點 baseline_nodes = [0] # Start at Depot for i, task in enumerate(tasks): # 總是選第一個候選點 (Candidate 0) target_node = task_cand_to_node.get((i, 0)) if target_node: baseline_nodes.append(target_node) if return_to_start: baseline_nodes.append(0) # 3. 計算 Baseline 成本 base_dist = 0 base_travel_time = 0 if len(baseline_nodes) > 1: for i in range(len(baseline_nodes) - 1): u = baseline_nodes[i] v = baseline_nodes[i + 1] base_dist += dist_matrix[u, v] base_travel_time += dur_matrix[u, v] # Baseline 總時間 = 純交通 + 總服務 (假設笨跑法不考慮等待,只算硬成本) base_total_duration = base_travel_time + total_service_time # 4. 計算百分比 dist_imp_pct = 0.0 if base_dist > 0: dist_imp_pct = ((base_dist - optimized_dist) / base_dist) * 100 time_imp_pct = 0.0 if base_total_duration > 0: time_imp_pct = ((base_total_duration - optimized_duration) / base_total_duration) * 100 efficiency_pct = 0.0 if optimized_duration > 0: efficiency_pct = (total_service_time / optimized_duration) * 100 total_task_count = len(tasks) completed_count = len(visited_task_ids) completion_rate = (completed_count / total_task_count * 100) if total_task_count > 0 else 0.0 return _OptimizationMetrics( # ✅ 新增完成率資訊 total_tasks=total_task_count, completed_tasks=completed_count, completion_rate_pct=round(completion_rate, 1), original_distance_m=int(base_dist), optimized_distance_m=int(optimized_dist), distance_saved_m=int(base_dist - optimized_dist), distance_improvement_pct=round(dist_imp_pct, 1), original_duration_min=int(base_total_duration // 60), optimized_duration_min=int(optimized_duration // 60), time_saved_min=int((base_total_duration - optimized_duration) // 60), time_improvement_pct=round(time_imp_pct, 1), route_efficiency_pct=round(efficiency_pct, 1) ) def _build_tasks_detail( self, tasks: List[_Task], visited_task_ids: Set[str], chosen_node_per_task: Dict[str, int], task_nodes: Dict[str, List[int]], node_meta: List[Dict[str, Any]], locations: List[Dict[str, float]], node_prev_next: Dict[int, Tuple[int, int]], arrival_sec_map: Dict[int, int], duration_matrix: np.ndarray, distance_matrix: np.ndarray, start_time: datetime, alt_k: int, ) -> List[_TaskDetail]: """ 構建任務詳情(保持原本邏輯不變) """ tasks_detail: List[_TaskDetail] = [] all_task_ids = [t.task_id for t in tasks] task_priority_map = {t.task_id: t.priority for t in tasks} for task_id in all_task_ids: visited = task_id in visited_task_ids priority = task_priority_map.get(task_id, "MEDIUM") chosen_node = chosen_node_per_task.get(task_id, None) all_nodes_for_task = task_nodes.get(task_id, []) chosen_poi_info = None if visited and chosen_node is not None: meta = node_meta[chosen_node] loc = locations[chosen_node] chosen_poi_info = { "node_index": chosen_node, "poi_id": meta["poi_id"], "lat": loc["lat"], "lng": loc["lng"], "interval_idx": meta.get("interval_idx"), } alternative_pois: List[_AlternativePOI] = [] if visited and chosen_node is not None and len(all_nodes_for_task) > 1: alternative_pois = self._find_alternative_pois( chosen_node=chosen_node, all_nodes_for_task=all_nodes_for_task, node_meta=node_meta, locations=locations, node_prev_next=node_prev_next, arrival_sec_map=arrival_sec_map, duration_matrix=duration_matrix, distance_matrix=distance_matrix, tasks=tasks, start_time=start_time, alt_k=alt_k, ) tasks_detail.append( _TaskDetail( task_id=task_id, priority=priority, visited=visited, chosen_poi=chosen_poi_info, alternative_pois=alternative_pois, ) ) return tasks_detail def _find_alternative_pois( self, chosen_node: int, all_nodes_for_task: List[int], node_meta: List[Dict[str, Any]], locations: List[Dict[str, float]], node_prev_next: Dict[int, Tuple[int, int]], arrival_sec_map: Dict[int, int], duration_matrix: np.ndarray, distance_matrix: np.ndarray, tasks: List[_Task], start_time: datetime, alt_k: int, ) -> List[_AlternativePOI]: """ 找出 Top-K 備選 POI (保持原本邏輯不變) """ prev_node, next_node = node_prev_next[chosen_node] base_time = duration_matrix[prev_node, chosen_node] + duration_matrix[chosen_node, next_node] base_dist = distance_matrix[prev_node, chosen_node] + distance_matrix[chosen_node, next_node] chosen_meta = node_meta[chosen_node] chosen_poi_id = chosen_meta["poi_id"] chosen_arr_sec = arrival_sec_map.get(chosen_node, 0) candidates_alt: List[_AlternativePOI] = [] for cand_node in all_nodes_for_task: meta_c = node_meta[cand_node] if cand_node == chosen_node: continue if meta_c["poi_id"] == chosen_poi_id: continue cand_start_sec, cand_end_sec = self.tw_handler.get_node_time_window_sec( meta_c, tasks, start_time ) if not (cand_start_sec <= chosen_arr_sec <= cand_end_sec): continue dt = duration_matrix[prev_node, cand_node] + duration_matrix[cand_node, next_node] dd = distance_matrix[prev_node, cand_node] + distance_matrix[cand_node, next_node] delta_time_sec = int(dt - base_time) delta_dist = int(dd - base_dist) loc_c = locations[cand_node] candidates_alt.append( _AlternativePOI( node_index=cand_node, poi_id=meta_c["poi_id"], lat=loc_c["lat"], lng=loc_c["lng"], interval_idx=meta_c.get("interval_idx"), delta_travel_time_min=delta_time_sec // 60, delta_travel_distance_m=delta_dist, ) ) candidates_alt.sort(key=lambda x: x.delta_travel_time_min) return candidates_alt[:alt_k]