- # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import os
- import copy
- import sys
- import math
- from collections import defaultdict
- import numpy as np
- from ppdet.modeling.bbox_utils import bbox_iou_np_expand
- from .map_utils import ap_per_class
- from .metrics import Metric
- from .munkres import Munkres
- try:
- import motmetrics as mm
- mm.lap.default_solver = 'lap'
- except Exception:
- print(
- 'Warning: Unable to use MOT metric, please install motmetrics, for example: `pip install motmetrics`, see https://github.com/longcw/py-motmetrics'
- )
- pass
- from ppdet.utils.logger import setup_logger
- logger = setup_logger(__name__)
- __all__ = ['MOTEvaluator', 'MOTMetric', 'JDEDetMetric', 'KITTIMOTMetric']
- def read_mot_results(filename, is_gt=False, is_ignore=False):
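- """
- Parse a MOTChallenge-style txt file into a per-frame dict.
- Each line is expected to be comma-separated as
- frame, id, x, y, w, h, score (or gt mark), class label, visibility,
- which matches the indices used below: linelist[0] frame, [1] id,
- [2:6] tlwh box, [6] score/mark, [7] label, [8] visibility ratio.
- Returns a dict mapping frame id to a list of (tlwh, target_id, score).
- """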
- valid_label = [1]
- ignore_labels = [2, 7, 8, 12] # only in motchallenge datasets like 'MOT16'
- if is_gt:
- logger.info(
- "In MOT16/17 dataset the valid_label of ground truth is '{}', "
- "in other dataset it should be '0' for single classs MOT.".format(
- valid_label[0]))
- results_dict = dict()
- if os.path.isfile(filename):
- with open(filename, 'r') as f:
- for line in f.readlines():
- linelist = line.split(',')
- if len(linelist) < 7:
- continue
- fid = int(linelist[0])
- if fid < 1:
- continue
- results_dict.setdefault(fid, list())
- if is_gt:
- label = int(float(linelist[7]))
- mark = int(float(linelist[6]))
- if mark == 0 or label not in valid_label:
- continue
- score = 1
- elif is_ignore:
- if 'MOT16-' in filename or 'MOT17-' in filename or 'MOT15-' in filename or 'MOT20-' in filename:
- label = int(float(linelist[7]))
- vis_ratio = float(linelist[8])
- if label not in ignore_labels and vis_ratio >= 0:
- continue
- else:
- continue
- score = 1
- else:
- score = float(linelist[6])
- tlwh = tuple(map(float, linelist[2:6]))
- target_id = int(linelist[1])
- results_dict[fid].append((tlwh, target_id, score))
- return results_dict
- """
- MOT dataset label list, see in https://motchallenge.net
- labels={'ped', ... % 1
- 'person_on_vhcl', ... % 2
- 'car', ... % 3
- 'bicycle', ... % 4
- 'mbike', ... % 5
- 'non_mot_vhcl', ... % 6
- 'static_person', ... % 7
- 'distractor', ... % 8
- 'occluder', ... % 9
- 'occluder_on_grnd', ... % 10
- 'occluder_full', ... % 11
- 'reflection', ... % 12
- 'crowd' ... % 13
- };
- """
- def unzip_objs(objs):
- if len(objs) > 0:
- tlwhs, ids, scores = zip(*objs)
- else:
- tlwhs, ids, scores = [], [], []
- tlwhs = np.asarray(tlwhs, dtype=float).reshape(-1, 4)
- return tlwhs, ids, scores
- class MOTEvaluator(object):
- def __init__(self, data_root, seq_name, data_type):
- self.data_root = data_root
- self.seq_name = seq_name
- self.data_type = data_type
- self.load_annotations()
- try:
- import motmetrics as mm
- mm.lap.default_solver = 'lap'
- except Exception as e:
- raise RuntimeError(
- 'Unable to use MOT metric, please install motmetrics, for example: `pip install motmetrics`, see https://github.com/longcw/py-motmetrics'
- )
- self.reset_accumulator()
- def load_annotations(self):
- assert self.data_type == 'mot'
- gt_filename = os.path.join(self.data_root, self.seq_name, 'gt',
- 'gt.txt')
- if not os.path.exists(gt_filename):
- logger.warning(
- "gt_filename '{}' of MOTEvaluator is not exist, so the MOTA will be -INF."
- )
- self.gt_frame_dict = read_mot_results(gt_filename, is_gt=True)
- self.gt_ignore_frame_dict = read_mot_results(
- gt_filename, is_ignore=True)
- def reset_accumulator(self):
- self.acc = mm.MOTAccumulator(auto_id=True)
- def eval_frame(self, frame_id, trk_tlwhs, trk_ids, rtn_events=False):
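- """
- Update the motmetrics accumulator with one frame of tracker output.
- Tracker boxes that overlap an ignore-region gt box (matched with
- linear_sum_assignment on the IoU distance matrix) are dropped first;
- the remaining boxes are then compared against this frame's gt boxes.
- """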
- # results
- trk_tlwhs = np.copy(trk_tlwhs)
- trk_ids = np.copy(trk_ids)
- # gts
- gt_objs = self.gt_frame_dict.get(frame_id, [])
- gt_tlwhs, gt_ids = unzip_objs(gt_objs)[:2]
- # ignore boxes
- ignore_objs = self.gt_ignore_frame_dict.get(frame_id, [])
- ignore_tlwhs = unzip_objs(ignore_objs)[0]
- # remove ignored results
- keep = np.ones(len(trk_tlwhs), dtype=bool)
- iou_distance = mm.distances.iou_matrix(
- ignore_tlwhs, trk_tlwhs, max_iou=0.5)
- if len(iou_distance) > 0:
- match_is, match_js = mm.lap.linear_sum_assignment(iou_distance)
- match_is, match_js = map(lambda a: np.asarray(a, dtype=int), [match_is, match_js])
- match_ious = iou_distance[match_is, match_js]
- match_js = np.asarray(match_js, dtype=int)
- match_js = match_js[np.logical_not(np.isnan(match_ious))]
- keep[match_js] = False
- trk_tlwhs = trk_tlwhs[keep]
- trk_ids = trk_ids[keep]
- # get distance matrix
- iou_distance = mm.distances.iou_matrix(gt_tlwhs, trk_tlwhs, max_iou=0.5)
- # acc
- self.acc.update(gt_ids, trk_ids, iou_distance)
- if rtn_events and iou_distance.size > 0 and hasattr(self.acc,
- 'last_mot_events'):
- events = self.acc.last_mot_events # only supported by https://github.com/longcw/py-motmetrics
- else:
- events = None
- return events
- def eval_file(self, filename):
- self.reset_accumulator()
- result_frame_dict = read_mot_results(filename, is_gt=False)
- frames = sorted(list(set(result_frame_dict.keys())))
- for frame_id in frames:
- trk_objs = result_frame_dict.get(frame_id, [])
- trk_tlwhs, trk_ids = unzip_objs(trk_objs)[:2]
- self.eval_frame(frame_id, trk_tlwhs, trk_ids, rtn_events=False)
- return self.acc
- @staticmethod
- def get_summary(accs,
- names,
- metrics=('mota', 'num_switches', 'idp', 'idr', 'idf1',
- 'precision', 'recall')):
- names = copy.deepcopy(names)
- if metrics is None:
- metrics = mm.metrics.motchallenge_metrics
- metrics = copy.deepcopy(metrics)
- mh = mm.metrics.create()
- summary = mh.compute_many(
- accs, metrics=metrics, names=names, generate_overall=True)
- return summary
- @staticmethod
- def save_summary(summary, filename):
- import pandas as pd
- # use a context manager so the workbook is written and closed; the older
- # writer.save() API is deprecated/removed in newer pandas versions
- with pd.ExcelWriter(filename) as writer:
- summary.to_excel(writer)
- class MOTMetric(Metric):
- def __init__(self, save_summary=False):
- self.save_summary = save_summary
- self.MOTEvaluator = MOTEvaluator
- self.result_root = None
- self.reset()
- def reset(self):
- self.accs = []
- self.seqs = []
- def update(self, data_root, seq, data_type, result_root, result_filename):
- evaluator = self.MOTEvaluator(data_root, seq, data_type)
- self.accs.append(evaluator.eval_file(result_filename))
- self.seqs.append(seq)
- self.result_root = result_root
- def accumulate(self):
- metrics = mm.metrics.motchallenge_metrics
- mh = mm.metrics.create()
- summary = self.MOTEvaluator.get_summary(self.accs, self.seqs, metrics)
- self.strsummary = mm.io.render_summary(
- summary,
- formatters=mh.formatters,
- namemap=mm.io.motchallenge_metric_names)
- if self.save_summary:
- self.MOTEvaluator.save_summary(
- summary, os.path.join(self.result_root, 'summary.xlsx'))
- def log(self):
- print(self.strsummary)
- def get_results(self):
- return self.strsummary
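- # A minimal usage sketch for MOTMetric; the sequence names and paths below
- # are placeholders, not actual dataset locations:
- #
- #   metric = MOTMetric(save_summary=False)
- #   for seq in ['MOT16-02', 'MOT16-04']:
- #       metric.update('dataset/MOT16/train', seq, 'mot', 'output',
- #                     'output/{}.txt'.format(seq))
- #   metric.accumulate()
- #   metric.log()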
- class JDEDetMetric(Metric):
- # Note this detection AP metric is different from COCOMetric or VOCMetric,
- # and the bbox coordinates are not scaled to the original image
- def __init__(self, overlap_thresh=0.5):
- self.overlap_thresh = overlap_thresh
- self.reset()
- def reset(self):
- self.AP_accum = np.zeros(1)
- self.AP_accum_count = np.zeros(1)
- def update(self, inputs, outputs):
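- # outputs['bbox'] is assumed to be an N x 6 array laid out as
- # [class_id, score, x1, y1, x2, y2] per row, which is why the class,
- # score and box coordinates are sliced from columns 0, 1 and 2 onwards below.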
- bboxes = outputs['bbox'][:, 2:].numpy()
- scores = outputs['bbox'][:, 1].numpy()
- labels = outputs['bbox'][:, 0].numpy()
- bbox_lengths = outputs['bbox_num'].numpy()
- if bboxes.shape[0] == 1 and bboxes.sum() == 0.0:
- return
- gt_boxes = inputs['gt_bbox'].numpy()[0]
- gt_labels = inputs['gt_class'].numpy()[0]
- if gt_labels.shape[0] == 0:
- return
- correct = []
- detected = []
- for i in range(bboxes.shape[0]):
- obj_pred = 0
- pred_bbox = bboxes[i].reshape(1, 4)
- # Compute iou with target boxes
- iou = bbox_iou_np_expand(pred_bbox, gt_boxes, x1y1x2y2=True)[0]
- # Extract index of largest overlap
- best_i = np.argmax(iou)
- # If overlap exceeds threshold and classification is correct mark as correct
- if iou[best_i] > self.overlap_thresh and obj_pred == gt_labels[
- best_i] and best_i not in detected:
- correct.append(1)
- detected.append(best_i)
- else:
- correct.append(0)
- # Compute Average Precision (AP) per class
- target_cls = list(gt_labels.T[0])
- AP, AP_class, R, P = ap_per_class(
- tp=correct,
- conf=scores,
- pred_cls=np.zeros_like(scores),
- target_cls=target_cls)
- self.AP_accum_count += np.bincount(AP_class, minlength=1)
- self.AP_accum += np.bincount(AP_class, minlength=1, weights=AP)
- def accumulate(self):
- logger.info("Accumulating evaluatation results...")
- self.map_stat = self.AP_accum[0] / (self.AP_accum_count[0] + 1E-16)
- def log(self):
- map_stat = 100. * self.map_stat
- logger.info("mAP({:.2f}) = {:.2f}%".format(self.overlap_thresh,
- map_stat))
- def get_results(self):
- return self.map_stat
- """
- The following code is borrowed from https://github.com/xingyizhou/CenterTrack/blob/master/src/tools/eval_kitti_track/evaluate_tracking.py
- """
- class tData:
- """
- Utility class to load data.
- """
- def __init__(self,frame=-1,obj_type="unset",truncation=-1,occlusion=-1,\
- obs_angle=-10,x1=-1,y1=-1,x2=-1,y2=-1,w=-1,h=-1,l=-1,\
- X=-1000,Y=-1000,Z=-1000,yaw=-10,score=-1000,track_id=-1):
- """
- Constructor, initializes the object given the parameters.
- """
- self.frame = frame
- self.track_id = track_id
- self.obj_type = obj_type
- self.truncation = truncation
- self.occlusion = occlusion
- self.obs_angle = obs_angle
- self.x1 = x1
- self.y1 = y1
- self.x2 = x2
- self.y2 = y2
- self.w = w
- self.h = h
- self.l = l
- self.X = X
- self.Y = Y
- self.Z = Z
- self.yaw = yaw
- self.score = score
- self.ignored = False
- self.valid = False
- self.tracker = -1
- def __str__(self):
- attrs = vars(self)
- return '\n'.join("%s: %s" % item for item in attrs.items())
- class KITTIEvaluation(object):
- """ KITTI tracking statistics (CLEAR MOT, id-switches, fragments, ML/PT/MT, precision/recall)
- MOTA - Multi-object tracking accuracy in [0,100]
- MOTP - Multi-object tracking precision in [0,100] (3D) / [td,100] (2D)
- MOTAL - Multi-object tracking accuracy in [0,100] with log10(id-switches)
- id-switches - number of id switches
- fragments - number of fragmentations
- MT, PT, ML - number of mostly tracked, partially tracked and mostly lost trajectories
- recall - recall = percentage of detected targets
- precision - precision = percentage of correctly detected targets
- FAR - number of false alarms per frame
- falsepositives - number of false positives (FP)
- missed - number of missed targets (FN)
- """
- def __init__(self, result_path, gt_path, min_overlap=0.5, max_truncation = 0,\
- min_height = 25, max_occlusion = 2, cls="car",\
- n_frames=[], seqs=[], n_sequences=0):
- # get number of sequences and
- # get number of frames per sequence from test mapping
- # (created while extracting the benchmark)
- self.gt_path = os.path.join(gt_path, "../labels")
- self.n_frames = n_frames
- self.sequence_name = seqs
- self.n_sequences = n_sequences
- self.cls = cls # class to evaluate, i.e. pedestrian or car
- self.result_path = result_path
- # statistics and numbers for evaluation
- self.n_gt = 0 # number of ground truth detections minus ignored false negatives and true positives
- self.n_igt = 0 # number of ignored ground truth detections
- self.n_gts = [
- ] # number of ground truth detections minus ignored false negatives and true positives PER SEQUENCE
- self.n_igts = [
- ] # number of ignored ground truth detections PER SEQUENCE
- self.n_gt_trajectories = 0
- self.n_gt_seq = []
- self.n_tr = 0 # number of tracker detections minus ignored tracker detections
- self.n_trs = [
- ] # number of tracker detections minus ignored tracker detections PER SEQUENCE
- self.n_itr = 0 # number of ignored tracker detections
- self.n_itrs = [] # number of ignored tracker detections PER SEQUENCE
- self.n_igttr = 0 # number of ignored ground truth detections where the corresponding associated tracker detection is also ignored
- self.n_tr_trajectories = 0
- self.n_tr_seq = []
- self.MOTA = 0
- self.MOTP = 0
- self.MOTAL = 0
- self.MODA = 0
- self.MODP = 0
- self.MODP_t = []
- self.recall = 0
- self.precision = 0
- self.F1 = 0
- self.FAR = 0
- self.total_cost = 0
- self.itp = 0 # number of ignored true positives
- self.itps = [] # number of ignored true positives PER SEQUENCE
- self.tp = 0 # number of true positives including ignored true positives!
- self.tps = [
- ] # number of true positives including ignored true positives PER SEQUENCE
- self.fn = 0 # number of false negatives WITHOUT ignored false negatives
- self.fns = [
- ] # number of false negatives WITHOUT ignored false negatives PER SEQUENCE
- self.ifn = 0 # number of ignored false negatives
- self.ifns = [] # number of ignored false negatives PER SEQUENCE
- self.fp = 0 # number of false positives
- # a bit tricky, the number of ignored false negatives and ignored true positives
- # is subtracted, but if both tracker detection and ground truth detection
- # are ignored this number is added again to avoid double counting
- self.fps = [] # above PER SEQUENCE
- self.mme = 0
- self.fragments = 0
- self.id_switches = 0
- self.MT = 0
- self.PT = 0
- self.ML = 0
- self.min_overlap = min_overlap # minimum bounding box overlap for 3rd party metrics
- self.max_truncation = max_truncation # maximum truncation of an object for evaluation
- self.max_occlusion = max_occlusion # maximum occlusion of an object for evaluation
- self.min_height = min_height # minimum height of an object for evaluation
- self.n_sample_points = 500
- # this should be enough to hold all groundtruth trajectories
- # is expanded if necessary and reduced in any case
- self.gt_trajectories = [[] for x in range(self.n_sequences)]
- self.ign_trajectories = [[] for x in range(self.n_sequences)]
- def loadGroundtruth(self):
- try:
- self._loadData(self.gt_path, cls=self.cls, loading_groundtruth=True)
- except IOError:
- return False
- return True
- def loadTracker(self):
- try:
- if not self._loadData(
- self.result_path, cls=self.cls, loading_groundtruth=False):
- return False
- except IOError:
- return False
- return True
- def _loadData(self,
- root_dir,
- cls,
- min_score=-1000,
- loading_groundtruth=False):
- """
- Generic loader for ground truth and tracking data.
- Use loadGroundtruth() or loadTracker() to load this data.
- Loads detections in KITTI format from textfiles.
- """
- # construct objectDetections object to hold detection data
- t_data = tData()
- data = []
- eval_2d = True
- eval_3d = True
- seq_data = []
- n_trajectories = 0
- n_trajectories_seq = []
- for seq, s_name in enumerate(self.sequence_name):
- i = 0
- filename = os.path.join(root_dir, "%s.txt" % s_name)
- f = open(filename, "r")
- f_data = [
- [] for x in range(self.n_frames[seq])
- ] # current set has only 1059 entries, sufficient length is checked anyway
- ids = []
- n_in_seq = 0
- id_frame_cache = []
- for line in f:
- # KITTI tracking benchmark data format:
- # (frame,tracklet_id,objectType,truncation,occlusion,alpha,x1,y1,x2,y2,h,w,l,X,Y,Z,ry)
- line = line.strip()
- fields = line.split(" ")
- # classes that should be loaded (ignored neighboring classes)
- if "car" in cls.lower():
- classes = ["car", "van"]
- elif "pedestrian" in cls.lower():
- classes = ["pedestrian", "person_sitting"]
- else:
- classes = [cls.lower()]
- classes += ["dontcare"]
- if not any([s for s in classes if s in fields[2].lower()]):
- continue
- # get fields from table
- t_data.frame = int(float(fields[0])) # frame
- t_data.track_id = int(float(fields[1])) # id
- t_data.obj_type = fields[
- 2].lower() # object type [car, pedestrian, cyclist, ...]
- t_data.truncation = int(
- float(fields[3])) # truncation [-1,0,1,2]
- t_data.occlusion = int(
- float(fields[4])) # occlusion [-1,0,1,2]
- t_data.obs_angle = float(fields[5]) # observation angle [rad]
- t_data.x1 = float(fields[6]) # left [px]
- t_data.y1 = float(fields[7]) # top [px]
- t_data.x2 = float(fields[8]) # right [px]
- t_data.y2 = float(fields[9]) # bottom [px]
- t_data.h = float(fields[10]) # height [m]
- t_data.w = float(fields[11]) # width [m]
- t_data.l = float(fields[12]) # length [m]
- t_data.X = float(fields[13]) # X [m]
- t_data.Y = float(fields[14]) # Y [m]
- t_data.Z = float(fields[15]) # Z [m]
- t_data.yaw = float(fields[16]) # yaw angle [rad]
- if not loading_groundtruth:
- if len(fields) == 17:
- t_data.score = -1
- elif len(fields) == 18:
- t_data.score = float(fields[17]) # detection score
- else:
- logger.info("file is not in KITTI format")
- return
- # do not consider objects marked as invalid
- if t_data.track_id == -1 and t_data.obj_type != "dontcare":
- continue
- idx = t_data.frame
- # check if length for frame data is sufficient
- if idx >= len(f_data):
- print("extend f_data", idx, len(f_data))
- f_data += [[] for x in range(max(500, idx - len(f_data)))]
- try:
- id_frame = (t_data.frame, t_data.track_id)
- if id_frame in id_frame_cache and not loading_groundtruth:
- logger.info(
- "track ids are not unique for sequence %d: frame %d"
- % (seq, t_data.frame))
- logger.info(
- "track id %d occurred at least twice for this frame"
- % t_data.track_id)
- logger.info("Exiting...")
- #continue # this allows to evaluate non-unique result files
- return False
- id_frame_cache.append(id_frame)
- f_data[t_data.frame].append(copy.copy(t_data))
- except:
- print(len(f_data), idx)
- raise
- if t_data.track_id not in ids and t_data.obj_type != "dontcare":
- ids.append(t_data.track_id)
- n_trajectories += 1
- n_in_seq += 1
- # check if uploaded data provides information for 2D and 3D evaluation
- if not loading_groundtruth and eval_2d is True and (
- t_data.x1 == -1 or t_data.x2 == -1 or t_data.y1 == -1 or
- t_data.y2 == -1):
- eval_2d = False
- if not loading_groundtruth and eval_3d is True and (
- t_data.X == -1000 or t_data.Y == -1000 or
- t_data.Z == -1000):
- eval_3d = False
- # only add existing frames
- n_trajectories_seq.append(n_in_seq)
- seq_data.append(f_data)
- f.close()
- if not loading_groundtruth:
- self.tracker = seq_data
- self.n_tr_trajectories = n_trajectories
- self.eval_2d = eval_2d
- self.eval_3d = eval_3d
- self.n_tr_seq = n_trajectories_seq
- if self.n_tr_trajectories == 0:
- return False
- else:
- # split ground truth and DontCare areas
- self.dcareas = []
- self.groundtruth = []
- for seq_idx in range(len(seq_data)):
- seq_gt = seq_data[seq_idx]
- s_g, s_dc = [], []
- for f in range(len(seq_gt)):
- all_gt = seq_gt[f]
- g, dc = [], []
- for gg in all_gt:
- if gg.obj_type == "dontcare":
- dc.append(gg)
- else:
- g.append(gg)
- s_g.append(g)
- s_dc.append(dc)
- self.dcareas.append(s_dc)
- self.groundtruth.append(s_g)
- self.n_gt_seq = n_trajectories_seq
- self.n_gt_trajectories = n_trajectories
- return True
- def boxoverlap(self, a, b, criterion="union"):
- """
- boxoverlap computes intersection over union for bbox a and b in KITTI format.
- If the criterion is 'union', overlap = (a inter b) / (a union b).
- If the criterion is 'a', overlap = (a inter b) / a, where b should be a dontcare area.
- """
- x1 = max(a.x1, b.x1)
- y1 = max(a.y1, b.y1)
- x2 = min(a.x2, b.x2)
- y2 = min(a.y2, b.y2)
- w = x2 - x1
- h = y2 - y1
- if w <= 0. or h <= 0.:
- return 0.
- inter = w * h
- aarea = (a.x2 - a.x1) * (a.y2 - a.y1)
- barea = (b.x2 - b.x1) * (b.y2 - b.y1)
- # intersection over union overlap
- if criterion.lower() == "union":
- o = inter / float(aarea + barea - inter)
- elif criterion.lower() == "a":
- o = float(inter) / float(aarea)
- else:
- raise TypeError("Unkown type for criterion")
- return o
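- # Worked example (hypothetical boxes): for a = (0, 0, 10, 10) and
- # b = (5, 5, 15, 15) in x1, y1, x2, y2 form, the intersection is
- # 5 * 5 = 25 and the union is 100 + 100 - 25 = 175, so the "union"
- # criterion returns 25 / 175, roughly 0.143.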
- def compute3rdPartyMetrics(self):
- """
- Computes the metrics defined in
- - Stiefelhagen 2008: Evaluating Multiple Object Tracking Performance: The CLEAR MOT Metrics
- MOTA, MOTAL, MOTP
- - Nevatia 2008: Global Data Association for Multi-Object Tracking Using Network Flows
- MT/PT/ML
- """
- # construct Munkres object for Hungarian Method association
- hm = Munkres()
- max_cost = 1e9
- # go through all frames and associate ground truth and tracker results
- # groundtruth and tracker contain lists for every single frame containing lists of KITTI format detections
- fr, ids = 0, 0
- for seq_idx in range(len(self.groundtruth)):
- seq_gt = self.groundtruth[seq_idx]
- seq_dc = self.dcareas[seq_idx] # don't care areas
- seq_tracker = self.tracker[seq_idx]
- seq_trajectories = defaultdict(list)
- seq_ignored = defaultdict(list)
- # statistics over the current sequence, check the corresponding
- # variable comments in __init__ to get their meaning
- seqtp = 0
- seqitp = 0
- seqfn = 0
- seqifn = 0
- seqfp = 0
- seqigt = 0
- seqitr = 0
- last_ids = [[], []]
- n_gts = 0
- n_trs = 0
- for f in range(len(seq_gt)):
- g = seq_gt[f]
- dc = seq_dc[f]
- t = seq_tracker[f]
- # counting total number of ground truth and tracker objects
- self.n_gt += len(g)
- self.n_tr += len(t)
- n_gts += len(g)
- n_trs += len(t)
- # use hungarian method to associate, using boxoverlap 0..1 as cost
- # build cost matrix
- cost_matrix = []
- this_ids = [[], []]
- for gg in g:
- # save current ids
- this_ids[0].append(gg.track_id)
- this_ids[1].append(-1)
- gg.tracker = -1
- gg.id_switch = 0
- gg.fragmentation = 0
- cost_row = []
- for tt in t:
- # overlap == 1 means cost == 0
- c = 1 - self.boxoverlap(gg, tt)
- # gating for boxoverlap
- if c <= self.min_overlap:
- cost_row.append(c)
- else:
- cost_row.append(max_cost) # = 1e9
- cost_matrix.append(cost_row)
- # all ground truth trajectories are initially not associated
- # extend groundtruth trajectories lists (merge lists)
- seq_trajectories[gg.track_id].append(-1)
- seq_ignored[gg.track_id].append(False)
- if len(g) == 0:
- cost_matrix = [[]]
- # associate
- association_matrix = hm.compute(cost_matrix)
- # tmp variables for sanity checks and MODP computation
- tmptp = 0
- tmpfp = 0
- tmpfn = 0
- tmpc = 0 # this will sum up the overlaps for all true positives
- tmpcs = [0] * len(
- g) # this will save the overlaps for all true positives
- # the reason is that some true positives might be ignored
- # later such that the corresponding overlaps can
- # be subtracted from tmpc for MODP computation
- # mapping for tracker ids and ground truth ids
- for row, col in association_matrix:
- # apply gating on boxoverlap
- c = cost_matrix[row][col]
- if c < max_cost:
- g[row].tracker = t[col].track_id
- this_ids[1][row] = t[col].track_id
- t[col].valid = True
- g[row].distance = c
- self.total_cost += 1 - c
- tmpc += 1 - c
- tmpcs[row] = 1 - c
- seq_trajectories[g[row].track_id][-1] = t[col].track_id
- # true positives are only valid associations
- self.tp += 1
- tmptp += 1
- else:
- g[row].tracker = -1
- self.fn += 1
- tmpfn += 1
- # associate tracker and DontCare areas
- # ignore tracker in neighboring classes
- nignoredtracker = 0 # number of ignored tracker detections
- ignoredtrackers = dict() # will associate the track_id with -1
- # if it is not ignored and 1 if it is
- # ignored;
- # this is used to avoid double counting ignored
- # cases, see the next loop
- for tt in t:
- ignoredtrackers[tt.track_id] = -1
- # ignore detection if it belongs to a neighboring class or is
- # smaller or equal to the minimum height
- tt_height = abs(tt.y1 - tt.y2)
- if ((self.cls == "car" and tt.obj_type == "van") or
- (self.cls == "pedestrian" and
- tt.obj_type == "person_sitting") or
- tt_height <= self.min_height) and not tt.valid:
- nignoredtracker += 1
- tt.ignored = True
- ignoredtrackers[tt.track_id] = 1
- continue
- for d in dc:
- overlap = self.boxoverlap(tt, d, "a")
- if overlap > 0.5 and not tt.valid:
- tt.ignored = True
- nignoredtracker += 1
- ignoredtrackers[tt.track_id] = 1
- break
- # check for ignored FN/TP (truncation or neighboring object class)
- ignoredfn = 0 # the number of ignored false negatives
- nignoredtp = 0 # the number of ignored true positives
- nignoredpairs = 0 # the number of ignored pairs, i.e. a true positive
- # which is ignored but where the associated tracker
- # detection has already been ignored
- gi = 0
- for gg in g:
- if gg.tracker < 0:
- if gg.occlusion>self.max_occlusion or gg.truncation>self.max_truncation\
- or (self.cls=="car" and gg.obj_type=="van") or (self.cls=="pedestrian" and gg.obj_type=="person_sitting"):
- seq_ignored[gg.track_id][-1] = True
- gg.ignored = True
- ignoredfn += 1
- elif gg.tracker >= 0:
- if gg.occlusion>self.max_occlusion or gg.truncation>self.max_truncation\
- or (self.cls=="car" and gg.obj_type=="van") or (self.cls=="pedestrian" and gg.obj_type=="person_sitting"):
- seq_ignored[gg.track_id][-1] = True
- gg.ignored = True
- nignoredtp += 1
- # if the associated tracker detection is already ignored,
- # we want to avoid double counting ignored detections
- if ignoredtrackers[gg.tracker] > 0:
- nignoredpairs += 1
- # for computing MODP, the overlaps from ignored detections
- # are subtracted
- tmpc -= tmpcs[gi]
- gi += 1
- # the below might be confusing, check the comments in __init__
- # to see what the individual statistics represent
- # correct TP by number of ignored TP due to truncation
- # ignored TP are shown as tracked in visualization
- tmptp -= nignoredtp
- # count the number of ignored true positives
- self.itp += nignoredtp
- # adjust the number of ground truth objects considered
- self.n_gt -= (ignoredfn + nignoredtp)
- # count the number of ignored ground truth objects
- self.n_igt += ignoredfn + nignoredtp
- # count the number of ignored tracker objects
- self.n_itr += nignoredtracker
- # count the number of ignored pairs, i.e. associated tracker and
- # ground truth objects that are both ignored
- self.n_igttr += nignoredpairs
- # false negatives = associated gt bboxes exceeding the association threshold + non-associated gt bboxes
- tmpfn += len(g) - len(association_matrix) - ignoredfn
- self.fn += len(g) - len(association_matrix) - ignoredfn
- self.ifn += ignoredfn
- # false positives = tracker bboxes - associated tracker bboxes
- # mismatches (mme_t)
- tmpfp += len(
- t) - tmptp - nignoredtracker - nignoredtp + nignoredpairs
- self.fp += len(
- t) - tmptp - nignoredtracker - nignoredtp + nignoredpairs
- # update sequence data
- seqtp += tmptp
- seqitp += nignoredtp
- seqfp += tmpfp
- seqfn += tmpfn
- seqifn += ignoredfn
- seqigt += ignoredfn + nignoredtp
- seqitr += nignoredtracker
- # sanity checks
- # - the number of true positives minus ignored true positives
- # should be greater or equal to 0
- # - the number of false negatives should be greater or equal to 0
- # - the number of false positives needs to be greater or equal to 0
- # otherwise ignored detections might be counted double
- # - the number of counted true positives (plus ignored ones)
- # and the number of counted false negatives (plus ignored ones)
- # should match the total number of ground truth objects
- # - the number of counted true positives (plus ignored ones)
- # and the number of counted false positives
- # plus the number of ignored tracker detections should
- # match the total number of tracker detections; note that
- # nignoredpairs is subtracted here to avoid double counting
- # of ignored detections in nignoredtp and nignoredtracker
- if tmptp < 0:
- print(tmptp, nignoredtp)
- raise NameError("Something went wrong! TP is negative")
- if tmpfn < 0:
- print(tmpfn,
- len(g),
- len(association_matrix), ignoredfn, nignoredpairs)
- raise NameError("Something went wrong! FN is negative")
- if tmpfp < 0:
- print(tmpfp,
- len(t), tmptp, nignoredtracker, nignoredtp,
- nignoredpairs)
- raise NameError("Something went wrong! FP is negative")
- if tmptp + tmpfn != len(g) - ignoredfn - nignoredtp:
- print("seqidx", seq_idx)
- print("frame ", f)
- print("TP ", tmptp)
- print("FN ", tmpfn)
- print("FP ", tmpfp)
- print("nGT ", len(g))
- print("nAss ", len(association_matrix))
- print("ign GT", ignoredfn)
- print("ign TP", nignoredtp)
- raise NameError(
- "Something went wrong! nGroundtruth is not TP+FN")
- if tmptp + tmpfp + nignoredtp + nignoredtracker - nignoredpairs != len(
- t):
- print(seq_idx, f, len(t), tmptp, tmpfp)
- print(len(association_matrix), association_matrix)
- raise NameError(
- "Something went wrong! nTracker is not TP+FP")
- # check for id switches or fragmentations
- for i, tt in enumerate(this_ids[0]):
- if tt in last_ids[0]:
- idx = last_ids[0].index(tt)
- tid = this_ids[1][i]
- lid = last_ids[1][idx]
- if tid != lid and lid != -1 and tid != -1:
- if g[i].truncation < self.max_truncation:
- g[i].id_switch = 1
- ids += 1
- if tid != lid and lid != -1:
- if g[i].truncation < self.max_truncation:
- g[i].fragmentation = 1
- fr += 1
- # save current index
- last_ids = this_ids
- # compute MODP_t
- MODP_t = 1
- if tmptp != 0:
- MODP_t = tmpc / float(tmptp)
- self.MODP_t.append(MODP_t)
- # remove empty lists for current gt trajectories
- self.gt_trajectories[seq_idx] = seq_trajectories
- self.ign_trajectories[seq_idx] = seq_ignored
- # gather statistics for "per sequence" statistics.
- self.n_gts.append(n_gts)
- self.n_trs.append(n_trs)
- self.tps.append(seqtp)
- self.itps.append(seqitp)
- self.fps.append(seqfp)
- self.fns.append(seqfn)
- self.ifns.append(seqifn)
- self.n_igts.append(seqigt)
- self.n_itrs.append(seqitr)
- # compute MT/PT/ML, fragments, idswitches for all groundtruth trajectories
- n_ignored_tr_total = 0
- for seq_idx, (
- seq_trajectories, seq_ignored
- ) in enumerate(zip(self.gt_trajectories, self.ign_trajectories)):
- if len(seq_trajectories) == 0:
- continue
- tmpMT, tmpML, tmpPT, tmpId_switches, tmpFragments = [0] * 5
- n_ignored_tr = 0
- for g, ign_g in zip(seq_trajectories.values(),
- seq_ignored.values()):
- # all frames of this gt trajectory are ignored
- if all(ign_g):
- n_ignored_tr += 1
- n_ignored_tr_total += 1
- continue
- # all frames of this gt trajectory are not assigned to any detections
- if all([this == -1 for this in g]):
- tmpML += 1
- self.ML += 1
- continue
- # compute tracked frames in trajectory
- last_id = g[0]
- # first detection (necessary to be in gt_trajectories) is always tracked
- tracked = 1 if g[0] >= 0 else 0
- lgt = 0 if ign_g[0] else 1
- for f in range(1, len(g)):
- if ign_g[f]:
- last_id = -1
- continue
- lgt += 1
- if last_id != g[f] and last_id != -1 and g[f] != -1 and g[
- f - 1] != -1:
- tmpId_switches += 1
- self.id_switches += 1
- if f < len(g) - 1 and g[f - 1] != g[
- f] and last_id != -1 and g[f] != -1 and g[f +
- 1] != -1:
- tmpFragments += 1
- self.fragments += 1
- if g[f] != -1:
- tracked += 1
- last_id = g[f]
- # handle last frame; tracked state is handled in for loop (g[f]!=-1)
- if len(g) > 1 and g[f - 1] != g[f] and last_id != -1 and g[
- f] != -1 and not ign_g[f]:
- tmpFragments += 1
- self.fragments += 1
- # compute MT/PT/ML
- tracking_ratio = tracked / float(len(g) - sum(ign_g))
- if tracking_ratio > 0.8:
- tmpMT += 1
- self.MT += 1
- elif tracking_ratio < 0.2:
- tmpML += 1
- self.ML += 1
- else: # 0.2 <= tracking_ratio <= 0.8
- tmpPT += 1
- self.PT += 1
- if (self.n_gt_trajectories - n_ignored_tr_total) == 0:
- self.MT = 0.
- self.PT = 0.
- self.ML = 0.
- else:
- self.MT /= float(self.n_gt_trajectories - n_ignored_tr_total)
- self.PT /= float(self.n_gt_trajectories - n_ignored_tr_total)
- self.ML /= float(self.n_gt_trajectories - n_ignored_tr_total)
- # precision/recall etc.
- if (self.fp + self.tp) == 0 or (self.tp + self.fn) == 0:
- self.recall = 0.
- self.precision = 0.
- else:
- self.recall = self.tp / float(self.tp + self.fn)
- self.precision = self.tp / float(self.fp + self.tp)
- if (self.recall + self.precision) == 0:
- self.F1 = 0.
- else:
- self.F1 = 2. * (self.precision * self.recall) / (
- self.precision + self.recall)
- if sum(self.n_frames) == 0:
- self.FAR = "n/a"
- else:
- self.FAR = self.fp / float(sum(self.n_frames))
- # compute CLEARMOT
- if self.n_gt == 0:
- self.MOTA = -float("inf")
- self.MODA = -float("inf")
- else:
- self.MOTA = 1 - (self.fn + self.fp + self.id_switches
- ) / float(self.n_gt)
- self.MODA = 1 - (self.fn + self.fp) / float(self.n_gt)
- if self.tp == 0:
- self.MOTP = float("inf")
- else:
- self.MOTP = self.total_cost / float(self.tp)
- if self.n_gt != 0:
- if self.id_switches == 0:
- self.MOTAL = 1 - (self.fn + self.fp + self.id_switches
- ) / float(self.n_gt)
- else:
- self.MOTAL = 1 - (self.fn + self.fp +
- math.log10(self.id_switches)
- ) / float(self.n_gt)
- else:
- self.MOTAL = -float("inf")
- if sum(self.n_frames) == 0:
- self.MODP = "n/a"
- else:
- self.MODP = sum(self.MODP_t) / float(sum(self.n_frames))
- return True
- def createSummary(self):
- summary = ""
- summary += "tracking evaluation summary".center(80, "=") + "\n"
- summary += self.printEntry("Multiple Object Tracking Accuracy (MOTA)",
- self.MOTA) + "\n"
- summary += self.printEntry("Multiple Object Tracking Precision (MOTP)",
- self.MOTP) + "\n"
- summary += self.printEntry("Multiple Object Tracking Accuracy (MOTAL)",
- self.MOTAL) + "\n"
- summary += self.printEntry("Multiple Object Detection Accuracy (MODA)",
- self.MODA) + "\n"
- summary += self.printEntry("Multiple Object Detection Precision (MODP)",
- self.MODP) + "\n"
- summary += "\n"
- summary += self.printEntry("Recall", self.recall) + "\n"
- summary += self.printEntry("Precision", self.precision) + "\n"
- summary += self.printEntry("F1", self.F1) + "\n"
- summary += self.printEntry("False Alarm Rate", self.FAR) + "\n"
- summary += "\n"
- summary += self.printEntry("Mostly Tracked", self.MT) + "\n"
- summary += self.printEntry("Partly Tracked", self.PT) + "\n"
- summary += self.printEntry("Mostly Lost", self.ML) + "\n"
- summary += "\n"
- summary += self.printEntry("True Positives", self.tp) + "\n"
- #summary += self.printEntry("True Positives per Sequence", self.tps) + "\n"
- summary += self.printEntry("Ignored True Positives", self.itp) + "\n"
- #summary += self.printEntry("Ignored True Positives per Sequence", self.itps) + "\n"
- summary += self.printEntry("False Positives", self.fp) + "\n"
- #summary += self.printEntry("False Positives per Sequence", self.fps) + "\n"
- summary += self.printEntry("False Negatives", self.fn) + "\n"
- #summary += self.printEntry("False Negatives per Sequence", self.fns) + "\n"
- summary += self.printEntry("ID-switches", self.id_switches) + "\n"
- self.fp = self.fp / self.n_gt
- self.fn = self.fn / self.n_gt
- self.id_switches = self.id_switches / self.n_gt
- summary += self.printEntry("False Positives Ratio", self.fp) + "\n"
- #summary += self.printEntry("False Positives per Sequence", self.fps) + "\n"
- summary += self.printEntry("False Negatives Ratio", self.fn) + "\n"
- #summary += self.printEntry("False Negatives per Sequence", self.fns) + "\n"
- summary += self.printEntry("Ignored False Negatives Ratio",
- self.ifn) + "\n"
- #summary += self.printEntry("Ignored False Negatives per Sequence", self.ifns) + "\n"
- summary += self.printEntry("Missed Targets", self.fn) + "\n"
- summary += self.printEntry("ID-switches", self.id_switches) + "\n"
- summary += self.printEntry("Fragmentations", self.fragments) + "\n"
- summary += "\n"
- summary += self.printEntry("Ground Truth Objects (Total)", self.n_gt +
- self.n_igt) + "\n"
- #summary += self.printEntry("Ground Truth Objects (Total) per Sequence", self.n_gts) + "\n"
- summary += self.printEntry("Ignored Ground Truth Objects",
- self.n_igt) + "\n"
- #summary += self.printEntry("Ignored Ground Truth Objects per Sequence", self.n_igts) + "\n"
- summary += self.printEntry("Ground Truth Trajectories",
- self.n_gt_trajectories) + "\n"
- summary += "\n"
- summary += self.printEntry("Tracker Objects (Total)", self.n_tr) + "\n"
- #summary += self.printEntry("Tracker Objects (Total) per Sequence", self.n_trs) + "\n"
- summary += self.printEntry("Ignored Tracker Objects", self.n_itr) + "\n"
- #summary += self.printEntry("Ignored Tracker Objects per Sequence", self.n_itrs) + "\n"
- summary += self.printEntry("Tracker Trajectories",
- self.n_tr_trajectories) + "\n"
- #summary += "\n"
- #summary += self.printEntry("Ignored Tracker Objects with Associated Ignored Ground Truth Objects", self.n_igttr) + "\n"
- summary += "=" * 80
- return summary
- def printEntry(self, key, val, width=(70, 10)):
- """
- Pretty print an entry in a table fashion.
- """
- s_out = key.ljust(width[0])
- if type(val) == int:
- s = "%%%dd" % width[1]
- s_out += s % val
- elif type(val) == float:
- s = "%%%df" % (width[1])
- s_out += s % val
- else:
- s_out += ("%s" % val).rjust(width[1])
- return s_out
- def saveToStats(self, save_summary):
- """
- Save the statistics in a whitespace-separated file.
- """
- summary = self.createSummary()
- if save_summary:
- filename = os.path.join(self.result_path,
- "summary_%s.txt" % self.cls)
- dump = open(filename, "w+")
- dump.write(summary)
- dump.close()
- return summary
- class KITTIMOTMetric(Metric):
- def __init__(self, save_summary=True):
- self.save_summary = save_summary
- self.MOTEvaluator = KITTIEvaluation
- self.result_root = None
- self.reset()
- def reset(self):
- self.seqs = []
- self.n_sequences = 0
- self.n_frames = []
- self.strsummary = ''
- def update(self, data_root, seq, data_type, result_root, result_filename):
- assert data_type == 'kitti', "data_type should 'kitti'"
- self.result_root = result_root
- self.gt_path = data_root
- gt_path = '{}/../labels/{}.txt'.format(data_root, seq)
- gt = open(gt_path, "r")
- max_frame = 0
- for line in gt:
- line = line.strip()
- line_list = line.split(" ")
- if int(line_list[0]) > max_frame:
- max_frame = int(line_list[0])
- rs = open(result_filename, "r")
- for line in rs:
- line = line.strip()
- line_list = line.split(" ")
- if int(line_list[0]) > max_frame:
- max_frame = int(line_list[0])
- gt.close()
- rs.close()
- self.n_frames.append(max_frame + 1)
- self.seqs.append(seq)
- self.n_sequences += 1
- def accumulate(self):
- logger.info("Processing Result for KITTI Tracking Benchmark")
- e = self.MOTEvaluator(result_path=self.result_root, gt_path=self.gt_path,\
- n_frames=self.n_frames, seqs=self.seqs, n_sequences=self.n_sequences)
- try:
- if not e.loadTracker():
- return
- logger.info("Loading Results - Success")
- logger.info("Evaluate Object Class: %s" % c.upper())
- except:
- logger.info("Caught exception while loading result data.")
- if not e.loadGroundtruth():
- raise ValueError("Ground truth not found.")
- logger.info("Loading Groundtruth - Success")
- # sanity checks
- if len(e.groundtruth) != len(e.tracker):
- logger.info(
- "The uploaded data does not provide results for every sequence.")
- return False
- logger.info("Loaded %d Sequences." % len(e.groundtruth))
- logger.info("Start Evaluation...")
- if e.compute3rdPartyMetrics():
- self.strsummary = e.saveToStats(self.save_summary)
- else:
- logger.info(
- "There seem to be no true positives or false positives at all in the submitted data."
- )
- def log(self):
- print(self.strsummary)
- def get_results(self):
- return self.strsummary