# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This code is based on https://github.com/WWangYuHsiang/SMILEtrack/blob/main/BoT-SORT/tracker/bot_sort.py
"""

import numpy as np

from ..matching import jde_matching as matching
from ..motion import GMC, KalmanFilter
from .base_jde_tracker import TrackState, STrack
from .base_jde_tracker import joint_stracks, sub_stracks, remove_duplicate_stracks

from ppdet.core.workspace import register, serializable


@register
@serializable
class BOTSORTTracker(object):
    """
    BOTSORT tracker, supports single class.

    Args:
        track_high_thresh (float): score threshold for detections used in the first association
        track_low_thresh (float): detections scoring below this threshold are discarded
        new_track_thresh (float): minimum detection score required to start a new track
        match_thresh (float): IoU-distance matching threshold for the first association
        track_buffer (int): number of frames a lost track is kept alive, default 30
        min_box_area (float): minimum box area threshold (reserved), default 0
        camera_motion (bool): whether to apply camera motion compensation, default False
        cmc_method (str): camera motion compensation method, default 'sparseOptFlow'
        frame_rate (int): video frame rate, buffer_size = int(frame_rate / 30.0 * track_buffer)

    A minimal usage sketch is provided at the bottom of this file.
    """

    def __init__(self,
                 track_high_thresh=0.3,
                 track_low_thresh=0.2,
                 new_track_thresh=0.4,
                 match_thresh=0.7,
                 track_buffer=30,
                 min_box_area=0,
                 camera_motion=False,
                 cmc_method='sparseOptFlow',
                 frame_rate=30):
        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]
        self.frame_id = 0

        self.track_high_thresh = track_high_thresh
        self.track_low_thresh = track_low_thresh
        self.new_track_thresh = new_track_thresh
        self.match_thresh = match_thresh

        self.buffer_size = int(frame_rate / 30.0 * track_buffer)
        self.max_time_lost = self.buffer_size
        self.kalman_filter = KalmanFilter()

        self.min_box_area = min_box_area
        self.camera_motion = camera_motion
        self.gmc = GMC(method=cmc_method)

    def update(self, output_results, img=None):
        self.frame_id += 1
        activated_stracks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []

        # output_results: (N, 6) array laid out as [class_id, score, x1, y1, x2, y2]
        if len(output_results):
            bboxes = output_results[:, 2:6]
            scores = output_results[:, 1]
            classes = output_results[:, 0]

            # Remove bad detections
            lowest_inds = scores > self.track_low_thresh
            bboxes = bboxes[lowest_inds]
            scores = scores[lowest_inds]
            classes = classes[lowest_inds]

            # Find high threshold detections
            remain_inds = scores > self.track_high_thresh
            dets = bboxes[remain_inds]
            scores_keep = scores[remain_inds]
            classes_keep = classes[remain_inds]
        else:
            bboxes = []
            scores = []
            classes = []
            dets = []
            scores_keep = []
            classes_keep = []

        if len(dets) > 0:
            '''Detections'''
            detections = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c)
                for (tlbr, s, c) in zip(dets, scores_keep, classes_keep)
            ]
        else:
            detections = []

        ''' Add newly detected tracklets to tracked_stracks'''
        unconfirmed = []
        tracked_stracks = []  # type: list[STrack]
        for track in self.tracked_stracks:
            if not track.is_activated:
                unconfirmed.append(track)
            else:
                tracked_stracks.append(track)

        ''' Step 2: First association, with high score detection boxes'''
        strack_pool = joint_stracks(tracked_stracks, self.lost_stracks)

        # Predict the current location with KF
        STrack.multi_predict(strack_pool, self.kalman_filter)

        # Compensate for camera motion
        if self.camera_motion:
            warp = self.gmc.apply(img[0], dets)
            STrack.multi_gmc(strack_pool, warp)
            STrack.multi_gmc(unconfirmed, warp)

        # Associate with high score detection boxes
        ious_dists = matching.iou_distance(strack_pool, detections)
        matches, u_track, u_detection = matching.linear_assignment(
            ious_dists, thresh=self.match_thresh)

        for itracked, idet in matches:
            track = strack_pool[itracked]
            det = detections[idet]
            if track.state == TrackState.Tracked:
                track.update(detections[idet], self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        ''' Step 3: Second association, with low score detection boxes'''
        if len(scores):
            inds_high = scores < self.track_high_thresh
            inds_low = scores > self.track_low_thresh
            inds_second = np.logical_and(inds_low, inds_high)
            dets_second = bboxes[inds_second]
            scores_second = scores[inds_second]
            classes_second = classes[inds_second]
        else:
            dets_second = []
            scores_second = []
            classes_second = []

        # Associate the still-unmatched tracks with the low score detections
        if len(dets_second) > 0:
            '''Detections'''
            detections_second = [
                STrack(STrack.tlbr_to_tlwh(tlbr), s, c) for (tlbr, s, c) in
                zip(dets_second, scores_second, classes_second)
            ]
        else:
            detections_second = []

        r_tracked_stracks = [
            strack_pool[i] for i in u_track
            if strack_pool[i].state == TrackState.Tracked
        ]
        dists = matching.iou_distance(r_tracked_stracks, detections_second)
        matches, u_track, u_detection_second = matching.linear_assignment(
            dists, thresh=0.5)

        for itracked, idet in matches:
            track = r_tracked_stracks[itracked]
            det = detections_second[idet]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        for it in u_track:
            track = r_tracked_stracks[it]
            if not track.state == TrackState.Lost:
                track.mark_lost()
                lost_stracks.append(track)

        '''Deal with unconfirmed tracks, usually tracks with only one beginning frame'''
        detections = [detections[i] for i in u_detection]
        dists = matching.iou_distance(unconfirmed, detections)
        matches, u_unconfirmed, u_detection = matching.linear_assignment(
            dists, thresh=0.7)
        for itracked, idet in matches:
            unconfirmed[itracked].update(detections[idet], self.frame_id)
            activated_stracks.append(unconfirmed[itracked])
        for it in u_unconfirmed:
            track = unconfirmed[it]
            track.mark_removed()
            removed_stracks.append(track)
- """ Step 4: Init new stracks"""
- for inew in u_detection:
- track = detections[inew]
- if track.score < self.new_track_thresh:
- continue
- track.activate(self.kalman_filter, self.frame_id)
- activated_starcks.append(track)
- """ Step 5: Update state"""
- for track in self.lost_stracks:
- if self.frame_id - track.end_frame > self.max_time_lost:
- track.mark_removed()
- removed_stracks.append(track)
- """ Merge """
- self.tracked_stracks = [
- t for t in self.tracked_stracks if t.state == TrackState.Tracked
- ]
- self.tracked_stracks = joint_stracks(self.tracked_stracks,
- activated_starcks)
- self.tracked_stracks = joint_stracks(self.tracked_stracks,
- refind_stracks)
- self.lost_stracks = sub_stracks(self.lost_stracks, self.tracked_stracks)
- self.lost_stracks.extend(lost_stracks)
- self.lost_stracks = sub_stracks(self.lost_stracks, self.removed_stracks)
- self.removed_stracks.extend(removed_stracks)
- self.tracked_stracks, self.lost_stracks = remove_duplicate_stracks(
- self.tracked_stracks, self.lost_stracks)
- # output_stracks = [track for track in self.tracked_stracks if track.is_activated]
- output_stracks = [track for track in self.tracked_stracks]
- return output_stracks
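

if __name__ == "__main__":
    # ------------------------------------------------------------------
    # Minimal usage sketch (illustrative only, not part of the original
    # tracker code). It assumes PaddleDetection is installed so the
    # relative imports above resolve, e.g. run it as:
    #     python -m ppdet.modeling.mot.tracker.botsort_tracker
    # The detections below are made-up values shaped the way update()
    # expects: an (N, 6) array of [class_id, score, x1, y1, x2, y2].
    # ------------------------------------------------------------------
    tracker = BOTSORTTracker(track_buffer=30, frame_rate=30)
    for frame_id in range(3):
        # Two fake detections: the first is high-score (score > new_track_thresh,
        # so it may start a track); the second only enters the second association
        # (track_low_thresh < score < track_high_thresh).
        dets = np.array([
            [0., 0.92, 100., 120., 180., 300.],
            [0., 0.25, 400., 150., 460., 320.],
        ])
        dets[:, 2:6] += 5.0 * frame_id  # nudge the boxes to mimic object motion
        online_targets = tracker.update(dets)  # img is only needed when camera_motion=True
        for t in online_targets:
            print(frame_id, t.track_id, t.tlwh, t.score)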