123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import argparse
- import codecs
- import os
- import yaml
- import numpy as np
- import cv2
- from sklearn.cluster import DBSCAN
- from pptracking.python.det_infer import load_predictor
- class LaneSegPredictor:
- def __init__(self, lane_seg_config, model_dir):
- """
- Prepare for prediction.
- The usage and docs of paddle inference, please refer to
- https://paddleinference.paddlepaddle.org.cn/product_introduction/summary.html
- """
- if not os.path.exists(lane_seg_config):
- raise ValueError("Cannot find : {},".format(lane_seg_config))
- args = yaml.safe_load(open(lane_seg_config))
- self.model_dir = model_dir
- self.args = args[args['type']]
- self.shape = None
- self.filter_horizontal_flag = self.args['filter_horizontal_flag']
- self.horizontal_filtration_degree = self.args[
- 'horizontal_filtration_degree']
- self.horizontal_filtering_threshold = self.args[
- 'horizontal_filtering_threshold']
- try:
- self.predictor, _ = load_predictor(
- model_dir=self.model_dir,
- run_mode=self.args['run_mode'],
- batch_size=self.args['batch_size'],
- device=self.args['device'],
- min_subgraph_size=self.args['min_subgraph_size'],
- use_dynamic_shape=self.args['use_dynamic_shape'],
- trt_min_shape=self.args['trt_min_shape'],
- trt_max_shape=self.args['trt_max_shape'],
- trt_opt_shape=self.args['trt_opt_shape'],
- trt_calib_mode=self.args['trt_calib_mode'],
- cpu_threads=self.args['cpu_threads'],
- enable_mkldnn=self.args['enable_mkldnn'])
- except Exception as e:
- print(str(e))
- exit()
- def run(self, img):
- input_names = self.predictor.get_input_names()
- input_handle = self.predictor.get_input_handle(input_names[0])
- output_names = self.predictor.get_output_names()
- output_handle = self.predictor.get_output_handle(output_names[0])
- img = np.array(img)
- self.shape = img.shape[1:3]
- img = self.normalize(img)
- img = np.transpose(img, (0, 3, 1, 2))
- input_handle.reshape(img.shape)
- input_handle.copy_from_cpu(img)
- self.predictor.run()
- results = output_handle.copy_to_cpu()
- results = self.postprocess(results)
- return self.get_line(results)
- def normalize(self, im, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)):
- mean = np.array(mean)[np.newaxis, np.newaxis, :]
- std = np.array(std)[np.newaxis, np.newaxis, :]
- im = im.astype(np.float32, copy=False) / 255.0
- im -= mean
- im /= std
- return im
- def postprocess(self, pred):
- pred = np.argmax(pred, axis=1)
- pred[pred == 3] = 0
- pred[pred > 0] = 255
- return pred
- def get_line(self, results):
- lines = []
- directions = []
- for i in range(results.shape[0]):
- line, direction = self.hough_line(np.uint8(results[i]))
- lines.append(line)
- directions.append(direction)
- return lines, directions
- def get_distance(self, array_1, array_2):
- lon_a = array_1[0]
- lat_a = array_1[1]
- lon_b = array_2[0]
- lat_b = array_2[1]
- s = pow(pow((lat_b - lat_a), 2) + pow((lon_b - lon_a), 2), 0.5)
- return s
- def get_angle(self, array):
- import math
- x1, y1, x2, y2 = array
- a_x = x2 - x1
- a_y = y2 - y1
- angle1 = math.atan2(a_y, a_x)
- angle1 = int(angle1 * 180 / math.pi)
- if angle1 > 90:
- angle1 = 180 - angle1
- return angle1
- def get_proportion(self, lines):
- proportion = 0.0
- h, w = self.shape
- for line in lines:
- x1, y1, x2, y2 = line
- length = abs(y2 - y1) / h + abs(x2 - x1) / w
- proportion = proportion + length
- return proportion
- def line_cluster(self, linesP):
- points = []
- for i in range(0, len(linesP)):
- l = linesP[i]
- x_center = (float(
- (max(l[2], l[0]) - min(l[2], l[0]))) / 2.0 + min(l[2], l[0]))
- y_center = (float(
- (max(l[3], l[1]) - min(l[3], l[1]))) / 2.0 + min(l[3], l[1]))
- points.append([x_center, y_center])
- dbscan = DBSCAN(
- eps=50, min_samples=2, metric=self.get_distance).fit(points)
- labels = dbscan.labels_
- n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
- cluster_list = list([] for i in range(n_clusters_))
- if linesP is not None:
- for i in range(0, len(linesP)):
- if labels[i] == -1:
- continue
- l = linesP[i]
- x1, y1, x2, y2 = l
- if y2 >= y1:
- cluster_list[labels[i]].append([x1, y1, x2, y2])
- else:
- ll = [x2, y2, x1, y1]
- cluster_list[labels[i]].append(ll)
- return cluster_list
- def hough_line(self,
- binary_img,
- min_line=50,
- min_line_points=50,
- max_line_gap=10):
- linesP = cv2.HoughLinesP(binary_img, 1, np.pi / 180, min_line, None,
- min_line_points, max_line_gap)
- if linesP is None:
- return [], None
- coarse_cluster_list = self.line_cluster(linesP[:, 0])
- filter_lines_output, direction = self.filter_lines(coarse_cluster_list)
- return filter_lines_output, direction
- def filter_lines(self, coarse_cluster_list):
- lines = []
- angles = []
- for i in range(len(coarse_cluster_list)):
- if len(coarse_cluster_list[i]) == 0:
- continue
- coarse_cluster_list[i] = np.array(coarse_cluster_list[i])
- distance = abs(coarse_cluster_list[i][:, 3] - coarse_cluster_list[i]
- [:, 1]) + abs(coarse_cluster_list[i][:, 2] -
- coarse_cluster_list[i][:, 0])
- l = coarse_cluster_list[i][np.argmax(distance)]
- angles.append(self.get_angle(l))
- lines.append(l)
- if len(lines) == 0:
- return [], None
- if not self.filter_horizontal_flag:
- return lines, None
- #filter horizontal roads
- angles = np.array(angles)
- max_angle, min_angle = np.max(angles), np.min(angles)
- if (max_angle - min_angle) < self.horizontal_filtration_degree:
- return lines, np.mean(angles)
- thr_angle = (
- max_angle + min_angle) * self.horizontal_filtering_threshold
- lines = np.array(lines)
- min_angle_line = lines[np.where(angles < thr_angle)]
- max_angle_line = lines[np.where(angles >= thr_angle)]
- max_angle_line_pro = self.get_proportion(max_angle_line)
- min_angle_line_pro = self.get_proportion(min_angle_line)
- if max_angle_line_pro >= min_angle_line_pro:
- angle_list = angles[np.where(angles >= thr_angle)]
- return max_angle_line, np.mean(angle_list)
- else:
- angle_list = angles[np.where(angles < thr_angle)]
- return min_angle_line, np.mean(angle_list)
|