From a1a1c89b91fc9476c51981699aaa7c2bccdd8570 Mon Sep 17 00:00:00 2001 From: tao_z <tzj0429@163.com> Date: Sun, 07 Aug 2022 14:19:02 +0800 Subject: [PATCH] SX7H初步完成 --- widgets/ShifterTool.py | 543 ++++++++++++++++++++++++++++++++++++++++------------- 1 files changed, 404 insertions(+), 139 deletions(-) diff --git a/widgets/ShifterTool.py b/widgets/ShifterTool.py index 020d602..d357388 100644 --- a/widgets/ShifterTool.py +++ b/widgets/ShifterTool.py @@ -1,10 +1,10 @@ - from ast import Not, Pass from concurrent.futures import thread +from curses import flash from logging import exception import threading from typing import List -from can import BusABC, Message +from can import BusABC, Message, Logger, Listener from udsoncan.client import Client from udsoncan.exceptions import TimeoutException import udsoncan @@ -14,17 +14,20 @@ from isotp import CanMessage import queue from USBCAN import * -from Shifter import AsciiCodec, ShifterClass +from Shifter import * +from styleSheet import * from mainwindows import Ui_MainWindow from PySide2 import QtWidgets, QtCore, QtGui import struct import time import datetime from ShifterDefine import * -from multiprocessing import Array, Pool, Process, Queue, Value, Pipe +from multiprocessing import Process, Queue, Value, Pipe +from hexread import * MAX_RCV_NUM = 20 - +APP_ADDR_LOCATION_OFFSET = 8 +APP_LENGTH_LOCATION_OFFSET = 12 logger = logging.getLogger() logger.setLevel(logging.DEBUG) formatter = logging.Formatter( @@ -33,9 +36,10 @@ consoleHandler = logging.StreamHandler() consoleHandler.setLevel(logging.DEBUG) -fileHandler = logging.FileHandler( - './log/ShiftTool.log', mode='a', encoding='UTF-8') -fileHandler.setLevel(logging.NOTSET) +fileHandler = logging.FileHandler('./log/ShiftTool.log', + mode='a', + encoding='UTF-8') +fileHandler.setLevel(logging.DEBUG) consoleHandler.setFormatter(formatter) fileHandler.setFormatter(formatter) @@ -55,6 +59,7 @@ class HardwreDevice(): + def __init__(self): self.device_type = 4 self.channel = 0 # can_index @@ -63,6 +68,7 @@ class PeriodSendThread(object): + def __init__(self, period_func, args=[], kwargs={}): self._thread = threading.Thread(target=self._run) self._function = period_func @@ -117,12 +123,13 @@ self.isotp_layer = isotp_layer assert isinstance( - self.isotp_layer, isotp.TransportLayer), 'isotp_layer must be a valid isotp.TransportLayer ' + self.isotp_layer, isotp.TransportLayer + ), 'isotp_layer must be a valid isotp.TransportLayer ' def open(self): self.exit_requested = False - self._read_thread = threading.Thread( - None, target=self.rxthread_task) + self._read_thread = threading.Thread(None, + target=self.rxthread_task) self._read_thread.start() self.opened = True logger.info('Connection opened') @@ -150,7 +157,8 @@ if self.mtu is not None: if len(payload) > self.mtu: logger.warning( - "Truncating payload to be set to a length of %d" % (self.mtu)) + "Truncating payload to be set to a length of %d" % + (self.mtu)) payload = payload[0:self.mtu] # isotp.protocol.TransportLayer uses byte array. udsoncan is strict on bytes format @@ -172,12 +180,14 @@ if timedout: raise TimeoutException( - "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" % timeout) + "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" + % timeout) if self.mtu is not None: if frame is not None and len(frame) > self.mtu: logger.warning( - "Truncating received payload to a length of %d" % (self.mtu)) + "Truncating received payload to a length of %d" % + (self.mtu)) frame = frame[0:self.mtu] # isotp.protocol.TransportLayer uses bytearray. udsoncan is strict on bytes format @@ -204,8 +214,9 @@ while self.isotp_layer.available(): # isotp_conn.send(self.isotp_layer.recv()) self.IsoTPtoUDSQueue.put(self.isotp_layer.recv()) - self.logger.debug("IsoTPtoUDSQueue queue size is now %d" % ( - self.IsoTPtoUDSQueue.qsize())) + self.logger.debug( + "IsoTPtoUDSQueue queue size is now %d" % + (self.IsoTPtoUDSQueue.qsize())) # time.sleep(self.isotp_layer.sleep_time()) time.sleep(0.001) @@ -226,20 +237,27 @@ self.sendQueue = Queue() # can layer send queue self.CANtoIsoTPQueue = Queue() # CAN --> isotp self.shifter = ShifterClass() + self.Vehicle = VehicleClass() self.devicedescription = HardwreDevice() self.windows = QtWidgets.QMainWindow() self.UI = Ui_MainWindow() self.UI.setupUi(window) self._dev_info = None + self.dbc = cantools.database.load_file("DBC/SX7H.dbc") self.can_thread = threading.Thread(target=self.can_thread) self.TestPresentTimer = QtCore.QTimer() self.TestPresentTimer.timeout.connect(self.creat_testpresentReq) self.DeviceInit() self.WidgetsInit() self.ChnInfoUpdate(self._isOpen) - self.drv_data: bytes = [0x1c, 0x01, 0x06, 0x80, - 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a] + self.in_programming = False + self.startfromboot = False + self.drv_data = [ + 0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a + ] self.app_data: bytes = None + self.start_timestamp = time.time() + self.boot_logger = None def DeviceInit(self): self._usbcan = None @@ -275,26 +293,33 @@ 'squash_stmin_requirement': False } self._isotpaddr_PHYS = isotp.Address( - isotp.AddressingMode.Normal_11bits, txid=self.shifter.canid.phy_rxId, rxid=self.shifter.canid.phy_txId) + isotp.AddressingMode.Normal_11bits, + txid=self.shifter.canid.phy_rxId, + rxid=self.shifter.canid.phy_txId) self._isotpaddr_FUNC = isotp.Address( - isotp.AddressingMode.Normal_11bits, txid=self.shifter.canid.fun_rxId, rxid=self.shifter.canid.phy_txId) + isotp.AddressingMode.Normal_11bits, + txid=self.shifter.canid.fun_rxId, + rxid=self.shifter.canid.phy_txId) # self._isotpaddr_EPS = isotp.Address( # isotp.AddressingMode.Normal_11bits, txid=EPS_RX_ID_PHYS, rxid=EPS_TX_ID) # self._isotpaddr_EPS4wd = isotp.Address( # isotp.AddressingMode.Normal_11bits, txid=EPS4wd_RX_ID_PHYS, rxid=EPS4wd_TX_ID) - self.isotp_layer = isotp.TransportLayer( - rxfn=self.isotp_rcv, txfn=self.isotp_send, address=self._isotpaddr_PHYS, params=self.isotp_params) + self.isotp_layer = isotp.TransportLayer(rxfn=self.isotp_rcv, + txfn=self.isotp_send, + address=self._isotpaddr_PHYS, + params=self.isotp_params) self.conn = ShifterTool.IsoTpConnection(isotp_layer=self.isotp_layer) self.udsclient = Client(self.conn, request_timeout=2) self.udsclient.config['security_algo'] = self.SecAlgo self.udsclient.config['security_algo_params'] = [ - 0x11223344, 0x20AA097B, 0x11223344, 0x70237577] + 0x11223344, 0x20AA097B, 0x11223344, 0x70237577 + ] self.udsclient.config['data_identifiers'] = self.shifter.did_config self.udsclient.config['server_address_format'] = 32 self.udsclient.config['server_memorysize_format'] = 32 - def SecAlgo(self, level, seed, params): + def SecAlgo(self, level, seed, params=None): """ Builds the security key to unlock a security level. @@ -311,14 +336,22 @@ self.output_key[2] = ((temp_key[1] & 0xFC) >> 2) | (temp_key[0] & 0xC0) self.output_key[3] = ((temp_key[0] & 0x0F) << 4) | (temp_key[3] & 0x0F) """ - temp_key = (seed[0] << 24) | ( - seed[1] << 16) | (seed[2] << 8) | (seed[3]) + temp_key = (seed[0] << 24) | (seed[1] << 16) | (seed[2] << 8) | ( + seed[3]) if level == 0x01: output_key_temp = 0x20AA097B + output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, + (output_key_temp >> 16) & 0xFF, + (output_key_temp >> 8) & 0xFF, + output_key_temp & 0xFF) # output_key_temp = ((((temp_key >> 4) ^ temp_key) # << 3) ^ temp_key) & 0xFFFFFFFF elif level == 0x09: output_key_temp = 0x70237577 + output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, + (output_key_temp >> 16) & 0xFF, + (output_key_temp >> 8) & 0xFF, + output_key_temp & 0xFF) # _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) & # 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF) # _temp_z = 0 @@ -334,11 +367,21 @@ # _temp_z = _temp_z & 0xFFFFFFFF # output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | ( # (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF)) + elif level == 0x7D: + output_key_temp = 0xAB2F9F36099D81F3 + output_key = struct.pack( + 'BBBBBBBB', (output_key_temp >> 56) & 0xFF, + (output_key_temp >> 48) & 0xFF, (output_key_temp >> 40) & 0xFF, + (output_key_temp >> 32) & 0xFF, (output_key_temp >> 24) & 0xFF, + (output_key_temp >> 16) & 0xFF, (output_key_temp >> 8) & 0xFF, + output_key_temp & 0xFF) else: output_key_temp = temp_key - output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, ( - output_key_temp >> 16) & 0xFF, (output_key_temp >> 8) & 0xFF, output_key_temp & 0xFF) + output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, + (output_key_temp >> 16) & 0xFF, + (output_key_temp >> 8) & 0xFF, + output_key_temp & 0xFF) return output_key @@ -373,7 +416,15 @@ msg.channel = 0 msg.data = isotp_msg.data # bytearray msg.is_extended_id = False + msg.timestamp = time.time() - self.start_timestamp + msg.is_error_frame = False + msg.is_remote_frame = False + self.Tool_send(msg) + + def Tool_send(self, msg: Message): self._usbcan.send(msg) + if self.boot_logger is not None: + self.boot_logger.on_message_received(msg) def WidgetsInit(self): # self.UI.setupUi(self) @@ -381,7 +432,7 @@ self.UI.comboBox_2.addItem('USBCAN-II') # 波特率 - self.UI.comboBox_3.addItems([('%dK' % (i/1000)) + self.UI.comboBox_3.addItems([('%dK' % (i / 1000)) for i in TIMING_DICT.keys()]) # CHANNEL self.UI.comboBox_5.addItems(['CH1/3', 'CH2/4']) @@ -408,14 +459,17 @@ self.UI.comboBox_6.addItems(list(DID_dic.keys())) self.DID_display() self.UI.comboBox_6.activated.connect(self.DID_display) + self.UI.comboBox_7.addItems(list(DTCGroup_dic.keys())) + self.UI.comboBox_7.activated.connect(self.ClearDTC) self.UI.pushButton_31.clicked.connect(self.SecurityUnlockLevel_1) - self.UI.pushButton_32.clicked.connect(self.SecurityUnlockLevel_3) + self.UI.pushButton_32.clicked.connect(self.SecurityUnlockLevel_2) self.UI.pushButton_12.clicked.connect(self.ReadSupplyID) self.UI.pushButton_4.clicked.connect(self.ReadVIN) self.UI.pushButton_6.clicked.connect(self.ReadMfgDate) self.UI.pushButton_14.clicked.connect(self.ReadDataByID) + self.UI.pushButton_3.clicked.connect(self.WriteDataByID) self.UI.pushButton_9.clicked.connect(self.StartCalibraiton) self.UI.pushButton_11.clicked.connect(self.Calibraiton_Z) self.UI.pushButton_13.clicked.connect(self.Calibraiton_M) @@ -430,7 +484,20 @@ self.UI.radioButton.toggled.connect(self.TestPresentEvent) self.UI.pushButton_48.clicked.connect(self.start_programming) - seff.UI.pushButton_46.clicked.connect(self.loadAppfile) + self.UI.pushButton_46.clicked.connect(self.loadAppfile) + self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag) + + self.UI.pushButton_47.clicked.connect(self.SetBootLog) + # self.UI.comboBox_10.activated.connect(self.ReportByMask) + + self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys())) + self.UI.comboBox_12.addItems(list(DTC_Dic.keys())) + self.UI.comboBox_13.addItems(list(DTC_Control_dic.keys())) + self.UI.comboBox_13.activated.connect(self.DTC_Control) + + self.UI.pushButton_49.clicked.connect(self.ReportSupportDTC) + self.UI.comboBox_11.activated.connect(self.ReportByMask) + self.UI.comboBox_12.activated.connect(self.ReportSnapshotByDTC) def _formatMsgData(self, index, item, received): '''msg data to list @@ -463,8 +530,8 @@ else: data.append('标准帧') - data.append(' '.join(['0x' + - '{:0<2x}'.format(a).upper() for a in list(item.data)])) + data.append(' '.join( + ['0x' + '{:0>2x}'.format(a).upper() for a in list(item.data)])) return data def can_thread_stop(self): @@ -491,15 +558,17 @@ # conn.send(msg[i]) # pipe connection send self.CANtoIsoTPQueue.put( msg[i]) # send msg to isotp - print('收到77A') self.msgQueue.put(msg[i]) + if self.boot_logger is not None: + self.boot_logger.on_message_received(msg[i]) # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) # send signal to update screen g_signal.sig_MsgReceived.emit(num) + # msgToSendCnt = self.sendQueue.qsize() # if msgToSendCnt > 0: # msg = Message() @@ -508,7 +577,7 @@ # msg = self.sendQueue.get() # # logger.info('Tx: ID=0x{:0<3x} '.format( # # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) - # self._usbcan.send(msg) + # self.Tool_send(msg) def send_dump(self): msg = Message() msg.arbitration_id = self.shifter.canid.phy_rxId @@ -516,19 +585,36 @@ msg.channel = 0 msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0] msg.is_extended_id = False - self._usbcan.send(msg) + msg.is_remote_frame = False + msg.is_error_frame = False + msg.timestamp = time.time() - self.start_timestamp + self.Tool_send(msg) + + def send_VehiclePosition(self, data=[]): + msg = Message() + msg.arbitration_id = 0x10 + msg.dlc = 8 + msg.channel = 0 + msg.data = data + msg.is_extended_id = False + msg.is_remote_frame = False + msg.is_error_frame = False + msg.timestamp = time.time() - self.start_timestamp + self.Tool_send(msg) def open_close(self): if self._isOpen.value == 0: if self._usbcan is None: self.update_HardwareDevice() - can_filters = [ - {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] + can_filters = [{'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] - bitrate = list(TIMING_DICT.keys())[ - self.devicedescription.baudrate] - self._usbcan = USBCAN(device_type=4, device_index=0, - can_index=0, bitrate=bitrate, can_filters=can_filters) + bitrate = list( + TIMING_DICT.keys())[self.devicedescription.baudrate] + self._usbcan = USBCAN(device_type=4, + device_index=0, + can_index=self.devicedescription.channel, + bitrate=bitrate, + can_filters=can_filters) if not self._usbcan.InitAndStart(): logger.info("Open usbcan device fail.") @@ -548,9 +634,13 @@ self._deviceOpenUpdate() self.TestPresentTimer.stop() + def closeEvent(self, e): + self.boot_logger.stop() + def update_HardwareDevice(self): - self.devicedescription.device_type = self.UI.comboBox_2.currentIndex()+3 + self.devicedescription.device_type = self.UI.comboBox_2.currentIndex( + ) + 3 self.devicedescription.channel = self.UI.comboBox_5.currentIndex() self.devicedescription.baudrate = self.UI.comboBox_3.currentIndex() @@ -559,13 +649,14 @@ self.UI.pushButton.setText("Close CAN") elif self.needdisconnect.value == 1: self.UI.pushButton.setText("Open CAN") + # set ui info def _StartlistenMsgProcess(self): - self.msgProcess = Process( - name='pyUSBCANListener', - target=self._usbcan.ListeningMsg, - args=(self._isOpen, self.needdisconnect, self.msgQueue, self.sendQueue)) + self.msgProcess = Process(name='pyUSBCANListener', + target=self._usbcan.ListeningMsg, + args=(self._isOpen, self.needdisconnect, + self.msgQueue, self.sendQueue)) self.msgProcess.daemon = True self.msgProcess.start() # 1.5s后检测连接状态,该值可能需要标定 @@ -575,15 +666,51 @@ time.sleep(0.1) def disp_string(self, out_s): - dt = datetime.now() + dt = datetime.datetime.now() nowtime_str = dt.strftime('%I:%M:%S') # time - self.textEdit_2.insertPlainText(nowtime_str + ': ') - cursor = self.textEdit_2.textCursor() + self.UI.textEdit_2.insertPlainText(nowtime_str + ': ') + cursor = self.UI.textEdit_2.textCursor() cursor.movePosition(QtGui.QTextCursor.End) - self.textEdit_2.setTextCursor(cursor) - self.textEdit_2.insertPlainText(str(out_s+"\t\n")) + self.UI.textEdit_2.setTextCursor(cursor) + self.UI.textEdit_2.insertPlainText(str(out_s + "\t\n")) # self.statusbar.showMessage(str(out)) + def dispShiftstatus(self): + self.UI.pushButton_41.setStyleSheet( + Style_dic[self.shifter.UnlockButton]) + self.UI.pushButton_40.setStyleSheet(Style_dic[self.shifter.Pbutton]) + #X2 + self.UI.pushButton_37.setStyleSheet( + Style_dic[self.shifter.position == 2]) + #X1 + self.UI.pushButton_38.setStyleSheet( + Style_dic[self.shifter.position == 3]) + #Z + self.UI.pushButton_30.setStyleSheet( + Style_dic[self.shifter.position == 0]) + #Y1 + self.UI.pushButton_33.setStyleSheet( + Style_dic[self.shifter.position == 4]) + #Y2 + self.UI.pushButton_34.setStyleSheet( + Style_dic[self.shifter.position == 5]) + #M + self.UI.pushButton_39.setStyleSheet( + Style_dic[self.shifter.position == 0xe]) + #M+ + self.UI.pushButton_35.setStyleSheet( + Style_dic[self.shifter.position == 0xd]) + #M- + self.UI.pushButton_36.setStyleSheet( + Style_dic[self.shifter.position == 0xc]) + self.UI.pushButton_42.setStyleSheet( + ColorStyle_dic[self.Vehicle.ShiftLeverPos == "P"]) + self.UI.pushButton_43.setStyleSheet( + ColorStyle_dic[self.Vehicle.ShiftLeverPos == "D"]) + self.UI.pushButton_44.setStyleSheet( + ColorStyle_dic[self.Vehicle.ShiftLeverPos == "N"]) + self.UI.pushButton_45.setStyleSheet( + ColorStyle_dic[self.Vehicle.ShiftLeverPos == "R"]) def _updateRootList(self): _dataSize = self.msgQueue.qsize() @@ -594,9 +721,14 @@ for i in range(_dataSize): receiveNum += 1 msg = self.msgQueue.get() + if msg.arbitration_id == 0x420: + self.shifter.FramUnpack(msg.arbitration_id, msg.data) + resp_data = self.Vehicle.ShiftLogic(self.shifter) + self.send_VehiclePosition(resp_data) formateddata.append(self._formatMsgData( receiveNum, msg, True)) # return a data list self._insertDataSmooth(data=formateddata, datasize=_dataSize) + self.dispShiftstatus() def _insertDataSmooth(self, data, datasize): # row = 6-datasize @@ -628,16 +760,20 @@ def sessioncontrol_req(self): sesson = self.UI.comboBox.currentIndex() + 1 - t = threading.Thread( - target=self.udsclient.change_session, args=(sesson,)) - t.start() - t.join() - # self.udsclient.change_session(sesson) + self.udsclient.change_session(sesson) # self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位']) def MCUReset_req(self): - req = 0x03 if self.UI.comboBox_8.currentData() == '0x01硬件复位' else 0x01 + req = 0x03 if self.UI.comboBox_8.currentText() == '0x01硬件复位' else 0x01 self.udsclient.ecu_reset(req) + + def ClearDTC(self): + self.udsclient.clear_dtc( + group=DTCGroup_dic[self.UI.comboBox_7.currentText()]) + + def DTC_Control(self): + self.udsclient.control_dtc_setting( + setting_type=DTC_Control_dic[self.UI.comboBox_13.currentText()]) def Security_req(self): self.udsclient.unlock_security_access(1) @@ -649,11 +785,18 @@ self.UI.pushButton_31.setStyleSheet( "background-color:rgb(255, 85, 127)") - def SecurityUnlockLevel_3(self): + def SecurityUnlockLevel_2(self): data = self.udsclient.unlock_security_access(9) if data.positive: self.UI.pushButton_32.setStyleSheet( "background-color:rgb(255, 85, 127)") + + def SecurityUnlockLevel_3(self): + data = self.udsclient.unlock_security_access(0x7d) + if data.positive: + self.UI.pushButton_32.setStyleSheet( + "background-color:rgb(0, 128, 0)") + return data def ReadSupplyID(self): data = self.ReadByDID(0xF18A) @@ -682,15 +825,27 @@ def DID_display(self): # print(self.UI.comboBox_6.currentText()) - self.UI.lineEdit.setText("0x{:0<4x}". - format((DID_dic[self.UI.comboBox_6.currentText()]))) + self.UI.lineEdit.setText("0x{:0<4x}".format( + (DID_dic[self.UI.comboBox_6.currentText()]))) def ReadDataByID(self): tempdid = DID_dic[self.UI.comboBox_6.currentText()] self.UI.lineEdit_2.clear() data = self.ReadByDID(tempdid) if data is not None and len(str(data[tempdid])): - self.UI.lineEdit_2.setText(str(data[tempdid])+' ') + # print(data[tempdid]) + self.UI.lineEdit_2.setText(data[tempdid]) + + def WriteDataByID(self): + tempdid = DID_dic[self.UI.comboBox_6.currentText()] + writedata = self.UI.lineEdit_2.text() + try: + response = self.udsclient.write_data_by_identifier( + tempdid, writedata) + if response.positive: + values = response.service_data.values + except Exception as e: + g_signal.sig_Disp_str.emit(e) def communicationControl_req(self): req = 0x00 @@ -701,14 +856,23 @@ req = 1 else: req = 3 - self.udsclient.communication_control( - control_type=0, communication_type=req) + try: + self.udsclient.communication_control(control_type=req, + communication_type=1) + except Exception as e: + self.disp_string('%s' % e) def StartCalibraiton(self): - startcmd = bytearray([0x0, 0x0, 0x0, 0x0]) - if self.RequestRoutControl(0xFFAA, bytes(startcmd)): - self.UI.pushButton_9.setStyleSheet( - "background-color:rgb(255, 85, 127)") + sesson = 3 + self.UI.comboBox.setCurrentIndex(sesson - 1) + response = self.udsclient.change_session(sesson) + if response.positive: + response = self.SecurityUnlockLevel_3() + if response.positive: + startcmd = bytearray([0x8, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(startcmd)): + self.UI.pushButton_9.setStyleSheet( + "background-color:rgb(255, 85, 127)") def Calibraiton_Z(self): calcmd = bytearray([0x1, 0x0, 0x0, 0x0]) @@ -717,43 +881,43 @@ "background-color:rgb(255, 85, 127)") def Calibraiton_M(self): - calcmd = bytearray([0x2, 0x0, 0x0, 0x0]) + calcmd = bytearray([0x0, 0x0, 0x0, 0x0]) if self.RequestRoutControl(0xFFAA, bytes(calcmd)): self.UI.pushButton_13.setStyleSheet( "background-color:rgb(255, 85, 127)") def Calibraiton_MP(self): - calcmd = bytearray([0x3, 0x0, 0x0, 0x0]) + calcmd = bytearray([0x2, 0x0, 0x0, 0x0]) if self.RequestRoutControl(0xFFAA, bytes(calcmd)): self.UI.pushButton_18.setStyleSheet( "background-color:rgb(255, 85, 127)") def Calibraiton_MN(self): - calcmd = bytearray([0x4, 0x0, 0x0, 0x0]) + calcmd = bytearray([0x3, 0x0, 0x0, 0x0]) if self.RequestRoutControl(0xFFAA, bytes(calcmd)): self.UI.pushButton_22.setStyleSheet( "background-color:rgb(255, 85, 127)") def Calibraiton_X2(self): - calcmd = bytearray([0x5, 0x0, 0x0, 0x0]) + calcmd = bytearray([0x4, 0x0, 0x0, 0x0]) if self.RequestRoutControl(0xFFAA, bytes(calcmd)): self.UI.pushButton_24.setStyleSheet( "background-color:rgb(255, 85, 127)") def Calibraiton_X1(self): - calcmd = bytearray([0x6, 0x0, 0x0, 0x0]) + calcmd = bytearray([0x5, 0x0, 0x0, 0x0]) if self.RequestRoutControl(0xFFAA, bytes(calcmd)): self.UI.pushButton_26.setStyleSheet( "background-color:rgb(255, 85, 127)") def Calibraiton_Y1(self): - calcmd = bytearray([0x7, 0x0, 0x0, 0x0]) + calcmd = bytearray([0x6, 0x0, 0x0, 0x0]) if self.RequestRoutControl(0xFFAA, bytes(calcmd)): self.UI.pushButton_27.setStyleSheet( "background-color:rgb(255, 85, 127)") def Calibraiton_Y2(self): - calcmd = bytearray([0x8, 0x0, 0x0, 0x0]) + calcmd = bytearray([0x7, 0x0, 0x0, 0x0]) if self.RequestRoutControl(0xFFAA, bytes(calcmd)): self.UI.pushButton_28.setStyleSheet( "background-color:rgb(255, 85, 127)") @@ -768,35 +932,107 @@ resp_1 = self.udsclient.start_routine(routine_id=routine_id, data=data) return resp_1.positive + def ReportSupportDTC(self): + resp_1 = self.udsclient.get_supported_dtc() + if resp_1.positive: + for i in range(len(resp_1.data) // 4): + a = i * 4 + 2 + b = i * 4 + 3 + c = i * 4 + 4 + d = i * 4 + 5 + dtc_hex = resp_1.data[a] << 16 | resp_1.data[ + b] << 8 | resp_1.data[c] << 0 + log = 'DTC:0x%.6x Status:0x%.2x ' % ( + dtc_hex, resp_1.data[d]) + DTC_DescriptionDic[dtc_hex] + for j in range(8): + if resp_1.data[d] >> j & 0x1: + log += ' bit%d' % j + self.disp_string(log) + + def ReportByMask(self): + mask = ReportBymask_DTC[self.UI.comboBox_11.currentText()] + resp_1 = self.udsclient.get_dtc_by_status_mask(status_mask=mask) + if resp_1.positive: + for i in range(len(resp_1.data) // 4): + a = i * 4 + 2 + b = i * 4 + 3 + c = i * 4 + 4 + d = i * 4 + 5 + dtc_hex = resp_1.data[a] << 16 | resp_1.data[ + b] << 8 | resp_1.data[c] << 0 + log = 'DTC:0x%.2x%.2x%.2x Status:0x%.2x %s ' % ( + resp_1.data[a], resp_1.data[b], resp_1.data[c], + resp_1.data[d], DTC_DescriptionDic[dtc_hex]) + for j in range(8): + if resp_1.data[d] >> j & 0x1: + log += 'bit%d ' % j + self.disp_string(log) + + def ReportSnapshotByDTC(self): + dtc = DTC_Dic[self.UI.comboBox_12.currentText()][ + self.UI.comboBox_12.currentIndex()] + try: + resp = self.udsclient.get_dtc_snapshot_by_dtc_number( + dtc=dtc, record_number=1) + if resp.positive: + number_of_did = resp.data[6] + status = resp.data[4] + print(status) + dtc = resp.service_data.dtcs[0] + snapshots = dtc.snapshots + # print(dtc.id) + # print(dtc.status.get_byte()) + log = "DTC:0x%.6x Status:0x%.2x" % (dtc.id, status) + ' ' + # print(type(snapshots[0].did)) + # print(type(snapshots[0].data)) + for i in range(number_of_did): + log += (DTC_SanpshotDescriptionDic[snapshots[i].did] + + ':' + snapshots[i].data + ' ') + self.disp_string(log) + + # print(snapshots[i].did, snapshots[i].data) + except Exception as e: + self.disp_string('%s' % e) + + def SetFromBootFlag(self): + self.startfromboot = self.UI.radioButton_2.isChecked() + def pre_programming(self): + if not self.startfromboot: + self.disp_string('# 预编程步骤') - self.disp_string('# 预编程步骤') + # 进入extended session + self.disp_string('>>> 进入扩展模式') + response = self.udsclient.change_session(3) + if response.positive: + self.disp_string('>>> 解锁安全访问') + response = self.udsclient.unlock_security_access(9) - # 进入extended session - self.disp_string('>>> 进入扩展模式') - response = self.udsclient.change_session(3) - if response.positive: - # 检查编程条件 - self.disp_string('>>> 检查编程条件') - response = self.udsclient.start_routine(0xff02) - if response.positive: - # 关闭DTC的存储 - self.disp_string('>>> 关闭DTC的存储') - response = self.udsclient.control_dtc_setting(2) - # print(response) - if response.positive: - # 关闭与诊断无关的报文 - self.disp_string('>>> 关闭与诊断无关的报文') - response = self.udsclient.communication_control(0x03, 0x01) - else: - self.disp_string('>>> 预编程失败') + if response.positive: + # 检查编程条件 + self.disp_string('>>> 检查编程条件') + response = self.udsclient.start_routine(0xff02) + if response.positive: + # 关闭DTC的存储 + self.disp_string('>>> 关闭DTC的存储') + response = self.udsclient.control_dtc_setting(2) + # print(response) + if response.positive: + # 关闭与诊断无关的报文 + self.disp_string('>>> 关闭与诊断无关的报文') + response = self.udsclient.communication_control(0x03, 0x01) + else: + self.disp_string('>>> 预编程失败') - return response.positive + return response.positive # print(response) def main_programming(self): - self.disp_string('# 主编程步骤') + + if self.startfromboot: + self.disp_string('>>> 进入扩展模式') + response = self.udsclient.change_session(3) # 进入programming session self.disp_string('>>> 进入编程模式') @@ -807,49 +1043,40 @@ self.disp_string('>>> 安全访问') response = self.udsclient.unlock_security_access(9) # print(response) - + if response.positive: + self.disp_string('>>> 请求下载驱动文件') + address = 0x20004600 + memorysize = 10 + memory_location = MemoryLocation(address, memorysize, 32, 32) + response = self.udsclient.request_download(memory_location) # print(response) if response.positive: # 发送driver文件 self.disp_string('>>> 发送driver文件') drv_file = self.drv_data - address = drv_file[16] << 24 | drv_file[17] << 16 | drv_file[ - 18] << 8 | drv_file[19] << 0 - memorysize = drv_file[20] << 24 | drv_file[21] << 16 | drv_file[ - 22] << 8 | drv_file[23] << 0 - memory_location = MemoryLocation(address, memorysize, 32, 32) - response = self.udsclient.request_download(memory_location) - # print(response) - - max_length = response.service_data.max_length - - # 有效数据长度, 去除sid和sequence两个字节 - payload_length = max_length - 2 - - count = (len(drv_file) + payload_length - 1) // payload_length - - base = self.get_progressbar_pos() - for i in range(count): - start = i * payload_length - end = start + payload_length - response = self.udsclient.transfer_data((i + 1) % 256, - drv_file[start:end]) - self.set_progressbar_pos(base + end) + response = self.udsclient.transfer_data(1, bytes(drv_file)) + self.set_progressbar_pos(len(drv_file)) # print(response) if response.positive: + self.disp_string('>>> 发送驱动结束,请求推出') response = self.udsclient.request_transfer_exit() # print(response) if response.positive: # driver文件完整性检验 self.disp_string('>>> driver文件完整性检验') - response = self.udsclient.start_routine(0xf001, drv_file[0:4]) + drivecrc = bytes([0x9B, 0x39, 0xf2, 0xec]) + response = self.udsclient.start_routine(0xf001, drivecrc) # print(response) if response.positive: app_file = self.app_data - address = app_file[16] << 24 | app_file[17] << 16 | app_file[ - 18] << 8 | app_file[19] << 0 - memorysize = app_file[20] << 24 | app_file[21] << 16 | app_file[ - 22] << 8 | app_file[23] << 0 + address = app_file[APP_ADDR_LOCATION_OFFSET] << 24 | app_file[ + APP_ADDR_LOCATION_OFFSET + 1] << 16 | app_file[ + APP_ADDR_LOCATION_OFFSET + + 2] << 8 | app_file[APP_ADDR_LOCATION_OFFSET + 3] << 0 + memorysize = app_file[APP_LENGTH_LOCATION_OFFSET] << 24 | app_file[ + APP_LENGTH_LOCATION_OFFSET + 1] << 16 | app_file[ + APP_LENGTH_LOCATION_OFFSET + + 2] << 8 | app_file[APP_LENGTH_LOCATION_OFFSET + 3] << 0 memory_location = MemoryLocation(address, memorysize, 32, 32) # 删除app存储空间 @@ -864,11 +1091,11 @@ # 发送app文件 self.disp_string('>>> 发送app文件') response = self.udsclient.request_download(memory_location) - # print(response) + # print(response) max_length = response.service_data.max_length - # 有效数据长度, 去除sid和sequence两个字节 + # 有效数据长度, 去除sid和sequence两个字节 payload_length = max_length - 2 count = (len(app_file) + payload_length - 1) // payload_length @@ -898,12 +1125,11 @@ # 回到default session self.disp_string('>>> 回到默认模式') - self.udsclient.suppress_positive_response.enabled = True response = self.udsclient.change_session(1) return response.positive def start_programming(self): - + self.UI.pushButton_48.setDisabled(True) self.set_progressbar_pos(0) t1 = time.time() @@ -915,13 +1141,14 @@ self.post_programming() except Exception as e: - self.disp_string('%s' % e, '#ff0000') + self.disp_string('%s' % e) + # self.UI.pushButton_48.setDisabled(False) t2 = time.time() self.disp_string('finished in %.2f sec' % (t2 - t1)) self.in_programming = False - self.UI.pushButton_48.setDisabled(True) + self.UI.pushButton_48.setDisabled(False) def set_progressbar_pos(self, pos): @@ -944,16 +1171,54 @@ return fileName, _ = QtWidgets.QFileDialog.getOpenFileName( - self, 'comboBox_4', '.', 'All Files (*);;Bin Files (*.bin);;Hex Files (*.hex)') + None, 'comboBox_4', '.', + 'All Files (*);;Bin Files (*.bin);;Hex Files (*.hex)') if fileName != '': self.UI.comboBox_4.addItem(fileName) self.UI.comboBox_4.setCurrentText(fileName) - with open(fileName, 'rb') as fd: - self.app_data = fd.read() - drv_data_len = len( - self.drv_data) if self.drv_data is not None else 0 - app_data_len = len( - self.app_data) if self.app_data is not None else 0 - total_len = drv_data_len + app_data_len - self.UI.progressBar.setRange(0, total_len) + + # with open(fileName, 'rb') as fd: + # self.app_data = fd.read() + # drv_data_len = len( + # self.drv_data) if self.drv_data is not None else 0 + # app_data_len = len( + # self.app_data) if self.app_data is not None else 0 + # total_len = drv_data_len + app_data_len + # self.UI.progressBar.setRange(0, total_len) + + # print(self.app_data) + # print("===============================================") + hex_read = Hex_read() + hex_read.Open_file(fileName) + self.app_data = hex_read.data + drv_data_len = len( + self.drv_data) if self.drv_data is not None else 0 + app_data_len = len( + self.app_data) if self.app_data is not None else 0 + total_len = drv_data_len + app_data_len + self.UI.progressBar.setRange(0, total_len) + + def SetBootLog(self): + if self.in_programming: + self.disp_string('## 正在编程中') + return + + fileName, _ = QtWidgets.QFileDialog.getSaveFileName( + None, 'comboBox_10', '.', + 'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)') + + if fileName != '': + self.UI.comboBox_10.addItem(fileName) + self.UI.comboBox_10.setCurrentText(fileName) + print(fileName) + self.boot_logger = Logger(fileName) + # self.listener = [self.boot_logger] + # with open(fileName, 'rb') as fd: + # self.app_data = fd.read() + # drv_data_len = len( + # self.drv_data) if self.drv_data is not None else 0 + # app_data_len = len( + # self.app_data) if self.app_data is not None else 0 + # total_len = drv_data_len + app_data_len + # self.UI.progressBar.setRange(0, total_len) -- Gitblit v1.8.0