"""Qt widget that flashes a driver image and an application image over UDS on CAN."""
import asyncio
import configparser
import html
import time

import aioisotp
from can.bus import BusABC
from can.interfaces.canalystii import CANalystIIBus, TIMING_DICT
from can.interfaces.vector import VectorBus
from PyQt5 import QtWidgets, QtCore, QtGui
from udsoncan import MemoryLocation

from uds.client import Client


class FlashBootloaderWidget(QtWidgets.QWidget):
    """Widget driving a three-phase (pre / main / post) UDS programming sequence.

    The user loads a driver image and an application image, opens a CAN
    interface (Vector or CANalyst-II) and starts the sequence; progress and
    errors are reported in the embedded log view and progress bar.

    Slots named ``on_<objectName>_<signalName>`` are wired automatically by
    ``QtCore.QMetaObject.connectSlotsByName`` at the end of ``initUI``.
    """

    # CAN identifiers used when config.ini is missing or invalid.
    DEFAULT_TX_ID = 0x692
    DEFAULT_RX_ID = 0x693

    # Images carry a big-endian address at bytes 16..19 and a big-endian
    # size at bytes 20..23, so at least this many bytes must be present.
    _HEADER_LENGTH = 24

    def __init__(self, parent=None, flags=QtCore.Qt.WindowFlags()):
        """Build the UI and read the CAN ids from ``config.ini``.

        ``[CAN_ID] tx_id`` / ``rx_id`` are parsed as hexadecimal. On any
        failure both ids fall back to the class defaults.
        """
        super().__init__(parent=parent, flags=flags)
        self.initUI()
        self.in_programming = False
        self.bus: BusABC = None
        self.drv_data: bytes = None
        self.app_data: bytes = None
        try:
            config = configparser.ConfigParser()
            config.read('config.ini')
            self.tx_id = int(config['CAN_ID']['tx_id'], 16)
            self.rx_id = int(config['CAN_ID']['rx_id'], 16)
            self.append_text_to_log_display('read config.ini success')
        except Exception:
            # Deliberate best effort: a broken or missing config must never
            # prevent the widget from coming up.
            self.append_text_to_log_display(
                'read config.ini error, use 0x692 and 0x693 for default tx_id and rx_id',
                '#ff0000')
            self.tx_id = self.DEFAULT_TX_ID
            self.rx_id = self.DEFAULT_RX_ID

    def initUI(self):
        """Create all child widgets and layouts and auto-connect the slots."""
        self.setWindowTitle('Flash Bootloader')
        self.setFont(QtGui.QFont('Segoe UI'))

        # --- file selection row: read-only path display + load button ---
        le_drv = QtWidgets.QLineEdit(self)
        le_drv.setReadOnly(True)
        le_drv.setObjectName('le_drv')
        btn_drv = QtWidgets.QPushButton(self)
        btn_drv.setText('driver')
        btn_drv.setObjectName('btn_drv')
        le_app = QtWidgets.QLineEdit(self)
        le_app.setReadOnly(True)
        le_app.setObjectName('le_app')
        btn_app = QtWidgets.QPushButton(self)
        btn_app.setText('app')
        btn_app.setObjectName('btn_app')
        gl0 = QtWidgets.QGridLayout()
        gl0.addWidget(le_drv, 0, 0, 1, 1)
        gl0.addWidget(btn_drv, 0, 1, 1, 1)
        gl0.addWidget(le_app, 1, 0, 1, 1)
        gl0.addWidget(btn_app, 1, 1, 1, 1)

        # --- CAN interface row ---
        cb_interface_type = QtWidgets.QComboBox(self)
        cb_interface_type.addItems(['vector', 'canalystii'])
        cb_interface_type.setObjectName('cb_interface_type')
        cb_baudrate = QtWidgets.QComboBox(self)
        cb_baudrate.addItems([('%dK' % (i / 1000)) for i in TIMING_DICT.keys()])
        # NOTE(review): index 14 is presumably the 500K entry of TIMING_DICT;
        # verify against the installed python-can version.
        cb_baudrate.setCurrentIndex(14)
        cb_baudrate.setObjectName('cb_baudrate')
        cb_can_channel = QtWidgets.QComboBox(self)
        cb_can_channel.addItems(['CH1/3', 'CH2/4'])
        cb_can_channel.setObjectName('cb_can_channel')
        btn_open = QtWidgets.QPushButton(self)
        btn_open.setText('open')
        btn_open.setObjectName('btn_open')
        hb0 = QtWidgets.QHBoxLayout()
        hb0.addWidget(cb_interface_type)
        hb0.addWidget(cb_baudrate)
        hb0.addWidget(cb_can_channel)
        hb0.addWidget(btn_open)

        # --- start row ---
        cb_only_main = QtWidgets.QCheckBox(self)
        cb_only_main.setText('only programming step')
        btn_start = QtWidgets.QPushButton(self)
        btn_start.setText('start programming')
        btn_start.setObjectName('btn_start')
        btn_start.setDisabled(True)  # enabled once a device has been opened
        hb1 = QtWidgets.QHBoxLayout()
        hb1.addWidget(cb_only_main)
        hb1.addWidget(btn_start)

        progress_bar = QtWidgets.QProgressBar()
        progress_bar.setObjectName('progress_bar')
        hb2 = QtWidgets.QHBoxLayout()
        hb2.addWidget(progress_bar)

        # --- step overview, one group box per phase ---
        # https://blog.csdn.net/jianfengxia/article/details/86623321
        def make_group(title, labels):
            """Return a group box holding one top-aligned checkbox per label."""
            layout = QtWidgets.QVBoxLayout()
            layout.setAlignment(QtCore.Qt.AlignTop)
            for label in labels:
                checkbox = QtWidgets.QCheckBox(self)
                checkbox.setText(label)
                layout.addWidget(checkbox)
            group = QtWidgets.QGroupBox(self)
            group.setTitle(title)
            group.setLayout(layout)
            return group

        gb_pre = make_group('pre-programming', [
            'Enter extended session',
            'Stop setting of DTCs',
            'Disable non-diagnostic communication',
        ])
        gb_main = make_group('programming', [
            'Enter programming session',
            'Request seed',
            'Send key',
            'Write Programming Date',
            'Erase Application Software Memory',
            'Download Application Software',
            'Check Programming Application Software Dependencie',
        ])
        gb_post = make_group('post-programming', [
            'ECUReset',
        ])
        hb3 = QtWidgets.QHBoxLayout()
        hb3.addWidget(gb_pre)
        hb3.addWidget(gb_main)
        hb3.addWidget(gb_post)

        tb_log_display = QtWidgets.QTextBrowser(self)
        tb_log_display.setObjectName('tb_log_display')
        hb4 = QtWidgets.QHBoxLayout()
        hb4.addWidget(tb_log_display)

        vb = QtWidgets.QVBoxLayout()
        for layout in (gl0, hb0, hb1, hb2, hb3, hb4):
            vb.addLayout(layout)
        self.setLayout(vb)

        QtCore.QMetaObject.connectSlotsByName(self)

    async def pre_programming(self, client: Client):
        """Run the pre-programming phase on ``client``.

        Switches to session 3, starts routine 0xff02 (programming
        precondition check), sends DTC setting type 2 and communication
        control (0x03, 0x01), logging each step before it is sent.
        """
        self.append_text_to_log_display('# 预编程步骤')
        # Enter the extended session.
        self.append_text_to_log_display('>>> 进入extended session')
        await client.change_session(3)
        # Check the programming preconditions.
        self.append_text_to_log_display('>>> 检查编程条件')
        await client.start_routine(0xff02)
        # Stop the storage of DTCs.
        self.append_text_to_log_display('>>> 关闭DTC的存储')
        await client.control_dtc_setting(2)
        # Disable messages unrelated to diagnostics.
        self.append_text_to_log_display('>>> 关闭与诊断无关的报文')
        await client.communication_control(0x03, 0x01)

    async def programming(self, client: Client):
        """Run the main programming phase on ``client``.

        Enters session 2, performs the seed/key exchange, downloads and
        verifies the driver image, erases the application memory region,
        then downloads and verifies the application image.

        Raises:
            ValueError: if an image is too short to contain its header or
                the ECU reports an unusable block length.
        """
        self.append_text_to_log_display('# 主编程步骤')
        # Enter the programming session.
        self.append_text_to_log_display('>>> 进入programming session')
        await client.change_session(2)
        # Request the seed.
        self.append_text_to_log_display('>>> 请求种子')
        await client.request_seed(1)
        # Send the key.
        # NOTE(review): the key is a fixed constant and the seed response is
        # not used - confirm this matches the target's security algorithm.
        self.append_text_to_log_display('>>> 发送密钥')
        await client.send_key(1, bytes([0xb3, 0x42]))

        # Send the driver file.
        self.append_text_to_log_display('>>> 发送driver文件')
        drv_file = self.drv_data
        await self._download(
            client, drv_file, self._memory_location_from_header(drv_file))
        # Driver integrity check; the routine receives the image's first
        # four bytes (presumably a checksum - TODO confirm).
        self.append_text_to_log_display('>>> driver文件完整性检验')
        await client.start_routine(0xf001, drv_file[0:4])

        app_file = self.app_data
        memory_location = self._memory_location_from_header(app_file)
        # Erase the application memory region.
        self.append_text_to_log_display('>>> 删除app存储空间')
        erase_args = b''.join((
            memory_location.alfid.get_byte(),
            memory_location.get_address_bytes(),
            memory_location.get_memorysize_bytes(),
        ))
        await client.start_routine(0xff00, erase_args)
        # Send the application file.
        self.append_text_to_log_display('>>> 发送app文件')
        await self._download(client, app_file, memory_location)
        # Application integrity check.
        self.append_text_to_log_display('>>> app文件完整性检验')
        await client.start_routine(0xf001, app_file[0:4])

    def _memory_location_from_header(self, image):
        """Return the MemoryLocation described by the header of ``image``.

        The address is read big-endian from bytes 16..19 and the size from
        bytes 20..23; both are encoded with 32-bit fields.

        Raises:
            ValueError: if ``image`` is shorter than the 24-byte header.
        """
        if len(image) < self._HEADER_LENGTH:
            raise ValueError(
                'image too short: %d bytes, need at least %d for the header'
                % (len(image), self._HEADER_LENGTH))
        address = int.from_bytes(image[16:20], 'big')
        memorysize = int.from_bytes(image[20:24], 'big')
        return MemoryLocation(address, memorysize, 32, 32)

    async def _download(self, client, image, memory_location):
        """Transfer ``image`` to ``memory_location`` and advance the progress bar.

        Issues a download request, splits the image into blocks sized from
        the reported maximum length, sends them with a sequence counter that
        starts at 1 and wraps modulo 256, then requests transfer exit.
        """
        response = await client.request_download(memory_location)
        # Usable payload per block: the reported maximum length minus the
        # two bytes taken by the service id and the sequence counter.
        payload_length = response.service_data.max_length - 2
        if payload_length <= 0:
            raise ValueError(
                'unusable max block length: %d' % response.service_data.max_length)
        base = self.get_progressbar_pos()
        total = len(image)
        for sequence, start in enumerate(range(0, total, payload_length), start=1):
            # Clamp so the final (short) block does not over-report progress.
            end = min(start + payload_length, total)
            await client.transfer_data(sequence % 256, image[start:end])
            self.set_progressbar_pos(base + end)
        await client.request_transfer_exit()

    async def post_programming(self, client: Client):
        """Run the post-programming phase: return to session 1."""
        self.append_text_to_log_display('# 后编程步骤')
        # Return to the default session.
        self.append_text_to_log_display('>>> 回到default session')
        await client.change_session(1, True)

    async def start_programming(self):
        """Open an ISO-TP connection on the current bus and run all three phases.

        Errors are written to the log view instead of being raised. Whatever
        happens, ``in_programming`` is cleared and the start button is
        disabled when the coroutine finishes.
        """
        try:
            network = aioisotp.ISOTPNetwork(bus=self.bus)
            with network.open():
                reader, writer = await network.open_connection(self.rx_id, self.tx_id)
                client = Client(reader, writer)
                try:
                    await client.tester_present(timeout=0.3)
                except asyncio.TimeoutError:
                    # On a CANalyst-II the first request is not answered
                    # (timeout must be >= 0.3), so a timeout is expected
                    # there; on a Vector bus it hints at a wrong channel.
                    if isinstance(self.bus, CANalystIIBus):
                        pass
                    elif isinstance(self.bus, VectorBus):
                        self.append_text_to_log_display('查看是否选错CAN通道', '#ff0000')
                except Exception as e:
                    self.append_text_to_log_display('%s' % e, '#ff0000')

                self.set_progressbar_pos(0)
                started = time.monotonic()
                try:
                    await self.pre_programming(client)
                    await self.programming(client)
                    await self.post_programming(client)
                except asyncio.TimeoutError:
                    self.append_text_to_log_display('request timeout', '#ff0000')
                except Exception as e:
                    self.append_text_to_log_display('%s' % e, '#ff0000')
                elapsed = time.monotonic() - started
                self.append_text_to_log_display('finished in %.2f sec' % elapsed)
        except Exception as e:
            # Failures while setting up the connection would otherwise vanish
            # inside the asyncio task without any feedback to the user.
            self.append_text_to_log_display('%s' % e, '#ff0000')
        finally:
            # Always leave the "programming" state, otherwise every button
            # would keep refusing to work after a failed run.
            self.in_programming = False
            self.findChild(QtWidgets.QPushButton, 'btn_start').setDisabled(True)

    def _total_data_len(self):
        """Return the combined length of the loaded images (missing ones count 0)."""
        return sum(len(data) for data in (self.drv_data, self.app_data)
                   if data is not None)

    def _update_progress_range(self):
        """Set the progress bar range to 0..combined length of the loaded images."""
        self.findChild(QtWidgets.QProgressBar, 'progress_bar').setRange(
            0, self._total_data_len())

    def set_progressbar_pos(self, pos):
        """Set the progress bar value, clamped to 0..combined image length."""
        pos = max(0, min(pos, self._total_data_len()))
        self.findChild(QtWidgets.QProgressBar, 'progress_bar').setValue(pos)

    def get_progressbar_pos(self):
        """Return the current progress bar value."""
        return self.findChild(QtWidgets.QProgressBar, 'progress_bar').value()

    def append_text_to_log_display(self, text, color='#000000'):
        """Append ``text`` to the log view, rendered in ``color``.

        ``text`` is HTML-escaped before being wrapped in the colour markup so
        that messages containing ``<``, ``>`` or ``&`` are shown literally.
        """
        escaped = html.escape('%s' % text, quote=False).replace('\n', '<br>')
        markup = '<font color="%s">%s</font>' % (color, escaped)
        self.findChild(QtWidgets.QTextBrowser, 'tb_log_display').append(markup)

    def clear_log_display(self):
        """Remove all text from the log view."""
        self.findChild(QtWidgets.QTextBrowser, 'tb_log_display').clear()

    def _load_image(self, caption, line_edit_name):
        """Ask the user for a file and return its bytes, or None if nothing was loaded.

        On success the chosen path is shown in the line edit named
        ``line_edit_name``. Read errors are logged rather than raised because
        this runs inside a Qt slot.
        """
        file_name, _ = QtWidgets.QFileDialog.getOpenFileName(
            self, caption, '.', 'All Files (*);;Bin Files (*.bin)')
        if file_name == '':
            return None
        try:
            with open(file_name, 'rb') as fd:
                data = fd.read()
        except OSError as e:
            self.append_text_to_log_display('%s' % e, '#ff0000')
            return None
        self.findChild(QtWidgets.QLineEdit, line_edit_name).setText(file_name)
        return data

    @QtCore.pyqtSlot()
    def on_btn_drv_clicked(self):
        """Load the driver image chosen by the user and resize the progress range."""
        if self.in_programming:
            self.append_text_to_log_display('## 正在编程中')
            return
        data = self._load_image('load driver file', 'le_drv')
        if data is not None:
            self.drv_data = data
            self._update_progress_range()

    @QtCore.pyqtSlot()
    def on_btn_app_clicked(self):
        """Load the application image chosen by the user and resize the progress range."""
        if self.in_programming:
            self.append_text_to_log_display('## 正在编程中')
            return
        data = self._load_image('load app file', 'le_app')
        if data is not None:
            self.app_data = data
            self._update_progress_range()

    @QtCore.pyqtSlot()
    def on_btn_start_clicked(self):
        """Schedule ``start_programming`` once both images and a bus are available."""
        if self.drv_data is None or self.app_data is None:
            self.append_text_to_log_display('请加载driver文件和app文件')
            return
        if self.bus is None:
            self.append_text_to_log_display('请打开设备')
            return
        if self.in_programming:
            self.append_text_to_log_display('## 正在编程中')
            return
        self.in_programming = True
        loop = asyncio.get_event_loop()
        loop.create_task(self.start_programming())

    @QtCore.pyqtSlot()
    def on_btn_open_clicked(self):
        """Open the CAN bus selected in the combo boxes and enable the start button.

        Any previously opened bus is shut down first. The new bus only
        receives frames whose id equals ``rx_id``. Errors are logged.
        """
        if self.in_programming:
            self.append_text_to_log_display('## 正在编程中')
            return
        interface_type_index = self.findChild(
            QtWidgets.QComboBox, 'cb_interface_type').currentIndex()
        baudrate_index = self.findChild(
            QtWidgets.QComboBox, 'cb_baudrate').currentIndex()
        can_channel_index = self.findChild(
            QtWidgets.QComboBox, 'cb_can_channel').currentIndex()
        btn_start = self.findChild(QtWidgets.QPushButton, 'btn_start')

        self.clear_log_display()

        # Release the previous device before opening a new one so the
        # hardware handle is not leaked.
        if self.bus is not None:
            try:
                self.bus.shutdown()
            except Exception as e:
                self.append_text_to_log_display('%s' % e, '#ff0000')
            self.bus = None
            btn_start.setDisabled(True)

        can_filters = [
            {'can_id': self.rx_id, 'can_mask': 0xFFFFFFFF}
        ]
        try:
            bitrate = list(TIMING_DICT.keys())[baudrate_index]
            if interface_type_index == 0:
                self.bus = VectorBus(channel=can_channel_index,
                                     bitrate=bitrate, can_filters=can_filters)
            elif interface_type_index == 1:
                self.bus = CANalystIIBus(channel=can_channel_index,
                                         baud=bitrate, can_filters=can_filters)
            else:
                raise ValueError(
                    'unsupported interface type index: %d' % interface_type_index)
            self.append_text_to_log_display('open device success')
            btn_start.setDisabled(False)
        except Exception as e:
            self.append_text_to_log_display('%s' % e, '#ff0000')

    @QtCore.pyqtSlot(str)
    def on_cb_interface_type_currentIndexChanged(self, text):
        """Relabel the two channel entries to match the selected interface type."""
        channel_labels = {
            'vector': ('CH1/3', 'CH2/4'),
            'canalystii': ('CAN1', 'CAN2'),
        }.get(text)
        if channel_labels is None:
            return
        cb_can_channel = self.findChild(QtWidgets.QComboBox, 'cb_can_channel')
        for index, label in enumerate(channel_labels):
            cb_can_channel.setItemText(index, label)

    # Template for further auto-connected slots:
    # @PyQt5.QtCore.pyqtSlot(<arguments>)
    # def on_<sender object name>_<emitted signal name>(self, <arguments>):
    #     pass