123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- This code is based on https://github.com/nwojke/deep_sort/tree/master/deep_sort
- """
- import numpy as np
- from scipy.optimize import linear_sum_assignment
- from ..motion import kalman_filter
- INFTY_COST = 1e+5
- __all__ = [
- 'iou_1toN',
- 'iou_cost',
- '_nn_euclidean_distance',
- '_nn_cosine_distance',
- 'NearestNeighborDistanceMetric',
- 'min_cost_matching',
- 'matching_cascade',
- 'gate_cost_matrix',
- ]
- def iou_1toN(bbox, candidates):
- """
- Computer intersection over union (IoU) by one box to N candidates.
- Args:
- bbox (ndarray): A bounding box in format `(top left x, top left y, width, height)`.
- candidates (ndarray): A matrix of candidate bounding boxes (one per row) in the
- same format as `bbox`.
- Returns:
- ious (ndarray): The intersection over union in [0, 1] between the `bbox`
- and each candidate. A higher score means a larger fraction of the
- `bbox` is occluded by the candidate.
- """
- bbox_tl = bbox[:2]
- bbox_br = bbox[:2] + bbox[2:]
- candidates_tl = candidates[:, :2]
- candidates_br = candidates[:, :2] + candidates[:, 2:]
- tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
- np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
- br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
- np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
- wh = np.maximum(0., br - tl)
- area_intersection = wh.prod(axis=1)
- area_bbox = bbox[2:].prod()
- area_candidates = candidates[:, 2:].prod(axis=1)
- ious = area_intersection / (area_bbox + area_candidates - area_intersection)
- return ious
- def iou_cost(tracks, detections, track_indices=None, detection_indices=None):
- """
- IoU distance metric.
- Args:
- tracks (list[Track]): A list of tracks.
- detections (list[Detection]): A list of detections.
- track_indices (Optional[list[int]]): A list of indices to tracks that
- should be matched. Defaults to all `tracks`.
- detection_indices (Optional[list[int]]): A list of indices to detections
- that should be matched. Defaults to all `detections`.
- Returns:
- cost_matrix (ndarray): A cost matrix of shape len(track_indices),
- len(detection_indices) where entry (i, j) is
- `1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`.
- """
- if track_indices is None:
- track_indices = np.arange(len(tracks))
- if detection_indices is None:
- detection_indices = np.arange(len(detections))
- cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
- for row, track_idx in enumerate(track_indices):
- if tracks[track_idx].time_since_update > 1:
- cost_matrix[row, :] = 1e+5
- continue
- bbox = tracks[track_idx].to_tlwh()
- candidates = np.asarray([detections[i].tlwh for i in detection_indices])
- cost_matrix[row, :] = 1. - iou_1toN(bbox, candidates)
- return cost_matrix
- def _nn_euclidean_distance(s, q):
- """
- Compute pair-wise squared (Euclidean) distance between points in `s` and `q`.
- Args:
- s (ndarray): Sample points: an NxM matrix of N samples of dimensionality M.
- q (ndarray): Query points: an LxM matrix of L samples of dimensionality M.
- Returns:
- distances (ndarray): A vector of length M that contains for each entry in `q` the
- smallest Euclidean distance to a sample in `s`.
- """
- s, q = np.asarray(s), np.asarray(q)
- if len(s) == 0 or len(q) == 0:
- return np.zeros((len(s), len(q)))
- s2, q2 = np.square(s).sum(axis=1), np.square(q).sum(axis=1)
- distances = -2. * np.dot(s, q.T) + s2[:, None] + q2[None, :]
- distances = np.clip(distances, 0., float(np.inf))
- return np.maximum(0.0, distances.min(axis=0))
- def _nn_cosine_distance(s, q):
- """
- Compute pair-wise cosine distance between points in `s` and `q`.
- Args:
- s (ndarray): Sample points: an NxM matrix of N samples of dimensionality M.
- q (ndarray): Query points: an LxM matrix of L samples of dimensionality M.
- Returns:
- distances (ndarray): A vector of length M that contains for each entry in `q` the
- smallest Euclidean distance to a sample in `s`.
- """
- s = np.asarray(s) / np.linalg.norm(s, axis=1, keepdims=True)
- q = np.asarray(q) / np.linalg.norm(q, axis=1, keepdims=True)
- distances = 1. - np.dot(s, q.T)
- return distances.min(axis=0)
- class NearestNeighborDistanceMetric(object):
- """
- A nearest neighbor distance metric that, for each target, returns
- the closest distance to any sample that has been observed so far.
- Args:
- metric (str): Either "euclidean" or "cosine".
- matching_threshold (float): The matching threshold. Samples with larger
- distance are considered an invalid match.
- budget (Optional[int]): If not None, fix samples per class to at most
- this number. Removes the oldest samples when the budget is reached.
- Attributes:
- samples (Dict[int -> List[ndarray]]): A dictionary that maps from target
- identities to the list of samples that have been observed so far.
- """
- def __init__(self, metric, matching_threshold, budget=None):
- if metric == "euclidean":
- self._metric = _nn_euclidean_distance
- elif metric == "cosine":
- self._metric = _nn_cosine_distance
- else:
- raise ValueError(
- "Invalid metric; must be either 'euclidean' or 'cosine'")
- self.matching_threshold = matching_threshold
- self.budget = budget
- self.samples = {}
- def partial_fit(self, features, targets, active_targets):
- """
- Update the distance metric with new data.
- Args:
- features (ndarray): An NxM matrix of N features of dimensionality M.
- targets (ndarray): An integer array of associated target identities.
- active_targets (List[int]): A list of targets that are currently
- present in the scene.
- """
- for feature, target in zip(features, targets):
- self.samples.setdefault(target, []).append(feature)
- if self.budget is not None:
- self.samples[target] = self.samples[target][-self.budget:]
- self.samples = {k: self.samples[k] for k in active_targets}
- def distance(self, features, targets):
- """
- Compute distance between features and targets.
- Args:
- features (ndarray): An NxM matrix of N features of dimensionality M.
- targets (list[int]): A list of targets to match the given `features` against.
- Returns:
- cost_matrix (ndarray): a cost matrix of shape len(targets), len(features),
- where element (i, j) contains the closest squared distance between
- `targets[i]` and `features[j]`.
- """
- cost_matrix = np.zeros((len(targets), len(features)))
- for i, target in enumerate(targets):
- cost_matrix[i, :] = self._metric(self.samples[target], features)
- return cost_matrix
- def min_cost_matching(distance_metric,
- max_distance,
- tracks,
- detections,
- track_indices=None,
- detection_indices=None):
- """
- Solve linear assignment problem.
- Args:
- distance_metric :
- Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
- The distance metric is given a list of tracks and detections as
- well as a list of N track indices and M detection indices. The
- metric should return the NxM dimensional cost matrix, where element
- (i, j) is the association cost between the i-th track in the given
- track indices and the j-th detection in the given detection_indices.
- max_distance (float): Gating threshold. Associations with cost larger
- than this value are disregarded.
- tracks (list[Track]): A list of predicted tracks at the current time
- step.
- detections (list[Detection]): A list of detections at the current time
- step.
- track_indices (list[int]): List of track indices that maps rows in
- `cost_matrix` to tracks in `tracks`.
- detection_indices (List[int]): List of detection indices that maps
- columns in `cost_matrix` to detections in `detections`.
- Returns:
- A tuple (List[(int, int)], List[int], List[int]) with the following
- three entries:
- * A list of matched track and detection indices.
- * A list of unmatched track indices.
- * A list of unmatched detection indices.
- """
- if track_indices is None:
- track_indices = np.arange(len(tracks))
- if detection_indices is None:
- detection_indices = np.arange(len(detections))
- if len(detection_indices) == 0 or len(track_indices) == 0:
- return [], track_indices, detection_indices # Nothing to match.
- cost_matrix = distance_metric(tracks, detections, track_indices,
- detection_indices)
- cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
- indices = linear_sum_assignment(cost_matrix)
- matches, unmatched_tracks, unmatched_detections = [], [], []
- for col, detection_idx in enumerate(detection_indices):
- if col not in indices[1]:
- unmatched_detections.append(detection_idx)
- for row, track_idx in enumerate(track_indices):
- if row not in indices[0]:
- unmatched_tracks.append(track_idx)
- for row, col in zip(indices[0], indices[1]):
- track_idx = track_indices[row]
- detection_idx = detection_indices[col]
- if cost_matrix[row, col] > max_distance:
- unmatched_tracks.append(track_idx)
- unmatched_detections.append(detection_idx)
- else:
- matches.append((track_idx, detection_idx))
- return matches, unmatched_tracks, unmatched_detections
- def matching_cascade(distance_metric,
- max_distance,
- cascade_depth,
- tracks,
- detections,
- track_indices=None,
- detection_indices=None):
- """
- Run matching cascade.
- Args:
- distance_metric :
- Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
- The distance metric is given a list of tracks and detections as
- well as a list of N track indices and M detection indices. The
- metric should return the NxM dimensional cost matrix, where element
- (i, j) is the association cost between the i-th track in the given
- track indices and the j-th detection in the given detection_indices.
- max_distance (float): Gating threshold. Associations with cost larger
- than this value are disregarded.
- cascade_depth (int): The cascade depth, should be se to the maximum
- track age.
- tracks (list[Track]): A list of predicted tracks at the current time
- step.
- detections (list[Detection]): A list of detections at the current time
- step.
- track_indices (list[int]): List of track indices that maps rows in
- `cost_matrix` to tracks in `tracks`.
- detection_indices (List[int]): List of detection indices that maps
- columns in `cost_matrix` to detections in `detections`.
- Returns:
- A tuple (List[(int, int)], List[int], List[int]) with the following
- three entries:
- * A list of matched track and detection indices.
- * A list of unmatched track indices.
- * A list of unmatched detection indices.
- """
- if track_indices is None:
- track_indices = list(range(len(tracks)))
- if detection_indices is None:
- detection_indices = list(range(len(detections)))
- unmatched_detections = detection_indices
- matches = []
- for level in range(cascade_depth):
- if len(unmatched_detections) == 0: # No detections left
- break
- track_indices_l = [
- k for k in track_indices if tracks[k].time_since_update == 1 + level
- ]
- if len(track_indices_l) == 0: # Nothing to match at this level
- continue
- matches_l, _, unmatched_detections = \
- min_cost_matching(
- distance_metric, max_distance, tracks, detections,
- track_indices_l, unmatched_detections)
- matches += matches_l
- unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
- return matches, unmatched_tracks, unmatched_detections
- def gate_cost_matrix(kf,
- cost_matrix,
- tracks,
- detections,
- track_indices,
- detection_indices,
- gated_cost=INFTY_COST,
- only_position=False):
- """
- Invalidate infeasible entries in cost matrix based on the state
- distributions obtained by Kalman filtering.
- Args:
- kf (object): The Kalman filter.
- cost_matrix (ndarray): The NxM dimensional cost matrix, where N is the
- number of track indices and M is the number of detection indices,
- such that entry (i, j) is the association cost between
- `tracks[track_indices[i]]` and `detections[detection_indices[j]]`.
- tracks (list[Track]): A list of predicted tracks at the current time
- step.
- detections (list[Detection]): A list of detections at the current time
- step.
- track_indices (List[int]): List of track indices that maps rows in
- `cost_matrix` to tracks in `tracks`.
- detection_indices (List[int]): List of detection indices that maps
- columns in `cost_matrix` to detections in `detections`.
- gated_cost (Optional[float]): Entries in the cost matrix corresponding
- to infeasible associations are set this value. Defaults to a very
- large value.
- only_position (Optional[bool]): If True, only the x, y position of the
- state distribution is considered during gating. Default False.
- """
- gating_dim = 2 if only_position else 4
- gating_threshold = kalman_filter.chi2inv95[gating_dim]
- measurements = np.asarray(
- [detections[i].to_xyah() for i in detection_indices])
- for row, track_idx in enumerate(track_indices):
- track = tracks[track_idx]
- gating_distance = kf.gating_distance(track.mean, track.covariance,
- measurements, only_position)
- cost_matrix[row, gating_distance > gating_threshold] = gated_cost
- return cost_matrix
|