Spaces:
Running
Running
File size: 6,404 Bytes
0170ac5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
import time
import httpx # type: ignore
import asyncio # type: ignore
from google.transit import gtfs_realtime_pb2 # type: ignore
from dotenv import load_dotenv # type: ignore
import os # type: ignore
load_dotenv()
TTC_VEHICLES_URL = os.getenv("GTFS_RT_URL")
TTC_TRIPS_URL = os.getenv("GTFS_DELAY_URL")
# Support both GTFS_ALERT_URL and GTFS_ALERTS_URL for compatibility
TTC_ALERTS_URL = os.getenv("GTFS_ALERTS_URL") or os.getenv("GTFS_ALERT_URL")
if not TTC_VEHICLES_URL:
raise ValueError("GTFS_RT_URL is not set")
if not TTC_TRIPS_URL:
raise ValueError("GTFS_DELAY_URL is not set")
if not TTC_ALERTS_URL:
raise ValueError("GTFS_ALERTS_URL or GTFS_ALERT_URL is not set")
CAUSE_MAP = {1: "Weather", 2: "Holiday", 4: "Accident", 7: "Technical Problem", 11: "Police Activity"}
EFFECT_MAP = {1: "No Service", 3: "Significant Delays", 4: "Detour", 8: "Unknown Effect"}
class AsyncBusCache:
def __init__(self, ttl=20):
self.ttl = ttl
self._data = None
self._last_updated = 0
async def get_data(self):
if self._data and (time.time() - self._last_updated) < self.ttl:
return self._data
return await self._refresh()
async def _refresh(self):
try:
async with httpx.AsyncClient() as client:
# 1. Fetch ALL feeds at once
v_res, t_res, a_res = await asyncio.gather(
client.get(TTC_VEHICLES_URL, timeout=10),
client.get(TTC_TRIPS_URL, timeout=10),
client.get(TTC_ALERTS_URL, timeout=10) # The new alerts feed
)
# 2. Parse Predictions (Store ALL future stops)
t_feed = gtfs_realtime_pb2.FeedMessage()
t_feed.ParseFromString(t_res.content)
# Map: { "trip_id": { "stop_id_1": time, "stop_id_2": time, ... } }
prediction_map = {}
for entity in t_feed.entity:
if entity.HasField('trip_update'):
tu = entity.trip_update
trip_id = str(tu.trip.trip_id)
# Store every stop in the remainder of the trip
prediction_map[trip_id] = {
str(stu.stop_id): (stu.departure.time if stu.HasField('departure') else stu.arrival.time)
for stu in tu.stop_time_update
}
# 3. Parse Vehicle Positions
v_feed = gtfs_realtime_pb2.FeedMessage()
v_feed.ParseFromString(v_res.content)
processed_buses = []
for entity in v_feed.entity:
if entity.HasField('vehicle'):
v = entity.vehicle
t_id = str(v.trip.trip_id)
# Get all predictions for this trip
trip_predictions = prediction_map.get(t_id, {})
# Get the first stop (next stop) for backward compatibility
next_stop_id = None
predicted_time = None
if trip_predictions:
# Get the first stop in the predictions (sorted by time)
sorted_stops = sorted(trip_predictions.items(), key=lambda x: x[1])
if sorted_stops:
next_stop_id, predicted_time = sorted_stops[0]
processed_buses.append({
"id": v.vehicle.id,
"route": v.trip.route_id,
"trip_id": t_id,
"lat": round(v.position.latitude, 6),
"lon": round(v.position.longitude, 6),
"occupancy": v.occupancy_status,
"next_stop_id": next_stop_id,
"predicted_time": predicted_time,
"predictions": trip_predictions # Store all predictions
})
# 4. Parse Alerts
a_feed = gtfs_realtime_pb2.FeedMessage()
a_feed.ParseFromString(a_res.content)
# Mapping: { "route_id": [ {header, description, effect}, ... ] }
route_alerts = {}
for entity in a_feed.entity:
if entity.HasField('alert'):
alert = entity.alert
# Extract English translations
header = next((t.text for t in alert.header_text.translation if t.language == "en"), "No Header")
description = next((t.text for t in alert.description_text.translation if t.language == "en"), "")
# Map cause and effect codes to human-readable strings
cause_code = int(alert.cause) if alert.HasField('cause') else None
effect_code = int(alert.effect) if alert.HasField('effect') else None
alert_payload = {
"header": header,
"description": description,
"cause": CAUSE_MAP.get(cause_code, "Unknown") if cause_code is not None else "Unknown",
"effect": EFFECT_MAP.get(effect_code, "Unknown") if effect_code is not None else "Unknown",
"severity": "HIGH" if effect_code == 1 else "MEDIUM"
}
# Link alert to every route it mentions
for ie in alert.informed_entity:
if ie.HasField('route_id'):
rid = str(ie.route_id)
if rid not in route_alerts:
route_alerts[rid] = []
route_alerts[rid].append(alert_payload)
self._data = {
"vehicles": processed_buses,
"predictions": prediction_map,
"alerts": route_alerts # Add this to your cache data
}
self._last_updated = time.time()
print(f"--- Cache Refreshed: {len(processed_buses)} buses, {len(route_alerts)} routes with alerts ---")
return self._data
except Exception as e:
print(f"Async fetch failed: {e}")
return self._data if self._data else [] |