前言
在后ResNet时代,EfficientNet作为仅用ResNet 3%的参数就达到其90%性能的存在,值得我们的研究一番
目标:实现EfficientNet的完整架构,包括:
- Stem: 初始卷积层
- Blocks: MBConv Block的堆叠
- Head: 顶部卷积层
- Classifier: 分类头
论文:EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks
主要参考的项目:牛津大佬luckmeals的Pytorch实现(但是代码硬编码优点严重,所以这里重新实现了一下,但70%的代码还是他的,这里只是重新研究了一下)
开始!
实现Swish和MemoryEfficientSwish函数
首先是标准Swish函数,表达式
::: align-center
Swish(x)=x\sigma(x)
:::
python
class Swish(nn.Module):
"""
Swish激活函数 - 标准实现
函数定义:
Swish(x) = x * sigmoid(x)
这是一种平滑的非单调激活函数,在深度网络中表现优于ReLU
优点:
- 可微分,梯度平滑
- 非单调性有助于增强表达能力
- 计算开销略高于ReLU
示例:
>>> activation = Swish()
>>> x = torch.randn(1, 3, 224, 224)
>>> output = activation(x) # 形状: [1, 3, 224, 224]
"""
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播函数
参数:
x (torch.Tensor): 输入张量,形状任意
返回:
torch.Tensor: 激活后的张量,形状与输入相同
"""
return x * torch.sigmoid(x)
backward函数不需要自己写,Pytorch会自动分析得到计算图x\rightarrow \sigma(x)\rightarrow x\sigma(x),之后在反向传播过程中使用内置的自动微分
Pytorch已经足够智能,在前向传播过程中会保存中间结果\sigma(x),用空间换时间,但是EfficientNet为了避免内存占用过大选择设计了MemoryEfficientSiwsh激活函数,它不保存中间结果,在反向传播过程中现场计算,这里实现一下,只保存x就行了,但是缺点是我们破坏了原本的Pytorch逻辑,所以ONNX导出就废掉了
python
class SwishImplementation(torch.autograd.Function):
"""
Swish激活函数的自定义自动微分实现
"""
@staticmethod
def forward(ctx: Any, input_tensor: torch.Tensor) -> torch.Tensor:
"""
前向传播
参数:
ctx: PyTorch上下文对象,用于保存反向传播需要的信息
input_tensor: 输入张量
返回:
输出张量 = input_tensor * sigmoid(input_tensor)
"""
# 保存输入张量,用于反向传播
ctx.save_for_backward(input_tensor)
# 计算 Swish: x * sigmoid(x)
result = input_tensor * torch.sigmoid(input_tensor)
return result
@staticmethod
def backward(ctx: Any, grad_output: torch.Tensor) -> torch.Tensor:
"""
反向传播
参数:
ctx: 包含前向传播保存信息的上下文对象
grad_output: 来自后续层的梯度,形状与forward输出相同
"""
# 获取前向传播保存的输入
input_tensor = ctx.saved_tensors[0]
# 重新计算sigmoid值
sigmoid_input = torch.sigmoid(input_tensor)
# 计算Swish的导数: σ(x) * (1 + x * (1 - σ(x)))
# 分解计算以提高数值稳定性
derivative = sigmoid_input * (1 + input_tensor * (1 - sigmoid_input))
# 应用链式法则
grad_input = grad_output * derivative
return grad_input
class MemoryEfficientSwish(nn.Module):
"""
内存高效的Swish激活函数
说明:
使用自定义autograd函数实现的内存优化版Swish
在训练大型网络时显著减少显存占用
注意:
- 不支持ONNX导出
- 不支持PyTorch JIT编译
- 导出前需切换为标准Swish
示例:
>>> # 训练时使用
>>> activation = MemoryEfficientSwish()
>>> x = torch.randn(32, 128, 56, 56, requires_grad=True)
>>> output = activation(x)
>>> loss = output.sum()
>>> loss.backward() # 内存占用更少
"""
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播函数
参数:
x (torch.Tensor): 输入张量,形状任意
返回:
torch.Tensor: 激活后的张量,形状与输入相同
实现:
调用SwishImplementation.apply()执行自定义前向/反向传播
"""
return SwishImplementation.apply(x)
之后我们遵循OOP思想,设计一个工厂函数以供后面我们快速调用
python
def create_activation(memory_efficient: bool = True) -> nn.Module:
"""
激活函数工厂方法
功能说明:
根据需求创建合适的Swish激活函数实例
提供统一的创建接口,便于切换实现
参数:
memory_efficient (bool): 是否使用内存优化版本
- True: 返回MemoryEfficientSwish(默认,用于训练)
- False: 返回标准Swish(用于导出)
返回:
nn.Module: Swish激活函数实例
使用示例:
>>> # 训练时
>>> train_activation = create_activation(memory_efficient=True)
>>>
>>> # 导出时
>>> export_activation = create_activation(memory_efficient=False)
>>>
>>> # 在模型中使用
>>> self.activation = create_activation(memory_efficient=self.training)
"""
if memory_efficient:
return MemoryEfficientSwish()
else:
return Swish()
动态Padding卷积
EfficientNet考虑的一个非常重要的问题就是
::: align-center
以往CNN网络训练时输入的图像尺寸总是224x224,到底是为啥?别问,问就是工程经验!
:::
所以它们就把输入尺寸给调整了
谷歌原来是在TF上做的实验,但问题是到了Pytorch,卷积层的输入图像尺寸是硬编码的,一遇到不同尺寸的图像输入进去就废掉了,但是我们还想用Pytorch的标准卷积,所以我们就需要对输入矩阵进行Padding,而这个Padding应该被设计为可以根据输入尺寸动态决定
首先我们给定一个任意尺寸i\times i的图片,要求卷积输出o\times o得是(\frac{i}{stride},\frac{i}{stride}),其中stride是我们的卷积步长,已知标准的卷积尺寸计算公式是
::: align-center
o=floor(\frac{i-(k-1)\times d - p}{stride})+1
:::
我们想要保证
::: align-center
o=ceil(\frac{i}{stride})
:::
计算得到所需的Padding p为
::: align-center
(o-1)\times stride +(k-1)\times d + 1 - i
:::
注意上面的标准卷积公式有些写为2p,这里我们后边是会整除2的
继承Pytorch的2D卷积模块并覆写它的前向传播函数,在传播的时候动态进行Padding,我们即可得到结果
python
class Conv2dDynamicSamePadding(nn.Conv2d):
"""
动态Same Padding的2D卷积层
功能说明:
模仿TensorFlow的'SAME'填充模式,在前向传播时动态计算padding
确保输出尺寸 = ceil(输入尺寸 / stride)
使用场景:
- 训练阶段使用
- 输入图像尺寸不固定的情况
- 不需要导出为ONNX的场景
Padding计算原理:
对于输入尺寸i、步长s、卷积核大小k、膨胀率d:
- 输出尺寸: o = ceil(i / s)
- 所需padding: p = max((o - 1) * s + (k - 1) * d + 1 - i, 0)
- 左侧padding: p // 2
- 右侧padding: p - p // 2
数学公式推导:
标准卷积输出计算: o = floor((i + p - (k-1)*d - 1) / s) + 1
要使 o = ceil(i / s),需要保证:
floor((i + p - (k-1)*d - 1) / s) + 1 = ceil(i / s)
解得: p = (o - 1) * s + (k - 1) * d + 1 - i
示例:
>>> # 创建3x3卷积,步长为2
>>> conv = Conv2dDynamicSamePadding(
... in_channels=32, out_channels=64,
... kernel_size=3, stride=2
... )
>>> x = torch.randn(1, 32, 56, 56)
>>> output = conv(x) # 形状: [1, 64, 28, 28]
"""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: Union[int, Tuple[int, int]],
stride: Union[int, Tuple[int, int]] = 1,
dilation: Union[int, Tuple[int, int]] = 1,
groups: int = 1,
bias: bool = True
):
"""
初始化动态Same Padding卷积层
参数:
in_channels (int): 输入通道数
out_channels (int): 输出通道数
kernel_size (int | tuple): 卷积核大小
- int: 正方形卷积核
- tuple: (height, width)
stride (int | tuple): 步长,默认1
dilation (int | tuple): 膨胀率,默认1
groups (int): 分组卷积的组数,默认1
- groups=1: 标准卷积
- groups=in_channels: 深度可分离卷积
bias (bool): 是否使用偏置,默认True
注意:
- padding参数固定为0,因为会动态计算
- stride会被标准化为长度为2的列表
"""
# 调用父类初始化,padding设为0(动态计算)
super().__init__(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=0, # 不使用固定padding
dilation=dilation,
groups=groups,
bias=bias
)
# 标准化stride为2元素列表 [stride_h, stride_w]
# 如果输入是单个值,复制为[stride, stride]
self.stride = (
[self.stride[0]] * 2
if len(self.stride) == 2
else self.stride
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播函数
参数:
x (torch.Tensor): 输入张量,形状 [B, C, H, W]
- B: batch size
- C: 通道数(必须等于in_channels)
- H: 高度
- W: 宽度
返回:
torch.Tensor: 输出张量,形状 [B, out_channels, H', W']
- H' = ceil(H / stride_h)
- W' = ceil(W / stride_w)
计算流程:
1. 获取输入的高度ih和宽度iw
2. 获取卷积核的高度kh和宽度kw
3. 计算输出尺寸 oh = ceil(ih / sh), ow = ceil(iw / sw)
4. 计算所需padding: pad_h, pad_w
5. 如果需要padding,对输入进行填充
6. 执行标准卷积操作
Padding分布:
- 高度方向: 上侧 pad_h // 2, 下侧 pad_h - pad_h // 2
- 宽度方向: 左侧 pad_w // 2, 右侧 pad_w - pad_w // 2
"""
# 获取输入空间维度 (高度, 宽度)
input_height, input_width = x.size()[-2:]
# 获取卷积核空间维度
kernel_height, kernel_width = self.weight.size()[-2:]
# 获取步长
stride_height, stride_width = self.stride
# 计算期望的输出尺寸(向上取整)
# 这是TensorFlow 'SAME'模式的核心
output_height = math.ceil(input_height / stride_height)
output_width = math.ceil(input_width / stride_width)
# 计算总共需要的padding
# 公式: pad = max((output - 1) * stride + (kernel - 1) * dilation + 1 - input, 0)
pad_height = max(
(output_height - 1) * self.stride[0] +
(kernel_height - 1) * self.dilation[0] + 1 -
input_height,
0
)
pad_width = max(
(output_width - 1) * self.stride[1] +
(kernel_width - 1) * self.dilation[1] + 1 -
input_width,
0
)
# 如果需要padding,进行填充
if pad_height > 0 or pad_width > 0:
# F.pad的padding顺序: (left, right, top, bottom)
x = F.pad(
x,
[
pad_width // 2, # 左侧padding
pad_width - pad_width // 2, # 右侧padding
pad_height // 2, # 上侧padding
pad_height - pad_height // 2 # 下侧padding
]
)
# 执行标准2D卷积
# 此时输入已经padding完成,使用self.padding=0
output = F.conv2d(
input=x,
weight=self.weight,
bias=self.bias,
stride=self.stride,
padding=self.padding, # 始终为0
dilation=self.dilation,
groups=self.groups
)
return output
这样我们就又成功的破坏了Pytorch原本的卷积逻辑,ONNX导出再一次被废掉了。在部署时图像应是定尺寸的,同时为了支持ONNX,我们还需要实现一个固定尺寸Padding的静态卷积模块
python
class Conv2dStaticSamePadding(nn.Conv2d):
"""
静态Same Padding的2D卷积层
功能说明:
在初始化时预先计算padding,创建固定的padding层
输出尺寸与动态版本完全一致,但支持ONNX导出
使用场景:
- 需要导出为ONNX格式时使用
- 输入图像尺寸固定的情况
- 生产部署推理时使用
优势:
- 支持ONNX导出(动态padding不支持)
- 计算效率略高(padding层已预先创建)
- 更容易进行模型优化和融合
注意事项:
- 必须在初始化时指定image_size参数
- 如果推理时输入尺寸与指定的image_size不同,输出尺寸可能不正确
示例:
>>> # 创建用于224x224输入的卷积层
>>> conv = Conv2dStaticSamePadding(
... in_channels=32, out_channels=64,
... kernel_size=3, stride=2,
... image_size=224 # 必须指定
... )
>>> x = torch.randn(1, 32, 224, 224)
>>> output = conv(x) # 形状: [1, 64, 112, 112]
>>>
>>> # 导出为ONNX
>>> torch.onnx.export(conv, x, "conv.onnx")
"""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: Union[int, Tuple[int, int]],
stride: Union[int, Tuple[int, int]] = 1,
image_size: Optional[Union[int, Tuple[int, int]]] = None,
**kwargs
):
"""
初始化静态Same Padding卷积层
参数:
in_channels (int): 输入通道数
out_channels (int): 输出通道数
kernel_size (int | tuple): 卷积核大小
stride (int | tuple): 步长,默认1
image_size (int | tuple): 输入图像尺寸(必须提供)
- int: 正方形图像
- tuple: (height, width)
**kwargs: 其他传递给nn.Conv2d的参数
- dilation: 膨胀率
- groups: 分组数
- bias: 是否使用偏置等
异常:
AssertionError: 如果image_size为None
"""
# 调用父类初始化
super().__init__(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
**kwargs
)
# 标准化stride为2元素列表
self.stride = (
[self.stride[0]] * 2
if len(self.stride) == 2
else self.stride
)
# image_size是必需参数
assert image_size is not None, \
"静态Same Padding卷积层必须指定image_size参数"
# 标准化image_size为(height, width)元组
if isinstance(image_size, int):
input_height, input_width = image_size, image_size
else:
input_height, input_width = image_size
# 获取卷积核尺寸
kernel_height, kernel_width = self.weight.size()[-2:]
# 获取步长
stride_height, stride_width = self.stride
# 计算输出尺寸(与动态版本相同的逻辑)
output_height = math.ceil(input_height / stride_height)
output_width = math.ceil(input_width / stride_width)
# 计算所需的padding
# 公式与动态版本完全一致
pad_height = max(
(output_height - 1) * self.stride[0] +
(kernel_height - 1) * self.dilation[0] + 1 -
input_height,
0
)
pad_width = max(
(output_width - 1) * self.stride[1] +
(kernel_width - 1) * self.dilation[1] + 1 -
input_width,
0
)
# 创建静态padding层
if pad_height > 0 or pad_width > 0:
# 使用ZeroPad2d创建固定的padding层
# padding顺序: (left, right, top, bottom)
self.static_padding = nn.ZeroPad2d(
(
pad_width // 2, # 左
pad_width - pad_width // 2, # 右
pad_height // 2, # 上
pad_height - pad_height // 2 # 下
)
)
else:
# 不需要padding时,使用Identity(直接返回输入)
self.static_padding = nn.Identity()
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播函数
参数:
x (torch.Tensor): 输入张量,形状 [B, C, H, W]
注意: H和W应该与初始化时的image_size相同
返回:
torch.Tensor: 输出张量,形状 [B, out_channels, H', W']
计算流程:
1. 应用预先创建的static_padding层
2. 执行标准卷积操作
优势:
- 整个前向传播是纯粹的nn.Module操作
- 没有动态计算,完全静态
- 支持ONNX等格式导出
"""
# 应用静态padding(如果需要)
x = self.static_padding(x)
# 执行标准卷积
output = F.conv2d(
input=x,
weight=self.weight,
bias=self.bias,
stride=self.stride,
padding=self.padding,
dilation=self.dilation,
groups=self.groups
)
return output
与上面动态版本不同的是,这里的Padding在构造函数里就已经算好了
之后依然是遵循我们的OOP思想,设计一个工厂函数,以及实现我们的计算o=ceil(\frac{i}{stride})的函数
python
def get_same_padding_conv2d(image_size: Optional[Union[int, Tuple[int, int]]] = None):
"""
Same Padding卷积层工厂函数
说明:
根据是否提供image_size参数,返回合适的Conv2d类
参数:
image_size (int | tuple | None): 输入图像尺寸
- None: 返回动态padding版本(用于训练)
- int/tuple: 返回静态padding版本(用于导出)
返回:
type: Conv2d类(不是实例)
- image_size=None -> Conv2dDynamicSamePadding
- image_size!=None -> functools.partial(Conv2dStaticSamePadding, image_size=image_size)
使用示例:
>>> # 训练时 - 动态padding
>>> Conv2d = get_same_padding_conv2d(image_size=None)
>>> conv = Conv2d(in_channels=32, out_channels=64, kernel_size=3)
>>>
>>> # 导出时 - 静态padding
>>> Conv2d = get_same_padding_conv2d(image_size=224)
>>> conv = Conv2d(in_channels=32, out_channels=64, kernel_size=3)
>>>
>>> # 在模型中使用
>>> class MyModel(nn.Module):
... def __init__(self, image_size=None):
... super().__init__()
... Conv2d = get_same_padding_conv2d(image_size)
... self.conv = Conv2d(32, 64, 3, stride=2)
"""
if image_size is None:
# 动态padding版本 - 用于训练和可变尺寸输入
return Conv2dDynamicSamePadding
else:
# 静态padding版本 - 用于ONNX导出和固定尺寸推理
# 使用functools.partial绑定image_size参数
from functools import partial
return partial(Conv2dStaticSamePadding, image_size=image_size)
def calculate_output_image_size(
input_image_size: Optional[Union[int, Tuple[int, int]]],
stride: Union[int, Tuple[int, int]]
) -> Optional[Tuple[int, int]]:
"""
计算卷积层输出图像尺寸
功能说明:
在使用Same Padding时,计算卷积后的输出尺寸
用于静态padding的预计算
参数:
input_image_size (int | tuple | None): 输入图像尺寸
- int: 正方形图像
- tuple: (height, width)
- None: 返回None(动态情况)
stride (int | tuple): 卷积步长
- int: 高度和宽度使用相同步长
- tuple: (stride_h, stride_w)
返回:
tuple[int, int] | None: 输出图像尺寸 (height, width)
如果输入为None,返回None
计算公式:
output_height = ceil(input_height / stride_height)
output_width = ceil(input_width / stride_width)
示例:
>>> # 224x224输入,步长2
>>> calculate_output_image_size(224, 2)
[112, 112]
>>>
>>> # 非正方形输入
>>> calculate_output_image_size((100, 200), (2, 4))
[50, 50]
>>>
>>> # 动态情况
>>> calculate_output_image_size(None, 2)
None
"""
# 如果输入为None,直接返回None(动态情况)
if input_image_size is None:
return None
# 标准化为(height, width)元组
if isinstance(input_image_size, int):
image_height, image_width = input_image_size, input_image_size
else:
image_height, image_width = input_image_size
# 标准化stride
stride = stride if isinstance(stride, int) else stride[0]
# 计算输出尺寸(向上取整)
image_height = int(math.ceil(image_height / stride))
image_width = int(math.ceil(image_width / stride))
return image_height, image_width
实现随机残差丢弃
Drop Connect技术,也称为随机深度(Stochastic Depth),和DropOut不一样的是,在训练时不丢单个神经元,而是随机丢弃整个层的输出
它的原理是以概率p随机将整个批次中的某些样本置零,具体地
- 用一个
keep_prob作为保留该特征图的概率,keep\_prob=1-drop\_rate - 每个特征图整体加一个[0,1)均匀分布的随机数,再加上keep_prob,这样元素的范围就来到了[0,2)
- 向下取整,得到0/1 Mask(每个元素要么取整到0要么取整到1)
- output = \frac{input\times mask}{keep\_prob}
第四步这样做的原因是要保证期望不变,推导其期望
::: align-center
\begin{aligned}E[output]=E[\frac{input\times mask}{keep\_prob}]\end{aligned}
:::
使用期望的线性性
::: align-center
\begin{aligned}E[output]=\frac{input\times E[mask]}{keep\_prob}\end{aligned}
:::
input和keep_prob都是定值,直接拿出,而mask是一个0-1的概率分布,它的PMF为
::: align-center
\left\{\begin{matrix}1&&p=keep\_prob\\0&&p=1-keep\_prob\end{matrix}\right .
:::
所以它的期望E[mask]=keep\_prob,和分母的抵消,最后得到
::: align-center
E[output]=E[input]
:::
python
def drop_connect(inputs: torch.Tensor, drop_rate: float, training: bool) -> torch.Tensor:
"""
Drop Connect正则化函数
功能说明:
在训练时以一定概率随机丢弃整个样本的连接
与Dropout的区别:Drop Connect丢弃整个样本,Dropout丢弃单个神经元
原理:
1. 训练模式: 以概率p随机将整个批次中的某些样本置零
2. 推理模式: 直接返回输入(不做任何处理)
3. 使用keep_prob进行缩放,保持期望值不变
使用场景:
- 在残差连接中使用,实现随机深度
- 提升模型泛化能力,防止过拟合
参数:
inputs (torch.Tensor): 输入张量,形状 [B, C, H, W]
- B: batch size
- C: 通道数
- H: 高度
- W: 宽度
drop_rate (float): 丢弃概率,范围 [0.0, 1.0]
- 0.0: 不丢弃任何样本
- 0.2: 丢弃20%的样本
- 1.0: 丢弃所有样本(不应使用)
training (bool): 是否处于训练模式
- True: 应用drop connect
- False: 直接返回输入(推理模式)
返回:
torch.Tensor: 处理后的张量,形状与输入相同
流程:
1. 检查drop_rate是否在有效范围[0, 1]
2. 如果是推理模式,直接返回输入
3. 计算保留概率: keep_prob = 1 - drop_rate
4. 生成随机mask:
- 创建形状为[B, 1, 1, 1]的随机张量
- 个样本一个随机数,广播到整个特征图
- 加上keep_prob后向下取整,得到0/1 mask
5. 应用mask并缩放: output = input * mask / keep_prob
示例:
>>> # 训练时使用
>>> x = torch.randn(32, 128, 7, 7)
>>> output = drop_connect(x, drop_rate=0.2, training=True)
>>> # 约20%的样本被置零,其余样本被缩放1.25倍
>>>
>>> # 推理时
>>> output = drop_connect(x, drop_rate=0.2, training=False)
>>> # 直接返回x,不做任何修改
为什么形状是[B, 1, 1, 1]:
- 对每个样本生成一个随机数
- 整个样本要么全部保留,要么全部丢弃
- 广播机制自动扩展到[B, C, H, W]
期望值不变性证明:
设mask为0/1随机变量,P(mask=1) = keep_prob
E[output] = E[input * mask / keep_prob]
= input * E[mask] / keep_prob
= input * keep_prob / keep_prob
= input
"""
# 参数验证: drop_rate必须在[0, 1]范围内
assert 0 <= drop_rate <= 1, \
f"丢弃概率必须在[0, 1]范围内,当前值: {drop_rate}"
# 推理模式:不应用drop connect
if not training:
return inputs
# 计算保留概率
keep_prob = 1 - drop_rate
# 获取批次大小
batch_size = inputs.shape[0]
# 生成随机二值mask
# 1. 创建形状为[B, 1, 1, 1]的张量,初始值为keep_prob
# 2. 加上[0, 1)均匀分布的随机数
# 3. 向下取整得到二值mask
#
# 例如: keep_prob = 0.8
# - random_tensor = 0.8 + rand([B, 1, 1, 1]) # 范围[0.8, 1.8)
# - binary_tensor = floor(random_tensor) # 80%概率为1,20%概率为0
random_tensor = keep_prob
random_tensor += torch.rand(
[batch_size, 1, 1, 1],
dtype=inputs.dtype, # 使用与输入相同的数据类型
device=inputs.device # 使用与输入相同的设备(CPU/GPU)
)
binary_tensor = torch.floor(random_tensor)
# 应用mask并进行缩放
# 除以keep_prob是为了保持期望值不变
# 例如: keep_prob=0.8时,保留的样本会乘以1.25 (1/0.8)
output = inputs / keep_prob * binary_tensor
return output
class DropConnect(torch.nn.Module):
"""
Drop Connect的nn.Module封装
示例:
>>> # 在模型中使用
>>> class MyBlock(nn.Module):
... def __init__(self, drop_rate=0.2):
... super().__init__()
... self.conv = nn.Conv2d(32, 64, 3)
... self.drop_connect = DropConnect(drop_rate)
...
... def forward(self, x):
... out = self.conv(x)
... out = self.drop_connect(out)
... return out + x # 残差连接
"""
def __init__(self, drop_rate: float = 0.0):
"""
初始化Drop Connect层
参数:
drop_rate (float): 丢弃概率,范围[0, 1]
"""
super().__init__()
self.drop_rate = drop_rate
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播
参数:
x (torch.Tensor): 输入张量
返回:
torch.Tensor: 处理后的张量
"""
return drop_connect(x, self.drop_rate, self.training)
def extra_repr(self) -> str:
"""
返回层的额外描述信息(用于print(model))
返回:
str: 描述字符串
"""
return f'drop_rate={self.drop_rate}'
而对于丢弃率drop_rate,EfficientNet考虑随着层数的递增逐渐的线性提高丢弃率,三句话表达
- 浅层特征更底层,需要更稳定
- 深层特征更抽象,对丢弃更鲁棒
- 渐进式正则化,这样训练更稳定
python
def get_drop_connect_rate(
block_idx: int,
total_blocks: int,
base_drop_rate: float = 0.2
) -> float:
"""
计算线性递增的drop connect率
说明:
EfficientNet使用线性递增的drop connect率
靠近输出的Block使用更高的丢弃率
原理:
drop_rate = base_rate * (block_idx / total_blocks)
- 第一个Block: drop_rate ≈ 0
- 最后一个Block: drop_rate = base_rate
为什么线性递增:
- 浅层特征更底层,需要更稳定
- 深层特征更抽象,对丢弃更鲁棒
- 渐进式正则化,这样训练更稳定
参数:
block_idx (int): 当前Block索引(从0开始)
total_blocks (int): 总Block数量
base_drop_rate (float): 最大丢弃率,默认0.2
返回:
float: 当前Block的drop connect率
计算示例:
>>> # 假设有7个Block,base_rate=0.2
>>> get_drop_connect_rate(0, 7, 0.2)
0.0 # 第一个Block
>>>
>>> get_drop_connect_rate(3, 7, 0.2)
0.086 # 中间Block: 0.2 * (3/7)
>>>
>>> get_drop_connect_rate(6, 7, 0.2)
0.171 # 最后一个Block: 0.2 * (6/7)
使用示例:
>>> for idx, block in enumerate(blocks):
... drop_rate = get_drop_connect_rate(idx, len(blocks))
... output = block(x, drop_connect_rate=drop_rate)
"""
if not base_drop_rate:
return 0.0
# 线性插值计算当前Block的drop rate
# 公式: drop_rate = base_rate * (block_idx / total_blocks)
drop_rate = base_drop_rate * float(block_idx) / total_blocks
return drop_rate
实现MBConvBlock
EfficientNet的MBConv Block分为5个大块
-
扩展阶段 (Expansion phase): 1x1卷积扩展通道数,扩大特征表述空间
传统的ResNet是高维 (256) → 低维 (64) → 高维 (256),这样在中间的低维空间提取特征变为出现“BottomNeck”瓶颈,而EfficientNet是低维 (32) → 高维 (192) → 低维 (32),这样在更高维空间进行特征提取,信息会更丰富 -
深度卷积 (Depthwise convolution): KxK深度可分离卷积,提高运算性能
传统的卷积为- 输入: 192 个通道(Channel) × (H×W)
- 卷积核: 192×192×3×3 = 331,776 参数
- 输出: 192 通道
但DepthWise Convolution考虑
- 输入: 192 通道
- ↓ 每个通道独立卷积
- 卷积核: 192×1×3×3 = 1,728 参数
- ↓
- 输出: 192 通道
换句话说,传统卷积是每一个卷积核都会卷一遍所有通道,每个输出通道的特征图是所有输入通道与对应卷积核的加权和,在一次操作中同时完成空间信息提取和通道信息融合,而Depthwise Convolution只做空间卷积,不做通道融合,每个输入通道单独与自己的卷积核进行卷积,输出通道数与输入通道数相同,这样对于上面那个例子就节省了接近192倍的参数量,同时输出的通道不会改变
但是不做通道信息融合能行嘛?肯定不行啊,上下文的语义关系扔掉了这效果得烂爆了,所以就有了 -
SE(Squeeze-and-Excitation)通道注意力机制模块,它的大致操作是
- 输入: (192, 7, 7)
- ↓
- [Squeeze] 全局平均池化
- ↓ (192, 1, 1) - 每个通道一个值
- [Excitation] 降维 → 升维
- ↓
- Conv(192 → 48): 学习通道间依赖
- ↓
- Conv(48 → 192): 恢复通道数
- ↓
- Sigmoid: 生成 0~1 的注意力分布权重
- ↓ (192, 1, 1)
- [Scale] 特征重标定
- ↓
- features * weights
- ↓
- 输出: (192, 7, 7)
其实就是先把特征过一个池化,把每一个通道的特征压缩成一个数,表示这个通道的“整体特征”,之后用卷积核强行把这些特征“压在一起”,再用卷积核“重新拽开”,这样不同特征黏在一块又重新被拉开的时候里面就多少沾点别人的东西了,过一个Sigmoid函数转化为概率作为注意力加权分布,用每一个权重与原来的每一个通道的7x7特征图整体逐元素相乘,这样不同通道的特征图就获得了不同的权重
也有用FC做SE模块的,这里引用的项目是用卷积做的 -
逐点卷积 (Pointwise convolution): 1x1卷积恢复通道数,和1.中介绍的类似,这里把特征重新压到较低维的空间
-
当输入输出尺寸相同时,进行残差连接 (Skip connection): ResNet的经典思想,避免前面一堆操作导致梯度消失,特征弥散等不好的东西,同时避免训练时前面的模块训练误差过大导致后侧结果偏差累计导致误差越来越大的问题。同时采用了Drop Connect以一定概率丢弃整个残差分支防止过拟合
其中每一步还需要过一下批量正则化(Batch Norm, BN)把期望和方差调正(期望调0,方差调1)和Swish激活函数,这里不再说明
python
class MBConvBlock(nn.Module):
"""
Mobile Inverted Residual Bottleneck Block (MBConv Block)
EfficientNet的核心组件,由以下部分组成:
1. 扩展阶段 (Expansion phase): 1x1卷积扩展通道数
2. 深度卷积 (Depthwise convolution): KxK深度可分离卷积
3. SE模块 (Squeeze-and-Excitation): 通道注意力机制
4. 逐点卷积 (Pointwise convolution): 1x1卷积恢复通道数
5. 残差连接 (Skip connection): 当输入输出尺寸相同时
参数说明:
block_config (BlockConfig): Block配置对象
包含kernel_size, stride, expand_ratio等参数
global_config (GlobalConfig): 全局配置对象
包含batch_norm参数、drop_connect_rate等
image_size (Optional[int]): 输入图像尺寸
用于计算静态padding
None时使用动态padding
属性:
has_se (bool): 是否使用SE模块
id_skip (bool): 是否使用残差连接
使用示例:
>>> from efficientnet_oop.config.model_config import BlockConfig, GlobalConfig
>>>
>>> # 创建配置
>>> block_cfg = BlockConfig(
... kernel_size=3, stride=1, expand_ratio=6,
... input_filters=32, output_filters=64, se_ratio=0.25
... )
>>> global_cfg = GlobalConfig()
>>>
>>> # 创建Block
>>> block = MBConvBlock(block_cfg, global_cfg, image_size=224)
>>>
>>> # 前向传播
>>> x = torch.randn(1, 32, 56, 56)
>>> output = block(x, drop_connect_rate=0.2)
>>> print(output.shape) # torch.Size([1, 64, 56, 56])
"""
def __init__(
self,
block_config: BlockConfig,
global_config: GlobalConfig,
image_size: Optional[int] = None
):
super().__init__()
# 保存配置
self._block_config = block_config
self._global_config = global_config
# BatchNorm参数
# 注意: PyTorch的momentum定义与TensorFlow相反
# TensorFlow: running_mean = momentum * old + (1-momentum) * new
# PyTorch: running_mean = (1-momentum) * old + momentum * new
self._bn_momentum = global_config.get_pytorch_bn_momentum()
self._bn_epsilon = global_config.batch_norm_epsilon
# 判断是否使用SE模块
# se_ratio必须在(0, 1]范围内
self.has_se = (
block_config.se_ratio is not None and
0 < block_config.se_ratio <= 1
)
# 是否使用残差连接
self.id_skip = block_config.id_skip
# ========================================
# 扩展阶段 (Expansion Phase)
# ========================================
# 输入通道数
inp = block_config.input_filters
# 输出通道数 = 输入通道数 × 扩展比例
oup = block_config.input_filters * block_config.expand_ratio
# 只有当expand_ratio != 1时才进行扩展
# 例如: MBConv1不需要扩展,MBConv6需要扩展6倍
if block_config.expand_ratio != 1:
Conv2d = get_same_padding_conv2d(image_size=image_size)
self._expand_conv = Conv2d(
in_channels=inp,
out_channels=oup,
kernel_size=1, # 1x1卷积
stride=1,
bias=False, # 后面有BN,不需要bias
image_size=image_size
)
self._bn0 = nn.BatchNorm2d(
num_features=oup,
momentum=self._bn_momentum,
eps=self._bn_epsilon
)
# ========================================
# 深度卷积阶段 (Depthwise Convolution)
# ========================================
# 使用groups=oup实现深度可分离卷积
# 每个通道独立卷积,大幅减少计算量
k = block_config.kernel_size # 卷积核大小 (3 or 5)
s = block_config.stride # 步长 (1 or 2)
Conv2d = get_same_padding_conv2d(image_size=image_size)
self._depthwise_conv = Conv2d(
in_channels=oup,
out_channels=oup,
groups=oup, # 关键: groups=oup使其成为深度卷积
kernel_size=k,
stride=s,
bias=False,
image_size=image_size
)
self._bn1 = nn.BatchNorm2d(
num_features=oup,
momentum=self._bn_momentum,
eps=self._bn_epsilon
)
# 计算深度卷积后的特征图尺寸
# 用于后续层的padding计算
image_size = calculate_output_image_size(image_size, s)
# ========================================
# Squeeze-and-Excitation注意力机制模块 (SE Module)
# ========================================
if self.has_se:
# SE模块的压缩通道数
# 例如: input_filters=32, se_ratio=0.25
# num_squeezed = max(1, int(32 * 0.25)) = 8
num_squeezed_channels = max(
1,
int(block_config.input_filters * block_config.se_ratio)
)
# SE的Reduce层: 降维
# 使用1x1卷积实现全连接功能
# SE模块在1x1的特征图上操作,不需要padding
Conv2d = get_same_padding_conv2d(image_size=(1, 1))
self._se_reduce = Conv2d(
in_channels=oup,
out_channels=num_squeezed_channels,
kernel_size=1,
image_size=(1, 1) # SE在pooling后操作,尺寸为1x1
)
# SE的Expand层: 升维
# 恢复到原始通道数
self._se_expand = Conv2d(
in_channels=num_squeezed_channels,
out_channels=oup,
kernel_size=1,
image_size=(1, 1)
)
# ========================================
# 逐点卷积阶段 (Pointwise Convolution)
# ========================================
# 将通道数投影回目标通道数
final_oup = block_config.output_filters
Conv2d = get_same_padding_conv2d(image_size=image_size)
self._project_conv = Conv2d(
in_channels=oup,
out_channels=final_oup,
kernel_size=1, # 1x1卷积
stride=1,
bias=False,
image_size=image_size
)
self._bn2 = nn.BatchNorm2d(
num_features=final_oup,
momentum=self._bn_momentum,
eps=self._bn_epsilon
)
# ========================================
# 激活函数
# ========================================
# 默认使用内存高效版本的Swish
# Swish(x) = x * sigmoid(x)
self._swish = MemoryEfficientSwish()
def forward(
self,
inputs: torch.Tensor,
drop_connect_rate: Optional[float] = None
) -> torch.Tensor:
"""
MBConvBlock的前向传播
流程说明:
1. Expansion: 扩展通道数 (如果expand_ratio != 1)
2. Depthwise: 深度可分离卷积
3. SE Module: 通道注意力 (如果has_se=True)
4. Projection: 投影到目标通道数
5. Skip Connection: 残差连接 (如果满足条件)
参数:
inputs (torch.Tensor): 输入张量
shape: (batch_size, in_channels, height, width)
drop_connect_rate (Optional[float]): Drop Connect概率
范围: [0.0, 1.0]
训练时随机丢弃整个分支,增强正则化
None或0表示不使用
返回:
torch.Tensor: 输出张量
shape: (batch_size, out_channels, new_height, new_width)
"""
x = inputs
# ========================================
# 扩展阶段 (Expansion Phase)
# ========================================
# 只有expand_ratio != 1时才执行
# 例如: MBConv1跳过此阶段,MBConv6执行此阶段
if self._block_config.expand_ratio != 1:
# 1x1卷积扩展通道
x = self._expand_conv(inputs)
# BatchNorm归一化
x = self._bn0(x)
# Swish激活: x * sigmoid(x)
x = self._swish(x)
# ========================================
# 深度卷积阶段 (Depthwise Convolution)
# ========================================
# KxK深度可分离卷积
# 每个通道独立进行空间卷积
x = self._depthwise_conv(x)
# BatchNorm归一化
x = self._bn1(x)
# Swish激活
x = self._swish(x)
# ========================================
# SE模块 (Squeeze-and-Excitation)
# ========================================
if self.has_se:
# Squeeze: 全局平均池化
# (B, C, H, W) -> (B, C, 1, 1)
# 将每个通道的空间信息压缩为一个值
x_squeezed = F.adaptive_avg_pool2d(x, 1)
# Excitation-Reduce: 降维
# (B, C, 1, 1) -> (B, C//r, 1, 1)
# r是se_ratio的倒数,例如se_ratio=0.25时r=4
x_squeezed = self._se_reduce(x_squeezed)
x_squeezed = self._swish(x_squeezed)
# Excitation-Expand: 升维
# (B, C//r, 1, 1) -> (B, C, 1, 1)
x_squeezed = self._se_expand(x_squeezed)
# Scale: 使用sigmoid生成通道权重
# 将权重应用到原始特征图
# x = x * sigmoid(x_squeezed)
# 实现通道级别的注意力机制
x = torch.sigmoid(x_squeezed) * x
# ========================================
# 投影阶段 (Projection Phase)
# ========================================
# 1x1卷积投影到目标通道数
x = self._project_conv(x)
# BatchNorm归一化
# 注意: 这里没有激活函数(Linear Bottleneck)
# 原因: 保留更多信息,避免ReLU造成的信息损失
x = self._bn2(x)
# ========================================
# 残差连接 (Skip Connection)
# ========================================
# 残差连接的条件:
# 1. id_skip=True (配置允许)
# 2. stride=1 (特征图尺寸不变)
# 3. input_channels == output_channels (通道数相同)
input_filters = self._block_config.input_filters
output_filters = self._block_config.output_filters
if (
self.id_skip and
self._block_config.stride == 1 and
input_filters == output_filters
):
# Drop Connect: 随机深度 (Stochastic Depth)
# 训练时以一定概率丢弃整个残差分支
# 类似Dropout但作用于整个层而非单个神经元
# 目的:
# 1. 正则化,防止过拟合
# 2. 缩短有效网络深度,加速训练
# 3. 集成效果,类似训练多个子网络
if drop_connect_rate:
x = drop_connect(
x,
drop_rate=drop_connect_rate,
training=self.training
)
# 残差连接: output = F(x) + x
# 缓解梯度消失,便于训练深层网络
x = x + inputs
return x
def set_swish(self, memory_efficient: bool = True):
"""
设置Swish激活函数的实现方式
功能说明:
- memory_efficient=True: 使用自定义autograd函数
优点: 节省显存,适合训练
缺点: 不支持ONNX导出
- memory_efficient=False: 使用标准实现
优点: 支持ONNX导出,适合部署
缺点: 显存占用较大
参数:
memory_efficient (bool): 是否使用内存高效版本
默认True(训练时)
导出ONNX时设为False
使用示例:
>>> # 训练阶段
>>> block.set_swish(memory_efficient=True)
>>>
>>> # 导出ONNX前
>>> block.set_swish(memory_efficient=False)
>>> torch.onnx.export(model, ...)
"""
if memory_efficient:
self._swish = MemoryEfficientSwish()
else:
self._swish = Swish()
实现EfficientNet的缩放策略
EfficientNet还有一大亮点,就是谷歌在证明了单独缩放某一个超参对网络性能提高有限后,同时考虑了缩放网络的宽度、深度和输入分辨率。三个参数都使用不同的缩放因子进行缩放。谷歌在约束计算资源后搜索得到了B0模型,之后的模型都是搜索这些缩放因子得到的
python
def round_filters(filters: int, width_coefficient: float, depth_divisor: int = 8, min_depth: Optional[int] = None) -> int:
"""
根据宽度系数调整并舍入通道数
功能说明:
EfficientNet使用复合缩放策略同时缩放网络的宽度、深度和分辨率
此函数实现宽度缩放,确保通道数是depth_divisor的倍数
流程:
1. 通道数乘以宽度系数: filters *= width_coefficient
2. 向最近的depth_divisor倍数舍入
3. 防止舍入导致通道数减少超过10%
原理:
宽度缩放可以增加每层的特征图通道数,提升模型容量
参数:
filters (int): 原始通道数
例如: 32, 64, 128等
width_coefficient (float): 宽度缩放系数
- 1.0: 不缩放(B0)
- 1.1: 增加10%宽度(B2)
- 2.0: 双倍宽度(B7)
depth_divisor (int): 通道数必须是此值的倍数,默认8
目的: 优化GPU/TPU计算效率
min_depth (int | None): 最小通道数,默认None
None时使用depth_divisor作为最小值
返回:
int: 调整后的通道数(depth_divisor的倍数)
计算示例:
>>> # B0: width=1.0, 不缩放
>>> round_filters(32, width_coefficient=1.0)
32
>>>
>>> # B7: width=2.0, 双倍宽度
>>> round_filters(32, width_coefficient=2.0)
64 # 32 * 2.0 = 64
>>>
>>> # B2: width=1.1, 增加10%
>>> round_filters(32, width_coefficient=1.1)
32 # 32 * 1.1 = 35.2 -> 向8的倍数舍入 -> 32
>>>
>>> round_filters(64, width_coefficient=1.1)
72 # 64 * 1.1 = 70.4 -> 向8的倍数舍入 -> 72
舍入算法详解:
1. 计算缩放后的值: scaled = filters * width_coefficient
2. 向最近的divisor倍数舍入:
new_filters = max(min_depth, int(scaled + divisor/2) // divisor * divisor)
3. 防止过度舍入:
if new_filters < 0.9 * scaled: new_filters += divisor
"""
# 如果宽度系数为None或为1,直接返回原始通道数
if not width_coefficient:
return filters
# 应用宽度缩放
filters *= width_coefficient
# 确定最小通道数
# 如果未指定min_depth,使用depth_divisor作为下限
min_depth = min_depth or depth_divisor
# 向最近的depth_divisor倍数舍入
# 算法: (filters + divisor/2) // divisor * divisor
# 例如: filters=35, divisor=8
# (35 + 4) // 8 * 8 = 39 // 8 * 8 = 4 * 8 = 32
new_filters = max(
min_depth, # 保证不小于最小值
int(filters + depth_divisor / 2) // depth_divisor * depth_divisor
)
# 防止舍入导致通道数减少超过10%
# 如果new_filters < 0.9 * filters,增加一个divisor
# 例如: filters=70, new_filters=64
# 64 < 0.9 * 70 = 63? No -> 保持64
# 但如果filters=71, new_filters=72后变64
# 64 < 0.9 * 71 = 63.9? No -> 保持64
if new_filters < 0.9 * filters:
new_filters += depth_divisor
return int(new_filters)
def round_repeats(repeats: int, depth_coefficient: float) -> int:
"""
根据深度系数调整并舍入层数
说明:
EfficientNet的深度缩放策略,调整每个Block的重复次数
通过增加层数来提升模型容量
原理:
- 层数乘以深度系数后向上取整
- 深度缩放比宽度缩放更能提升性能,但计算开销也更大
参数:
repeats (int): Block的原始重复次数
例如: MBConv的重复次数,如1, 2, 3等
depth_coefficient (float): 深度缩放系数
- 1.0: 不缩放(B0)
- 1.1: 增加10%深度(B1)
- 3.1: 增加3.1倍深度(B7)
返回:
int: 调整后的重复次数(向上取整)
计算示例:
>>> # B0: depth=1.0, 不缩放
>>> round_repeats(2, depth_coefficient=1.0)
2
>>>
>>> # B1: depth=1.1, 增加10%
>>> round_repeats(2, depth_coefficient=1.1)
3 # ceil(2 * 1.1) = ceil(2.2) = 3
>>>
>>> # B7: depth=3.1, 大幅增加
>>> round_repeats(2, depth_coefficient=3.1)
7 # ceil(2 * 3.1) = ceil(6.2) = 7
为什么向上取整:
- 保守策略,确保模型容量不会因舍入而降低
- 向下取整可能导致某些Block完全消失
"""
# 如果深度系数为None或为1,直接返回原始重复次数
if not depth_coefficient:
return repeats
# 应用深度缩放并向上取整
# 使用math.ceil确保至少达到预期深度
return int(math.ceil(depth_coefficient * repeats))
装配模型
大致操作为:
-
Stem (词干层):
- 3x3卷积,stride=2
- 将输入从3通道扩展到32通道(缩放后)
- 特征图尺寸减半: 224x224 -> 112x112
-
Blocks (主体层):
- 7个Stage,每个Stage包含多个MBConv Block
- 使用不同的kernel_size (3x3或5x5)
- 通过stride=2进行下采样
- 最终特征图: 7x7 (对于224输入)
-
Head (头部层):
- 1x1卷积
- 将通道数扩展到1280(缩放后)
- 提取高级语义特征
-
Classifier (分类层):
- 全局平均池化
- Dropout正则化
- 全连接层输出类别概率
python
class EfficientNet(nn.Module):
"""
EfficientNet主模型类
说明:
EfficientNet = Stem + Blocks + Head + Classifier
1. Stem (词干层):
- 3x3卷积,stride=2
- 将输入从3通道扩展到32通道(缩放后)
- 特征图尺寸减半: 224x224 -> 112x112
2. Blocks (主体层):
- 7个Stage,每个Stage包含多个MBConv Block
- 使用不同的kernel_size (3x3或5x5)
- 通过stride=2进行下采样
- 最终特征图: 7x7 (对于224输入)
3. Head (头部层):
- 1x1卷积
- 将通道数扩展到1280(缩放后)
- 提取高级语义特征
4. Classifier (分类层):
- 全局平均池化
- Dropout正则化
- 全连接层输出类别概率
缩放策略:
EfficientNet使用复合缩放同时调整:
- Width: 通道数 (width_coefficient)
- Depth: 层数 (depth_coefficient)
- Resolution: 输入分辨率 (image_size)
约束优化问题:
公式: depth = α^φ, width = β^φ, resolution = γ^φ
s.t. α·β²·γ² ≈ 2, α≥1, β≥1, γ≥1
参数说明:
block_configs (List[BlockConfig]): Block配置列表
每个配置定义一个Stage的结构
global_config (GlobalConfig): 全局配置对象
包含缩放系数、BatchNorm参数等
属性:
_blocks (nn.ModuleList): MBConv Block列表
_conv_stem: Stem卷积层
_conv_head: Head卷积层
_fc: 全连接分类层
使用示例:
>>> from efficientnet_oop.config.model_config import get_efficientnet_config
>>>
>>> # 创建B0模型
>>> block_configs, global_config = get_efficientnet_config('efficientnet-b0')
>>> model = EfficientNet(block_configs, global_config)
>>>
>>> # 前向传播
>>> x = torch.randn(1, 3, 224, 224)
>>> output = model(x)
>>> print(output.shape) # torch.Size([1, 1000])
>>>
>>> # 提取特征
>>> features = model.extract_features(x)
>>> print(features.shape) # torch.Size([1, 1280, 7, 7])
"""
def __init__(
self,
block_configs: List[BlockConfig],
global_config: GlobalConfig
):
super().__init__()
# 参数验证
if not isinstance(block_configs, list):
raise TypeError('block_configs必须是列表类型')
if len(block_configs) == 0:
raise ValueError('block_configs不能为空')
# 保存配置
self._block_configs = block_configs
self._global_config = global_config
# 对外暴露,用于访问配置
self.global_config = global_config
# BatchNorm参数
bn_momentum = global_config.get_pytorch_bn_momentum()
bn_epsilon = global_config.batch_norm_epsilon
# 获取图像尺寸用于静态padding计算
image_size = global_config.image_size
# ========================================
# Stem层 (初始卷积层)
# ========================================
# 功能: 初始特征提取
# 输入: (B, 3, H, W) - RGB图像
# 输出: (B, 32*width, H/2, W/2)
# Stem的输出通道数
# 基础通道数32,根据width_coefficient缩放
# 例如: B0: 32, B7: 64
out_channels = round_filters(
32,
global_config.width_coefficient,
global_config.depth_divisor,
global_config.min_depth
)
# Stem卷积: 3x3, stride=2
Conv2d = get_same_padding_conv2d(image_size=image_size)
self._conv_stem = Conv2d(
in_channels=3, # RGB输入
out_channels=out_channels,
kernel_size=3,
stride=2, # 下采样2倍
bias=False,
image_size=image_size
)
# Stem的BatchNorm
self._bn0 = nn.BatchNorm2d(
num_features=out_channels,
momentum=bn_momentum,
eps=bn_epsilon
)
# 更新特征图尺寸
# 224 -> 112 (stride=2)
image_size = calculate_output_image_size(image_size, 2)
# ========================================
# Blocks层 (MBConv Block堆叠)
# ========================================
# 功能: 主要特征提取
# 架构: 7个Stage,每个Stage包含多个相同配置的Block
self._blocks = nn.ModuleList([])
# 遍历每个Stage配置
for block_config in block_configs:
# 应用宽度缩放: 调整输入输出通道数
# 例如: B0的16通道, B7变为32通道
scaled_input_filters = round_filters(
block_config.input_filters,
global_config.width_coefficient,
global_config.depth_divisor,
global_config.min_depth
)
scaled_output_filters = round_filters(
block_config.output_filters,
global_config.width_coefficient,
global_config.depth_divisor,
global_config.min_depth
)
# 应用深度缩放: 调整重复次数
# 例如: B0重复2次, B7可能重复6次
scaled_num_repeat = round_repeats(
block_config.num_repeat,
global_config.depth_coefficient
)
# 更新Block配置(不修改原始配置)
scaled_block_config = BlockConfig(
num_repeat=scaled_num_repeat,
kernel_size=block_config.kernel_size,
stride=block_config.stride,
expand_ratio=block_config.expand_ratio,
input_filters=scaled_input_filters,
output_filters=scaled_output_filters,
se_ratio=block_config.se_ratio,
id_skip=block_config.id_skip
)
# 添加第一个Block
# 第一个Block负责:
# 1. 可能的下采样 (stride可能为2)
# 2. 通道数变换 (input_filters -> output_filters)
self._blocks.append(
MBConvBlock(
scaled_block_config,
global_config,
image_size=image_size
)
)
# 更新特征图尺寸(如果有下采样)
image_size = calculate_output_image_size(
image_size,
scaled_block_config.stride
)
# 添加后续重复的Block
# 后续Block的特点:
# 1. stride固定为1 (不再下采样)
# 2. input_filters = output_filters (通道数不变)
if scaled_num_repeat > 1:
# 修改配置用于重复Block
repeat_block_config = BlockConfig(
num_repeat=scaled_num_repeat,
kernel_size=scaled_block_config.kernel_size,
stride=1, # 重复Block不下采样
expand_ratio=scaled_block_config.expand_ratio,
input_filters=scaled_block_config.output_filters, # 输入=输出
output_filters=scaled_block_config.output_filters,
se_ratio=scaled_block_config.se_ratio,
id_skip=scaled_block_config.id_skip
)
# 添加剩余的Block
for _ in range(scaled_num_repeat - 1):
self._blocks.append(
MBConvBlock(
repeat_block_config,
global_config,
image_size=image_size
)
)
# ========================================
# Head层 (顶部卷积层)
# ========================================
# 功能: 进一步提取高级特征
# 输入: 最后一个Block的输出
# 输出: (B, 1280*width, 7, 7)
# Head的输入通道数 = 最后一个Block的输出通道数
in_channels = scaled_block_config.output_filters
# Head的输出通道数
# 基础通道数1280,根据width_coefficient缩放
# 例如: B0: 1280, B7: 2560
out_channels = round_filters(
1280,
global_config.width_coefficient,
global_config.depth_divisor,
global_config.min_depth
)
# Head卷积: 1x1
Conv2d = get_same_padding_conv2d(image_size=image_size)
self._conv_head = Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1, # 1x1卷积,不改变空间尺寸
bias=False,
image_size=image_size
)
# Head的BatchNorm
self._bn1 = nn.BatchNorm2d(
num_features=out_channels,
momentum=bn_momentum,
eps=bn_epsilon
)
# ========================================
# Classifier层 (分类头)
# ========================================
# 功能: 将特征转换为类别概率
# 全局平均池化
# (B, C, H, W) -> (B, C, 1, 1)
self._avg_pooling = nn.AdaptiveAvgPool2d(1)
# 可选的分类头
# include_top=False时,模型仅作为特征提取器
if global_config.include_top:
# Dropout正则化
# 防止全连接层过拟合
self._dropout = nn.Dropout(global_config.dropout_rate)
# 全连接层
# (B, C) -> (B, num_classes)
self._fc = nn.Linear(out_channels, global_config.num_classes)
# ========================================
# 激活函数
# ========================================
# 默认使用内存高效版本的Swish
self._swish = MemoryEfficientSwish()
def forward(self, inputs: torch.Tensor) -> torch.Tensor:
"""
EfficientNet的前向传播
说明:
1. Stem: 初始卷积和下采样
2. Blocks: 主要特征提取
3. Head: 顶部卷积
4. Pooling: 全局平均池化
5. Classifier: Dropout + 全连接
参数:
inputs (torch.Tensor): 输入张量
shape: (batch_size, 3, height, width)
通常: (B, 3, 224, 224) for B0
返回:
torch.Tensor: 输出张量
include_top=True: (B, num_classes) - 类别logits
include_top=False: (B, C, 1, 1) - 池化后的特征
计算流程示例 (B0, 224x224输入):
输入: (1, 3, 224, 224)
Stem: (1, 32, 112, 112)
Blocks: (1, 320, 7, 7)
Head: (1, 1280, 7, 7)
Pool: (1, 1280, 1, 1)
Flatten: (1, 1280)
FC: (1, 1000)
"""
# 调用extract_features提取特征
x = self.extract_features(inputs)
# 全局平均池化
# (B, C, H, W) -> (B, C, 1, 1)
x = self._avg_pooling(x)
# 如果包含分类头
if self._global_config.include_top:
# 展平: (B, C, 1, 1) -> (B, C)
x = x.flatten(start_dim=1)
# Dropout正则化
x = self._dropout(x)
# 全连接层: (B, C) -> (B, num_classes)
x = self._fc(x)
return x
def extract_features(self, inputs: torch.Tensor) -> torch.Tensor:
"""
提取卷积特征(不包含池化和分类)
功能说明:
提取Head层之后的特征图
参数:
inputs (torch.Tensor): 输入张量
shape: (batch_size, 3, height, width)
返回:
torch.Tensor: 卷积特征
shape: (batch_size, 1280*width, H/32, W/32)
例如: B0输入224x224 -> 输出(B, 1280, 7, 7)
使用示例:
>>> model = EfficientNet(...)
>>> x = torch.randn(1, 3, 224, 224)
>>> features = model.extract_features(x)
>>> # 使用特征进行下游任务
>>> custom_head = nn.Linear(1280, 10)
>>> pooled = F.adaptive_avg_pool2d(features, 1).flatten(1)
>>> output = custom_head(pooled)
"""
# ========================================
# Stem层
# ========================================
x = self._conv_stem(inputs)
x = self._bn0(x)
x = self._swish(x)
# ========================================
# Blocks层
# ========================================
# 遍历所有MBConv Block
for idx, block in enumerate(self._blocks):
# 线性增长计算当前Block的drop_connect_rate
# 第一个Block: drop_rate = 0
# 最后一个Block: drop_rate = global_drop_connect_rate
drop_connect_rate = self._global_config.drop_connect_rate
if drop_connect_rate:
# 线性插值
# drop_rate = base_rate * (block_idx / total_blocks)
drop_connect_rate *= float(idx) / len(self._blocks)
# Block前向传播
x = block(x, drop_connect_rate=drop_connect_rate)
# ========================================
# Head层
# ========================================
x = self._conv_head(x)
x = self._bn1(x)
x = self._swish(x)
return x
全部评论 (0)
暂无评论,快来抢沙发吧~