|
|
# # def send_msg():
|
# # meg = Message()
|
# # msg = VCI_CAN_OBJ()
|
from ast import arg
|
import threading
|
from tkinter.tix import Tree
|
from USBCAN import *
|
import sys
|
from PySide2.QtWidgets import QApplication, QMainWindow
|
from mainwindows import *
|
from multiprocessing import Process, Queue, Value, Pipe
|
import datetime
|
# def rece_msg(bus):
|
# msg = bus.Receive(0.1)
|
# if msg[0] is not None:
|
# print(msg[0].arbitration_id)
|
# Lookup table: identifier name -> 16-bit numeric identifier.
# Presumably these are UDS data identifiers (DIDs) for the target ECU --
# TODO confirm against the ECU diagnostic specification.
#
# Key spellings (e.g. "ECUProgtammingDate", "VehiclEECUSoftwareVersion",
# "EGSMSensorPositon", "PButtionState") are kept exactly as they were because
# other code may look entries up by these strings.
DID_dic = {
    "ProgrammingCounter": 0x2100,
    "ProAtpCounter": 0x2101,
    "BackupConfig": 0x2102,
    "VehicleName": 0x2103,
    "DiagVersion": 0x2104,
    "BootSoftId": 0x2105,
    "DFLZMPartNum": 0xF187,
    "VehiclEECUSoftwareVersion": 0x2108,
    "SupplierID": 0xF18A,
    "ECUManufacturingDate": 0x210B,
    "ECUSerialNumID": 0xF18C,
    "VINDataIdentifier": 0xF190,
    "ECUHardwareVersion": 0xF193,
    "ECUSoftwareVersion": 0xF195,
    "SystemNameOrEngineType": 0x2110,
    "RepairShopCodeOrTester": 0x2111,
    # NOTE(review): same value as "RepairShopCodeOrTester" above, and 0x2112 is
    # skipped in the surrounding sequence -- possibly a typo for 0x2112.
    # Value left unchanged until verified against the ECU specification.
    "ECUProgtammingDate": 0x2111,
    "InstallDate": 0x2113,
    "NetConfig": 0xF019,
    "FunctionConfig": 0xF010,
    "ActiveDiagSession": 0x2106,
    "EGSMSensorPositon": 0x2116,
    "ReleaseButtonState": 0x2118,
    "PButtionState": 0x2119,
    "PaddleState": 0x211A,
    "ECUPowerVoltage": 0x211B,
    "VehicleSpeed": 0x211C,
    "SystemName": 0xF197,
    "WriteFingerPrint": 0xF15A,
    "ReadFingerPrint": 0xF15B,
    "IndicationLEDControl": 0x8101,
}


# Import-time sanity output: prints the decimal value of 0x8101 (33025).
print(DID_dic["IndicationLEDControl"])
|
|
# def f(conn):
|
# conn.send([1, 'test', None])
|
# conn.send([2, 'test', None])
|
# print(conn.recv())
|
# conn.close()
|
|
|
# if __name__ == "__main__":
|
# parent_conn, child_conn = Pipe() # returns two connection objects, one for each end of the pipe
|
# p = Process(target=f, args=(child_conn,))
|
# p.start()
|
# print(parent_conn.recv())
|
# print(parent_conn.recv())
|
# parent_conn.send('father test')
|
# p.join()
|
|
# def send_msg(bus):
|
# msg = Message(
|
# arbitration_id=0x742,
|
# is_remote_frame=0,
|
# channel=0,
|
# dlc=8,
|
# data=[0]*8,
|
# )
|
# bus.send(msg)
|
|
|
# # bus = open_device()
|
# # while True:
|
# # send_msg(bus)
|
# # # rece_msg(bus)
|
|
|
# device_type = DEVICETYPE['USBCAN-II']
|
# device_index = 0
|
# can_channel = 0
|
# can_filters = [
|
# # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF},
|
# {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}]
|
# bus = USBCAN(device_type, device_index, can_channel,
|
# bitrate=500000, can_filters=can_filters)
|
|
# msg = Message(
|
# arbitration_id=0x742,
|
# is_remote_frame=0,
|
# channel=0,
|
# dlc=8,
|
# data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0])
|
# )
|
|
|
# q_rx = Queue()
|
# q_tx = Queue()
|
# connect = Value('i', 1)
|
# close = Value('i', 0)
|
|
# # print(q_rx.empty())
|
|
# # def start():
|
|
|
# # def rece_msg():
|
# # # num = bus.GetReceiveNum(device_type, device_index, can_channel)
|
# # print(num)
|
# # for i in range(num):
|
# # # while not q_rx.empty():
|
# # print(q_rx.get())
|
# # # time.sleep(0.1)
|
|
|
# # bus.CloseDevice()
|
# # start()
|
# def formatedata(index, item, received):
|
# data = []
|
# if received:
|
# data.append('{:0>4}'.format(index))
|
# data.append(item.timestamp)
|
# data.append('接收')
|
# else:
|
# data.append('')
|
# data.append(item.timestamp)
|
# data.append('发送')
|
|
# # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp)
|
# data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x'))
|
|
# # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧')
|
# data.append(item.dlc)
|
# if int(item.is_extended_id) == 1:
|
# data.append('扩展帧')
|
# else:
|
# data.append('标准帧')
|
|
# data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)]))
|
# return data
|
|
|
# def rece_msg():
|
# # msg, num = bus.Receive(len=10)
|
# num = q_rx.qsize()
|
# if not num == 0:
|
# for i in range(num):
|
# # print(msg[0].arbitration_id)
|
# print(formatedata(i, q_rx.get(), True))
|
|
|
# def tx_task(bus: USBCAN):
|
# while True:
|
# bus.send(msg=msg)
|
# time.sleep(0.1)
|
|
|
# def rx_task(bus: USBCAN):
|
# while True:
|
# msg, num = bus.Receive(len=10)
|
# time.sleep(0.1)
|
# if not num == 0:
|
# for i in range(num):
|
# if msg[i].arbitration_id == 0x77a:
|
# # print(formatedata(i, msg[i], True))
|
# print(msg[i].data)
|
|
|
# if __name__ == '__main__':
|
# bus.InitAndStart()
|
# info = bus.GetDeviceInf()
|
# print(info.fw_version)
|
# # arg = {bus}
|
# t = threading.Thread(target=rx_task, args=(bus,))
|
# tx = threading.Thread(target=tx_task, args=(bus,))
|
# t.start()
|
# tx.start()
|
# msgProcess = Process(
|
# name='pyUSBCANListener',
|
# target=bus.ListeningMsg,
|
# args=(connect, close, q_rx, q_tx))
|
# msgProcess.daemon = True
|
# msgProcess.start()
|
# while True:
|
# rece_msg()
|
|
# def inserttable(table, data):
|
# # for row in range(5):
|
# for column in range(7):
|
# item = QTableWidgetItem() # table item (model object for one cell)
|
# item.setTextAlignment(Qt.AlignHCenter | Qt.AlignVCenter) # text alignment within the cell
|
# item.setText(str(data[column]))
|
# item.setFont(QFont("微软雅黑", 12, QFont.Black)) # set the font
|
# table.setItem(0, column, item)
|
|
|
# if __name__ == '__main__':
|
|
# app = QApplication(sys.argv)
|
# # loop = QEventLoop(app)
|
# # asyncio.set_event_loop(loop)
|
|
# # w = MainWidget()
|
# # w = FlashBootloaderWidget()
|
# # w = DataIdentifierWidget()
|
# # w = DiagnosticTroubleCodeWidget()
|
|
# windows = QMainWindow()
|
# w = Ui_MainWindow()
|
# w.setupUi(windows)
|
# windows.show()
|
# msg = Message(arbitration_id=0x420, is_extended_id=0,
|
# dlc=8, data=bytearray(8))
|
# data = formatedata(0, msg, True)
|
# inserttable(w.tableWidget, data)
|
# # w.show()
|
|
# sys.exit(app.exec_())
|