Shifter.py | ●●●●● patch | view | raw | blame | history | |
USBCAN.py | ●●●●● patch | view | raw | blame | history | |
hexread.py | ●●●●● patch | view | raw | blame | history | |
test.py | ●●●●● patch | view | raw | blame | history | |
widgets/ShifterTool.py | ●●●●● patch | view | raw | blame | history |
Shifter.py
@@ -1,4 +1,3 @@ from email.message import Message from ShifterDefine import * import struct from udsoncan import DidCodec USBCAN.py
@@ -304,6 +304,8 @@ self.device = device_type self.device_index = device_index self.channel = can_index self.timestamp_start = None self.laststamp = None Timing0, Timing1 = TIMING_DICT[bitrate] @@ -429,17 +431,23 @@ if rtn == 0xFFFFFFFF or rtn == 0: return None, False else: if self.timestamp_start == None: self.laststamp = raw_message[0].TimeStamp self.timestamp_start = self.laststamp for i in range(rtn): msg.append( Message( timestamp=raw_message[i].TimeStamp if raw_message[i].TimeFlag else 0.0, Message(timestamp=( (raw_message[i].TimeStamp - self.timestamp_start) / 1000000) if raw_message[i].TimeFlag else 0.0, arbitration_id=raw_message[i].ID, is_remote_frame=raw_message[i].RemoteFlag, channel=0, extended_id=raw_message[i].ExternFlag, data=raw_message[i].Data, )) is_error_frame=False)) # timestamp = raw_message[i].TimeStamp if raw_message[ # i].TimeFlag else 0.0, # print(timestamp) return (msg, rtn) def ListeningMsg(self, connectRet, needClose, msgQueue: Queue, hexread.py
# hexread.py -- new file (diff hunk header: @@ -0,0 +1,103 @@)
"""Minimal reader for Intel HEX firmware images.

A HEX file stores data one record per line, written as ASCII hex digits:

    :LLAAAATT<data...>CC

    LL    number of data bytes in the record (2 hex digits)
    AAAA  16-bit load offset of the data (4 hex digits)
    TT    record type, see ``HexDataType`` (2 hex digits)
    data  LL bytes of payload (2 * LL hex digits)
    CC    checksum (2 hex digits); not verified by this module
"""
import binascii

try:
    from aenum import IntEnum
except ImportError:  # aenum is optional here; the stdlib enum behaves the same
    from enum import IntEnum

# Byte positions of the header digits inside one record line
# (position 0 holds the ':' start code).
HEX_LEN_H = 1
HEX_LEN_L = 2
HEX_ADDR_HH = 3
HEX_ADDR_HL = 4
HEX_ADDR_LH = 5
HEX_ADDR_LL = 6
HEX_DATA_TYPE_H = 7
HEX_DATA_TYPE_L = 8
HEX_DATA_START = 9

# ASCII code of a hex digit -> its value.  No longer used by Hex_read itself;
# kept because modules import it via ``from hexread import *``.  Lowercase
# digits are included in addition to the original uppercase ones.
HexTable = {ord(digit): int(digit, 16) for digit in "0123456789ABCDEFabcdef"}


class HexDataType(IntEnum):
    """Intel HEX record types (the ``TT`` field of a record)."""

    DataRecord = 0
    FileEnd = 1
    ExtentionAddr = 2  # extended segment address
    StartSectionAddr = 3  # start segment address
    ExtentionLinearAddr = 4  # extended linear address
    # Was 3, which made this member a silent alias of StartSectionAddr;
    # the Intel HEX format assigns type 05 to the start linear address.
    StartLinearSectionAddr = 5


def _hex_field(record, start, stop):
    """Decode the hex digits ``record[start:stop]`` as an unsigned integer.

    Raises:
        ValueError: if the record is too short to contain the field or the
            field holds non-hex characters (``binascii.Error`` is a
            ``ValueError`` subclass).
    """
    field = record[start:stop]
    if len(field) != stop - start:
        raise ValueError("truncated Intel HEX record: %r" % (record,))
    return int.from_bytes(binascii.unhexlify(field), "big")


class Hex_read:
    """Load the data records of an Intel HEX file into one bytes object.

    Attributes (all reset at the start of every ``Open_file`` call):
        file: the path passed to the most recent ``Open_file`` call.
        start_addr: absolute address of the first data record.
        len: total number of data bytes read.
        data: payload of all data records, concatenated in file order.
    """

    def __init__(self):
        self.start_addr = 0
        self.len = 0
        self.data = b''

    def Open_file(self, file):
        """Parse ``file`` and populate ``start_addr``, ``len`` and ``data``.

        Lines that do not start with ':' are ignored and parsing stops at
        the end-of-file record.  Payloads are concatenated in file order;
        address gaps between records are NOT padded.  Checksums are not
        verified.

        Args:
            file: path of the Intel HEX file to read.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if a record is truncated, contains non-hex
                characters, or an address record does not carry exactly
                two data bytes.
        """
        self.file = file
        self.start_addr = 0
        self.len = 0
        self.data = b''

        base_addr = 0  # upper address bits set by type 02 / 04 records
        seen_data = False
        chunks = []  # decoded payloads; joined once to avoid quadratic +=

        with open(file, 'rb') as fd:
            for record in fd:
                if not record.startswith(b':'):
                    continue

                data_len = _hex_field(record, HEX_LEN_H, HEX_LEN_L + 1)
                offset = _hex_field(record, HEX_ADDR_HH, HEX_ADDR_LL + 1)
                rec_type = _hex_field(record, HEX_DATA_TYPE_H,
                                      HEX_DATA_TYPE_L + 1)

                if rec_type == HexDataType.FileEnd:
                    break

                payload = record[HEX_DATA_START:HEX_DATA_START + data_len * 2]
                if len(payload) != data_len * 2:
                    raise ValueError(
                        "truncated Intel HEX record: %r" % (record,))

                if rec_type in (HexDataType.ExtentionLinearAddr,
                                HexDataType.ExtentionAddr):
                    if data_len != 2:
                        raise ValueError(
                            "malformed Intel HEX address record: %r"
                            % (record,))
                    upper = int.from_bytes(binascii.unhexlify(payload), "big")
                    if rec_type == HexDataType.ExtentionLinearAddr:
                        # The payload is bits 31..16 of the address.
                        base_addr = upper << 16
                    else:
                        # The payload is a segment base: address = seg * 16.
                        base_addr = upper << 4
                elif rec_type == HexDataType.DataRecord:
                    if not seen_data:
                        # Only the first data record defines the image start;
                        # later address records must not overwrite it.
                        self.start_addr = base_addr + offset
                        seen_data = True
                    chunks.append(binascii.unhexlify(payload))
                    self.len += data_len

        self.data = b''.join(chunks)

# (diff view) the next file section is: test.py
@@ -14,6 +14,8 @@ from ShifterDefine import * import cantools from pprint import pprint import binascii from hexread import * # def rece_msg(bus): # msg = bus.Receive(0.1) # if msg[0] is not None: @@ -52,7 +54,7 @@ "IndicationLEDControl": 0x8101 } print(DID_dic["IndicationLEDControl"]) # print(DID_dic["IndicationLEDControl"]) # def f(conn): # conn.send([1, 'test', None]) @@ -217,31 +219,44 @@ # sys.exit(app.exec_()) dbc = cantools.database.load_file("DBC/SX7H.dbc") # print(dbc.messages) sa_message = dbc.get_message_by_name('SA1') # print(sa_message) # pprint(sa_message.signals) frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0] input_signal = dbc.decode_message(0x420, frame) part_str = str(input_signal['SA1_Status_ParkButtonReq']) pprint(part_str) if input_signal['SA1_Status_ParkButtonReq'] == 'No request': print('yes') # dbc = cantools.database.load_file("DBC/SX7H.dbc") # # print(dbc.messages) # sa_message = dbc.get_message_by_name('SA1') # # print(sa_message) # # pprint(sa_message.signals) # frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0] # input_signal = dbc.decode_message(0x420, frame) # part_str = str(input_signal['SA1_Status_ParkButtonReq']) # pprint(part_str) # if input_signal['SA1_Status_ParkButtonReq'] == 'No request': # print('yes') data = sa_message.encode({ 'SA1_Status_PRNDL': 0, 'SA1_Status_GearShftPosReq': 0, #'Shifter position Zero', 'SA1_Status_ParkButtonReq': 2, "SA1_ShifterManualSignal": 0, "SA1_ShifterModeSignal": 1, "SA1_Status_ShftSensSng_Fault": 0, "SA1_IND_ShifterMisUsd": 0, "SA1_Live_counter": 1, "SA1_Status_UnlockButtonReq": 1, "SA1_Status_EcoShifterModeReq": 0, "SA1_Status_RqGearPosInV": 0xf, "SA1_Status_ShiftPosValidFlag": 0, }) # data = sa_message.encode({ # 'SA1_Status_PRNDL': 0, # 'SA1_Status_GearShftPosReq': 0, #'Shifter position Zero', # 'SA1_Status_ParkButtonReq': 2, # "SA1_ShifterManualSignal": 0, # "SA1_ShifterModeSignal": 1, # "SA1_Status_ShftSensSng_Fault": 0, # "SA1_IND_ShifterMisUsd": 0, # "SA1_Live_counter": 1, # 
"SA1_Status_UnlockButtonReq": 1, # "SA1_Status_EcoShifterModeReq": 0, # "SA1_Status_RqGearPosInV": 0xf, # "SA1_Status_ShiftPosValidFlag": 0, # }) # print(data) # print(SA1_Status_ParkButtonReq_dic['No request']) with open(r"./source bin/SX7H_Shifter.bin", 'rb') as fd: bin_file = fd.read() print(bin_file[0:20]) print('=====================================================') hex_rd = Hex_read() hex_rd.Open_file(r"./source bin/SX7H_Shifter.hex") hex_file = hex_rd.data print(hex_file[0:20]) aa = binascii.a2b_hex(hex_file) print(aa[0:20]) widgets/ShifterTool.py
@@ -4,7 +4,7 @@ from logging import exception import threading from typing import List from can import BusABC, Message from can import BusABC, Message, Logger, Listener from udsoncan.client import Client from udsoncan.exceptions import TimeoutException import udsoncan @@ -23,6 +23,7 @@ import datetime from ShifterDefine import * from multiprocessing import Process, Queue, Value, Pipe from hexread import * MAX_RCV_NUM = 20 APP_ADDR_LOCATION_OFFSET = 8 @@ -255,6 +256,8 @@ 0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a ] self.app_data: bytes = None self.start_timestamp = time.time() self.boot_logger = None def DeviceInit(self): self._usbcan = None @@ -413,7 +416,15 @@ msg.channel = 0 msg.data = isotp_msg.data # bytearray msg.is_extended_id = False msg.timestamp = time.time() - self.start_timestamp msg.is_error_frame = False msg.is_remote_frame = False self.Tool_send(msg) def Tool_send(self, msg: Message): self._usbcan.send(msg) if self.boot_logger is not None: self.boot_logger.on_message_received(msg) def WidgetsInit(self): # self.UI.setupUi(self) @@ -475,6 +486,9 @@ self.UI.pushButton_48.clicked.connect(self.start_programming) self.UI.pushButton_46.clicked.connect(self.loadAppfile) self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag) self.UI.pushButton_47.clicked.connect(self.SetBootLog) # self.UI.comboBox_10.activated.connect(self.ReportByMask) self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys())) self.UI.comboBox_12.addItems(list(DTC_Dic.keys())) @@ -546,12 +560,15 @@ msg[i]) # send msg to isotp self.msgQueue.put(msg[i]) if self.boot_logger is not None: self.boot_logger.on_message_received(msg[i]) # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) # send signal to update screen g_signal.sig_MsgReceived.emit(num) # msgToSendCnt = self.sendQueue.qsize() # if msgToSendCnt > 0: # msg = Message() @@ -560,7 +577,7 @@ # msg 
= self.sendQueue.get() # # logger.info('Tx: ID=0x{:0<3x} '.format( # # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) # self._usbcan.send(msg) # self.Tool_send(msg) def send_dump(self): msg = Message() msg.arbitration_id = self.shifter.canid.phy_rxId @@ -569,7 +586,9 @@ msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0] msg.is_extended_id = False msg.is_remote_frame = False self._usbcan.send(msg) msg.is_error_frame = False msg.timestamp = time.time() - self.start_timestamp self.Tool_send(msg) def send_VehiclePosition(self, data=[]): msg = Message() @@ -579,7 +598,9 @@ msg.data = data msg.is_extended_id = False msg.is_remote_frame = False self._usbcan.send(msg) msg.is_error_frame = False msg.timestamp = time.time() - self.start_timestamp self.Tool_send(msg) def open_close(self): if self._isOpen.value == 0: @@ -612,6 +633,9 @@ self.needdisconnect.value = 1 self._deviceOpenUpdate() self.TestPresentTimer.stop() def closeEvent(self, e): self.boot_logger.stop() def update_HardwareDevice(self): @@ -726,6 +750,7 @@ pass def TestPresentEvent(self): self.udsclient. 
if self.UI.radioButton.isChecked(): self.TestPresentTimer.start(3000) else: @@ -1153,11 +1178,48 @@ if fileName != '': self.UI.comboBox_4.addItem(fileName) self.UI.comboBox_4.setCurrentText(fileName) with open(fileName, 'rb') as fd: self.app_data = fd.read() # with open(fileName, 'rb') as fd: # self.app_data = fd.read() # drv_data_len = len( # self.drv_data) if self.drv_data is not None else 0 # app_data_len = len( # self.app_data) if self.app_data is not None else 0 # total_len = drv_data_len + app_data_len # self.UI.progressBar.setRange(0, total_len) # print(self.app_data) # print("===============================================") hex_read = Hex_read() hex_read.Open_file(fileName) self.app_data = hex_read.data drv_data_len = len( self.drv_data) if self.drv_data is not None else 0 app_data_len = len( self.app_data) if self.app_data is not None else 0 total_len = drv_data_len + app_data_len self.UI.progressBar.setRange(0, total_len) def SetBootLog(self): if self.in_programming: self.disp_string('## 正在编程中') return fileName, _ = QtWidgets.QFileDialog.getSaveFileName( None, 'comboBox_10', '.', 'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)') if fileName != '': self.UI.comboBox_10.addItem(fileName) self.UI.comboBox_10.setCurrentText(fileName) print(fileName) self.boot_logger = Logger(fileName) # self.listener = [self.boot_logger] # with open(fileName, 'rb') as fd: # self.app_data = fd.read() # drv_data_len = len( # self.drv_data) if self.drv_data is not None else 0 # app_data_len = len( # self.app_data) if self.app_data is not None else 0 # total_len = drv_data_len + app_data_len # self.UI.progressBar.setRange(0, total_len)