Spaces:
Running
Running
| """ | |
| 結果提取器 - 從 OR-Tools 結果提取路線和任務詳情 | |
| """ | |
| from typing import List, Dict, Tuple, Any, Set, Optional | |
| from datetime import datetime, timedelta | |
| import numpy as np | |
| from ortools.constraint_solver import pywrapcp | |
| from src.infra.logger import get_logger | |
| from ..models.internal_models import ( | |
| _Task, | |
| _Graph, | |
| _TSPTWResult, | |
| _RouteStep, | |
| _TaskDetail, | |
| _AlternativePOI, | |
| _OptimizationMetrics, # ✅ 確保有 import 這個 | |
| ) | |
| from ..graph.time_window_handler import TimeWindowHandler | |
| logger = get_logger(__name__) | |
class SolutionExtractor:
    """Convert a solved OR-Tools routing model into a ``_TSPTWResult``.

    Responsibilities:
      - read the visiting order out of the solver assignment,
      - total up travel time and distance along that order,
      - build per-task details, including alternative POIs,
      - compute the baseline-vs-optimized comparison metrics.
    """

    def __init__(self):
        # Used by _find_alternative_pois to look up a candidate node's time window.
        self.tw_handler = TimeWindowHandler()

    def extract(
        self,
        routing: pywrapcp.RoutingModel,
        manager: pywrapcp.RoutingIndexManager,
        solution: pywrapcp.Assignment,
        time_dimension: pywrapcp.RoutingDimension,
        start_time: datetime,
        graph: _Graph,
        tasks: List[_Task],
        alt_k: int,
        return_to_start: bool,
    ) -> _TSPTWResult:
        """Extract the complete result from a solved routing model.

        Args:
            routing: The routing model; only vehicle 0's route is read.
            manager: Index manager used to map routing indices to node indices.
            solution: The assignment returned by the solver.
            time_dimension: Dimension whose cumulative value at each stop is
                added to ``start_time`` (as seconds) to get the arrival time.
            start_time: Wall-clock time corresponding to cumulative value 0.
            graph: Provides ``duration_matrix``, ``distance_matrix``,
                ``node_meta`` and ``locations``, all indexed by node.
            tasks: All tasks of the request; ``node_meta[...]["task_idx"]``
                indexes into this list.
            alt_k: Maximum number of alternative POIs reported per task.
            return_to_start: When False, a trailing depot stop is removed from
                the reported route and from the travel totals.

        Returns:
            A ``_TSPTWResult`` with status ``"OK"``, the route steps, travel
            totals, metrics, visited/skipped task ids and per-task details.
        """
        duration_matrix = graph.duration_matrix
        distance_matrix = graph.distance_matrix
        node_meta = graph.node_meta
        locations = graph.locations

        # 1. Walk vehicle 0's route, recording both the routing index (needed
        #    for CumulVar lookups) and the node index (needed for the matrices).
        sequence_nodes: List[int] = []
        sequence_indices: List[int] = []
        idx = routing.Start(0)
        while not routing.IsEnd(idx):
            sequence_indices.append(idx)
            sequence_nodes.append(manager.IndexToNode(idx))
            idx = solution.Value(routing.NextVar(idx))
        # The end index is not visited by the loop above; append it explicitly.
        sequence_indices.append(idx)
        sequence_nodes.append(manager.IndexToNode(idx))

        # When the return to the start should not be shown, drop the final depot.
        if (
            not return_to_start
            and len(sequence_nodes) >= 2
            and node_meta[sequence_nodes[-1]]["type"] == "depot"
        ):
            sequence_nodes = sequence_nodes[:-1]
            sequence_indices = sequence_indices[:-1]

        # 2. Map each node to its (previous, next) neighbours on the route.
        #    The first/last stops reference the first/last node of the sequence
        #    (i.e. themselves) on the side where they have no neighbour.
        last_pos = len(sequence_nodes) - 1
        node_prev_next: Dict[int, Tuple[int, int]] = {}
        for pos, node in enumerate(sequence_nodes):
            prev_node = sequence_nodes[pos - 1] if pos > 0 else sequence_nodes[0]
            next_node = sequence_nodes[pos + 1] if pos < last_pos else sequence_nodes[-1]
            node_prev_next[node] = (prev_node, next_node)

        route: List[_RouteStep] = []
        visited_task_ids: Set[str] = set()
        task_nodes: Dict[str, List[int]] = {}
        chosen_node_per_task: Dict[str, int] = {}
        arrival_sec_map: Dict[int, int] = {}
        task_service_duration_map = {
            task_idx: task.service_duration_min
            for task_idx, task in enumerate(tasks)
        }

        # 3. Build one route step per stop and record what was visited.
        for step_idx, (routing_index, node) in enumerate(
            zip(sequence_indices, sequence_nodes)
        ):
            meta = node_meta[node]
            arr_sec = solution.Value(time_dimension.CumulVar(routing_index))
            arr_dt = start_time + timedelta(seconds=int(arr_sec))
            arrival_sec_map[node] = arr_sec

            if meta["type"] == "depot":
                # Depot stops have no service time: departure equals arrival.
                step = _RouteStep(
                    step=step_idx,
                    node_index=node,
                    arrival_time=arr_dt.isoformat(),
                    departure_time=arr_dt.isoformat(),
                    type="depot",
                )
            else:
                task_id = meta["task_id"]
                # Falls back to 5 minutes if the task index is unknown.
                service_duration_min = task_service_duration_map.get(
                    meta["task_idx"], 5
                )
                dep_dt = arr_dt + timedelta(seconds=service_duration_min * 60)

                visited_task_ids.add(task_id)
                task_nodes.setdefault(task_id, []).append(node)
                # Keep the first node seen for a task as its chosen node.
                chosen_node_per_task.setdefault(task_id, node)

                step = _RouteStep(
                    step=step_idx,
                    node_index=node,
                    arrival_time=arr_dt.isoformat(),
                    departure_time=dep_dt.isoformat(),
                    type="task_poi",
                    task_id=task_id,
                    poi_id=meta["poi_id"],
                    service_duration_min=service_duration_min,
                )
            route.append(step)

        # 4. Add the unvisited POI nodes to task_nodes so they can be offered
        #    as alternatives. Node 0 is skipped (the loop starts at 1).
        for node in range(1, len(node_meta)):
            meta = node_meta[node]
            if meta["type"] != "poi":
                continue
            nodes_for_task = task_nodes.setdefault(meta["task_id"], [])
            if node not in nodes_for_task:
                nodes_for_task.append(node)

        # 5. Sum pure travel distance and time over consecutive stops.
        total_travel_time = 0
        total_travel_distance = 0
        for n1, n2 in zip(sequence_nodes, sequence_nodes[1:]):
            total_travel_distance += distance_matrix[n1, n2]
            total_travel_time += duration_matrix[n1, n2]

        # 6. Comparison metrics (baseline vs optimized).
        # 6.1 Total service time over the POI stops on the route.
        total_service_time_sec = sum(
            tasks[node_meta[node]["task_idx"]].service_duration_sec
            for node in sequence_nodes
            if node_meta[node]["type"] == "poi"
        )
        # 6.2 Optimized duration is travel + service only. Waiting time is
        #     deliberately left out, so the time dimension's final cumulative
        #     value is not used here.
        optimized_pure_duration = total_travel_time + total_service_time_sec
        # 6.3 Run the calculation.
        metrics = self._calculate_metrics(
            graph=graph,
            tasks=tasks,
            visited_task_ids=visited_task_ids,
            optimized_dist=total_travel_distance,
            optimized_duration=optimized_pure_duration,
            total_service_time=total_service_time_sec,
            return_to_start=return_to_start,
        )

        # 7. Per-task details.
        tasks_detail = self._build_tasks_detail(
            tasks=tasks,
            visited_task_ids=visited_task_ids,
            chosen_node_per_task=chosen_node_per_task,
            task_nodes=task_nodes,
            node_meta=node_meta,
            locations=locations,
            node_prev_next=node_prev_next,
            arrival_sec_map=arrival_sec_map,
            duration_matrix=duration_matrix,
            distance_matrix=distance_matrix,
            start_time=start_time,
            alt_k=alt_k,
        )

        skipped_tasks = sorted({t.task_id for t in tasks} - visited_task_ids)

        return _TSPTWResult(
            status="OK",
            total_travel_time_min=int(total_travel_time // 60),
            total_travel_distance_m=int(total_travel_distance),
            metrics=metrics,
            route=route,
            visited_tasks=sorted(visited_task_ids),
            skipped_tasks=skipped_tasks,
            tasks_detail=tasks_detail,
        )

    def _calculate_metrics(
        self,
        graph: _Graph,
        tasks: List[_Task],
        visited_task_ids: Set[str],
        optimized_dist: float,
        optimized_duration: int,
        total_service_time: int,
        return_to_start: bool
    ) -> _OptimizationMetrics:
        """Compare the optimized route against a naive baseline route.

        The baseline starts at node 0, visits the tasks in their given order,
        always using candidate 0 of each task, and returns to node 0 only when
        ``return_to_start`` is True.

        Args:
            graph: Provides the distance/duration matrices and ``node_meta``.
            tasks: All tasks of the request, in their original order.
            visited_task_ids: Ids of the tasks on the optimized route; used
                for the completion count.
            optimized_dist: Travel distance of the optimized route.
            optimized_duration: Travel plus service time of the optimized route.
            total_service_time: Service time that is also added to the
                baseline travel time to obtain the baseline duration.
            return_to_start: Whether the baseline ends back at node 0.

        Returns:
            An ``_OptimizationMetrics`` whose percentage fields are built-in
            floats rounded to one decimal place.
        """
        dist_matrix = graph.distance_matrix
        dur_matrix = graph.duration_matrix
        node_meta = graph.node_meta

        # 1. Map (task index, candidate index) -> node index for POI nodes.
        task_cand_to_node = {}
        for node_idx, meta in enumerate(node_meta):
            if meta["type"] == "poi":
                task_cand_to_node[(meta["task_idx"], meta["candidate_idx"])] = node_idx

        # 2. Naive baseline: tasks in input order, always the first candidate.
        # NOTE(review): the baseline covers every task, including tasks the
        # optimized route skipped, while the caller computes the optimized
        # figures and total_service_time from visited stops only. Confirm that
        # this comparison is intended.
        baseline_nodes = [0]  # start at the depot
        for task_idx in range(len(tasks)):
            target_node = task_cand_to_node.get((task_idx, 0))
            # Compare against None explicitly: a node index is an int and must
            # not be dropped just because it is falsy.
            if target_node is not None:
                baseline_nodes.append(target_node)
        if return_to_start:
            baseline_nodes.append(0)

        # 3. Baseline cost over consecutive baseline stops.
        base_dist = 0
        base_travel_time = 0
        for u, v in zip(baseline_nodes, baseline_nodes[1:]):
            base_dist += dist_matrix[u, v]
            base_travel_time += dur_matrix[u, v]
        # Baseline duration = pure travel + service; waiting is not modelled.
        base_total_duration = base_travel_time + total_service_time

        # 4. Percentages. The operands may be NumPy scalars taken from the
        #    matrices, so convert to built-in float before rounding.
        dist_imp_pct = 0.0
        if base_dist > 0:
            dist_imp_pct = float(((base_dist - optimized_dist) / base_dist) * 100)

        time_imp_pct = 0.0
        if base_total_duration > 0:
            time_imp_pct = float(
                ((base_total_duration - optimized_duration) / base_total_duration) * 100
            )

        efficiency_pct = 0.0
        if optimized_duration > 0:
            efficiency_pct = float((total_service_time / optimized_duration) * 100)

        total_task_count = len(tasks)
        completed_count = len(visited_task_ids)
        completion_rate = (
            completed_count / total_task_count * 100 if total_task_count > 0 else 0.0
        )

        return _OptimizationMetrics(
            total_tasks=total_task_count,
            completed_tasks=completed_count,
            completion_rate_pct=round(completion_rate, 1),
            original_distance_m=int(base_dist),
            optimized_distance_m=int(optimized_dist),
            distance_saved_m=int(base_dist - optimized_dist),
            distance_improvement_pct=round(dist_imp_pct, 1),
            original_duration_min=int(base_total_duration // 60),
            optimized_duration_min=int(optimized_duration // 60),
            time_saved_min=int((base_total_duration - optimized_duration) // 60),
            time_improvement_pct=round(time_imp_pct, 1),
            route_efficiency_pct=round(efficiency_pct, 1),
        )

    def _build_tasks_detail(
        self,
        tasks: List[_Task],
        visited_task_ids: Set[str],
        chosen_node_per_task: Dict[str, int],
        task_nodes: Dict[str, List[int]],
        node_meta: List[Dict[str, Any]],
        locations: List[Dict[str, float]],
        node_prev_next: Dict[int, Tuple[int, int]],
        arrival_sec_map: Dict[int, int],
        duration_matrix: np.ndarray,
        distance_matrix: np.ndarray,
        start_time: datetime,
        alt_k: int,
    ) -> List[_TaskDetail]:
        """Build one ``_TaskDetail`` per task, in the order of ``tasks``.

        For a visited task the detail carries the chosen POI (node index,
        POI id, coordinates, interval index) and, when the task has more than
        one node, up to ``alt_k`` alternative POIs. For a task that was not
        visited, ``chosen_poi`` is None and ``alternative_pois`` is empty.

        Args:
            tasks: All tasks of the request.
            visited_task_ids: Ids of the tasks that appear on the route.
            chosen_node_per_task: Task id -> node chosen on the route.
            task_nodes: Task id -> all nodes belonging to that task.
            node_meta: Per-node metadata, indexed by node.
            locations: Per-node dicts with ``"lat"`` and ``"lng"``.
            node_prev_next: Node -> (previous node, next node) on the route.
            arrival_sec_map: Node -> arrival value from the time dimension.
            duration_matrix: Travel durations, indexed ``[from, to]``.
            distance_matrix: Travel distances, indexed ``[from, to]``.
            start_time: Start time of the plan, forwarded to the alternative
                POI search.
            alt_k: Maximum number of alternatives per task.

        Returns:
            The list of task details.
        """
        tasks_detail: List[_TaskDetail] = []
        task_priority_map = {t.task_id: t.priority for t in tasks}

        for task_id in [t.task_id for t in tasks]:
            visited = task_id in visited_task_ids
            chosen_node = chosen_node_per_task.get(task_id)
            all_nodes_for_task = task_nodes.get(task_id, [])
            has_choice = visited and chosen_node is not None

            chosen_poi_info = None
            if has_choice:
                meta = node_meta[chosen_node]
                loc = locations[chosen_node]
                chosen_poi_info = {
                    "node_index": chosen_node,
                    "poi_id": meta["poi_id"],
                    "lat": loc["lat"],
                    "lng": loc["lng"],
                    "interval_idx": meta.get("interval_idx"),
                }

            alternative_pois: List[_AlternativePOI] = []
            # Alternatives only make sense if the task has some other node.
            if has_choice and len(all_nodes_for_task) > 1:
                alternative_pois = self._find_alternative_pois(
                    chosen_node=chosen_node,
                    all_nodes_for_task=all_nodes_for_task,
                    node_meta=node_meta,
                    locations=locations,
                    node_prev_next=node_prev_next,
                    arrival_sec_map=arrival_sec_map,
                    duration_matrix=duration_matrix,
                    distance_matrix=distance_matrix,
                    tasks=tasks,
                    start_time=start_time,
                    alt_k=alt_k,
                )

            tasks_detail.append(
                _TaskDetail(
                    task_id=task_id,
                    priority=task_priority_map.get(task_id, "MEDIUM"),
                    visited=visited,
                    chosen_poi=chosen_poi_info,
                    alternative_pois=alternative_pois,
                )
            )

        return tasks_detail

    def _find_alternative_pois(
        self,
        chosen_node: int,
        all_nodes_for_task: List[int],
        node_meta: List[Dict[str, Any]],
        locations: List[Dict[str, float]],
        node_prev_next: Dict[int, Tuple[int, int]],
        arrival_sec_map: Dict[int, int],
        duration_matrix: np.ndarray,
        distance_matrix: np.ndarray,
        tasks: List[_Task],
        start_time: datetime,
        alt_k: int,
    ) -> List[_AlternativePOI]:
        """Return the top ``alt_k`` alternative POIs for a visited task.

        Each candidate is scored by how much travel time and distance change
        if it replaced ``chosen_node`` between the same previous and next
        stops. Candidates are skipped when they are the chosen node, share
        its ``poi_id``, or have a time window that does not contain the
        arrival value recorded for the chosen node.

        Args:
            chosen_node: Node the optimized route uses for the task.
            all_nodes_for_task: Every node belonging to the task.
            node_meta: Per-node metadata, indexed by node.
            locations: Per-node dicts with ``"lat"`` and ``"lng"``.
            node_prev_next: Node -> (previous node, next node) on the route.
            arrival_sec_map: Node -> arrival value from the time dimension.
            duration_matrix: Travel durations, indexed ``[from, to]``.
            distance_matrix: Travel distances, indexed ``[from, to]``.
            tasks: All tasks; forwarded to the time-window handler.
            start_time: Start time of the plan; forwarded to the time-window
                handler.
            alt_k: Maximum number of alternatives returned; values below 0
                are treated as 0.

        Returns:
            Alternatives ordered by ascending extra travel time.
        """
        prev_node, next_node = node_prev_next[chosen_node]

        # The route's first/last stops are recorded with themselves as their
        # missing neighbour. Such a "leg" does not exist on the route, so it
        # must not be charged: otherwise a candidate replacing the last stop
        # would pay for travelling from the candidate to the chosen node.
        has_inbound_leg = prev_node != chosen_node
        has_outbound_leg = next_node != chosen_node

        def detour_cost(matrix, via_node):
            """Cost of prev -> via_node -> next, counting only real legs."""
            cost = 0
            if has_inbound_leg:
                cost += matrix[prev_node, via_node]
            if has_outbound_leg:
                cost += matrix[via_node, next_node]
            return cost

        base_time = detour_cost(duration_matrix, chosen_node)
        base_dist = detour_cost(distance_matrix, chosen_node)

        chosen_poi_id = node_meta[chosen_node]["poi_id"]
        chosen_arr_sec = arrival_sec_map.get(chosen_node, 0)

        # (delta seconds, alternative) pairs; the seconds are kept so ranking
        # is not limited to the floored minutes stored on the alternative.
        ranked: List[Tuple[int, _AlternativePOI]] = []

        for cand_node in all_nodes_for_task:
            if cand_node == chosen_node:
                continue
            meta_c = node_meta[cand_node]
            # Another node of the same POI is not a real alternative.
            if meta_c["poi_id"] == chosen_poi_id:
                continue

            # presumably seconds on the same scale as the arrival values;
            # verify against TimeWindowHandler.
            cand_start_sec, cand_end_sec = self.tw_handler.get_node_time_window_sec(
                meta_c, tasks, start_time
            )
            # The chosen node's arrival value stands in for the candidate's.
            if not (cand_start_sec <= chosen_arr_sec <= cand_end_sec):
                continue

            delta_time_sec = int(detour_cost(duration_matrix, cand_node) - base_time)
            delta_dist = int(detour_cost(distance_matrix, cand_node) - base_dist)
            loc_c = locations[cand_node]

            ranked.append(
                (
                    delta_time_sec,
                    _AlternativePOI(
                        node_index=cand_node,
                        poi_id=meta_c["poi_id"],
                        lat=loc_c["lat"],
                        lng=loc_c["lng"],
                        interval_idx=meta_c.get("interval_idx"),
                        delta_travel_time_min=delta_time_sec // 60,
                        delta_travel_distance_m=delta_dist,
                    ),
                )
            )

        # Stable sort on exact seconds; this ordering is also non-decreasing
        # in delta_travel_time_min.
        ranked.sort(key=lambda item: item[0])
        # Clamp so a negative alt_k cannot slice from the end of the list.
        return [alt for _, alt in ranked[:max(alt_k, 0)]]