from PyQt5 import QtWidgets, QtCore, QtGui
|
from can.interfaces.canalystii import CANalystIIBus, TIMING_DICT
|
from can.interfaces.vector import VectorBus
|
from can.bus import BusABC
|
import asyncio
|
import time
|
import aioisotp
|
from uds.client import Client
|
from udsoncan import MemoryLocation
|
import configparser
|
|
|
class FlashBootloaderWidget(QtWidgets.QWidget):
|
def __init__(self, parent=None, flags=QtCore.Qt.WindowFlags()):
|
super().__init__(parent=parent, flags=flags)
|
self.initUI()
|
self.in_programming = False
|
self.bus: BusABC = None
|
self.drv_data: bytes = None
|
self.app_data: bytes = None
|
|
try:
|
config = configparser.ConfigParser()
|
config.read('config.ini')
|
self.tx_id = int(config['CAN_ID']['tx_id'], 16)
|
self.rx_id = int(config['CAN_ID']['rx_id'], 16)
|
self.append_text_to_log_display('read config.ini success')
|
except Exception as e:
|
self.append_text_to_log_display(
|
'read config.ini error, use 0x692 and 0x693 for default tx_id and rx_id',
|
'#ff0000')
|
self.tx_id = 0x692
|
self.rx_id = 0x693
|
|
def initUI(self):
|
self.setWindowTitle('Bootloader')
|
self.setFont(QtGui.QFont('Segoe UI'))
|
desktop = QtWidgets.QApplication.desktop() # 获得屏幕尺寸
|
self.resize(desktop.width() / 2, desktop.height() / 2)
|
|
# 设置文本输入框
|
le_drv = QtWidgets.QLineEdit(self)
|
le_drv.setReadOnly(True)
|
le_drv.setObjectName('le_drv')
|
btn_drv = QtWidgets.QPushButton(self)
|
btn_drv.setText('驱动程序')
|
btn_drv.setObjectName('btn_drv')
|
|
le_app = QtWidgets.QLineEdit(self)
|
le_app.setReadOnly(True)
|
le_app.setObjectName('le_app')
|
btn_app = QtWidgets.QPushButton(self)
|
btn_app.setText('应用程序')
|
btn_app.setObjectName('btn_app')
|
# 创建网格布局 2行2列 void addWidget(QWidget *, int row, int column, int rowSpan, int columnSpan, Qt::Alignment = 0);
|
gridlayer_file = QtWidgets.QGridLayout()
|
gridlayer_file.addWidget(le_drv, 0, 0, 1, 1)
|
gridlayer_file.addWidget(btn_drv, 0, 1, 1, 1)
|
gridlayer_file.addWidget(le_app, 1, 0, 1, 1)
|
gridlayer_file.addWidget(btn_app, 1, 1, 1, 1)
|
|
gb_file = QtWidgets.QGroupBox(self)
|
gb_file.setTitle('加载文件')
|
gb_file.setLayout(gridlayer_file)
|
|
cb_interface_type = QtWidgets.QComboBox(self)
|
cb_interface_type.addItems(['CANoe', 'USBCAN'])
|
cb_interface_type.setObjectName('cb_interface_type')
|
|
cb_baudrate = QtWidgets.QComboBox(self)
|
cb_baudrate.addItems([('%dK' % (i / 1000))
|
for i in TIMING_DICT.keys()])
|
cb_baudrate.setCurrentIndex(14)
|
cb_baudrate.setObjectName('cb_baudrate')
|
|
cb_can_channel = QtWidgets.QComboBox(self)
|
cb_can_channel.addItems(['通道1', '通道2'])
|
cb_can_channel.setObjectName('cb_can_channel')
|
|
btn_open = QtWidgets.QPushButton(self)
|
btn_open.setText('打开')
|
btn_open.setObjectName('btn_open')
|
|
cb_only_main = QtWidgets.QCheckBox(self)
|
cb_only_main.setText('仅主编程')
|
|
btn_start = QtWidgets.QPushButton(self)
|
btn_start.setText('开始刷写')
|
btn_start.setObjectName('btn_start')
|
btn_start.setDisabled(True)
|
|
progress_bar = QtWidgets.QProgressBar()
|
progress_bar.setObjectName('progress_bar')
|
|
GL0 = QtWidgets.QGridLayout()
|
GL0.addWidget(cb_interface_type, 0, 0, 1, 1)
|
GL0.addWidget(cb_baudrate, 0, 1, 1, 1)
|
GL0.addWidget(cb_can_channel, 0, 2, 1, 1)
|
GL0.addWidget(btn_open, 0, 3, 1, 1)
|
|
gb_cfg = QtWidgets.QGroupBox(self)
|
gb_cfg.setTitle('配置')
|
gb_cfg.setLayout(GL0)
|
|
GL1 = QtWidgets.QGridLayout()
|
GL1.addWidget(cb_only_main, 0, 0, 1, 1)
|
GL1.addWidget(btn_start, 1, 8, 1, 1)
|
GL1.addWidget(progress_bar, 1, 0, 1, 8)
|
|
gb_progress = QtWidgets.QGroupBox(self)
|
gb_progress.setTitle('进度控制')
|
gb_progress.setLayout(GL1)
|
|
tb_log_display = QtWidgets.QTextBrowser(self)
|
tb_log_display.setObjectName('tb_log_display')
|
|
table_data_display = QtWidgets.QTableWidget(self)
|
table_data_display.setObjectName('table_data_display')
|
table_data_display.setColumnCount(4)
|
table_data_display.setRowCount(8)
|
|
table_data_display.setHorizontalHeaderLabels(
|
['时间', 'CAN ID', 'DLC', '数据'])
|
table_data_display.verticalHeader().setVisible(False)
|
table_data_display.horizontalHeader().setStretchLastSection(
|
True) # 设置表格最后一列自适应
|
table_data_display.setColumnWidth(0, 100)
|
table_data_display.setColumnWidth(1, 80)
|
table_data_display.setColumnWidth(2, 80)
|
table_data_display.setShowGrid(False)
|
GL_disp = QtWidgets.QGridLayout()
|
GL_disp.setColumnStretch(0, 1) # 设置表格列宽
|
GL_disp.setColumnStretch(1, 3)
|
GL_disp.addWidget(table_data_display, 0, 1, 1, 3)
|
GL_disp.addWidget(tb_log_display, 0, 0, 1, 1)
|
|
gb_log = QtWidgets.QGroupBox(self)
|
gb_log.setTitle('数据事件')
|
gb_log.setLayout(GL_disp)
|
|
# 垂直布局
|
vb = QtWidgets.QVBoxLayout()
|
vb.addWidget(gb_file)
|
vb.addWidget(gb_cfg)
|
vb.addWidget(gb_progress)
|
vb.addWidget(gb_log)
|
|
self.setLayout(vb)
|
|
QtCore.QMetaObject.connectSlotsByName(self)
|
|
async def pre_programming(self, client: Client):
|
|
self.append_text_to_log_display('# 预编程步骤')
|
|
# 进入extended session
|
self.append_text_to_log_display('>>> 进入extended session')
|
response = await client.change_session(3)
|
# print(response)
|
|
# 检查编程条件
|
self.append_text_to_log_display('>>> 检查编程条件')
|
response = await client.start_routine(0xff02)
|
# print(response)
|
|
# 关闭DTC的存储
|
self.append_text_to_log_display('>>> 关闭DTC的存储')
|
response = await client.control_dtc_setting(2)
|
# print(response)
|
|
# 关闭与诊断无关的报文
|
self.append_text_to_log_display('>>> 关闭与诊断无关的报文')
|
response = await client.communication_control(0x03, 0x01)
|
# print(response)
|
|
async def programming(self, client: Client):
|
|
self.append_text_to_log_display('# 主编程步骤')
|
|
# 进入programming session
|
self.append_text_to_log_display('>>> 进入programming session')
|
response = await client.change_session(2)
|
# print(response)
|
|
# 请求种子
|
self.append_text_to_log_display('>>> 请求种子')
|
response = await client.request_seed(1)
|
# print(response)
|
|
# 发送密钥
|
self.append_text_to_log_display('>>> 发送密钥')
|
response = await client.send_key(1, bytes([0xb3, 0x42]))
|
# print(response)
|
|
# 发送driver文件
|
self.append_text_to_log_display('>>> 发送driver文件')
|
drv_file = self.drv_data
|
address = drv_file[16] << 24 | drv_file[17] << 16 | drv_file[
|
18] << 8 | drv_file[19] << 0
|
memorysize = drv_file[20] << 24 | drv_file[21] << 16 | drv_file[
|
22] << 8 | drv_file[23] << 0
|
memory_location = MemoryLocation(address, memorysize, 32, 32)
|
response = await client.request_download(memory_location)
|
# print(response)
|
|
max_length = response.service_data.max_length
|
|
# 有效数据长度, 去除sid和sequence两个字节
|
payload_length = max_length - 2
|
|
count = (len(drv_file) + payload_length - 1) // payload_length
|
|
base = self.get_progressbar_pos()
|
for i in range(count):
|
start = i * payload_length
|
end = start + payload_length
|
response = await client.transfer_data((i + 1) % 256,
|
drv_file[start:end])
|
self.set_progressbar_pos(base + end)
|
# print(response)
|
|
response = await client.request_transfer_exit()
|
# print(response)
|
|
# driver文件完整性检验
|
self.append_text_to_log_display('>>> driver文件完整性检验')
|
response = await client.start_routine(0xf001, drv_file[0:4])
|
# print(response)
|
|
app_file = self.app_data
|
address = app_file[16] << 24 | app_file[17] << 16 | app_file[
|
18] << 8 | app_file[19] << 0
|
memorysize = app_file[20] << 24 | app_file[21] << 16 | app_file[
|
22] << 8 | app_file[23] << 0
|
memory_location = MemoryLocation(address, memorysize, 32, 32)
|
|
# 删除app存储空间
|
self.append_text_to_log_display('>>> 删除app存储空间')
|
data = b''
|
data += memory_location.alfid.get_byte()
|
data += memory_location.get_address_bytes()
|
data += memory_location.get_memorysize_bytes()
|
response = await client.start_routine(0xff00, data)
|
# print(response)
|
|
# 发送app文件
|
self.append_text_to_log_display('>>> 发送app文件')
|
response = await client.request_download(memory_location)
|
# print(response)
|
|
max_length = response.service_data.max_length
|
|
# 有效数据长度, 去除sid和sequence两个字节
|
payload_length = max_length - 2
|
|
count = (len(app_file) + payload_length - 1) // payload_length
|
|
base = self.get_progressbar_pos()
|
for i in range(count):
|
start = i * payload_length
|
end = start + payload_length
|
response = await client.transfer_data((i + 1) % 256,
|
app_file[start:end])
|
self.set_progressbar_pos(base + end)
|
# print(response)
|
|
response = await client.request_transfer_exit()
|
# print(response)
|
|
# app文件完整性检验
|
self.append_text_to_log_display('>>> app文件完整性检验')
|
response = await client.start_routine(0xf001, app_file[0:4])
|
# print(response)
|
|
async def post_programming(self, client: Client):
|
|
self.append_text_to_log_display('# 后编程步骤')
|
|
# 回到default session
|
self.append_text_to_log_display('>>> 回到default session')
|
response = await client.change_session(1, True)
|
# print(response)
|
|
async def start_programming(self):
|
|
network = aioisotp.ISOTPNetwork(bus=self.bus)
|
|
with network.open():
|
|
reader, writer = await network.open_connection(
|
self.rx_id, self.tx_id)
|
|
client = Client(reader, writer)
|
|
try:
|
await client.tester_present(timeout=0.3)
|
except asyncio.TimeoutError:
|
# canalystii 第一条请求无效
|
# timeout >= 0.3
|
if isinstance(self.bus, CANalystIIBus):
|
pass
|
elif isinstance(self.bus, VectorBus):
|
self.append_text_to_log_display('查看是否选错CAN通道', '#ff0000')
|
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
self.set_progressbar_pos(0)
|
|
t1 = time.time()
|
|
try:
|
|
await self.pre_programming(client)
|
await self.programming(client)
|
await self.post_programming(client)
|
|
except asyncio.TimeoutError:
|
|
self.append_text_to_log_display('request timeout', '#ff0000')
|
|
except Exception as e:
|
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
t2 = time.time()
|
|
self.append_text_to_log_display('finished in %.2f sec' % (t2 - t1))
|
self.in_programming = False
|
self.findChild(QtWidgets.QPushButton,
|
'btn_start').setDisabled(True)
|
|
def set_progressbar_pos(self, pos):
|
|
drv_data_len = len(self.drv_data) if self.drv_data is not None else 0
|
app_data_len = len(self.app_data) if self.app_data is not None else 0
|
total_len = drv_data_len + app_data_len
|
if pos > total_len:
|
pos = total_len
|
elif pos < 0:
|
pos = 0
|
|
self.findChild(QtWidgets.QProgressBar, 'progress_bar').setValue(pos)
|
|
def get_progressbar_pos(self):
|
|
return self.findChild(QtWidgets.QProgressBar, 'progress_bar').value()
|
|
def append_text_to_log_display(self, text, color='#000000'):
|
|
text = '<font color="%s">%s</font>' % (color, text)
|
self.findChild(QtWidgets.QTextBrowser, 'tb_log_display').append(text)
|
|
def clear_log_display(self):
|
|
self.findChild(QtWidgets.QTextBrowser, 'tb_log_display').clear()
|
|
@QtCore.pyqtSlot()
|
def on_btn_drv_clicked(self):
|
|
if self.in_programming:
|
self.append_text_to_log_display('## 正在编程中')
|
return
|
|
fileName, _ = QtWidgets.QFileDialog.getOpenFileName(
|
self, 'load driver file', '.', 'All Files (*);;Bin Files (*.bin)')
|
|
if fileName != '':
|
self.findChild(QtWidgets.QLineEdit, 'le_drv').setText(fileName)
|
with open(fileName, 'rb') as fd:
|
self.drv_data = fd.read()
|
drv_data_len = len(
|
self.drv_data) if self.drv_data is not None else 0
|
app_data_len = len(
|
self.app_data) if self.app_data is not None else 0
|
total_len = drv_data_len + app_data_len
|
self.findChild(QtWidgets.QProgressBar,
|
'progress_bar').setRange(0, total_len)
|
|
@QtCore.pyqtSlot()
|
def on_btn_app_clicked(self):
|
|
if self.in_programming:
|
self.append_text_to_log_display('## 正在编程中')
|
return
|
|
fileName, _ = QtWidgets.QFileDialog.getOpenFileName(
|
self, 'load app file', '.', 'All Files (*);;Bin Files (*.bin)')
|
|
if fileName != '':
|
self.findChild(QtWidgets.QLineEdit, 'le_app').setText(fileName)
|
with open(fileName, 'rb') as fd:
|
self.app_data = fd.read()
|
drv_data_len = len(
|
self.drv_data) if self.drv_data is not None else 0
|
app_data_len = len(
|
self.app_data) if self.app_data is not None else 0
|
total_len = drv_data_len + app_data_len
|
self.findChild(QtWidgets.QProgressBar,
|
'progress_bar').setRange(0, total_len)
|
|
@QtCore.pyqtSlot()
|
def on_btn_start_clicked(self):
|
|
if self.drv_data is None or self.app_data is None:
|
self.append_text_to_log_display('请加载driver文件和app文件')
|
return
|
|
if self.bus is None:
|
self.append_text_to_log_display('请打开设备')
|
return
|
|
if self.in_programming:
|
self.append_text_to_log_display('## 正在编程中')
|
return
|
|
self.in_programming = True
|
loop = asyncio.get_event_loop()
|
loop.create_task(self.start_programming())
|
|
@QtCore.pyqtSlot()
|
def on_btn_open_clicked(self):
|
|
if self.in_programming:
|
self.append_text_to_log_display('## 正在编程中')
|
return
|
|
interface_type_index = self.findChild(
|
QtWidgets.QComboBox, 'cb_interface_type').currentIndex()
|
|
baudrate_index = self.findChild(QtWidgets.QComboBox,
|
'cb_baudrate').currentIndex()
|
|
can_channel_index = self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').currentIndex()
|
|
# bus = VectorBus(channel=1, bitrate=500000, can_filters=can_filters)
|
# bus = CANalystIIBus(channel=0, baud=500000, can_filters=can_filters)
|
|
self.clear_log_display()
|
|
can_filters = [{'can_id': self.rx_id, 'can_mask': 0xFFFFFFFF}]
|
|
bitrate = list(TIMING_DICT.keys())[baudrate_index]
|
|
try:
|
|
if interface_type_index == 0:
|
self.bus = VectorBus(channel=can_channel_index,
|
bitrate=bitrate,
|
can_filters=can_filters)
|
|
elif interface_type_index == 1:
|
self.bus = CANalystIIBus(channel=can_channel_index,
|
baud=bitrate,
|
can_filters=can_filters)
|
|
self.append_text_to_log_display('open device success')
|
self.findChild(QtWidgets.QPushButton,
|
'btn_start').setDisabled(False)
|
|
except Exception as e:
|
self.append_text_to_log_display('%s' % e, '#ff0000')
|
|
@QtCore.pyqtSlot(str)
|
def on_cb_interface_type_currentIndexChanged(self, text):
|
|
if text == 'CANoe':
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(0, '通道1/3')
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(1, '通道2/4')
|
elif text == 'USBCAN':
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(0, 'CAN1')
|
self.findChild(QtWidgets.QComboBox,
|
'cb_can_channel').setItemText(1, 'CAN2')
|
|
|
# @PyQt5.QtCore.pyqtSlot(参数)
|
# def on_发送者对象名称_发射信号名称(self, 参数):
|
# pass
|