123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import cv2
- import numpy as np
- import time
- import argparse
- from scipy.special import softmax
- from openvino.runtime import Core
def image_preprocess(img_path, re_shape):
    """Load an image from disk and turn it into a normalized network input.

    The image is resized to a ``re_shape x re_shape`` square (Lanczos
    interpolation), converted from BGR to RGB, scaled to ``[0, 1]``,
    normalized per channel with fixed mean/std constants and laid out as
    a batch of one in channel-first order.

    Args:
        img_path (str): path of the image file to read.
        re_shape (int): side length of the square network input.

    Returns:
        np.ndarray: float32 array of shape ``(1, 3, re_shape, re_shape)``.

    Raises:
        FileNotFoundError: if the image cannot be read from ``img_path``.
    """
    img = cv2.imread(img_path)
    # cv2.imread signals failure by returning None rather than raising;
    # fail here with a clear message instead of deep inside cv2.resize.
    if img is None:
        raise FileNotFoundError(
            'cannot read image file: {!r}'.format(img_path))

    img = cv2.resize(
        img, (re_shape, re_shape), interpolation=cv2.INTER_LANCZOS4)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # HWC -> CHW, scale to [0, 1], then add the batch axis.
    img = np.transpose(img, [2, 0, 1]) / 255
    img = np.expand_dims(img, 0)

    # Per-channel statistics, shaped to broadcast over (1, 3, H, W).
    img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
    img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))
    img = (img - img_mean) / img_std
    return img.astype(np.float32)
def draw_box(img, results, class_label, scale_x, scale_y,
             score_threshold=0.20):
    """Draw detection boxes and their labels onto an image.

    Every detection is printed as ``<label> : <score>``; only detections
    whose score is strictly greater than ``score_threshold`` are drawn.

    Args:
        img (np.ndarray): image to draw on; it is modified in place.
        results (np.ndarray): array of shape (N, 6) whose rows are
            ``[label_id, score, xmin, ymin, xmax, ymax]``.
        class_label (str): path of a text file with one class name per
            line; the line index is the label id.
        scale_x (float): factor applied to x coordinates before drawing.
        scale_y (float): factor applied to y coordinates before drawing.
        score_threshold (float): minimum score (exclusive) for a box to
            be drawn. Defaults to 0.20.

    Returns:
        np.ndarray: the same ``img`` object that was passed in.
    """
    # Use a context manager so the label file is closed deterministically.
    with open(class_label, 'r') as label_file:
        label_list = [line.strip() for line in label_file]

    for result in results:
        label_id = int(result[0])
        score = result[1]
        bbox = result[2:]
        print(label_list[label_id], ':', score)
        if score <= score_threshold:
            continue

        xmin, ymin = int(bbox[0] * scale_x), int(bbox[1] * scale_y)
        xmax, ymax = int(bbox[2] * scale_x), int(bbox[3] * scale_y)
        font = cv2.FONT_HERSHEY_SIMPLEX
        label_text = label_list[label_id]

        # Box outline, then a filled banner above it holding the two
        # text rows (score on top, class name underneath).
        cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 3)
        cv2.rectangle(img, (xmin, ymin), (xmax, ymin - 60), (0, 255, 0), -1)
        cv2.putText(img, "#" + label_text, (xmin, ymin - 10), font, 1,
                    (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(img,
                    str(round(score, 3)), (xmin, ymin - 40), font, 0.8,
                    (255, 255, 255), 2, cv2.LINE_AA)
    return img
def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200):
    """Greedy (hard) non-maximum suppression.

    Args:
        box_scores (N, 5): boxes in corner-form with the score in the
            last column.
        iou_threshold: boxes whose IoU with an already kept box exceeds
            this value are discarded.
        top_k: keep at most top_k results. If top_k <= 0, keep all.
        candidate_size: only the candidate_size highest-scoring boxes
            are considered.
    Returns:
        The rows of box_scores that survived, best score first.
    """
    scores = box_scores[:, -1]
    boxes = box_scores[:, :-1]

    # Ascending score order, so the best remaining candidate is last.
    order = np.argsort(scores)[-candidate_size:]
    keep = []
    while order.size:
        best = order[-1]
        keep.append(best)

        reached_top_k = top_k > 0 and len(keep) == top_k
        if reached_top_k or order.size == 1:
            break

        # Drop the box just kept, then everything overlapping it too much.
        order = order[:-1]
        overlaps = iou_of(boxes[order, :], boxes[best, :][np.newaxis, :])
        order = order[overlaps <= iou_threshold]
    return box_scores[keep, :]
def iou_of(boxes0, boxes1, eps=1e-5):
    """Compute the intersection-over-union (Jaccard index) of boxes.

    Boxes are in corner form: the first two values of the last axis are
    the left-top corner and the remaining values the right-bottom corner.

    Args:
        boxes0 (N, 4): first set of boxes.
        boxes1 (N or 1, 4): second set of boxes, broadcast against boxes0.
        eps: a small number added to the denominator to avoid division
            by zero.
    Returns:
        iou (N): IoU values.
    """
    lt0, rb0 = boxes0[..., :2], boxes0[..., 2:]
    lt1, rb1 = boxes1[..., :2], boxes1[..., 2:]

    # The intersection rectangle is bounded by the innermost corners.
    intersection = area_of(np.maximum(lt0, lt1), np.minimum(rb0, rb1))
    union = area_of(lt0, rb0) + area_of(lt1, rb1) - intersection
    return intersection / (union + eps)
def area_of(left_top, right_bottom):
    """Return the areas of rectangles described by two opposite corners.

    Args:
        left_top (N, 2): left top corner.
        right_bottom (N, 2): right bottom corner.
    Returns:
        area (N): the area of each rectangle; rectangles whose
            right-bottom corner is not beyond the left-top one have
            area 0.
    """
    # Negative extents (inverted or disjoint corners) are clamped to zero.
    extent = np.clip(right_bottom - left_top, a_min=0.0, a_max=None)
    first_side = extent[..., 0]
    second_side = extent[..., 1]
    return first_side * second_side
class PicoDetPostProcess(object):
    """Turn raw PicoDet head outputs into final detections.

    Args:
        input_shape (sequence): network input size as (height, width).
        ori_shape (sequence): per-image (height, width) used to clip the
            boxes; indexed by batch id.
        scale_factor (sequence): per-image pair of scale factors, indexed
            by batch id. The pair is reversed before dividing the
            (x, y, x, y) box, i.e. it is stored as (scale_y, scale_x).
        strides (sequence of int): stride of each output level, in the
            same order as the per-level score/box outputs.
        score_threshold (float): per-class scores must be strictly above
            this value to be considered.
        nms_threshold (float): IoU threshold passed to ``hard_nms``.
        nms_top_k (int): number of highest-scoring points kept per level
            before NMS.
        keep_top_k (int): maximum number of boxes kept per class by NMS.
    """

    def __init__(self,
                 input_shape,
                 ori_shape,
                 scale_factor,
                 strides=(8, 16, 32, 64),
                 score_threshold=0.4,
                 nms_threshold=0.5,
                 nms_top_k=1000,
                 keep_top_k=100):
        self.ori_shape = ori_shape
        self.input_shape = input_shape
        self.scale_factor = scale_factor
        self.strides = strides
        self.score_threshold = score_threshold
        self.nms_threshold = nms_threshold
        self.nms_top_k = nms_top_k
        self.keep_top_k = keep_top_k

    def warp_boxes(self, boxes, ori_shape):
        """Order the corners of each box and clip it to the image.

        Args:
            boxes (n, 4): boxes as (x1, y1, x2, y2).
            ori_shape: (height, width) of the image to clip against.

        Returns:
            (n, 4) float32 array of (xmin, ymin, xmax, ymax), or ``boxes``
            itself when it is empty.
        """
        width, height = ori_shape[1], ori_shape[0]
        if not len(boxes):
            return boxes

        # No geometric transform is applied, so each box is simply the
        # min/max of its own corner coordinates.
        xs = boxes[:, [0, 2]]
        ys = boxes[:, [1, 3]]
        warped = np.stack(
            [xs.min(1), ys.min(1), xs.max(1), ys.max(1)],
            axis=1).astype(np.float64)

        # clip boxes
        warped[:, [0, 2]] = warped[:, [0, 2]].clip(0, width)
        warped[:, [1, 3]] = warped[:, [1, 3]].clip(0, height)
        return warped.astype(np.float32)

    def _decode_level(self, stride, box_distribute, score, reg_max):
        """Decode one output level of one image into candidate boxes.

        Returns the (x1, y1, x2, y2) boxes and the score rows of the
        ``nms_top_k`` highest-scoring points of the level.
        """
        # Centers of the feature-map cells in input-image coordinates.
        fm_h = self.input_shape[0] / stride
        fm_w = self.input_shape[1] / stride
        ww, hh = np.meshgrid(np.arange(fm_w), np.arange(fm_h))
        ct_row = (hh.flatten() + 0.5) * stride
        ct_col = (ww.flatten() + 0.5) * stride
        center = np.stack((ct_col, ct_row, ct_col, ct_row), axis=1)

        # Each side is predicted as a discrete distribution over
        # 0..reg_max; its expected value times the stride is the distance
        # from the center to that side.
        reg_range = np.arange(reg_max + 1)
        box_distance = softmax(
            box_distribute.reshape((-1, reg_max + 1)), axis=1)
        box_distance = box_distance * np.expand_dims(reg_range, axis=0)
        box_distance = np.sum(box_distance, axis=1).reshape((-1, 4))
        box_distance = box_distance * stride

        # Keep only the points with the best maximum class score.
        topk_idx = np.argsort(score.max(axis=1))[::-1][:self.nms_top_k]
        signs = np.array([-1, -1, 1, 1])
        decode_box = center[topk_idx] + signs * box_distance[topk_idx]
        return decode_box, score[topk_idx]

    def _nms_per_class(self, bboxes, confidences):
        """Run thresholding and ``hard_nms`` independently for each class.

        Returns a list of (k, 5) arrays of kept boxes with scores, and
        the flat list of class indices matching their rows.
        """
        picked_box_probs = []
        picked_labels = []
        for class_index in range(confidences.shape[1]):
            probs = confidences[:, class_index]
            mask = probs > self.score_threshold
            if not mask.any():
                continue
            box_probs = np.concatenate(
                [bboxes[mask, :], probs[mask].reshape(-1, 1)], axis=1)
            box_probs = hard_nms(
                box_probs,
                iou_threshold=self.nms_threshold,
                top_k=self.keep_top_k)
            picked_box_probs.append(box_probs)
            picked_labels.extend([class_index] * box_probs.shape[0])
        return picked_box_probs, picked_labels

    def __call__(self, scores, raw_boxes):
        """Decode, filter and rescale the detections of a batch.

        Args:
            scores (list): one array per output level; indexing it with a
                batch id gives a (num_points, num_classes) score matrix.
            raw_boxes (list): one array per output level; indexing it
                with a batch id gives the box distributions, whose last
                axis has size ``4 * (reg_max + 1)``.

        Returns:
            tuple: ``(boxes, boxes_num)`` where ``boxes`` has shape
            (total, 6) with rows ``[label, score, x1, y1, x2, y2]`` for
            all images concatenated, and ``boxes_num`` is an int32 array
            with the number of boxes of each image.
        """
        batch_size = raw_boxes[0].shape[0]
        reg_max = raw_boxes[0].shape[-1] // 4 - 1
        out_boxes_num = []
        out_boxes_list = []
        for batch_id in range(batch_size):
            decode_boxes = []
            select_scores = []
            for stride, box_distribute, score in zip(self.strides, raw_boxes,
                                                     scores):
                decode_box, level_score = self._decode_level(
                    stride, box_distribute[batch_id], score[batch_id],
                    reg_max)
                decode_boxes.append(decode_box)
                select_scores.append(level_score)

            # nms
            bboxes = np.concatenate(decode_boxes, axis=0)
            confidences = np.concatenate(select_scores, axis=0)
            picked_box_probs, picked_labels = self._nms_per_class(
                bboxes, confidences)

            if not picked_box_probs:
                # Use the same 6-column layout as real detections so the
                # final concatenation also works for mixed batches.
                out_boxes_list.append(np.empty((0, 6)))
                out_boxes_num.append(0)
                continue

            picked_box_probs = np.concatenate(picked_box_probs)

            # resize output boxes
            picked_box_probs[:, :4] = self.warp_boxes(
                picked_box_probs[:, :4], self.ori_shape[batch_id])
            im_scale = np.concatenate([
                self.scale_factor[batch_id][::-1],
                self.scale_factor[batch_id][::-1]
            ])
            picked_box_probs[:, :4] /= im_scale

            # class, score, box
            out_boxes_list.append(
                np.concatenate(
                    [
                        np.expand_dims(np.array(picked_labels), axis=-1),
                        np.expand_dims(picked_box_probs[:, 4], axis=-1),
                        picked_box_probs[:, :4]
                    ],
                    axis=1))
            out_boxes_num.append(len(picked_labels))

        out_boxes_list = np.concatenate(out_boxes_list, axis=0)
        out_boxes_num = np.asarray(out_boxes_num).astype(np.int32)
        return out_boxes_list, out_boxes_num
def detect(img_file, compiled_model, re_shape, class_label):
    """Run detection on one image, then save and display the result.

    The annotated image is written to ``res.jpg`` in the current
    directory and shown in a window until a key is pressed.

    Args:
        img_file (str): path of the image to run detection on.
        compiled_model: compiled OpenVINO model; its
            ``infer_new_request`` result must expose ``values()``, with
            the score outputs in the first half and the box outputs in
            the second half.
        re_shape (int): side length of the square network input.
        class_label (str): path of the class-name file (one per line).
    """
    # Build the input tensor from this function's own arguments instead
    # of reading a `test_image` global that only exists when the file is
    # run as a script.
    test_image = image_preprocess(img_file, re_shape)

    output = compiled_model.infer_new_request({0: test_image})
    result_ie = list(output.values())

    test_im_shape = np.array([[re_shape, re_shape]]).astype('float32')
    test_scale_factor = np.array([[1, 1]]).astype('float32')

    # Split the outputs into the per-level scores and per-level boxes.
    num_outs = len(result_ie) // 2
    np_score_list = result_ie[:num_outs]
    np_boxes_list = result_ie[num_outs:2 * num_outs]

    postprocess = PicoDetPostProcess(test_image.shape[2:], test_im_shape,
                                     test_scale_factor)
    np_boxes, np_boxes_num = postprocess(np_score_list, np_boxes_list)

    # Boxes are in network-input coordinates; scale them back to the
    # original image before drawing.
    image = cv2.imread(img_file, 1)
    scale_x = image.shape[1] / test_image.shape[3]
    scale_y = image.shape[0] / test_image.shape[2]
    res_image = draw_box(image, np_boxes, class_label, scale_x, scale_y)

    cv2.imwrite('res.jpg', res_image)
    cv2.imshow("res", res_image)
    cv2.waitKey()
def benchmark(test_image, compiled_model, loop_num=100, warm_up=8):
    """Measure the inference latency of a compiled model and print it.

    The model is run ``warm_up + loop_num`` times; the first ``warm_up``
    runs are excluded from the statistics. The minimum, maximum and
    average latency are printed in milliseconds.

    Args:
        test_image: input passed to the model as input 0.
        compiled_model: object providing ``infer_new_request(inputs)``.
        loop_num (int): number of timed runs. Defaults to 100.
        warm_up (int): number of untimed leading runs. Defaults to 8.

    Raises:
        ValueError: if ``loop_num`` is not positive.
    """
    if loop_num <= 0:
        raise ValueError('loop_num must be a positive integer')

    timeall = 0
    time_min = float("inf")
    time_max = float('-inf')
    for i in range(loop_num + warm_up):
        # perf_counter is monotonic and high resolution, unlike the wall
        # clock, which can be adjusted and may be coarse on some systems.
        time0 = time.perf_counter()
        compiled_model.infer_new_request({0: test_image})
        timed = time.perf_counter() - time0

        if i >= warm_up:
            timeall = timeall + timed
            time_min = min(time_min, timed)
            time_max = max(time_max, timed)

    time_avg = timeall / loop_num
    print('inference_time(ms): min={}, max={}, avg={}'.format(
        round(time_min * 1000, 2),
        round(time_max * 1000, 1), round(time_avg * 1000, 1)))
if __name__ == '__main__':
    # Command-line options as (flag, type, default, help) rows.
    cli_options = (
        ('--benchmark', int, 1, "0:detect; 1:benchmark"),
        ('--img_path', str, '../../../../demo/000000014439.jpg',
         "image path"),
        ('--onnx_path', str, 'out_onnxsim/picodet_s_320_processed.onnx',
         "onnx filepath"),
        ('--in_shape', int, 320, "input_size"),
        ('--class_label', str, 'coco_label.txt', "class label file"),
    )
    parser = argparse.ArgumentParser()
    for flag, value_type, default_value, help_text in cli_options:
        parser.add_argument(
            flag, type=value_type, default=default_value, help=help_text)
    args = parser.parse_args()

    # Read the model, prepare the input tensor, then compile for CPU.
    core = Core()
    model = core.read_model(args.onnx_path)
    test_image = image_preprocess(args.img_path, args.in_shape)
    compiled_model = core.compile_model(model, 'CPU')

    # Any --benchmark value other than 0 or 1 runs neither mode.
    if args.benchmark == 0:
        detect(args.img_path, compiled_model, args.in_shape,
               args.class_label)
    elif args.benchmark == 1:
        benchmark(test_image, compiled_model)
|