import asyncio
import configparser
import html
import time

import aioisotp
from can.bus import BusABC
from can.interfaces.canalystii import CANalystIIBus, TIMING_DICT
from can.interfaces.vector import VectorBus
from PyQt5 import QtWidgets, QtCore, QtGui
from udsoncan import MemoryLocation

from uds.client import Client


class FlashBootloaderWidget(QtWidgets.QWidget):
    """Widget that downloads a driver image and an app image to an ECU.

    The user picks two binary files, opens a CAN device (Vector or
    CANalyst-II) and starts the programming sequence.  The sequence runs as
    an asyncio task and talks to the ECU through an ``aioisotp`` connection
    wrapped in a ``uds.client.Client``.  Progress and errors are shown in the
    progress bar and the log browser of the widget.

    The CAN identifiers are read from the ``[CAN_ID]`` section of
    ``config.ini`` (hexadecimal ``tx_id`` / ``rx_id``); ``0x692`` / ``0x693``
    are used when the file cannot be read.
    """

    # Fallback CAN identifiers used when config.ini cannot be read.
    DEFAULT_TX_ID = 0x692
    DEFAULT_RX_ID = 0x693

    # Both image files carry a header; bytes 16..19 hold the start address
    # and bytes 20..23 the memory size, both big-endian.
    IMAGE_HEADER_LENGTH = 24

    # Every TransferData request spends two bytes on the service id and the
    # block sequence counter; the rest of the block is payload.
    TRANSFER_OVERHEAD = 2

    # Labels of the step check boxes, grouped by the box they are shown in.
    STEP_GROUPS = (
        ('pre-programming', (
            'Enter extended session',
            'Stop setting of DTCs',
            'Disable non-diagnostic communication',
        )),
        ('programming', (
            'Enter programming session',
            'Request seed',
            'Send key',
            'Write Programming Date',
            'Erase Application Software Memory',
            'Download Application Software',
            'Check Programming Application Software Dependencie',
        )),
        ('post-programming', (
            'ECUReset',
        )),
    )

    # Channel labels shown for each interface type.
    CHANNEL_LABELS = {
        'vector': ('CH1/3', 'CH2/4'),
        'canalystii': ('CAN1', 'CAN2'),
    }

    def __init__(self, parent=None, flags=QtCore.Qt.WindowFlags()):
        """Build the UI and load the CAN identifiers from ``config.ini``."""
        super().__init__(parent=parent, flags=flags)
        self.initUI()

        self.in_programming = False  # True while the programming task runs
        self.bus: BusABC = None      # opened CAN bus, None until 'open'
        self.drv_data: bytes = None  # content of the driver file
        self.app_data: bytes = None  # content of the app file

        try:
            config = configparser.ConfigParser()
            config.read('config.ini')
            tx_id = int(config['CAN_ID']['tx_id'], 16)
            rx_id = int(config['CAN_ID']['rx_id'], 16)
        except (KeyError, ValueError, OSError, configparser.Error):
            # Missing file/section/key or a malformed value: fall back to
            # the defaults instead of refusing to start.
            self.append_text_to_log_display(
                'read config.ini error, use 0x692 and 0x693 for default tx_id and rx_id', '#ff0000')
            self.tx_id = self.DEFAULT_TX_ID
            self.rx_id = self.DEFAULT_RX_ID
        else:
            self.tx_id = tx_id
            self.rx_id = rx_id
            self.append_text_to_log_display('read config.ini success')

    def initUI(self):
        """Create all child widgets, lay them out and auto-connect the slots.

        Widgets that are looked up later carry an object name; the
        ``on_<objectName>_<signal>`` methods of this class are connected by
        ``connectSlotsByName`` at the end.
        """
        self.setWindowTitle('Flash Bootloader')
        self.setFont(QtGui.QFont('Segoe UI'))

        # File selection: read-only path field plus a button per image.
        le_drv = QtWidgets.QLineEdit(self)
        le_drv.setReadOnly(True)
        le_drv.setObjectName('le_drv')
        btn_drv = QtWidgets.QPushButton(self)
        btn_drv.setText('driver')
        btn_drv.setObjectName('btn_drv')

        le_app = QtWidgets.QLineEdit(self)
        le_app.setReadOnly(True)
        le_app.setObjectName('le_app')
        btn_app = QtWidgets.QPushButton(self)
        btn_app.setText('app')
        btn_app.setObjectName('btn_app')

        gl0 = QtWidgets.QGridLayout()
        gl0.addWidget(le_drv, 0, 0, 1, 1)
        gl0.addWidget(btn_drv, 0, 1, 1, 1)
        gl0.addWidget(le_app, 1, 0, 1, 1)
        gl0.addWidget(btn_app, 1, 1, 1, 1)

        # Device selection: interface type, bit rate, channel, open button.
        cb_interface_type = QtWidgets.QComboBox(self)
        cb_interface_type.addItems(['vector', 'canalystii'])
        cb_interface_type.setObjectName('cb_interface_type')

        cb_baudrate = QtWidgets.QComboBox(self)
        cb_baudrate.addItems(
            ['%dK' % (bitrate / 1000) for bitrate in TIMING_DICT])
        # NOTE(review): index 14 is presumably the 500K entry of
        # TIMING_DICT -- confirm against the installed python-can version.
        cb_baudrate.setCurrentIndex(14)
        cb_baudrate.setObjectName('cb_baudrate')

        cb_can_channel = QtWidgets.QComboBox(self)
        cb_can_channel.addItems(['CH1/3', 'CH2/4'])
        cb_can_channel.setObjectName('cb_can_channel')

        btn_open = QtWidgets.QPushButton(self)
        btn_open.setText('open')
        btn_open.setObjectName('btn_open')

        hb0 = QtWidgets.QHBoxLayout()
        hb0.addWidget(cb_interface_type)
        hb0.addWidget(cb_baudrate)
        hb0.addWidget(cb_can_channel)
        hb0.addWidget(btn_open)

        # NOTE(review): this check box has no object name and its state is
        # not read anywhere in this class.
        cb_only_main = QtWidgets.QCheckBox(self)
        cb_only_main.setText('only programming step')

        # Stays disabled until a device has been opened successfully.
        btn_start = QtWidgets.QPushButton(self)
        btn_start.setText('start programming')
        btn_start.setObjectName('btn_start')
        btn_start.setDisabled(True)

        hb1 = QtWidgets.QHBoxLayout()
        hb1.addWidget(cb_only_main)
        hb1.addWidget(btn_start)

        progress_bar = QtWidgets.QProgressBar()
        progress_bar.setObjectName('progress_bar')

        hb2 = QtWidgets.QHBoxLayout()
        hb2.addWidget(progress_bar)

        # One check box per programming step, grouped into three boxes.
        # Reference: https://blog.csdn.net/jianfengxia/article/details/86623321
        step_count = sum(len(labels) for _, labels in self.STEP_GROUPS)
        step_boxes = iter(
            [QtWidgets.QCheckBox(self) for _ in range(step_count)])

        hb3 = QtWidgets.QHBoxLayout()
        for title, labels in self.STEP_GROUPS:
            column = QtWidgets.QVBoxLayout()
            column.setAlignment(QtCore.Qt.AlignTop)
            for label in labels:
                step_box = next(step_boxes)
                step_box.setText(label)
                column.addWidget(step_box)
            group_box = QtWidgets.QGroupBox(self)
            group_box.setTitle(title)
            group_box.setLayout(column)
            hb3.addWidget(group_box)

        tb_log_display = QtWidgets.QTextBrowser(self)
        tb_log_display.setObjectName('tb_log_display')

        hb4 = QtWidgets.QHBoxLayout()
        hb4.addWidget(tb_log_display)

        vb = QtWidgets.QVBoxLayout()
        for row in (gl0, hb0, hb1, hb2, hb3, hb4):
            vb.addLayout(row)
        self.setLayout(vb)

        QtCore.QMetaObject.connectSlotsByName(self)

    async def pre_programming(self, client: Client):
        """Run the pre-programming requests on ``client``.

        Exceptions raised by the client calls are not handled here.
        """
        self.append_text_to_log_display('# 预编程步骤')

        # Enter the extended session.
        self.append_text_to_log_display('>>> 进入extended session')
        await client.change_session(3)

        # Check the programming preconditions.
        self.append_text_to_log_display('>>> 检查编程条件')
        await client.start_routine(0xff02)

        # Turn off DTC storage.
        self.append_text_to_log_display('>>> 关闭DTC的存储')
        await client.control_dtc_setting(2)

        # Turn off messages that are not related to diagnostics.
        self.append_text_to_log_display('>>> 关闭与诊断无关的报文')
        await client.communication_control(0x03, 0x01)

    async def programming(self, client: Client):
        """Unlock the ECU and download the driver and the app image.

        Steps: programming session, seed/key exchange, driver download and
        integrity check, app memory erase, app download and integrity check.

        Raises:
            ValueError: an image is shorter than its header, or the block
                length reported by the ECU leaves no room for payload.
            Exceptions raised by the client calls are not handled here.
        """
        self.append_text_to_log_display('# 主编程步骤')

        # Enter the programming session.
        self.append_text_to_log_display('>>> 进入programming session')
        await client.change_session(2)

        # Request the seed.
        self.append_text_to_log_display('>>> 请求种子')
        await client.request_seed(1)

        # Send the key.
        # NOTE(review): the key is a fixed constant and is not derived from
        # the seed returned above -- confirm this is intended.
        self.append_text_to_log_display('>>> 发送密钥')
        await client.send_key(1, bytes([0xb3, 0x42]))

        # Send the driver file.
        self.append_text_to_log_display('>>> 发送driver文件')
        drv_file = self.drv_data
        drv_location = self._memory_location_from_header(drv_file, 'driver')
        await self._download_image(client, drv_file, drv_location)

        # Integrity check of the driver file; the first four bytes of the
        # file are passed to the routine.
        self.append_text_to_log_display('>>> driver文件完整性检验')
        await client.start_routine(0xf001, drv_file[0:4])

        app_file = self.app_data
        app_location = self._memory_location_from_header(app_file, 'app')

        # Erase the app memory area.
        self.append_text_to_log_display('>>> 删除app存储空间')
        erase_request = b''.join((
            app_location.alfid.get_byte(),
            app_location.get_address_bytes(),
            app_location.get_memorysize_bytes(),
        ))
        await client.start_routine(0xff00, erase_request)

        # Send the app file.
        self.append_text_to_log_display('>>> 发送app文件')
        await self._download_image(client, app_file, app_location)

        # Integrity check of the app file.
        self.append_text_to_log_display('>>> app文件完整性检验')
        await client.start_routine(0xf001, app_file[0:4])

    def _memory_location_from_header(self, image, label):
        """Build the ``MemoryLocation`` described by the header of ``image``.

        ``label`` only names the image in the error message.

        Raises:
            ValueError: ``image`` is shorter than the header.
        """
        if len(image) < self.IMAGE_HEADER_LENGTH:
            raise ValueError(
                '%s file is too short: %d bytes, header needs %d'
                % (label, len(image), self.IMAGE_HEADER_LENGTH))
        address = int.from_bytes(image[16:20], 'big')
        memorysize = int.from_bytes(image[20:24], 'big')
        return MemoryLocation(address, memorysize, 32, 32)

    async def _download_image(self, client, image, memory_location):
        """Send ``image`` in blocks and advance the progress bar.

        Issues the download request, one transfer request per block and the
        transfer exit request.  The whole file, header included, is sent.

        Raises:
            ValueError: the block length reported by the ECU is not larger
                than the per-block overhead.
        """
        response = await client.request_download(memory_location)
        max_length = response.service_data.max_length

        # Usable data length: drop the service id and sequence bytes.
        payload_length = max_length - self.TRANSFER_OVERHEAD
        if payload_length <= 0:
            raise ValueError(
                'max block length %d leaves no room for payload' % max_length)

        base = self.get_progressbar_pos()
        image_length = len(image)
        offsets = range(0, image_length, payload_length)
        for sequence, start in enumerate(offsets, start=1):
            end = start + payload_length
            # The sequence counter starts at 1 and wraps from 255 to 0.
            await client.transfer_data(sequence % 256, image[start:end])
            # The last block may be short; never count past the image end,
            # otherwise the next download starts from a wrong base.
            self.set_progressbar_pos(base + min(end, image_length))

        await client.request_transfer_exit()

    async def post_programming(self, client: Client):
        """Run the post-programming requests on ``client``."""
        self.append_text_to_log_display('# 后编程步骤')

        # Go back to the default session.
        # NOTE(review): the meaning of the second argument is defined by
        # uds.client.Client.change_session -- verify there.
        self.append_text_to_log_display('>>> 回到default session')
        await client.change_session(1, True)

    async def start_programming(self):
        """Run the complete programming sequence on the opened bus.

        Intended to run as an asyncio task.  Every error is written to the
        log display instead of being raised, so a failure never leaves the
        widget locked in the 'programming' state.
        """
        try:
            network = aioisotp.ISOTPNetwork(bus=self.bus)
            with network.open():
                reader, writer = await network.open_connection(
                    self.rx_id, self.tx_id)
                client = Client(reader, writer)

                await self._probe_ecu(client)

                self.set_progressbar_pos(0)
                started = time.perf_counter()
                try:
                    await self.pre_programming(client)
                    await self.programming(client)
                    await self.post_programming(client)
                except asyncio.TimeoutError:
                    self.append_text_to_log_display(
                        'request timeout', '#ff0000')
                except Exception as e:
                    self.append_text_to_log_display('%s' % e, '#ff0000')

                elapsed = time.perf_counter() - started
                self.append_text_to_log_display(
                    'finished in %.2f sec' % elapsed)
        except Exception as e:
            # Opening the network or the connection failed; report it here
            # because nobody awaits this task.
            self.append_text_to_log_display('%s' % e, '#ff0000')
        finally:
            self.in_programming = False
            # The start button is disabled after every run; it is enabled
            # again when a device is opened with the 'open' button.
            self.findChild(QtWidgets.QPushButton,
                           'btn_start').setDisabled(True)

    async def _probe_ecu(self, client):
        """Send one tester-present request before the real sequence.

        Failures are only reported in the log; the sequence continues.
        """
        try:
            await client.tester_present(timeout=0.3)
        except asyncio.TimeoutError:
            # On CANalyst-II the first request has no effect, so a timeout
            # is expected there (the timeout must be >= 0.3).
            if isinstance(self.bus, CANalystIIBus):
                return
            if isinstance(self.bus, VectorBus):
                self.append_text_to_log_display('查看是否选错CAN通道', '#ff0000')
        except Exception as e:
            self.append_text_to_log_display('%s' % e, '#ff0000')

    def _total_data_len(self):
        """Return the combined length of the loaded driver and app images."""
        return sum(
            len(data) for data in (self.drv_data, self.app_data)
            if data is not None)

    def set_progressbar_pos(self, pos):
        """Set the progress bar value, clamped to ``0..total image length``."""
        clamped = max(0, min(pos, self._total_data_len()))
        self.findChild(
            QtWidgets.QProgressBar, 'progress_bar').setValue(clamped)

    def get_progressbar_pos(self):
        """Return the current progress bar value."""
        return self.findChild(QtWidgets.QProgressBar, 'progress_bar').value()

    def append_text_to_log_display(self, text, color='#000000'):
        """Append ``text`` to the log browser in the given font ``color``.

        ``text`` is treated as plain text: ``&``, ``<`` and ``>`` are
        escaped so that e.g. exception messages are shown literally instead
        of being parsed as markup.
        """
        escaped = html.escape('%s' % text, quote=False)
        markup = '<font color="%s">%s</font>' % (color, escaped)
        self.findChild(QtWidgets.QTextBrowser, 'tb_log_display').append(markup)

    def clear_log_display(self):
        """Remove all text from the log browser."""
        self.findChild(QtWidgets.QTextBrowser, 'tb_log_display').clear()

    def _load_image(self, attribute, line_edit_name, caption):
        """Ask for a file and store its content in ``self.<attribute>``.

        Does nothing while programming is running or when the dialog is
        cancelled.  On success the path is shown in the line edit named
        ``line_edit_name`` and the progress bar range is set to the total
        length of the loaded images.  Read errors are written to the log.
        """
        if self.in_programming:
            self.append_text_to_log_display('## 正在编程中')
            return

        fileName, _ = QtWidgets.QFileDialog.getOpenFileName(
            self, caption, '.', 'All Files (*);;Bin Files (*.bin)')
        if fileName == '':
            return

        try:
            with open(fileName, 'rb') as fd:
                data = fd.read()
        except OSError as e:
            # An exception escaping a slot would take the application down.
            self.append_text_to_log_display('%s' % e, '#ff0000')
            return

        setattr(self, attribute, data)
        self.findChild(QtWidgets.QLineEdit, line_edit_name).setText(fileName)
        self.findChild(QtWidgets.QProgressBar, 'progress_bar').setRange(
            0, self._total_data_len())

    @QtCore.pyqtSlot()
    def on_btn_drv_clicked(self):
        """Let the user pick the driver file and load it."""
        self._load_image('drv_data', 'le_drv', 'load driver file')

    @QtCore.pyqtSlot()
    def on_btn_app_clicked(self):
        """Let the user pick the app file and load it."""
        self._load_image('app_data', 'le_app', 'load app file')

    @QtCore.pyqtSlot()
    def on_btn_start_clicked(self):
        """Schedule the programming task if everything is ready.

        Requires both images to be loaded, a device to be open and no
        programming run in progress; otherwise a hint is logged.
        """
        if self.drv_data is None or self.app_data is None:
            self.append_text_to_log_display('请加载driver文件和app文件')
            return

        if self.bus is None:
            self.append_text_to_log_display('请打开设备')
            return

        if self.in_programming:
            self.append_text_to_log_display('## 正在编程中')
            return

        self.in_programming = True
        loop = asyncio.get_event_loop()
        loop.create_task(self.start_programming())

    def _release_bus(self):
        """Shut down the currently opened bus, if any, and forget it."""
        bus, self.bus = self.bus, None
        if bus is None:
            return
        try:
            bus.shutdown()
        except Exception as e:
            # Best effort: the handle may already have been released
            # elsewhere; report it and carry on with opening the device.
            self.append_text_to_log_display(
                'close previous device failed: %s' % e)

    @QtCore.pyqtSlot()
    def on_btn_open_clicked(self):
        """Open the CAN device selected in the combo boxes.

        A previously opened device is shut down first.  The start button is
        enabled only when the new device was opened successfully.
        """
        if self.in_programming:
            self.append_text_to_log_display('## 正在编程中')
            return

        interface_type_index = self.findChild(
            QtWidgets.QComboBox, 'cb_interface_type').currentIndex()
        baudrate_index = self.findChild(
            QtWidgets.QComboBox, 'cb_baudrate').currentIndex()
        can_channel_index = self.findChild(
            QtWidgets.QComboBox, 'cb_can_channel').currentIndex()

        self.clear_log_display()

        # Only frames with exactly the response id are let through.
        can_filters = [
            {'can_id': self.rx_id, 'can_mask': 0xFFFFFFFF},
        ]
        bitrate = list(TIMING_DICT)[baudrate_index]

        btn_start = self.findChild(QtWidgets.QPushButton, 'btn_start')

        # Do not leak the previous handle when the user re-opens a device.
        self._release_bus()
        btn_start.setDisabled(True)

        try:
            if interface_type_index == 0:
                bus = VectorBus(channel=can_channel_index,
                                bitrate=bitrate, can_filters=can_filters)
            elif interface_type_index == 1:
                bus = CANalystIIBus(channel=can_channel_index,
                                    baud=bitrate, can_filters=can_filters)
            else:
                raise ValueError(
                    'unsupported interface type index %d'
                    % interface_type_index)
        except Exception as e:
            self.append_text_to_log_display('%s' % e, '#ff0000')
            return

        self.bus = bus
        self.append_text_to_log_display('open device success')
        btn_start.setDisabled(False)

    @QtCore.pyqtSlot(str)
    def on_cb_interface_type_currentIndexChanged(self, text):
        """Relabel the two channel entries for the selected interface type.

        Unknown interface names leave the labels untouched.
        """
        labels = self.CHANNEL_LABELS.get(text)
        if labels is None:
            return
        cb_can_channel = self.findChild(QtWidgets.QComboBox, 'cb_can_channel')
        for index, label in enumerate(labels):
            cb_can_channel.setItemText(index, label)
|
# @PyQt5.QtCore.pyqtSlot(parameters)
# def on_<sender object name>_<emitted signal name>(self, parameters):
#     pass
|