From a1a1c89b91fc9476c51981699aaa7c2bccdd8570 Mon Sep 17 00:00:00 2001 From: tao_z <tzj0429@163.com> Date: Sun, 07 Aug 2022 14:19:02 +0800 Subject: [PATCH] SX7H初步完成 --- test.py | 311 +++++++++++++++++++++++++++++++++------------------ 1 files changed, 198 insertions(+), 113 deletions(-) diff --git a/test.py b/test.py index a61aad1..401bfa9 100644 --- a/test.py +++ b/test.py @@ -1,147 +1,191 @@ - - # # def send_msg(): # # meg = Message() # # msg = VCI_CAN_OBJ() from ast import arg +import encodings import threading from tkinter.tix import Tree from USBCAN import * import sys from PySide2.QtWidgets import QApplication, QMainWindow from mainwindows import * -from multiprocessing import Process, Queue, Value +from multiprocessing import Process, Queue, Value, Pipe import datetime +from ShifterDefine import * +import cantools +from pprint import pprint +import binascii +from hexread import * # def rece_msg(bus): # msg = bus.Receive(0.1) # if msg[0] is not None: # print(msg[0].arbitration_id) +DID_dic = { + "ProgrammingCounter": 0x2100, + "ProAtpCounter": 0x2101, + "BackupConfig": 0x2102, + "VehicleName": 0x2103, + "DiagVersion": 0x2104, + "BootSoftId": 0x2105, + "DFLZMPartNum": 0xF187, + "VehiclEECUSoftwareVersion": 0x2108, + "SupplierID": 0xF18A, + "ECUManufacturingDate": 0x210B, + "ECUSerialNumID": 0xf18c, + "VINDataIdentifier": 0xF190, + "ECUHardwareVersion": 0xF193, + "ECUSoftwareVersion": 0xF195, + "SystemNameOrEngineType": 0x2110, + "RepairShopCodeOrTester": 0x2111, + "ECUProgtammingDate": 0x2111, + "InstallDate": 0x2113, + "NetConfig": 0xf019, + "FunctionConfig": 0xf010, + "ActiveDiagSession": 0x2106, + "EGSMSensorPositon": 0x2116, + "ReleaseButtonState": 0x2118, + "PButtionState": 0x2119, + "PaddleState": 0x211A, + "ECUPowerVoltage": 0x211B, + "VehicleSpeed": 0x211c, + "SystemName": 0xF197, + "WriteFingerPrint": 0xF15A, + "ReadFingerPrint": 0xf15b, + "IndicationLEDControl": 0x8101 +} +# print(DID_dic["IndicationLEDControl"]) -def send_msg(bus): - msg = Message( - arbitration_id=0x742, - is_remote_frame=0, - channel=0, - dlc=8, - data=[0]*8, - ) - bus.send(msg) +# def f(conn): +# conn.send([1, 'test', None]) +# conn.send([2, 'test', None]) +# print(conn.recv()) +# conn.close() +# if __name__ == "__main__": +# parent_conn, child_conn = Pipe() # 产生两个返回对象,一个是管道这一头,一个是另一头 +# p = Process(target=f, args=(child_conn,)) +# p.start() +# print(parent_conn.recv()) +# print(parent_conn.recv()) +# parent_conn.send('father test') +# p.join() -# bus = open_device() -# while True: -# send_msg(bus) -# # rece_msg(bus) +# def send_msg(bus): +# msg = Message( +# arbitration_id=0x742, +# is_remote_frame=0, +# channel=0, +# dlc=8, +# data=[0]*8, +# ) +# bus.send(msg) +# # bus = open_device() +# # while True: +# # send_msg(bus) +# # # rece_msg(bus) -device_type = DEVICETYPE['USBCAN-II'] -device_index = 0 -can_channel = 0 -can_filters = [ - # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF}, - {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] -bus = USBCAN(device_type, device_index, can_channel, - bitrate=500000, can_filters=can_filters) +# device_type = DEVICETYPE['USBCAN-II'] +# device_index = 0 +# can_channel = 0 +# can_filters = [ +# # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF}, +# {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] +# bus = USBCAN(device_type, device_index, can_channel, +# bitrate=500000, can_filters=can_filters) -msg = Message( - arbitration_id=0x742, - is_remote_frame=0, - channel=0, - dlc=8, - data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0]) -) +# msg = Message( +# arbitration_id=0x742, +# is_remote_frame=0, +# channel=0, +# dlc=8, +# data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0]) +# ) +# q_rx = Queue() +# q_tx = Queue() +# connect = Value('i', 1) +# close = Value('i', 0) -q_rx = Queue() -q_tx = Queue() -connect = Value('i', 1) -close = Value('i', 0) +# # print(q_rx.empty()) -# print(q_rx.empty()) +# # def start(): -# def start(): +# # def rece_msg(): +# # # num = bus.GetReceiveNum(device_type, device_index, can_channel) +# # print(num) +# # for i in range(num): +# # # while not q_rx.empty(): +# # print(q_rx.get()) +# # # time.sleep(0.1) +# # bus.CloseDevice() +# # start() +# def formatedata(index, item, received): +# data = [] +# if received: +# data.append('{:0>4}'.format(index)) +# data.append(item.timestamp) +# data.append('接收') +# else: +# data.append('') +# data.append(item.timestamp) +# data.append('发送') + +# # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp) +# data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x')) + +# # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧') +# data.append(item.dlc) +# if int(item.is_extended_id) == 1: +# data.append('扩展帧') +# else: +# data.append('标准帧') + +# data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)])) +# return data # def rece_msg(): -# # num = bus.GetReceiveNum(device_type, device_index, can_channel) -# print(num) -# for i in range(num): -# # while not q_rx.empty(): -# print(q_rx.get()) -# # time.sleep(0.1) +# # msg, num = bus.Receive(len=10) +# num = q_rx.qsize() +# if not num == 0: +# for i in range(num): +# # print(msg[0].arbitration_id) +# print(formatedata(i, q_rx.get(), True)) +# def tx_task(bus: USBCAN): +# while True: +# bus.send(msg=msg) +# time.sleep(0.1) -# bus.CloseDevice() -# start() -def formatedata(index, item, received): - data = [] - if received: - data.append('{:0>4}'.format(index)) - data.append(item.timestamp) - data.append('接收') - else: - data.append('') - data.append(item.timestamp) - data.append('发送') +# def rx_task(bus: USBCAN): +# while True: +# msg, num = bus.Receive(len=10) +# time.sleep(0.1) +# if not num == 0: +# for i in range(num): +# if msg[i].arbitration_id == 0x77a: +# # print(formatedata(i, msg[i], True)) +# print(msg[i].data) - # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp) - data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x')) - - # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧') - data.append(item.dlc) - if int(item.is_extended_id) == 1: - data.append('扩展帧') - else: - data.append('标准帧') - - data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)])) - return data - - -def rece_msg(): - # msg, num = bus.Receive(len=10) - num = q_rx.qsize() - if not num == 0: - for i in range(num): - # print(msg[0].arbitration_id) - print(formatedata(i, q_rx.get(), True)) - - -def tx_task(bus: USBCAN): - while True: - bus.send(msg=msg) - time.sleep(0.1) - - -def rx_task(bus: USBCAN): - while True: - msg, num = bus.Receive(len=10) - time.sleep(0.1) - if not num == 0: - for i in range(num): - if msg[i].arbitration_id == 0x77a: - # print(formatedata(i, msg[i], True)) - print(msg[i].data) - - -if __name__ == '__main__': - bus.InitAndStart() - info = bus.GetDeviceInf() - print(info.fw_version) - # arg = {bus} - t = threading.Thread(target=rx_task, args=(bus,)) - tx = threading.Thread(target=tx_task, args=(bus,)) - t.start() - tx.start() - # msgProcess = Process( - # name='pyUSBCANListener', - # target=bus.ListeningMsg, - # args=(connect, close, q_rx, q_tx)) - # msgProcess.daemon = True - # msgProcess.start() - # while True: - # rece_msg() +# if __name__ == '__main__': +# bus.InitAndStart() +# info = bus.GetDeviceInf() +# print(info.fw_version) +# # arg = {bus} +# t = threading.Thread(target=rx_task, args=(bus,)) +# tx = threading.Thread(target=tx_task, args=(bus,)) +# t.start() +# tx.start() +# msgProcess = Process( +# name='pyUSBCANListener', +# target=bus.ListeningMsg, +# args=(connect, close, q_rx, q_tx)) +# msgProcess.daemon = True +# msgProcess.start() +# while True: +# rece_msg() # def inserttable(table, data): # # for row in range(5): @@ -151,7 +195,6 @@ # item.setText(str(data[column])) # item.setFont(QFont("微软雅黑", 12, QFont.Black)) # 设置字体 # table.setItem(0, column, item) - # if __name__ == '__main__': @@ -175,3 +218,45 @@ # # w.show() # sys.exit(app.exec_()) + +# dbc = cantools.database.load_file("DBC/SX7H.dbc") +# # print(dbc.messages) +# sa_message = dbc.get_message_by_name('SA1') +# # print(sa_message) +# # pprint(sa_message.signals) +# frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0] +# input_signal = dbc.decode_message(0x420, frame) +# part_str = str(input_signal['SA1_Status_ParkButtonReq']) +# pprint(part_str) +# if input_signal['SA1_Status_ParkButtonReq'] == 'No request': +# print('yes') + +# data = sa_message.encode({ +# 'SA1_Status_PRNDL': 0, +# 'SA1_Status_GearShftPosReq': 0, #'Shifter position Zero', +# 'SA1_Status_ParkButtonReq': 2, +# "SA1_ShifterManualSignal": 0, +# "SA1_ShifterModeSignal": 1, +# "SA1_Status_ShftSensSng_Fault": 0, +# "SA1_IND_ShifterMisUsd": 0, +# "SA1_Live_counter": 1, +# "SA1_Status_UnlockButtonReq": 1, +# "SA1_Status_EcoShifterModeReq": 0, +# "SA1_Status_RqGearPosInV": 0xf, +# "SA1_Status_ShiftPosValidFlag": 0, +# }) +# print(data) +# print(SA1_Status_ParkButtonReq_dic['No request']) + +with open(r"./source bin/SX7H_Shifter.bin", 'rb') as fd: + bin_file = fd.read() + print(bin_file[0:20]) +print('=====================================================') + +hex_rd = Hex_read() +hex_rd.Open_file(r"./source bin/SX7H_Shifter.hex") +hex_file = hex_rd.data +print(hex_file[0:20]) + +aa = binascii.a2b_hex(hex_file) +print(aa[0:20]) -- Gitblit v1.8.0