"""
Google Maps API Service

Thin wrapper around the Google Geolocation, Places (New) Text Search and
Routes (computeRoutes / computeRouteMatrix) REST endpoints.
"""
import math
import os
import time
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional, Tuple, Union

import numpy as np
import requests

from src.infra.logger import get_logger

logger = get_logger(__name__)

# Upper bound on origins (and on destinations) sent in one matrix request.
MAX_LOCATIONS_PER_SIDE = 50

# Mean Earth radius used by the straight-line (haversine) fallback.
EARTH_RADIUS_METERS = 6_371_000.0


def cleanup_str(key: str) -> str:
    """Strip surrounding single quotes, double quotes and spaces from *key*."""
    return key.strip('\'" ')


def clean_dict_list_keys(data_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Return a copy of *data_list* whose dict items have cleaned-up keys.

    Each string key is passed through ``cleanup_str``; non-string keys and
    non-dict items are kept untouched. If the input is not a list it is
    returned as-is (after logging a warning) so callers never crash here.
    """
    if not isinstance(data_list, list):
        # For safety, hand back the original input when it is not a list.
        logger.warning(f"Warning: Input is not a list. Received type: {type(data_list)}")
        return data_list

    cleaned_list = []
    for item in data_list:
        if isinstance(item, dict):
            cleaned_list.append({
                (cleanup_str(key) if isinstance(key, str) else key): value
                for key, value in item.items()
            })
        else:
            cleaned_list.append(item)
    return cleaned_list


def _parse_duration_seconds(duration: Any) -> int:
    """
    Convert a Google duration value such as ``"300s"`` or ``"12.5s"`` to
    whole seconds (fractions are truncated). Empty/None yields 0.

    Raises:
        ValueError: If the value is not a parsable number of seconds.
    """
    if duration is None:
        return 0
    text = str(duration).strip()
    if text.endswith("s"):
        text = text[:-1]
    return int(float(text)) if text else 0


class GoogleMapAPIService:
    """Client for the Google Maps Geolocation, Places and Routes REST APIs."""

    # Travel modes for which the Routes API accepts a routingPreference.
    _TRAFFIC_MODES = frozenset({"DRIVE", "TWO_WHEELER"})

    # Ordered fallback chain tried by compute_routes for each primary mode.
    # Any mode not listed here is attempted as WALK only.
    _MODE_FALLBACKS = {
        "DRIVE": ("DRIVE", "TWO_WHEELER", "TRANSIT", "WALK"),
        "TWO_WHEELER": ("TWO_WHEELER", "TRANSIT", "WALK"),
        "TRANSIT": ("TRANSIT", "WALK"),
    }

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize Google Maps API service

        The key is resolved in this order: explicit parameter, project
        settings (``src.infra.config``), ``GOOGLE_MAPS_API_KEY`` env var.

        Args:
            api_key: Google Maps API key (optional if configured elsewhere)

        Raises:
            ValueError: If no API key can be resolved from any source
        """
        resolved_key = api_key or self._load_configured_api_key()
        if not resolved_key:
            raise ValueError(
                "Google Maps API key is required. "
                "Set via parameter, config, or GOOGLE_MAPS_API_KEY env var"
            )
        self.api_key = resolved_key

        # API endpoints
        self.geolocation_url = "https://www.googleapis.com/geolocation/v1/geolocate"
        self.places_text_search_url = "https://places.googleapis.com/v1/places:searchText"
        self.compute_routes_url = "https://routes.googleapis.com/directions/v2:computeRoutes"
        self.compute_route_matrix_url = "https://routes.googleapis.com/distanceMatrix/v2:computeRouteMatrix"

        logger.info("✅ Google Maps API service initialized")

    # ========================================================================
    # Internal helpers
    # ========================================================================

    @staticmethod
    def _load_configured_api_key() -> Optional[str]:
        """Return the key from project settings, else from the environment."""
        configured = None
        try:
            from src.infra.config import get_settings

            configured = get_settings().google_maps_api_key
        except (ImportError, AttributeError) as e:
            # Settings are optional; fall through to the environment variable.
            logger.debug(f"Settings unavailable, falling back to env var: {e}")
        return configured or os.getenv("GOOGLE_MAPS_API_KEY")

    @staticmethod
    def _waypoint_payload(point: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build a Routes API ``Waypoint`` from a point dict.

        A ``place_id`` takes precedence and is emitted at waypoint level
        (``{"placeId": ...}``); otherwise ``lat``/``lng`` are emitted as
        ``{"location": {"latLng": ...}}``.

        Raises:
            KeyError: If the point has neither ``place_id`` nor ``lat``/``lng``
        """
        if "place_id" in point:
            return {"placeId": point["place_id"]}
        return {
            "location": {
                "latLng": {
                    "latitude": point["lat"],
                    "longitude": point["lng"]
                }
            }
        }

    @staticmethod
    def _describe_point(point: Dict[str, Any]) -> str:
        """Return a short log-friendly description of a point dict."""
        if "lat" in point and "lng" in point:
            return f"{point['lat']:.4f},{point['lng']:.4f}"
        return f"place_id={point.get('place_id')}"

    @staticmethod
    def _format_departure_time(departure_time: Any) -> Optional[str]:
        """Return the departure time as an ISO-8601 string, or None."""
        if isinstance(departure_time, datetime):
            return departure_time.isoformat()
        if departure_time == "now":
            return datetime.now(timezone.utc).isoformat()
        return None

    @staticmethod
    def _normalize_location(raw_location: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Copy a Places ``location`` and add ``lat``/``lng`` aliases.

        The original ``latitude``/``longitude`` keys are kept so existing
        consumers keep working; the aliases make the result directly usable
        as a point for the routing methods of this class.
        """
        location = dict(raw_location or {})
        if "latitude" in location and "longitude" in location:
            location.setdefault("lat", location["latitude"])
            location.setdefault("lng", location["longitude"])
        return location

    @staticmethod
    def _calculate_haversine_distance(
        lat1: float, lng1: float, lat2: float, lng2: float
    ) -> float:
        """Return the great-circle distance in meters between two points."""
        phi1, phi2 = math.radians(lat1), math.radians(lat2)
        d_phi = phi2 - phi1
        d_lambda = math.radians(lng2 - lng1)
        a = (
            math.sin(d_phi / 2) ** 2
            + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
        )
        # Clamp to guard against tiny floating-point overshoot above 1.
        return 2 * EARTH_RADIUS_METERS * math.asin(math.sqrt(min(1.0, a)))

    # ========================================================================
    # Geolocation
    # ========================================================================

    def current_location(self) -> Dict[str, Any]:
        """
        Get current location using Geolocation API

        Returns:
            The decoded API response; guaranteed to contain a "location" key

        Raises:
            requests.RequestException: If API request fails
            ValueError: If the response has no "location" entry
        """
        try:
            response = requests.post(
                self.geolocation_url,
                params={"key": self.api_key},
                headers={"Content-Type": "application/json"},
                timeout=10
            )
            if not response.ok:
                logger.error(f"❌ Geolocation API error: {response.text}")
            response.raise_for_status()

            data = response.json()
            # Validate the response before handing it to callers.
            if "location" not in data:
                raise ValueError("Invalid geolocation response")

            logger.debug(f"📍 Current location: {data['location']}")
            return data
        except requests.Timeout:
            logger.error("⏰ Geolocation request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Geolocation request failed: {e}")
            raise
        except Exception as e:
            logger.error(f"❌ Unexpected error in current_location: {e}")
            raise

    # ========================================================================
    # Places API - Text Search (New API v1)
    # ========================================================================

    def text_search(
        self,
        query: str,
        location: Optional[Dict[str, float]] = None,
        radius: Optional[int] = 10000,
        limit: int = 3,
    ) -> List[Dict[str, Any]]:
        """
        Search for a place to get its coordinates and details.

        Use this to convert a user's vague query (e.g., "Gas station",
        "Taipei 101") into precise lat/lng coordinates.

        Args:
            query: The search text (e.g., "Din Tai Fung", "Hospital").
            location: Optional bias location {"lat": float, "lng": float}.
            radius: Search radius in meters (optional). The bias is only
                applied when both location and radius are given.
            limit: Max results to return (default 3, clamped to 1..20).

        Returns:
            List[Dict]: A list of places found (empty if none). Each item has:
                - place_id: str
                - name: str
                - address: str
                - location: {"lat", "lng", "latitude", "longitude"}
                  (lat/lng are aliases usable directly for routing)
                - rating: Optional[float]
                - user_ratings_total: int
                - business_status: str
                - opening_hours: dict

        Raises:
            requests.RequestException: If API request fails
        """
        request_body = {
            "textQuery": query,
            # The API accepts 1..20 results per request.
            "maxResultCount": max(1, min(limit, 20))
        }
        if location and radius:
            request_body["locationBias"] = {
                "circle": {
                    "center": {
                        "latitude": location["lat"],
                        "longitude": location["lng"]
                    },
                    "radius": float(radius)
                }
            }

        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": self.api_key,
            "X-Goog-FieldMask": ",".join([
                "places.id",
                "places.displayName",
                "places.formattedAddress",
                "places.location",
                "places.rating",
                "places.userRatingCount",
                "places.businessStatus",
                "places.currentOpeningHours",
            ])
        }

        try:
            logger.debug(f"🔍 Searching places: '{query}', cent_location:'{location}'(limit={limit})")
            response = requests.post(
                self.places_text_search_url,
                json=request_body,
                headers=headers,
                timeout=10
            )
            if not response.ok:
                logger.error(f"❌ Places Text Search API error: {response.text}")
            response.raise_for_status()

            places = response.json().get("places", [])
            if not places:
                logger.warning(f"⚠️ No places found for query: '{query}'")
                return []

            logger.info(f"✅ Found {len(places)} places for '{query}'")
            return [
                {
                    "place_id": place.get("id"),
                    "name": place.get("displayName", {}).get("text", "Unknown"),
                    "address": place.get("formattedAddress", ""),
                    "location": self._normalize_location(place.get("location")),
                    "rating": place.get("rating"),
                    "user_ratings_total": place.get("userRatingCount", 0),
                    "business_status": place.get("businessStatus", "UNKNOWN"),
                    "opening_hours": place.get("currentOpeningHours", {})
                }
                for place in places
            ]
        except requests.Timeout:
            logger.error("⏰ Places search timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Places search failed: {e}")
            raise
        except Exception as e:
            logger.error(f"❌ Unexpected error in text_search: {e}")
            raise

    # ========================================================================
    # Routes API - Compute Routes
    # ========================================================================

    def _compute_routes(
        self,
        origin: Dict[str, Any],
        destination: Dict[str, Any],
        waypoints: Optional[List[Dict[str, float]]] = None,
        travel_mode: str = "DRIVE",
        routing_preference: str = "TRAFFIC_AWARE_OPTIMAL",
        compute_alternative_routes: bool = False,
        optimize_waypoint_order: bool = False,
        departure_time: Optional[datetime] = None,
        include_traffic_on_polyline: bool = True
    ) -> Dict[str, Any]:
        """
        Internal method to compute routes using Routes API v2

        Args:
            origin: Origin dict with 'place_id', or 'lat' and 'lng' keys
            destination: Destination dict with 'place_id', or 'lat' and 'lng'
            waypoints: Optional list of intermediate point dicts
            travel_mode: Travel mode (DRIVE, WALK, BICYCLE, TRANSIT, ...)
            routing_preference: Routing preference; only sent for DRIVE and
                TWO_WHEELER, and omitted entirely when None/empty
            compute_alternative_routes: Whether to compute alternatives
            optimize_waypoint_order: Whether to optimize waypoint order
            departure_time: Departure time as datetime object (or "now")
            include_traffic_on_polyline: Whether to request travel advisories

        Returns:
            Parsed info for the first returned route:
                - distance_meters: int
                - duration_seconds: int
                - encoded_polyline: str
                - legs: List[Dict]
                - optimized_waypoint_order: Optional[List[int]]

        Raises:
            ValueError: If no route is returned or the response is malformed
            requests.RequestException: If API request fails
        """
        request_body = {
            "origin": self._waypoint_payload(origin),
            "destination": self._waypoint_payload(destination),
            "travelMode": travel_mode,
            "computeAlternativeRoutes": compute_alternative_routes,
            "languageCode": "zh-TW",
            "units": "METRIC"
        }
        # A routing preference is only valid for motorized road modes;
        # sending it with WALK/BICYCLE/TRANSIT makes the request fail.
        if routing_preference and str(travel_mode).upper() in self._TRAFFIC_MODES:
            request_body["routingPreference"] = routing_preference

        # Intermediate stops
        if waypoints:
            request_body["intermediates"] = [
                self._waypoint_payload(wp) for wp in waypoints
            ]
            if optimize_waypoint_order:
                request_body["optimizeWaypointOrder"] = True

        formatted_departure = self._format_departure_time(departure_time)
        if formatted_departure:
            request_body["departureTime"] = formatted_departure

        # Field mask: request only what is parsed below.
        field_mask_parts = [
            "routes.duration",
            "routes.distanceMeters",
            "routes.polyline.encodedPolyline",
            "routes.legs",
            "routes.optimizedIntermediateWaypointIndex"
        ]
        if include_traffic_on_polyline:
            field_mask_parts.extend([
                "routes.travelAdvisory",
                "routes.legs.travelAdvisory"
            ])

        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": self.api_key,
            "X-Goog-FieldMask": ",".join(field_mask_parts)
        }

        logger.debug(
            f"🗺️ Computing route: {self._describe_point(origin)} → "
            f"{self._describe_point(destination)}, {request_body['travelMode']} "
            f"({len(waypoints) if waypoints else 0} waypoints)"
        )

        try:
            response = requests.post(
                self.compute_routes_url,
                json=request_body,
                headers=headers,
                timeout=30  # route computation can be slow
            )
            if not response.ok:
                logger.error(f"❌ Routes API error: {response.text}")
            response.raise_for_status()

            data = response.json()
            # Some responses wrap the routes directly in a list.
            if isinstance(data, list):
                data = {"routes": data}

            if not isinstance(data, dict) or not data.get("routes"):
                logger.error("❌ No routes found in API response")
                raise ValueError("No routes found")

            route = data["routes"][0]
            distance_meters = route.get("distanceMeters", 0)
            duration_seconds = _parse_duration_seconds(route.get("duration", "0s"))
            result = {
                "distance_meters": distance_meters,
                "duration_seconds": duration_seconds,
                "encoded_polyline": route.get("polyline", {}).get("encodedPolyline", ""),
                "legs": route.get("legs", []),
                "optimized_waypoint_order": route.get("optimizedIntermediateWaypointIndex")
            }
            logger.debug(
                f"✅ Route computed: {distance_meters}m, {duration_seconds}s"
            )
            return result
        except requests.Timeout:
            logger.error("⏰ Routes request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Routes request failed: {e}")
            raise
        except (KeyError, ValueError) as e:
            logger.error(f"❌ Failed to parse route response: {e}")
            raise ValueError(f"Invalid route response: {e}") from e
        except Exception as e:
            logger.error(f"❌ Unexpected error in _compute_routes: {e}")
            raise

    # ========================================================================
    # Multi-leg Route Computation
    # ========================================================================

    @staticmethod
    def _encode_polyline(points: List[Tuple[float, float]]) -> str:
        """
        Encode (lat, lng) pairs with the Google Encoded Polyline Algorithm.

        Coordinates are rounded to 1e-5 degrees first and deltas are taken
        between the rounded integers, so rounding error never accumulates
        along the line.
        """
        def encode_value(value: int) -> str:
            # Left-shift and invert negatives, then emit 5-bit chunks.
            value <<= 1
            if value < 0:
                value = ~value
            chunks = []
            while value >= 0x20:
                chunks.append(chr((0x20 | (value & 0x1f)) + 63))
                value >>= 5
            chunks.append(chr(value + 63))
            return "".join(chunks)

        pieces = []
        last_lat, last_lng = 0, 0
        for lat, lng in points:
            lat_e5 = int(round(lat * 1e5))
            lng_e5 = int(round(lng * 1e5))
            pieces.append(encode_value(lat_e5 - last_lat))
            pieces.append(encode_value(lng_e5 - last_lng))
            last_lat, last_lng = lat_e5, lng_e5
        return "".join(pieces)

    def compute_routes(
        self,
        place_points: List[Dict[str, float]],
        start_time: Union[datetime, str],
        stop_times: List[int],
        travel_mode: Union[str, List[str]] = "DRIVE",
        routing_preference: str = "TRAFFIC_AWARE_OPTIMAL"
    ) -> Dict[str, Any]:
        """
        Compute a multi-leg itinerary through *place_points* in order.

        Each leg is routed separately. If the preferred mode fails, the
        modes in ``_MODE_FALLBACKS`` are tried in turn; if every API call
        fails, the leg is estimated as a straight line at walking speed.

        Args:
            place_points: Ordered stops (dicts with 'lat'/'lng' or 'place_id')
            start_time: Aware datetime, or "now"/None for the current time.
                A naive datetime is interpreted as local time.
            stop_times: Dwell time in SECONDS per stop; needs at least one
                entry per place (index 0 is the origin)
            travel_mode: One mode for all legs, or one mode per leg
            routing_preference: Preference used for DRIVE/TWO_WHEELER legs

        Returns:
            Dict with totals, 'start_time', 'end_time', 'stops' and 'legs'

        Raises:
            ValueError: On fewer than 2 places, too few stop_times or travel
                modes, an invalid start_time, or when the straight-line
                fallback is needed for a point without coordinates
        """
        if len(place_points) < 2:
            raise ValueError("At least 2 places required for route computation")

        num_legs = len(place_points) - 1
        # Fail fast instead of raising IndexError after issuing API calls.
        if len(stop_times) < len(place_points):
            raise ValueError(
                f"stop_times needs {len(place_points)} entries, got {len(stop_times)}"
            )
        if isinstance(travel_mode, list) and len(travel_mode) < num_legs:
            raise ValueError(
                f"travel_mode list needs {num_legs} entries, got {len(travel_mode)}"
            )

        # Resolve the start time to an aware UTC clock.
        if start_time == "now" or start_time is None:
            start_time = datetime.now(timezone.utc)
            current_time = start_time
        elif isinstance(start_time, datetime):
            current_time = start_time.astimezone(timezone.utc)
            if start_time.utcoffset() is None:
                # Naive input: report the normalized value so start and end
                # times are comparable.
                start_time = current_time
        else:
            raise ValueError(f"Invalid start_time type: {type(start_time)}")
        start_utc = current_time

        legs_info = []
        total_distance = 0
        total_duration = 0
        place_points = clean_dict_list_keys(place_points)

        for i in range(num_legs):
            origin = place_points[i]
            destination = place_points[i + 1]

            # 1. Preferred mode for this leg
            if isinstance(travel_mode, list):
                primary_mode = travel_mode[i].upper()
            else:
                primary_mode = travel_mode.upper()

            # 2. Fallback chain (drive -> two-wheeler -> transit -> walk)
            modes_to_try = self._MODE_FALLBACKS.get(primary_mode, ("WALK",))

            route_found = False
            final_mode_used = primary_mode
            leg_distance = 0
            leg_duration = 0
            encoded_polyline = ""

            # 3. Try each mode until one yields a route
            for mode in modes_to_try:
                # Only road modes take a routing preference. "UNDRIVE" is
                # treated as an explicit "no preference" marker.
                if (
                    mode in self._TRAFFIC_MODES
                    and routing_preference
                    and routing_preference != "UNDRIVE"
                ):
                    pref = routing_preference.upper()
                else:
                    pref = None

                # The first leg is sent without a departure time.
                departure_time = current_time if i > 0 else None
                try:
                    route = self._compute_routes(
                        origin=origin,
                        destination=destination,
                        waypoints=None,
                        travel_mode=mode,
                        routing_preference=pref,
                        include_traffic_on_polyline=True,
                        departure_time=departure_time
                    )
                except Exception as e:
                    # Best effort: note the failure and try the next mode.
                    logger.debug(f"Leg {i + 1}: mode {mode} failed: {e}")
                    continue

                leg_distance = route["distance_meters"]
                leg_duration = route["duration_seconds"]
                encoded_polyline = route["encoded_polyline"]
                route_found = True
                final_mode_used = mode
                if mode != primary_mode:
                    logger.info(f" 🔄 Leg {i + 1}: {primary_mode} failed. Switched to {mode} (Polyline OK).")
                break

            # 4. Last resort: every API mode failed (math fallback)
            if not route_found:
                logger.warning(f"⚠️ Leg {i + 1}: All API modes (DRIVE/TWO_WHEELER/TRANSIT/WALK) failed.")
                try:
                    o_lat, o_lng = origin["lat"], origin["lng"]
                    d_lat, d_lng = destination["lat"], destination["lng"]
                except KeyError as e:
                    raise ValueError(
                        f"Leg {i + 1}: cannot estimate distance without lat/lng ({e})"
                    ) from e
                logger.warning(
                    f" 📍 From: {o_lat},{o_lng} -> To: {d_lat},{d_lng}")
                logger.warning(" 🔄 Activating Math Fallback (Straight Line).")

                # Straight-line distance
                leg_distance = int(round(
                    self._calculate_haversine_distance(o_lat, o_lng, d_lat, d_lng)
                ))
                # Time estimate at walking speed (~5 km/h = 1.38 m/s)
                speed = 1.38
                leg_duration = int(leg_distance / speed)
                # Emit a two-point polyline so consumers never get an
                # empty string.
                encoded_polyline = self._encode_polyline([
                    (o_lat, o_lng),
                    (d_lat, d_lng)
                ])
                final_mode_used = "MATH_ESTIMATE"

            # --- Accumulate ---
            total_distance += leg_distance
            total_duration += leg_duration
            legs_info.append({
                "from_index": i,
                "to_index": i + 1,
                "travel_mode": final_mode_used,
                "distance_meters": leg_distance,
                "duration_seconds": leg_duration,
                "departure_time": current_time.isoformat(),
                "polyline": encoded_polyline
            })

            # Advance the clock: travel time plus dwell time at the arrival stop.
            current_time += timedelta(seconds=leg_duration)
            current_time += timedelta(seconds=stop_times[i + 1])

        return {
            "total_distance_meters": total_distance,
            "total_duration_seconds": total_duration,
            "total_residence_time_minutes": sum(stop_times) // 60,
            "total_time_seconds": int((current_time - start_utc).total_seconds()),
            "start_time": start_time,
            "end_time": current_time,
            "stops": place_points,
            "legs": legs_info,
        }

    # ========================================================================
    # Distance Matrix API
    # ========================================================================

    def _compute_route_matrix(
        self,
        origins: List[Dict[str, float]],
        destinations: List[Dict[str, float]],
        travel_mode: str = "DRIVE",
        routing_preference: str = "TRAFFIC_UNAWARE",
        departure_time: Optional[datetime] = None
    ) -> List[Dict[str, Any]]:
        """
        Internal method to compute distance matrix

        Args:
            origins: List of origin point dicts
            destinations: List of destination point dicts
            travel_mode: Travel mode
            routing_preference: Routing preference; only sent for DRIVE and
                TWO_WHEELER
            departure_time: Departure time (datetime or "now")

        Returns:
            List of matrix elements, each with originIndex, destinationIndex
            and (when present) condition, distanceMeters and duration.
            Empty list if the response has an unexpected shape.

        Raises:
            ValueError: If the response body reports an error
            requests.RequestException: If API request fails
        """
        mode = travel_mode.upper()
        request_body = {
            "origins": [
                {"waypoint": self._waypoint_payload(origin)} for origin in origins
            ],
            "destinations": [
                {"waypoint": self._waypoint_payload(dest)} for dest in destinations
            ],
            "travelMode": mode,
        }
        # A routing preference is rejected for non-road modes.
        if routing_preference and mode in self._TRAFFIC_MODES:
            request_body["routingPreference"] = routing_preference.upper()

        formatted_departure = self._format_departure_time(departure_time)
        if formatted_departure:
            request_body["departureTime"] = formatted_departure

        # "condition" lets callers tell "no route" apart from a zero value.
        field_mask = "originIndex,destinationIndex,condition,distanceMeters,duration"
        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": self.api_key,
            "X-Goog-FieldMask": field_mask
        }

        logger.debug(
            f"📏 Computing route matrix: {len(origins)} × {len(destinations)}"
        )

        try:
            response = requests.post(
                self.compute_route_matrix_url,
                json=request_body,
                headers=headers,
                timeout=30
            )
            if not response.ok:
                logger.error(f"❌ Route matrix API error: {response.text}")
            response.raise_for_status()

            data = response.json()
            if isinstance(data, list):
                return data
            if isinstance(data, dict):
                if "error" in data:
                    raise ValueError(f"API Error: {data['error']}")
                if "results" in data:
                    return data["results"]
                # Fallback: unknown shape, return nothing
                logger.warning(f"⚠️ Unexpected Distance Matrix response format: {data.keys()}")
            return []
        except requests.Timeout:
            logger.error("⏰ Route matrix request timeout")
            raise
        except requests.RequestException as e:
            logger.error(f"❌ Route matrix request failed: {e}")
            raise
        except Exception as e:
            logger.error(f"❌ Unexpected error in _compute_route_matrix: {e}")
            raise

    def _get_max_elements(self, travel_mode: str, routing_preference: str) -> int:
        """
        Get max elements per request based on travel mode and preference

        Args:
            travel_mode: Travel mode
            routing_preference: Routing preference (may be empty)

        Returns:
            100 for TRANSIT or TRAFFIC_AWARE_OPTIMAL requests, otherwise 625
        """
        if travel_mode.upper() == "TRANSIT":
            return 100
        if routing_preference and routing_preference.upper() == "TRAFFIC_AWARE_OPTIMAL":
            return 100
        return 625

    def _fetch_matrix_batch(
        self,
        origin_batch: List[Dict[str, float]],
        dest_batch: List[Dict[str, float]],
        travel_mode: str,
        routing_preference: str,
        departure_time: Optional[datetime],
        batch_label: str,
    ) -> Optional[List[Dict[str, Any]]]:
        """
        Fetch one matrix batch, retrying once after a rate-limit error.

        Returns the element list, or None if the batch could not be fetched
        (the failure is logged, never raised).
        """
        for attempt in range(2):
            try:
                time.sleep(0.2)  # gentle pacing between requests
                return self._compute_route_matrix(
                    origins=origin_batch,
                    destinations=dest_batch,
                    travel_mode=travel_mode,
                    routing_preference=routing_preference,
                    departure_time=departure_time,
                )
            except Exception as e:
                if "429" in str(e) or "Too Many Requests" in str(e):
                    logger.error("🚨 Rate Limit Hit! Sleeping for 5s...")
                    time.sleep(5)
                    if attempt == 0:
                        continue
                logger.error(
                    f"❌ Failed to compute matrix batch "
                    f"{batch_label}: {e}")
                return None
        return None

    def compute_route_matrix(
        self,
        origins: List[Dict[str, float]],
        destinations: List[Dict[str, float]],
        travel_mode: str = "DRIVE",
        routing_preference: Optional[str] = None,
        departure_time: Optional[datetime] = None,
    ) -> Dict[str, np.ndarray]:
        """
        Compute distance and duration matrix between multiple locations

        Requests are split into batches that respect the per-side and
        per-request element limits. Failed batches are logged and their
        cells stay at -1; they never abort the whole computation.

        Args:
            origins: List of origin point dicts with 'lat' and 'lng'
            destinations: List of destination point dicts with 'lat' and 'lng'
            travel_mode: Travel mode (DRIVE, WALK, BICYCLE, TRANSIT)
            routing_preference: Routing preference (optional)
            departure_time: Departure time as datetime object (optional)

        Returns:
            Dict with int32 numpy arrays of shape (origins, destinations):
                - duration_matrix: Duration in seconds
                - distance_matrix: Distance in meters
            Cells without a computed route hold -1.

        Raises:
            ValueError: If origins or destinations is empty
        """
        if not origins or not destinations:
            raise ValueError("Origins and destinations cannot be empty")

        max_elements = self._get_max_elements(travel_mode, routing_preference or "")
        O_len, D_len = len(origins), len(destinations)
        duration_matrix = np.full((O_len, D_len), -1, dtype=np.int32)
        distance_matrix = np.full((O_len, D_len), -1, dtype=np.int32)

        logger.info(
            f"📊 Computing {O_len}×{D_len} matrix "
            f"(max_elements={max_elements})"
        )

        # Process in batches
        for o_start in range(0, O_len, MAX_LOCATIONS_PER_SIDE):
            o_end = min(o_start + MAX_LOCATIONS_PER_SIDE, O_len)
            origin_batch = origins[o_start:o_end]
            max_dest_batch_size = max(
                1,
                min(
                    MAX_LOCATIONS_PER_SIDE,
                    min(D_len, max_elements // len(origin_batch))
                ),
            )

            for d_start in range(0, D_len, max_dest_batch_size):
                d_end = min(d_start + max_dest_batch_size, D_len)
                batch_label = f"[{o_start}:{o_end}, {d_start}:{d_end}]"

                elements = self._fetch_matrix_batch(
                    origin_batch,
                    destinations[d_start:d_end],
                    travel_mode,
                    routing_preference or "TRAFFIC_UNAWARE",
                    departure_time,
                    batch_label,
                )
                if elements is None:
                    continue  # failure already logged
                if not elements:
                    logger.warning(
                        f"⚠️ No elements in batch "
                        f"{batch_label}"
                    )
                    continue

                try:
                    for el in elements:
                        if "originIndex" not in el or "destinationIndex" not in el:
                            continue
                        # Leave unroutable pairs at -1 rather than 0.
                        if el.get("condition") == "ROUTE_NOT_FOUND":
                            continue
                        oi = o_start + el["originIndex"]
                        di = d_start + el["destinationIndex"]
                        # Duration arrives as a string such as "300s".
                        duration_matrix[oi, di] = _parse_duration_seconds(
                            el.get("duration", "0s")
                        )
                        distance_matrix[oi, di] = el.get("distanceMeters", 0)
                except Exception as e:
                    logger.error(
                        f"❌ Failed to compute matrix batch "
                        f"{batch_label}: {e}")

        logger.info(
            f"✅ Route matrix computed: "
            f"{np.sum(duration_matrix >= 0)} / {O_len * D_len} elements"
        )
        return {
            "duration_matrix": duration_matrix,
            "distance_matrix": distance_matrix,
        }


# ============================================================================
# Usage Example
# ============================================================================
if __name__ == "__main__":
    from src.infra.config import get_settings

    # Initialize service
    setting = get_settings()
    api_key = setting.google_maps_api_key
    if not api_key:
        print("❌ Please set GOOGLE_MAPS_API_KEY environment variable")
        raise SystemExit(1)

    service = GoogleMapAPIService(api_key=api_key)

    # Example 1: Current location
    try:
        print("\n" + "="*60)
        print("Example 1: Current Location")
        print("="*60)
        location = service.current_location()
        print(f"📍 Current: {location['location']}")
    except Exception as e:
        print(f"❌ Failed: {e}")

    # Example 2: Text search
    try:
        print("\n" + "="*60)
        print("Example 2: Text Search")
        print("="*60)
        places = service.text_search(
            query="台大醫院",
            location={"lat": 25.0408, "lng": 121.5318},
            radius=5000,
            limit=3
        )
        for i, place in enumerate(places, 1):
            print(f"{i}. {place['name']} - {place['address']}, {place['place_id']}")
    except Exception as e:
        print(f"❌ Failed: {e}")

    # Example 3: Compute route
    try:
        print("\n" + "="*60)
        print("Example 3: Compute Route")
        print("="*60)
        places = [
            {"lat": 25.0408, "lng": 121.5318},  # Taipei Main Station
            {"lat": 25.0330, "lng": 121.5654},  # Taipei 101
            {"lat": 25.0478, "lng": 121.5170},  # Ximending
        ]
        route = service.compute_routes(
            place_points=places,
            start_time=datetime.now(timezone.utc),
            stop_times=[30, 60, 50],  # dwell time per stop, in seconds
            travel_mode="DRIVE",
            routing_preference="TRAFFIC_AWARE"  # or "TRAFFIC_AWARE_OPTIMAL"
        )
        print(f"📊 Total distance: {route['total_distance_meters']}m")
        print(f"⏱️ Total duration: {route['total_duration_seconds']}s")
        print(f"🏁 End time: {route['end_time']}")
    except Exception as e:
        print(f"❌ Failed: {e}")
        raise

    # Example 4: Distance matrix
    try:
        print("\n" + "="*60)
        print("Example 4: Distance Matrix")
        print("="*60)
        origins = [
            {"lat": 25.0408, "lng": 121.5318},
            {"lat": 25.0330, "lng": 121.5654},
        ]
        destinations = [
            {"lat": 25.0478, "lng": 121.5170},
            {"lat": 25.0360, "lng": 121.5000},
        ]
        matrices = service.compute_route_matrix(
            origins=origins,
            destinations=destinations,
            travel_mode="WALK"
        )
        print("Duration matrix (seconds):")
        print(matrices["duration_matrix"])
        print("\nDistance matrix (meters):")
        print(matrices["distance_matrix"])
    except Exception as e:
        print(f"❌ Failed: {e}")

    print("\n✅ All examples completed!")