# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This code is based on https://github.com/noahcao/OC_SORT/blob/master/trackers/ocsort_tracker/association.py
"""

import os

import numpy as np


def iou_batch(bboxes1, bboxes2):
    """Compute the pairwise IoU between two sets of boxes.

    Columns 0..3 of each row are read as ``x1, y1, x2, y2``; any further
    columns are ignored.

    Args:
        bboxes1: Array of shape (N, >=4).
        bboxes2: Array of shape (M, >=4).

    Returns:
        Array of shape (N, M) where entry ``[i, j]`` is the IoU of
        ``bboxes1[i]`` and ``bboxes2[j]``. Pairs whose union area is exactly
        zero (e.g. two zero-area boxes) yield 0 instead of NaN.
    """
    bboxes2 = np.expand_dims(bboxes2, 0)
    bboxes1 = np.expand_dims(bboxes1, 1)

    xx1 = np.maximum(bboxes1[..., 0], bboxes2[..., 0])
    yy1 = np.maximum(bboxes1[..., 1], bboxes2[..., 1])
    xx2 = np.minimum(bboxes1[..., 2], bboxes2[..., 2])
    yy2 = np.minimum(bboxes1[..., 3], bboxes2[..., 3])
    inter = np.maximum(0., xx2 - xx1) * np.maximum(0., yy2 - yy1)

    area1 = (bboxes1[..., 2] - bboxes1[..., 0]) * (
        bboxes1[..., 3] - bboxes1[..., 1])
    area2 = (bboxes2[..., 2] - bboxes2[..., 0]) * (
        bboxes2[..., 3] - bboxes2[..., 1])
    union = area1 + area2 - inter

    # Guard the division: a zero union would otherwise produce NaN (0 / 0),
    # which poisons the cost matrix handed to the assignment solver.
    has_union = union != 0
    safe_union = np.where(has_union, union, 1.0)
    return np.where(has_union, inter / safe_union, 0.0)


def speed_direction_batch(dets, tracks):
    """Compute unit direction vectors from track centers to detection centers.

    Columns 0..3 of each row are read as ``x1, y1, x2, y2``.

    Args:
        dets: Array of shape (D, >=4).
        tracks: Array of shape (T, >=4).

    Returns:
        Tuple ``(dy, dx)`` of arrays of shape (T, D) holding the normalised
        y and x components of the vector from each track center to each
        detection center.
    """
    # Add a trailing axis so track centers broadcast against detections.
    tracks = tracks[..., np.newaxis]
    det_cx = (dets[:, 0] + dets[:, 2]) / 2.0
    det_cy = (dets[:, 1] + dets[:, 3]) / 2.0
    trk_cx = (tracks[:, 0] + tracks[:, 2]) / 2.0
    trk_cy = (tracks[:, 1] + tracks[:, 3]) / 2.0

    dx = det_cx - trk_cx
    dy = det_cy - trk_cy
    # The small epsilon avoids dividing by zero for coincident centers.
    norm = np.sqrt(dx**2 + dy**2) + 1e-6
    return dy / norm, dx / norm


def linear_assignment(cost_matrix):
    """Solve the minimum-cost assignment problem for ``cost_matrix``.

    Uses ``lap.lapjv`` when the ``lap`` package is importable and falls back
    to ``scipy.optimize.linear_sum_assignment`` otherwise.

    Args:
        cost_matrix: 2-D array of assignment costs (rows x columns).

    Returns:
        Integer array of shape (K, 2); each row is a ``[row, column]`` pair.
        The shape is (0, 2) when nothing is assigned.
    """
    try:
        import lap
    except ImportError:
        from scipy.optimize import linear_sum_assignment
        rows, cols = linear_sum_assignment(cost_matrix)
        pairs = list(zip(rows, cols))
    else:
        _, row_to_col, col_to_row = lap.lapjv(cost_matrix, extend_cost=True)
        # Unassigned rows are marked with a negative column index.
        pairs = [[col_to_row[col], col] for col in row_to_col if col >= 0]
    # reshape keeps the result 2-D even when there are no pairs, so callers
    # can always slice columns.
    return np.array(pairs, dtype=int).reshape(-1, 2)


def _resolve_matches(iou_matrix, assignment_score, iou_threshold):
    """Turn an IoU matrix into matches and unmatched index lists.

    Args:
        iou_matrix: Array of shape (D, T) with detection/tracker IoUs.
        assignment_score: Array of shape (D, T); its negation is the cost
            passed to ``linear_assignment`` when thresholding alone does not
            give a one-to-one matching.
        iou_threshold: Pairs with IoU below this value are rejected.

    Returns:
        Tuple ``(matches, unmatched_detections, unmatched_trackers)`` where
        ``matches`` has shape (K, 2) with ``[detection, tracker]`` rows and
        the other two are 1-D integer index arrays.
    """
    num_dets, num_trks = iou_matrix.shape

    if min(iou_matrix.shape) > 0:
        above = (iou_matrix > iou_threshold).astype(np.int32)
        if above.sum(1).max() == 1 and above.sum(0).max() == 1:
            # Thresholding already gives a one-to-one matching.
            matched_indices = np.stack(np.where(above), axis=1)
        else:
            matched_indices = linear_assignment(-assignment_score)
    else:
        matched_indices = np.empty(shape=(0, 2), dtype=int)

    matched_dets = set(matched_indices[:, 0].tolist())
    matched_trks = set(matched_indices[:, 1].tolist())
    unmatched_detections = [
        d for d in range(num_dets) if d not in matched_dets
    ]
    unmatched_trackers = [t for t in range(num_trks) if t not in matched_trks]

    # Filter out pairs matched with low IoU; both ends become unmatched.
    low_iou = iou_matrix[matched_indices[:, 0],
                         matched_indices[:, 1]] < iou_threshold
    rejected = matched_indices[low_iou]
    unmatched_detections.extend(rejected[:, 0].tolist())
    unmatched_trackers.extend(rejected[:, 1].tolist())
    matches = matched_indices[~low_iou]

    return (matches, np.array(
        unmatched_detections, dtype=int), np.array(
            unmatched_trackers, dtype=int))


def associate(detections, trackers, iou_threshold, velocities, previous_obs,
              vdc_weight):
    """Associate detections with trackers using IoU plus a direction term.

    Args:
        detections: Array of shape (D, >=5); columns 0..3 are the box and the
            last column is used as the detection score.
        trackers: Array of shape (T, >=4) of tracker boxes.
        iou_threshold: Minimum IoU for a pair to be accepted as a match.
        velocities: Array of shape (T, 2); column 0 is used as the y
            component and column 1 as the x component of each track's
            direction.
        previous_obs: Array of shape (T, >=5) of earlier observations; rows
            whose column 4 is negative are excluded from the direction term.
        vdc_weight: Weight applied to the direction-consistency term.

    Returns:
        Tuple ``(matches, unmatched_detections, unmatched_trackers)``.
        ``matches`` has shape (K, 2) with ``[detection, tracker]`` rows. When
        ``trackers`` is empty, every detection is unmatched and the third
        element is an empty (0, 5) integer array.
    """
    if len(trackers) == 0:
        return (np.empty(
            (0, 2), dtype=int), np.arange(len(detections)), np.empty(
                (0, 5), dtype=int))

    # Direction from each previous observation to each detection: (T, D).
    Y, X = speed_direction_batch(detections, previous_obs)
    inertia_Y = velocities[:, 0][:, np.newaxis]
    inertia_X = velocities[:, 1][:, np.newaxis]

    # Cosine of the angle between track direction and candidate direction,
    # mapped so that aligned directions score highest.
    diff_angle_cos = inertia_X * X + inertia_Y * Y
    diff_angle_cos = np.clip(diff_angle_cos, a_min=-1, a_max=1)
    diff_angle = np.arccos(diff_angle_cos)
    diff_angle = (np.pi / 2.0 - np.abs(diff_angle)) / np.pi

    valid_mask = np.ones(previous_obs.shape[0])
    valid_mask[np.where(previous_obs[:, 4] < 0)] = 0

    iou_matrix = iou_batch(detections, trackers)
    det_scores = detections[:, -1][:, np.newaxis]
    # NOTE: upstream mentions that weighting iou_matrix by the detection
    # scores sometimes works as a trick but is not encouraged; it is
    # intentionally not applied here.

    angle_diff_cost = (valid_mask[:, np.newaxis] * diff_angle) * vdc_weight
    angle_diff_cost = angle_diff_cost.T * det_scores

    return _resolve_matches(iou_matrix, iou_matrix + angle_diff_cost,
                            iou_threshold)


def associate_only_iou(detections, trackers, iou_threshold):
    """Associate detections with trackers using IoU alone.

    Args:
        detections: Array of shape (D, >=4) of detection boxes.
        trackers: Array of shape (T, >=4) of tracker boxes.
        iou_threshold: Minimum IoU for a pair to be accepted as a match.

    Returns:
        Tuple ``(matches, unmatched_detections, unmatched_trackers)``.
        ``matches`` has shape (K, 2) with ``[detection, tracker]`` rows. When
        ``trackers`` is empty, every detection is unmatched and the third
        element is an empty (0, 5) integer array.
    """
    if len(trackers) == 0:
        return (np.empty(
            (0, 2), dtype=int), np.arange(len(detections)), np.empty(
                (0, 5), dtype=int))

    iou_matrix = iou_batch(detections, trackers)
    return _resolve_matches(iou_matrix, iou_matrix, iou_threshold)