tao_z
2022-07-20 14a1ed561791fbc2357cb37206a1c0e73a97abc2
调整支持HEX烧录功能
1 files added
4 files modified
261 ■■■■ changed files
Shifter.py 1 ●●●● patch | view | raw | blame | history
USBCAN.py 16 ●●●● patch | view | raw | blame | history
hexread.py 103 ●●●●● patch | view | raw | blame | history
test.py 67 ●●●●● patch | view | raw | blame | history
widgets/ShifterTool.py 74 ●●●●● patch | view | raw | blame | history
Shifter.py
@@ -1,4 +1,3 @@
from email.message import Message
from ShifterDefine import *
import struct
from udsoncan import DidCodec
USBCAN.py
@@ -304,6 +304,8 @@
        self.device = device_type
        self.device_index = device_index
        self.channel = can_index
        self.timestamp_start = None
        self.laststamp = None
        Timing0, Timing1 = TIMING_DICT[bitrate]
@@ -429,17 +431,23 @@
        if rtn == 0xFFFFFFFF or rtn == 0:
            return None, False
        else:
            if self.timestamp_start == None:
                self.laststamp = raw_message[0].TimeStamp
                self.timestamp_start = self.laststamp
            for i in range(rtn):
                msg.append(
                    Message(
                        timestamp=raw_message[i].TimeStamp
                        if raw_message[i].TimeFlag else 0.0,
                    Message(timestamp=(
                        (raw_message[i].TimeStamp - self.timestamp_start) /
                        1000000) if raw_message[i].TimeFlag else 0.0,
                        arbitration_id=raw_message[i].ID,
                        is_remote_frame=raw_message[i].RemoteFlag,
                        channel=0,
                        extended_id=raw_message[i].ExternFlag,
                        data=raw_message[i].Data,
                    ))
                            is_error_frame=False))
                # timestamp = raw_message[i].TimeStamp if raw_message[
                #     i].TimeFlag else 0.0,
                # print(timestamp)
            return (msg, rtn)
    def ListeningMsg(self, connectRet, needClose, msgQueue: Queue,
hexread.py
New file
@@ -0,0 +1,103 @@
"""
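HEX files record data line by line; every line consists of an arbitrary number of hexadecimal digits.
Each line has the following format:
':' + record data length (2 hex characters) + data start address (4 hex characters) + record type (2 hex characters) + data (N bytes, 2 hex characters each) + checksum (1 byte)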
"""
from aenum import IntEnum
import binascii
# Byte offsets of the fixed header fields inside one raw record line.
# Offset 0 holds the ':' start code; every field is stored as ASCII hex
# characters with the most significant nibble first.
HEX_LEN_H = 1  # data length, high nibble
HEX_LEN_L = 2  # data length, low nibble
HEX_ADDR_HH = 3  # 16-bit address field, bits 15..12
HEX_ADDR_HL = 4  # 16-bit address field, bits 11..8
HEX_ADDR_LH = 5  # 16-bit address field, bits 7..4
HEX_ADDR_LL = 6  # 16-bit address field, bits 3..0
HEX_DATA_TYPE_H = 7  # record type, high nibble
HEX_DATA_TYPE_L = 8  # record type, low nibble
HEX_DATA_START = 9  # first character of the data field

# Maps the byte value of an ASCII hex digit to its numeric value (0..15).
# Lowercase 'a'..'f' are accepted as well, so files written with lowercase
# digits no longer fail with KeyError; the uppercase entries are unchanged.
HexTable = {
    **{ord(digit): value
       for value, digit in enumerate("0123456789ABCDEF")},
    **{ord(digit): value
       for value, digit in enumerate("abcdef", start=10)},
}
class HexDataType(IntEnum):
    """Record-type codes stored in the type field of an Intel HEX record.

    Member names (including their spelling) are kept as they were so that
    existing references keep working.
    """
    DataRecord = 0  # record carries data bytes
    FileEnd = 1  # end-of-file record
    ExtentionAddr = 2  # extended segment address
    StartSectionAddr = 3  # start segment address
    ExtentionLinearAddr = 4  # extended linear address (upper 16 address bits)
    # Was 3, which made this member a silent alias of StartSectionAddr;
    # the Intel HEX "start linear address" record type is 05.
    StartLinearSectionAddr = 5
class Hex_read(object):
    """Load an Intel HEX file into one contiguous binary image.

    Attributes:
        file: path given to the most recent ``Open_file`` call, ``None``
            before the first call.
        start_addr: address of the first data record, i.e. the upper
            address from the preceding extended-linear-address record (if
            any) plus the record's own 16-bit address field.
        len: sum of the length fields of all data records read.
        data: payload of all data records, concatenated in file order and
            converted from ASCII hex to raw bytes.

    Limitations: record checksums are not verified, extended *segment*
    address records are ignored, and data records are simply concatenated,
    so address gaps between records are not filled.
    """

    def __init__(self):
        self.file = None
        self.start_addr = 0
        self.len = 0
        self.data = b''

    @staticmethod
    def _hex_field(line, start, width):
        """Return the integer encoded by ``width`` hex characters at ``start``.

        Raises IndexError if the line is too short and KeyError if a
        character is not a hex digit known to ``HexTable``.
        """
        value = 0
        for pos in range(start, start + width):
            value = (value << 4) | HexTable[line[pos]]
        return value

    def Open_file(self, file):
        """Parse ``file`` and fill ``start_addr``, ``len`` and ``data``.

        Lines that do not start with ':' are skipped. Parsing stops at the
        end-of-file record or at the end of the file, whichever comes first.

        Args:
            file: path of the HEX file; it is opened in binary mode.

        Raises:
            OSError: the file cannot be opened.
            IndexError, KeyError: a record header is truncated or contains
                a character that is not a hex digit.
            binascii.Error: the collected data field is not valid hex
                (e.g. a truncated data record).
        """
        self.file = file
        self.start_addr = 0
        self.len = 0
        self.data = b''

        upper_base = 0  # address bits 31..16 from the latest type-04 record
        image_started = False  # True once the first data record was seen
        chunks = []  # ASCII-hex data fields, joined once at the end

        with open(file, 'rb') as fd:
            for aLineData in fd:
                if not aLineData.startswith(b':'):
                    continue
                DataLen = self._hex_field(aLineData, HEX_LEN_H, 2)
                Data_Addr = self._hex_field(aLineData, HEX_ADDR_HH, 4)
                Data_Type = self._hex_field(aLineData, HEX_DATA_TYPE_H, 2)

                if Data_Type == HexDataType.FileEnd:
                    break

                if Data_Type == HexDataType.ExtentionLinearAddr:
                    # The data field holds the upper 16 bits of the 32-bit
                    # address, so "0800" means 0x08000000. The nibbles were
                    # previously shifted by 28/20/12/4, giving 0x00800000.
                    upper_base = self._hex_field(
                        aLineData, HEX_DATA_START, 4) << 16
                    # Only a base seen before the first data record may
                    # define the image start; later type-04 records (images
                    # crossing a 64 KiB boundary) must not overwrite it.
                    if not image_started:
                        self.start_addr = upper_base
                elif Data_Type == HexDataType.DataRecord:
                    if not image_started:
                        self.start_addr = upper_base + Data_Addr
                        image_started = True
                    self.len += DataLen
                    chunks.append(
                        aLineData[HEX_DATA_START:HEX_DATA_START +
                                  DataLen * 2])

        # Each payload byte is two ASCII hex characters; decode them in one go.
        self.data = binascii.a2b_hex(b''.join(chunks))
test.py
@@ -14,6 +14,8 @@
from ShifterDefine import *
import cantools
from pprint import pprint
import binascii
from hexread import *
# def rece_msg(bus):
#     msg = bus.Receive(0.1)
#     if msg[0] is not None:
@@ -52,7 +54,7 @@
    "IndicationLEDControl": 0x8101
}
print(DID_dic["IndicationLEDControl"])
# print(DID_dic["IndicationLEDControl"])
# def f(conn):
#     conn.send([1, 'test', None])
@@ -217,31 +219,44 @@
#     sys.exit(app.exec_())
dbc = cantools.database.load_file("DBC/SX7H.dbc")
# print(dbc.messages)
sa_message = dbc.get_message_by_name('SA1')
# print(sa_message)
# pprint(sa_message.signals)
frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0]
input_signal = dbc.decode_message(0x420, frame)
part_str = str(input_signal['SA1_Status_ParkButtonReq'])
pprint(part_str)
if input_signal['SA1_Status_ParkButtonReq'] == 'No request':
    print('yes')
# dbc = cantools.database.load_file("DBC/SX7H.dbc")
# # print(dbc.messages)
# sa_message = dbc.get_message_by_name('SA1')
# # print(sa_message)
# # pprint(sa_message.signals)
# frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0]
# input_signal = dbc.decode_message(0x420, frame)
# part_str = str(input_signal['SA1_Status_ParkButtonReq'])
# pprint(part_str)
# if input_signal['SA1_Status_ParkButtonReq'] == 'No request':
#     print('yes')
data = sa_message.encode({
    'SA1_Status_PRNDL': 0,
    'SA1_Status_GearShftPosReq': 0,  #'Shifter position Zero',
    'SA1_Status_ParkButtonReq': 2,
    "SA1_ShifterManualSignal": 0,
    "SA1_ShifterModeSignal": 1,
    "SA1_Status_ShftSensSng_Fault": 0,
    "SA1_IND_ShifterMisUsd": 0,
    "SA1_Live_counter": 1,
    "SA1_Status_UnlockButtonReq": 1,
    "SA1_Status_EcoShifterModeReq": 0,
    "SA1_Status_RqGearPosInV": 0xf,
    "SA1_Status_ShiftPosValidFlag": 0,
})
# data = sa_message.encode({
#     'SA1_Status_PRNDL': 0,
#     'SA1_Status_GearShftPosReq': 0,  #'Shifter position Zero',
#     'SA1_Status_ParkButtonReq': 2,
#     "SA1_ShifterManualSignal": 0,
#     "SA1_ShifterModeSignal": 1,
#     "SA1_Status_ShftSensSng_Fault": 0,
#     "SA1_IND_ShifterMisUsd": 0,
#     "SA1_Live_counter": 1,
#     "SA1_Status_UnlockButtonReq": 1,
#     "SA1_Status_EcoShifterModeReq": 0,
#     "SA1_Status_RqGearPosInV": 0xf,
#     "SA1_Status_ShiftPosValidFlag": 0,
# })
# print(data)
# print(SA1_Status_ParkButtonReq_dic['No request'])
# Print the first bytes of the .bin image and of the image decoded from the
# .hex file (presumably so the two outputs can be compared by eye).
with open(r"./source bin/SX7H_Shifter.bin", 'rb') as fd:
    bin_file = fd.read()
    print(bin_file[0:20])
print('=====================================================')
hex_rd = Hex_read()
hex_rd.Open_file(r"./source bin/SX7H_Shifter.hex")
hex_file = hex_rd.data
print(hex_file[0:20])
# Hex_read.Open_file() already converts the ASCII hex payload to raw bytes
# with binascii.a2b_hex(); decoding those raw bytes a second time fails with
# binascii.Error, so the decoded image is used as it is.
aa = hex_file
print(aa[0:20])
widgets/ShifterTool.py
@@ -4,7 +4,7 @@
from logging import exception
import threading
from typing import List
from can import BusABC, Message
from can import BusABC, Message, Logger, Listener
from udsoncan.client import Client
from udsoncan.exceptions import TimeoutException
import udsoncan
@@ -23,6 +23,7 @@
import datetime
from ShifterDefine import *
from multiprocessing import Process, Queue, Value, Pipe
from hexread import *
MAX_RCV_NUM = 20
APP_ADDR_LOCATION_OFFSET = 8
@@ -255,6 +256,8 @@
            0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a
        ]
        self.app_data: bytes = None
        self.start_timestamp = time.time()
        self.boot_logger = None
    def DeviceInit(self):
        self._usbcan = None
@@ -413,7 +416,15 @@
        msg.channel = 0
        msg.data = isotp_msg.data  # bytearray
        msg.is_extended_id = False
        msg.timestamp = time.time() - self.start_timestamp
        msg.is_error_frame = False
        msg.is_remote_frame = False
        self.Tool_send(msg)
    def Tool_send(self, msg: Message):
        self._usbcan.send(msg)
        if self.boot_logger is not None:
            self.boot_logger.on_message_received(msg)
    def WidgetsInit(self):
        # self.UI.setupUi(self)
@@ -475,6 +486,9 @@
        self.UI.pushButton_48.clicked.connect(self.start_programming)
        self.UI.pushButton_46.clicked.connect(self.loadAppfile)
        self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag)
        self.UI.pushButton_47.clicked.connect(self.SetBootLog)
        # self.UI.comboBox_10.activated.connect(self.ReportByMask)
        self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys()))
        self.UI.comboBox_12.addItems(list(DTC_Dic.keys()))
@@ -546,12 +560,15 @@
                                    msg[i])  # send msg to isotp
                            self.msgQueue.put(msg[i])
                            if self.boot_logger is not None:
                                self.boot_logger.on_message_received(msg[i])
                            # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' +
                            #    '{:0<2x}'.format(a).upper() for a in list(msg[i].data)]))
                            # send signal to update screen
                        g_signal.sig_MsgReceived.emit(num)
                # msgToSendCnt = self.sendQueue.qsize()
                # if msgToSendCnt > 0:
                #     msg = Message()
@@ -560,7 +577,7 @@
                #         msg = self.sendQueue.get()
                #         # logger.info('Tx: ID=0x{:0<3x} '.format(
                #         #     msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame))
                #         self._usbcan.send(msg)
                #         self.Tool_send(msg)
    def send_dump(self):
        msg = Message()
        msg.arbitration_id = self.shifter.canid.phy_rxId
@@ -569,7 +586,9 @@
        msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0]
        msg.is_extended_id = False
        msg.is_remote_frame = False
        self._usbcan.send(msg)
        msg.is_error_frame = False
        msg.timestamp = time.time() - self.start_timestamp
        self.Tool_send(msg)
    def send_VehiclePosition(self, data=[]):
        msg = Message()
@@ -579,7 +598,9 @@
        msg.data = data
        msg.is_extended_id = False
        msg.is_remote_frame = False
        self._usbcan.send(msg)
        msg.is_error_frame = False
        msg.timestamp = time.time() - self.start_timestamp
        self.Tool_send(msg)
    def open_close(self):
        if self._isOpen.value == 0:
@@ -612,6 +633,9 @@
            self.needdisconnect.value = 1
            self._deviceOpenUpdate()
            self.TestPresentTimer.stop()
    def closeEvent(self, e):
        self.boot_logger.stop()
    def update_HardwareDevice(self):
@@ -726,6 +750,7 @@
        pass
    def TestPresentEvent(self):
        self.udsclient.
        if self.UI.radioButton.isChecked():
            self.TestPresentTimer.start(3000)
        else:
@@ -1153,11 +1178,48 @@
        if fileName != '':
            self.UI.comboBox_4.addItem(fileName)
            self.UI.comboBox_4.setCurrentText(fileName)
            with open(fileName, 'rb') as fd:
                self.app_data = fd.read()
            # with open(fileName, 'rb') as fd:
            #     self.app_data = fd.read()
            #     drv_data_len = len(
            #         self.drv_data) if self.drv_data is not None else 0
            #     app_data_len = len(
            #         self.app_data) if self.app_data is not None else 0
            #     total_len = drv_data_len + app_data_len
            #     self.UI.progressBar.setRange(0, total_len)
            # print(self.app_data)
            # print("===============================================")
            hex_read = Hex_read()
            hex_read.Open_file(fileName)
            self.app_data = hex_read.data
                drv_data_len = len(
                    self.drv_data) if self.drv_data is not None else 0
                app_data_len = len(
                    self.app_data) if self.app_data is not None else 0
                total_len = drv_data_len + app_data_len
                self.UI.progressBar.setRange(0, total_len)
    def SetBootLog(self):
        if self.in_programming:
            self.disp_string('## 正在编程中')
            return
        fileName, _ = QtWidgets.QFileDialog.getSaveFileName(
            None, 'comboBox_10', '.',
            'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)')
        if fileName != '':
            self.UI.comboBox_10.addItem(fileName)
            self.UI.comboBox_10.setCurrentText(fileName)
            print(fileName)
            self.boot_logger = Logger(fileName)
            # self.listener = [self.boot_logger]
            # with open(fileName, 'rb') as fd:
            #     self.app_data = fd.read()
            #     drv_data_len = len(
            #         self.drv_data) if self.drv_data is not None else 0
            #     app_data_len = len(
            #         self.app_data) if self.app_data is not None else 0
            #     total_len = drv_data_len + app_data_len
            #     self.UI.progressBar.setRange(0, total_len)