From 48fc7d6c9549513d9b1da3f99f449fc7de27821f Mon Sep 17 00:00:00 2001 From: tao_z <tzj0429@163.com> Date: Fri, 10 Jun 2022 19:23:26 +0800 Subject: [PATCH] 修改信息打印标准 增加数据写入功能 --- widgets/ShifterTool.py | 684 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 files changed, 566 insertions(+), 118 deletions(-) diff --git a/widgets/ShifterTool.py b/widgets/ShifterTool.py index cbf5baa..afd7c68 100644 --- a/widgets/ShifterTool.py +++ b/widgets/ShifterTool.py @@ -1,5 +1,9 @@ - +from ast import Not, Pass +from concurrent.futures import thread +from curses import flash +from logging import exception import threading +from typing import List from can import BusABC, Message from udsoncan.client import Client from udsoncan.exceptions import TimeoutException @@ -10,27 +14,31 @@ from isotp import CanMessage import queue from USBCAN import * -from Shifter import ShifterClass +from Shifter import * +from styleSheet import * from mainwindows import Ui_MainWindow from PySide2 import QtWidgets, QtCore, QtGui import struct import time import datetime -from multiprocessing import Array, Pool, Process, Queue, Value +from ShifterDefine import * +from multiprocessing import Process, Queue, Value, Pipe MAX_RCV_NUM = 20 - +APP_ADDR_LOCATION_OFFSET = 8 +APP_LENGTH_LOCATION_OFFSET = 12 logger = logging.getLogger() -logger.setLevel(logging.DEBUG) +logger.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') consoleHandler = logging.StreamHandler() -consoleHandler.setLevel(logging.DEBUG) +consoleHandler.setLevel(logging.ERROR) -fileHandler = logging.FileHandler( - './log/ShiftTool.log', mode='a', encoding='UTF-8') -fileHandler.setLevel(logging.NOTSET) +fileHandler = logging.FileHandler('./log/ShiftTool.log', + mode='a', + encoding='UTF-8') +fileHandler.setLevel(logging.INFO) consoleHandler.setFormatter(formatter) fileHandler.setFormatter(formatter) @@ -44,7 +52,13 @@ sig_MsgReceived = QtCore.Signal(int) +g_signal = UISignals() +uds_conn, isotp_conn = Pipe() +user_conn, drive_conn = Pipe() + + class HardwreDevice(): + def __init__(self): self.device_type = 4 self.channel = 0 # can_index @@ -53,6 +67,7 @@ class PeriodSendThread(object): + def __init__(self, period_func, args=[], kwargs={}): self._thread = threading.Thread(target=self._run) self._function = period_func @@ -100,7 +115,6 @@ BaseConnection.__init__(self, name) self.UDStoIsoTPQueue = queue.Queue() # USD --> isotp self.IsoTPtoUDSQueue = queue.Queue() # ISOTP --> UDS - self.CANtoIsoTPQueue = queue.Queue() # CAN --> isotp # self.IsoTPtoCANQueue = queue.Queue() # ISOTP--> CAN self._read_thread = None self.exit_requested = False @@ -108,12 +122,13 @@ self.isotp_layer = isotp_layer assert isinstance( - self.isotp_layer, isotp.TransportLayer), 'isotp_layer must be a valid isotp.TransportLayer ' + self.isotp_layer, isotp.TransportLayer + ), 'isotp_layer must be a valid isotp.TransportLayer ' def open(self): self.exit_requested = False - self._read_thread = threading.Thread( - None, target=self.rxthread_task) + self._read_thread = threading.Thread(None, + target=self.rxthread_task) self._read_thread.start() self.opened = True logger.info('Connection opened') @@ -141,11 +156,13 @@ if self.mtu is not None: if len(payload) > self.mtu: logger.warning( - "Truncating payload to be set to a length of %d" % (self.mtu)) + "Truncating payload to be set to a length of %d" % + (self.mtu)) payload = payload[0:self.mtu] # isotp.protocol.TransportLayer uses byte array. udsoncan is strict on bytes format - self.UDStoIsoTPQueue.put(bytearray(payload)) + self.isotp_layer.send(bytearray(payload)) + # self.UDStoIsoTPQueue.put(bytearray(payload)) def specific_wait_frame(self, timeout=2): if not self.opened: @@ -153,20 +170,23 @@ timedout = False frame = None + # frame = uds_conn.recv() try: - frame = self.IsoTPtoUDSQueue.get(block=True, timeout=timeout) - # frame = self.CANtoIsoTPQueue.get(block=True, timeout=timeout) + frame = self.IsoTPtoUDSQueue.get(block=True, timeout=5) except queue.Empty: timedout = True + # frame = self.CANtoIsoTPQueue.get(block=True, timeout=timeout) if timedout: raise TimeoutException( - "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" % timeout) + "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" + % timeout) if self.mtu is not None: if frame is not None and len(frame) > self.mtu: logger.warning( - "Truncating received payload to a length of %d" % (self.mtu)) + "Truncating received payload to a length of %d" % + (self.mtu)) frame = frame[0:self.mtu] # isotp.protocol.TransportLayer uses bytearray. udsoncan is strict on bytes format @@ -183,20 +203,22 @@ def rxthread_task(self): while not self.exit_requested: try: - # self.logger.debug("UDStoIsoTPQueue queue size is now %d" % ( - # self.UDStoIsoTPQueue.qsize())) - while not self.UDStoIsoTPQueue.empty(): - self.isotp_layer.send(self.UDStoIsoTPQueue.get()) + # # self.logger.debug("UDStoIsoTPQueue queue size is now %d" % ( + # # self.UDStoIsoTPQueue.qsize())) + # while not self.UDStoIsoTPQueue.empty(): + # self.isotp_layer.send(self.UDStoIsoTPQueue.get()) self.isotp_layer.process() while self.isotp_layer.available(): + # isotp_conn.send(self.isotp_layer.recv()) self.IsoTPtoUDSQueue.put(self.isotp_layer.recv()) - # self.logger.debug("IsoTPtoUDSQueue queue size is now %d" % ( - # self.IsoTPtoUDSQueue.qsize())) + self.logger.debug( + "IsoTPtoUDSQueue queue size is now %d" % + (self.IsoTPtoUDSQueue.qsize())) - # time.sleep(self.isotp_layer.sleep_time()) - time.sleep(0.0001) + # time.sleep(self.isotp_layer.sleep_time()) + time.sleep(0.001) except Exception as e: self.exit_requested = True @@ -212,8 +234,7 @@ # self.protocol("WM_DELETE_WINDOW", self.Form_OnClosing) self.msgQueue = Queue() # can layer receive queue self.sendQueue = Queue() # can layer send queue - self.signal = UISignals() - + self.CANtoIsoTPQueue = Queue() # CAN --> isotp self.shifter = ShifterClass() self.devicedescription = HardwreDevice() self.windows = QtWidgets.QMainWindow() @@ -221,10 +242,17 @@ self.UI.setupUi(window) self._dev_info = None self.can_thread = threading.Thread(target=self.can_thread) - + self.TestPresentTimer = QtCore.QTimer() + self.TestPresentTimer.timeout.connect(self.creat_testpresentReq) self.DeviceInit() self.WidgetsInit() self.ChnInfoUpdate(self._isOpen) + self.in_programming = False + self.startfromboot = False + self.drv_data = [ + 0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a + ] + self.app_data: bytes = None def DeviceInit(self): self._usbcan = None @@ -260,26 +288,33 @@ 'squash_stmin_requirement': False } self._isotpaddr_PHYS = isotp.Address( - isotp.AddressingMode.Normal_11bits, txid=self.shifter.canid.phy_rxId, rxid=self.shifter.canid.phy_txId) + isotp.AddressingMode.Normal_11bits, + txid=self.shifter.canid.phy_rxId, + rxid=self.shifter.canid.phy_txId) self._isotpaddr_FUNC = isotp.Address( - isotp.AddressingMode.Normal_11bits, txid=self.shifter.canid.fun_rxId, rxid=self.shifter.canid.phy_txId) + isotp.AddressingMode.Normal_11bits, + txid=self.shifter.canid.fun_rxId, + rxid=self.shifter.canid.phy_txId) # self._isotpaddr_EPS = isotp.Address( # isotp.AddressingMode.Normal_11bits, txid=EPS_RX_ID_PHYS, rxid=EPS_TX_ID) # self._isotpaddr_EPS4wd = isotp.Address( # isotp.AddressingMode.Normal_11bits, txid=EPS4wd_RX_ID_PHYS, rxid=EPS4wd_TX_ID) - self.isotp_layer = isotp.TransportLayer( - rxfn=self.isotp_rcv, txfn=self.isotp_send, address=self._isotpaddr_PHYS, params=self.isotp_params) + self.isotp_layer = isotp.TransportLayer(rxfn=self.isotp_rcv, + txfn=self.isotp_send, + address=self._isotpaddr_PHYS, + params=self.isotp_params) self.conn = ShifterTool.IsoTpConnection(isotp_layer=self.isotp_layer) self.udsclient = Client(self.conn, request_timeout=2) self.udsclient.config['security_algo'] = self.SecAlgo self.udsclient.config['security_algo_params'] = [ - 0x4FE87269, 0x6BC361D8, 0x9B127D51, 0x5BA41903] + 0x11223344, 0x20AA097B, 0x11223344, 0x70237577 + ] self.udsclient.config['data_identifiers'] = self.shifter.did_config self.udsclient.config['server_address_format'] = 32 self.udsclient.config['server_memorysize_format'] = 32 - def SecAlgo(self, level, seed, params): + def SecAlgo(self, level, seed, params=None): """ Builds the security key to unlock a security level. @@ -296,32 +331,36 @@ self.output_key[2] = ((temp_key[1] & 0xFC) >> 2) | (temp_key[0] & 0xC0) self.output_key[3] = ((temp_key[0] & 0x0F) << 4) | (temp_key[3] & 0x0F) """ - temp_key = (seed[0] << 24) | ( - seed[1] << 16) | (seed[2] << 8) | (seed[3]) + temp_key = (seed[0] << 24) | (seed[1] << 16) | (seed[2] << 8) | ( + seed[3]) if level == 0x01: - output_key_temp = ((((temp_key >> 4) ^ temp_key) - << 3) ^ temp_key) & 0xFFFFFFFF - elif level == 0x11: - _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) & - 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF) - _temp_z = 0 - _temp_sum = 0 - for i in range(64): - _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z) - ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF - _temp_y = _temp_y & 0xFFFFFFFF - _temp_sum += 0x8F750A1D - _temp_sum = _temp_sum & 0xFFFFFFFF - _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^ - (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF - _temp_z = _temp_z & 0xFFFFFFFF - output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | ( - (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF)) + output_key_temp = 0x20AA097B + # output_key_temp = ((((temp_key >> 4) ^ temp_key) + # << 3) ^ temp_key) & 0xFFFFFFFF + elif level == 0x09: + output_key_temp = 0x70237577 + # _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) & + # 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF) + # _temp_z = 0 + # _temp_sum = 0 + # for i in range(64): + # _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z) + # ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF + # _temp_y = _temp_y & 0xFFFFFFFF + # _temp_sum += 0x8F750A1D + # _temp_sum = _temp_sum & 0xFFFFFFFF + # _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^ + # (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF + # _temp_z = _temp_z & 0xFFFFFFFF + # output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | ( + # (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF)) else: output_key_temp = temp_key - output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, ( - output_key_temp >> 16) & 0xFF, (output_key_temp >> 8) & 0xFF, output_key_temp & 0xFF) + output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF, + (output_key_temp >> 16) & 0xFF, + (output_key_temp >> 8) & 0xFF, + output_key_temp & 0xFF) return output_key @@ -343,16 +382,8 @@ '''receive msg from can layer use queue read from can layer listen''' can_msgs = None - if not self.conn.CANtoIsoTPQueue.empty(): - can_msgs = self.conn.CANtoIsoTPQueue.get() - # can_num = self.conn.CANtoIsoTPQueue.size() - # # can_num = self._usbcan.GetReceiveNum(self.devicedescription.channel) - # if can_num and not self._terminated: - # read_cnt = MAX_RCV_NUM if can_num >= MAX_RCV_NUM else can_num - # for i in range(read_cnt): - # can_msgs[i] = self.conn.CANtoIsoTPQueue.get() - # else: - # can_msgs = None + if not self.CANtoIsoTPQueue.empty(): + can_msgs = self.CANtoIsoTPQueue.get() return can_msgs def isotp_send(self, isotp_msg): @@ -361,11 +392,10 @@ msg = Message() msg.arbitration_id = isotp_msg.arbitration_id msg.dlc = isotp_msg.dlc - msg.data = isotp_msg.data + msg.channel = 0 + msg.data = isotp_msg.data # bytearray msg.is_extended_id = False - # for i in range(isotp_msg.dlc): - # msg.data[i] = isotp_msg.data[i] - self.sendQueue.put(msg) + self._usbcan.send(msg) def WidgetsInit(self): # self.UI.setupUi(self) @@ -373,7 +403,7 @@ self.UI.comboBox_2.addItem('USBCAN-II') # 波特率 - self.UI.comboBox_3.addItems([('%dK' % (i/1000)) + self.UI.comboBox_3.addItems([('%dK' % (i / 1000)) for i in TIMING_DICT.keys()]) # CHANNEL self.UI.comboBox_5.addItems(['CH1/3', 'CH2/4']) @@ -383,16 +413,48 @@ self.UI.comboBox_3.activated.connect(self.update_HardwareDevice) self.UI.comboBox_5.activated.connect(self.update_HardwareDevice) - self.signal.sig_Disp_str.connect(self.disp_string) - self.signal.sig_MsgReceived.connect(self._updateRootList) + g_signal.sig_Disp_str.connect(self.disp_string) + g_signal.sig_MsgReceived.connect(self._updateRootList) self.initUDSUI() def initUDSUI(self): - self.UI.comboBox.addItems(['0x01默认模式', '0x02编程模式', '0x03编程模式']) + self.UI.comboBox.addItems(['0x01默认模式', '0x02编程模式', '0x03扩展模式']) + self.UI.comboBox.activated.connect(self.sessioncontrol_req) + self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位']) + self.UI.comboBox_8.activated.connect(self.MCUReset_req) + self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发']) + self.UI.comboBox_9.activated.connect(self.communicationControl_req) + self.UI.comboBox_6.addItems(list(DID_dic.keys())) + self.DID_display() + self.UI.comboBox_6.activated.connect(self.DID_display) + + self.UI.pushButton_31.clicked.connect(self.SecurityUnlockLevel_1) + self.UI.pushButton_32.clicked.connect(self.SecurityUnlockLevel_3) + + self.UI.pushButton_12.clicked.connect(self.ReadSupplyID) + self.UI.pushButton_4.clicked.connect(self.ReadVIN) + self.UI.pushButton_6.clicked.connect(self.ReadMfgDate) + self.UI.pushButton_14.clicked.connect(self.ReadDataByID) + self.UI.pushButton_3.clicked.connect(self.WriteDataByID) + self.UI.pushButton_9.clicked.connect(self.StartCalibraiton) + self.UI.pushButton_11.clicked.connect(self.Calibraiton_Z) + self.UI.pushButton_13.clicked.connect(self.Calibraiton_M) + self.UI.pushButton_18.clicked.connect(self.Calibraiton_MP) + self.UI.pushButton_22.clicked.connect(self.Calibraiton_MN) + self.UI.pushButton_24.clicked.connect(self.Calibraiton_X2) + self.UI.pushButton_26.clicked.connect(self.Calibraiton_X1) + self.UI.pushButton_27.clicked.connect(self.Calibraiton_Y1) + self.UI.pushButton_28.clicked.connect(self.Calibraiton_Y2) + self.UI.pushButton_29.clicked.connect(self.Calibraiton_GAP) + self.UI.radioButton.toggled.connect(self.TestPresentEvent) + + self.UI.pushButton_48.clicked.connect(self.start_programming) + self.UI.pushButton_46.clicked.connect(self.loadAppfile) + self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag) def _formatMsgData(self, index, item, received): '''msg data to list @@ -425,8 +487,8 @@ else: data.append('标准帧') - data.append(' '.join(['0x' + - '{:0<2x}'.format(a).upper() for a in list(item.data)])) + data.append(' '.join( + ['0x' + '{:0<2x}'.format(a).upper() for a in list(item.data)])) return data def can_thread_stop(self): @@ -436,46 +498,62 @@ self.can_thread.start() def can_thread(self): - while self._isOpen.value == 1: + while True: time.sleep(0.01) - if self.needdisconnect.value == 1: - ret = self._usbcan.CloseDevice() - if ret == 1: - self.needdisconnect.value = 0 - self._isOpen.value = 0 - else: - msg, num = self._usbcan.Receive(len=10) - if not num == 0: - for i in range(num): - if msg[i].arbitration_id == self.shifter.canid.phy_txId: - self.conn.CANtoIsoTPQueue.put( - msg[i]) # send msg to isotp - self.msgQueue.put(msg[i]) - # send signal to update screen - logger.info('Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + - '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) - self.signal.sig_MsgReceived.emit(num) - msgToSendCnt = self.sendQueue.qsize() - if msgToSendCnt > 0: - msg = Message() - for i in range(msgToSendCnt): + while self._isOpen.value == 1: + time.sleep(0.01) + if self.needdisconnect.value == 1: + ret = self._usbcan.CloseDevice() + if ret == 1: + self.needdisconnect.value = 0 + self._isOpen.value = 0 + else: + msg, num = self._usbcan.Receive(len=1) + if not num == 0: + for i in range(num): + if msg[i].arbitration_id == self.shifter.canid.phy_txId: + # conn.send(msg[i]) # pipe connection send + self.CANtoIsoTPQueue.put( + msg[i]) # send msg to isotp - msg = self.sendQueue.get() - logger.info('Tx: ID=0x{:0<3x} '.format( - msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) - self._usbcan.send(msg) + self.msgQueue.put(msg[i]) + # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + + # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) + + # send signal to update screen + + g_signal.sig_MsgReceived.emit(num) + # msgToSendCnt = self.sendQueue.qsize() + # if msgToSendCnt > 0: + # msg = Message() + # for i in range(msgToSendCnt): + + # msg = self.sendQueue.get() + # # logger.info('Tx: ID=0x{:0<3x} '.format( + # # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) + # self._usbcan.send(msg) + def send_dump(self): + msg = Message() + msg.arbitration_id = self.shifter.canid.phy_rxId + msg.dlc = 8 + msg.channel = 0 + msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0] + msg.is_extended_id = False + self._usbcan.send(msg) def open_close(self): if self._isOpen.value == 0: if self._usbcan is None: self.update_HardwareDevice() - can_filters = [ - {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] + can_filters = [{'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] - bitrate = list(TIMING_DICT.keys())[ - self.devicedescription.baudrate] - self._usbcan = USBCAN(device_type=4, device_index=0, - can_index=0, bitrate=bitrate, can_filters=can_filters) + bitrate = list( + TIMING_DICT.keys())[self.devicedescription.baudrate] + self._usbcan = USBCAN(device_type=4, + device_index=0, + can_index=0, + bitrate=bitrate, + can_filters=can_filters) if not self._usbcan.InitAndStart(): logger.info("Open usbcan device fail.") @@ -483,6 +561,8 @@ self._deviceOpenUpdate() self.can_thread_start() self.conn.open() # start iso tp thread + self.send_dump() + self.send_dump() else: self._usbcan.InitAndStart() @@ -491,10 +571,12 @@ else: self.needdisconnect.value = 1 self._deviceOpenUpdate() + self.TestPresentTimer.stop() def update_HardwareDevice(self): - self.devicedescription.device_type = self.UI.comboBox_2.currentIndex()+3 + self.devicedescription.device_type = self.UI.comboBox_2.currentIndex( + ) + 3 self.devicedescription.channel = self.UI.comboBox_5.currentIndex() self.devicedescription.baudrate = self.UI.comboBox_3.currentIndex() @@ -503,13 +585,14 @@ self.UI.pushButton.setText("Close CAN") elif self.needdisconnect.value == 1: self.UI.pushButton.setText("Open CAN") + # set ui info def _StartlistenMsgProcess(self): - self.msgProcess = Process( - name='pyUSBCANListener', - target=self._usbcan.ListeningMsg, - args=(self._isOpen, self.needdisconnect, self.msgQueue, self.sendQueue)) + self.msgProcess = Process(name='pyUSBCANListener', + target=self._usbcan.ListeningMsg, + args=(self._isOpen, self.needdisconnect, + self.msgQueue, self.sendQueue)) self.msgProcess.daemon = True self.msgProcess.start() # 1.5s后检测连接状态,该值可能需要标定 @@ -518,8 +601,20 @@ self._updateRootList() time.sleep(0.1) - def disp_string(self, out): - self.UI.statusbar.showMessage(str(out)) + def disp_string(self, out_s): + dt = datetime.datetime.now() + nowtime_str = dt.strftime('%I:%M:%S') # time + self.UI.textEdit_2.insertPlainText(nowtime_str + ': ') + cursor = self.UI.textEdit_2.textCursor() + cursor.movePosition(QtGui.QTextCursor.End) + self.UI.textEdit_2.setTextCursor(cursor) + self.UI.textEdit_2.insertPlainText(str(out_s + "\t\n")) + + # self.statusbar.showMessage(str(out)) + def dispShiftstatus(self): + self.UI.pushButton_41.setStyleSheet( + Style_dic[self.shifter.UnlockButton]) + self.UI.pushButton_40.setStyleSheet(Style_dic[self.shifter.Pbutton]) def _updateRootList(self): _dataSize = self.msgQueue.qsize() @@ -530,9 +625,11 @@ for i in range(_dataSize): receiveNum += 1 msg = self.msgQueue.get() + self.shifter.FramUnpack(msg.arbitration_id, msg.data) formateddata.append(self._formatMsgData( receiveNum, msg, True)) # return a data list self._insertDataSmooth(data=formateddata, datasize=_dataSize) + # self.dispShiftstatus() def _insertDataSmooth(self, data, datasize): # row = 6-datasize @@ -554,4 +651,355 @@ pass def TestPresentEvent(self): - self.udsclient.tester_present() + if self.UI.radioButton.isChecked(): + self.TestPresentTimer.start(3000) + else: + self.TestPresentTimer.stop() + + def creat_testpresentReq(self): + self.send_dump() + + def sessioncontrol_req(self): + sesson = self.UI.comboBox.currentIndex() + 1 + t = threading.Thread(target=self.udsclient.change_session, + args=(sesson, )) + t.start() + t.join() + # self.udsclient.change_session(sesson) + + # self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位']) + def MCUReset_req(self): + req = 0x03 if self.UI.comboBox_8.currentData() == '0x01硬件复位' else 0x01 + self.udsclient.ecu_reset(req) + + def Security_req(self): + self.udsclient.unlock_security_access(1) + # Security_dic[self.UI.comboBox_8.currentData()] + + def SecurityUnlockLevel_1(self): + data = self.udsclient.unlock_security_access(1) + if data.positive: + self.UI.pushButton_31.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def SecurityUnlockLevel_3(self): + data = self.udsclient.unlock_security_access(9) + if data.positive: + self.UI.pushButton_32.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def ReadSupplyID(self): + data = self.ReadByDID(0xF18A) + if data is not None: + self.UI.lineEdit_8.setText(str(data[0xF18A])) + + def ReadVIN(self): + data = self.ReadByDID(0xF190) + if data is not None: + self.UI.lineEdit_3.setText(str(data[0xF190])) + + def ReadMfgDate(self): + data = self.ReadByDID(0x210B) + if data is not None: + self.UI.lineEdit_5.setText(str(data[0x210B])) + + def ReadByDID(self, didlist: List): + values = {} + try: + response = self.udsclient.read_data_by_identifier(didlist) + if response.positive: + values = response.service_data.values + return values + except Exception as e: + g_signal.sig_Disp_str.emit(e) + + def DID_display(self): + # print(self.UI.comboBox_6.currentText()) + self.UI.lineEdit.setText("0x{:0<4x}".format( + (DID_dic[self.UI.comboBox_6.currentText()]))) + + def ReadDataByID(self): + tempdid = DID_dic[self.UI.comboBox_6.currentText()] + self.UI.lineEdit_2.clear() + data = self.ReadByDID(tempdid) + if data is not None and len(str(data[tempdid])): + self.UI.lineEdit_2.setText(str(data[tempdid]) + ' ') + + def WriteDataByID(self): + tempdid = DID_dic[self.UI.comboBox_6.currentText()] + writedata = self.UI.lineEdit_2.text() + try: + response = self.udsclient.write_data_by_identifier( + tempdid, writedata) + if response.positive: + values = response.service_data.values + except Exception as e: + g_signal.sig_Disp_str.emit(e) + + def communicationControl_req(self): + req = 0x00 + select = self.UI.comboBox_9.currentText() + if select == '0x00使能收发': + req = 0 + elif select == '0x01能收禁发': + req = 1 + else: + req = 3 + self.udsclient.communication_control(control_type=0, + communication_type=req) + + def StartCalibraiton(self): + startcmd = bytearray([0x0, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(startcmd)): + self.UI.pushButton_9.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_Z(self): + calcmd = bytearray([0x1, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_11.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_M(self): + calcmd = bytearray([0x2, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_13.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_MP(self): + calcmd = bytearray([0x3, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_18.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_MN(self): + calcmd = bytearray([0x4, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_22.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_X2(self): + calcmd = bytearray([0x5, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_24.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_X1(self): + calcmd = bytearray([0x6, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_26.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_Y1(self): + calcmd = bytearray([0x7, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_27.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_Y2(self): + calcmd = bytearray([0x8, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_28.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def Calibraiton_GAP(self): + calcmd = bytearray([0x9, 0x0, 0x0, 0x0]) + if self.RequestRoutControl(0xFFAA, bytes(calcmd)): + self.UI.pushButton_29.setStyleSheet( + "background-color:rgb(255, 85, 127)") + + def RequestRoutControl(self, routine_id, data): + resp_1 = self.udsclient.start_routine(routine_id=routine_id, data=data) + return resp_1.positive + + def SetFromBootFlag(self): + self.startfromboot = self.UI.radioButton_2.isChecked() + + def pre_programming(self): + if not self.startfromboot: + self.disp_string('# 预编程步骤') + + # 进入extended session + self.disp_string('>>> 进入扩展模式') + response = self.udsclient.change_session(3) + if response.positive: + self.disp_string('>>> 解锁安全访问') + response = self.udsclient.unlock_security_access(9) + + if response.positive: + # 检查编程条件 + self.disp_string('>>> 检查编程条件') + response = self.udsclient.start_routine(0xff02) + if response.positive: + # 关闭DTC的存储 + self.disp_string('>>> 关闭DTC的存储') + response = self.udsclient.control_dtc_setting(2) + # print(response) + if response.positive: + # 关闭与诊断无关的报文 + self.disp_string('>>> 关闭与诊断无关的报文') + response = self.udsclient.communication_control(0x03, 0x01) + else: + self.disp_string('>>> 预编程失败') + + return response.positive + # print(response) + + def main_programming(self): + self.disp_string('# 主编程步骤') + + if self.startfromboot: + self.disp_string('>>> 进入扩展模式') + response = self.udsclient.change_session(3) + + # 进入programming session + self.disp_string('>>> 进入编程模式') + response = self.udsclient.change_session(2) + # print(response) + if response.positive: + # 安全访问 + self.disp_string('>>> 安全访问') + response = self.udsclient.unlock_security_access(9) + # print(response) + if response.positive: + self.disp_string('>>> 请求下载驱动文件') + address = 0x20004600 + memorysize = 10 + memory_location = MemoryLocation(address, memorysize, 32, 32) + response = self.udsclient.request_download(memory_location) + # print(response) + if response.positive: + # 发送driver文件 + self.disp_string('>>> 发送driver文件') + drv_file = self.drv_data + response = self.udsclient.transfer_data(1, bytes(drv_file)) + self.set_progressbar_pos(len(drv_file)) + # print(response) + if response.positive: + self.disp_string('>>> 发送驱动结束,请求推出') + response = self.udsclient.request_transfer_exit() + # print(response) + if response.positive: + # driver文件完整性检验 + self.disp_string('>>> driver文件完整性检验') + drivecrc = bytes([0x9B, 0x39, 0xf2, 0xec]) + response = self.udsclient.start_routine(0xf001, drivecrc) + # print(response) + if response.positive: + app_file = self.app_data + address = app_file[APP_ADDR_LOCATION_OFFSET] << 24 | app_file[ + APP_ADDR_LOCATION_OFFSET + 1] << 16 | app_file[ + APP_ADDR_LOCATION_OFFSET + + 2] << 8 | app_file[APP_ADDR_LOCATION_OFFSET + 3] << 0 + memorysize = app_file[APP_LENGTH_LOCATION_OFFSET] << 24 | app_file[ + APP_LENGTH_LOCATION_OFFSET + 1] << 16 | app_file[ + APP_LENGTH_LOCATION_OFFSET + + 2] << 8 | app_file[APP_LENGTH_LOCATION_OFFSET + 3] << 0 + memory_location = MemoryLocation(address, memorysize, 32, 32) + + # 删除app存储空间 + self.disp_string('>>> 删除app存储空间') + data = b'' + data += memory_location.alfid.get_byte() + data += memory_location.get_address_bytes() + data += memory_location.get_memorysize_bytes() + response = self.udsclient.start_routine(0xff00, data) + # print(response) + if response.positive: + # 发送app文件 + self.disp_string('>>> 发送app文件') + response = self.udsclient.request_download(memory_location) + # print(response) + + max_length = response.service_data.max_length + + # 有效数据长度, 去除sid和sequence两个字节 + payload_length = max_length - 2 + + count = (len(app_file) + payload_length - 1) // payload_length + + base = self.get_progressbar_pos() + for i in range(count): + start = i * payload_length + end = start + payload_length + response = self.udsclient.transfer_data((i + 1) % 256, + app_file[start:end]) + self.set_progressbar_pos(base + end) + # print(response) + if response.positive: + response = self.udsclient.request_transfer_exit() + # print(response) + if response.positive: + # app文件完整性检验 + self.disp_string('>>> app文件完整性检验') + response = self.udsclient.start_routine(0xf001, app_file[0:4]) + # print(response) + return response.positive + + def post_programming(self): + + self.disp_string('# 后编程步骤') + + # 回到default session + self.disp_string('>>> 回到默认模式') + + response = self.udsclient.change_session(1) + return response.positive + + def start_programming(self): + self.UI.pushButton_48.setDisabled(True) + self.set_progressbar_pos(0) + + t1 = time.time() + + try: + + self.pre_programming() + self.main_programming() + self.post_programming() + except Exception as e: + + self.disp_string('%s' % e) + # self.UI.pushButton_48.setDisabled(False) + + t2 = time.time() + + self.disp_string('finished in %.2f sec' % (t2 - t1)) + self.in_programming = False + self.UI.pushButton_48.setDisabled(False) + + def set_progressbar_pos(self, pos): + + drv_data_len = len(self.drv_data) if self.drv_data is not None else 0 + app_data_len = len(self.app_data) if self.app_data is not None else 0 + total_len = drv_data_len + app_data_len + if pos > total_len: + pos = total_len + elif pos < 0: + pos = 0 + + self.UI.progressBar.setValue(pos) + + def get_progressbar_pos(self): + return self.UI.progressBar.value() + + def loadAppfile(self): + if self.in_programming: + self.disp_string('## 正在编程中') + return + + fileName, _ = QtWidgets.QFileDialog.getOpenFileName( + None, 'comboBox_4', '.', + 'All Files (*);;Bin Files (*.bin);;Hex Files (*.hex)') + + if fileName != '': + self.UI.comboBox_4.addItem(fileName) + self.UI.comboBox_4.setCurrentText(fileName) + with open(fileName, 'rb') as fd: + self.app_data = fd.read() + drv_data_len = len( + self.drv_data) if self.drv_data is not None else 0 + app_data_len = len( + self.app_data) if self.app_data is not None else 0 + total_len = drv_data_len + app_data_len + self.UI.progressBar.setRange(0, total_len) -- Gitblit v1.8.0