tao_z
2022-06-10 48fc7d6c9549513d9b1da3f99f449fc7de27821f
test.py
@@ -10,138 +10,188 @@
import sys
from PySide2.QtWidgets import QApplication, QMainWindow
from mainwindows import *
from multiprocessing import Process, Queue, Value
from multiprocessing import Process, Queue, Value, Pipe
import datetime
# def rece_msg(bus):
#     msg = bus.Receive(0.1)
#     if msg[0] is not None:
#         print(msg[0].arbitration_id)
DID_dic = {
    "ProgrammingCounter": 0x2100,
    "ProAtpCounter": 0x2101,
    "BackupConfig": 0x2102,
    "VehicleName": 0x2103,
    "DiagVersion": 0x2104,
    "BootSoftId": 0x2105,
    "DFLZMPartNum": 0xF187,
    "VehiclEECUSoftwareVersion": 0x2108,
    "SupplierID": 0xF18A,
    "ECUManufacturingDate": 0x210B,
    "ECUSerialNumID": 0xf18c,
    "VINDataIdentifier": 0xF190,
    "ECUHardwareVersion": 0xF193,
    "ECUSoftwareVersion": 0xF195,
    "SystemNameOrEngineType": 0x2110,
    "RepairShopCodeOrTester": 0x2111,
    "ECUProgtammingDate": 0x2111,
    "InstallDate": 0x2113,
    "NetConfig": 0xf019,
    "FunctionConfig": 0xf010,
    "ActiveDiagSession": 0x2106,
    "EGSMSensorPositon": 0x2116,
    "ReleaseButtonState": 0x2118,
    "PButtionState": 0x2119,
    "PaddleState": 0x211A,
    "ECUPowerVoltage": 0x211B,
    "VehicleSpeed": 0x211c,
    "SystemName": 0xF197,
    "WriteFingerPrint": 0xF15A,
    "ReadFingerPrint": 0xf15b,
    "IndicationLEDControl": 0x8101
}
print(DID_dic["IndicationLEDControl"])
# def f(conn):
#     conn.send([1, 'test', None])
#     conn.send([2, 'test', None])
#     print(conn.recv())
#     conn.close()
def send_msg(bus):
    msg = Message(
        arbitration_id=0x742,
        is_remote_frame=0,
        channel=0,
        dlc=8,
        data=[0]*8,
    )
    bus.send(msg)
# if __name__ == "__main__":
#     parent_conn, child_conn = Pipe()  # 产生两个返回对象,一个是管道这一头,一个是另一头
#     p = Process(target=f, args=(child_conn,))
#     p.start()
#     print(parent_conn.recv())
#     print(parent_conn.recv())
#     parent_conn.send('father test')
#     p.join()
# def send_msg(bus):
#     msg = Message(
#         arbitration_id=0x742,
#         is_remote_frame=0,
#         channel=0,
#         dlc=8,
#         data=[0]*8,
#     )
#     bus.send(msg)
# bus = open_device()
# while True:
#     send_msg(bus)
#     # rece_msg(bus)
# # bus = open_device()
# # while True:
# #     send_msg(bus)
# #     # rece_msg(bus)
device_type = DEVICETYPE['USBCAN-II']
device_index = 0
can_channel = 0
can_filters = [
    # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF},
    {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}]
bus = USBCAN(device_type, device_index, can_channel,
             bitrate=500000, can_filters=can_filters)
# device_type = DEVICETYPE['USBCAN-II']
# device_index = 0
# can_channel = 0
# can_filters = [
#     # {'can_id': self.shifter.canid.normalid, 'can_mask': 0xFFFFFFFF},
#     {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}]
# bus = USBCAN(device_type, device_index, can_channel,
#              bitrate=500000, can_filters=can_filters)
msg = Message(
    arbitration_id=0x742,
    is_remote_frame=0,
    channel=0,
    dlc=8,
    data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0])
)
# msg = Message(
#     arbitration_id=0x742,
#     is_remote_frame=0,
#     channel=0,
#     dlc=8,
#     data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0])
# )
q_rx = Queue()
q_tx = Queue()
connect = Value('i', 1)
close = Value('i', 0)
# q_rx = Queue()
# q_tx = Queue()
# connect = Value('i', 1)
# close = Value('i', 0)
# print(q_rx.empty())
# # print(q_rx.empty())
# def start():
# # def start():
# # def rece_msg():
# #     # num = bus.GetReceiveNum(device_type, device_index, can_channel)
# #     print(num)
# #     for i in range(num):
# #         # while not q_rx.empty():
# #         print(q_rx.get())
# #         #     time.sleep(0.1)
# # bus.CloseDevice()
# # start()
# def formatedata(index, item, received):
#     data = []
#     if received:
#         data.append('{:0>4}'.format(index))
#         data.append(item.timestamp)
#         data.append('接收')
#     else:
#         data.append('')
#         data.append(item.timestamp)
#         data.append('发送')
#     # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp)
#     data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x'))
#     # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧')
#     data.append(item.dlc)
#     if int(item.is_extended_id) == 1:
#         data.append('扩展帧')
#     else:
#         data.append('标准帧')
#     data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)]))
#     return data
# def rece_msg():
#     # num = bus.GetReceiveNum(device_type, device_index, can_channel)
#     print(num)
#     for i in range(num):
#         # while not q_rx.empty():
#         print(q_rx.get())
#         #     time.sleep(0.1)
#     # msg, num = bus.Receive(len=10)
#     num = q_rx.qsize()
#     if not num == 0:
#         for i in range(num):
#             # print(msg[0].arbitration_id)
#             print(formatedata(i, q_rx.get(), True))
# bus.CloseDevice()
# start()
def formatedata(index, item, received):
    data = []
    if received:
        data.append('{:0>4}'.format(index))
        data.append(item.timestamp)
        data.append('接收')
    else:
        data.append('')
        data.append(item.timestamp)
        data.append('发送')
    # print(time.time(), time.localtime(item.TimeStamp), item.TimeStamp)
    data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x'))
    # data.append('远程帧' if int(item.is_remote_frame) == 1 else '数据帧')
    data.append(item.dlc)
    if int(item.is_extended_id) == 1:
        data.append('扩展帧')
    else:
        data.append('标准帧')
    data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)]))
    return data
# def tx_task(bus: USBCAN):
#     while True:
#         bus.send(msg=msg)
#         time.sleep(0.1)
def rece_msg():
    # msg, num = bus.Receive(len=10)
    num = q_rx.qsize()
    if not num == 0:
        for i in range(num):
            # print(msg[0].arbitration_id)
            print(formatedata(i, q_rx.get(), True))
# def rx_task(bus: USBCAN):
#     while True:
#         msg, num = bus.Receive(len=10)
#         time.sleep(0.1)
#         if not num == 0:
#             for i in range(num):
#                 if msg[i].arbitration_id == 0x77a:
#                     # print(formatedata(i, msg[i], True))
#                     print(msg[i].data)
def tx_task(bus: USBCAN):
    while True:
        bus.send(msg=msg)
        time.sleep(0.1)
def rx_task(bus: USBCAN):
    while True:
        msg, num = bus.Receive(len=10)
        time.sleep(0.1)
        if not num == 0:
            for i in range(num):
                if msg[i].arbitration_id == 0x77a:
                    # print(formatedata(i, msg[i], True))
                    print(msg[i].data)
if __name__ == '__main__':
    bus.InitAndStart()
    info = bus.GetDeviceInf()
    print(info.fw_version)
    # arg = {bus}
    t = threading.Thread(target=rx_task, args=(bus,))
    tx = threading.Thread(target=tx_task, args=(bus,))
    t.start()
    tx.start()
    # msgProcess = Process(
    #     name='pyUSBCANListener',
    #     target=bus.ListeningMsg,
    #     args=(connect, close, q_rx, q_tx))
    # msgProcess.daemon = True
    # msgProcess.start()
    # while True:
    #     rece_msg()
# if __name__ == '__main__':
#     bus.InitAndStart()
#     info = bus.GetDeviceInf()
#     print(info.fw_version)
#     # arg = {bus}
#     t = threading.Thread(target=rx_task, args=(bus,))
#     tx = threading.Thread(target=tx_task, args=(bus,))
#     t.start()
#     tx.start()
# msgProcess = Process(
#     name='pyUSBCANListener',
#     target=bus.ListeningMsg,
#     args=(connect, close, q_rx, q_tx))
# msgProcess.daemon = True
# msgProcess.start()
# while True:
#     rece_msg()
# def inserttable(table, data):
#     # for row in range(5):