tao_z
2022-06-06 20cc603922983f154d10b7d56c6b49e5e02997bd
USBCAN.py
@@ -1,3 +1,4 @@
from asyncio.log import logger
from ctypes import *
from curses import raw
# import platform
@@ -50,7 +51,6 @@
SEND_SINGLE = 1
SELF_SEND_RECV = 2
SELF_SEND_RECV_SINGLE = 3
'''
 Interface return status
'''
@@ -62,52 +62,84 @@
class VCI_INIT_CONFIG(Structure):
    _fields_ = [("AccCode", c_ulong),
                ("AccMask", c_ulong),
                ("Reserved", c_ulong),
                ("Filter", c_ubyte),
                ("Timing0", c_ubyte),
                ("Timing1", c_ubyte),
                ("Mode", c_ubyte)
                ]
    _fields_ = [("AccCode", c_ulong), ("AccMask", c_ulong),
                ("Reserved", c_ulong), ("Filter", c_ubyte),
                ("Timing0", c_ubyte), ("Timing1", c_ubyte), ("Mode", c_ubyte)]
class VCI_CAN_OBJ(Structure):
    _fields_ = [("ID", c_uint),
                ("TimeStamp", c_uint),
                ("TimeFlag", c_ubyte),
                ("SendType", c_ubyte),
                ("RemoteFlag", c_ubyte),
                ("ExternFlag", c_ubyte),
                ("DataLen", c_ubyte),
                ("Data", c_ubyte*8),
                ("Reserved", c_ubyte*3)
                ]
    _fields_ = [("ID", c_uint), ("TimeStamp", c_uint), ("TimeFlag", c_ubyte),
                ("SendType", c_ubyte), ("RemoteFlag", c_ubyte),
                ("ExternFlag", c_ubyte), ("DataLen", c_ubyte),
                ("Data", c_ubyte * 8), ("Reserved", c_ubyte * 3)]
class PVCI_ERR_INFO(Structure):
    _fields_ = [("ErrorCode", c_uint),
                ("PassiveErrData", c_ubyte*3),
                ("ArLostErrData", c_ubyte)
                ]
    _fields_ = [("ErrorCode", c_uint), ("PassiveErrData", c_ubyte * 3),
                ("ArLostErrData", c_ubyte)]
baudRateConfig = {
    '5Kbps': {'time0': 0xBF, 'time1': 0xFF},
    '10Kbps': {'time0': 0x31, 'time1': 0x1C},
    '20Kbps': {'time0': 0x18, 'time1': 0x1C},
    '40Kbps': {'time0': 0x87, 'time1': 0xFF},
    '50Kbps': {'time0': 0x09, 'time1': 0x1C},
    '80Kbps': {'time0': 0x83, 'time1': 0xFF},
    '100Kbps': {'time0': 0x04, 'time1': 0x1C},
    '125Kbps': {'time0': 0x03, 'time1': 0x1C},
    '200Kbps': {'time0': 0x81, 'time1': 0xFA},
    '250Kbps': {'time0': 0x01, 'time1': 0x1C},
    '400Kbps': {'time0': 0x80, 'time1': 0xFA},
    '500Kbps': {'time0': 0x00, 'time1': 0x1C},
    '666Kbps': {'time0': 0x80, 'time1': 0xB6},
    '800Kbps': {'time0': 0x00, 'time1': 0x16},
    '1000Kbps': {'time0': 0x00, 'time1': 0x14},
    '5Kbps': {
        'time0': 0xBF,
        'time1': 0xFF
    },
    '10Kbps': {
        'time0': 0x31,
        'time1': 0x1C
    },
    '20Kbps': {
        'time0': 0x18,
        'time1': 0x1C
    },
    '40Kbps': {
        'time0': 0x87,
        'time1': 0xFF
    },
    '50Kbps': {
        'time0': 0x09,
        'time1': 0x1C
    },
    '80Kbps': {
        'time0': 0x83,
        'time1': 0xFF
    },
    '100Kbps': {
        'time0': 0x04,
        'time1': 0x1C
    },
    '125Kbps': {
        'time0': 0x03,
        'time1': 0x1C
    },
    '200Kbps': {
        'time0': 0x81,
        'time1': 0xFA
    },
    '250Kbps': {
        'time0': 0x01,
        'time1': 0x1C
    },
    '400Kbps': {
        'time0': 0x80,
        'time1': 0xFA
    },
    '500Kbps': {
        'time0': 0x00,
        'time1': 0x1C
    },
    '666Kbps': {
        'time0': 0x80,
        'time1': 0xB6
    },
    '800Kbps': {
        'time0': 0x00,
        'time1': 0x16
    },
    '1000Kbps': {
        'time0': 0x00,
        'time1': 0x14
    },
}
TIMING_DICT = {
@@ -136,22 +168,20 @@
class USBCAN_DEVICE_INFO(Structure):
    _fields_ = [("hw_Version", c_ushort),
                ("fw_Version", c_ushort),
                ("dr_Version", c_ushort),
                ("in_Version", c_ushort),
                ("irq_Num", c_ushort),
                ("can_Num", c_ubyte),
    _fields_ = [("hw_Version", c_ushort), ("fw_Version", c_ushort),
                ("dr_Version", c_ushort), ("in_Version", c_ushort),
                ("irq_Num", c_ushort), ("can_Num", c_ubyte),
                ("str_Serial_Num", c_ubyte * 20),
                ("str_hw_Type", c_ubyte * 40),
                ("reserved", c_ushort * 4)]
                ("str_hw_Type", c_ubyte * 40), ("reserved", c_ushort * 4)]
    def __str__(self):
        return "Hardware Version:%s\nFirmware Version:%s\nDriver Interface:%s\nInterface Interface:%s\nInterrupt Number:%d\nCAN Number:%d\nSerial:%s\nHardware Type:%s\n" % (
            self.hw_version, self.fw_version, self.dr_version, self.in_version, self.irq_num, self.can_num, self.serial, self.hw_type)
            self.hw_version, self.fw_version, self.dr_version, self.in_version,
            self.irq_num, self.can_num, self.serial, self.hw_type)
    def _version(self, version):
        return ("V%02x.%02x" if version // 0xFF >= 9 else "V%d.%02x") % (version // 0xFF, version & 0xFF)
        return ("V%02x.%02x" if version // 0xFF >= 9 else
                "V%d.%02x") % (version // 0xFF, version & 0xFF)
    @property
    def hw_version(self):
@@ -199,25 +229,16 @@
class _USBCAN_CHANNEL_CAN_INIT_CONFIG(Structure):
    _fields_ = [("acc_code", c_uint),
                ("acc_mask", c_uint),
                ("reserved", c_uint),
                ("filter",   c_ubyte),
                ("timing0",  c_ubyte),
                ("timing1",  c_ubyte),
                ("mode",     c_ubyte)]
    _fields_ = [("acc_code", c_uint), ("acc_mask", c_uint),
                ("reserved", c_uint), ("filter", c_ubyte),
                ("timing0", c_ubyte), ("timing1", c_ubyte), ("mode", c_ubyte)]
class _USBCAN_CHANNEL_CANFD_INIT_CONFIG(Structure):
    _fields_ = [("acc_code",     c_uint),
                ("acc_mask",     c_uint),
                ("abit_timing",  c_uint),
                ("dbit_timing",  c_uint),
                ("brp",          c_uint),
                ("filter",       c_ubyte),
                ("mode",         c_ubyte),
                ("pad",          c_ushort),
                ("reserved",     c_uint)]
    _fields_ = [("acc_code", c_uint), ("acc_mask", c_uint),
                ("abit_timing", c_uint), ("dbit_timing", c_uint),
                ("brp", c_uint), ("filter", c_ubyte), ("mode", c_ubyte),
                ("pad", c_ushort), ("reserved", c_uint)]
class _USBCAN_CHANNEL_INIT_CONFIG(Union):
@@ -226,25 +247,19 @@
class USBCAN_CHANNEL_INIT_CONFIG(Structure):
    _fields_ = [("can_type", c_uint),
                ("config", _USBCAN_CHANNEL_INIT_CONFIG)]
    _fields_ = [("can_type", c_uint), ("config", _USBCAN_CHANNEL_INIT_CONFIG)]
class USBCAN_CHANNEL_ERR_INFO(Structure):
    _fields_ = [("error_code", c_uint),
                ("passive_ErrData", c_ubyte * 3),
    _fields_ = [("error_code", c_uint), ("passive_ErrData", c_ubyte * 3),
                ("arLost_ErrData", c_ubyte)]
class USBCAN_CHANNEL_STATUS(Structure):
    _fields_ = [("errInterrupt", c_ubyte),
                ("regMode",      c_ubyte),
                ("regStatus",    c_ubyte),
                ("regALCapture", c_ubyte),
                ("regECCapture", c_ubyte),
                ("regEWLimit",   c_ubyte),
                ("regRECounter", c_ubyte),
                ("regTECounter", c_ubyte),
    _fields_ = [("errInterrupt", c_ubyte), ("regMode", c_ubyte),
                ("regStatus", c_ubyte), ("regALCapture", c_ubyte),
                ("regECCapture", c_ubyte), ("regEWLimit", c_ubyte),
                ("regRECounter", c_ubyte), ("regTECounter", c_ubyte),
                ("Reserved",     c_ubyte)]
@@ -257,18 +272,27 @@
class USBCAN():
    def __init__(self, device_type=4, device_index=0, can_index=0, bitrate=None, can_filters=None, **kwargs):
    def __init__(self,
                 device_type=4,
                 device_index=0,
                 can_index=0,
                 bitrate=None,
                 can_filters=None,
                 **kwargs):
        # super(USBCAN, self).__init__(can_index, can_filters, **kwargs)
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        # self.logger = logging.getLogger()
        # self.logger.setLevel(logging.DEBUG)
        self.logger = logger
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # 终端Handler
        consoleHandler = logging.StreamHandler()
        consoleHandler.setLevel(logging.DEBUG)
        # 文件Handler
        fileHandler = logging.FileHandler(
            './log/USBCAN.log', mode='a', encoding='UTF-8')
        fileHandler = logging.FileHandler('./log/USBCAN.log',
                                          mode='a',
                                          encoding='UTF-8')
        fileHandler.setLevel(logging.NOTSET)
        consoleHandler.setFormatter(formatter)
@@ -286,21 +310,25 @@
        if Timing0 is None or Timing1 is None:
            self.logger.error("Timing registers are not set")
        self.init_config = VCI_INIT_CONFIG(
            0, 0xFFFFFFFF, 0, 1, Timing0, Timing1, 0)
        self.init_config = VCI_INIT_CONFIG(0, 0xFFFFFFFF, 0, 1, Timing0,
                                           Timing1, 0)
    def InitAndStart(self):
        if USBCAN_DLL.VCI_OpenDevice(self.device, self.device_index, 0) == USBCAN_STATUS_ERR:
        if USBCAN_DLL.VCI_OpenDevice(self.device, self.device_index,
                                     0) == USBCAN_STATUS_ERR:
            self.logger.error("VCI_OpenDevice Error")
            return False
            # for can_index in self.channels:
        if USBCAN_DLL.VCI_InitCAN(self.device, self.device_index, self.channel, byref(self.init_config)) == USBCAN_STATUS_ERR:
        if USBCAN_DLL.VCI_InitCAN(self.device, self.device_index, self.channel,
                                  byref(
                                      self.init_config)) == USBCAN_STATUS_ERR:
            self.logger.error("VCI_InitCAN Error")
            self.CloseDevice()
            return False
        if USBCAN_DLL.VCI_StartCAN(self.device, self.device_index, self.channel) == USBCAN_STATUS_ERR:
        if USBCAN_DLL.VCI_StartCAN(self.device, self.device_index,
                                   self.channel) == USBCAN_STATUS_ERR:
            self.logger.error("VCI_StartCAN Error")
            self.CloseDevice()
            return False
@@ -308,7 +336,8 @@
    def OpenDevice(self, device_type, device_index, reserved):
        try:
            return USBCAN_DLL.VCI_OpenDevice(device_type, device_index, reserved)
            return USBCAN_DLL.VCI_OpenDevice(device_type, device_index,
                                             reserved)
        except:
            self.logger.error("Exception on OpenDevice!")
            raise
@@ -323,8 +352,8 @@
    def GetDeviceInf(self):
        try:
            info = USBCAN_DEVICE_INFO()
            ret = USBCAN_DLL.VCI_ReadBoardInfo(
                self.device, self.device_index, byref(info))
            ret = USBCAN_DLL.VCI_ReadBoardInfo(self.device, self.device_index,
                                               byref(info))
            return info if ret == USBCAN_STATUS_OK else None
        except:
            self.logger.error("Exception on USBCAN_GetDeviceInf")
@@ -332,35 +361,40 @@
    def InitCAN(self, device_type,  device_index, can_index, init_config):
        try:
            return USBCAN_DLL.VCI_InitCAN(device_type, device_index, can_index, byref(init_config))
            return USBCAN_DLL.VCI_InitCAN(device_type, device_index, can_index,
                                          byref(init_config))
        except:
            self.logger.error("Exception on USBCAN_InitCAN!")
            raise
    def StartCAN(self, device_type,  device_index, can_index):
        try:
            return USBCAN_DLL.VCI_StartCAN(device_type,  device_index, can_index)
            return USBCAN_DLL.VCI_StartCAN(device_type, device_index,
                                           can_index)
        except:
            self.logger.error("Exception on USBCAN_StartCAN!")
            raise
    def ResetCAN(self, device_type,  device_index, can_index):
        try:
            return USBCAN_DLL.VCI_ResetCAN(device_type,  device_index, can_index)
            return USBCAN_DLL.VCI_ResetCAN(device_type, device_index,
                                           can_index)
        except:
            self.logger.error("Exception on USBCAN_ResetCAN!")
            raise
    def ClearBuffer(self):
        try:
            return USBCAN_DLL.VCI_ClearBuffer(self.device,  self.device_index, self.channel)
            return USBCAN_DLL.VCI_ClearBuffer(self.device, self.device_index,
                                              self.channel)
        except:
            self.logger.error("Exception on USBCAN_ClearBuffer!")
            raise
    def GetReceiveNum(self):
        try:
            return USBCAN_DLL.VCI_GetReceiveNum(self.device, self.device_index, self.channel)
            return USBCAN_DLL.VCI_GetReceiveNum(self.device, self.device_index,
                                                self.channel)
        except:
            self.logger.error("Exception on USBCAN_GetReceiveNum!")
            raise
@@ -381,22 +415,25 @@
        # (msg.arbitration_id, 0, 0, 1, msg.is_remote_frame,
        #                           extern_flag, msg.dlc, (c_ubyte * 8)(*msg.data), (c_ubyte * 3)(*[0, 0, 0]))
        USBCAN_DLL.VCI_Transmit(
            self.device, self.device_index, self.channel, byref(raw_message), 1)
        USBCAN_DLL.VCI_Transmit(self.device, self.device_index, self.channel,
                                byref(raw_message), 1)
    def Receive(self, len=10, timeout=None):
        raw_message = (VCI_CAN_OBJ*len)()
        timeout = -1 if timeout is None else c_int(timeout * 1000)
        msg = list()
        rtn = USBCAN_DLL.VCI_Receive(
            self.device, self.device_index, self.channel, byref(raw_message), len, timeout)
        rtn = USBCAN_DLL.VCI_Receive(self.device,
                                     self.device_index, self.channel,
                                     byref(raw_message), len, timeout)
        if rtn == 0xFFFFFFFF or rtn == 0:
            return None, False
        else:
            for i in range(rtn):
                msg.append(Message(
                    timestamp=raw_message[i].TimeStamp if raw_message[i].TimeFlag else 0.0,
                msg.append(
                    Message(
                        timestamp=raw_message[i].TimeStamp
                        if raw_message[i].TimeFlag else 0.0,
                    arbitration_id=raw_message[i].ID,
                    is_remote_frame=raw_message[i].RemoteFlag,
                    channel=0,
@@ -405,7 +442,8 @@
                ))
            return (msg, rtn)
    def ListeningMsg(self, connectRet, needClose,  msgQueue: Queue, sendQueue: Queue):
    def ListeningMsg(self, connectRet, needClose, msgQueue: Queue,
                     sendQueue: Queue):
        '''call by single process to deal can messages.
        Arguments:
            connectRet {Value} -- return value of connection
@@ -436,5 +474,6 @@
                    for i in range(num):
                        msgQueue.put(revRet[i])
                        self.logger.info(
                            "received can message.it't id is:{}".format(str(revRet[i].arbitration_id)))
                            "received can message.it't id is:{}".format(
                                str(revRet[i].arbitration_id)))
                time.sleep(0.05)