123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- This code is based on https://github.com/nwojke/deep_sort/blob/master/deep_sort/tracker.py
- """
- import numpy as np
- from ..motion import KalmanFilter
- from ..matching.deepsort_matching import NearestNeighborDistanceMetric
- from ..matching.deepsort_matching import iou_cost, min_cost_matching, matching_cascade, gate_cost_matrix
- from .base_sde_tracker import Track
- from ..utils import Detection
- from ppdet.core.workspace import register, serializable
- from ppdet.utils.logger import setup_logger
# Public API of this module: only the tracker class is exported.
__all__ = ['DeepSORTTracker']

# Module-level logger, named after this module.
logger = setup_logger(__name__)
@register
@serializable
class DeepSORTTracker(object):
    """
    DeepSORT tracker

    Args:
        input_size (list): input feature map size to reid model, [h, w] format,
            [64, 192] as default.
        min_box_area (int): min box area to filter out low quality boxes
        vertical_ratio (float): w/h, the vertical ratio of the bbox to filter
            bad results, set 1.6 default for pedestrian tracking. If set <=0
            means no need to filter bboxes.
        budget (int): If not None, fix samples per class to at most this number.
            Removes the oldest samples when the budget is reached.
        max_age (int): maximum number of consecutive misses before a track is
            deleted.
        n_init (int): Number of frames that a track remains in initialization
            phase. Number of consecutive detections before the track is
            confirmed. The track state is set to `Deleted` if a miss occurs
            within the first `n_init` frames.
        metric_type (str): either "euclidean" or "cosine", the distance metric
            used for measurement to track association.
        matching_threshold (float): samples with larger distance are
            considered an invalid match.
        max_iou_distance (float): max iou distance threshold
        motion (str|object): 'KalmanFilter' to build a default `KalmanFilter`,
            or an already constructed motion-model instance to use as is.

    Raises:
        ValueError: if `motion` is None or a string other than 'KalmanFilter'.
    """

    def __init__(self,
                 input_size=[64, 192],
                 min_box_area=0,
                 vertical_ratio=-1,
                 budget=100,
                 max_age=70,
                 n_init=3,
                 metric_type='cosine',
                 matching_threshold=0.2,
                 max_iou_distance=0.9,
                 motion='KalmanFilter'):
        # The signature default is a list object shared by every call; copy
        # list inputs so that mutating one tracker's `input_size` can never
        # leak into other trackers (or into the default itself).
        if isinstance(input_size, list):
            input_size = list(input_size)
        self.input_size = input_size
        self.min_box_area = min_box_area
        self.vertical_ratio = vertical_ratio

        self.max_age = max_age
        self.n_init = n_init
        self.metric = NearestNeighborDistanceMetric(metric_type,
                                                    matching_threshold, budget)

        self.max_iou_distance = max_iou_distance
        if motion == 'KalmanFilter':
            self.motion = KalmanFilter()
        elif motion is None or isinstance(motion, str):
            # Fail at construction time instead of leaving `self.motion`
            # undefined and crashing later inside `predict`/`update`.
            raise ValueError(
                "Unsupported motion model: {!r}, expected 'KalmanFilter' or a "
                "motion model instance.".format(motion))
        else:
            # A ready-made motion-model object was supplied by the caller.
            self.motion = motion

        self.tracks = []
        self._next_id = 1

    def predict(self):
        """
        Propagate track state distributions one time step forward.
        This function should be called once every time step, before `update`.
        """
        for track in self.tracks:
            track.predict(self.motion)

    def update(self, pred_dets, pred_embs):
        """
        Perform measurement update and track management.

        Args:
            pred_dets (np.array): Detection results of the image, the shape is
                [N, 6], means 'cls_id, score, x0, y0, x1, y1'.
            pred_embs (np.array): Embedding results of the image, the shape is
                [N, 128], usually pred_embs.shape[1] is a multiple of 128.

        Returns:
            list: the tracks kept by the tracker after this update, i.e. every
                track that is not in the deleted state.
        """
        # Column slices keep a trailing axis, so each per-detection class id
        # and score is handed to `Detection` as a length-1 array.
        pred_cls_ids = pred_dets[:, 0:1]
        pred_scores = pred_dets[:, 1:2]
        pred_xyxys = pred_dets[:, 2:6]
        # Convert corner boxes (x0, y0, x1, y1) to (x0, y0, w, h), where
        # w = x1 - x0 + 1 and h = y1 - y0 + 1.
        pred_tlwhs = np.concatenate(
            (pred_xyxys[:, 0:2],
             pred_xyxys[:, 2:4] - pred_xyxys[:, 0:2] + 1),
            axis=1)

        detections = [
            Detection(tlwh, score, feat, cls_id)
            for tlwh, score, feat, cls_id in zip(pred_tlwhs, pred_scores,
                                                 pred_embs, pred_cls_ids)
        ]

        # Run matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)

        # Update track set.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(self.motion,
                                          detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        # Drop deleted tracks only after all index-based updates are done, so
        # the indices returned by `_match` stay valid above.
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Update distance metric.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            # The features have been handed over to the metric; reset the
            # per-track buffer so they are not submitted again next frame.
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)

        return self.tracks

    def _match(self, detections):
        """
        Associate the current tracks with `detections`.

        Confirmed tracks are matched first through `matching_cascade` with an
        appearance cost gated by `gate_cost_matrix`. Unconfirmed tracks, plus
        confirmed tracks left unmatched whose `time_since_update` equals 1,
        are then matched against the remaining detections with `iou_cost`.

        Args:
            detections (list): `Detection` objects of the current frame.

        Returns:
            tuple: (matches, unmatched_tracks, unmatched_detections), where
                `matches` holds (track_idx, detection_idx) pairs and the other
                two hold indices into `self.tracks` and `detections`.
        """

        def gated_metric(tracks, dets, track_indices, detection_indices):
            # Appearance distance between the selected detections and tracks,
            # then gated using the motion model.
            features = np.array([dets[i].feature for i in detection_indices])
            targets = np.array([tracks[i].track_id for i in track_indices])
            cost_matrix = self.metric.distance(features, targets)
            cost_matrix = gate_cost_matrix(self.motion, cost_matrix, tracks,
                                           dets, track_indices,
                                           detection_indices)
            return cost_matrix

        # Split track set into confirmed and unconfirmed tracks.
        confirmed_tracks = [
            i for i, t in enumerate(self.tracks) if t.is_confirmed()
        ]
        unconfirmed_tracks = [
            i for i, t in enumerate(self.tracks) if not t.is_confirmed()
        ]

        # Associate confirmed tracks using appearance features.
        matches_a, unmatched_tracks_a, unmatched_detections = \
            matching_cascade(
                gated_metric, self.metric.matching_threshold, self.max_age,
                self.tracks, detections, confirmed_tracks)

        # Associate remaining tracks together with unconfirmed tracks using IOU.
        iou_track_candidates = unconfirmed_tracks + [
            k for k in unmatched_tracks_a
            if self.tracks[k].time_since_update == 1
        ]
        # Tracks passed on to the IOU stage are removed from the first-stage
        # unmatched list; the IOU stage reports them again if still unmatched.
        unmatched_tracks_a = [
            k for k in unmatched_tracks_a
            if self.tracks[k].time_since_update != 1
        ]
        matches_b, unmatched_tracks_b, unmatched_detections = \
            min_cost_matching(
                iou_cost, self.max_iou_distance, self.tracks,
                detections, iou_track_candidates, unmatched_detections)

        matches = matches_a + matches_b
        unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
        return matches, unmatched_tracks, unmatched_detections

    def _initiate_track(self, detection):
        """
        Create a new track from `detection` and append it to `self.tracks`.

        The initial state comes from `self.motion.initiate`, and the track
        takes the next free id, which is then incremented.

        Args:
            detection (Detection): detection that matched no existing track.
        """
        mean, covariance = self.motion.initiate(detection.to_xyah())
        self.tracks.append(
            Track(mean, covariance, self._next_id, self.n_init, self.max_age,
                  detection.cls_id, detection.score, detection.feature))
        self._next_id += 1
|