"""Async, time-limited cache over three GTFS-Realtime feeds.

The feeds (vehicle positions, trip updates and service alerts) are fetched
from URLs supplied through environment variables, decoded from protobuf and
merged into one plain-dict snapshot that is reused until it expires.
"""

import asyncio  # type: ignore
import os  # type: ignore
import time

import httpx  # type: ignore
from dotenv import load_dotenv  # type: ignore
from google.transit import gtfs_realtime_pb2  # type: ignore

load_dotenv()

TTC_VEHICLES_URL = os.getenv("GTFS_RT_URL")
TTC_TRIPS_URL = os.getenv("GTFS_DELAY_URL")
# Support both GTFS_ALERT_URL and GTFS_ALERTS_URL for compatibility
TTC_ALERTS_URL = os.getenv("GTFS_ALERTS_URL") or os.getenv("GTFS_ALERT_URL")

# Fail fast at import time: nothing below can work without all three URLs.
if not TTC_VEHICLES_URL:
    raise ValueError("GTFS_RT_URL is not set")
if not TTC_TRIPS_URL:
    raise ValueError("GTFS_DELAY_URL is not set")
if not TTC_ALERTS_URL:
    raise ValueError("GTFS_ALERTS_URL or GTFS_ALERT_URL is not set")

# Human-readable labels keyed by the numeric values of the GTFS-Realtime
# `Alert.Cause` enum. Code 1 (UNKNOWN_CAUSE) is intentionally absent so it
# falls through to the same "Unknown" label used for a missing cause.
CAUSE_MAP = {
    2: "Other",
    3: "Technical Problem",
    4: "Strike",
    5: "Demonstration",
    6: "Accident",
    7: "Holiday",
    8: "Weather",
    9: "Maintenance",
    10: "Construction",
    11: "Police Activity",
    12: "Medical Emergency",
}

# Human-readable labels keyed by the numeric values of the GTFS-Realtime
# `Alert.Effect` enum.
EFFECT_MAP = {
    1: "No Service",
    2: "Reduced Service",
    3: "Significant Delays",
    4: "Detour",
    5: "Additional Service",
    6: "Modified Service",
    7: "Other Effect",
    8: "Unknown Effect",
    9: "Stop Moved",
    10: "No Effect",
    11: "Accessibility Issue",
}


def _parse_feed(content):
    """Decode raw protobuf bytes into a ``FeedMessage``."""
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(content)
    return feed


def _stop_event_time(stop_time_update):
    """Return the predicted timestamp for one stop, or 0 when none is given.

    The departure time wins when it is actually populated. A ``departure``
    message that carries only a delay has no ``time``, so the arrival time is
    used instead of silently reporting 0.
    """
    departure = stop_time_update.departure
    if stop_time_update.HasField("departure") and departure.HasField("time"):
        return departure.time
    return stop_time_update.arrival.time


def _english_text(translated, default):
    """Pick the English text out of a ``TranslatedString``.

    An exact ``"en"`` tag is preferred. Failing that, a regional English tag
    (e.g. ``en-CA``) or an untagged translation is accepted, because feeds may
    omit the language tag. ``default`` is returned when nothing matches.
    """
    translations = list(translated.translation)
    for translation in translations:
        if translation.language == "en":
            return translation.text
    for translation in translations:
        language = translation.language
        if not language or language.lower().startswith("en"):
            return translation.text
    return default


def _build_prediction_map(feed):
    """Map each trip id to ``{stop_id: predicted_time}`` for all listed stops."""
    prediction_map = {}
    for entity in feed.entity:
        if not entity.HasField("trip_update"):
            continue
        trip_update = entity.trip_update
        prediction_map[str(trip_update.trip.trip_id)] = {
            str(stu.stop_id): _stop_event_time(stu)
            for stu in trip_update.stop_time_update
        }
    return prediction_map


def _build_vehicles(feed, prediction_map):
    """Turn vehicle-position entities into plain dicts joined with predictions."""
    vehicles = []
    for entity in feed.entity:
        if not entity.HasField("vehicle"):
            continue
        vehicle = entity.vehicle
        trip_id = str(vehicle.trip.trip_id)
        trip_predictions = prediction_map.get(trip_id, {})

        # "Next stop" is the stop with the earliest known prediction. Stops
        # whose time is 0 (no prediction supplied) are skipped so they cannot
        # masquerade as the earliest one.
        next_stop_id = None
        predicted_time = None
        timed_stops = [
            (stop_id, when)
            for stop_id, when in trip_predictions.items()
            if when > 0
        ]
        if timed_stops:
            next_stop_id, predicted_time = min(
                timed_stops, key=lambda item: item[1]
            )

        vehicles.append({
            "id": vehicle.vehicle.id,
            "route": vehicle.trip.route_id,
            "trip_id": trip_id,
            "lat": round(vehicle.position.latitude, 6),
            "lon": round(vehicle.position.longitude, 6),
            "occupancy": vehicle.occupancy_status,
            "next_stop_id": next_stop_id,
            "predicted_time": predicted_time,
            "predictions": trip_predictions,  # Store all predictions
        })
    return vehicles


def _build_route_alerts(feed):
    """Group alert payloads by route id: ``{route_id: [payload, ...]}``."""
    route_alerts = {}
    for entity in feed.entity:
        if not entity.HasField("alert"):
            continue
        alert = entity.alert

        cause_code = int(alert.cause) if alert.HasField("cause") else None
        effect_code = int(alert.effect) if alert.HasField("effect") else None

        payload = {
            "header": _english_text(alert.header_text, "No Header"),
            "description": _english_text(alert.description_text, ""),
            # A missing code is None, which is not a key, so it also maps to
            # "Unknown".
            "cause": CAUSE_MAP.get(cause_code, "Unknown"),
            "effect": EFFECT_MAP.get(effect_code, "Unknown"),
            "severity": "HIGH" if effect_code == 1 else "MEDIUM",
        }

        # Link the alert to every route it mentions, but only once per route:
        # an alert may list the same route in several informed_entity rows.
        linked_routes = set()
        for informed in alert.informed_entity:
            if not informed.HasField("route_id"):
                continue
            route_id = str(informed.route_id)
            if route_id in linked_routes:
                continue
            linked_routes.add(route_id)
            route_alerts.setdefault(route_id, []).append(payload)
    return route_alerts


class AsyncBusCache:
    """Cache of the merged feed snapshot, refreshed at most every ``ttl`` seconds."""

    def __init__(self, ttl=20):
        """Create an empty cache.

        Args:
            ttl: Maximum age, in seconds, of a snapshot before it is refetched.
        """
        self.ttl = ttl
        self._data = None
        self._last_updated = 0

    async def get_data(self):
        """Return the cached snapshot, refreshing it first when it is stale.

        Returns:
            A dict with keys ``"vehicles"``, ``"predictions"`` and ``"alerts"``.
            If the very first fetch fails there is nothing to fall back on and
            an empty list is returned instead (see ``_refresh``).
        """
        is_fresh = (time.time() - self._last_updated) < self.ttl
        if self._data is not None and is_fresh:
            return self._data
        return await self._refresh()

    async def _refresh(self):
        """Fetch all three feeds, rebuild the snapshot and store it.

        Any failure (network error, non-2xx status, undecodable payload) is
        reported on stdout and the previous snapshot is returned unchanged.

        Returns:
            The new snapshot dict on success. On failure, the previous
            snapshot, or ``[]`` when no snapshot has ever been built.
        """
        try:
            async with httpx.AsyncClient() as client:
                # Fetch all feeds concurrently.
                v_res, t_res, a_res = await asyncio.gather(
                    client.get(TTC_VEHICLES_URL, timeout=10),
                    client.get(TTC_TRIPS_URL, timeout=10),
                    client.get(TTC_ALERTS_URL, timeout=10),
                )

            # An error response with an empty body would otherwise decode as an
            # empty feed and replace good cached data, so reject it up front.
            for response in (v_res, t_res, a_res):
                response.raise_for_status()

            prediction_map = _build_prediction_map(_parse_feed(t_res.content))
            processed_buses = _build_vehicles(
                _parse_feed(v_res.content), prediction_map
            )
            route_alerts = _build_route_alerts(_parse_feed(a_res.content))

            self._data = {
                "vehicles": processed_buses,
                "predictions": prediction_map,
                "alerts": route_alerts,
            }
            self._last_updated = time.time()
            print(f"--- Cache Refreshed: {len(processed_buses)} buses, {len(route_alerts)} routes with alerts ---")
            return self._data

        except Exception as e:
            # Best-effort boundary: serve stale data rather than crash callers.
            print(f"Async fetch failed: {e}")
            # NOTE(review): the `[]` fallback differs in type from the dict
            # returned on success; kept as-is for backward compatibility.
            return self._data if self._data else []