"""Qt widget for reading, clearing and controlling UDS diagnostic trouble codes.

The widget talks to an ECU over CAN (Vector or CANalyst-II interface) using
ISO-TP via ``aioisotp`` and a UDS ``Client``.  All bus traffic runs in asyncio
tasks, so the application is expected to run a Qt-integrated asyncio event
loop (NOTE(review): e.g. quamash/qasync -- the loop setup is not in this file).
"""
import asyncio
import configparser
import html
import time

import aioisotp
from can.bus import BusABC
from can.interfaces.canalystii import CANalystIIBus, TIMING_DICT
from can.interfaces.vector import VectorBus
from PyQt5 import QtWidgets, QtCore, QtGui
from uds.client import Client
from udsoncan import MemoryLocation

# Human readable description for each 3-byte DTC number the ECU may report.
# The descriptions are runtime strings shown in the log; keep them unchanged.
dtc_describe = {
    0x550122: 'P按键信号不正常',    # P button signal abnormal
    0x550271: '马达动作故障',       # motor actuation fault
    0x550329: 'HOME侧传感器异常',   # HOME-side sensor abnormal
    0x550429: 'M侧传感器故障',      # M-side sensor fault
    0xD10017: '超过正常诊断电压',   # above normal diagnostic voltage
    0xD10116: '低于正常诊断电压',   # below normal diagnostic voltage
    0xC00188: 'BUS OFF',
    0xC29187: '丢失TCU信号',        # lost TCU signal
    0xC14687: '丢失GW信号',         # lost GW signal
}


class DiagnosticTroubleCodeWidget(QtWidgets.QWidget):
    """Window with controls to open a CAN device and query/clear DTCs.

    Slots are wired by ``QtCore.QMetaObject.connectSlotsByName``, so every
    ``on_<objectName>_<signal>`` method name must match the object name given
    to the corresponding widget in :meth:`initUI`.
    """

    # CAN identifiers used when config.ini is missing or invalid.
    DEFAULT_TX_ID = 0x692
    DEFAULT_RX_ID = 0x693

    # Index pre-selected in the baudrate combo box.
    # NOTE(review): presumably the 500K entry of TIMING_DICT -- confirm.
    DEFAULT_BAUDRATE_INDEX = 14

    COLOR_ERROR = '#ff0000'

    # Channel captions per interface type (combo box items 0 and 1).
    CHANNEL_LABELS = {
        'vector': ('CH1/3', 'CH2/4'),
        'canalystii': ('CAN1', 'CAN2'),
    }

    # Captions of the status-mask check boxes, indexed by bit number.
    STATUS_BIT_NAMES = (
        'testFailed',
        'testFailedThisOperationCycle',
        'pendingDTC',
        'confirmedDTC',
        'testNotCompletedSinceLastClear',
        'testFailedSinceLastClear',
        'testNotCompletedThisOperationCycle',
        'warningIndicatorRequested',
    )

    def __init__(self, parent=None, flags=QtCore.Qt.WindowFlags()):
        """Build the UI and load the CAN identifiers from ``config.ini``.

        ``config.ini`` is read from the current working directory and must
        provide hexadecimal ``tx_id`` / ``rx_id`` values in a ``[CAN_ID]``
        section; otherwise the defaults 0x692 / 0x693 are used.
        """
        super().__init__(parent=parent, flags=flags)
        # State is initialised before the UI so it exists once slots connect.
        self.client: Client = None
        self.opened_flag = False
        self.dtc_mask = 0
        self.initUI()
        try:
            config = configparser.ConfigParser()
            # A missing file is not an error for read(); it surfaces as a
            # KeyError on the section lookup below.
            config.read('config.ini')
            self.tx_id = int(config['CAN_ID']['tx_id'], 16)
            self.rx_id = int(config['CAN_ID']['rx_id'], 16)
            self.append_text_to_log_display('read config.ini success')
        except (KeyError, ValueError, configparser.Error):
            self.append_text_to_log_display(
                'read config.ini error, use 0x692 and 0x693 for default tx_id and rx_id',
                self.COLOR_ERROR)
            self.tx_id = self.DEFAULT_TX_ID
            self.rx_id = self.DEFAULT_RX_ID

    # ------------------------------------------------------------------ UI

    def _make_combo(self, name, items):
        """Create a named combo box pre-filled with *items*."""
        combo = QtWidgets.QComboBox(self)
        combo.addItems(items)
        combo.setObjectName(name)
        return combo

    def _make_button(self, name, text):
        """Create a named push button with caption *text*."""
        button = QtWidgets.QPushButton(text, self)
        button.setObjectName(name)
        return button

    @staticmethod
    def _make_row(*widgets):
        """Return a horizontal layout holding *widgets* left to right."""
        row = QtWidgets.QHBoxLayout()
        for widget in widgets:
            row.addWidget(widget)
        return row

    def initUI(self):
        """Create all child widgets, lay them out and auto-connect the slots."""
        self.setWindowTitle('Diagnostic Trouble Code')
        self.setFont(QtGui.QFont('Segoe UI'))

        # Row 0: device selection and open/close.
        cb_interface_type = self._make_combo(
            'cb_interface_type', ['vector', 'canalystii'])
        cb_baudrate = self._make_combo(
            'cb_baudrate', [('%dK' % (i / 1000)) for i in TIMING_DICT.keys()])
        cb_baudrate.setCurrentIndex(self.DEFAULT_BAUDRATE_INDEX)
        cb_can_channel = self._make_combo(
            'cb_can_channel', list(self.CHANNEL_LABELS['vector']))
        btn_open = self._make_button('btn_open', 'open')
        btn_close = self._make_button('btn_close', 'close')
        device_row = self._make_row(
            cb_interface_type, cb_baudrate, cb_can_channel, btn_open, btn_close)

        # Row 1: DTC setting control and clearing.
        control_row = self._make_row(
            self._make_button('btn_dtc_on', 'DTC On'),
            self._make_button('btn_dtc_off', 'DTC Off'),
            self._make_button('btn_clear_dtc', 'Clear DTC'))

        # Row 2: DTC read requests.
        query_row = self._make_row(
            self._make_button('btn_get_number_of_dtc_by_status_mask',
                              'get number of dtc by status_mask'),
            self._make_button('btn_get_dtc_by_status_mask',
                              'get dtc by status mask'),
            self._make_button('btn_get_supported_dtc', 'get supported dtc'))

        # Row 3: one check box per status-mask bit, laid out as 2 rows x 4.
        mask_grid = QtWidgets.QGridLayout()
        for bit, bit_name in enumerate(self.STATUS_BIT_NAMES):
            check_box = QtWidgets.QCheckBox('bit%d : %s' % (bit, bit_name), self)
            check_box.setObjectName('cb_bit%d' % bit)
            grid_row, grid_col = divmod(bit, 4)
            mask_grid.addWidget(check_box, grid_row, grid_col, 1, 1)
        gb_dtc_bits = QtWidgets.QGroupBox('dtc mask bits', self)
        gb_dtc_bits.setLayout(mask_grid)
        mask_row = self._make_row(gb_dtc_bits)

        # Row 4: log output.
        tb_log_display = QtWidgets.QTextBrowser(self)
        tb_log_display.setObjectName('tb_log_display')
        log_row = self._make_row(tb_log_display)

        vb = QtWidgets.QVBoxLayout()
        for row in (device_row, control_row, query_row, mask_row, log_row):
            vb.addLayout(row)
        self.setLayout(vb)

        # Connects every on_<objectName>_<signal> method defined below.
        QtCore.QMetaObject.connectSlotsByName(self)

    # --------------------------------------------------------------- slots

    @QtCore.pyqtSlot(str)
    def on_cb_interface_type_currentIndexChanged(self, text):
        """Rename the two channel entries to match the selected interface."""
        labels = self.CHANNEL_LABELS.get(text)
        if labels is None:
            return
        channel_combo = self.findChild(QtWidgets.QComboBox, 'cb_can_channel')
        for index, label in enumerate(labels):
            channel_combo.setItemText(index, label)

    @QtCore.pyqtSlot()
    def on_btn_open_clicked(self):
        """Start the background task that opens the device."""
        loop = asyncio.get_event_loop()
        loop.create_task(self.open_device_task())

    @QtCore.pyqtSlot()
    def on_btn_close_clicked(self):
        """Mark the device closed; the keep-alive loop then stops by itself."""
        self.opened_flag = False
        self.client = None
        self.append_text_to_log_display('close device success')

    def _schedule_if_open(self, task_factory, *args):
        """Schedule ``task_factory(*args)`` when a device is open, else warn."""
        if not self.opened_flag:
            self.append_text_to_log_display(
                'please open the device', self.COLOR_ERROR)
            return
        asyncio.get_event_loop().create_task(task_factory(*args))

    @QtCore.pyqtSlot()
    def on_btn_dtc_on_clicked(self):
        """Request control-DTC-setting with setting type 1."""
        self._schedule_if_open(self.control_dtc_task, 1)

    @QtCore.pyqtSlot()
    def on_btn_dtc_off_clicked(self):
        """Request control-DTC-setting with setting type 2."""
        self._schedule_if_open(self.control_dtc_task, 2)

    @QtCore.pyqtSlot()
    def on_btn_clear_dtc_clicked(self):
        """Request clearing of all DTCs."""
        self._schedule_if_open(self.clear_dtc_task)

    @QtCore.pyqtSlot()
    def on_btn_get_number_of_dtc_by_status_mask_clicked(self):
        """Request the number of DTCs matching the selected status mask."""
        self._schedule_if_open(self.get_number_of_dtc_by_status_mask_task)

    @QtCore.pyqtSlot()
    def on_btn_get_dtc_by_status_mask_clicked(self):
        """Request the list of DTCs matching the selected status mask."""
        self._schedule_if_open(self.get_dtc_by_status_mask_task)

    @QtCore.pyqtSlot()
    def on_btn_get_supported_dtc_clicked(self):
        """Request the list of DTCs supported by the ECU."""
        self._schedule_if_open(self.get_supported_dtc_task)

    def _set_mask_bit(self, bit, state):
        """Clear *bit* of ``dtc_mask`` when *state* is 0, otherwise set it."""
        if state == 0:
            self.dtc_mask &= ~(1 << bit)
        else:
            self.dtc_mask |= 1 << bit

    # One slot per check box is required because connectSlotsByName binds by
    # method name; each simply forwards to _set_mask_bit.

    @QtCore.pyqtSlot(int)
    def on_cb_bit0_stateChanged(self, state):
        self._set_mask_bit(0, state)

    @QtCore.pyqtSlot(int)
    def on_cb_bit1_stateChanged(self, state):
        self._set_mask_bit(1, state)

    @QtCore.pyqtSlot(int)
    def on_cb_bit2_stateChanged(self, state):
        self._set_mask_bit(2, state)

    @QtCore.pyqtSlot(int)
    def on_cb_bit3_stateChanged(self, state):
        self._set_mask_bit(3, state)

    @QtCore.pyqtSlot(int)
    def on_cb_bit4_stateChanged(self, state):
        self._set_mask_bit(4, state)

    @QtCore.pyqtSlot(int)
    def on_cb_bit5_stateChanged(self, state):
        self._set_mask_bit(5, state)

    @QtCore.pyqtSlot(int)
    def on_cb_bit6_stateChanged(self, state):
        self._set_mask_bit(6, state)

    @QtCore.pyqtSlot(int)
    def on_cb_bit7_stateChanged(self, state):
        self._set_mask_bit(7, state)

    # ----------------------------------------------------------------- log

    def append_text_to_log_display(self, text, color='#000000'):
        """Append *text* to the log view, rendered in *color*.

        The message is HTML-escaped so that characters such as ``<`` in
        exception messages are displayed literally instead of being parsed
        as markup.
        """
        # The previous format string had a single placeholder for two
        # arguments and raised TypeError on every call.
        markup = '<font color="%s">%s</font>' % (
            color, html.escape('%s' % text, quote=False))
        self.findChild(QtWidgets.QTextEdit, 'tb_log_display').append(markup)

    def clear_log_display(self):
        """Remove all text from the log view."""
        self.findChild(QtWidgets.QTextEdit, 'tb_log_display').clear()

    def _log_exception(self, exc):
        """Log *exc* in the error colour.

        Falls back to the exception class name when the message is empty
        (``asyncio.TimeoutError`` has no message), so a failure never shows
        up as a blank line.
        """
        self.append_text_to_log_display(
            str(exc) or type(exc).__name__, self.COLOR_ERROR)

    # --------------------------------------------------------------- tasks

    async def open_device_task(self):
        """Open the selected CAN interface, unlock the ECU and keep it alive.

        Runs until the device is closed (``opened_flag`` cleared), replaced
        by a newer session, or a bus/UDS error occurs.  On any exit caused
        by an error the widget state is reset so the device can be re-opened.
        """
        if self.opened_flag is True:
            self.append_text_to_log_display('already opened', self.COLOR_ERROR)
            return
        interface_type_index = self.findChild(
            QtWidgets.QComboBox, 'cb_interface_type').currentIndex()
        baudrate_index = self.findChild(
            QtWidgets.QComboBox, 'cb_baudrate').currentIndex()
        can_channel_index = self.findChild(
            QtWidgets.QComboBox, 'cb_can_channel').currentIndex()
        self.clear_log_display()
        # Only frames with the ECU response identifier are of interest.
        can_filters = [
            {'can_id': self.rx_id, 'can_mask': 0xFFFFFFFF}
        ]
        bitrate = list(TIMING_DICT.keys())[baudrate_index]
        client = None
        try:
            bus: BusABC = None
            if interface_type_index == 0:
                bus = VectorBus(channel=can_channel_index,
                                bitrate=bitrate, can_filters=can_filters)
            elif interface_type_index == 1:
                bus = CANalystIIBus(channel=can_channel_index,
                                    baud=bitrate, can_filters=can_filters)
            # NOTE(review): the bus is never shut down explicitly here;
            # confirm whether leaving network.open() releases it.
            network = aioisotp.ISOTPNetwork(bus=bus)
            with network.open():
                reader, writer = await network.open_connection(
                    self.rx_id, self.tx_id)
                client = Client(reader, writer)
                self.client = client
                # Best-effort probe: a missing answer is tolerated and the
                # handshake below decides whether the device is usable.
                try:
                    await client.tester_present(timeout=0.3)
                except asyncio.TimeoutError:
                    pass
                except Exception as e:
                    print(e)
                await client.change_session(3)
                await client.request_seed(1)
                # NOTE(review): fixed key sent regardless of the returned
                # seed -- confirm this is intended for the target ECU.
                await client.send_key(1, bytes([0x6d, 0xfb, 0x53, 0x20]))
                self.append_text_to_log_display('open device success')
                self.opened_flag = True
                # Keep the session alive once per second.  Also stop when a
                # newer session replaced this client (close + quick re-open).
                while self.opened_flag and self.client is client:
                    await client.tester_present(True)
                    await asyncio.sleep(1)
        except asyncio.TimeoutError:
            self.append_text_to_log_display(
                'open device failed', self.COLOR_ERROR)
        except Exception as e:
            self._log_exception(e)
        finally:
            # Reset state only if this session is still the current one, so
            # a failed/ended session never leaves the widget "opened" with a
            # dead client, and never clobbers a newer session.
            if client is not None and self.client is client:
                self.client = None
                self.opened_flag = False

    async def control_dtc_task(self, setting_type):
        """Send control-DTC-setting with *setting_type*; log any failure."""
        try:
            await self.client.control_dtc_setting(setting_type)
        except Exception as e:
            self._log_exception(e)

    async def clear_dtc_task(self, group=0xffffff):
        """Send clear-DTC for *group* (default: all groups); log any failure."""
        try:
            await self.client.clear_dtc(group)
        except Exception as e:
            self._log_exception(e)

    @staticmethod
    def _validate_response(payload, min_length):
        """Raise ``ValueError`` for an empty, negative or too short response.

        A response starting with 0x7F is a UDS negative response of the form
        ``7F <service> <NRC>``.
        """
        if not payload:
            raise ValueError('empty response')
        raw = ' '.join('%.2x' % byte for byte in payload)
        if payload[0] == 0x7F:
            if len(payload) >= 3:
                raise ValueError(
                    'negative response: service 0x%.2x NRC 0x%.2x'
                    % (payload[1], payload[2]))
            raise ValueError('negative response: %s' % raw)
        if len(payload) < min_length:
            raise ValueError(
                'response too short (%d bytes): %s' % (len(payload), raw))

    async def _read_dtc_information(self, request, title, min_length):
        """Send the raw *request*, log *title* and return the checked payload.

        Waits at most one second for the answer.  Raises on timeout or when
        the response is negative or shorter than *min_length* bytes.
        """
        self.client.writer.write(bytes(request))
        payload = await asyncio.wait_for(self.client.reader.read(4095), 1)
        self.append_text_to_log_display(title)
        self._validate_response(payload, min_length)
        return payload

    def _log_dtc_records(self, payload):
        """Log one line per 4-byte DTC record found after the 3-byte header.

        Each record is a 3-byte DTC number followed by one status byte; the
        set status bits are listed as ``bitN``.  A trailing partial record
        is ignored.
        """
        for start in range(3, len(payload) - 3, 4):
            high, mid, low, status = payload[start:start + 4]
            dtc_hex = high << 16 | mid << 8 | low
            # Unknown DTCs must not abort the listing of the remaining ones.
            description = dtc_describe.get(dtc_hex, 'unknown DTC')
            log = 'DTC:0x%.2x%.2x%.2x Status:0x%.2x %s ' % (
                high, mid, low, status, description)
            log += ''.join(
                'bit%d ' % bit for bit in range(8) if status >> bit & 0x1)
            self.append_text_to_log_display(log)

    async def _report_dtc_records(self, request, title):
        """Run a DTC list request and log its records or the failure."""
        try:
            payload = await self._read_dtc_information(request, title, 3)
            self._log_dtc_records(payload)
        except Exception as e:
            self._log_exception(e)

    async def get_number_of_dtc_by_status_mask_task(self):
        """Query (0x19 0x01) and log how many DTCs match ``dtc_mask``."""
        if self.dtc_mask == 0:
            self.append_text_to_log_display(
                'dtc mask is empty', self.COLOR_ERROR)
            return
        try:
            # Expected layout: 59 01 <mask> <format> <count hi> <count lo>.
            payload = await self._read_dtc_information(
                [0x19, 0x01, self.dtc_mask],
                '>>> 0x1901 get_number_of_dtc_by_status_mask', 6)
            self.append_text_to_log_display(
                'Status Mask:0x%.2x DTC Format:0x%.2x DTC Count:%d' % (
                    payload[2], payload[3], payload[4] << 8 | payload[5]))
        except Exception as e:
            self._log_exception(e)

    async def get_dtc_by_status_mask_task(self):
        """Query (0x19 0x02) and log every DTC matching ``dtc_mask``."""
        if self.dtc_mask == 0:
            self.append_text_to_log_display(
                'dtc mask is empty', self.COLOR_ERROR)
            return
        await self._report_dtc_records(
            [0x19, 0x02, self.dtc_mask], '>>> 0x1902 get_dtc_by_status_mask')

    async def get_supported_dtc_task(self):
        """Query (0x19 0x0a) and log every DTC supported by the ECU."""
        await self._report_dtc_records(
            [0x19, 0x0a], '>>> 0x190a get_supported_dtc')