123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import numpy as np
- import math
- class VehicleRetrogradeRecognizer(object):
- def __init__(self, cfg):
- self.cfg = cfg
- self.filter_horizontal_flag = self.cfg['filter_horizontal_flag']
- self.deviation = self.cfg['deviation']
- self.move_scale = self.cfg['move_scale']
- self.keep_right_flag = self.cfg['keep_right_flag']
- self.center_traj_retrograde = [{}] #retrograde recognizer record use
- self.fence_line = None if len(self.cfg[
- 'fence_line']) == 0 else self.cfg['fence_line']
- def update_center_traj(self, mot_res, max_len):
- from collections import deque, defaultdict
- if mot_res is not None:
- ids = mot_res['boxes'][:, 0]
- scores = mot_res['boxes'][:, 2]
- boxes = mot_res['boxes'][:, 3:]
- boxes[:, 2] = boxes[:, 2] - boxes[:, 0]
- boxes[:, 3] = boxes[:, 3] - boxes[:, 1]
- else:
- boxes = np.zeros([0, 4])
- ids = np.zeros([0])
- scores = np.zeros([0])
- # single class, still need to be defaultdict type for ploting
- num_classes = 1
- online_tlwhs = defaultdict(list)
- online_scores = defaultdict(list)
- online_ids = defaultdict(list)
- online_tlwhs[0] = boxes
- online_ids[0] = ids
- if mot_res is not None:
- for cls_id in range(num_classes):
- tlwhs = online_tlwhs[cls_id]
- obj_ids = online_ids[cls_id]
- for i, tlwh in enumerate(tlwhs):
- x1, y1, w, h = tlwh
- center = tuple(map(int, (x1 + w / 2., y1 + h)))
- obj_id = int(obj_ids[i])
- if self.center_traj_retrograde is not None:
- if obj_id not in self.center_traj_retrograde[cls_id]:
- self.center_traj_retrograde[cls_id][obj_id] = deque(
- maxlen=max_len)
- self.center_traj_retrograde[cls_id][obj_id].append(
- center)
- def get_angle(self, array):
- x1, y1, x2, y2 = array
- a_x = x2 - x1
- a_y = y2 - y1
- angle1 = math.atan2(a_y, a_x)
- angle1 = int(angle1 * 180 / math.pi)
- a_x = x2 - x1 if y2 >= y1 else x1 - x2
- a_y = y2 - y1 if y2 >= y1 else y1 - y2
- angle2 = math.atan2(a_y, a_x)
- angle2 = int(angle2 * 180 / math.pi)
- if angle2 > 90:
- angle2 = 180 - angle2
- return angle1, angle2
- def is_move(self, array, frame_shape):
- x1, y1, x2, y2 = array
- h, w, _ = frame_shape
- if abs(x1 - x2) > w * self.move_scale or abs(y1 -
- y2) > h * self.move_scale:
- return True
- else:
- return False
- def get_distance_point2line(self, point, line):
- line_point1, line_point2 = np.array(line[0:2]), np.array(line[2:])
- vec1 = line_point1 - point
- vec2 = line_point2 - point
- distance = np.abs(np.cross(vec1, vec2)) / np.linalg.norm(line_point1 -
- line_point2)
- return distance
- def driving_direction(self, line1, line2, is_init=False):
- x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
- x2, y2 = line2[0] - line1[0], line2[1] - line1[1]
- result = x1 * y2 - x2 * y1
- distance = self.get_distance_point2line([x2, y2], line1)
- if result < 0:
- result = 1
- elif result == 0:
- if line2[3] >= line2[1]:
- return -1
- else:
- return 1
- else:
- result = -1
- return result, distance
- def get_long_fence_line(self, h, w, line):
- x1, y1, x2, y2 = line
- if x1 == x2:
- return [x1, 0, x1, h]
- if y1 == y2:
- return [0, y1, w, y1]
- k = (y2 - y1) / (x2 - x1)
- b = y1 - k * x1
- if k == 1 and b == 0:
- return [0, 0, w, h]
- if k == -1 and b == 0:
- return [w, 0, h, h]
- top = [-b / k, 0]
- left = [0, b]
- right = [w, k * w + b]
- bottom = [(h - b) / k, h]
- candidate = np.array([top, left, right, bottom])
- flag = np.array([0, 0, 0, 0])
- if top[0] >= 0 and top[0] <= w:
- flag[0] = 1
- if left[1] > 0 and left[1] <= h:
- flag[1] = 1
- if right[1] > 0 and right[1] <= h:
- flag[2] = 1
- if bottom[0] > 0 and bottom[0] < w:
- flag[3] = 1
- ind = np.where(flag == 1)
- candidate = candidate[ind]
- candidate_sort = candidate[candidate[:, 1].argsort()]
- return [
- int(candidate_sort[0][0]), int(candidate_sort[0][1]),
- int(candidate_sort[1][0]), int(candidate_sort[1][1])
- ]
- def init_fence_line(self, lanes, pos_dir_traj, neg_dir_traj, frame_shape):
- fence_lines_candidate = None
- h, w, _ = frame_shape
- abs_distance = h * h + w * w
- for lane in lanes[0]:
- pos_dir_distansce = h * h + w * w
- neg_dir_distansce = h * h + w * w
- pos_dir = 0
- neg_dir = 0
- for traj_line in pos_dir_traj:
- dir_result, distansce = self.driving_direction(
- lane, traj_line['traj_line'])
- if dir_result > 0:
- pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
- pos_dir = 1
- else:
- neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
- neg_dir = 1
- if pos_dir > 0 and neg_dir > 0:
- continue
- for traj_line in neg_dir_traj:
- dir_result, distansce = self.driving_direction(
- lane, traj_line['traj_line'])
- if dir_result > 0:
- pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
- pos_dir = 1
- else:
- neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
- neg_dir = 1
- if pos_dir > 0 and neg_dir > 0:
- diff_dir_distance = abs(pos_dir_distansce - neg_dir_distansce)
- if diff_dir_distance < abs_distance:
- fence_lines_candidate = lane
- abs_distance = diff_dir_distance
- if fence_lines_candidate is None:
- return None
- fence_lines_candidate = self.get_long_fence_line(h, w,
- fence_lines_candidate)
- return fence_lines_candidate
- def judge_retrograde(self, traj_line):
- line1 = self.fence_line
- x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
- line2 = traj_line['traj_line']
- x2_start_point, y2_start_point = line2[0] - line1[0], line2[1] - line1[
- 1]
- x2_end_point, y2_end_point = line2[2] - line1[0], line2[3] - line1[1]
- start_point_dir = x1 * y2_start_point - x2_start_point * y1
- end_point_dir = x1 * y2_end_point - x2_end_point * y1
- if start_point_dir < 0:
- start_point_dir = 1
- elif start_point_dir == 0:
- if line2[3] >= line2[1]:
- start_point_dir = -1
- else:
- start_point_dir = 1
- else:
- start_point_dir = -1
- if end_point_dir < 0:
- end_point_dir = 1
- elif end_point_dir == 0:
- if line2[3] >= line2[1]:
- end_point_dir = -1
- else:
- end_point_dir = 1
- else:
- end_point_dir = -1
- if self.keep_right_flag:
- driver_dir = -1 if (line2[3] - line2[1]) >= 0 else 1
- else:
- driver_dir = -1 if (line2[3] - line2[1]) <= 0 else 1
- return start_point_dir == driver_dir and start_point_dir == end_point_dir
- def mot_run(self, lanes_res, det_res, frame_shape):
- det = det_res['boxes']
- directions = lanes_res['directions']
- lanes = lanes_res['output']
- if len(directions) > 0:
- direction = directions[0]
- else:
- return [], self.fence_line
- if len(det) == 0:
- return [], self.fence_line
- traj_lines = []
- pos_dir_traj = []
- neg_dir_traj = []
- for i in range(len(det)):
- class_id = int(det[i][1])
- mot_id = int(det[i][0])
- traj_i = self.center_traj_retrograde[class_id][mot_id]
- if len(traj_i) < 2:
- continue
- traj_line = {
- 'index': i,
- 'mot_id': mot_id,
- 'traj_line':
- [traj_i[0][0], traj_i[0][1], traj_i[-1][0], traj_i[-1][1]]
- }
- if not self.is_move(traj_line['traj_line'], frame_shape):
- continue
- angle, angle_deviation = self.get_angle(traj_line['traj_line'])
- if direction is not None and self.filter_horizontal_flag:
- if abs(angle_deviation - direction) > self.deviation:
- continue
- traj_line['angle'] = angle
- traj_lines.append(traj_line)
- if self.fence_line is None:
- if angle >= 0:
- pos_dir_traj.append(traj_line)
- else:
- neg_dir_traj.append(traj_line)
- if len(traj_lines) == 0:
- return [], self.fence_line
- if self.fence_line is None:
- if len(pos_dir_traj) < 1 or len(neg_dir_traj) < 1:
- return [], None
- self.fence_line = self.init_fence_line(lanes, pos_dir_traj,
- neg_dir_traj, frame_shape)
- return [], self.fence_line
- else:
- retrograde_list = []
- for traj_line in traj_lines:
- if self.judge_retrograde(traj_line) == False:
- retrograde_list.append(det[traj_line['index']][0])
- return retrograde_list, self.fence_line
|