"""Shifter diagnostic tool: CAN / ISO-TP / UDS glue behind the PySide2 UI.

The module wires a USBCAN adapter to an ``isotp.TransportLayer`` and a
``udsoncan`` client, and exposes the diagnostic, calibration and flashing
actions that the Qt main window triggers.
"""
import logging
import os
from ast import Not, Pass
from concurrent.futures import thread
from curses import flash
from logging import exception
import threading
from typing import List
from can import BusABC, Message, Logger, Listener
from udsoncan.client import Client
from udsoncan.exceptions import TimeoutException
import udsoncan
from udsoncan.connections import BaseConnection
from udsoncan import services, Response, MemoryLocation
import isotp
from isotp import CanMessage
import queue
from USBCAN import *
from Shifter import *
from styleSheet import *
from mainwindows import Ui_MainWindow
from PySide2 import QtWidgets, QtCore, QtGui
import struct
import time
import datetime
from ShifterDefine import *
from multiprocessing import Process, Queue, Value, Pipe
from hexread import *

MAX_RCV_NUM = 20
# Byte offsets, inside the application image, of the big-endian 32-bit
# start address and length fields read by ``main_programming``.
APP_ADDR_LOCATION_OFFSET = 8
APP_LENGTH_LOCATION_OFFSET = 12

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(logging.DEBUG)
# FileHandler does not create missing directories; make sure ./log exists.
os.makedirs('./log', exist_ok=True)
fileHandler = logging.FileHandler('./log/ShiftTool.log',
                                  mode='a',
                                  encoding='UTF-8')
fileHandler.setLevel(logging.DEBUG)
consoleHandler.setFormatter(formatter)
fileHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)
logger.addHandler(fileHandler)


class UISignals(QtCore.QObject):
    """Qt signals used to hand work from worker threads to the UI thread."""
    sig_Disp_str = QtCore.Signal(str)
    sig_MsgReceived = QtCore.Signal(int)


g_signal = UISignals()
uds_conn, isotp_conn = Pipe()
user_conn, drive_conn = Pipe()


class HardwreDevice():
    """Plain holder for the adapter selection made in the UI."""

    def __init__(self):
        self.device_type = 4
        self.channel = 0  # can_index
        self.device = 0  # device_index
        self.baudrate = 0


class PeriodSendThread(object):
    """Worker thread that calls ``period_func`` once, then periodically.

    ``send_start(period)`` triggers one immediate call followed by one call
    every ``period`` seconds until ``send_stop()`` is called.
    """

    def __init__(self, period_func, args=None, kwargs=None):
        self._thread = threading.Thread(target=self._run)
        self._function = period_func
        # ``None`` defaults avoid sharing one mutable list/dict between
        # every instance created without explicit arguments.
        self._args = [] if args is None else args
        self._kwargs = {} if kwargs is None else kwargs
        self._period = 0
        self._event = threading.Event()
        self._period_event = threading.Event()
        self._terminated = False

    def start(self):
        """Start the worker thread (it idles until ``send_start``)."""
        self._thread.start()

    def stop(self):
        """Terminate the worker thread and wait for it to exit."""
        self._terminated = True
        # Also leave the periodic loop; otherwise join() would block until
        # somebody called send_stop().
        self._period_event.set()
        self._event.set()
        self._thread.join()

    def send_start(self, period):
        """Begin periodic sending with ``period`` seconds between calls."""
        self._period = period
        self._event.set()

    def send_stop(self):
        """Stop the current periodic sending."""
        self._period_event.set()

    def _run(self):
        while True:
            self._event.wait()
            self._event.clear()
            if self._terminated:
                break
            self._function(*self._args, **self._kwargs)
            # Event.wait() returns False on timeout, i.e. "period elapsed".
            while not self._period_event.wait(self._period):
                self._function(*self._args, **self._kwargs)
            self._period_event.clear()


class ShifterTool():
    """Controller behind the main window: CAN I/O, UDS services, flashing."""

    # Style applied to a calibration/unlock button after a positive response.
    _DONE_STYLE = "background-color:rgb(255, 85, 127)"

    class IsoTpConnection(BaseConnection):
        """udsoncan connection backed by an ``isotp.TransportLayer``.

        A background thread pumps the transport layer and queues complete
        ISO-TP payloads for the UDS client.
        """

        mtu = 4095
        # NOTE(review): the receive wait has always been a fixed 5 s and does
        # not honour the ``timeout`` argument of ``specific_wait_frame``.
        # That is kept as-is; confirm whether it is a deliberate workaround.
        RX_WAIT_SECONDS = 5

        def __init__(self, isotp_layer, name=None):
            BaseConnection.__init__(self, name)
            self.UDStoIsoTPQueue = queue.Queue()  # UDS --> ISOTP
            self.IsoTPtoUDSQueue = queue.Queue()  # ISOTP --> UDS
            self._read_thread = None
            self.exit_requested = False
            self.opened = False
            self.isotp_layer = isotp_layer
            assert isinstance(
                self.isotp_layer, isotp.TransportLayer
            ), 'isotp_layer must be a valid isotp.TransportLayer '

        def open(self):
            """Start the receive thread; a second call is a no-op."""
            if self.opened:
                return self
            self.exit_requested = False
            # Daemon thread: a forgotten close() must not keep the process
            # alive after the UI has exited.
            self._read_thread = threading.Thread(None,
                                                 target=self.rxthread_task,
                                                 daemon=True)
            self._read_thread.start()
            self.opened = True
            logger.info('Connection opened')
            return self

        def __enter__(self):
            return self

        def __exit__(self, type, value, traceback):
            self.close()

        def is_open(self):
            return self.opened

        def close(self):
            """Stop the receive thread and reset the transport layer."""
            self.empty_rxqueue()
            self.empty_txqueue()
            self.exit_requested = True
            # close() may be called on a connection that was never opened.
            if self._read_thread is not None:
                self._read_thread.join()
            self.isotp_layer.reset()
            self.opened = False
            logger.info('Connection closed')

        def specific_send(self, payload):
            """Hand ``payload`` to the ISO-TP layer, truncated to ``mtu``."""
            if self.mtu is not None:
                if len(payload) > self.mtu:
                    logger.warning(
                        "Truncating payload to be set to a length of %d" %
                        (self.mtu))
                    payload = payload[0:self.mtu]
            # isotp.protocol.TransportLayer uses byte array.
            # udsoncan is strict on bytes format
            self.isotp_layer.send(bytearray(payload))

        def specific_wait_frame(self, timeout=2):
            """Return the next received payload as ``bytes``.

            Raises:
                RuntimeError: the connection is not open.
                TimeoutException: nothing arrived within the wait window.
            """
            if not self.opened:
                raise RuntimeError("Connection is not open")

            try:
                frame = self.IsoTPtoUDSQueue.get(block=True,
                                                 timeout=self.RX_WAIT_SECONDS)
            except queue.Empty:
                # Report the wait that was really applied, not the ignored
                # ``timeout`` argument.
                raise TimeoutException(
                    "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)"
                    % self.RX_WAIT_SECONDS)

            if self.mtu is not None:
                if frame is not None and len(frame) > self.mtu:
                    logger.warning(
                        "Truncating received payload to a length of %d" %
                        (self.mtu))
                    frame = frame[0:self.mtu]

            # isotp.protocol.TransportLayer uses bytearray.
            # udsoncan is strict on bytes format
            return bytes(frame)

        def empty_rxqueue(self):
            while not self.IsoTPtoUDSQueue.empty():
                self.IsoTPtoUDSQueue.get()

        def empty_txqueue(self):
            while not self.UDStoIsoTPQueue.empty():
                self.UDStoIsoTPQueue.get()

        def rxthread_task(self):
            """Pump the ISO-TP layer until ``exit_requested`` is set."""
            while not self.exit_requested:
                try:
                    self.isotp_layer.process()
                    while self.isotp_layer.available():
                        self.IsoTPtoUDSQueue.put(self.isotp_layer.recv())
                        self.logger.debug(
                            "IsoTPtoUDSQueue queue size is now %d" %
                            (self.IsoTPtoUDSQueue.qsize()))
                    time.sleep(0.001)
                except Exception as e:
                    # Any transport error ends the pump; the UDS client will
                    # then see timeouts.
                    self.exit_requested = True
                    logger.error(str(e))
                    print("Error occurred while read CAN(FD) data!")

    def __init__(self, window: QtWidgets.QMainWindow):
        """Build the controller and populate ``window`` with the UI."""
        super(ShifterTool, self).__init__()
        self.msgQueue = Queue()  # can layer receive queue
        self.sendQueue = Queue()  # can layer send queue
        self.CANtoIsoTPQueue = Queue()  # CAN --> isotp
        self.shifter = ShifterClass()
        self.Vehicle = VehicleClass()
        self.devicedescription = HardwreDevice()
        self.windows = QtWidgets.QMainWindow()
        self.UI = Ui_MainWindow()
        self.UI.setupUi(window)
        self._dev_info = None

        # Plain state first, so nothing triggered while wiring the UI can
        # observe a half-initialised object.
        self.in_programming = False
        self.startfromboot = False
        self.drv_data = [
            0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a
        ]
        self.app_data: bytes = None
        self.start_timestamp = time.time()
        self.boot_logger = None

        # NOTE(review): ``cantools`` is not imported directly in this file;
        # presumably one of the star imports provides it - confirm.
        self.dbc = cantools.database.load_file("DBC/SX7H.dbc")
        # NOTE: this attribute deliberately keeps its historical name even
        # though it shadows the ``can_thread`` method on the instance; the
        # bound method is captured as the thread target before rebinding.
        # Daemon so that the receive loop cannot keep the process alive.
        self.can_thread = threading.Thread(target=self.can_thread,
                                           daemon=True)
        self.TestPresentTimer = QtCore.QTimer()
        self.TestPresentTimer.timeout.connect(self.creat_testpresentReq)
        self.DeviceInit()
        self.WidgetsInit()
        self.ChnInfoUpdate(self._isOpen)

    def DeviceInit(self):
        """Create the ISO-TP layer, the UDS connection and the UDS client."""
        self._usbcan = None
        self._isOpen = Value('i', 0)  # shared CAN connection state
        self._isChnOpen = False
        self.needdisconnect = Value('i', 0)  # current connection must close
        # current device info
        self._is_canfd = False
        self._res_support = False
        # set by can_thread_stop() to end the receive loop
        self._terminated = False
        self._lock = threading.RLock()
        self.isotp_params = {
            # Will request the sender to wait 32ms between consecutive frame.
            # 0-127ms or 100-900ns with values from 0xF1-0xF9
            'stmin': 32,
            # Request the sender to send 8 consecutives frames before
            # sending a new flow control message
            'blocksize': 8,
            # Number of wait frame allowed before triggering an error
            'wftmax': 0,
            # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
            'tx_data_length': 8,
            # Will pad all transmitted CAN messages with byte 0x00.
            # None means no padding
            'tx_padding': 0,
            # Triggers a timeout if a flow control is awaited for more than
            # 1000 milliseconds
            'rx_flowcontrol_timeout': 1000,
            # Triggers a timeout if a consecutive frame is awaited for more
            # than 1000 milliseconds
            'rx_consecutive_frame_timeout': 1000,
            # When sending, respect the stmin requirement of the receiver.
            # If set to True, go as fast as possible.
            'squash_stmin_requirement': False
        }
        self._isotpaddr_PHYS = isotp.Address(
            isotp.AddressingMode.Normal_11bits,
            txid=self.shifter.canid.phy_rxId,
            rxid=self.shifter.canid.phy_txId)
        self._isotpaddr_FUNC = isotp.Address(
            isotp.AddressingMode.Normal_11bits,
            txid=self.shifter.canid.fun_rxId,
            rxid=self.shifter.canid.phy_txId)
        self.isotp_layer = isotp.TransportLayer(rxfn=self.isotp_rcv,
                                                txfn=self.isotp_send,
                                                address=self._isotpaddr_PHYS,
                                                params=self.isotp_params)
        self.conn = ShifterTool.IsoTpConnection(isotp_layer=self.isotp_layer)
        self.udsclient = Client(self.conn, request_timeout=2)
        self.udsclient.config['security_algo'] = self.SecAlgo
        self.udsclient.config['security_algo_params'] = [
            0x11223344, 0x20AA097B, 0x11223344, 0x70237577
        ]
        self.udsclient.config['data_identifiers'] = self.shifter.did_config
        self.udsclient.config['server_address_format'] = 32
        self.udsclient.config['server_memorysize_format'] = 32

    def SecAlgo(self, level, seed, params=None):
        """Return the security-access key for ``level``.

        Levels 0x01, 0x11 and 0x7D answer with fixed keys; any other level
        echoes the first four seed bytes. ``params`` is accepted for the
        udsoncan callback signature but is currently unused.

        Returns:
            bytes: 4-byte key (8 bytes for level 0x7D), big-endian.
        """
        if level == 0x01:
            # Disabled derivation, kept for reference:
            # key = ((((seed32 >> 4) ^ seed32) << 3) ^ seed32) & 0xFFFFFFFF
            return (0x20AA097B).to_bytes(4, 'big')
        if level == 0x11:
            # Disabled derivation, kept for reference (seed32 = big-endian
            # seed, params = security_algo_params):
            # _temp_y = byte-swapped seed32; _temp_z = 0; _temp_sum = 0
            # for i in range(64):
            #     _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z)
            #                 ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF
            #     _temp_y &= 0xFFFFFFFF
            #     _temp_sum = (_temp_sum + 0x8F750A1D) & 0xFFFFFFFF
            #     _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y)
            #                 ^ (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF
            #     _temp_z &= 0xFFFFFFFF
            # key = byte-swapped _temp_z
            return (0x70237577).to_bytes(4, 'big')
        if level == 0x7D:
            return (0xAB2F9F36099D81F3).to_bytes(8, 'big')

        # Unknown level: echo the seed's first four bytes.
        seed_word = (seed[0] << 24) | (seed[1] << 16) | (seed[2] << 8) | (
            seed[3])
        return struct.pack('BBBB', (seed_word >> 24) & 0xFF,
                           (seed_word >> 16) & 0xFF, (seed_word >> 8) & 0xFF,
                           seed_word & 0xFF)

    def getDateTimeBytes(self):
        """Return the current date/time as BCD-coded integers.

        Each decimal field is re-read as hexadecimal (e.g. year 2024 becomes
        0x2024), which yields packed BCD.

        Returns:
            tuple: (year_high, year_low, month, day, hour, minute, second)
        """
        # One snapshot, so the fields cannot straddle a second/minute/day
        # rollover between successive now() calls.
        now = datetime.datetime.now()
        year_bcd = int(str(now.year), 16)
        return (year_bcd >> 8, year_bcd & 0xFF, int(str(now.month), 16),
                int(str(now.day), 16), int(str(now.hour), 16),
                int(str(now.minute), 16), int(str(now.second), 16))

    def isotp_rcv(self):
        '''Return the next CAN frame queued for ISO-TP, or None if empty.'''
        can_msgs = None
        if not self.CANtoIsoTPQueue.empty():
            can_msgs = self.CANtoIsoTPQueue.get()
        return can_msgs

    def _build_message(self, arbitration_id, data, dlc=8):
        """Create a standard-ID data frame stamped relative to tool start."""
        msg = Message()
        msg.arbitration_id = arbitration_id
        msg.dlc = dlc
        msg.channel = 0
        msg.data = data
        msg.is_extended_id = False
        msg.is_remote_frame = False
        msg.is_error_frame = False
        msg.timestamp = time.time() - self.start_timestamp
        return msg

    def isotp_send(self, isotp_msg):
        '''Forward one ISO-TP frame to the CAN layer.'''
        self.Tool_send(
            self._build_message(isotp_msg.arbitration_id,
                                isotp_msg.data,
                                dlc=isotp_msg.dlc))

    def Tool_send(self, msg: Message):
        """Transmit ``msg`` and mirror it to the boot log, if one is set."""
        if self._usbcan is None:
            # E.g. the TesterPresent timer fired before a device was opened.
            logger.warning('CAN device is not open, frame not sent')
            return
        self._usbcan.send(msg)
        if self.boot_logger is not None:
            self.boot_logger.on_message_received(msg)

    def WidgetsInit(self):
        """Populate the hardware selectors and connect the basic signals."""
        self.UI.comboBox_2.addItem('USBCAN-I')
        self.UI.comboBox_2.addItem('USBCAN-II')
        # baud rate
        self.UI.comboBox_3.addItems([('%dK' % (i / 1000))
                                     for i in TIMING_DICT.keys()])
        # CHANNEL
        self.UI.comboBox_5.addItems(['CH1/3', 'CH2/4'])
        self.UI.pushButton.clicked.connect(self.open_close)
        self.UI.comboBox_3.activated.connect(self.update_HardwareDevice)
        self.UI.comboBox_5.activated.connect(self.update_HardwareDevice)
        g_signal.sig_Disp_str.connect(self.disp_string)
        g_signal.sig_MsgReceived.connect(self._updateRootList)
        self.initUDSUI()

    def initUDSUI(self):
        """Populate the UDS widgets and connect them to their handlers."""
        ui = self.UI
        ui.comboBox.addItems(['0x01默认模式', '0x02编程模式', '0x03扩展模式'])
        ui.comboBox.activated.connect(self.sessioncontrol_req)
        ui.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位'])
        ui.comboBox_8.activated.connect(self.MCUReset_req)
        ui.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发'])
        ui.comboBox_9.activated.connect(self.communicationControl_req)
        ui.comboBox_6.addItems(list(DID_dic.keys()))
        self.DID_display()
        ui.comboBox_6.activated.connect(self.DID_display)
        ui.comboBox_7.addItems(list(DTCGroup_dic.keys()))
        ui.comboBox_7.activated.connect(self.ClearDTC)
        ui.pushButton_31.clicked.connect(self.SecurityUnlockLevel_1)
        ui.pushButton_32.clicked.connect(self.SecurityUnlockLevel_2)
        ui.pushButton_12.clicked.connect(self.ReadSupplyID)
        ui.pushButton_4.clicked.connect(self.ReadVIN)
        ui.pushButton_6.clicked.connect(self.ReadMfgDate)
        ui.pushButton_14.clicked.connect(self.ReadDataByID)
        ui.pushButton_3.clicked.connect(self.WriteDataByID)
        ui.pushButton_9.clicked.connect(self.StartCalibraiton)
        ui.pushButton_11.clicked.connect(self.Calibraiton_Z)
        ui.pushButton_13.clicked.connect(self.Calibraiton_M)
        ui.pushButton_18.clicked.connect(self.Calibraiton_MP)
        ui.pushButton_22.clicked.connect(self.Calibraiton_MN)
        ui.pushButton_24.clicked.connect(self.Calibraiton_X2)
        ui.pushButton_26.clicked.connect(self.Calibraiton_X1)
        ui.pushButton_27.clicked.connect(self.Calibraiton_Y1)
        ui.pushButton_28.clicked.connect(self.Calibraiton_Y2)
        ui.pushButton_29.clicked.connect(self.Calibraiton_GAP)
        ui.radioButton.toggled.connect(self.TestPresentEvent)
        ui.pushButton_48.clicked.connect(self.start_programming)
        ui.pushButton_46.clicked.connect(self.loadAppfile)
        ui.radioButton_2.toggled.connect(self.SetFromBootFlag)
        ui.pushButton_47.clicked.connect(self.SetBootLog)
        ui.comboBox_11.addItems(list(ReportBymask_DTC.keys()))
        ui.comboBox_12.addItems(list(DTC_Dic.keys()))
        ui.comboBox_13.addItems(list(DTC_Control_dic.keys()))
        ui.comboBox_13.activated.connect(self.DTC_Control)
        ui.pushButton_49.clicked.connect(self.ReportSupportDTC)
        ui.comboBox_11.activated.connect(self.ReportByMask)
        ui.comboBox_12.activated.connect(self.ReportSnapshotByDTC)

    def _formatMsgData(self, index, item, received):
        '''Convert a CAN message into one table row.

        Arguments:
            index {int} -- running message index (shown for received frames)
            item -- message exposing timestamp, arbitration_id, dlc,
                is_extended_id and data
            received {bool} -- True for a received frame, False for a sent one

        Returns:
            [list] -- [index, timestamp, direction, id, dlc, frame type, data]
        '''
        row = ['{0:0>4}'.format(index) if received else '']
        row.append(item.timestamp)
        row.append('接收' if received else '发送')
        # Upper-case hex digits behind a lower-case "0x" prefix, e.g. 0x77A.
        row.append('0x{:X}'.format(item.arbitration_id))
        row.append(item.dlc)
        row.append('扩展帧' if int(item.is_extended_id) == 1 else '标准帧')
        row.append(' '.join('0x{:02X}'.format(a) for a in list(item.data)))
        return row

    def can_thread_stop(self):
        """Ask the receive loop to end and wait for the thread to exit."""
        self._terminated = True
        # join() raises on a thread that was never started.
        if self.can_thread.is_alive():
            self.can_thread.join()

    def can_thread_start(self):
        self.can_thread.start()

    def can_thread(self):
        """Receive loop: dispatch frames while open, close on request."""
        while not self._terminated:
            time.sleep(0.01)
            while self._isOpen.value == 1 and not self._terminated:
                time.sleep(0.01)
                if self.needdisconnect.value == 1:
                    ret = self._usbcan.CloseDevice()
                    if ret == 1:
                        self.needdisconnect.value = 0
                        self._isOpen.value = 0
                    continue

                msg, num = self._usbcan.Receive(len=10, timeout=10)
                if num == 0:
                    continue
                for i in range(num):
                    if msg[i].arbitration_id == self.shifter.canid.phy_txId:
                        # diagnostic response: feed the ISO-TP layer
                        self.CANtoIsoTPQueue.put(msg[i])
                    self.msgQueue.put(msg[i])
                    if self.boot_logger is not None:
                        self.boot_logger.on_message_received(msg[i])
                # send signal to update screen
                g_signal.sig_MsgReceived.emit(num)

    def send_dump(self):
        """Send TesterPresent (0x3E 0x80) on the physical request ID."""
        self.Tool_send(
            self._build_message(self.shifter.canid.phy_rxId,
                                [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0]))

    def send_VehiclePosition(self, data=None):
        """Send ``data`` as an 8-byte frame on CAN ID 0x10."""
        self.Tool_send(
            self._build_message(0x10, [] if data is None else data))

    def open_close(self):
        """Toggle the CAN connection (slot of the open/close button)."""
        if self._isOpen.value != 0:
            # The receive thread performs the actual close.
            self.needdisconnect.value = 1
            self._deviceOpenUpdate()
            self.TestPresentTimer.stop()
            return

        first_open = self._usbcan is None
        if first_open:
            self.update_HardwareDevice()
            can_filters = [{'can_id': 0x5E0, 'can_mask': 0xFFFFFFFF}]
            bitrate = list(
                TIMING_DICT.keys())[self.devicedescription.baudrate]
            # NOTE(review): device_type is hard-coded to 4 and ignores the
            # adapter chosen in comboBox_2 - confirm whether intended.
            self._usbcan = USBCAN(device_type=4,
                                  device_index=0,
                                  can_index=self.devicedescription.channel,
                                  bitrate=bitrate,
                                  can_filters=can_filters)

        if not self._usbcan.InitAndStart():
            logger.error("Open usbcan device fail.")
            if first_open:
                # Drop the handle so that the next click retries from
                # scratch instead of taking the re-open path.
                self._usbcan = None
            return

        self._isOpen.value = 1
        self._deviceOpenUpdate()
        if first_open:
            self.can_thread_start()
            self.conn.open()  # start iso tp thread
            self.send_dump()
            self.send_dump()

    def closeEvent(self, e):
        """Flush and close the boot log, if one was configured."""
        if self.boot_logger is not None:
            self.boot_logger.stop()

    def update_HardwareDevice(self):
        """Copy the adapter selection from the UI into the description."""
        self.devicedescription.device_type = (
            self.UI.comboBox_2.currentIndex() + 3)
        self.devicedescription.channel = self.UI.comboBox_5.currentIndex()
        self.devicedescription.baudrate = self.UI.comboBox_3.currentIndex()

    def _deviceOpenUpdate(self):
        """Update the open/close button caption."""
        # A pending disconnect wins: _isOpen is still 1 at that moment
        # because the receive thread closes the device asynchronously.
        if self.needdisconnect.value == 1:
            self.UI.pushButton.setText("Open CAN")
        elif self._isOpen.value == 1:
            self.UI.pushButton.setText("Close CAN")

    def _StartlistenMsgProcess(self):
        """Run the adapter's listener in a child process and poll the UI."""
        self.msgProcess = Process(name='pyUSBCANListener',
                                  target=self._usbcan.ListeningMsg,
                                  args=(self._isOpen, self.needdisconnect,
                                        self.msgQueue, self.sendQueue))
        self.msgProcess.daemon = True
        self.msgProcess.start()
        while self._isOpen.value == 1:
            self._updateRootList()
            time.sleep(0.1)

    def disp_string(self, out_s):
        """Append a time-stamped line to the log text box."""
        text_box = self.UI.textEdit_2
        # Move to the end first so the whole line, timestamp included, is
        # appended even if the user placed the cursor elsewhere.
        cursor = text_box.textCursor()
        cursor.movePosition(QtGui.QTextCursor.End)
        text_box.setTextCursor(cursor)
        stamp = datetime.datetime.now().strftime('%I:%M:%S')
        # str() first: callers may pass exceptions or other non-str values.
        text_box.insertPlainText(stamp + ': ' + str(out_s) + "\t\n")

    def dispShiftstatus(self):
        """Reflect shifter buttons/position and vehicle gear in the UI."""
        ui = self.UI
        ui.pushButton_41.setStyleSheet(Style_dic[self.shifter.UnlockButton])
        ui.pushButton_40.setStyleSheet(Style_dic[self.shifter.Pbutton])
        position = self.shifter.position
        for button, code in (
                (ui.pushButton_37, 2),  # X2
                (ui.pushButton_38, 3),  # X1
                (ui.pushButton_30, 0),  # Z
                (ui.pushButton_33, 4),  # Y1
                (ui.pushButton_34, 5),  # Y2
                (ui.pushButton_39, 0xe),  # M
                (ui.pushButton_35, 0xd),  # M+
                (ui.pushButton_36, 0xc),  # M-
        ):
            button.setStyleSheet(Style_dic[position == code])
        lever = self.Vehicle.ShiftLeverPos
        for button, gear in ((ui.pushButton_42, "P"), (ui.pushButton_43, "D"),
                             (ui.pushButton_44, "N"), (ui.pushButton_45,
                                                       "R")):
            button.setStyleSheet(ColorStyle_dic[lever == gear])

    def _updateRootList(self):
        """Move up to five received frames from the queue into the table."""
        pending = min(self.msgQueue.qsize(), 5)
        rows = []
        for index in range(1, pending + 1):
            try:
                # qsize() is approximate for a multiprocessing queue; a
                # bounded wait keeps the UI thread from blocking forever.
                msg = self.msgQueue.get(timeout=0.05)
            except queue.Empty:
                break
            rows.append(self._formatMsgData(index, msg, True))
        self._insertDataSmooth(data=rows, datasize=len(rows))

    def _insertDataSmooth(self, data, datasize):
        """Write ``datasize`` formatted rows into the message table."""
        for row in range(datasize):
            insertdata = data[row]
            # IDs are rendered with upper-case hex digits ("0x77A"), so the
            # comparison has to be case-insensitive to pin rows reliably.
            can_id = str(insertdata[3]).lower()
            if can_id == '0x420':
                row = 0
            elif can_id == '0x77a':
                row = 1
            for column in range(7):
                item = QtWidgets.QTableWidgetItem()
                item.setTextAlignment(0x84)  # text alignment flags
                item.setText(str(insertdata[column]))
                self.UI.tableWidget.setItem(row, column, item)

    def ChnInfoUpdate(self, openflag):
        pass

    def TestPresentEvent(self):
        """Start or stop the 3 s TesterPresent timer from the radio button."""
        if self.UI.radioButton.isChecked():
            self.TestPresentTimer.start(3000)
        else:
            self.TestPresentTimer.stop()

    def creat_testpresentReq(self):
        self.send_dump()

    def sessioncontrol_req(self):
        """Request the diagnostic session selected in the combo box."""
        sesson = self.UI.comboBox.currentIndex() + 1
        self.udsclient.change_session(sesson)

    def MCUReset_req(self):
        """Request the ECU reset type selected in the combo box."""
        # '0x01硬件复位' is hard reset (0x01); the other entry is soft reset
        # (0x03). The mapping used to be inverted.
        req = 0x01 if self.UI.comboBox_8.currentText() == '0x01硬件复位' else 0x03
        self.udsclient.ecu_reset(req)

    def ClearDTC(self):
        self.udsclient.clear_dtc(
            group=DTCGroup_dic[self.UI.comboBox_7.currentText()])

    def DTC_Control(self):
        self.udsclient.control_dtc_setting(
            setting_type=DTC_Control_dic[self.UI.comboBox_13.currentText()])

    def Security_req(self):
        self.udsclient.unlock_security_access(1)

    def SecurityUnlockLevel_1(self):
        """Unlock security level 0x01 and mark its button on success."""
        data = self.udsclient.unlock_security_access(1)
        if data.positive:
            self.UI.pushButton_31.setStyleSheet(self._DONE_STYLE)

    def SecurityUnlockLevel_2(self):
        """Unlock security level 0x11 and mark its button on success."""
        data = self.udsclient.unlock_security_access(0x11)
        if data.positive:
            self.UI.pushButton_32.setStyleSheet(self._DONE_STYLE)

    def SecurityUnlockLevel_3(self):
        """Unlock security level 0x7D; returns the UDS response."""
        data = self.udsclient.unlock_security_access(0x7d)
        if data.positive:
            self.UI.pushButton_32.setStyleSheet(
                "background-color:rgb(0, 128, 0)")
        return data

    def _show_did(self, did, line_edit):
        """Read ``did`` and show its value in ``line_edit`` when available."""
        data = self.ReadByDID(did)
        # ReadByDID yields None on error and {} on a non-positive response.
        if data and did in data:
            line_edit.setText(str(data[did]))

    def ReadSupplyID(self):
        self._show_did(0xF18A, self.UI.lineEdit_8)

    def ReadVIN(self):
        self._show_did(0xF190, self.UI.lineEdit_3)

    def ReadMfgDate(self):
        self._show_did(0x210B, self.UI.lineEdit_5)

    def ReadByDID(self, didlist: List):
        """Read one or more data identifiers.

        Returns:
            dict of DID -> decoded value; empty if the response was not
            positive; None if the request raised (the error is displayed).
        """
        values = {}
        try:
            response = self.udsclient.read_data_by_identifier(didlist)
            if response.positive:
                values = response.service_data.values
            return values
        except Exception as e:
            # The signal is declared Signal(str); emit text, not the object.
            g_signal.sig_Disp_str.emit(str(e))

    def DID_display(self):
        """Show the numeric value of the selected DID name."""
        # Zero-pad on the left; '{:0<4x}' padded on the right and turned
        # e.g. 0x210 into "0x2100".
        self.UI.lineEdit.setText("0x{:04x}".format(
            (DID_dic[self.UI.comboBox_6.currentText()])))

    def ReadDataByID(self):
        """Read the selected DID into the value line edit."""
        tempdid = DID_dic[self.UI.comboBox_6.currentText()]
        self.UI.lineEdit_2.clear()
        data = self.ReadByDID(tempdid)
        if data and tempdid in data:
            text = str(data[tempdid])
            if text:
                self.UI.lineEdit_2.setText(text)

    def WriteDataByID(self):
        """Write the value line edit's text to the selected DID."""
        tempdid = DID_dic[self.UI.comboBox_6.currentText()]
        writedata = self.UI.lineEdit_2.text()
        try:
            self.udsclient.write_data_by_identifier(tempdid, writedata)
        except Exception as e:
            g_signal.sig_Disp_str.emit(str(e))

    def communicationControl_req(self):
        """Send CommunicationControl with the selected control type."""
        control_types = {'0x00使能收发': 0, '0x01能收禁发': 1}
        # Any other entry (i.e. '0x03禁止收发') disables both Rx and Tx.
        req = control_types.get(self.UI.comboBox_9.currentText(), 3)
        try:
            self.udsclient.communication_control(control_type=req,
                                                 communication_type=1)
        except Exception as e:
            self.disp_string('%s' % e)

    def _run_calibration_step(self, step, button):
        """Start routine 0xFFAA with ``step`` and mark ``button`` on success."""
        if self.RequestRoutControl(0xFFAA, bytes([step, 0x0, 0x0, 0x0])):
            button.setStyleSheet(self._DONE_STYLE)

    def StartCalibraiton(self):
        """Enter the extended session, unlock 0x7D, start calibration."""
        sesson = 3
        self.UI.comboBox.setCurrentIndex(sesson - 1)
        response = self.udsclient.change_session(sesson)
        if not response.positive:
            return
        response = self.SecurityUnlockLevel_3()
        if response.positive:
            self._run_calibration_step(0x8, self.UI.pushButton_9)

    def Calibraiton_Z(self):
        self._run_calibration_step(0x1, self.UI.pushButton_11)

    def Calibraiton_M(self):
        self._run_calibration_step(0x0, self.UI.pushButton_13)

    def Calibraiton_MP(self):
        self._run_calibration_step(0x2, self.UI.pushButton_18)

    def Calibraiton_MN(self):
        self._run_calibration_step(0x3, self.UI.pushButton_22)

    def Calibraiton_X2(self):
        self._run_calibration_step(0x4, self.UI.pushButton_24)

    def Calibraiton_X1(self):
        self._run_calibration_step(0x5, self.UI.pushButton_26)

    def Calibraiton_Y1(self):
        self._run_calibration_step(0x6, self.UI.pushButton_27)

    def Calibraiton_Y2(self):
        self._run_calibration_step(0x7, self.UI.pushButton_28)

    def Calibraiton_GAP(self):
        self._run_calibration_step(0x9, self.UI.pushButton_29)

    def RequestRoutControl(self, routine_id, data):
        """Start ``routine_id`` with ``data``; True on a positive response."""
        resp_1 = self.udsclient.start_routine(routine_id=routine_id,
                                              data=data)
        return resp_1.positive

    @staticmethod
    def _iter_dtc_records(data):
        """Yield (dtc, status) for each 4-byte record after a 2-byte header."""
        for i in range(len(data) // 4):
            base = i * 4 + 2
            if base + 3 >= len(data):
                break  # truncated trailing record
            dtc_hex = data[base] << 16 | data[base + 1] << 8 | data[base + 2]
            yield dtc_hex, data[base + 3]

    def ReportSupportDTC(self):
        """Display every supported DTC with its status bits."""
        resp_1 = self.udsclient.get_supported_dtc()
        if not resp_1.positive:
            return
        for dtc_hex, status in self._iter_dtc_records(resp_1.data):
            log = 'DTC:0x%.6x Status:0x%.2x ' % (
                dtc_hex, status) + DTC_DescriptionDic[dtc_hex]
            for j in range(8):
                if status >> j & 0x1:
                    log += ' bit%d' % j
            self.disp_string(log)

    def ReportByMask(self):
        """Display the DTCs matching the selected status mask."""
        mask = ReportBymask_DTC[self.UI.comboBox_11.currentText()]
        resp_1 = self.udsclient.get_dtc_by_status_mask(status_mask=mask)
        if not resp_1.positive:
            return
        for dtc_hex, status in self._iter_dtc_records(resp_1.data):
            log = 'DTC:0x%.6x Status:0x%.2x %s ' % (
                dtc_hex, status, DTC_DescriptionDic[dtc_hex])
            for j in range(8):
                if status >> j & 0x1:
                    log += 'bit%d ' % j
            self.disp_string(log)

    def ReportSnapshotByDTC(self):
        """Display snapshot record 1 of the selected DTC."""
        dtc = DTC_Dic[self.UI.comboBox_12.currentText()][
            self.UI.comboBox_12.currentIndex()]
        try:
            resp = self.udsclient.get_dtc_snapshot_by_dtc_number(
                dtc=dtc, record_number=1)
            if resp.positive:
                number_of_did = resp.data[6]
                status = resp.data[4]
                logger.debug('snapshot status byte: %s', status)
                dtc = resp.service_data.dtcs[0]
                snapshots = dtc.snapshots
                log = "DTC:0x%.6x Status:0x%.2x" % (dtc.id, status) + ' '
                for i in range(number_of_did):
                    log += (DTC_SanpshotDescriptionDic[snapshots[i].did] +
                            ':' + snapshots[i].data + ' ')
                self.disp_string(log)
        except Exception as e:
            self.disp_string('%s' % e)

    def SetFromBootFlag(self):
        self.startfromboot = self.UI.radioButton_2.isChecked()

    def pre_programming(self):
        """Run the pre-programming sequence.

        Skipped (and reported as success) when starting from the bootloader.

        Returns:
            bool: True if every step got a positive response.
        """
        if self.startfromboot:
            return True

        client = self.udsclient
        self.disp_string('# 预编程步骤')
        steps = (
            # enter the extended session
            ('>>> 进入扩展模式', lambda: client.change_session(3)),
            ('>>> 解锁安全访问', lambda: client.unlock_security_access(0x11)),
            # check the programming preconditions
            ('>>> 检查编程条件', lambda: client.start_routine(0xff02)),
            # switch DTC storage off
            ('>>> 关闭DTC的存储', lambda: client.control_dtc_setting(2)),
            # switch non-diagnostic communication off
            ('>>> 关闭与诊断无关的报文',
             lambda: client.communication_control(0x03, 0x01)),
        )
        for caption, request in steps:
            self.disp_string(caption)
            if not request().positive:
                self.disp_string('>>> 预编程失败')
                return False
        return True

    @staticmethod
    def _read_u32_be(buf, offset):
        """Read a big-endian unsigned 32-bit value from ``buf`` at ``offset``."""
        return (buf[offset] << 24 | buf[offset + 1] << 16 |
                buf[offset + 2] << 8 | buf[offset + 3])

    def main_programming(self):
        """Download the flash driver and the application image.

        Returns:
            bool: True if every step got a positive response.
        """
        client = self.udsclient
        self.disp_string('# 主编程步骤')

        app_file = self.app_data
        # The address/length header must be readable before anything is
        # erased or written.
        if app_file is None or len(app_file) < APP_LENGTH_LOCATION_OFFSET + 4:
            self.disp_string('## 请先加载app文件')
            return False

        if self.startfromboot:
            self.disp_string('>>> 进入扩展模式')
            client.change_session(3)

        # enter the programming session
        self.disp_string('>>> 进入编程模式')
        if not client.change_session(2).positive:
            return False

        # security access
        self.disp_string('>>> 安全访问')
        if not client.unlock_security_access(0x11).positive:
            return False

        # ---- flash driver ----
        self.disp_string('>>> 请求下载驱动文件')
        memory_location = MemoryLocation(0x20004600, 10, 32, 32)
        if not client.request_download(memory_location).positive:
            return False

        self.disp_string('>>> 发送driver文件')
        drv_file = self.drv_data
        response = client.transfer_data(1, bytes(drv_file))
        self.set_progressbar_pos(len(drv_file))
        if not response.positive:
            return False

        self.disp_string('>>> 发送驱动结束,请求推出')
        if not client.request_transfer_exit().positive:
            return False

        # driver integrity check
        self.disp_string('>>> driver文件完整性检验')
        drivecrc = bytes([0x9B, 0x39, 0xf2, 0xec])
        if not client.start_routine(0xf001, drivecrc).positive:
            return False

        # ---- application ----
        address = self._read_u32_be(app_file, APP_ADDR_LOCATION_OFFSET)
        memorysize = self._read_u32_be(app_file, APP_LENGTH_LOCATION_OFFSET)
        memory_location = MemoryLocation(address, memorysize, 32, 32)

        # erase the application area
        self.disp_string('>>> 删除app存储空间')
        data = b''
        data += memory_location.alfid.get_byte()
        data += memory_location.get_address_bytes()
        data += memory_location.get_memorysize_bytes()
        if not client.start_routine(0xff00, data).positive:
            return False

        # send the application
        self.disp_string('>>> 发送app文件')
        response = client.request_download(memory_location)
        if not response.positive:
            return False
        # usable payload per block: max_length minus SID and sequence bytes
        payload_length = response.service_data.max_length - 2
        if payload_length <= 0:
            return False
        count = (len(app_file) + payload_length - 1) // payload_length
        base = self.get_progressbar_pos()
        for i in range(count):
            start = i * payload_length
            end = start + payload_length
            response = client.transfer_data((i + 1) % 256,
                                            bytes(app_file[start:end]))
            self.set_progressbar_pos(base + end)
            if not response.positive:
                # Do not keep streaming blocks after a rejected one.
                return False

        if not client.request_transfer_exit().positive:
            return False

        # application integrity check
        self.disp_string('>>> app文件完整性检验')
        response = client.start_routine(0xf001, bytes(app_file[0:4]))
        return response.positive

    def post_programming(self):
        """Return the ECU to the default session."""
        self.disp_string('# 后编程步骤')
        self.disp_string('>>> 回到默认模式')
        response = self.udsclient.change_session(1)
        return response.positive

    def start_programming(self):
        """Run pre-, main- and post-programming; stop at the first failure."""
        self.UI.pushButton_48.setDisabled(True)
        self.set_progressbar_pos(0)
        self.in_programming = True
        t1 = time.time()
        try:
            if not self.pre_programming():
                pass  # failure already reported by pre_programming
            elif not self.main_programming():
                self.disp_string('>>> 编程失败')
            else:
                self.post_programming()
        except Exception as e:
            self.disp_string('%s' % e)
        finally:
            t2 = time.time()
            self.disp_string('finished in %.2f sec' % (t2 - t1))
            self.in_programming = False
            self.UI.pushButton_48.setDisabled(False)

    def _total_image_len(self):
        """Combined length of the driver and application images."""
        drv_data_len = len(self.drv_data) if self.drv_data is not None else 0
        app_data_len = len(self.app_data) if self.app_data is not None else 0
        return drv_data_len + app_data_len

    def set_progressbar_pos(self, pos):
        """Set the progress bar, clamped to [0, total image length]."""
        self.UI.progressBar.setValue(
            max(0, min(pos, self._total_image_len())))

    def get_progressbar_pos(self):
        return self.UI.progressBar.value()

    def loadAppfile(self):
        """Let the user pick the application file and load it."""
        if self.in_programming:
            self.disp_string('## 正在编程中')
            return

        fileName, _ = QtWidgets.QFileDialog.getOpenFileName(
            None, 'comboBox_4', '.',
            'All Files (*);;Bin Files (*.bin);;Hex Files (*.hex)')
        if fileName == '':
            return
        self.UI.comboBox_4.addItem(fileName)
        self.UI.comboBox_4.setCurrentText(fileName)
        hex_read = Hex_read()
        hex_read.Open_file(fileName)
        self.app_data = hex_read.data
        self.UI.progressBar.setRange(0, self._total_image_len())

    def SetBootLog(self):
        """Let the user pick a log file and mirror CAN traffic into it."""
        if self.in_programming:
            self.disp_string('## 正在编程中')
            return

        fileName, _ = QtWidgets.QFileDialog.getSaveFileName(
            None, 'comboBox_10', '.',
            'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)')
        if fileName == '':
            return
        self.UI.comboBox_10.addItem(fileName)
        self.UI.comboBox_10.setCurrentText(fileName)
        print(fileName)
        # Close the previous writer so its file is flushed and released.
        if self.boot_logger is not None:
            self.boot_logger.stop()
        self.boot_logger = Logger(fileName)