# -*- coding: utf-8 -*-
# @Author: Weisen Pan

import logging

import torch
import torch.nn as nn

# Only names that are actually defined in this module may be listed here;
# a missing name makes ``from <module> import *`` raise AttributeError.
__all__ = ['ResNet', 'BasicBlock', 'Bottleneck', 'resnet56_server']


def apply_3x3_convolution(in_channels, out_channels, stride=1, groups=1, dilation=1):
    """Return a bias-free 3x3 ``nn.Conv2d``.

    Padding is set equal to ``dilation`` so that, at stride 1, the spatial
    size of the input is preserved.
    """
    return nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        groups=groups,
        bias=False,
        dilation=dilation,
    )


def apply_1x1_convolution(in_channels, out_channels, stride=1):
    """Return a bias-free 1x1 ``nn.Conv2d``."""
    return nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False)


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions.

    The output has ``out_channels * expansion`` channels (``expansion`` is 1).
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None,
                 groups=1, base_width=64, dilation=1, norm_layer=None):
        """Build the block.

        Args:
            in_channels: Channels of the incoming tensor.
            out_channels: Channels produced by both convolutions.
            stride: Stride of the first convolution.
            downsample: Optional module applied to the input before it is
                added to the residual branch; ``None`` means the input is
                added unchanged.
            groups: Must be 1.
            base_width: Must be 64.
            dilation: Must not exceed 1.
            norm_layer: Callable building a normalisation layer from a channel
                count; defaults to ``nn.BatchNorm2d``.

        Raises:
            ValueError: If ``groups != 1`` or ``base_width != 64``.
            NotImplementedError: If ``dilation > 1``.
        """
        super().__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        if groups != 1 or base_width != 64:
            raise ValueError('BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")

        self.conv1 = apply_3x3_convolution(in_channels, out_channels, stride)
        self.bn1 = norm_layer(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = apply_3x3_convolution(out_channels, out_channels)
        self.bn2 = norm_layer(out_channels)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply conv-bn-relu-conv-bn, add the shortcut, then apply ReLU."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # Project the shortcut when a downsample module was supplied.
        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)
        return out


class Bottleneck(nn.Module):
    """Residual block made of 1x1, 3x3 and 1x1 convolutions.

    The output has ``out_channels * expansion`` channels (``expansion`` is 4).
    """

    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1, downsample=None,
                 groups=1, base_width=64, dilation=1, norm_layer=None):
        """Build the block.

        Args:
            in_channels: Channels of the incoming tensor.
            out_channels: Base channel count; the block emits
                ``out_channels * expansion`` channels.
            stride: Stride of the 3x3 convolution.
            downsample: Optional module applied to the input before it is
                added to the residual branch.
            groups: Groups of the 3x3 convolution.
            base_width: Scales the inner width as ``base_width / 64``.
            dilation: Dilation of the 3x3 convolution.
            norm_layer: Callable building a normalisation layer from a channel
                count; defaults to ``nn.BatchNorm2d``.
        """
        super().__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        # Channel count used by the first 1x1 and the 3x3 convolution.
        width = int(out_channels * (base_width / 64.)) * groups
        expanded = out_channels * self.expansion

        self.conv1 = apply_1x1_convolution(in_channels, width)
        self.bn1 = norm_layer(width)
        self.conv2 = apply_3x3_convolution(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        self.conv3 = apply_1x1_convolution(width, expanded)
        self.bn3 = norm_layer(expanded)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply the three conv-bn stages, add the shortcut, then apply ReLU."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        # Project the shortcut when a downsample module was supplied.
        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)
        return out


class ResNet(nn.Module):
    """Three-stage ResNet with base widths 16, 32 and 64.

    ``forward`` starts at ``layer1``: the ``conv1``/``bn1``/``relu`` stem is
    constructed (so its parameters are part of the state dict) but is not
    applied, which means the input must already have 16 channels.
    NOTE(review): presumably the stem is evaluated by another model upstream;
    confirm with the caller.
    """

    def __init__(self, block, layers, num_classes=10, zero_init_residual=False,
                 groups=1, width_per_group=64, replace_stride_with_dilation=None,
                 norm_layer=None, KD=False):
        """Build the network.

        Args:
            block: Block class (``BasicBlock`` or ``Bottleneck``).
            layers: Sequence whose first three items give the number of
                blocks in ``layer1``, ``layer2`` and ``layer3``.
            num_classes: Output features of the final linear layer.
            zero_init_residual: If True, zero the weight of the last
                normalisation layer of every residual block.
            groups: Forwarded to every block.
            width_per_group: Forwarded to every block as ``base_width``.
            replace_stride_with_dilation: ``None`` or a 3-element sequence.
                Only its length is validated; the values are not used when
                the layers are built.
            norm_layer: Callable building a normalisation layer from a channel
                count; defaults to ``nn.BatchNorm2d``. The stem's ``bn1`` is
                always an ``nn.BatchNorm2d``.
            KD: Stored as ``self.KD``; not read anywhere in this class.

        Raises:
            ValueError: If ``replace_stride_with_dilation`` is given and does
                not have exactly three elements.
        """
        super().__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 16
        self.dilation = 1
        if replace_stride_with_dilation is None:
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None or a 3-element tuple.")

        self.groups = groups
        self.base_width = width_per_group

        # Stem: built but not applied in forward().
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.inplanes)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._create_model_layer(block, 16, layers[0])
        self.layer2 = self._create_model_layer(block, 32, layers[1], stride=2)
        self.layer3 = self._create_model_layer(block, 64, layers[2], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64 * block.expansion, num_classes)
        self.KD = KD

        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

        # With a zero scale on its last norm layer, each residual branch
        # initially outputs zeros.
        if zero_init_residual:
            for module in self.modules():
                if isinstance(module, Bottleneck):
                    nn.init.constant_(module.bn3.weight, 0)
                elif isinstance(module, BasicBlock):
                    nn.init.constant_(module.bn2.weight, 0)

    def _create_model_layer(self, block, planes, blocks, stride=1, dilate=False):
        """Stack ``blocks`` instances of ``block`` into an ``nn.Sequential``.

        Only the first block receives ``stride`` and the optional downsample
        shortcut. ``self.inplanes`` is updated to ``planes * block.expansion``
        so the next call starts from the right channel count.

        Args:
            block: Block class to instantiate.
            planes: Base channel count passed to each block.
            blocks: Number of blocks in the layer.
            stride: Stride of the first block.
            dilate: If True, multiply ``self.dilation`` by ``stride`` and
                build the layer with stride 1 instead.
        """
        norm_layer = self._norm_layer
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1

        out_planes = planes * block.expansion

        # A projection shortcut is needed whenever the first block changes
        # the spatial size or the channel count.
        downsample = None
        if stride != 1 or self.inplanes != out_planes:
            downsample = nn.Sequential(
                apply_1x1_convolution(self.inplanes, out_planes, stride),
                norm_layer(out_planes),
            )

        stacked = [
            block(self.inplanes, planes, stride, downsample, self.groups,
                  self.base_width, previous_dilation, norm_layer)
        ]
        self.inplanes = out_planes
        for _ in range(1, blocks):
            stacked.append(
                block(self.inplanes, planes, groups=self.groups,
                      base_width=self.base_width, dilation=self.dilation,
                      norm_layer=norm_layer)
            )
        return nn.Sequential(*stacked)

    def forward(self, x):
        """Run ``layer1`` to ``layer3``, global average pooling and ``fc``.

        Args:
            x: Tensor with 16 channels (the stem is not applied here).
               NOTE(review): presumably B x 16 x 32 x 32 feature maps; confirm
               with the caller.

        Returns:
            Tensor of shape ``B x num_classes``.
        """
        x = self.layer1(x)  # channels: 16 * block.expansion
        x = self.layer2(x)  # channels: 32 * block.expansion, stride 2
        x = self.layer3(x)  # channels: 64 * block.expansion, stride 2

        x = self.avgpool(x)  # B x C x 1 x 1
        x_f = x.view(x.size(0), -1)  # B x C
        x = self.fc(x_f)  # B x num_classes
        return x


def resnet56_server(num_classes, models_pretrained=False, path=None, **kwargs):
    """Construct a Bottleneck ResNet with six blocks in each of three stages.

    That is 3 stages x 6 blocks x 3 convolutions, plus the stem convolution
    and the final linear layer: 56 weighted layers.

    Args:
        num_classes (int): Number of output classes.
        models_pretrained (bool): If True, load weights from ``path``.
        path (str): Checkpoint file; required when ``models_pretrained`` is
            True. It must hold a dict with a ``'state_dict'`` entry. Every
            occurrence of ``"module."`` is removed from the keys before
            loading (the prefix ``nn.DataParallel`` adds to parameter names).
        **kwargs: Forwarded to ``ResNet``.

    Returns:
        ResNet: The constructed (and optionally loaded) model.

    Raises:
        ValueError: If ``models_pretrained`` is True and ``path`` is None.
        KeyError: If the checkpoint has no ``'state_dict'`` entry.
    """
    logging.info("Loading model with path: %s", path)
    model = ResNet(Bottleneck, [6, 6, 6], num_classes=num_classes, **kwargs)

    if models_pretrained:
        if path is None:
            raise ValueError("path must be provided when models_pretrained is True")
        # SECURITY: torch.load unpickles the file; only load trusted checkpoints.
        # map_location='cpu' lets checkpoints saved on a GPU load on CPU-only
        # hosts; load_state_dict copies the values into the model's own tensors.
        checkpoint = torch.load(path, map_location='cpu')
        state_dict = checkpoint['state_dict']
        new_state_dict = {k.replace("module.", ""): v for k, v in state_dict.items()}
        model.load_state_dict(new_state_dict)

    return model