GoogLeNet
GoogLeNet is a deep convolutional neural network architecture proposed by the Google team. Its main features include:
- Inception module: the core building block of GoogLeNet. Within a single layer, convolutions with different kernel sizes (1x1, 3x3, 5x5) and a pooling operation run in parallel, and their outputs are concatenated to form a richer feature representation (see the small concatenation sketch right after this list).
- Auxiliary classifiers: to mitigate vanishing gradients, GoogLeNet attaches two auxiliary classifiers to intermediate layers. They provide extra gradient signal during training and also help regularize the model.
- Global average pooling: at the end of the network, GoogLeNet replaces fully connected layers with global average pooling, which reduces the parameter count and improves generalization.
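Before the full implementation below, here is a minimal sketch of the concatenation idea (toy tensors made up for illustration, not part of the original code): the four parallel branches produce feature maps with the same spatial size, and their outputs are stacked along the channel dimension.
import torch
# Branch outputs with the channel widths used by Inception block 3a below (64 + 128 + 32 + 32)
b1 = torch.randn(1, 64, 28, 28)   # 1x1 branch
b2 = torch.randn(1, 128, 28, 28)  # 3x3 branch
b3 = torch.randn(1, 32, 28, 28)   # 5x5 branch
b4 = torch.randn(1, 32, 28, 28)   # pooling branch
out = torch.cat([b1, b2, b3, b4], dim=1)
print(out.shape)  # torch.Size([1, 256, 28, 28]) -- 256 is exactly the input width of block 3b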
#coding:utf8
# Copyright 2023 longpeng2008. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# If you find any problem,please contact us
#
# longpeng2008to2012@gmail.com
#
# or create issues
# =============================================================================
import torch
import torch.nn as nn
# 卷积模块
class BasicConv(nn.Module):
def __init__(self,in_channels,out_channels,kernel_size,stride=(1,1),padding=(0,0)):
super(BasicConv, self).__init__()
self.conv = nn.Conv2d(in_channels=in_channels,out_channels=out_channels,kernel_size=kernel_size,stride=stride,padding=padding)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
x = self.conv(x)
x = self.relu(x)
return x
# 额外的损失分支
class SideBranch(nn.Module):
def __init__(self, in_channels,num_classes):
super(SideBranch, self).__init__()
self.avg_pool = nn.AvgPool2d(kernel_size=5, stride=3)
self.conv1x1 = BasicConv(in_channels=in_channels, out_channels=128, kernel_size=1)
self.fc_1 = nn.Linear(in_features=2048, out_features=1024)
self.relu = nn.ReLU(inplace=True)
self.fc_2 = nn.Linear(in_features=1024, out_features=num_classes)
def forward(self, x):
x = self.avg_pool(x)
x = self.conv1x1(x)
x = torch.flatten(x,1)
x = self.fc_1(x)
x = self.relu(x)
        x = torch.dropout(x, 0.7, train=self.training)  # only drop units while training
x = self.fc_2(x)
return x
# Inception模块
class InceptionBlock(nn.Module):
def __init__(self,in_channels,ch1x1, ch3x3reduce,ch3x3,ch5x5reduce,ch5x5,chpool):
super(InceptionBlock, self).__init__()
self.branch_1 = BasicConv(in_channels=in_channels, out_channels=ch1x1,kernel_size=1)
self.branch_2 = nn.Sequential(
BasicConv(in_channels=in_channels, out_channels=ch3x3reduce, kernel_size=1),
BasicConv(in_channels=ch3x3reduce, out_channels=ch3x3,kernel_size=3, padding=1)
)
self.branch_3 = nn.Sequential(
BasicConv(in_channels=in_channels, out_channels=ch5x5reduce,kernel_size=1),
BasicConv(in_channels=ch5x5reduce,out_channels=ch5x5,kernel_size=5, padding=2)
)
self.branch_4 = nn.Sequential(
nn.MaxPool2d(kernel_size=3, padding=1,stride=1, ceil_mode=True),
BasicConv(in_channels=in_channels,out_channels=chpool,kernel_size=1)
)
def forward(self, x):
x_1 = self.branch_1(x)
x_2 = self.branch_2(x)
x_3 = self.branch_3(x)
x_4 = self.branch_4(x)
x = torch.cat([x_1, x_2, x_3, x_4], dim=1)
return x
# GoogLeNet/Inception模型
class Inception_V1(nn.Module):
def __init__(self, num_classes):
super(Inception_V1, self).__init__()
self.BasicConv_1 = BasicConv(in_channels=3, out_channels=64, kernel_size=7,stride=2, padding=3)
self.max_pool_1 = nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True) # 把不足square_size的边保留下来,单独计算
self.lrn_1 = nn.LocalResponseNorm(2)
self.conv_1x1 = BasicConv(in_channels=64, out_channels=64, kernel_size=1)
self.conv_3x3 = BasicConv(in_channels=64, out_channels=192, kernel_size=3, padding=1)
self.lrn_2 = nn.LocalResponseNorm(2)
self.max_pool_2 = nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True)
# in_channels,ch1x1, ch3x3reduce,ch3x3,ch5x5reduce,ch5x5,chpool
self.InceptionBlock_3a = InceptionBlock(192,64,96,128,16,32,32)
self.InceptionBlock_3b = InceptionBlock(256,128,128,192,32,96,64)
self.max_pool_3 = nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True)
self.InceptionBlock_4a = InceptionBlock(480,192,96,208,16,48,64)
self.SideBranch_1 = SideBranch(512, num_classes)
self.InceptionBlock_4b = InceptionBlock(512,160,112,224,24,64,64)
self.InceptionBlock_4c = InceptionBlock(512,128,128,256,24,64,64)
self.InceptionBlock_4d = InceptionBlock(512,112,144,288,32,64,64)
self.SideBranch_2 = SideBranch(528, num_classes)
self.InceptionBlock_4e = InceptionBlock(528,256,160,320,32,128,128)
self.max_pool_4 = nn.MaxPool2d(kernel_size=3,stride=2, ceil_mode=True)
self.InceptionBlock_5a = InceptionBlock(832,256,160,320,32,128,128)
self.InceptionBlock_5b = InceptionBlock(832,384,192,384,48,128,128)
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.flatten = nn.Flatten()
self.fc = nn.Linear(in_features=1024 ,out_features=num_classes)
def forward(self, x):
x = self.BasicConv_1(x)
x = self.max_pool_1(x)
x = self.lrn_1(x)
x = self.conv_1x1(x)
x = self.conv_3x3(x)
        x = self.lrn_2(x)
x = self.max_pool_2(x)
x = self.InceptionBlock_3a(x)
x = self.InceptionBlock_3b(x)
x = self.max_pool_3(x)
x = self.InceptionBlock_4a(x)
x_1 = self.SideBranch_1(x)
x = self.InceptionBlock_4b(x)
x = self.InceptionBlock_4c(x)
x = self.InceptionBlock_4d(x)
x_2 = self.SideBranch_2(x)
x = self.InceptionBlock_4e(x)
x = self.max_pool_4(x)
x = self.InceptionBlock_5a(x)
x = self.InceptionBlock_5b(x)
x = self.avg_pool(x)
x = self.flatten(x)
        x = torch.dropout(x, 0.4, train=self.training)  # only drop units while training
x = self.fc(x)
x_1 = torch.softmax(x_1, dim=1)
x_2 = torch.softmax(x_2, dim=1)
x_3 = torch.softmax(x, dim=1)
# output = x_3 + (x_1 + x_2) * 0.3
return x_3,x_2,x_1
if __name__ == '__main__':
# 创建模型,给定输入,前向传播,存储模型
input = torch.randn([1, 3, 224, 224])
model = Inception_V1(num_classes=1000)
torch.save(model, 'googlenet.pth')
x_3,x_2,x_1 = model(input)
# 观察输出,只需要观察shape是我们想要的即可
print(x_1.shape)
print(x_2.shape)
print(x_3.shape)
    torch.onnx.export(model, input, 'googlenet.onnx')
ResNet18
ResNet18 is one of the smaller models in the ResNet (Residual Network) family, proposed by Microsoft Research. ResNet introduces residual blocks to alleviate the vanishing-gradient problem in deep networks, which allows much deeper architectures and therefore better performance. The main features and structure of ResNet18 are as follows:
Main features
- Residual block: each residual block contains two convolutional layers, each followed by batch normalization and a ReLU activation. A skip connection adds the block's input directly to its output, which mitigates vanishing gradients (a minimal sketch follows the structure description below).
- Simple structure: ResNet18 has only 18 layers; compared with deeper ResNet variants (such as ResNet50 and ResNet101) it is computationally cheap and well suited to resource-constrained settings.
Structure
The structure of ResNet18 can be divided into the following parts.
Input stage:
- Convolutional layer: 7x7 kernel, 64 output channels, stride 2, padding 3.
- Batch normalization layer.
- ReLU activation.
- Max pooling layer: 3x3 kernel, stride 2, padding 1.
Residual blocks:
- 4 groups of residual blocks, with 2 residual blocks per group.
- Each residual block contains two 3x3 convolutional layers, each followed by batch normalization and a ReLU activation.
- A skip connection adds the input directly to the output.
Global average pooling layer:
- Reduces the feature map to a fixed size, typically 1x1.
Fully connected layer:
- Produces the classification output, typically for image classification tasks.
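Before the full model below, here is a minimal sketch of the residual connection y = F(x) + x, assuming equal input and output channel counts (the complete ResidualBlock in the code that follows also handles the channel-changing case with a 1x1 convolution).
import torch
import torch.nn as nn
# A toy residual block: two 3x3 conv + BN layers, with the input added back onto the output
class TinyResidual(nn.Module):
    def __init__(self, channels=64):
        super().__init__()
        self.body = nn.Sequential(
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(channels),
        )
        self.relu = nn.ReLU(inplace=True)
    def forward(self, x):
        return self.relu(self.body(x) + x)  # y = F(x) + x, the skip connection
print(TinyResidual()(torch.randn(1, 64, 56, 56)).shape)  # torch.Size([1, 64, 56, 56])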
#coding:utf8
# Copyright 2023 longpeng2008. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# If you find any problem,please contact us
#
# longpeng2008to2012@gmail.com
#
# or create issues
# =============================================================================
import torch
import torch.nn as nn
# 残差模块
class ResidualBlock(nn.Module):
def __init__(self, in_channels, outchannels):
super(ResidualBlock, self).__init__()
self.channel_equal_flag = True
if in_channels == outchannels:
self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=outchannels, kernel_size=3, padding=1, stride=1)
else:
## 对恒等映射分支的变换,当通道数发生变换时,分辨率变为原来的二分之一
self.conv1x1 = nn.Conv2d(in_channels=in_channels, out_channels=outchannels, kernel_size=1, stride=2)
self.bn1x1 = nn.BatchNorm2d(num_features=outchannels)
self.channel_equal_flag = False
self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=outchannels,kernel_size=3,padding=1, stride=2)
self.bn1 = nn.BatchNorm2d(num_features=outchannels)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(in_channels=outchannels, out_channels=outchannels, kernel_size=3,padding=1)
self.bn2 = nn.BatchNorm2d(num_features=outchannels)
def forward(self, x):
identity = x
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
if self.channel_equal_flag == True:
pass
else:
identity = self.conv1x1(identity)
identity = self.bn1x1(identity)
identity = self.relu(identity)
out = identity + x
return out
class ResNet18(nn.Module):
def __init__(self, num_classes=1000):
super(ResNet18, self).__init__()
# conv1
self.conv1 = nn.Conv2d(in_channels=3, out_channels=64,kernel_size=7,stride=2, padding=3)
self.bn1 = nn.BatchNorm2d(num_features=64)
self.relu = nn.ReLU(inplace=True)
# conv2_x
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2,padding=1)
self.conv2_1 = ResidualBlock(in_channels=64, outchannels=64)
self.conv2_2 = ResidualBlock(in_channels=64, outchannels=64)
# conv3_x
self.conv3_1 = ResidualBlock(in_channels=64, outchannels=128)
self.conv3_2 = ResidualBlock(in_channels=128, outchannels=128)
# conv4_x
self.conv4_1 = ResidualBlock(in_channels=128, outchannels=256)
self.conv4_2 = ResidualBlock(in_channels=256, outchannels=256)
# conv5_x
self.conv5_1 = ResidualBlock(in_channels=256, outchannels=512)
self.conv5_2 = ResidualBlock(in_channels=512, outchannels=512)
# avg_pool
self.avg_pool = nn.AdaptiveAvgPool2d(output_size=(1,1)) # [N, C, H, W] = [N, 512, 1, 1]
# fc
self.fc = nn.Linear(in_features=512, out_features=num_classes) # [N, num_classes]
# softmax
self.softmax = nn.Softmax(dim=1)
def forward(self, x):
# conv1
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
# conv2_x
x = self.maxpool(x)
x = self.conv2_1(x)
x = self.conv2_2(x)
# conv3_x
x = self.conv3_1(x)
x = self.conv3_2(x)
# conv4_x
x = self.conv4_1(x)
x = self.conv4_2(x)
# conv5_x
x = self.conv5_1(x)
x = self.conv5_2(x)
# avgpool + fc + softmax
x = self.avg_pool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
x = self.softmax(x)
return x
if __name__ == '__main__':
model = ResNet18(num_classes=1000)
input = torch.randn([1, 3, 224, 224])
output = model(input)
torch.save(model, 'resnet18.pth')
    torch.onnx.export(model,input,'resnet18.onnx')
Example
import torch
from torch import nn
from torch.utils.data import DataLoader
from tqdm import tqdm # pip install tqdm
import matplotlib.pyplot as plt
import os
from torchsummary import summary
from torch.utils.tensorboard import SummaryWriter
import wandb
import datetime
from MyModel import SimpleCNN
from torchvision.models import resnet18, ResNet18_Weights
from MYDataset import MyDataset
# # 定义训练函数
def train(dataloader, model, loss_fn, optimizer):
# 初始化训练数据集的大小和批次数量
size = len(dataloader.dataset)
num_batches = len(dataloader)
# 设置模型为训练模式
model.train()
# 初始化总损失和正确预测数量
loss_total = 0
correct = 0
# 遍历数据加载器中的所有数据批次
for X, y in tqdm(dataloader):
# 将数据和标签移动到指定设备(例如GPU)
X, y = X.to(device), y.to(device)
# 使用模型进行预测
pred = model(X)
# 计算正确预测的数量
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
# 计算预测结果和真实结果之间的损失
loss = loss_fn(pred, y)
# 累加总损失
loss_total += loss.item()
# 执行反向传播,计算梯度
loss.backward()
# 更新模型参数
optimizer.step()
# 清除梯度信息
optimizer.zero_grad()
# 计算平均损失和准确率
loss_avg = loss_total / num_batches
correct /= size
# 返回准确率和平均损失,保留三位小数
return round(correct, 3), round(loss_avg,3)
# 定义测试函数
def test(dataloader, model, loss_fn):
# 初始化测试数据集的大小和批次数量
size = len(dataloader.dataset)
num_batches = len(dataloader)
# 设置模型为评估模式
model.eval()
# 初始化测试损失和正确预测数量
test_loss, correct = 0, 0
# 不计算梯度,以提高计算效率并减少内存使用
with torch.no_grad():
# 遍历数据加载器中的所有数据批次
for X, y in tqdm(dataloader):
# 将数据和标签移动到指定设备(例如GPU)
X, y = X.to(device), y.to(device)
# 使用模型进行预测
pred = model(X)
# 累加预测损失
test_loss += loss_fn(pred, y).item()
# 累加正确预测的数量
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
# 计算平均测试损失和准确率
test_loss /= num_batches
correct /= size
# 返回准确率和平均测试损失,保留三位小数
return round(correct, 3), round(test_loss, 3)
def writedata(txt_log_name, tensorboard_writer, epoch, train_accuracy, train_loss, test_accuracy, test_loss):
# 保存到文档
with open(txt_log_name, "a+") as f:
f.write(f"Epoch:{epoch}\ttrain_accuracy:{train_accuracy}\ttrain_loss:{train_loss}\ttest_accuracy:{test_accuracy}\ttest_loss:{test_loss}\n")
# 保存到tensorboard
# 记录全连接层参数
for name, param in model.named_parameters():
tensorboard_writer.add_histogram(name, param.clone().cpu().data.numpy(), global_step=epoch)
tensorboard_writer.add_scalar('Accuracy/train', train_accuracy, epoch)
tensorboard_writer.add_scalar('Loss/train', train_loss, epoch)
tensorboard_writer.add_scalar('Accuracy/test', test_accuracy, epoch)
tensorboard_writer.add_scalar('Loss/test', test_loss, epoch)
wandb.log({"Accuracy/train": train_accuracy,
"Loss/train": train_loss,
"Accuracy/test": test_accuracy,
"Loss/test": test_loss})
def plot_txt(log_txt_loc):
with open(log_txt_loc, 'r') as f:
log_data = f.read()
# 解析日志数据
epochs = []
train_accuracies = []
train_losses = []
test_accuracies = []
test_losses = []
for line in log_data.strip().split('\n'):
epoch, train_acc, train_loss, test_acc, test_loss = line.split('\t')
epochs.append(int(epoch.split(':')[1]))
train_accuracies.append(float(train_acc.split(':')[1]))
train_losses.append(float(train_loss.split(':')[1]))
test_accuracies.append(float(test_acc.split(':')[1]))
test_losses.append(float(test_loss.split(':')[1]))
# 创建折线图
plt.figure(figsize=(10, 5))
# 训练数据
plt.subplot(1, 2, 1)
plt.plot(epochs, train_accuracies, label='Train Accuracy')
plt.plot(epochs, test_accuracies, label='Test Accuracy')
plt.title('Training Metrics')
plt.xlabel('Epoch')
plt.ylabel('Value')
plt.legend()
# 设置横坐标刻度为整数
plt.xticks(range(min(epochs), max(epochs) + 1))
# 测试数据
plt.subplot(1, 2, 2)
plt.plot(epochs, train_losses, label='Train Loss')
plt.plot(epochs, test_losses, label='Test Loss')
plt.title('Testing Metrics')
plt.xlabel('Epoch')
plt.ylabel('Value')
plt.legend()
# 设置横坐标刻度为整数
plt.xticks(range(min(epochs), max(epochs) + 1))
plt.tight_layout()
plt.show()
if __name__ == '__main__':
batch_size = 64
init_lr = 1e-4
epochs = 20
log_root = "logs_resnet18_adam"
log_txt_loc = os.path.join(log_root,"log.txt")
# 指定TensorBoard数据的保存地址
tensorboard_writer = SummaryWriter(log_root)
# WandB信息保存地址
run_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
wandb.init(
dir=log_root,
project='Flower Classify',
name=f"run_resnet18_adam",
config={
"learning_rate": init_lr,
"batch_size": batch_size,
"model": "resnet18",
"dataset": "Flower10",
"epochs": epochs,
}
)
if os.path.isdir(log_root):
pass
else:
os.mkdir(log_root)
train_data = MyDataset("train.txt",train_flag=True)
test_data = MyDataset("test.txt",train_flag=False)
# 创建数据加载器
    train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)  # shuffle the training set each epoch
    test_dataloader = DataLoader(test_data, batch_size=batch_size)
for X, y in test_dataloader:
print(f"Shape of X [N, C, H, W]: {X.shape}")
print(f"Shape of y: {y.shape} {y.dtype}")
break
# 指定设备
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")
"""
使用resnet18进行预训练
"""
# 加载具有预训练权重的ResNet18模型
weights = ResNet18_Weights.DEFAULT
model = resnet18(weights=weights)
# 保存模型权重到本地文件
torch.save(model.state_dict(), 'resnet18_weights.pth')
print("ResNet18 weights downloaded and saved as 'resnet18_weights.pth'")
    pretrain_model = resnet18(weights=None)  # build an uninitialized ResNet18; the saved weights are loaded below
    print(pretrain_model)
num_ftrs = pretrain_model.fc.in_features # 获取全连接层的输入
pretrain_model.fc = nn.Linear(num_ftrs, 10) # 全连接层改为不同的输出
    # Pretrained parameters, saved above as 'resnet18_weights.pth'
    pretrained_dict = torch.load('resnet18_weights.pth')
    # Drop the fc-layer parameters, whose shapes no longer match the new 10-class head
    pretrained_dict.pop('fc.weight')
    pretrained_dict.pop('fc.bias')
    # Parameters of our own model, still in their freshly initialized state
    model_dict = pretrain_model.state_dict()
    # Keep only the pretrained parameters that also exist in our model
    pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict}
    # Update the parameter dict with the pretrained values
    model_dict.update(pretrained_dict)
    # Load the merged parameter dict into the modified model
    pretrain_model.load_state_dict(model_dict)
'''
冻结部分层
'''
# 将满足条件的参数的 requires_grad 属性设置为False
for name, value in pretrain_model.named_parameters():
if (name != 'fc.weight') and (name != 'fc.bias'):
value.requires_grad = False
#
# filter 函数将模型中属性 requires_grad = True 的参数选出来
params_selected = filter(lambda p: p.requires_grad, pretrain_model.parameters()) # 要更新的参数在 params_selected 当中
model = pretrain_model.to(device)
print(model)
    summary(model, (3, 224, 224), device=device)  # torchsummary needs the device name ("cuda" or "cpu")
# 模拟输入,大小和输入相同即可
init_img = torch.zeros((1, 3, 224, 224), device=device)
tensorboard_writer.add_graph(model, init_img)
# 添加wandb的模型记录
wandb.watch(model, log='all', log_graph=True)
# 定义损失函数
loss_fn = nn.CrossEntropyLoss()
# 定义优化器
optimizer = torch.optim.Adam(params_selected, lr=init_lr)
best_acc = 0
# 定义循环次数,每次循环里面,先训练,再测试
for t in range(epochs):
print(f"Epoch {t + 1}\n-------------------------------")
train_acc, train_loss = train(train_dataloader, model, loss_fn, optimizer)
test_acc, test_loss = test(test_dataloader, model, loss_fn)
writedata(log_txt_loc, tensorboard_writer,t,train_acc,train_loss,test_acc,test_loss)
# 保存最佳模型
if test_acc > best_acc:
best_acc = test_acc
torch.save(model.state_dict(), os.path.join(log_root,"best.pth"))
torch.save(model.state_dict(), os.path.join(log_root,"last.pth"))
print("Done!")
tensorboard_writer.close()
wandb.finish()
    plot_txt(log_txt_loc)
ResNet50
# coding:utf8
# Copyright 2023 longpeng2008. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# If you find any problem,please contact us
#
# longpeng2008to2012@gmail.com
#
# or create issues
# =============================================================================
import torch
import torch.nn as nn
class ResidualBlock(nn.Module):
def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
super(ResidualBlock, self).__init__()
self.expansion = 4
self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1, padding=0)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=stride,
padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
self.conv3 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels * self.expansion, kernel_size=1,
stride=1, padding=0)
self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
self.relu = nn.ReLU()
self.identity_downsample = identity_downsample
self.stride = stride
def forward(self, x):
identity = x.clone()
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
x = self.conv3(x)
x = self.bn3(x)
if self.identity_downsample is not None:
identity = self.identity_downsample(identity)
x += identity
x = self.relu(x)
return x
class ResNet(nn.Module):
def __init__(self, block, layers, num_classes):
super(ResNet, self).__init__()
self.in_channels = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU()
self.pooling = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self.res_layer(block, layers[0], out_channels=64, stride=1)   # residual group 1, resolution unchanged
        self.layer2 = self.res_layer(block, layers[1], out_channels=128, stride=2)  # residual group 2, resolution halved
        self.layer3 = self.res_layer(block, layers[2], out_channels=256, stride=2)  # residual group 3, resolution halved
        self.layer4 = self.res_layer(block, layers[3], out_channels=512, stride=2)  # residual group 4, resolution halved
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512 * 4, num_classes)
def forward(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.pooling(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = x.reshape(x.shape[0], -1)
x = self.fc(x)
return x
def res_layer(self, block, num_residual_blocks, out_channels, stride):
identity_downsample = None
layers = []
# 每一组残差模块的第一个残差模块需要特殊对待
if stride != 1 or self.in_channels != out_channels * 4:
identity_downsample = nn.Sequential(
nn.Conv2d(
self.in_channels,
out_channels * 4,
kernel_size=1,
stride=stride,
),
nn.BatchNorm2d(out_channels * 4),
)
layers.append(
block(self.in_channels, out_channels, identity_downsample, stride)
)
# 除了每一组残差模块的第一个,其他残差模块的输入通道都等于输出通道的4倍,ResNet 50,101,152
self.in_channels = out_channels * 4 # 该值每经过一组残差模块,就会变大,64 -> 4*64=256 -> 4*128=512 -> 4*256=1024 -> 4*512=2048
# 如resnet50的conv2_x,通道变换为256 -> 64 -> 256
for i in range(num_residual_blocks - 1):
layers.append(block(self.in_channels, out_channels))
return nn.Sequential(*layers)
def ResNet50(num_classes=1000):
return ResNet(ResidualBlock, [3, 4, 6, 3], num_classes)
def ResNet101(num_classes=1000):
return ResNet(ResidualBlock, [3, 4, 23, 3], num_classes)
def ResNet152(num_classes=1000):
return ResNet(ResidualBlock, [3, 8, 36, 3], num_classes)
input = torch.randn([1, 3, 224, 224])
resnet50 = ResNet50(num_classes=1000)
output = resnet50(input)
torch.save(resnet50, 'resnet50.pth')
torch.onnx.export(resnet50, input, 'resnet50.onnx')
vgg
VGG (Visual Geometry Group) is a classic convolutional neural network architecture proposed by the Visual Geometry Group at the University of Oxford. VGG achieved excellent results in the 2014 ImageNet Large Scale Visual Recognition Challenge (ILSVRC) and is known for its simple yet effective design.
Main features
- Simple, uniform architecture: VGG consists mainly of stacked 3x3 convolutional layers and max pooling layers.
- Stacked small convolutions: depth is increased by stacking several small (3x3) convolutional layers rather than using larger kernels (see the arithmetic sketch after the structure description below).
- Fully connected layers: the final part of the network uses several fully connected layers for classification.
Structure
VGG has several variants, the most common being VGG16 and VGG19. The structure of VGG16 is as follows.
Input:
- Input image size: 224x224x3.
Convolutional layers:
- 2 3x3 convolutional layers with 64 output channels, followed by a 2x2 max pooling layer.
- 2 3x3 convolutional layers with 128 output channels, followed by a 2x2 max pooling layer.
- 3 3x3 convolutional layers with 256 output channels, followed by a 2x2 max pooling layer.
- 3 3x3 convolutional layers with 512 output channels, followed by a 2x2 max pooling layer.
- 3 3x3 convolutional layers with 512 output channels, followed by a 2x2 max pooling layer.
Fully connected layers:
- 2 fully connected layers with 4096 units each, each followed by a ReLU activation and a Dropout layer (to reduce overfitting).
- 1 fully connected layer with 1000 units that produces the classification output.
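As a quick illustration of why stacked small kernels are preferred (back-of-the-envelope arithmetic with an assumed channel count of 256, not part of the original code): two stacked 3x3 convolutions cover the same 5x5 receptive field as a single 5x5 convolution, but with fewer weights and an extra nonlinearity in between.
# Weight counts for C -> C channels, biases ignored (C = 256 is an assumed example value)
C = 256
two_3x3 = 2 * (3 * 3 * C * C)  # two stacked 3x3 conv layers
one_5x5 = 5 * 5 * C * C        # a single 5x5 conv layer
print(two_3x3, one_5x5, round(two_3x3 / one_5x5, 2))  # 1179648 1638400 0.72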
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
class VGG(nn.Module):
def __init__(self):
super(VGG,self).__init__()
self.conv1_1 = nn.Conv2d(3,64,3,1,1)
self.conv1_2 = nn.Conv2d(64,64,3,1,1)
self.pool1 = nn.MaxPool2d(2)
self.conv2_1 = nn.Conv2d(64,128,3,1,1)
self.conv2_2 = nn.Conv2d(128,128,3,1,1)
self.pool2 = nn.MaxPool2d(2)
self.conv3_1 = nn.Conv2d(128, 256, 3, 1, 1)
self.conv3_2 = nn.Conv2d(256, 256, 3, 1, 1)
self.conv3_3 = nn.Conv2d(256, 256, 3, 1, 1)
self.pool3 = nn.MaxPool2d(2)
self.conv4_1 = nn.Conv2d(256, 512, 3, 1, 1)
self.conv4_2 = nn.Conv2d(512, 512, 3, 1, 1)
self.conv4_3 = nn.Conv2d(512, 512, 3, 1, 1)
self.pool4 = nn.MaxPool2d(2)
self.conv5_1 = nn.Conv2d(512, 512, 3, 1, 1)
self.conv5_2 = nn.Conv2d(512, 512, 3, 1, 1)
self.conv5_3 = nn.Conv2d(512, 512, 3, 1, 1)
self.pool5 = nn.MaxPool2d(2)
self.fc1 = nn.Linear(7*7*512,4096)
self.fc2 = nn.Linear(4096, 4096)
self.classifier = nn.Linear(4096, 1000)
def forward(self,x):
x = F.relu(self.conv1_1(x))
x = F.relu(self.conv1_2(x))
x = self.pool1(x)
x = F.relu(self.conv2_1(x))
x = F.relu(self.conv2_2(x))
x = self.pool2(x)
x = F.relu(self.conv3_1(x))
x = F.relu(self.conv3_2(x))
x = F.relu(self.conv3_3(x))
x = self.pool3(x)
x = F.relu(self.conv4_1(x))
x = F.relu(self.conv4_2(x))
x = F.relu(self.conv4_3(x))
x = self.pool4(x)
x = F.relu(self.conv5_1(x))
x = F.relu(self.conv5_2(x))
x = F.relu(self.conv5_3(x))
x = self.pool5(x)
#print('conv5.shape',x.shape) #n*7*7*512
x = x.reshape(-1,7*7*512)
#print('conv5.shape',x.shape) #n*7*7*512
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.classifier(x)
return x
x = torch.randn((1,3,224,224))
vgg = VGG()
y = vgg(x)
print(y.shape)
torch.save(vgg,'vgg.pth')
torch.onnx.export(vgg,x,'vgg.onnx')
mobilenetv1
MobileNetV1 is a lightweight convolutional neural network architecture designed mainly for mobile and embedded vision applications. Its key features are:
- Depthwise separable convolutions: MobileNetV1 factorizes a standard convolution into a depthwise convolution followed by a pointwise (1x1) convolution, which greatly reduces the number of parameters and the computational cost (see the parameter-count sketch after the structure list below).
- Simple structure: the network is a plain stack of depthwise separable convolution layers, each usually followed by batch normalization and a ReLU activation.
- Efficiency: thanks to its lightweight design, MobileNetV1 performs well on resource-constrained devices and suits real-time image classification, object detection, and similar tasks.
Basic structure of MobileNetV1
A typical MobileNetV1 model consists of the following parts:
- Input layer: accepts the input image.
- Initial convolutional layer: a standard 3x3 convolution followed by batch normalization and a ReLU activation.
- Depthwise separable convolution layers: a stack of blocks, each consisting of a depthwise convolution, batch normalization, ReLU, a pointwise convolution, batch normalization, and ReLU.
- Global average pooling layer: reduces the feature map to a fixed-size vector.
- Fully connected layer: produces the classification output.
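A rough cost comparison for the depthwise separable factorization described above (my own arithmetic with assumed channel counts, not taken from the code below): a standard KxK convolution mapping C_in to C_out channels needs K*K*C_in*C_out weights, while the depthwise + pointwise pair needs only K*K*C_in + C_in*C_out.
# Weight counts for one 3x3 layer mapping 256 -> 256 channels (biases ignored, values assumed)
k, c_in, c_out = 3, 256, 256
standard = k * k * c_in * c_out           # standard convolution
separable = k * k * c_in + c_in * c_out   # depthwise conv + 1x1 pointwise conv
print(standard, separable, round(separable / standard, 3))  # 589824 67840 0.115, roughly 1/c_out + 1/k**2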
#coding:utf8
import time
import torch
import torch.nn as nn
import torchvision.models as models
class MobileNetV1(nn.Module):
def __init__(self):
super(MobileNetV1,self).__init__()
# 标准卷积
def conv_bn(inp,oup,stride):
return nn.Sequential(
nn.Conv2d(inp,oup,3,stride,1,bias = False),
nn.BatchNorm2d(oup),
nn.ReLU(inplace = True))
# 深度可分离卷积,depthwise convolution + pointwise convolution
def conv_dw(inp,oup,stride):
return nn.Sequential(
nn.Conv2d(inp,inp,3,stride,1,groups = inp,bias = False),
nn.BatchNorm2d(inp),
nn.ReLU(inplace = True),
nn.Conv2d(inp,oup,1,1,0,bias = False),
nn.BatchNorm2d(oup),
nn.ReLU(inplace = True))
#网络模型声明
self.model = nn.Sequential(
conv_bn(3,32,2),
conv_dw(32,64,1),
conv_dw(64,128,2),
conv_dw(128,128,1),
conv_dw(128,256,2),
conv_dw(256,256,1),
conv_dw(256,512,2),
conv_dw(512,512,1),
conv_dw(512,512,1),
conv_dw(512,512,1),
conv_dw(512,512,1),
conv_dw(512,512,1),
conv_dw(512,1024,2),
conv_dw(1024,1024,1),
nn.AvgPool2d(7),)
self.fc = nn.Linear(1024,1000)
#网络的前向过程
def forward(self,x):
x = self.model(x)
x = x.view(-1,1024)
x = self.fc(x)
return x
#速度评估
def speed(model,name):
t0 = time.time()
input = torch.rand(1,3,224,224).cpu()
t1 = time.time()
model(input)
t2 = time.time()
for i in range(0,30):
model(input)
t3 = time.time()
print('%10s : %f'%(name,(t3 - t2)/30))
if __name__ == '__main__':
resnet18 = models.resnet18().cpu()
alexnet = models.alexnet().cpu()
vgg16 = models.vgg16().cpu()
mobilenetv1 = MobileNetV1().cpu()
speed(resnet18,'resnet18')
speed(alexnet,'alexnet')
speed(vgg16,'vgg16')
    speed(mobilenetv1,'mobilenet')
mobilenetv2
MobileNetV2 is an improved version of MobileNetV1 that further optimizes performance and efficiency. It introduces several new design ideas that improve accuracy and robustness while keeping the model lightweight. Key features include:
- Inverted residuals: unlike a traditional residual block, MobileNetV2 uses an "expand then compress" strategy: a 1x1 convolution first increases the number of channels, a depthwise convolution then processes the expanded features, and a final 1x1 convolution reduces the channels again (the channel flow is illustrated right after this structure description).
- Linear bottlenecks: the last layer of each block is linear (no ReLU activation), so less information is destroyed in the low-dimensional bottleneck.
- Skip connections: similar to the skip connections in ResNet, they help gradients propagate and improve training.
Basic structure of MobileNetV2
A typical MobileNetV2 model consists of the following parts:
- Input layer: accepts the input image.
- Initial convolutional layer: a standard 3x3 convolution followed by batch normalization and a ReLU6 activation.
- Inverted residual blocks: a stack of blocks, each consisting of a 1x1 expansion convolution, a depthwise convolution, a 1x1 projection convolution, and (when applicable) a skip connection.
- Global average pooling layer: reduces the feature map to a fixed-size vector.
- Fully connected layer: produces the classification output.
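To make the "expand then compress" channel flow concrete, here is a small sketch using the t=6, 24-channel setting that also appears in the configuration table of the code below (the exact numbers are an illustrative assumption):
# Channel flow of one inverted residual block: inp -> hidden_dim (1x1 expand) -> hidden_dim (3x3 depthwise) -> oup (1x1 linear projection)
inp, expand_ratio, oup = 24, 6, 24
hidden_dim = int(round(inp * expand_ratio))
print(inp, '->', hidden_dim, '->', hidden_dim, '->', oup)  # 24 -> 144 -> 144 -> 24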
#coding:utf8
import time
import torch
import torch.nn as nn
# 调整通道数量,使其都可以除8
def _make_divisible(v, divisor, min_value=None):
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# 保证向下取整损失不要超过10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
# 标准卷积模块
class ConvBNReLU(nn.Sequential):
def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
padding = (kernel_size - 1) // 2
super(ConvBNReLU, self).__init__(
nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, groups=groups, bias=False),
nn.BatchNorm2d(out_planes),
nn.ReLU6(inplace=True)
)
# 反转残差模块
class InvertedResidual(nn.Module):
def __init__(self, inp, oup, stride, expand_ratio):
super(InvertedResidual, self).__init__()
self.stride = stride
assert stride in [1, 2]
hidden_dim = int(round(inp * expand_ratio))
self.use_res_connect = self.stride == 1 and inp == oup
layers = []
if expand_ratio != 1:
# pw
layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1))
layers.extend([
# dw
ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
])
self.conv = nn.Sequential(*layers)
def forward(self, x):
if self.use_res_connect:
return x + self.conv(x)
else:
return self.conv(x)
# 模型定义模块
class MobileNetV2(nn.Module):
def __init__(self, num_classes=1000, width_mult=1.0, inverted_residual_setting=None, round_nearest=8):
super(MobileNetV2, self).__init__()
block = InvertedResidual
input_channel = 32
last_channel = 1280
if inverted_residual_setting is None:
inverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1],
[6, 24, 2, 2],
[6, 32, 3, 2],
[6, 64, 4, 2],
[6, 96, 3, 1],
[6, 160, 3, 2],
[6, 320, 1, 1],
]
# 需要输入t,c,n,s参数
if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
raise ValueError("inverted_residual_setting should be non-empty "
"or a 4-element list, got {}".format(inverted_residual_setting))
# 第一层
input_channel = _make_divisible(input_channel * width_mult, round_nearest)
self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
features = [ConvBNReLU(3, input_channel, stride=2)]
# inverted residual blocks
for t, c, n, s in inverted_residual_setting:
output_channel = _make_divisible(c * width_mult, round_nearest)
for i in range(n):
stride = s if i == 0 else 1
features.append(block(input_channel, output_channel, stride, expand_ratio=t))
input_channel = output_channel
# 最后几层
features.append(ConvBNReLU(input_channel, self.last_channel, kernel_size=1))
# make it nn.Sequential
self.features = nn.Sequential(*features)
# building classifier
self.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(self.last_channel, num_classes),
)
def forward(self, x):
x = self.features(x) # 7*7*1280
x = x.mean([2, 3])
x = self.classifier(x)
return x
# 速度评估
def speed(model, name):
t0 = time.time()
input = torch.rand(1, 3, 224, 224).cpu()
t1 = time.time()
model(input)
t2 = time.time()
for i in range(0, 30):
model(input)
t3 = time.time()
print('%10s : %f' % (name, (t3 - t2) / 30))
if __name__ == '__main__':
from mobilenetv1 import MobileNetV1
mobilenetv1 = MobileNetV1().cpu()
mobilenetv2 = MobileNetV2(width_mult=1)
speed(mobilenetv1, 'mobilenetv1')
speed(mobilenetv2, 'mobilenetv2')
input = torch.randn([1, 3, 224, 224])
output = mobilenetv1(input)
torch.onnx.export(mobilenetv1,input,'mobilenetv1.onnx')
output = mobilenetv2(input)
    torch.onnx.export(mobilenetv2,input,'mobilenetv2.onnx')
shufflenetv1
ShuffleNetV1 is an efficient convolutional neural network architecture designed specifically for mobile devices and embedded systems. By introducing a channel shuffle operation, it exploits the efficiency of group convolutions while avoiding the information isolation they would otherwise cause. Key features include:
- Group convolution: the input channels are split into groups that are convolved separately, reducing computation and parameters.
- Channel shuffle: after a group convolution, the channels are reshuffled so that information can flow between groups (a tiny demonstration follows the structure description below).
- Bottleneck structure: 1x1 convolutions reduce and then restore the channel dimension, with a 3x3 depthwise convolution in between for spatial feature extraction.
Basic structure of ShuffleNetV1
A typical ShuffleNetV1 model consists of the following parts:
- Input layer: accepts the input image.
- Initial convolutional layer: a standard 3x3 convolution followed by batch normalization and a ReLU activation.
- Bottleneck blocks: a stack of units, each consisting of a 1x1 group convolution, a channel shuffle, a depthwise convolution, and another 1x1 group convolution.
- Global average pooling layer: reduces the feature map to a fixed-size vector.
- Fully connected layer: produces the classification output.
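A tiny demonstration of the reshape-transpose-flatten trick behind channel shuffle (toy tensor with 6 channels in 3 groups; the real channel_shuffle used by the model is defined in the code below):
import torch
# Store the channel index as the value so the shuffle is easy to see
x = torch.arange(6).float().view(1, 6, 1, 1)  # 1 sample, 6 channels, 1x1 spatial size
groups = 3
n, c, h, w = x.size()
x = x.view(n, groups, c // groups, h, w).transpose(1, 2).contiguous().view(n, -1, h, w)
print(x.view(-1).tolist())  # [0.0, 2.0, 4.0, 1.0, 3.0, 5.0] -- channels are interleaved across the groups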
import torch
import torch.nn as nn
import torch.nn.functional as F
import time
from collections import OrderedDict
def conv3x3(in_channels, out_channels, stride=1,padding=1, bias=True, groups=1):
return nn.Conv2d(
in_channels,
out_channels,
kernel_size=3,
stride=stride,
padding=padding,
bias=bias,
groups=groups)
def conv1x1(in_channels, out_channels, groups=1):
return nn.Conv2d(
in_channels,
out_channels,
kernel_size=1,
groups=groups,
stride=1)
# 通道shuffle实现
def channel_shuffle(x, groups):
batchsize, num_channels, height, width = x.data.size()
channels_per_group = num_channels // groups
# reshape
x = x.view(batchsize, groups, channels_per_group, height, width)
# transpose
# - contiguous() required if transpose() is used before view().
# See https://github.com/pytorch/pytorch/issues/764
x = torch.transpose(x, 1, 2).contiguous()
# flatten
x = x.view(batchsize, -1, height, width)
return x
# shufflenet结构单元
class ShuffleUnit(nn.Module):
def __init__(self, in_channels, out_channels, groups=3,
grouped_conv=True, combine='add'):
super(ShuffleUnit, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.grouped_conv = grouped_conv
self.combine = combine
self.groups = groups
self.bottleneck_channels = self.out_channels // 4
# define the type of ShuffleUnit
if self.combine == 'add':
# ShuffleUnit Figure 2b
self.depthwise_stride = 1
self._combine_func = self._add
elif self.combine == 'concat':
# ShuffleUnit Figure 2c
self.depthwise_stride = 2
self._combine_func = self._concat
# ensure output of concat has the same channels as original output channels.
self.out_channels -= self.in_channels
else:
raise ValueError("Cannot combine tensors with \"{}\"" \
"Only \"add\" and \"concat\" are" \
"supported".format(self.combine))
# Use a 1x1 grouped or non-grouped convolution to reduce input channels
# to bottleneck channels, as in a ResNet bottleneck module.
# NOTE: Do not use group convolution for the first conv1x1 in Stage 2.
self.first_1x1_groups = self.groups if grouped_conv else 1
self.g_conv_1x1_compress = self._make_grouped_conv1x1(
self.in_channels,
self.bottleneck_channels,
self.first_1x1_groups,
batch_norm=True,
relu=True
)
# 3x3 depthwise convolution followed by batch normalization
self.depthwise_conv3x3 = conv3x3(
self.bottleneck_channels, self.bottleneck_channels,
stride=self.depthwise_stride, groups=self.bottleneck_channels)
self.bn_after_depthwise = nn.BatchNorm2d(self.bottleneck_channels)
# Use 1x1 grouped convolution to expand from bottleneck_channels to out_channels
self.g_conv_1x1_expand = self._make_grouped_conv1x1(
self.bottleneck_channels,
self.out_channels,
self.groups,
batch_norm=True,
relu=False
)
@staticmethod
def _add(x, out):
# residual connection
return x + out
@staticmethod
def _concat(x, out):
# concatenate along channel axis
return torch.cat((x, out), 1)
def _make_grouped_conv1x1(self, in_channels, out_channels, groups,
batch_norm=True, relu=False):
modules = OrderedDict()
conv = conv1x1(in_channels, out_channels, groups=groups)
modules['conv1x1'] = conv
if batch_norm:
modules['batch_norm'] = nn.BatchNorm2d(out_channels)
if relu:
modules['relu'] = nn.ReLU()
if len(modules) > 1:
return nn.Sequential(modules)
else:
return conv
def forward(self, x):
# save for combining later with output
residual = x
if self.combine == 'concat':
residual = F.avg_pool2d(residual, kernel_size=3,
stride=2, padding=1)
out = self.g_conv_1x1_compress(x)
out = channel_shuffle(out, self.groups)
out = self.depthwise_conv3x3(out)
out = self.bn_after_depthwise(out)
out = self.g_conv_1x1_expand(out)
out = self._combine_func(residual, out)
return F.relu(out)
class ShuffleNetV1(nn.Module):
def __init__(self, groups=3, in_channels=3, num_classes=1000):
super(ShuffleNetV1, self).__init__()
self.groups = groups # 1*1卷积的分组参数
self.stage_repeats = [3, 7, 3]
self.in_channels = in_channels
self.num_classes = num_classes
# index 0 is invalid and should never be called.
# only used for indexing convenience.
if groups == 1:
self.stage_out_channels = [-1, 24, 144, 288, 576]
elif groups == 2:
self.stage_out_channels = [-1, 24, 200, 400, 800]
elif groups == 3:
self.stage_out_channels = [-1, 24, 240, 480, 960]
elif groups == 4:
self.stage_out_channels = [-1, 24, 272, 544, 1088]
elif groups == 8:
self.stage_out_channels = [-1, 24, 384, 768, 1536]
else:
raise ValueError(
"""{} groups is not supported for 1x1 Grouped Convolutions""".format(groups))
# Stage 1 always has 24 output channels
self.conv1 = conv3x3(self.in_channels,
self.stage_out_channels[1], # stage 1
stride=2)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
# Stage 2
self.stage2 = self._make_stage(2)
# Stage 3
self.stage3 = self._make_stage(3)
# Stage 4
self.stage4 = self._make_stage(4)
# Fully-connected classification layer
num_inputs = self.stage_out_channels[-1]
self.fc = nn.Linear(num_inputs, self.num_classes)
def _make_stage(self, stage):
modules = OrderedDict()
stage_name = "ShuffleUnit_Stage{}".format(stage)
# First ShuffleUnit in the stage
# 1. Stage 2的第一个1x1 convolution不进行分组,其他都进行分组
grouped_conv = stage > 2
# 2. 第一个空间分辨率下降,永远是拼接单元.
first_module = ShuffleUnit(
self.stage_out_channels[stage - 1],
self.stage_out_channels[stage],
groups=self.groups,
grouped_conv=grouped_conv,
combine='concat'
)
modules[stage_name + "_0"] = first_module
# 后面的不用下降分辨率.
for i in range(self.stage_repeats[stage - 2]):
name = stage_name + "_{}".format(i + 1)
module = ShuffleUnit(
self.stage_out_channels[stage],
self.stage_out_channels[stage],
groups=self.groups,
grouped_conv=True,
combine='add'
)
modules[name] = module
return nn.Sequential(modules)
def forward(self, x):
x = self.conv1(x)
x = self.maxpool(x)
x = self.stage2(x)
x = self.stage3(x)
x = self.stage4(x)
# global average pooling layer
x = F.avg_pool2d(x, x.data.size()[-2:])
# flatten for input to fully-connected layer
x = x.view(x.size(0), -1)
x = self.fc(x)
return F.log_softmax(x, dim=1)
def speed(model, name):
t0 = time.time()
input = torch.rand(1, 3, 224, 224).cpu()
t1 = time.time()
model(input)
t2 = time.time()
for i in range(0, 30):
model(input)
t3 = time.time()
print('%10s : %f' % (name, (t3 - t2) / 30))
if __name__ == "__main__":
model = ShuffleNetV1()
input = torch.rand(1,3,224,224).cpu()
output = model(input)
speed(model, 'shufflenet v1')
    #print(model)
shufflenetv2
ShuffleNetV2 is an improved version of ShuffleNetV1 that aims to further increase efficiency and accuracy. Based on a series of experiments and theoretical analysis, it proposes a set of practical design guidelines and builds a new architecture on top of them. Key features include:
- Balanced computational load: computation is distributed evenly across stages so that no single layer dominates the cost.
- Avoiding information loss: well-designed skip connections ensure that information is passed effectively between layers.
- Reduced memory access cost: data layout and operation order are optimized to reduce the number of memory accesses and improve runtime efficiency.
- Channel shuffle: after grouped operations, a channel shuffle recombines the channels so that information flows between groups.
Basic structure of ShuffleNetV2
A typical ShuffleNetV2 model consists of the following parts (a sketch of the channel split used by its basic block follows this list):
- Input layer: accepts the input image.
- Initial convolutional layer: a standard 3x3 convolution followed by batch normalization and a ReLU activation.
- Basic blocks: a stack of units, each consisting of 1x1 convolutions, a depthwise convolution, and a channel shuffle.
- Global average pooling layer: reduces the feature map to a fixed-size vector.
- Fully connected layer: produces the classification output.
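A small sketch of the channel split used by the stride-1 ShuffleNetV2 unit (toy shapes chosen to match the first stage of the configuration used below; the convolution branch itself is omitted here): half of the channels pass through untouched, the other half go through the convolution branch, and the two halves are concatenated and then shuffled.
import torch
x = torch.randn(1, 116, 28, 28)        # an assumed stage-2 feature map
x1, x2 = x.chunk(2, dim=1)             # split into two 58-channel halves
out = torch.cat((x1, x2), dim=1)       # identity half + (in the real unit, convolved) second half
print(x1.shape, x2.shape, out.shape)   # [1, 58, 28, 28] twice, then [1, 116, 28, 28]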
import torch
import torch.nn as nn
import time
def channel_shuffle(x, groups):
batchsize, num_channels, height, width = x.data.size()
channels_per_group = num_channels // groups
# reshape
x = x.view(batchsize, groups,
channels_per_group, height, width)
x = torch.transpose(x, 1, 2).contiguous()
# flatten
x = x.view(batchsize, -1, height, width)
return x
class InvertedResidual(nn.Module):
def __init__(self, inp, oup, stride):
super(InvertedResidual, self).__init__()
if not (1 <= stride <= 3):
raise ValueError('illegal stride value')
self.stride = stride
branch_features = oup // 2
assert (self.stride != 1) or (inp == branch_features << 1)
if self.stride > 1:
self.branch1 = nn.Sequential(
self.depthwise_conv(inp, inp, kernel_size=3, stride=self.stride, padding=1),
nn.BatchNorm2d(inp),
nn.Conv2d(inp, branch_features, kernel_size=1, stride=1, padding=0, bias=False),
nn.BatchNorm2d(branch_features),
nn.ReLU(inplace=True),
)
self.branch2 = nn.Sequential(
nn.Conv2d(inp if (self.stride > 1) else branch_features,
branch_features, kernel_size=1, stride=1, padding=0, bias=False),
nn.BatchNorm2d(branch_features),
nn.ReLU(inplace=True),
self.depthwise_conv(branch_features, branch_features, kernel_size=3, stride=self.stride, padding=1),
nn.BatchNorm2d(branch_features),
nn.Conv2d(branch_features, branch_features, kernel_size=1, stride=1, padding=0, bias=False),
nn.BatchNorm2d(branch_features),
nn.ReLU(inplace=True),
)
@staticmethod
def depthwise_conv(i, o, kernel_size, stride=1, padding=0, bias=False):
return nn.Conv2d(i, o, kernel_size, stride, padding, bias=bias, groups=i)
def forward(self, x):
if self.stride == 1:
x1, x2 = x.chunk(2, dim=1)
out = torch.cat((x1, self.branch2(x2)), dim=1)
else:
out = torch.cat((self.branch1(x), self.branch2(x)), dim=1)
out = channel_shuffle(out, 2)
return out
class ShuffleNetV2(nn.Module):
def __init__(self, stages_repeats, stages_out_channels, num_classes=1000):
super(ShuffleNetV2, self).__init__()
if len(stages_repeats) != 3:
raise ValueError('expected stages_repeats as list of 3 positive ints')
if len(stages_out_channels) != 5:
raise ValueError('expected stages_out_channels as list of 5 positive ints')
self._stage_out_channels = stages_out_channels
input_channels = 3
output_channels = self._stage_out_channels[0]
self.conv1 = nn.Sequential(
nn.Conv2d(input_channels, output_channels, 3, 2, 1, bias=False),
nn.BatchNorm2d(output_channels),
nn.ReLU(inplace=True),
)
input_channels = output_channels
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
stage_names = ['stage{}'.format(i) for i in [2, 3, 4]]
for name, repeats, output_channels in zip(
stage_names, stages_repeats, self._stage_out_channels[1:]):
seq = [InvertedResidual(input_channels, output_channels, 2)]
for i in range(repeats - 1):
seq.append(InvertedResidual(output_channels, output_channels, 1))
setattr(self, name, nn.Sequential(*seq))
input_channels = output_channels
output_channels = self._stage_out_channels[-1]
self.conv5 = nn.Sequential(
nn.Conv2d(input_channels, output_channels, 1, 1, 0, bias=False),
nn.BatchNorm2d(output_channels),
nn.ReLU(inplace=True),
)
self.fc = nn.Linear(output_channels, num_classes)
def forward(self, x):
x = self.conv1(x)
x = self.maxpool(x)
x = self.stage2(x)
x = self.stage3(x)
x = self.stage4(x)
x = self.conv5(x)
x = x.mean([2, 3]) # globalpool
x = self.fc(x)
return x
def speed(model, name):
t0 = time.time()
input = torch.rand(1, 3, 224, 224).cpu()
t1 = time.time()
model(input)
t2 = time.time()
for i in range(0, 30):
model(input)
t3 = time.time()
print('%10s : %f' % (name, (t3 - t2) / 30))
if __name__ == "__main__":
from shufflenetv1 import ShuffleNetV1
shufflenetv1 = ShuffleNetV1()
shufflenetv2 = ShuffleNetV2([3, 7, 3], [24, 116, 232, 464, 1024])
speed(shufflenetv1, 'shufflenet v1')
speed(shufflenetv2, 'shufflenet v2')
input = torch.randn([1, 3, 224, 224])
output = shufflenetv1(input)
torch.onnx.export(shufflenetv1,input,'shufflenetv1.onnx')
output = shufflenetv2(input)
torch.onnx.export(shufflenetv2,input,'shufflenetv2.onnx')