# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Sequence, Union

from torch import Tensor

from mmengine.dist import (broadcast_object_list, collect_results,
                           is_main_process)
from mmengine.fileio import dump
from mmengine.logging import print_log
from mmengine.registry import METRICS
from mmengine.structures import BaseDataElement


class BaseMetric(metaclass=ABCMeta):
    """Base class for a metric.

    The metric first processes each batch of data_samples and predictions,
    and appends the processed results to the results list. Then it
    collects all results together from all ranks if distributed training
    is used. Finally, it computes the metrics of the entire dataset.

    A subclass of :class:`BaseMetric` should assign a meaningful value to the
    class attribute ``default_prefix``. See the argument ``prefix`` for
    details.

    Args:
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, ``self.default_prefix``
            will be used instead. Defaults to None.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
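
    Examples:
        A minimal, illustrative subclass sketch (``ToyAccuracy`` and the
        ``'pred'``/``'gt'`` keys in ``data_samples`` are hypothetical, not
        part of this module):

        >>> class ToyAccuracy(BaseMetric):
        ...     default_prefix = 'toy'
        ...
        ...     def process(self, data_batch, data_samples):
        ...         # Append one result entry per sample; `evaluate` collects
        ...         # these across ranks before `compute_metrics` runs.
        ...         for sample in data_samples:
        ...             self.results.append(
        ...                 int(sample['pred'] == sample['gt']))
        ...
        ...     def compute_metrics(self, results):
        ...         return dict(accuracy=sum(results) / len(results))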
| """ | |
| default_prefix: Optional[str] = None | |
| def __init__(self, | |
| collect_device: str = 'cpu', | |
| prefix: Optional[str] = None, | |
| collect_dir: Optional[str] = None) -> None: | |
| if collect_dir is not None and collect_device != 'cpu': | |
| raise ValueError('`collec_dir` could only be configured when ' | |
| "`collect_device='cpu'`") | |
| self._dataset_meta: Union[None, dict] = None | |
| self.collect_device = collect_device | |
| self.results: List[Any] = [] | |
| self.prefix = prefix or self.default_prefix | |
| self.collect_dir = collect_dir | |
| if self.prefix is None: | |
| print_log( | |
| 'The prefix is not set in metric class ' | |
| f'{self.__class__.__name__}.', | |
| logger='current', | |
| level=logging.WARNING) | |

    @property
    def dataset_meta(self) -> Optional[dict]:
        """Optional[dict]: Meta info of the dataset."""
        return self._dataset_meta

    @dataset_meta.setter
    def dataset_meta(self, dataset_meta: dict) -> None:
        """Set the dataset meta info to the metric."""
        self._dataset_meta = dataset_meta

    @abstractmethod
    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Any): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of outputs from
                the model.
        """

    @abstractmethod
    def compute_metrics(self, results: list) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """

    def evaluate(self, size: int) -> dict:
        """Evaluate the model performance of the whole dataset after
        processing all batches.

        Args:
            size (int): Length of the entire validation dataset. When batch
                size > 1, the dataloader may pad some data samples to make
                sure all ranks have the same length of dataset slice. The
                ``collect_results`` function will drop the padded data based
                on this size.

        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are the
            names of the metrics, and the values are corresponding results.
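
        Examples:
            A single-process sketch using the hypothetical ``ToyAccuracy``
            subclass from the class docstring; in a non-distributed run the
            collect/broadcast helpers are effectively pass-through:

            >>> metric = ToyAccuracy()
            >>> metric.process(None, [dict(pred=1, gt=1), dict(pred=0, gt=1)])
            >>> metric.evaluate(size=2)
            {'toy/accuracy': 0.5}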
| """ | |
| if len(self.results) == 0: | |
| print_log( | |
| f'{self.__class__.__name__} got empty `self.results`. Please ' | |
| 'ensure that the processed results are properly added into ' | |
| '`self.results` in `process` method.', | |
| logger='current', | |
| level=logging.WARNING) | |
| if self.collect_device == 'cpu': | |
| results = collect_results( | |
| self.results, | |
| size, | |
| self.collect_device, | |
| tmpdir=self.collect_dir) | |
| else: | |
| results = collect_results(self.results, size, self.collect_device) | |
| if is_main_process(): | |
| # cast all tensors in results list to cpu | |
| results = _to_cpu(results) | |
| _metrics = self.compute_metrics(results) # type: ignore | |
| # Add prefix to metric names | |
| if self.prefix: | |
| _metrics = { | |
| '/'.join((self.prefix, k)): v | |
| for k, v in _metrics.items() | |
| } | |
| metrics = [_metrics] | |
| else: | |
| metrics = [None] # type: ignore | |
| broadcast_object_list(metrics) | |
| # reset the results list | |
| self.results.clear() | |
| return metrics[0] | |


@METRICS.register_module()
class DumpResults(BaseMetric):
    """Dump model predictions to a pickle file for offline evaluation.

    Args:
        out_file_path (str): Path of the dumped file. Must end with '.pkl'
            or '.pickle'.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        collect_dir (str, optional): Synchronize directory for collecting data
            from different ranks. This argument should only be configured when
            ``collect_device`` is 'cpu'. Defaults to None.
            `New in version 0.7.3.`
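
    Examples:
        A minimal usage sketch (the output file name below is just an
        illustration):

        >>> metric = DumpResults(out_file_path='predictions.pkl')
        >>> metric.process(None, [dict(pred_score=0.9)])
        >>> _ = metric.evaluate(size=1)  # dumps the collected predictions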
| """ | |
| def __init__(self, | |
| out_file_path: str, | |
| collect_device: str = 'cpu', | |
| collect_dir: Optional[str] = None) -> None: | |
| super().__init__( | |
| collect_device=collect_device, collect_dir=collect_dir) | |
| if not out_file_path.endswith(('.pkl', '.pickle')): | |
| raise ValueError('The output file must be a pkl file.') | |
| self.out_file_path = out_file_path | |
| def process(self, data_batch: Any, predictions: Sequence[dict]) -> None: | |
| """transfer tensors in predictions to CPU.""" | |
| self.results.extend(_to_cpu(predictions)) | |
| def compute_metrics(self, results: list) -> dict: | |
| """dump the prediction results to a pickle file.""" | |
| dump(results, self.out_file_path) | |
| print_log( | |
| f'Results has been saved to {self.out_file_path}.', | |
| logger='current') | |
| return {} | |


def _to_cpu(data: Any) -> Any:
    """Transfer all tensors and ``BaseDataElement`` to CPU."""
    if isinstance(data, (Tensor, BaseDataElement)):
        return data.to('cpu')
    elif isinstance(data, list):
        return [_to_cpu(d) for d in data]
    elif isinstance(data, tuple):
        return tuple(_to_cpu(d) for d in data)
    elif isinstance(data, dict):
        return {k: _to_cpu(v) for k, v in data.items()}
    else:
        return data