from PyQt5 import QtWidgets, QtCore, QtGui
|
from can.interfaces.canalystii import CANalystIIBus, TIMING_DICT
|
from can.interfaces.vector import VectorBus
|
from can.bus import BusABC
|
import asyncio
|
import time
|
import aioisotp
|
from uds.client import Client
|
from udsoncan import MemoryLocation
|
import configparser
|
|
|
dtc_describe = {
|
0x550122: 'P按键信号不正常',
|
0x550271: '马达动作故障',
|
0x550329: 'HOME侧传感器异常',
|
0x550429: 'M侧传感器故障',
|
0xD10017: '超过正常诊断电压',
|
0xD10116: '低于正常诊断电压',
|
0xC00188: 'BUS OFF',
|
0xC29187: '丢失TCU信号',
|
0xC14687: '丢失GW信号',
|
}
|
|
|
class DiagnosticTroubleCodeWidget(QtWidgets.QWidget):
|
|
def __init__(self, parent=None, flags=QtCore.Qt.WindowFlags()):
|
super().__init__(parent=parent, flags=flags)
|
self.initUI()
|
self.client: Client = None
|
self.opened_flag = False
|
self.dtc_mask = 0
|
|
try:
|
config = configparser.ConfigParser()
|
config.read('config.ini')
|
self.tx_id = int(config['CAN_ID']['tx_id'], 16)
|
self.rx_id = int(config['CAN_ID']['rx_id'], 16)
|
self.append_text_to_log_display('read config.ini success')
|
except Exception as e:
|
self.append_text_to_log_display(
|
'read config.ini error, use 0x692 and 0x693 for default tx_id and rx_id', '#ff0000')
|
self.tx_id = 0x692
|
self.rx_id = 0x693
|
|
def initUI(self):
|
|
self.setWindowTitle('Diagnostic Trouble Code')
|
self.setFont(QtGui.QFont('Segoe UI'))
|
|
cb_interface_type = QtWidgets.QComboBox(self)
|
cb_interface_type.addItems(['vector', 'canalystii'])
|
cb_interface_type.setObjectName('cb_interface_type')
|
|
cb_baudrate = QtWidgets.QComboBox(self)
|
cb_baudrate.addItems([('%dK' % (i/1000)) for i in TIMING_DICT.keys()])
|
cb_baudrate.setCurrentIndex(14)
|
cb_baudrate.setObjectName('cb_baudrate')
|
|
cb_can_channel = QtWidgets.QComboBox(self)
|
cb_can_channel.addItems(['CH1/3', 'CH2/4'])
|
cb_can_channel.setObjectName('cb_can_channel')
|
|
btn_open = QtWidgets.QPushButton(self)
|
btn_open.setText('open')
|
btn_open.setObjectName('btn_open')
|
btn_close = QtWidgets.QPushButton(self)
|
btn_close.setText('close')
|
btn_close.setObjectName('btn_close')
|
|
hb0 = QtWidgets.QHBoxLayout()
|
hb0.addWidget(cb_interface_type)
|
hb0.addWidget(cb_baudrate)
|
hb0.addWidget(cb_can_channel)
|
hb0.addWidget(btn_open)
|
hb0.addWidget(btn_close)
|
|
btn_dtc_on = QtWidgets.QPushButton('DTC On', self)
|
btn_dtc_on.setObjectName('btn_dtc_on')
|
btn_dtc_off = QtWidgets.QPushButton('DTC Off', self)
|
btn_dtc_off.setObjectName('btn_dtc_off')
|
btn_clear_dtc = QtWidgets.QPushButton('Clear DTC', self)
|
btn_clear_dtc.setObjectName('btn_clear_dtc')
|
|
hb1 = QtWidgets.QHBoxLayout()
|
hb1.addWidget(btn_dtc_on)
|
hb1.addWidget(btn_dtc_off)
|
hb1.addWidget(btn_clear_dtc)
|
|
btn_get_number_of_dtc_by_status_mask = QtWidgets.QPushButton(self)
|
btn_get_number_of_dtc_by_status_mask.setText(
|
'get number of dtc by status_mask')
|
btn_get_number_of_dtc_by_status_mask.setObjectName(
|
'btn_get_number_of_dtc_by_status_mask')
|
|
btn_get_dtc_by_status_mask = QtWidgets.QPushButton(self)
|
btn_get_dtc_by_status_mask.setText('get dtc by status mask')
|
btn_get_dtc_by_status_mask.setObjectName('btn_get_dtc_by_status_mask')
|
|
btn_get_supported_dtc = QtWidgets.QPushButton(self)
|
btn_get_supported_dtc.setText('get supported dtc')
|
btn_get_supported_dtc.setObjectName('btn_get_supported_dtc')
|
|
hb2 = QtWidgets.QHBoxLayout()
|
hb2.addWidget(btn_get_number_of_dtc_by_status_mask)
|
hb2.addWidget(btn_get_dtc_by_status_mask)
|
hb2.addWidget(btn_get_supported_dtc)
|
|
cb_bit0 = QtWidgets.QCheckBox(self)
|
cb_bit0.setText('bit0 : testFailed')
|
cb_bit0.setObjectName('cb_bit0')
|
cb_bit1 = QtWidgets.QCheckBox(self)
|
cb_bit1.setText('bit1 : testFailedThisOperationCycle')
|
cb_bit1.setObjectName('cb_bit1')
|
cb_bit2 = QtWidgets.QCheckBox(self)
|
cb_bit2.setText('bit2 : pendingDTC')
|
cb_bit2.setObjectName('cb_bit2')
|
cb_bit3 = QtWidgets.QCheckBox(self)
|
cb_bit3.setText('bit3 : confirmedDTC')
|
cb_bit3.setObjectName('cb_bit3')
|
|
cb_bit4 = QtWidgets.QCheckBox(self)
|
cb_bit4.setText('bit4 : testNotCompletedSinceLastClear')
|
cb_bit4.setObjectName('cb_bit4')
|
cb_bit5 = QtWidgets.QCheckBox(self)
|
cb_bit5.setText('bit5 : testFailedSinceLastClear')
|
cb_bit5.setObjectName('cb_bit5')
|
cb_bit6 = QtWidgets.QCheckBox(self)
|
cb_bit6.setText('bit6 : testNotCompletedThisOperationCycle')
|
cb_bit6.setObjectName('cb_bit6')
|
cb_bit7 = QtWidgets.QCheckBox(self)
|
cb_bit7.setText('bit7 : warningIndicatorRequested')
|
cb_bit7.setObjectName('cb_bit7')
|
|
gl0 = QtWidgets.QGridLayout()
|
|
gl0.addWidget(cb_bit0, 0, 0, 1, 1)
|
gl0.addWidget(cb_bit1, 0, 1, 1, 1)
|
gl0.addWidget(cb_bit2, 0, 2, 1, 1)
|
gl0.addWidget(cb_bit3, 0, 3, 1, 1)
|
gl0.addWidget(cb_bit4, 1, 0, 1, 1)
|
gl0.addWidget(cb_bit5, 1, 1, 1, 1)
|
gl0.addWidget(cb_bit6, 1, 2, 1, 1)
|
gl0.addWidget(cb_bit7, 1, 3, 1, 1)
|
|
gb_dtc_bits = QtWidgets.QGroupBox('dtc mask bits', self)
|
gb_dtc_bits.setLayout(gl0)
|
|
hb3 = QtWidgets.QHBoxLayout()
|
hb3.addWidget(gb_dtc_bits)
|
|
tb_log_display = QtWidgets.QTextBrowser(self)
|
tb_log_display.setObjectName('tb_log_display')
|
|
hb4 = QtWidgets.QHBoxLayout()
|
hb4.addWidget(tb_log_display)
|
|
vb = QtWidgets.QVBoxLayout()
|
vb.addLayout(hb0)
|
vb.addLayout(hb1)
|
vb.addLayout(hb2)
|
vb.addLayout(hb3)
|
vb.addLayout(hb4)
|
|
self.setLayout(vb)
|
|
QtCore.QMetaObject.connectSlotsByName(self)
|
|
@QtCore.pyqtSlot(str)
|
def on_cb_interface_type_currentIndexChanged(self, text):
|
|
if text == 'vector':
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(0, 'CH1/3')
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(1, 'CH2/4')
|
elif text == 'canalystii':
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(0, 'CAN1')
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(1, 'CAN2')
|
|
@QtCore.pyqtSlot()
|
def on_btn_open_clicked(self):
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.open_device_task())
|
|
@QtCore.pyqtSlot()
|
def on_btn_close_clicked(self):
|
self.opened_flag = False
|
self.client = None
|
self.append_text_to_log_display('close device success')
|
|
@QtCore.pyqtSlot()
|
def on_btn_dtc_on_clicked(self):
|
if self.opened_flag:
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.control_dtc_task(1))
|
else:
|
self.append_text_to_log_display(
|
'please open the device', '#ff0000')
|
|
@QtCore.pyqtSlot()
|
def on_btn_dtc_off_clicked(self):
|
if self.opened_flag:
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.control_dtc_task(2))
|
else:
|
self.append_text_to_log_display(
|
'please open the device', '#ff0000')
|
|
@QtCore.pyqtSlot()
|
def on_btn_clear_dtc_clicked(self):
|
if self.opened_flag:
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.clear_dtc_task())
|
else:
|
self.append_text_to_log_display(
|
'please open the device', '#ff0000')
|
|
@QtCore.pyqtSlot()
|
def on_btn_get_number_of_dtc_by_status_mask_clicked(self):
|
if self.opened_flag:
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.get_number_of_dtc_by_status_mask_task())
|
else:
|
self.append_text_to_log_display(
|
'please open the device', '#ff0000')
|
|
@QtCore.pyqtSlot()
|
def on_btn_get_dtc_by_status_mask_clicked(self):
|
if self.opened_flag:
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.get_dtc_by_status_mask_task())
|
else:
|
self.append_text_to_log_display(
|
'please open the device', '#ff0000')
|
|
@QtCore.pyqtSlot()
|
def on_btn_get_supported_dtc_clicked(self):
|
if self.opened_flag:
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.get_supported_dtc_task())
|
else:
|
self.append_text_to_log_display(
|
'please open the device', '#ff0000')
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit0_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 0)
|
else:
|
self.dtc_mask |= 1 << 0
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit1_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 1)
|
else:
|
self.dtc_mask |= 1 << 1
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit2_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 2)
|
else:
|
self.dtc_mask |= 1 << 2
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit3_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 3)
|
else:
|
self.dtc_mask |= 1 << 3
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit4_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 4)
|
else:
|
self.dtc_mask |= 1 << 4
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit5_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 5)
|
else:
|
self.dtc_mask |= 1 << 5
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit6_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 6)
|
else:
|
self.dtc_mask |= 1 << 6
|
|
@QtCore.pyqtSlot(int)
|
def on_cb_bit7_stateChanged(self, state):
|
if state == 0:
|
self.dtc_mask &= ~(1 << 7)
|
else:
|
self.dtc_mask |= 1 << 7
|
|
def append_text_to_log_display(self, text, color='#000000'):
|
|
text = '<font color="%s">%s</font>' % (color, text)
|
self.findChild(QtWidgets.QTextEdit, 'tb_log_display').append(text)
|
|
def clear_log_display(self):
|
|
self.findChild(QtWidgets.QTextEdit, 'tb_log_display').clear()
|
|
async def open_device_task(self):
|
|
if self.opened_flag is True:
|
self.append_text_to_log_display('already opened', '#ff0000')
|
return
|
|
interface_type_index = self.findChild(
|
QtWidgets.QComboBox, 'cb_interface_type').currentIndex()
|
|
baudrate_index = self.findChild(
|
QtWidgets.QComboBox, 'cb_baudrate').currentIndex()
|
|
can_channel_index = self.findChild(
|
QtWidgets.QComboBox, 'cb_can_channel').currentIndex()
|
|
# bus = VectorBus(channel=1, bitrate=500000, can_filters=can_filters)
|
# bus = CANalystIIBus(channel=0, baud=500000, can_filters=can_filters)
|
|
self.clear_log_display()
|
|
can_filters = [
|
{'can_id': self.rx_id, 'can_mask': 0xFFFFFFFF}
|
]
|
|
bitrate = list(TIMING_DICT.keys())[baudrate_index]
|
|
try:
|
bus: BusABC = None
|
if interface_type_index == 0:
|
bus = VectorBus(channel=can_channel_index,
|
bitrate=bitrate, can_filters=can_filters)
|
|
elif interface_type_index == 1:
|
bus = CANalystIIBus(channel=can_channel_index,
|
baud=bitrate, can_filters=can_filters)
|
|
network = aioisotp.ISOTPNetwork(bus=bus)
|
|
with network.open():
|
reader, writer = await network.open_connection(self.rx_id, self.tx_id)
|
client = Client(reader, writer)
|
|
self.client = client
|
|
try:
|
response = await client.tester_present(timeout=0.3)
|
except asyncio.TimeoutError:
|
pass
|
except Exception as e:
|
print(e)
|
|
await client.change_session(3)
|
await client.request_seed(1)
|
await client.send_key(1, bytes([0x6d, 0xfb, 0x53, 0x20]))
|
|
self.append_text_to_log_display('open device success')
|
self.opened_flag = True
|
|
while self.opened_flag:
|
await client.tester_present(True)
|
await asyncio.sleep(1)
|
# print('tester present')
|
except asyncio.TimeoutError:
|
self.append_text_to_log_display('open device failed', '#ff0000')
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
async def control_dtc_task(self, setting_type):
|
try:
|
response = await self.client.control_dtc_setting(setting_type)
|
# print(response)
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
async def clear_dtc_task(self, group=0xffffff):
|
try:
|
response = await self.client.clear_dtc(group)
|
# print(response)
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
async def get_number_of_dtc_by_status_mask_task(self):
|
|
if self.dtc_mask == 0:
|
self.append_text_to_log_display('dtc mask is empty', '#ff0000')
|
return
|
|
try:
|
self.client.writer.write(bytes([0x19, 0x01, self.dtc_mask]))
|
payload = await asyncio.wait_for(self.client.reader.read(4095), 1)
|
self.append_text_to_log_display(
|
'>>> 0x1901 get_number_of_dtc_by_status_mask')
|
self.append_text_to_log_display('Status Mask:0x%.2x DTC Format:0x%.2x DTC Count:%d' % (
|
payload[2], payload[3], payload[4] << 8 | payload[5]))
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
async def get_dtc_by_status_mask_task(self):
|
|
if self.dtc_mask == 0:
|
self.append_text_to_log_display('dtc mask is empty', '#ff0000')
|
return
|
|
try:
|
self.client.writer.write(bytes([0x19, 0x02, self.dtc_mask]))
|
payload = await asyncio.wait_for(self.client.reader.read(4095), 1)
|
self.append_text_to_log_display(
|
'>>> 0x1902 get_dtc_by_status_mask')
|
for i in range(len(payload[3:]) // 4):
|
a = i * 4 + 3
|
b = i * 4 + 4
|
c = i * 4 + 5
|
d = i * 4 + 6
|
dtc_hex = payload[a] << 16 | payload[b] << 8 | payload[c] << 0
|
log = 'DTC:0x%.2x%.2x%.2x Status:0x%.2x %s ' % (
|
payload[a], payload[b], payload[c], payload[d], dtc_describe[dtc_hex])
|
for j in range(8):
|
if payload[d] >> j & 0x1:
|
log += 'bit%d ' % j
|
self.append_text_to_log_display(log)
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
async def get_supported_dtc_task(self):
|
try:
|
self.client.writer.write(bytes([0x19, 0x0a]))
|
payload = await asyncio.wait_for(self.client.reader.read(4095), 1)
|
self.append_text_to_log_display('>>> 0x190a get_supported_dtc')
|
for i in range(len(payload[3:]) // 4):
|
a = i * 4 + 3
|
b = i * 4 + 4
|
c = i * 4 + 5
|
d = i * 4 + 6
|
dtc_hex = payload[a] << 16 | payload[b] << 8 | payload[c] << 0
|
log = 'DTC:0x%.2x%.2x%.2x Status:0x%.2x %s ' % (
|
payload[a], payload[b], payload[c], payload[d], dtc_describe[dtc_hex])
|
for j in range(8):
|
if payload[d] >> j & 0x1:
|
log += 'bit%d ' % j
|
self.append_text_to_log_display(log)
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|