# mmpretrain/mmcls/models/backbones/shufflenet_v1.py
import logging

import torch
import torch.nn as nn
import torch.utils.checkpoint as cp
from mmcv.cnn import (ConvModule, build_activation_layer, constant_init,
                      kaiming_init)
from mmcv.runner import load_checkpoint
from torch.nn.modules.batchnorm import _BatchNorm

from .base_backbone import BaseBackbone


def channel_shuffle(x, groups):
""" Channel Shuffle operation.
This function enables cross-group information flow for multiple group
convolution layers.
Args:
x (Tensor): The input tensor.
groups (int): The number of groups to divide the input tensor
in channel dimension.
Returns:
Tensor: The output tensor after channel shuffle operation.
"""
batchsize, num_channels, height, width = x.size()
assert (num_channels % groups == 0), ('num_channels should be '
'divisible by groups')
channels_per_group = num_channels // groups
x = x.view(batchsize, groups, channels_per_group, height, width)
x = torch.transpose(x, 1, 2).contiguous()
x = x.view(batchsize, -1, height, width)
return x
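

# A quick, illustrative sanity check of channel_shuffle (hypothetical values,
# not part of the original module): with 6 channels in 2 groups, the groups
# [0, 1, 2] and [3, 4, 5] interleave to [0, 3, 1, 4, 2, 5].
#
#     x = torch.arange(6.).view(1, 6, 1, 1)
#     y = channel_shuffle(x, groups=2)
#     assert y.flatten().tolist() == [0., 3., 1., 4., 2., 5.]
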
def _make_divisible(v, divisor, min_value=None):
""" Make divisible function.
This function ensures that all layers have a channel number that is
divisible by divisor.
Args:
v (int): The original channel number
divisor (int): The divisor to fully divide the channel number
min_value (int, optional): the minimum value of the output channel.
Returns:
int: The modified output channel number
"""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
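

# Illustrative behaviour of _make_divisible (example values, not part of the
# original module):
#
#     _make_divisible(37, 8)   # -> 40, rounded to the nearest multiple of 8
#     _make_divisible(33, 8)   # -> 32, rounding down stays within 10% of 33
#     _make_divisible(4, 8)    # -> 8, clamped to min_value (= divisor here)
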
class ShuffleUnit(nn.Module):
"""ShuffleUnit block.
ShuffleNet unit with pointwise group convolution (GConv) and channel
shuffle.
Args:
inplanes (int): The input channels of the ShuffleUnit.
planes (int): The output channels of the ShuffleUnit.
groups (int, optional): The number of groups to be used in grouped 1x1
convolutions in each ShuffleUnit.
first_block (bool, optional): Whether is the first ShuffleUnit of a
sequential ShuffleUnits. If True, use the grouped 1x1 convolution.
combine (str, optional): The ways to combine the input and output
branches.
conv_cfg (dict): Config dict for convolution layer. Default: None,
which means using conv2d.
norm_cfg (dict): Config dict for normalization layer. Default: None.
act_cfg (dict): Config dict for activation layer.
Default: dict(type='ReLU').
with_cp (bool, optional): Use checkpoint or not. Using checkpoint
will save some memory while slowing down the training speed.
Returns:
Tensor: The output tensor.
"""
def __init__(self,
inplanes,
planes,
groups=3,
first_block=True,
combine='add',
conv_cfg=None,
                 norm_cfg=dict(type='BN'),
act_cfg=dict(type='ReLU'),
with_cp=False):
super(ShuffleUnit, self).__init__()
self.inplanes = inplanes
self.planes = planes
self.first_block = first_block
self.combine = combine
self.groups = groups
        # the bottleneck width is 1/4 of the unit's output channels, as in
        # the ShuffleNet paper
        self.bottleneck_channels = self.planes // 4
self.with_cp = with_cp
if self.combine == 'add':
self.depthwise_stride = 1
self._combine_func = self._add
assert inplanes == planes, ('inplanes must be equal to '
'planes when combine is add.')
        elif self.combine == 'concat':
            self.depthwise_stride = 2
            self._combine_func = self._concat
            # the residual branch is concatenated, so the conv branch only
            # needs to produce (planes - inplanes) channels
            self.planes -= self.inplanes
else:
raise ValueError(f'Cannot combine tensors with {self.combine}. '
f'Only "add" and "concat" are supported.')
self.first_1x1_groups = self.groups if first_block else 1
self.g_conv_1x1_compress = ConvModule(
in_channels=self.inplanes,
out_channels=self.bottleneck_channels,
kernel_size=1,
groups=self.first_1x1_groups,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg,
act_cfg=act_cfg)
self.depthwise_conv3x3_bn = ConvModule(
in_channels=self.bottleneck_channels,
out_channels=self.bottleneck_channels,
kernel_size=3,
stride=self.depthwise_stride,
padding=1,
groups=self.bottleneck_channels,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg,
act_cfg=None)
self.g_conv_1x1_expand = ConvModule(
in_channels=self.bottleneck_channels,
out_channels=self.planes,
kernel_size=1,
groups=self.groups,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg,
act_cfg=None)
self.avgpool = nn.AvgPool2d(kernel_size=3, stride=2, padding=1)
self.act = build_activation_layer(act_cfg)

    @staticmethod
    def _add(x, out):
        # residual connection
        return x + out

    @staticmethod
    def _concat(x, out):
        # concatenate along channel axis
        return torch.cat((x, out), 1)

    def forward(self, x):

        def _inner_forward(x):
residual = x
if self.combine == 'concat':
residual = self.avgpool(residual)
out = self.g_conv_1x1_compress(x)
out = channel_shuffle(out, self.groups)
out = self.depthwise_conv3x3_bn(out)
out = self.g_conv_1x1_expand(out)
out = self._combine_func(residual, out)
out = self.act(out)
return out

        if self.with_cp and x.requires_grad:
out = cp.checkpoint(_inner_forward, x)
else:
out = _inner_forward(x)
return out
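

# A minimal usage sketch of ShuffleUnit (hypothetical shapes, not part of the
# original module). With combine='concat' the depthwise conv has stride 2, so
# the spatial size halves and the avg-pooled residual is concatenated to reach
# the requested output channels:
#
#     unit = ShuffleUnit(240, 480, groups=3, combine='concat')
#     out = unit(torch.randn(1, 240, 28, 28))   # -> (1, 480, 14, 14)
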
class ShuffleNetv1(BaseBackbone):
"""ShuffleNetv1 backbone.
Args:
groups (int, optional): The number of groups to be used in grouped 1x1
convolutions in each ShuffleUnit. Default is 3 for best performance
according to original paper.
widen_factor (float, optional): Width multiplier - adjusts number of
channels in each layer by this amount. Default is 1.0.
out_indices (Sequence[int]): Output from which stages.
frozen_stages (int): Stages to be frozen (all param fixed). -1 means
not freezing any parameters.
conv_cfg (dict): Config dict for convolution layer. Default: None,
which means using conv2d.
norm_cfg (dict): Config dict for normalization layer. Default: None.
act_cfg (dict): Config dict for activation layer.
Default: dict(type='ReLU').
norm_eval (bool): Whether to set norm layers to eval mode, namely,
freeze running stats (mean and var). Note: Effect on Batch Norm
and its variants only.
with_cp (bool): Use checkpoint or not. Using checkpoint will save some
memory while slowing down the training speed.
"""
def __init__(self,
groups=3,
widen_factor=1.0,
                 out_indices=(0, 1, 2),
frozen_stages=-1,
conv_cfg=None,
                 norm_cfg=dict(type='BN'),
act_cfg=dict(type='ReLU'),
norm_eval=True,
with_cp=False):
super(ShuffleNetv1, self).__init__()
        blocks = [4, 8, 4]  # units per stage, including the stride-2 unit
        self.groups = groups
        for index in out_indices:
            if index not in range(0, 3):
                raise ValueError(f'the item in out_indices must be in '
                                 f'range(0, 3). But received {index}')
        if frozen_stages not in range(-1, 4):
            raise ValueError(f'frozen_stages must be in range(-1, 4). '
                             f'But received {frozen_stages}')
self.out_indices = out_indices
self.frozen_stages = frozen_stages
self.conv_cfg = conv_cfg
self.norm_cfg = norm_cfg
self.act_cfg = act_cfg
self.norm_eval = norm_eval
self.with_cp = with_cp
if groups == 1:
channels = (144, 288, 576)
elif groups == 2:
channels = (200, 400, 800)
elif groups == 3:
channels = (240, 480, 960)
elif groups == 4:
channels = (272, 544, 1088)
elif groups == 8:
channels = (384, 768, 1536)
else:
raise ValueError(f'{groups} groups is not supported for 1x1 '
f'Grouped Convolutions')
channels = [_make_divisible(ch * widen_factor, 8) for ch in channels]
self.inplanes = int(24 * widen_factor)
        self.conv1 = ConvModule(
            in_channels=3,
            out_channels=self.inplanes,
            kernel_size=3,
            stride=2,
            padding=1,
            conv_cfg=self.conv_cfg,
            norm_cfg=self.norm_cfg,
            act_cfg=self.act_cfg)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(
channels[0], blocks[0], first_block=False)
self.layer2 = self._make_layer(channels[1], blocks[1])
self.layer3 = self._make_layer(channels[2], blocks[2])

    def init_weights(self, pretrained=None):
if isinstance(pretrained, str):
logger = logging.getLogger()
load_checkpoint(self, pretrained, strict=False, logger=logger)
elif pretrained is None:
for m in self.modules():
if isinstance(m, nn.Conv2d):
kaiming_init(m)
                elif isinstance(m, (_BatchNorm, nn.GroupNorm)):
constant_init(m, 1)
else:
raise TypeError('pretrained must be a str or None')

    def _make_layer(self, outplanes, blocks, first_block=True):
        """Stack ShuffleUnit blocks to make a layer.

        Args:
            outplanes (int): Number of output channels.
            blocks (int): Number of blocks to be built.
            first_block (bool, optional): Whether it is the first ShuffleUnit
                of a sequential stage. If True, use the grouped 1x1
                convolution as the first layer.

        Returns:
            Module: A module consisting of several ShuffleUnit blocks.
        """
        layers = []
        for i in range(blocks):
            # only unit 0 may skip the grouped 1x1 conv (the first stage
            # passes first_block=False because its 24-channel input is too
            # thin to group); every later unit always uses grouped 1x1 convs
            first_block = first_block if i == 0 else True
            combine_mode = 'concat' if i == 0 else 'add'
layers.append(
ShuffleUnit(
self.inplanes,
outplanes,
groups=self.groups,
first_block=first_block,
combine=combine_mode,
conv_cfg=self.conv_cfg,
norm_cfg=self.norm_cfg,
act_cfg=self.act_cfg,
with_cp=self.with_cp))
self.inplanes = outplanes
return nn.Sequential(*layers)
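
    # Illustrative stage layout (hypothetical numbers, not part of the
    # original module): with groups=3 and widen_factor=1.0, the first stage
    # is _make_layer(240, 4, first_block=False), i.e. one stride-2 'concat'
    # unit taking 24 -> 240 channels followed by three stride-1 'add' units.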

    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)

        outs = []
        x = self.layer1(x)
        if 0 in self.out_indices:
            outs.append(x)
        x = self.layer2(x)
        if 1 in self.out_indices:
            outs.append(x)
        x = self.layer3(x)
        if 2 in self.out_indices:
            outs.append(x)

        if len(outs) == 1:
            return outs[0]
        else:
            return tuple(outs)

    def _freeze_stages(self):
if self.frozen_stages >= 0:
for param in self.conv1.parameters():
param.requires_grad = False
for i in range(1, self.frozen_stages + 1):
layer = getattr(self, f'layer{i}')
layer.eval()
for param in layer.parameters():
param.requires_grad = False

    def train(self, mode=True):
super(ShuffleNetv1, self).train(mode)
self._freeze_stages()
if mode and self.norm_eval:
for m in self.modules():
if isinstance(m, _BatchNorm):
m.eval()
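

# A minimal, runnable usage sketch (not part of the original module). Because
# of the relative import above, run it from within the package, e.g.
# `python -m mmcls.models.backbones.shufflenet_v1`. Weights are randomly
# initialized here (no `pretrained` checkpoint is given).
if __name__ == '__main__':
    model = ShuffleNetv1(groups=3, out_indices=(0, 1, 2))
    model.init_weights()
    model.eval()
    inputs = torch.randn(1, 3, 224, 224)
    with torch.no_grad():
        feats = model(inputs)
    for i, feat in enumerate(feats):
        # expected: (1, 240, 28, 28), (1, 480, 14, 14), (1, 960, 7, 7)
        print(f'stage {i + 1}: {tuple(feat.shape)}')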