# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This code is based on https://github.com/WWangYuHsiang/SMILEtrack/blob/main/BoT-SORT/tracker/bot_sort.py
"""

import cv2
import matplotlib.pyplot as plt
import numpy as np
from collections import deque

from ..matching import jde_matching as matching
from ..motion import GMC, KalmanFilter
from .base_jde_tracker import TrackState, STrack
from .base_jde_tracker import joint_stracks, sub_stracks, remove_duplicate_stracks

from ppdet.core.workspace import register, serializable


@register
@serializable
class BOTSORTTracker(object):
    """
    BOTSORT tracker, supports a single class.

    Args:
        track_high_thresh (float): score threshold for high-confidence detections
            used in the first association
        track_low_thresh (float): score threshold below which detections are discarded
        new_track_thresh (float): minimum score required to start a new track
        match_thresh (float): IoU matching threshold for the first association
        track_buffer (int): number of frames a lost track is kept, default 30
        min_box_area (float): minimum box area to keep, default 0
        camera_motion (bool): whether to apply camera motion compensation, default False
        cmc_method (str): camera motion compensation method, default 'sparseOptFlow'
        frame_rate (int): fps, buffer_size = int(frame_rate / 30.0 * track_buffer)
    """

    def __init__(self,
                 track_high_thresh=0.3,
                 track_low_thresh=0.2,
                 new_track_thresh=0.4,
                 match_thresh=0.7,
                 track_buffer=30,
                 min_box_area=0,
                 camera_motion=False,
                 cmc_method='sparseOptFlow',
                 frame_rate=30):
        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]

        self.frame_id = 0

        self.track_high_thresh = track_high_thresh
        self.track_low_thresh = track_low_thresh
        self.new_track_thresh = new_track_thresh
        self.match_thresh = match_thresh

        self.buffer_size = int(frame_rate / 30.0 * track_buffer)
        self.max_time_lost = self.buffer_size
        self.kalman_filter = KalmanFilter()

        self.min_box_area = min_box_area

        self.camera_motion = camera_motion
        self.gmc = GMC(method=cmc_method)

    def update(self, output_results, img=None):
        self.frame_id += 1
        activated_stracks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []

        if len(output_results):
            # Each row of output_results is [class_id, score, x1, y1, x2, y2].
            bboxes = output_results[:, 2:6]
            scores = output_results[:, 1]
            classes = output_results[:, 0]

            # Remove bad detections
            lowest_inds = scores > self.track_low_thresh
            bboxes = bboxes[lowest_inds]
            scores = scores[lowest_inds]
            classes = classes[lowest_inds]

            # Find high threshold detections
            remain_inds = scores > self.track_high_thresh
            dets = bboxes[remain_inds]
            scores_keep = scores[remain_inds]
            classes_keep = classes[remain_inds]
        else:
            bboxes = []
            scores = []
            classes = []
            dets = []
            scores_keep = []
            classes_keep = []

        if len(dets) > 0:
            '''Detections'''
            detections = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets, scores_keep, classes_keep)
            ]
        else:
            detections = []

        ''' Step 1: Add newly detected tracklets to tracked_stracks'''
        unconfirmed = []
        tracked_stracks = []  # type: list[STrack]
        for track in self.tracked_stracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
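                # Previously activated tracks join the pool used for the
                # first association with high-score detections.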
                tracked_stracks.append(track)

        ''' Step 2: First association, with high score detection boxes'''
        strack_pool = joint_stracks(tracked_stracks, self.lost_stracks)

        # Predict the current location with KF
        STrack.multi_predict(strack_pool, self.kalman_filter)

        # Compensate for camera motion
        if self.camera_motion:
            warp = self.gmc.apply(img[0], dets)
            STrack.multi_gmc(strack_pool, warp)
            STrack.multi_gmc(unconfirmed, warp)

        # Associate with high score detection boxes
        ious_dists = matching.iou_distance(strack_pool, detections)
        matches, u_track, u_detection = matching.linear_assignment(
            ious_dists, thresh=self.match_thresh)

        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        ''' Step 3: Second association, with low score detection boxes'''
        if len(scores):
            inds_high = scores < self.track_high_thresh
            inds_low = scores > self.track_low_thresh
            inds_second = np.logical_and(inds_low, inds_high)
            dets_second = bboxes[inds_second]
            scores_second = scores[inds_second]
            classes_second = classes[inds_second]
        else:
            dets_second = []
            scores_second = []
            classes_second = []

        # Associate the remaining tracks with the low score detections
        if len(dets_second) > 0:
            '''Detections'''
            detections_second = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in
                zip(dets_second, scores_second, classes_second)
            ]
        else:
            detections_second = []

        r_tracked_stracks = [
            strack_pool[i] for i in u_track
            if strack_pool[i].state == TrackState.Tracked
        ]

        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(
            dists, thresh=0.5)

        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        # Tracks unmatched after both associations are marked lost
        for it in u_track:
            track = r_tracked_stracks[it]
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)

        '''Deal with unconfirmed tracks, usually tracks with only one beginning frame'''
        detections = [detections[i] for i in u_detection]
        dists = matching.iou_distance(unconfirmed, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(
            dists, thresh=0.7)
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_stracks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)

        """ Step 4: Init new stracks"""
        for inew in u_detection:
            track = detections[inew]
            if track.score < self.new_track_thresh:
                continue
            track.activate(self.kalman_filter, self.frame_id)
            activated_stracks.append(track)

        """ Step 5: Update state"""
        for track in self.lost_stracks:
            if self.frame_id - track.end_frame > self.max_time_lost:
                track.mark_removed()
                removed_stracks.append(track)

        """ Merge """
        self.tracked_stracks = [
            t for t in self.tracked_stracks if t.state == TrackState.Tracked
        ]
        self.tracked_stracks = joint_stracks(self.tracked_stracks,
                                             activated_stracks)
        self.tracked_stracks = joint_stracks(self.tracked_stracks,
                                             refind_stracks)
        self.lost_stracks = sub_stracks(self.lost_stracks,
                                        self.tracked_stracks)
        self.lost_stracks.extend(lost_stracks)
        self.lost_stracks = sub_stracks(self.lost_stracks,
                                        self.removed_stracks)
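        # Accumulate this frame's removals into the tracker's global removed list.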
        self.removed_stracks.extend(removed_stracks)
        self.tracked_stracks, self.lost_stracks = remove_duplicate_stracks(
            self.tracked_stracks, self.lost_stracks)

        # output_stracks = [track for track in self.tracked_stracks if track.is_activated]
        output_stracks = [track for track in self.tracked_stracks]

        return output_stracks
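

# --------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the tracker implementation).
# It assumes BOTSORTTracker is importable as
# ppdet.modeling.mot.tracker.BOTSORTTracker; the exact module path may differ
# across PaddleDetection versions. The detection layout follows update(),
# which reads column 0 as class id, column 1 as score, and columns 2:6 as the
# x1, y1, x2, y2 box.
#
#     import numpy as np
#     from ppdet.modeling.mot.tracker import BOTSORTTracker
#
#     tracker = BOTSORTTracker(track_high_thresh=0.3, new_track_thresh=0.4)
#     # Two hypothetical detections for one frame: [class_id, score, x1, y1, x2, y2]
#     dets = np.array([[0., 0.9, 100., 50., 180., 220.],
#                      [0., 0.6, 400., 60., 470., 230.]])
#     online_targets = tracker.update(dets)  # img only needed if camera_motion=True
#     for t in online_targets:
#         print(t.track_id, t.tlwh, t.score)
# --------------------------------------------------------------------------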