# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Modified from DETR (https://github.com/facebookresearch/detr)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import paddle
import paddle.nn as nn
import paddle.nn.functional as F

from ppdet.core.workspace import register
from ..layers import MultiHeadAttention, _convert_attention_mask
from .position_encoding import PositionEmbedding
from .utils import _get_clones
from ..initializer import linear_init_, conv_init_, xavier_uniform_, normal_

__all__ = ['DETRTransformer']

class TransformerEncoderLayer(nn.Layer):
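    """A single DETR Transformer encoder layer: multi-head self-attention
    (with the spatial positional embedding added to the query and key)
    followed by a feed-forward network, each wrapped with a residual
    connection, dropout, and layer normalization in pre-norm or post-norm
    order depending on ``normalize_before``.
    """
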
    def __init__(self,
                 d_model,
                 nhead,
                 dim_feedforward=2048,
                 dropout=0.1,
                 activation="relu",
                 attn_dropout=None,
                 act_dropout=None,
                 normalize_before=False):
        super(TransformerEncoderLayer, self).__init__()
        attn_dropout = dropout if attn_dropout is None else attn_dropout
        act_dropout = dropout if act_dropout is None else act_dropout
        self.normalize_before = normalize_before

        self.self_attn = MultiHeadAttention(d_model, nhead, attn_dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(act_dropout, mode="upscale_in_train")
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout, mode="upscale_in_train")
        self.dropout2 = nn.Dropout(dropout, mode="upscale_in_train")
        self.activation = getattr(F, activation)
        self._reset_parameters()

    def _reset_parameters(self):
        linear_init_(self.linear1)
        linear_init_(self.linear2)

    @staticmethod
    def with_pos_embed(tensor, pos_embed):
        return tensor if pos_embed is None else tensor + pos_embed

    def forward(self, src, src_mask=None, pos_embed=None):
        residual = src
        if self.normalize_before:
            src = self.norm1(src)
        q = k = self.with_pos_embed(src, pos_embed)
        src = self.self_attn(q, k, value=src, attn_mask=src_mask)

        src = residual + self.dropout1(src)
        if not self.normalize_before:
            src = self.norm1(src)

        residual = src
        if self.normalize_before:
            src = self.norm2(src)
        src = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = residual + self.dropout2(src)
        if not self.normalize_before:
            src = self.norm2(src)
        return src

class TransformerEncoder(nn.Layer):
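    """A stack of ``num_layers`` cloned encoder layers with an optional
    final layer normalization.
    """
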
    def __init__(self, encoder_layer, num_layers, norm=None):
        super(TransformerEncoder, self).__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src, src_mask=None, pos_embed=None):
        output = src
        for layer in self.layers:
            output = layer(output, src_mask=src_mask, pos_embed=pos_embed)

        if self.norm is not None:
            output = self.norm(output)

        return output

class TransformerDecoderLayer(nn.Layer):
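    """A single DETR Transformer decoder layer: self-attention over the
    object queries (with the query positional embedding added to query and
    key), cross-attention from the queries to the encoder memory (with the
    spatial positional embedding added to the memory keys), and a
    feed-forward network, each wrapped with a residual connection, dropout,
    and layer normalization in pre-norm or post-norm order depending on
    ``normalize_before``.
    """
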
    def __init__(self,
                 d_model,
                 nhead,
                 dim_feedforward=2048,
                 dropout=0.1,
                 activation="relu",
                 attn_dropout=None,
                 act_dropout=None,
                 normalize_before=False):
        super(TransformerDecoderLayer, self).__init__()
        attn_dropout = dropout if attn_dropout is None else attn_dropout
        act_dropout = dropout if act_dropout is None else act_dropout
        self.normalize_before = normalize_before

        self.self_attn = MultiHeadAttention(d_model, nhead, attn_dropout)
        self.cross_attn = MultiHeadAttention(d_model, nhead, attn_dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(act_dropout, mode="upscale_in_train")
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout, mode="upscale_in_train")
        self.dropout2 = nn.Dropout(dropout, mode="upscale_in_train")
        self.dropout3 = nn.Dropout(dropout, mode="upscale_in_train")
        self.activation = getattr(F, activation)
        self._reset_parameters()

    def _reset_parameters(self):
        linear_init_(self.linear1)
        linear_init_(self.linear2)

    @staticmethod
    def with_pos_embed(tensor, pos_embed):
        return tensor if pos_embed is None else tensor + pos_embed

    def forward(self,
                tgt,
                memory,
                tgt_mask=None,
                memory_mask=None,
                pos_embed=None,
                query_pos_embed=None):
        tgt_mask = _convert_attention_mask(tgt_mask, tgt.dtype)

        residual = tgt
        if self.normalize_before:
            tgt = self.norm1(tgt)
        q = k = self.with_pos_embed(tgt, query_pos_embed)
        tgt = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask)
        tgt = residual + self.dropout1(tgt)
        if not self.normalize_before:
            tgt = self.norm1(tgt)

        residual = tgt
        if self.normalize_before:
            tgt = self.norm2(tgt)
        q = self.with_pos_embed(tgt, query_pos_embed)
        k = self.with_pos_embed(memory, pos_embed)
        tgt = self.cross_attn(q, k, value=memory, attn_mask=memory_mask)
        tgt = residual + self.dropout2(tgt)
        if not self.normalize_before:
            tgt = self.norm2(tgt)

        residual = tgt
        if self.normalize_before:
            tgt = self.norm3(tgt)
        tgt = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = residual + self.dropout3(tgt)
        if not self.normalize_before:
            tgt = self.norm3(tgt)
        return tgt

class TransformerDecoder(nn.Layer):
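    """A stack of ``num_layers`` cloned decoder layers. When
    ``return_intermediate`` is True, the normalized output of every layer is
    stacked and returned so that auxiliary losses can be attached to each
    decoder layer.
    """
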
    def __init__(self,
                 decoder_layer,
                 num_layers,
                 norm=None,
                 return_intermediate=False):
        super(TransformerDecoder, self).__init__()
        self.layers = _get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm
        self.return_intermediate = return_intermediate

    def forward(self,
                tgt,
                memory,
                tgt_mask=None,
                memory_mask=None,
                pos_embed=None,
                query_pos_embed=None):
        tgt_mask = _convert_attention_mask(tgt_mask, tgt.dtype)

        output = tgt
        intermediate = []
        for layer in self.layers:
            output = layer(
                output,
                memory,
                tgt_mask=tgt_mask,
                memory_mask=memory_mask,
                pos_embed=pos_embed,
                query_pos_embed=query_pos_embed)
            if self.return_intermediate:
                intermediate.append(self.norm(output))

        if self.norm is not None:
            output = self.norm(output)

        if self.return_intermediate:
            return paddle.stack(intermediate)

        return output.unsqueeze(0)

@register
class DETRTransformer(nn.Layer):
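    """The full DETR Transformer. It projects the last backbone feature map
    to ``hidden_dim`` channels with a 1x1 convolution, flattens it and adds a
    sine or learned positional encoding, runs the Transformer encoder over
    the flattened features, and decodes a fixed set of ``num_queries``
    learned object queries against the encoder memory.
    """
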
    __shared__ = ['hidden_dim']

    def __init__(self,
                 num_queries=100,
                 position_embed_type='sine',
                 return_intermediate_dec=True,
                 backbone_num_channels=2048,
                 hidden_dim=256,
                 nhead=8,
                 num_encoder_layers=6,
                 num_decoder_layers=6,
                 dim_feedforward=2048,
                 dropout=0.1,
                 activation="relu",
                 attn_dropout=None,
                 act_dropout=None,
                 normalize_before=False):
        super(DETRTransformer, self).__init__()
        assert position_embed_type in ['sine', 'learned'],\
            f'ValueError: position_embed_type not supported {position_embed_type}!'
        self.hidden_dim = hidden_dim
        self.nhead = nhead

        encoder_layer = TransformerEncoderLayer(
            hidden_dim, nhead, dim_feedforward, dropout, activation,
            attn_dropout, act_dropout, normalize_before)
        encoder_norm = nn.LayerNorm(hidden_dim) if normalize_before else None
        self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers,
                                          encoder_norm)

        decoder_layer = TransformerDecoderLayer(
            hidden_dim, nhead, dim_feedforward, dropout, activation,
            attn_dropout, act_dropout, normalize_before)
        decoder_norm = nn.LayerNorm(hidden_dim)
        self.decoder = TransformerDecoder(
            decoder_layer,
            num_decoder_layers,
            decoder_norm,
            return_intermediate=return_intermediate_dec)

        self.input_proj = nn.Conv2D(
            backbone_num_channels, hidden_dim, kernel_size=1)
        self.query_pos_embed = nn.Embedding(num_queries, hidden_dim)
        self.position_embedding = PositionEmbedding(
            hidden_dim // 2,
            normalize=True if position_embed_type == 'sine' else False,
            embed_type=position_embed_type)

        self._reset_parameters()

    def _reset_parameters(self):
        for p in self.parameters():
            if p.dim() > 1:
                xavier_uniform_(p)
        conv_init_(self.input_proj)
        normal_(self.query_pos_embed.weight)

    @classmethod
    def from_config(cls, cfg, input_shape):
        return {
            'backbone_num_channels': [i.channels for i in input_shape][-1],
        }

    def _convert_attention_mask(self, mask):
        return (mask - 1.0) * 1e9

    def forward(self, src, src_mask=None, *args, **kwargs):
- r"""
- Applies a Transformer model on the inputs.
- Parameters:
- src (List(Tensor)): Backbone feature maps with shape [[bs, c, h, w]].
- src_mask (Tensor, optional): A tensor used in multi-head attention
- to prevents attention to some unwanted positions, usually the
- paddings or the subsequent positions. It is a tensor with shape
- [bs, H, W]`. When the data type is bool, the unwanted positions
- have `False` values and the others have `True` values. When the
- data type is int, the unwanted positions have 0 values and the
- others have 1 values. When the data type is float, the unwanted
- positions have `-INF` values and the others have 0 values. It
- can be None when nothing wanted or needed to be prevented
- attention to. Default None.
- Returns:
- output (Tensor): [num_levels, batch_size, num_queries, hidden_dim]
- memory (Tensor): [batch_size, hidden_dim, h, w]
- """
        # use last level feature map
        src_proj = self.input_proj(src[-1])
        bs, c, h, w = paddle.shape(src_proj)
        # flatten [B, C, H, W] to [B, HxW, C]
        src_flatten = src_proj.flatten(2).transpose([0, 2, 1])
        if src_mask is not None:
            src_mask = F.interpolate(src_mask.unsqueeze(0), size=(h, w))[0]
        else:
            src_mask = paddle.ones([bs, h, w])
        pos_embed = self.position_embedding(src_mask).flatten(1, 2)

        if self.training:
            src_mask = self._convert_attention_mask(src_mask)
            src_mask = src_mask.reshape([bs, 1, 1, h * w])
        else:
            src_mask = None

        memory = self.encoder(
            src_flatten, src_mask=src_mask, pos_embed=pos_embed)

        query_pos_embed = self.query_pos_embed.weight.unsqueeze(0).tile(
            [bs, 1, 1])
        tgt = paddle.zeros_like(query_pos_embed)
        output = self.decoder(
            tgt,
            memory,
            memory_mask=src_mask,
            pos_embed=pos_embed,
            query_pos_embed=query_pos_embed)

        if self.training:
            src_mask = src_mask.reshape([bs, 1, 1, h, w])
        else:
            src_mask = None

        return (output, memory.transpose([0, 2, 1]).reshape([bs, c, h, w]),
                src_proj, src_mask)
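

if __name__ == '__main__':
    # Minimal smoke-test sketch, not part of the original module: it assumes
    # paddle and ppdet are importable and feeds a dummy last-level backbone
    # feature map of shape [bs, backbone_num_channels, h, w] through
    # DETRTransformer with its default configuration. The shapes below are
    # illustrative only.
    model = DETRTransformer()
    model.eval()
    dummy_feat = paddle.rand([2, 2048, 7, 7])
    out, memory, src_proj, mask = model([dummy_feat])
    # out: [num_decoder_layers, bs, num_queries, hidden_dim]
    # memory / src_proj: [bs, hidden_dim, 7, 7]; mask is None in eval mode
    print(out.shape, memory.shape)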