| | |
| | | |
| | | from ast import Pass |
| | | from ast import Not, Pass |
| | | from concurrent.futures import thread |
| | | from logging import exception |
| | | import threading |
| | | from typing import List |
| | | from can import BusABC, Message |
| | | from udsoncan.client import Client |
| | | from udsoncan.exceptions import TimeoutException |
| | |
| | | from isotp import CanMessage |
| | | import queue |
| | | from USBCAN import * |
| | | from Shifter import ShifterClass |
| | | from Shifter import AsciiCodec, ShifterClass |
| | | from mainwindows import Ui_MainWindow |
| | | from PySide2 import QtWidgets, QtCore, QtGui |
| | | import struct |
| | | import time |
| | | import datetime |
| | | from multiprocessing import Array, Pool, Process, Queue, Value |
| | | from ShifterDefine import * |
| | | from multiprocessing import Array, Pool, Process, Queue, Value, Pipe |
| | | |
| | | MAX_RCV_NUM = 20 |
| | | |
| | |
| | | |
| | | |
| | | g_signal = UISignals() |
| | | uds_conn, isotp_conn = Pipe() |
| | | user_conn, drive_conn = Pipe() |
| | | |
| | | |
| | | class HardwreDevice(): |
| | |
| | | BaseConnection.__init__(self, name) |
| | | self.UDStoIsoTPQueue = queue.Queue() # USD --> isotp |
| | | self.IsoTPtoUDSQueue = queue.Queue() # ISOTP --> UDS |
| | | self.CANtoIsoTPQueue = queue.Queue() # CAN --> isotp |
| | | # self.IsoTPtoCANQueue = queue.Queue() # ISOTP--> CAN |
| | | self._read_thread = None |
| | | self.exit_requested = False |
| | |
| | | payload = payload[0:self.mtu] |
| | | |
| | | # isotp.protocol.TransportLayer uses byte array. udsoncan is strict on bytes format |
| | | self.UDStoIsoTPQueue.put(bytearray(payload)) |
| | | self.isotp_layer.send(bytearray(payload)) |
| | | # self.UDStoIsoTPQueue.put(bytearray(payload)) |
| | | |
| | | def specific_wait_frame(self, timeout=2): |
| | | if not self.opened: |
| | |
| | | |
| | | timedout = False |
| | | frame = None |
| | | # frame = uds_conn.recv() |
| | | try: |
| | | frame = self.IsoTPtoUDSQueue.get(block=True, timeout=timeout) |
| | | # frame = self.CANtoIsoTPQueue.get(block=True, timeout=timeout) |
| | | frame = self.IsoTPtoUDSQueue.get(block=True, timeout=5) |
| | | except queue.Empty: |
| | | timedout = True |
| | | # frame = self.CANtoIsoTPQueue.get(block=True, timeout=timeout) |
| | | |
| | | if timedout: |
| | | raise TimeoutException( |
| | |
| | | def rxthread_task(self): |
| | | while not self.exit_requested: |
| | | try: |
| | | # self.logger.debug("UDStoIsoTPQueue queue size is now %d" % ( |
| | | # self.UDStoIsoTPQueue.qsize())) |
| | | while not self.UDStoIsoTPQueue.empty(): |
| | | self.isotp_layer.send(self.UDStoIsoTPQueue.get()) |
| | | # # self.logger.debug("UDStoIsoTPQueue queue size is now %d" % ( |
| | | # # self.UDStoIsoTPQueue.qsize())) |
| | | # while not self.UDStoIsoTPQueue.empty(): |
| | | # self.isotp_layer.send(self.UDStoIsoTPQueue.get()) |
| | | |
| | | self.isotp_layer.process() |
| | | |
| | | while self.isotp_layer.available(): |
| | | # isotp_conn.send(self.isotp_layer.recv()) |
| | | self.IsoTPtoUDSQueue.put(self.isotp_layer.recv()) |
| | | # self.logger.debug("IsoTPtoUDSQueue queue size is now %d" % ( |
| | | # self.IsoTPtoUDSQueue.qsize())) |
| | | self.logger.debug("IsoTPtoUDSQueue queue size is now %d" % ( |
| | | self.IsoTPtoUDSQueue.qsize())) |
| | | |
| | | # time.sleep(self.isotp_layer.sleep_time()) |
| | | # time.sleep(self.isotp_layer.sleep_time()) |
| | | time.sleep(0.001) |
| | | |
| | | except Exception as e: |
| | |
| | | # self.protocol("WM_DELETE_WINDOW", self.Form_OnClosing) |
| | | self.msgQueue = Queue() # can layer receive queue |
| | | self.sendQueue = Queue() # can layer send queue |
| | | |
| | | self.CANtoIsoTPQueue = Queue() # CAN --> isotp |
| | | self.shifter = ShifterClass() |
| | | self.devicedescription = HardwreDevice() |
| | | self.windows = QtWidgets.QMainWindow() |
| | |
| | | self.DeviceInit() |
| | | self.WidgetsInit() |
| | | self.ChnInfoUpdate(self._isOpen) |
| | | self.drv_data: bytes = [0x1c, 0x01, 0x06, 0x80, |
| | | 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a] |
| | | self.app_data: bytes = None |
| | | |
| | | def DeviceInit(self): |
| | | self._usbcan = None |
| | |
| | | self.udsclient = Client(self.conn, request_timeout=2) |
| | | self.udsclient.config['security_algo'] = self.SecAlgo |
| | | self.udsclient.config['security_algo_params'] = [ |
| | | 0x4FE87269, 0x6BC361D8, 0x9B127D51, 0x5BA41903] |
| | | 0x11223344, 0x20AA097B, 0x11223344, 0x70237577] |
| | | self.udsclient.config['data_identifiers'] = self.shifter.did_config |
| | | |
| | | self.udsclient.config['server_address_format'] = 32 |
| | |
| | | temp_key = (seed[0] << 24) | ( |
| | | seed[1] << 16) | (seed[2] << 8) | (seed[3]) |
| | | if level == 0x01: |
| | | output_key_temp = ((((temp_key >> 4) ^ temp_key) |
| | | << 3) ^ temp_key) & 0xFFFFFFFF |
| | | elif level == 0x11: |
| | | _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) & |
| | | 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF) |
| | | _temp_z = 0 |
| | | _temp_sum = 0 |
| | | for i in range(64): |
| | | _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z) |
| | | ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF |
| | | _temp_y = _temp_y & 0xFFFFFFFF |
| | | _temp_sum += 0x8F750A1D |
| | | _temp_sum = _temp_sum & 0xFFFFFFFF |
| | | _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^ |
| | | (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF |
| | | _temp_z = _temp_z & 0xFFFFFFFF |
| | | output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | ( |
| | | (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF)) |
| | | output_key_temp = 0x20AA097B |
| | | # output_key_temp = ((((temp_key >> 4) ^ temp_key) |
| | | # << 3) ^ temp_key) & 0xFFFFFFFF |
| | | elif level == 0x09: |
| | | output_key_temp = 0x70237577 |
| | | # _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) & |
| | | # 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF) |
| | | # _temp_z = 0 |
| | | # _temp_sum = 0 |
| | | # for i in range(64): |
| | | # _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z) |
| | | # ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF |
| | | # _temp_y = _temp_y & 0xFFFFFFFF |
| | | # _temp_sum += 0x8F750A1D |
| | | # _temp_sum = _temp_sum & 0xFFFFFFFF |
| | | # _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^ |
| | | # (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF |
| | | # _temp_z = _temp_z & 0xFFFFFFFF |
| | | # output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | ( |
| | | # (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF)) |
| | | else: |
| | | output_key_temp = temp_key |
| | | |
| | |
| | | '''receive msg from can layer |
| | | use queue read from can layer listen''' |
| | | can_msgs = None |
| | | if not self.conn.CANtoIsoTPQueue.empty(): |
| | | can_msgs = self.conn.CANtoIsoTPQueue.get() |
| | | # can_num = self.conn.CANtoIsoTPQueue.size() |
| | | # # can_num = self._usbcan.GetReceiveNum(self.devicedescription.channel) |
| | | # if can_num and not self._terminated: |
| | | # read_cnt = MAX_RCV_NUM if can_num >= MAX_RCV_NUM else can_num |
| | | # for i in range(read_cnt): |
| | | # can_msgs[i] = self.conn.CANtoIsoTPQueue.get() |
| | | # else: |
| | | # can_msgs = None |
| | | if not self.CANtoIsoTPQueue.empty(): |
| | | can_msgs = self.CANtoIsoTPQueue.get() |
| | | return can_msgs |
| | | |
| | | def isotp_send(self, isotp_msg): |
| | |
| | | msg.channel = 0 |
| | | msg.data = isotp_msg.data # bytearray |
| | | msg.is_extended_id = False |
| | | # for i in range(isotp_msg.dlc): |
| | | # msg.data[i] = isotp_msg.data[i] |
| | | self.sendQueue.put(msg) |
| | | self._usbcan.send(msg) |
| | | |
| | | def WidgetsInit(self): |
| | | # self.UI.setupUi(self) |
| | |
| | | |
| | | self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发']) |
| | | self.UI.comboBox_9.activated.connect(self.communicationControl_req) |
| | | self.UI.comboBox_6.addItems(list(DID_dic.keys())) |
| | | self.DID_display() |
| | | self.UI.comboBox_6.activated.connect(self.DID_display) |
| | | |
| | | self.UI.pushButton_31.clicked.connect(self.SecurityUnlockLevel_1) |
| | | self.UI.pushButton_32.clicked.connect(self.SecurityUnlockLevel_3) |
| | | |
| | | self.UI.pushButton_12.clicked.connect(self.ReadSupplyID) |
| | | self.UI.pushButton_4.clicked.connect(self.ReadVIN) |
| | | self.UI.pushButton_6.clicked.connect(self.ReadMfgDate) |
| | | self.UI.pushButton_14.clicked.connect(self.ReadDataByID) |
| | | self.UI.pushButton_9.clicked.connect(self.StartCalibraiton) |
| | | self.UI.pushButton_11.clicked.connect(self.Calibraiton_Z) |
| | | self.UI.pushButton_13.clicked.connect(self.Calibraiton_M) |
| | | self.UI.pushButton_18.clicked.connect(self.Calibraiton_MP) |
| | | self.UI.pushButton_22.clicked.connect(self.Calibraiton_MN) |
| | | self.UI.pushButton_24.clicked.connect(self.Calibraiton_X2) |
| | | self.UI.pushButton_26.clicked.connect(self.Calibraiton_X1) |
| | | self.UI.pushButton_27.clicked.connect(self.Calibraiton_Y1) |
| | | self.UI.pushButton_28.clicked.connect(self.Calibraiton_Y2) |
| | | self.UI.pushButton_29.clicked.connect(self.Calibraiton_GAP) |
| | | |
| | | self.UI.radioButton.toggled.connect(self.TestPresentEvent) |
| | | |
| | | self.UI.pushButton_48.clicked.connect(self.start_programming) |
| | | seff.UI.pushButton_46.clicked.connect(self.loadAppfile) |
| | | |
| | | def _formatMsgData(self, index, item, received): |
| | | '''msg data to list |
| | |
| | | self.can_thread.start() |
| | | |
| | | def can_thread(self): |
| | | while self._isOpen.value == 1: |
| | | while True: |
| | | time.sleep(0.01) |
| | | if self.needdisconnect.value == 1: |
| | | ret = self._usbcan.CloseDevice() |
| | | if ret == 1: |
| | | self.needdisconnect.value = 0 |
| | | self._isOpen.value = 0 |
| | | else: |
| | | msg, num = self._usbcan.Receive(len=10) |
| | | if not num == 0: |
| | | for i in range(num): |
| | | if msg[i].arbitration_id == self.shifter.canid.phy_txId: |
| | | self.conn.CANtoIsoTPQueue.put( |
| | | msg[i]) # send msg to isotp |
| | | self.msgQueue.put(msg[i]) |
| | | # send signal to update screen |
| | | # logger.info('Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | g_signal.sig_MsgReceived.emit(num) |
| | | msgToSendCnt = self.sendQueue.qsize() |
| | | if msgToSendCnt > 0: |
| | | msg = Message() |
| | | for i in range(msgToSendCnt): |
| | | while self._isOpen.value == 1: |
| | | time.sleep(0.01) |
| | | if self.needdisconnect.value == 1: |
| | | ret = self._usbcan.CloseDevice() |
| | | if ret == 1: |
| | | self.needdisconnect.value = 0 |
| | | self._isOpen.value = 0 |
| | | else: |
| | | msg, num = self._usbcan.Receive(len=1) |
| | | if not num == 0: |
| | | for i in range(num): |
| | | if msg[i].arbitration_id == self.shifter.canid.phy_txId: |
| | | # conn.send(msg[i]) # pipe connection send |
| | | self.CANtoIsoTPQueue.put( |
| | | msg[i]) # send msg to isotp |
| | | print('收到77A') |
| | | |
| | | msg = self.sendQueue.get() |
| | | # logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | self._usbcan.send(msg) |
| | | self.msgQueue.put(msg[i]) |
| | | # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | |
| | | # send signal to update screen |
| | | |
| | | g_signal.sig_MsgReceived.emit(num) |
| | | # msgToSendCnt = self.sendQueue.qsize() |
| | | # if msgToSendCnt > 0: |
| | | # msg = Message() |
| | | # for i in range(msgToSendCnt): |
| | | |
| | | # msg = self.sendQueue.get() |
| | | # # logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | # # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | # self._usbcan.send(msg) |
| | | def send_dump(self): |
| | | msg = Message() |
| | | msg.arbitration_id = self.shifter.canid.phy_rxId |
| | | msg.dlc = 8 |
| | | msg.channel = 0 |
| | | msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0] |
| | | msg.is_extended_id = False |
| | | self._usbcan.send(msg) |
| | | |
| | | def open_close(self): |
| | | if self._isOpen.value == 0: |
| | |
| | | self._deviceOpenUpdate() |
| | | self.can_thread_start() |
| | | self.conn.open() # start iso tp thread |
| | | self.send_dump() |
| | | self.send_dump() |
| | | |
| | | else: |
| | | self._usbcan.InitAndStart() |
| | |
| | | else: |
| | | self.needdisconnect.value = 1 |
| | | self._deviceOpenUpdate() |
| | | self.TestPresentTimer.stop() |
| | | |
| | | def update_HardwareDevice(self): |
| | | |
| | |
| | | self._updateRootList() |
| | | time.sleep(0.1) |
| | | |
| | | def disp_string(self, out): |
| | | self.UI.statusbar.showMessage(str(out)) |
| | | def disp_string(self, out_s): |
| | | dt = datetime.now() |
| | | nowtime_str = dt.strftime('%I:%M:%S') # time |
| | | self.textEdit_2.insertPlainText(nowtime_str + ': ') |
| | | cursor = self.textEdit_2.textCursor() |
| | | cursor.movePosition(QtGui.QTextCursor.End) |
| | | self.textEdit_2.setTextCursor(cursor) |
| | | self.textEdit_2.insertPlainText(str(out_s+"\t\n")) |
| | | |
| | | # self.statusbar.showMessage(str(out)) |
| | | |
| | | def _updateRootList(self): |
| | | _dataSize = self.msgQueue.qsize() |
| | |
| | | |
| | | def TestPresentEvent(self): |
| | | if self.UI.radioButton.isChecked(): |
| | | self.TestPresentTimer.start(4000) |
| | | self.TestPresentTimer.start(3000) |
| | | else: |
| | | self.TestPresentTimer.stop() |
| | | |
| | | def creat_testpresentReq(self): |
| | | self.udsclient.tester_present() |
| | | self.send_dump() |
| | | |
| | | def sessioncontrol_req(self): |
| | | sesson = self.UI.comboBox.currentIndex() + 1 |
| | | self.udsclient.change_session(sesson) |
| | | t = threading.Thread( |
| | | target=self.udsclient.change_session, args=(sesson,)) |
| | | t.start() |
| | | t.join() |
| | | # self.udsclient.change_session(sesson) |
| | | |
| | | # self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位']) |
| | | def MCUReset_req(self): |
| | | req = 0x03 if self.UI.comboBox_8.currentData() == '0x01硬件复位' else 0x01 |
| | | self.udsclient.ecu_reset(req) |
| | | |
| | | # self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发']) |
| | | def Security_req(self): |
| | | self.udsclient.unlock_security_access(1) |
| | | # Security_dic[self.UI.comboBox_8.currentData()] |
| | | |
| | | def SecurityUnlockLevel_1(self): |
| | | data = self.udsclient.unlock_security_access(1) |
| | | if data.positive: |
| | | self.UI.pushButton_31.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def SecurityUnlockLevel_3(self): |
| | | data = self.udsclient.unlock_security_access(9) |
| | | if data.positive: |
| | | self.UI.pushButton_32.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def ReadSupplyID(self): |
| | | data = self.ReadByDID(0xF18A) |
| | | if data is not None: |
| | | self.UI.lineEdit_8.setText(str(data[0xF18A])) |
| | | |
| | | def ReadVIN(self): |
| | | data = self.ReadByDID(0xF190) |
| | | if data is not None: |
| | | self.UI.lineEdit_3.setText(str(data[0xF190])) |
| | | |
| | | def ReadMfgDate(self): |
| | | data = self.ReadByDID(0x210B) |
| | | if data is not None: |
| | | self.UI.lineEdit_5.setText(str(data[0x210B])) |
| | | |
| | | def ReadByDID(self, didlist: List): |
| | | values = {} |
| | | try: |
| | | response = self.udsclient.read_data_by_identifier(didlist) |
| | | if response.positive: |
| | | values = response.service_data.values |
| | | return values |
| | | except Exception as e: |
| | | g_signal.sig_Disp_str.emit(e) |
| | | |
| | | def DID_display(self): |
| | | # print(self.UI.comboBox_6.currentText()) |
| | | self.UI.lineEdit.setText("0x{:0<4x}". |
| | | format((DID_dic[self.UI.comboBox_6.currentText()]))) |
| | | |
| | | def ReadDataByID(self): |
| | | tempdid = DID_dic[self.UI.comboBox_6.currentText()] |
| | | self.UI.lineEdit_2.clear() |
| | | data = self.ReadByDID(tempdid) |
| | | if data is not None and len(str(data[tempdid])): |
| | | self.UI.lineEdit_2.setText(str(data[tempdid])+' ') |
| | | |
| | | def communicationControl_req(self): |
| | | req = 0x00 |
| | | select = self.UI.comboBox_9.currentData() |
| | | select = self.UI.comboBox_9.currentText() |
| | | if select == '0x00使能收发': |
| | | req = 0 |
| | | elif select == '0x01能收禁发': |
| | |
| | | req = 3 |
| | | self.udsclient.communication_control( |
| | | control_type=0, communication_type=req) |
| | | |
| | | def StartCalibraiton(self): |
| | | startcmd = bytearray([0x0, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(startcmd)): |
| | | self.UI.pushButton_9.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_Z(self): |
| | | calcmd = bytearray([0x1, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_11.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_M(self): |
| | | calcmd = bytearray([0x2, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_13.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_MP(self): |
| | | calcmd = bytearray([0x3, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_18.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_MN(self): |
| | | calcmd = bytearray([0x4, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_22.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_X2(self): |
| | | calcmd = bytearray([0x5, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_24.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_X1(self): |
| | | calcmd = bytearray([0x6, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_26.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_Y1(self): |
| | | calcmd = bytearray([0x7, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_27.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_Y2(self): |
| | | calcmd = bytearray([0x8, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_28.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def Calibraiton_GAP(self): |
| | | calcmd = bytearray([0x9, 0x0, 0x0, 0x0]) |
| | | if self.RequestRoutControl(0xFFAA, bytes(calcmd)): |
| | | self.UI.pushButton_29.setStyleSheet( |
| | | "background-color:rgb(255, 85, 127)") |
| | | |
| | | def RequestRoutControl(self, routine_id, data): |
| | | resp_1 = self.udsclient.start_routine(routine_id=routine_id, data=data) |
| | | return resp_1.positive |
| | | |
| | | def pre_programming(self): |
| | | |
| | | self.disp_string('# 预编程步骤') |
| | | |
| | | # 进入extended session |
| | | self.disp_string('>>> 进入扩展模式') |
| | | response = self.udsclient.change_session(3) |
| | | if response.positive: |
| | | # 检查编程条件 |
| | | self.disp_string('>>> 检查编程条件') |
| | | response = self.udsclient.start_routine(0xff02) |
| | | if response.positive: |
| | | # 关闭DTC的存储 |
| | | self.disp_string('>>> 关闭DTC的存储') |
| | | response = self.udsclient.control_dtc_setting(2) |
| | | # print(response) |
| | | if response.positive: |
| | | # 关闭与诊断无关的报文 |
| | | self.disp_string('>>> 关闭与诊断无关的报文') |
| | | response = self.udsclient.communication_control(0x03, 0x01) |
| | | else: |
| | | self.disp_string('>>> 预编程失败') |
| | | |
| | | return response.positive |
| | | # print(response) |
| | | |
| | | def main_programming(self): |
| | | |
| | | self.disp_string('# 主编程步骤') |
| | | |
| | | # 进入programming session |
| | | self.disp_string('>>> 进入编程模式') |
| | | response = self.udsclient.change_session(2) |
| | | # print(response) |
| | | if response.positive: |
| | | # 安全访问 |
| | | self.disp_string('>>> 安全访问') |
| | | response = self.udsclient.unlock_security_access(9) |
| | | # print(response) |
| | | |
| | | # print(response) |
| | | if response.positive: |
| | | # 发送driver文件 |
| | | self.disp_string('>>> 发送driver文件') |
| | | drv_file = self.drv_data |
| | | address = drv_file[16] << 24 | drv_file[17] << 16 | drv_file[ |
| | | 18] << 8 | drv_file[19] << 0 |
| | | memorysize = drv_file[20] << 24 | drv_file[21] << 16 | drv_file[ |
| | | 22] << 8 | drv_file[23] << 0 |
| | | memory_location = MemoryLocation(address, memorysize, 32, 32) |
| | | response = self.udsclient.request_download(memory_location) |
| | | # print(response) |
| | | |
| | | max_length = response.service_data.max_length |
| | | |
| | | # 有效数据长度, 去除sid和sequence两个字节 |
| | | payload_length = max_length - 2 |
| | | |
| | | count = (len(drv_file) + payload_length - 1) // payload_length |
| | | |
| | | base = self.get_progressbar_pos() |
| | | for i in range(count): |
| | | start = i * payload_length |
| | | end = start + payload_length |
| | | response = self.udsclient.transfer_data((i + 1) % 256, |
| | | drv_file[start:end]) |
| | | self.set_progressbar_pos(base + end) |
| | | # print(response) |
| | | if response.positive: |
| | | response = self.udsclient.request_transfer_exit() |
| | | # print(response) |
| | | if response.positive: |
| | | # driver文件完整性检验 |
| | | self.disp_string('>>> driver文件完整性检验') |
| | | response = self.udsclient.start_routine(0xf001, drv_file[0:4]) |
| | | # print(response) |
| | | if response.positive: |
| | | app_file = self.app_data |
| | | address = app_file[16] << 24 | app_file[17] << 16 | app_file[ |
| | | 18] << 8 | app_file[19] << 0 |
| | | memorysize = app_file[20] << 24 | app_file[21] << 16 | app_file[ |
| | | 22] << 8 | app_file[23] << 0 |
| | | memory_location = MemoryLocation(address, memorysize, 32, 32) |
| | | |
| | | # 删除app存储空间 |
| | | self.disp_string('>>> 删除app存储空间') |
| | | data = b'' |
| | | data += memory_location.alfid.get_byte() |
| | | data += memory_location.get_address_bytes() |
| | | data += memory_location.get_memorysize_bytes() |
| | | response = self.udsclient.start_routine(0xff00, data) |
| | | # print(response) |
| | | if response.positive: |
| | | # 发送app文件 |
| | | self.disp_string('>>> 发送app文件') |
| | | response = self.udsclient.request_download(memory_location) |
| | | # print(response) |
| | | |
| | | max_length = response.service_data.max_length |
| | | |
| | | # 有效数据长度, 去除sid和sequence两个字节 |
| | | payload_length = max_length - 2 |
| | | |
| | | count = (len(app_file) + payload_length - 1) // payload_length |
| | | |
| | | base = self.get_progressbar_pos() |
| | | for i in range(count): |
| | | start = i * payload_length |
| | | end = start + payload_length |
| | | response = self.udsclient.transfer_data((i + 1) % 256, |
| | | app_file[start:end]) |
| | | self.set_progressbar_pos(base + end) |
| | | # print(response) |
| | | if response.positive: |
| | | response = self.udsclient.request_transfer_exit() |
| | | # print(response) |
| | | if response.positive: |
| | | # app文件完整性检验 |
| | | self.disp_string('>>> app文件完整性检验') |
| | | response = self.udsclient.start_routine(0xf001, app_file[0:4]) |
| | | # print(response) |
| | | return response.positive |
| | | |
| | | def post_programming(self): |
| | | |
| | | self.disp_string('# 后编程步骤') |
| | | |
| | | # 回到default session |
| | | self.disp_string('>>> 回到默认模式') |
| | | |
| | | self.udsclient.suppress_positive_response.enabled = True |
| | | response = self.udsclient.change_session(1) |
| | | return response.positive |
| | | |
| | | def start_programming(self): |
| | | |
| | | self.set_progressbar_pos(0) |
| | | |
| | | t1 = time.time() |
| | | |
| | | try: |
| | | |
| | | self.pre_programming() |
| | | self.main_programming() |
| | | self.post_programming() |
| | | except Exception as e: |
| | | |
| | | self.disp_string('%s' % e, '#ff0000') |
| | | |
| | | t2 = time.time() |
| | | |
| | | self.disp_string('finished in %.2f sec' % (t2 - t1)) |
| | | self.in_programming = False |
| | | self.UI.pushButton_48.setDisabled(True) |
| | | |
| | | def set_progressbar_pos(self, pos): |
| | | |
| | | drv_data_len = len(self.drv_data) if self.drv_data is not None else 0 |
| | | app_data_len = len(self.app_data) if self.app_data is not None else 0 |
| | | total_len = drv_data_len + app_data_len |
| | | if pos > total_len: |
| | | pos = total_len |
| | | elif pos < 0: |
| | | pos = 0 |
| | | |
| | | self.UI.progressBar.setValue(pos) |
| | | |
| | | def get_progressbar_pos(self): |
| | | return self.UI.progressBar.value() |
| | | |
| | | def loadAppfile(self): |
| | | if self.in_programming: |
| | | self.disp_string('## 正在编程中') |
| | | return |
| | | |
| | | fileName, _ = QtWidgets.QFileDialog.getOpenFileName( |
| | | self, 'comboBox_4', '.', 'All Files (*);;Bin Files (*.bin);;Hex Files (*.hex)') |
| | | |
| | | if fileName != '': |
| | | self.UI.comboBox_4.addItem(fileName) |
| | | self.UI.comboBox_4.setCurrentText(fileName) |
| | | with open(fileName, 'rb') as fd: |
| | | self.app_data = fd.read() |
| | | drv_data_len = len( |
| | | self.drv_data) if self.drv_data is not None else 0 |
| | | app_data_len = len( |
| | | self.app_data) if self.app_data is not None else 0 |
| | | total_len = drv_data_len + app_data_len |
| | | self.UI.progressBar.setRange(0, total_len) |