> 技术文档 > llama-factory源码详解——以DPO为例_llamafactory dpo

llama-factory源码详解——以DPO为例_llamafactory dpo

本文记录了我在学习 llama-factory过程中对代码运行过程的梳理

代码入口——src/train.py

from llamafactory.train.tuner import run_expdef main(): run_exp()def _mp_fn(index): # For xla_spawn (TPUs) run_exp()if __name__ == \"__main__\": main()

run_exp() 

该函数位于src/llamafactory/train/tuner.py

def run_exp(args: Optional[Dict[str, Any]] = None, callbacks: List[\"TrainerCallback\"] = []) -> None: callbacks.append(LogCallback()) model_args, data_args, training_args, finetuning_args, generating_args = get_train_args(args) if finetuning_args.stage == \"pt\": run_pt(model_args, data_args, training_args, finetuning_args, callbacks) elif finetuning_args.stage == \"sft\": run_sft(model_args, data_args, training_args, finetuning_args, generating_args, callbacks) elif finetuning_args.stage == \"rm\": run_rm(model_args, data_args, training_args, finetuning_args, callbacks) elif finetuning_args.stage == \"ppo\": run_ppo(model_args, data_args, training_args, finetuning_args, generating_args, callbacks) elif finetuning_args.stage == \"dpo\": run_dpo(model_args, data_args, training_args, finetuning_args, callbacks) elif finetuning_args.stage == \"kto\": run_kto(model_args, data_args, training_args, finetuning_args, callbacks) else: raise ValueError(\"Unknown task: {}.\".format(finetuning_args.stage))

这段代码首先获取训练参数,也就是 get_train_args函数,位置在src/llamafactory/hparams/parser.py,hparams这个包里还有llama-factory的参数体系。

这个函数是用来解析和验证训练参数的。它接受一个可选的字典参数args,如果没有提供,则使用默认值。函数的主要任务是:

  1. 解析输入参数并将其分配给五个变量:model_argsdata_argstraining_argsfinetuning_args, 和 generating_args
  2. 设置日志记录,以便在训练过程中记录信息。
  3. 验证输入参数的合法性,包括检查是否有冲突的参数设置、是否有必要的参数未提供等。
  4. 根据需要进行参数的后处理,例如在分布式训练中调整某些参数的值。
  5. 如果输出目录已经存在并且不允许覆盖,函数会检查是否可以从之前的检查点恢复训练。
  6. 根据是否使用混合精度训练,设置模型的计算精度。
  7. 在每个进程中记录一些基本的训练信息,例如进程的排名、设备、GPU数量、分布式训练的状态和计算精度。
  8. 使用提供的种子设置随机数生成器的种子。

最后,函数返回五个参数对象,分别代表模型、数据、训练、微调和生成的相关参数。这些参数将被用来配置和执行训练过程。

run_dpo()

以dpo为例,run_dpo()函数在src/llamafactory/train/dpo/workflow.py。

导入模块

from typing import TYPE_CHECKING, List, Optionalfrom ...data import PairwiseDataCollatorWithPadding, get_datasetfrom ...extras.constants import IGNORE_INDEXfrom ...extras.ploting import plot_lossfrom ...hparams import ModelArgumentsfrom ...model import load_model, load_tokenizerfrom ..trainer_utils import create_modelcard_and_push, create_ref_modelfrom .trainer import CustomDPOTrainer

这些导入语句引入了所需的模块和函数:

  • typing 模块用于类型检查。
  • PairwiseDataCollatorWithPaddingget_dataset 用于数据处理。
  • IGNORE_INDEX 是一个常量,用于忽略填充值。
  • plot_loss 用于绘制损失图。
  • ModelArguments 是模型参数的定义。
  • load_modelload_tokenizer 用于加载模型和分词器。
  • create_modelcard_and_pushcreate_ref_model 是一些实用函数。
  • CustomDPOTrainer 是自定义的训练器类。

类型检查

if TYPE_CHECKING: from transformers import Seq2SeqTrainingArguments, TrainerCallback from ...hparams import DataArguments, FinetuningArguments

这些导入语句仅在类型检查时使用,用于定义类型提示。

run_dpo 函数

def run_dpo( model_args: \"ModelArguments\", data_args: \"DataArguments\", training_args: \"Seq2SeqTrainingArguments\", finetuning_args: \"FinetuningArguments\", callbacks: Optional[List[\"TrainerCallback\"]] = None,):

这个函数 run_dpo 接受五个参数:

  • model_args: 模型相关的参数。
  • data_args: 数据相关的参数。
  • training_args: 训练相关的参数。
  • finetuning_args: 微调相关的参数。
  • callbacks: 可选的回调函数列表。

加载分词器和数据集

 tokenizer_module = load_tokenizer(model_args) tokenizer = tokenizer_module[\"tokenizer\"] dataset_module = get_dataset(model_args, data_args, training_args, stage=\"rm\", **tokenizer_module) model = load_model(tokenizer, model_args, finetuning_args, training_args.do_train)

这几行代码执行以下操作:

  1. 加载分词器模块,并从中提取出实际的分词器对象。
  2. 调用 get_dataset 函数,获取数据集模块。参数包括模型参数、数据参数、训练参数、阶段(这里是 \"rm\"),以及分词器模块中的其他参数。
  3. 加载模型,使用分词器、模型参数、微调参数和训练标志(do_train)作为输入。

创建数据整理器

 data_collator = PairwiseDataCollatorWithPadding( tokenizer=tokenizer, pad_to_multiple_of=8, label_pad_token_id=IGNORE_INDEX if data_args.ignore_pad_token_for_loss else tokenizer.pad_token_id, )

这段代码创建了一个 PairwiseDataCollatorWithPadding 对象,用于数据整理。它使用分词器,并设置填充到8的倍数。如果 data_args.ignore_pad_token_for_loss 为真,则使用 IGNORE_INDEX 作为标签填充标记,否则使用分词器的填充标记。

创建参考模型

 if finetuning_args.use_ref_model: if finetuning_args.ref_model is None and (not training_args.do_train): # use the model itself ref_model = model else: ref_model = create_ref_model(model_args, finetuning_args) else: ref_model = None

这段代码创建一个参考模型:

  • 如果 finetuning_args.use_ref_model 为真:
    • 如果 finetuning_args.ref_modelNone 且不进行训练,则使用当前模型作为参考模型。
    • 否则,调用 create_ref_model 函数创建参考模型。
  • 如果 finetuning_args.use_ref_model 为假,则参考模型为 None

更新训练参数

 training_args.remove_unused_columns = False # important for pairwise dataset

这行代码更新训练参数,设置 remove_unused_columnsFalse,这对于成对数据集非常重要。

初始化训练器

 trainer = CustomDPOTrainer( model=model, ref_model=ref_model, args=training_args, finetuning_args=finetuning_args, data_collator=data_collator, callbacks=callbacks, **dataset_module, **tokenizer_module, )

这段代码初始化自定义的训练器 CustomDPOTrainer,传入模型、参考模型、训练参数、微调参数、数据整理器、回调函数以及数据集和分词器模块中的其他参数。

训练模型

 if training_args.do_train: train_result = trainer.train(resume_from_checkpoint=training_args.resume_from_checkpoint) trainer.save_model() trainer.log_metrics(\"train\", train_result.metrics) trainer.save_metrics(\"train\", train_result.metrics) trainer.save_state() if trainer.is_world_process_zero() and finetuning_args.plot_loss: plot_loss(training_args.output_dir, keys=[\"loss\", \"eval_loss\", \"rewards/accuracies\"])

如果 do_train 为真,则执行以下操作:

  1. 调用 trainer.train 方法进行训练,支持从检查点恢复训练。
  2. 保存模型。
  3. 记录和保存训练指标。
  4. 保存训练状态。
  5. 如果当前进程是主进程且启用了绘制损失图,则调用 plot_loss 绘制损失图。

评估模型

 if training_args.do_eval: metrics = trainer.evaluate(metric_key_prefix=\"eval\") if id(model) == id(ref_model): # unable to compute rewards if reference model is the model itself remove_keys = [key for key in metrics.keys() if \"rewards\" in key] for key in remove_keys: metrics.pop(key) trainer.log_metrics(\"eval\", metrics) trainer.save_metrics(\"eval\", metrics)

如果 do_eval 为真,则执行以下操作:

  1. 调用 trainer.evaluate 方法进行评估,使用 \"eval\" 作为指标前缀。
  2. 如果模型和参考模型是同一个对象,则无法计算奖励,移除包含 \"rewards\" 的指标。
  3. 记录和保存评估指标。

创建模型卡并推送

 create_modelcard_and_push(trainer, model_args, data_args, training_args, finetuning_args)

最后,调用 create_modelcard_and_push 函数,创建模型卡并推送到指定位置。

总结

这个代码片段定义了一个 run_dpo 函数,用于加载和准备模型、数据集和相关的配置参数,初始化自定义训练器 CustomDPOTrainer,并根据需要进行训练和评估。它还包括创建模型卡并推送的步骤。

CustomDPOTrainer类

class CustomDPOTrainer(DPOTrainer):

这个类 CustomDPOTrainer 继承自 DPOTrainer,它是一个自定义的训练器类。

 def __init__( self, model: Union[\"PreTrainedModel\", torch.nn.Module], ref_model: Optional[Union[\"PreTrainedModel\", torch.nn.Module]], finetuning_args: \"FinetuningArguments\", processor: Optional[\"ProcessorMixin\"], disable_dropout: bool = True, **kwargs, ):

初始化方法,接受以下参数:

  • model: 预训练模型或 PyTorch 模型。
  • ref_model: 可选的参考模型。
  • finetuning_args: 微调参数。
  • processor: 可选的处理器。
  • disable_dropout: 是否禁用 dropout,默认为 True
  • **kwargs: 其他关键字参数。
 if disable_dropout: disable_dropout_in_model(model) if ref_model is not None: disable_dropout_in_model(ref_model)

如果 disable_dropout 为真,则禁用模型和参考模型中的 dropout。

 self.finetuning_args = finetuning_args self.f_divergence_type = \"reverse_kl\" self.reference_free = False self.use_dpo_data_collator = True # hack to avoid warning self.generate_during_eval = False # disable at evaluation self.label_pad_token_id = IGNORE_INDEX self.padding_value = 0 self.is_encoder_decoder = model.config.is_encoder_decoder self.precompute_ref_log_probs = False self._precomputed_train_ref_log_probs = False self._precomputed_eval_ref_log_probs = False self._peft_has_been_casted_to_bf16 = False

初始化一些实例变量,包括微调参数、散度类型、是否使用参考模型、数据整理器、评估期间是否生成、标签填充标记、填充值、是否是编码器-解码器模型等。

 self.ref_model = ref_model self._stored_metrics = defaultdict(lambda: defaultdict(list))

设置参考模型,并初始化一个存储指标的字典。

 # dpo hyperparams self.beta = finetuning_args.pref_beta self.loss_type = finetuning_args.pref_loss self.ftx_gamma = finetuning_args.pref_ftx self.label_smoothing = finetuning_args.dpo_label_smoothing self.simpo_gamma = finetuning_args.simpo_gamma

初始化一些 DPO(偏好优化)超参数。

 Trainer.__init__(self, model=model, **kwargs) if not hasattr(self, \"accelerator\"): raise AttributeError(\"Please update `transformers`.\")

调用父类 Trainer 的初始化方法,并检查是否存在 accelerator 属性。

 warnings.simplefilter(\"ignore\") # remove gc warnings on ref model

忽略一些警告信息。

 if ref_model is not None: if self.is_deepspeed_enabled: if not (  getattr(ref_model, \"is_loaded_in_8bit\", False) or getattr(ref_model, \"is_loaded_in_4bit\", False) ): # quantized models are already set on the correct device  self.ref_model = self._prepare_deepspeed(self.ref_model) else: self.ref_model = self.accelerator.prepare_model(self.ref_model, evaluation_mode=True) self.ref_model.eval()

如果参考模型不为空,且启用了 DeepSpeed,则准备 DeepSpeed 模型;否则,使用加速器准备参考模型,并将其设置为评估模式。

 if processor is not None: self.add_callback(SaveProcessorCallback(processor))

如果处理器不为空,则添加 SaveProcessorCallback 回调。

 if finetuning_args.pissa_convert: self.callback_handler.add_callback(PissaConvertCallback)

如果启用了 pissa_convert,则添加 PissaConvertCallback 回调。

 if finetuning_args.use_badam: from badam import BAdamCallback, clip_grad_norm_old_version self.accelerator.clip_grad_norm_ = MethodType(clip_grad_norm_old_version, self.accelerator) self.add_callback(BAdamCallback)

如果启用了 use_badam,则导入 BAdamCallbackclip_grad_norm_old_version,并添加 BAdamCallback 回调。

 def create_optimizer(self) -> \"torch.optim.Optimizer\": if self.optimizer is None: self.optimizer = create_custom_optimzer(self.model, self.args, self.finetuning_args) return super().create_optimizer()

创建优化器,如果优化器为空,则调用 create_custom_optimzer 创建自定义优化器。

 def create_scheduler( self, num_training_steps: int, optimizer: Optional[\"torch.optim.Optimizer\"] = None ) -> \"torch.optim.lr_scheduler.LRScheduler\": create_custom_scheduler(self.args, num_training_steps, optimizer) return super().create_scheduler(num_training_steps, optimizer)

创建学习率调度器,调用 create_custom_scheduler 创建自定义调度器。

 def odds_ratio_loss(self, chosen_logps: \"torch.Tensor\", rejected_logps: \"torch.Tensor\") -> \"torch.Tensor\": r\"\"\" Computes ORPO\'s odds ratio (OR) loss for batched log probabilities of the policy model. \"\"\" log_odds = (chosen_logps - rejected_logps) - ( torch.log1p(-torch.exp(chosen_logps)) - torch.log1p(-torch.exp(rejected_logps)) ) sft_loss = -chosen_logps odds_ratio_loss = -F.logsigmoid(log_odds) orpo_loss = sft_loss + self.beta * odds_ratio_loss return orpo_loss

这是一个用于计算策略模型的批量对数概率的Odds Ratio Policy Optimization(ORPO)损失的PyTorch函数。log_odds 是选择和拒绝的对数概率之差,sft_loss 是负的选择对数概率,odds_ratio_loss 是负的 logsigmoid,最终的 orpo_losssft_lossodds_ratio_loss 的加权和。ORPO损失旨在鼓励策略选择具有更高期望回报的动作,同时惩罚它选择不太可能是最优的动作。通过平衡这两个目标,ORPO旨在改善强化学习代理在复杂环境中的性能。

 def simpo_loss(self, chosen_logps: \"torch.Tensor\", rejected_logps: \"torch.Tensor\") -> \"torch.Tensor\": r\"\"\" Computes SimPO loss for batched log probabilities of the policy model. \"\"\" pi_logratios = chosen_logps - rejected_logps gamma_logratios = self.simpo_gamma / self.beta logits = pi_logratios - gamma_logratios simpo_loss = -F.logsigmoid(self.beta * logits) return simpo_loss

这个函数用于计算批量对数概率的策略模型的SimPO(简单政策优化)损失。pi_logratios 是选择和拒绝的对数概率之差,gamma_logratiossimpo_gammabeta 的比值,logitspi_logratiosgamma_logratios 之差,最终的 simpo_loss 是负的 logsigmoid

 def compute_preference_loss( self, policy_chosen_logps: \"torch.Tensor\", policy_rejected_logps: \"torch.Tensor\", reference_chosen_logps: Optional[\"torch.Tensor\"], reference_rejected_logps: Optional[\"torch.Tensor\"], ) -> Tuple[\"torch.Tensor\", \"torch.Tensor\", \"torch.Tensor\"]: r\"\"\" Computes loss for preference learning. \"\"\" if not self.finetuning_args.use_ref_model: if self.loss_type == \"orpo\": losses = self.odds_ratio_loss(policy_chosen_logps, policy_rejected_logps) elif self.loss_type == \"simpo\": losses = self.simpo_loss(policy_chosen_logps, policy_rejected_logps) else: raise NotImplementedError(\"Unknown loss type: {}.\".format(self.loss_type)) chosen_rewards = self.beta * policy_chosen_logps.to(self.accelerator.device).detach() rejected_rewards = self.beta * policy_rejected_logps.to(self.accelerator.device).detach() else: losses, chosen_rewards, rejected_rewards = self.dpo_loss( policy_chosen_logps, policy_rejected_logps, reference_chosen_logps, reference_rejected_logps ) return losses, chosen_rewards, rejected_rewards

计算偏好学习的损失。如果不使用参考模型,根据参数中的损失类型计算 ORPO 或 SimPO 损失,并计算选择和拒绝的奖励。如果使用参考模型,调用 dpo_loss 计算损失和奖励。对于是否使用参考模型,也就是use_ref_model参数,可以到src/llamafactory/hparams/finetuning_args.py中查看它的默认值:

self.use_ref_model = self.stage == \"dpo\" and self.pref_loss not in [\"orpo\", \"simpo\"]
pref_loss: Literal[\"sigmoid\", \"hinge\", \"ipo\", \"kto_pair\", \"orpo\", \"simpo\"] = field( default=\"sigmoid\", metadata={\"help\": \"The type of DPO loss to use.\"}, )

 可以看到pref_loss的默认值是sigmod,也就是use_ref_model在dpo阶段默认是True。

 def concatenated_forward( self, model: \"PreTrainedModel\", batch: Dict[str, \"torch.Tensor\"] ) -> Tuple[\"torch.Tensor\", \"torch.Tensor\", \"torch.Tensor\", \"torch.Tensor\", \"torch.Tensor\"]: r\"\"\" Computes the sum log probabilities of the labels under given logits if loss_type is not IPO, ORPO or SimPO. Otherwise the average log probabilities. \"\"\" if self.finetuning_args.use_ref_model: batch = {k: v.detach().clone() for k, v in batch.items()} # avoid error all_logits: \"torch.Tensor\" = model(**batch, return_dict=True, use_cache=False).logits.to(torch.float32) all_logps, valid_length = get_batch_logps(logits=all_logits, labels=batch[\"labels\"]) if self.loss_type in [\"ipo\", \"orpo\", \"simpo\"]: all_logps = all_logps / valid_length batch_size = batch[\"input_ids\"].size(0) // 2 chosen_logps, rejected_logps = all_logps.split(batch_size, dim=0) chosen_logits, rejected_logits = all_logits.split(batch_size, dim=0) chosen_length, _ = valid_length.split(batch_size, dim=0) return chosen_logps, rejected_logps, chosen_logits, rejected_logits, chosen_logps / chosen_length

计算给定 logits 下标签的对数概率之和(如果损失类型不是 IPO、ORPO 或 SimPO),否则计算平均对数概率。返回选择和拒绝的对数概率、logits 和选择的对数概率的平均值。

 def compute_reference_log_probs( self, model: \"PreTrainedModel\", batch: Dict[str, \"torch.Tensor\"] ) -> Tuple[Optional[\"torch.Tensor\"], Optional[\"torch.Tensor\"]]: r\"\"\" Computes log probabilities of the reference model. \"\"\" if not self.finetuning_args.use_ref_model: return None, None if self.ref_model is None: ref_model = model ref_context = self.accelerator.unwrap_model(model).disable_adapter() else: ref_model = self.ref_model ref_context = nullcontext() with torch.no_grad(), ref_context: reference_chosen_logps, reference_rejected_logps, *_ = self.concatenated_forward(ref_model, batch) return reference_chosen_logps, reference_rejected_logps

计算参考模型的对数概率。如果不使用参考模型,返回 None。否则,计算参考模型的选择和拒绝对数概率。

 def get_batch_loss_metrics( self, model: \"PreTrainedModel\", batch: Dict[str, \"torch.Tensor\"], train_eval: Literal[\"train\", \"eval\"] = \"train\", ) -> Tuple[\"torch.Tensor\", Dict[str, \"torch.Tensor\"]]: r\"\"\" Computes the DPO loss and other metrics for the given batch of inputs for train or test. \"\"\" metrics = {} ( policy_chosen_logps, policy_rejected_logps, policy_chosen_logits, policy_rejected_logits, policy_chosen_logps_avg, ) = self.concatenated_forward(model, batch) reference_chosen_logps, reference_rejected_logps = self.compute_reference_log_probs(model, batch) losses, chosen_rewards, rejected_rewards = self.compute_preference_loss( policy_chosen_logps, policy_rejected_logps, reference_chosen_logps, reference_rejected_logps, ) sft_loss = -policy_chosen_logps_avg if self.ftx_gamma > 1e-6: losses += self.ftx_gamma * sft_loss reward_accuracies = (chosen_rewards > rejected_rewards).float() prefix = \"eval_\" if train_eval == \"eval\" else \"\" metrics[\"{}rewards/chosen\".format(prefix)] = chosen_rewards.mean().cpu() metrics[\"{}rewards/rejected\".format(prefix)] = rejected_rewards.mean().cpu() metrics[\"{}rewards/accuracies\".format(prefix)] = reward_accuracies.mean().cpu() metrics[\"{}rewards/margins\".format(prefix)] = (chosen_rewards - rejected_rewards).mean().cpu() metrics[\"{}logps/rejected\".format(prefix)] = policy_rejected_logps.detach().mean().cpu() metrics[\"{}logps/chosen\".format(prefix)] = policy_chosen_logps.detach().mean().cpu() metrics[\"{}logits/rejected\".format(prefix)] = policy_rejected_logits.detach().mean().cpu() metrics[\"{}logits/chosen\".format(prefix)] = policy_chosen_logits.detach().mean().cpu() if self.loss_type == \"orpo\": metrics[\"{}sft_loss\".format(prefix)] = sft_loss.detach().mean().cpu() metrics[\"{}odds_ratio_loss\".format(prefix)] = ((losses - sft_loss) / self.beta).detach().mean().cpu() return losses.mean(), metrics

这个方法计算给定输入批次的 DPO 损失和其他指标,用于训练或测试。具体步骤如下:

  1. 初始化一个空的 metrics 字典。
  2. 调用 concatenated_forward 方法,获取策略模型的选择和拒绝对数概率、logits 和对数概率的平均值。
  3. 调用 compute_reference_log_probs 方法,计算参考模型的选择和拒绝对数概率。
  4. 调用 compute_preference_loss 方法,计算偏好学习的损失和奖励。
  5. 计算 sft_loss,如果 ftx_gamma 大于 1e-6,则将其加到损失中,这里的sft_loss可能起到一个正则项的作用。
  6. 计算奖励准确率。
  7. 根据 train_eval 参数设置前缀,更新 metrics 字典。
  8. 返回损失的平均值和 metrics 字典。

总结: CustomDPOTrainer 类扩展了 DPOTrainer,添加了自定义的初始化方法、优化器和调度器创建方法,以及计算偏好学习损失和其他指标的方法。它还包括计算 ORPO 和 SimPO 损失的方法,以及计算参考模型对数概率的方法。这个类主要用于在训练过程中处理偏好优化相关的任务。