Spaces:
Running
Running
| """ | |
| Google Maps API Service | |
| """ | |
import math
import os
import time
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional, Tuple, Union

import numpy as np
import requests

from src.infra.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)

# Upper bound on how many origins (and how many destinations) go into a single
# route-matrix request; compute_route_matrix slices its inputs into batches of
# at most this many locations per side.
MAX_LOCATIONS_PER_SIDE = 50
def cleanup_str(key: str) -> str:
    """Return *key* with leading/trailing quotes (single or double) and spaces removed."""
    return key.strip('\'" ')


def clean_dict_list_keys(data_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Normalize the keys of every dict in *data_list*.

    String keys are passed through :func:`cleanup_str` so that keys such as
    ``' "lat" '`` become ``'lat'``.  Non-string keys are kept as they are
    (they have no ``strip`` method), and list items that are not dicts are
    copied through untouched.

    Args:
        data_list: List of dicts whose keys may carry stray quotes/spaces.

    Returns:
        A new list with cleaned dicts.  If *data_list* is not a list it is
        returned unchanged after logging a warning.
    """
    if not isinstance(data_list, list):
        # Be permissive: hand the input back rather than failing the caller.
        logger.warning(f"Warning: Input is not a list. Received type: {type(data_list)}")
        return data_list

    cleaned_list = []
    for item in data_list:
        if not isinstance(item, dict):
            cleaned_list.append(item)
            continue
        cleaned_list.append({
            (cleanup_str(original_key) if isinstance(original_key, str) else original_key): value
            for original_key, value in item.items()
        })
    return cleaned_list
class GoogleMapAPIService:
    """Synchronous client for the Google Maps REST endpoints used by this module.

    Wraps four endpoints: Geolocation, Places Text Search (v1), Routes
    ``computeRoutes`` (v2) and Routes ``computeRouteMatrix`` (v2).
    Location arguments are plain dicts carrying either ``lat``/``lng`` or
    ``place_id``.
    """

    # Travel modes for which a routingPreference is sent; for every other
    # mode the field is omitted from the request.
    _TRAFFIC_CAPABLE_MODES = ("DRIVE", "TWO_WHEELER")

    # Ordered fallback chains tried by compute_routes when a leg cannot be
    # routed with the preferred mode.
    _MODE_FALLBACKS = {
        "DRIVE": ["DRIVE", "TWO_WHEELER", "TRANSIT", "WALK"],
        "TWO_WHEELER": ["TWO_WHEELER", "TRANSIT", "WALK"],
        "TRANSIT": ["TRANSIT", "WALK"],
        "WALK": ["WALK"],
    }

    # Retry policy for HTTP 429 responses while building a route matrix.
    _RATE_LIMIT_RETRIES = 2
    _RATE_LIMIT_BACKOFF_SECONDS = 5

    # Constants for the straight-line fallback in compute_routes.
    _EARTH_RADIUS_METERS = 6371000.0
    _FALLBACK_WALK_SPEED_MPS = 1.38  # roughly 5 km/h

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize Google Maps API service

        The key is resolved in this order: explicit ``api_key`` argument,
        ``google_maps_api_key`` from the project settings, then the
        ``GOOGLE_MAPS_API_KEY`` environment variable.

        Args:
            api_key: Google Maps API key (optional if config/env provide one)

        Raises:
            ValueError: If no API key can be found.
        """
        if not api_key:
            api_key = self._load_api_key_from_config() or os.getenv("GOOGLE_MAPS_API_KEY")
        if not api_key:
            raise ValueError(
                "Google Maps API key is required. "
                "Set via parameter, config, or GOOGLE_MAPS_API_KEY env var"
            )
        self.api_key = api_key

        # API endpoints
        self.geolocation_url = "https://www.googleapis.com/geolocation/v1/geolocate"
        self.places_text_search_url = "https://places.googleapis.com/v1/places:searchText"
        self.compute_routes_url = "https://routes.googleapis.com/directions/v2:computeRoutes"
        self.compute_route_matrix_url = "https://routes.googleapis.com/distanceMatrix/v2:computeRouteMatrix"
        logger.info("✅ Google Maps API service initialized")

    # ========================================================================
    # Shared helpers
    # ========================================================================
    @staticmethod
    def _load_api_key_from_config() -> Optional[str]:
        """Return the key from project settings, or None if settings are unavailable."""
        try:
            from src.infra.config import get_settings
            return get_settings().google_maps_api_key
        except (ImportError, AttributeError) as e:
            # Not fatal: the caller falls back to the environment variable.
            logger.debug(f"Config lookup for Google Maps API key failed: {e}")
            return None

    @staticmethod
    def _build_waypoint(point: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a ``{'place_id'}`` or ``{'lat','lng'}`` dict into a Routes API waypoint.

        ``placeId`` sits directly on the waypoint, while coordinates are
        wrapped in ``location.latLng``.

        Raises:
            KeyError: If the dict has neither ``place_id`` nor ``lat``/``lng``.
        """
        if "place_id" in point:
            return {"placeId": point["place_id"]}
        return {
            "location": {
                "latLng": {
                    "latitude": point["lat"],
                    "longitude": point["lng"],
                }
            }
        }

    @staticmethod
    def _describe_point(point: Dict[str, Any]) -> str:
        """Short human-readable form of a location dict, safe to use in log lines."""
        if "place_id" in point:
            return f"place_id:{point['place_id']}"
        try:
            return f"{float(point['lat']):.4f},{float(point['lng']):.4f}"
        except (KeyError, TypeError, ValueError):
            return repr(point)

    @staticmethod
    def _format_departure_time(departure_time: Any) -> Optional[str]:
        """Render a departure time as an ISO-8601 UTC timestamp string.

        Accepts a ``datetime`` (naive values are interpreted as local time by
        ``astimezone``) or the literal string ``"now"``.  Anything else,
        including ``None``, yields ``None`` so the field is left out of the
        request.
        """
        if isinstance(departure_time, datetime):
            return departure_time.astimezone(timezone.utc).isoformat()
        if departure_time == "now":
            return datetime.now(timezone.utc).isoformat()
        return None

    @staticmethod
    def _parse_duration_seconds(value: Any) -> int:
        """Parse a duration such as ``"300s"`` (or ``"3.5s"``) into whole seconds.

        Raises:
            ValueError: If the text is not numeric.
        """
        if isinstance(value, (int, float)):
            return int(value)
        text = str(value).strip()
        if text.endswith("s"):
            text = text[:-1]
        if not text:
            return 0
        return int(float(text))

    @staticmethod
    def _is_rate_limit_error(error: BaseException) -> bool:
        """Return True when *error* looks like an HTTP 429 from the API."""
        response = getattr(error, "response", None)
        if getattr(response, "status_code", None) == 429:
            return True
        text = str(error)
        return "429" in text or "Too Many Requests" in text

    # ========================================================================
    # Geolocation
    # ========================================================================
    def current_location(self) -> Dict[str, Any]:
        """
        Get current location using Geolocation API

        Returns:
            The decoded API response; it always contains a ``location`` key.

        Raises:
            requests.RequestException: If API request fails
            ValueError: If the response has no ``location`` field
        """
        try:
            response = requests.post(
                self.geolocation_url,
                params={"key": self.api_key},
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
            if not response.ok:
                logger.error(f"❌ Geolocation API error: {response.text}")
            response.raise_for_status()
            data = response.json()
            # Guard against a well-formed but useless response.
            if "location" not in data:
                raise ValueError("Invalid geolocation response")
            logger.debug(f"📍 Current location: {data['location']}")
            return data
        except requests.Timeout:
            logger.error("⏰ Geolocation request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Geolocation request failed: {e}")
            raise
        except Exception as e:
            logger.error(f"❌ Unexpected error in current_location: {e}")
            raise

    # ========================================================================
    # Places API - Text Search (New API v1)
    # ========================================================================
    @staticmethod
    def _standardize_place(place: Dict[str, Any]) -> Dict[str, Any]:
        """Map one raw Places result onto the flat dict returned by text_search.

        The ``location`` entry keeps the API's ``latitude``/``longitude`` keys
        and additionally exposes them as ``lat``/``lng`` so the result can be
        passed straight to the routing methods.
        """
        location = dict(place.get("location") or {})
        if "latitude" in location and "longitude" in location:
            location["lat"] = location["latitude"]
            location["lng"] = location["longitude"]
        return {
            "place_id": place.get("id"),
            "name": place.get("displayName", {}).get("text", "Unknown"),
            "address": place.get("formattedAddress", ""),
            "location": location,
            "rating": place.get("rating"),
            "user_ratings_total": place.get("userRatingCount", 0),
            "business_status": place.get("businessStatus", "UNKNOWN"),
            "opening_hours": place.get("currentOpeningHours", {}),
        }

    def text_search(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None,
        radius: Optional[int] = 10000,
        limit: int = 3,
    ) -> List[Dict[str, Any]]:
        """
        Search for a place to get its coordinates and details.
        Use this to convert a user's vague query (e.g., "Gas station", "Taipei 101") into precise lat/lng coordinates.

        Args:
            query: The search text (e.g., "Din Tai Fung", "Hospital").
            location: Optional bias location {"lat": float, "lng": float}.
            radius: Search radius in meters (optional). The bias is only
                applied when both ``location`` and ``radius`` are truthy.
            limit: Max results to return (default 3, capped at 20).

        Returns:
            List[Dict]: A list of places found (empty if none). Each item contains:
                - place_id: str
                - name: str
                - address: str
                - location: {"latitude", "longitude", "lat", "lng"}
                - rating: float or None
                - user_ratings_total: int
                - business_status: str
                - opening_hours: dict

        Raises:
            requests.RequestException: If API request fails
        """
        request_body: Dict[str, Any] = {
            "textQuery": query,
            "maxResultCount": min(limit, 20),
        }
        if location and radius:
            request_body["locationBias"] = {
                "circle": {
                    "center": {
                        "latitude": location["lat"],
                        "longitude": location["lng"],
                    },
                    "radius": float(radius),
                }
            }

        # Joined from a list so the mask can never end with a dangling comma.
        field_mask = ",".join([
            "places.id",
            "places.displayName",
            "places.formattedAddress",
            "places.location",
            "places.rating",
            "places.userRatingCount",
            "places.businessStatus",
            "places.currentOpeningHours",
        ])
        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": self.api_key,
            "X-Goog-FieldMask": field_mask,
        }

        try:
            logger.debug(f"🔍 Searching places: '{query}', cent_location:'{location}'(limit={limit})")
            response = requests.post(
                self.places_text_search_url,
                json=request_body,
                headers=headers,
                timeout=10,
            )
            if not response.ok:
                logger.error(f"❌ Places Text Search API error: {response.text}")
            response.raise_for_status()
            places = response.json().get("places", [])
            if not places:
                logger.warning(f"⚠️ No places found for query: '{query}'")
                return []
            logger.info(f"✅ Found {len(places)} places for '{query}'")
            return [self._standardize_place(place) for place in places]
        except requests.Timeout:
            logger.error("⏰ Places search timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Places search failed: {e}")
            raise
        except Exception as e:
            logger.error(f"❌ Unexpected error in text_search: {e}")
            raise

    # ========================================================================
    # Routes API - Compute Routes
    # ========================================================================
    def _compute_routes(
        self,
        origin: Dict[str, Any],
        destination: Dict[str, Any],
        waypoints: Optional[List[Dict[str, float]]] = None,
        travel_mode: str = "DRIVE",
        routing_preference: Optional[str] = "TRAFFIC_AWARE_OPTIMAL",
        compute_alternative_routes: bool = False,
        optimize_waypoint_order: bool = False,
        departure_time: Optional[Union[datetime, str]] = None,
        include_traffic_on_polyline: bool = True
    ) -> Dict[str, Any]:
        """
        Internal method to compute a single route using Routes API v2

        Args:
            origin: Origin dict with 'lat' and 'lng' keys, or a 'place_id' key
            destination: Destination dict with 'lat' and 'lng' keys, or 'place_id'
            waypoints: Optional list of intermediate location dicts
            travel_mode: Travel mode (DRIVE, TWO_WHEELER, WALK, BICYCLE, TRANSIT)
            routing_preference: Routing preference; only sent for DRIVE and
                TWO_WHEELER, and omitted when None/empty
            compute_alternative_routes: Whether to compute alternatives
            optimize_waypoint_order: Whether to optimize waypoint order
            departure_time: Departure time as datetime object or "now"
            include_traffic_on_polyline: Whether to request travel advisory fields

        Returns:
            Parsed information about the first route returned:
                - distance_meters: int
                - duration_seconds: int
                - encoded_polyline: str
                - legs: List[Dict]
                - optimized_waypoint_order: Optional[List[int]]

        Raises:
            ValueError: If no route is returned or the response cannot be parsed
            requests.RequestException: If API request fails
        """
        mode = travel_mode.upper()
        request_body: Dict[str, Any] = {
            "origin": self._build_waypoint(origin),
            "destination": self._build_waypoint(destination),
            "travelMode": mode,
            "computeAlternativeRoutes": compute_alternative_routes,
            "languageCode": "zh-TW",
            "units": "METRIC",
        }
        # A routing preference is only meaningful for motorized modes; leave
        # the field out otherwise instead of sending null or an invalid value.
        if routing_preference and mode in self._TRAFFIC_CAPABLE_MODES:
            request_body["routingPreference"] = routing_preference.upper()

        # Intermediate stops
        if waypoints:
            request_body["intermediates"] = [self._build_waypoint(wp) for wp in waypoints]
            if optimize_waypoint_order:
                request_body["optimizeWaypointOrder"] = True

        formatted_departure = self._format_departure_time(departure_time)
        if formatted_departure:
            request_body["departureTime"] = formatted_departure

        # Field mask
        field_mask_parts = [
            "routes.duration",
            "routes.distanceMeters",
            "routes.polyline.encodedPolyline",
            "routes.legs",
            "routes.optimizedIntermediateWaypointIndex",
        ]
        if include_traffic_on_polyline:
            field_mask_parts.extend([
                "routes.travelAdvisory",
                "routes.legs.travelAdvisory",
            ])
        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": self.api_key,
            "X-Goog-FieldMask": ",".join(field_mask_parts),
        }

        logger.debug(
            f"🗺️ Computing route: {self._describe_point(origin)} → "
            f"{self._describe_point(destination)}, {mode} "
            f"({len(waypoints) if waypoints else 0} waypoints)"
        )

        try:
            response = requests.post(
                self.compute_routes_url,
                json=request_body,
                headers=headers,
                timeout=30,  # routing can be slow; allow more time than the other calls
            )
            if not response.ok:
                logger.error(f"❌ Routes API error: {response.text}")
            response.raise_for_status()
            data = response.json()

            # The API normally returns an object; tolerate a bare list of routes.
            payload = {"routes": data} if isinstance(data, list) else data
            routes = payload.get("routes") if isinstance(payload, dict) else None
            if not routes:
                logger.error("❌ No routes found in API response")
                raise ValueError("No routes found")

            route = routes[0]
            distance_meters = route.get("distanceMeters", 0)
            duration_seconds = self._parse_duration_seconds(route.get("duration", "0s"))
            result = {
                "distance_meters": distance_meters,
                "duration_seconds": duration_seconds,
                "encoded_polyline": route.get("polyline", {}).get("encodedPolyline", ""),
                "legs": route.get("legs", []),
                "optimized_waypoint_order": route.get("optimizedIntermediateWaypointIndex"),
            }
            logger.debug(
                f"✅ Route computed: {distance_meters}m, {duration_seconds}s"
            )
            return result
        except requests.Timeout:
            logger.error("⏰ Routes request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Routes request failed: {e}")
            raise
        except (KeyError, ValueError) as e:
            logger.error(f"❌ Failed to parse route response: {e}")
            raise ValueError(f"Invalid route response: {e}") from e
        except Exception as e:
            logger.error(f"❌ Unexpected error in _compute_routes: {e}")
            raise

    # ========================================================================
    # Multi-leg Route Computation
    # ========================================================================
    @staticmethod
    def _encode_polyline(points: List[Tuple[float, float]]) -> str:
        """Encode ``(lat, lng)`` pairs with Google's encoded polyline algorithm.

        Each coordinate is scaled by 1e5 and rounded first; the *difference of
        the rounded integers* is then encoded, so rounding error does not
        accumulate along the line.
        """
        def encode_value(value: int) -> str:
            # Left-shift one bit and invert negatives, then emit 5-bit chunks
            # (low bits first), flagging every chunk except the last with 0x20.
            value = ~(value << 1) if value < 0 else (value << 1)
            chunks = []
            while value >= 0x20:
                chunks.append(chr((0x20 | (value & 0x1f)) + 63))
                value >>= 5
            chunks.append(chr(value + 63))
            return "".join(chunks)

        encoded = []
        last_lat, last_lng = 0, 0
        for lat, lng in points:
            lat_e5 = int(round(lat * 1e5))
            lng_e5 = int(round(lng * 1e5))
            encoded.append(encode_value(lat_e5 - last_lat))
            encoded.append(encode_value(lng_e5 - last_lng))
            last_lat, last_lng = lat_e5, lng_e5
        return "".join(encoded)

    @classmethod
    def _calculate_haversine_distance(
        cls, lat1: float, lng1: float, lat2: float, lng2: float
    ) -> float:
        """Return the great-circle distance in meters between two lat/lng points."""
        phi1 = math.radians(lat1)
        phi2 = math.radians(lat2)
        half_dphi = (phi2 - phi1) / 2.0
        half_dlambda = math.radians(lng2 - lng1) / 2.0
        a = (
            math.sin(half_dphi) ** 2
            + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlambda) ** 2
        )
        # Clamp to 1.0 so float noise can never push asin out of its domain.
        return 2.0 * cls._EARTH_RADIUS_METERS * math.asin(min(1.0, math.sqrt(a)))

    def compute_routes(
        self,
        place_points: List[Dict[str, float]],
        start_time: Union[datetime, str],
        stop_times: List[int],
        travel_mode: Union[str, List[str]] = "DRIVE",
        routing_preference: str = "TRAFFIC_AWARE_OPTIMAL"
    ) -> Dict[str, Any]:
        """
        Compute a multi-stop itinerary leg by leg.

        For each consecutive pair of places the preferred travel mode is
        tried first, followed by a fallback chain (e.g. DRIVE → TWO_WHEELER →
        TRANSIT → WALK).  If every API attempt fails, the leg is estimated
        from the straight-line distance at walking speed and tagged
        ``MATH_ESTIMATE``.

        Args:
            place_points: Ordered stops, each with 'lat'/'lng' (or 'place_id').
            start_time: Start as datetime, or "now"/None for the current time.
            stop_times: Dwell time in seconds at each stop, one entry per
                place (entry 0 belongs to the starting point).
            travel_mode: One mode for all legs, or a list with one mode per leg.
            routing_preference: Preference used for DRIVE/TWO_WHEELER legs.

        Returns:
            Dict with total_distance_meters, total_duration_seconds,
            total_residence_time_minutes, total_time_seconds, start_time,
            end_time, stops and legs.

        Raises:
            ValueError: On fewer than 2 places, a stop_times or travel_mode
                list that is too short, an unsupported start_time type, or
                when a leg needs the straight-line fallback but a stop has
                no coordinates.
        """
        if len(place_points) < 2:
            raise ValueError("At least 2 places required for route computation")
        num_legs = len(place_points) - 1
        # Validate up front so we never fail half-way through after API calls.
        if stop_times is None or len(stop_times) < len(place_points):
            raise ValueError(
                f"stop_times must have one entry per place "
                f"(expected {len(place_points)}, got {0 if stop_times is None else len(stop_times)})"
            )
        if isinstance(travel_mode, list) and len(travel_mode) < num_legs:
            raise ValueError(
                f"travel_mode list must have one entry per leg "
                f"(expected {num_legs}, got {len(travel_mode)})"
            )

        # Resolve the start time to a timezone-aware UTC clock.
        if start_time == "now" or start_time is None:
            start_time = datetime.now(timezone.utc)
            current_time = start_time
        elif isinstance(start_time, datetime):
            current_time = start_time.astimezone(timezone.utc)
            if start_time.tzinfo is None:
                # Naive input: keep the aware value so later arithmetic works.
                start_time = current_time
        else:
            raise ValueError(f"Invalid start_time type: {type(start_time)}")

        legs_info = []
        total_distance = 0
        total_duration = 0
        place_points = clean_dict_list_keys(place_points)

        for i in range(num_legs):
            origin = place_points[i]
            destination = place_points[i + 1]

            # 1. Preferred mode for this leg
            if isinstance(travel_mode, list):
                primary_mode = travel_mode[i].upper()
            else:
                primary_mode = travel_mode.upper()

            # 2. Fallback chain; unlisted modes (e.g. BICYCLE) are tried
            #    first and then degrade to walking.
            modes_to_try = self._MODE_FALLBACKS.get(primary_mode, [primary_mode, "WALK"])

            route_found = False
            final_mode_used = primary_mode
            leg_distance = 0
            leg_duration = 0
            encoded_polyline = ""
            # The first leg leaves departure_time unset; later legs depart at
            # the running clock.
            departure_time = current_time if i > 0 else None

            # 3. Try each mode until one yields a route
            for mode in modes_to_try:
                # Only motorized modes take a routing preference.
                if (
                    mode in self._TRAFFIC_CAPABLE_MODES
                    and routing_preference
                    and routing_preference != "UNDRIVE"
                ):
                    pref = routing_preference.upper()
                else:
                    pref = None
                try:
                    route = self._compute_routes(
                        origin=origin,
                        destination=destination,
                        waypoints=None,
                        travel_mode=mode,
                        routing_preference=pref,
                        include_traffic_on_polyline=True,
                        departure_time=departure_time
                    )
                except Exception as e:
                    # Best effort: remember why and move on to the next mode.
                    logger.debug(f"Leg {i + 1}: mode {mode} failed: {e}")
                    continue
                leg_distance = route["distance_meters"]
                leg_duration = route["duration_seconds"]
                encoded_polyline = route["encoded_polyline"]
                route_found = True
                final_mode_used = mode
                if mode != primary_mode:
                    logger.info(f" 🔄 Leg {i + 1}: {primary_mode} failed. Switched to {mode} (Polyline OK).")
                break

            # 4. Last resort: straight-line estimate
            if not route_found:
                logger.warning(f"⚠️ Leg {i + 1}: All API modes ({'/'.join(modes_to_try)}) failed.")
                logger.warning(
                    f" 📍 From: {self._describe_point(origin)} -> To: {self._describe_point(destination)}")
                logger.warning(f" 🔄 Activating Math Fallback (Straight Line).")
                try:
                    endpoints = [
                        (origin['lat'], origin['lng']),
                        (destination['lat'], destination['lng']),
                    ]
                except KeyError as e:
                    raise ValueError(
                        f"Leg {i + 1}: no route found and stop is missing coordinate {e} "
                        f"for the straight-line fallback"
                    ) from e
                leg_distance = int(round(self._calculate_haversine_distance(
                    endpoints[0][0], endpoints[0][1],
                    endpoints[1][0], endpoints[1][1]
                )))
                leg_duration = int(leg_distance / self._FALLBACK_WALK_SPEED_MPS)
                # A two-point polyline keeps consumers that expect a
                # non-empty polyline working.
                encoded_polyline = self._encode_polyline(endpoints)
                final_mode_used = "MATH_ESTIMATE"

            # --- accumulate ---
            total_distance += leg_distance
            total_duration += leg_duration
            legs_info.append({
                "from_index": i,
                "to_index": i + 1,
                "travel_mode": final_mode_used,
                "distance_meters": leg_distance,
                "duration_seconds": leg_duration,
                "departure_time": current_time.isoformat(),
                "polyline": encoded_polyline
            })

            # Advance the clock by travel time plus the dwell at the arrival stop.
            current_time += timedelta(seconds=leg_duration)
            current_time += timedelta(seconds=stop_times[i + 1])

        return {
            "total_distance_meters": total_distance,
            "total_duration_seconds": total_duration,
            "total_residence_time_minutes": sum(stop_times) // 60,
            "total_time_seconds": int((current_time - start_time).total_seconds()),
            "start_time": start_time,
            "end_time": current_time,
            "stops": place_points,
            "legs": legs_info,
        }

    # ========================================================================
    # Distance Matrix API
    # ========================================================================
    def _compute_route_matrix(
        self,
        origins: List[Dict[str, float]],
        destinations: List[Dict[str, float]],
        travel_mode: str = "DRIVE",
        routing_preference: Optional[str] = "TRAFFIC_UNAWARE",
        departure_time: Optional[Union[datetime, str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Internal method to request one route-matrix batch

        Args:
            origins: List of origin location dicts ('lat'/'lng' or 'place_id')
            destinations: List of destination location dicts
            travel_mode: Travel mode
            routing_preference: Routing preference; only sent for DRIVE and
                TWO_WHEELER
            departure_time: Departure time as datetime object or "now"

        Returns:
            List of matrix element dicts (empty if the response has an
            unexpected shape)

        Raises:
            ValueError: If the response body is an error object
            requests.RequestException: If API request fails
        """
        mode = travel_mode.upper()
        request_body: Dict[str, Any] = {
            "origins": [{"waypoint": self._build_waypoint(origin)} for origin in origins],
            "destinations": [{"waypoint": self._build_waypoint(dest)} for dest in destinations],
            "travelMode": mode,
        }
        if routing_preference and mode in self._TRAFFIC_CAPABLE_MODES:
            request_body["routingPreference"] = routing_preference.upper()

        formatted_departure = self._format_departure_time(departure_time)
        if formatted_departure:
            request_body["departureTime"] = formatted_departure

        # status/condition let the caller tell "no route" apart from a real 0.
        field_mask = "originIndex,destinationIndex,distanceMeters,duration,status,condition"
        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": self.api_key,
            "X-Goog-FieldMask": field_mask,
        }
        logger.debug(
            f"📏 Computing route matrix: {len(origins)} × {len(destinations)}"
        )

        try:
            response = requests.post(
                self.compute_route_matrix_url,
                json=request_body,
                headers=headers,
                timeout=30,
            )
            if not response.ok:
                logger.error(f"❌ Route matrix API error: {response.text}")
            response.raise_for_status()
            data = response.json()
            if isinstance(data, list):
                return data
            if isinstance(data, dict):
                if "error" in data:
                    raise ValueError(f"API Error: {data['error']}")
                if "results" in data:
                    return data["results"]
                logger.warning(f"⚠️ Unexpected Distance Matrix response format: {data.keys()}")
                return []
            # Fallback: neither a list nor an object
            logger.warning(f"⚠️ Unexpected Distance Matrix response format: {type(data)}")
            return []
        except requests.Timeout:
            logger.error("⏰ Route matrix request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Route matrix request failed: {e}")
            raise
        except Exception as e:
            logger.error(f"❌ Unexpected error in _compute_route_matrix: {e}")
            raise

    def _get_max_elements(self, travel_mode: str, routing_preference: str) -> int:
        """
        Get max elements per request based on travel mode and preference

        Args:
            travel_mode: Travel mode
            routing_preference: Routing preference (may be empty)

        Returns:
            100 for TRANSIT or TRAFFIC_AWARE_OPTIMAL, otherwise 625
        """
        if travel_mode.upper() == "TRANSIT":
            return 100
        if routing_preference and routing_preference.upper() == "TRAFFIC_AWARE_OPTIMAL":
            return 100
        return 625

    @classmethod
    def _parse_matrix_element(cls, element: Any) -> Optional[Tuple[int, int, int, int]]:
        """Extract ``(origin_idx, dest_idx, seconds, meters)`` from one matrix element.

        Returns None when the element is unusable: missing indices, a
        non-zero status code, a ``condition`` other than ``ROUTE_EXISTS``,
        no metrics at all, or values that cannot be parsed.
        """
        if not isinstance(element, dict):
            return None
        if "originIndex" not in element or "destinationIndex" not in element:
            return None
        status = element.get("status") or {}
        if isinstance(status, dict) and status.get("code"):
            return None
        condition = element.get("condition")
        if condition is not None and condition != "ROUTE_EXISTS":
            return None
        # Without an explicit condition, an element carrying neither metric
        # means nothing was computed for this pair.
        if condition is None and "duration" not in element and "distanceMeters" not in element:
            return None
        try:
            return (
                int(element["originIndex"]),
                int(element["destinationIndex"]),
                cls._parse_duration_seconds(element.get("duration", "0s")),
                int(element.get("distanceMeters", 0)),
            )
        except (TypeError, ValueError):
            return None

    def _fetch_matrix_batch(
        self,
        origin_batch: List[Dict[str, float]],
        dest_batch: List[Dict[str, float]],
        travel_mode: str,
        routing_preference: Optional[str],
        departure_time: Optional[Union[datetime, str]],
        batch_label: str,
    ) -> Optional[List[Dict[str, Any]]]:
        """Request one matrix batch, retrying on HTTP 429; None if it ultimately fails."""
        max_attempts = self._RATE_LIMIT_RETRIES + 1
        for attempt in range(1, max_attempts + 1):
            try:
                time.sleep(0.2)  # light throttle between consecutive requests
                return self._compute_route_matrix(
                    origins=origin_batch,
                    destinations=dest_batch,
                    travel_mode=travel_mode,
                    routing_preference=routing_preference or "TRAFFIC_UNAWARE",
                    departure_time=departure_time,
                )
            except Exception as e:
                if self._is_rate_limit_error(e):
                    logger.error(f"🚨 Rate Limit Hit! Sleeping for 5s...")
                    time.sleep(self._RATE_LIMIT_BACKOFF_SECONDS)
                    if attempt < max_attempts:
                        continue
                logger.error(
                    f"❌ Failed to compute matrix batch "
                    f"{batch_label}: {e}")
                return None
        return None

    def compute_route_matrix(
        self,
        origins: List[Dict[str, float]],
        destinations: List[Dict[str, float]],
        travel_mode: str = "DRIVE",
        routing_preference: Optional[str] = None,
        departure_time: Optional[datetime] = None,
    ) -> Dict[str, np.ndarray]:
        """
        Compute distance and duration matrix between multiple locations

        The request is split into batches of at most MAX_LOCATIONS_PER_SIDE
        locations per side and at most ``_get_max_elements`` pairs per call.
        A batch that fails (after retries on HTTP 429) is logged and its
        cells keep the value -1, as do pairs for which no route exists.

        Args:
            origins: List of origin location dicts with 'lat' and 'lng'
            destinations: List of destination location dicts with 'lat' and 'lng'
            travel_mode: Travel mode (DRIVE, WALK, BICYCLE, TRANSIT)
            routing_preference: Routing preference (optional)
            departure_time: Departure time as datetime object (optional)

        Returns:
            Dict with int32 numpy arrays (-1 marks "not available"):
                - duration_matrix: Duration in seconds (shape: O×D)
                - distance_matrix: Distance in meters (shape: O×D)

        Raises:
            ValueError: If origins or destinations is empty
        """
        if not origins or not destinations:
            raise ValueError("Origins and destinations cannot be empty")

        max_elements = self._get_max_elements(travel_mode, routing_preference or "")
        O_len, D_len = len(origins), len(destinations)
        duration_matrix = np.full((O_len, D_len), -1, dtype=np.int32)
        distance_matrix = np.full((O_len, D_len), -1, dtype=np.int32)
        logger.info(
            f"📊 Computing {O_len}×{D_len} matrix "
            f"(max_elements={max_elements})"
        )

        # Process in batches
        for o_start in range(0, O_len, MAX_LOCATIONS_PER_SIDE):
            o_end = min(o_start + MAX_LOCATIONS_PER_SIDE, O_len)
            origin_batch = origins[o_start:o_end]
            # Largest destination slice that keeps origins × destinations
            # within the per-request element budget (never below 1).
            max_dest_batch_size = max(
                1,
                min(
                    MAX_LOCATIONS_PER_SIDE,
                    min(D_len, max_elements // len(origin_batch))
                ),
            )
            for d_start in range(0, D_len, max_dest_batch_size):
                d_end = min(d_start + max_dest_batch_size, D_len)
                dest_batch = destinations[d_start:d_end]
                batch_label = f"[{o_start}:{o_end}, {d_start}:{d_end}]"

                elements = self._fetch_matrix_batch(
                    origin_batch,
                    dest_batch,
                    travel_mode,
                    routing_preference,
                    departure_time,
                    batch_label,
                )
                if elements is None:
                    continue  # failure already logged
                if not elements:
                    logger.warning(
                        f"⚠️ No elements in batch "
                        f"{batch_label}"
                    )
                    continue

                for el in elements:
                    parsed = self._parse_matrix_element(el)
                    if parsed is None:
                        continue
                    local_o, local_d, duration_seconds, distance_meters = parsed
                    # Ignore indices that do not belong to this batch rather
                    # than letting one bad element abort the rest.
                    if not (0 <= local_o < len(origin_batch) and 0 <= local_d < len(dest_batch)):
                        continue
                    duration_matrix[o_start + local_o, d_start + local_d] = duration_seconds
                    distance_matrix[o_start + local_o, d_start + local_d] = distance_meters

        logger.info(
            f"✅ Route matrix computed: "
            f"{np.sum(duration_matrix >= 0)} / {O_len * D_len} elements"
        )
        return {
            "duration_matrix": duration_matrix,
            "distance_matrix": distance_matrix,
        }
# ============================================================================
# Usage Example
# ============================================================================
if __name__ == "__main__":
    from src.infra.config import get_settings

    def _print_banner(title: str) -> None:
        """Print a section header for one example."""
        print("\n" + "=" * 60)
        print(title)
        print("=" * 60)

    # Initialize service
    setting = get_settings()
    api_key = setting.google_maps_api_key
    if not api_key:
        print("❌ Please set GOOGLE_MAPS_API_KEY environment variable")
        # SystemExit works even when the interactive `exit` helper is absent.
        raise SystemExit(1)
    service = GoogleMapAPIService(api_key=api_key)

    # Example 1: Current location
    try:
        _print_banner("Example 1: Current Location")
        location = service.current_location()
        print(f"📍 Current: {location['location']}")
    except Exception as e:
        print(f"❌ Failed: {e}")

    # Example 2: Text search
    try:
        _print_banner("Example 2: Text Search")
        places = service.text_search(
            query="台大醫院",
            location={"lat": 25.0408, "lng": 121.5318},
            radius=5000,
            limit=3
        )
        for i, place in enumerate(places, 1):
            print(f"{i}. {place['name']} - {place['address']}, {place['place_id']}")
    except Exception as e:
        print(f"❌ Failed: {e}")

    # Example 3: Compute route
    try:
        _print_banner("Example 3: Compute Route")
        places = [
            {"lat": 25.0408, "lng": 121.5318},  # Taipei Main Station
            {"lat": 25.0330, "lng": 121.5654},  # Taipei 101
            {"lat": 25.0478, "lng": 121.5170},  # Ximending
        ]
        route = service.compute_routes(
            place_points=places,
            start_time=datetime.now(timezone.utc),
            # compute_routes treats stop_times as seconds, so convert the
            # intended 30/60/50-minute dwell times.
            stop_times=[30 * 60, 60 * 60, 50 * 60],
            travel_mode="DRIVE",
            routing_preference="TRAFFIC_AWARE"  # alternative: "TRAFFIC_AWARE_OPTIMAL"
        )
        print(f"📊 Total distance: {route['total_distance_meters']}m")
        print(f"⏱️ Total duration: {route['total_duration_seconds']}s")
        print(f"🏁 End time: {route['end_time']}")
    except Exception as e:
        # Report and continue, like the other examples, so Example 4 still runs.
        print(f"❌ Failed: {e}")

    # Example 4: Distance matrix
    try:
        _print_banner("Example 4: Distance Matrix")
        origins = [
            {"lat": 25.0408, "lng": 121.5318},
            {"lat": 25.0330, "lng": 121.5654},
        ]
        destinations = [
            {"lat": 25.0478, "lng": 121.5170},
            {"lat": 25.0360, "lng": 121.5000},
        ]
        matrices = service.compute_route_matrix(
            origins=origins,
            destinations=destinations,
            travel_mode="WALK"
        )
        print("Duration matrix (seconds):")
        print(matrices["duration_matrix"])
        print("\nDistance matrix (meters):")
        print(matrices["distance_matrix"])
    except Exception as e:
        print(f"❌ Failed: {e}")

    print("\n✅ All examples completed!")