# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import copy
import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from ppdet.core.workspace import register, serializable
from ppdet.modeling.layers import DropBlock, MultiHeadAttention
from ppdet.modeling.ops import get_act_fn
from ..backbones.cspresnet import ConvBNLayer, BasicBlock
from ..shape_spec import ShapeSpec
from ..initializer import linear_init_

__all__ = ['CustomCSPPAN']


def _get_clones(module, N):
    # Stack N deep copies of `module` into a LayerList.
    return nn.LayerList([copy.deepcopy(module) for _ in range(N)])


# Spatial pyramid pooling: concat the input with max-pooled views at several
# kernel sizes, then fuse with a single ConvBNLayer.
class SPP(nn.Layer):
    def __init__(self,
                 ch_in,
                 ch_out,
                 k,
                 pool_size,
                 act='swish',
                 data_format='NCHW'):
        super(SPP, self).__init__()
        self.pool = []
        self.data_format = data_format
        for i, size in enumerate(pool_size):
            pool = self.add_sublayer(
                'pool{}'.format(i),
                nn.MaxPool2D(
                    kernel_size=size,
                    stride=1,
                    padding=size // 2,
                    data_format=data_format,
                    ceil_mode=False))
            self.pool.append(pool)
        self.conv = ConvBNLayer(ch_in, ch_out, k, padding=k // 2, act=act)

    def forward(self, x):
        outs = [x]
        for pool in self.pool:
            outs.append(pool(x))
        if self.data_format == 'NCHW':
            y = paddle.concat(outs, axis=1)
        else:
            y = paddle.concat(outs, axis=-1)

        y = self.conv(y)
        return y


# CSP-style stage: two 1x1 branches; one passes through `n` blocks (optionally
# with an SPP module in the middle), then both branches are concatenated and
# fused by a final 1x1 conv.
class CSPStage(nn.Layer):
    def __init__(self,
                 block_fn,
                 ch_in,
                 ch_out,
                 n,
                 act='swish',
                 spp=False,
                 use_alpha=False):
        super(CSPStage, self).__init__()

        ch_mid = int(ch_out // 2)
        self.conv1 = ConvBNLayer(ch_in, ch_mid, 1, act=act)
        self.conv2 = ConvBNLayer(ch_in, ch_mid, 1, act=act)
        self.convs = nn.Sequential()
        next_ch_in = ch_mid
        for i in range(n):
            self.convs.add_sublayer(
                str(i),
                eval(block_fn)(next_ch_in,
                               ch_mid,
                               act=act,
                               shortcut=False,
                               use_alpha=use_alpha))
            if i == (n - 1) // 2 and spp:
                self.convs.add_sublayer(
                    'spp', SPP(ch_mid * 4, ch_mid, 1, [5, 9, 13], act=act))
            next_ch_in = ch_mid
        self.conv3 = ConvBNLayer(ch_mid * 2, ch_out, 1, act=act)

    def forward(self, x):
        y1 = self.conv1(x)
        y2 = self.conv2(x)
        y2 = self.convs(y2)
        y = paddle.concat([y1, y2], axis=1)
        y = self.conv3(y)
        return y


class TransformerEncoderLayer(nn.Layer):
    def __init__(self,
                 d_model,
                 nhead,
                 dim_feedforward=2048,
                 dropout=0.1,
                 activation="relu",
                 attn_dropout=None,
                 act_dropout=None,
                 normalize_before=False):
        super(TransformerEncoderLayer, self).__init__()
        attn_dropout = dropout if attn_dropout is None else attn_dropout
        act_dropout = dropout if act_dropout is None else act_dropout
        self.normalize_before = normalize_before

        self.self_attn = MultiHeadAttention(d_model, nhead, attn_dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(act_dropout, mode="upscale_in_train")
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout, mode="upscale_in_train")
        self.dropout2 = nn.Dropout(dropout, mode="upscale_in_train")
        self.activation = getattr(F, activation)
        self._reset_parameters()

    def _reset_parameters(self):
        linear_init_(self.linear1)
        linear_init_(self.linear2)

    @staticmethod
    def with_pos_embed(tensor, pos_embed):
        return tensor if pos_embed is None else tensor + pos_embed

    def forward(self, src, src_mask=None, pos_embed=None):
        residual = src
        if self.normalize_before:
            src = self.norm1(src)
        q = k = self.with_pos_embed(src, pos_embed)
        src = self.self_attn(q, k, value=src, attn_mask=src_mask)

        src = residual + self.dropout1(src)
        if not self.normalize_before:
            src = self.norm1(src)

        residual = src
        if self.normalize_before:
            src = self.norm2(src)
        src = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = residual + self.dropout2(src)
        if not self.normalize_before:
            src = self.norm2(src)
        return src


class TransformerEncoder(nn.Layer):
    def __init__(self, encoder_layer, num_layers, norm=None):
        super(TransformerEncoder, self).__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src, src_mask=None, pos_embed=None):
        output = src
        for layer in self.layers:
            output = layer(output, src_mask=src_mask, pos_embed=pos_embed)

        if self.norm is not None:
            output = self.norm(output)

        return output


@register
@serializable
class CustomCSPPAN(nn.Layer):
    __shared__ = [
        'norm_type', 'data_format', 'width_mult', 'depth_mult', 'trt',
        'eval_size'
    ]

    def __init__(self,
                 in_channels=[256, 512, 1024],
                 out_channels=[1024, 512, 256],
                 norm_type='bn',
                 act='leaky',
                 stage_fn='CSPStage',
                 block_fn='BasicBlock',
                 stage_num=1,
                 block_num=3,
                 drop_block=False,
                 block_size=3,
                 keep_prob=0.9,
                 spp=False,
                 data_format='NCHW',
                 width_mult=1.0,
                 depth_mult=1.0,
                 use_alpha=False,
                 trt=False,
                 dim_feedforward=2048,
                 dropout=0.1,
                 activation='gelu',
                 nhead=4,
                 num_layers=4,
                 attn_dropout=None,
                 act_dropout=None,
                 normalize_before=False,
                 use_trans=False,
                 eval_size=None):
        super(CustomCSPPAN, self).__init__()
        out_channels = [max(round(c * width_mult), 1) for c in out_channels]
        block_num = max(round(block_num * depth_mult), 1)
        act = get_act_fn(
            act, trt=trt) if act is None or isinstance(act,
                                                       (str, dict)) else act
        self.num_blocks = len(in_channels)
        self.data_format = data_format
        self._out_channels = out_channels
        self.hidden_dim = in_channels[-1]
        in_channels = in_channels[::-1]
        self.use_trans = use_trans
        self.eval_size = eval_size

        # Optional transformer encoder applied to the deepest feature map.
        if use_trans:
            if eval_size is not None:
                self.pos_embed = self.build_2d_sincos_position_embedding(
                    eval_size[1] // 32,
                    eval_size[0] // 32,
                    embed_dim=self.hidden_dim)
            else:
                self.pos_embed = None

            encoder_layer = TransformerEncoderLayer(
                self.hidden_dim, nhead, dim_feedforward, dropout, activation,
                attn_dropout, act_dropout, normalize_before)
            encoder_norm = nn.LayerNorm(
                self.hidden_dim) if normalize_before else None
            self.encoder = TransformerEncoder(encoder_layer, num_layers,
                                              encoder_norm)

        # Top-down (FPN) path: process from the deepest feature; routes halve
        # the channels before being upsampled and concatenated.
        fpn_stages = []
        fpn_routes = []
        for i, (ch_in, ch_out) in enumerate(zip(in_channels, out_channels)):
            if i > 0:
                ch_in += ch_pre // 2

            stage = nn.Sequential()
            for j in range(stage_num):
                stage.add_sublayer(
                    str(j),
                    eval(stage_fn)(block_fn,
                                   ch_in if j == 0 else ch_out,
                                   ch_out,
                                   block_num,
                                   act=act,
                                   spp=(spp and i == 0),
                                   use_alpha=use_alpha))

            if drop_block:
                stage.add_sublayer('drop', DropBlock(block_size, keep_prob))

            fpn_stages.append(stage)

            if i < self.num_blocks - 1:
                fpn_routes.append(
                    ConvBNLayer(
                        ch_in=ch_out,
                        ch_out=ch_out // 2,
                        filter_size=1,
                        stride=1,
                        padding=0,
                        act=act))

            ch_pre = ch_out

        self.fpn_stages = nn.LayerList(fpn_stages)
        self.fpn_routes = nn.LayerList(fpn_routes)

        # Bottom-up (PAN) path: stride-2 routes re-aggregate the FPN outputs.
        pan_stages = []
        pan_routes = []
        for i in reversed(range(self.num_blocks - 1)):
            pan_routes.append(
                ConvBNLayer(
                    ch_in=out_channels[i + 1],
                    ch_out=out_channels[i + 1],
                    filter_size=3,
                    stride=2,
                    padding=1,
                    act=act))

            ch_in = out_channels[i] + out_channels[i + 1]
            ch_out = out_channels[i]
            stage = nn.Sequential()
            for j in range(stage_num):
                stage.add_sublayer(
                    str(j),
                    eval(stage_fn)(block_fn,
                                   ch_in if j == 0 else ch_out,
                                   ch_out,
                                   block_num,
                                   act=act,
                                   spp=False,
                                   use_alpha=use_alpha))
            if drop_block:
                stage.add_sublayer('drop', DropBlock(block_size, keep_prob))

            pan_stages.append(stage)

        self.pan_stages = nn.LayerList(pan_stages[::-1])
        self.pan_routes = nn.LayerList(pan_routes[::-1])

    def build_2d_sincos_position_embedding(
            self,
            w,
            h,
            embed_dim=1024,
            temperature=10000., ):
        grid_w = paddle.arange(int(w), dtype=paddle.float32)
        grid_h = paddle.arange(int(h), dtype=paddle.float32)
        grid_w, grid_h = paddle.meshgrid(grid_w, grid_h)
        assert embed_dim % 4 == 0, \
            'Embed dimension must be divisible by 4 for 2D sin-cos position embedding'
        pos_dim = embed_dim // 4
        omega = paddle.arange(pos_dim, dtype=paddle.float32) / pos_dim
        omega = 1. / (temperature**omega)

        out_w = grid_w.flatten()[..., None] @ omega[None]
        out_h = grid_h.flatten()[..., None] @ omega[None]

        pos_emb = paddle.concat(
            [
                paddle.sin(out_w), paddle.cos(out_w), paddle.sin(out_h),
                paddle.cos(out_h)
            ],
            axis=1)[None, :, :]

        return pos_emb

    def forward(self, blocks, for_mot=False):
        if self.use_trans:
            last_feat = blocks[-1]
            n, c, h, w = last_feat.shape
            # flatten [B, C, H, W] to [B, HxW, C]
            src_flatten = last_feat.flatten(2).transpose([0, 2, 1])
            if self.eval_size is not None and not self.training:
                pos_embed = self.pos_embed
            else:
                pos_embed = self.build_2d_sincos_position_embedding(
                    w=w, h=h, embed_dim=self.hidden_dim)

            memory = self.encoder(src_flatten, pos_embed=pos_embed)
            last_feat_encode = memory.transpose([0, 2, 1]).reshape(
                [n, c, h, w])
            blocks[-1] = last_feat_encode

        # Top-down pass over the features ordered deepest-first.
        blocks = blocks[::-1]
        fpn_feats = []

        for i, block in enumerate(blocks):
            if i > 0:
                block = paddle.concat([route, block], axis=1)
            route = self.fpn_stages[i](block)
            fpn_feats.append(route)

            if i < self.num_blocks - 1:
                route = self.fpn_routes[i](route)
                route = F.interpolate(
                    route, scale_factor=2., data_format=self.data_format)

        # Bottom-up pass: downsample the route and fuse with each FPN output.
        pan_feats = [fpn_feats[-1], ]
        route = fpn_feats[-1]
        for i in reversed(range(self.num_blocks - 1)):
            block = fpn_feats[i]
            route = self.pan_routes[i](route)
            block = paddle.concat([route, block], axis=1)
            route = self.pan_stages[i](block)
            pan_feats.append(route)

        return pan_feats[::-1]

    @classmethod
    def from_config(cls, cfg, input_shape):
        return {'in_channels': [i.channels for i in input_shape], }

    @property
    def out_shape(self):
        return [ShapeSpec(channels=c) for c in self._out_channels]
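

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: it assumes ppdet
    # is installed and that the backbone emits three feature maps at strides
    # 8/16/32 (e.g. for a 640x640 input). The batch size, spatial sizes and
    # channel counts below are illustrative only.
    neck = CustomCSPPAN(in_channels=[256, 512, 1024])
    feats = [
        paddle.randn([1, 256, 80, 80]),   # stride 8
        paddle.randn([1, 512, 40, 40]),   # stride 16
        paddle.randn([1, 1024, 20, 20]),  # stride 32
    ]
    outs = neck(feats)
    # With the defaults, the returned features should come back deepest-first
    # with channels matching neck.out_shape ([1024, 512, 256]).
    for o in outs:
        print(o.shape)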