|
|
# # def send_msg():
|
# # meg = Message()
|
# # msg = VCI_CAN_OBJ()
|
from ast import arg
|
import threading
|
from tkinter.tix import Tree
|
from USBCAN import *
|
import sys
|
from PySide2.QtWidgets import QApplication, QMainWindow
|
from mainwindows import *
|
from multiprocessing import Process, Queue, Value
|
import datetime
|
# def rece_msg(bus):
|
# msg = bus.Receive(0.1)
|
# if msg[0] is not None:
|
# print(msg[0].arbitration_id)
|
|
|
def send_msg(bus):
    """Send a single all-zero, 8-byte frame with arbitration ID 0x742.

    Args:
        bus: Object exposing a ``send(message)`` method; the frame is
            handed to it unchanged.
    """
    frame_fields = {
        'arbitration_id': 0x742,
        'is_remote_frame': 0,
        'channel': 0,
        'dlc': 8,
        'data': [0] * 8,
    }
    bus.send(Message(**frame_fields))
|
|
|
# bus = open_device()
|
# while True:
|
# send_msg(bus)
|
# # rece_msg(bus)
|
|
|
# Adapter selection: USBCAN-II type, device index 0, CAN channel 0.
device_type = DEVICETYPE['USBCAN-II']
device_index = 0
can_channel = 0

# Filter list passed to the bus constructor: one entry for ID 0x420 with
# mask 0xFFFFFFFF.
can_filters = [dict(can_id=0x420, can_mask=0xFFFFFFFF)]

# Module-level bus handle used by the thread tasks and the __main__ section.
bus = USBCAN(
    device_type,
    device_index,
    can_channel,
    bitrate=500000,
    can_filters=can_filters,
)
|
|
# Frame sent repeatedly by tx_task: ID 0x742, payload 02 3E 00 00 00 00 00 00.
# NOTE(review): the payload looks like a UDS TesterPresent single frame --
# confirm against the target's diagnostic specification.
#
# bytearray() takes ONE iterable of ints; passing the eight bytes as separate
# positional arguments raises TypeError as soon as this module is loaded.
msg = Message(
    arbitration_id=0x742,
    is_remote_frame=0,
    channel=0,
    dlc=8,
    data=bytearray([0x02, 0x3E, 0, 0, 0, 0, 0, 0]),
)
|
|
|
# Multiprocessing queues: q_rx is drained by rece_msg(); q_tx is only
# referenced by the commented-out listener process setup in this file.
q_rx, q_tx = Queue(), Queue()

# Shared integer flags (initially connect=1, close=0); in this file they are
# only referenced by the commented-out listener process setup.
connect, close = Value('i', 1), Value('i', 0)
|
|
# print(q_rx.empty())
|
|
# def start():
|
|
|
# def rece_msg():
|
# # num = bus.GetReceiveNum(device_type, device_index, can_channel)
|
# print(num)
|
# for i in range(num):
|
# # while not q_rx.empty():
|
# print(q_rx.get())
|
# # time.sleep(0.1)
|
|
|
# bus.CloseDevice()
|
# start()
|
def formatedata(index, item, received):
    """Build one display row (a list of cells) describing a CAN frame.

    Args:
        index: Sequence number; shown zero-padded to width 4 for received
            frames and left blank for sent frames.
        item: Frame object providing ``timestamp``, ``arbitration_id``,
            ``dlc``, ``is_extended_id`` and ``data`` (an iterable of ints).
        received: Truthy when the frame was received, falsy when sent.

    Returns:
        list: ``[index_text, timestamp, direction, id_text, dlc,
        frame_format, data_text]``.
    """
    if received:
        row = ['{:0>4}'.format(index), item.timestamp, '接收']
    else:
        row = ['', item.timestamp, '发送']

    # Lower-case "0x" prefix with upper-case hex digits, e.g. 0x7AB.
    row.append(hex(item.arbitration_id).upper().replace('X', 'x'))

    # The remote/data frame-kind column is currently disabled.
    row.append(item.dlc)
    row.append('扩展帧' if int(item.is_extended_id) == 1 else '标准帧')

    # Two hex digits per byte, zero-padded on the LEFT. The previous
    # '{:0<2x}' spec padded on the right, so 0x0a was shown as "a0".
    row.append(' '.join('{:02x}'.format(byte) for byte in item.data))
    return row
|
|
|
def rece_msg():
    """Print a formatted row for each frame currently waiting in ``q_rx``.

    The queue size is sampled once; that many items are then taken with
    ``q_rx.get()`` and printed through ``formatedata`` as received frames,
    numbered from 0 within this call.
    """
    pending = q_rx.qsize()
    # An empty queue yields an empty range, so nothing is printed.
    for seq in range(pending):
        print(formatedata(seq, q_rx.get(), True))
|
|
|
def tx_task(bus: USBCAN):
    """Transmit the module-level ``msg`` forever, sleeping 0.1 s after each send.

    Never returns; used as a thread target in the ``__main__`` section.

    NOTE(review): ``time`` is not imported explicitly in this file; it is
    presumably provided by one of the wildcard imports -- confirm.

    Args:
        bus: Bus object whose ``Transmit`` method is called on every cycle.
    """
    send_period = 0.1
    while True:
        bus.Transmit(device_type, device_index, can_channel, msg=msg, len=1)
        time.sleep(send_period)
|
|
|
def rx_task(bus: USBCAN):
    """Poll the bus forever and print the ID of every frame with ID 0x77a.

    Each cycle requests up to 10 frames via ``bus.Receive(len=10)``, sleeps
    0.1 s, then scans the returned batch. Never returns; used as a thread
    target in the ``__main__`` section.

    NOTE(review): ``time`` is not imported explicitly in this file; it is
    presumably provided by one of the wildcard imports -- confirm.

    Args:
        bus: Bus object whose ``Receive`` method returns ``(frames, count)``.
    """
    poll_period = 0.1
    watched_id = 0x77a
    while True:
        frames, count = bus.Receive(len=10)
        time.sleep(poll_period)
        # A count of zero gives an empty range, so the batch is skipped.
        for pos in range(count):
            frame = frames[pos]
            if frame.arbitration_id != watched_id:
                continue
            print(frame.arbitration_id)
|
|
|
if __name__ == '__main__':
    # Initialise and start the adapter, then report its firmware version.
    bus.InitAndStart()
    info = bus.GetDeviceInf()
    print(info.fw_version)

    # One thread per direction; the receiver is started before the
    # transmitter. Both targets loop forever.
    t = threading.Thread(target=rx_task, args=(bus,))
    tx = threading.Thread(target=tx_task, args=(bus,))
    for worker in (t, tx):
        worker.start()
|
# msgProcess = Process(
|
# name='pyUSBCANListener',
|
# target=bus.ListeningMsg,
|
# args=(connect, close, q_rx, q_tx))
|
# msgProcess.daemon = True
|
# msgProcess.start()
|
# while True:
|
# rece_msg()
|
|
# def inserttable(table, data):
|
# # for row in range(5):
|
# for column in range(7):
|
# item = QTableWidgetItem() # 模型
|
# item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter) # 文本显示位置
|
# item.setText(str(data[column]))
|
# item.setFont(QFont("微软雅黑", 12, QFont.Black)) # 设置字体
|
# table.setItem(0, column, item)
|
|
|
# if __name__ == '__main__':
|
|
# app = QApplication(sys.argv)
|
# # loop = QEventLoop(app)
|
# # asyncio.set_event_loop(loop)
|
|
# # w = MainWidget()
|
# # w = FlashBootloaderWidget()
|
# # w = DataIdentifierWidget()
|
# # w = DiagnosticTroubleCodeWidget()
|
|
# windows = QMainWindow()
|
# w = Ui_MainWindow()
|
# w.setupUi(windows)
|
# windows.show()
|
# msg = Message(arbitration_id=0x420, is_extended_id=0,
|
# dlc=8, data=bytearray(8))
|
# data = formatedata(0, msg, True)
|
# inserttable(w.tableWidget, data)
|
# # w.show()
|
|
# sys.exit(app.exec_())
|