Spaces:
Runtime error
Runtime error
| # coding=utf-8 | |
| # Copyright 2021 The Deeplab2 Authors. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| r"""This file contains code to track based on IoU overlaps. | |
| The IoUTracker takes frame-by-frame panoptic segmentation prediction and | |
| generates video panoptic segmentation with re-ordered identities based on IoU | |
| overlaps within consecutive frames. | |
| To run this script, you need to install scipy. | |
| For example, install it via pip: | |
| $pip install scipy | |
| """ | |
| import collections | |
| import os | |
| import pprint | |
| from typing import List, Text, Tuple, Optional | |
| from absl import app | |
| from absl import flags | |
| from absl import logging | |
| import numpy as np | |
| from scipy import optimize | |
| import tensorflow as tf | |
| from deeplab2.data import dataset | |
| from deeplab2.evaluation import segmentation_and_tracking_quality as stq | |
| from deeplab2.tracker import optical_flow_utils | |
| from deeplab2.trainer import vis_utils | |
| FLAGS = flags.FLAGS | |
| flags.DEFINE_string('gt', None, 'The path to the gt video frames. This folder ' | |
| 'should contain one folder per sequence.') | |
| flags.DEFINE_string('pred', None, 'The path to the prediction video frames. ' | |
| 'This folder should contain one folder per sequence.') | |
| flags.DEFINE_string('output', '', 'The path to store the tracked video frames.' | |
| 'This folder should contain one folder per sequence.') | |
| flags.DEFINE_string('sequence', '', 'The sequence ID to evaluate on.') | |
| flags.DEFINE_string( | |
| 'dataset', 'kitti_step', 'The specified dataset is used' | |
| ' to interpret the labels. Supported options are: ' + | |
| ', '.join(dataset.MAP_NAMES)) | |
| flags.DEFINE_string('optical_flow', None, | |
| 'The path to the optical flow predictions. This folder ' | |
| 'should contain one folder per sequence.') | |
| _LABEL_DIVISOR = 10000 | |
| _OCCLUSION_EXT = '.occ_forward' | |
| _FLOW_EXT = '.flow_forward' | |
| def _format_output(output, indent=4): | |
| """Formats `output`, either on one line, or indented across multiple lines.""" | |
| formatted = pprint.pformat(output) | |
| lines = formatted.splitlines() | |
| if len(lines) == 1: | |
| return formatted | |
| lines = [' ' * indent + line for line in lines] | |
| return '\n' + '\n'.join(lines) | |
| def _compute_mask_iou(instance_a: np.ndarray, instance_b: np.ndarray) -> int: | |
| """Computes the IoU of two binary masks.""" | |
| intersection = np.count_nonzero( | |
| np.logical_and(instance_a > 0, instance_b > 0).astype(np.uint8)) | |
| non_intersection_a = np.count_nonzero(instance_a > 0) - intersection | |
| non_intersection_b = np.count_nonzero(instance_b > 0) - intersection | |
| return intersection / ( | |
| intersection + non_intersection_a + non_intersection_b) | |
| class IoUTracker(object): | |
| """This class computes track IDs based on IoU overlap.""" | |
| def __init__(self, | |
| classes_to_track: List[int], | |
| label_divisor: int, | |
| sigma=10, | |
| iou_threshold=0.3): | |
| """Initializes the tracker. | |
| Args: | |
| classes_to_track: A list of class IDs that should be tracked. | |
| label_divisor: The divisor to split the label map into semantic classes | |
| and instance IDs. | |
| sigma: An integer specifying the number of frames that tracks should be | |
| kept active while being discontinued. | |
| iou_threshold: A float specifying the minimum IoU value for a match. | |
| """ | |
| self._sigma = sigma | |
| self._iou_threshold = iou_threshold | |
| self._classes_to_track = classes_to_track | |
| self._label_divisor = label_divisor | |
| self.reset_states() | |
| def reset_states(self): | |
| """Resets all tracking states.""" | |
| self._last_mask_per_track = { | |
| i: collections.OrderedDict() for i in self._classes_to_track | |
| } | |
| self._frames_since_last_update = { | |
| i: collections.OrderedDict() for i in self._classes_to_track | |
| } | |
| # `0` is reserved for `crowd`. | |
| self._next_track_id = 1 | |
| def _add_track(self, object_mask: np.ndarray, class_index: int): | |
| """Adds a new track.""" | |
| track_id = self._next_track_id | |
| self._last_mask_per_track[class_index][track_id] = object_mask | |
| self._frames_since_last_update[class_index][track_id] = 0 | |
| self._next_track_id += 1 | |
| def _remove_track(self, track_id: int, class_index: int): | |
| """Removes a track.""" | |
| del self._last_mask_per_track[class_index][track_id] | |
| del self._frames_since_last_update[class_index][track_id] | |
| def _increase_inactivity_of_track(self, track_id: int, class_index: int): | |
| """Increases inactivity of track and potentially remove it.""" | |
| self._frames_since_last_update[class_index][track_id] += 1 | |
| if (self._frames_since_last_update[class_index][track_id] > | |
| self._sigma): | |
| self._remove_track(track_id, class_index) | |
| def _match_instances_to_tracks( | |
| self, instances: List[np.ndarray], class_index: int, | |
| instances_with_track_id: np.ndarray, | |
| warped_instances: List[np.ndarray]) -> np.ndarray: | |
| """Match instances to tracks and update tracks accordingly.""" | |
| track_ids = list(self._last_mask_per_track[class_index].keys()) | |
| # Match instances to tracks based on IoU overlap. | |
| if warped_instances: | |
| matches, unmatched_instances, unmatched_tracks = ( | |
| self._associate_instances_to_tracks(warped_instances, class_index)) | |
| else: | |
| matches, unmatched_instances, unmatched_tracks = ( | |
| self._associate_instances_to_tracks(instances, class_index)) | |
| # Extend existing tracks. | |
| for instance_index, track_id_index in matches: | |
| track_id = track_ids[track_id_index] | |
| instance_mask = instances[instance_index] | |
| self._last_mask_per_track[class_index][track_id] = instance_mask | |
| self._frames_since_last_update[class_index][track_id] = 0 | |
| instances_with_track_id[instance_mask] = track_id | |
| # Add new tracks. | |
| for instance_index in unmatched_instances: | |
| instance_mask = instances[instance_index] | |
| self._add_track(instance_mask, class_index) | |
| instances_with_track_id[instance_mask] = self._next_track_id - 1 | |
| # Remove tracks that are inactive for more than `sigma` frames. | |
| for track_id_index in unmatched_tracks: | |
| track_id = track_ids[track_id_index] | |
| self._increase_inactivity_of_track(track_id, class_index) | |
| return instances_with_track_id | |
| def update(self, predicted_frame: np.ndarray, | |
| predicted_flow: Optional[np.ndarray], | |
| predicted_occlusion: Optional[np.ndarray]) -> np.ndarray: | |
| """Updates the tracking states and computes the track IDs. | |
| Args: | |
| predicted_frame: The panoptic label map for a particular video frame. | |
| predicted_flow: An optional np.array containing the optical flow. | |
| predicted_occlusion: An optional np.array containing the predicted | |
| occlusion map. | |
| Returns: | |
| The updated panoptic label map for the input frame containing track IDs. | |
| """ | |
| predicted_classes = predicted_frame // self._label_divisor | |
| predicted_instances = predicted_frame % self._label_divisor | |
| instances_with_track_id = np.zeros_like(predicted_instances) | |
| for class_index in self._classes_to_track: | |
| instances_mask = np.logical_and(predicted_classes == class_index, | |
| predicted_instances > 0) | |
| instance_ids = np.unique(predicted_instances[instances_mask]) | |
| instances = [ | |
| np.logical_and(instances_mask, predicted_instances == i) | |
| for i in instance_ids | |
| ] | |
| # If current class has no instances, check if tracks needs to be removed, | |
| # because they are inactive for more than `sigma` frames. | |
| if not instances: | |
| immutable_key_list = list(self._frames_since_last_update[class_index]) | |
| for track_id in immutable_key_list: | |
| self._increase_inactivity_of_track(track_id, class_index) | |
| continue | |
| # If there are no tracks recorded yet, all all instances as new tracks. | |
| if not self._last_mask_per_track[class_index]: | |
| for instance_mask in instances: | |
| self._add_track(instance_mask, class_index) | |
| instances_with_track_id[instance_mask] = self._next_track_id - 1 | |
| else: | |
| # If optical flow is used, warp all instances. | |
| warped_instances = [] | |
| if predicted_occlusion is not None and predicted_flow is not None: | |
| for instance in instances: | |
| warped_instance = optical_flow_utils.warp_flow( | |
| instance.astype(np.float32), predicted_flow) | |
| warped_instances.append( | |
| optical_flow_utils.remove_occlusions(warped_instance, | |
| predicted_occlusion)) | |
| instances_with_track_id = self._match_instances_to_tracks( | |
| instances, class_index, instances_with_track_id, warped_instances) | |
| if self._next_track_id >= self._label_divisor: | |
| raise ValueError('To many tracks were detected for the given ' | |
| 'label_divisor. Please increase the label_divisor to ' | |
| 'make sure that the track Ids are less than the ' | |
| 'label_divisor.') | |
| return predicted_classes * self._label_divisor + instances_with_track_id | |
| def _associate_instances_to_tracks( | |
| self, instances: List[np.ndarray], | |
| class_index: int) -> Tuple[List[Tuple[int, int]], List[int], List[int]]: | |
| """Matches the instances to existing tracks. | |
| Args: | |
| instances: A list of numpy arrays specifying the instance masks. | |
| class_index: An integer specifying the class index. | |
| Returns: | |
| A tuple of Lists: | |
| - Containing all indices of matches between instances and tracks. | |
| - Containing all indices of unmatched instances. | |
| - Containing all indices of unmatched tracks. | |
| """ | |
| number_of_instances = len(instances) | |
| number_of_tracks = len(self._last_mask_per_track[class_index]) | |
| iou_matrix = np.zeros((number_of_instances, number_of_tracks)) | |
| for i, instance_mask in enumerate(instances): | |
| for j, last_mask in enumerate( | |
| self._last_mask_per_track[class_index].values()): | |
| iou_matrix[i, j] = _compute_mask_iou(instance_mask, last_mask) | |
| matches_indices = np.stack( | |
| list(optimize.linear_sum_assignment(-iou_matrix)), axis=1) | |
| unmatched_instances = [ | |
| inst_id for inst_id in range(number_of_instances) | |
| if inst_id not in matches_indices[:, 0] | |
| ] | |
| unmatched_tracks = [ | |
| inst_id for inst_id in range(number_of_tracks) | |
| if inst_id not in matches_indices[:, 1] | |
| ] | |
| list_of_matches = [] | |
| for m in matches_indices: | |
| if iou_matrix[m[0], m[1]] > self._iou_threshold: | |
| list_of_matches.append(m) | |
| else: | |
| unmatched_instances.append(m[0]) | |
| unmatched_tracks.append(m[1]) | |
| return list_of_matches, unmatched_instances, unmatched_tracks | |
| def read_panoptic_image(path: Text, label_divisor: int) -> np.ndarray: | |
| """Reads in a panoptic image in 2 channel format and returns as np array.""" | |
| with tf.io.gfile.GFile(path, 'rb') as f: | |
| image = tf.cast(tf.io.decode_image(f.read()), tf.int32).numpy() | |
| return image[..., 0] * label_divisor + image[..., 1] | |
| def read_numpy_tensor(path: Text) -> np.ndarray: | |
| """Reads a numpy array from `path` and returns it.""" | |
| with tf.io.gfile.GFile(path, 'rb') as f: | |
| return np.load(f) | |
| def main(unused_args): | |
| if FLAGS.dataset not in dataset.MAP_NAME_TO_DATASET_INFO: | |
| raise ValueError('Given dataset option is not a valid dataset. Please use ' | |
| '--help to see available options.') | |
| dataset_info = dataset.MAP_NAME_TO_DATASET_INFO[FLAGS.dataset] | |
| thing_classes = dataset_info.class_has_instances_list | |
| ignore_label = dataset_info.ignore_label | |
| num_classes = dataset_info.num_classes | |
| colormap_name = dataset_info.colormap | |
| use_optical_flow = FLAGS.optical_flow is not None | |
| # Create Tracker and metric. | |
| tracker = IoUTracker(thing_classes, _LABEL_DIVISOR) | |
| metric = stq.STQuality(num_classes, thing_classes, ignore_label, | |
| _LABEL_DIVISOR, 256*256*256) | |
| # Get ground-truth files. | |
| for gt_sequence_folder in tf.io.gfile.glob(os.path.join(FLAGS.gt, '*')): | |
| tracker.reset_states() | |
| color_map = dict() | |
| sequence = os.path.basename(gt_sequence_folder) | |
| if FLAGS.sequence and FLAGS.sequence != sequence: | |
| continue | |
| pred_sequence_folder = os.path.join(FLAGS.pred, sequence) | |
| if use_optical_flow: | |
| optical_flow_sequence_folder = os.path.join(FLAGS.optical_flow, sequence) | |
| for gt_frame_path in sorted(tf.io.gfile.glob( | |
| os.path.join(gt_sequence_folder, '*.png'))): | |
| gt_frame_name = gt_frame_path.split('/')[-1] | |
| pred_frame_name = os.path.join(pred_sequence_folder, gt_frame_name) | |
| flow = None | |
| occlusion = None | |
| logging.info('Processing sequence %s: frame %s.', sequence, gt_frame_name) | |
| gt_frame = read_panoptic_image(gt_frame_path, _LABEL_DIVISOR) | |
| pred_frame = read_panoptic_image(pred_frame_name, _LABEL_DIVISOR) | |
| if use_optical_flow: | |
| frame_id = int(os.path.splitext(gt_frame_name)[0]) | |
| flow_path = os.path.join(optical_flow_sequence_folder, | |
| '%06d%s' % (frame_id - 1, _FLOW_EXT)) | |
| occlusion_path = os.path.join(optical_flow_sequence_folder, | |
| '%06d%s' % (frame_id - 1, _OCCLUSION_EXT)) | |
| if tf.io.gfile.exists(flow_path): | |
| flow = read_numpy_tensor(flow_path) | |
| occlusion = read_numpy_tensor(occlusion_path)[0, ..., 0] | |
| else: | |
| logging.info('Could not find optical flow for current frame.') | |
| h, w = gt_frame.shape | |
| flow = np.zeros_like((h, w, 2), np.float32) | |
| occlusion = np.zeros_like((h, w), np.float32) | |
| pred_frame = tracker.update(pred_frame, flow, occlusion) | |
| if FLAGS.output: | |
| output_folder = os.path.join(FLAGS.output, sequence) | |
| tf.io.gfile.makedirs(output_folder) | |
| color_map = vis_utils.save_parsing_result(pred_frame, _LABEL_DIVISOR, | |
| thing_classes, output_folder, | |
| os.path.splitext( | |
| gt_frame_name)[0], | |
| color_map, | |
| colormap_name=colormap_name) | |
| metric.update_state( | |
| tf.convert_to_tensor(gt_frame), tf.convert_to_tensor(pred_frame), | |
| sequence) | |
| logging.info('Final results:') | |
| logging.info(_format_output(metric.result())) | |
| if __name__ == '__main__': | |
| flags.mark_flags_as_required(['gt', 'pred']) | |
| app.run(main) | |