linux中模拟can信号实现通信
实车上算法一般通过ros进行通信,车辆和控制器之间则通过can通信实现,今天来学习一下如何模拟这个can。
can信号的发送和接收一般是需要载体的,我们一般都有can0和can1设备可以使用,在电脑上创建这个设备:
加载vcan内核模块:
sudo modprobe vcan
创建虚拟CAN接口:
sudo ip link add dev vcan0 type vcan
将虚拟CAN接口处于在线状态:
sudo ip link set up vcan0
然后就能进行测试了:
cansend vcan0 260##00000000000000000
可以去另一个终端打印相关的信息:
candump vcan0
详细信息(比如判断是can帧还是can-fd帧)通过:
candump -L vcan0
来打印
说明:
1.基本用法
cansend 需要两个参数: 和 。
是 CAN 设备的名称,如 vcan0。
是要发送的 CAN 帧的格式。
2.CAN 帧格式
#{R|data}:用于标准的 CAN 2.0 帧。
是 CAN 帧的标识符,可以是 3 个(标准帧)或 8 个(扩展帧)十六进制字符。
{R} 表示远程传输请求。
{data} 是数据字段,包含 0 到 8 个十六进制值(对于 CAN FD 是 0 到 64)。
##{data}:用于 CAN FD 帧。
是一个十六进制值(0 到 F),定义了 CAN FD 帧的标志。
3.示例
5A1#11.2233.44556677.88:发送一个标准 CAN 帧,ID 为 5A1,数据为 11 22 33 44 55 66 77 88。
123#DEADBEEF:发送一个标准 CAN 帧,ID 为 123,数据为 DE AD BE EF。
5AA#:发送一个扩展 CAN 帧,ID 为 5AA,没有数据。
123##1:发送一个 CAN FD 帧,ID 为 123,标志为 1,没有数据。
213##311:发送一个 CAN FD 帧,ID 为 213,标志为 311,没有数据。
1F334455#1122334455667788:发送一个扩展 CAN 帧,ID 为 1F334455,数据为 11 22 33 44 55 66 77 88。
123#R:发送一个远程传输请求,ID 为 123。
程序示例:
新建一个功能包创建新的CanData.msg:
int32 drive_modeint32 gear_positionint32 emergency_statusint32 reset_keyint32 remote_keyfloat32 current_anglefloat32 vehicle_speedint32 throttle_feedbackint32 fault_code1int32 fault_code2int32 fault_code3int32 fault_code4int32 total_mileagefloat32 brake_pressureint32 socint32 vehicle_status1int32 vehicle_status2int32 left_turnint32 right_turnint32 hornint32 high_beamint32 brake_sensorint32 low_beamint32 fog_lightint32 seat_indicatorint32 height_sensorfloat32 pitch_sensorint32 run_modeint32 handbrakefloat32 run_speedint32 main_versionint32 sub_versionint32 rev_versionint32 dateint32 total_faultint32 vcu_statusint32 vehicle_start_statusint32 vcu_voltage
附带的cmakelist和package:
cmake_minimum_required(VERSION 3.0.2)project(wirecontrol)find_package(catkin REQUIRED COMPONENTS rospy std_msgs message_generation)add_message_files( FILES CanData.msg)generate_messages( DEPENDENCIES std_msgs)catkin_package( CATKIN_DEPENDS message_runtime)
<?xml version=\"1.0\"?><package format=\"2\"> <name>wirecontrol</name> <version>0.0.0</version> <description>The wirecontrol package</description> <maintainer email=\"cyun@todo.todo\">cyun</maintainer> <license>TODO</license> <buildtool_depend>catkin</buildtool_depend> <build_depend>message_generation</build_depend> <build_depend>rospy</build_depend> <build_depend>std_msgs</build_depend> <build_export_depend>rospy</build_export_depend> <build_export_depend>std_msgs</build_export_depend> <exec_depend>message_runtime</exec_depend> <exec_depend>rospy</exec_depend> <exec_depend>std_msgs</exec_depend> <export> </export></package>
函数编写,主要是通过dbc文件来发送can消息,比较方便。
#!/usr/bin/env python\'\'\'Author: CYUN && cyun@tju.enu.cnDate: 2024-07-26 16:58:02LastEditors: CYUN && cyun@tju.enu.cnLastEditTime: 2024-07-26 18:09:29FilePath: /undefined/home/cyun/forklift_sim_ws/src/wirecontrol/scripts/wirecontrol.pyDescription: send can messages from vcan0 to controllerCopyright (c) 2024 by Tianjin University, All Rights Reserved. \'\'\'import osimport timeimport canimport rospyimport cantoolsfrom car_interfaces.msg import CanData # 替换为你的包名# 获取DBC文件路径script_directory = os.path.dirname(os.path.abspath(__file__))relative_path = \"../config/IPC_VCU_ZRD.dbc\"dbc_file_path = os.path.join(script_directory, relative_path)# 加载DBC文件dbc = cantools.db.load_file(dbc_file_path)# 创建与 \'vcan0\' 接口的 CAN 总线连接# can_bus = can.interface.Bus(bustype=\'socketcan\', channel=\'vcan0\', bitrate=500000)can_bus = can.interface.Bus(bustype=\'socketcan\', channel=\'vcan0\', bitrate=500000, fd=True)# 初始化全局变量data = CanData()previous_send_time = {}def can_data_callback(msg): global data data = msgdef send_can_message(bus, message): global previous_send_time current_time = time.time() # 发布can/canfd消息 # can_msg = can.Message(arbitration_id=msg_id, data=encoded_message, is_extended_id=False, is_fd=True) # can_msg = can.Message(arbitration_id=message.arbitration_id, data=message.data, is_extended_id=message.is_extended_id, is_fd=fd) bus.send(message) if message.arbitration_id in previous_send_time: time_diff = current_time - previous_send_time[message.arbitration_id] frequency = 1.0 / time_diff if time_diff > 0 else 0 print(f\"Sent CAN message: ID=0x{message.arbitration_id:X}, Data={\'\'.join(format(x, \'02x\') for x in message.data)}\") print(f\"send sequence: {frequency:.2f} Hz\") else: print(f\"Sent CAN message: ID=0x{message.arbitration_id:X}, Data={\'\'.join(format(x, \'02x\') for x in message.data)}\") print(\"send sequence: First message\") previous_send_time[message.arbitration_id] = current_timedef main(): global can_bus rospy.init_node(\'can_publisher\', anonymous=True) rospy.Subscriber(\'can_data_topic\', CanData, can_data_callback) # 订阅自定义消息 rate = rospy.Rate(50) # 50 Hz try: while not rospy.is_shutdown(): for msg_id in [0x201, 0x202, 0x203, 0x204, 0x205, 0x261, 0x260]: if msg_id == 0x201: message = dbc.get_message_by_frame_id(0x201) data_dict = { \'Drive_Mode\': data.drive_mode, \'Gear\': data.gear_position, \'EPOSts\': data.emergency_status, \'YK_H\': data.reset_key, \'YK_F\': data.remote_key, \'Angle\': int(data.current_angle * 100), # 根据协议可能需要调整 \'Car_Speed\': int(data.vehicle_speed * 1000), # 根据协议可能需要调整 \'Motor_Torque\': data.throttle_feedback } elif msg_id == 0x202: message = dbc.get_message_by_frame_id(0x202) data_dict = { \'Fault1\': data.fault_code1, \'Fault2\': data.fault_code2, \'Fault3\': data.fault_code3, \'Fault4\': data.fault_code4, \'Mileage\': data.total_mileage } elif msg_id == 0x203: message = dbc.get_message_by_frame_id(0x203) data_dict = { \'Brake_Pressure\': int(data.brake_pressure * 20), # 根据协议可能需要调整 \'SOC\': data.soc, \'CarSts1\': data.vehicle_status1, \'CarSts2\': data.vehicle_status2, \'LR_WheelSpeed\': data.left_turn, \'RR_WheelSpeed\': data.right_turn } elif msg_id == 0x204: message = dbc.get_message_by_frame_id(0x204) data_dict = { \'Pitching\': data.height_sensor, \'LX_Hight\': int(data.pitch_sensor * 10), # 根据协议可能需要调整 } elif msg_id == 0x261: message = dbc.get_message_by_frame_id(0x261) data_dict = { \'LV_Sun\': data.main_version, \'LV_zCode\': data.sub_version, \'LV_Main\': data.rev_version, \'Time\': data.date } elif msg_id == 0x260: message = dbc.get_message_by_frame_id(0x260) data_dict = { \'Fault\': data.total_fault, \'VCU_Sts\': data.vcu_status, \'VCU_Service_Voltage\': data.vehicle_start_status, \'CarStartState\': data.vcu_voltage } encoded_message = message.encode(data_dict) # 发布CAN消息 # can_msg = can.Message(arbitration_id=msg_id, data=encoded_message, is_extended_id=False) # 发布CANFD消息 can_msg = can.Message(arbitration_id=msg_id, data=encoded_message, is_extended_id=False, is_fd=True) send_can_message(can_bus, can_msg) rate.sleep() except rospy.ROSInterruptException: pass except Exception as e: print(f\"An error occurred: {e}\") finally: can_bus.shutdown()if __name__ == \'__main__\': main()
运行需要安装一些can工具:
sudo apt install can-utilspip install can cantools
这里面通过设置fd的bool值来确定发送can还是canfd消息。
将车辆模型的相关信息通过ros topic发送到can处理接口后,就能够实现通过控制can信号的值来控制我们的车辆了。很不错。
另外,还是查看所有can口的命令:
ip addr | grep \"can\"
我这里有两个虚拟can口
可以通过
sudo ip link delete vcan0
来删除can口