# # def send_msg():
# #     meg = Message()
# #     msg = VCI_CAN_OBJ()

# NOTE(review): `arg`, `encodings` and `Tree` look like IDE auto-imports and
# are not used in the visible part of this file. `tkinter.tix` was removed in
# Python 3.13, so that import fails there. They are kept untouched because the
# rest of the project may rely on them. The import order is preserved because
# later star-imports can shadow names from earlier ones.
from ast import arg
import encodings
import threading
from tkinter.tix import Tree
from USBCAN import *
import sys
from PySide2.QtWidgets import QApplication, QMainWindow
from mainwindows import *
from multiprocessing import Process, Queue, Value, Pipe
import datetime
from ShifterDefine import *
import cantools
from pprint import pprint
import binascii
from hexread import *

# def rece_msg(bus):
#     msg = bus.Receive(0.1)
#     if msg[0] is not None:
#         print(msg[0].arbitration_id)

# Lookup table from a symbolic name to a 16-bit identifier.
# Presumably these are UDS data identifiers (DIDs) of the shifter ECU -- TODO
# confirm against the diagnostic specification.
# Key spellings (including typos such as "ECUProgtammingDate") are runtime
# lookup keys and must not be "corrected" without updating every caller.
DID_dic = {
    "ProgrammingCounter": 0x2100,
    "ProAtpCounter": 0x2101,
    "BackupConfig": 0x2102,
    "VehicleName": 0x2103,
    "DiagVersion": 0x2104,
    "BootSoftId": 0x2105,
    "DFLZMPartNum": 0xF187,
    "VehiclEECUSoftwareVersion": 0x2108,
    "SupplierID": 0xF18A,
    "ECUManufacturingDate": 0x210B,
    "ECUSerialNumID": 0xF18C,
    "VINDataIdentifier": 0xF190,
    "ECUHardwareVersion": 0xF193,
    "ECUSoftwareVersion": 0xF195,
    "SystemNameOrEngineType": 0x2110,
    "RepairShopCodeOrTester": 0x2111,
    # NOTE(review): same value as "RepairShopCodeOrTester" above, and 0x2112
    # is unused -- possibly a typo for 0x2112; verify before changing.
    "ECUProgtammingDate": 0x2111,
    "InstallDate": 0x2113,
    "NetConfig": 0xF019,
    "FunctionConfig": 0xF010,
    "ActiveDiagSession": 0x2106,
    "EGSMSensorPositon": 0x2116,
    "ReleaseButtonState": 0x2118,
    "PButtionState": 0x2119,
    "PaddleState": 0x211A,
    "ECUPowerVoltage": 0x211B,
    "VehicleSpeed": 0x211C,
    "SystemName": 0xF197,
    "WriteFingerPrint": 0xF15A,
    "ReadFingerPrint": 0xF15B,
    "IndicationLEDControl": 0x8101,
}

# print(DID_dic["IndicationLEDControl"])

# def f(conn):
#     conn.send([1, 'test', None])
#     conn.send([2, 'test', None])
#     print(conn.recv())
#     conn.close()

# if __name__ == "__main__":
#     parent_conn, child_conn = Pipe()  # creates two connection objects, one for each end of the pipe
#     p = Process(target=f, args=(child_conn,))
#     p.start()
#     print(parent_conn.recv())
#     print(parent_conn.recv())
#     parent_conn.send('father test')
#     p.join()

# def send_msg(bus):
#     msg = Message(
#         arbitration_id=0x742,
#         is_remote_frame=0,
#         channel=0,
#         dlc=8,
#         data=[0]*8,
#     )
#     bus.send(msg)

# # bus = open_device()
# # while True:
# #     send_msg(bus)
# #     # rece_msg(bus)

# device_type = DEVICETYPE['USBCAN-II']
# device_index = 0
# can_channel = 0
# can_filters = [
#     # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF},
#     {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}]
# bus = USBCAN(device_type, device_index, can_channel,
#              bitrate=500000, can_filters=can_filters)
# msg = Message(
#     arbitration_id=0x742,
#     is_remote_frame=0,
#     channel=0,
#     dlc=8,
#     data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0])
# )
# q_rx = Queue()
# q_tx = Queue()
# connect = Value('i', 1)
# close = Value('i', 0)
# # print(q_rx.empty())

# # def start():
# # def rece_msg():
# #     # num = bus.GetReceiveNum(device_type, device_index, can_channel)
# #     print(num)
# #     for i in range(num):
# #     # while not q_rx.empty():
# #         print(q_rx.get())
# #         # time.sleep(0.1)
# # bus.CloseDevice()
# # start()

# def formatedata(index, item, received):
#     data = []
#     if received:
#         data.append('{:0>4}'.format(index))
#         data.append(item.timestamp)
#         data.append('接收')
#     else:
#         data.append('')
#         data.append(item.timestamp)
#         data.append('发送')
#     # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp)
#     data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x'))
#     # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧')
#     data.append(item.dlc)
#     if int(item.is_extended_id) == 1:
#         data.append('扩展帧')
#     else:
#         data.append('标准帧')
#     data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)]))
#     return data

# def rece_msg():
#     # msg, num = bus.Receive(len=10)
#     num = q_rx.qsize()
#     if not num == 0:
#         for i in range(num):
#             # print(msg[0].arbitration_id)
#             print(formatedata(i, q_rx.get(), True))

# def tx_task(bus: USBCAN):
#     while True:
#         bus.send(msg=msg)
#         time.sleep(0.1)

# def rx_task(bus: USBCAN):
#     while True:
#         msg, num = bus.Receive(len=10)
#         time.sleep(0.1)
#         if not num == 0:
#             for i in range(num):
#                 if msg[i].arbitration_id == 0x77a:
#                     # print(formatedata(i, msg[i], True))
#                     print(msg[i].data)

# if __name__ == '__main__':
#     bus.InitAndStart()
#     info = bus.GetDeviceInf()
#     print(info.fw_version)
#     # arg = {bus}
#     t = threading.Thread(target=rx_task, args=(bus,))
#     tx = threading.Thread(target=tx_task, args=(bus,))
#     t.start()
#     tx.start()
#     msgProcess = Process(
#         name='pyUSBCANListener',
#         target=bus.ListeningMsg,
#         args=(connect, close, q_rx, q_tx))
#     msgProcess.daemon = True
#     msgProcess.start()
#     while True:
#         rece_msg()

# def inserttable(table, data):
#     # for row in range(5):
#     for column in range(7):
#         item = QTableWidgetItem()  # table item (model)
#         item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter)  # text alignment
#         item.setText(str(data[column]))
#         item.setFont(QFont("微软雅黑", 12, QFont.Black))  # set the font
#         table.setItem(0, column, item)

# if __name__ == '__main__':
#     app = QApplication(sys.argv)
#     # loop = QEventLoop(app)
#     # asyncio.set_event_loop(loop)
#     # w = MainWidget()
#     # w = FlashBootloaderWidget()
#     # w = DataIdentifierWidget()
#     # w = DiagnosticTroubleCodeWidget()
#     windows = QMainWindow()
#     w = Ui_MainWindow()
#     w.setupUi(windows)
#     windows.show()
#     msg = Message(arbitration_id=0x420, is_extended_id=0,
#                   dlc=8, data=bytearray(8))
#     data = formatedata(0, msg, True)
#     inserttable(w.tableWidget, data)
#     # w.show()
#     sys.exit(app.exec_())

# dbc = cantools.database.load_file("DBC/SX7H.dbc")
# # print(dbc.messages)
# sa_message = dbc.get_message_by_name('SA1')
# # print(sa_message)
# # pprint(sa_message.signals)
# frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0]
# input_signal = dbc.decode_message(0x420, frame)
# part_str = str(input_signal['SA1_Status_ParkButtonReq'])
# pprint(part_str)
# if input_signal['SA1_Status_ParkButtonReq'] == 'No request':
#     print('yes')
# data = sa_message.encode({
#     'SA1_Status_PRNDL': 0,
#     'SA1_Status_GearShftPosReq': 0,  #'Shifter position Zero',
#     'SA1_Status_ParkButtonReq': 2,
#     "SA1_ShifterManualSignal": 0,
#     "SA1_ShifterModeSignal": 1,
#     "SA1_Status_ShftSensSng_Fault": 0,
#     "SA1_IND_ShifterMisUsd": 0,
#     "SA1_Live_counter": 1,
#     "SA1_Status_UnlockButtonReq": 1,
#     "SA1_Status_EcoShifterModeReq": 0,
#     "SA1_Status_RqGearPosInV": 0xf,
#     "SA1_Status_ShiftPosValidFlag": 0,
# })
# print(data)
# print(SA1_Status_ParkButtonReq_dic['No request'])

# Scratch check that runs at module level: print the first 20 items of the raw
# .bin image and of the data loaded from the .hex file, so the two firmware
# representations can be compared by eye.

# Raw binary image, read in one go.
with open(r"./source bin/SX7H_Shifter.bin", "rb") as fd:
    bin_file = fd.read()
print(bin_file[:20])

print('=====================================================')

# `Hex_read` is expected to come from the `hexread` star-import; after
# `Open_file` the loaded content is read back from its `data` attribute.
hex_rd = Hex_read()
hex_rd.Open_file(r"./source bin/SX7H_Shifter.hex")
hex_file = hex_rd.data
print(hex_file[:20])

# Presumably `data` holds hexadecimal text -- `a2b_hex` turns each pair of hex
# digits into one byte (TODO confirm the exact type of `Hex_read.data`).
aa = binascii.a2b_hex(hex_file)
print(aa[:20])