LifeFlow-AI / src /services /local_route_estimator.py
Marco310's picture
feat(v1.0.1): upgrade planner reasoning, map visuals, and presenter output
9066e49
import numpy as np
from typing import List, Dict, Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class LocalRouteEstimator:
"""
一個本地的距離與時間估算器,用於取代昂貴的 Google Distance Matrix API。
使用 Haversine 公式 + 曲折係數進行估算。
"""
# 曲折係數 (Circuity Factor): 直線距離 x 此係數 = 估算道路距離
# 一般城市道路約為 1.3 ~ 1.5
TORTUOSITY_FACTOR = 1.55
# 預設行駛速度 (km/h) -> 轉換為 m/s
AVERAGE_SPEED_KMH = 32.5
AVERAGE_SPEED_MPS = AVERAGE_SPEED_KMH * 1000 / 3600
def compute_route_matrix(
self,
origins: List[Dict[str, float]],
destinations: List[Dict[str, float]],
*args, **kwargs
) -> Dict[str, np.ndarray]:
"""
本地版矩陣計算 (Drop-in replacement for Google API)
完全不發送 HTTP 請求,純數學計算。
"""
O_len = len(origins)
D_len = len(destinations)
logger.info(
f"🧮 Computing LOCAL route matrix: {O_len}×{D_len} "
f"(Speed assumed: {self.AVERAGE_SPEED_KMH} km/h)"
)
# 1. 準備座標數據 (轉成 numpy array)
# origins structure: [{'lat': x, 'lng': y}, ...]
o_lats = np.array([o['lat'] for o in origins], dtype=np.float64)
o_lngs = np.array([o['lng'] for o in origins], dtype=np.float64)
d_lats = np.array([d['lat'] for d in destinations], dtype=np.float64)
d_lngs = np.array([d['lng'] for d in destinations], dtype=np.float64)
# 2. 向量化 Haversine 計算 (利用 Broadcasting 建立矩陣)
# 將維度調整為 (O, 1) 和 (1, D) 以便廣播運算
lat1 = np.radians(o_lats)[:, np.newaxis]
lon1 = np.radians(o_lngs)[:, np.newaxis]
lat2 = np.radians(d_lats)[np.newaxis, :]
lon2 = np.radians(d_lngs)[np.newaxis, :]
# Haversine 公式
dlon = lon2 - lon1
dlat = lat2 - lat1
a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
c = 2 * np.arcsin(np.sqrt(a))
r = 6371000 # 地球半徑 (公尺)
# 得到直線距離 (Great Circle Distance)
straight_distance = c * r
# 3. 估算道路距離 (Road Distance)
# 乘以曲折係數
estimated_distance = straight_distance * self.TORTUOSITY_FACTOR
# 4. 估算時間 (Duration)
# 時間 = 距離 / 速度
# 為了避免除以零,我們假設最小時間
estimated_duration = estimated_distance / self.AVERAGE_SPEED_MPS
# 轉換為 int32 (與 Google API 回傳格式保持一致)
distance_matrix = estimated_distance.astype(np.int32)
duration_matrix = estimated_duration.astype(np.int32)
# 5. 自我對角線處理 (如果是同一個點,距離時間設為 0)
# 只有當 origins 和 destinations 列表完全一樣時才需要檢查
# 這裡簡單處理:如果距離極小 (< 10米),設為 0
mask = distance_matrix < 10
distance_matrix[mask] = 0
duration_matrix[mask] = 0
logger.debug("✅ Local matrix computation done.")
return {
"duration_matrix": duration_matrix,
"distance_matrix": distance_matrix,
}