123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import paddle
- import paddle.nn as nn
- import paddle.nn.functional as F
- from ppdet.core.workspace import register
- from paddle import ParamAttr
- from paddle.regularizer import L2Decay
- from .fcos_head import ScaleReg
- from ..initializer import bias_init_with_prob, constant_, normal_
- from ..ops import get_act_fn, anchor_generator
- from ..rbox_utils import box2corners
- from ..losses import ProbIoULoss
- import numpy as np
- __all__ = ['FCOSRHead']
- def trunc_div(a, b):
- ipt = paddle.divide(a, b)
- sign_ipt = paddle.sign(ipt)
- abs_ipt = paddle.abs(ipt)
- abs_ipt = paddle.floor(abs_ipt)
- out = paddle.multiply(sign_ipt, abs_ipt)
- return out
- def fmod(a, b):
- return a - trunc_div(a, b) * b
- def fmod_eval(a, b):
- return a - a.divide(b).cast(paddle.int32).cast(paddle.float32) * b
- class ConvBNLayer(nn.Layer):
- def __init__(self,
- ch_in,
- ch_out,
- filter_size=3,
- stride=1,
- groups=1,
- padding=0,
- norm_cfg={'name': 'gn',
- 'num_groups': 32},
- act=None):
- super(ConvBNLayer, self).__init__()
- self.conv = nn.Conv2D(
- in_channels=ch_in,
- out_channels=ch_out,
- kernel_size=filter_size,
- stride=stride,
- padding=padding,
- groups=groups,
- bias_attr=False)
- norm_type = norm_cfg['name']
- if norm_type in ['sync_bn', 'bn']:
- self.norm = nn.BatchNorm2D(
- ch_out,
- weight_attr=ParamAttr(regularizer=L2Decay(0.0)),
- bias_attr=ParamAttr(regularizer=L2Decay(0.0)))
- else:
- groups = norm_cfg.get('num_groups', 1)
- self.norm = nn.GroupNorm(
- num_groups=groups,
- num_channels=ch_out,
- weight_attr=ParamAttr(regularizer=L2Decay(0.0)),
- bias_attr=ParamAttr(regularizer=L2Decay(0.0)))
- self.act = get_act_fn(act) if act is None or isinstance(act, (
- str, dict)) else act
- def forward(self, x):
- x = self.conv(x)
- x = self.norm(x)
- x = self.act(x)
- return x
- @register
- class FCOSRHead(nn.Layer):
- """ FCOSR Head, refer to https://arxiv.org/abs/2111.10780 for details """
- __shared__ = ['num_classes', 'trt']
- __inject__ = ['assigner', 'nms']
- def __init__(self,
- num_classes=15,
- in_channels=256,
- feat_channels=256,
- stacked_convs=4,
- act='relu',
- fpn_strides=[4, 8, 16, 32, 64],
- trt=False,
- loss_weight={'class': 1.0,
- 'probiou': 1.0},
- norm_cfg={'name': 'gn',
- 'num_groups': 32},
- assigner='FCOSRAssigner',
- nms='MultiClassNMS'):
- super(FCOSRHead, self).__init__()
- self.in_channels = in_channels
- self.num_classes = num_classes
- self.fpn_strides = fpn_strides
- self.stacked_convs = stacked_convs
- self.loss_weight = loss_weight
- self.half_pi = paddle.to_tensor(
- [1.5707963267948966], dtype=paddle.float32)
- self.probiou_loss = ProbIoULoss(mode='l1')
- act = get_act_fn(
- act, trt=trt) if act is None or isinstance(act,
- (str, dict)) else act
- self.trt = trt
- self.loss_weight = loss_weight
- self.assigner = assigner
- self.nms = nms
- # stem
- self.stem_cls = nn.LayerList()
- self.stem_reg = nn.LayerList()
- for i in range(self.stacked_convs):
- self.stem_cls.append(
- ConvBNLayer(
- self.in_channels[i],
- feat_channels,
- filter_size=3,
- stride=1,
- padding=1,
- norm_cfg=norm_cfg,
- act=act))
- self.stem_reg.append(
- ConvBNLayer(
- self.in_channels[i],
- feat_channels,
- filter_size=3,
- stride=1,
- padding=1,
- norm_cfg=norm_cfg,
- act=act))
- self.scales = nn.LayerList(
- [ScaleReg() for _ in range(len(fpn_strides))])
- # prediction
- self.pred_cls = nn.Conv2D(feat_channels, self.num_classes, 3, padding=1)
- self.pred_xy = nn.Conv2D(feat_channels, 2, 3, padding=1)
- self.pred_wh = nn.Conv2D(feat_channels, 2, 3, padding=1)
- self.pred_angle = nn.Conv2D(feat_channels, 1, 3, padding=1)
- self._init_weights()
- def _init_weights(self):
- for cls_, reg_ in zip(self.stem_cls, self.stem_reg):
- normal_(cls_.conv.weight, std=0.01)
- normal_(reg_.conv.weight, std=0.01)
- bias_cls = bias_init_with_prob(0.01)
- normal_(self.pred_cls.weight, std=0.01)
- constant_(self.pred_cls.bias, bias_cls)
- normal_(self.pred_xy.weight, std=0.01)
- normal_(self.pred_wh.weight, std=0.01)
- normal_(self.pred_angle.weight, std=0.01)
- @classmethod
- def from_config(cls, cfg, input_shape):
- return {'in_channels': [i.channels for i in input_shape], }
- def _generate_anchors(self, feats):
- if self.trt:
- anchor_points = []
- for feat, stride in zip(feats, self.fpn_strides):
- _, _, h, w = paddle.shape(feat)
- anchor, _ = anchor_generator(
- feat,
- stride * 4,
- 1.0, [1.0, 1.0, 1.0, 1.0], [stride, stride],
- offset=0.5)
- x1, y1, x2, y2 = paddle.split(anchor, 4, axis=-1)
- xc = (x1 + x2 + 1) / 2
- yc = (y1 + y2 + 1) / 2
- anchor_point = paddle.concat(
- [xc, yc], axis=-1).reshape((1, h * w, 2))
- anchor_points.append(anchor_point)
- anchor_points = paddle.concat(anchor_points, axis=1)
- return anchor_points, None, None
- else:
- anchor_points = []
- stride_tensor = []
- num_anchors_list = []
- for feat, stride in zip(feats, self.fpn_strides):
- _, _, h, w = paddle.shape(feat)
- shift_x = (paddle.arange(end=w) + 0.5) * stride
- shift_y = (paddle.arange(end=h) + 0.5) * stride
- shift_y, shift_x = paddle.meshgrid(shift_y, shift_x)
- anchor_point = paddle.cast(
- paddle.stack(
- [shift_x, shift_y], axis=-1), dtype='float32')
- anchor_points.append(anchor_point.reshape([1, -1, 2]))
- stride_tensor.append(
- paddle.full(
- [1, h * w, 1], stride, dtype='float32'))
- num_anchors_list.append(h * w)
- anchor_points = paddle.concat(anchor_points, axis=1)
- stride_tensor = paddle.concat(stride_tensor, axis=1)
- return anchor_points, stride_tensor, num_anchors_list
- def forward(self, feats, target=None):
- if self.training:
- return self.forward_train(feats, target)
- else:
- return self.forward_eval(feats, target)
- def forward_train(self, feats, target=None):
- anchor_points, stride_tensor, num_anchors_list = self._generate_anchors(
- feats)
- cls_pred_list, reg_pred_list = [], []
- for stride, feat, scale in zip(self.fpn_strides, feats, self.scales):
- # cls
- cls_feat = feat
- for cls_layer in self.stem_cls:
- cls_feat = cls_layer(cls_feat)
- cls_pred = F.sigmoid(self.pred_cls(cls_feat))
- cls_pred_list.append(cls_pred.flatten(2).transpose((0, 2, 1)))
- # reg
- reg_feat = feat
- for reg_layer in self.stem_reg:
- reg_feat = reg_layer(reg_feat)
- reg_xy = scale(self.pred_xy(reg_feat)) * stride
- reg_wh = F.elu(scale(self.pred_wh(reg_feat)) + 1.) * stride
- reg_angle = self.pred_angle(reg_feat)
- reg_angle = fmod(reg_angle, self.half_pi)
- reg_pred = paddle.concat([reg_xy, reg_wh, reg_angle], axis=1)
- reg_pred_list.append(reg_pred.flatten(2).transpose((0, 2, 1)))
- cls_pred_list = paddle.concat(cls_pred_list, axis=1)
- reg_pred_list = paddle.concat(reg_pred_list, axis=1)
- return self.get_loss([
- cls_pred_list, reg_pred_list, anchor_points, stride_tensor,
- num_anchors_list
- ], target)
- def forward_eval(self, feats, target=None):
- cls_pred_list, reg_pred_list = [], []
- anchor_points, _, _ = self._generate_anchors(feats)
- for stride, feat, scale in zip(self.fpn_strides, feats, self.scales):
- b, _, h, w = paddle.shape(feat)
- # cls
- cls_feat = feat
- for cls_layer in self.stem_cls:
- cls_feat = cls_layer(cls_feat)
- cls_pred = F.sigmoid(self.pred_cls(cls_feat))
- cls_pred_list.append(cls_pred.reshape([b, self.num_classes, h * w]))
- # reg
- reg_feat = feat
- for reg_layer in self.stem_reg:
- reg_feat = reg_layer(reg_feat)
- reg_xy = scale(self.pred_xy(reg_feat)) * stride
- reg_wh = F.elu(scale(self.pred_wh(reg_feat)) + 1.) * stride
- reg_angle = self.pred_angle(reg_feat)
- reg_angle = fmod_eval(reg_angle, self.half_pi)
- reg_pred = paddle.concat([reg_xy, reg_wh, reg_angle], axis=1)
- reg_pred = reg_pred.reshape([b, 5, h * w]).transpose((0, 2, 1))
- reg_pred_list.append(reg_pred)
- cls_pred_list = paddle.concat(cls_pred_list, axis=2)
- reg_pred_list = paddle.concat(reg_pred_list, axis=1)
- reg_pred_list = self._bbox_decode(anchor_points, reg_pred_list)
- return cls_pred_list, reg_pred_list
- def _bbox_decode(self, points, reg_pred_list):
- xy, wha = paddle.split(reg_pred_list, [2, 3], axis=-1)
- xy = xy + points
- return paddle.concat([xy, wha], axis=-1)
- def _box2corners(self, pred_bboxes):
- """ convert (x, y, w, h, angle) to (x1, y1, x2, y2, x3, y3, x4, y4)
- Args:
- pred_bboxes (Tensor): [B, N, 5]
-
- Returns:
- polys (Tensor): [B, N, 8]
- """
- x, y, w, h, angle = paddle.split(pred_bboxes, 5, axis=-1)
- cos_a_half = paddle.cos(angle) * 0.5
- sin_a_half = paddle.sin(angle) * 0.5
- w_x = cos_a_half * w
- w_y = sin_a_half * w
- h_x = -sin_a_half * h
- h_y = cos_a_half * h
- return paddle.concat(
- [
- x + w_x + h_x, y + w_y + h_y, x - w_x + h_x, y - w_y + h_y,
- x - w_x - h_x, y - w_y - h_y, x + w_x - h_x, y + w_y - h_y
- ],
- axis=-1)
- def get_loss(self, head_outs, gt_meta):
- cls_pred_list, reg_pred_list, anchor_points, stride_tensor, num_anchors_list = head_outs
- gt_labels = gt_meta['gt_class']
- gt_bboxes = gt_meta['gt_bbox']
- gt_rboxes = gt_meta['gt_rbox']
- pad_gt_mask = gt_meta['pad_gt_mask']
- # decode
- pred_rboxes = self._bbox_decode(anchor_points, reg_pred_list)
- # label assignment
- assigned_labels, assigned_rboxes, assigned_scores = \
- self.assigner(
- anchor_points,
- stride_tensor,
- num_anchors_list,
- gt_labels,
- gt_bboxes,
- gt_rboxes,
- pad_gt_mask,
- self.num_classes,
- pred_rboxes
- )
- # reg_loss
- mask_positive = (assigned_labels != self.num_classes)
- num_pos = mask_positive.sum().item()
- if num_pos > 0:
- bbox_mask = mask_positive.unsqueeze(-1).tile([1, 1, 5])
- pred_rboxes_pos = paddle.masked_select(pred_rboxes,
- bbox_mask).reshape([-1, 5])
- assigned_rboxes_pos = paddle.masked_select(
- assigned_rboxes, bbox_mask).reshape([-1, 5])
- bbox_weight = paddle.masked_select(
- assigned_scores.sum(-1), mask_positive).reshape([-1])
- avg_factor = bbox_weight.sum()
- loss_probiou = self.probiou_loss(pred_rboxes_pos,
- assigned_rboxes_pos)
- loss_probiou = paddle.sum(loss_probiou * bbox_weight) / avg_factor
- else:
- loss_probiou = pred_rboxes.sum() * 0.
- avg_factor = max(num_pos, 1.0)
- # cls_loss
- loss_cls = self._qfocal_loss(
- cls_pred_list, assigned_scores, reduction='sum')
- loss_cls = loss_cls / avg_factor
- loss = self.loss_weight['class'] * loss_cls + \
- self.loss_weight['probiou'] * loss_probiou
- out_dict = {
- 'loss': loss,
- 'loss_probiou': loss_probiou,
- 'loss_cls': loss_cls
- }
- return out_dict
- @staticmethod
- def _qfocal_loss(score, label, gamma=2.0, reduction='sum'):
- weight = (score - label).pow(gamma)
- loss = F.binary_cross_entropy(
- score, label, weight=weight, reduction=reduction)
- return loss
- def post_process(self, head_outs, scale_factor):
- pred_scores, pred_rboxes = head_outs
- # [B, N, 5] -> [B, N, 4, 2] -> [B, N, 8]
- pred_rboxes = self._box2corners(pred_rboxes)
- # scale bbox to origin
- scale_y, scale_x = paddle.split(scale_factor, 2, axis=-1)
- scale_factor = paddle.concat(
- [
- scale_x, scale_y, scale_x, scale_y, scale_x, scale_y, scale_x,
- scale_y
- ],
- axis=-1).reshape([-1, 1, 8])
- pred_rboxes /= scale_factor
- bbox_pred, bbox_num, _ = self.nms(pred_rboxes, pred_scores)
- return bbox_pred, bbox_num
|