"""Manual test harness for a USBCAN adapter.

When run as a script it initialises the adapter, prints the firmware version,
and starts two threads: one that periodically transmits a fixed frame and one
that polls for received frames and prints the payload of those with
arbitration ID 0x77A.
"""
# # def send_msg():
# #     meg = Message()
# #     msg = VCI_CAN_OBJ()

# NOTE(review): the original import order is kept on purpose; the two star
# imports may shadow names, so regrouping could change which object a name
# refers to.
# NOTE(review): `arg`, `Tree`, `sys`, `datetime`, `Process`, `QApplication` and
# `QMainWindow` are only referenced from commented-out code (or not at all) in
# this file. `tkinter.tix` was removed in Python 3.13, so that import will fail
# there -- confirm nothing relies on it before deleting.
from ast import arg
import threading
from tkinter.tix import Tree
from USBCAN import *
import sys
from PySide2.QtWidgets import QApplication, QMainWindow
from mainwindows import *
from multiprocessing import Process, Queue, Value
import datetime
# `time.sleep` is used below; import it explicitly instead of relying on a
# star import to leak the module into this namespace.
import time

# def rece_msg(bus):
#     msg = bus.Receive(0.1)
#     if msg[0] is not None:
#         print(msg[0].arbitration_id)


def send_msg(bus):
    """Send one 8-byte all-zero data frame with arbitration ID 0x742 on *bus*.

    Args:
        bus: Object exposing ``send(message)``; the script uses a ``USBCAN``
            instance.
    """
    frame = Message(
        arbitration_id=0x742,
        is_remote_frame=0,
        channel=0,
        dlc=8,
        data=[0] * 8,
    )
    bus.send(frame)


# bus = open_device()
# while True:
#     send_msg(bus)
#     # rece_msg(bus)

# --- Adapter configuration (evaluated at import time) -----------------------
device_type = DEVICETYPE['USBCAN-II']
device_index = 0
can_channel = 0
# NOTE(review): presumably an acceptance filter for ID 0x420 with a full mask;
# confirm the semantics against the USBCAN wrapper.
can_filters = [
    # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF},
    {'can_id': 0x420, 'can_mask': 0xFFFFFFFF},
]
bus = USBCAN(
    device_type,
    device_index,
    can_channel,
    bitrate=500000,
    can_filters=can_filters,
)

# Frame transmitted repeatedly by tx_task().
msg = Message(
    arbitration_id=0x742,
    is_remote_frame=0,
    channel=0,
    dlc=8,
    data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0]),
)

# Shared objects intended for the (currently commented-out) listener process.
q_rx = Queue()
q_tx = Queue()
connect = Value('i', 1)
close = Value('i', 0)

# print(q_rx.empty())
# def start():

# def rece_msg():
#     # num = bus.GetReceiveNum(device_type, device_index, can_channel)
#     print(num)
#     for i in range(num):
#     # while not q_rx.empty():
#         print(q_rx.get())
#     # time.sleep(0.1)
# bus.CloseDevice()
# start()


def formatedata(index, item, received):
    """Build one display row (a list of column values) for a CAN frame.

    Args:
        index: Sequence number; shown zero-padded to four characters for
            received frames and left blank for transmitted ones.
        item: Frame object providing ``timestamp``, ``arbitration_id``,
            ``dlc``, ``is_extended_id`` and ``data`` (an iterable of ints).
        received: Truthy for a received frame, falsy for a transmitted one.

    Returns:
        ``[index_text, timestamp, direction, id_hex, dlc, frame_type,
        data_hex]`` where ``data_hex`` is the payload as space-separated
        two-digit lowercase hex bytes.
    """
    if received:
        row = ['{:0>4}'.format(index), item.timestamp, '接收']
    else:
        row = ['', item.timestamp, '发送']

    # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp)
    # Upper-case hex digits while keeping a lower-case "0x" prefix.
    row.append(hex(item.arbitration_id).upper().replace('X', 'x'))
    # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧')
    row.append(item.dlc)
    row.append('扩展帧' if int(item.is_extended_id) == 1 else '标准帧')
    # Zero-pad on the LEFT: the former '{:0<2x}' padded on the right, turning
    # 0x02 into "20".
    row.append(' '.join('{:02x}'.format(byte) for byte in item.data))
    return row


def rece_msg():
    """Print a formatted row for every frame currently queued in ``q_rx``."""
    # msg, num = bus.Receive(len=10)
    pending = q_rx.qsize()
    for i in range(pending):
        # print(msg[0].arbitration_id)
        print(formatedata(i, q_rx.get(), True))


def tx_task(bus: USBCAN):
    """Transmit the module-level ``msg`` frame on *bus* every 0.1 s, forever."""
    while True:
        bus.send(msg=msg)
        time.sleep(0.1)


def rx_task(bus: USBCAN):
    """Poll *bus* forever and print the payload of frames with ID 0x77A.

    Each iteration calls ``bus.Receive(len=10)``, which is unpacked as
    ``(frames, count)``, sleeps 0.1 s, then scans the first ``count`` frames.
    """
    while True:
        # Named `frames` so the module-level `msg` is not shadowed.
        frames, num = bus.Receive(len=10)
        time.sleep(0.1)
        for i in range(num):
            if frames[i].arbitration_id == 0x77a:
                # print(formatedata(i, frames[i], True))
                print(frames[i].data)


if __name__ == '__main__':
    bus.InitAndStart()
    info = bus.GetDeviceInf()
    print(info.fw_version)
    # arg = {bus}
    # Both workers loop forever on non-daemon threads, so the process keeps
    # running until it is terminated externally.
    t = threading.Thread(target=rx_task, args=(bus,))
    tx = threading.Thread(target=tx_task, args=(bus,))
    t.start()
    tx.start()
    # msgProcess = Process(
    #     name='pyUSBCANListener',
    #     target=bus.ListeningMsg,
    #     args=(connect, close, q_rx, q_tx))
    # msgProcess.daemon = True
    # msgProcess.start()
    # while True:
    #     rece_msg()

# def inserttable(table, data):
#     # for row in range(5):
#     for column in range(7):
#         item = QTableWidgetItem()  # table item (model)
#         item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter)  # text alignment
#         item.setText(str(data[column]))
#         item.setFont(QFont("微软雅黑", 12, QFont.Black))  # set the font
#         table.setItem(0, column, item)

# if __name__ == '__main__':
#     app = QApplication(sys.argv)
#     # loop = QEventLoop(app)
#     # asyncio.set_event_loop(loop)
#     # w = MainWidget()
#     # w = FlashBootloaderWidget()
#     # w = DataIdentifierWidget()
#     # w = DiagnosticTroubleCodeWidget()
#     windows = QMainWindow()
#     w = Ui_MainWindow()
#     w.setupUi(windows)
#     windows.show()
#     msg = Message(arbitration_id=0x420, is_extended_id=0,
#                   dlc=8, data=bytearray(8))
#     data = formatedata(0, msg, True)
#     inserttable(w.tableWidget, data)
#     # w.show()
#     sys.exit(app.exec_())