123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- # Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # Modified from Deformable-DETR (https://github.com/fundamentalvision/Deformable-DETR)
- # Copyright (c) 2020 SenseTime. All Rights Reserved.
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import math
- import paddle
- import paddle.nn as nn
- import paddle.nn.functional as F
- from paddle import ParamAttr
- from ppdet.core.workspace import register
- from ..layers import MultiHeadAttention
- from .position_encoding import PositionEmbedding
- from .utils import _get_clones, get_valid_ratio
- from ..initializer import linear_init_, constant_, xavier_uniform_, normal_
- __all__ = ['DeformableTransformer']
- class MSDeformableAttention(nn.Layer):
- def __init__(self,
- embed_dim=256,
- num_heads=8,
- num_levels=4,
- num_points=4,
- lr_mult=0.1):
- """
- Multi-Scale Deformable Attention Module
- """
- super(MSDeformableAttention, self).__init__()
- self.embed_dim = embed_dim
- self.num_heads = num_heads
- self.num_levels = num_levels
- self.num_points = num_points
- self.total_points = num_heads * num_levels * num_points
- self.head_dim = embed_dim // num_heads
- assert self.head_dim * num_heads == self.embed_dim, "embed_dim must be divisible by num_heads"
- self.sampling_offsets = nn.Linear(
- embed_dim,
- self.total_points * 2,
- weight_attr=ParamAttr(learning_rate=lr_mult),
- bias_attr=ParamAttr(learning_rate=lr_mult))
- self.attention_weights = nn.Linear(embed_dim, self.total_points)
- self.value_proj = nn.Linear(embed_dim, embed_dim)
- self.output_proj = nn.Linear(embed_dim, embed_dim)
- try:
- # use cuda op
- from deformable_detr_ops import ms_deformable_attn
- except:
- # use paddle func
- from .utils import deformable_attention_core_func as ms_deformable_attn
- self.ms_deformable_attn_core = ms_deformable_attn
- self._reset_parameters()
- def _reset_parameters(self):
- # sampling_offsets
- constant_(self.sampling_offsets.weight)
- thetas = paddle.arange(
- self.num_heads,
- dtype=paddle.float32) * (2.0 * math.pi / self.num_heads)
- grid_init = paddle.stack([thetas.cos(), thetas.sin()], -1)
- grid_init = grid_init / grid_init.abs().max(-1, keepdim=True)
- grid_init = grid_init.reshape([self.num_heads, 1, 1, 2]).tile(
- [1, self.num_levels, self.num_points, 1])
- scaling = paddle.arange(
- 1, self.num_points + 1,
- dtype=paddle.float32).reshape([1, 1, -1, 1])
- grid_init *= scaling
- self.sampling_offsets.bias.set_value(grid_init.flatten())
- # attention_weights
- constant_(self.attention_weights.weight)
- constant_(self.attention_weights.bias)
- # proj
- xavier_uniform_(self.value_proj.weight)
- constant_(self.value_proj.bias)
- xavier_uniform_(self.output_proj.weight)
- constant_(self.output_proj.bias)
- def forward(self,
- query,
- reference_points,
- value,
- value_spatial_shapes,
- value_level_start_index,
- value_mask=None):
- """
- Args:
- query (Tensor): [bs, query_length, C]
- reference_points (Tensor): [bs, query_length, n_levels, 2], range in [0, 1], top-left (0,0),
- bottom-right (1, 1), including padding area
- value (Tensor): [bs, value_length, C]
- value_spatial_shapes (Tensor): [n_levels, 2], [(H_0, W_0), (H_1, W_1), ..., (H_{L-1}, W_{L-1})]
- value_level_start_index (Tensor(int64)): [n_levels], [0, H_0*W_0, H_0*W_0+H_1*W_1, ...]
- value_mask (Tensor): [bs, value_length], True for non-padding elements, False for padding elements
- Returns:
- output (Tensor): [bs, Length_{query}, C]
- """
- bs, Len_q = query.shape[:2]
- Len_v = value.shape[1]
- assert int(value_spatial_shapes.prod(1).sum()) == Len_v
- value = self.value_proj(value)
- if value_mask is not None:
- value_mask = value_mask.astype(value.dtype).unsqueeze(-1)
- value *= value_mask
- value = value.reshape([bs, Len_v, self.num_heads, self.head_dim])
- sampling_offsets = self.sampling_offsets(query).reshape(
- [bs, Len_q, self.num_heads, self.num_levels, self.num_points, 2])
- attention_weights = self.attention_weights(query).reshape(
- [bs, Len_q, self.num_heads, self.num_levels * self.num_points])
- attention_weights = F.softmax(attention_weights).reshape(
- [bs, Len_q, self.num_heads, self.num_levels, self.num_points])
- if reference_points.shape[-1] == 2:
- offset_normalizer = value_spatial_shapes.flip([1]).reshape(
- [1, 1, 1, self.num_levels, 1, 2])
- sampling_locations = reference_points.reshape([
- bs, Len_q, 1, self.num_levels, 1, 2
- ]) + sampling_offsets / offset_normalizer
- elif reference_points.shape[-1] == 4:
- sampling_locations = (
- reference_points[:, :, None, :, None, :2] + sampling_offsets /
- self.num_points * reference_points[:, :, None, :, None, 2:] *
- 0.5)
- else:
- raise ValueError(
- "Last dim of reference_points must be 2 or 4, but get {} instead.".
- format(reference_points.shape[-1]))
- output = self.ms_deformable_attn_core(
- value, value_spatial_shapes, value_level_start_index,
- sampling_locations, attention_weights)
- output = self.output_proj(output)
- return output
- class DeformableTransformerEncoderLayer(nn.Layer):
- def __init__(self,
- d_model=256,
- n_head=8,
- dim_feedforward=1024,
- dropout=0.1,
- activation="relu",
- n_levels=4,
- n_points=4,
- weight_attr=None,
- bias_attr=None):
- super(DeformableTransformerEncoderLayer, self).__init__()
- # self attention
- self.self_attn = MSDeformableAttention(d_model, n_head, n_levels,
- n_points)
- self.dropout1 = nn.Dropout(dropout)
- self.norm1 = nn.LayerNorm(d_model)
- # ffn
- self.linear1 = nn.Linear(d_model, dim_feedforward, weight_attr,
- bias_attr)
- self.activation = getattr(F, activation)
- self.dropout2 = nn.Dropout(dropout)
- self.linear2 = nn.Linear(dim_feedforward, d_model, weight_attr,
- bias_attr)
- self.dropout3 = nn.Dropout(dropout)
- self.norm2 = nn.LayerNorm(d_model)
- self._reset_parameters()
- def _reset_parameters(self):
- linear_init_(self.linear1)
- linear_init_(self.linear2)
- xavier_uniform_(self.linear1.weight)
- xavier_uniform_(self.linear2.weight)
- def with_pos_embed(self, tensor, pos):
- return tensor if pos is None else tensor + pos
- def forward_ffn(self, src):
- src2 = self.linear2(self.dropout2(self.activation(self.linear1(src))))
- src = src + self.dropout3(src2)
- src = self.norm2(src)
- return src
- def forward(self,
- src,
- reference_points,
- spatial_shapes,
- level_start_index,
- src_mask=None,
- pos_embed=None):
- # self attention
- src2 = self.self_attn(
- self.with_pos_embed(src, pos_embed), reference_points, src,
- spatial_shapes, level_start_index, src_mask)
- src = src + self.dropout1(src2)
- src = self.norm1(src)
- # ffn
- src = self.forward_ffn(src)
- return src
- class DeformableTransformerEncoder(nn.Layer):
- def __init__(self, encoder_layer, num_layers):
- super(DeformableTransformerEncoder, self).__init__()
- self.layers = _get_clones(encoder_layer, num_layers)
- self.num_layers = num_layers
- @staticmethod
- def get_reference_points(spatial_shapes, valid_ratios, offset=0.5):
- valid_ratios = valid_ratios.unsqueeze(1)
- reference_points = []
- for i, (H, W) in enumerate(spatial_shapes):
- ref_y, ref_x = paddle.meshgrid(
- paddle.arange(end=H) + offset, paddle.arange(end=W) + offset)
- ref_y = ref_y.flatten().unsqueeze(0) / (valid_ratios[:, :, i, 1] *
- H)
- ref_x = ref_x.flatten().unsqueeze(0) / (valid_ratios[:, :, i, 0] *
- W)
- reference_points.append(paddle.stack((ref_x, ref_y), axis=-1))
- reference_points = paddle.concat(reference_points, 1).unsqueeze(2)
- reference_points = reference_points * valid_ratios
- return reference_points
- def forward(self,
- src,
- spatial_shapes,
- level_start_index,
- src_mask=None,
- pos_embed=None,
- valid_ratios=None):
- output = src
- if valid_ratios is None:
- valid_ratios = paddle.ones(
- [src.shape[0], spatial_shapes.shape[0], 2])
- reference_points = self.get_reference_points(spatial_shapes,
- valid_ratios)
- for layer in self.layers:
- output = layer(output, reference_points, spatial_shapes,
- level_start_index, src_mask, pos_embed)
- return output
- class DeformableTransformerDecoderLayer(nn.Layer):
- def __init__(self,
- d_model=256,
- n_head=8,
- dim_feedforward=1024,
- dropout=0.1,
- activation="relu",
- n_levels=4,
- n_points=4,
- weight_attr=None,
- bias_attr=None):
- super(DeformableTransformerDecoderLayer, self).__init__()
- # self attention
- self.self_attn = MultiHeadAttention(d_model, n_head, dropout=dropout)
- self.dropout1 = nn.Dropout(dropout)
- self.norm1 = nn.LayerNorm(d_model)
- # cross attention
- self.cross_attn = MSDeformableAttention(d_model, n_head, n_levels,
- n_points)
- self.dropout2 = nn.Dropout(dropout)
- self.norm2 = nn.LayerNorm(d_model)
- # ffn
- self.linear1 = nn.Linear(d_model, dim_feedforward, weight_attr,
- bias_attr)
- self.activation = getattr(F, activation)
- self.dropout3 = nn.Dropout(dropout)
- self.linear2 = nn.Linear(dim_feedforward, d_model, weight_attr,
- bias_attr)
- self.dropout4 = nn.Dropout(dropout)
- self.norm3 = nn.LayerNorm(d_model)
- self._reset_parameters()
- def _reset_parameters(self):
- linear_init_(self.linear1)
- linear_init_(self.linear2)
- xavier_uniform_(self.linear1.weight)
- xavier_uniform_(self.linear2.weight)
- def with_pos_embed(self, tensor, pos):
- return tensor if pos is None else tensor + pos
- def forward_ffn(self, tgt):
- tgt2 = self.linear2(self.dropout3(self.activation(self.linear1(tgt))))
- tgt = tgt + self.dropout4(tgt2)
- tgt = self.norm3(tgt)
- return tgt
- def forward(self,
- tgt,
- reference_points,
- memory,
- memory_spatial_shapes,
- memory_level_start_index,
- memory_mask=None,
- query_pos_embed=None):
- # self attention
- q = k = self.with_pos_embed(tgt, query_pos_embed)
- tgt2 = self.self_attn(q, k, value=tgt)
- tgt = tgt + self.dropout1(tgt2)
- tgt = self.norm1(tgt)
- # cross attention
- tgt2 = self.cross_attn(
- self.with_pos_embed(tgt, query_pos_embed), reference_points, memory,
- memory_spatial_shapes, memory_level_start_index, memory_mask)
- tgt = tgt + self.dropout2(tgt2)
- tgt = self.norm2(tgt)
- # ffn
- tgt = self.forward_ffn(tgt)
- return tgt
- class DeformableTransformerDecoder(nn.Layer):
- def __init__(self, decoder_layer, num_layers, return_intermediate=False):
- super(DeformableTransformerDecoder, self).__init__()
- self.layers = _get_clones(decoder_layer, num_layers)
- self.num_layers = num_layers
- self.return_intermediate = return_intermediate
- def forward(self,
- tgt,
- reference_points,
- memory,
- memory_spatial_shapes,
- memory_level_start_index,
- memory_mask=None,
- query_pos_embed=None):
- output = tgt
- intermediate = []
- for lid, layer in enumerate(self.layers):
- output = layer(output, reference_points, memory,
- memory_spatial_shapes, memory_level_start_index,
- memory_mask, query_pos_embed)
- if self.return_intermediate:
- intermediate.append(output)
- if self.return_intermediate:
- return paddle.stack(intermediate)
- return output.unsqueeze(0)
- @register
- class DeformableTransformer(nn.Layer):
- __shared__ = ['hidden_dim']
- def __init__(self,
- num_queries=300,
- position_embed_type='sine',
- return_intermediate_dec=True,
- backbone_num_channels=[512, 1024, 2048],
- num_feature_levels=4,
- num_encoder_points=4,
- num_decoder_points=4,
- hidden_dim=256,
- nhead=8,
- num_encoder_layers=6,
- num_decoder_layers=6,
- dim_feedforward=1024,
- dropout=0.1,
- activation="relu",
- lr_mult=0.1,
- weight_attr=None,
- bias_attr=None):
- super(DeformableTransformer, self).__init__()
- assert position_embed_type in ['sine', 'learned'], \
- f'ValueError: position_embed_type not supported {position_embed_type}!'
- assert len(backbone_num_channels) <= num_feature_levels
- self.hidden_dim = hidden_dim
- self.nhead = nhead
- self.num_feature_levels = num_feature_levels
- encoder_layer = DeformableTransformerEncoderLayer(
- hidden_dim, nhead, dim_feedforward, dropout, activation,
- num_feature_levels, num_encoder_points, weight_attr, bias_attr)
- self.encoder = DeformableTransformerEncoder(encoder_layer,
- num_encoder_layers)
- decoder_layer = DeformableTransformerDecoderLayer(
- hidden_dim, nhead, dim_feedforward, dropout, activation,
- num_feature_levels, num_decoder_points, weight_attr, bias_attr)
- self.decoder = DeformableTransformerDecoder(
- decoder_layer, num_decoder_layers, return_intermediate_dec)
- self.level_embed = nn.Embedding(num_feature_levels, hidden_dim)
- self.tgt_embed = nn.Embedding(num_queries, hidden_dim)
- self.query_pos_embed = nn.Embedding(num_queries, hidden_dim)
- self.reference_points = nn.Linear(
- hidden_dim,
- 2,
- weight_attr=ParamAttr(learning_rate=lr_mult),
- bias_attr=ParamAttr(learning_rate=lr_mult))
- self.input_proj = nn.LayerList()
- for in_channels in backbone_num_channels:
- self.input_proj.append(
- nn.Sequential(
- nn.Conv2D(
- in_channels,
- hidden_dim,
- kernel_size=1,
- weight_attr=weight_attr,
- bias_attr=bias_attr),
- nn.GroupNorm(32, hidden_dim)))
- in_channels = backbone_num_channels[-1]
- for _ in range(num_feature_levels - len(backbone_num_channels)):
- self.input_proj.append(
- nn.Sequential(
- nn.Conv2D(
- in_channels,
- hidden_dim,
- kernel_size=3,
- stride=2,
- padding=1,
- weight_attr=weight_attr,
- bias_attr=bias_attr),
- nn.GroupNorm(32, hidden_dim)))
- in_channels = hidden_dim
- self.position_embedding = PositionEmbedding(
- hidden_dim // 2,
- normalize=True if position_embed_type == 'sine' else False,
- embed_type=position_embed_type,
- offset=-0.5)
- self._reset_parameters()
- def _reset_parameters(self):
- normal_(self.level_embed.weight)
- normal_(self.tgt_embed.weight)
- normal_(self.query_pos_embed.weight)
- xavier_uniform_(self.reference_points.weight)
- constant_(self.reference_points.bias)
- for l in self.input_proj:
- xavier_uniform_(l[0].weight)
- constant_(l[0].bias)
- @classmethod
- def from_config(cls, cfg, input_shape):
- return {'backbone_num_channels': [i.channels for i in input_shape], }
- def forward(self, src_feats, src_mask=None, *args, **kwargs):
- srcs = []
- for i in range(len(src_feats)):
- srcs.append(self.input_proj[i](src_feats[i]))
- if self.num_feature_levels > len(srcs):
- len_srcs = len(srcs)
- for i in range(len_srcs, self.num_feature_levels):
- if i == len_srcs:
- srcs.append(self.input_proj[i](src_feats[-1]))
- else:
- srcs.append(self.input_proj[i](srcs[-1]))
- src_flatten = []
- mask_flatten = []
- lvl_pos_embed_flatten = []
- spatial_shapes = []
- valid_ratios = []
- for level, src in enumerate(srcs):
- bs, _, h, w = paddle.shape(src)
- spatial_shapes.append(paddle.concat([h, w]))
- src = src.flatten(2).transpose([0, 2, 1])
- src_flatten.append(src)
- if src_mask is not None:
- mask = F.interpolate(src_mask.unsqueeze(0), size=(h, w))[0]
- else:
- mask = paddle.ones([bs, h, w])
- valid_ratios.append(get_valid_ratio(mask))
- pos_embed = self.position_embedding(mask).flatten(1, 2)
- lvl_pos_embed = pos_embed + self.level_embed.weight[level]
- lvl_pos_embed_flatten.append(lvl_pos_embed)
- mask = mask.flatten(1)
- mask_flatten.append(mask)
- src_flatten = paddle.concat(src_flatten, 1)
- mask_flatten = None if src_mask is None else paddle.concat(mask_flatten,
- 1)
- lvl_pos_embed_flatten = paddle.concat(lvl_pos_embed_flatten, 1)
- # [l, 2]
- spatial_shapes = paddle.to_tensor(
- paddle.stack(spatial_shapes).astype('int64'))
- # [l], 每一个level的起始index
- level_start_index = paddle.concat([
- paddle.zeros(
- [1], dtype='int64'), spatial_shapes.prod(1).cumsum(0)[:-1]
- ])
- # [b, l, 2]
- valid_ratios = paddle.stack(valid_ratios, 1)
- # encoder
- memory = self.encoder(src_flatten, spatial_shapes, level_start_index,
- mask_flatten, lvl_pos_embed_flatten, valid_ratios)
- # prepare input for decoder
- bs, _, c = memory.shape
- query_embed = self.query_pos_embed.weight.unsqueeze(0).tile([bs, 1, 1])
- tgt = self.tgt_embed.weight.unsqueeze(0).tile([bs, 1, 1])
- reference_points = F.sigmoid(self.reference_points(query_embed))
- reference_points_input = reference_points.unsqueeze(
- 2) * valid_ratios.unsqueeze(1)
- # decoder
- hs = self.decoder(tgt, reference_points_input, memory, spatial_shapes,
- level_start_index, mask_flatten, query_embed)
- return (hs, memory, reference_points)
|