Spaces:
Running
Running
# Stdlib and third-party imports for the real-time TTC GTFS feed cache.
import time
import httpx # type: ignore
import asyncio # type: ignore
from google.transit import gtfs_realtime_pb2 # type: ignore
from dotenv import load_dotenv # type: ignore
import os # type: ignore

# Load feed URLs from a local .env file (no-op if the file is absent).
load_dotenv()

# Feed endpoints; all three must be configured or the module refuses to load.
TTC_VEHICLES_URL = os.getenv("GTFS_RT_URL")    # vehicle-positions feed
TTC_TRIPS_URL = os.getenv("GTFS_DELAY_URL")    # trip-updates (predictions) feed
# Support both GTFS_ALERT_URL and GTFS_ALERTS_URL for compatibility
TTC_ALERTS_URL = os.getenv("GTFS_ALERTS_URL") or os.getenv("GTFS_ALERT_URL")

# Fail fast at import time so misconfiguration is caught before serving data.
if not TTC_VEHICLES_URL:
    raise ValueError("GTFS_RT_URL is not set")
if not TTC_TRIPS_URL:
    raise ValueError("GTFS_DELAY_URL is not set")
if not TTC_ALERTS_URL:
    raise ValueError("GTFS_ALERTS_URL or GTFS_ALERT_URL is not set")

# Human-readable labels for a subset of the GTFS-realtime Cause/Effect enum
# codes; codes outside these maps fall back to "Unknown" at lookup time.
CAUSE_MAP = {1: "Weather", 2: "Holiday", 4: "Accident", 7: "Technical Problem", 11: "Police Activity"}
EFFECT_MAP = {1: "No Service", 3: "Significant Delays", 4: "Detour", 8: "Unknown Effect"}
class AsyncBusCache:
    """TTL cache for TTC GTFS-realtime data (vehicles, predictions, alerts).

    Fetches the three feeds concurrently, parses them into plain dicts/lists,
    and serves the parsed snapshot for ``ttl`` seconds before refreshing.

    Snapshot shape::

        {"vehicles": [...], "predictions": {trip_id: {stop_id: epoch}}, "alerts": {route_id: [...]}}
    """

    def __init__(self, ttl=20):
        self.ttl = ttl           # seconds a snapshot stays fresh
        self._data = None        # last parsed snapshot (dict) or None
        self._last_updated = 0   # epoch seconds of the last successful refresh
        # Serializes refreshes so concurrent callers that all observe a stale
        # TTL don't each fire a triple-feed network fetch (thundering herd).
        self._lock = asyncio.Lock()

    async def get_data(self):
        """Return the cached snapshot, refreshing if older than ``ttl``."""
        if self._data and (time.time() - self._last_updated) < self.ttl:
            return self._data
        async with self._lock:
            # Re-check after acquiring: another coroutine may have refreshed
            # the cache while we were waiting on the lock.
            if self._data and (time.time() - self._last_updated) < self.ttl:
                return self._data
            return await self._refresh()

    async def _refresh(self):
        """Fetch and parse all three GTFS-RT feeds; update and return the snapshot.

        On any failure the previous snapshot is returned if one exists,
        otherwise an empty snapshot with the same shape as a successful one.
        """
        try:
            async with httpx.AsyncClient() as client:
                # 1. Fetch ALL feeds at once
                v_res, t_res, a_res = await asyncio.gather(
                    client.get(TTC_VEHICLES_URL, timeout=10),
                    client.get(TTC_TRIPS_URL, timeout=10),
                    client.get(TTC_ALERTS_URL, timeout=10)  # The new alerts feed
                )

            # 2. Parse Predictions (Store ALL future stops)
            t_feed = gtfs_realtime_pb2.FeedMessage()
            t_feed.ParseFromString(t_res.content)
            # Map: { "trip_id": { "stop_id_1": time, "stop_id_2": time, ... } }
            prediction_map = {}
            for entity in t_feed.entity:
                if entity.HasField('trip_update'):
                    tu = entity.trip_update
                    trip_id = str(tu.trip.trip_id)
                    # Store every stop in the remainder of the trip.
                    # Prefer departure time; fall back to arrival time.
                    prediction_map[trip_id] = {
                        str(stu.stop_id): (stu.departure.time if stu.HasField('departure') else stu.arrival.time)
                        for stu in tu.stop_time_update
                    }

            # 3. Parse Vehicle Positions
            v_feed = gtfs_realtime_pb2.FeedMessage()
            v_feed.ParseFromString(v_res.content)
            processed_buses = []
            for entity in v_feed.entity:
                if entity.HasField('vehicle'):
                    v = entity.vehicle
                    t_id = str(v.trip.trip_id)
                    # All predictions for this trip ({} when the trip has none).
                    trip_predictions = prediction_map.get(t_id, {})
                    # Earliest predicted stop is exposed as "next stop" for
                    # backward compatibility with older consumers.
                    next_stop_id = None
                    predicted_time = None
                    if trip_predictions:
                        sorted_stops = sorted(trip_predictions.items(), key=lambda x: x[1])
                        next_stop_id, predicted_time = sorted_stops[0]
                    processed_buses.append({
                        "id": v.vehicle.id,
                        "route": v.trip.route_id,
                        "trip_id": t_id,
                        "lat": round(v.position.latitude, 6),
                        "lon": round(v.position.longitude, 6),
                        "occupancy": v.occupancy_status,
                        "next_stop_id": next_stop_id,
                        "predicted_time": predicted_time,
                        "predictions": trip_predictions  # Store all predictions
                    })

            # 4. Parse Alerts
            a_feed = gtfs_realtime_pb2.FeedMessage()
            a_feed.ParseFromString(a_res.content)
            # Mapping: { "route_id": [ {header, description, effect}, ... ] }
            route_alerts = {}
            for entity in a_feed.entity:
                if entity.HasField('alert'):
                    alert = entity.alert
                    # Extract English translations (fall back to placeholders).
                    header = next((t.text for t in alert.header_text.translation if t.language == "en"), "No Header")
                    description = next((t.text for t in alert.description_text.translation if t.language == "en"), "")
                    # Map cause and effect enum codes to human-readable strings.
                    cause_code = int(alert.cause) if alert.HasField('cause') else None
                    effect_code = int(alert.effect) if alert.HasField('effect') else None
                    alert_payload = {
                        "header": header,
                        "description": description,
                        "cause": CAUSE_MAP.get(cause_code, "Unknown") if cause_code is not None else "Unknown",
                        "effect": EFFECT_MAP.get(effect_code, "Unknown") if effect_code is not None else "Unknown",
                        # effect 1 == NO_SERVICE in the GTFS-RT Effect enum.
                        "severity": "HIGH" if effect_code == 1 else "MEDIUM"
                    }
                    # Link the alert to every route it mentions.
                    for ie in alert.informed_entity:
                        if ie.HasField('route_id'):
                            rid = str(ie.route_id)
                            if rid not in route_alerts:
                                route_alerts[rid] = []
                            route_alerts[rid].append(alert_payload)

            self._data = {
                "vehicles": processed_buses,
                "predictions": prediction_map,
                "alerts": route_alerts
            }
            self._last_updated = time.time()
            print(f"--- Cache Refreshed: {len(processed_buses)} buses, {len(route_alerts)} routes with alerts ---")
            return self._data
        except Exception as e:
            print(f"Async fetch failed: {e}")
            # BUGFIX: previously returned [] (a list) when no snapshot existed,
            # which is type-inconsistent with the dict returned on success and
            # would crash callers indexing "vehicles". Return an empty snapshot
            # of the same shape instead.
            if self._data:
                return self._data
            return {"vehicles": [], "predictions": {}, "alerts": {}}