WMB2Backened / api /bus_cache.py
42Cummer's picture
Upload 9 files
0170ac5 verified
import time
import httpx # type: ignore
import asyncio # type: ignore
from google.transit import gtfs_realtime_pb2 # type: ignore
from dotenv import load_dotenv # type: ignore
import os # type: ignore
# Run python-dotenv's loader before the environment lookups below.
load_dotenv()

TTC_VEHICLES_URL = os.getenv("GTFS_RT_URL")
TTC_TRIPS_URL = os.getenv("GTFS_DELAY_URL")
# The alerts feed may be configured under either spelling; the plural
# GTFS_ALERTS_URL takes precedence over GTFS_ALERT_URL.
TTC_ALERTS_URL = os.getenv("GTFS_ALERTS_URL") or os.getenv("GTFS_ALERT_URL")

# Fail fast at import time if any feed URL is missing or empty.
for _feed_url, _complaint in (
    (TTC_VEHICLES_URL, "GTFS_RT_URL is not set"),
    (TTC_TRIPS_URL, "GTFS_DELAY_URL is not set"),
    (TTC_ALERTS_URL, "GTFS_ALERTS_URL or GTFS_ALERT_URL is not set"),
):
    if not _feed_url:
        raise ValueError(_complaint)
# Keep the module namespace limited to the three URL constants.
del _feed_url, _complaint
# Human-readable labels for the GTFS-realtime ``Alert.Cause`` enum.
# Keys are the numeric enum values defined by the GTFS-realtime spec.
CAUSE_MAP = {
    1: "Unknown Cause",
    2: "Other Cause",
    3: "Technical Problem",
    4: "Strike",
    5: "Demonstration",
    6: "Accident",
    7: "Holiday",
    8: "Weather",
    9: "Maintenance",
    10: "Construction",
    11: "Police Activity",
    12: "Medical Emergency",
}

# Human-readable labels for the GTFS-realtime ``Alert.Effect`` enum.
# Keys are the numeric enum values defined by the GTFS-realtime spec.
EFFECT_MAP = {
    1: "No Service",
    2: "Reduced Service",
    3: "Significant Delays",
    4: "Detour",
    5: "Additional Service",
    6: "Modified Service",
    7: "Other Effect",
    8: "Unknown Effect",
    9: "Stop Moved",
    10: "No Effect",
    11: "Accessibility Issue",
}
class AsyncBusCache:
    """Time-limited in-memory cache over three GTFS-realtime feeds.

    A refresh downloads the vehicle-position, trip-update and alert feeds
    (``TTC_VEHICLES_URL``, ``TTC_TRIPS_URL``, ``TTC_ALERTS_URL``) concurrently
    and merges them into one dict with the keys:

    * ``"vehicles"``: list of per-vehicle dicts (id, route, trip_id, lat,
      lon, occupancy, next_stop_id, predicted_time, predictions).
    * ``"predictions"``: ``{trip_id: {stop_id: timestamp}}``.
    * ``"alerts"``: ``{route_id: [alert dict, ...]}``.
    """

    def __init__(self, ttl=20):
        """Create an empty cache.

        Args:
            ttl: Number of seconds a successful refresh stays valid.
        """
        self.ttl = ttl
        self._data = None
        self._last_updated = 0

    async def get_data(self):
        """Return the cached snapshot, refreshing it first when it is stale.

        Returns:
            The snapshot dict described on the class. If a refresh fails, the
            previous snapshot is returned instead; if there is no previous
            snapshot, an empty list is returned.
        """
        if self._data and (time.time() - self._last_updated) < self.ttl:
            return self._data
        return await self._refresh()

    async def _refresh(self):
        """Download and parse all three feeds, then replace the snapshot.

        Any failure (network, HTTP status, protobuf decoding) is logged and
        swallowed so callers keep getting the last good snapshot.

        Returns:
            The new snapshot dict on success; on failure the previous
            snapshot, or ``[]`` when nothing has been cached yet.
        """
        try:
            async with httpx.AsyncClient() as client:
                # Fetch all feeds at once.
                responses = await asyncio.gather(
                    client.get(TTC_VEHICLES_URL, timeout=10),
                    client.get(TTC_TRIPS_URL, timeout=10),
                    client.get(TTC_ALERTS_URL, timeout=10),
                )
            # Never parse an error page as a feed: a 4xx/5xx body could
            # decode to an empty message and wipe out good cached data.
            for response in responses:
                response.raise_for_status()
            v_res, t_res, a_res = responses

            prediction_map = self._parse_predictions(t_res.content)
            processed_buses = self._parse_vehicles(v_res.content, prediction_map)
            route_alerts = self._parse_alerts(a_res.content)

            self._data = {
                "vehicles": processed_buses,
                "predictions": prediction_map,
                "alerts": route_alerts,
            }
            self._last_updated = time.time()
            print(f"--- Cache Refreshed: {len(processed_buses)} buses, {len(route_alerts)} routes with alerts ---")
            return self._data
        except Exception as e:
            # Deliberate best-effort: serve stale data rather than fail.
            print(f"Async fetch failed: {e}")
            # NOTE: the empty fallback is a list, not a dict; kept as-is so
            # existing callers that test for emptiness keep working.
            return self._data if self._data else []

    @staticmethod
    def _parse_predictions(content):
        """Decode a trip-update feed into ``{trip_id: {stop_id: timestamp}}``."""
        feed = gtfs_realtime_pb2.FeedMessage()
        feed.ParseFromString(content)
        prediction_map = {}
        for entity in feed.entity:
            if not entity.HasField('trip_update'):
                continue
            tu = entity.trip_update
            # Store every stop reported for the trip. Prefer the departure
            # time; fall back to the arrival time when the departure is
            # absent or carries no absolute timestamp (time == 0).
            prediction_map[str(tu.trip.trip_id)] = {
                str(stu.stop_id): (stu.departure.time or stu.arrival.time)
                for stu in tu.stop_time_update
            }
        return prediction_map

    @staticmethod
    def _parse_vehicles(content, prediction_map):
        """Decode a vehicle-position feed and attach each trip's predictions."""
        feed = gtfs_realtime_pb2.FeedMessage()
        feed.ParseFromString(content)
        processed_buses = []
        for entity in feed.entity:
            if not entity.HasField('vehicle'):
                continue
            v = entity.vehicle
            t_id = str(v.trip.trip_id)
            trip_predictions = prediction_map.get(t_id, {})
            # Next stop = earliest predicted stop. Entries whose timestamp is
            # 0 (no absolute time supplied) are ignored so they cannot win.
            next_stop_id, predicted_time = min(
                (stop for stop in trip_predictions.items() if stop[1]),
                key=lambda stop: stop[1],
                default=(None, None),
            )
            processed_buses.append({
                "id": v.vehicle.id,
                "route": v.trip.route_id,
                "trip_id": t_id,
                "lat": round(v.position.latitude, 6),
                "lon": round(v.position.longitude, 6),
                "occupancy": v.occupancy_status,
                "next_stop_id": next_stop_id,
                "predicted_time": predicted_time,
                "predictions": trip_predictions,
            })
        return processed_buses

    @staticmethod
    def _parse_alerts(content):
        """Decode an alert feed into ``{route_id: [alert dict, ...]}``."""
        feed = gtfs_realtime_pb2.FeedMessage()
        feed.ParseFromString(content)
        route_alerts = {}
        for entity in feed.entity:
            if not entity.HasField('alert'):
                continue
            alert = entity.alert
            # Use the English translations; fall back to fixed defaults.
            header = next((t.text for t in alert.header_text.translation if t.language == "en"), "No Header")
            description = next((t.text for t in alert.description_text.translation if t.language == "en"), "")
            # Map cause and effect codes to human-readable strings.
            cause_code = int(alert.cause) if alert.HasField('cause') else None
            effect_code = int(alert.effect) if alert.HasField('effect') else None
            alert_payload = {
                "header": header,
                "description": description,
                "cause": CAUSE_MAP.get(cause_code, "Unknown") if cause_code is not None else "Unknown",
                "effect": EFFECT_MAP.get(effect_code, "Unknown") if effect_code is not None else "Unknown",
                # Effect code 1 is the only one treated as high severity.
                "severity": "HIGH" if effect_code == 1 else "MEDIUM",
            }
            # Link the alert to every route it mentions. The same dict
            # object is shared by all of those routes' lists.
            for ie in alert.informed_entity:
                if ie.HasField('route_id'):
                    route_alerts.setdefault(str(ie.route_id), []).append(alert_payload)
        return route_alerts