> 技术文档 > LeRobot DP——LeRobot对动作策略Diffusion Policy的封装与解读(含DexCap库中对diffusion_policy的封装与实现)

LeRobot DP——LeRobot对动作策略Diffusion Policy的封装与解读(含DexCap库中对diffusion_policy的封装与实现)


前言

过去2年多的深入超过此前7年,全靠夜以继日的勤奋,一天当两天用,抠论文 抠代码 和大模型及具身同事讨论,是目前日常

而具身库里,idp3、π0、lerobot值得反复研究,故,近期我一直在抠π0及lerobot的源码

  1. 本文一开始是此文《LeRobot——Hugging Face打造的机器人开源库:包含对顶层script、与底层基础层dataset的源码分析》的第四部分 策略,考虑到为避免该文的篇幅过长,故把该文的第四部分 策略独立出来,成本文
  2. 且如此文所述
    LeRobot 项目采用了模块化设计,整个项目大概分为以下三层
    \\rightarrow  最顶层:scripts (命令行工具),属于发号施令层
    \\rightarrow  中间层:common/policies (控制策略),属于中间执行层
    \\rightarrow  底层支撑层:common/datasets, common/envs, common/robot_devices
          基础配置层: configs, common/utils
          属于底层基础层

对于lerobot中的common/policies:策略实现,其包含以下策略

  1. act:Action Chunking Transformer 策略
  2. diffusion:扩散策略
  3. tdmpc:时序差分模型预测控制
  4. vqbet:向量量化行为变换器
  5. pi0:基础策略实现

25年6.1日更新:再后续,又把本文的第一部分、第三部分都已分别独立出去,成为新的两篇文章,如下

  1. LeRobot ACT——LeRobot对ALOHA ACT策略的封装:含源码解析与真机部署(效果比肩ACT原论文)
  2. LeRobot pi0——LeRobot对VLA策略π0的封装:含其源码剖析与真机部署(智能化程度高于ACT)

此外,在本文的第二部分 额外增加了:DexCap库中对diffusion_policy的封装与实现

第一部分 封装的Diffusion Policy

关于什么是Diffusion Policy,详见此文《Diffusion Policy——斯坦福UMI所用的动作预测算法:基于扩散模型的扩散策略(从原理到其编码实现)》

1.1 modeling_diffusion.py

文件采用了分层设计模式,从顶层到底层依次为:

  1. 用户接口层:`DiffusionPolicy` 类作为主要接口,继承自 `PreTrainedPolicy`,负责数据规范化、状态管理和动作选择
    - 依赖于:`DiffusionModel`(核心模型)、`Normalize`/`Unnormalize`(数据处理)
    - 主要方法:
      - `__init__`:初始化归一化器和扩散模型
      - `reset`:清空观察和动作队列
      - `select_action`:生成和选择动作
      - `forward`:计算训练损失
  2. 核心模型层:`DiffusionModel` 类实现扩散模型的核心功能,包括条件采样和损失计算
    - 依赖于:
      - `DiffusionRgbEncoder`:处理图像输入
      - `DiffusionConditionalUnet1d`:主要去噪网络
      - `_make_noise_scheduler`(函数):创建噪声调度器
    - 主要方法:
      - `conditional_sample`:从噪声生成动作序列
      - `_prepare_global_conditioning`:处理和融合多模态输入
      - `generate_actions`:生成动作序列
      - `compute_loss`:计算训练损失

    注意,为方便大家一目了然,在本「2.1 modeling_diffusion.py」节中,我用绿色的字 代表训练过程中的函数,而蓝色的字则代表推理过程中的函数

    \\rightarrow  训练流程:1 输入数据经过归一化处理,2 添加噪声到干净轨迹,3 U-Net尝试预测噪声或原始信号,4 计算损失并反向传播
    \\rightarrow  推理流程:输入观察经过归一化处理、如果动作队列为空,调用 `DiffusionModel.generate_actions`、从纯噪声开始,通过 `conditional_sample` 逐步去噪、生成的动作经过反归一化处理后使用

  3. 特征提取层:包括 `DiffusionRgbEncoder` 和 `SpatialSoftmax` 类,负责从图像中提取有意义的特征
    前者DiffusionRgbEncoder依赖于:
      - `SpatialSoftmax`:从特征图提取关键点
      - `torchvision.models`:提供预训练骨干网络
    - 主要功能:将RGB图像编码为固定维度的特征向量

    后者SpatialSoftmax负责空间特征提取
     - 独立组件,实现关键点提取
     - 主要功能:将特征图转换为图像空间中的关键点坐标

  4. 生成网络层:包括 `DiffusionConditionalUnet1d` 和相关组件,实现去噪扩散过程
    - 条件U-Net依赖于:
      - `DiffusionSinusoidalPosEmb`:编码时间步
      - `DiffusionConditionalResidualBlock1d`:构建编码器和解码器块
      - `DiffusionConv1dBlock`:基础卷积块
    - 主要功能:实现条件扩散模型的核心去噪网络
  5. 基础模块层:包括 `DiffusionConv1dBlock-基础卷积块,包含卷积、组归一化和激活`、`DiffusionConditionalResidualBlock1d-实现FiLM条件调制的残差块` 等基础构建块

1.1.1 DiffusionPolicy:负责数据规范化、状态管理和动作选择

  • 架构设计上
    该策略继承自`PreTrainedPolicy`基类,实现了标准化的接口。在初始化过程中,它首先验证配置的特征设置,然后创建三个数据处理组件:输入归一化器、目标归一化器和输出去归一化器。这些组件确保模型能够处理不同量级的传感器数据和动作值,同时保持数值稳定性。核心计算组件是`DiffusionModel`实例,它封装了底层的神经网络架构和扩散过程
  • 动作生成机制上(推理)
    该策略的一个重要特点是其滑动窗口观察-行动机制,通过双队列系统实现:一个用于存储历史观察,另一个用于存储生成的动作。`reset`方法初始化这些队列,每当环境重置时被调用

    `select_action`方法实现了决策逻辑:当动作队列为空时,策略将最近的`n_obs_steps`步观察历史堆叠,通过扩散模型生成`horizon`步的动作轨迹,然后提取`n_action_steps`步动作放入队列
    且n_action_steps <= horizon - n_obs_steps + 1:确保从生成的动作中可以提取出足够的步数用于执行

    啥意思呢,为方便大家理解,我给举个例子
    假设:
    n_obs_steps = 3,代表模型需要最近3步的观察数据才能生成动作
    horizon = 5,表示模型每次生成未来5步的动作
    n_action_steps = 2,代表只执行其中的2个动作
    时间步从 n-o+1 到 n-o+h,其中 o = n_obs_steps,h = horizon,a = n_action_steps

    时间步 n-o+1 n-o+2 n-o+3 n n+1 n+2 n+3 n+4 n-o+h 使用观察值 是 是 是 是 否 否 否 否 否 生成动作 使用动作 否 否 否 否 否 否 否

    这种机制平衡了长期规划(通过扩散模型生成长序列)、和实时性(仅执行少量动作),适用于需要预测未来但需快速响应的场景——如机器人控制

  • 训练过程上
    `forward`方法支持模型训练,它对输入和目标进行归一化处理,然后调用DiffusionModel的`compute_loss`方法计算损失值
    扩散模型内部使用了标准的扩散过程:添加随机噪声到干净轨迹,然后训练网络预测原始轨迹或噪声(取决于配置,比如DDPM还是DDIM,至于什么是DDPM还是DDIM,详见此文《图像生成发展起源:从VAE、扩散模型DDPM、DDIM到DETR、ViT、Swin transformer》中的相关介绍)

1.1.2 DiffusionModel:实现扩散模型的核心功能,包括条件采样和损失计算

`DiffusionModel`是实现的神经网络核心,采用条件扩散模型结构

  • 在训练阶段,它学习将噪声轨迹恢复为有效动作序列
  • 在推理阶段,从纯噪声开始,通过迭代去噪步骤生成以观察为条件的动作序列。该模型通过`_prepare_global_conditioning`方法将多模态观察(机器人状态、环境状态和可选的图像)融合为全局条件向量
1.1.2.1 __init__:构建了一个灵活的多模态条件架构

在初始化__init__方法中,模型构建了一个灵活的多模态条件架构
首先,它基于机器人状态特征设置基础条件维度,然后根据配置添加图像特征处理能力。对于图像处理,模型提供了两种选择:

  1. 要么为每个摄像头创建独立的`DiffusionRgbEncoder`编码器(当`use_separate_rgb_encoder_per_camera`为真时)
     # 构建观察编码器(根据提供的观察类型) # 设置全局条件维度的初始值为机器人状态特征的维度 global_cond_dim = self.config.robot_state_feature.shape[0] # 如果配置中包含图像特征 if self.config.image_features:  # 获取图像特征的数量 num_images = len(self.config.image_features)  # 如果为每个摄像头使用单独的RGB编码器 if self.config.use_separate_rgb_encoder_per_camera:  # 为每个摄像头创建一个RGB编码器 encoders = [DiffusionRgbEncoder(config) for _ in range(num_images)]  # 将编码器组织为ModuleList  self.rgb_encoder = nn.ModuleList(encoders)  # 增加全局条件维度(特征维度*摄像头数量) global_cond_dim += encoders[0].feature_dim * num_images
  2. 要么使用单个共享编码器处理所有视角
     # 如果使用共享的RGB编码器 else:  # 创建单个RGB编码器 self.rgb_encoder = DiffusionRgbEncoder(config)  # 增加全局条件维度 global_cond_dim += self.rgb_encoder.feature_dim * num_images 

这种设计允许模型根据任务需求和硬件配置灵活调整,同时确保视觉特征被正确地整合到条件表示中
对于不同场景的需求,模型还可以整合环境状态作为额外条件(在环境状态特征可用的前提下),其维度也会被添加到全局条件维度中

 # 如果配置中包含环境状态特征 if self.config.env_state_feature:  # 增加全局条件维度(加上环境状态特征的维度) global_cond_dim += self.config.env_state_feature.shape[0] 

核心去噪网络采用了一维条件U-Net架构——DiffusionConditionalUnet1d 下文会介绍之,通过FiLM调制技术将全局条件信息融入去噪过程

 # 创建条件U-Net模型,条件维度是全局条件维度乘以观察步数 self.unet = DiffusionConditionalUnet1d(config, global_cond_dim=global_cond_dim * config.n_obs_steps) 
1.1.2.2 generate_actions:从观察到动作序列(先后调用_prepare_global_conditioning、conditional_sample)

`generate_actions`函数首先接收一个包含多种观察信息的批次字典,支持机器人状态(必需)、摄像头图像和环境状态(至少需要其一)

  1. 函数从批次中提取批大小和观察步数,并进行验证以确保观察历史长度与配置一致
  2. 在处理观察数据时,函数调用`_prepare_global_conditioning`方法将不同模态的观察融合为单一的全局条件向量
  3. 生成动作的核心步骤通过`conditional_sample`方法实现
  4. 最后,函数从生成的完整轨迹中提取所需的动作段。计算起始索引为`n_obs_steps - 1`(当前观察位置),结束索引通过添加所需的动作步数确定

具体而言,`conditional_sample`方法实现了扩散模型的核心采样算法

  1. 它首先从标准正态分布采样生成初始噪声轨迹,然后根据预设的时间步数,逐步应用去噪网络将噪声转换为有意义的动作序列
  2. 在每个去噪步骤中,模型预测输出并使用噪声调度器计算前一时间步的样本,逐渐将随机噪声转化为条件下的合理动作

`_prepare_global_conditioning`方法展示了处理异构数据的精妙设计

  1. 它接收包含机器人状态、可能的环境状态和图像的批次数据,对图像数据进行特征提取,并将所有条件信息聚合为单一向量
  2. 尤其对多摄像头输入的处理,采用了巧妙的张量维度重排和拼接策略,确保信息的有效整合
1.1.2.3 训练过程:compute_loss的实现

compute_loss`

# 计算损失函数的方法,接收批次数据字典,返回损失张量def compute_loss(self, batch: dict[str, Tensor]) -> Tensor: \"\"\" # 此函数要求批次数据至少包含以下内容: This function expects `batch` to have (at least): { # 观察状态:(批次大小, 观察步数, 状态维度) \"observation.state\": (B, n_obs_steps, state_dim)  # 观察图像:(批次大小, 观察步数, 相机数量, 通道数, 高度, 宽度) \"observation.images\": (B, n_obs_steps, num_cameras, C, H, W)  AND/OR # 和/或 # 环境状态:(批次大小, 环境维度) \"observation.environment_state\": (B, environment_dim)  # 动作:(批次大小, 时间范围, 动作维度) \"action\": (B, horizon, action_dim)  # 动作填充标志:(批次大小, 时间范围) \"action_is_pad\": (B, horizon) } \"\"\"
  1. 首先对干净轨迹应用前向扩散添加噪声
  2. 然后让网络预测原始轨迹或噪声本身(取决于配置的预测类型),并使用均方误差计算损失
  3. 模型还支持掩码处理填充的轨迹部分,确保训练关注有效的动作数据

具体而言

  1. 首先,函数进行严格的输入验证,确保批次数据包含必要的组件:机器人状态历史、目标动作序列、填充标志,以及至少一种环境感知信息(图像或环境状态)。这种验证机制不仅防止运行时错误,也确保了训练数据格式的一致性
     # 输入验证,确保批次包含所有必需的键 assert set(batch).issuperset({\"observation.state\", \"action\", \"action_is_pad\"}) # 确保批次包含图像或环境状态至少一种 assert \"observation.images\" in batch or \"observation.environment_state\" in batch n_obs_steps = batch[\"observation.state\"].shape[1] # 获取观察步数 horizon = batch[\"action\"].shape[1]  # 获取时间范围长度 assert horizon == self.config.horizon  # 确保时间范围与配置中的一致 assert n_obs_steps == self.config.n_obs_steps # 确保观察步数与配置中的一致
  2. 接下来是条件编码环节,函数调用`_prepare_global_conditioning`方法将多模态观察(机器人状态、环境状态和可能的多摄像头图像)转换为统一的条件向量
     # 编码图像特征并与状态向量一起连接,准备全局条件编码(B, global_cond_dim) global_cond = self._prepare_global_conditioning(batch) 

    这个过程特别针对图像数据进行了优化,根据配置选择独立或共享编码器处理多摄像头输入,通过精心设计的张量维度重排确保特征正确融合

  3. 核心的前向扩散过程是训练算法的精髓:
    函数首先获取干净的动作轨迹
     # 前向扩散过程 trajectory = batch[\"action\"] # 获取真实动作轨迹

    然后生成随机噪声并为批次中的每个样本随机采样一个噪声时间步

     # 采样噪声以添加到轨迹中,生成与轨迹相同形状的随机噪声 eps = torch.randn(trajectory.shape, device=trajectory.device) # 为批次中的每个样本随机采样一个噪声时间步 timesteps = torch.randint( # 生成随机整数时间步 low=0, # 最小值为0 # 最大值为训练时间步数 high=self.noise_scheduler.config.num_train_timesteps,  size=(trajectory.shape[0],), # 大小等于批次大小 device=trajectory.device, # 使用与轨迹相同的设备 ).long() # 转换为长整型

    使用噪声调度器将噪声按照特定时间步的幅度添加到干净轨迹中,形成有不同程度噪声的轨迹。这种随机时间步的设计确保模型学习恢复各个噪声级别的能力

     # 根据每个时间步的噪声幅度向干净轨迹添加噪声,即使用噪声调度器添加噪声 noisy_trajectory = self.noise_scheduler.add_noise(trajectory, eps, timesteps)
  4. 去噪网络(U-Net)接收噪声轨迹、时间步和全局条件,产生预测输出
     # 运行去噪网络(可能去噪轨迹或尝试预测噪声),即使用U-Net网络进行预测 pred = self.unet(noisy_trajectory, timesteps, global_cond=global_cond) 

    根据配置的预测类型,损失计算有两种模式:预测添加的噪声(\"epsilon\"模式)或直接预测原始轨迹(\"sample\"模式)

     # 计算损失 目标可能是原始轨迹或噪声 # 如果预测类型是\"epsilon\"(预测噪声) if self.config.prediction_type == \"epsilon\":  # 目标是噪声本身 target = eps# 如果预测类型是\"sample\"(预测原始样本) elif self.config.prediction_type == \"sample\":  # 目标是原始动作轨迹 target = batch[\"action\"] else: raise ValueError(f\"Unsupported prediction type {self.config.prediction_type}\") # 计算均方误差损失,不进行降维处理 loss = F.mse_loss(pred, target, reduction=\"none\") 
  5. 最后,函数处理数据集边界的填充动作。通过应用掩码,确保模型不会从人为填充的数据中学习错误模式,只关注真实轨迹部分

1.1.3 视觉处理DiffusionRgbEncoder:ResNet与SpatialSoftmax

视觉处理使用了`DiffusionRgbEncoder`,它由预训练的骨干网络(如ResNet)和创新的`SpatialSoftmax`层组成。`SpatialSoftmax`通过计算特征图激活的\"质心\",有效地将视觉特征转化为关键点表示,大幅减少维度并保留关键信息。这种设计允许模型从视觉输入中提取与任务相关的空间关系,而不需要过多手工特征工程

1.1.4 DiffusionConditionalUnet1d:1D条件U-Net架构

去噪网络采用1D条件U-Net架构(`DiffusionConditionalUnet1d`),专为处理时间序列数据而设计。它通过编码器-解码器结构和跳跃连接处理动作序列,并使用FiLM调制(Feature-wise Linear Modulation)将条件信息(观察和时间步)整合到去噪过程中

关于U-Net结构的介绍,详见此文《图像生成发展起源:从VAE、扩散模型DDPM、DDIM到DETR、ViT、Swin transformer》的「2.1.1 从扩散模型概念的提出到DDPM(含U-Net网络的简介)、DDIM」

具体而言,该网络采用经典U-Net结构,包含编码器、中间处理模块和解码器三部分。其独特之处在于将时间步和全局条件(如机器人状态、环境特征)融入到整个去噪过程中

  1. 编码器部分首先通过时间步编码器(使用正弦位置嵌入)将扩散时间步转换为特征向量,然后与全局条件特征连接,形成完整的条件表示
     # U-Net编码器 # 所有残差块共用的参数 common_res_block_kwargs = {  \"cond_dim\": cond_dim,  # 条件维度 \"kernel_size\": config.kernel_size, # 卷积核大小 \"n_groups\": config.n_groups, # 组归一化的组数 # 是否使用FiLM比例调制 \"use_film_scale_modulation\": config.use_film_scale_modulation, }

    网络的编码路径由一系列下采样块组成,每个块包含两个条件残差块和一个下采样层

     # 创建下采样模块列表 self.down_modules = nn.ModuleList([]) # 遍历每个输入/输出维度对 for ind, (dim_in, dim_out) in enumerate(in_out):  # 判断是否为最后一个块 is_last = ind >= (len(in_out) - 1)  # 添加一个下采样模块组 self.down_modules.append(  # 每个下采样模块组包含三个部分 nn.ModuleList([ # 第一个残差块,输入维度到输出维度 DiffusionConditionalResidualBlock1d(dim_in, dim_out, **common_res_block_kwargs), # 第二个残差块,维持输出维度 DiffusionConditionalResidualBlock1d(dim_out, dim_out, **common_res_block_kwargs), # 如果不是最后一个块,添加下采样层 # 下采样使用步长为2的卷积,或最后一层用恒等映射 nn.Conv1d(dim_out, dim_out, 3, 2, 1) if not is_last else nn.Identity(),] ) )

    这些残差块使用FiLM技术将条件信息注入卷积特征图中,可以根据配置选择只调制偏置或同时调制缩放和偏置

  2. U-Net的中间部分进一步处理特征,保持空间维度不变
     # 自编码器的中间处理部分 # 创建中间模块列表 self.mid_modules = nn.ModuleList( [ # 第一个中间残差块 DiffusionConditionalResidualBlock1d(# 输入和输出维度都是最深层的维度  config.down_dims[-1], config.down_dims[-1], **common_res_block_kwargs  ), # 第二个中间残差块 DiffusionConditionalResidualBlock1d( # 输入和输出维度都是最深层的维度  config.down_dims[-1], config.down_dims[-1], **common_res_block_kwargs  ), ] )
  3. 而解码路径则通过上采样块逐步恢复原始维度,并通过跳跃连接整合编码器的高分辨率特征
     # U-Net解码器 # 创建上采样模块列表 self.up_modules = nn.ModuleList([]) # 遍历反转的输入/输出维度对(解码器方向) for ind, (dim_out, dim_in) in enumerate(reversed(in_out[1:])):  # 判断是否为最后一个块 is_last = ind >= (len(in_out) - 1)  # 添加一个上采样模块组 self.up_modules.append(  # 每个上采样模块组包含三个部分 nn.ModuleList([ # 输入维度*2,因为包含了编码器的跳跃连接 # 第一个残差块,考虑跳跃连接 DiffusionConditionalResidualBlock1d(dim_in * 2, dim_out, **common_res_block_kwargs), # 第二个残差块,维持输出维度 DiffusionConditionalResidualBlock1d(dim_out, dim_out, **common_res_block_kwargs), # 如果不是最后一个块,添加上采样层 # 上采样使用转置卷积,或最后一层用恒等映射 nn.ConvTranspose1d(dim_out, dim_out, 4, 2, 1) if not is_last else nn.Identity(),] ) )

    最终的卷积层序列为

     # 最终的卷积层序列 self.final_conv = nn.Sequential(  # 第一个卷积块,维持通道数 DiffusionConv1dBlock(config.down_dims[0], config.down_dims[0], kernel_size=config.kernel_size),  # 1x1卷积将通道数映射回动作特征维度 nn.Conv1d(config.down_dims[0], config.action_feature.shape[0], 1), )

在前向传播过程中

  1. 输入数据首先被重排以适应一维卷积(将时间维度放在最后),然后依次通过编码器、中间层和解码器
  2. 每个条件残差块都接收特征图和条件信息,应用FiLM调制来适应特定的观察和时间步
  3. 最后,通过最终卷积层恢复动作维度,并将输出重排回原始形状

用个示意图画一下 上面卷积结构,如下所示

输入: [B,T,D] 形状的噪声动作轨迹 | ▼重排为 [B,D,T](卷积输入格式) | |  +----------------------+ |  | 条件输入: | |  | - 时间步 t  | |  | - 全局观察特征 | |  +----------+-----------+ | | | ▼ |  +----------------------+ |  | 时间步编码器 | |  | (正弦位置嵌入+MLP) | |  +----------+-----------+ | | | ▼ |  +----------------------+ |  | 全局特征向量 | |  | (时间步+观察条件) | |  +----------+-----------+ | | | | ▼ |+------------------------------+ || U-Net编码器 | || | || ┌───────────────────────────┐| || │第1下采样块:  ││ || │ - 条件残差块1 (输入→h1) │◄───┘ FiLM条件| │ - 条件残差块2 (h1→h1) ││ | │ - 下采样层 (h1→h1) ││ ┌─────────┐| └───────────┬───────────────┘│ │ │| ▼ │ │ 跳跃连接 │| ┌───────────────────────────┐│ │ 存储 │| │第2下采样块:  ││ │ │| │ - 条件残差块1 (h1→h2) │◄───┘ │| │ - 条件残差块2 (h2→h2) ││ │| │ - 下采样层 (h2→h2) ││ ┌─────────┐│| └───────────┬───────────────┘│ │ ││| ▼ │ │ 跳跃连接 ││| . │ │ 存储 ││| . │ │ ││| ┌───────────────────────────┐│ │ ││| │最后下采样块:  ││ │ ││| │ - 条件残差块1 (h_n-1→h_n) │◄───┘ ││| │ - 条件残差块2 (h_n→h_n) ││ ││| │ - Identity层  ││ ┌─────────┐││| └───────────┬───────────────┘│ │ │││+-------------┼───────────────-+ │ │││  ▼  │ │││+------------------------------+ │ │││| 中间处理模块 | │ │││| ┌───────────────────────────┐| │ │││| │ - 条件残差块1 (h_n→h_n) │◄───┘ │││| │ - 条件残差块2 (h_n→h_n) ││ │││| └───────────┬───────────────┘| │││+-------------┼────────────────+ │││  ▼ │││+------------------------------+ │││| U-Net解码器 | │││| ┌───────────────────────────┐| │││| │第1上采样块:  ││ │││| │ - 拼接跳跃连接 [h_n,h_n] │◄─────────────┘││| │ - 条件残差块1 (2*h_n→h_n-1)◄────┘ ││| │ - 条件残差块2 (h_n-1→h_n-1)│  ││| │ - 上采样层 (h_n-1→h_n-1) ││  ││| └───────────┬───────────────┘│  ││| ▼ │  ││| ┌───────────────────────────┐|  ││| │第2上采样块:  ││  ││| │ - 拼接跳跃连接 [h_n-1,h_n-1]◄─────────────┘│| │ - 条件残差块1 (2*h_n-1→h_n-2)◄────┘ │| │ - 条件残差块2 (h_n-2→h_n-2)│  │| │ - 上采样层 (h_n-2→h_n-2) ││  │| └───────────┬───────────────┘│  │| ▼ │  │| . │  │| . │  │| ┌───────────────────────────┐|  │| │最后上采样块:  ││  │| │ - 拼接跳跃连接 [h2,h1] │◄───────────────┘| │ - 条件残差块1 (2*h1→h0) │◄────┘| │ - 条件残差块2 (h0→h0) ││| │ - Identity层  ││| └───────────┬───────────────┘│+-------------┼───────────────-+  ▼+------------------------------+| 最终输出层 || ┌───────────────────────────┐|| │ - DiffusionConv1dBlock │|| │ - Conv1d (h0→输出维度) │|| └───────────┬───────────────┘|+-------------┼────────────────+  ▼重排为 [B,T,D](原始格式)  ▼ 输出: [B,T,D] 预测结果

1.2 configuration_diffusion.py

// 待更

第二部分 DP的编码实现:解读DexCap库中的diffusion_policy.py

扩散策略(Diffusion Policy),包括网络的创建、数据处理、训练、推理和模型的序列化与反序列(且注意,本部分分析的代码来自DexCap库中的STEP3_train_policy/robomimic/algo/diffusion_policy.py,至于原装的扩散策略GitHub代码仓库则见:real-stanford/diffusion_policy)

  1. 导入依赖:代码开始部分导入了所需的库和模块。
  2. 算法配置到类的映射函数 (algo_config_to_class):这个函数根据算法配置(algo_config)来决定使用哪个算法类(DiffusionPolicyUNetDex 或 DiffusionPolicyUNet)以及传递给算法的额外参数。
  3. DiffusionPolicyUNetDex 类:这是一个算法类,继承自PolicyAlgo。它实现了扩散策略的核心功能,包括:
    _create_networks:创建和配置网络结构
    _adjust_brightness:调整图像的亮度
    process_batch_for_training:处理训练数据
    train_on_batch:在单个数据批次上训练模型
    log_info:记录训练信息
    reset:重置算法状态,为环境rollout做准备
    get_action:根据观察和目标获取策略动作
    _get_action_trajectory:获取动作轨迹
    serialize 和 deserialize:序列化和反序列化模型参数
  4. DiffusionPolicyUNet 类:这个类与DiffusionPolicyUNetDex类似,但可能有一些不同的实现细节
    比如在网络结构配置上
    DiffusionPolicyUNetDex 类在创建网络时,会根据是否是单手或双手操作来设置不同的参数,如self.arm_split 和 self.action_arm_dim。这是为了适应不同的机械手臂配置。
    DiffusionPolicyUNet 类没有这样的区分,它直接使用self.ac_dim作为动作维度

    再比如在动作维度处理上
    DiffusionPolicyUNetDex中,有一个专门的处理流程来区分手臂和手部的动作维度,这在_get_action_trajectory方法中体现得尤为明显,其中会分别处理手臂和手部的动作。
    DiffusionPolicyUNet则没有这样的区分,它直接处理整个动作序列

    再比如在动作序列处理上
    DiffusionPolicyUNetDex 类在get_action方法中,会从动作队列中弹出动作,并且每次只处理一个动作
    DiffusionPolicyUNet 类在get_action方法中,会将整个动作序列放入动作队列,并逐个处理

  5. 辅助函数
    replace_submodulesreplace_bn_with_gn:这些函数用于替换网络中的特定模块,例如将BatchNorm替换为GroupNorm
  6. 辅助类
    SinusoidalPosEmb、Downsample1d、Upsample1d、Conv1dBlock、ConditionalResidualBlock1D 和 ConditionalUnet1D:这些类定义了网络的不同组件,如卷积块、下采样、上采样和条件残差块

    相当于定义了一个基于UNet的网络结构,用于处理时间序列数据(如动作和观察)。这个网络结构包括多个层,每层都有条件残差块和可能的下采样或上采样操作

  7. 扩散调度器:代码中使用了DDPMSchedulerDDIMScheduler,这些是扩散模型中用于控制噪声添加和去除的调度器
  8. 指数移动平均(EMA):代码中还提到了EMA,这是一种用于平滑模型权重的技术,可以提高训练的稳定性

本第二部分的目录 如下

第二部分 Diffusion Policy的编码实现与源码解读

2.1 DiffusionPolicyUNetDex:实现扩散策略的核心功能

2.1.1 _create_networks:创建和配置网络结构(包含观察编码、噪声预测、噪声调度)

2.1.2 _adjust_brightness:调整图像帧的亮度

2.1.3 process_batch_for_training:处理从数据加载器中采样的数据批次

2.1.4 train_on_batch:在单个数据批次上训练模型(含添加噪声、预测噪声)

2.1.5 log_info:从train_on_batch方法中获取信息,并记录到TensorBoard

2.1.6 reset

2.1.7 get_action:获取策略动作输出

2.1.8 _get_action_trajectory:生成动作轨迹(接受当前观察和目标,返回动作序列)

2.1.9 serialize 和 deserialize

2.2 辅助函数:替换网络中的特定模块(例如将BatchNorm替换为GroupNorm)

2.2.1 replace_submodules:替换符合条件的子模块

2.2.2 replace_bn_with_gn:调用上面的replace_submodules执行相关模块的替换

2.3 UNet for Diffusion

2.3.1 SinusoidalPosEmb:实现正弦位置嵌入

2.3.2 Downsample1d与Upsample1d:下采样与下采样的实现

2.3.3 Conv1dBlock:实现一维卷积块

2.3.4 ConditionalResidualBlock1D:实现条件残差块

2.3.4 ConditionalUnet1D:实现条件一维U-Net


2.1 DiffusionPolicyUNetDex:实现扩散策略的核心功能

2.1.1 _create_networks:创建和配置网络结构(包含观察编码、噪声预测、噪声调度)

这个方法负责创建和配置网络结构,包括观察编码器(obs_encoder)和噪声预测网络(noise_pred_net)。它还设置了噪声调度器(noise_scheduler)和指数移动平均(EMA)模型。

  1. 设置观察组:设置不同的观察组,并从配置中获取编码器的参数
    ​def _create_networks(self): \"\"\" 创建网络并将它们放置在 @self.nets 中。 \"\"\"​ # 为 @MIMO_MLP 设置不同的观察组 if \"label\" 在 self.obs_shapes.keys(): del self.obs_shapes[\"label\"] observation_group_shapes = OrderedDict() observation_group_shapes[\"obs\"] = OrderedDict(self.obs_shapes) encoder_kwargs = ObsUtils.obs_encoder_kwargs_from_config(self.obs_config.encoder)
  2. 创建观察编码器:创建观察编码器,并将所有 BatchNorm 替换为 GroupNorm,以配合EMA 使用
     obs_encoder = ObsNets.ObservationGroupEncoder( observation_group_shapes=observation_group_shapes, encoder_kwargs=encoder_kwargs, ) # 重要! # 将所有 BatchNorm 替换为 GroupNorm 以配合 EMA 使用 # 如果忘记这样做,性能会大幅下降! obs_encoder = replace_bn_with_gn(obs_encoder) obs_dim = obs_encoder.output_shape()[0]
  3. 设置手部噪声和动作维度:设置是否使用手部噪声,并根据手部类型(单手或双手)设
    置动作维度
     self.use_handnoise = True if self.obs_shapes[\"robot0_eef_pos\"][0] == 3: # 单手 self.arm_split = 3 self.action_arm_dim = 7 else: # 双手 self.arm_split = 6 self.action_arm_dim = 14
  4. 创建网络对象:通过ConditionalUnet1D(下文2.3.5节会详述)创建噪声预测网络「基于当前观察条件和目标的轨迹,逐步去除噪声,生成一系列“高层次”动作指令」,并将观察编码器obs_encoder和噪声预测网络noise_pred_net放置在 self.nets中
     # 创建网络对象 noise_pred_net = ConditionalUnet1D( input_dim=self.ac_dim, global_cond_dim=obs_dim * self.algo_config.horizon.observation_horizon ) # 最终架构有两部分 nets = nn.ModuleDict({ \'policy\': nn.ModuleDict({ \'obs_encoder\': obs_encoder, \'noise_pred_net\': noise_pred_net }) }) nets = nets.float().to(self.device)
  5. 设置噪声调度器:根据配置设置噪声调度器
    如果启用了DDPM,则使用DDPMScheduler——控制去噪过程的步数和每一步的噪声水平
     # 设置噪声调度器 noise_scheduler = None if self.algo_config.ddpm.enabled: noise_scheduler = DDPMScheduler( num_train_timesteps=self.algo_config.ddpm.num_train_timesteps, beta_schedule=self.algo_config.ddpm.beta_schedule, clip_sample=self.algo_config.ddpm.clip_sample, prediction_type=self.algo_config.ddpm.prediction_type )

    如果启用了DDIM,则使用DDIMScheduler

     elif self.algo_config.ddim.enabled: noise_scheduler = DDIMScheduler( num_train_timesteps=self.algo_config.ddim.num_train_timesteps, beta_schedule=self.algo_config.ddim.beta_schedule, clip_sample=self.algo_config.ddim.clip_sample, set_alpha_to_one=self.algo_config.ddim.set_alpha_to_one, steps_offset=self.algo_config.ddim.steps_offset, prediction_type=self.algo_config.ddim.prediction_type )

    否则,抛错

     else: raise RuntimeError()
  6. 设置 EMA:根据配置设置 EMA 模型
     # 设置 EMA ema = None if self.algo_config.ema.enabled: ema = EMAModel(model=nets, power=self.algo_config.ema.power)
  7. 设置属性:设置类的属性,包括网络、噪声调度器、EMA 模型、动作检查标志、观察队
    列和动作队列
     # 设置属性 self.nets = nets self.noise_scheduler = noise_scheduler self.ema = ema self.action_check_done = False self.obs_queue = None self.action_queue = None

2.1.2 _adjust_brightness:调整图像帧的亮度

调整图像帧的亮度。这个方法确保输入张量在CUDA设备上,并为每个批次生成一个随机的亮度调整因子,然后应用这个因子并限制值的范围在0到1之间

  1. 方法定义​​​​​​
    def _adjust_brightness(self, tensor): \"\"\" 调整一批图像帧的亮度。 期望张量的形状为 [batch_size, frame_sequence, channels, height, width]。 每批次的亮度调整是相同的,但在批次之间有所不同。 Args: tensor (torch.Tensor): 输入张量。 Returns: torch.Tensor: 亮度调整后的张量。 \"\"\"
  2. 确保张量在 CUDA 设备上:检查输入张量是否在 CUDA 设备上。如果不是,则抛出一个错误
     # 确保张量在 CUDA 设备上 if not tensor.is_cuda: raise ValueError(\"输入张量不是 CUDA 张量\")
  3. 生成随机亮度调整因子:为每个批次生成一个随机的亮度调整因子。因子的范国在 0.8
    到1.2 之间,但可以根据需要调整范围
     # 为每个批次生成一个随机的亮度调整因子 # 因子的范围例如在 0.5 到 1.5 之间,但可以调整范围 batch_size = tensor.size(0) brightness_factor = torch.empty(batch_size, 1, 1, 1, 1).uniform_(0.8, 1.2).to(tensor.device)
  4. 应用亮度调整:将亮度调整因子应用到输入张量上
     # 应用亮度调整 adjusted_tensor = tensor * brightness_factor
  5. 剪辑值以确保它们在。到1范围内:将调整后的张量值剪辑到 0 到1的范围内,以确保亮度调整后的值在有效范围内
     # 剪辑值以确保它们在 0 到 1 范围内 adjusted_tensor.clamp_(0, 1)
  6. 返回调整后的张量:返回亮度调整后的张量
     return adjusted_tensor

2.1.3 process_batch_for_training:处理从数据加载器中采样的数据批次

处理从数据加载器中采样的数据批次,以过滤相关信息并准备用于训练的批次。它还检查动作是否被归一化到[-1,1]的范围内。

2.1.4 train_on_batch:在单个数据批次上训练模型(含添加噪声、预测噪声)

该函数train_on_batch在单个数据批次上训练模型,具体而言,逐一执行以下步骤

  1. 定义
    def train_on_batch(self, batch, epoch, validate=False): \"\"\" 在单个数据批次上进行训练 Args: batch (dict): 从数据加载器中采样并由 @process_batch_for_training 过滤的包含 torch.Tensors 的字典 epoch (int): 纪元编号 - 某些算法需要执行分阶段训练和提前停止 validate (bool): 如果为 True,则不执行任何学习更新 Returns: info (dict): 包含相关输入、输出和损失的字典,可能与日志记录相关 \"\"\"
  2. 获取时间步长和动作维度:从算法配置中获取观察、动作和预测的时间步长,以及动作维度
     To = self.algo_config.horizon.observation_horizon # 获取观察时间步长 Ta = self.algo_config.horizon.action_horizon # 获取动作时间步长 Tp = self.algo_config.horizon.prediction_horizon # 获取预测时间步长 action_dim = self.ac_dim  # 获取动作维度 B = batch[\'actions\'].shape[0] # 获取批次大小
  3. 进入无梯度上下文:如果 validate 为 True,则不计算梯度
     with TorchUtils.maybe_no_grad(no_grad=validate): # 进入无梯度上下文,如果 validate 为 True,则不计算梯度 info = super(DiffusionPolicyUNetDex, self).train_on_batch(batch, epoch, validate=validate) # 调用父类的 train_on_batch 方法 actions = batch[\'actions\'] # 获取动作张量
  4. 添加手部噪声:如果使用手部噪声并且存在手部观察,则生成手部噪声并添加到手部观察中
     if self.use_handnoise and \"robot0_eef_hand\" in self.obs_shapes.keys(): # 如果使用手部噪声并且存在手部观察 handnoise = torch.randn_like(batch[\"obs\"][\"robot0_eef_hand\"]) * 0.03 # 生成手部噪声 batch[\"obs\"][\"robot0_eef_hand\"] += handnoise # 将噪声添加到手部观察中
  5. 编码观察:将观察和目标编码为特征,并将观察特征展平
     # 编码观察 inputs = { \'obs\': batch[\"obs\"], # 获取观察 \'goal\': batch[\"goal_obs\"] # 获取目标观察 } for k in self.obs_shapes: # 遍历观察形状 # 输入的前两个维度应该是 [B, T] assert inputs[\'obs\'][k].ndim - 2 == len(self.obs_shapes[k]) obs_features = TensorUtils.time_distributed(inputs, self.nets[\'policy\'][\'obs_encoder\'], inputs_as_kwargs=True) # 编码观察 assert obs_features.ndim == 3 # [B, T, D] obs_cond = obs_features.flatten(start_dim=1) # 将观察特征展平
  6. 采样噪声并添加到动作中:生成噪声,并根据扩散迭代时间步长将噪声添加到动作中
     # 采样噪声以添加到动作中 noise = torch.randn(actions.shape, device=self.device) # 生成噪声 # 为每个数据点采样一个扩散迭代 timesteps = torch.randint( 0, self.noise_scheduler.config.num_train_timesteps, (B,), device=self.device ).long() # 生成扩散迭代时间步长 # 根据每次扩散迭代的噪声幅度将噪声添加到干净的动作中 # (这是前向扩散过程) noisy_actions = self.noise_scheduler.add_noise(actions, noise, timesteps) # 添加噪声到动作中
  7. 预测噪声残差:使用噪声预测网络noise_pred_net 预测噪声残差
     # 预测噪声残差 noise_pred = self.nets[\'policy\'][\'noise_pred_net\'](noisy_actions, timesteps, global_cond=obs_cond) # 预测噪声残差 noise_arm = noise[..., :self.action_arm_dim].contiguous() # 获取手臂噪声 noise_pred_arm = noise_pred[..., :self.action_arm_dim].contiguous() # 获取预测的手臂噪声 noise_hand = noise[..., self.action_arm_dim:].contiguous() # 获取手部噪声 noise_pred_hand = noise_pred[..., self.action_arm_dim:].contiguous() # 获取预测的手部噪声
  8. 计算 L2 损失:计算手臂和手部的L2 损失
     # L2 损失 loss = F.mse_loss(noise_pred_arm, noise_arm) * 0.7 + F.mse_loss(noise_pred_hand, noise_hand) * 0.3 # 计算 L2 损失
  9. 日志记录:记录 L2 损失,并将损失从计算图中分离
     # 日志记录 losses = { \'l2_loss\': loss # 记录 L2 损失 } info[\"losses\"] = TensorUtils.detach(losses) # 将损失从计算图中分离
  10. 梯度步骤:如果不是验证模式,则执行梯度步骤,并更新 EMA
     if not validate: # 如果不是验证模式 # 梯度步骤 policy_grad_norms = TorchUtils.backprop_for_loss(  net=self.nets, # 网络  optim=self.optimizers[\"policy\"], # 优化器  loss=loss, # 损失 ) # 更新模型权重的指数移动平均值 if self.ema is not None: # 如果启用了 EMA  self.ema.step(self.nets) # 更新 EMA step_info = {  \'policy_grad_norms\': policy_grad_norms # 记录梯度范数 } info.update(step_info) # 更新信息字典
  11. 返回信息字典:返回包含相关输入、输出和损失的信息字典
     return info # 返回信息字典

2.1.5 log_info:从train_on_batch方法中获取信息,并记录到TensorBoard

处理训练批次的信息,以汇总要传递给TensorBoard进行记录的信息。

2.1.6 reset

重置算法状态,为环境rollout做准备。这包括设置观察队列和动作队列

2.1.7 get_action:获取策略动作输出

get_action这个方法用于获取策路动作输出。它接受当前观察和可选的目标作为输入,并返回动作张量

def get_action(self, obs_dict, goal_dict=None): \"\"\" 获取策略动作输出。 Args: obs_dict (dict): 当前观察 [1, Do] goal_dict (dict): (可选)目标 Returns: action (torch.Tensor): 动作张量 [1, Da] \"\"\"

具体步骤包括:

  1. 获取观察和动作的时问步长:从算法配置中获取观察和动作的时间步长
    (注,如上文1.2.2节中所述,T_{o}表示观测范围,T_{p}表示动作预测范围,而T_{a}则代表了动作执行范围
     # obs_dict: key: [1,D] To = self.algo_config.horizon.observation_horizon Ta = self.algo_config.horizon.action_horizon
  2. 检查动作队列是否为空:如果动作队列为空,则运行推理以生成动作序列,并将其放入动作队列中

    至于其中的_get_action_trajectory,下一节会讲其实现

     if len(self.action_queue) == 0: # 没有剩余动作,运行推理 # 将 obs_queue 转换为张量字典(在 T 维度上连接) # import pdb; pdb.set_trace() # obs_dict_list = TensorUtils.list_of_flat_dict_to_dict_of_list(list(self.obs_queue)) # obs_dict_tensor = dict((k, torch.cat(v, dim=0).unsqueeze(0)) for k,v in obs_dict_list.items()) # 运行推理 # [1,T,Da] action_sequence = self._get_action_trajectory(obs_dict=obs_dict) # 将动作放入队列 self.action_queue.extend(action_sequence[0][3:13])
  3. 执行动作:从动作队列中取出一个动作,并将其维度扩展为[1,Da],然后返回该动作
     # 有动作,从左到右执行 # [Da] action = self.action_queue.popleft() # [1,Da] action = action.unsqueeze(0) return action

2.1.8 _get_action_trajectory:生成动作轨迹(接受当前观察和目标,返回动作序列)

这段代码实现了一个名为-get_action_trajectory 的方法,用于生成动作轨迹,它接受当前观察和可选的目标作为输入,并返回动作序列

def _get_action_trajectory(self, obs_dict, goal_dict=None): assert not self.nets.training

具体步骤包括:

  1. 获取时间步长和动作维度:从算法配置中获取观察、动作和预测的时间步长,以及动作维度
     To = self.algo_config.horizon.observation_horizon Ta = self.algo_config.horizon.action_horizon Tp = self.algo_config.horizon.prediction_horizon action_dim = self.ac_dim
  2. 设置推理时问步长:根据配置设置推理时间步长
     if self.algo_config.ddpm.enabled is True: num_inference_timesteps = self.algo_config.ddpm.num_inference_timesteps elif self.algo_config.ddim.enabled is True: num_inference_timesteps = self.algo_config.ddim.num_inference_timesteps else: raise ValueError
  3. 选择网络:选择用于推理的网络。如果启用了 EMA,则使用 EMA 的平均模型
     # 选择网络 nets = self.nets if self.ema is not None: nets = self.ema.averaged_model
  4. 编码观察:将观察和目标编码为特征
    首先,确保输入的前两个维度是 [B,T〕,然后,使用时问分布编码器将输入编码为特征
     # 编码观察 inputs = { \'obs\': obs_dict, \'goal\': goal_dict } for k in self.obs_shapes: # 前两个维度应该是 [B, T] assert inputs[\'obs\'][k].ndim - 2 == len(self.obs_shapes[k]) obs_features = TensorUtils.time_distributed(inputs, self.nets[\'policy\'][\'obs_encoder\'], inputs_as_kwargs=True) assert obs_features.ndim == 3 # [B, T, D] B = obs_features.shape[0]
  5. 重塑观察:将观察特征重塑为 (B,obs_horizon * obs_dim)的形状
     # 将观察重塑为 (B,obs_horizon*obs_dim) obs_cond = obs_features.flatten(start_dim=1)
  6. 从高斯噪声初始化动作:从高斯噪声初始化动作
     # 从高斯噪声初始化动作 noisy_action = torch.randn( (B, Tp, action_dim), device=self.device) naction = noisy_action
  7. 初始化调度器:初始化噪声调度器
     # 初始化调度器 self.noise_scheduler.set_timesteps(num_inference_timesteps)
  8. 预测噪声并进行逆扩散步骤:在每个时间步长上预测噪声,并进行逆扩散步骤以去除噪声
     for k in self.noise_scheduler.timesteps: # 预测噪声 noise_pred = nets[\'policy\'][\'noise_pred_net\']( sample=naction, timestep=k, global_cond=obs_cond ) # 逆扩散步骤(去除噪声) naction = self.noise_scheduler.step( model_output=noise_pred, timestep=k, sample=naction ).prev_sample
  9. 处理动作并返回:使用动作时间步长处理动作,并返回动作序列
     # 使用 Ta 处理动作 start = To - 1 end = start + Ta action = naction[:, start:end] return action

2.1.9 serialize 和 deserialize

序列化和反序列化模型参数。serialize 方法返回一个包含网络参数的字典,而 deserialize 方法从这个字典中加载模型参数

2.2 辅助函数:替换网络中的特定模块(例如将BatchNorm替换为GroupNorm)

2.2.1 replace_submodules:替换符合条件的子模块

该函数用于替换符合条件的子模块

  1. 方法定义
    def replace_submodules( root_module: nn.Module, predicate: Callable[[nn.Module], bool], func: Callable[[nn.Module], nn.Module]) -> nn.Module: \"\"\" Replace all submodules selected by the predicate with the output of func. predicate: Return true if the module is to be replaced. func: Return new module to use. \"\"\"

    它接受三个参数:
    。root_module:根模块,类型为 nn. Module
    。predicate :谓词函数,接受一个模块作为输入,返回一个布尔值,表示该模块是
    否需要被替换
    。func:函数,接受一个模块作为输入,返回一个新的模块,用于替换原模块

  2. 检查根模块是否符合条件,如果符合条件,则直接返回替换后的模块
     if predicate(root_module): return func(root_module)
  3. 检查pytorch版本是否大于1.9,如果版本过低,则抛出导入错误
     if parse_version(torch.__version__) = 1.9.0\')
  4. 查找符合条件的子模块

     bn_list = [k.split(\'.\') for k, m in root_module.named_modules(remove_duplicate=True) if predicate(m)]
  5. 替换符合条件的子模块,具体而言
    首先获取父模块

     for *parent, k in bn_list: parent_module = root_module if len(parent) > 0: parent_module = root_module.get_submodule(\'.\'.join(parent))

    其次获取源模块

     if isinstance(parent_module, nn.Sequential): src_module = parent_module[int(k)] else: src_module = getattr(parent_module, k)

    接着使用func函数生成目标模块

     tgt_module = func(src_module)

    最后将源模块替换为目标模块

     if isinstance(parent_module, nn.Sequential): parent_module[int(k)] = tgt_module else: setattr(parent_module, k, tgt_module)
  6. 验证所有模块已被替换

     # verify that all modules are replaced bn_list = [k.split(\'.\') for k, m in root_module.named_modules(remove_duplicate=True) if predicate(m)] assert len(bn_list) == 0

    这部分代码再次查找所有符合条件的子模块,并确保它们已被替换,如果还有未被替换的模块,则抛出断言错误

  7. 返回根模块

     return root_module

2.2.2 replace_bn_with_gn:调用上面的replace_submodules执行相关模块的替换

该函数用于将所有的BatchNorm层替换为GroupNorm层

  1. 方法定义,其接受两个参数
    def replace_bn_with_gn( root_module: nn.Module, features_per_group: int=16) -> nn.Module: \"\"\" 将所有 BatchNorm 层替换为 GroupNorm 层。 Args: root_module (nn.Module): 根模块 features_per_group (int): 每组的特征数,默认为 16 Returns: nn.Module: 替换后的根模块 \"\"\"

    root_module:根模块,类型为 nn.Module。
    features_per_group:每组的特征数,默认为 16

  2. 调用replace_submodules函数
    1.根模块:传递根模块 root_module
     replace_submodules( root_module=root_module,

    2.谓词函数:传递一个匿名函数 Lambda x: isinstanceCx, nn. BatchNorm2d),用于检查模块是否为 nn. BatchNorm2d

     predicate=lambda x: isinstance(x, nn.BatchNorm2d),

    3.替换函数:传递一个匿名函数 Lambda x:nn. GroupNorm(num_groups=x.num_features // features_per_group,num-channel s=x.num_features),用于将 BatchNorm 层替换为 GroupNorm层

     func=lambda x: nn.GroupNorm( num_groups=x.num_features // features_per_group, num_channels=x.num_features) )

2.3 UNet for Diffusion

2.3.1 SinusoidalPosEmb:实现正弦位置嵌入

SinusoidalPosEmb这个类实现了正弦位置嵌入,用于将输入张量转换为正弦和余弦形式的嵌入

class SinusoidalPosEmb(nn.Module): // 初始化方法接收一个参数dim,表示嵌入的维度,并将其存储在实例变量self.dim中 def __init__(self, dim): super().__init__() self.dim = dim // 前向传播方法,接受一个输入张量x def forward(self, x): device = x.device half_dim = self.dim // 2 emb = math.log(10000) / (half_dim - 1) // 生成嵌入向量 emb = torch.exp(torch.arange(half_dim, device=device) * -emb) // 输入张量和嵌入向量相乘 emb = x[:, None] * emb[None, :] // 将正弦和余弦嵌入拼接在一起 emb = torch.cat((emb.sin(), emb.cos()), dim=-1) return emb

2.3.2 Downsample1d与Upsample1d:下采样与下采样的实现

Downsample1d这个类则实现了一维下采样,永远将输入张量的时间维度减半

  1. 初始化
     def __init__(self, dim): super().__init__() self.conv = nn.Conv1d(dim, dim, 3, 2, 1)

    初始化方法接受一个参数 dim,表示输入和输出的通道数,并创建一个一维卷积层
    self.conv,用于下采样
     

  2. 前向传播
     def forward(self, x): return self.conv(x)

    前向传播方法接受一个输入张量 x,并通过卷积层 self.conv 进行下采样,然后返回
    下采样后的张量

Upsample1d这个类则实现了一维上采样,用于将输入张量的时间维度加倍

  1. 初始化
     def __init__(self, dim): super().__init__() self.conv = nn.ConvTranspose1d(dim, dim, 4, 2, 1)

    初始化方法接受一个参数dim,表示输入和输出的通道数,并创建一个一维转置卷积层
    self.conv ,用于上采样

  2. 前向传播
     def forward(self, x): return self.conv(x)

2.3.3 Conv1dBlock:实现一维卷积块

class Conv1dBlock(nn.Module): \'\'\' Conv1d --> GroupNorm --> Mish 一维卷积 --> 组归一化 --> Mish 激活函数 \'\'\' def __init__(self, inp_channels, out_channels, kernel_size, n_groups=8): super().__init__() # 调用父类的初始化方法 # 使用 nn.Sequential 创建一个顺序容器 self.block = nn.Sequential( nn.Conv1d(inp_channels, out_channels, kernel_size, padding=kernel_size // 2), # 一维卷积层,带有适当的填充 nn.GroupNorm(n_groups, out_channels), # 组归一化层 nn.Mish(), # Mish 激活函数 ) def forward(self, x): return self.block(x) # 前向传播方法,将输入 x 通过顺序容器 self.block 进行处理

可以看到

  1. • 初始化方法接受四个参数:
    。inp_channels:输入通道数
    。out_channels:输出通道数
    。kernel_size:卷积核大小
    。n_groups:组归一化的组数,默认为 8
  2. 然后初始化方法创建一个顺序容器 self.block,包含以下层:
    1.一维卷积层:nn. Conv1d,输入通道数为 inp_channels,输出通道数为out_channels,卷积核大小为 kernel-size,填充为 kernel-size // 2
    2.组归一化层:nn.GroupNorm

2.3.4 ConditionalResidualBlock1D:实现条件残差块

这个类ConditionalResidualBlock1D用于实现条件一维残差块

  1. 类定义
    class ConditionalResidualBlock1D(nn.Module): # 定义 ConditionalResidualBlock1D 类,继承自 nn.Module
  2. 初始化方法
     def __init__(self, in_channels, # 输入通道数 out_channels, # 输出通道数 cond_dim, # 条件维度 kernel_size=3, # 卷积核大小,默认为 3 n_groups=8): # GroupNorm 的组数,默认为 8 super().__init__() # 调用父类的初始化方法
  3. 定义卷积块
     self.blocks = nn.ModuleList([ # 定义卷积块列表 Conv1dBlock(in_channels, out_channels, kernel_size, n_groups=n_groups), # 第一个卷积块 Conv1dBlock(out_channels, out_channels, kernel_size, n_groups=n_groups),  # 第二个卷积块 ])
  4. 定义条件编码器
     # FiLM modulation https://arxiv.org/abs/1709.07871 # predicts per-channel scale and bias cond_channels = out_channels * 2 # 条件通道数为输出通道数的两倍 self.out_channels = out_channels # 设置输出通道数 self.cond_encoder = nn.Sequential( # 定义条件编码器 nn.Mish(), # Mish 激活函数 nn.Linear(cond_dim, cond_channels), # 线性层 nn.Unflatten(-1, (-1, 1)) # 取消展平 )
  5. 定义残差卷积
     # make sure dimensions compatible self.residual_conv = nn.Conv1d(in_channels, out_channels, 1) \\ # 定义残差卷积,如果输入通道数不等于输出通道数,则进行卷积 if in_channels != out_channels else nn.Identity() # 否则使用 Identity 层
  6. 前向传播方法
     def forward(self, x, cond): # 定义前向传播方法 \'\'\' x : [ batch_size x in_channels x horizon ] cond : [ batch_size x cond_dim] returns: out : [ batch_size x out_channels x horizon ] \'\'\'
  7. 通过第一个卷积块
     out = self.blocks[0](x) # 通过第一个卷积块
  8. 编码条件
     embed = self.cond_encoder(cond) # 编码条件 embed = embed.reshape( # 重新调整条件编码的形状 embed.shape[0], 2, self.out_channels, 1) scale = embed[:,0,...] # 获取缩放因子 bias = embed[:,1,...] # 获取偏置 out = scale * out + bias # 应用 FiLM 调制
  9. 通过第二个卷积块
     out = self.blocks[1](out) # 通过第二个卷积块
  10. 添加残差连接
     out = out + self.residual_conv(x) # 添加残差连接 return out # 返回输出

2.3.5 ConditionalUnet1D:实现条件一维U-Net——预测扩散过程中的噪声

ConditionalUnet1D这个类继承自nn.Module,用于实现条件一维U-Net,其用于预测扩散过程中的噪声

  1. 首先,其接受多个参数,用于配置 U-Net 的结构和参数
     def __init__(self, input_dim, # 动作的维度 global_cond_dim, # 全局条件的维度,通常是 obs_horizon * obs_dim diffusion_step_embed_dim=256, # 扩散迭代 k 的位置编码大小 down_dims=[256,512,1024], # 每个 U-Net 层的通道大小,数组的长度决定了层数 kernel_size=5, # 卷积核大小 n_groups=8 # GroupNorm 的组数 ):
  2. 初始化父类和设置维度
     super().__init__() # 调用父类的初始化方法 all_dims = [input_dim] + list(down_dims) # 设置所有层的维度 start_dim = down_dims[0] # 设置起始维度
  3. 定义扩散步骤编码器:用于对扩散步骤进行编码,并计算条件维度
     dsed = diffusion_step_embed_dim # 扩散步骤编码维度 diffusion_step_encoder = nn.Sequential( # 定义扩散步骤编码器 SinusoidalPosEmb(dsed), # 正弦位置嵌入 nn.Linear(dsed, dsed * 4), # 线性层 nn.Mish(), # Mish 激活函数 nn.Linear(dsed * 4, dsed), # 线性层 ) cond_dim = dsed + global_cond_dim # 条件维度
  4. 定义中间模块:使用条件残差块
     in_out = list(zip(all_dims[:-1], all_dims[1:])) # 输入输出维度对 mid_dim = all_dims[-1] # 中间层维度 self.mid_modules = nn.ModuleList([ # 定义中间模块 ConditionalResidualBlock1D( mid_dim, mid_dim, cond_dim=cond_dim, kernel_size=kernel_size, n_groups=n_groups ), ConditionalResidualBlock1D( mid_dim, mid_dim, cond_dim=cond_dim, kernel_size=kernel_size, n_groups=n_groups ), ])
  5. 定义下采样模块:使用条件残差块ConditionalResidualBlock1D和下采样层Downsample1d
     down_modules = nn.ModuleList([]) # 定义下采样模块列表 for ind, (dim_in, dim_out) in enumerate(in_out): # 遍历输入输出维度对 is_last = ind >= (len(in_out) - 1) # 判断是否为最后一层 down_modules.append(nn.ModuleList([ # 添加下采样模块 ConditionalResidualBlock1D(  dim_in, dim_out, cond_dim=cond_dim,  kernel_size=kernel_size, n_groups=n_groups), ConditionalResidualBlock1D(  dim_out, dim_out, cond_dim=cond_dim,  kernel_size=kernel_size, n_groups=n_groups), Downsample1d(dim_out) if not is_last else nn.Identity() # 如果不是最后一层,则添加下采样层,否则添加 Identity 层 ]))
  6. 定义上采样模块:使用条件残差块ConditionalResidualBlock1D和上采样层Upsample1d
     up_modules = nn.ModuleList([]) # 定义上采样模块列表 for ind, (dim_in, dim_out) in enumerate(reversed(in_out[1:])): # 遍历反转后的输入输出维度对 is_last = ind >= (len(in_out) - 1) # 判断是否为最后一层 up_modules.append(nn.ModuleList([ # 添加上采样模块 ConditionalResidualBlock1D(  dim_out*2, dim_in, cond_dim=cond_dim,  kernel_size=kernel_size, n_groups=n_groups), ConditionalResidualBlock1D(  dim_in, dim_in, cond_dim=cond_dim,  kernel_size=kernel_size, n_groups=n_groups), Upsample1d(dim_in) if not is_last else nn.Identity() # 如果不是最后一层,则添加上采样层,否则添加 Identity 层 ]))
  7. 定义最终卷积层:用于生成输出
     final_conv = nn.Sequential( # 定义最终卷积层 Conv1dBlock(start_dim, start_dim, kernel_size=kernel_size), nn.Conv1d(start_dim, input_dim, 1), )
  8. 设置模块属性
     self.diffusion_step_encoder = diffusion_step_encoder # 设置扩散步骤编码器 self.up_modules = up_modules # 设置上采样模块 self.down_modules = down_modules # 设置下采样模块 self.final_conv = final_conv # 设置最终卷积层 print(\"number of parameters: {:e}\".format( # 打印参数数量 sum(p.numel() for p in self.parameters())) )
  9. 前向传播方法:
     def forward(self, sample: torch.Tensor, # 输入张量 timestep: Union[torch.Tensor, float, int], # 扩散步骤 global_cond=None): # 全局条件 \"\"\" x: (B,T,input_dim) timestep: (B,) 或 int,扩散步骤 global_cond: (B,global_cond_dim) output: (B,T,input_dim) \"\"\"

    接受输入张量

     sample = sample.moveaxis(-1,-2) # 将输入张量的最后一个维度移动到第二个维度 # (B,C,T)

    处理扩散步骤

     timesteps = timestep # 获取扩散步骤 if not torch.is_tensor(timesteps): # 如果扩散步骤不是张量 timesteps = torch.tensor([timesteps], dtype=torch.long, device=sample.device) # 将其转换为张量 elif torch.is_tensor(timesteps) and len(timesteps.shape) == 0: # 如果扩散步骤是标量张量 timesteps = timesteps[None].to(sample.device) # 将其扩展为一维张量 timesteps = timesteps.expand(sample.shape[0]) # 将扩散步骤扩展到批次维度

    计算全局特征

     global_feature = self.diffusion_step_encoder(timesteps) # 计算全局特征 if global_cond is not None: # 如果全局条件不为空 global_feature = torch.cat([ # 将全局特征和全局条件拼接在一起 global_feature, global_cond ], axis=-1)

    下采样过程

     x = sample # 获取输入张量 h = [] # 定义中间结果列表 for idx, (resnet, resnet2, downsample) in enumerate(self.down_modules): # 遍历下采样模块 x = resnet(x, global_feature) # 通过第一个条件残差块 x = resnet2(x, global_feature) # 通过第二个条件残差块 h.append(x) # 将结果添加到中间结果列表 x = downsample(x) # 通过下采样层

    中间过程

     for mid_module in self.mid_modules: # 遍历中间模块 x = mid_module(x, global_feature) # 通过中间模块

    上采样过程

     for idx, (resnet, resnet2, upsample) in enumerate(self.up_modules): # 遍历上采样模块 x = torch.cat((x, h.pop()), dim=1) # 将当前结果和中间结果拼接在一起 x = resnet(x, global_feature) # 通过第一个条件残差块 x = resnet2(x, global_feature) # 通过第二个条件残差块 x = upsample(x) # 通过上采样层

    最终卷积层

     x = self.final_conv(x) # 通过最终卷积层

    最终,返回输出张量

     x = x.moveaxis(-1,-2) # 将输出张量的第二个维度移动到最后一个维度 # (B,T,C) return x # 返回输出

第三部分(选读) Diff-Control:改进UMI所用的扩散策略(含ControlNet简介)

3.1 Diff-Control是什么及其提出的背景

3.1.1 背景

自从24年年初斯坦福等一系列机器人横空出世以来,模仿学习已经成为训练机器人的重要方法,其中,基于扩散的策略「4-Diffusion policy: Visuomotor policy learning via action diffusion」——因其有效建模多模态动作分布的能力而脱颖而出,从而提升了性能

然而,在实践中,动作表示的不一致性问题仍然是一个持续的挑战,这种不一致性可能导致机器人轨迹分布与底层环境之间的明显差异,从而限制控制策略的有效性[5-Robot learning from human demonstrations with inconsistent contexts]

这种不一致性的主要原因通常源于

  • 人类演示的丰富上下文性质[6-What matters in learning from offline human demonstrations for robot manipulation]
  • 分布转移问题[7- A reductionof imitation learning and structured prediction to no-regret online learning]
  • 以及高动态环境的波动性
    其实本质上是无状态的,缺乏将记忆和先验知识纳入控制器的机制,从而导致动作生成的不一致性,即they are fundamentally stateless, lackingprovisions for incorporating memory and prior knowledgeinto the controller, potentially leading to inconsistent actiongeneration

先前的方法,如

  1. 动作分块「8-Learning fine-grained bimanual manipulation with low-cost hardware,即ACT,其原理详见此文《ACT的原理解析:斯坦福炒虾机器人Moblie Aloha的动作分块算法ACT》」和预测闭环动作序列[4-扩散策略],已被提出以解决这一问题
  2. 此外,Hydra[9]和基于航点的操作[10]修改动作表示以确保一致性

然而,这些方法通过改变动作表示来解决问题,而不是直接使用原始动作

相反,能否通过在扩散策略中加入时间转换来明确地施加时间一致性?在深度状态空间模型领域[11- Deep state space models for time series forecasting]–[13-How to train your differentiable filter],有效学习状态转换模型能够识别潜在的动态模式

3.1.2 什么是Diff-Control:通过ControlNet将状态信息融入扩散策略中

对此,作者团队于2024年7月提出了Diff-Control「其对应的论文为Diff-Control: A Stateful Diffusion-based Policy for Imitation Learning」,一种基于扩散的状态策略,它生成动作并同时学习动作转换模型,其基于[14- Adding conditional control to text-to-image diffusion models]在图像生成中引入的ControlNet框架,利用它作为转换模型,为基础扩散策略提供时间调控

关于Diff-Control的构建基础,即什么是ControlNet,详见此文《AI绘画原理解析:从CLIP、BLIP到DALLE、DALLE 2、DALLE 3、Stable Diffusion(含ControlNet详解)》的第3.3节 给SD装上方向盘之ControlNet的详解及其应用:微调SD

如下图所示,先前的动作序列(蓝色)在生成新的动作序列(红色)时用作条件

Diff-Control的关键目标是学习如何将状态信息融入扩散策略的决策过程中

下图展示了这种行为的一个示例:

  • 如上图中部所示,一个学习近似余弦函数的策略,在时间点t给定单一观测值时,无状态策略(Diffusion Policy)在生成轨迹的延续方面遇到困难。由于存在歧义,扩散策略[4]往往会学习到多种模式
  • 相比之下,如上图左侧所示,Diff-Control通过整合时间条件,使其能够在生成轨迹时考虑过去的状态
    为此,所提出的方法利用了最新的ControlNet架构,以确保机器人动作生成中的时间一致性
    在CV领域中,ControlNet用于稳定扩散模型,以在生成图像或视频序列时启用额外的控制输入或附加条件

Diff-Control团队将ControlNet的基本原理从图像生成扩展到动作生成,并将其用作状态空间模型,在该模型中,系统的内部状态、观测(摄像头输入)、和人类语言指令共同影响策略的输出Our method extends the basic principle of ControlNet from image generation to action generation, and use it as a state-space model in which the internal state of the system affects the output of the policy in conjunction with observations (camerainput) and human language instructions

如下图所示,是Diff-Control在“打开盖子”任务中的实际应用

  • 每个时间窗口内(如红色所示),Diff-Control 生成动作序列
    Within each time window (depicted in red), Diff-Control generates action sequences
  • 在生成后续动作序列时,它利用先前的动作作为额外的控制输入,如蓝色所示
    When generat-ing subsequent action sequences, it utilizes previous actionsas an additional control input, shown in blue.

    这种时间过渡是通过贝叶斯公式实现的,有效地弥合了独立策略与状态空间建模之间的差距
    This temporaltransition is achieved through Bayesian formulation, effec-tively bridging the gap between standalone policies and statespace modeling

3.2 Diff-Control的技术架构

3.2.1 从扩散模型到递归贝叶斯公式

回顾一下扩散模型的背景知识

扩散模型通过迭代的将高斯噪声映射到目标分布,且可以选择性的基于上下文信息进行条件生成

  1. 比如给定初始点\\mathbf{a}_{T} \\sim \\mathcal{N}(\\mathbf{0}, \\mathbf{I}),扩散模型预测输出序列为\\mathbf{a}_{T-1}, \\mathbf{a}_{T-2}, \\cdots, \\mathbf{a}_{0},其中每个后续输出都是前一个输出的去噪版本(说白了,整个过程就是去噪的过程),故\\mathbf{a}_{0}就是扩散过程的输出
  2. 然后使用去噪扩散概率模型(DDPM)作为骨干网络。在训练过程中,噪声输入可以通过不同的噪声水平生成:\\mathbf{a}_{\\tau}=\\sqrt{\\bar{\\alpha}_{\\tau}} \\mathbf{a}+\\sqrt{1-\\bar{\\alpha}_{\\tau}} \\mathbf{Z},其中\\bar{\\alpha}_{\\tau}是方差调度,\\mathbf{z}是随机噪声,\\mathbf{z} \\sim \\mathcal{N}(\\mathbf{0}, \\mathbf{I})
  3. 接下来,可以训练一个神经网络\\epsilon(\\cdot)来预测添加到输入中的噪声,通过最小化以下公式
    \\mathcal{L}_{\\mathrm{DDPM}}:=\\mathbb{E}_{\\mathbf{o}, \\mathbf{a}, \\tau, \\mathbf{z}}\\left[\\|\\epsilon(\\mathbf{o}, \\mathbf{a}, \\tau)-\\mathbf{z}\\|_{2}^{2}\\right]
    其中,(\\mathbf{o}, \\mathbf{a})表示观察和动作对,\\tau是去噪时间步,\\tau \\in[1, T]

    在采样步骤中,迭代运行去噪过程
    \\mathbf{a}_{\\tau-1}=\\frac{1}{\\sqrt{\\alpha_{\\tau}}}\\left(\\mathbf{a}_{\\tau}-\\frac{1-\\alpha_{\\tau}}{\\sqrt{1-\\bar{\\alpha}_{\\tau}}} \\epsilon\\left(\\mathbf{o}, \\mathbf{a}_{\\tau}, \\tau\\right)\\right)+\\sigma_{\\tau} \\mathbf{z}
    整个过程相当于不断减掉每一步添加的噪声

接下来,目标是学习一个以条件\\mathbf{c}和观察\\mathbf{o}为输入的策略

在这个背景下,定义\\mathbf{a}为包含机器人末端执行器姿态的轨迹。与之前的方法[4],[32]一致,现在的目标也是将多个条件作为输入。然而,正如在上文所提到的,尽管之前的工作[4], [8], [9]已经努力探索稳健的动作生成,但它们并没有考虑到a的状态性

故下面通过在动作空间中引入转移,从贝叶斯的角度解决动作一致性问题(We address the action consistency from a Bayesian perspective by introducing transition in action spaces)

  1. 首先,有如下方程
    \\begin{array}{l} p\\left(\\mathbf{a}_{t} \\mid \\mathbf{a}_{1: t-1}, \\mathbf{o}_{1: t}, \\mathbf{c}\\right) \\\\ \\quad \\propto p\\left(\\mathbf{o}_{t} \\mid \\mathbf{a}_{t}, \\mathbf{c}\\right) p\\left(\\mathbf{a}_{t} \\mid \\mathbf{a}_{1: t-1}, \\mathbf{o}_{1: t-1}, \\mathbf{c}\\right) \\end{array}
  2. \\operatorname{bel}\\left(\\mathbf{a}_{t}\\right)=p\\left(\\mathbf{a}_{t} \\mid \\mathbf{a}_{1: t-1}, \\mathbf{o}, \\mathbf{c}\\right),应用马尔可夫性质,即假设下一个生成的轨迹仅依赖于当前的轨迹,得到

\\operatorname{bel}\\left(\\mathbf{a}_{t}\\right)=\\eta \\underbrace{p\\left(\\mathbf{o}_{t} \\mid \\mathbf{a}_{t}, \\mathbf{c}\\right)}_{\\text {observation model }} \\prod_{t=1}^{t} \\overbrace{p\\left(\\mathbf{a}_{t} \\mid \\mathbf{a}_{t-1}, \\mathbf{c}\\right)}^{\\text {transition model }} \\operatorname{bel}\\left(\\mathbf{a}_{t-1}\\right)

其中,\\eta是归一化因子,p\\left(\\mathbf{o}_{t} \\mid \\mathbf{a}_{t}, \\mathbf{c}\\right)是观测模型,p\\left(\\mathbf{a}_{t} \\mid \\mathbf{a}_{t-1}, \\mathbf{c}\\right)是转移模型,这个转移模型描述了系统动力学演化的规律,而观测模型则确定了系统内部状态与观测到的噪声测量值之间的关系

3.2.2 Diff-Control Policy:结合贝叶斯公式与扩散模型

接下来,展示如何将贝叶斯公式和扩散模型结合在一起,使得一个策略可以生成有状态的动作序列,从而促进一致的机器人行为

他们提出了如下图所示的Diff-Control策略\\pi_{\\boldsymbol{\\theta}}\\left(\\mathbf{a}_{\\left[W_{t}\\right]} \\mid \\mathbf{o}, \\mathbf{a}_{\\left[W_{t-h}\\right]}, \\mathbf{c}\\right),其参数为\\theta

其中,h代表执行时间范围,\\mathbf{c}表示以自然人类指令形式出现的语言条件,\\mathbf{o}表示由场景的RGB相机捕获的一系列图像

策略\\pi_{\\boldsymbol{\\theta}}生成一个轨迹窗口\\mathbf{a}_{\\left[W_{t}\\right]}=\\left[\\mathbf{a}_{1}, \\mathbf{a}_{2}, \\cdots \\mathbf{a}_{W}\\right]^{T} \\in \\mathbb{R}^{7 \\times W},其中W指的是窗口大小或预测时间范围

贝叶斯公式中的Diff-Control策略包含两个关键模块

  1. 转换模块接收先前的动作\\mathbf{a}_{\\left[W_{t}\\right]}并生成潜在的嵌入,以供基础策略后续使用
  2. 作为观测模型,基础策略结合了与\\mathbf{a}_{\\left[W_{t}\\right]}相关的时间信息,并生成一个新的动作\\mathbf{a}_{\\left[W_{t+h}\\right]}

这种双模块结构使得Diff-Control策略能够巧妙地捕捉时间动态,并有助于准确且一致地生成后续动作

3.2.3 基础策略与转换模型

对于基础策略,如下图左侧所示

  1. 首先,按照第2.2.1节中的步骤训练一个基于扩散的策略[4]作为基础策略\\bar{\\pi}_{\\boldsymbol{\\psi}}\\left(\\mathbf{a}_{\\left[W_{t}\\right]} \\mid \\mathbf{o}, \\mathbf{c}\\right)
  2. 然后采用[15-Planning with diffusion for flexible behavior synthesis]中的一维时间卷积网络,并构建U-net骨干网络
  3. 策略\\bar{\\pi}_{\\boldsymbol{\\psi}}可以自主执行并生成动作,而无需依赖任何时间信息

对于转换模型,如下图右侧所示,作者团队将ControlNet纳入为转换模块「Diff-Control Policy通过使用锁定的 U-net 扩散策略架构来实现。该策略复制了编码器和中间模块,并引入了零卷积层,即The Diff-Control Policy is implemented through the utilization of a locked U-net diffusion policy architecture. It replicates the encoder and middle blocks and incorporates zero convolution layers

这种利用有效地扩展了策略网络的能力,使其包含时间条件(This utilization extends the capability of the policy networkto include temporal conditioning effectively)

  1. 为实现这一目标,利用先前生成的动作序列\\mathbf{a}_{\\left[W_{t-h}\\right]}作为ControlNet的提示输入
    To achieve this,we utilize the previously generated action sequences as the prompt input to ControlNet.
  2. 通过这样做,基础策略\\bar{\\pi}_{\\boldsymbol{\\psi}}可以了解先前的动作\\mathbf{a}_{\\left[W_{t-h}\\right]},且通过创建一个可训练的\\bar{\\pi}_{\\boldsymbol{\\psi}}编码器副本来实现ControlNet,然后冻结基础策略\\bar{\\pi}_{\\boldsymbol{\\psi}}
    By doing so, the base policy¯πψψψ becomes informed about the previous actions a[Wt−h].We implement ControlNet by creating a trainable replica of the ¯πψψψ encoders and then freeze the base policy ¯πψψψ.

    且可训练的副本通过零卷积层[33]连接到固定模型
    The trainable replica is connected to the fixed model with zero convolutional layers [33]

  3. 最终,ControlNet可以将\\mathbf{a}_{\\left[W_{t-h}\\right]}作为条件向量,并重用训练好的基础策略\\bar{\\pi}_{\\boldsymbol{\\psi}}来构建下一个动作序列\\mathbf{a}_{\\left[W_{t}\\right]}
    ControlNet can then take a[Wt−h] as the conditioning vector and reuses the trained base policy¯πψψψ to construct the next action sequence a[Wt]

3.2.4 策略的训练与相关模型的选择

在策略的训练上,其中

  • 对于基础策略\\bar{\\pi}_{\\boldsymbol{\\psi}}\\left(\\mathbf{a}_{\\left[W_{t}\\right]} \\mid \\mathbf{o}, \\mathbf{c}\\right)的训练
    首先将观察\\mathbf{o}和语言条件\\mathbf{c}编码到相同的嵌入维度
    然后我们利用方程\\mathcal{L}_{\\mathrm{DDPM}}:=\\mathbb{E}_{\\mathbf{o}, \\mathbf{a}, \\tau, \\mathbf{z}}\\left[\\|\\epsilon(\\mathbf{o}, \\mathbf{a}, \\tau)-\\mathbf{z}\\|_{2}^{2}\\right]中定义的学习目标来训练基础策略
  • 在微调ControlNet时也使用相同的学习目标:\\mathcal{L}:=\\mathbb{E}_{\\mathbf{o}, \\mathbf{c}, \\mathbf{a}, \\mathbf{a}_{\\left[W_{t}\\right]}, \\tau, z}\\left[\\left\\|\\epsilon_{\\theta}\\left(\\mathbf{o}, \\mathbf{c}, \\mathbf{a}_{T}, \\mathbf{a}_{\\left[W_{t}\\right]}, \\tau\\right)-\\mathbf{z}\\right\\|_{2}^{2}\\right]
    整个扩散模型的总体学习目标是\\mathcal{L}\\epsilon_{\\boldsymbol{\\theta}}(\\cdot)是对应的由\\boldsymbol{\\theta}参数化的神经网络,且基础策略和Diff-Control策略都是端到端训练的

在模型的选择上,他们采用了基于CNN的U-net架构。这种基于CNN的骨干网络选择已被证明适用于多种任务,无论是在模拟环境还是现实场景中

在整个的模型架构中,如之前的下图所示

编码器和解码器块使用了具有不同核大小的1D卷积层

  1. 为了实现ControlNet的零卷积层,采用了权重初始化为零的1D1×1卷积层
    这种方法确保在训练的初始阶段,任何潜在的有害噪声不会影响可训练神经网络层的隐藏状态[14]
  2. 且在所有实验中使用窗口大小W=24作为默认预测范围,执行范围h在\\mathbf{a}_{\\left[W_{t-h}\\right]}\\mathbf{a}_{\\left[W_{t}\\right]}之间为12步,故在实验中默认只执行12步(h=12),以与[4-Diffusion policy: Visuomotor policy learning via action diffusion]保持一致
    因为根据[4],执行范围过大或过小都可能导致性能下降

此外,在实施语言条件任务时,他们还观察到基于扩散的策略在利用复杂的CLIP语言特征作为条件学习多样化动作时达到了某种能力上限

为了解决这个问题,更实用的方法是引入融合层并增加视觉和语言表示的嵌入大小,而不是直接将它们拼接在一起「To address this,a more practical approach is to incorporate a fuse layerand increasing embedding size for visual and language representations, instead of concatenating them directly」,这一修改可以提升策略在语言条件任务中的整体表现

3.3 实验与评估:围绕4个任务与4种基线方法PK

3.3.1 对4个任务的一般性建模

通过将Diff-Control策略与四种基线方法在五个不同的机器人任务上进行比较,包含的任务包括:(a)语言条件下的厨房任务,(b)厨房场景中的开盖任务,作为高精度任务,(c)动态场景中的捞鸭任务,(d)作为周期性任务的打鼓任务

  • UR5机器人手臂的动作表示为\\mathbf{a}\\left[W_{t}\\right],其中每个动作表示为\\mathbf{a}_{i}=[x, y, z, \\alpha, \\beta, \\gamma]^{T}i \\in[1, W]其中,包括末端执行器在笛卡尔坐标系中的位置(x, y, z)、方向(\\alpha, \\beta, \\gamma)和夹爪的关节角度\\text { g }
  • 且对于所有任务,输入模态包括两种模态:\\text { o }\\mathbf{c}
    第一种模态,\\mathbf{o} \\in \\mathbb{R}^{224 \\times 224 \\times 3},对应于RGB图像
    第二种模态,\\mathbf{c},指的是从自然语言序列中提取的语言嵌入。这个嵌入作为机器人理解和决策过程的语言输入

下表 按主观难度升序排列任务,提供了任务特征的摘要,如干扰物数量(Dis)、专家演示数量(Dem)、不同动作数量(Act)以及是否需要高精度(HiPrec)

以下是具体的4个任务

  1. 语言条件厨房任务:该任务旨在模拟厨房场景中的多个任务 [35], [36]
    机器人工作区由一个缩小的现实模型厨房组成,如下图所示

    厨房环境包含各种物体,包括锅、平底锅、碗和类似毛绒蔬菜的干扰物。在数据收集过程中,干扰物会被随机放置。训练有素的专家负责远程操作机器人在厨房环境中执行两个特定动作。这些动作包括取回一个西红柿,并根据给定的语言指令将其放置在炉灶上的锅中(A)或水槽中(B)
  2. 高精度开盖:任务包括抬起盖子并随后将其放置在附近的碗上,这需要精确的控制,如下图所示,盖子的把手相对较小,且盖子的表面是反光的

    为了收集此任务的数据,他们获得了50次专家示范。每次示范中,引入了5个或更多干扰物体的随机放置,以及锅的位置和盖子的旋转的轻微变化
  3. 鸭子舀取:受[37]的启发,他们为机器人配备了一个勺子,机器人的目标是将鸭子从水中舀出来,如下图右下角所示,这一任务由于勺子进入水中引起的扰动而具有挑战性

    水流影响橡皮鸭的位置,要求机器人执行精确而谨慎的动作,以成功捕捉鸭子
  4. 鼓点:该任务专门为机器人学习周期性动作而设计,由于需要独特的动作表示,这是一项具有挑战性的任务[38],如下图所示

    该任务的难点在于机器人必须准确地计数鼓点的次数并确定何时停止敲鼓。通过远程操作机器人在每次示范中敲击鼓三次,共获得了150次专家示范

3.3.2 与4种基线在性能上的PK

为了评估diff-control的泛化能力、整体性能及其优势,他们提出的基线如下:

  1. Image-BC:这个基线采用图像到动作的代理框架,类似于BC-Z[2],它基于ResNet-18骨干网络,并使用FiLM [39]通过CLIP语言特征进行条件处理
  2. ModAttn[32]:该方法采用transformer风格的神经网络,并使用模块化结构通过神经注意力来处理任务的各个子方面,它需要人类专家正确识别每个任务的组件和子任务
  3. BC-Z LSTM:这个基线代表了一种受BC-Z架构启发的有状态策略。通过使用MLP和LSTM层将先前的动作和语言条件融合,实现了先前输入的整合
  4. 扩散策略[4]:这个基线是一个标准的扩散策略

所有基线模型均使用相同的专家演示数据进行再现和训练,总共训练3,000个周期

  • 在整个训练过程中,每300个周期保存一次检查点。在他们的分析中,他们报告了这些保存的检查点中每种基线方法所获得的最佳结果
  • 每个实验均在单个NVIDIA Quadro RTX 8000 GPU上以64的批量大小进行,持续约24小时
  • 对于所有任务,使用Adamw优化器,学习率为1e-4

具体PK结果详见原论文