SOME/IP:用Python实现协议订阅、Offer、订阅ACK与报文接收_python someip
文章目录
- 前言
 - 一、代码层次
 - 二、详细代码
 - 1、eth_scapy_sd.py
 - 2、eth_scapy_someip.py
 - 3、network_define.py
 - 4、packet_define.py
 - 5、unpack_define.py
 - 6、someip_controller.py
 
 
前言
1、需要pip安装scapy库
 2、需要根据实际情况修改network_define.py中的网络配置(网卡、IP、端口)
 3、执行someip_controller.py运行本案例
 4、本文参考github: eth-scapy-someip
注:最近没啥时间写文章,有兴趣自己研究下
一、代码层次
someip/
└── module/
    ├── eth_scapy_sd.py       # SOME/IP SD服务发现结构体定义
    ├── eth_scapy_someip.py   # SOME/IP标准结构体定义
    ├── network_define.py     # 网络配置(IP/端口/多播组)
    ├── packet_define.py      # 协议字段定义(Offer/Subscribe/ACK)
    ├── unpack_define.py      # 接收报文解析器
    └── someip_controller.py  # 主控制逻辑(订阅/OFFER/ACK流程管理)
二、详细代码
1、eth_scapy_sd.py
import ctypes
import collections
import struct

from eth_scapy_someip import SOMEIP
from scapy.fields import *
from scapy.packet import *
from scapy.layers.inet6 import IP6Field


class _SDPacketBase(Packet):
    """Base class shared by all SD packet definitions."""

    # Per-class default field values, applied once when the packet is
    # initialised. Mostly useful for subclasses that inherit their fields.
    #   key   : field name
    #   value : default value for that field
    # Example: _defaults = {'field_1_name': field_1_value}
    _defaults = {}

    def _set_defaults(self):
        """Apply every entry of ``_defaults`` whose field exists on this packet."""
        for key, value in self._defaults.items():
            try:
                self.get_field(key)
            except KeyError:
                # Field is not defined on this class: ignore the default.
                continue
            self.setfieldval(key, value)

    def init_fields(self, *args, **kwargs):
        """Initialise the packet fields, then apply the class defaults.

        Arguments are forwarded untouched to ``Packet.init_fields`` so the
        override works whether or not the installed Scapy passes extra
        parameters to this hook.
        """
        Packet.init_fields(self, *args, **kwargs)
        self._set_defaults()


# SD ENTRY
# - Service
# - EventGroup
class _SDEntry(_SDPacketBase):
    """Base class for SDEntry_* packets."""

    TYPE_FMT = ">B"
    TYPE_PAYLOAD_I = 0

    # ENTRY TYPES : SERVICE
    TYPE_SRV_FINDSERVICE = 0x00
    TYPE_SRV_OFFERSERVICE = 0x01
    TYPE_SRV = (TYPE_SRV_FINDSERVICE, TYPE_SRV_OFFERSERVICE)

    # ENTRY TYPES : EVENTGROUP
    TYPE_EVTGRP_SUBSCRIBE = 0x06
    TYPE_EVTGRP_SUBSCRIBE_ACK = 0x07
    TYPE_EVTGRP = (TYPE_EVTGRP_SUBSCRIBE, TYPE_EVTGRP_SUBSCRIBE_ACK)

    # overall len (UT usage)
    OVERALL_LEN = 16

    # Layout of the fields common to every entry of the Entries Array.
    fields_desc = [
        ByteField("type", 0),
        ByteField("index_1", 0),
        ByteField("index_2", 0),
        BitField("n_opt_1", 0, 4),
        BitField("n_opt_2", 0, 4),
        ShortField("srv_id", 0),
        ShortField("inst_id", 0),
        ByteField("major_ver", 0),
        X3BytesField("ttl", 0),
    ]

    def guess_payload_class(self, payload):
        """Pick the entry class from the type byte of ``payload``.

        Falls back to Scapy's default guess when the payload is too short to
        contain a type byte or the type is not a known entry type.
        """
        index = _SDEntry.TYPE_PAYLOAD_I
        if len(payload) <= index:
            return Packet.guess_payload_class(self, payload)
        pl_type = struct.unpack(_SDEntry.TYPE_FMT, payload[index:index + 1])[0]
        if pl_type in _SDEntry.TYPE_SRV:
            return SDEntry_Service
        if pl_type in _SDEntry.TYPE_EVTGRP:
            return SDEntry_EventGroup
        return Packet.guess_payload_class(self, payload)


class SDEntry_Service(_SDEntry):
    """Service Entry."""

    _defaults = {"type": _SDEntry.TYPE_SRV_FINDSERVICE}
    name = "Service Entry"
    fields_desc = [_SDEntry, IntField("minor_ver", 0)]


class SDEntry_EventGroup(_SDEntry):
    """EventGroup Entry."""

    _defaults = {"type": _SDEntry.TYPE_EVTGRP_SUBSCRIBE}
    name = "Eventgroup Entry"
    fields_desc = [
        _SDEntry,
        BitField("res", 0, 12),
        BitField("cnt", 0, 4),
        ShortField("eventgroup_id", 0),
    ]


# SD Option
# - Configuration
# - LoadBalancing
# - IPv4 EndPoint
# - IPv6 EndPoint
# - IPv4 MultiCast
# - IPv6 MultiCast
# - IPv4 SD EndPoint
# - IPv6 SD EndPoint
class _SDOption(_SDPacketBase):
    """Base class for SDOption_* packets."""

    TYPE_FMT = ">B"
    # The option type byte follows the 16-bit length field.
    TYPE_PAYLOAD_I = 2

    CFG_TYPE = 0x01
    # overall length of CFG SDOption, empty 'cfg_str' (to be used from UT)
    CFG_OVERALL_LEN = 4
    LOADBALANCE_TYPE = 0x02
    LOADBALANCE_LEN = 0x05
    LOADBALANCE_OVERALL_LEN = 8  # overall length of LB SDOption (to be used from UT)
    IP4_ENDPOINT_TYPE = 0x04
    IP4_ENDPOINT_LEN = 0x0009
    IP4_MCAST_TYPE = 0x14
    IP4_MCAST_LEN = 0x0009
    IP4_SDENDPOINT_TYPE = 0x24
    IP4_SDENDPOINT_LEN = 0x0009
    IP4_OVERALL_LEN = 12  # overall length of IP4 SDOption (to be used from UT)
    IP6_ENDPOINT_TYPE = 0x06
    IP6_ENDPOINT_LEN = 0x0015
    IP6_MCAST_TYPE = 0x16
    IP6_MCAST_LEN = 0x0015
    IP6_SDENDPOINT_TYPE = 0x26
    IP6_SDENDPOINT_LEN = 0x0015
    IP6_OVERALL_LEN = 24  # overall length of IP6 SDOption (to be used from UT)

    def guess_payload_class(self, payload):
        """Pick the option class from the type byte of ``payload``.

        Falls back to Scapy's default guess when the payload is too short to
        contain a type byte or the type is not a known option type.
        """
        index = _SDOption.TYPE_PAYLOAD_I
        if len(payload) <= index:
            return Packet.guess_payload_class(self, payload)
        pl_type = struct.unpack(_SDOption.TYPE_FMT, payload[index:index + 1])[0]
        # The lookup table is defined after the option classes; it is only
        # resolved here, at call time.
        option_cls = _SD_OPTION_CLASSES.get(pl_type)
        if option_cls is None:
            return Packet.guess_payload_class(self, payload)
        return option_cls


class _SDOption_Header(_SDOption):
    fields_desc = [
        ShortField("len", None),
        ByteField("type", 0),
        ByteField("res_hdr", 0),
    ]


class _SDOption_Tail(_SDOption):
    fields_desc = [
        ByteField("res_tail", 0),
        ByteEnumField("l4_proto", 0x06, {0x06: "TCP", 0x11: "UDP"}),
        ShortField("port", 0),
    ]


class _SDOption_IP4(_SDOption):
    fields_desc = [_SDOption_Header, IPField("addr", "0.0.0.0"), _SDOption_Tail]


class _SDOption_IP6(_SDOption):
    fields_desc = [
        _SDOption_Header,
        IP6Field("addr", "2001:cdba:0000:0000:0000:0000:3257:9652"),
        _SDOption_Tail,
    ]


class SDOption_Config(_SDOption):
    # offset added to the computed length (the header's "Reserved" byte)
    LEN_OFFSET = 0x01
    name = "Config Option"
    # default values specification
    _defaults = {"type": _SDOption.CFG_TYPE}
    # packet fields definition
    fields_desc = [_SDOption_Header, StrField("cfg_str", "")]

    def post_build(self, p, pay):
        """Fill in the length field when it was left unset.

        The length excludes the 16-bit length and 8-bit type fields.
        """
        length = self.len
        if length is None:
            length = len(self.cfg_str) + self.LEN_OFFSET
            p = struct.pack("!H", length) + p[2:]
        return p + pay


class SDOption_LoadBalance(_SDOption):
    name = "LoadBalance Option"
    # default values specification
    _defaults = {"type": _SDOption.LOADBALANCE_TYPE, "len": _SDOption.LOADBALANCE_LEN}
    # packet fields definition
    fields_desc = [_SDOption_Header, ShortField("priority", 0), ShortField("weight", 0)]


# SDOPTIONS : IPv4-specific
class SDOption_IP4_EndPoint(_SDOption_IP4):
    name = "IP4 EndPoint Option"
    # default values specification
    _defaults = {"type": _SDOption.IP4_ENDPOINT_TYPE, "len": _SDOption.IP4_ENDPOINT_LEN}


class SDOption_IP4_Multicast(_SDOption_IP4):
    name = "IP4 Multicast Option"
    # default values specification
    _defaults = {"type": _SDOption.IP4_MCAST_TYPE, "len": _SDOption.IP4_MCAST_LEN}


class SDOption_IP4_SD_EndPoint(_SDOption_IP4):
    name = "IP4 SDEndPoint Option"
    # default values specification
    _defaults = {
        "type": _SDOption.IP4_SDENDPOINT_TYPE,
        "len": _SDOption.IP4_SDENDPOINT_LEN,
    }


# SDOPTIONS : IPv6-specific
class SDOption_IP6_EndPoint(_SDOption_IP6):
    name = "IP6 EndPoint Option"
    # default values specification
    _defaults = {"type": _SDOption.IP6_ENDPOINT_TYPE, "len": _SDOption.IP6_ENDPOINT_LEN}


class SDOption_IP6_Multicast(_SDOption_IP6):
    name = "IP6 Multicast Option"
    # default values specification
    _defaults = {"type": _SDOption.IP6_MCAST_TYPE, "len": _SDOption.IP6_MCAST_LEN}


class SDOption_IP6_SD_EndPoint(_SDOption_IP6):
    name = "IP6 SDEndPoint Option"
    # default values specification
    _defaults = {
        "type": _SDOption.IP6_SDENDPOINT_TYPE,
        "len": _SDOption.IP6_SDENDPOINT_LEN,
    }


# Option type byte -> option class, used by _SDOption.guess_payload_class.
_SD_OPTION_CLASSES = {
    _SDOption.CFG_TYPE: SDOption_Config,
    _SDOption.LOADBALANCE_TYPE: SDOption_LoadBalance,
    _SDOption.IP4_ENDPOINT_TYPE: SDOption_IP4_EndPoint,
    _SDOption.IP4_MCAST_TYPE: SDOption_IP4_Multicast,
    _SDOption.IP4_SDENDPOINT_TYPE: SDOption_IP4_SD_EndPoint,
    _SDOption.IP6_ENDPOINT_TYPE: SDOption_IP6_EndPoint,
    _SDOption.IP6_MCAST_TYPE: SDOption_IP6_Multicast,
    _SDOption.IP6_SDENDPOINT_TYPE: SDOption_IP6_SD_EndPoint,
}


#
# SD PACKAGE DEFINITION
#
class SD(_SDPacketBase):
    """
    SD Packet

    NOTE : when adding 'entries' or 'options', do not use list.append()
    but assign a new list, e.g. :
        p = SD()
        p.option_array = [SDOption_Config(), SDOption_IP6_EndPoint()]
    """

    SOMEIP_MSGID_SRVID = 0xFFFF
    SOMEIP_MSGID_SUBID = 0x1
    SOMEIP_MSGID_EVENTID = 0x100
    SOMEIP_PROTO_VER = 0x01
    SOMEIP_IFACE_VER = 0x01
    SOMEIP_MSG_TYPE = SOMEIP.TYPE_NOTIFICATION

    name = "SD"

    # Flags definition: {"name": (mask, offset)}
    _sdFlag = collections.namedtuple("Flag", "mask offset")
    FLAGSDEF = {
        "REBOOT": _sdFlag(mask=0x80, offset=7),  # ReBoot flag
        "UNICAST": _sdFlag(mask=0x40, offset=6),  # UniCast flag
    }

    fields_desc = [
        ByteField("flags", 0),
        X3BytesField("res", 0),
        FieldLenField(
            name="len_entry_array", default=None, length_of="entry_array", fmt="!I"
        ),
        PacketListField(
            name="entry_array",
            default=None,
            pkt_cls=_SDEntry,
            length_from=lambda pkt: pkt.len_entry_array,
        ),
        FieldLenField(
            name="len_option_array", default=None, length_of="option_array", fmt="!I"
        ),
        PacketListField(
            name="option_array",
            default=None,
            pkt_cls=_SDOption,
            length_from=lambda pkt: pkt.len_option_array,
        ),
    ]

    def __init__(self, *args, **kwargs):
        super(SD, self).__init__(*args, **kwargs)
        self.explicit = 1

    def getFlag(self, name):
        """Return the value (0 or 1) of the named flag, or None if unknown."""
        name = name.upper()
        if name not in self.FLAGSDEF:
            return None
        flag = self.FLAGSDEF[name]
        return (self.flags & flag.mask) >> flag.offset

    def setFlag(self, name, value):
        """
        Set a particular flag in the flags byte. Unknown names are ignored.

        :param str name : name of the flag to set (see SD.FLAGSDEF)
        :param int value : either 0x1 or 0x0 (provided int will be ANDed with 0x01)
        """
        name = name.upper()
        if name not in self.FLAGSDEF:
            return
        flag = self.FLAGSDEF[name]
        # Clear the flag bit (mask inverted within one byte), then set it.
        cleared = self.flags & ctypes.c_ubyte(~flag.mask).value
        self.flags = cleared | ((value & 0x01) << flag.offset)

    def setEntryArray(self, entry_list):
        """
        Replace entry_array.

        :param entry_list: list of entries. Single entry object also accepted
        """
        if isinstance(entry_list, list):
            self.entry_array = entry_list
        else:
            self.entry_array = [entry_list]

    def setOptionArray(self, option_list):
        """
        Replace option_array.

        :param option_list: list of options. Single option object also accepted
        """
        if isinstance(option_list, list):
            self.option_array = option_list
        else:
            self.option_array = [option_list]

    def _new_someip_header(self):
        """Build a SOME/IP header carrying the fixed SD message identifiers."""
        p = SOMEIP()
        p.msg_id.srv_id = SD.SOMEIP_MSGID_SRVID
        # sub_id selects which conditional ID field exists; set it first.
        p.msg_id.sub_id = SD.SOMEIP_MSGID_SUBID
        p.msg_id.event_id = SD.SOMEIP_MSGID_EVENTID
        p.proto_ver = SD.SOMEIP_PROTO_VER
        p.iface_ver = SD.SOMEIP_IFACE_VER
        p.msg_type = SD.SOMEIP_MSG_TYPE
        return p

    def getSomeip(self, stacked=False):
        """
        Return an SD-initialised SOME/IP packet.

        :param stacked: boolean. Either just the SOME/IP header or the header
            stacked over this SD packet
        """
        p = self._new_someip_header()
        if stacked:
            return p / self
        return p

    def get_someip_with_session_id(self, session_id, stacked=False):
        """
        Same as getSomeip(), with the request's session ID set.

        :param session_id: value written to req_id.session_id
        :param stacked: boolean. Either just the SOME/IP header or the header
            stacked over this SD packet
        """
        p = self._new_someip_header()
        # Set the session ID before stacking, on the header object itself.
        p.req_id.session_id = session_id
        if stacked:
            return p / self
        return p
2、eth_scapy_someip.py
import struct

from scapy.layers.inet import *
from scapy.fields import *
from scapy.packet import *

"""SOMEIP PACKAGE DEFINITION"""


class _SOMEIP_MessageId(Packet):
    """MessageId subpacket: service ID plus a method ID or an event ID."""

    name = 'MessageId'
    fields_desc = [
        ShortField('srv_id', 0),
        BitEnumField('sub_id', 0, 1, {0: 'METHOD_ID', 1: 'EVENT_ID'}),
        # Exactly one of the two 15-bit IDs is present, selected by sub_id.
        ConditionalField(BitField('method_id', 0, 15), lambda pkt: pkt.sub_id == 0),
        ConditionalField(BitField('event_id', 0, 15), lambda pkt: pkt.sub_id == 1)
    ]

    def extract_padding(self, p):
        # No payload of its own: hand all remaining bytes back to the parent.
        return b'', p


class _SOMEIP_RequestId(Packet):
    """RequestId subpacket: client ID and session ID."""

    name = 'RequestId'
    fields_desc = [
        ShortField('client_id', 0),
        ShortField('session_id', 0)]

    def extract_padding(self, p):
        # No payload of its own: hand all remaining bytes back to the parent.
        return b'', p


class SOMEIP(Packet):
    """SOME/IP Packet."""

    # Default values
    PROTOCOL_VERSION = 0x01
    INTERFACE_VERSION = 0x01

    # Length offset (without payload and without SOME/IP-TP header)
    LEN_OFFSET = 0x08
    # Number of header bytes (MessageID + Length) not counted by 'len'
    _LEN_FIELD_END = 8

    # SOME/IP TYPE VALUES
    TYPE_REQUEST = 0x00
    TYPE_REQUEST_NO_RET = 0x01
    TYPE_NOTIFICATION = 0x02
    TYPE_REQUEST_ACK = 0x40
    TYPE_REQUEST_NORET_ACK = 0x41
    TYPE_NOTIFICATION_ACK = 0x42
    TYPE_RESPONSE = 0x80
    TYPE_ERROR = 0x81
    TYPE_RESPONSE_ACK = 0xc0
    TYPE_ERROR_ACK = 0xc1

    # SOME/IP-TP TYPE VALUES
    TYPE_REQUEST_SEGMENT = 0x20
    TYPE_REQUEST_NO_RET_SEGMENT = 0x21
    TYPE_NOTIFICATION_SEGMENT = 0x22
    TYPE_REQUEST_ACK_SEGMENT = 0x60
    TYPE_REQUEST_NORET_ACK_SEGMENT = 0x61
    TYPE_NOTIFICATION_ACK_SEGMENT = 0x62
    TYPE_RESPONSE_SEGMENT = 0xa0
    TYPE_ERROR_SEGMENT = 0xa1
    TYPE_RESPONSE_ACK_SEGMENT = 0xe0
    TYPE_ERROR_ACK_SEGMENT = 0xe1
    SOMEIP_TP_TYPES = frozenset({TYPE_REQUEST_SEGMENT, TYPE_REQUEST_NO_RET_SEGMENT,
                                 TYPE_NOTIFICATION_SEGMENT, TYPE_REQUEST_ACK_SEGMENT,
                                 TYPE_REQUEST_NORET_ACK_SEGMENT,
                                 TYPE_NOTIFICATION_ACK_SEGMENT, TYPE_RESPONSE_SEGMENT,
                                 TYPE_ERROR_SEGMENT, TYPE_RESPONSE_ACK_SEGMENT,
                                 TYPE_ERROR_ACK_SEGMENT})
    SOMEIP_TP_TYPE_BIT_MASK = 0x20

    # SOME/IP RETURN CODES
    RET_E_OK = 0x00
    RET_E_NOT_OK = 0x01
    RET_E_UNKNOWN_SERVICE = 0x02
    RET_E_UNKNOWN_METHOD = 0x03
    RET_E_NOT_READY = 0x04
    RET_E_NOT_REACHABLE = 0x05
    RET_E_TIMEOUT = 0x06
    RET_E_WRONG_PROTOCOL_V = 0x07
    RET_E_WRONG_INTERFACE_V = 0x08
    RET_E_MALFORMED_MSG = 0x09
    RET_E_WRONG_MESSAGE_TYPE = 0x0a

    # SOME/IP-TP More Segments Flag
    SOMEIP_TP_LAST_SEGMENT = 0
    SOMEIP_TP_MORE_SEGMENTS = 1

    _OVERALL_LEN_NOPAYLOAD = 16  # UT

    name = 'SOME/IP'

    fields_desc = [
        PacketField('msg_id', _SOMEIP_MessageId(), _SOMEIP_MessageId),  # MessageID
        IntField('len', None),  # Length
        PacketField('req_id', _SOMEIP_RequestId(), _SOMEIP_RequestId),  # RequestID
        ByteField('proto_ver', PROTOCOL_VERSION),  # Protocol version
        ByteField('iface_ver', INTERFACE_VERSION),  # Interface version
        ByteEnumField('msg_type', TYPE_REQUEST, {  # -- Message type --
            TYPE_REQUEST: 'REQUEST',  # 0x00
            TYPE_REQUEST_NO_RET: 'REQUEST_NO_RETURN',  # 0x01
            TYPE_NOTIFICATION: 'NOTIFICATION',  # 0x02
            TYPE_REQUEST_ACK: 'REQUEST_ACK',  # 0x40
            TYPE_REQUEST_NORET_ACK: 'REQUEST_NO_RETURN_ACK',  # 0x41
            TYPE_NOTIFICATION_ACK: 'NOTIFICATION_ACK',  # 0x42
            TYPE_RESPONSE: 'RESPONSE',  # 0x80
            TYPE_ERROR: 'ERROR',  # 0x81
            TYPE_RESPONSE_ACK: 'RESPONSE_ACK',  # 0xc0
            TYPE_ERROR_ACK: 'ERROR_ACK',  # 0xc1
        }),
        ByteEnumField('retcode', 0, {  # -- Return code --
            RET_E_OK: 'E_OK',  # 0x00
            RET_E_NOT_OK: 'E_NOT_OK',  # 0x01
            RET_E_UNKNOWN_SERVICE: 'E_UNKNOWN_SERVICE',  # 0x02
            RET_E_UNKNOWN_METHOD: 'E_UNKNOWN_METHOD',  # 0x03
            RET_E_NOT_READY: 'E_NOT_READY',  # 0x04
            RET_E_NOT_REACHABLE: 'E_NOT_REACHABLE',  # 0x05
            RET_E_TIMEOUT: 'E_TIMEOUT',  # 0x06
            RET_E_WRONG_PROTOCOL_V: 'E_WRONG_PROTOCOL_VERSION',  # 0x07
            RET_E_WRONG_INTERFACE_V: 'E_WRONG_INTERFACE_VERSION',  # 0x08
            RET_E_MALFORMED_MSG: 'E_MALFORMED_MESSAGE',  # 0x09
            RET_E_WRONG_MESSAGE_TYPE: 'E_WRONG_MESSAGE_TYPE',  # 0x0a
        }),
        # SOME/IP-TP header (4 bytes), present only for segment message types.
        ConditionalField(BitField('offset', 0, 28),
                         lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),
        ConditionalField(BitField('reserved', 0, 3),
                         lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),
        ConditionalField(BitEnumField('more_segments', 0, 1, {
            SOMEIP_TP_LAST_SEGMENT: 'Last_Segment',
            SOMEIP_TP_MORE_SEGMENTS: 'More_Segments'
        }), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES)
    ]

    def post_build(self, p, pay):
        """Fill in the length field when it was left unset.

        The length covers everything after the length field itself: RequestID,
        the version/type/return-code bytes, the SOME/IP-TP header when it is
        present, and the payload. Deriving it from the built header keeps it
        correct for segment types too.
        """
        length = self.len
        if length is None:
            length = len(p) - self._LEN_FIELD_END + len(pay)
            p = p[:4] + struct.pack('!I', length) + p[8:]
        return p + pay


# Dissect UDP/TCP traffic on the SOME/IP port range as SOME/IP, matching on
# either the source or the destination port.
for i in range(15):
    bind_layers(UDP, SOMEIP, sport=30490 + i)
    bind_layers(TCP, SOMEIP, sport=30490 + i)
    bind_bottom_up(UDP, SOMEIP, dport=30490 + i)
    bind_bottom_up(TCP, SOMEIP, dport=30490 + i)
3、network_define.py
class EthParameter:
    """Network settings used to build and receive the SOME/IP packets.

    Adjust these values to the actual test environment before running.
    """

    # Service-discovery (multicast) side
    sd_network_card = "eth0.62"
    sd_ip = "239.0.0.255"

    # Local machine
    server_network_card = "Realtek PCIe GbE Family Controller #2"
    server_ip = "192.168.62.31"

    # Domain controller
    client_network_card = "eth0.62"
    client_ip = "192.168.62.11"

    # Ports
    sd_port = 30490
    producer_port = 30500
    consumer_port = 30501
    # Misspelled legacy name, kept because existing code reads it.
    consumer_prot = consumer_port
4、packet_define.py
import eth_scapy_sd as sd
from network_define import EthParameter
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP


class SomeipPacker:
    """Builds the SOME/IP-SD frames (Subscribe, SubscribeAck, Offer)."""

    # Session IDs run 1..0xFFFF and wrap back to 1; 0 is never used.
    _SESSION_ID_MIN = 0x0001
    _SESSION_ID_MAX = 0xFFFF

    def __init__(self):
        self.eth_para = EthParameter()
        self.sd_session_id = 1
        self.client_session_id = 1

    @classmethod
    def _next_session_id(cls, current):
        """Return the session ID following ``current``, wrapping 0xFFFF -> 1."""
        if current >= cls._SESSION_ID_MAX:
            return cls._SESSION_ID_MIN
        return current + 1

    def update_sd_session_id(self):
        """Advance the session ID used for multicast SD messages."""
        self.sd_session_id = self._next_session_id(self.sd_session_id)

    def update_client_session_id(self):
        """Advance the session ID used for unicast SD messages."""
        self.client_session_id = self._next_session_id(self.client_session_id)

    # ===========================#
    #       Serialization        #
    # ===========================#
    @staticmethod
    def _new_sd(entries, options):
        """Create an SD packet with REBOOT and UNICAST set and the given arrays.

        The arrays are assigned as whole lists (not appended to), and the two
        array length fields are left unset so they are computed from the
        arrays when the packet is built.
        """
        packet = sd.SD()
        packet.setFlag("REBOOT", 1)
        packet.setFlag("UNICAST", 1)
        packet.entry_array = entries
        packet.option_array = options
        return packet

    def _wrap(self, sd_packet, dst_ip, session_id):
        """Stack Ether/IP/UDP/SOME-IP around ``sd_packet`` (SD port both ways)."""
        return (
            Ether()
            / IP(src=self.eth_para.server_ip, dst=dst_ip)
            / UDP(sport=self.eth_para.sd_port, dport=self.eth_para.sd_port)
            / sd_packet.get_someip_with_session_id(session_id, True)
        )

    def packet_subscribe(self, service_id: list, ttl: int):
        """Build a SubscribeEventgroup frame for every service in ``service_id``.

        :param service_id: list of service IDs to subscribe to
        :param ttl: TTL written into every entry
        :return: the complete Ether/IP/UDP/SOME-IP/SD packet
        """
        entries = [
            sd.SDEntry_EventGroup(
                type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE,
                srv_id=srv_id,
                inst_id=0x1,
                n_opt_1=2,  # both endpoint options below
                major_ver=0x1,
                ttl=ttl,
                eventgroup_id=0x1,
            )
            for srv_id in service_id
        ]
        options = [
            # UDP EndPoint
            sd.SDOption_IP4_EndPoint(
                addr=self.eth_para.server_ip,
                l4_proto=0x11,
                port=self.eth_para.consumer_prot,
            ),
            # TCP EndPoint
            sd.SDOption_IP4_EndPoint(
                addr=self.eth_para.server_ip,
                l4_proto=0x06,
                port=self.eth_para.consumer_prot,
            ),
        ]
        subscribe_packager = self._new_sd(entries, options)
        # show() prints the packet itself and returns None.
        subscribe_packager.show()
        subscribe_package = self._wrap(
            subscribe_packager, self.eth_para.client_ip, self.client_session_id
        )
        self.update_client_session_id()
        return subscribe_package

    def packet_subscribe_ack(self, service_id: list, ttl: int):
        """Build a SubscribeEventgroupAck frame for every service in ``service_id``.

        :param service_id: list of service IDs to acknowledge
        :param ttl: TTL written into every entry
        :return: the complete Ether/IP/UDP/SOME-IP/SD packet
        """
        entries = [
            sd.SDEntry_EventGroup(
                type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE_ACK,
                srv_id=srv_id,
                inst_id=0x1,
                major_ver=0x1,
                ttl=ttl,
                eventgroup_id=0x1,
            )
            for srv_id in service_id
        ]
        ack_packager = self._new_sd(entries, [])
        ack_packager.show()
        ack_package = self._wrap(
            ack_packager, self.eth_para.client_ip, self.client_session_id
        )
        self.update_client_session_id()
        return ack_package

    def packet_offer(self, service_id: list, ttl: int):
        """Build an OfferService frame for every service in ``service_id``.

        The frame is addressed to the SD multicast IP.

        :param service_id: list of service IDs to offer
        :param ttl: TTL written into every entry
        :return: the complete Ether/IP/UDP/SOME-IP/SD packet
        """
        entries = [
            sd.SDEntry_Service(
                type=sd.SDEntry_Service.TYPE_SRV_OFFERSERVICE,
                srv_id=srv_id,
                inst_id=0x1,
                n_opt_1=1,  # the single endpoint option below
                major_ver=0x1,
                minor_ver=0x01,
                ttl=ttl,
            )
            for srv_id in service_id
        ]
        options = [
            sd.SDOption_IP4_EndPoint(
                addr=self.eth_para.server_ip,
                l4_proto=0x11,
                port=self.eth_para.producer_port,
            )
        ]
        offer_packager = self._new_sd(entries, options)
        offer_packager.show()
        offer_package = self._wrap(
            offer_packager, self.eth_para.sd_ip, self.sd_session_id
        )
        self.update_sd_session_id()
        return offer_package
5、unpack_define.py
from scapy.packet import Raw
from typing import Optional
from dataclasses import dataclass


@dataclass
class SomeIPHeaderParams:
    """Values extracted from the SOME/IP header of a received packet."""

    service_id: int
    # 16-bit method/event ID: top bit is sub_id, low 15 bits are the
    # method ID (sub_id == 0) or the event ID (sub_id == 1).
    event_id: int
    client_id: int
    session_id: int
    msg_type: int
    return_code: int
    length: int
    protocol_version: int
    interface_version: int


class SomeipUnpacker:
    """Extracts header fields and payload from received SOME/IP packets."""

    @staticmethod
    def get_someip_header_params(receive_packet) -> Optional[SomeIPHeaderParams]:
        """Return the SOME/IP header values of ``receive_packet``.

        Returns None when the packet has no SOME/IP layer or when the header
        cannot be read.
        """
        try:
            if not receive_packet.haslayer("SOME/IP"):
                return None
            someip_layer = receive_packet["SOME/IP"]

            msg_id = someip_layer.msg_id
            sub_id = msg_id.sub_id
            # Only one of method_id / event_id is present, selected by sub_id.
            low_id = msg_id.event_id if sub_id else msg_id.method_id
            event_id = (sub_id << 15) | (low_id or 0)

            # Request ID (client ID and session ID)
            req_id = someip_layer.req_id

            return SomeIPHeaderParams(
                service_id=msg_id.srv_id,
                event_id=event_id,
                client_id=req_id.client_id,
                session_id=req_id.session_id,
                msg_type=someip_layer.msg_type,
                return_code=someip_layer.retcode,
                length=someip_layer.len,
                protocol_version=someip_layer.proto_ver,
                interface_version=someip_layer.iface_ver,
            )
        except (AttributeError, ValueError) as e:
            print(f"解析失败: {e}")
            return None

    @staticmethod
    def get_someip_payload(receive_packet):
        """Return the bytes of the packet's Raw layer, or None if it has none."""
        try:
            return receive_packet[Raw].load
        except (AttributeError, IndexError, TypeError):
            # IndexError: no Raw layer in the packet.
            # AttributeError / TypeError: not a packet at all (e.g. None).
            return None
6、someip_controller.py
import threading
import socket
import time
from typing import Optional

from module.packet_define import EthParameter, SomeipPacker
from module.unpack_define import SomeipUnpacker
from scapy.sendrecv import sendp, sniff


class SomeipController(threading.Thread):
    """Main control thread: starts the cyclic senders and the receiver."""

    def __init__(self, someip_packer: SomeipPacker, someip_unpacker: SomeipUnpacker):
        super().__init__()
        self.someip_packer = someip_packer
        self.someip_unpacker = someip_unpacker
        self._stop_event = threading.Event()
        self.running = True
        # Worker threads started by run(), stopped again when run() ends.
        self._workers = []

    def run(self):
        """Start all workers, then block until stop() is called."""
        # Offer the services and send the subscribe ACK cyclically
        # (the incoming subscribe is not evaluated; the ACK is sent anyway).
        service_id_list = [0xA994, 0xA995]
        _offer = self.someip_packer.packet_offer(
            service_id=service_id_list, ttl=3
        )
        self._workers.append(
            SomeipSendLoop(
                network_card=EthParameter.sd_network_card,
                someip_packer=_offer,
                cycle_time_ms=2000,
            )
        )

        # Subscribe ACK
        _subscribe_ack = self.someip_packer.packet_subscribe_ack(
            service_id=service_id_list, ttl=3
        )
        self._workers.append(
            SomeipSendLoop(
                network_card=EthParameter.sd_network_card,
                someip_packer=_subscribe_ack,
                cycle_time_ms=2000,
            )
        )

        # Subscribe to services 0xAB03 and 0xAB04
        _subscribe = self.someip_packer.packet_subscribe(
            service_id=[0xAB03, 0xAB04], ttl=0x03
        )
        self._workers.append(
            SomeipSendLoop(
                network_card=EthParameter.server_network_card,
                someip_packer=_subscribe,
                cycle_time_ms=2000,
            )
        )

        # Receiver
        self._workers.append(
            SomeIpReceiver(EthParameter.server_network_card, self.someip_unpacker)
        )

        # Sleep until stop() sets the event, then shut the workers down.
        self._stop_event.wait()
        for worker in reversed(self._workers):
            worker.stop()

    def stop(self):
        """Request shutdown and wait for this thread to finish."""
        self.running = False
        self._stop_event.set()
        self.join()


class SomeipSendLoop(threading.Thread):
    """Sends one prepared packet on an interface at a fixed cycle time.

    The thread starts itself on construction.
    """

    def __init__(self, network_card, someip_packer, cycle_time_ms):
        super().__init__()
        self.network_card = network_card
        self.someip_packer = someip_packer
        self.cycle_time_ms = cycle_time_ms
        self._stop_event = threading.Event()
        self.start()

    def run(self):
        while not self._stop_event.is_set():
            if self.someip_packer is not None:
                sendp(self.someip_packer, iface=self.network_card, verbose=0)
            # Wait one cycle; returns early as soon as stop() is called.
            self._stop_event.wait(self.cycle_time_ms / 1000)

    def stop(self):
        """Stop sending and wait for the thread to finish."""
        self._stop_event.set()
        self.join()


class SomeIpReceiver(threading.Thread):
    """Sniffs UDP traffic on an interface and prints each SOME/IP message type.

    The thread starts itself on construction.
    """

    def __init__(
        self,
        eth_desc: str,
        someip_unpacker
    ):
        super().__init__()
        self._eth_desc = eth_desc
        self._unpacker = someip_unpacker
        self._terminated = False
        self._stop_event = threading.Event()
        self.start()

    def packet_callback(self, packet):
        """Print the message type of ``packet`` if it has a SOME/IP header."""
        if not packet:
            return
        header_param = self._unpacker.get_someip_header_params(
            receive_packet=packet
        )
        if header_param is None:
            # Not a SOME/IP packet, or the header could not be read.
            return
        print(f"msg_type: {header_param.msg_type}")

    def run(self):
        sniff(
            iface=self._eth_desc,
            prn=self.packet_callback,
            filter="udp",
            # Packets are handled in the callback; do not accumulate them.
            store=False,
            stop_filter=lambda x: self._terminated,
        )

    def stop(self):
        """Ask the sniffer to stop.

        The stop filter is only evaluated when a packet arrives, so the join
        is bounded instead of waiting indefinitely on a silent interface.
        """
        self._terminated = True
        self._stop_event.set()
        self.join(timeout=2.0)


# Only needed when the protocol also runs over TCP
class SomeipTcpSocket(threading.Thread):
    """Keeps a TCP connection from the local consumer port to the producer.

    The thread starts itself on construction.
    """

    def __init__(self):
        super().__init__()
        self.tcp_sock: Optional[socket.socket] = None
        self._stop_event = threading.Event()
        self.start()

    def create_tcp_socket(self):
        """Bind to the local consumer endpoint and connect to the producer."""
        self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # NOTE(review): local = server_ip, remote = client_ip is assumed from
        # the EthParameter comments ("local machine" / "domain controller").
        self.tcp_sock.bind((EthParameter.server_ip, EthParameter.consumer_prot))
        self.tcp_sock.connect((EthParameter.client_ip, EthParameter.producer_port))

    def _close_socket(self):
        """Close the socket if one was created."""
        if self.tcp_sock is not None:
            self.tcp_sock.close()
            self.tcp_sock = None

    def run(self):
        try:
            self.create_tcp_socket()
        except OSError as exc:
            print(f"TCP connection failed: {exc}")
            self._close_socket()
            return
        try:
            # Keep the connection open until stop() is called.
            self._stop_event.wait()
        finally:
            self._close_socket()

    def stop(self):
        """Close the connection and wait for the thread to finish."""
        self._stop_event.set()
        self.join()


if __name__ == "__main__":
    s_socket = SomeipTcpSocket()
    s_packer = SomeipPacker()
    s_unpacker = SomeipUnpacker()
    sim_server = SomeipController(
        s_packer,
        s_unpacker
    )
    sim_server.start()


