1 files added
5 files modified
| | |
| | | from email.message import Message |
| | | from ShifterDefine import * |
| | | import struct |
| | | from udsoncan import DidCodec |
| | |
| | | reqpos = TCU2_ShiterLevel_dic['M'] |
| | | self.pre_ShiftLeverPos = "P" |
| | | self.max_pos = 'Shifter position Zero' |
| | | elif shift.position is SA1_Status_GearShftPosReq_dic[ |
| | | 'Shifter not initialized']: |
| | | pass |
| | | else: |
| | | if shift.position is not SA1_Status_GearShftPosReq_dic[ |
| | | 'Shifter position Zero']: |
| | |
| | | self.pre_ShiftLeverPos = [ |
| | | k for k, v in TCU2_ShiterLevel_dic.items() if v == reqpos |
| | | ][0] |
| | | |
| | | data = self.pack_TCU2(reqpos) |
| | | return data |
| | |
| | | SA1_Status_ParkButtonReq_dic = { |
| | | 'No request': 0x0, |
| | | 'Driver request park button': 1, |
| | | 'Park Button fault': 2, |
| | | 'Park button fault': 2, |
| | | 'Reserved': 3 |
| | | } |
| | | SA1_Status_UnlockButtonReq_dic = { |
| | |
| | | self.device = device_type |
| | | self.device_index = device_index |
| | | self.channel = can_index |
| | | self.timestamp_start = None |
| | | self.laststamp = None |
| | | |
| | | Timing0, Timing1 = TIMING_DICT[bitrate] |
| | | |
| | |
| | | if rtn == 0xFFFFFFFF or rtn == 0: |
| | | return None, False |
| | | else: |
| | | if self.timestamp_start == None: |
| | | self.laststamp = raw_message[0].TimeStamp |
| | | self.timestamp_start = self.laststamp |
| | | for i in range(rtn): |
| | | msg.append( |
| | | Message( |
| | | timestamp=raw_message[i].TimeStamp |
| | | if raw_message[i].TimeFlag else 0.0, |
| | | arbitration_id=raw_message[i].ID, |
| | | is_remote_frame=raw_message[i].RemoteFlag, |
| | | channel=0, |
| | | extended_id=raw_message[i].ExternFlag, |
| | | data=raw_message[i].Data, |
| | | )) |
| | | Message(timestamp=( |
| | | (raw_message[i].TimeStamp - self.timestamp_start) / |
| | | 1000000) if raw_message[i].TimeFlag else 0.0, |
| | | arbitration_id=raw_message[i].ID, |
| | | is_remote_frame=raw_message[i].RemoteFlag, |
| | | channel=0, |
| | | extended_id=raw_message[i].ExternFlag, |
| | | data=raw_message[i].Data, |
| | | is_error_frame=False)) |
| | | # timestamp = raw_message[i].TimeStamp if raw_message[ |
| | | # i].TimeFlag else 0.0, |
| | | # print(timestamp) |
| | | return (msg, rtn) |
| | | |
| | | def ListeningMsg(self, connectRet, needClose, msgQueue: Queue, |
New file |
| | |
| | | """ |
| | | HEX格式文件以行为单位记录数据,每行都由任意数量的十六进制数组成。 |
| | | |
| | | 每一行的格式如下: |
| | | |
| | | :本行数据长度(2byte)+数据起始地址(4byte)+数据类型(2byte)+数据内容(N byte)+校验(1byte) |
| | | """ |
| | | |
| | | from aenum import IntEnum |
| | | import binascii |
| | | |
| | | HEX_LEN_H = 1 |
| | | HEX_LEN_L = 2 |
| | | |
| | | HEX_ADDR_HH = 3 |
| | | HEX_ADDR_HL = 4 |
| | | HEX_ADDR_LH = 5 |
| | | HEX_ADDR_LL = 6 |
| | | |
| | | HEX_DATA_TYPE_H = 7 |
| | | HEX_DATA_TYPE_L = 8 |
| | | |
| | | HEX_DATA_START = 9 |
| | | |
| | | HexTable = { |
| | | 48: 0, |
| | | 49: 1, |
| | | 50: 2, |
| | | 51: 3, |
| | | 52: 4, |
| | | 53: 5, |
| | | 54: 6, |
| | | 55: 7, |
| | | 56: 8, |
| | | 57: 9, |
| | | 65: 10, |
| | | 66: 11, |
| | | 67: 12, |
| | | 68: 13, |
| | | 69: 14, |
| | | 70: 15 |
| | | } |
| | | |
| | | |
| | | class HexDataType(IntEnum): |
| | | DataRecord = 0 |
| | | FileEnd = 1 |
| | | ExtentionAddr = 2 |
| | | StartSectionAddr = 3 |
| | | ExtentionLinearAddr = 4 |
| | | StartLinearSectionAddr = 3 |
| | | |
| | | |
| | | class Hex_read(object): |
| | | |
| | | def __init__(self): |
| | | self.start_addr = 0 |
| | | self.len = 0 |
| | | self.data = b'' |
| | | |
| | | def Open_file(self, file): |
| | | self.file = file |
| | | self.start_addr = 0 |
| | | self.len = 0 |
| | | self.data = b'' |
| | | |
| | | with open(file, 'rb') as fd: |
| | | for aLineData in fd: |
| | | if aLineData[0] == 0x3A: # 0x3A == ":" |
| | | # print(aLineData) |
| | | DataLen = HexTable[aLineData[HEX_LEN_H]] * 16 + HexTable[ |
| | | aLineData[HEX_LEN_L]] |
| | | Data_Addr = HexTable[ |
| | | aLineData[HEX_ADDR_HH]] * 16 * 16 * 16 + HexTable[ |
| | | aLineData[HEX_ADDR_HL]] * 16 * 16 + HexTable[ |
| | | aLineData[HEX_ADDR_LH]] * 16 + HexTable[ |
| | | aLineData[HEX_ADDR_LL]] |
| | | Data_Type = HexTable[ |
| | | aLineData[HEX_DATA_TYPE_H]] * 16 + HexTable[ |
| | | aLineData[HEX_DATA_TYPE_L]] |
| | | if Data_Type == HexDataType.FileEnd: |
| | | break |
| | | if Data_Type == HexDataType.ExtentionLinearAddr: |
| | | self.start_addr = HexTable[aLineData[ |
| | | HEX_DATA_START]] << 28 | HexTable[aLineData[ |
| | | HEX_DATA_START + 1]] << 20 | HexTable[ |
| | | aLineData[HEX_DATA_START + |
| | | 2]] << 12 | HexTable[aLineData[ |
| | | HEX_DATA_START + 3]] << 4 |
| | | # print(self.start_addr) |
| | | if Data_Type == HexDataType.DataRecord: |
| | | if self.len == 0: |
| | | self.start_addr += Data_Addr |
| | | self.len += DataLen |
| | | self.data += aLineData[HEX_DATA_START:HEX_DATA_START + |
| | | DataLen * 2] |
| | | |
| | | fd.close() |
| | | self.data = binascii.a2b_hex(self.data) |
| | | |
| | | # print(len(self.data)) |
| | | # print(self.start_addr) |
| | | # print(bin_file[0:20]) |
| | |
| | | from ShifterDefine import * |
| | | import cantools |
| | | from pprint import pprint |
| | | import binascii |
| | | from hexread import * |
| | | # def rece_msg(bus): |
| | | # msg = bus.Receive(0.1) |
| | | # if msg[0] is not None: |
| | |
| | | "IndicationLEDControl": 0x8101 |
| | | } |
| | | |
| | | print(DID_dic["IndicationLEDControl"]) |
| | | # print(DID_dic["IndicationLEDControl"]) |
| | | |
| | | # def f(conn): |
| | | # conn.send([1, 'test', None]) |
| | |
| | | |
| | | # sys.exit(app.exec_()) |
| | | |
| | | dbc = cantools.database.load_file("DBC/SX7H.dbc") |
| | | # print(dbc.messages) |
| | | sa_message = dbc.get_message_by_name('SA1') |
| | | # print(sa_message) |
| | | # pprint(sa_message.signals) |
| | | frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0] |
| | | input_signal = dbc.decode_message(0x420, frame) |
| | | part_str = str(input_signal['SA1_Status_ParkButtonReq']) |
| | | pprint(part_str) |
| | | if input_signal['SA1_Status_ParkButtonReq'] == 'No request': |
| | | print('yes') |
| | | # dbc = cantools.database.load_file("DBC/SX7H.dbc") |
| | | # # print(dbc.messages) |
| | | # sa_message = dbc.get_message_by_name('SA1') |
| | | # # print(sa_message) |
| | | # # pprint(sa_message.signals) |
| | | # frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0] |
| | | # input_signal = dbc.decode_message(0x420, frame) |
| | | # part_str = str(input_signal['SA1_Status_ParkButtonReq']) |
| | | # pprint(part_str) |
| | | # if input_signal['SA1_Status_ParkButtonReq'] == 'No request': |
| | | # print('yes') |
| | | |
| | | data = sa_message.encode({ |
| | | 'SA1_Status_PRNDL': 0, |
| | | 'SA1_Status_GearShftPosReq': 0, #'Shifter position Zero', |
| | | 'SA1_Status_ParkButtonReq': 2, |
| | | "SA1_ShifterManualSignal": 0, |
| | | "SA1_ShifterModeSignal": 1, |
| | | "SA1_Status_ShftSensSng_Fault": 0, |
| | | "SA1_IND_ShifterMisUsd": 0, |
| | | "SA1_Live_counter": 1, |
| | | "SA1_Status_UnlockButtonReq": 1, |
| | | "SA1_Status_EcoShifterModeReq": 0, |
| | | "SA1_Status_RqGearPosInV": 0xf, |
| | | "SA1_Status_ShiftPosValidFlag": 0, |
| | | }) |
| | | # data = sa_message.encode({ |
| | | # 'SA1_Status_PRNDL': 0, |
| | | # 'SA1_Status_GearShftPosReq': 0, #'Shifter position Zero', |
| | | # 'SA1_Status_ParkButtonReq': 2, |
| | | # "SA1_ShifterManualSignal": 0, |
| | | # "SA1_ShifterModeSignal": 1, |
| | | # "SA1_Status_ShftSensSng_Fault": 0, |
| | | # "SA1_IND_ShifterMisUsd": 0, |
| | | # "SA1_Live_counter": 1, |
| | | # "SA1_Status_UnlockButtonReq": 1, |
| | | # "SA1_Status_EcoShifterModeReq": 0, |
| | | # "SA1_Status_RqGearPosInV": 0xf, |
| | | # "SA1_Status_ShiftPosValidFlag": 0, |
| | | # }) |
| | | # print(data) |
| | | # print(SA1_Status_ParkButtonReq_dic['No request']) |
| | | |
| | | with open(r"./source bin/SX7H_Shifter.bin", 'rb') as fd: |
| | | bin_file = fd.read() |
| | | print(bin_file[0:20]) |
| | | print('=====================================================') |
| | | |
| | | hex_rd = Hex_read() |
| | | hex_rd.Open_file(r"./source bin/SX7H_Shifter.hex") |
| | | hex_file = hex_rd.data |
| | | print(hex_file[0:20]) |
| | | |
| | | aa = binascii.a2b_hex(hex_file) |
| | | print(aa[0:20]) |
| | |
| | | from logging import exception |
| | | import threading |
| | | from typing import List |
| | | from can import BusABC, Message |
| | | from can import BusABC, Message, Logger, Listener |
| | | from udsoncan.client import Client |
| | | from udsoncan.exceptions import TimeoutException |
| | | import udsoncan |
| | |
| | | import datetime |
| | | from ShifterDefine import * |
| | | from multiprocessing import Process, Queue, Value, Pipe |
| | | from hexread import * |
| | | |
| | | MAX_RCV_NUM = 20 |
| | | APP_ADDR_LOCATION_OFFSET = 8 |
| | |
| | | 0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a |
| | | ] |
| | | self.app_data: bytes = None |
| | | self.start_timestamp = time.time() |
| | | self.boot_logger = None |
| | | |
| | | def DeviceInit(self): |
| | | self._usbcan = None |
| | |
| | | msg.channel = 0 |
| | | msg.data = isotp_msg.data # bytearray |
| | | msg.is_extended_id = False |
| | | msg.timestamp = time.time() - self.start_timestamp |
| | | msg.is_error_frame = False |
| | | msg.is_remote_frame = False |
| | | self.Tool_send(msg) |
| | | |
| | | def Tool_send(self, msg: Message): |
| | | self._usbcan.send(msg) |
| | | if self.boot_logger is not None: |
| | | self.boot_logger.on_message_received(msg) |
| | | |
| | | def WidgetsInit(self): |
| | | # self.UI.setupUi(self) |
| | |
| | | self.UI.pushButton_48.clicked.connect(self.start_programming) |
| | | self.UI.pushButton_46.clicked.connect(self.loadAppfile) |
| | | self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag) |
| | | |
| | | self.UI.pushButton_47.clicked.connect(self.SetBootLog) |
| | | # self.UI.comboBox_10.activated.connect(self.ReportByMask) |
| | | |
| | | self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys())) |
| | | self.UI.comboBox_12.addItems(list(DTC_Dic.keys())) |
| | |
| | | msg[i]) # send msg to isotp |
| | | |
| | | self.msgQueue.put(msg[i]) |
| | | if self.boot_logger is not None: |
| | | self.boot_logger.on_message_received(msg[i]) |
| | | # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | |
| | | # send signal to update screen |
| | | |
| | | g_signal.sig_MsgReceived.emit(num) |
| | | |
| | | # msgToSendCnt = self.sendQueue.qsize() |
| | | # if msgToSendCnt > 0: |
| | | # msg = Message() |
| | |
| | | # msg = self.sendQueue.get() |
| | | # # logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | # # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | # self._usbcan.send(msg) |
| | | # self.Tool_send(msg) |
| | | def send_dump(self): |
| | | msg = Message() |
| | | msg.arbitration_id = self.shifter.canid.phy_rxId |
| | |
| | | msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0] |
| | | msg.is_extended_id = False |
| | | msg.is_remote_frame = False |
| | | self._usbcan.send(msg) |
| | | msg.is_error_frame = False |
| | | msg.timestamp = time.time() - self.start_timestamp |
| | | self.Tool_send(msg) |
| | | |
| | | def send_VehiclePosition(self, data=[]): |
| | | msg = Message() |
| | |
| | | msg.data = data |
| | | msg.is_extended_id = False |
| | | msg.is_remote_frame = False |
| | | self._usbcan.send(msg) |
| | | msg.is_error_frame = False |
| | | msg.timestamp = time.time() - self.start_timestamp |
| | | self.Tool_send(msg) |
| | | |
| | | def open_close(self): |
| | | if self._isOpen.value == 0: |
| | |
| | | TIMING_DICT.keys())[self.devicedescription.baudrate] |
| | | self._usbcan = USBCAN(device_type=4, |
| | | device_index=0, |
| | | can_index=0, |
| | | can_index=self.devicedescription.channel, |
| | | bitrate=bitrate, |
| | | can_filters=can_filters) |
| | | if not self._usbcan.InitAndStart(): |
| | |
| | | self.needdisconnect.value = 1 |
| | | self._deviceOpenUpdate() |
| | | self.TestPresentTimer.stop() |
| | | |
| | | def closeEvent(self, e): |
| | | self.boot_logger.stop() |
| | | |
| | | def update_HardwareDevice(self): |
| | | |
| | |
| | | if fileName != '': |
| | | self.UI.comboBox_4.addItem(fileName) |
| | | self.UI.comboBox_4.setCurrentText(fileName) |
| | | with open(fileName, 'rb') as fd: |
| | | self.app_data = fd.read() |
| | | drv_data_len = len( |
| | | self.drv_data) if self.drv_data is not None else 0 |
| | | app_data_len = len( |
| | | self.app_data) if self.app_data is not None else 0 |
| | | total_len = drv_data_len + app_data_len |
| | | self.UI.progressBar.setRange(0, total_len) |
| | | |
| | | # with open(fileName, 'rb') as fd: |
| | | # self.app_data = fd.read() |
| | | # drv_data_len = len( |
| | | # self.drv_data) if self.drv_data is not None else 0 |
| | | # app_data_len = len( |
| | | # self.app_data) if self.app_data is not None else 0 |
| | | # total_len = drv_data_len + app_data_len |
| | | # self.UI.progressBar.setRange(0, total_len) |
| | | |
| | | # print(self.app_data) |
| | | # print("===============================================") |
| | | hex_read = Hex_read() |
| | | hex_read.Open_file(fileName) |
| | | self.app_data = hex_read.data |
| | | drv_data_len = len( |
| | | self.drv_data) if self.drv_data is not None else 0 |
| | | app_data_len = len( |
| | | self.app_data) if self.app_data is not None else 0 |
| | | total_len = drv_data_len + app_data_len |
| | | self.UI.progressBar.setRange(0, total_len) |
| | | |
| | | def SetBootLog(self): |
| | | if self.in_programming: |
| | | self.disp_string('## 正在编程中') |
| | | return |
| | | |
| | | fileName, _ = QtWidgets.QFileDialog.getSaveFileName( |
| | | None, 'comboBox_10', '.', |
| | | 'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)') |
| | | |
| | | if fileName != '': |
| | | self.UI.comboBox_10.addItem(fileName) |
| | | self.UI.comboBox_10.setCurrentText(fileName) |
| | | print(fileName) |
| | | self.boot_logger = Logger(fileName) |
| | | # self.listener = [self.boot_logger] |
| | | # with open(fileName, 'rb') as fd: |
| | | # self.app_data = fd.read() |
| | | # drv_data_len = len( |
| | | # self.drv_data) if self.drv_data is not None else 0 |
| | | # app_data_len = len( |
| | | # self.app_data) if self.app_data is not None else 0 |
| | | # total_len = drv_data_len + app_data_len |
| | | # self.UI.progressBar.setRange(0, total_len) |