卷积神经网络-LeNet复现


1.前言

1989年,LeCun发表了题为《Backpropagation Applied to Handwritten Zip Code Recognition》的论文,实现了第一个卷积神经网络。如论文中所说:“This approach has been successfully applied to the recognition of handwritten zip code digits provided by the U.S. Postal Service.” 该网络主要用于手写数字识别,识别精度高达99%。

digits

2.网络架构

将原始数据降采样,得到16*16的灰度图片,因此输入层有256(16*16)个神经元。网络包含两层卷积层(每层12个5*5卷积核,步长为2)和两层全连接层,最后一层全连接层作为输出。激活函数取缩放后的Tanh,即 1.7159·tanh(2x/3);损失函数取MSE-Loss。

architecture

网络架构Python实现:

def __init__(self, device):  # build the network layers
    """Create the layers of the network and move them to ``device``.

    Args:
        device: Target passed to ``self.to`` once all layers exist.
    """
    super(LeNet, self).__init__()
    self.conv1 = nn.Conv2d(1, 12, 5, stride=2, padding=2)  # conv layer: 16*16 -> 12 maps of 8*8
    self.conv2 = nn.Conv2d(12, 12, 5, stride=2, padding=2)  # conv layer: 12 maps of 8*8 -> 12 maps of 4*4
    self.fcon1 = nn.Linear(12 * 4 * 4, 30)  # fully connected layer
    self.fcon2 = nn.Linear(30, 10)  # fully connected layer (output)
    self.act = nn.Tanh()  # activation function
    self.loss = nn.MSELoss()  # loss function
    # Without this call the `device` argument would be silently ignored and
    # the parameters would stay on the default device.
    self.to(device)

3.数据集

使用MNIST数据集进行训练,需预先将MNIST数据集的四个.gz文件(训练集/测试集的图像与标签)下载到本地。

读取MNIST数据集Python实现:

def loadData(datafolder):  # load the MNIST archives stored in datafolder
    """Load MNIST from ``datafolder`` as one-hot labels and 16*16 binary images.

    Args:
        datafolder: Directory containing the four gzip-compressed MNIST files
            named in ``fileName`` below.

    Returns:
        ``(x_train, y_train), (x_test, y_test)``.  Each ``x`` is a float array
        of shape ``(1, N, 16, 16)`` and each ``y`` a one-hot float array of
        shape ``(1, N, 10)``; the leading axis of size 1 is kept so callers
        can iterate over the arrays as a single (images, labels) pair.
    """

    def read_labels(path):
        # offset=8 skips the header that precedes the uint8 labels.
        with gzip.open(path, "rb") as stream:
            labels = np.frombuffer(stream.read(), np.uint8, offset=8)
        one_hot = np.zeros((1, len(labels), 10))
        # One vectorised assignment instead of a Python loop per sample:
        # row i gets a 1.0 in column labels[i].
        one_hot[0, np.arange(len(labels)), labels] = 1.0
        return one_hot

    def read_images(path, count):
        # offset=16 skips the header that precedes the uint8 pixels.
        with gzip.open(path, "rb") as stream:
            pixels = np.frombuffer(stream.read(), np.uint8, offset=16)
        _, binary = cv2.threshold(pixels, 127, 1, cv2.THRESH_BINARY)  # binarise to 0 / 1
        binary = binary.reshape(count, 28, 28)  # one 28*28 image per sample
        images = np.zeros((1, count, 16, 16))
        for position, picture in enumerate(binary):
            # Shrink every image from 28*28 to 16*16.
            images[0][position] = cv2.resize(picture, (16, 16), interpolation=cv2.INTER_AREA)
        return images

    fileName = ['train-labels-idx1-ubyte.gz', 'train-images-idx3-ubyte.gz',
                't10k-labels-idx1-ubyte.gz', 't10k-images-idx3-ubyte.gz']  # MNIST archive names
    dataPath = [os.path.join(datafolder, name) for name in fileName]

    # Labels are read first because the image reshape needs the sample count.
    y_train = read_labels(dataPath[0])
    x_train = read_images(dataPath[1], y_train.shape[1])
    y_test = read_labels(dataPath[2])
    x_test = read_images(dataPath[3], y_test.shape[1])

    print("The datasets were loaded successfully.")
    print("Starting train the model.")
    print("--------------------")
    return (x_train, y_train), (x_test, y_test)

4.正向传播

正向传播Python实现:

def forward(self, data):  # forward pass
    """Run ``data`` through both convolutions and both fully connected layers.

    Every layer output ``z`` is squashed with ``1.7159 * act(z * 2 / 3)``.
    """

    def squash(z):
        # Scaled activation applied after each layer.
        return 1.7159 * self.act(z * 2.0 / 3.0)

    feature_maps = squash(self.conv1(data))  # first convolution
    feature_maps = squash(self.conv2(feature_maps))  # second convolution
    flat = feature_maps.view(-1, 192)  # flatten 12*4*4 into rows of 192 values
    hidden = squash(self.fcon1(flat))  # first fully connected layer
    return squash(self.fcon2(hidden))  # second fully connected layer

5.测试当前模型对数据集的精度

def test(model, dataTest, device):  # evaluate classification accuracy
    """Return the fraction of samples in ``dataTest`` that ``model`` gets right.

    Args:
        model: Callable that receives one sample with two leading singleton
            axes added and returns scores with the classes on dim 1.
        dataTest: Sequence whose first element holds the images at index 0
            and the matching one-hot labels at index 1.
        device: Device each input sample is moved to before inference.

    Returns:
        The accuracy in ``[0, 1]``, or ``0.0`` when there are no samples.
    """
    images, labels = dataTest[0][0], dataTest[0][1]
    total = len(images)
    if total == 0:  # nothing to evaluate; avoid dividing by zero
        return 0.0

    correct = 0
    with torch.no_grad():  # inference only, entered once for the whole loop
        for image, label in zip(images, labels):
            scores = model(image.unsqueeze(0).unsqueeze(0).to(device))
            _, index = scores.max(dim=1)
            # .item() yields a plain int, so the label tensor is never indexed
            # with an index tensor that lives on another device.
            if label[index.item()] == 1.0:  # predicted class matches the one-hot label
                correct += 1
    return correct / total


# pytest's default `test*` function pattern would otherwise collect this
# helper as a test case when the module is imported from a test file.
test.__test__ = False

6.训练模型

通过修改optimizer的param_groups,按epoch分段调节学习率:

def adjust_lr(epoch, optimizer):  # piecewise-constant learning-rate schedule
    """Set the learning rate of every parameter group according to ``epoch``.

    The rate is 2e-3 while ``epoch < 2``, 1e-3 while ``epoch < 5``, 5e-4 while
    ``epoch < 8`` and 1e-4 afterwards.  The optimizer is updated in place and
    returned.
    """
    schedule = ((2, 2.0e-3), (5, 1.0e-3), (8, 5.0e-4))  # (exclusive upper epoch, rate)
    rate = 1.0e-4  # used once every threshold has been passed
    for upper_bound, candidate in schedule:
        if epoch < upper_bound:
            rate = candidate
            break

    for group in optimizer.param_groups:
        group['lr'] = rate
    return optimizer


# model: the convolutional network; optimizer: its optimizer; dataTrain / dataTest: training / test sets;
# epoches: number of epochs; device: CPU / GPU; fig_path: file the accuracy / loss curves are saved to
def train(model, optimizer, dataTrain, dataTest, epoches, device,
          fig_path=r"F:\课外\datasets\MNIST\2.png"):
    """Train ``model`` one sample at a time and plot accuracy and loss per epoch.

    Args:
        model: Network exposing a ``loss`` criterion; called on single samples.
        optimizer: Optimizer whose learning rate is reset by ``adjust_lr`` at
            the start of every epoch.
        dataTrain: Sequence whose first element holds the training images at
            index 0 and the matching targets at index 1.
        dataTest: Evaluation data passed unchanged to ``test``.
        epoches: Number of passes over the training data.
        device: Device every sample and target is moved to.
        fig_path: Destination of the saved figure; defaults to the location
            that used to be hard-coded.

    Returns:
        The trained ``model``.
    """
    accuracy_history = []  # accuracy after each epoch
    loss_history = []  # mean training loss of each epoch
    samples, targets = dataTrain[0][0], dataTrain[0][1]
    total = len(samples)
    for epoch in range(epoches):
        seen = 0
        loss_sum = 0.0
        optimizer = adjust_lr(epoch, optimizer)
        for x_train, y_train in zip(samples, targets):  # one sample per step
            x = x_train.unsqueeze(0).unsqueeze(0).to(device)
            y = y_train.unsqueeze(0).to(device)
            p = model(x)  # forward pass
            optimizer.zero_grad()  # clear gradients left from the previous step
            loss = model.loss(p, y)
            loss.backward()  # compute gradients
            optimizer.step()  # update parameters
            seen += 1
            loss_value = loss.item()  # plain Python float
            loss_sum += loss_value
            if seen % 2000 == 0:  # periodic progress output
                print("{}/{}——>loss = {}".format(seen, total, loss_value))

        # Average over the samples actually processed instead of assuming
        # that the training set always holds 60000 of them.
        mean_loss = loss_sum / seen if seen else 0.0
        loss_history.append(mean_loss)
        mAP = test(model, dataTest, device)  # fraction of correctly classified test samples
        accuracy_history.append(mAP)
        print("Epoch {} complete  mAP = {:.2f}%  loss = {:.4f}".format(epoch + 1, mAP * 100, mean_loss))
    plt.subplot(1, 2, 1)
    plt.plot(accuracy_history)
    plt.subplot(1, 2, 2)
    plt.plot(loss_history)
    plt.savefig(fig_path)  # save the accuracy and loss curves
    return model

7.使用模型进行推理预测

# Reload the network saved after training and report its accuracy on the test set.
# os.path.join supplies the path separator that plain string concatenation
# drops when data_folder does not end with one.
network = torch.load(os.path.join(data_folder, "net2.pkl"))
mAP = test(network, test_data, device)
print("mAP = {:.2f}%".format(mAP * 100))

8.全部代码

注意:MNIST数据集需放在F:\课外\datasets\MNIST目录下;train中精度与loss变化图的保存路径为F:\课外\datasets\MNIST\2.png。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import gzip
import os.path
import numpy as np
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt


class LeNet(nn.Module):
    """Small convolutional network: two strided convolutions, two linear layers."""

    def __init__(self, device):  # build the layers and move them to `device`
        """Create all layers, the activation and the loss, then call ``self.to(device)``."""
        super().__init__()
        # conv layer: 16*16 -> 12 maps of 8*8
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=12, kernel_size=5, stride=2, padding=2)
        # conv layer: 12 maps of 8*8 -> 12 maps of 4*4
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=12, kernel_size=5, stride=2, padding=2)
        self.fcon1 = nn.Linear(in_features=12 * 4 * 4, out_features=30)  # fully connected layer
        self.fcon2 = nn.Linear(in_features=30, out_features=10)  # fully connected layer
        self.act = nn.Tanh()  # activation function
        self.loss = nn.MSELoss()  # loss function

        # Disabled custom weight initialisation, kept for reference only:
        #
        # for m in self.modules():
        #     if isinstance(m, nn.Conv2d):
        #         F_in = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
        #         m.weight.data = torch.rand(m.weight.data.size()) * 4.8 / F_in - 2.4 / F_in
        #     if isinstance(m, nn.Linear):
        #         F_in = m.out_features
        #         m.weight.data = torch.rand(m.weight.data.size()) * 4.8 / F_in - 2.4 / F_in

        self.to(device)  # place the parameters on the requested device

    def forward(self, data):  # forward pass
        """Apply every layer in turn, squashing each output with ``1.7159 * act(z * 2 / 3)``."""

        def squash(z):
            return 1.7159 * self.act(z * 2.0 / 3.0)

        feature_maps = squash(self.conv1(data))  # first convolution
        feature_maps = squash(self.conv2(feature_maps))  # second convolution
        flat = feature_maps.view(-1, 192)  # flatten 12*4*4 into rows of 192 values
        hidden = squash(self.fcon1(flat))  # first fully connected layer
        return squash(self.fcon2(hidden))  # second fully connected layer


def adjust_lr(epoch, optimizer):  # piecewise-constant learning-rate schedule
    """Set the learning rate of every parameter group according to ``epoch``.

    The rate is 2e-3 while ``epoch < 2``, 1e-3 while ``epoch < 5``, 5e-4 while
    ``epoch < 8`` and 1e-4 afterwards.  The optimizer is updated in place and
    returned.
    """
    schedule = ((2, 2.0e-3), (5, 1.0e-3), (8, 5.0e-4))  # (exclusive upper epoch, rate)
    rate = 1.0e-4  # used once every threshold has been passed
    for upper_bound, candidate in schedule:
        if epoch < upper_bound:
            rate = candidate
            break

    for group in optimizer.param_groups:
        group['lr'] = rate
    return optimizer


# model: the convolutional network; optimizer: its optimizer; dataTrain / dataTest: training / test sets;
# epoches: number of epochs; device: CPU / GPU; fig_path: file the accuracy / loss curves are saved to
def train(model, optimizer, dataTrain, dataTest, epoches, device,
          fig_path=r"F:\课外\datasets\MNIST\2.png"):
    """Train ``model`` one sample at a time and plot accuracy and loss per epoch.

    Args:
        model: Network exposing a ``loss`` criterion; called on single samples.
        optimizer: Optimizer whose learning rate is reset by ``adjust_lr`` at
            the start of every epoch.
        dataTrain: Sequence whose first element holds the training images at
            index 0 and the matching targets at index 1.
        dataTest: Evaluation data passed unchanged to ``test``.
        epoches: Number of passes over the training data.
        device: Device every sample and target is moved to.
        fig_path: Destination of the saved figure; defaults to the location
            that used to be hard-coded.

    Returns:
        The trained ``model``.
    """
    accuracy_history = []  # accuracy after each epoch
    loss_history = []  # mean training loss of each epoch
    samples, targets = dataTrain[0][0], dataTrain[0][1]
    total = len(samples)
    for epoch in range(epoches):
        seen = 0
        loss_sum = 0.0
        optimizer = adjust_lr(epoch, optimizer)
        for x_train, y_train in zip(samples, targets):  # one sample per step
            x = x_train.unsqueeze(0).unsqueeze(0).to(device)
            y = y_train.unsqueeze(0).to(device)
            p = model(x)  # forward pass
            optimizer.zero_grad()  # clear gradients left from the previous step
            loss = model.loss(p, y)
            loss.backward()  # compute gradients
            optimizer.step()  # update parameters
            seen += 1
            loss_value = loss.item()  # plain Python float
            loss_sum += loss_value
            if seen % 2000 == 0:  # periodic progress output
                print("{}/{}——>loss = {}".format(seen, total, loss_value))

        # Average over the samples actually processed instead of assuming
        # that the training set always holds 60000 of them.
        mean_loss = loss_sum / seen if seen else 0.0
        loss_history.append(mean_loss)
        mAP = test(model, dataTest, device)  # fraction of correctly classified test samples
        accuracy_history.append(mAP)
        print("Epoch {} complete  mAP = {:.2f}%  loss = {:.4f}".format(epoch + 1, mAP * 100, mean_loss))
    plt.subplot(1, 2, 1)
    plt.plot(accuracy_history)
    plt.subplot(1, 2, 2)
    plt.plot(loss_history)
    plt.savefig(fig_path)  # save the accuracy and loss curves
    return model


def test(model, dataTest, device):  # evaluate classification accuracy
    """Return the fraction of samples in ``dataTest`` that ``model`` gets right.

    Args:
        model: Callable that receives one sample with two leading singleton
            axes added and returns scores with the classes on dim 1.
        dataTest: Sequence whose first element holds the images at index 0
            and the matching one-hot labels at index 1.
        device: Device each input sample is moved to before inference.

    Returns:
        The accuracy in ``[0, 1]``, or ``0.0`` when there are no samples.
    """
    images, labels = dataTest[0][0], dataTest[0][1]
    total = len(images)
    if total == 0:  # nothing to evaluate; avoid dividing by zero
        return 0.0

    correct = 0
    with torch.no_grad():  # inference only, entered once for the whole loop
        for image, label in zip(images, labels):
            scores = model(image.unsqueeze(0).unsqueeze(0).to(device))
            _, index = scores.max(dim=1)
            # .item() yields a plain int, so the label tensor is never indexed
            # with an index tensor that lives on another device.
            if label[index.item()] == 1.0:  # predicted class matches the one-hot label
                correct += 1
    return correct / total


# pytest's default `test*` function pattern would otherwise collect this
# helper as a test case when the module is imported from a test file.
test.__test__ = False


def loadData(datafolder):  # load the MNIST archives stored in datafolder
    """Load MNIST from ``datafolder`` as one-hot labels and 16*16 binary images.

    Args:
        datafolder: Directory containing the four gzip-compressed MNIST files
            named in ``fileName`` below.

    Returns:
        ``(x_train, y_train), (x_test, y_test)``.  Each ``x`` is a float array
        of shape ``(1, N, 16, 16)`` and each ``y`` a one-hot float array of
        shape ``(1, N, 10)``; the leading axis of size 1 is kept so callers
        can iterate over the arrays as a single (images, labels) pair.
    """

    def read_labels(path):
        # offset=8 skips the header that precedes the uint8 labels.
        with gzip.open(path, "rb") as stream:
            labels = np.frombuffer(stream.read(), np.uint8, offset=8)
        one_hot = np.zeros((1, len(labels), 10))
        # One vectorised assignment instead of a Python loop per sample:
        # row i gets a 1.0 in column labels[i].
        one_hot[0, np.arange(len(labels)), labels] = 1.0
        return one_hot

    def read_images(path, count):
        # offset=16 skips the header that precedes the uint8 pixels.
        with gzip.open(path, "rb") as stream:
            pixels = np.frombuffer(stream.read(), np.uint8, offset=16)
        _, binary = cv2.threshold(pixels, 127, 1, cv2.THRESH_BINARY)  # binarise to 0 / 1
        binary = binary.reshape(count, 28, 28)  # one 28*28 image per sample
        images = np.zeros((1, count, 16, 16))
        for position, picture in enumerate(binary):
            # Shrink every image from 28*28 to 16*16.
            images[0][position] = cv2.resize(picture, (16, 16), interpolation=cv2.INTER_AREA)
        return images

    fileName = ['train-labels-idx1-ubyte.gz', 'train-images-idx3-ubyte.gz',
                't10k-labels-idx1-ubyte.gz', 't10k-images-idx3-ubyte.gz']  # MNIST archive names
    dataPath = [os.path.join(datafolder, name) for name in fileName]

    # Labels are read first because the image reshape needs the sample count.
    y_train = read_labels(dataPath[0])
    x_train = read_images(dataPath[1], y_train.shape[1])
    y_test = read_labels(dataPath[2])
    x_test = read_images(dataPath[3], y_test.shape[1])

    print("The datasets were loaded successfully.")
    print("Starting train the model.")
    print("--------------------")
    return (x_train, y_train), (x_test, y_test)


if __name__ == "__main__":
    # Entry point: load MNIST, train the network, then save it next to the data.
    # data_folder = "F:\\data\\MNIST\\"  # alternative dataset folder
    # Raw string so that every backslash is a literal path separator.
    data_folder = r"F:\课外\datasets\MNIST"
    (x_train, y_train), (x_test, y_test) = loadData(data_folder)  # load the dataset
    training_data = [(torch.from_numpy(x_value).type(torch.FloatTensor),
                      torch.from_numpy(y_value).type(torch.FloatTensor))
                     for x_value, y_value in zip(x_train, y_train)]  # training set as float tensors
    test_data = [(torch.from_numpy(x_value).type(torch.FloatTensor),
                  torch.from_numpy(y_value).type(torch.FloatTensor))
                 for x_value, y_value in zip(x_test, y_test)]  # test set as float tensors

    device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")  # CPU / GPU
    network = LeNet(device)  # build the network on the chosen device
    epoch = 20  # number of epochs
    optimizer = optim.SGD(network.parameters(), lr=2.0e-3)  # SGD with an initial learning rate of 0.002
    network = train(network, optimizer, training_data, test_data, epoch, device)  # train the model

    # The previous `data_folder + "\net2.pkl"` embedded a newline ("\n") in the
    # file name; os.path.join builds the intended path.
    savePath = os.path.join(data_folder, "net2.pkl")
    torch.save(network, savePath)  # save first so the message below is only printed on success
    print("The training successfully end.")
    print("the model was saved in the data path: {} .".format(savePath))

    # To reload the saved network and evaluate it instead of training:
    # network = torch.load(savePath)
    # mAP = test(network, test_data, device)
    # print("mAP = {:.2f}%".format(mAP * 100))

训练过程

train1

train2

AP和Loss的变化曲线

AP+Loss


文章作者: 易安
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 易安 !
评论
  目录