123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- # copyright (c) 2022 PaddlePaddle Authors. All Rights Reserve.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- Code is adapted from:
- https://github.com/RuijieJ/pren/blob/main/Nets/EfficientNet.py
- """
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import math
- import re
- import collections
- import paddle
- import paddle.nn as nn
- import paddle.nn.functional as F
# Public API of this module. The exported name must match the class that is
# actually defined in this file, otherwise ``from <module> import *`` raises
# AttributeError.
__all__ = ['EfficientNetb3_PREN']
# Immutable record of the network-wide settings produced by ``efficientnet``.
GlobalParams = collections.namedtuple(
    'GlobalParams',
    (
        'batch_norm_momentum',
        'batch_norm_epsilon',
        'dropout_rate',
        'num_classes',
        'width_coefficient',
        'depth_coefficient',
        'depth_divisor',
        'min_depth',
        'drop_connect_rate',
        'image_size',
    ))

# Immutable record describing one stage (a group of repeated blocks).
BlockArgs = collections.namedtuple(
    'BlockArgs',
    (
        'kernel_size',
        'num_repeat',
        'input_filters',
        'output_filters',
        'expand_ratio',
        'id_skip',
        'stride',
        'se_ratio',
    ))
class BlockDecoder:
    """Parse compact stage-description strings into ``BlockArgs`` tuples.

    A description is a sequence of ``_``-separated tokens, each a key
    immediately followed by a value that starts with a digit, for example
    ``'r1_k3_s11_e1_i32_o16_se0.25'``.
    """

    @staticmethod
    def _decode_block_string(block_string):
        """Decode one description string.

        Args:
            block_string: description such as ``'r2_k5_s22_e6_i24_o40_se0.25'``.

        Returns:
            A ``BlockArgs`` whose ``stride`` is a one-element list holding the
            first stride digit.

        Raises:
            AssertionError: if ``block_string`` is not a ``str``, or the ``s``
                token is missing or is neither one digit nor two equal digits.
            KeyError: if one of the ``k``, ``r``, ``i``, ``o`` or ``e`` tokens
                is missing.
        """
        assert isinstance(block_string, str)

        options = {}
        for op in block_string.split('_'):
            # The capturing group makes re.split return [key, value, ''] for
            # tokens that contain a digit; tokens without one (e.g. 'noskip')
            # come back as a single item and are ignored here.
            splits = re.split(r'(\d.*)', op)
            if len(splits) >= 2:
                key, value = splits[:2]
                options[key] = value

        # Validate the stride token. Looking it up with a default keeps a
        # missing 's' an assertion failure rather than a KeyError raised from
        # inside the assert expression.
        stride = options.get('s', '')
        assert (len(stride) == 1 or
                (len(stride) == 2 and stride[0] == stride[1])), \
            "stride token 's' must be one digit or two equal digits"

        return BlockArgs(
            kernel_size=int(options['k']),
            num_repeat=int(options['r']),
            input_filters=int(options['i']),
            output_filters=int(options['o']),
            expand_ratio=int(options['e']),
            id_skip=('noskip' not in block_string),
            se_ratio=float(options['se']) if 'se' in options else None,
            stride=[int(stride[0])])

    @staticmethod
    def decode(string_list):
        """Decode a list of description strings, preserving their order.

        Args:
            string_list: ``list`` of description strings.

        Returns:
            A list with one ``BlockArgs`` per input string.

        Raises:
            AssertionError: if ``string_list`` is not a ``list``.
        """
        assert isinstance(string_list, list)
        return [
            BlockDecoder._decode_block_string(block_string)
            for block_string in string_list
        ]
def efficientnet(width_coefficient=None,
                 depth_coefficient=None,
                 dropout_rate=0.2,
                 drop_connect_rate=0.2,
                 image_size=None,
                 num_classes=1000):
    """Assemble the stage list and the global settings of the network.

    Args:
        width_coefficient: stored as ``GlobalParams.width_coefficient``.
        depth_coefficient: stored as ``GlobalParams.depth_coefficient``.
        dropout_rate: stored as ``GlobalParams.dropout_rate``.
        drop_connect_rate: stored as ``GlobalParams.drop_connect_rate``.
        image_size: stored as ``GlobalParams.image_size``.
        num_classes: stored as ``GlobalParams.num_classes``.

    Returns:
        A ``(blocks_args, global_params)`` pair: the decoded list of
        ``BlockArgs`` for the seven stages below, and a ``GlobalParams``.
    """
    # One string per stage: r=repeats, k=kernel, s=stride, e=expand ratio,
    # i=input filters, o=output filters, se=squeeze-excitation ratio.
    stage_strings = [
        'r1_k3_s11_e1_i32_o16_se0.25',
        'r2_k3_s22_e6_i16_o24_se0.25',
        'r2_k5_s22_e6_i24_o40_se0.25',
        'r3_k3_s22_e6_i40_o80_se0.25',
        'r3_k5_s11_e6_i80_o112_se0.25',
        'r4_k5_s22_e6_i112_o192_se0.25',
        'r1_k3_s11_e6_i192_o320_se0.25',
    ]
    decoded_stages = BlockDecoder.decode(stage_strings)

    settings = dict(
        batch_norm_momentum=0.99,
        batch_norm_epsilon=1e-3,
        dropout_rate=dropout_rate,
        drop_connect_rate=drop_connect_rate,
        num_classes=num_classes,
        width_coefficient=width_coefficient,
        depth_coefficient=depth_coefficient,
        depth_divisor=8,
        min_depth=None,
        image_size=image_size)
    return decoded_stages, GlobalParams(**settings)
class EffUtils:
    """Helpers that scale per-stage widths and depths by the global multipliers."""

    @staticmethod
    def round_filters(filters, global_params):
        """Scale a filter count by the width multiplier and round it.

        Args:
            filters: base number of filters.
            global_params: object exposing ``width_coefficient``,
                ``depth_divisor`` and ``min_depth``.

        Returns:
            ``filters`` unchanged when the width multiplier is falsy;
            otherwise the scaled count rounded to a multiple of
            ``depth_divisor`` and not smaller than ``min_depth`` (or
            ``depth_divisor`` when ``min_depth`` is falsy), as an ``int``.
        """
        width_mult = global_params.width_coefficient
        if not width_mult:
            return filters

        step = global_params.depth_divisor
        lower_bound = global_params.min_depth or step
        scaled = filters * width_mult
        rounded = max(lower_bound, int(scaled + step / 2) // step * step)
        # Rounding must not shrink the scaled width by more than 10%.
        if rounded < 0.9 * scaled:
            rounded += step
        return int(rounded)

    @staticmethod
    def round_repeats(repeats, global_params):
        """Scale a repeat count by the depth multiplier, rounding up.

        Args:
            repeats: base number of block repetitions.
            global_params: object exposing ``depth_coefficient``.

        Returns:
            ``repeats`` unchanged when the depth multiplier is falsy;
            otherwise ``ceil(depth_coefficient * repeats)`` as an ``int``.
        """
        depth_mult = global_params.depth_coefficient
        if depth_mult:
            return int(math.ceil(depth_mult * repeats))
        return repeats
class MbConvBlock(nn.Layer):
    """Convolution block built from one ``BlockArgs`` description.

    The block applies, in order: an optional 1x1 expansion convolution
    (skipped when ``expand_ratio`` is 1), a depthwise convolution, an optional
    squeeze-and-excitation gate, and a 1x1 projection convolution. When
    ``id_skip`` is set, the stride is 1 and input and output widths match,
    the input is added back to the result.

    Args:
        block_args: ``BlockArgs`` for this block. ``stride`` may be an int or
            a list whose first element is the stride.
    """

    def __init__(self, block_args):
        super(MbConvBlock, self).__init__()
        self._block_args = block_args
        self.has_se = (self._block_args.se_ratio is not None) and \
            (0 < self._block_args.se_ratio <= 1)
        self.id_skip = block_args.id_skip

        # expansion phase
        self.inp = self._block_args.input_filters
        oup = self._block_args.input_filters * self._block_args.expand_ratio
        if self._block_args.expand_ratio != 1:
            self._expand_conv = nn.Conv2D(self.inp, oup, 1, bias_attr=False)
            self._bn0 = nn.BatchNorm(oup)

        # depthwise conv phase
        k = self._block_args.kernel_size
        s = self._block_args.stride
        if isinstance(s, list):
            s = s[0]
        # Keep the normalised (scalar) stride: forward() must compare against
        # it, because a raw list such as [1] never equals the integer 1.
        self._stride = s
        self._depthwise_conv = nn.Conv2D(
            oup,
            oup,
            groups=oup,
            kernel_size=k,
            stride=s,
            padding='same',
            bias_attr=False)
        self._bn1 = nn.BatchNorm(oup)

        # squeeze and excitation layer, if desired
        if self.has_se:
            # The squeezed width is derived from the block's input filters,
            # not from the expanded width.
            num_squeezed_channels = max(1,
                                        int(self._block_args.input_filters *
                                            self._block_args.se_ratio))
            self._se_reduce = nn.Conv2D(oup, num_squeezed_channels, 1)
            self._se_expand = nn.Conv2D(num_squeezed_channels, oup, 1)

        # output phase and some util class
        self.final_oup = self._block_args.output_filters
        self._project_conv = nn.Conv2D(oup, self.final_oup, 1, bias_attr=False)
        self._bn2 = nn.BatchNorm(self.final_oup)
        self._swish = nn.Swish()

    def _drop_connect(self, inputs, p, training):
        """Randomly drop whole samples of the batch during training.

        Args:
            inputs: tensor whose first dimension is the batch; the mask has
                shape ``[batch, 1, 1, 1]`` (assumes a 4-D input, TODO confirm).
            p: drop probability.
            training: when falsy, ``inputs`` is returned unchanged.

        Returns:
            ``inputs`` scaled by ``1 / (1 - p)`` and multiplied by a per-sample
            0/1 mask.
        """
        if not training:
            return inputs

        batch_size = inputs.shape[0]
        keep_prob = 1 - p
        random_tensor = keep_prob
        random_tensor += paddle.rand([batch_size, 1, 1, 1], dtype=inputs.dtype)
        random_tensor = paddle.to_tensor(random_tensor, place=inputs.place)
        # floor(keep_prob + u) with u in [0, 1) is 1 with probability keep_prob.
        binary_tensor = paddle.floor(random_tensor)
        output = inputs / keep_prob * binary_tensor
        return output

    def forward(self, inputs, drop_connect_rate=None):
        """Run the block.

        Args:
            inputs: input feature map.
            drop_connect_rate: optional drop probability applied to the block
                output before the residual addition; ignored when falsy.

        Returns:
            The block output tensor.
        """
        # expansion and depthwise conv
        x = inputs
        if self._block_args.expand_ratio != 1:
            x = self._swish(self._bn0(self._expand_conv(inputs)))
        x = self._swish(self._bn1(self._depthwise_conv(x)))

        # squeeze and excitation
        if self.has_se:
            x_squeezed = F.adaptive_avg_pool2d(x, 1)
            x_squeezed = self._se_expand(
                self._swish(self._se_reduce(x_squeezed)))
            x = F.sigmoid(x_squeezed) * x
        x = self._bn2(self._project_conv(x))

        # skip connection and drop connect
        if self.id_skip and self._stride == 1 and \
                self.inp == self.final_oup:
            if drop_connect_rate:
                x = self._drop_connect(
                    x, p=drop_connect_rate, training=self.training)
            x = x + inputs
        return x
class EfficientNetb3_PREN(nn.Layer):
    """EfficientNet-B3 style backbone that returns three feature maps.

    Args:
        in_channels: number of channels of the input tensor.

    Attributes:
        out_channels: list with the output channel count of each block whose
            0-based position is listed in ``_concerned_block_idxes``, i.e. of
            each feature map returned by ``forward``.
    """

    def __init__(self, in_channels):
        super(EfficientNetb3_PREN, self).__init__()
        # EfficientNet-B3 hyper-parameters: network width, depth, resolution
        # and dropout respectively. To fit the text recognition task the
        # resolution is changed from 300 to 64.
        w, d, s, p = 1.2, 1.4, 64, 0.3
        self._blocks_args, self._global_params = efficientnet(
            width_coefficient=w,
            depth_coefficient=d,
            dropout_rate=p,
            image_size=s)
        self.out_channels = []

        # stem
        out_channels = EffUtils.round_filters(32, self._global_params)
        self._conv_stem = nn.Conv2D(
            in_channels, out_channels, 3, 2, padding='same', bias_attr=False)
        self._bn0 = nn.BatchNorm(out_channels)

        # build blocks
        self._blocks = []
        # 0-based positions (in self._blocks) of the three blocks whose
        # outputs are extracted as feature maps for the FPN.
        self._concerned_block_idxes = [7, 17, 25]
        for i, block_args in enumerate(self._blocks_args):
            block_args = block_args._replace(
                input_filters=EffUtils.round_filters(block_args.input_filters,
                                                     self._global_params),
                output_filters=EffUtils.round_filters(block_args.output_filters,
                                                      self._global_params),
                num_repeat=EffUtils.round_repeats(block_args.num_repeat,
                                                  self._global_params))
            self._append_block(f"{i}-0", block_args)
            if block_args.num_repeat > 1:
                # Repeated blocks of a stage keep the width and use stride 1.
                block_args = block_args._replace(
                    input_filters=block_args.output_filters, stride=1)
            for j in range(block_args.num_repeat - 1):
                self._append_block(f'{i}-{j+1}', block_args)

        self._swish = nn.Swish()

    def _append_block(self, name, block_args):
        """Register one block and record its width if its output is extracted.

        The position is taken before appending, so it is the same 0-based
        index that ``forward`` sees from ``enumerate(self._blocks)``.
        """
        block_idx = len(self._blocks)
        self._blocks.append(self.add_sublayer(name, MbConvBlock(block_args)))
        if block_idx in self._concerned_block_idxes:
            self.out_channels.append(block_args.output_filters)

    def forward(self, inputs):
        """Run the backbone.

        Args:
            inputs: input tensor with ``in_channels`` channels.

        Returns:
            List of the outputs of the blocks at the positions listed in
            ``_concerned_block_idxes``, in network order.
        """
        outs = []
        x = self._swish(self._bn0(self._conv_stem(inputs)))
        for idx, block in enumerate(self._blocks):
            drop_connect_rate = self._global_params.drop_connect_rate
            if drop_connect_rate:
                # Scale the drop rate linearly with the block's position.
                drop_connect_rate *= float(idx) / len(self._blocks)
            x = block(x, drop_connect_rate=drop_connect_rate)
            if idx in self._concerned_block_idxes:
                outs.append(x)
        return outs
|