File size: 6,404 Bytes
0170ac5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import time
import httpx # type: ignore
import asyncio # type: ignore
from google.transit import gtfs_realtime_pb2 # type: ignore
from dotenv import load_dotenv # type: ignore
import os # type: ignore

# Populate the process environment (python-dotenv) before any feed URL is read.
load_dotenv()

# Vehicle-positions feed; the module refuses to import without it.
TTC_VEHICLES_URL = os.environ.get("GTFS_RT_URL")
if not TTC_VEHICLES_URL:
    raise ValueError("GTFS_RT_URL is not set")

# Trip-updates (predictions / delays) feed.
TTC_TRIPS_URL = os.environ.get("GTFS_DELAY_URL")
if not TTC_TRIPS_URL:
    raise ValueError("GTFS_DELAY_URL is not set")

# Service-alerts feed. Either spelling of the variable is accepted for
# compatibility; GTFS_ALERTS_URL takes precedence when both are defined.
TTC_ALERTS_URL = os.environ.get("GTFS_ALERTS_URL") or os.environ.get("GTFS_ALERT_URL")
if not TTC_ALERTS_URL:
    raise ValueError("GTFS_ALERTS_URL or GTFS_ALERT_URL is not set")

# Human-readable labels for the GTFS-realtime ``Alert.Cause`` enum, keyed by
# the numeric values defined in gtfs-realtime.proto.
CAUSE_MAP = {
    1: "Unknown Cause",
    2: "Other Cause",
    3: "Technical Problem",
    4: "Strike",
    5: "Demonstration",
    6: "Accident",
    7: "Holiday",
    8: "Weather",
    9: "Maintenance",
    10: "Construction",
    11: "Police Activity",
    12: "Medical Emergency",
}

# Human-readable labels for the GTFS-realtime ``Alert.Effect`` enum, keyed by
# the numeric values defined in gtfs-realtime.proto.
EFFECT_MAP = {
    1: "No Service",
    2: "Reduced Service",
    3: "Significant Delays",
    4: "Detour",
    5: "Additional Service",
    6: "Modified Service",
    7: "Other Effect",
    8: "Unknown Effect",
    9: "Stop Moved",
    10: "No Effect",
    11: "Accessibility Issue",
}

class AsyncBusCache:
    """Time-limited in-memory cache of the vehicle, trip-update and alert feeds.

    A refresh downloads the three GTFS-realtime feeds concurrently, decodes
    them and stores one combined snapshot. ``get_data`` serves that snapshot
    until it is older than ``ttl`` seconds.
    """

    def __init__(self, ttl=20):
        """Create an empty cache.

        Args:
            ttl: Maximum age, in seconds, of a snapshot before the next
                ``get_data`` call triggers a refresh.
        """
        self.ttl = ttl
        self._data = None  # last good snapshot, or None before the first success
        self._last_updated = 0  # time.time() of the last successful refresh

    async def get_data(self):
        """Return the cached snapshot, refreshing it first if missing or stale.

        Returns:
            A dict with the keys ``"vehicles"``, ``"predictions"`` and
            ``"alerts"``. When a refresh fails the previous snapshot is
            returned instead; if no refresh has ever succeeded the result is
            an empty list.
        """
        if self._data and (time.time() - self._last_updated) < self.ttl:
            return self._data
        return await self._refresh()

    async def _refresh(self):
        """Download and decode all feeds, then replace the cached snapshot.

        Any failure (network error, non-success HTTP status, undecodable
        payload) is logged and leaves the previous snapshot in place.

        Returns:
            The new snapshot dict on success; on failure the previous
            snapshot, or ``[]`` when there is none.
        """
        try:
            async with httpx.AsyncClient() as client:
                # Fetch the three feeds concurrently.
                responses = await asyncio.gather(
                    client.get(TTC_VEHICLES_URL, timeout=10),
                    client.get(TTC_TRIPS_URL, timeout=10),
                    client.get(TTC_ALERTS_URL, timeout=10),
                )

            # An error response with an empty body would otherwise decode as
            # a valid but empty feed and silently wipe good cached data.
            for response in responses:
                response.raise_for_status()
            v_res, t_res, a_res = responses

            prediction_map = self._parse_predictions(self._decode_feed(t_res.content))
            processed_buses = self._parse_vehicles(
                self._decode_feed(v_res.content), prediction_map
            )
            route_alerts = self._parse_alerts(self._decode_feed(a_res.content))

            self._data = {
                "vehicles": processed_buses,
                "predictions": prediction_map,
                "alerts": route_alerts,
            }
            self._last_updated = time.time()
            print(f"--- Cache Refreshed: {len(processed_buses)} buses, {len(route_alerts)} routes with alerts ---")
            return self._data

        except Exception as e:
            # Best effort: keep serving stale data rather than propagating.
            # NOTE: the empty-list fallback is kept for backward compatibility
            # even though a successful call returns a dict.
            print(f"Async fetch failed: {e}")
            return self._data if self._data else []

    @staticmethod
    def _decode_feed(payload):
        """Decode raw protobuf bytes into a ``FeedMessage``."""
        feed = gtfs_realtime_pb2.FeedMessage()
        feed.ParseFromString(payload)
        return feed

    @staticmethod
    def _stop_time(stu):
        """Return the predicted time of one stop-time update.

        The departure time is preferred; the arrival time is used when the
        update has no departure or its departure carries no absolute time
        (an unset ``time`` reads as 0).
        """
        if stu.HasField('departure') and stu.departure.time:
            return stu.departure.time
        return stu.arrival.time

    @classmethod
    def _parse_predictions(cls, feed):
        """Build ``{trip_id: {stop_id: time}}`` from a trip-updates feed."""
        prediction_map = {}
        for entity in feed.entity:
            if not entity.HasField('trip_update'):
                continue
            tu = entity.trip_update
            # Keep every stop listed in the update, not just the next one.
            prediction_map[str(tu.trip.trip_id)] = {
                str(stu.stop_id): cls._stop_time(stu)
                for stu in tu.stop_time_update
            }
        return prediction_map

    @staticmethod
    def _next_stop(trip_predictions):
        """Return ``(stop_id, time)`` of the earliest prediction.

        Entries without a time (0) are ignored unless no entry has one.
        Returns ``(None, None)`` when there are no predictions at all.
        """
        if not trip_predictions:
            return None, None
        timed = [item for item in trip_predictions.items() if item[1]]
        candidates = timed or list(trip_predictions.items())
        return min(candidates, key=lambda item: item[1])

    @classmethod
    def _parse_vehicles(cls, feed, prediction_map):
        """Build the list of vehicle dicts from a vehicle-positions feed.

        Args:
            feed: Decoded vehicle-positions ``FeedMessage``.
            prediction_map: Output of ``_parse_predictions``; used to attach
                each vehicle's per-stop predictions by trip id.
        """
        buses = []
        for entity in feed.entity:
            if not entity.HasField('vehicle'):
                continue
            v = entity.vehicle
            t_id = str(v.trip.trip_id)
            trip_predictions = prediction_map.get(t_id, {})
            # next_stop_id / predicted_time are kept for backward compatibility
            # with consumers that only look at the first upcoming stop.
            next_stop_id, predicted_time = cls._next_stop(trip_predictions)
            buses.append({
                "id": v.vehicle.id,
                "route": v.trip.route_id,
                "trip_id": t_id,
                "lat": round(v.position.latitude, 6),
                "lon": round(v.position.longitude, 6),
                "occupancy": v.occupancy_status,
                "next_stop_id": next_stop_id,
                "predicted_time": predicted_time,
                "predictions": trip_predictions,
            })
        return buses

    @staticmethod
    def _pick_translation(translated, default):
        """Choose the text to show from a ``TranslatedString``.

        Preference order: a translation tagged exactly ``"en"``, then any
        tag starting with "en" (case-insensitive, e.g. "en-CA"), then the
        first translation with non-empty text (feeds may leave the language
        unset), and finally ``default``.
        """
        translations = list(translated.translation)
        for t in translations:
            if t.language == "en":
                return t.text
        for t in translations:
            if t.language and t.language.lower().startswith("en"):
                return t.text
        for t in translations:
            if t.text:
                return t.text
        return default

    @staticmethod
    def _alert_route_ids(alert):
        """Return the distinct route ids an alert refers to, in feed order.

        An alert may list the same route several times (for example once per
        affected stop); each route is reported only once.
        """
        route_ids = []
        seen = set()
        for ie in alert.informed_entity:
            if not ie.HasField('route_id'):
                continue
            rid = str(ie.route_id)
            if rid not in seen:
                seen.add(rid)
                route_ids.append(rid)
        return route_ids

    @classmethod
    def _alert_payload(cls, alert):
        """Convert one protobuf alert into the dict stored in the cache."""
        cause_code = int(alert.cause) if alert.HasField('cause') else None
        effect_code = int(alert.effect) if alert.HasField('effect') else None
        return {
            "header": cls._pick_translation(alert.header_text, "No Header"),
            "description": cls._pick_translation(alert.description_text, ""),
            "cause": CAUSE_MAP.get(cause_code, "Unknown") if cause_code is not None else "Unknown",
            "effect": EFFECT_MAP.get(effect_code, "Unknown") if effect_code is not None else "Unknown",
            # Effect code 1 is the one labelled "No Service" in EFFECT_MAP.
            "severity": "HIGH" if effect_code == 1 else "MEDIUM",
        }

    @classmethod
    def _parse_alerts(cls, feed):
        """Build ``{route_id: [alert dict, ...]}`` from an alerts feed."""
        route_alerts = {}
        for entity in feed.entity:
            if not entity.HasField('alert'):
                continue
            alert = entity.alert
            alert_payload = cls._alert_payload(alert)
            for rid in cls._alert_route_ids(alert):
                route_alerts.setdefault(rid, []).append(alert_payload)
        return route_alerts