> 技术文档 > Transformer模型Decoder原理精讲及其PyTorch逐行实现

Transformer模型Decoder原理精讲及其PyTorch逐行实现


原理:

Decoder 的核心是一个自回归 (Auto-regressive) 的生成器。它的任务是在给定源序列的编码表示 (encoder_outputs) 和已生成的目标序列部分 (y_1, ..., y_{t-1}) 的条件下,预测出下一个词 y_t 的概率分布。

一个标准的 Decoder Layer 包含三个核心子层:

1.带掩码的多头自注意力 (Masked Multi-Head Self-Attention): 用于处理已生成的目标序列,使其每个位置能获取来自历史位置的上下文信息。通过“前瞻掩码 (Look-ahead Mask)”防止看到未来信息。

2.多头交叉注意力 (Multi-Head Cross-Attention): 连接 Encoder 和 Decoder 的枢纽。它使用来自上一层的查询 (Query),去检索和对齐 Encoder 输出的键 (Key) 和值 (Value),从而将源序列信息整合进来。

3.前馈神经网络 (Position-wise Feed-Forward Network): 与 Encoder 中的完全一样,用于对融合后的信息进行非线性变换。

我们将采用 Pre-LN (Layer Normalization) 结构,即在进入每个子层前进行层归一化。

1. 核心组件 (复用与修改)

Decoder 的许多组件与 Encoder 是相同的。

1.1 Multi-Head Attention 和 Position-wise Feed-Forward Network

这两个模块的实现与 Encoder 完全相同。我们将直接复用之前的代码。它们是通用的计算模块,一个用于信息加权聚合,一个用于非线性变换。

1.2 Positional Encoding

此模块也与 Encoder 完全相同,用于为序列中的每个位置注入位置信息。

2. 解码器层 (DecoderLayer) 实现

这是 Decoder 的核心单元,它将上述三个子层有机地组织在一起。

DecoderLayerforward 方法接收两个主要输入:目标序列的表示 x 和 Encoder 的输出 memory

1.x 首先通过带掩码的自注意力层,进行内部信息聚合。Q, K, V 全部来自 x,并使用前瞻掩码。

2.该层的输出,作为 Q,去和 memory (作为 K 和 V) 进行交叉注意力计算。

3.交叉注意力的输出再通过前馈网络进行最终处理。 每个步骤都遵循 Norm -> Sublayer -> Dropout -> Add 的 Pre-LN 流程。

第一步:构建前馈网络FFN:

先定义初始函数

import torchimport torch.nn as nnimport math# 声明一个名为 PositionwiseFeedForward 的类,它继承自 PyTorch 的基础模块 nn.Moduleclass PositionwiseFeedForward(nn.Module): def __init__(self, d_model, d_ff, dropout=0.1): super().__init__() # 定义第一个线性层。它将输入维度从 d_model 扩展到 d_ff self.linear1 = nn.Linear(d_model, d_ff) # 定义一个 ReLU 激活函数,用于引入非线性 self.relu = nn.ReLU() # 定义一个 Dropout 层,用于防止过拟合 self.dropout = nn.Dropout(dropout) # 定义第二个线性层。它将维度从 d_ff 压缩回 d_model self.linear2 = nn.Linear(d_ff, d_model)

再定义前向传播:

def forward(self, x): # 1. 数据 x 先通过第一个线性层 x = self.linear1(x) # 2. 然后通过 ReLU 激活函数 x = self.relu(x) # 3. 再通过 Dropout 层 x = self.dropout(x) # 4. 最后通过第二个线性层 x = self.linear2(x) # 返回最终处理结果 return x
第二步:构建多头注意力

这个部件比较复杂,我们逐行构建。

定义初始函数与QKV矩阵生成

# 声明 MultiHeadAttention 类class MultiHeadAttention(nn.Module): # 构造函数,接收模型维度和头的数量 def __init__(self, d_model, nhead, dropout=0.1): super().__init__() # --- 初始化参数 --- self.d_model = d_model self.nhead = nhead # 计算每个头的维度,必须能整除 self.d_head = d_model // nhead # --- 定义网络层 --- # 定义用于生成 Query 的线性层 self.q_linear = nn.Linear(d_model, d_model) # 定义用于生成 Key 的线性层 self.k_linear = nn.Linear(d_model, d_model) # 定义用于生成 Value 的线性层 self.v_linear = nn.Linear(d_model, d_model) # 定义最终的输出线性层 self.out_linear = nn.Linear(d_model, d_model) # 定义 Dropout 层 self.dropout = nn.Dropout(dropout)

定义前向传播

 def forward(self, query, key, value, mask=None): # 获取批量大小 batch_size = query.size(0) # 1. 对 Q, K, V 进行线性变换,并重塑以支持多头 # query.size(0) 是 batch_size, -1 会自动推断为序列长度 # .view() 操作是为了把一个 d_model 维的长向量,拆分成 nhead 个 d_head 维的小向量 # .transpose(1, 2) 是为了把 nhead 维度换到前面,方便后续并行计算 q = self.q_linear(query).view(batch_size, -1, self.nhead, self.d_head).transpose(1, 2) k = self.k_linear(key).view(batch_size, -1, self.nhead, self.d_head).transpose(1, 2) v = self.v_linear(value).view(batch_size, -1, self.nhead, self.d_head).transpose(1, 2) # 2. 计算注意力分数 # torch.matmul(q, k.transpose(-2, -1)) 计算 Q 和 K 的点积 # k.transpose(-2, -1) 将 K 的最后两个维度(序列长度和d_head)交换,以满足矩阵乘法要求 # / math.sqrt(self.d_head) 进行缩放,防止梯度消失 scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_head) # 3. 应用掩码 (如果提供了) if mask is not None: scores = scores.masked_fill(mask, -1e9) # 4. 将分数转换为概率权重 attn_weights = torch.softmax(scores, dim=-1) # 对权重应用 Dropout attn_weights = self.dropout(attn_weights) # 5. 用权重加权求和 Value context = torch.matmul(attn_weights, v) # 6. 拼接多头并进行最终的线性投影 # .transpose(1, 2) 把维度换回来 # .contiguous() 确保内存是连续的,这是 .view() 的要求 # .view() 把 nhead 个 d_head 维的小向量重新“粘合”成一个 d_model 维的长向量 context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model) # 通过最后的输出线性层 output = self.out_linear(context) return output 
第三步:组装一个解码器层 (DecoderLayer)

现在我们可以开始DecoderLayer :

初始化函数与参数:

class DecoderLayer(nn.Module): # 构造函数 def __init__(self, d_model, nhead, d_ff, dropout=0.1): super().__init__() # 实例化第一个子层:带掩码的自注意力 self.masked_self_attn = MultiHeadAttention(d_model, nhead, dropout) # 实例化第二个子层:交叉注意力 self.cross_attn = MultiHeadAttention(d_model, nhead, dropout) # 实例化第三个子层:前馈网络 self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout) # 实例化三个子层对应的层归一化 self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) # 实例化三个子层对应的 Dropout self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.dropout3 = nn.Dropout(dropout)

1..定义前向传播函数的掩码注意力函数,注意它接收 tgt 和 memory 两个输入:

tgt指的是模型需要生成或预测的那个序列。

def forward(self, tgt, memory, tgt_mask=None, memory_mask=None): # 1.1 对输入进行层归一化 (Pre-LN) normed_tgt = self.norm1(tgt) # 1.2 Q,K,V都来自normed_tgt,并应用前瞻掩码 attn_output = self.masked_self_attn(normed_tgt, normed_tgt, normed_tgt, mask=tgt_mask) # 1.3 应用Dropout,并与原始输入进行残差连接 tgt = tgt + self.dropout1(attn_output)

2.交叉注意力层:

 #2.1 对上一步的输出进行层归一化 normed_tgt = self.norm2(tgt) # 2.2 Q来自normed_tgt,K和V来自Encoder的memory attn_output = self.cross_attn(normed_tgt, memory, memory, mask=memory_mask) # 2.3 应用Dropout,并进行残差连接 tgt = tgt + self.dropout2(attn_output)

3.前馈网络层:

 # 3.1 对上一步的输出进行层归一化 normed_tgt = self.norm3(tgt) # 3.2 通过前馈网络 ff_output = self.feed_forward(normed_tgt) # 3.3 应用Dropout,并进行残差连接 tgt = tgt + self.dropout3(ff_output) return tgt
第四步:构建完整的 Decoder 和所有辅助部件

我们已经有了 DecoderLayer 这个基本单元。现在,我们把 N 个这样的单元堆叠起来,并加上处理输入和输出的部件,就完成了整个 Decoder。

1.位置编码

class PositionalEncoding(nn.Module): def __init__(self, d_model, dropout=0.1, max_len=5000): super().__init__() self.dropout = nn.Dropout(p=dropout) # 创建一个足够大的位置编码矩阵 pe = torch.zeros(max_len, d_model) # 创建位置张量 [0, 1, 2, ..., max_len-1] position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # 计算除法项 div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # 用sin和cos计算偶数和奇数维度的位置编码 pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) # 将pe注册为模型的buffer,它不是参数,但会随模型移动(如.to(device)) self.register_buffer(\'pe\', pe.unsqueeze(0))

2.前瞻掩码生成函数

def generate_square_subsequent_mask(size): mask = torch.triu(torch.ones(size, size), diagonal=1).bool() return mask

最终的 Decoder 类:

class Decoder(nn.Module): def __init__(self, vocab_size, d_model, nhead, d_ff, num_layers, dropout=0.1): super().__init__() self.d_model = d_model # 定义词嵌入层,将token ID转换为向量 self.embedding = nn.Embedding(vocab_size, d_model) # 实例化位置编码模块 self.pos_encoder = PositionalEncoding(d_model, dropout) # 使用 ModuleList 来堆叠 N 个 DecoderLayer self.layers = nn.ModuleList([DecoderLayer(d_model, nhead, d_ff, dropout) for _ in range(num_layers)]) # 定义最终的层归一化 self.final_norm = nn.LayerNorm(d_model) # 定义最终的输出线性层,将向量映射到词汇表得分 self.output_linear = nn.Linear(d_model, vocab_size)

前向传播:

 def forward(self, tgt, memory, tgt_mask=None, memory_mask=None): # 1. 词嵌入 tgt_emb = self.embedding(tgt) * math.sqrt(self.d_model) # 2. 添加位置编码 x = self.pos_encoder(tgt_emb) # 3. 循环通过每一个 DecoderLayer for layer in self.layers: x = layer(x, memory, tgt_mask, memory_mask) # 4. 最终归一化 x = self.final_norm(x) # 5. 通过输出线性层得到 logits logits = self.output_linear(x) return logits
第五步:完整的 Encoder-Decoder 使用示例

为了让代码可以运行,我们需要一个 Encoder 来提供 memory。这里提供一个最小化的 Encoder 并展示完整的联动过程。

最小化Encoder:

class EncoderLayer(nn.Module): def __init__(self, d_model, nhead, d_ff, dropout=0.1): super().__init__() self.self_attn = MultiHeadAttention(d_model, nhead, dropout) self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) def forward(self, src, src_mask=None): normed_src = self.norm1(src) attn_output = self.self_attn(normed_src, normed_src, normed_src, mask=src_mask) src = src + self.dropout1(attn_output) normed_src = self.norm2(src) ff_output = self.feed_forward(normed_src) src = src + self.dropout2(ff_output) return src
class Encoder(nn.Module): def __init__(self, vocab_size, d_model, nhead, d_ff, num_layers, dropout=0.1): super().__init__() self.embedding = nn.Embedding(vocab_size, d_model) self.pos_encoder = PositionalEncoding(d_model, dropout) self.layers = nn.ModuleList([EncoderLayer(d_model, nhead, d_ff, dropout) for _ in range(num_layers)]) self.final_norm = nn.LayerNorm(d_model) def forward(self, src, src_mask=None): src_emb = self.embedding(src) * math.sqrt(d_model) x = self.pos_encoder(src_emb) for layer in self.layers: x = layer(x, mask=src_mask) return self.final_norm(x)

主程序 :

if __name__ == \'__main__\': #超参数定义 SRC_VOCAB_SIZE = 10000 TGT_VOCAB_SIZE = 12000 D_MODEL = 512 NHEAD = 8 D_FF = 2048 NUM_LAYERS = 3 # 使用较少的层快速测试 #实例化模型 encoder = Encoder(SRC_VOCAB_SIZE, D_MODEL, NHEAD, D_FF, NUM_LAYERS) decoder = Decoder(TGT_VOCAB_SIZE, D_MODEL, NHEAD, D_FF, NUM_LAYERS) # --- 准备输入数据 --- BATCH_SIZE = 2 SRC_SEQ_LEN = 10 TGT_SEQ_LEN = 12 src_tokens = torch.randint(1, SRC_VOCAB_SIZE, (BATCH_SIZE, SRC_SEQ_LEN)) tgt_tokens = torch.randint(1, TGT_VOCAB_SIZE, (BATCH_SIZE, TGT_SEQ_LEN))

添加掩码:

 tgt_mask = generate_square_subsequent_mask(TGT_SEQ_LEN) # Shape: [12, 12] print(\"开始Encoder-Decoder )
 # 1. Encoder 处理源序列 memory = encoder(src_tokens) print(f\"Encoder 输出 (memory) 形状: {memory.shape}\") # 2. Decoder 处理目标序列和 memory output_logits = decoder(tgt_tokens, memory, tgt_mask=tgt_mask) print(f\"Decoder 最终输出 (logits) 形状: {output_logits.shape}\") # 验证输出形状 assert output_logits.shape == (BATCH_SIZE, TGT_SEQ_LEN, TGT_VOCAB_SIZE) print(\"\\n输出形状验证成功\")

深入理解架构中的数据流动:

为了帮助大家更好的理解,所以我再最后附上数据在架构中的动态过程与架构作用:

第一阶段:编码器作用:

编码器的唯一目标是:接收一个完整的输入序列(例如,一个英文句子),然后为这个序列生成一个包含丰富上下文信息的、高质量的向量表示(我们称之为 memory)。

1.从文字到向量

输入: \"I am a student\"

第1步:分词 (Tokenization)

句子被切分成一个个最小的单元(token)。

结果: [\"I\", \"am\", \"a\", \"student\"]

第2步:数值化 (Numericalization)

每个 token 都会从一个预先建好的“词汇表”中查找其对应的唯一ID。

结果: [101, 2572, 1037, 5801] (ID为示例)

第3步:词嵌入与位置编码 (Embedding & Positional Encoding)

词嵌入: 每个 ID 再通过一个“嵌入层”(Embedding Layer)转换成一个高维度的向量(维度为 d_model,比如 512)。现在,每个词都有了自己初步的、包含语义的向量表示。

位置编码: 因为 Transformer 本身不理解顺序,我们需要明确地告诉它每个词的位置。一个根据 sincos 函数生成的位置向量会被到对应的词向量上。

最终输入: 一个形状为 (序列长度, d_model) 的矩阵,其中每个向量都同时包含了词义位置信息。

2. 穿越N层结构:层层递进的理解:

这个包含了位置信息的向量矩阵,现在要进入由 N 层(比如6层)完全相同的编码器层(Encoder Layer)组成的堆栈。我们来看数据在单层中的流动:

1.进入多头自注意力 (Multi-Head Self-Attention)

这是编码器的核心。在这里,序列中的每一个词都会“环顾”序列中的所有其他词(包括自己)。

它通过计算 Q, K, V(查询、键、值)来判断:“为了更好地理解我自己的意思,我应该对其他哪个词投入多少注意力?”

结果: 每个词的向量都吸收了来自整个句子的上下文信息,变成了一个新的、更丰富的向量。例如,“student”的向量现在也包含了“I am a”的信息。

2.进入残差连接与层归一化 (Add & Norm)

注意力层的输出会与该层的输入进行一次“残差连接”(相加),这能防止信息在深层网络中丢失。然后通过“层归一化”来稳定训练。

3.进入前馈网络 (Feed-Forward Network)

这个向量会再通过一个简单的全连接神经网络,进行一次非线性的信息加工和提炼。

4.再次 Add & Norm

稳定训练

至此,数据在一层编码器中的流动就结束了。这个输出会作为下一层编码器层的输入,重复上述过程。

3. 最后生成“记忆”memory

当数据流经所有 N 层编码器后,最终的输出是一个形状为 (序列长度, d_model) 的向量矩阵。这份矩阵就是我们所说的 memory

这份 memory 是编码器对原始输入句子的最终、最深刻的理解。它是一个静态的、只读的“知识库”,为接下来解码器的生成工作做好了万全的准备。

第二阶段:解码器数据流动

解码器的目标是利用编码器生成的 memory,以自回归 (auto-regressive) 的方式,一个接一个地生成目标序列的 token。

1. 生成第一个词

输入:

  1. 一个特殊的“序列开始”符(比如 )。

  2. 编码器生成的完整 memory

解码器内部流动:

符经过嵌入和位置编码,变成一个向量,这个向量进入解码器堆栈。在每一层解码器层中:

带掩码的自注意力: 因为现在只有一个词,这个层基本不起作用。它的目的是让解码器回顾已经生成的内容,但现在历史是空的。

交叉注意力 (Cross-Attention):   向量作为查询(Q),去“查阅”编码器提供的整个 memory(作为键K值V)。它在问:“我要开始生成了,原文中哪个部分的信息最重要?” 模型可能会发现,原文开头的 \"I\" 最重要。

前馈网络: 对交叉注意力提取出的信息进行加工。

量流过所有 N 层解码器后,顶层的输出向量会经过一个最终的线性层和 Softmax 函数,在整个目标词汇表上生成一个概率分布。最后输出概率最高的词,比如“我”,被选为第一个生成的词。

2. 生成后续的词 (自回归循环)

输入已经生成的所有词,即 [, \"我\"]以及编码器生成的完整 memory

解码器内部流动:

[, \"我\"] 序列经过嵌入和位置编码,变成向量矩阵。进入解码器堆栈。在每一层解码器层中:

带掩码的自注意力: 现在历史不再为空。“我”这个词会回顾 ,形成对当前已生成内容的理解。前瞻掩码会确保它看不到未来的词。

交叉注意力: 这个“已生成内容”的理解,会形成一个新的查询(Q),再次去查阅整个 memory。它可能在问:“我已经写了‘我’,接下来我应该关注原文的哪部分?” 这次,注意力可能被吸引到 \"am\" 和 \"a student\" 上。

前馈网络: 再次加工信息。

顶层输出经过线性层和 Softmax,生成新的概率分布。输出:概率最高的词,比如“是”,被选为下一个词。

这个回顾历史 -> 查阅原文 -> 决定下一个词的循环会不断进行,直到模型生成一个“序列结束”符 ,数据的整个流动就结束了

希望这篇文章能帮助大家了解并学会使用Decoder