"""Shifter diagnostic tool: PySide2 UI on top of a USBCAN adapter, ISO-TP and UDS.

Data flow:
    USBCAN hardware <-> ``ShifterTool.can_thread`` (poll thread)
                    <-> ``isotp.TransportLayer`` (via ``isotp_rcv`` / ``isotp_send``)
                    <-> ``ShifterTool.IsoTpConnection`` (queue bridge)
                    <-> ``udsoncan.client.Client``
"""
# NOTE: the import order is kept as in the original file on purpose.
# ``from USBCAN import *`` may rebind names imported before it, and names
# imported after it win over whatever the star import exported.
import threading
from can import BusABC, Message
from udsoncan.client import Client
from udsoncan.exceptions import TimeoutException
import udsoncan
from udsoncan.connections import BaseConnection
from udsoncan import services, Response, MemoryLocation
import isotp
from isotp import CanMessage
import queue
from USBCAN import *
from Shifter import ShifterClass
from mainwindows import Ui_MainWindow
from PySide2 import QtWidgets, QtCore, QtGui
import struct
import time
import datetime
from multiprocessing import Array, Pool, Process, Queue, Value
import logging
import os

MAX_RCV_NUM = 20

# The file handler below fails at import time if the directory is missing.
os.makedirs('./log', exist_ok=True)

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(logging.DEBUG)

fileHandler = logging.FileHandler(
    './log/ShiftTool.log', mode='a', encoding='UTF-8')
fileHandler.setLevel(logging.NOTSET)

consoleHandler.setFormatter(formatter)
fileHandler.setFormatter(formatter)

logger.addHandler(consoleHandler)
logger.addHandler(fileHandler)


class UISignals(QtCore.QObject):
    """Qt signals used to hand work from worker threads to the UI."""

    sig_Disp_str = QtCore.Signal(str)      # text for the status bar
    sig_MsgReceived = QtCore.Signal(int)   # number of CAN frames just received


class HardwreDevice():
    """Plain holder for the CAN adapter selection made in the UI."""

    def __init__(self):
        self.device_type = 4
        self.channel = 0  # can_index
        self.device = 0  # device_index
        self.baudrate = 0  # index into TIMING_DICT keys (see update_HardwareDevice)


class PeriodSendThread(object):
    """Worker thread that calls a function once and then periodically.

    ``send_start(period)`` triggers one immediate call followed by a call
    every ``period`` seconds until ``send_stop()`` is invoked.
    """

    def __init__(self, period_func, args=(), kwargs=None):
        """
        Args:
            period_func: callable invoked by the worker thread.
            args: positional arguments passed to ``period_func``.
            kwargs: keyword arguments passed to ``period_func``.
        """
        self._thread = threading.Thread(target=self._run)
        self._function = period_func
        self._args = args
        # ``None`` sentinel instead of a shared mutable default dict.
        self._kwargs = {} if kwargs is None else kwargs
        self._period = 0
        self._event = threading.Event()         # wakes the worker up
        self._period_event = threading.Event()  # ends the periodic phase
        self._terminated = False

    def start(self):
        """Start the worker thread (it idles until ``send_start``)."""
        self._thread.start()

    def stop(self):
        """Terminate the worker thread and wait for it to finish."""
        self._terminated = True
        # Both events must be set: the worker may be blocked either on the
        # wake-up event or inside the periodic wait loop. Setting only the
        # first one left join() hanging while periodic sending was active.
        self._period_event.set()
        self._event.set()
        if self._thread.is_alive():
            self._thread.join()

    def send_start(self, period):
        """Begin periodic sending with ``period`` seconds between calls."""
        self._period = period
        self._event.set()

    def send_stop(self):
        """End the current periodic sending phase."""
        self._period_event.set()

    def _run(self):
        """Worker loop: wait for a start request, then call periodically."""
        while True:
            self._event.wait()
            self._event.clear()
            if self._terminated:
                break
            self._function(*self._args, **self._kwargs)
            # Event.wait returns False on timeout, i.e. "period elapsed and
            # nobody asked us to stop" -> call again.
            while not self._period_event.wait(self._period):
                self._function(*self._args, **self._kwargs)
            self._period_event.clear()


class ShifterTool():
    """Main application object: owns the UI, the CAN device and the UDS client."""

    class IsoTpConnection(BaseConnection):
        """udsoncan connection that talks to an ``isotp.TransportLayer`` via queues."""

        mtu = 4095

        def __init__(self, isotp_layer, name=None):
            """
            Args:
                isotp_layer: the ``isotp.TransportLayer`` to drive.
                name: optional connection name forwarded to ``BaseConnection``.
            """
            BaseConnection.__init__(self, name)
            self.UDStoIsoTPQueue = queue.Queue()  # UDS --> ISO-TP
            self.IsoTPtoUDSQueue = queue.Queue()  # ISO-TP --> UDS
            self.CANtoIsoTPQueue = queue.Queue()  # CAN --> ISO-TP
            self._read_thread = None
            self.exit_requested = False
            self.opened = False
            self.isotp_layer = isotp_layer
            assert isinstance(
                self.isotp_layer, isotp.TransportLayer), 'isotp_layer must be a valid isotp.TransportLayer '

        def open(self):
            """Start the ISO-TP processing thread and mark the connection open."""
            self.exit_requested = False
            self._read_thread = threading.Thread(
                None, target=self.rxthread_task)
            self._read_thread.start()
            self.opened = True
            logger.info('Connection opened')
            return self

        def __enter__(self):
            return self

        def __exit__(self, type, value, traceback):
            self.close()

        def is_open(self):
            """Return True while the connection is open."""
            return self.opened

        def close(self):
            """Flush both queues, stop the processing thread and reset ISO-TP."""
            self.empty_rxqueue()
            self.empty_txqueue()
            self.exit_requested = True
            # open() may never have been called; joining None would raise.
            if self._read_thread is not None:
                self._read_thread.join()
            self.isotp_layer.reset()
            self.opened = False
            logger.info('Connection closed')

        def specific_send(self, payload):
            """Queue ``payload`` for transmission, truncated to ``mtu`` bytes."""
            if self.mtu is not None:
                if len(payload) > self.mtu:
                    logger.warning(
                        "Truncating payload to be set to a length of %d" % (self.mtu))
                    payload = payload[0:self.mtu]

            # isotp.protocol.TransportLayer uses byte array.
            # udsoncan is strict on bytes format
            self.UDStoIsoTPQueue.put(bytearray(payload))

        def specific_wait_frame(self, timeout=2):
            """Block until a reassembled ISO-TP frame is available.

            Args:
                timeout: seconds to wait before giving up.

            Returns:
                The frame as ``bytes`` (truncated to ``mtu``).

            Raises:
                RuntimeError: the connection is not open.
                TimeoutException: nothing was received within ``timeout``.
            """
            if not self.opened:
                raise RuntimeError("Connection is not open")

            try:
                frame = self.IsoTPtoUDSQueue.get(block=True, timeout=timeout)
            except queue.Empty:
                raise TimeoutException(
                    "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" % timeout)

            if self.mtu is not None:
                if frame is not None and len(frame) > self.mtu:
                    logger.warning(
                        "Truncating received payload to a length of %d" % (self.mtu))
                    frame = frame[0:self.mtu]

            # isotp.protocol.TransportLayer uses bytearray.
            # udsoncan is strict on bytes format
            return bytes(frame)

        def empty_rxqueue(self):
            """Discard everything waiting in the ISO-TP --> UDS queue."""
            while not self.IsoTPtoUDSQueue.empty():
                self.IsoTPtoUDSQueue.get()

        def empty_txqueue(self):
            """Discard everything waiting in the UDS --> ISO-TP queue."""
            while not self.UDStoIsoTPQueue.empty():
                self.UDStoIsoTPQueue.get()

        def rxthread_task(self):
            """Thread body: pump data between the UDS queues and the ISO-TP layer."""
            while not self.exit_requested:
                try:
                    while not self.UDStoIsoTPQueue.empty():
                        self.isotp_layer.send(self.UDStoIsoTPQueue.get())

                    self.isotp_layer.process()

                    while self.isotp_layer.available():
                        self.IsoTPtoUDSQueue.put(self.isotp_layer.recv())

                    time.sleep(0.0001)

                except Exception as e:
                    # Any failure ends the pump thread; the error is logged.
                    self.exit_requested = True
                    logger.error(str(e))
                    print("Error occurred while read CAN(FD) data!")

    def __init__(self, window: QtWidgets.QMainWindow):
        """
        Args:
            window: the main window the generated UI is installed on.
        """
        super(ShifterTool, self).__init__()
        self.msgQueue = Queue()   # can layer receive queue
        self.sendQueue = Queue()  # can layer send queue
        self.signal = UISignals()
        self.shifter = ShifterClass()
        self.devicedescription = HardwreDevice()
        self.windows = QtWidgets.QMainWindow()
        self.UI = Ui_MainWindow()
        self.UI.setupUi(window)
        self._dev_info = None
        # NOTE: this instance attribute shadows the ``can_thread`` method;
        # after __init__ ``self.can_thread`` is the Thread object.
        self.can_thread = threading.Thread(target=self.can_thread)
        self.DeviceInit()
        self.WidgetsInit()
        self.ChnInfoUpdate(self._isOpen)

    def DeviceInit(self):
        """Create device state, the ISO-TP layer and the UDS client."""
        self._usbcan = None
        self._isOpen = Value('i', 0)  # CAN connection state shared between processes
        self._isChnOpen = False
        self.needdisconnect = Value('i', 0)  # set to 1 to request a disconnect

        # current device info
        self._is_canfd = False
        self._res_support = False

        self._terminated = False
        self._lock = threading.RLock()

        self.isotp_params = {
            # Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
            'stmin': 32,
            # Request the sender to send 8 consecutives frames before sending a new flow control message
            'blocksize': 8,
            # Number of wait frame allowed before triggering an error
            'wftmax': 0,
            # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
            'tx_data_length': 8,
            # Will pad all transmitted CAN messages with byte 0x00. None means no padding
            'tx_padding': 0,
            # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
            'rx_flowcontrol_timeout': 1000,
            # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
            'rx_consecutive_frame_timeout': 1000,
            # When sending, respect the stmin requirement of the receiver. If set to True, go as fast as possible.
            'squash_stmin_requirement': False
        }

        # The tool transmits on the ECU's rx id and listens on the ECU's tx id.
        self._isotpaddr_PHYS = isotp.Address(
            isotp.AddressingMode.Normal_11bits,
            txid=self.shifter.canid.phy_rxId,
            rxid=self.shifter.canid.phy_txId)
        self._isotpaddr_FUNC = isotp.Address(
            isotp.AddressingMode.Normal_11bits,
            txid=self.shifter.canid.fun_rxId,
            rxid=self.shifter.canid.phy_txId)

        self.isotp_layer = isotp.TransportLayer(
            rxfn=self.isotp_rcv, txfn=self.isotp_send,
            address=self._isotpaddr_PHYS, params=self.isotp_params)

        self.conn = ShifterTool.IsoTpConnection(isotp_layer=self.isotp_layer)
        self.udsclient = Client(self.conn, request_timeout=2)
        self.udsclient.config['security_algo'] = self.SecAlgo
        self.udsclient.config['security_algo_params'] = [
            0x4FE87269, 0x6BC361D8, 0x9B127D51, 0x5BA41903]
        self.udsclient.config['data_identifiers'] = self.shifter.did_config
        self.udsclient.config['server_address_format'] = 32
        self.udsclient.config['server_memorysize_format'] = 32

    def SecAlgo(self, level, seed, params):
        """Build the security key that unlocks a UDS security level.

        Args:
            level: requested security level; 0x01 and 0x11 have dedicated
                algorithms, any other level echoes the seed back.
            seed: at least four seed bytes, interpreted big-endian.
            params: sequence of four 32-bit constants (used by level 0x11).

        Returns:
            The 4-byte key as ``bytes``, big-endian.
        """
        def _swap32(value):
            # Reverse the byte order of a 32-bit word.
            return (((value << 24) & 0xFF000000) | ((value << 8) & 0xFF0000) |
                    ((value >> 8) & 0xFF00) | ((value >> 24) & 0xFF))

        temp_key = (seed[0] << 24) | (seed[1] << 16) | (seed[2] << 8) | seed[3]

        if level == 0x01:
            output_key_temp = ((((temp_key >> 4) ^ temp_key) << 3)
                               ^ temp_key) & 0xFFFFFFFF
        elif level == 0x11:
            _temp_y = _swap32(temp_key)
            _temp_z = 0
            _temp_sum = 0
            # 64 mixing rounds; every intermediate is kept to 32 bits.
            for _ in range(64):
                _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z) ^
                            (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF
                _temp_y &= 0xFFFFFFFF
                _temp_sum += 0x8F750A1D
                _temp_sum &= 0xFFFFFFFF
                _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^
                            (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF
                _temp_z &= 0xFFFFFFFF
            output_key_temp = _swap32(_temp_z)
        else:
            output_key_temp = temp_key

        return struct.pack('>I', output_key_temp & 0xFFFFFFFF)

    def getDateTimeBytes(self):
        """Return the current local date/time as BCD-style byte values.

        Each decimal field is re-read as hexadecimal (e.g. year 2024 becomes
        0x2024, split into 0x20 and 0x24).

        Returns:
            Tuple ``(year_high, year_low, month, day, hour, minute, second)``.
        """
        # Sample the clock once so all fields belong to the same instant.
        now = datetime.datetime.now()

        def _bcd(value):
            return int(str(value), 16)

        year = _bcd(now.year)
        return (year >> 8, year & 0xFF, _bcd(now.month), _bcd(now.day),
                _bcd(now.hour), _bcd(now.minute), _bcd(now.second))

    def isotp_rcv(self):
        """ISO-TP ``rxfn``: return the next queued CAN frame, or None."""
        try:
            return self.conn.CANtoIsoTPQueue.get_nowait()
        except queue.Empty:
            return None

    def isotp_send(self, isotp_msg):
        """ISO-TP ``txfn``: convert the frame and queue it for the CAN thread."""
        msg = Message()
        msg.arbitration_id = isotp_msg.arbitration_id
        msg.dlc = isotp_msg.dlc
        msg.data = isotp_msg.data
        msg.is_extended_id = False
        self.sendQueue.put(msg)

    def WidgetsInit(self):
        """Populate the combo boxes and connect widget and worker signals."""
        self.UI.comboBox_2.addItem('USBCAN-I')
        self.UI.comboBox_2.addItem('USBCAN-II')

        # baud rate
        self.UI.comboBox_3.addItems(
            [('%dK' % (i / 1000)) for i in TIMING_DICT.keys()])
        # CHANNEL
        self.UI.comboBox_5.addItems(['CH1/3', 'CH2/4'])

        self.UI.pushButton.clicked.connect(self.open_close)
        self.UI.comboBox_3.activated.connect(self.update_HardwareDevice)
        self.UI.comboBox_5.activated.connect(self.update_HardwareDevice)

        self.signal.sig_Disp_str.connect(self.disp_string)
        self.signal.sig_MsgReceived.connect(self._updateRootList)
        self.initUDSUI()

    def initUDSUI(self):
        """Populate the UDS related widgets."""
        self.UI.comboBox.addItems(['0x01默认模式', '0x02编程模式', '0x03编程模式'])
        self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位'])
        self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发'])
        self.UI.radioButton.toggled.connect(self.TestPresentEvent)

    def _formatMsgData(self, index, item, received):
        """Turn a CAN message into the list of cells shown in the table.

        Args:
            index: running message index (shown only for received frames).
            item: CAN message providing ``timestamp``, ``arbitration_id``,
                ``dlc``, ``is_extended_id`` and ``data``.
            received: True for a received frame, False for a sent one.

        Returns:
            ``[index, timestamp, direction, id, dlc, frame type, data]``.
        """
        data = []
        if received:
            data.append('{0:0>4}'.format(index))
            data.append(item.timestamp)
            data.append('接收')
        else:
            data.append('')
            data.append(item.timestamp)
            data.append('发送')

        # Lower-case "0x" prefix followed by upper-case hex digits.
        data.append('0x{:X}'.format(item.arbitration_id))
        data.append(item.dlc)
        if int(item.is_extended_id) == 1:
            data.append('扩展帧')
        else:
            data.append('标准帧')
        # '{:02X}' pads on the left; the former '{:0<2x}' padded on the
        # right and rendered 0x05 as "0x50".
        data.append(' '.join('0x{:02X}'.format(a) for a in item.data))
        return data

    def can_thread_stop(self):
        """Wait for the CAN poll thread to finish (no-op if it is not running)."""
        if self.can_thread.is_alive():
            self.can_thread.join()

    def can_thread_start(self):
        """Start the CAN poll thread, creating a fresh Thread when needed."""
        if self.can_thread.is_alive():
            return
        if self.can_thread.ident is not None:
            # A Thread object can only be started once; after a close the
            # old one is finished and has to be replaced.
            self.can_thread = threading.Thread(
                target=ShifterTool.can_thread, args=(self,))
        self.can_thread.start()

    def can_thread(self):
        """Poll loop: service disconnect requests, receive and transmit frames.

        Runs while ``_isOpen`` is 1. NOTE(review): the nesting below was
        rebuilt from a whitespace-collapsed source; in particular the
        transmit step is assumed to belong to the "no disconnect pending"
        branch - confirm against the original file.
        """
        while self._isOpen.value == 1:
            time.sleep(0.01)
            if self.needdisconnect.value == 1:
                ret = self._usbcan.CloseDevice()
                if ret == 1:
                    self.needdisconnect.value = 0
                    self._isOpen.value = 0
            else:
                msg, num = self._usbcan.Receive(len=10)
                if not num == 0:
                    for i in range(num):
                        if msg[i].arbitration_id == self.shifter.canid.phy_txId:
                            # send msg to isotp
                            self.conn.CANtoIsoTPQueue.put(msg[i])
                        self.msgQueue.put(msg[i])
                        logger.info(
                            'Rx: ID=0x{:03x} '.format(msg[i].arbitration_id) +
                            ' '.join('0x{:02X}'.format(a) for a in msg[i].data))
                    # send signal to update screen
                    self.signal.sig_MsgReceived.emit(num)

                msgToSendCnt = self.sendQueue.qsize()
                for _ in range(msgToSendCnt):
                    msg = self.sendQueue.get()
                    logger.info(
                        'Tx: ID=0x{:03x} DLC={} externd flag ={} remote frame:{} '.format(
                            msg.arbitration_id, msg.dlc,
                            msg.is_extended_id, msg.is_remote_frame))
                    self._usbcan.send(msg)

    def open_close(self):
        """Toggle the CAN device: open it when closed, request a close otherwise."""
        if self._isOpen.value != 0:
            # The poll thread performs the actual CloseDevice call.
            self.needdisconnect.value = 1
            self._deviceOpenUpdate()
            return

        first_open = self._usbcan is None
        if first_open:
            self.update_HardwareDevice()
            can_filters = [{'can_id': 0x420, 'can_mask': 0xFFFFFFFF}]
            bitrate = list(TIMING_DICT.keys())[self.devicedescription.baudrate]
            # NOTE(review): device type/index/channel are hard-coded here and
            # ignore self.devicedescription - kept as in the original.
            self._usbcan = USBCAN(device_type=4, device_index=0, can_index=0,
                                  bitrate=bitrate, can_filters=can_filters)
        else:
            # Make sure the previous poll thread has fully exited before
            # the open flag is raised again.
            self.can_thread_stop()

        # A falsy return value is treated as failure, as the original
        # first-open check did. Do not mark the device open in that case.
        if not self._usbcan.InitAndStart():
            logger.error("Open usbcan device fail.")
            self.disp_string("Open usbcan device fail.")
            if first_open:
                self._usbcan = None  # next click retries from scratch
            return

        self._isOpen.value = 1
        self._deviceOpenUpdate()
        self.can_thread_start()
        if first_open:
            self.conn.open()  # start iso tp thread

    def update_HardwareDevice(self):
        """Copy the current combo box selections into ``devicedescription``."""
        self.devicedescription.device_type = self.UI.comboBox_2.currentIndex() + 3
        self.devicedescription.channel = self.UI.comboBox_5.currentIndex()
        self.devicedescription.baudrate = self.UI.comboBox_3.currentIndex()

    def _deviceOpenUpdate(self):
        """Update the open/close button caption from the device state."""
        # A pending disconnect is checked first: _isOpen is still 1 until the
        # poll thread has closed the device, so the former order never
        # showed "Open CAN" after a close request.
        if self.needdisconnect.value == 1:
            self.UI.pushButton.setText("Open CAN")
        elif self._isOpen.value == 1:
            self.UI.pushButton.setText("Close CAN")

    def _StartlistenMsgProcess(self):
        """Run the listener in a separate process and refresh the table.

        Blocks the caller while ``_isOpen`` is 1.
        """
        self.msgProcess = Process(
            name='pyUSBCANListener',
            target=self._usbcan.ListeningMsg,
            args=(self._isOpen, self.needdisconnect, self.msgQueue, self.sendQueue))
        self.msgProcess.daemon = True
        self.msgProcess.start()
        # Original note: check the connection status after 1.5 s; that delay
        # may need tuning.
        while self._isOpen.value == 1:
            self._updateRootList()
            time.sleep(0.1)

    def disp_string(self, out):
        """Show ``out`` in the status bar."""
        self.UI.statusbar.showMessage(str(out))

    def _updateRootList(self):
        """Move up to five received messages from the queue into the table."""
        _dataSize = min(self.msgQueue.qsize(), 5)
        formateddata = [
            self._formatMsgData(number, self.msgQueue.get(), True)
            for number in range(1, _dataSize + 1)
        ]
        self._insertDataSmooth(data=formateddata, datasize=_dataSize)

    def _insertDataSmooth(self, data, datasize):
        """Write formatted messages into the table widget.

        Frames with id 0x420 always go to row 0 and 0x77A to row 1; any
        other id is written to the row matching its position in ``data``.

        Args:
            data: list of rows as produced by ``_formatMsgData``.
            datasize: number of entries of ``data`` to display.
        """
        for position in range(datasize):
            insertdata = data[position]
            # Compare case-insensitively: _formatMsgData emits upper-case
            # hex digits ("0x77A"), so the former == "0x77a" never matched.
            can_id = str(insertdata[3]).lower()
            if can_id == '0x420':
                row = 0
            elif can_id == '0x77a':
                row = 1
            else:
                row = position
            for column in range(7):
                item = QtWidgets.QTableWidgetItem()
                item.setTextAlignment(0x84)  # text alignment inside the cell
                item.setText(str(insertdata[column]))
                self.UI.tableWidget.setItem(row, column, item)

    def ChnInfoUpdate(self, openflag):
        """Placeholder for channel info updates (currently does nothing)."""
        pass

    def TestPresentEvent(self):
        """Slot for the radio button: send a UDS TesterPresent request."""
        try:
            self.udsclient.tester_present()
        except Exception as e:
            # Slot boundary: report instead of letting the error escape
            # into the Qt event loop.
            logger.exception("TesterPresent request failed")
            self.disp_string(e)