mohammed-aljafry commited on
Commit
83e01e2
·
verified ·
1 Parent(s): 1f3067f

Upload simulation_modules.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. simulation_modules.py +336 -336
simulation_modules.py CHANGED
@@ -1,336 +1,336 @@
1
- # simulation_modules.py
2
-
3
- import torch
4
- import numpy as np
5
- import cv2
6
- import math
7
- from collections import deque
8
- from typing import List, Tuple, Dict, Any, Optional
9
-
10
- # ================== Constants ==================
11
- WAYPOINT_SCALE_FACTOR = 5.0
12
- T1_FUTURE_TIME = 1.0
13
- T2_FUTURE_TIME = 2.0
14
- PIXELS_PER_METER = 8
15
- MAX_DISTANCE = 32
16
- IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2
17
- EGO_CAR_X = IMG_SIZE // 2
18
- EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER)
19
-
20
- COLORS = {
21
- 'vehicle': [255, 0, 0],
22
- 'pedestrian': [0, 255, 0],
23
- 'cyclist': [0, 0, 255],
24
- 'waypoint': [255, 255, 0],
25
- 'ego_car': [255, 255, 255]
26
- }
27
-
28
- # ================== PID Controller ==================
29
- class PIDController:
30
- def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20):
31
- self._K_P = K_P
32
- self._K_I = K_I
33
- self._K_D = K_D
34
- self._window = deque([0 for _ in range(n)], maxlen=n)
35
-
36
- def step(self, error):
37
- self._window.append(error)
38
- if len(self._window) >= 2:
39
- integral = np.mean(self._window)
40
- derivative = self._window[-1] - self._window[-2]
41
- else:
42
- integral = derivative = 0.0
43
- return self._K_P * error + self._K_I * integral + self._K_D * derivative
44
-
45
- # ================== Helper Functions ==================
46
- def ensure_rgb(image):
47
- if len(image.shape) == 2:
48
- return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
49
- elif image.shape[2] == 1:
50
- return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
51
- return image
52
-
53
- def add_rect(img, loc, ori, box, value, color):
54
- center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
55
- center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
56
- size_px = (int(box[0] * PIXELS_PER_METER), int(box[1] * PIXELS_PER_METER))
57
- angle_deg = -np.degrees(math.atan2(ori[1], ori[0]))
58
- box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg))
59
- box_points = np.int32(box_points)
60
- adjusted_color = [int(c * value) for c in color]
61
- cv2.fillConvexPoly(img, box_points, adjusted_color)
62
- return img
63
-
64
- def render(traffic_grid, t=0):
65
- img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
66
- counts = {'vehicles': 0, 'pedestrians': 0, 'cyclists': 0}
67
-
68
- if isinstance(traffic_grid, torch.Tensor):
69
- traffic_grid = traffic_grid.cpu().numpy()
70
-
71
- h, w, c = traffic_grid.shape
72
- for y in range(h):
73
- for x in range(w):
74
- for ch in range(c):
75
- if traffic_grid[y, x, ch] > 0.1:
76
- world_x = (x / w - 0.5) * MAX_DISTANCE * 2
77
- world_y = (y / h - 0.5) * MAX_DISTANCE * 2
78
-
79
- if ch < 3:
80
- color = COLORS['vehicle']
81
- counts['vehicles'] += 1
82
- box_size = [2.0, 4.0]
83
- elif ch < 5:
84
- color = COLORS['pedestrian']
85
- counts['pedestrians'] += 1
86
- box_size = [0.8, 0.8]
87
- else:
88
- color = COLORS['cyclist']
89
- counts['cyclists'] += 1
90
- box_size = [1.2, 2.0]
91
-
92
- img = add_rect(img, [world_x, world_y], [1.0, 0.0],
93
- box_size, traffic_grid[y, x, ch], color)
94
-
95
- return img, counts
96
-
97
- def render_waypoints(waypoints, scale_factor=WAYPOINT_SCALE_FACTOR):
98
- img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
99
-
100
- if isinstance(waypoints, torch.Tensor):
101
- waypoints = waypoints.cpu().numpy()
102
-
103
- scaled_waypoints = waypoints * scale_factor
104
-
105
- for i, wp in enumerate(scaled_waypoints):
106
- px = int(wp[0] * PIXELS_PER_METER + IMG_SIZE // 2)
107
- py = int(wp[1] * PIXELS_PER_METER + IMG_SIZE // 2)
108
-
109
- if 0 <= px < IMG_SIZE and 0 <= py < IMG_SIZE:
110
- radius = max(3, 8 - i)
111
- cv2.circle(img, (px, py), radius, COLORS['waypoint'], -1)
112
-
113
- if i > 0:
114
- prev_px = int(scaled_waypoints[i-1][0] * PIXELS_PER_METER + IMG_SIZE // 2)
115
- prev_py = int(scaled_waypoints[i-1][1] * PIXELS_PER_METER + IMG_SIZE // 2)
116
- if 0 <= prev_px < IMG_SIZE and 0 <= prev_py < IMG_SIZE:
117
- cv2.line(img, (prev_px, prev_py), (px, py), COLORS['waypoint'], 2)
118
-
119
- return img
120
-
121
- def render_self_car(img):
122
- car_pos = [0, -4.0]
123
- car_ori = [1.0, 0.0]
124
- car_size = [2.0, 4.5]
125
- return add_rect(img, car_pos, car_ori, car_size, 1.0, COLORS['ego_car'])
126
-
127
- # ================== Tracker Classes ==================
128
- class TrackedObject:
129
- def __init__(self, obj_id: int):
130
- self.id = obj_id
131
- self.last_step = 0
132
- self.last_pos = [0.0, 0.0]
133
- self.historical_pos = []
134
- self.historical_steps = []
135
- self.velocity = [0.0, 0.0]
136
- self.confidence = 1.0
137
-
138
- def update(self, step: int, obj_info: List[float]):
139
- self.last_step = step
140
- self.last_pos = obj_info[:2]
141
- self.historical_pos.append(obj_info[:2])
142
- self.historical_steps.append(step)
143
-
144
- if len(self.historical_pos) >= 2:
145
- dt = self.historical_steps[-1] - self.historical_steps[-2]
146
- if dt > 0:
147
- dx = self.historical_pos[-1][0] - self.historical_pos[-2][0]
148
- dy = self.historical_pos[-1][1] - self.historical_pos[-2][1]
149
- self.velocity = [dx/dt, dy/dt]
150
-
151
- def predict_position(self, future_time: float) -> List[float]:
152
- predicted_x = self.last_pos[0] + self.velocity[0] * future_time
153
- predicted_y = self.last_pos[1] + self.velocity[1] * future_time
154
- return [predicted_x, predicted_y]
155
-
156
- def is_alive(self, current_step: int, max_age: int = 5) -> bool:
157
- return (current_step - self.last_step) <= max_age
158
-
159
- class Tracker:
160
- def __init__(self, frequency: int = 10):
161
- self.tracks: List[TrackedObject] = []
162
- self.frequency = frequency
163
- self.next_id = 0
164
- self.current_step = 0
165
-
166
- def update_and_predict(self, detections: List[Dict], step: int) -> np.ndarray:
167
- self.current_step = step
168
-
169
- for detection in detections:
170
- pos = detection.get('position', [0, 0])
171
- feature = detection.get('feature', 0.5)
172
-
173
- best_match = None
174
- min_distance = float('inf')
175
-
176
- for track in self.tracks:
177
- if track.is_alive(step):
178
- distance = np.linalg.norm(np.array(pos) - np.array(track.last_pos))
179
- if distance < min_distance and distance < 2.0:
180
- min_distance = distance
181
- best_match = track
182
-
183
- if best_match:
184
- best_match.update(step, pos + [feature])
185
- else:
186
- new_track = TrackedObject(self.next_id)
187
- new_track.update(step, pos + [feature])
188
- self.tracks.append(new_track)
189
- self.next_id += 1
190
-
191
- self.tracks = [t for t in self.tracks if t.is_alive(step)]
192
- return self._generate_prediction_grid()
193
-
194
- def _generate_prediction_grid(self) -> np.ndarray:
195
- grid = np.zeros((20, 20, 7), dtype=np.float32)
196
-
197
- for track in self.tracks:
198
- if track.is_alive(self.current_step):
199
- current_pos = track.last_pos
200
- future_pos_t1 = track.predict_position(T1_FUTURE_TIME)
201
- future_pos_t2 = track.predict_position(T2_FUTURE_TIME)
202
-
203
- for pos in [current_pos, future_pos_t1, future_pos_t2]:
204
- grid_x = int((pos[0] / (MAX_DISTANCE * 2) + 0.5) * 20)
205
- grid_y = int((pos[1] / (MAX_DISTANCE * 2) + 0.5) * 20)
206
-
207
- if 0 <= grid_x < 20 and 0 <= grid_y < 20:
208
- channel = 0
209
- grid[grid_y, grid_x, channel] = max(grid[grid_y, grid_x, channel], track.confidence)
210
-
211
- return grid
212
-
213
- # ================== Controller Classes ==================
214
- class ControllerConfig:
215
- def __init__(self):
216
- self.turn_KP = 1.0
217
- self.turn_KI = 0.1
218
- self.turn_KD = 0.1
219
- self.turn_n = 20
220
-
221
- self.speed_KP = 0.5
222
- self.speed_KI = 0.05
223
- self.speed_KD = 0.1
224
- self.speed_n = 20
225
-
226
- self.max_speed = 6.0
227
- self.max_throttle = 0.75
228
- self.clip_delta = 0.25
229
-
230
- self.brake_speed = 0.4
231
- self.brake_ratio = 1.1
232
-
233
- class InterfuserController:
234
- def __init__(self, config: ControllerConfig):
235
- self.config = config
236
- self.turn_controller = PIDController(config.turn_KP, config.turn_KI, config.turn_KD, config.turn_n)
237
- self.speed_controller = PIDController(config.speed_KP, config.speed_KI, config.speed_KD, config.speed_n)
238
- self.last_steer = 0.0
239
- self.last_throttle = 0.0
240
- self.target_speed = 3.0
241
-
242
- def run_step(self, current_speed: float, waypoints: np.ndarray,
243
- junction: float, traffic_light_state: float,
244
- stop_sign: float, meta_data: Dict) -> Tuple[float, float, bool, str]:
245
-
246
- if isinstance(waypoints, torch.Tensor):
247
- waypoints = waypoints.cpu().numpy()
248
-
249
- if len(waypoints) > 1:
250
- dx = waypoints[1][0] - waypoints[0][0]
251
- dy = waypoints[1][1] - waypoints[0][1]
252
- target_yaw = math.atan2(dy, dx)
253
- steer = self.turn_controller.step(target_yaw)
254
- else:
255
- steer = 0.0
256
-
257
- steer = np.clip(steer, -1.0, 1.0)
258
-
259
- target_speed = self.target_speed
260
- if junction > 0.5:
261
- target_speed *= 0.7
262
- if abs(steer) > 0.3:
263
- target_speed *= 0.8
264
-
265
- speed_error = target_speed - current_speed
266
- throttle = self.speed_controller.step(speed_error)
267
- throttle = np.clip(throttle, 0.0, self.config.max_throttle)
268
-
269
- brake = False
270
- if traffic_light_state > 0.5 or stop_sign > 0.5 or current_speed > self.config.max_speed:
271
- brake = True
272
- throttle = 0.0
273
-
274
- self.last_steer = steer
275
- self.last_throttle = throttle
276
-
277
- metadata = f"Speed:{current_speed:.1f} Target:{target_speed:.1f} Junction:{junction:.2f}"
278
-
279
- return steer, throttle, brake, metadata
280
-
281
- # ================== Display Interface ==================
282
- class DisplayInterface:
283
- def __init__(self, width: int = 1200, height: int = 600):
284
- self._width = width
285
- self._height = height
286
- self.camera_width = width // 2
287
- self.camera_height = height
288
- self.map_width = width // 2
289
- self.map_height = height // 3
290
-
291
- def run_interface(self, data: Dict[str, Any]) -> np.ndarray:
292
- dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8)
293
-
294
- # Camera view
295
- camera_view = data.get('camera_view')
296
- if camera_view is not None:
297
- camera_resized = cv2.resize(camera_view, (self.camera_width, self.camera_height))
298
- dashboard[:, :self.camera_width] = camera_resized
299
-
300
- # Maps
301
- map_start_x = self.camera_width
302
-
303
- map_t0 = data.get('map_t0')
304
- if map_t0 is not None:
305
- map_resized = cv2.resize(map_t0, (self.map_width, self.map_height))
306
- dashboard[:self.map_height, map_start_x:] = map_resized
307
- cv2.putText(dashboard, "Current (t=0)", (map_start_x + 10, 30),
308
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
309
-
310
- map_t1 = data.get('map_t1')
311
- if map_t1 is not None:
312
- map_resized = cv2.resize(map_t1, (self.map_width, self.map_height))
313
- y_start = self.map_height
314
- dashboard[y_start:y_start + self.map_height, map_start_x:] = map_resized
315
- cv2.putText(dashboard, f"Future (t={T1_FUTURE_TIME}s)",
316
- (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
317
- 0.7, (255, 255, 255), 2)
318
-
319
- map_t2 = data.get('map_t2')
320
- if map_t2 is not None:
321
- map_resized = cv2.resize(map_t2, (self.map_width, self.map_height))
322
- y_start = self.map_height * 2
323
- dashboard[y_start:, map_start_x:] = map_resized
324
- cv2.putText(dashboard, f"Future (t={T2_FUTURE_TIME}s)",
325
- (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
326
- 0.7, (255, 255, 255), 2)
327
-
328
- # Text info
329
- text_info = data.get('text_info', {})
330
- y_offset = 50
331
- for key, value in text_info.items():
332
- cv2.putText(dashboard, value, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX,
333
- 0.6, (0, 255, 0), 2)
334
- y_offset += 30
335
-
336
- return dashboard
 
1
+ # simulation_modules.py
2
+
3
+ import torch
4
+ import numpy as np
5
+ import cv2
6
+ import math
7
+ from collections import deque
8
+ from typing import List, Tuple, Dict, Any, Optional
9
+
10
+ # ================== Constants ==================
11
+ WAYPOINT_SCALE_FACTOR = 5.0
12
+ T1_FUTURE_TIME = 1.0
13
+ T2_FUTURE_TIME = 2.0
14
+ PIXELS_PER_METER = 8
15
+ MAX_DISTANCE = 32
16
+ IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2
17
+ EGO_CAR_X = IMG_SIZE // 2
18
+ EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER)
19
+
20
+ COLORS = {
21
+ 'vehicle': [255, 0, 0],
22
+ 'pedestrian': [0, 255, 0],
23
+ 'cyclist': [0, 0, 255],
24
+ 'waypoint': [255, 255, 0],
25
+ 'ego_car': [255, 255, 255]
26
+ }
27
+
28
+ # ================== PID Controller ==================
29
+ class PIDController:
30
+ def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20):
31
+ self._K_P = K_P
32
+ self._K_I = K_I
33
+ self._K_D = K_D
34
+ self._window = deque([0 for _ in range(n)], maxlen=n)
35
+
36
+ def step(self, error):
37
+ self._window.append(error)
38
+ if len(self._window) >= 2:
39
+ integral = np.mean(self._window)
40
+ derivative = self._window[-1] - self._window[-2]
41
+ else:
42
+ integral = derivative = 0.0
43
+ return self._K_P * error + self._K_I * integral + self._K_D * derivative
44
+
45
+ # ================== Helper Functions ==================
46
+ def ensure_rgb(image):
47
+ if len(image.shape) == 2:
48
+ return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
49
+ elif image.shape[2] == 1:
50
+ return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
51
+ return image
52
+
53
+ def add_rect(img, loc, ori, box, value, color):
54
+ center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
55
+ center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
56
+ size_px = (int(box[0] * PIXELS_PER_METER), int(box[1] * PIXELS_PER_METER))
57
+ angle_deg = -np.degrees(math.atan2(ori[1], ori[0]))
58
+ box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg))
59
+ box_points = np.int32(box_points)
60
+ adjusted_color = [int(c * value) for c in color]
61
+ cv2.fillConvexPoly(img, box_points, adjusted_color)
62
+ return img
63
+
64
+ def render(traffic_grid, t=0):
65
+ img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
66
+ counts = {'vehicles': 0, 'pedestrians': 0, 'cyclists': 0}
67
+
68
+ if isinstance(traffic_grid, torch.Tensor):
69
+ traffic_grid = traffic_grid.cpu().numpy()
70
+
71
+ h, w, c = traffic_grid.shape
72
+ for y in range(h):
73
+ for x in range(w):
74
+ for ch in range(c):
75
+ if traffic_grid[y, x, ch] > 0.1:
76
+ world_x = (x / w - 0.5) * MAX_DISTANCE * 2
77
+ world_y = (y / h - 0.5) * MAX_DISTANCE * 2
78
+
79
+ if ch < 3:
80
+ color = COLORS['vehicle']
81
+ counts['vehicles'] += 1
82
+ box_size = [2.0, 4.0]
83
+ elif ch < 5:
84
+ color = COLORS['pedestrian']
85
+ counts['pedestrians'] += 1
86
+ box_size = [0.8, 0.8]
87
+ else:
88
+ color = COLORS['cyclist']
89
+ counts['cyclists'] += 1
90
+ box_size = [1.2, 2.0]
91
+
92
+ img = add_rect(img, [world_x, world_y], [1.0, 0.0],
93
+ box_size, traffic_grid[y, x, ch], color)
94
+
95
+ return img, counts
96
+
97
+ def render_waypoints(waypoints, scale_factor=WAYPOINT_SCALE_FACTOR):
98
+ img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
99
+
100
+ if isinstance(waypoints, torch.Tensor):
101
+ waypoints = waypoints.cpu().numpy()
102
+
103
+ scaled_waypoints = waypoints * scale_factor
104
+
105
+ for i, wp in enumerate(scaled_waypoints):
106
+ px = int(wp[0] * PIXELS_PER_METER + IMG_SIZE // 2)
107
+ py = int(wp[1] * PIXELS_PER_METER + IMG_SIZE // 2)
108
+
109
+ if 0 <= px < IMG_SIZE and 0 <= py < IMG_SIZE:
110
+ radius = max(3, 8 - i)
111
+ cv2.circle(img, (px, py), radius, COLORS['waypoint'], -1)
112
+
113
+ if i > 0:
114
+ prev_px = int(scaled_waypoints[i-1][0] * PIXELS_PER_METER + IMG_SIZE // 2)
115
+ prev_py = int(scaled_waypoints[i-1][1] * PIXELS_PER_METER + IMG_SIZE // 2)
116
+ if 0 <= prev_px < IMG_SIZE and 0 <= prev_py < IMG_SIZE:
117
+ cv2.line(img, (prev_px, prev_py), (px, py), COLORS['waypoint'], 2)
118
+
119
+ return img
120
+
121
+ def render_self_car(img):
122
+ car_pos = [0, -4.0]
123
+ car_ori = [1.0, 0.0]
124
+ car_size = [2.0, 4.5]
125
+ return add_rect(img, car_pos, car_ori, car_size, 1.0, COLORS['ego_car'])
126
+
127
+ # ================== Tracker Classes ==================
128
+ class TrackedObject:
129
+ def __init__(self, obj_id: int):
130
+ self.id = obj_id
131
+ self.last_step = 0
132
+ self.last_pos = [0.0, 0.0]
133
+ self.historical_pos = []
134
+ self.historical_steps = []
135
+ self.velocity = [0.0, 0.0]
136
+ self.confidence = 1.0
137
+
138
+ def update(self, step: int, obj_info: List[float]):
139
+ self.last_step = step
140
+ self.last_pos = obj_info[:2]
141
+ self.historical_pos.append(obj_info[:2])
142
+ self.historical_steps.append(step)
143
+
144
+ if len(self.historical_pos) >= 2:
145
+ dt = self.historical_steps[-1] - self.historical_steps[-2]
146
+ if dt > 0:
147
+ dx = self.historical_pos[-1][0] - self.historical_pos[-2][0]
148
+ dy = self.historical_pos[-1][1] - self.historical_pos[-2][1]
149
+ self.velocity = [dx/dt, dy/dt]
150
+
151
+ def predict_position(self, future_time: float) -> List[float]:
152
+ predicted_x = self.last_pos[0] + self.velocity[0] * future_time
153
+ predicted_y = self.last_pos[1] + self.velocity[1] * future_time
154
+ return [predicted_x, predicted_y]
155
+
156
+ def is_alive(self, current_step: int, max_age: int = 5) -> bool:
157
+ return (current_step - self.last_step) <= max_age
158
+
159
+ class Tracker:
160
+ def __init__(self, frequency: int = 10):
161
+ self.tracks: List[TrackedObject] = []
162
+ self.frequency = frequency
163
+ self.next_id = 0
164
+ self.current_step = 0
165
+
166
+ def update_and_predict(self, detections: List[Dict], step: int) -> np.ndarray:
167
+ self.current_step = step
168
+
169
+ for detection in detections:
170
+ pos = detection.get('position', [0, 0])
171
+ feature = detection.get('feature', 0.5)
172
+
173
+ best_match = None
174
+ min_distance = float('inf')
175
+
176
+ for track in self.tracks:
177
+ if track.is_alive(step):
178
+ distance = np.linalg.norm(np.array(pos) - np.array(track.last_pos))
179
+ if distance < min_distance and distance < 2.0:
180
+ min_distance = distance
181
+ best_match = track
182
+
183
+ if best_match:
184
+ best_match.update(step, pos + [feature])
185
+ else:
186
+ new_track = TrackedObject(self.next_id)
187
+ new_track.update(step, pos + [feature])
188
+ self.tracks.append(new_track)
189
+ self.next_id += 1
190
+
191
+ self.tracks = [t for t in self.tracks if t.is_alive(step)]
192
+ return self._generate_prediction_grid()
193
+
194
+ def _generate_prediction_grid(self) -> np.ndarray:
195
+ grid = np.zeros((20, 20, 7), dtype=np.float32)
196
+
197
+ for track in self.tracks:
198
+ if track.is_alive(self.current_step):
199
+ current_pos = track.last_pos
200
+ future_pos_t1 = track.predict_position(T1_FUTURE_TIME)
201
+ future_pos_t2 = track.predict_position(T2_FUTURE_TIME)
202
+
203
+ for pos in [current_pos, future_pos_t1, future_pos_t2]:
204
+ grid_x = int((pos[0] / (MAX_DISTANCE * 2) + 0.5) * 20)
205
+ grid_y = int((pos[1] / (MAX_DISTANCE * 2) + 0.5) * 20)
206
+
207
+ if 0 <= grid_x < 20 and 0 <= grid_y < 20:
208
+ channel = 0
209
+ grid[grid_y, grid_x, channel] = max(grid[grid_y, grid_x, channel], track.confidence)
210
+
211
+ return grid
212
+
213
+ # ================== Controller Classes ==================
214
+ class ControllerConfig:
215
+ def __init__(self):
216
+ self.turn_KP = 1.0
217
+ self.turn_KI = 0.1
218
+ self.turn_KD = 0.1
219
+ self.turn_n = 20
220
+
221
+ self.speed_KP = 0.5
222
+ self.speed_KI = 0.05
223
+ self.speed_KD = 0.1
224
+ self.speed_n = 20
225
+
226
+ self.max_speed = 6.0
227
+ self.max_throttle = 0.75
228
+ self.clip_delta = 0.25
229
+
230
+ self.brake_speed = 0.4
231
+ self.brake_ratio = 1.1
232
+
233
+ class InterfuserController:
234
+ def __init__(self, config: ControllerConfig):
235
+ self.config = config
236
+ self.turn_controller = PIDController(config.turn_KP, config.turn_KI, config.turn_KD, config.turn_n)
237
+ self.speed_controller = PIDController(config.speed_KP, config.speed_KI, config.speed_KD, config.speed_n)
238
+ self.last_steer = 0.0
239
+ self.last_throttle = 0.0
240
+ self.target_speed = 3.0
241
+
242
+ def run_step(self, current_speed: float, waypoints: np.ndarray,
243
+ junction: float, traffic_light_state: float,
244
+ stop_sign: float, meta_data: Dict) -> Tuple[float, float, bool, str]:
245
+
246
+ if isinstance(waypoints, torch.Tensor):
247
+ waypoints = waypoints.cpu().numpy()
248
+
249
+ if len(waypoints) > 1:
250
+ dx = waypoints[1][0] - waypoints[0][0]
251
+ dy = waypoints[1][1] - waypoints[0][1]
252
+ target_yaw = math.atan2(dy, dx)
253
+ steer = self.turn_controller.step(target_yaw)
254
+ else:
255
+ steer = 0.0
256
+
257
+ steer = np.clip(steer, -1.0, 1.0)
258
+
259
+ target_speed = self.target_speed
260
+ if junction > 0.5:
261
+ target_speed *= 0.7
262
+ if abs(steer) > 0.3:
263
+ target_speed *= 0.8
264
+
265
+ speed_error = target_speed - current_speed
266
+ throttle = self.speed_controller.step(speed_error)
267
+ throttle = np.clip(throttle, 0.0, self.config.max_throttle)
268
+
269
+ brake = False
270
+ if traffic_light_state > 0.5 or stop_sign > 0.5 or current_speed > self.config.max_speed:
271
+ brake = True
272
+ throttle = 0.0
273
+
274
+ self.last_steer = steer
275
+ self.last_throttle = throttle
276
+
277
+ metadata = f"Speed:{current_speed:.1f} Target:{target_speed:.1f} Junction:{junction:.2f}"
278
+
279
+ return steer, throttle, brake, metadata
280
+
281
+ # ================== Display Interface ==================
282
+ class DisplayInterface:
283
+ def __init__(self, width: int = 1200, height: int = 600):
284
+ self._width = width
285
+ self._height = height
286
+ self.camera_width = width // 2
287
+ self.camera_height = height
288
+ self.map_width = width // 2
289
+ self.map_height = height // 3
290
+
291
+ def run_interface(self, data: Dict[str, Any]) -> np.ndarray:
292
+ dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8)
293
+
294
+ # Camera view
295
+ camera_view = data.get('camera_view')
296
+ if camera_view is not None:
297
+ camera_resized = cv2.resize(camera_view, (self.camera_width, self.camera_height))
298
+ dashboard[:, :self.camera_width] = camera_resized
299
+
300
+ # Maps
301
+ map_start_x = self.camera_width
302
+
303
+ map_t0 = data.get('map_t0')
304
+ if map_t0 is not None:
305
+ map_resized = cv2.resize(map_t0, (self.map_width, self.map_height))
306
+ dashboard[:self.map_height, map_start_x:] = map_resized
307
+ cv2.putText(dashboard, "Current (t=0)", (map_start_x + 10, 30),
308
+ cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
309
+
310
+ map_t1 = data.get('map_t1')
311
+ if map_t1 is not None:
312
+ map_resized = cv2.resize(map_t1, (self.map_width, self.map_height))
313
+ y_start = self.map_height
314
+ dashboard[y_start:y_start + self.map_height, map_start_x:] = map_resized
315
+ cv2.putText(dashboard, f"Future (t={T1_FUTURE_TIME}s)",
316
+ (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
317
+ 0.7, (255, 255, 255), 2)
318
+
319
+ map_t2 = data.get('map_t2')
320
+ if map_t2 is not None:
321
+ map_resized = cv2.resize(map_t2, (self.map_width, self.map_height))
322
+ y_start = self.map_height * 2
323
+ dashboard[y_start:, map_start_x:] = map_resized
324
+ cv2.putText(dashboard, f"Future (t={T2_FUTURE_TIME}s)",
325
+ (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
326
+ 0.7, (255, 255, 255), 2)
327
+
328
+ # Text info
329
+ text_info = data.get('text_info', {})
330
+ y_offset = 50
331
+ for key, value in text_info.items():
332
+ cv2.putText(dashboard, value, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX,
333
+ 0.6, (0, 255, 0), 2)
334
+ y_offset += 30
335
+
336
+ return dashboard