1. Preface
In 1989, LeCun published the paper Backpropagation Applied to Handwritten Zip Code Recognition, which implemented one of the earliest convolutional neural networks. As the paper states, "This approach has been successfully applied to the recognition of handwritten zip code digits provided by the U.S. Postal Service." The network was applied mainly to handwritten digit recognition, achieving a recognition accuracy as high as 99%.
2. Network Architecture
The raw data is downsampled to 16*16 grayscale images, so the input layer has 16 * 16 = 256 neurons. The network consists of two convolutional layers and two fully connected layers, with the last fully connected layer serving as the output. Tanh is used as the activation function and MSE loss as the loss function.
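As a sanity check on the feature-map sizes: a convolution's output width is floor((in + 2*padding - kernel) / stride) + 1. With a 5*5 kernel, stride 2, and padding 2, the first layer maps 16*16 to floor((16 + 4 - 5) / 2) + 1 = 8, i.e. 8*8 feature maps, and the second maps 8*8 to floor((8 + 4 - 5) / 2) + 1 = 4, i.e. 4*4 feature maps, matching the comments in the code below.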
Python implementation of the network architecture:
def __init__(self, device):  # initialization
    super(LeNet, self).__init__()
    self.conv1 = nn.Conv2d(1, 12, 5, stride=2, padding=2)  # conv layer: 16*16 -> 12 maps of 8*8
    self.conv2 = nn.Conv2d(12, 12, 5, stride=2, padding=2)  # conv layer: 12 maps of 8*8 -> 12 maps of 4*4
    self.fcon1 = nn.Linear(12 * 4 * 4, 30)  # fully connected layer
    self.fcon2 = nn.Linear(30, 10)  # fully connected layer
    self.act = nn.Tanh()  # activation function
    self.loss = nn.MSELoss()  # loss function
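A quick standalone sanity check of those shapes (a minimal sketch, not part of the original code):

import torch
import torch.nn as nn

conv1 = nn.Conv2d(1, 12, 5, stride=2, padding=2)
conv2 = nn.Conv2d(12, 12, 5, stride=2, padding=2)
x = torch.randn(1, 1, 16, 16)  # dummy 16*16 grayscale input
print(conv1(x).shape)  # torch.Size([1, 12, 8, 8])
print(conv2(conv1(x)).shape)  # torch.Size([1, 12, 4, 4])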
3. Dataset
The model is trained on the MNIST dataset; download the MNIST dataset to a local folder in advance.
Python implementation for loading the MNIST dataset:
def loadData(datafolder):  # load the MNIST files under datafolder
    fileName = ['train-labels-idx1-ubyte.gz', 'train-images-idx3-ubyte.gz',
                't10k-labels-idx1-ubyte.gz', 't10k-images-idx3-ubyte.gz']  # MNIST dataset files
    dataPath = []  # dataset file paths
    for file in fileName:  # build the file paths
        dataPath.append(os.path.join(datafolder, file))
    with gzip.open(dataPath[0], "rb") as file:  # training labels
        tmp = np.frombuffer(file.read(), np.uint8, offset=8)  # skip the 8-byte IDX header
        y_train = np.zeros((1, len(tmp), 10))  # add a leading dimension of 1
        for pos in range(len(tmp)):
            y_train[0][pos][tmp[pos]] = 1.0  # one-hot encoding: set the target class to 1
    with gzip.open(dataPath[1], "rb") as file:  # training images
        tmp = np.frombuffer(file.read(), np.uint8, offset=16)  # skip the 16-byte IDX header
        _, tmp = cv2.threshold(tmp, 127, 1, cv2.THRESH_BINARY)  # binarize to 0/1
        tmp = tmp.reshape(len(y_train[-1]), 28, 28)  # reshape into 28*28 images
        x_train = np.zeros((1, len(y_train[-1]), 16, 16))  # 16*16 images, leading dimension of 1
        for t in range(len(tmp)):
            x_train[0][t] = cv2.resize(tmp[t], (16, 16), interpolation=cv2.INTER_AREA)  # downsample
    with gzip.open(dataPath[2], "rb") as file:  # test labels
        tmp = np.frombuffer(file.read(), np.uint8, offset=8)  # skip the 8-byte IDX header
        y_test = np.zeros((1, len(tmp), 10))  # add a leading dimension of 1
        for pos in range(len(tmp)):
            y_test[0][pos][tmp[pos]] = 1.0  # one-hot encoding: set the target class to 1
    with gzip.open(dataPath[3], "rb") as file:  # test images
        tmp = np.frombuffer(file.read(), np.uint8, offset=16)  # skip the 16-byte IDX header
        _, tmp = cv2.threshold(tmp, 127, 1, cv2.THRESH_BINARY)  # binarize to 0/1
        tmp = tmp.reshape(len(y_test[-1]), 28, 28)  # reshape into 28*28 images
        x_test = np.zeros((1, len(y_test[-1]), 16, 16))  # 16*16 images, leading dimension of 1
        for t in range(len(tmp)):
            x_test[0][t] = cv2.resize(tmp[t], (16, 16), interpolation=cv2.INTER_AREA)  # downsample
    print("The datasets were loaded successfully.")
    print("Starting to train the model.")
    print("--------------------")
    return (x_train, y_train), (x_test, y_test)
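An illustrative call (the folder path here is hypothetical; it must contain the four .gz files listed above), showing the shapes the function returns:

(x_train, y_train), (x_test, y_test) = loadData("./MNIST")
print(x_train.shape, y_train.shape)  # (1, 60000, 16, 16) (1, 60000, 10)
print(x_test.shape, y_test.shape)  # (1, 10000, 16, 16) (1, 10000, 10)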
4. Forward Propagation
Python implementation of forward propagation:
def forward(self, data):  # forward propagation
    data = 1.7159 * self.act(self.conv1(data) * 2.0 / 3.0)  # first conv layer
    data = 1.7159 * self.act(self.conv2(data) * 2.0 / 3.0)  # second conv layer
    data = data.view(-1, 192)  # flatten 12*4*4 into 1*192
    data = 1.7159 * self.act(self.fcon1(data) * 2.0 / 3.0)  # first fully connected layer
    data = 1.7159 * self.act(self.fcon2(data) * 2.0 / 3.0)  # second fully connected layer
    return data
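The constants 1.7159 and 2/3 are not arbitrary: each layer computes LeCun's scaled hyperbolic tangent

f(a) = 1.7159 * tanh(2a / 3), which satisfies f(1) ≈ 1 and f(-1) ≈ -1 (since 1.7159 * tanh(2/3) ≈ 1.0001),

keeping the activations in the well-conditioned part of the nonlinearity for the one-hot targets.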
5. Testing the Current Model's Accuracy on the Dataset
def test(model, dataTest, device):  # evaluate classification accuracy on a dataset
    sum_num = len(dataTest[0][0])
    num = 0
    for x_test, y_test in zip(dataTest[0][0], dataTest[0][1]):  # predict each sample
        with torch.no_grad():
            x = x_test.unsqueeze(0).unsqueeze(0).to(device)
            y_predict = model(x)  # network output
            _, index = y_predict.max(dim=1)  # predicted class
            if y_test[index] == 1.0:  # correct if it matches the one-hot label
                num += 1
    return num / sum_num  # classification accuracy (referred to as mAP in this code)
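A tiny self-contained illustration of the correctness check above (the values are made up):

import torch
y_test = torch.tensor([0., 0., 0., 1., 0., 0., 0., 0., 0., 0.])  # one-hot label for class 3
y_predict = torch.tensor([[0.1, -0.2, 0.0, 0.9, 0.3, 0.0, 0.1, 0.0, 0.2, 0.1]])  # fake network output
_, index = y_predict.max(dim=1)  # index of the largest output: tensor([3])
print(bool(y_test[index] == 1.0))  # True -> counted as a correct prediction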
6. Training the Model
The learning rate is scheduled by directly overwriting lr in each of the optimizer's param_groups:
def adjust_lr(epoch, optimizer):  # adjust the learning rate by epoch
    if epoch < 2:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 2.0e-3
    elif epoch < 5:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 1.0e-3
    elif epoch < 8:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 5.0e-4
    else:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 1.0e-4
    return optimizer
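The same step schedule can also be expressed with PyTorch's built-in LambdaLR scheduler; a sketch, assuming the optimizer was created with lr=2.0e-3 as in the full code below:

import torch.optim as optim

def lr_factor(epoch):  # multiplicative factor relative to the base lr of 2.0e-3
    if epoch < 2:
        return 1.0  # 2.0e-3
    elif epoch < 5:
        return 0.5  # 1.0e-3
    elif epoch < 8:
        return 0.25  # 5.0e-4
    return 0.05  # 1.0e-4

scheduler = optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lr_factor)
# call scheduler.step() once at the end of each epoch instead of adjust_lr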
# model: the convolutional network  optimizer: the optimizer  dataTrain: training set
# dataTest: test set  epochs: number of training epochs  device: CPU / GPU
def train(model, optimizer, dataTrain, dataTest, epochs, device):
    AP = []  # accuracy history
    Loss = []  # loss history
    for epoch in range(epochs):
        sum_num = 0
        lossSum = 0
        optimizer = adjust_lr(epoch, optimizer)  # apply the learning-rate schedule
        for x_train, y_train in zip(dataTrain[0][0], dataTrain[0][1]):  # one sample at a time
            x = x_train.unsqueeze(0).unsqueeze(0).to(device)  # move to CPU / GPU
            y = y_train.unsqueeze(0).to(device)  # move to CPU / GPU
            p = model(x)  # forward pass
            optimizer.zero_grad()  # clear accumulated gradients
            loss = model.loss(p, y)  # compute the loss
            loss.backward()  # backpropagation: compute gradients
            optimizer.step()  # update the parameters
            sum_num += 1
            lossSum += loss.to("cpu").detach().numpy()
            if sum_num % 2000 == 0:  # print every 2000 samples to monitor progress
                print("{}/{} --> loss = {}".format(sum_num, len(dataTrain[0][0]), loss.item()))
        Loss.append(lossSum / len(dataTrain[0][0]))  # record the mean loss of this epoch
        mAP = test(model, dataTest, device)
        AP.append(mAP)
        print("Epoch {} complete  mAP = {:.2f}%  loss = {:.4f}".format(epoch + 1, mAP * 100, lossSum / len(dataTrain[0][0])))  # end of one epoch
    plt.subplot(1, 2, 1)
    plt.plot(AP)
    plt.subplot(1, 2, 2)
    plt.plot(Loss)
    plt.savefig(r"F:\课外\datasets\MNIST\2.png")  # save the accuracy and loss curves
    return model
7. Using the Model for Inference
network = torch.load(os.path.join(data_folder, "net2.pkl"))  # load the saved model
mAP = test(network, test_data, device)
print("mAP = {:.2f}%".format(mAP * 100))
8. Complete Code
Note that the MNIST dataset is expected under F:\课外\datasets\MNIST, and train saves the accuracy/loss figure to F:\课外\datasets\MNIST\2.png.
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import gzip
import os.path
import numpy as np
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt
class LeNet(nn.Module):
    def __init__(self, device):  # initialization
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 12, 5, stride=2, padding=2)  # conv layer: 16*16 -> 12 maps of 8*8
        self.conv2 = nn.Conv2d(12, 12, 5, stride=2, padding=2)  # conv layer: 12 maps of 8*8 -> 12 maps of 4*4
        self.fcon1 = nn.Linear(12 * 4 * 4, 30)  # fully connected layer
        self.fcon2 = nn.Linear(30, 10)  # fully connected layer
        self.act = nn.Tanh()  # activation function
        self.loss = nn.MSELoss()  # loss function
"""
# 初始化权重
for m in self.modules():
if isinstance(m, nn.Conv2d):
F_in = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data = torch.rand(m.weight.data.size()) * 4.8 / F_in - 2.4 / F_in
if isinstance(m, nn.Linear):
F_in = m.out_features
m.weight.data = torch.rand(m.weight.data.size()) * 4.8 / F_in - 2.4 / F_in
"""
self.to(device) # 设置device
    def forward(self, data):  # forward propagation
        data = 1.7159 * self.act(self.conv1(data) * 2.0 / 3.0)  # first conv layer
        data = 1.7159 * self.act(self.conv2(data) * 2.0 / 3.0)  # second conv layer
        data = data.view(-1, 192)  # flatten 12*4*4 into 1*192
        data = 1.7159 * self.act(self.fcon1(data) * 2.0 / 3.0)  # first fully connected layer
        data = 1.7159 * self.act(self.fcon2(data) * 2.0 / 3.0)  # second fully connected layer
        return data
def adjust_lr(epoch, optimizer):  # adjust the learning rate by epoch
    if epoch < 2:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 2.0e-3
    elif epoch < 5:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 1.0e-3
    elif epoch < 8:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 5.0e-4
    else:
        for param_group in optimizer.param_groups:
            param_group['lr'] = 1.0e-4
    return optimizer
# model: the convolutional network  optimizer: the optimizer  dataTrain: training set
# dataTest: test set  epochs: number of training epochs  device: CPU / GPU
def train(model, optimizer, dataTrain, dataTest, epochs, device):
    AP = []  # accuracy history
    Loss = []  # loss history
    for epoch in range(epochs):
        sum_num = 0
        lossSum = 0
        optimizer = adjust_lr(epoch, optimizer)  # apply the learning-rate schedule
        for x_train, y_train in zip(dataTrain[0][0], dataTrain[0][1]):  # one sample at a time
            x = x_train.unsqueeze(0).unsqueeze(0).to(device)  # move to CPU / GPU
            y = y_train.unsqueeze(0).to(device)  # move to CPU / GPU
            p = model(x)  # forward pass
            optimizer.zero_grad()  # clear accumulated gradients
            loss = model.loss(p, y)  # compute the loss
            loss.backward()  # backpropagation: compute gradients
            optimizer.step()  # update the parameters
            sum_num += 1
            lossSum += loss.to("cpu").detach().numpy()
            if sum_num % 2000 == 0:  # print every 2000 samples to monitor progress
                print("{}/{} --> loss = {}".format(sum_num, len(dataTrain[0][0]), loss.item()))
        Loss.append(lossSum / len(dataTrain[0][0]))  # record the mean loss of this epoch
        mAP = test(model, dataTest, device)
        AP.append(mAP)
        print("Epoch {} complete  mAP = {:.2f}%  loss = {:.4f}".format(epoch + 1, mAP * 100, lossSum / len(dataTrain[0][0])))  # end of one epoch
    plt.subplot(1, 2, 1)
    plt.plot(AP)
    plt.subplot(1, 2, 2)
    plt.plot(Loss)
    plt.savefig(r"F:\课外\datasets\MNIST\2.png")  # save the accuracy and loss curves
    return model
def test(model, dataTest, device):  # evaluate classification accuracy on a dataset
    sum_num = len(dataTest[0][0])
    num = 0
    for x_test, y_test in zip(dataTest[0][0], dataTest[0][1]):  # predict each sample
        with torch.no_grad():
            x = x_test.unsqueeze(0).unsqueeze(0).to(device)
            y_predict = model(x)  # network output
            _, index = y_predict.max(dim=1)  # predicted class
            if y_test[index] == 1.0:  # correct if it matches the one-hot label
                num += 1
    return num / sum_num  # classification accuracy (referred to as mAP in this code)
def loadData(datafolder):  # load the MNIST files under datafolder
    fileName = ['train-labels-idx1-ubyte.gz', 'train-images-idx3-ubyte.gz',
                't10k-labels-idx1-ubyte.gz', 't10k-images-idx3-ubyte.gz']  # MNIST dataset files
    dataPath = []  # dataset file paths
    for file in fileName:  # build the file paths
        dataPath.append(os.path.join(datafolder, file))
    with gzip.open(dataPath[0], "rb") as file:  # training labels
        tmp = np.frombuffer(file.read(), np.uint8, offset=8)  # skip the 8-byte IDX header
        y_train = np.zeros((1, len(tmp), 10))  # add a leading dimension of 1
        for pos in range(len(tmp)):
            y_train[0][pos][tmp[pos]] = 1.0  # one-hot encoding: set the target class to 1
    with gzip.open(dataPath[1], "rb") as file:  # training images
        tmp = np.frombuffer(file.read(), np.uint8, offset=16)  # skip the 16-byte IDX header
        _, tmp = cv2.threshold(tmp, 127, 1, cv2.THRESH_BINARY)  # binarize to 0/1
        tmp = tmp.reshape(len(y_train[-1]), 28, 28)  # reshape into 28*28 images
        x_train = np.zeros((1, len(y_train[-1]), 16, 16))  # 16*16 images, leading dimension of 1
        for t in range(len(tmp)):
            x_train[0][t] = cv2.resize(tmp[t], (16, 16), interpolation=cv2.INTER_AREA)  # downsample
    with gzip.open(dataPath[2], "rb") as file:  # test labels
        tmp = np.frombuffer(file.read(), np.uint8, offset=8)  # skip the 8-byte IDX header
        y_test = np.zeros((1, len(tmp), 10))  # add a leading dimension of 1
        for pos in range(len(tmp)):
            y_test[0][pos][tmp[pos]] = 1.0  # one-hot encoding: set the target class to 1
    with gzip.open(dataPath[3], "rb") as file:  # test images
        tmp = np.frombuffer(file.read(), np.uint8, offset=16)  # skip the 16-byte IDX header
        _, tmp = cv2.threshold(tmp, 127, 1, cv2.THRESH_BINARY)  # binarize to 0/1
        tmp = tmp.reshape(len(y_test[-1]), 28, 28)  # reshape into 28*28 images
        x_test = np.zeros((1, len(y_test[-1]), 16, 16))  # 16*16 images, leading dimension of 1
        for t in range(len(tmp)):
            x_test[0][t] = cv2.resize(tmp[t], (16, 16), interpolation=cv2.INTER_AREA)  # downsample
    print("The datasets were loaded successfully.")
    print("Starting to train the model.")
    print("--------------------")
    return (x_train, y_train), (x_test, y_test)
if __name__ == "__main__":
    # data_folder = "F:\\data\\MNIST\\"  # folder containing the dataset
    data_folder = r"F:\课外\datasets\MNIST"
    (x_train, y_train), (x_test, y_test) = loadData(data_folder)  # load the datasets
    training_data = [(torch.from_numpy(x_value).type(torch.FloatTensor),
                      torch.from_numpy(y_value).type(torch.FloatTensor))
                     for x_value, y_value in zip(x_train, y_train)]  # training set
    test_data = [(torch.from_numpy(x_value).type(torch.FloatTensor),
                  torch.from_numpy(y_value).type(torch.FloatTensor))
                 for x_value, y_value in zip(x_test, y_test)]  # test set
    device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")  # CPU / GPU
    network = LeNet(device)  # build the network on the chosen device
    epochs = 20  # number of training epochs
    optimizer = optim.SGD(network.parameters(), lr=2.0e-3)  # stochastic gradient descent, initial learning rate 0.002
    network = train(network, optimizer, training_data, test_data, epochs, device)  # train the model
    savePath = os.path.join(data_folder, "net2.pkl")  # save path (a literal "\net2.pkl" would turn "\n" into a newline)
    torch.save(network, savePath)  # save the network
    print("Training finished successfully.")
    print("The model was saved to: {}".format(savePath))
    """
    # load the saved network and evaluate it
    network = torch.load(os.path.join(data_folder, "net2.pkl"))
    mAP = test(network, test_data, device)
    print("mAP = {:.2f}%".format(mAP * 100))
    """
[Figure: training process]
[Figure: curves of the change in AP and loss]