| | |
| | | |
| | | from ast import Pass |
| | | from ast import Not, Pass |
| | | from concurrent.futures import thread |
| | | from curses import flash |
| | | from logging import exception |
| | | import threading |
| | | from can import BusABC, Message |
| | | from typing import List |
| | | from can import BusABC, Message, Logger, Listener |
| | | from udsoncan.client import Client |
| | | from udsoncan.exceptions import TimeoutException |
| | | import udsoncan |
| | |
| | | from isotp import CanMessage |
| | | import queue |
| | | from USBCAN import * |
| | | from Shifter import ShifterClass |
| | | from Shifter import * |
| | | from styleSheet import * |
| | | from mainwindows import Ui_MainWindow |
| | | from PySide2 import QtWidgets, QtCore, QtGui |
| | | import struct |
| | | import time |
| | | import datetime |
| | | from multiprocessing import Array, Pool, Process, Queue, Value |
| | | from ShifterDefine import * |
| | | from multiprocessing import Process, Queue, Value, Pipe |
| | | from hexread import * |
| | | |
| | | MAX_RCV_NUM = 20 |
| | | |
| | | APP_ADDR_LOCATION_OFFSET = 8 |
| | | APP_LENGTH_LOCATION_OFFSET = 12 |
| | | logger = logging.getLogger() |
| | | logger.setLevel(logging.DEBUG) |
| | | formatter = logging.Formatter( |
| | |
| | | consoleHandler = logging.StreamHandler() |
| | | consoleHandler.setLevel(logging.DEBUG) |
| | | |
| | | fileHandler = logging.FileHandler( |
| | | './log/ShiftTool.log', mode='a', encoding='UTF-8') |
| | | fileHandler.setLevel(logging.NOTSET) |
| | | fileHandler = logging.FileHandler('./log/ShiftTool.log', |
| | | mode='a', |
| | | encoding='UTF-8') |
| | | fileHandler.setLevel(logging.DEBUG) |
| | | |
| | | consoleHandler.setFormatter(formatter) |
| | | fileHandler.setFormatter(formatter) |
| | |
| | | |
| | | |
| | | g_signal = UISignals() |
| | | uds_conn, isotp_conn = Pipe() |
| | | user_conn, drive_conn = Pipe() |
| | | |
| | | |
| | | class HardwreDevice(): |
| | | |
| | | def __init__(self): |
| | | self.device_type = 4 |
| | | self.channel = 0 # can_index |
| | |
| | | |
| | | |
| | | class PeriodSendThread(object): |
| | | |
| | | def __init__(self, period_func, args=[], kwargs={}): |
| | | self._thread = threading.Thread(target=self._run) |
| | | self._function = period_func |
| | |
| | | BaseConnection.__init__(self, name) |
| | | self.UDStoIsoTPQueue = queue.Queue() # USD --> isotp |
| | | self.IsoTPtoUDSQueue = queue.Queue() # ISOTP --> UDS |
| | | self.CANtoIsoTPQueue = queue.Queue() # CAN --> isotp |
| | | # self.IsoTPtoCANQueue = queue.Queue() # ISOTP--> CAN |
| | | self._read_thread = None |
| | | self.exit_requested = False |
| | |
| | | self.isotp_layer = isotp_layer |
| | | |
| | | assert isinstance( |
| | | self.isotp_layer, isotp.TransportLayer), 'isotp_layer must be a valid isotp.TransportLayer ' |
| | | self.isotp_layer, isotp.TransportLayer |
| | | ), 'isotp_layer must be a valid isotp.TransportLayer ' |
| | | |
| | | def open(self): |
| | | self.exit_requested = False |
| | | self._read_thread = threading.Thread( |
| | | None, target=self.rxthread_task) |
| | | self._read_thread = threading.Thread(None, |
| | | target=self.rxthread_task) |
| | | self._read_thread.start() |
| | | self.opened = True |
| | | logger.info('Connection opened') |
| | |
| | | if self.mtu is not None: |
| | | if len(payload) > self.mtu: |
| | | logger.warning( |
| | | "Truncating payload to be set to a length of %d" % (self.mtu)) |
| | | "Truncating payload to be set to a length of %d" % |
| | | (self.mtu)) |
| | | payload = payload[0:self.mtu] |
| | | |
| | | # isotp.protocol.TransportLayer uses byte array. udsoncan is strict on bytes format |
| | | self.UDStoIsoTPQueue.put(bytearray(payload)) |
| | | self.isotp_layer.send(bytearray(payload)) |
| | | # self.UDStoIsoTPQueue.put(bytearray(payload)) |
| | | |
| | | def specific_wait_frame(self, timeout=2): |
| | | if not self.opened: |
| | |
| | | |
| | | timedout = False |
| | | frame = None |
| | | # frame = uds_conn.recv() |
| | | try: |
| | | frame = self.IsoTPtoUDSQueue.get(block=True, timeout=timeout) |
| | | # frame = self.CANtoIsoTPQueue.get(block=True, timeout=timeout) |
| | | frame = self.IsoTPtoUDSQueue.get(block=True, timeout=5) |
| | | except queue.Empty: |
| | | timedout = True |
| | | # frame = self.CANtoIsoTPQueue.get(block=True, timeout=timeout) |
| | | |
| | | if timedout: |
| | | raise TimeoutException( |
| | | "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" % timeout) |
| | | "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" |
| | | % timeout) |
| | | |
| | | if self.mtu is not None: |
| | | if frame is not None and len(frame) > self.mtu: |
| | | logger.warning( |
| | | "Truncating received payload to a length of %d" % (self.mtu)) |
| | | "Truncating received payload to a length of %d" % |
| | | (self.mtu)) |
| | | frame = frame[0:self.mtu] |
| | | |
| | | # isotp.protocol.TransportLayer uses bytearray. udsoncan is strict on bytes format |
| | |
| | | def rxthread_task(self): |
| | | while not self.exit_requested: |
| | | try: |
| | | # self.logger.debug("UDStoIsoTPQueue queue size is now %d" % ( |
| | | # self.UDStoIsoTPQueue.qsize())) |
| | | while not self.UDStoIsoTPQueue.empty(): |
| | | self.isotp_layer.send(self.UDStoIsoTPQueue.get()) |
| | | # # self.logger.debug("UDStoIsoTPQueue queue size is now %d" % ( |
| | | # # self.UDStoIsoTPQueue.qsize())) |
| | | # while not self.UDStoIsoTPQueue.empty(): |
| | | # self.isotp_layer.send(self.UDStoIsoTPQueue.get()) |
| | | |
| | | self.isotp_layer.process() |
| | | |
| | | while self.isotp_layer.available(): |
| | | # isotp_conn.send(self.isotp_layer.recv()) |
| | | self.IsoTPtoUDSQueue.put(self.isotp_layer.recv()) |
| | | # self.logger.debug("IsoTPtoUDSQueue queue size is now %d" % ( |
| | | # self.IsoTPtoUDSQueue.qsize())) |
| | | self.logger.debug( |
| | | "IsoTPtoUDSQueue queue size is now %d" % |
| | | (self.IsoTPtoUDSQueue.qsize())) |
| | | |
| | | # time.sleep(self.isotp_layer.sleep_time()) |
| | | # time.sleep(self.isotp_layer.sleep_time()) |
| | | time.sleep(0.001) |
| | | |
| | | except Exception as e: |
| | |
| | | # self.protocol("WM_DELETE_WINDOW", self.Form_OnClosing) |
| | | self.msgQueue = Queue() # can layer receive queue |
| | | self.sendQueue = Queue() # can layer send queue |
| | | |
| | | self.CANtoIsoTPQueue = Queue() # CAN --> isotp |
| | | self.shifter = ShifterClass() |
| | | self.Vehicle = VehicleClass() |
| | | self.devicedescription = HardwreDevice() |
| | | self.windows = QtWidgets.QMainWindow() |
| | | self.UI = Ui_MainWindow() |
| | | self.UI.setupUi(window) |
| | | self._dev_info = None |
| | | self.dbc = cantools.database.load_file("DBC/SX7H.dbc") |
| | | self.can_thread = threading.Thread(target=self.can_thread) |
| | | self.TestPresentTimer = QtCore.QTimer() |
| | | self.TestPresentTimer.timeout.connect(self.creat_testpresentReq) |
| | | self.DeviceInit() |
| | | self.WidgetsInit() |
| | | self.ChnInfoUpdate(self._isOpen) |
| | | self.in_programming = False |
| | | self.startfromboot = False |
| | | self.drv_data = [ |
| | | 0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a |
| | | ] |
| | | self.app_data: bytes = None |
| | | self.start_timestamp = time.time() |
| | | self.boot_logger = None |
| | | |
| | | def DeviceInit(self): |
| | | self._usbcan = None |
| | |
| | | 'squash_stmin_requirement': False |
| | | } |
| | | self._isotpaddr_PHYS = isotp.Address( |
| | | isotp.AddressingMode.Normal_11bits, txid=self.shifter.canid.phy_rxId, rxid=self.shifter.canid.phy_txId) |
| | | isotp.AddressingMode.Normal_11bits, |
| | | txid=self.shifter.canid.phy_rxId, |
| | | rxid=self.shifter.canid.phy_txId) |
| | | self._isotpaddr_FUNC = isotp.Address( |
| | | isotp.AddressingMode.Normal_11bits, txid=self.shifter.canid.fun_rxId, rxid=self.shifter.canid.phy_txId) |
| | | isotp.AddressingMode.Normal_11bits, |
| | | txid=self.shifter.canid.fun_rxId, |
| | | rxid=self.shifter.canid.phy_txId) |
| | | # self._isotpaddr_EPS = isotp.Address( |
| | | # isotp.AddressingMode.Normal_11bits, txid=EPS_RX_ID_PHYS, rxid=EPS_TX_ID) |
| | | # self._isotpaddr_EPS4wd = isotp.Address( |
| | | # isotp.AddressingMode.Normal_11bits, txid=EPS4wd_RX_ID_PHYS, rxid=EPS4wd_TX_ID) |
| | | self.isotp_layer = isotp.TransportLayer( |
| | | rxfn=self.isotp_rcv, txfn=self.isotp_send, address=self._isotpaddr_PHYS, params=self.isotp_params) |
| | | self.isotp_layer = isotp.TransportLayer(rxfn=self.isotp_rcv, |
| | | txfn=self.isotp_send, |
| | | address=self._isotpaddr_PHYS, |
| | | params=self.isotp_params) |
| | | self.conn = ShifterTool.IsoTpConnection(isotp_layer=self.isotp_layer) |
| | | self.udsclient = Client(self.conn, request_timeout=2) |
| | | self.udsclient.config['security_algo'] = self.SecAlgo |
| | | self.udsclient.config['security_algo_params'] = [ |
| | | 0x4FE87269, 0x6BC361D8, 0x9B127D51, 0x5BA41903] |
| | | 0x11223344, 0x20AA097B, 0x11223344, 0x70237577 |
| | | ] |
| | | self.udsclient.config['data_identifiers'] = self.shifter.did_config |
| | | |
| | | self.udsclient.config['server_address_format'] = 32 |
| | | self.udsclient.config['server_memorysize_format'] = 32 |
| | | |
| | | def SecAlgo(self, level, seed, params): |
| | | def SecAlgo(self, level, seed, params=None): |
| | | """ |
| | | Builds the security key to unlock a security level. |
| | | |
| | |
| | | self.output_key[2] = ((temp_key[1] & 0xFC) >> 2) | (temp_key[0] & 0xC0) |
| | | self.output_key[3] = ((temp_key[0] & 0x0F) << 4) | (temp_key[3] & 0x0F) |
| | | """ |
| | | temp_key = (seed[0] << 24) | ( |
| | | seed[1] << 16) | (seed[2] << 8) | (seed[3]) |
| | | temp_key = (seed[0] << 24) | (seed[1] << 16) | (seed[2] << 8) | ( |
| | | seed[3]) |
| | | if level == 0x01: |
| | | output_key_temp = ((((temp_key >> 4) ^ temp_key) |
| | | << 3) ^ temp_key) & 0xFFFFFFFF |
| | | elif level == 0x11: |
| | | _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) & |
| | | 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF) |
| | | _temp_z = 0 |
| | | _temp_sum = 0 |
| | | for i in range(64): |
| | | _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z) |
| | | ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF |
| | | _temp_y = _temp_y & 0xFFFFFFFF |
| | | _temp_sum += 0x8F750A1D |
| | | _temp_sum = _temp_sum & 0xFFFFFFFF |
| | | _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^ |
| | | (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF |
| | | _temp_z = _temp_z & 0xFFFFFFFF |
| | | output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | ( |
| | | (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF)) |
| | | output_key_temp = 0x20AA097B |
| | | output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, |
| | | (output_key_temp >> 16) & 0xFF, |
| | | (output_key_temp >> 8) & 0xFF, |
| | | output_key_temp & 0xFF) |
| | | # output_key_temp = ((((temp_key >> 4) ^ temp_key) |
| | | # << 3) ^ temp_key) & 0xFFFFFFFF |
| | | elif level == 0x09: |
| | | output_key_temp = 0x70237577 |
| | | output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, |
| | | (output_key_temp >> 16) & 0xFF, |
| | | (output_key_temp >> 8) & 0xFF, |
| | | output_key_temp & 0xFF) |
| | | # _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) & |
| | | # 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF) |
| | | # _temp_z = 0 |
| | | # _temp_sum = 0 |
| | | # for i in range(64): |
| | | # _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z) |
| | | # ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF |
| | | # _temp_y = _temp_y & 0xFFFFFFFF |
| | | # _temp_sum += 0x8F750A1D |
| | | # _temp_sum = _temp_sum & 0xFFFFFFFF |
| | | # _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^ |
| | | # (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF |
| | | # _temp_z = _temp_z & 0xFFFFFFFF |
| | | # output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | ( |
| | | # (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF)) |
| | | elif level == 0x7D: |
| | | output_key_temp = 0xAB2F9F36099D81F3 |
| | | output_key = struct.pack( |
| | | 'BBBBBBBB', (output_key_temp >> 56) & 0xFF, |
| | | (output_key_temp >> 48) & 0xFF, (output_key_temp >> 40) & 0xFF, |
| | | (output_key_temp >> 32) & 0xFF, (output_key_temp >> 24) & 0xFF, |
| | | (output_key_temp >> 16) & 0xFF, (output_key_temp >> 8) & 0xFF, |
| | | output_key_temp & 0xFF) |
| | | else: |
| | | output_key_temp = temp_key |
| | | |
| | | output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, ( |
| | | output_key_temp >> 16) & 0xFF, (output_key_temp >> 8) & 0xFF, output_key_temp & 0xFF) |
| | | output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, |
| | | (output_key_temp >> 16) & 0xFF, |
| | | (output_key_temp >> 8) & 0xFF, |
| | | output_key_temp & 0xFF) |
| | | |
| | | return output_key |
| | | |
| | |
| | | '''receive msg from can layer |
| | | use queue read from can layer listen''' |
| | | can_msgs = None |
| | | if not self.conn.CANtoIsoTPQueue.empty(): |
| | | can_msgs = self.conn.CANtoIsoTPQueue.get() |
| | | # can_num = self.conn.CANtoIsoTPQueue.size() |
| | | # # can_num = self._usbcan.GetReceiveNum(self.devicedescription.channel) |
| | | # if can_num and not self._terminated: |
| | | # read_cnt = MAX_RCV_NUM if can_num >= MAX_RCV_NUM else can_num |
| | | # for i in range(read_cnt): |
| | | # can_msgs[i] = self.conn.CANtoIsoTPQueue.get() |
| | | # else: |
| | | # can_msgs = None |
| | | if not self.CANtoIsoTPQueue.empty(): |
| | | can_msgs = self.CANtoIsoTPQueue.get() |
| | | return can_msgs |
| | | |
| | | def isotp_send(self, isotp_msg): |
| | |
| | | msg.channel = 0 |
| | | msg.data = isotp_msg.data # bytearray |
| | | msg.is_extended_id = False |
| | | # for i in range(isotp_msg.dlc): |
| | | # msg.data[i] = isotp_msg.data[i] |
| | | self.sendQueue.put(msg) |
| | | msg.timestamp = time.time() - self.start_timestamp |
| | | msg.is_error_frame = False |
| | | msg.is_remote_frame = False |
| | | self.Tool_send(msg) |
| | | |
def Tool_send(self, msg: Message):
    """Transmit ``msg`` on the CAN device and mirror it to the boot logger.

    The frame is always handed to ``self._usbcan.send``. When
    ``self.boot_logger`` is set, the same frame is then passed to its
    ``on_message_received`` so that transmitted frames are recorded too.
    """
    self._usbcan.send(msg)
    recorder = self.boot_logger
    if recorder is None:
        return
    recorder.on_message_received(msg)
| | | |
| | | def WidgetsInit(self): |
| | | # self.UI.setupUi(self) |
| | |
| | | self.UI.comboBox_2.addItem('USBCAN-II') |
| | | |
| | | # 波特率 |
| | | self.UI.comboBox_3.addItems([('%dK' % (i/1000)) |
| | | self.UI.comboBox_3.addItems([('%dK' % (i / 1000)) |
| | | for i in TIMING_DICT.keys()]) |
| | | # CHANNEL |
| | | self.UI.comboBox_5.addItems(['CH1/3', 'CH2/4']) |
| | |
| | | |
| | | self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发']) |
| | | self.UI.comboBox_9.activated.connect(self.communicationControl_req) |
| | | self.UI.comboBox_6.addItems(list(DID_dic.keys())) |
| | | self.DID_display() |
| | | self.UI.comboBox_6.activated.connect(self.DID_display) |
| | | self.UI.comboBox_7.addItems(list(DTCGroup_dic.keys())) |
| | | self.UI.comboBox_7.activated.connect(self.ClearDTC) |
| | | |
| | | self.UI.pushButton_31.clicked.connect(self.SecurityUnlockLevel_1) |
| | | self.UI.pushButton_32.clicked.connect(self.SecurityUnlockLevel_2) |
| | | |
| | | self.UI.pushButton_12.clicked.connect(self.ReadSupplyID) |
| | | self.UI.pushButton_4.clicked.connect(self.ReadVIN) |
| | | self.UI.pushButton_6.clicked.connect(self.ReadMfgDate) |
| | | self.UI.pushButton_14.clicked.connect(self.ReadDataByID) |
| | | self.UI.pushButton_3.clicked.connect(self.WriteDataByID) |
| | | self.UI.pushButton_9.clicked.connect(self.StartCalibraiton) |
| | | self.UI.pushButton_11.clicked.connect(self.Calibraiton_Z) |
| | | self.UI.pushButton_13.clicked.connect(self.Calibraiton_M) |
| | | self.UI.pushButton_18.clicked.connect(self.Calibraiton_MP) |
| | | self.UI.pushButton_22.clicked.connect(self.Calibraiton_MN) |
| | | self.UI.pushButton_24.clicked.connect(self.Calibraiton_X2) |
| | | self.UI.pushButton_26.clicked.connect(self.Calibraiton_X1) |
| | | self.UI.pushButton_27.clicked.connect(self.Calibraiton_Y1) |
| | | self.UI.pushButton_28.clicked.connect(self.Calibraiton_Y2) |
| | | self.UI.pushButton_29.clicked.connect(self.Calibraiton_GAP) |
| | | |
| | | self.UI.radioButton.toggled.connect(self.TestPresentEvent) |
| | | |
| | | self.UI.pushButton_48.clicked.connect(self.start_programming) |
| | | self.UI.pushButton_46.clicked.connect(self.loadAppfile) |
| | | self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag) |
| | | |
| | | self.UI.pushButton_47.clicked.connect(self.SetBootLog) |
| | | # self.UI.comboBox_10.activated.connect(self.ReportByMask) |
| | | |
| | | self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys())) |
| | | self.UI.comboBox_12.addItems(list(DTC_Dic.keys())) |
| | | self.UI.comboBox_13.addItems(list(DTC_Control_dic.keys())) |
| | | self.UI.comboBox_13.activated.connect(self.DTC_Control) |
| | | |
| | | self.UI.pushButton_49.clicked.connect(self.ReportSupportDTC) |
| | | self.UI.comboBox_11.activated.connect(self.ReportByMask) |
| | | self.UI.comboBox_12.activated.connect(self.ReportSnapshotByDTC) |
| | | |
| | | def _formatMsgData(self, index, item, received): |
| | | '''msg data to list |
| | |
| | | else: |
| | | data.append('标准帧') |
| | | |
| | | data.append(' '.join(['0x' + |
| | | '{:0<2x}'.format(a).upper() for a in list(item.data)])) |
| | | data.append(' '.join( |
| | | ['0x' + '{:0>2x}'.format(a).upper() for a in list(item.data)])) |
| | | return data |
| | | |
| | | def can_thread_stop(self): |
| | |
| | | self.can_thread.start() |
| | | |
| | | def can_thread(self): |
| | | while self._isOpen.value == 1: |
| | | while True: |
| | | time.sleep(0.01) |
| | | if self.needdisconnect.value == 1: |
| | | ret = self._usbcan.CloseDevice() |
| | | if ret == 1: |
| | | self.needdisconnect.value = 0 |
| | | self._isOpen.value = 0 |
| | | else: |
| | | msg, num = self._usbcan.Receive(len=10) |
| | | if not num == 0: |
| | | for i in range(num): |
| | | if msg[i].arbitration_id == self.shifter.canid.phy_txId: |
| | | self.conn.CANtoIsoTPQueue.put( |
| | | msg[i]) # send msg to isotp |
| | | self.msgQueue.put(msg[i]) |
| | | # send signal to update screen |
| | | # logger.info('Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | g_signal.sig_MsgReceived.emit(num) |
| | | msgToSendCnt = self.sendQueue.qsize() |
| | | if msgToSendCnt > 0: |
| | | msg = Message() |
| | | for i in range(msgToSendCnt): |
| | | while self._isOpen.value == 1: |
| | | time.sleep(0.01) |
| | | if self.needdisconnect.value == 1: |
| | | ret = self._usbcan.CloseDevice() |
| | | if ret == 1: |
| | | self.needdisconnect.value = 0 |
| | | self._isOpen.value = 0 |
| | | else: |
| | | msg, num = self._usbcan.Receive(len=1) |
| | | if not num == 0: |
| | | for i in range(num): |
| | | if msg[i].arbitration_id == self.shifter.canid.phy_txId: |
| | | # conn.send(msg[i]) # pipe connection send |
| | | self.CANtoIsoTPQueue.put( |
| | | msg[i]) # send msg to isotp |
| | | |
| | | msg = self.sendQueue.get() |
| | | # logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | self._usbcan.send(msg) |
| | | self.msgQueue.put(msg[i]) |
| | | if self.boot_logger is not None: |
| | | self.boot_logger.on_message_received(msg[i]) |
| | | # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | |
| | | # send signal to update screen |
| | | |
| | | g_signal.sig_MsgReceived.emit(num) |
| | | |
| | | # msgToSendCnt = self.sendQueue.qsize() |
| | | # if msgToSendCnt > 0: |
| | | # msg = Message() |
| | | # for i in range(msgToSendCnt): |
| | | |
| | | # msg = self.sendQueue.get() |
| | | # # logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | # # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | # self.Tool_send(msg) |
def send_dump(self):
    """Send a fixed 8-byte frame ``02 3E 80 00 ...`` on the physical request ID.

    The frame is a standard-ID, non-remote, non-error data frame on channel 0
    addressed to ``self.shifter.canid.phy_rxId``. Its timestamp is the time
    elapsed since ``self.start_timestamp``. The frame is sent through
    ``self.Tool_send``.
    """
    # NOTE(review): 0x3E / 0x80 looks like UDS TesterPresent with the
    # suppress-positive-response bit -- confirm against the ECU spec.
    frame = Message()
    fields = (
        ("arbitration_id", self.shifter.canid.phy_rxId),
        ("dlc", 8),
        ("channel", 0),
        ("data", [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0]),
        ("is_extended_id", False),
        ("is_remote_frame", False),
        ("is_error_frame", False),
        ("timestamp", time.time() - self.start_timestamp),
    )
    for attr, value in fields:
        setattr(frame, attr, value)
    self.Tool_send(frame)
| | | |
def send_VehiclePosition(self, data=None):
    """Send the vehicle-position frame (CAN ID 0x10) with ``data`` as payload.

    Args:
        data: Payload assigned to the message's ``data`` field. When omitted
            a fresh empty list is used.

    The frame is a standard-ID, non-remote, non-error frame with ``dlc`` 8 on
    channel 0, time-stamped relative to ``self.start_timestamp`` and sent
    through ``self.Tool_send``.
    """
    msg = Message()
    msg.arbitration_id = 0x10
    msg.dlc = 8
    msg.channel = 0
    # A new list per call: the payload object is stored on the message, so a
    # shared default list would be aliased by every frame sent without data.
    msg.data = [] if data is None else data
    msg.is_extended_id = False
    msg.is_remote_frame = False
    msg.is_error_frame = False
    msg.timestamp = time.time() - self.start_timestamp
    self.Tool_send(msg)
| | | |
| | | def open_close(self): |
| | | if self._isOpen.value == 0: |
| | | if self._usbcan is None: |
| | | self.update_HardwareDevice() |
| | | can_filters = [ |
| | | {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] |
| | | can_filters = [{'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] |
| | | |
| | | bitrate = list(TIMING_DICT.keys())[ |
| | | self.devicedescription.baudrate] |
| | | self._usbcan = USBCAN(device_type=4, device_index=0, |
| | | can_index=0, bitrate=bitrate, can_filters=can_filters) |
| | | bitrate = list( |
| | | TIMING_DICT.keys())[self.devicedescription.baudrate] |
| | | self._usbcan = USBCAN(device_type=4, |
| | | device_index=0, |
| | | can_index=self.devicedescription.channel, |
| | | bitrate=bitrate, |
| | | can_filters=can_filters) |
| | | if not self._usbcan.InitAndStart(): |
| | | logger.info("Open usbcan device fail.") |
| | | |
| | |
| | | self._deviceOpenUpdate() |
| | | self.can_thread_start() |
| | | self.conn.open() # start iso tp thread |
| | | self.send_dump() |
| | | self.send_dump() |
| | | |
| | | else: |
| | | self._usbcan.InitAndStart() |
| | |
| | | else: |
| | | self.needdisconnect.value = 1 |
| | | self._deviceOpenUpdate() |
| | | self.TestPresentTimer.stop() |
| | | |
def closeEvent(self, e):
    """Stop the boot logger, if one is active, when the window closes.

    Args:
        e: The close event; it is not inspected.
    """
    # boot_logger starts out as None and is only set once logging is enabled,
    # so closing the window without a logger must not raise.
    if self.boot_logger is not None:
        self.boot_logger.stop()
| | | |
| | | def update_HardwareDevice(self): |
| | | |
| | | self.devicedescription.device_type = self.UI.comboBox_2.currentIndex()+3 |
| | | self.devicedescription.device_type = self.UI.comboBox_2.currentIndex( |
| | | ) + 3 |
| | | self.devicedescription.channel = self.UI.comboBox_5.currentIndex() |
| | | self.devicedescription.baudrate = self.UI.comboBox_3.currentIndex() |
| | | |
| | |
| | | self.UI.pushButton.setText("Close CAN") |
| | | elif self.needdisconnect.value == 1: |
| | | self.UI.pushButton.setText("Open CAN") |
| | | |
| | | # set ui info |
| | | |
| | | def _StartlistenMsgProcess(self): |
| | | self.msgProcess = Process( |
| | | name='pyUSBCANListener', |
| | | target=self._usbcan.ListeningMsg, |
| | | args=(self._isOpen, self.needdisconnect, self.msgQueue, self.sendQueue)) |
| | | self.msgProcess = Process(name='pyUSBCANListener', |
| | | target=self._usbcan.ListeningMsg, |
| | | args=(self._isOpen, self.needdisconnect, |
| | | self.msgQueue, self.sendQueue)) |
| | | self.msgProcess.daemon = True |
| | | self.msgProcess.start() |
| | | # 1.5s后检测连接状态,该值可能需要标定 |
| | |
| | | self._updateRootList() |
| | | time.sleep(0.1) |
| | | |
def disp_string(self, out_s):
    """Append a time-stamped line to the ``textEdit_2`` log box.

    Args:
        out_s: The message to show. It is converted with ``str`` first, so
            non-string payloads such as exception objects are accepted.

    The line has the form ``HH:MM:SS: <message>`` followed by a tab and a
    newline, and is always written at the end of the existing text.
    """
    log_box = self.UI.textEdit_2
    stamp = datetime.datetime.now().strftime('%I:%M:%S')
    # Move to the end before writing anything, so the timestamp and the
    # message stay together even if the cursor was placed elsewhere.
    cursor = log_box.textCursor()
    cursor.movePosition(QtGui.QTextCursor.End)
    log_box.setTextCursor(cursor)
    # Convert before concatenating: str(out_s + "...") fails for non-str input.
    log_box.insertPlainText(stamp + ': ' + str(out_s) + "\t\n")
| | | |
| | | # self.statusbar.showMessage(str(out)) |
def dispShiftstatus(self):
    """Restyle the indicator buttons from the shifter and vehicle state.

    The unlock and P buttons are styled from ``Style_dic`` keyed by the
    corresponding shifter flags. Each position button is styled from
    ``Style_dic`` keyed by whether ``self.shifter.position`` equals that
    button's code. Each gear button is styled from ``ColorStyle_dic`` keyed
    by whether ``self.Vehicle.ShiftLeverPos`` equals that button's letter.
    """
    ui = self.UI
    shifter = self.shifter

    ui.pushButton_41.setStyleSheet(Style_dic[shifter.UnlockButton])
    ui.pushButton_40.setStyleSheet(Style_dic[shifter.Pbutton])

    position_buttons = (
        (ui.pushButton_37, 2),    # X2
        (ui.pushButton_38, 3),    # X1
        (ui.pushButton_30, 0),    # Z
        (ui.pushButton_33, 4),    # Y1
        (ui.pushButton_34, 5),    # Y2
        (ui.pushButton_39, 0xe),  # M
        (ui.pushButton_35, 0xd),  # M+
        (ui.pushButton_36, 0xc),  # M-
    )
    for button, code in position_buttons:
        button.setStyleSheet(Style_dic[shifter.position == code])

    lever_buttons = (
        (ui.pushButton_42, "P"),
        (ui.pushButton_43, "D"),
        (ui.pushButton_44, "N"),
        (ui.pushButton_45, "R"),
    )
    for button, letter in lever_buttons:
        button.setStyleSheet(
            ColorStyle_dic[self.Vehicle.ShiftLeverPos == letter])
| | | |
| | | def _updateRootList(self): |
| | | _dataSize = self.msgQueue.qsize() |
| | |
| | | for i in range(_dataSize): |
| | | receiveNum += 1 |
| | | msg = self.msgQueue.get() |
| | | if msg.arbitration_id == 0x420: |
| | | self.shifter.FramUnpack(msg.arbitration_id, msg.data) |
| | | resp_data = self.Vehicle.ShiftLogic(self.shifter) |
| | | self.send_VehiclePosition(resp_data) |
| | | formateddata.append(self._formatMsgData( |
| | | receiveNum, msg, True)) # return a data list |
| | | self._insertDataSmooth(data=formateddata, datasize=_dataSize) |
| | | self.dispShiftstatus() |
| | | |
| | | def _insertDataSmooth(self, data, datasize): |
| | | # row = 6-datasize |
| | |
| | | |
def TestPresentEvent(self):
    """Start or stop the periodic tester-present timer.

    When ``radioButton`` is checked the timer is started with a 3000 ms
    interval, otherwise it is stopped.
    """
    if self.UI.radioButton.isChecked():
        # One start() only: an earlier start(4000) was immediately overridden
        # by this call, so 3000 ms was always the effective interval.
        self.TestPresentTimer.start(3000)
    else:
        self.TestPresentTimer.stop()
| | | |
| | | def creat_testpresentReq(self): |
| | | self.udsclient.tester_present() |
| | | self.send_dump() |
| | | |
| | | def sessioncontrol_req(self): |
| | | sesson = self.UI.comboBox.currentIndex() + 1 |
| | |
| | | |
| | | # self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位']) |
def MCUReset_req(self):
    """Send an ECU reset whose type follows the ``comboBox_8`` selection.

    The entry labelled ``'0x01硬件复位'`` requests reset type 0x01; any other
    selection requests reset type 0x03.
    """
    # The label itself names the reset type (0x01), so the request must carry
    # the same value; the previous mapping sent 0x03 for the 0x01 entry and
    # 0x01 for everything else.
    hard_reset = self.UI.comboBox_8.currentText() == '0x01硬件复位'
    self.udsclient.ecu_reset(0x01 if hard_reset else 0x03)
| | | |
| | | # self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发']) |
def ClearDTC(self):
    """Clear the DTC group currently selected in ``comboBox_7``.

    The selected text is looked up in ``DTCGroup_dic`` and the resulting group
    value is passed to the UDS client's ``clear_dtc``.
    """
    selected = self.UI.comboBox_7.currentText()
    dtc_group = DTCGroup_dic[selected]
    self.udsclient.clear_dtc(group=dtc_group)
| | | |
def DTC_Control(self):
    """Apply the DTC setting currently selected in ``comboBox_13``.

    The selected text is looked up in ``DTC_Control_dic`` and the resulting
    value is passed to the UDS client's ``control_dtc_setting``.
    """
    selected = self.UI.comboBox_13.currentText()
    setting = DTC_Control_dic[selected]
    self.udsclient.control_dtc_setting(setting_type=setting)
| | | |
def Security_req(self):
    """Ask the UDS client to unlock security access at level 1."""
    level = 1
    self.udsclient.unlock_security_access(level)
| | | |
def SecurityUnlockLevel_1(self):
    """Unlock security level 1 and highlight ``pushButton_31`` on success."""
    response = self.udsclient.unlock_security_access(1)
    if not response.positive:
        return
    self.UI.pushButton_31.setStyleSheet("background-color:rgb(255, 85, 127)")
| | | |
def SecurityUnlockLevel_2(self):
    """Unlock security level 9 and highlight ``pushButton_32`` on success."""
    response = self.udsclient.unlock_security_access(9)
    if not response.positive:
        return
    self.UI.pushButton_32.setStyleSheet("background-color:rgb(255, 85, 127)")
| | | |
def SecurityUnlockLevel_3(self):
    """Unlock security level 0x7D and return the client's response.

    On a positive response ``pushButton_32`` is turned green. The response
    object is returned in either case so the caller can check ``positive``.
    """
    response = self.udsclient.unlock_security_access(0x7d)
    unlocked = response.positive
    if unlocked:
        self.UI.pushButton_32.setStyleSheet("background-color:rgb(0, 128, 0)")
    return response
| | | |
def ReadSupplyID(self):
    """Read DID 0xF18A and show its value in ``lineEdit_8``.

    Nothing is displayed when ``ReadByDID`` yields no result or a result that
    does not contain the DID.
    """
    did = 0xF18A
    data = self.ReadByDID(did)
    # ReadByDID can hand back None or a dict without this DID; indexing
    # either one would raise instead of just leaving the field untouched.
    if not data or did not in data:
        return
    self.UI.lineEdit_8.setText(str(data[did]))
| | | |
def ReadVIN(self):
    """Read DID 0xF190 and show its value in ``lineEdit_3``.

    Nothing is displayed when ``ReadByDID`` yields no result or a result that
    does not contain the DID.
    """
    did = 0xF190
    data = self.ReadByDID(did)
    # ReadByDID can hand back None or a dict without this DID; indexing
    # either one would raise instead of just leaving the field untouched.
    if not data or did not in data:
        return
    self.UI.lineEdit_3.setText(str(data[did]))
| | | |
def ReadMfgDate(self):
    """Read DID 0x210B and show its value in ``lineEdit_5``.

    Nothing is displayed when ``ReadByDID`` yields no result or a result that
    does not contain the DID.
    """
    did = 0x210B
    data = self.ReadByDID(did)
    # ReadByDID can hand back None or a dict without this DID; indexing
    # either one would raise instead of just leaving the field untouched.
    if not data or did not in data:
        return
    self.UI.lineEdit_5.setText(str(data[did]))
| | | |
def ReadByDID(self, didlist: List):
    """Read one or more data identifiers through the UDS client.

    Args:
        didlist: Passed unchanged to ``read_data_by_identifier``. Callers in
            this file pass a single integer DID.

    Returns:
        The ``service_data.values`` mapping of a positive response, an empty
        dict when the response is not positive, or ``None`` when the request
        raised (the error text is emitted on ``g_signal.sig_Disp_str``).
    """
    # NOTE(review): the source indentation was lost; this assumes the
    # original returned ``values`` ({} by default) from inside the try.
    try:
        response = self.udsclient.read_data_by_identifier(didlist)
        if response.positive:
            return response.service_data.values
        return {}
    except Exception as e:
        # Emit text rather than the exception object, matching how
        # communicationControl_req reports errors ('%s' % e).
        g_signal.sig_Disp_str.emit(str(e))
        return None
| | | |
def DID_display(self):
    """Show the numeric DID of the ``comboBox_6`` selection in ``lineEdit``.

    The selected text is looked up in ``DID_dic`` and rendered as ``0x``
    followed by at least four lowercase hex digits.
    """
    did = DID_dic[self.UI.comboBox_6.currentText()]
    # Right-align with zero fill ('0>'): '0<' pads on the right and would
    # render 0x0200 as "0x2000".
    self.UI.lineEdit.setText("0x{:0>4x}".format(did))
| | | |
def ReadDataByID(self):
    """Read the DID selected in ``comboBox_6`` and show it in ``lineEdit_2``.

    The line edit is cleared first. It is filled only when ``ReadByDID``
    returned a value for the DID and that value's text form is non-empty.
    """
    did = DID_dic[self.UI.comboBox_6.currentText()]
    self.UI.lineEdit_2.clear()
    data = self.ReadByDID(did)
    # ReadByDID can hand back None or a dict without this DID.
    if not data or did not in data:
        return
    # Display the same text that was checked for emptiness; the raw value
    # is not necessarily a str, and setText() needs one.
    text = str(data[did])
    if text:
        self.UI.lineEdit_2.setText(text)
| | | |
def WriteDataByID(self):
    """Write the text of ``lineEdit_2`` to the DID selected in ``comboBox_6``.

    The DID is looked up in ``DID_dic``. Any exception raised by the request
    is reported as text on ``g_signal.sig_Disp_str``.
    """
    did = DID_dic[self.UI.comboBox_6.currentText()]
    payload = self.UI.lineEdit_2.text()
    try:
        # The response is not inspected: the previous code read
        # ``response.service_data.values`` into an unused local.
        # NOTE(review): udsoncan's write response appears to carry only a DID
        # echo, so that read would raise after a successful write -- confirm.
        self.udsclient.write_data_by_identifier(did, payload)
    except Exception as e:
        # Emit text rather than the exception object, matching how
        # communicationControl_req reports errors ('%s' % e).
        g_signal.sig_Disp_str.emit(str(e))
| | | |
def communicationControl_req(self):
    """Send a communication-control request for the ``comboBox_9`` selection.

    The selected label maps to the control type: ``'0x00使能收发'`` -> 0,
    ``'0x01能收禁发'`` -> 1, anything else -> 3. The request is sent with
    ``communication_type=1``. Any exception is shown via ``disp_string``.
    """
    control_by_label = {'0x00使能收发': 0, '0x01能收禁发': 1}
    req = control_by_label.get(self.UI.comboBox_9.currentText(), 3)
    # Exactly one request: a leftover earlier call passed the selection as
    # communication_type with control_type fixed at 0, and did so outside the
    # try, so its failure would have skipped this request.
    try:
        self.udsclient.communication_control(control_type=req,
                                             communication_type=1)
    except Exception as e:
        self.disp_string('%s' % e)
| | | |
def StartCalibraiton(self):
    """Enter session 3, unlock level 0x7D and start routine 0xFFAA.

    The session combo box is set to the matching index first. Each step runs
    only if the previous one got a positive response. When the start command
    (``08 00 00 00``) is accepted, ``pushButton_9`` is highlighted.
    """
    session_id = 3
    self.UI.comboBox.setCurrentIndex(session_id - 1)

    if not self.udsclient.change_session(session_id).positive:
        return
    if not self.SecurityUnlockLevel_3().positive:
        return

    start_cmd = bytes([0x8, 0x0, 0x0, 0x0])
    if self.RequestRoutControl(0xFFAA, start_cmd):
        self.UI.pushButton_9.setStyleSheet(
            "background-color:rgb(255, 85, 127)")
| | | |
| | | def Calibraiton_Z(self): |
| | | calcmd = bytearray([0x1, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_11.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_M(self): |
| | | calcmd = bytearray([0x0, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_13.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_MP(self): |
| | | calcmd = bytearray([0x2, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_18.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_MN(self): |
| | | calcmd = bytearray([0x3, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_22.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_X2(self): |
| | | calcmd = bytearray([0x4, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_24.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
def Calibraiton_X1(self):
    """Request routine 0xFFAA with option bytes 05 00 00 00.

    ``pushButton_26`` is recoloured when the request is reported as
    successful; otherwise nothing else happens.
    """
    # NOTE(review): the first option byte presumably selects the
    # calibration point -- confirm against the ECU specification.
    accepted = self.RequestRoutControl(0xFFAA, bytes((0x5, 0x0, 0x0, 0x0)))
    if not accepted:
        return
    self.UI.pushButton_26.setStyleSheet("background-color:rgb(255, 85, 127)")
| | | |
def Calibraiton_Y1(self):
    """Request routine 0xFFAA with option bytes 06 00 00 00.

    ``pushButton_27`` is recoloured when the request is reported as
    successful; otherwise nothing else happens.
    """
    # NOTE(review): the first option byte presumably selects the
    # calibration point -- confirm against the ECU specification.
    accepted = self.RequestRoutControl(0xFFAA, bytes((0x6, 0x0, 0x0, 0x0)))
    if not accepted:
        return
    self.UI.pushButton_27.setStyleSheet("background-color:rgb(255, 85, 127)")
| | | |
def Calibraiton_Y2(self):
    """Request routine 0xFFAA with option bytes 07 00 00 00.

    ``pushButton_28`` is recoloured when the request is reported as
    successful; otherwise nothing else happens.
    """
    # NOTE(review): the first option byte presumably selects the
    # calibration point -- confirm against the ECU specification.
    accepted = self.RequestRoutControl(0xFFAA, bytes((0x7, 0x0, 0x0, 0x0)))
    if not accepted:
        return
    self.UI.pushButton_28.setStyleSheet("background-color:rgb(255, 85, 127)")
| | | |
def Calibraiton_GAP(self):
    """Request routine 0xFFAA with option bytes 09 00 00 00.

    ``pushButton_29`` is recoloured when the request is reported as
    successful; otherwise nothing else happens.
    """
    # NOTE(review): the first option byte presumably selects the
    # calibration step -- confirm against the ECU specification.
    accepted = self.RequestRoutControl(0xFFAA, bytes((0x9, 0x0, 0x0, 0x0)))
    if not accepted:
        return
    self.UI.pushButton_29.setStyleSheet("background-color:rgb(255, 85, 127)")
| | | |
def RequestRoutControl(self, routine_id, data):
    """Start a routine through the UDS client and report the outcome.

    Args:
        routine_id: Identifier forwarded as ``routine_id``.
        data: Option record forwarded as ``data``.

    Returns:
        The ``positive`` attribute of the client's response.
    """
    return self.udsclient.start_routine(routine_id=routine_id,
                                        data=data).positive
| | | |
def ReportSupportDTC(self):
    """Query the supported DTCs and display one line per DTC record.

    Each line has the form ``DTC:0x<6 hex> Status:0x<2 hex> <description>``
    followed by `` bit<n>`` for every status bit that is set.  Nothing is
    displayed when the response is not positive.

    A DTC that is missing from ``DTC_DescriptionDic`` is reported as
    'Unknown DTC' instead of aborting the listing, and an incomplete
    trailing record is ignored instead of raising ``IndexError``.
    """
    response = self.udsclient.get_supported_dtc()
    if not response.positive:
        return

    payload = response.data
    # Records are 4 bytes (3 DTC bytes + 1 status byte) and start at
    # offset 2; the two leading bytes are presumably the sub-function echo
    # and the status availability mask -- confirm against the UDS spec.
    for offset in range(2, len(payload) - 3, 4):
        high, mid, low, status = payload[offset:offset + 4]
        dtc_hex = high << 16 | mid << 8 | low
        try:
            description = DTC_DescriptionDic[dtc_hex]
        except KeyError:
            description = 'Unknown DTC'
        log = 'DTC:0x%.6x Status:0x%.2x ' % (dtc_hex, status) + description
        log += ''.join(' bit%d' % bit for bit in range(8)
                       if status >> bit & 0x1)
        self.disp_string(log)
| | | |
def ReportByMask(self):
    """Query DTCs by the selected status mask and display each record.

    The mask is looked up in ``ReportBymask_DTC`` using the current text
    of ``comboBox_11``.  Each line has the form
    ``DTC:0x<6 hex> Status:0x<2 hex> <description> `` followed by
    ``bit<n> `` for every status bit that is set.  Nothing is displayed
    when the response is not positive.

    A DTC that is missing from ``DTC_DescriptionDic`` is reported as
    'Unknown DTC' instead of aborting the listing, and an incomplete
    trailing record is ignored instead of raising ``IndexError``.
    """
    mask = ReportBymask_DTC[self.UI.comboBox_11.currentText()]
    response = self.udsclient.get_dtc_by_status_mask(status_mask=mask)
    if not response.positive:
        return

    payload = response.data
    # Records are 4 bytes (3 DTC bytes + 1 status byte) and start at
    # offset 2; the two leading bytes are presumably the sub-function echo
    # and the status availability mask -- confirm against the UDS spec.
    for offset in range(2, len(payload) - 3, 4):
        high, mid, low, status = payload[offset:offset + 4]
        dtc_hex = high << 16 | mid << 8 | low
        try:
            description = DTC_DescriptionDic[dtc_hex]
        except KeyError:
            description = 'Unknown DTC'
        log = 'DTC:0x%.2x%.2x%.2x Status:0x%.2x %s ' % (
            high, mid, low, status, description)
        log += ''.join('bit%d ' % bit for bit in range(8)
                       if status >> bit & 0x1)
        self.disp_string(log)
| | | |
def ReportSnapshotByDTC(self):
    """Read snapshot record 1 of the selected DTC and display it.

    The DTC is taken from ``DTC_Dic`` using the current text and index of
    ``comboBox_12``.  On a positive response one line is displayed: the
    DTC id and status followed by ``<description>:<data> `` for each
    snapshot DID.  Any exception raised while requesting or formatting is
    displayed as text instead of propagating.
    """
    selector = self.UI.comboBox_12
    dtc = DTC_Dic[selector.currentText()][selector.currentIndex()]
    try:
        resp = self.udsclient.get_dtc_snapshot_by_dtc_number(
            dtc=dtc, record_number=1)
        if not resp.positive:
            return

        # Raw payload positions used by the original code: byte 6 holds the
        # number of DIDs and byte 4 the DTC status -- TODO confirm layout.
        did_count = resp.data[6]
        status = resp.data[4]
        print(status)

        record = resp.service_data.dtcs[0]
        entries = record.snapshots
        log = "DTC:0x%.6x Status:0x%.2x" % (record.id, status) + ' '
        for index in range(did_count):
            entry = entries[index]
            # Plain concatenation: assumes entry.data is already a str
            # (depends on the configured DID codec -- verify).
            log += (DTC_SanpshotDescriptionDic[entry.did] + ':' +
                    entry.data + ' ')
        self.disp_string(log)
    except Exception as e:
        self.disp_string('%s' % e)
| | | |
def SetFromBootFlag(self):
    """Copy the checked state of ``radioButton_2`` into ``startfromboot``."""
    boot_selected = self.UI.radioButton_2.isChecked()
    self.startfromboot = boot_selected
| | | |
def pre_programming(self):
    """Run the pre-programming sequence unless starting from boot.

    When ``self.startfromboot`` is set the sequence is skipped and True is
    returned (previously no value was defined on that path).  Otherwise
    the steps below are sent in order, each announced through
    ``disp_string``:

    1. change to session 3
    2. unlock security access level 9
    3. start routine 0xFF02 (check programming conditions)
    4. control DTC setting 2 (stop DTC storage)
    5. communication control (0x03, 0x01) (silence non-diagnostic frames)

    The sequence stops at the first response that is not positive and the
    failure banner is displayed -- including when the last step fails,
    which was previously not reported.

    Returns:
        True when skipped or when every step was answered positively,
        False otherwise.
    """
    if self.startfromboot:
        return True

    self.disp_string('# 预编程步骤')

    client = self.udsclient
    steps = (
        ('>>> 进入扩展模式', client.change_session, (3,)),
        ('>>> 解锁安全访问', client.unlock_security_access, (9,)),
        ('>>> 检查编程条件', client.start_routine, (0xff02,)),
        ('>>> 关闭DTC的存储', client.control_dtc_setting, (2,)),
        ('>>> 关闭与诊断无关的报文', client.communication_control,
         (0x03, 0x01)),
    )
    for banner, request, args in steps:
        self.disp_string(banner)
        if not request(*args).positive:
            self.disp_string('>>> 预编程失败')
            return False
    return True
| | | |
def main_programming(self):
    """Download the flash driver and the application image to the ECU.

    Sequence (each step is announced through ``disp_string`` and the
    method returns False at the first response that is not positive):

    1. optionally enter session 3 first when ``self.startfromboot`` is set
    2. enter session 2 and unlock security access level 9
    3. request download of the driver, transfer ``self.drv_data`` as a
       single block, request transfer exit, run routine 0xF001 with the
       fixed driver checksum
    4. read the application address and length from ``self.app_data``
       (big-endian 32-bit fields at ``APP_ADDR_LOCATION_OFFSET`` and
       ``APP_LENGTH_LOCATION_OFFSET``), erase that range with routine
       0xFF00
    5. request download of the application, transfer it block by block,
       request transfer exit, run routine 0xF001 with the first four
       bytes of the image

    Compared with the previous version, the application download response
    is checked before its block size is used, and the transfer loop stops
    at the first block that is not acknowledged positively.

    Returns:
        True when every step was answered positively, False otherwise.

    Raises:
        ValueError: if the ECU reports a block size that leaves no room
            for payload.
    """

    def read_be32(image, offset):
        # Assemble four consecutive bytes into one big-endian integer.
        return (image[offset] << 24 | image[offset + 1] << 16
                | image[offset + 2] << 8 | image[offset + 3])

    client = self.udsclient
    self.disp_string('# 主编程步骤')

    if self.startfromboot:
        # The outcome of this request is not evaluated; only the
        # programming-session response below gates the sequence.
        self.disp_string('>>> 进入扩展模式')
        client.change_session(3)

    # Enter the programming session.
    self.disp_string('>>> 进入编程模式')
    if not client.change_session(2).positive:
        return False

    # Security access.
    self.disp_string('>>> 安全访问')
    if not client.unlock_security_access(9).positive:
        return False

    # NOTE(review): driver address and size are hard-coded; the size (10)
    # is independent of len(self.drv_data) -- confirm this is intended.
    self.disp_string('>>> 请求下载驱动文件')
    driver_location = MemoryLocation(0x20004600, 10, 32, 32)
    if not client.request_download(driver_location).positive:
        return False

    # Send the driver file as one block.
    self.disp_string('>>> 发送driver文件')
    drv_file = self.drv_data
    response = client.transfer_data(1, bytes(drv_file))
    self.set_progressbar_pos(len(drv_file))
    if not response.positive:
        return False

    self.disp_string('>>> 发送驱动结束,请求推出')
    if not client.request_transfer_exit().positive:
        return False

    # Driver integrity check with a fixed checksum.
    self.disp_string('>>> driver文件完整性检验')
    drivecrc = bytes([0x9B, 0x39, 0xf2, 0xec])
    if not client.start_routine(0xf001, drivecrc).positive:
        return False

    app_file = self.app_data
    app_location = MemoryLocation(
        read_be32(app_file, APP_ADDR_LOCATION_OFFSET),
        read_be32(app_file, APP_LENGTH_LOCATION_OFFSET), 32, 32)

    # Erase the application storage area.
    self.disp_string('>>> 删除app存储空间')
    erase_request = b''.join((
        app_location.alfid.get_byte(),
        app_location.get_address_bytes(),
        app_location.get_memorysize_bytes(),
    ))
    if not client.start_routine(0xff00, erase_request).positive:
        return False

    # Send the application file.
    self.disp_string('>>> 发送app文件')
    response = client.request_download(app_location)
    if not response.positive:
        return False

    # Usable payload per block: block size minus the SID and sequence bytes.
    payload_length = response.service_data.max_length - 2
    if payload_length <= 0:
        raise ValueError('ECU reported a block size too small to carry '
                         'any payload')

    base = self.get_progressbar_pos()
    block_starts = range(0, len(app_file), payload_length)
    for sequence, start in enumerate(block_starts, start=1):
        end = start + payload_length
        # The block sequence counter wraps around after 0xFF.
        response = client.transfer_data(sequence % 256, app_file[start:end])
        self.set_progressbar_pos(base + end)
        if not response.positive:
            return False

    if not client.request_transfer_exit().positive:
        return False

    # Application integrity check using the first four bytes of the image.
    self.disp_string('>>> app文件完整性检验')
    return client.start_routine(0xf001, app_file[0:4]).positive
| | | |
def post_programming(self):
    """Announce the post-programming step and return to session 1.

    Returns:
        The ``positive`` attribute of the session-change response.
    """
    for banner in ('# 后编程步骤', '>>> 回到默认模式'):
        self.disp_string(banner)

    # Back to the default session.
    return self.udsclient.change_session(1).positive
| | | |
def start_programming(self):
    """Run the pre, main and post programming stages and report the time.

    ``pushButton_48`` is disabled for the duration of the run and the
    progress bar is reset to 0.  An ``Exception`` raised by a stage is
    displayed as text and ends the run; the elapsed time is displayed
    afterwards in either case.

    The button is re-enabled and ``self.in_programming`` is cleared in a
    ``finally`` block so the UI is not left locked if something other than
    an ``Exception`` subclass escapes.
    """
    self.UI.pushButton_48.setDisabled(True)
    try:
        self.set_progressbar_pos(0)
        started = time.time()
        try:
            # The stage return values are not evaluated here.
            self.pre_programming()
            self.main_programming()
            self.post_programming()
        except Exception as e:
            self.disp_string('%s' % e)
        self.disp_string('finished in %.2f sec' % (time.time() - started))
    finally:
        self.in_programming = False
        self.UI.pushButton_48.setDisabled(False)
| | | |
def set_progressbar_pos(self, pos):
    """Set the progress bar to ``pos``, limited to the loaded data size.

    Args:
        pos: Requested position; values above the combined length of
            ``self.drv_data`` and ``self.app_data`` (a missing one counts
            as 0) are lowered to that length, negative values become 0.
    """
    total_len = sum(len(blob)
                    for blob in (self.drv_data, self.app_data)
                    if blob is not None)
    # min() first, then max(): an in-range pos is passed through unchanged.
    self.UI.progressBar.setValue(max(min(pos, total_len), 0))
| | | |
def get_progressbar_pos(self):
    """Return the current value of the progress bar widget."""
    progress_bar = self.UI.progressBar
    return progress_bar.value()
| | | |
def loadAppfile(self):
    """Let the user pick an application file and load it via ``Hex_read``.

    Does nothing except display a notice while ``self.in_programming`` is
    set.  When a file is chosen it is added to ``comboBox_4`` and selected
    there, its contents are stored in ``self.app_data`` and the progress
    bar range is set to the combined length of the driver and application
    data.  Cancelling the dialog leaves everything unchanged.
    """
    if self.in_programming:
        self.disp_string('## 正在编程中')
        return

    path, _selected_filter = QtWidgets.QFileDialog.getOpenFileName(
        None, 'comboBox_4', '.',
        'All Files (*);;Bin Files (*.bin);;Hex Files (*.hex)')
    if path == '':
        return

    file_box = self.UI.comboBox_4
    file_box.addItem(path)
    file_box.setCurrentText(path)

    # NOTE(review): every selected file goes through Hex_read, including
    # the *.bin choice offered by the dialog filter -- confirm Hex_read
    # handles that.
    reader = Hex_read()
    reader.Open_file(path)
    self.app_data = reader.data

    total_len = sum(len(blob)
                    for blob in (self.drv_data, self.app_data)
                    if blob is not None)
    self.UI.progressBar.setRange(0, total_len)
| | | |
| | | def SetBootLog(self): |
| | | if self.in_programming: |
| | | self.disp_string('## 正在编程中') |
| | | return |
| | | |
| | | fileName, _ = QtWidgets.QFileDialog.getSaveFileName( |
| | | None, 'comboBox_10', '.', |
| | | 'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)') |
| | | |
| | | if fileName != '': |
| | | self.UI.comboBox_10.addItem(fileName) |
| | | self.UI.comboBox_10.setCurrentText(fileName) |
| | | print(fileName) |
| | | self.boot_logger = Logger(fileName) |
| | | # self.listener = [self.boot_logger] |
| | | # with open(fileName, 'rb') as fd: |
| | | # self.app_data = fd.read() |
| | | # drv_data_len = len( |
| | | # self.drv_data) if self.drv_data is not None else 0 |
| | | # app_data_len = len( |
| | | # self.app_data) if self.app_data is not None else 0 |
| | | # total_len = drv_data_len + app_data_len |
| | | # self.UI.progressBar.setRange(0, total_len) |