> 文档中心 > Pytorch神经网络实战学习笔记_25 循环神经网络结构训练语言模型并进行简单预测

Pytorch神经网络实战学习笔记_25 循环神经网络结构训练语言模型并进行简单预测

 

1 语言模型步骤

简单概述:根据输入内容,继续输出后面的句子。

1.1 根据需求拆分任务

  • (1)先对模型输入一段文字,令模型输出之后的一个文字。
  • (2)将模型预测出来的文字当成输入,再放到模型里,使模型预测出下一个文字,这样循环下去,以使RNN完成一句话的输出。

1.2 根据任务设计功能模块

  • (1)模型能够记住前面文字的语义;
  • (2)能够根据前面的语义和一个输入文字,输出下一个文字。

1.3 根据功能模块设计实现方案

RNN模型的接口可以输出两个结果:预测值和当前状态

  1. 在实现时,将输入的序列样本拆开,使用循环的方式,将字符逐个输入模型。模型会对每次的输入预测出两个结果,一个是预测字符,另一个是当前的序列状态。
  2. 在训练场景下,将硕测字骑用于计算损失,列状动用于传入下一次循环计算。
  3. 在测试场景下,用循环的方式将输入序列中的文字一个个地传入到模型中,得到最后一个时刻的当前状态,将该状态和输入序列中的最后一个文字转入模型,生成下一个文字的预测结果。同时,按照要求生成的文字条件,重复地将新生成的文字和当前状态输入模型,来预测下一个文字。

2 语言模型的代码实现

2.1 准备样本数据

样本内容:
在尘世的纷扰中,只要心头悬挂着远方的灯光,我们就会坚持不懈地走,理想为我们灌注了精神的蕴藉。所以,生活再平凡、再普通、再琐碎,我们都要坚持一种信念,默守一种精神,为自己积淀站立的信心,前行的气力。

2.1.1定义基本工具函数---make_Language_model.py(第1部分)

首先引入头文件,然后定义相关函数:get_ch_lable()从文件中获取文本,get_ch._able_v0将文本数组转方向量,具体代码如下:

import numpy as npimport torchimport torch.nn.functional as Fimport timeimport randomfrom collections import Counter# 1.1 定义基本的工具函数RANDOM_SEED = 123torch.manual_seed(RANDOM_SEED)DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')def elapsed(sec): # 计算时间函数    if sec < 60: return str(sec) + "sec"    elif sec<(60*60): return str(sec/60) + "min"    else: return str(sec/(60*60)) + "hour"training_file = 'csv_list/wordstest.txt' # 定义样本文件#中文字def get_ch_label(txt_file): # 提取样本中的汉字    labels = ""    with open(txt_file,'rb') as f : for label in f:     labels = labels + label.decode("gb2312",errors = 'ignore')    return labels#中文多文件def readalltxt(txt_files): # 处理中文    labels = []    for txt_file in txt_files: target = get_ch_label(txt_file) labels.append(target)    return labels# 将汉子转化成向量,支持文件和内存对象里的汉字转换def get_ch_label_v(txt_file,word_num_map,txt_label = None):    words_size = len(word_num_map)    to_num = lambda word:word_num_map.get(word,words_size)    if txt_file != None: txt_label = get_ch_label(txt_file)    # 使用to_num()实现单个汉子转成向量的功能,如果没有该汉字,则将返回words_size(值为69)    labels_vector = list(map(to_num,txt_label)) # 将汉字列表中的每个元素传入到to_num()进行转换    return labels_vector

2.1.2 样本预处理---make_Language_model.py(第2部分)

样本预处理这个一套指读取整个样本,将其放入training_data里,获取全部的字表words,并生成样本向量wordlabel与向量有对应关系的word_num_map,代码如下:

# 1.2 样本预处理training_data = get_ch_label(training_file)print("加载训练模型中")print("该样本长度:",len(training_data))counter = Counter(training_data)words = sorted(counter)words_size = len(words)word_num_map = dict(zip(words,range(words_size)))print("字表大小:",words_size)wordlabel = get_ch_label_v(training_file,word_num_map)# 加载训练模型中# 该样本长度: 75# 字表大小: 41(去重)

    上述结果表示样本文件里一共有75个文字,其中掉重复的文字之后,还有41个。这41个文字将作为字表词典,建立文字与索引值的对应关系。
    在训练模型时,每个文字都会被转化成数字形式的索引值输入模型。模型的输出是这41个文字的概率,即把每个文字当成一类。

2.2 代码实现:构建循环神经网络模型---make_Language_model.py(第3部分)

使用GRU构建RNN模型,令RNN模型只接收一个序列的喻入字符,并预测出下一个序列的字符。
在该模型里,所需要完成的步骤如下:

  1. 将输入的字索引转为词嵌入;
  2. 将词嵌入结果输入GRU层;
  3. 对GRU结果做全连接处理,得到维度为69的预测结果,这个预测结果代表每个文字的概率。

2.2.1 代码实现

# 1.3 构建循环神经网络(RNN)模型class GRURNN(torch.nn.Module):    def __init__(self,word_size,embed_dim,hidden_dim,output_size,num_layers): super(GRURNN, self).__init__() self.num_layers = num_layers self.hidden_dim = hidden_dim self.embed = torch.nn.Embedding(word_size,embed_dim) # 定义一个多层的双向层: # 预测结果:形状为[序列,批次,维度hidden_dim×2],因为是双向RNN,故维度为hidden_dim # 序列状态:形状为[层数×2,批次,维度hidden_dim] self.gru = torch.nn.GRU(input_size=embed_dim,    hidden_size=hidden_dim,    num_layers=num_layers,bidirectional=True) self.fc = torch.nn.Linear(hidden_dim *2,output_size)# 全连接层,充当模型的输出层,用于对GRU输出的预测结果进行处理,得到最终的分类结果    def forward(self,features,hidden): embeded = self.embed(features.view(1,-1)) output,hidden = self.gru(embeded.view(1,1,-1),hidden) # output = self.attention(output) output = self.fc(output.view(1,-1)) return output,hidden    def init_zero_state(self): # 对于GRU层状态的初始化,每次迭代训练之前,需要对GRU的状态进行清空,因为输入的序列是1,故torch.zeros的第二个参数为1 init_hidden = torch.zeros(self.num_layers*2,1,self.hidden_dim).to(DEVICE) return init_hidden

2.3 代码实现:实例化,训练模型--make_Language_model.py(第3部分)

# 1.4 实例化模型类,并训练模型EMBEDDING_DIM = 10 # 定义词嵌入维度HIDDEN_DIM = 20 # 定义隐藏层维度NUM_LAYERS = 1 # 定义层数# 实例化模型model = GRURNN(words_size,EMBEDDING_DIM,HIDDEN_DIM,words_size,NUM_LAYERS)model = model.to(DEVICE)optimizer = torch.optim.Adam(model.parameters(),lr=0.005)# 定义测试函数def evaluate(model,prime_str,predict_len,temperature=0.8):    hidden = model.init_zero_state().to(DEVICE)    predicted = ""    # 处理输入语义    for p in range(len(prime_str) -1): _,hidden = model(prime_str[p],hidden) predicted = predicted + words[predict_len]    inp = prime_str[-1] # 获得输入字符    predicted = predicted + words[inp]    #按照指定长度输出预测字符    for p in range(predict_len): output,hidden = model(inp,hidden) # 将输入字符和状态传入模型 # 从多项式中分布采样 # 在测试环境下,使用温度的参数和指数计算对模型的输出结果进行微调,保证其数值是大于0的数,小于0,torch.multinomial()会报错 # 同时,使用多项式分布的方式进行采样,生成预测结果 output_dist = output.data.view(-1).div(temperature).exp() inp = torch.multinomial(output_dist,1)[0] # 获取采样结果 predicted = predicted + words[inp] # 将索引转化成汉字保存在字符串中    return predicted# 定义参数并训练training_iters = 5000display_step = 1000n_input = 4step = 0offset = random.randint(0,n_input+1)end_offset = n_input + 1while step  (len(training_data)-end_offset): offset = random.randint(0,n_input+1)    # 制作输入样本    inwords = wordlabel[offset:offset+n_input]    inwords = np.reshape(np.array(inwords),[n_input,-1,1])    # 制作标签样本    out_onehot = wordlabel[offset+1:offset+n_input+1]    hidden = model.init_zero_state() # RNN的状态清零    optimizer.zero_grad()    loss = 0.0    inputs = torch.LongTensor(inwords).to(DEVICE)    targets = torch.LongTensor(out_onehot).to(DEVICE)    for c in range(n_input): # 按照输入长度将样本预测输入模型并进行预测 outputs,hidden = model(inputs[c],hidden) loss = loss + F.cross_entropy(outputs,targets[c].view(1))    loss = loss / n_input    loss.backward()    optimizer.step()    # 输出日志    with torch.set_grad_enabled(False): if (step+1)%display_step == 0 :     print(f'Time elapesd:{(time.time() - start_time)/60:.4f}min')     print(f'step {step + 1}|Loss {loss.item():.2f}\n\n')     with torch.no_grad():  print(evaluate(model,inputs,32),'\n')     print(50*'=')    step = step +1    # 每次迭代结束,将偏移值相后移动n_input+1个距离单位,可以保证输入数据的样本相互均匀,否则会出现文本两边的样本训练次数较少的情况。    offset = offset + (n_input+1)print("Finished!")

2.4 代码实现:运行模型生成句子--make_Language_model.py(第4部分)

# 1.5 运行模型生成句子while True:    prompt = "输入几个文字:"    sentence = input(prompt)    inputword = sentence.strip()    try: inputword = get_ch_label_v(None,word_num_map,inputword) keys = np.reshape(np.array(inputword),[len(inputword),-1,1]) # get_ch_label_v()中,如果在字典中找不到对应的索引,就会为其分配一个无效的索引值, # 进而在 evaluate()函数中调用模型的时,差不多对应对的有效词向量而终止报错 model.eval() with torch.no_grad():     sentence = evaluate(model,torch.LongTensor(keys).to(DEVICE),32) print(sentence)    except: # 异常处理,当输入的文字不在模型字典中时,系统会报错,有意设置,防止输入超范围的字词 print("还没学会")

3 代码总览--make_Language_model.py

import numpy as npimport torchimport torch.nn.functional as Fimport timeimport randomfrom collections import Counter# 1.1 定义基本的工具函数RANDOM_SEED = 123torch.manual_seed(RANDOM_SEED)DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')def elapsed(sec): # 计算时间函数    if sec < 60: return str(sec) + "sec"    elif sec<(60*60): return str(sec/60) + "min"    else: return str(sec/(60*60)) + "hour"training_file = 'csv_list/wordstest.txt' # 定义样本文件#中文字def get_ch_label(txt_file): # 提取样本中的汉字    labels = ""    with open(txt_file,'rb') as f : for label in f:     labels = labels + label.decode("gb2312",errors = 'ignore')    return labels#中文多文件def readalltxt(txt_files): # 处理中文    labels = []    for txt_file in txt_files: target = get_ch_label(txt_file) labels.append(target)    return labels# 将汉子转化成向量,支持文件和内存对象里的汉字转换def get_ch_label_v(txt_file,word_num_map,txt_label = None):    words_size = len(word_num_map)    to_num = lambda word:word_num_map.get(word,words_size)    if txt_file != None: txt_label = get_ch_label(txt_file)    # 使用to_num()实现单个汉子转成向量的功能,如果没有该汉字,则将返回words_size(值为69)    labels_vector = list(map(to_num,txt_label)) # 将汉字列表中的每个元素传入到to_num()进行转换    return labels_vector# 1.2 样本预处理training_data = get_ch_label(training_file)print("加载训练模型中")print("该样本长度:",len(training_data))counter = Counter(training_data)words = sorted(counter)words_size = len(words)word_num_map = dict(zip(words,range(words_size)))print("字表大小:",words_size)wordlabel = get_ch_label_v(training_file,word_num_map)# 加载训练模型中# 该样本长度: 75# 字表大小: 41(去重)# 1.3 构建循环神经网络(RNN)模型class GRURNN(torch.nn.Module):    def __init__(self,word_size,embed_dim,hidden_dim,output_size,num_layers): super(GRURNN, self).__init__() self.num_layers = num_layers self.hidden_dim = hidden_dim self.embed = torch.nn.Embedding(word_size,embed_dim) # 定义一个多层的双向层: # 预测结果:形状为[序列,批次,维度hidden_dim×2],因为是双向RNN,故维度为hidden_dim # 序列状态:形状为[层数×2,批次,维度hidden_dim] self.gru = torch.nn.GRU(input_size=embed_dim,    hidden_size=hidden_dim,    num_layers=num_layers,bidirectional=True) self.fc = torch.nn.Linear(hidden_dim *2,output_size)# 全连接层,充当模型的输出层,用于对GRU输出的预测结果进行处理,得到最终的分类结果    def forward(self,features,hidden): embeded = self.embed(features.view(1,-1)) output,hidden = self.gru(embeded.view(1,1,-1),hidden) # output = self.attention(output) output = self.fc(output.view(1,-1)) return output,hidden    def init_zero_state(self): # 对于GRU层状态的初始化,每次迭代训练之前,需要对GRU的状态进行清空,因为输入的序列是1,故torch.zeros的第二个参数为1 init_hidden = torch.zeros(self.num_layers*2,1,self.hidden_dim).to(DEVICE) return init_hidden# 1.4 实例化模型类,并训练模型EMBEDDING_DIM = 10 # 定义词嵌入维度HIDDEN_DIM = 20 # 定义隐藏层维度NUM_LAYERS = 1 # 定义层数# 实例化模型model = GRURNN(words_size,EMBEDDING_DIM,HIDDEN_DIM,words_size,NUM_LAYERS)model = model.to(DEVICE)optimizer = torch.optim.Adam(model.parameters(),lr=0.005)# 定义测试函数def evaluate(model,prime_str,predict_len,temperature=0.8):    hidden = model.init_zero_state().to(DEVICE)    predicted = ""    # 处理输入语义    for p in range(len(prime_str) -1): _,hidden = model(prime_str[p],hidden) predicted = predicted + words[predict_len]    inp = prime_str[-1] # 获得输入字符    predicted = predicted + words[inp]    #按照指定长度输出预测字符    for p in range(predict_len): output,hidden = model(inp,hidden) # 将输入字符和状态传入模型 # 从多项式中分布采样 # 在测试环境下,使用温度的参数和指数计算对模型的输出结果进行微调,保证其数值是大于0的数,小于0,torch.multinomial()会报错 # 同时,使用多项式分布的方式进行采样,生成预测结果 output_dist = output.data.view(-1).div(temperature).exp() inp = torch.multinomial(output_dist,1)[0] # 获取采样结果 predicted = predicted + words[inp] # 将索引转化成汉字保存在字符串中    return predicted# 定义参数并训练training_iters = 5000display_step = 1000n_input = 4step = 0offset = random.randint(0,n_input+1)end_offset = n_input + 1while step  (len(training_data)-end_offset): offset = random.randint(0,n_input+1)    # 制作输入样本    inwords = wordlabel[offset:offset+n_input]    inwords = np.reshape(np.array(inwords),[n_input,-1,1])    # 制作标签样本    out_onehot = wordlabel[offset+1:offset+n_input+1]    hidden = model.init_zero_state() # RNN的状态清零    optimizer.zero_grad()    loss = 0.0    inputs = torch.LongTensor(inwords).to(DEVICE)    targets = torch.LongTensor(out_onehot).to(DEVICE)    for c in range(n_input): # 按照输入长度将样本预测输入模型并进行预测 outputs,hidden = model(inputs[c],hidden) loss = loss + F.cross_entropy(outputs,targets[c].view(1))    loss = loss / n_input    loss.backward()    optimizer.step()    # 输出日志    with torch.set_grad_enabled(False): if (step+1)%display_step == 0 :     print(f'Time elapesd:{(time.time() - start_time)/60:.4f}min')     print(f'step {step + 1}|Loss {loss.item():.2f}\n\n')     with torch.no_grad():  print(evaluate(model,inputs,32),'\n')     print(50*'=')    step = step +1    # 每次迭代结束,将偏移值相后移动n_input+1个距离单位,可以保证输入数据的样本相互均匀,否则会出现文本两边的样本训练次数较少的情况。    offset = offset + (n_input+1)print("Finished!")# 1.5 运行模型生成句子while True:    prompt = "输入几个文字:"    sentence = input(prompt)    inputword = sentence.strip()    try: inputword = get_ch_label_v(None,word_num_map,inputword) keys = np.reshape(np.array(inputword),[len(inputword),-1,1]) # get_ch_label_v()中,如果在字典中找不到对应的索引,就会为其分配一个无效的索引值, # 进而在 evaluate()函数中调用模型的时,差不多对应对的有效词向量而终止报错 model.eval() with torch.no_grad():     sentence = evaluate(model,torch.LongTensor(keys).to(DEVICE),32) print(sentence)    except: # 异常处理,当输入的文字不在模型字典中时,系统会报错,有意设置,防止输入超范围的字词 print("还没学会")

模型结果:训练的并不好,但是还能用