"""ctypes bindings and a small wrapper class for a USB-CAN adapter.

The adapter is driven through ``ControlCAN.dll`` (``VCI_*`` entry points),
which is loaded at import time from ``<cwd>/config/ControlCANx64`` or
``<cwd>/config/ControlCANx86`` depending on the interpreter build.
"""
# The star import is kept because the structures below use many ctypes names
# and other modules may rely on them being re-exported from here.
from ctypes import *  # noqa: F401,F403
# import platform
import sys
import os
import logging
import time
from multiprocessing import Queue, Value, Pipe  # noqa: F401

from can import BusABC, Message  # noqa: F401

try:
    # Unused here; the name is kept for backward compatibility.  The import
    # is guarded because curses is not shipped with CPython on Windows, which
    # is the only platform where ``windll`` (used below) exists.
    from curses import raw  # noqa: F401
except ImportError:
    raw = None

# Module-level logger.  The original borrowed asyncio's internal logger,
# which mixed this module's handlers and records with asyncio's own.
logger = logging.getLogger(__name__)

USBCAN_DEVICE_TYPE = c_uint

INVALID_DEVICE_HANDLE = 0
INVALID_CHANNEL_HANDLE = 0

# can type
DEVICETYPE = {
    'USBCAN-I': 3,
    'USBCAN-II': 4,
}

# can mode
NORMAL_MODE = 0
LISTEN_MODE = 1

# filter type
SINGLE_FILTER = 0
DOUBLE_FILTER = 1

# channel number
CAN_CHANNEL_0 = 0
CAN_CHANNEL_1 = 1
CAN_CHANNEL_2 = 2
CAN_CHANNEL_3 = 3
CAN_CHANNEL_4 = 4
CAN_CHANNEL_5 = 5
CAN_CHANNEL_6 = 6
CAN_CHANNEL_7 = 7
CAN_CHANNEL_8 = 8
CAN_CHANNEL_9 = 9
CAN_CHANNEL_10 = 10
CAN_CHANNEL_11 = 11
CAN_CHANNEL_12 = 12

# status
STATUS_OK = 1

# sendtype
SEND_NORMAL = 0
SEND_SINGLE = 1
SELF_SEND_RECV = 2
SELF_SEND_RECV_SINGLE = 3

# Interface return status
USBCAN_STATUS_ERR = 0
USBCAN_STATUS_OK = 1
USBCAN_STATUS_ONLINE = 2
USBCAN_STATUS_OFFLINE = 3
USBCAN_STATUS_UNSUPPORTED = 4


class VCI_INIT_CONFIG(Structure):
    """Channel initialisation parameters passed to ``VCI_InitCAN``."""
    _fields_ = [("AccCode", c_ulong),
                ("AccMask", c_ulong),
                ("Reserved", c_ulong),
                ("Filter", c_ubyte),
                ("Timing0", c_ubyte),
                ("Timing1", c_ubyte),
                ("Mode", c_ubyte)]


class VCI_CAN_OBJ(Structure):
    """One CAN frame as exchanged with ``VCI_Transmit`` / ``VCI_Receive``."""
    _fields_ = [("ID", c_uint),
                ("TimeStamp", c_uint),
                ("TimeFlag", c_ubyte),
                ("SendType", c_ubyte),
                ("RemoteFlag", c_ubyte),
                ("ExternFlag", c_ubyte),
                ("DataLen", c_ubyte),
                ("Data", c_ubyte * 8),
                ("Reserved", c_ubyte * 3)]


class PVCI_ERR_INFO(Structure):
    """Error record layout (error code plus passive/arbitration-lost data)."""
    _fields_ = [("ErrorCode", c_uint),
                ("PassiveErrData", c_ubyte * 3),
                ("ArLostErrData", c_ubyte)]


# Timing register pairs keyed by a human-readable baud-rate label.
baudRateConfig = {
    '5Kbps': {'time0': 0xBF, 'time1': 0xFF},
    '10Kbps': {'time0': 0x31, 'time1': 0x1C},
    '20Kbps': {'time0': 0x18, 'time1': 0x1C},
    '40Kbps': {'time0': 0x87, 'time1': 0xFF},
    '50Kbps': {'time0': 0x09, 'time1': 0x1C},
    '80Kbps': {'time0': 0x83, 'time1': 0xFF},
    '100Kbps': {'time0': 0x04, 'time1': 0x1C},
    '125Kbps': {'time0': 0x03, 'time1': 0x1C},
    '200Kbps': {'time0': 0x81, 'time1': 0xFA},
    '250Kbps': {'time0': 0x01, 'time1': 0x1C},
    '400Kbps': {'time0': 0x80, 'time1': 0xFA},
    '500Kbps': {'time0': 0x00, 'time1': 0x1C},
    '666Kbps': {'time0': 0x80, 'time1': 0xB6},
    '800Kbps': {'time0': 0x00, 'time1': 0x16},
    '1000Kbps': {'time0': 0x00, 'time1': 0x14},
}

# (Timing0, Timing1) register pairs keyed by bitrate in bit/s.
TIMING_DICT = {
    5000: (0xBF, 0xFF),
    10000: (0x31, 0x1C),
    20000: (0x18, 0x1C),
    33330: (0x09, 0x6F),
    40000: (0x87, 0xFF),
    50000: (0x09, 0x1C),
    66660: (0x04, 0x6F),
    80000: (0x83, 0xFF),
    83330: (0x03, 0x6F),
    100000: (0x04, 0x1C),
    125000: (0x03, 0x1C),
    200000: (0x81, 0xFA),
    250000: (0x01, 0x1C),
    400000: (0x80, 0xFA),
    500000: (0x00, 0x1C),
    666000: (0x80, 0xB6),
    800000: (0x00, 0x16),
    1000000: (0x00, 0x14),
}


class USBCAN_DEVICE_INFO(Structure):
    """Device information block filled in by ``VCI_ReadBoardInfo``."""
    _fields_ = [("hw_Version", c_ushort),
                ("fw_Version", c_ushort),
                ("dr_Version", c_ushort),
                ("in_Version", c_ushort),
                ("irq_Num", c_ushort),
                ("can_Num", c_ubyte),
                ("str_Serial_Num", c_ubyte * 20),
                ("str_hw_Type", c_ubyte * 40),
                ("reserved", c_ushort * 4)]

    def __str__(self):
        return ("Hardware Version:%s\nFirmware Version:%s\n"
                "Driver Interface:%s\nInterface Interface:%s\n"
                "Interrupt Number:%d\nCAN Number:%d\n"
                "Serial:%s\nHardware Type:%s\n") % (
                    self.hw_version, self.fw_version, self.dr_version,
                    self.in_version, self.irq_num, self.can_num,
                    self.serial, self.hw_type)

    def _version(self, version):
        """Format a 16-bit version word as ``V<major>.<minor>``.

        The high byte is the major part and the low byte the minor part.
        The original divided by 0xFF instead of 0x100, which misreports the
        major part (e.g. 0x01FF came out as major 2).
        """
        major = version >> 8
        minor = version & 0xFF
        template = "V%02x.%02x" if major >= 9 else "V%d.%02x"
        return template % (major, minor)

    @staticmethod
    def _c_string(buffer):
        """Decode a NUL-terminated byte array, one character per byte."""
        return bytes(buffer).split(b"\0", 1)[0].decode("latin-1")

    @property
    def hw_version(self):
        return self._version(self.hw_Version)

    @property
    def fw_version(self):
        return self._version(self.fw_Version)

    @property
    def dr_version(self):
        return self._version(self.dr_Version)

    @property
    def in_version(self):
        return self._version(self.in_Version)

    @property
    def irq_num(self):
        return self.irq_Num

    @property
    def can_num(self):
        return self.can_Num

    @property
    def serial(self):
        return self._c_string(self.str_Serial_Num)

    @property
    def hw_type(self):
        return self._c_string(self.str_hw_Type)


class _USBCAN_CHANNEL_CAN_INIT_CONFIG(Structure):
    _fields_ = [("acc_code", c_uint),
                ("acc_mask", c_uint),
                ("reserved", c_uint),
                ("filter", c_ubyte),
                ("timing0", c_ubyte),
                ("timing1", c_ubyte),
                ("mode", c_ubyte)]


class _USBCAN_CHANNEL_CANFD_INIT_CONFIG(Structure):
    _fields_ = [("acc_code", c_uint),
                ("acc_mask", c_uint),
                ("abit_timing", c_uint),
                ("dbit_timing", c_uint),
                ("brp", c_uint),
                ("filter", c_ubyte),
                ("mode", c_ubyte),
                ("pad", c_ushort),
                ("reserved", c_uint)]


class _USBCAN_CHANNEL_INIT_CONFIG(Union):
    _fields_ = [("can", _USBCAN_CHANNEL_CAN_INIT_CONFIG),
                ("canfd", _USBCAN_CHANNEL_CANFD_INIT_CONFIG)]


class USBCAN_CHANNEL_INIT_CONFIG(Structure):
    _fields_ = [("can_type", c_uint),
                ("config", _USBCAN_CHANNEL_INIT_CONFIG)]


class USBCAN_CHANNEL_ERR_INFO(Structure):
    _fields_ = [("error_code", c_uint),
                ("passive_ErrData", c_ubyte * 3),
                ("arLost_ErrData", c_ubyte)]


class USBCAN_CHANNEL_STATUS(Structure):
    _fields_ = [("errInterrupt", c_ubyte),
                ("regMode", c_ubyte),
                ("regStatus", c_ubyte),
                ("regALCapture", c_ubyte),
                ("regECCapture", c_ubyte),
                ("regEWLimit", c_ubyte),
                ("regRECounter", c_ubyte),
                ("regTECounter", c_ubyte),
                ("Reserved", c_ubyte)]


# Pick the DLL matching the interpreter's bitness; the path is resolved
# against the current working directory at import time.
platformInfo = sys.version
if '64 bit' in platformInfo:
    _CanDLLName = os.getcwd() + r'/config/ControlCANx64/ControlCAN.dll'
else:
    _CanDLLName = os.getcwd() + r'/config/ControlCANx86/ControlCAN.dll'
USBCAN_DLL = windll.LoadLibrary(_CanDLLName)


class USBCAN():
    """Wrapper around one channel of one adapter exposed by ``USBCAN_DLL``."""

    def __init__(self, device_type=4, device_index=0, can_index=0,
                 bitrate=None, can_filters=None, **kwargs):
        """Store the device/channel selection and build the init config.

        Args:
            device_type: adapter type code (see ``DEVICETYPE``).
            device_index: index of the adapter.
            can_index: channel index on the adapter.
            bitrate: bitrate in bit/s; must be a key of ``TIMING_DICT``.
            can_filters: accepted for signature compatibility; unused.
            **kwargs: accepted for signature compatibility; unused.

        Raises:
            KeyError: if ``bitrate`` is not a key of ``TIMING_DICT``.
        """
        # super(USBCAN, self).__init__(can_index, can_filters, **kwargs)
        self.logger = logger
        self._configure_logging(self.logger)

        self.device = device_type
        self.device_index = device_index
        self.channel = can_index
        self.timestamp_start = None
        self.laststamp = None

        try:
            timing0, timing1 = TIMING_DICT[bitrate]
        except KeyError:
            # The original tested the unpacked values for None, which could
            # never trigger because the lookup had already raised.
            self.logger.error("Timing registers are not set")
            raise
        self.init_config = VCI_INIT_CONFIG(0, 0xFFFFFFFF, 0, 1,
                                           timing0, timing1, 0)

    @staticmethod
    def _configure_logging(log):
        """Attach the console and file handlers to ``log`` exactly once."""
        if log.handlers:
            # Already configured by an earlier instance; adding the handlers
            # again would duplicate every log line.
            return
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(formatter)
        # file handler; make sure the target directory exists first
        os.makedirs('./log', exist_ok=True)
        file_handler = logging.FileHandler('./log/USBCAN.log', mode='a',
                                           encoding='UTF-8')
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        log.addHandler(console_handler)
        log.addHandler(file_handler)
        # NOTE(review): the logger's own level is not set here, so which
        # records reach these handlers depends on the application's logging
        # configuration.

    def InitAndStart(self):
        """Open the device, then initialise and start the channel.

        Returns:
            True on success; False if any step reports
            ``USBCAN_STATUS_ERR`` (the device is closed again if it had
            already been opened).
        """
        if USBCAN_DLL.VCI_OpenDevice(self.device, self.device_index,
                                     0) == USBCAN_STATUS_ERR:
            self.logger.error("VCI_OpenDevice Error")
            return False

        if USBCAN_DLL.VCI_InitCAN(self.device, self.device_index,
                                  self.channel,
                                  byref(self.init_config)) == USBCAN_STATUS_ERR:
            self.logger.error("VCI_InitCAN Error")
            self.CloseDevice()
            return False

        if USBCAN_DLL.VCI_StartCAN(self.device, self.device_index,
                                   self.channel) == USBCAN_STATUS_ERR:
            self.logger.error("VCI_StartCAN Error")
            self.CloseDevice()
            return False
        return True

    def OpenDevice(self, device_type, device_index, reserved):
        """Call ``VCI_OpenDevice`` and return its status code."""
        try:
            return USBCAN_DLL.VCI_OpenDevice(device_type, device_index,
                                             reserved)
        except Exception:
            self.logger.exception("Exception on OpenDevice!")
            raise

    def CloseDevice(self):
        """Call ``VCI_CloseDevice`` for this device and return its status."""
        try:
            return USBCAN_DLL.VCI_CloseDevice(self.device, self.device_index)
        except Exception:
            self.logger.exception("Exception on CloseDevice!")
            raise

    def GetDeviceInf(self):
        """Return a filled ``USBCAN_DEVICE_INFO``, or None on failure."""
        try:
            info = USBCAN_DEVICE_INFO()
            ret = USBCAN_DLL.VCI_ReadBoardInfo(self.device, self.device_index,
                                               byref(info))
            return info if ret == USBCAN_STATUS_OK else None
        except Exception:
            self.logger.exception("Exception on USBCAN_GetDeviceInf")
            raise

    def InitCAN(self, device_type, device_index, can_index, init_config):
        """Call ``VCI_InitCAN`` and return its status code."""
        try:
            return USBCAN_DLL.VCI_InitCAN(device_type, device_index,
                                          can_index, byref(init_config))
        except Exception:
            self.logger.exception("Exception on USBCAN_InitCAN!")
            raise

    def StartCAN(self, device_type, device_index, can_index):
        """Call ``VCI_StartCAN`` and return its status code."""
        try:
            return USBCAN_DLL.VCI_StartCAN(device_type, device_index,
                                           can_index)
        except Exception:
            self.logger.exception("Exception on USBCAN_StartCAN!")
            raise

    def ResetCAN(self, device_type, device_index, can_index):
        """Call ``VCI_ResetCAN`` and return its status code."""
        try:
            return USBCAN_DLL.VCI_ResetCAN(device_type, device_index,
                                           can_index)
        except Exception:
            self.logger.exception("Exception on USBCAN_ResetCAN!")
            raise

    def ClearBuffer(self):
        """Call ``VCI_ClearBuffer`` for this channel and return its status."""
        try:
            return USBCAN_DLL.VCI_ClearBuffer(self.device, self.device_index,
                                              self.channel)
        except Exception:
            self.logger.exception("Exception on USBCAN_ClearBuffer!")
            raise

    def GetReceiveNum(self):
        """Call ``VCI_GetReceiveNum`` for this channel and return the result."""
        try:
            return USBCAN_DLL.VCI_GetReceiveNum(self.device,
                                                self.device_index,
                                                self.channel)
        except Exception:
            self.logger.exception("Exception on USBCAN_GetReceiveNum!")
            raise

    def send(self, msg: Message):
        """Transmit one frame on this channel.

        Args:
            msg: message to send; at most 8 data bytes fit in the frame.

        Returns:
            The value returned by ``VCI_Transmit`` (the original discarded
            it and returned None).
        """
        frame = VCI_CAN_OBJ()
        frame.ID = msg.arbitration_id
        frame.TimeStamp = 0
        frame.TimeFlag = 0
        frame.SendType = SEND_SINGLE
        # The original hard-coded False here, so remote frames went out as
        # data frames.
        frame.RemoteFlag = int(bool(msg.is_remote_frame))
        frame.ExternFlag = int(bool(msg.is_extended_id))
        frame.DataLen = msg.dlc
        frame.Data = (c_ubyte * 8)(*msg.data)
        sent = USBCAN_DLL.VCI_Transmit(self.device, self.device_index,
                                       self.channel, byref(frame), 1)
        if sent != 1:
            # NOTE(review): assumes the DLL returns the number of frames
            # sent, so anything other than 1 means this frame was not sent.
            self.logger.error("VCI_Transmit Error")
        return sent

    def Receive(self, len=10, timeout=None):
        """Read up to ``len`` frames from this channel.

        Args:
            len: size of the receive buffer, in frames.
            timeout: wait time handed to ``VCI_Receive``; None passes -1.

        Returns:
            ``(messages, count)`` when at least one frame was read,
            otherwise ``(None, False)``.
        """
        frames = (VCI_CAN_OBJ * len)()
        wait = -1 if timeout is None else c_int(timeout)
        count = USBCAN_DLL.VCI_Receive(self.device, self.device_index,
                                       self.channel, byref(frames), len, wait)
        # ctypes hands back foreign results as a signed C int by default, so
        # the 0xFFFFFFFF the original tested for actually arrives as -1.
        if count <= 0 or count == 0xFFFFFFFF:
            return None, False

        if self.timestamp_start is None:
            self.laststamp = frames[0].TimeStamp
            self.timestamp_start = self.laststamp

        msg = []
        for i in range(count):
            frame = frames[i]
            is_remote = bool(frame.RemoteFlag)
            length = min(frame.DataLen, 8)
            # Keep only the bytes the frame carries; the original passed the
            # whole 8-byte buffer, so every message reported 8 data bytes.
            payload = b"" if is_remote else bytes(frame.Data[:length])
            # NOTE(review): the tick delta is divided by 1e6 as in the
            # original; confirm the adapter's TimeStamp unit.
            stamp = ((frame.TimeStamp - self.timestamp_start) / 1000000
                     if frame.TimeFlag else 0.0)
            msg.append(
                Message(timestamp=stamp,
                        arbitration_id=frame.ID,
                        is_remote_frame=is_remote,
                        channel=0,
                        # 'extended_id' was removed in python-can 4.0.
                        is_extended_id=bool(frame.ExternFlag),
                        dlc=length,
                        data=payload,
                        is_error_frame=False))
        return (msg, count)

    def ListeningMsg(self, connectRet, needClose, msgQueue: Queue,
                     sendQueue: Queue):
        '''Poll the channel and forward received messages to ``msgQueue``.

        Arguments:
            connectRet {Value} -- return value of connection; polling only
                happens while it is 1
            needClose {Value} -- set to 1 to request a disconnect
            msgQueue {Queue} -- received messages are put here
            sendQueue {Queue} -- currently unused
        '''
        while True:
            while connectRet.value == 1:
                if needClose.value == 1:
                    if self.CloseDevice() == 1:
                        needClose.value = 0
                        connectRet.value = 0
                        return
                received, count = self.Receive(len=10)
                if count:
                    for message in received:
                        msgQueue.put(message)
                        self.logger.info(
                            "received can message.it't id is:{}".format(
                                str(message.arbitration_id)))
                time.sleep(0.05)
            # Not connected: wait instead of spinning at full CPU.
            time.sleep(0.05)