123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- This code is based on https://github.com/PeizeSun/SparseR-CNN/blob/main/projects/SparseRCNN/sparsercnn/head.py
- Ths copyright of PeizeSun/SparseR-CNN is as follows:
- MIT License [see LICENSE for details]
- """
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import math
- import copy
- import paddle
- import paddle.nn as nn
- from ppdet.core.workspace import register
- from ppdet.modeling.heads.roi_extractor import RoIAlign
- from ppdet.modeling.bbox_utils import delta2bbox
- from .. import initializer as init
- _DEFAULT_SCALE_CLAMP = math.log(100000. / 16)
- class DynamicConv(nn.Layer):
- def __init__(
- self,
- head_hidden_dim,
- head_dim_dynamic,
- head_num_dynamic, ):
- super().__init__()
- self.hidden_dim = head_hidden_dim
- self.dim_dynamic = head_dim_dynamic
- self.num_dynamic = head_num_dynamic
- self.num_params = self.hidden_dim * self.dim_dynamic
- self.dynamic_layer = nn.Linear(self.hidden_dim,
- self.num_dynamic * self.num_params)
- self.norm1 = nn.LayerNorm(self.dim_dynamic)
- self.norm2 = nn.LayerNorm(self.hidden_dim)
- self.activation = nn.ReLU()
- pooler_resolution = 7
- num_output = self.hidden_dim * pooler_resolution**2
- self.out_layer = nn.Linear(num_output, self.hidden_dim)
- self.norm3 = nn.LayerNorm(self.hidden_dim)
- def forward(self, pro_features, roi_features):
- '''
- pro_features: (1, N * nr_boxes, self.d_model)
- roi_features: (49, N * nr_boxes, self.d_model)
- '''
- features = roi_features.transpose(perm=[1, 0, 2])
- parameters = self.dynamic_layer(pro_features).transpose(perm=[1, 0, 2])
- param1 = parameters[:, :, :self.num_params].reshape(
- [-1, self.hidden_dim, self.dim_dynamic])
- param2 = parameters[:, :, self.num_params:].reshape(
- [-1, self.dim_dynamic, self.hidden_dim])
- features = paddle.bmm(features, param1)
- features = self.norm1(features)
- features = self.activation(features)
- features = paddle.bmm(features, param2)
- features = self.norm2(features)
- features = self.activation(features)
- features = features.flatten(1)
- features = self.out_layer(features)
- features = self.norm3(features)
- features = self.activation(features)
- return features
- class RCNNHead(nn.Layer):
- def __init__(
- self,
- d_model,
- num_classes,
- dim_feedforward,
- nhead,
- dropout,
- head_cls,
- head_reg,
- head_dim_dynamic,
- head_num_dynamic,
- scale_clamp: float=_DEFAULT_SCALE_CLAMP,
- bbox_weights=(2.0, 2.0, 1.0, 1.0), ):
- super().__init__()
- self.d_model = d_model
- # dynamic.
- self.self_attn = nn.MultiHeadAttention(d_model, nhead, dropout=dropout)
- self.inst_interact = DynamicConv(d_model, head_dim_dynamic,
- head_num_dynamic)
- self.linear1 = nn.Linear(d_model, dim_feedforward)
- self.dropout = nn.Dropout(dropout)
- self.linear2 = nn.Linear(dim_feedforward, d_model)
- self.norm1 = nn.LayerNorm(d_model)
- self.norm2 = nn.LayerNorm(d_model)
- self.norm3 = nn.LayerNorm(d_model)
- self.dropout1 = nn.Dropout(dropout)
- self.dropout2 = nn.Dropout(dropout)
- self.dropout3 = nn.Dropout(dropout)
- self.activation = nn.ReLU()
- # cls.
- num_cls = head_cls
- cls_module = list()
- for _ in range(num_cls):
- cls_module.append(nn.Linear(d_model, d_model, bias_attr=False))
- cls_module.append(nn.LayerNorm(d_model))
- cls_module.append(nn.ReLU())
- self.cls_module = nn.LayerList(cls_module)
- # reg.
- num_reg = head_reg
- reg_module = list()
- for _ in range(num_reg):
- reg_module.append(nn.Linear(d_model, d_model, bias_attr=False))
- reg_module.append(nn.LayerNorm(d_model))
- reg_module.append(nn.ReLU())
- self.reg_module = nn.LayerList(reg_module)
- # pred.
- self.class_logits = nn.Linear(d_model, num_classes)
- self.bboxes_delta = nn.Linear(d_model, 4)
- self.scale_clamp = scale_clamp
- self.bbox_weights = bbox_weights
- def forward(self, features, bboxes, pro_features, pooler):
- """
- :param bboxes: (N, nr_boxes, 4)
- :param pro_features: (N, nr_boxes, d_model)
- """
- N, nr_boxes = bboxes.shape[:2]
- proposal_boxes = list()
- for b in range(N):
- proposal_boxes.append(bboxes[b])
- roi_num = paddle.full([N], nr_boxes).astype("int32")
- roi_features = pooler(features, proposal_boxes, roi_num)
- roi_features = roi_features.reshape(
- [N * nr_boxes, self.d_model, -1]).transpose(perm=[2, 0, 1])
- # self_att.
- pro_features = pro_features.reshape([N, nr_boxes, self.d_model])
- pro_features2 = self.self_attn(
- pro_features, pro_features, value=pro_features)
- pro_features = pro_features.transpose(perm=[1, 0, 2]) + self.dropout1(
- pro_features2.transpose(perm=[1, 0, 2]))
- pro_features = self.norm1(pro_features)
- # inst_interact.
- pro_features = pro_features.reshape(
- [nr_boxes, N, self.d_model]).transpose(perm=[1, 0, 2]).reshape(
- [1, N * nr_boxes, self.d_model])
- pro_features2 = self.inst_interact(pro_features, roi_features)
- pro_features = pro_features + self.dropout2(pro_features2)
- obj_features = self.norm2(pro_features)
- # obj_feature.
- obj_features2 = self.linear2(
- self.dropout(self.activation(self.linear1(obj_features))))
- obj_features = obj_features + self.dropout3(obj_features2)
- obj_features = self.norm3(obj_features)
- fc_feature = obj_features.transpose(perm=[1, 0, 2]).reshape(
- [N * nr_boxes, -1])
- cls_feature = fc_feature.clone()
- reg_feature = fc_feature.clone()
- for cls_layer in self.cls_module:
- cls_feature = cls_layer(cls_feature)
- for reg_layer in self.reg_module:
- reg_feature = reg_layer(reg_feature)
- class_logits = self.class_logits(cls_feature)
- bboxes_deltas = self.bboxes_delta(reg_feature)
- pred_bboxes = delta2bbox(bboxes_deltas,
- bboxes.reshape([-1, 4]), self.bbox_weights)
- return class_logits.reshape([N, nr_boxes, -1]), pred_bboxes.reshape(
- [N, nr_boxes, -1]), obj_features
- @register
- class SparseRCNNHead(nn.Layer):
- '''
- SparsercnnHead
- Args:
- roi_input_shape (list[ShapeSpec]): The output shape of fpn
- num_classes (int): Number of classes,
- head_hidden_dim (int): The param of MultiHeadAttention,
- head_dim_feedforward (int): The param of MultiHeadAttention,
- nhead (int): The param of MultiHeadAttention,
- head_dropout (float): The p of dropout,
- head_cls (int): The number of class head,
- head_reg (int): The number of regressionhead,
- head_num_dynamic (int): The number of DynamicConv's param,
- head_num_heads (int): The number of RCNNHead,
- deep_supervision (int): wheather supervise the intermediate results,
- num_proposals (int): the number of proposals boxes and features
- '''
- __inject__ = ['loss_func']
- __shared__ = ['num_classes']
- def __init__(
- self,
- head_hidden_dim,
- head_dim_feedforward,
- nhead,
- head_dropout,
- head_cls,
- head_reg,
- head_dim_dynamic,
- head_num_dynamic,
- head_num_heads,
- deep_supervision,
- num_proposals,
- num_classes=80,
- loss_func="SparseRCNNLoss",
- roi_input_shape=None, ):
- super().__init__()
- assert head_num_heads > 0, \
- f'At least one RoI Head is required, but {head_num_heads}.'
- # Build RoI.
- box_pooler = self._init_box_pooler(roi_input_shape)
- self.box_pooler = box_pooler
- # Build heads.
- rcnn_head = RCNNHead(
- head_hidden_dim,
- num_classes,
- head_dim_feedforward,
- nhead,
- head_dropout,
- head_cls,
- head_reg,
- head_dim_dynamic,
- head_num_dynamic, )
- self.head_series = nn.LayerList(
- [copy.deepcopy(rcnn_head) for i in range(head_num_heads)])
- self.return_intermediate = deep_supervision
- self.num_classes = num_classes
- # build init proposal
- self.init_proposal_features = nn.Embedding(num_proposals,
- head_hidden_dim)
- self.init_proposal_boxes = nn.Embedding(num_proposals, 4)
- self.lossfunc = loss_func
- # Init parameters.
- init.reset_initialized_parameter(self)
- self._reset_parameters()
- def _reset_parameters(self):
- # init all parameters.
- prior_prob = 0.01
- bias_value = -math.log((1 - prior_prob) / prior_prob)
- for m in self.sublayers():
- if isinstance(m, nn.Linear):
- init.xavier_normal_(m.weight, reverse=True)
- elif not isinstance(m, nn.Embedding) and hasattr(
- m, "weight") and m.weight.dim() > 1:
- init.xavier_normal_(m.weight, reverse=False)
- if hasattr(m, "bias") and m.bias is not None and m.bias.shape[
- -1] == self.num_classes:
- init.constant_(m.bias, bias_value)
- init_bboxes = paddle.empty_like(self.init_proposal_boxes.weight)
- init_bboxes[:, :2] = 0.5
- init_bboxes[:, 2:] = 1.0
- self.init_proposal_boxes.weight.set_value(init_bboxes)
- @staticmethod
- def _init_box_pooler(input_shape):
- pooler_resolution = 7
- sampling_ratio = 2
- if input_shape is not None:
- pooler_scales = tuple(1.0 / input_shape[k].stride
- for k in range(len(input_shape)))
- in_channels = [
- input_shape[f].channels for f in range(len(input_shape))
- ]
- end_level = len(input_shape) - 1
- # Check all channel counts are equal
- assert len(set(in_channels)) == 1, in_channels
- else:
- pooler_scales = [1.0 / 4.0, 1.0 / 8.0, 1.0 / 16.0, 1.0 / 32.0]
- end_level = 3
- box_pooler = RoIAlign(
- resolution=pooler_resolution,
- spatial_scale=pooler_scales,
- sampling_ratio=sampling_ratio,
- end_level=end_level,
- aligned=True)
- return box_pooler
- def forward(self, features, input_whwh):
- bs = len(features[0])
- bboxes = box_cxcywh_to_xyxy(self.init_proposal_boxes.weight.clone(
- )).unsqueeze(0)
- bboxes = bboxes * input_whwh.unsqueeze(-2)
- init_features = self.init_proposal_features.weight.unsqueeze(0).tile(
- [1, bs, 1])
- proposal_features = init_features.clone()
- inter_class_logits = []
- inter_pred_bboxes = []
- for stage, rcnn_head in enumerate(self.head_series):
- class_logits, pred_bboxes, proposal_features = rcnn_head(
- features, bboxes, proposal_features, self.box_pooler)
- if self.return_intermediate or stage == len(self.head_series) - 1:
- inter_class_logits.append(class_logits)
- inter_pred_bboxes.append(pred_bboxes)
- bboxes = pred_bboxes.detach()
- output = {
- 'pred_logits': inter_class_logits[-1],
- 'pred_boxes': inter_pred_bboxes[-1]
- }
- if self.return_intermediate:
- output['aux_outputs'] = [{
- 'pred_logits': a,
- 'pred_boxes': b
- } for a, b in zip(inter_class_logits[:-1], inter_pred_bboxes[:-1])]
- return output
- def get_loss(self, outputs, targets):
- losses = self.lossfunc(outputs, targets)
- weight_dict = self.lossfunc.weight_dict
- for k in losses.keys():
- if k in weight_dict:
- losses[k] *= weight_dict[k]
- return losses
- def box_cxcywh_to_xyxy(x):
- x_c, y_c, w, h = x.unbind(-1)
- b = [(x_c - 0.5 * w), (y_c - 0.5 * h), (x_c + 0.5 * w), (y_c + 0.5 * h)]
- return paddle.stack(b, axis=-1)
|