
Unitree G1 Deployment (Part 9): Analysis, Testing and Deployment of the Teleoperation Control Script teleop_hand_and_arm.py

First, note that I am working with v1.0; Unitree has been pushing hard lately and updates arrive quickly: xr_teleoperate-1.0

teleop_hand_and_arm.py can drive the real robot from an XR device (via hand tracking or controllers), and it can also run against a virtual simulation. The run mode is configured through command-line arguments as needed.

Launch parameter overview:

1. Basic control parameters

| ⚙️ Parameter | 📜 Description | 🔘 Available values | 📌 Default |
|---|---|---|---|
| --xr-mode | Select the XR input mode (how the robot is controlled) | hand (hand tracking), controller (controller tracking) | hand |
| --arm | Select the robot type | G1_29, G1_23, H1_2, H1 | G1_29 |
| --ee | Select the arm's end-effector type | dex1, dex3, inspire1 | no default |

2. Mode switch parameters

| ⚙️ Parameter | 📜 Description |
|---|---|
| --record | [Enable data recording mode] After pressing r to enter teleoperation, press s to start recording; press s again to stop and save the current episode. Pressing s again repeats the process. |
| --motion | [Enable motion control mode] With this mode enabled, the teleoperation program can run while the robot's locomotion controller is active. In hand-tracking mode the R3 remote can be used to walk the robot; in controller-tracking mode the controller joysticks can also drive walking. |
| --headless | [Enable headless mode] For deployments where this program runs on a development compute unit (PC2) or another machine without a display. |
| --sim | [Enable simulation mode] |

To control the G1 (29 DoF) with inspire1 dexterous hands via hand tracking, with data recording (and motion control) enabled, the launch command looks like this (recommended to run in a terminal):

python teleop_hand_and_arm.py --xr-mode=hand --arm=G1_29 --ee=inspire1 --record --motion

Next, a detailed walk-through of this script. It is highly modular, very extensible, and built around a concurrent design, which makes it a good base for secondary development.

Contents

1 Script analysis
1.1 Imports and path configuration
1.2 Simulation control helper function
1.3 Global state management and keyboard callback
1.4 Main entry point and argument parsing
1.5 Image channel configuration and shared-memory initialization
1.6 Binding XR/hand input to robot motion control
1.7 Simulation and locomotion client initialization
1.8 Main loop: control, recording and interaction
1.9 Exception handling and resource cleanup
2 Camera adaptation and script testing
2.1 Camera adaptation
2.2 Script testing
2.2.1 Image service
2.2.2 Main control script test


1 Script analysis

1.1 Imports and path configuration

```python
import numpy as np        # numerical computing; used for matrices and image data
import time               # timing and delays
import argparse           # command-line argument parsing
import cv2                # OpenCV, for image processing and display
from multiprocessing import shared_memory, Value, Array, Lock  # multi-process data sharing and synchronization
import threading          # threads for async tasks such as keyboard listening and image receiving

import logging_mp         # custom multi-process logging module for cross-process log collection
logging_mp.basic_config(level=logging_mp.INFO)   # set log level to INFO
logger_mp = logging_mp.get_logger(__name__)      # logger instance for this module

import os                 # OS utilities
import sys                # system parameters and functions
current_dir = os.path.dirname(os.path.abspath(__file__))  # directory of this script
parent_dir = os.path.dirname(current_dir)                 # parent directory (project root)
sys.path.append(parent_dir)   # add the root to the module search path so custom modules can be imported

# custom modules
from televuer import TeleVuerWrapper   # XR/VR interaction: hand tracking and camera interface
from teleop.robot_control.robot_arm import G1_29_ArmController, G1_23_ArmController, H1_2_ArmController, H1_ArmController  # arm controllers
from teleop.robot_control.robot_arm_ik import G1_29_ArmIK, G1_23_ArmIK, H1_2_ArmIK, H1_ArmIK  # arm IK solvers
from teleop.robot_control.robot_hand_unitree import Dex3_1_Controller, Gripper_Controller     # Unitree hand / gripper controllers
from teleop.robot_control.robot_hand_inspire import Inspire_Controller                        # Inspire hand controller
from teleop.image_server.image_client import ImageClient   # remote image acquisition client
from teleop.utils.episode_writer import EpisodeWriter      # episode data recorder
from sshkeyboard import listen_keyboard, stop_listening    # cross-platform, non-blocking keyboard listener

# simulation: Isaac communication topics
from unitree_sdk2py.core.channel import ChannelPublisher   # DDS topic publisher
from unitree_sdk2py.idl.std_msgs.msg.dds_ import String_   # generic string message type
```
  • televuer: the XR/VR bridge for hand/headset tracking data and image exchange
  • robot_control: controllers for the different arm models and end effectors
  • episode_writer: records and archives experiment (episode) data
  • ImageClient: communicates with the remote camera image server
  • sshkeyboard: cross-platform keyboard listening

1.2 Simulation control helper function

```python
def publish_reset_category(category: int, publisher):   # publish a simulation scene reset signal
    msg = String_(data=str(category))                    # build the message
    publisher.Write(msg)                                 # publish it
    logger_mp.info(f"published reset category: {category}")   # log it
```

Publishes a scene-reset signal to the Isaac simulation environment, so the scene can be rolled back and re-initialized.
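
For context, a minimal sketch of how this helper is invoked elsewhere in the script (the topic name and categories 1/2 come from sections 1.7 and 1.8; treat it as an illustration rather than a stand-alone program, since DDS must already be initialized):

```python
# illustrative only: how publish_reset_category is wired up in the script
reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_)   # simulation reset topic (see 1.7)
reset_pose_publisher.Init()
publish_reset_category(1, reset_pose_publisher)   # category 1: after an episode is saved (see 1.8)
publish_reset_category(2, reset_pose_publisher)   # category 2: on quit / manual reset (see 1.8)
```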

1.3 Global state management and keyboard callback

```python
# global runtime state
start_signal = False              # whether the main loop should start (triggered by 'r')
running = True                    # main-loop running flag
should_toggle_recording = False   # whether to toggle recording (triggered by 's')
is_recording = False              # whether recording is currently active

def on_press(key):                # keyboard press callback
    global running, start_signal, should_toggle_recording
    if key == 'r':                # 'r': start the main loop
        start_signal = True
        logger_mp.info("Program start signal received.")
    elif key == 'q':              # 'q': exit the main loop
        stop_listening()
        running = False
    elif key == 's':              # 's': toggle recording
        should_toggle_recording = True
    else:
        logger_mp.info(f"{key} was pressed, but no action is defined for this key.")

# start the keyboard-listening thread; it does not block the main thread
listen_keyboard_thread = threading.Thread(
    target=listen_keyboard,
    kwargs={"on_press": on_press, "until": None, "sequential": False,},
    daemon=True
)
listen_keyboard_thread.start()    # start listening
```
  • Maintains the main state machine (running, recording, and so on); flow transitions are driven by the keyboard (r: start; q: quit; s: toggle recording)
  • listen_keyboard_thread: the listener thread that provides non-blocking user interaction (a stand-alone sketch of the same pattern follows below)
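
If you want to verify that sshkeyboard behaves as expected on your machine (for example inside an SSH session) before wiring it to the robot, here is a minimal stand-alone version of the same listening pattern (my own test snippet, not part of the repo):

```python
import threading
import time
from sshkeyboard import listen_keyboard, stop_listening

running = True

def on_press(key):
    global running
    print(f"pressed: {key}")
    if key == 'q':            # 'q' stops the listener and the loop, mirroring the script
        stop_listening()
        running = False

# same daemon-thread setup as in the script, so the listener never blocks the main loop
listener = threading.Thread(
    target=listen_keyboard,
    kwargs={"on_press": on_press, "until": None, "sequential": False},
    daemon=True)
listener.start()

while running:                # stand-in for the teleoperation main loop
    time.sleep(0.1)
print("exited cleanly")
```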

1.4 Main entry point and argument parsing

```python
if __name__ == '__main__':            # program entry point
    parser = argparse.ArgumentParser()   # create the argument parser
    parser.add_argument('--task_dir', type=str, default='./utils/data', help='path to save data')          # data save path
    parser.add_argument('--frequency', type=float, default=90.0, help="save data's frequency")             # sampling frequency
    parser.add_argument('--xr-mode', type=str, choices=['hand', 'controller'], default='hand', help='Select XR device tracking source')   # XR input mode
    parser.add_argument('--arm', type=str, choices=['G1_29', 'G1_23', 'H1_2', 'H1'], default='G1_29', help='Select arm controller')       # arm type
    parser.add_argument('--ee', type=str, choices=['dex3', 'gripper', 'inspire1'], help='Select end effector controller')                 # end-effector type
    parser.add_argument('--record', action='store_true', help='Enable data recording')            # record data or not
    parser.add_argument('--motion', action='store_true', help='Enable motion control mode')       # motion mode or not
    parser.add_argument('--headless', action='store_true', help='Enable headless mode (no display)')   # headless (no window) or not
    parser.add_argument('--sim', action='store_true', help='Enable isaac simulation mode')        # simulation or not
    args = parser.parse_args()        # parse arguments
    logger_mp.info(f"args: {args}")   # print the configuration
```

Command-line arguments provide flexible configuration: data path, frame rate, arm type, end-effector type, simulation or not, recording or not, headless or not, and so on.
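
For reference, two more invocations assembled from the same flags (these particular combinations are my own examples; pick whatever matches your hardware): controller tracking with grippers plus locomotion control, and a headless simulation recording run:

python teleop_hand_and_arm.py --xr-mode=controller --arm=G1_29 --ee=gripper --motion

python teleop_hand_and_arm.py --arm=G1_29 --ee=dex3 --sim --record --headless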

1.5 Image channel configuration and shared-memory initialization

```python
    # -------- image client / shared-memory configuration --------
    if args.sim:   # simulation configuration
        img_config = {
            'fps': 30,
            'head_camera_type': 'opencv',
            'head_camera_image_shape': [480, 640],    # head camera resolution
            'head_camera_id_numbers': [0],
            'wrist_camera_type': 'opencv',
            'wrist_camera_image_shape': [480, 640],   # wrist camera resolution
            'wrist_camera_id_numbers': [2, 4],
        }
    else:          # real-robot configuration
        img_config = {
            'fps': 30,
            'head_camera_type': 'opencv',
            'head_camera_image_shape': [480, 1280],   # head camera resolution (1280 is wider, likely binocular)
            'head_camera_id_numbers': [0],
            'wrist_camera_type': 'opencv',
            'wrist_camera_image_shape': [480, 640],
            'wrist_camera_id_numbers': [2, 4],
        }

    ASPECT_RATIO_THRESHOLD = 2.0   # aspect-ratio threshold used to tell binocular from monocular
    # is the head camera binocular?
    if len(img_config['head_camera_id_numbers']) > 1 or (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
        BINOCULAR = True
    else:
        BINOCULAR = False
    # is there a wrist camera?
    if 'wrist_camera_type' in img_config:
        WRIST = True
    else:
        WRIST = False

    # shared-memory image shape for the head camera
    if BINOCULAR and not (img_config['head_camera_image_shape'][1] / img_config['head_camera_image_shape'][0] > ASPECT_RATIO_THRESHOLD):
        tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1] * 2, 3)
    else:
        tv_img_shape = (img_config['head_camera_image_shape'][0], img_config['head_camera_image_shape'][1], 3)

    # allocate shared memory for the head camera and map it as a numpy array
    tv_img_shm = shared_memory.SharedMemory(create=True, size=np.prod(tv_img_shape) * np.uint8().itemsize)
    tv_img_array = np.ndarray(tv_img_shape, dtype=np.uint8, buffer=tv_img_shm.buf)

    # same for the wrist cameras: create shared memory and the client depending on WRIST and simulation
    if WRIST and args.sim:
        wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3)
        wrist_img_shm = shared_memory.SharedMemory(create=True, size=np.prod(wrist_img_shape) * np.uint8().itemsize)
        wrist_img_array = np.ndarray(wrist_img_shape, dtype=np.uint8, buffer=wrist_img_shm.buf)
        img_client = ImageClient(tv_img_shape=tv_img_shape, tv_img_shm_name=tv_img_shm.name,
                                 wrist_img_shape=wrist_img_shape, wrist_img_shm_name=wrist_img_shm.name,
                                 server_address="127.0.0.1")
    elif WRIST and not args.sim:
        wrist_img_shape = (img_config['wrist_camera_image_shape'][0], img_config['wrist_camera_image_shape'][1] * 2, 3)
        wrist_img_shm = shared_memory.SharedMemory(create=True, size=np.prod(wrist_img_shape) * np.uint8().itemsize)
        wrist_img_array = np.ndarray(wrist_img_shape, dtype=np.uint8, buffer=wrist_img_shm.buf)
        img_client = ImageClient(tv_img_shape=tv_img_shape, tv_img_shm_name=tv_img_shm.name,
                                 wrist_img_shape=wrist_img_shape, wrist_img_shm_name=wrist_img_shm.name)
    else:
        # no wrist camera, head camera only
        img_client = ImageClient(tv_img_shape=tv_img_shape, tv_img_shm_name=tv_img_shm.name)

    # image-receiving thread: keeps writing remote frames into shared memory
    image_receive_thread = threading.Thread(target=img_client.receive_process, daemon=True)
    image_receive_thread.daemon = True
    image_receive_thread.start()
```
  • Handles multiple cameras (head, wrist), multiple resolutions, and binocular/monocular setups
  • Shared memory provides fast, zero-copy data exchange between processes (e.g. image acquisition and the main controller); the pattern is isolated in the sketch below
  • The image client receives frames asynchronously in its own thread, so the main thread never stalls
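
The shared-memory pattern is worth isolating. The sketch below is a simplified stand-in (not the script's ImageClient): one side creates the block and maps it as a numpy array, the other side attaches by name and writes a frame in place, and the reader sees the update without any copy.

```python
import numpy as np
from multiprocessing import shared_memory

shape = (480, 640, 3)                                    # one RGB frame
shm = shared_memory.SharedMemory(create=True, size=int(np.prod(shape)))   # uint8, 1 byte per element
consumer_view = np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)

# a producer (the image client in the real script) attaches to the same block by name
producer_shm = shared_memory.SharedMemory(name=shm.name)
producer_view = np.ndarray(shape, dtype=np.uint8, buffer=producer_shm.buf)
producer_view[:] = 128                                   # pretend a camera frame arrived

print(consumer_view.mean())                              # the consumer sees 128.0, zero-copy

producer_shm.close()                                     # each attachment closes its own mapping
shm.close()
shm.unlink()                                             # the creator removes the block
```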

1.6 Binding XR/hand input to robot motion control

```python
    # XR bridge instance: exchanges hand/motion/image data
    tv_wrapper = TeleVuerWrapper(
        binocular=BINOCULAR,
        use_hand_tracking=args.xr_mode == 'hand',
        img_shape=tv_img_shape,
        img_shm_name=tv_img_shm.name,
        return_state_data=True,
        return_hand_rot_data=False
    )

    # -------- load the arm controller, end effector and inverse-kinematics modules --------
    if args.arm == 'G1_29':
        arm_ctrl = G1_29_ArmController(motion_mode=args.motion, simulation_mode=args.sim)
        arm_ik = G1_29_ArmIK()
    elif args.arm == 'G1_23':
        arm_ctrl = G1_23_ArmController(simulation_mode=args.sim)
        arm_ik = G1_23_ArmIK()
    elif args.arm == 'H1_2':
        arm_ctrl = H1_2_ArmController(simulation_mode=args.sim)
        arm_ik = H1_2_ArmIK()
    elif args.arm == 'H1':
        arm_ctrl = H1_ArmController(simulation_mode=args.sim)
        arm_ik = H1_ArmIK()

    # -------- end-effector controller / shared-data initialization --------
    if args.ee == "dex3":          # dual Dex3 dexterous hands
        left_hand_pos_array = Array('d', 75, lock=True)       # left-hand input, 75 dims
        right_hand_pos_array = Array('d', 75, lock=True)      # right-hand input, 75 dims
        dual_hand_data_lock = Lock()                          # lock for the dual-hand data
        dual_hand_state_array = Array('d', 14, lock=False)    # current left/right hand state
        dual_hand_action_array = Array('d', 14, lock=False)   # current left/right hand action
        hand_ctrl = Dex3_1_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock,
                                      dual_hand_state_array, dual_hand_action_array)
    elif args.ee == "gripper":     # dual grippers
        left_gripper_value = Value('d', 0.0, lock=True)       # left gripper input
        right_gripper_value = Value('d', 0.0, lock=True)      # right gripper input
        dual_gripper_data_lock = Lock()                       # gripper sync lock
        dual_gripper_state_array = Array('d', 2, lock=False)    # current left/right gripper state
        dual_gripper_action_array = Array('d', 2, lock=False)   # current left/right gripper action
        gripper_ctrl = Gripper_Controller(left_gripper_value, right_gripper_value, dual_gripper_data_lock,
                                          dual_gripper_state_array, dual_gripper_action_array,
                                          simulation_mode=args.sim)
    elif args.ee == "inspire1":    # Inspire hands
        left_hand_pos_array = Array('d', 75, lock=True)       # Inspire hand input
        right_hand_pos_array = Array('d', 75, lock=True)
        dual_hand_data_lock = Lock()
        dual_hand_state_array = Array('d', 12, lock=False)
        dual_hand_action_array = Array('d', 12, lock=False)
        hand_ctrl = Inspire_Controller(left_hand_pos_array, right_hand_pos_array, dual_hand_data_lock,
                                       dual_hand_state_array, dual_hand_action_array)
    else:
        pass   # no end effector selected, skip
```
  • Tracking data from the XR device (e.g. Apple Vision) is abstracted behind tv_wrapper, which exposes a single unified interface
  • Depending on the arguments, the matching kinematics and end-effector controllers are loaded (7/6-axis arms, Dex3, grippers, etc.); the inter-process hand-off they rely on is sketched below
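
The end-effector side relies on multiprocessing Array/Value objects plus a Lock to pass targets between the teleop loop and the hand controller. A minimal hand-off sketch under that assumption (the fake controller below only prints what it reads; the real Dex3_1_Controller / Inspire_Controller obviously do much more):

```python
import time
import numpy as np
from multiprocessing import Process, Array

def fake_hand_controller(pos_array):
    # stand-in for Dex3_1_Controller: repeatedly read the latest 75-dim target
    for _ in range(3):
        with pos_array.get_lock():
            target = np.frombuffer(pos_array.get_obj()).copy()   # float64, matches typecode 'd'
        print("controller sees:", target[:3])
        time.sleep(0.1)

if __name__ == "__main__":
    left_hand_pos_array = Array('d', 75, lock=True)      # same 75-dim layout as in the script
    worker = Process(target=fake_hand_controller, args=(left_hand_pos_array,))
    worker.start()
    for step in range(3):
        with left_hand_pos_array.get_lock():             # same lock-protected write as the main loop
            left_hand_pos_array[:] = np.full(75, float(step))
        time.sleep(0.1)
    worker.join()
```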

1.7 Simulation and locomotion client initialization

```python
    # -------- simulation-related initialization --------
    if args.sim:
        reset_pose_publisher = ChannelPublisher("rt/reset_pose/cmd", String_)   # simulation reset topic
        reset_pose_publisher.Init()   # initialize
        from teleop.utils.sim_state_topic import start_sim_state_subscribe      # simulation-state subscriber
        sim_state_subscriber = start_sim_state_subscribe()

    # -------- controller motion-mode initialization --------
    if args.xr_mode == 'controller' and args.motion:
        from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient
        sport_client = LocoClient()
        sport_client.SetTimeout(0.0001)   # communication timeout
        sport_client.Init()

    # -------- data recorder initialization --------
    if args.record and args.headless:
        recorder = EpisodeWriter(task_dir=args.task_dir, frequency=args.frequency, rerun_log=False)   # headless: no replay visualization
    elif args.record and not args.headless:
        recorder = EpisodeWriter(task_dir=args.task_dir, frequency=args.frequency, rerun_log=True)    # GUI mode: replay visualization enabled
```
  • Adapts between simulation and the real robot
  • Data recording supports both headless and GUI modes (for remote deployment vs. local experiments)
  • LocoClient provides the high-level locomotion control (a minimal usage sketch follows this list)
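
A minimal sketch of the LocoClient calls used here and later in the main loop, assembled only from what appears in this script; the DDS channel factory is normally initialized elsewhere in the stack, so treat this as an outline rather than a definitive stand-alone program:

```python
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient

ChannelFactoryInitialize(0)        # DDS setup; in the full script this is handled by the controllers
sport_client = LocoClient()
sport_client.SetTimeout(0.0001)    # same short timeout as the script
sport_client.Init()

sport_client.Move(0.1, 0.0, 0.0)   # (vx, vy, vyaw); the main loop caps each at 0.3
sport_client.Damp()                # damping / soft stop, bound to pressing both thumbsticks
```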

1.8 Main loop: control, recording and interaction

```python
    try:
        logger_mp.info("Please enter the start signal (enter 'r' to start the subsequent program)")
        # wait for the user to press 'r'
        while not start_signal:
            time.sleep(0.01)

        arm_ctrl.speed_gradual_max()   # ramp the speed limit up gradually for a safe start

        while running:   # main loop, runs at the configured frequency
            start_time = time.time()

            if not args.headless:   # with a display, show the main image
                tv_resized_image = cv2.resize(tv_img_array, (tv_img_shape[1] // 2, tv_img_shape[0] // 2))
                cv2.imshow("record image", tv_resized_image)
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    stop_listening()
                    running = False
                    if args.sim:
                        publish_reset_category(2, reset_pose_publisher)
                elif key == ord('s'):
                    should_toggle_recording = True
                elif key == ord('a'):
                    if args.sim:
                        publish_reset_category(2, reset_pose_publisher)

            if args.record and should_toggle_recording:
                should_toggle_recording = False
                if not is_recording:
                    if recorder.create_episode():
                        is_recording = True
                    else:
                        logger_mp.error("Failed to create episode. Recording not started.")
                else:
                    is_recording = False
                    recorder.save_episode()
                    if args.sim:
                        publish_reset_category(1, reset_pose_publisher)

            # get XR/VR motion input
            tele_data = tv_wrapper.get_motion_state_data()

            # route the input data according to end effector and input mode
            if (args.ee == 'dex3' or args.ee == 'inspire1') and args.xr_mode == 'hand':
                with left_hand_pos_array.get_lock():
                    left_hand_pos_array[:] = tele_data.left_hand_pos.flatten()
                with right_hand_pos_array.get_lock():
                    right_hand_pos_array[:] = tele_data.right_hand_pos.flatten()
            elif args.ee == 'gripper' and args.xr_mode == 'controller':
                with left_gripper_value.get_lock():
                    left_gripper_value.value = tele_data.left_trigger_value
                with right_gripper_value.get_lock():
                    right_gripper_value.value = tele_data.right_trigger_value
            elif args.ee == 'gripper' and args.xr_mode == 'hand':
                with left_gripper_value.get_lock():
                    left_gripper_value.value = tele_data.left_pinch_value
                with right_gripper_value.get_lock():
                    right_gripper_value.value = tele_data.right_pinch_value
            else:
                pass

            # high-level locomotion control (controller mode)
            if args.xr_mode == 'controller' and args.motion:
                # right A button: exit
                if tele_data.tele_state.right_aButton:
                    running = False
                # both thumbsticks pressed: damping mode (emergency stop)
                if tele_data.tele_state.left_thumbstick_state and tele_data.tele_state.right_thumbstick_state:
                    sport_client.Damp()
                # locomotion, velocity limited to 0.3
                sport_client.Move(
                    -tele_data.tele_state.left_thumbstick_value[1] * 0.3,
                    -tele_data.tele_state.left_thumbstick_value[0] * 0.3,
                    -tele_data.tele_state.right_thumbstick_value[0] * 0.3)

            # current dual-arm joint positions and velocities
            current_lr_arm_q = arm_ctrl.get_current_dual_arm_q()
            current_lr_arm_dq = arm_ctrl.get_current_dual_arm_dq()

            # solve IK for target joint positions and feed-forward torques
            time_ik_start = time.time()
            sol_q, sol_tauff = arm_ik.solve_ik(tele_data.left_arm_pose, tele_data.right_arm_pose,
                                               current_lr_arm_q, current_lr_arm_dq)
            time_ik_end = time.time()
            logger_mp.debug(f"ik:\t{round(time_ik_end - time_ik_start, 6)}")

            # command the arms
            arm_ctrl.ctrl_dual_arm(sol_q, sol_tauff)

            # ------ data recording ------
            if args.record:
                # collect and organize state/action data for each end effector / input mode
                if args.ee == "dex3" and args.xr_mode == 'hand':
                    with dual_hand_data_lock:
                        left_ee_state = dual_hand_state_array[:7]
                        right_ee_state = dual_hand_state_array[-7:]
                        left_hand_action = dual_hand_action_array[:7]
                        right_hand_action = dual_hand_action_array[-7:]
                        current_body_state = []
                        current_body_action = []
                elif args.ee == "gripper" and args.xr_mode == 'hand':
                    with dual_gripper_data_lock:
                        left_ee_state = [dual_gripper_state_array[1]]
                        right_ee_state = [dual_gripper_state_array[0]]
                        left_hand_action = [dual_gripper_action_array[1]]
                        right_hand_action = [dual_gripper_action_array[0]]
                        current_body_state = []
                        current_body_action = []
                elif args.ee == "gripper" and args.xr_mode == 'controller':
                    with dual_gripper_data_lock:
                        left_ee_state = [dual_gripper_state_array[1]]
                        right_ee_state = [dual_gripper_state_array[0]]
                        left_hand_action = [dual_gripper_action_array[1]]
                        right_hand_action = [dual_gripper_action_array[0]]
                        current_body_state = arm_ctrl.get_current_motor_q().tolist()
                        current_body_action = [-tele_data.tele_state.left_thumbstick_value[1] * 0.3,
                                               -tele_data.tele_state.left_thumbstick_value[0] * 0.3,
                                               -tele_data.tele_state.right_thumbstick_value[0] * 0.3]
                elif args.ee == "inspire1" and args.xr_mode == 'hand':
                    with dual_hand_data_lock:
                        left_ee_state = dual_hand_state_array[:6]
                        right_ee_state = dual_hand_state_array[-6:]
                        left_hand_action = dual_hand_action_array[:6]
                        right_hand_action = dual_hand_action_array[-6:]
                        current_body_state = []
                        current_body_action = []
                else:
                    left_ee_state = []
                    right_ee_state = []
                    left_hand_action = []
                    right_hand_action = []
                    current_body_state = []
                    current_body_action = []

                # grab the current images
                current_tv_image = tv_img_array.copy()
                if WRIST:
                    current_wrist_image = wrist_img_array.copy()

                # arm joint states / actions
                left_arm_state = current_lr_arm_q[:7]
                right_arm_state = current_lr_arm_q[-7:]
                left_arm_action = sol_q[:7]
                right_arm_action = sol_q[-7:]

                if is_recording:
                    colors = {}
                    depths = {}
                    # split the camera streams (binocular vs. monocular)
                    if BINOCULAR:
                        colors[f"color_{0}"] = current_tv_image[:, :tv_img_shape[1]//2]
                        colors[f"color_{1}"] = current_tv_image[:, tv_img_shape[1]//2:]
                        if WRIST:
                            colors[f"color_{2}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
                            colors[f"color_{3}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
                    else:
                        colors[f"color_{0}"] = current_tv_image
                        if WRIST:
                            colors[f"color_{1}"] = current_wrist_image[:, :wrist_img_shape[1]//2]
                            colors[f"color_{2}"] = current_wrist_image[:, wrist_img_shape[1]//2:]
                    # organize the state dictionary
                    states = {
                        "left_arm": {
                            "qpos":   left_arm_state.tolist(),   # joint positions
                            "qvel":   [],
                            "torque": [],
                        },
                        "right_arm": {
                            "qpos":   right_arm_state.tolist(),
                            "qvel":   [],
                            "torque": [],
                        },
                        "left_ee": {
                            "qpos":   left_ee_state,
                            "qvel":   [],
                            "torque": [],
                        },
                        "right_ee": {
                            "qpos":   right_ee_state,
                            "qvel":   [],
                            "torque": [],
                        },
                        "body": {
                            "qpos": current_body_state,
                        },
                    }
                    actions = {
                        "left_arm": {
                            "qpos":   left_arm_action.tolist(),
                            "qvel":   [],
                            "torque": [],
                        },
                        "right_arm": {
                            "qpos":   right_arm_action.tolist(),
                            "qvel":   [],
                            "torque": [],
                        },
                        "left_ee": {
                            "qpos":   left_hand_action,
                            "qvel":   [],
                            "torque": [],
                        },
                        "right_ee": {
                            "qpos":   right_hand_action,
                            "qvel":   [],
                            "torque": [],
                        },
                        "body": {
                            "qpos": current_body_action,
                        },
                    }
                    if args.sim:   # also record the simulation state
                        sim_state = sim_state_subscriber.read_data()
                        recorder.add_item(colors=colors, depths=depths, states=states, actions=actions, sim_state=sim_state)
                    else:
                        recorder.add_item(colors=colors, depths=depths, states=states, actions=actions)

            # keep the main loop at the target frequency (sleep compensation each cycle)
            current_time = time.time()
            time_elapsed = current_time - start_time
            sleep_time = max(0, (1 / args.frequency) - time_elapsed)
            time.sleep(sleep_time)
            logger_mp.debug(f"main process sleep: {sleep_time}")
```
  • The core loop is driven by the global state flags, which control starting, stopping, recording, and so on
  • Motion/hand data from the XR device is combined with the current robot state and fed to the IK solver in real time
  • Simulation and real-robot operation, data routing, recording, and the UI preview are all supported
  • The loop is held at the configured frequency (keeping control and sampling consistent); the timing pattern is isolated below
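
The frequency-keeping logic at the tail of the loop is simple enough to isolate. Here it is as a stand-alone pattern (same arithmetic as the script, with the control work elided):

```python
import time

frequency = 90.0                          # matches the --frequency default
for _ in range(5):
    start_time = time.time()
    # ... one control/recording iteration would run here ...
    time_elapsed = time.time() - start_time
    sleep_time = max(0, (1 / frequency) - time_elapsed)
    time.sleep(sleep_time)                # sleep only for the remainder of the period
```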

1.9 Exception handling and resource cleanup

```python
    except KeyboardInterrupt:
        logger_mp.info("KeyboardInterrupt, exiting program...")   # catch Ctrl+C for a graceful exit
    finally:
        arm_ctrl.ctrl_dual_arm_go_home()   # return the arms to the home pose on exit
        if args.sim:
            sim_state_subscriber.stop_subscribe()
        tv_img_shm.close()
        tv_img_shm.unlink()
        if WRIST:
            wrist_img_shm.close()
            wrist_img_shm.unlink()
        if args.record:
            recorder.close()
        listen_keyboard_thread.join()
        logger_mp.info("Finally, exiting program...")
        exit(0)
```
  • Even on an interrupt, the robot arms are automatically returned home and released
  • All shared memory, threads, and the data recorder are closed to prevent resource leaks (see the note on close() vs. unlink() below)
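
One detail worth remembering about the cleanup: close() only detaches the current process's mapping, while unlink() destroys the underlying block for everyone, which is why the script calls both on the blocks it created. A tiny illustration:

```python
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=1024)
try:
    shm.buf[0] = 42                 # ... normal use ...
finally:
    shm.close()                     # detach this process's view
    shm.unlink()                    # remove the block itself (only the creator should do this)
```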

2 Camera adaptation and script testing

The teleoperation setup Unitree provides officially uses a different set of cameras:

Reference links for the Unitree modules: https://github.com/unitreerobotics/xr_teleoperate?tab=readme-ov-file

Unitree camera reference link: [For reference only] http://e.tb.cn/h.TaZxgkpfWkNCakg?tk=KKz03Kyu04u

To stay as close as possible to the official test setup, I went with the "120 distortion-free" camera model.

Here is a record of the adaptation process for this camera.

2.1 Camera adaptation

1. Preview device information with v4l2-ctl

First, confirm that the camera devices exist and are accessible:

v4l2-ctl --list-devices

Or list the supported formats and resolutions directly:

v4l2-ctl --device=/dev/video6 --list-formats-ext

v4l2-ctl --device=/dev/video7 --list-formats-ext

2. Quick preview with ffplay

ffplay /dev/video6

If ffplay is missing, install it first: sudo apt install ffmpeg

3. Preview with an OpenCV script

Write a simple script:

```python
'''
# binocular
import cv2
cap = cv2.VideoCapture(0)
while True:
    ret, frame = cap.read()
    if not ret:
        break
    # split the stereo pair (assuming 1280x480, i.e. 640x480 per side)
    left = frame[:, :frame.shape[1]//2]
    right = frame[:, frame.shape[1]//2:]
    cv2.imshow('left', left)
    cv2.imshow('right', right)
    if cv2.waitKey(1) == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()
'''
# monocular
import cv2
cap = cv2.VideoCapture(0)
while True:
    ret, frame = cap.read()
    if not ret:
        print("No frame!")
        break
    cv2.imshow("cam6", frame)
    if cv2.waitKey(1) == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()
```

And the image displays correctly.
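
If the stereo camera comes up at a lower default resolution, a small optional addition is to request the 1280x480 side-by-side mode explicitly before the read loop (the device index and values here are assumptions; adjust them to your camera):

```python
import cv2

cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)   # side-by-side stereo width
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)
print(cap.get(cv2.CAP_PROP_FRAME_WIDTH), cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
```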

2.2 Script testing

2.2.1 Image service

Start image_server.py; only the config needs to be modified:

```python
    config = {
        'fps': 30,
        'head_camera_type': 'opencv',
        'head_camera_image_shape': [480, 1280],  # Head camera resolution
        'head_camera_id_numbers': [0],
    }
```

Then, in a new terminal, start image_client.py; only the IP needs to be changed (check it with ifconfig):

```python
client = ImageClient(image_show = True, server_address='192.168.8.30', Unit_Test=False) # deployment test
```

2.2.2 Main control script test

Remember: shut down the client!!! Keep the server running!!!

If the port is already in use:

# find the process using the port
sudo lsof -i :8012
# kill the process occupying the port
sudo kill -9 12345

Or:

sudo kill -9 $(sudo lsof -t -i:8012)

If you get a Segmentation Fault, the DDS communication has not come up. Add cyclonedds to ~/.bashrc, then run source ~/.bashrc so it takes effect immediately:

export CYCLONEDDS_HOME="/home/unitree/Documents/unitree_sdk2_python/cyclonedds/install"

Check it:

echo $CYCLONEDDS_HOME

In the main control script, modify the config:

```python
    img_config = {
        'fps': 30,
        'head_camera_type': 'opencv',
        'head_camera_image_shape': [480, 1280],  # Head camera resolution
        'head_camera_id_numbers': [0],
    }
```

Finally, make sure the G1 has entered debug mode (L2 + R2), then start the main control script:

Open a browser (Safari on an iPad also works) and go to https://192.168.8.30:8012?ws=wss://192.168.8.30:8012, and you can happily teleoperate.