vehicle_retrograde.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import numpy as np
  15. import math
  16. class VehicleRetrogradeRecognizer(object):
  17. def __init__(self, cfg):
  18. self.cfg = cfg
  19. self.filter_horizontal_flag = self.cfg['filter_horizontal_flag']
  20. self.deviation = self.cfg['deviation']
  21. self.move_scale = self.cfg['move_scale']
  22. self.keep_right_flag = self.cfg['keep_right_flag']
  23. self.center_traj_retrograde = [{}] #retrograde recognizer record use
  24. self.fence_line = None if len(self.cfg[
  25. 'fence_line']) == 0 else self.cfg['fence_line']
  26. def update_center_traj(self, mot_res, max_len):
  27. from collections import deque, defaultdict
  28. if mot_res is not None:
  29. ids = mot_res['boxes'][:, 0]
  30. scores = mot_res['boxes'][:, 2]
  31. boxes = mot_res['boxes'][:, 3:]
  32. boxes[:, 2] = boxes[:, 2] - boxes[:, 0]
  33. boxes[:, 3] = boxes[:, 3] - boxes[:, 1]
  34. else:
  35. boxes = np.zeros([0, 4])
  36. ids = np.zeros([0])
  37. scores = np.zeros([0])
  38. # single class, still need to be defaultdict type for ploting
  39. num_classes = 1
  40. online_tlwhs = defaultdict(list)
  41. online_scores = defaultdict(list)
  42. online_ids = defaultdict(list)
  43. online_tlwhs[0] = boxes
  44. online_ids[0] = ids
  45. if mot_res is not None:
  46. for cls_id in range(num_classes):
  47. tlwhs = online_tlwhs[cls_id]
  48. obj_ids = online_ids[cls_id]
  49. for i, tlwh in enumerate(tlwhs):
  50. x1, y1, w, h = tlwh
  51. center = tuple(map(int, (x1 + w / 2., y1 + h)))
  52. obj_id = int(obj_ids[i])
  53. if self.center_traj_retrograde is not None:
  54. if obj_id not in self.center_traj_retrograde[cls_id]:
  55. self.center_traj_retrograde[cls_id][obj_id] = deque(
  56. maxlen=max_len)
  57. self.center_traj_retrograde[cls_id][obj_id].append(
  58. center)
  59. def get_angle(self, array):
  60. x1, y1, x2, y2 = array
  61. a_x = x2 - x1
  62. a_y = y2 - y1
  63. angle1 = math.atan2(a_y, a_x)
  64. angle1 = int(angle1 * 180 / math.pi)
  65. a_x = x2 - x1 if y2 >= y1 else x1 - x2
  66. a_y = y2 - y1 if y2 >= y1 else y1 - y2
  67. angle2 = math.atan2(a_y, a_x)
  68. angle2 = int(angle2 * 180 / math.pi)
  69. if angle2 > 90:
  70. angle2 = 180 - angle2
  71. return angle1, angle2
  72. def is_move(self, array, frame_shape):
  73. x1, y1, x2, y2 = array
  74. h, w, _ = frame_shape
  75. if abs(x1 - x2) > w * self.move_scale or abs(y1 -
  76. y2) > h * self.move_scale:
  77. return True
  78. else:
  79. return False
  80. def get_distance_point2line(self, point, line):
  81. line_point1, line_point2 = np.array(line[0:2]), np.array(line[2:])
  82. vec1 = line_point1 - point
  83. vec2 = line_point2 - point
  84. distance = np.abs(np.cross(vec1, vec2)) / np.linalg.norm(line_point1 -
  85. line_point2)
  86. return distance
  87. def driving_direction(self, line1, line2, is_init=False):
  88. x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
  89. x2, y2 = line2[0] - line1[0], line2[1] - line1[1]
  90. result = x1 * y2 - x2 * y1
  91. distance = self.get_distance_point2line([x2, y2], line1)
  92. if result < 0:
  93. result = 1
  94. elif result == 0:
  95. if line2[3] >= line2[1]:
  96. return -1
  97. else:
  98. return 1
  99. else:
  100. result = -1
  101. return result, distance
  102. def get_long_fence_line(self, h, w, line):
  103. x1, y1, x2, y2 = line
  104. if x1 == x2:
  105. return [x1, 0, x1, h]
  106. if y1 == y2:
  107. return [0, y1, w, y1]
  108. k = (y2 - y1) / (x2 - x1)
  109. b = y1 - k * x1
  110. if k == 1 and b == 0:
  111. return [0, 0, w, h]
  112. if k == -1 and b == 0:
  113. return [w, 0, h, h]
  114. top = [-b / k, 0]
  115. left = [0, b]
  116. right = [w, k * w + b]
  117. bottom = [(h - b) / k, h]
  118. candidate = np.array([top, left, right, bottom])
  119. flag = np.array([0, 0, 0, 0])
  120. if top[0] >= 0 and top[0] <= w:
  121. flag[0] = 1
  122. if left[1] > 0 and left[1] <= h:
  123. flag[1] = 1
  124. if right[1] > 0 and right[1] <= h:
  125. flag[2] = 1
  126. if bottom[0] > 0 and bottom[0] < w:
  127. flag[3] = 1
  128. ind = np.where(flag == 1)
  129. candidate = candidate[ind]
  130. candidate_sort = candidate[candidate[:, 1].argsort()]
  131. return [
  132. int(candidate_sort[0][0]), int(candidate_sort[0][1]),
  133. int(candidate_sort[1][0]), int(candidate_sort[1][1])
  134. ]
  135. def init_fence_line(self, lanes, pos_dir_traj, neg_dir_traj, frame_shape):
  136. fence_lines_candidate = None
  137. h, w, _ = frame_shape
  138. abs_distance = h * h + w * w
  139. for lane in lanes[0]:
  140. pos_dir_distansce = h * h + w * w
  141. neg_dir_distansce = h * h + w * w
  142. pos_dir = 0
  143. neg_dir = 0
  144. for traj_line in pos_dir_traj:
  145. dir_result, distansce = self.driving_direction(
  146. lane, traj_line['traj_line'])
  147. if dir_result > 0:
  148. pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
  149. pos_dir = 1
  150. else:
  151. neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
  152. neg_dir = 1
  153. if pos_dir > 0 and neg_dir > 0:
  154. continue
  155. for traj_line in neg_dir_traj:
  156. dir_result, distansce = self.driving_direction(
  157. lane, traj_line['traj_line'])
  158. if dir_result > 0:
  159. pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
  160. pos_dir = 1
  161. else:
  162. neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
  163. neg_dir = 1
  164. if pos_dir > 0 and neg_dir > 0:
  165. diff_dir_distance = abs(pos_dir_distansce - neg_dir_distansce)
  166. if diff_dir_distance < abs_distance:
  167. fence_lines_candidate = lane
  168. abs_distance = diff_dir_distance
  169. if fence_lines_candidate is None:
  170. return None
  171. fence_lines_candidate = self.get_long_fence_line(h, w,
  172. fence_lines_candidate)
  173. return fence_lines_candidate
  174. def judge_retrograde(self, traj_line):
  175. line1 = self.fence_line
  176. x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
  177. line2 = traj_line['traj_line']
  178. x2_start_point, y2_start_point = line2[0] - line1[0], line2[1] - line1[
  179. 1]
  180. x2_end_point, y2_end_point = line2[2] - line1[0], line2[3] - line1[1]
  181. start_point_dir = x1 * y2_start_point - x2_start_point * y1
  182. end_point_dir = x1 * y2_end_point - x2_end_point * y1
  183. if start_point_dir < 0:
  184. start_point_dir = 1
  185. elif start_point_dir == 0:
  186. if line2[3] >= line2[1]:
  187. start_point_dir = -1
  188. else:
  189. start_point_dir = 1
  190. else:
  191. start_point_dir = -1
  192. if end_point_dir < 0:
  193. end_point_dir = 1
  194. elif end_point_dir == 0:
  195. if line2[3] >= line2[1]:
  196. end_point_dir = -1
  197. else:
  198. end_point_dir = 1
  199. else:
  200. end_point_dir = -1
  201. if self.keep_right_flag:
  202. driver_dir = -1 if (line2[3] - line2[1]) >= 0 else 1
  203. else:
  204. driver_dir = -1 if (line2[3] - line2[1]) <= 0 else 1
  205. return start_point_dir == driver_dir and start_point_dir == end_point_dir
  206. def mot_run(self, lanes_res, det_res, frame_shape):
  207. det = det_res['boxes']
  208. directions = lanes_res['directions']
  209. lanes = lanes_res['output']
  210. if len(directions) > 0:
  211. direction = directions[0]
  212. else:
  213. return [], self.fence_line
  214. if len(det) == 0:
  215. return [], self.fence_line
  216. traj_lines = []
  217. pos_dir_traj = []
  218. neg_dir_traj = []
  219. for i in range(len(det)):
  220. class_id = int(det[i][1])
  221. mot_id = int(det[i][0])
  222. traj_i = self.center_traj_retrograde[class_id][mot_id]
  223. if len(traj_i) < 2:
  224. continue
  225. traj_line = {
  226. 'index': i,
  227. 'mot_id': mot_id,
  228. 'traj_line':
  229. [traj_i[0][0], traj_i[0][1], traj_i[-1][0], traj_i[-1][1]]
  230. }
  231. if not self.is_move(traj_line['traj_line'], frame_shape):
  232. continue
  233. angle, angle_deviation = self.get_angle(traj_line['traj_line'])
  234. if direction is not None and self.filter_horizontal_flag:
  235. if abs(angle_deviation - direction) > self.deviation:
  236. continue
  237. traj_line['angle'] = angle
  238. traj_lines.append(traj_line)
  239. if self.fence_line is None:
  240. if angle >= 0:
  241. pos_dir_traj.append(traj_line)
  242. else:
  243. neg_dir_traj.append(traj_line)
  244. if len(traj_lines) == 0:
  245. return [], self.fence_line
  246. if self.fence_line is None:
  247. if len(pos_dir_traj) < 1 or len(neg_dir_traj) < 1:
  248. return [], None
  249. self.fence_line = self.init_fence_line(lanes, pos_dir_traj,
  250. neg_dir_traj, frame_shape)
  251. return [], self.fence_line
  252. else:
  253. retrograde_list = []
  254. for traj_line in traj_lines:
  255. if self.judge_retrograde(traj_line) == False:
  256. retrograde_list.append(det[traj_line['index']][0])
  257. return retrograde_list, self.fence_line