123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- This code is based on https://github.com/noahcao/OC_SORT/blob/master/trackers/ocsort_tracker/association.py
- """
- import os
- import numpy as np
def iou_batch(bboxes1, bboxes2):
    """Compute the pairwise IoU between two sets of boxes.

    Columns 0..3 of each box are read as ``x1, y1, x2, y2``; any further
    columns are ignored.

    Args:
        bboxes1: array-like of N boxes, at least 4 columns.
        bboxes2: array-like of M boxes, at least 4 columns.

    Returns:
        An ``(N, M)`` array whose ``[i, j]`` entry is the IoU of
        ``bboxes1[i]`` and ``bboxes2[j]``. Pairs whose union area is exactly
        zero (e.g. two degenerate boxes) get an IoU of 0 instead of NaN.
    """
    # Shape (N, 1, C) against (1, M, C) so every op broadcasts to (N, M).
    bboxes2 = np.expand_dims(bboxes2, 0)
    bboxes1 = np.expand_dims(bboxes1, 1)

    xx1 = np.maximum(bboxes1[..., 0], bboxes2[..., 0])
    yy1 = np.maximum(bboxes1[..., 1], bboxes2[..., 1])
    xx2 = np.minimum(bboxes1[..., 2], bboxes2[..., 2])
    yy2 = np.minimum(bboxes1[..., 3], bboxes2[..., 3])

    # Clamp at zero so non-overlapping boxes contribute no intersection.
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    area = w * h

    union = ((bboxes1[..., 2] - bboxes1[..., 0]) *
             (bboxes1[..., 3] - bboxes1[..., 1]) +
             (bboxes2[..., 2] - bboxes2[..., 0]) *
             (bboxes2[..., 3] - bboxes2[..., 1]) - area)

    # Divide only where the union is non-zero; the remaining entries keep the
    # 0 they were initialised with, avoiding 0/0 -> NaN and its warning.
    iou_matrix = np.divide(
        area, union, out=np.zeros_like(area), where=union != 0)
    return iou_matrix
def speed_direction_batch(dets, tracks):
    """Return unit direction components from every track to every detection.

    Box centres are taken from columns 0..3 (``x1, y1, x2, y2``).

    Args:
        dets: array of D detection boxes.
        tracks: array of T track boxes.

    Returns:
        ``(dy, dx)``, each of shape ``(T, D)``: the y and x components of the
        normalised vector from track centre ``t`` to detection centre ``d``.
        Note the order: y first, then x.
    """
    # Trailing axis makes track columns (T, 1) so they broadcast against (D,).
    tracks = tracks[..., np.newaxis]

    det_cx = (dets[:, 0] + dets[:, 2]) / 2.0
    det_cy = (dets[:, 1] + dets[:, 3]) / 2.0
    trk_cx = (tracks[:, 0] + tracks[:, 2]) / 2.0
    trk_cy = (tracks[:, 1] + tracks[:, 3]) / 2.0

    delta_x = det_cx - trk_cx
    delta_y = det_cy - trk_cy

    # The small epsilon keeps coincident centres from dividing by zero.
    length = np.sqrt(delta_x**2 + delta_y**2) + 1e-6
    return delta_y / length, delta_x / length
def linear_assignment(cost_matrix):
    """Solve the minimum-cost assignment problem for ``cost_matrix``.

    Uses ``lap.lapjv`` when the optional ``lap`` package is importable and
    falls back to ``scipy.optimize.linear_sum_assignment`` otherwise.

    Args:
        cost_matrix: 2-D array-like of assignment costs (rows x columns).

    Returns:
        An integer array of shape ``(K, 2)`` holding ``[row, col]`` pairs.
        An empty cost matrix yields an empty ``(0, 2)`` array so callers can
        always index ``result[:, 0]`` / ``result[:, 1]``.
    """
    cost_matrix = np.asarray(cost_matrix)
    if cost_matrix.size == 0:
        # Nothing to assign; keep the result 2-D rather than a 1-D array([]).
        return np.empty((0, 2), dtype=int)

    # Only the import is guarded, so an ImportError raised while the solver
    # itself runs is not mistaken for "lap is not installed".
    try:
        import lap
    except ImportError:
        from scipy.optimize import linear_sum_assignment
        rows, cols = linear_sum_assignment(cost_matrix)
        return np.array(list(zip(rows, cols)))

    # x[r] is the column assigned to row r (negative if unassigned) and
    # y[c] is the row assigned to column c, so [y[c], c] is a [row, col] pair.
    _, x, y = lap.lapjv(cost_matrix, extend_cost=True)
    return np.array([[y[i], i] for i in x if i >= 0])
def associate(detections, trackers, iou_threshold, velocities, previous_obs,
              vdc_weight):
    """Match detections to trackers using IoU plus a direction-consistency term.

    Args:
        detections: ``(D, C)`` array; columns 0..3 are used as the box and the
            last column weights the direction term (named ``scores`` here).
        trackers: ``(T, C)`` array of tracker boxes (columns 0..3 are used).
        iou_threshold: pairs whose IoU is below this value are rejected.
        velocities: ``(T, 2)`` array; column 0 is paired with the y direction
            and column 1 with the x direction.
        previous_obs: ``(T, 5)`` array of earlier observations; rows whose
            column 4 is negative are excluded from the direction term.
        vdc_weight: scalar weight of the direction-consistency term.

    Returns:
        ``(matches, unmatched_detections, unmatched_trackers)``: ``matches``
        is an int array of ``[detection, tracker]`` index pairs with shape
        ``(K, 2)``; the other two are 1-D integer index arrays. When
        ``trackers`` is empty, the third item is an empty ``(0, 5)`` array,
        as in the original implementation.
    """
    if len(trackers) == 0:
        return (np.empty((0, 2), dtype=int), np.arange(len(detections)),
                np.empty((0, 5), dtype=int))

    # Unit direction from each previous observation to each detection, (T, D).
    Y, X = speed_direction_batch(detections, previous_obs)

    # (T, 1) columns broadcast against (T, D); no explicit np.repeat needed.
    inertia_Y = velocities[:, 0][:, np.newaxis]
    inertia_X = velocities[:, 1][:, np.newaxis]
    diff_angle_cos = np.clip(inertia_X * X + inertia_Y * Y, a_min=-1, a_max=1)
    diff_angle = np.arccos(diff_angle_cos)
    # Map the angle from [0, pi] to a score in [-0.5, 0.5]; aligned is best.
    diff_angle = (np.pi / 2.0 - np.abs(diff_angle)) / np.pi

    # Rows with a negative column 4 contribute nothing to the direction term.
    valid_mask = np.where(previous_obs[:, 4] < 0, 0.0, 1.0)[:, np.newaxis]

    iou_matrix = iou_batch(detections, trackers)
    scores = detections[:, -1][:, np.newaxis]
    # NOTE: upstream left `iou_matrix * scores` disabled here (a trick that
    # sometimes works but is not encouraged).

    # Transpose (T, D) -> (D, T) to line up with iou_matrix.
    angle_diff_cost = ((valid_mask * diff_angle) * vdc_weight).T * scores

    if min(iou_matrix.shape) > 0:
        above = (iou_matrix > iou_threshold).astype(np.int32)
        if above.sum(1).max() == 1 and above.sum(0).max() == 1:
            # Thresholding already gives a one-to-one matching.
            matched_indices = np.stack(np.where(above), axis=1)
        else:
            matched_indices = linear_assignment(-(iou_matrix + angle_diff_cost))
    else:
        matched_indices = np.empty((0, 2), dtype=int)
    # Always a 2-D integer array, even if the solver returned nothing.
    matched_indices = np.asarray(matched_indices, dtype=int).reshape(-1, 2)

    # Assigned pairs whose IoU falls below the threshold are rejected.
    pair_iou = iou_matrix[matched_indices[:, 0], matched_indices[:, 1]]
    rejected = pair_iou < iou_threshold
    matches = matched_indices[~rejected]
    dropped = matched_indices[rejected]

    # Never-assigned indices first (ascending), then rejected pairs in order.
    unmatched_detections = np.concatenate(
        (np.setdiff1d(np.arange(len(detections)), matched_indices[:, 0]),
         dropped[:, 0])).astype(int)
    unmatched_trackers = np.concatenate(
        (np.setdiff1d(np.arange(len(trackers)), matched_indices[:, 1]),
         dropped[:, 1])).astype(int)

    return matches, unmatched_detections, unmatched_trackers
def associate_only_iou(detections, trackers, iou_threshold):
    """Match detections to trackers using IoU alone.

    Args:
        detections: ``(D, C)`` array of detection boxes (columns 0..3 used).
        trackers: ``(T, C)`` array of tracker boxes (columns 0..3 used).
        iou_threshold: pairs whose IoU is below this value are rejected.

    Returns:
        ``(matches, unmatched_detections, unmatched_trackers)``: ``matches``
        is an int array of ``[detection, tracker]`` index pairs with shape
        ``(K, 2)``; the other two are 1-D integer index arrays. When
        ``trackers`` is empty, the third item is an empty ``(0, 5)`` array,
        as in the original implementation.
    """
    if len(trackers) == 0:
        return (np.empty((0, 2), dtype=int), np.arange(len(detections)),
                np.empty((0, 5), dtype=int))

    iou_matrix = iou_batch(detections, trackers)

    if min(iou_matrix.shape) > 0:
        above = (iou_matrix > iou_threshold).astype(np.int32)
        if above.sum(1).max() == 1 and above.sum(0).max() == 1:
            # Thresholding already gives a one-to-one matching.
            matched_indices = np.stack(np.where(above), axis=1)
        else:
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty((0, 2), dtype=int)
    # Always a 2-D integer array, even if the solver returned nothing.
    matched_indices = np.asarray(matched_indices, dtype=int).reshape(-1, 2)

    # Assigned pairs whose IoU falls below the threshold are rejected.
    pair_iou = iou_matrix[matched_indices[:, 0], matched_indices[:, 1]]
    rejected = pair_iou < iou_threshold
    matches = matched_indices[~rejected]
    dropped = matched_indices[rejected]

    # Never-assigned indices first (ascending), then rejected pairs in order.
    unmatched_detections = np.concatenate(
        (np.setdiff1d(np.arange(len(detections)), matched_indices[:, 0]),
         dropped[:, 0])).astype(int)
    unmatched_trackers = np.concatenate(
        (np.setdiff1d(np.arange(len(trackers)), matched_indices[:, 1]),
         dropped[:, 1])).astype(int)

    return matches, unmatched_detections, unmatched_trackers
|