> 技术文档 > 储粮温度预测新方案!FEBL模型用代码实现:LSTM+注意力+岭回归的完整流程

储粮温度预测新方案!FEBL模型用代码实现:LSTM+注意力+岭回归的完整流程

特征增强型宽度学习(FEBL)的模型,用于储粮温度预测任务。以下是代码的逐层解析和功能说明:


1. 依赖库导入

import torchimport torch.nn as nnimport torch.optim as optimimport numpy as npimport timefrom sklearn.preprocessing import MinMaxScalerfrom sklearn.model_selection import train_test_split
  • PyTorch:用于构建神经网络模型。
  • NumPy:处理数值计算。
  • scikit-learn:用于数据归一化(MinMaxScaler)和数据集划分(train_test_split)。

2. 数据处理类 GrainData

class GrainData: \"\"\"储粮数据处理类,包括数据生成、预处理和划分\"\"\"
功能:
  • 生成模拟数据load_data 方法生成 (num_samples, num_features) 的数据,前4个特征模拟粮温随时间变化的时序数据。
  • 创建时序数据集create_dataset 方法将原始数据转换为滑动窗口格式(每个样本包含 seq_length 个时间步,标签为下一个时间步的温度)。
  • 数据预处理preprocess 方法对数据进行归一化处理(MinMaxScaler),并划分为训练集和测试集。
关键代码段:
# 生成数据X = np.random.rand(num_samples, num_features)for i in range(1, num_samples): X[i, :4] = X[i-1, :4] + np.random.normal(0, 0.1)# 创建时序数据集def create_dataset(data, seq_length): X, y = [], [] for i in range(len(data)-seq_length): X.append(data[i:i+seq_length]) y.append(data[i+seq_length, 0]) # 假设预测第一个特征 return np.array(X), np.array(y)

3. 特征增强模块 FeatureEnhancer

class FeatureEnhancer(nn.Module): def __init__(self, input_size, hidden_size, num_heads): super().__init__() self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True) self.attention = nn.MultiheadAttention(embed_dim=hidden_size, num_heads=num_heads)
功能:
  • LSTM:提取时序特征。
  • 多头自注意力(MultiheadAttention):增强关键特征(如温度变化的关联性)。
  • 输出处理:取最后一个时间步的特征,通过线性层调整维度
关键代码段:
def forward(self, x): # LSTM提取时序特征 out, _ = self.lstm(x) # 多头自注意力增强 attention_out, _ = self.attention(out, out, out) # 取最后一个时间步的特征 last_feature = attention_out[:, -1, :] return last_feature

4. 宽度学习系统(BLS)模块 BroadLearningSystem

class BroadLearningSystem(nn.Module): def __init__(self, input_size, feature_nodes, enhancement_nodes): super().__init__() self.feature_layer = nn.Linear(input_size, feature_nodes) self.enhancement_layer = nn.Linear(feature_nodes, enhancement_nodes)
功能:
  • 特征节点:通过线性变换生成高维特征(feature_nodes)。
  • 增强节点:进一步扩展特征维度(enhancement_nodes),增强模型表达能力。
关键代码段:
def forward(self, x): # 特征节点 feature_out = torch.tanh(self.feature_layer(x)) # 增强节点 enhancement_out = torch.tanh(self.enhancement_layer(feature_out)) return torch.cat([feature_out, enhancement_out], dim=1)

5. 整合模型 FEBLModel

class FEBLModel(nn.Module): def __init__(self, config): super().__init__() self.feature_extractor = FeatureEnhancer(config[\'input_size\'], config[\'lstm_hidden\'], config[\'num_heads\']) self.bls = BroadLearningSystem(config[\'lstm_hidden\'], config[\'feature_nodes\'], config[\'enhancement_nodes\'])
功能:
  • 组合模块:将 FeatureEnhancer 和 BLS 模块组合在一起。
  • 岭回归输出:使用 compute_ridge_weights 方法计算最终输出层的权重(基于训练集特征矩阵和标签)。
关键代码段:
def compute_ridge_weights(self, A_train, y_train): # 岭回归计算权重 I = torch.eye(A_train.size(1)) A_pinv = torch.linalg.pinv(A_train.T @ A_train + self.lambda_reg * I) @ A_train.T self.W_m = A_pinv @ y_train

6. 训练与评估流程 train_and_evaluate

def train_and_evaluate(): # 配置参数 config = { \'input_size\': 8, \'seq_length\': 30, \'lstm_hidden\': 128, \'num_heads\': 2, \'feature_nodes\': 40, \'enhancement_nodes\': 400 } # 数据预处理 data_handler = GrainData(seq_length=config[\'seq_length\']) X_train, X_test, y_train, y_test = data_handler.preprocess() # 模型初始化 model = FEBLModel(config) # 预训练特征增强器 optimizer = optim.Adam(model.feature_extractor.parameters(), lr=0.001) criterion = nn.MSELoss() for epoch in range(50): features = model.feature_extractor(X_train) A = model.bls(features) outputs = temp_fc(A) # 临时全连接层 loss = criterion(outputs, y_train) optimizer.zero_grad() loss.backward() optimizer.step() # 计算BLS输出权重 with torch.no_grad(): A_train = model.bls(model.feature_extractor(X_train)) model.compute_ridge_weights(A_train, y_train) # 测试集评估 y_pred = model(X_test) mae = torch.mean(torch.abs(y_pred - y_test)).item() rmse = torch.sqrt(torch.mean((y_pred - y_test) ** 2)).item() mape = torch.mean(torch.abs((y_pred - y_test) / y_test)).item() * 100 print(f\"MAE: {mae:.4f}, RMSE: {rmse:.4f}, MAPE: {mape:.2f}%\")

7. 模型特点总结

模块 功能 LSTM 提取时序特征(如温度变化的长期依赖关系) 多头自注意力 增强关键特征(如温度波动的关联性) BLS(宽度学习系统) 通过特征节点和增强节点扩展模型容量 岭回归 避免过拟合,稳定输出权重

完整代码如下

import torchimport torch.nn as nnimport torch.optim as optimimport numpy as npimport timefrom sklearn.preprocessing import MinMaxScalerfrom sklearn.model_selection import train_test_splitclass GrainData: \"\"\"储粮数据处理类,包括数据生成、预处理和划分\"\"\" def __init__(self, seq_length=30): \"\"\" 初始化数据处理对象 :param seq_length: 时序窗口大小 \"\"\" self.seq_length = seq_length self.scaler = MinMaxScaler(feature_range=(0, 1)) def load_data(self, num_samples=800, num_features=8): \"\"\" 生成模拟储粮数据 :param num_samples: 样本数量 :param num_features: 特征数量 :return: 模拟数据数组 \"\"\" # 生成基础数据 (范围15-45度) data = np.random.rand(num_samples, num_features) * 30 + 15 # 添加时间相关性:温度趋势变化 for i in range(1, num_samples): # 前4个特征(粮食温度)随时间变化,添加随机波动 data[i, :4] = data[i - 1, :4] + np.random.randn(4) * 0.5 return data def create_dataset(self, data): \"\"\" 创建用于训练的时序数据集 :param data: 原始数据 :return: (特征集, 标签集) \"\"\" X, y = [], [] # 创建滚动窗口数据集 for i in range(len(data) - self.seq_length - 1): X.append(data[i:i + self.seq_length]) # 窗口序列 y.append(data[i + self.seq_length, 0]) # 预测下一时刻的粮温 return np.array(X), np.array(y) def preprocess(self): \"\"\" 数据预处理:归一化并划分训练/测试集 :return: (X_train, X_test, y_train, y_test) \"\"\" # 1. 加载数据 data = self.load_data() # 2. 归一化处理 scaled_data = self.scaler.fit_transform(data) # 3. 创建数据集 X, y = self.create_dataset(scaled_data) # 4. 划分训练/测试集 (按时间顺序划分) return train_test_split(X, y, test_size=0.2, shuffle=False)class FeatureEnhancer(nn.Module): \"\"\"特征增强器:LSTM + 多头自注意力机制\"\"\" def __init__(self, input_size, hidden_size, num_heads): \"\"\" 初始化特征增强器 :param input_size: 输入特征维度 :param hidden_size: LSTM隐藏单元数量 :param num_heads: 注意力头部数量 \"\"\" super().__init__() # LSTM层提取时序特征 self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, batch_first=True ) # 多头自注意力机制强化关键特征 self.attention = nn.MultiheadAttention( embed_dim=hidden_size, num_heads=num_heads, batch_first=True ) # 维度调整层确保输出一致 self.linear_adjust = nn.Linear(hidden_size, hidden_size) def forward(self, x): \"\"\" 前向传播 :param x: 输入数据 (batch, seq_len, input_size) :return: 增强后的特征 (batch, hidden_size) \"\"\" # 1. LSTM处理时序特征 lstm_out, _ = self.lstm(x) # 2. 多头自注意力强化重要特征 # 输入维度检查:确保是3维(batch, seq, features) if lstm_out.dim() == 2: lstm_out = lstm_out.unsqueeze(0) attn_output, _ = self.attention(lstm_out, lstm_out, lstm_out) # 3. 取最后一个时间步的特征 + 维度调整 return self.linear_adjust(attn_output[:, -1, :])class BroadLearningSystem(nn.Module): \"\"\"宽度学习系统:特征节点与增强节点\"\"\" def __init__(self, input_size, feature_nodes, enhancement_nodes): \"\"\" 初始化BLS系统 :param input_size: 输入特征维度 :param feature_nodes: 特征节点数量 :param enhancement_nodes: 增强节点数量 \"\"\" super().__init__() # 随机初始化权重和偏置 self.W_e = torch.randn(input_size, feature_nodes) * 2 - 1 self.bias_e = torch.randn(1, feature_nodes) * 2 - 1 self.W_h = torch.randn(feature_nodes, enhancement_nodes) * 2 - 1 self.bias_h = torch.randn(1, enhancement_nodes) * 2 - 1 def forward(self, x): \"\"\" 前向传播 :param x: 输入特征 (batch, input_size) :return: 特征+增强节点 (batch, feature_nodes+enhancement_nodes) \"\"\" # 1. 特征节点: 非线性变换 Z = torch.tanh(torch.mm(x, self.W_e) + self.bias_e) # 2. 增强节点: 非线性变换 H = torch.tanh(torch.mm(Z, self.W_h) + self.bias_h) # 3. 级联作为最终特征矩阵 return torch.cat((Z, H), dim=1)class FEBLModel(nn.Module): \"\"\"特征增强型宽度学习模型 (FEBL)\"\"\" def __init__(self, config): \"\"\" 初始化FEBL模型 :param config: 配置字典 \"\"\" super().__init__() # 特征增强器 self.feature_enhancer = FeatureEnhancer( config[\'input_size\'], config[\'lstm_hidden\'], config[\'num_heads\'] ) # 宽度学习系统 self.bls = BroadLearningSystem( config[\'lstm_hidden\'], config[\'feature_nodes\'], config[\'enhancement_nodes\'] ) # 输出权重 (通过岭回归计算) self.W_m = None def forward(self, x): \"\"\" 前向传播 :param x: 输入数据 :return: 预测结果 \"\"\" # 1. 特征增强 features = self.feature_enhancer(x) # 2. 维度统一: 确保为(batch, features)形式 if features.dim() > 2: features = features.squeeze(0) # 3. 宽度学习 A = self.bls(features) # 4. 输出预测 (如果权重已计算) return torch.mm(A, self.W_m) if self.W_m is not None else A def compute_ridge_weights(self, A, Y, lambda_reg=1e-10): \"\"\" 岭回归计算输出权重 :param A: 特征矩阵 :param Y: 标签 :param lambda_reg: 正则化系数 \"\"\" # 确保转换为PyTorch张量 if not isinstance(A, torch.Tensor): A = torch.tensor(A, dtype=torch.float32) if not isinstance(Y, torch.Tensor): Y = torch.tensor(Y, dtype=torch.float32) # 关键检查1: 样本数量匹配 if A.size(0) != Y.size(0): raise ValueError(f\"特征矩阵A有{A.size(0)}个样本, 但标签Y有{Y.size(0)}个样本\") # 关键检查2: 矩阵维度一致 if Y.dim() == 1: Y = Y.unsqueeze(1) # 1. 计算伪逆 (岭回归) I = torch.eye(A.size(1)) A_pinv = torch.linalg.pinv(A.T @ A + lambda_reg * I) @ A.T # 2. 计算输出权重 (确保维度一致) self.W_m = A_pinv @ Y # 3. 维度检查: 确保预测维度正确 if self.W_m.size(0) != A.size(1) or self.W_m.size(1) != Y.size(1): raise RuntimeError(f\"权重矩阵维度错误: W_m {self.W_m.shape}, A {A.shape}, Y {Y.shape}\")def train_and_evaluate(): \"\"\"模型训练与评估流程\"\"\" # 配置参数 config = { \'input_size\': 8, # 输入特征数量 \'seq_length\': 30, # 时序窗口大小 \'lstm_hidden\': 128, # LSTM隐藏单元 \'num_heads\': 2, # 注意力头数 \'feature_nodes\': 40, # BLS特征节点数 \'enhancement_nodes\': 400 # BLS增强节点数 } print(\"开始数据预处理...\") data_handler = GrainData(seq_length=config[\'seq_length\']) X_train, X_test, y_train, y_test = data_handler.preprocess() # 转换为PyTorch张量并调整维度 X_train = torch.tensor(X_train).float() y_train = torch.tensor(y_train).float().unsqueeze(1) # 添加第二维度 X_test = torch.tensor(X_test).float() y_test = torch.tensor(y_test).float().unsqueeze(1) print(f\"训练集维度: X_train {X_train.shape}, y_train {y_train.shape}\") print(f\"测试集维度: X_test {X_test.shape}, y_test {y_test.shape}\") # 初始化模型 model = FEBLModel(config) print(f\"模型初始化完成: 特征节点 {config[\'feature_nodes\']} + 增强节点 {config[\'enhancement_nodes\']}\") # === 阶段1: 预训练特征增强器 === print(\"\\n开始预训练特征增强器 (50轮)...\") optimizer = optim.Adam(model.feature_enhancer.parameters(), lr=0.001) criterion = nn.MSELoss() # 添加临时输出层 bls_output_dim = config[\'feature_nodes\'] + config[\'enhancement_nodes\'] temp_fc = nn.Linear(bls_output_dim, 1) start_time = time.time() for epoch in range(50): # 前向传播 features = model.feature_enhancer(X_train) A = model.bls(features) outputs = temp_fc(A) # 损失计算 loss = criterion(outputs, y_train) # 反向传播 optimizer.zero_grad() loss.backward() optimizer.step() if (epoch + 1) % 10 == 0: print(f\"Epoch [{epoch + 1}/50], Loss: {loss.item():.6f}, Time: {time.time() - start_time:.1f}s\") # === 阶段2: 计算BLS输出权重 === print(\"\\n计算BLS岭回归权重...\") with torch.no_grad(): # 1. 获取训练集特征矩阵 train_features = model.feature_enhancer(X_train) A_train = model.bls(train_features) # 2. 维度检查 print(f\"训练特征矩阵维度: A_train {A_train.shape}, y_train {y_train.shape}\") if A_train.shape[0] != y_train.shape[0]: print(f\"警告: 样本数量不匹配 A_train {A_train.shape[0]} vs y_train {y_train.shape[0]}\") # 3. 岭回归计算权重 try: model.compute_ridge_weights(A_train, y_train) print(f\"权重计算成功: W_m维度 {model.W_m.shape}\") except Exception as e: print(f\"权重计算失败: {e}\") return None # 4. 测试集预测 test_features = model.feature_enhancer(X_test) A_test = model.bls(test_features) print(f\"测试特征矩阵维度: A_test {A_test.shape}\") y_pred = model(X_test) # 5. 性能评估 mae = torch.mean(torch.abs(y_pred - y_test)).item() rmse = torch.sqrt(torch.mean((y_pred - y_test) ** 2)).item() mape = torch.mean(torch.abs((y_pred - y_test) / y_test)).item() * 100 print(\"\\n测试结果:\") print(f\"MAE: {mae:.4f}\") print(f\"RMSE: {rmse:.4f}\") print(f\"MAPE: {mape:.2f}%\") print(f\"总训练时间: {time.time() - start_time:.2f}秒\") return modelif __name__ == \"__main__\": print(\"=\" * 60) print(\"特征增强型宽度学习 (FEBL) 粮温预测模型\") print(\"=\" * 60) model = train_and_evaluate()