> 文档中心 > 知识蒸馏NST算法实战:使用CoatNet蒸馏ResNet18

知识蒸馏NST算法实战:使用CoatNet蒸馏ResNet18

文章目录

  • 摘要
  • 最终结论
  • 模型
    • ResNet18, ResNet34
    • CoatNet
  • 数据准备
  • 训练Teacher模型
    • 步骤
      • 导入需要的库
      • 定义训练和验证函数
      • 定义全局参数
      • 图像预处理与增强
      • 读取数据
      • 设置模型和Loss
  • 学生网络
    • 步骤
      • 导入需要的库
      • 定义训练和验证函数
      • 定义全局参数
      • 图像预处理与增强
      • 读取数据
      • 设置模型和Loss
  • 蒸馏学生网络
    • 步骤
      • 导入需要的库
      • 定义训练和验证函数
      • 定义全局参数
      • 图像预处理与增强
      • 读取数据
      • 设置模型和Loss
  • 结果比对
  • 总结

摘要

复杂度的检测模型虽然可以取得SOTA的精度,但它们往往难以直接落地应用。模型压缩方法帮助模型在效率和精度之间进行折中。知识蒸馏是模型压缩的一种有效手段,它的核心思想是迫使轻量级的学生模型去学习教师模型提取到的知识,从而提高学生模型的性能。已有的知识蒸馏方法可以分别为三大类:

  • 基于特征的(feature-based,例如VID、NST、FitNets、fine-grained feature imitation)
  • 基于关系的(relation-based,例如IRG、Relational KD、CRD、similarity-preserving knowledge distillation)
  • 基于响应的(response-based,例如Hinton的知识蒸馏开山之作)
    知识蒸馏NST算法实战:使用CoatNet蒸馏ResNet18

今天我们就尝试用基于关系特征的NST知识蒸馏算法完成这篇实战。NST蒸馏是对模型里面的的Block最后一层Feature做蒸馏,所以需要最后一层block的值。所以我们对模型要做修改来适应NST算法,并且为了使Teacher和Student的网络层之间的参数一致,我们这次选用CoatNet作为Teacher模型,选择ResNet18作为Student。

最终结论

先把结论说了吧! Teacher网络使用CoatNet的coatnet_2模型,Student网络使用ResNet18。如下表

网络 epochs ACC
CoatNet 100 91%
ResNet18 100 89%
ResNet18 +NST 100 90%

模型

模型没有用pytorch官方自带的,而是参照以前总结的ResNet模型修改的。ResNet模型结构如下图:
知识蒸馏NST算法实战:使用CoatNet蒸馏ResNet18

ResNet18, ResNet34

ResNet18, ResNet34模型的残差结构是一致的,结构如下:
知识蒸馏NST算法实战:使用CoatNet蒸馏ResNet18
代码如下:
resnet.py

import torchimport torchvisionfrom torch import nnfrom torch.nn import functional as F# from torchsummary import summaryclass ResidualBlock(nn.Module):    """    实现子module: Residual Block    """    def __init__(self, inchannel, outchannel, stride=1, shortcut=None): super(ResidualBlock, self).__init__() self.left = nn.Sequential(     nn.Conv2d(inchannel, outchannel, 3, stride, 1, bias=False),     nn.BatchNorm2d(outchannel),     nn.ReLU(inplace=True),     nn.Conv2d(outchannel, outchannel, 3, 1, 1, bias=False),     nn.BatchNorm2d(outchannel) ) self.right = shortcut    def forward(self, x): out = self.left(x) residual = x if self.right is None else self.right(x) out += residual return F.relu(out)class ResNet(nn.Module):    """    实现主module:ResNet34    ResNet34包含多个layer,每个layer又包含多个Residual block    用子module来实现Residual block,用_make_layer函数来实现layer    """    def __init__(self, blocks, num_classes=1000): super(ResNet, self).__init__() self.model_name = 'resnet34' # 前几层: 图像转换 self.pre = nn.Sequential(     nn.Conv2d(3, 64, 7, 2, 3, bias=False),     nn.BatchNorm2d(64),     nn.ReLU(inplace=True),     nn.MaxPool2d(3, 2, 1)) # 重复的layer,分别有3,4,6,3个residual block self.layer1 = self._make_layer(64, 64, blocks[0]) self.layer2 = self._make_layer(64, 128, blocks[1], stride=2) self.layer3 = self._make_layer(128, 256, blocks[2], stride=2) self.layer4 = self._make_layer(256, 512, blocks[3], stride=2) # 分类用的全连接 self.fc = nn.Linear(512, num_classes)    def _make_layer(self, inchannel, outchannel, block_num, stride=1): """ 构建layer,包含多个residual block """ shortcut = nn.Sequential(     nn.Conv2d(inchannel, outchannel, 1, stride, bias=False),     nn.BatchNorm2d(outchannel),     nn.ReLU() ) layers = [] layers.append(ResidualBlock(inchannel, outchannel, stride, shortcut)) for i in range(1, block_num):     layers.append(ResidualBlock(outchannel, outchannel)) return nn.Sequential(*layers)    def forward(self, x): x = self.pre(x) l1_out = self.layer1(x) l2_out = self.layer2(l1_out) l3_out = self.layer3(l2_out) l4_out = self.layer4(l3_out) p_out = F.avg_pool2d(l4_out, 7) fea = p_out.view(p_out.size(0), -1) out=self.fc(fea) return l1_out,l2_out,l3_out,l4_out,fea,outdef ResNet18():    return ResNet([2, 2, 2, 2])def ResNet34():    return ResNet([3, 4, 6, 3])if __name__ == '__main__':    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")    model = ResNet34()    model.to(device)    # summary(model, (3, 224, 224))

主要修改了输出结果,将每个block的结果输出出来。

CoatNet

代码:
coatnet.py

import torchimport torch.nn as nnfrom einops import rearrangefrom einops.layers.torch import Rearrangedef conv_3x3_bn(inp, oup, image_size, downsample=False):    stride = 1 if downsample == False else 2    return nn.Sequential( nn.Conv2d(inp, oup, 3, stride, 1, bias=False), nn.BatchNorm2d(oup), nn.GELU()    )class PreNorm(nn.Module):    def __init__(self, dim, fn, norm): super().__init__() self.norm = norm(dim) self.fn = fn    def forward(self, x, **kwargs): return self.fn(self.norm(x), **kwargs)class SE(nn.Module):    def __init__(self, inp, oup, expansion=0.25): super().__init__() self.avg_pool = nn.AdaptiveAvgPool2d(1) self.fc = nn.Sequential(     nn.Linear(oup, int(inp * expansion), bias=False),     nn.GELU(),     nn.Linear(int(inp * expansion), oup, bias=False),     nn.Sigmoid() )    def forward(self, x): b, c, _, _ = x.size() y = self.avg_pool(x).view(b, c) y = self.fc(y).view(b, c, 1, 1) return x * yclass FeedForward(nn.Module):    def __init__(self, dim, hidden_dim, dropout=0.): super().__init__() self.net = nn.Sequential(     nn.Linear(dim, hidden_dim),     nn.GELU(),     nn.Dropout(dropout),     nn.Linear(hidden_dim, dim),     nn.Dropout(dropout) )    def forward(self, x): return self.net(x)class MBConv(nn.Module):    def __init__(self, inp, oup, image_size, downsample=False, expansion=4): super().__init__() self.downsample = downsample stride = 1 if self.downsample == False else 2 hidden_dim = int(inp * expansion) if self.downsample:     self.pool = nn.MaxPool2d(3, 2, 1)     self.proj = nn.Conv2d(inp, oup, 1, 1, 0, bias=False) if expansion == 1:     self.conv = nn.Sequential(  # dw  nn.Conv2d(hidden_dim, hidden_dim, 3, stride,     1, groups=hidden_dim, bias=False),  nn.BatchNorm2d(hidden_dim),  nn.GELU(),  # pw-linear  nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),  nn.BatchNorm2d(oup),     ) else:     self.conv = nn.Sequential(  # pw  # down-sample in the first conv  nn.Conv2d(inp, hidden_dim, 1, stride, 0, bias=False),  nn.BatchNorm2d(hidden_dim),  nn.GELU(),  # dw  nn.Conv2d(hidden_dim, hidden_dim, 3, 1, 1,     groups=hidden_dim, bias=False),  nn.BatchNorm2d(hidden_dim),  nn.GELU(),  SE(inp, hidden_dim),  # pw-linear  nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),  nn.BatchNorm2d(oup),     )  self.conv = PreNorm(inp, self.conv, nn.BatchNorm2d)    def forward(self, x): if self.downsample:     return self.proj(self.pool(x)) + self.conv(x) else:     return x + self.conv(x)class Attention(nn.Module):    def __init__(self, inp, oup, image_size, heads=8, dim_head=32, dropout=0.): super().__init__() inner_dim = dim_head * heads project_out = not (heads == 1 and dim_head == inp) self.ih, self.iw = image_size self.heads = heads self.scale = dim_head ** -0.5 # parameter table of relative position bias self.relative_bias_table = nn.Parameter(     torch.zeros((2 * self.ih - 1) * (2 * self.iw - 1), heads)) coords = torch.meshgrid((torch.arange(self.ih), torch.arange(self.iw))) coords = torch.flatten(torch.stack(coords), 1) relative_coords = coords[:, :, None] - coords[:, None, :] relative_coords[0] += self.ih - 1 relative_coords[1] += self.iw - 1 relative_coords[0] *= 2 * self.iw - 1 relative_coords = rearrange(relative_coords, 'c h w -> h w c') relative_index = relative_coords.sum(-1).flatten().unsqueeze(1) self.register_buffer("relative_index", relative_index) self.attend = nn.Softmax(dim=-1) self.to_qkv = nn.Linear(inp, inner_dim * 3, bias=False) self.to_out = nn.Sequential(     nn.Linear(inner_dim, oup),     nn.Dropout(dropout) ) if project_out else nn.Identity()    def forward(self, x): qkv = self.to_qkv(x).chunk(3, dim=-1) q, k, v = map(lambda t: rearrange(     t, 'b n (h d) -> b h n d', h=self.heads), qkv) dots = torch.matmul(q, k.transpose(-1, -2)) * self.scale # Use "gather" for more efficiency on GPUs relative_bias = self.relative_bias_table.gather(     0, self.relative_index.repeat(1, self.heads)) relative_bias = rearrange(     relative_bias, '(h w) c -> 1 c h w', h=self.ih*self.iw, w=self.ih*self.iw) dots = dots + relative_bias attn = self.attend(dots) out = torch.matmul(attn, v) out = rearrange(out, 'b h n d -> b n (h d)') out = self.to_out(out) return outclass Transformer(nn.Module):    def __init__(self, inp, oup, image_size, heads=8, dim_head=32, downsample=False, dropout=0.): super().__init__() hidden_dim = int(inp * 4) self.ih, self.iw = image_size self.downsample = downsample if self.downsample:     self.pool1 = nn.MaxPool2d(3, 2, 1)     self.pool2 = nn.MaxPool2d(3, 2, 1)     self.proj = nn.Conv2d(inp, oup, 1, 1, 0, bias=False) self.attn = Attention(inp, oup, image_size, heads, dim_head, dropout) self.ff = FeedForward(oup, hidden_dim, dropout) self.attn = nn.Sequential(     Rearrange('b c ih iw -> b (ih iw) c'),     PreNorm(inp, self.attn, nn.LayerNorm),     Rearrange('b (ih iw) c -> b c ih iw', ih=self.ih, iw=self.iw) ) self.ff = nn.Sequential(     Rearrange('b c ih iw -> b (ih iw) c'),     PreNorm(oup, self.ff, nn.LayerNorm),     Rearrange('b (ih iw) c -> b c ih iw', ih=self.ih, iw=self.iw) )    def forward(self, x): if self.downsample:     x = self.proj(self.pool1(x)) + self.attn(self.pool2(x)) else:     x = x + self.attn(x) x = x + self.ff(x) return xclass CoAtNet(nn.Module):    def __init__(self, image_size, in_channels, num_blocks, channels, num_classes=1000, block_types=['C', 'C', 'T', 'T']): super().__init__() ih, iw = image_size block = {'C': MBConv, 'T': Transformer} self.s0 = self._make_layer(     conv_3x3_bn, in_channels, channels[0], num_blocks[0], (ih // 2, iw // 2)) self.s1 = self._make_layer(     block[block_types[0]], channels[0], channels[1], num_blocks[1], (ih // 4, iw // 4)) self.s2 = self._make_layer(     block[block_types[1]], channels[1], channels[2], num_blocks[2], (ih // 8, iw // 8)) self.s3 = self._make_layer(     block[block_types[2]], channels[2], channels[3], num_blocks[3], (ih // 16, iw // 16)) self.s4 = self._make_layer(     block[block_types[3]], channels[3], channels[4], num_blocks[4], (ih // 32, iw // 32)) self.pool = nn.AvgPool2d(ih // 32, 1) self.fc = nn.Linear(channels[-1], num_classes, bias=False)    def forward(self, x): x = self.s0(x) l1_out = self.s1(x) l2_out = self.s2(l1_out) l3_out = self.s3(l2_out) l4_out = self.s4(l3_out) fea = self.pool(l4_out).view(-1, l4_out.shape[1]) out = self.fc(fea) return l1_out,l2_out,l3_out,l4_out,fea, out    def _make_layer(self, block, inp, oup, depth, image_size): layers = nn.ModuleList([]) for i in range(depth):     if i == 0:  layers.append(block(inp, oup, image_size, downsample=True))     else:  layers.append(block(oup, oup, image_size)) return nn.Sequential(*layers)def coatnet_0():    num_blocks = [2, 2, 3, 5, 2]     # L    channels = [64, 96, 192, 384, 768]      # D    return CoAtNet((224, 224), 3, num_blocks, channels, num_classes=1000)def coatnet_1():    num_blocks = [2, 2, 6, 14, 2]    # L    channels = [64, 96, 192, 384, 768]      # D    return CoAtNet((224, 224), 3, num_blocks, channels, num_classes=1000)def coatnet_2():    num_blocks = [2, 2, 6, 14, 2]    # L    channels = [128, 128, 256, 512, 1026]   # D    return CoAtNet((224, 224), 3, num_blocks, channels, num_classes=1000)def coatnet_3():    num_blocks = [2, 2, 6, 14, 2]    # L    channels = [192, 192, 384, 768, 1536]   # D    return CoAtNet((224, 224), 3, num_blocks, channels, num_classes=1000)def coatnet_4():    num_blocks = [2, 2, 12, 28, 2]   # L    channels = [192, 192, 384, 768, 1536]   # D    return CoAtNet((224, 224), 3, num_blocks, channels, num_classes=1000)def count_parameters(model):    return sum(p.numel() for p in model.parameters() if p.requires_grad)if __name__ == '__main__':    img = torch.randn(1, 3, 224, 224)    net = coatnet_0()    out = net(img)    print(out.shape, count_parameters(net))    net = coatnet_1()    out = net(img)    print(out.shape, count_parameters(net))    net = coatnet_2()    out = net(img)    print(out.shape, count_parameters(net))    net = coatnet_3()    out = net(img)    print(out.shape, count_parameters(net))    net = coatnet_4()    out = net(img)    print(out.shape, count_parameters(net))

同上,将每个block层都输出出来。

数据准备

数据使用我以前在图像分类任务中的数据集——植物幼苗数据集,先将数据集转为训练集和验证集。执行代码:

import globimport osimport shutilimage_list=glob.glob('data1/*/*.png')print(image_list)file_dir='data'if os.path.exists(file_dir):    print('true')    #os.rmdir(file_dir)    shutil.rmtree(file_dir)#删除再建立    os.makedirs(file_dir)else:    os.makedirs(file_dir)from sklearn.model_selection import train_test_splittrainval_files, val_files = train_test_split(image_list, test_size=0.3, random_state=42)train_dir='train'val_dir='val'train_root=os.path.join(file_dir,train_dir)val_root=os.path.join(file_dir,val_dir)for file in trainval_files:    file_class=file.replace("\\","/").split('/')[-2]    file_name=file.replace("\\","/").split('/')[-1]    file_class=os.path.join(train_root,file_class)    if not os.path.isdir(file_class): os.makedirs(file_class)    shutil.copy(file, file_class + '/' + file_name)for file in val_files:    file_class=file.replace("\\","/").split('/')[-2]    file_name=file.replace("\\","/").split('/')[-1]    file_class=os.path.join(val_root,file_class)    if not os.path.isdir(file_class): os.makedirs(file_class)    shutil.copy(file, file_class + '/' + file_name)

训练Teacher模型

Teacher选用CoatNet的coatnet_2模型。这个模型在训练100个epoch后,在验证集上,最高成绩是91%。

步骤

新建teacher_train.py,插入代码:

导入需要的库

import torch.optim as optimimport torchimport torch.nn as nnimport torch.nn.parallelimport torch.utils.dataimport torch.utils.data.distributedimport torchvision.transforms as transformsfrom torchvision import datasetsfrom torch.autograd import Variablefrom model.coatnet import coatnet_2import jsonimport os

导入所需的库

定义训练和验证函数

编写train方法和val方法,由于修改输出的结果,所以返回结果又多个,如果不想对每个返回结果命名,可以使用下划线代替。

def train(model, device, train_loader, optimizer, epoch):    model.train()    sum_loss = 0    total_num = len(train_loader.dataset)    print(total_num, len(train_loader))    for batch_idx, (data, target) in enumerate(train_loader): data, target = Variable(data).to(device), Variable(target).to(device) _,_,_,l4_out,fea,output = model(data) loss = criterion(output, target) optimizer.zero_grad() loss.backward() optimizer.step() print_loss = loss.data.item() sum_loss += print_loss if (batch_idx + 1) % 10 == 0:     print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(  epoch, (batch_idx + 1) * len(data), len(train_loader.dataset),  100. * (batch_idx + 1) / len(train_loader), loss.item()))    ave_loss = sum_loss / len(train_loader)    print('epoch:{},loss:{}'.format(epoch, ave_loss))

定义全局参数

if __name__ == '__main__':    # 创建保存模型的文件夹    file_dir = 'CoatNet'    if os.path.exists(file_dir): print('true') os.makedirs(file_dir, exist_ok=True)    else: os.makedirs(file_dir)    # 设置全局参数    modellr = 1e-4    BATCH_SIZE = 16    EPOCHS = 100    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

全局参数:

modellr :学习率
BATCH_SIZE:BatchSize的大小。
EPOCHS :epoch的大小
DEVICE:选择cpu还是gpu训练,默认是gpu,如果找不到GPU则改为CPU训练。

图像预处理与增强

 # 数据预处理7    transform = transforms.Compose([ transforms.RandomRotation(10), transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 3.0)), transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5), transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.44127703, 0.4712498, 0.43714803], std=[0.18507297, 0.18050247, 0.16784933])    ])    transform_test = transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.44127703, 0.4712498, 0.43714803], std=[0.18507297, 0.18050247, 0.16784933])    ])

对于训练集,增强有10°的随机旋转、高斯模糊、饱和度明亮等。
对于验证集,则不做数据集增强。

读取数据

使用pytorch默认读取数据的方式。

    # 读取数据    dataset_train = datasets.ImageFolder('data/train', transform=transform)    dataset_test = datasets.ImageFolder("data/val", transform=transform_test)    with open('class.txt', 'w') as file: file.write(str(dataset_train.class_to_idx))    with open('class.json', 'w', encoding='utf-8') as file: file.write(json.dumps(dataset_train.class_to_idx))    # 导入数据    train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)    test_loader = torch.utils.data.DataLoader(dataset_test, batch_size=BATCH_SIZE, shuffle=False)

采用默认的数据读取方式。

设置模型和Loss

    # 实例化模型并且移动到GPU    criterion = nn.CrossEntropyLoss()    model_ft = coatnet_2()    num_ftrs = model_ft.fc.in_features    model_ft.fc = nn.Linear(num_ftrs, 12)    model_ft.to(DEVICE)    # 选择简单暴力的Adam优化器,学习率调低    optimizer = optim.Adam(model_ft.parameters(), lr=modellr)    cosine_schedule = optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=20, eta_min=1e-9)    # 训练    val_acc_list= {}    for epoch in range(1, EPOCHS + 1): train(model_ft, DEVICE, train_loader, optimizer, epoch) cosine_schedule.step() acc=val(model_ft, DEVICE, test_loader) val_acc_list[epoch]=acc with open('result.json', 'w', encoding='utf-8') as file:     file.write(json.dumps(val_acc_list))    torch.save(model_ft, 'CoatNet/model_final.pth')

设置loss为交叉熵。
设置模型为coatnet_2。
修改最后的输出层,将其改为数据集的类别。
设置优化器为Adam。
设置学习率的调节方式为余弦退火算法。
完成上面的代码就可以开始训练Teacher网络了。

学生网络

学生网络选用ResNet18,是一个比较小一点的网络了,模型的大小有40M。训练100个epoch,在验证集上最终的ACC是89%.

步骤

新建student_train.py,插入代码:

导入需要的库

import torch.optim as optimimport torchimport torch.nn as nnimport torch.nn.parallelimport torch.utils.dataimport torch.utils.data.distributedimport torchvision.transforms as transformsfrom torchvision import datasetsfrom torch.autograd import Variablefrom model.resnet import ResNet18import jsonimport os

导入所需的库

定义训练和验证函数

# 定义训练过程def train(model, device, train_loader, optimizer, epoch):    model.train()    sum_loss = 0    total_num = len(train_loader.dataset)    print(total_num, len(train_loader))    for batch_idx, (data, target) in enumerate(train_loader): data, target = Variable(data).to(device), Variable(target).to(device) _,_,_,l4_out,fea,out = model(data) loss = criterion(out, target) optimizer.zero_grad() loss.backward() optimizer.step() print_loss = loss.data.item() sum_loss += print_loss if (batch_idx + 1) % 10 == 0:     print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(  epoch, (batch_idx + 1) * len(data), len(train_loader.dataset),  100. * (batch_idx + 1) / len(train_loader), loss.item()))    ave_loss = sum_loss / len(train_loader)    print('epoch:{},loss:{}'.format(epoch, ave_loss))Best_ACC=0# 验证过程@torch.no_grad()def val(model, device, test_loader):    global Best_ACC    model.eval()    test_loss = 0    correct = 0    total_num = len(test_loader.dataset)    print(total_num, len(test_loader))    with torch.no_grad(): for data, target in test_loader:     data, target = Variable(data).to(device), Variable(target).to(device)     l1_out,l2_out,l3_out,l4_out,fea,out = model(data)     loss = criterion(out, target)     _, pred = torch.max(out.data, 1)     correct += torch.sum(pred == target)     print_loss = loss.data.item()     test_loss += print_loss correct = correct.data.item() acc = correct / total_num avgloss = test_loss / len(test_loader) if acc > Best_ACC:     torch.save(model, file_dir + '/' + 'best.pth')     Best_ACC = acc print('\nVal set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(     avgloss, correct, len(test_loader.dataset), 100 * acc)) return acc

编写train方法和val函数,由于修改输出的结果,所以返回结果又多个,如果不想对每个返回结果命名,可以使用下划线代替。
在val函数中验证ACC,保存ACC最高的模型。

定义全局参数

if __name__ == '__main__':    # 创建保存模型的文件夹    file_dir = 'resnet'    if os.path.exists(file_dir): print('true') os.makedirs(file_dir, exist_ok=True)    else: os.makedirs(file_dir)    # 设置全局参数    modellr = 1e-4    BATCH_SIZE = 16    EPOCHS = 100    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

全局参数:

modellr :学习率
BATCH_SIZE:BatchSize的大小。
EPOCHS :epoch的大小
DEVICE:选择cpu还是gpu训练,默认是gpu,如果找不到GPU则改为CPU训练。

注意这里设置和Teacher模型保持一致,这样得出的结论才更有说服力。

图像预处理与增强

 # 数据预处理7    transform = transforms.Compose([ transforms.RandomRotation(10), transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 3.0)), transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5), transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.44127703, 0.4712498, 0.43714803], std=[0.18507297, 0.18050247, 0.16784933])    ])    transform_test = transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.44127703, 0.4712498, 0.43714803], std=[0.18507297, 0.18050247, 0.16784933])    ])

对于训练集,增强有10°的随机旋转、高斯模糊、饱和度明亮等。
对于验证集,则不做数据集增强。
注意:数据增强和Teacher模型里的增强保持一致。

读取数据

使用pytorch默认读取数据的方式。

    # 读取数据    dataset_train = datasets.ImageFolder('data/train', transform=transform)    dataset_test = datasets.ImageFolder("data/val", transform=transform_test)    with open('class.txt', 'w') as file: file.write(str(dataset_train.class_to_idx))    with open('class.json', 'w', encoding='utf-8') as file: file.write(json.dumps(dataset_train.class_to_idx))    # 导入数据    train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)    test_loader = torch.utils.data.DataLoader(dataset_test, batch_size=BATCH_SIZE, shuffle=False)

采用pytorch默认的数据读取方式。

设置模型和Loss

 # 实例化模型并且移动到GPU    criterion = nn.CrossEntropyLoss()    model_ft = ResNet18()    print(model_ft)    num_ftrs = model_ft.fc.in_features    model_ft.fc = nn.Linear(num_ftrs, 12)    model_ft.to(DEVICE)    # 选择简单暴力的Adam优化器,学习率调低    optimizer = optim.Adam(model_ft.parameters(), lr=modellr)    cosine_schedule = optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=20, eta_min=1e-9)    # 训练    val_acc_list= {}    for epoch in range(1, EPOCHS + 1): train(model_ft, DEVICE, train_loader, optimizer, epoch) cosine_schedule.step() acc=val(model_ft, DEVICE, test_loader) val_acc_list[epoch]=acc with open('result_student.json', 'w', encoding='utf-8') as file:     file.write(json.dumps(val_acc_list))    torch.save(model_ft, 'resnet/model_final.pth')

设置loss为交叉熵。
设置模型为ResNet18。
修改最后的输出层,将其改为数据集的类别。
设置优化器为Adam。
设置学习率的调节方式为余弦退火算法。
完成上面的代码就可以开始训练Student网络了。

蒸馏学生网络

学生网络继续选用ResNet18,使用Teacher网络蒸馏学生网络,训练100个epoch,最终,验证集的ACC为90%。
NST知识蒸馏的脚本详见:
https://wanghao.blog.csdn.net/article/details/127802486?spm=1001.2014.3001.5502。
代码如下:
nst.py

from __future__ import absolute_importfrom __future__ import print_functionfrom __future__ import divisionimport torchimport torch.nn as nnimport torch.nn.functional as F'''NST with Polynomial Kernel, where d=2 and c=0'''class NST(nn.Module):'''Like What You Like: Knowledge Distill via Neuron Selectivity Transferhttps://arxiv.org/pdf/1707.01219.pdf'''def __init__(self):super(NST, self).__init__()def forward(self, fm_s, fm_t):s_H, t_H = fm_s.shape[2], fm_t.shape[2]if s_H > t_H:fm_s = F.adaptive_avg_pool2d(fm_s, (t_H, t_H))elif s_H < t_H:fm_t = F.adaptive_avg_pool2d(fm_t, (s_H, s_H))else:passfm_s = fm_s.view(fm_s.size(0), fm_s.size(1), -1)fm_s = F.normalize(fm_s, dim=2)fm_t = fm_t.view(fm_t.size(0), fm_t.size(1), -1)fm_t = F.normalize(fm_t, dim=2)loss = self.poly_kernel(fm_t, fm_t).mean() \ + self.poly_kernel(fm_s, fm_s).mean() \ - 2 * self.poly_kernel(fm_s, fm_t).mean()return lossdef poly_kernel(self, fm1, fm2):fm1 = fm1.unsqueeze(1)fm2 = fm2.unsqueeze(2)out = (fm1 * fm2).sum(-1).pow(2)return out

步骤

新建kd_train.py,插入代码:

导入需要的库

import torch.optim as optimimport torchimport torch.nn as nnimport torch.nn.parallelimport torch.utils.dataimport torch.utils.data.distributedimport torchvision.transforms as transformsfrom torchvision import datasetsfrom model.resnet import ResNet18import jsonimport osfrom nst import NST

导入需要的库,这里要注意,导入的ResNet18是我自定义的模型,不要导入官方自带的ResNet18。

定义训练和验证函数

# 定义训练过程def train(s_net,t_net, device, criterionCls,criterionKD,train_loader, optimizer, epoch):    s_net.train()    sum_loss = 0    total_num = len(train_loader.dataset)    print(total_num, len(train_loader))    for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) optimizer.zero_grad() l1_out_s,l2_out_s,l3_out_s,l4_out_s,fea_s, out_s = s_net(data) cls_loss = criterionCls(out_s, target) l1_out_t,l2_out_t,l3_out_t,l4_out_t,fea_t, out_t = t_net(data)  # 训练出教师的 teacher_output kd_loss = criterionKD(l4_out_s, l4_out_t.detach()) * lambda_kd loss = cls_loss + kd_loss loss.backward() optimizer.step() print_loss = loss.data.item() sum_loss += print_loss if (batch_idx + 1) % 10 == 0:     print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(  epoch, (batch_idx + 1) * len(data), len(train_loader.dataset),  100. * (batch_idx + 1) / len(train_loader), loss.item()))    ave_loss = sum_loss / len(train_loader)    print('epoch:{},loss:{}'.format(epoch, ave_loss))Best_ACC=0# 验证过程@torch.no_grad()def val(model, device,criterionCls, test_loader):    global Best_ACC    model.eval()    test_loss = 0    correct = 0    total_num = len(test_loader.dataset)    print(total_num, len(test_loader))    with torch.no_grad(): for data, target in test_loader:     data, target = data.to(device), target.to(device)     l1_out_s, l2_out_s, l3_out_s, l4_out_s, fea_s, out_s = model(data)     loss = criterionCls(out_s, target)     _, pred = torch.max(out_s.data, 1)     correct += torch.sum(pred == target)     print_loss = loss.data.item()     test_loss += print_loss correct = correct.data.item() acc = correct / total_num avgloss = test_loss / len(test_loader) if acc > Best_ACC:     torch.save(model, file_dir + '/' + 'best.pth')     Best_ACC = acc print('\nVal set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(     avgloss, correct, len(test_loader.dataset), 100 * acc)) return acc

编写train方法和val函数,由于修改输出的结果,所以返回结果有多个,我们需要对l4_out_s这个结果做蒸馏。
将Student的l4_out_s和Teacher的l4_out_t输入到criterionKD这个loss函数中计算loss。
l4_out_t.detach()的意思是阻断Teacher模型的反向传播。
在val函数中验证ACC,保存ACC最高的模型。

定义全局参数

if __name__ == '__main__':    # 创建保存模型的文件夹    file_dir = 'resnet_kd'    if os.path.exists(file_dir): print('true') os.makedirs(file_dir, exist_ok=True)    else: os.makedirs(file_dir)    # 设置全局参数    modellr = 1e-4    BATCH_SIZE = 16    EPOCHS = 100    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')    lambda_kd=1.0

modellr :学习率
BATCH_SIZE:BatchSize的大小。
EPOCHS :epoch的大小
DEVICE:选择cpu还是gpu训练,默认是gpu,如果找不到GPU则改为CPU训练。
lambda_kd:蒸馏loss的比重,默认是1.0

图像预处理与增强

 # 数据预处理7    transform = transforms.Compose([ transforms.RandomRotation(10), transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 3.0)), transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5), transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.44127703, 0.4712498, 0.43714803], std=[0.18507297, 0.18050247, 0.16784933])    ])    transform_test = transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.44127703, 0.4712498, 0.43714803], std=[0.18507297, 0.18050247, 0.16784933])    ])

对于训练集,增强有10°的随机旋转、高斯模糊、饱和度明亮等。
对于验证集,则不做数据集增强。
注意:数据增强和Teacher模型里的增强保持一致。

读取数据

使用pytorch默认读取数据的方式。

    # 读取数据    dataset_train = datasets.ImageFolder('data/train', transform=transform)    dataset_test = datasets.ImageFolder("data/val", transform=transform_test)    with open('class.txt', 'w') as file: file.write(str(dataset_train.class_to_idx))    with open('class.json', 'w', encoding='utf-8') as file: file.write(json.dumps(dataset_train.class_to_idx))    # 导入数据    train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)    test_loader = torch.utils.data.DataLoader(dataset_test, batch_size=BATCH_SIZE, shuffle=False)

设置模型和Loss

    model_ft = ResNet18()    print(model_ft)    num_ftrs = model_ft.fc.in_features    model_ft.fc = nn.Linear(num_ftrs, 12)    model_ft.to(DEVICE)    # 选择简单暴力的Adam优化器,学习率调低    optimizer = optim.Adam(model_ft.parameters(), lr=modellr)    cosine_schedule = optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=20, eta_min=1e-9)    teacher_model=torch.load('./CoatNet/best.pth')    teacher_model.eval()    # 实例化模型并且移动到GPU    criterionKD = NST()    criterionCls = nn.CrossEntropyLoss()    # 训练    val_acc_list= {}    for epoch in range(1, EPOCHS + 1): train(model_ft,teacher_model, DEVICE,criterionCls,criterionKD, train_loader, optimizer, epoch) cosine_schedule.step() acc=val(model_ft,DEVICE,criterionCls , test_loader) val_acc_list[epoch]=acc with open('result_kd.json', 'w', encoding='utf-8') as file:     file.write(json.dumps(val_acc_list))    torch.save(model_ft, 'resnet_kd/model_final.pth')

设置模型为ResNet18。
修改最后的输出层,将其改为数据集的类别。
设置优化器为Adam。
设置学习率的调节方式为余弦退火算法。
加载Teacher模型,并设置为eval模式。
设置蒸馏loss为criterionKD
设置分类loss为交叉熵。
完成上面的代码就可以开始蒸馏模式!!!

结果比对

加载保存的结果,然后绘制acc曲线。

import numpy as npfrom matplotlib import pyplot as pltimport jsonteacher_file='result.json'student_file='result_student.json'student_kd_file='result_kd.json'def read_json(file):    with open(file, 'r', encoding='utf8') as fp: json_data = json.load(fp) print(json_data)    return json_datateacher_data=read_json(teacher_file)student_data=read_json(student_file)student_kd_data=read_json(student_kd_file)x =[int(x) for x in  list(dict(teacher_data).keys())]print(x)plt.plot(x, list(teacher_data.values()), label='teacher')plt.plot(x,list(student_data.values()), label='student without NST')plt.plot(x, list(student_kd_data.values()), label='student with NST')plt.title('Test accuracy')plt.legend()plt.show()

总结

本文重点讲解了如何使用NST知识蒸馏算法对Student模型进行蒸馏。希望能帮助到大家,如果觉得有用欢迎收藏、点赞和转发;如果有问题也可以留言讨论。
本次实战用到的代码和数据集详见: