| | |
| | | |
| | | |
| | | # # def send_msg(): |
| | | # # meg = Message() |
| | | # # msg = VCI_CAN_OBJ() |
| | | from ast import arg |
| | | import encodings |
| | | import threading |
| | | from tkinter.tix import Tree |
| | | from USBCAN import * |
| | | import sys |
| | | from PySide2.QtWidgets import QApplication, QMainWindow |
| | | from mainwindows import * |
| | | from multiprocessing import Process, Queue, Value |
| | | from multiprocessing import Process, Queue, Value, Pipe |
| | | import datetime |
| | | from ShifterDefine import * |
| | | import cantools |
| | | from pprint import pprint |
| | | # def rece_msg(bus): |
| | | # msg = bus.Receive(0.1) |
| | | # if msg[0] is not None: |
| | | # print(msg[0].arbitration_id) |
| | | DID_dic = { |
| | | "ProgrammingCounter": 0x2100, |
| | | "ProAtpCounter": 0x2101, |
| | | "BackupConfig": 0x2102, |
| | | "VehicleName": 0x2103, |
| | | "DiagVersion": 0x2104, |
| | | "BootSoftId": 0x2105, |
| | | "DFLZMPartNum": 0xF187, |
| | | "VehiclEECUSoftwareVersion": 0x2108, |
| | | "SupplierID": 0xF18A, |
| | | "ECUManufacturingDate": 0x210B, |
| | | "ECUSerialNumID": 0xf18c, |
| | | "VINDataIdentifier": 0xF190, |
| | | "ECUHardwareVersion": 0xF193, |
| | | "ECUSoftwareVersion": 0xF195, |
| | | "SystemNameOrEngineType": 0x2110, |
| | | "RepairShopCodeOrTester": 0x2111, |
| | | "ECUProgtammingDate": 0x2111, |
| | | "InstallDate": 0x2113, |
| | | "NetConfig": 0xf019, |
| | | "FunctionConfig": 0xf010, |
| | | "ActiveDiagSession": 0x2106, |
| | | "EGSMSensorPositon": 0x2116, |
| | | "ReleaseButtonState": 0x2118, |
| | | "PButtionState": 0x2119, |
| | | "PaddleState": 0x211A, |
| | | "ECUPowerVoltage": 0x211B, |
| | | "VehicleSpeed": 0x211c, |
| | | "SystemName": 0xF197, |
| | | "WriteFingerPrint": 0xF15A, |
| | | "ReadFingerPrint": 0xf15b, |
| | | "IndicationLEDControl": 0x8101 |
| | | } |
| | | |
| | | print(DID_dic["IndicationLEDControl"]) |
| | | |
| | | def send_msg(bus): |
| | | msg = Message( |
| | | arbitration_id=0x742, |
| | | is_remote_frame=0, |
| | | channel=0, |
| | | dlc=8, |
| | | data=[0]*8, |
| | | ) |
| | | bus.send(msg) |
| | | # def f(conn): |
| | | # conn.send([1, 'test', None]) |
| | | # conn.send([2, 'test', None]) |
| | | # print(conn.recv()) |
| | | # conn.close() |
| | | |
| | | # if __name__ == "__main__": |
| | | # parent_conn, child_conn = Pipe() # 产生两个返回对象,一个是管道这一头,一个是另一头 |
| | | # p = Process(target=f, args=(child_conn,)) |
| | | # p.start() |
| | | # print(parent_conn.recv()) |
| | | # print(parent_conn.recv()) |
| | | # parent_conn.send('father test') |
| | | # p.join() |
| | | |
| | | # bus = open_device() |
| | | # while True: |
| | | # send_msg(bus) |
| | | # # rece_msg(bus) |
| | | # def send_msg(bus): |
| | | # msg = Message( |
| | | # arbitration_id=0x742, |
| | | # is_remote_frame=0, |
| | | # channel=0, |
| | | # dlc=8, |
| | | # data=[0]*8, |
| | | # ) |
| | | # bus.send(msg) |
| | | |
| | | # # bus = open_device() |
| | | # # while True: |
| | | # # send_msg(bus) |
| | | # # # rece_msg(bus) |
| | | |
| | | device_type = DEVICETYPE['USBCAN-II'] |
| | | device_index = 0 |
| | | can_channel = 0 |
| | | can_filters = [ |
| | | # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF}, |
| | | {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] |
| | | bus = USBCAN(device_type, device_index, can_channel, |
| | | bitrate=500000, can_filters=can_filters) |
| | | # device_type = DEVICETYPE['USBCAN-II'] |
| | | # device_index = 0 |
| | | # can_channel = 0 |
| | | # can_filters = [ |
| | | # # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF}, |
| | | # {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}] |
| | | # bus = USBCAN(device_type, device_index, can_channel, |
| | | # bitrate=500000, can_filters=can_filters) |
| | | |
| | | msg = Message( |
| | | arbitration_id=0x742, |
| | | is_remote_frame=0, |
| | | channel=0, |
| | | dlc=8, |
| | | data=bytearray(0x02, 0x3e, 0, 0, 0, 0, 0, 0), |
| | | ) |
| | | # msg = Message( |
| | | # arbitration_id=0x742, |
| | | # is_remote_frame=0, |
| | | # channel=0, |
| | | # dlc=8, |
| | | # data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0]) |
| | | # ) |
| | | |
| | | # q_rx = Queue() |
| | | # q_tx = Queue() |
| | | # connect = Value('i', 1) |
| | | # close = Value('i', 0) |
| | | |
| | | q_rx = Queue() |
| | | q_tx = Queue() |
| | | connect = Value('i', 1) |
| | | close = Value('i', 0) |
| | | # # print(q_rx.empty()) |
| | | |
| | | # print(q_rx.empty()) |
| | | # # def start(): |
| | | |
| | | # def start(): |
| | | # # def rece_msg(): |
| | | # # # num = bus.GetReceiveNum(device_type, device_index, can_channel) |
| | | # # print(num) |
| | | # # for i in range(num): |
| | | # # # while not q_rx.empty(): |
| | | # # print(q_rx.get()) |
| | | # # # time.sleep(0.1) |
| | | |
| | | # # bus.CloseDevice() |
| | | # # start() |
| | | # def formatedata(index, item, received): |
| | | # data = [] |
| | | # if received: |
| | | # data.append('{:0>4}'.format(index)) |
| | | # data.append(item.timestamp) |
| | | # data.append('接收') |
| | | # else: |
| | | # data.append('') |
| | | # data.append(item.timestamp) |
| | | # data.append('发送') |
| | | |
| | | # # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp) |
| | | # data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x')) |
| | | |
| | | # # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧') |
| | | # data.append(item.dlc) |
| | | # if int(item.is_extended_id) == 1: |
| | | # data.append('扩展帧') |
| | | # else: |
| | | # data.append('标准帧') |
| | | |
| | | # data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)])) |
| | | # return data |
| | | |
| | | # def rece_msg(): |
| | | # # num = bus.GetReceiveNum(device_type, device_index, can_channel) |
| | | # print(num) |
| | | # for i in range(num): |
| | | # # while not q_rx.empty(): |
| | | # print(q_rx.get()) |
| | | # # time.sleep(0.1) |
| | | # # msg, num = bus.Receive(len=10) |
| | | # num = q_rx.qsize() |
| | | # if not num == 0: |
| | | # for i in range(num): |
| | | # # print(msg[0].arbitration_id) |
| | | # print(formatedata(i, q_rx.get(), True)) |
| | | |
| | | # def tx_task(bus: USBCAN): |
| | | # while True: |
| | | # bus.send(msg=msg) |
| | | # time.sleep(0.1) |
| | | |
| | | # bus.CloseDevice() |
| | | # start() |
| | | def formatedata(index, item, received): |
| | | data = [] |
| | | if received: |
| | | data.append('{:0>4}'.format(index)) |
| | | data.append(item.timestamp) |
| | | data.append('接收') |
| | | else: |
| | | data.append('') |
| | | data.append(item.timestamp) |
| | | data.append('发送') |
| | | # def rx_task(bus: USBCAN): |
| | | # while True: |
| | | # msg, num = bus.Receive(len=10) |
| | | # time.sleep(0.1) |
| | | # if not num == 0: |
| | | # for i in range(num): |
| | | # if msg[i].arbitration_id == 0x77a: |
| | | # # print(formatedata(i, msg[i], True)) |
| | | # print(msg[i].data) |
| | | |
| | | # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp) |
| | | data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x')) |
| | | |
| | | # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧') |
| | | data.append(item.dlc) |
| | | if int(item.is_extended_id) == 1: |
| | | data.append('扩展帧') |
| | | else: |
| | | data.append('标准帧') |
| | | |
| | | data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)])) |
| | | return data |
| | | |
| | | |
| | | def rece_msg(): |
| | | # msg, num = bus.Receive(len=10) |
| | | num = q_rx.qsize() |
| | | if not num == 0: |
| | | for i in range(num): |
| | | # print(msg[0].arbitration_id) |
| | | print(formatedata(i, q_rx.get(), True)) |
| | | |
| | | |
| | | def tx_task(bus: USBCAN): |
| | | while True: |
| | | bus.Transmit(device_type, device_index, can_channel, msg=msg, len=1) |
| | | time.sleep(0.1) |
| | | |
| | | |
| | | def rx_task(bus: USBCAN): |
| | | while True: |
| | | msg, num = bus.Receive(len=10) |
| | | time.sleep(0.1) |
| | | if not num == 0: |
| | | for i in range(num): |
| | | if msg[i].arbitration_id == 0x77a: |
| | | print(msg[i].arbitration_id) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | bus.InitAndStart() |
| | | info = bus.GetDeviceInf() |
| | | print(info.fw_version) |
| | | # arg = {bus} |
| | | t = threading.Thread(target=rx_task, args=(bus,)) |
| | | tx = threading.Thread(target=tx_task, args=(bus,)) |
| | | t.start() |
| | | tx.start() |
| | | # msgProcess = Process( |
| | | # name='pyUSBCANListener', |
| | | # target=bus.ListeningMsg, |
| | | # args=(connect, close, q_rx, q_tx)) |
| | | # msgProcess.daemon = True |
| | | # msgProcess.start() |
| | | # while True: |
| | | # rece_msg() |
| | | # if __name__ == '__main__': |
| | | # bus.InitAndStart() |
| | | # info = bus.GetDeviceInf() |
| | | # print(info.fw_version) |
| | | # # arg = {bus} |
| | | # t = threading.Thread(target=rx_task, args=(bus,)) |
| | | # tx = threading.Thread(target=tx_task, args=(bus,)) |
| | | # t.start() |
| | | # tx.start() |
| | | # msgProcess = Process( |
| | | # name='pyUSBCANListener', |
| | | # target=bus.ListeningMsg, |
| | | # args=(connect, close, q_rx, q_tx)) |
| | | # msgProcess.daemon = True |
| | | # msgProcess.start() |
| | | # while True: |
| | | # rece_msg() |
| | | |
| | | # def inserttable(table, data): |
| | | # # for row in range(5): |
| | |
| | | # item.setText(str(data[column])) |
| | | # item.setFont(QFont("微软雅黑", 12, QFont.Black)) # 设置字体 |
| | | # table.setItem(0, column, item) |
| | | |
| | | |
| | | # if __name__ == '__main__': |
| | | |
| | |
| | | # # w.show() |
| | | |
| | | # sys.exit(app.exec_()) |
| | | |
| | | dbc = cantools.database.load_file("DBC/SX7H.dbc") |
| | | # print(dbc.messages) |
| | | sa_message = dbc.get_message_by_name('SA1') |
| | | # print(sa_message) |
| | | # pprint(sa_message.signals) |
| | | frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0] |
| | | input_signal = dbc.decode_message(0x420, frame) |
| | | part_str = str(input_signal['SA1_Status_ParkButtonReq']) |
| | | pprint(part_str) |
| | | if input_signal['SA1_Status_ParkButtonReq'] == 'No request': |
| | | print('yes') |
| | | |
| | | data = sa_message.encode({ |
| | | 'SA1_Status_PRNDL': 0, |
| | | 'SA1_Status_GearShftPosReq': 0, #'Shifter position Zero', |
| | | 'SA1_Status_ParkButtonReq': 2, |
| | | "SA1_ShifterManualSignal": 0, |
| | | "SA1_ShifterModeSignal": 1, |
| | | "SA1_Status_ShftSensSng_Fault": 0, |
| | | "SA1_IND_ShifterMisUsd": 0, |
| | | "SA1_Live_counter": 1, |
| | | "SA1_Status_UnlockButtonReq": 1, |
| | | "SA1_Status_EcoShifterModeReq": 0, |
| | | "SA1_Status_RqGearPosInV": 0xf, |
| | | "SA1_Status_ShiftPosValidFlag": 0, |
| | | }) |
| | | # print(data) |
| | | # print(SA1_Status_ParkButtonReq_dic['No request']) |