698b17133dd304fe293f495aa3b96c4274ccdd50..a1a1c89b91fc9476c51981699aaa7c2bccdd8570
2022-08-07 tao_z
SX7H初步完成
a1a1c8 diff | tree
2022-07-20 tao_z
调整支持HEX烧录功能
14a1ed diff | tree
1 files added
5 files modified
290 ■■■■ changed files
Shifter.py 5 ●●●● patch | view | raw | blame | history
ShifterDefine.py 2 ●●● patch | view | raw | blame | history
USBCAN.py 26 ●●●●● patch | view | raw | blame | history
hexread.py 103 ●●●●● patch | view | raw | blame | history
test.py 67 ●●●●● patch | view | raw | blame | history
widgets/ShifterTool.py 87 ●●●● patch | view | raw | blame | history
Shifter.py
@@ -1,4 +1,3 @@
from email.message import Message
from ShifterDefine import *
import struct
from udsoncan import DidCodec
@@ -293,6 +292,9 @@
            reqpos = TCU2_ShiterLevel_dic['M']
            self.pre_ShiftLeverPos = "P"
            self.max_pos = 'Shifter position Zero'
        elif shift.position is SA1_Status_GearShftPosReq_dic[
                'Shifter not initialized']:
            pass
        else:
            if shift.position is not SA1_Status_GearShftPosReq_dic[
                    'Shifter position Zero']:
@@ -304,5 +306,6 @@
                self.pre_ShiftLeverPos = [
                    k for k, v in TCU2_ShiterLevel_dic.items() if v == reqpos
                ][0]
        data = self.pack_TCU2(reqpos)
        return data
ShifterDefine.py
@@ -94,7 +94,7 @@
SA1_Status_ParkButtonReq_dic = {
    'No request': 0x0,
    'Driver request park button': 1,
    'Park Button fault': 2,
    'Park button fault': 2,
    'Reserved': 3
}
SA1_Status_UnlockButtonReq_dic = {
USBCAN.py
@@ -304,6 +304,8 @@
        self.device = device_type
        self.device_index = device_index
        self.channel = can_index
        self.timestamp_start = None
        self.laststamp = None
        Timing0, Timing1 = TIMING_DICT[bitrate]
@@ -429,17 +431,23 @@
        if rtn == 0xFFFFFFFF or rtn == 0:
            return None, False
        else:
            if self.timestamp_start == None:
                self.laststamp = raw_message[0].TimeStamp
                self.timestamp_start = self.laststamp
            for i in range(rtn):
                msg.append(
                    Message(
                        timestamp=raw_message[i].TimeStamp
                        if raw_message[i].TimeFlag else 0.0,
                        arbitration_id=raw_message[i].ID,
                        is_remote_frame=raw_message[i].RemoteFlag,
                        channel=0,
                        extended_id=raw_message[i].ExternFlag,
                        data=raw_message[i].Data,
                    ))
                    Message(timestamp=(
                        (raw_message[i].TimeStamp - self.timestamp_start) /
                        1000000) if raw_message[i].TimeFlag else 0.0,
                            arbitration_id=raw_message[i].ID,
                            is_remote_frame=raw_message[i].RemoteFlag,
                            channel=0,
                            extended_id=raw_message[i].ExternFlag,
                            data=raw_message[i].Data,
                            is_error_frame=False))
                # timestamp = raw_message[i].TimeStamp if raw_message[
                #     i].TimeFlag else 0.0,
                # print(timestamp)
            return (msg, rtn)
    def ListeningMsg(self, connectRet, needClose, msgQueue: Queue,
hexread.py
New file
@@ -0,0 +1,103 @@
"""
HEX格式文件以行为单位记录数据,每行都由任意数量的十六进制数组成。
每一行的格式如下:
:本行数据长度(2byte)+数据起始地址(4byte)+数据类型(2byte)+数据内容(N byte)+校验(1byte)
"""
from aenum import IntEnum
import binascii
HEX_LEN_H = 1
HEX_LEN_L = 2
HEX_ADDR_HH = 3
HEX_ADDR_HL = 4
HEX_ADDR_LH = 5
HEX_ADDR_LL = 6
HEX_DATA_TYPE_H = 7
HEX_DATA_TYPE_L = 8
HEX_DATA_START = 9
HexTable = {
    48: 0,
    49: 1,
    50: 2,
    51: 3,
    52: 4,
    53: 5,
    54: 6,
    55: 7,
    56: 8,
    57: 9,
    65: 10,
    66: 11,
    67: 12,
    68: 13,
    69: 14,
    70: 15
}
class HexDataType(IntEnum):
    DataRecord = 0
    FileEnd = 1
    ExtentionAddr = 2
    StartSectionAddr = 3
    ExtentionLinearAddr = 4
    StartLinearSectionAddr = 3
class Hex_read(object):
    def __init__(self):
        self.start_addr = 0
        self.len = 0
        self.data = b''
    def Open_file(self, file):
        self.file = file
        self.start_addr = 0
        self.len = 0
        self.data = b''
        with open(file, 'rb') as fd:
            for aLineData in fd:
                if aLineData[0] == 0x3A:  # 0x3A == ":"
                    # print(aLineData)
                    DataLen = HexTable[aLineData[HEX_LEN_H]] * 16 + HexTable[
                        aLineData[HEX_LEN_L]]
                    Data_Addr = HexTable[
                        aLineData[HEX_ADDR_HH]] * 16 * 16 * 16 + HexTable[
                            aLineData[HEX_ADDR_HL]] * 16 * 16 + HexTable[
                                aLineData[HEX_ADDR_LH]] * 16 + HexTable[
                                    aLineData[HEX_ADDR_LL]]
                    Data_Type = HexTable[
                        aLineData[HEX_DATA_TYPE_H]] * 16 + HexTable[
                            aLineData[HEX_DATA_TYPE_L]]
                    if Data_Type == HexDataType.FileEnd:
                        break
                    if Data_Type == HexDataType.ExtentionLinearAddr:
                        self.start_addr = HexTable[aLineData[
                            HEX_DATA_START]] << 28 | HexTable[aLineData[
                                HEX_DATA_START + 1]] << 20 | HexTable[
                                    aLineData[HEX_DATA_START +
                                              2]] << 12 | HexTable[aLineData[
                                                  HEX_DATA_START + 3]] << 4
                        # print(self.start_addr)
                    if Data_Type == HexDataType.DataRecord:
                        if self.len == 0:
                            self.start_addr += Data_Addr
                        self.len += DataLen
                        self.data += aLineData[HEX_DATA_START:HEX_DATA_START +
                                               DataLen * 2]
            fd.close()
            self.data = binascii.a2b_hex(self.data)
            # print(len(self.data))
            # print(self.start_addr)
            # print(bin_file[0:20])
test.py
@@ -14,6 +14,8 @@
from ShifterDefine import *
import cantools
from pprint import pprint
import binascii
from hexread import *
# def rece_msg(bus):
#     msg = bus.Receive(0.1)
#     if msg[0] is not None:
@@ -52,7 +54,7 @@
    "IndicationLEDControl": 0x8101
}
print(DID_dic["IndicationLEDControl"])
# print(DID_dic["IndicationLEDControl"])
# def f(conn):
#     conn.send([1, 'test', None])
@@ -217,31 +219,44 @@
#     sys.exit(app.exec_())
dbc = cantools.database.load_file("DBC/SX7H.dbc")
# print(dbc.messages)
sa_message = dbc.get_message_by_name('SA1')
# print(sa_message)
# pprint(sa_message.signals)
frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0]
input_signal = dbc.decode_message(0x420, frame)
part_str = str(input_signal['SA1_Status_ParkButtonReq'])
pprint(part_str)
if input_signal['SA1_Status_ParkButtonReq'] == 'No request':
    print('yes')
# dbc = cantools.database.load_file("DBC/SX7H.dbc")
# # print(dbc.messages)
# sa_message = dbc.get_message_by_name('SA1')
# # print(sa_message)
# # pprint(sa_message.signals)
# frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0]
# input_signal = dbc.decode_message(0x420, frame)
# part_str = str(input_signal['SA1_Status_ParkButtonReq'])
# pprint(part_str)
# if input_signal['SA1_Status_ParkButtonReq'] == 'No request':
#     print('yes')
data = sa_message.encode({
    'SA1_Status_PRNDL': 0,
    'SA1_Status_GearShftPosReq': 0,  #'Shifter position Zero',
    'SA1_Status_ParkButtonReq': 2,
    "SA1_ShifterManualSignal": 0,
    "SA1_ShifterModeSignal": 1,
    "SA1_Status_ShftSensSng_Fault": 0,
    "SA1_IND_ShifterMisUsd": 0,
    "SA1_Live_counter": 1,
    "SA1_Status_UnlockButtonReq": 1,
    "SA1_Status_EcoShifterModeReq": 0,
    "SA1_Status_RqGearPosInV": 0xf,
    "SA1_Status_ShiftPosValidFlag": 0,
})
# data = sa_message.encode({
#     'SA1_Status_PRNDL': 0,
#     'SA1_Status_GearShftPosReq': 0,  #'Shifter position Zero',
#     'SA1_Status_ParkButtonReq': 2,
#     "SA1_ShifterManualSignal": 0,
#     "SA1_ShifterModeSignal": 1,
#     "SA1_Status_ShftSensSng_Fault": 0,
#     "SA1_IND_ShifterMisUsd": 0,
#     "SA1_Live_counter": 1,
#     "SA1_Status_UnlockButtonReq": 1,
#     "SA1_Status_EcoShifterModeReq": 0,
#     "SA1_Status_RqGearPosInV": 0xf,
#     "SA1_Status_ShiftPosValidFlag": 0,
# })
# print(data)
# print(SA1_Status_ParkButtonReq_dic['No request'])
with open(r"./source bin/SX7H_Shifter.bin", 'rb') as fd:
    bin_file = fd.read()
    print(bin_file[0:20])
print('=====================================================')
hex_rd = Hex_read()
hex_rd.Open_file(r"./source bin/SX7H_Shifter.hex")
hex_file = hex_rd.data
print(hex_file[0:20])
aa = binascii.a2b_hex(hex_file)
print(aa[0:20])
widgets/ShifterTool.py
@@ -4,7 +4,7 @@
from logging import exception
import threading
from typing import List
from can import BusABC, Message
from can import BusABC, Message, Logger, Listener
from udsoncan.client import Client
from udsoncan.exceptions import TimeoutException
import udsoncan
@@ -23,6 +23,7 @@
import datetime
from ShifterDefine import *
from multiprocessing import Process, Queue, Value, Pipe
from hexread import *
MAX_RCV_NUM = 20
APP_ADDR_LOCATION_OFFSET = 8
@@ -255,6 +256,8 @@
            0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a
        ]
        self.app_data: bytes = None
        self.start_timestamp = time.time()
        self.boot_logger = None
    def DeviceInit(self):
        self._usbcan = None
@@ -413,7 +416,15 @@
        msg.channel = 0
        msg.data = isotp_msg.data  # bytearray
        msg.is_extended_id = False
        msg.timestamp = time.time() - self.start_timestamp
        msg.is_error_frame = False
        msg.is_remote_frame = False
        self.Tool_send(msg)
    def Tool_send(self, msg: Message):
        self._usbcan.send(msg)
        if self.boot_logger is not None:
            self.boot_logger.on_message_received(msg)
    def WidgetsInit(self):
        # self.UI.setupUi(self)
@@ -475,6 +486,9 @@
        self.UI.pushButton_48.clicked.connect(self.start_programming)
        self.UI.pushButton_46.clicked.connect(self.loadAppfile)
        self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag)
        self.UI.pushButton_47.clicked.connect(self.SetBootLog)
        # self.UI.comboBox_10.activated.connect(self.ReportByMask)
        self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys()))
        self.UI.comboBox_12.addItems(list(DTC_Dic.keys()))
@@ -546,12 +560,15 @@
                                    msg[i])  # send msg to isotp
                            self.msgQueue.put(msg[i])
                            if self.boot_logger is not None:
                                self.boot_logger.on_message_received(msg[i])
                            # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' +
                            #    '{:0<2x}'.format(a).upper() for a in list(msg[i].data)]))
                            # send signal to update screen
                        g_signal.sig_MsgReceived.emit(num)
                # msgToSendCnt = self.sendQueue.qsize()
                # if msgToSendCnt > 0:
                #     msg = Message()
@@ -560,7 +577,7 @@
                #         msg = self.sendQueue.get()
                #         # logger.info('Tx: ID=0x{:0<3x} '.format(
                #         #     msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame))
                #         self._usbcan.send(msg)
                #         self.Tool_send(msg)
    def send_dump(self):
        msg = Message()
        msg.arbitration_id = self.shifter.canid.phy_rxId
@@ -569,7 +586,9 @@
        msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0]
        msg.is_extended_id = False
        msg.is_remote_frame = False
        self._usbcan.send(msg)
        msg.is_error_frame = False
        msg.timestamp = time.time() - self.start_timestamp
        self.Tool_send(msg)
    def send_VehiclePosition(self, data=[]):
        msg = Message()
@@ -579,7 +598,9 @@
        msg.data = data
        msg.is_extended_id = False
        msg.is_remote_frame = False
        self._usbcan.send(msg)
        msg.is_error_frame = False
        msg.timestamp = time.time() - self.start_timestamp
        self.Tool_send(msg)
    def open_close(self):
        if self._isOpen.value == 0:
@@ -591,7 +612,7 @@
                    TIMING_DICT.keys())[self.devicedescription.baudrate]
                self._usbcan = USBCAN(device_type=4,
                                      device_index=0,
                                      can_index=0,
                                      can_index=self.devicedescription.channel,
                                      bitrate=bitrate,
                                      can_filters=can_filters)
                if not self._usbcan.InitAndStart():
@@ -612,6 +633,9 @@
            self.needdisconnect.value = 1
            self._deviceOpenUpdate()
            self.TestPresentTimer.stop()
    def closeEvent(self, e):
        self.boot_logger.stop()
    def update_HardwareDevice(self):
@@ -1153,11 +1177,48 @@
        if fileName != '':
            self.UI.comboBox_4.addItem(fileName)
            self.UI.comboBox_4.setCurrentText(fileName)
            with open(fileName, 'rb') as fd:
                self.app_data = fd.read()
                drv_data_len = len(
                    self.drv_data) if self.drv_data is not None else 0
                app_data_len = len(
                    self.app_data) if self.app_data is not None else 0
                total_len = drv_data_len + app_data_len
                self.UI.progressBar.setRange(0, total_len)
            # with open(fileName, 'rb') as fd:
            #     self.app_data = fd.read()
            #     drv_data_len = len(
            #         self.drv_data) if self.drv_data is not None else 0
            #     app_data_len = len(
            #         self.app_data) if self.app_data is not None else 0
            #     total_len = drv_data_len + app_data_len
            #     self.UI.progressBar.setRange(0, total_len)
            # print(self.app_data)
            # print("===============================================")
            hex_read = Hex_read()
            hex_read.Open_file(fileName)
            self.app_data = hex_read.data
            drv_data_len = len(
                self.drv_data) if self.drv_data is not None else 0
            app_data_len = len(
                self.app_data) if self.app_data is not None else 0
            total_len = drv_data_len + app_data_len
            self.UI.progressBar.setRange(0, total_len)
    def SetBootLog(self):
        if self.in_programming:
            self.disp_string('## 正在编程中')
            return
        fileName, _ = QtWidgets.QFileDialog.getSaveFileName(
            None, 'comboBox_10', '.',
            'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)')
        if fileName != '':
            self.UI.comboBox_10.addItem(fileName)
            self.UI.comboBox_10.setCurrentText(fileName)
            print(fileName)
            self.boot_logger = Logger(fileName)
            # self.listener = [self.boot_logger]
            # with open(fileName, 'rb') as fd:
            #     self.app_data = fd.read()
            #     drv_data_len = len(
            #         self.drv_data) if self.drv_data is not None else 0
            #     app_data_len = len(
            #         self.app_data) if self.app_data is not None else 0
            #     total_len = drv_data_len + app_data_len
            #     self.UI.progressBar.setRange(0, total_len)