Spaces:
Sleeping
Sleeping
| # simulation_modules.py | |
| import torch | |
| import numpy as np | |
| import cv2 | |
| import math | |
| from collections import deque | |
| from typing import List, Tuple, Dict, Any, Optional | |
| # ================== Constants ================== | |
| WAYPOINT_SCALE_FACTOR = 5.0 | |
| T1_FUTURE_TIME = 1.0 | |
| T2_FUTURE_TIME = 2.0 | |
| PIXELS_PER_METER = 8 | |
| MAX_DISTANCE = 32 | |
| IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2 | |
| EGO_CAR_X = IMG_SIZE // 2 | |
| EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER) | |
| COLORS = { | |
| 'vehicle': [255, 0, 0], | |
| 'pedestrian': [0, 255, 0], | |
| 'cyclist': [0, 0, 255], | |
| 'waypoint': [255, 255, 0], | |
| 'ego_car': [255, 255, 255] | |
| } | |
| # ================== PID Controller ================== | |
| class PIDController: | |
| def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20): | |
| self._K_P = K_P | |
| self._K_I = K_I | |
| self._K_D = K_D | |
| self._window = deque([0 for _ in range(n)], maxlen=n) | |
| def step(self, error): | |
| self._window.append(error) | |
| if len(self._window) >= 2: | |
| integral = np.mean(self._window) | |
| derivative = self._window[-1] - self._window[-2] | |
| else: | |
| integral = derivative = 0.0 | |
| return self._K_P * error + self._K_I * integral + self._K_D * derivative | |
| # ================== Helper Functions ================== | |
| def ensure_rgb(image): | |
| if len(image.shape) == 2: | |
| return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) | |
| elif image.shape[2] == 1: | |
| return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) | |
| return image | |
| def add_rect(img, loc, ori, box, value, color): | |
| center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER) | |
| center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER) | |
| size_px = (int(box[0] * PIXELS_PER_METER), int(box[1] * PIXELS_PER_METER)) | |
| angle_deg = -np.degrees(math.atan2(ori[1], ori[0])) | |
| box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg)) | |
| box_points = np.int32(box_points) | |
| adjusted_color = [int(c * value) for c in color] | |
| cv2.fillConvexPoly(img, box_points, adjusted_color) | |
| return img | |
| def render(traffic_grid, t=0): | |
| img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8) | |
| counts = {'vehicles': 0, 'pedestrians': 0, 'cyclists': 0} | |
| if isinstance(traffic_grid, torch.Tensor): | |
| traffic_grid = traffic_grid.cpu().numpy() | |
| h, w, c = traffic_grid.shape | |
| for y in range(h): | |
| for x in range(w): | |
| for ch in range(c): | |
| if traffic_grid[y, x, ch] > 0.1: | |
| world_x = (x / w - 0.5) * MAX_DISTANCE * 2 | |
| world_y = (y / h - 0.5) * MAX_DISTANCE * 2 | |
| if ch < 3: | |
| color = COLORS['vehicle'] | |
| counts['vehicles'] += 1 | |
| box_size = [2.0, 4.0] | |
| elif ch < 5: | |
| color = COLORS['pedestrian'] | |
| counts['pedestrians'] += 1 | |
| box_size = [0.8, 0.8] | |
| else: | |
| color = COLORS['cyclist'] | |
| counts['cyclists'] += 1 | |
| box_size = [1.2, 2.0] | |
| img = add_rect(img, [world_x, world_y], [1.0, 0.0], | |
| box_size, traffic_grid[y, x, ch], color) | |
| return img, counts | |
| def render_waypoints(waypoints, scale_factor=WAYPOINT_SCALE_FACTOR): | |
| img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8) | |
| if isinstance(waypoints, torch.Tensor): | |
| waypoints = waypoints.cpu().numpy() | |
| scaled_waypoints = waypoints * scale_factor | |
| for i, wp in enumerate(scaled_waypoints): | |
| px = int(wp[0] * PIXELS_PER_METER + IMG_SIZE // 2) | |
| py = int(wp[1] * PIXELS_PER_METER + IMG_SIZE // 2) | |
| if 0 <= px < IMG_SIZE and 0 <= py < IMG_SIZE: | |
| radius = max(3, 8 - i) | |
| cv2.circle(img, (px, py), radius, COLORS['waypoint'], -1) | |
| if i > 0: | |
| prev_px = int(scaled_waypoints[i-1][0] * PIXELS_PER_METER + IMG_SIZE // 2) | |
| prev_py = int(scaled_waypoints[i-1][1] * PIXELS_PER_METER + IMG_SIZE // 2) | |
| if 0 <= prev_px < IMG_SIZE and 0 <= prev_py < IMG_SIZE: | |
| cv2.line(img, (prev_px, prev_py), (px, py), COLORS['waypoint'], 2) | |
| return img | |
| def render_self_car(img): | |
| car_pos = [0, -4.0] | |
| car_ori = [1.0, 0.0] | |
| car_size = [2.0, 4.5] | |
| return add_rect(img, car_pos, car_ori, car_size, 1.0, COLORS['ego_car']) | |
| # ================== Tracker Classes ================== | |
| class TrackedObject: | |
| def __init__(self, obj_id: int): | |
| self.id = obj_id | |
| self.last_step = 0 | |
| self.last_pos = [0.0, 0.0] | |
| self.historical_pos = [] | |
| self.historical_steps = [] | |
| self.velocity = [0.0, 0.0] | |
| self.confidence = 1.0 | |
| def update(self, step: int, obj_info: List[float]): | |
| self.last_step = step | |
| self.last_pos = obj_info[:2] | |
| self.historical_pos.append(obj_info[:2]) | |
| self.historical_steps.append(step) | |
| if len(self.historical_pos) >= 2: | |
| dt = self.historical_steps[-1] - self.historical_steps[-2] | |
| if dt > 0: | |
| dx = self.historical_pos[-1][0] - self.historical_pos[-2][0] | |
| dy = self.historical_pos[-1][1] - self.historical_pos[-2][1] | |
| self.velocity = [dx/dt, dy/dt] | |
| def predict_position(self, future_time: float) -> List[float]: | |
| predicted_x = self.last_pos[0] + self.velocity[0] * future_time | |
| predicted_y = self.last_pos[1] + self.velocity[1] * future_time | |
| return [predicted_x, predicted_y] | |
| def is_alive(self, current_step: int, max_age: int = 5) -> bool: | |
| return (current_step - self.last_step) <= max_age | |
| class Tracker: | |
| def __init__(self, frequency: int = 10): | |
| self.tracks: List[TrackedObject] = [] | |
| self.frequency = frequency | |
| self.next_id = 0 | |
| self.current_step = 0 | |
| def update_and_predict(self, detections: List[Dict], step: int) -> np.ndarray: | |
| self.current_step = step | |
| for detection in detections: | |
| pos = detection.get('position', [0, 0]) | |
| feature = detection.get('feature', 0.5) | |
| best_match = None | |
| min_distance = float('inf') | |
| for track in self.tracks: | |
| if track.is_alive(step): | |
| distance = np.linalg.norm(np.array(pos) - np.array(track.last_pos)) | |
| if distance < min_distance and distance < 2.0: | |
| min_distance = distance | |
| best_match = track | |
| if best_match: | |
| best_match.update(step, pos + [feature]) | |
| else: | |
| new_track = TrackedObject(self.next_id) | |
| new_track.update(step, pos + [feature]) | |
| self.tracks.append(new_track) | |
| self.next_id += 1 | |
| self.tracks = [t for t in self.tracks if t.is_alive(step)] | |
| return self._generate_prediction_grid() | |
| def _generate_prediction_grid(self) -> np.ndarray: | |
| grid = np.zeros((20, 20, 7), dtype=np.float32) | |
| for track in self.tracks: | |
| if track.is_alive(self.current_step): | |
| current_pos = track.last_pos | |
| future_pos_t1 = track.predict_position(T1_FUTURE_TIME) | |
| future_pos_t2 = track.predict_position(T2_FUTURE_TIME) | |
| for pos in [current_pos, future_pos_t1, future_pos_t2]: | |
| grid_x = int((pos[0] / (MAX_DISTANCE * 2) + 0.5) * 20) | |
| grid_y = int((pos[1] / (MAX_DISTANCE * 2) + 0.5) * 20) | |
| if 0 <= grid_x < 20 and 0 <= grid_y < 20: | |
| channel = 0 | |
| grid[grid_y, grid_x, channel] = max(grid[grid_y, grid_x, channel], track.confidence) | |
| return grid | |
| # ================== Controller Classes ================== | |
| class ControllerConfig: | |
| def __init__(self): | |
| self.turn_KP = 1.0 | |
| self.turn_KI = 0.1 | |
| self.turn_KD = 0.1 | |
| self.turn_n = 20 | |
| self.speed_KP = 0.5 | |
| self.speed_KI = 0.05 | |
| self.speed_KD = 0.1 | |
| self.speed_n = 20 | |
| self.max_speed = 6.0 | |
| self.max_throttle = 0.75 | |
| self.clip_delta = 0.25 | |
| self.brake_speed = 0.4 | |
| self.brake_ratio = 1.1 | |
| class InterfuserController: | |
| def __init__(self, config: ControllerConfig): | |
| self.config = config | |
| self.turn_controller = PIDController(config.turn_KP, config.turn_KI, config.turn_KD, config.turn_n) | |
| self.speed_controller = PIDController(config.speed_KP, config.speed_KI, config.speed_KD, config.speed_n) | |
| self.last_steer = 0.0 | |
| self.last_throttle = 0.0 | |
| self.target_speed = 3.0 | |
| def run_step(self, current_speed: float, waypoints: np.ndarray, | |
| junction: float, traffic_light_state: float, | |
| stop_sign: float, meta_data: Dict) -> Tuple[float, float, bool, str]: | |
| if isinstance(waypoints, torch.Tensor): | |
| waypoints = waypoints.cpu().numpy() | |
| if len(waypoints) > 1: | |
| dx = waypoints[1][0] - waypoints[0][0] | |
| dy = waypoints[1][1] - waypoints[0][1] | |
| target_yaw = math.atan2(dy, dx) | |
| steer = self.turn_controller.step(target_yaw) | |
| else: | |
| steer = 0.0 | |
| steer = np.clip(steer, -1.0, 1.0) | |
| target_speed = self.target_speed | |
| if junction > 0.5: | |
| target_speed *= 0.7 | |
| if abs(steer) > 0.3: | |
| target_speed *= 0.8 | |
| speed_error = target_speed - current_speed | |
| throttle = self.speed_controller.step(speed_error) | |
| throttle = np.clip(throttle, 0.0, self.config.max_throttle) | |
| brake = False | |
| if traffic_light_state > 0.5 or stop_sign > 0.5 or current_speed > self.config.max_speed: | |
| brake = True | |
| throttle = 0.0 | |
| self.last_steer = steer | |
| self.last_throttle = throttle | |
| metadata = f"Speed:{current_speed:.1f} Target:{target_speed:.1f} Junction:{junction:.2f}" | |
| return steer, throttle, brake, metadata | |
| # ================== Display Interface ================== | |
| class DisplayInterface: | |
| def __init__(self, width: int = 1200, height: int = 600): | |
| self._width = width | |
| self._height = height | |
| self.camera_width = width // 2 | |
| self.camera_height = height | |
| self.map_width = width // 2 | |
| self.map_height = height // 3 | |
| def run_interface(self, data: Dict[str, Any]) -> np.ndarray: | |
| dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8) | |
| # Camera view | |
| camera_view = data.get('camera_view') | |
| if camera_view is not None: | |
| camera_resized = cv2.resize(camera_view, (self.camera_width, self.camera_height)) | |
| dashboard[:, :self.camera_width] = camera_resized | |
| # Maps | |
| map_start_x = self.camera_width | |
| map_t0 = data.get('map_t0') | |
| if map_t0 is not None: | |
| map_resized = cv2.resize(map_t0, (self.map_width, self.map_height)) | |
| dashboard[:self.map_height, map_start_x:] = map_resized | |
| cv2.putText(dashboard, "Current (t=0)", (map_start_x + 10, 30), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) | |
| map_t1 = data.get('map_t1') | |
| if map_t1 is not None: | |
| map_resized = cv2.resize(map_t1, (self.map_width, self.map_height)) | |
| y_start = self.map_height | |
| dashboard[y_start:y_start + self.map_height, map_start_x:] = map_resized | |
| cv2.putText(dashboard, f"Future (t={T1_FUTURE_TIME}s)", | |
| (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX, | |
| 0.7, (255, 255, 255), 2) | |
| map_t2 = data.get('map_t2') | |
| if map_t2 is not None: | |
| map_resized = cv2.resize(map_t2, (self.map_width, self.map_height)) | |
| y_start = self.map_height * 2 | |
| dashboard[y_start:, map_start_x:] = map_resized | |
| cv2.putText(dashboard, f"Future (t={T2_FUTURE_TIME}s)", | |
| (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX, | |
| 0.7, (255, 255, 255), 2) | |
| # Text info | |
| text_info = data.get('text_info', {}) | |
| y_offset = 50 | |
| for key, value in text_info.items(): | |
| cv2.putText(dashboard, value, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, | |
| 0.6, (0, 255, 0), 2) | |
| y_offset += 30 | |
| return dashboard | |