from asyncio.log import logger
|
from ctypes import *
|
from curses import raw
|
# import platform
|
import sys
|
import os
|
import logging
|
from can import BusABC, Message
|
import time
|
from multiprocessing import Queue, Value, Pipe
|
|
# ctypes type used for device-type identifiers.
USBCAN_DEVICE_TYPE = c_uint

# Handle values that denote "no device" / "no channel".
INVALID_DEVICE_HANDLE = INVALID_CHANNEL_HANDLE = 0

# can type: adapter model name -> numeric device type
DEVICETYPE = dict([('USBCAN-I', 3), ('USBCAN-II', 4)])

# can mode
NORMAL_MODE, LISTEN_MODE = 0, 1

# filter type
SINGLE_FILTER, DOUBLE_FILTER = 0, 1

# channel number (CAN_CHANNEL_<n> == n)
(CAN_CHANNEL_0, CAN_CHANNEL_1, CAN_CHANNEL_2, CAN_CHANNEL_3,
 CAN_CHANNEL_4, CAN_CHANNEL_5, CAN_CHANNEL_6, CAN_CHANNEL_7,
 CAN_CHANNEL_8, CAN_CHANNEL_9, CAN_CHANNEL_10, CAN_CHANNEL_11,
 CAN_CHANNEL_12) = range(13)

# status
STATUS_OK = 1

# sendtype
(SEND_NORMAL, SEND_SINGLE,
 SELF_SEND_RECV, SELF_SEND_RECV_SINGLE) = range(4)

# Interface return status
(USBCAN_STATUS_ERR, USBCAN_STATUS_OK, USBCAN_STATUS_ONLINE,
 USBCAN_STATUS_OFFLINE, USBCAN_STATUS_UNSUPPORTED) = range(5)
|
|
|
class VCI_INIT_CONFIG(Structure):
    """Channel initialisation record handed by reference to ``VCI_InitCAN``.

    Three ``c_ulong`` words (acceptance code, acceptance mask, reserved)
    followed by four single-byte settings (filter, the two timing
    registers and the mode).
    """

    _fields_ = (
        [(word, c_ulong) for word in ("AccCode", "AccMask", "Reserved")]
        + [(byte, c_ubyte)
           for byte in ("Filter", "Timing0", "Timing1", "Mode")]
    )
|
|
|
class VCI_CAN_OBJ(Structure):
    """One CAN frame as exchanged with ``VCI_Transmit`` / ``VCI_Receive``."""

    _fields_ = [
        ("ID", c_uint),              # arbitration identifier
        ("TimeStamp", c_uint),       # only meaningful when TimeFlag is set
        ("TimeFlag", c_ubyte),
        ("SendType", c_ubyte),
        ("RemoteFlag", c_ubyte),     # non-zero for a remote frame
        ("ExternFlag", c_ubyte),     # non-zero for an extended identifier
        ("DataLen", c_ubyte),
        ("Data", c_ubyte * 8),
        ("Reserved", c_ubyte * 3),
    ]
|
|
|
class PVCI_ERR_INFO(Structure):
    """Error record: a code word plus passive-error and arbitration-lost bytes."""

    _fields_ = [
        ("ErrorCode", c_uint),
        ("PassiveErrData", c_ubyte * 3),
        ("ArLostErrData", c_ubyte),
    ]
|
|
|
# Timing-register pairs keyed by a human-readable baud-rate label.
# Each row is (label, time0, time1); the mapping exposes the two register
# values under the keys 'time0' and 'time1'.
baudRateConfig = {
    label: {'time0': reg0, 'time1': reg1}
    for label, reg0, reg1 in (
        ('5Kbps', 0xBF, 0xFF),
        ('10Kbps', 0x31, 0x1C),
        ('20Kbps', 0x18, 0x1C),
        ('40Kbps', 0x87, 0xFF),
        ('50Kbps', 0x09, 0x1C),
        ('80Kbps', 0x83, 0xFF),
        ('100Kbps', 0x04, 0x1C),
        ('125Kbps', 0x03, 0x1C),
        ('200Kbps', 0x81, 0xFA),
        ('250Kbps', 0x01, 0x1C),
        ('400Kbps', 0x80, 0xFA),
        ('500Kbps', 0x00, 0x1C),
        ('666Kbps', 0x80, 0xB6),
        ('800Kbps', 0x00, 0x16),
        ('1000Kbps', 0x00, 0x14),
    )
}
|
|
# Bitrate in bit/s -> (Timing0, Timing1) register pair.
# Rows are (bitrate, timing0, timing1).
TIMING_DICT = {
    rate: (reg0, reg1)
    for rate, reg0, reg1 in (
        (5000, 0xBF, 0xFF),
        (10000, 0x31, 0x1C),
        (20000, 0x18, 0x1C),
        (33330, 0x09, 0x6F),
        (40000, 0x87, 0xFF),
        (50000, 0x09, 0x1C),
        (66660, 0x04, 0x6F),
        (80000, 0x83, 0xFF),
        (83330, 0x03, 0x6F),
        (100000, 0x04, 0x1C),
        (125000, 0x03, 0x1C),
        (200000, 0x81, 0xFA),
        (250000, 0x01, 0x1C),
        (400000, 0x80, 0xFA),
        (500000, 0x00, 0x1C),
        (666000, 0x80, 0xB6),
        (800000, 0x00, 0x16),
        (1000000, 0x00, 0x14),
    )
}
|
'''
|
Device information
|
'''
|
|
|
class USBCAN_DEVICE_INFO(Structure):
    """Board information record filled in by ``VCI_ReadBoardInfo``.

    The raw ctypes fields keep their mixed-case names (``hw_Version`` ...);
    the lower-case properties expose them as formatted strings or plain
    integers.
    """

    _fields_ = [("hw_Version", c_ushort), ("fw_Version", c_ushort),
                ("dr_Version", c_ushort), ("in_Version", c_ushort),
                ("irq_Num", c_ushort), ("can_Num", c_ubyte),
                ("str_Serial_Num", c_ubyte * 20),
                ("str_hw_Type", c_ubyte * 40), ("reserved", c_ushort * 4)]

    def __str__(self):
        """Return a multi-line, human-readable summary of every field."""
        return "Hardware Version:%s\nFirmware Version:%s\nDriver Interface:%s\nInterface Interface:%s\nInterrupt Number:%d\nCAN Number:%d\nSerial:%s\nHardware Type:%s\n" % (
            self.hw_version, self.fw_version, self.dr_version, self.in_version,
            self.irq_num, self.can_num, self.serial, self.hw_type)

    def _version(self, version):
        """Format a 16-bit version word as ``V<major>.<minor>``.

        The high byte is the major number and the low byte the minor number.
        The major number is extracted with a shift; dividing by ``0xFF``
        (as this method used to) is off by one whenever
        ``high + low >= 255`` (e.g. ``0x01FF`` came out as ``V2.ff``).

        Majors below 9 are printed in decimal, larger ones as two hex
        digits; the minor is always two hex digits.
        """
        major = version >> 8
        minor = version & 0xFF
        template = "V%02x.%02x" if major >= 9 else "V%d.%02x"
        return template % (major, minor)

    @staticmethod
    def _c_string(raw):
        """Decode a ``c_ubyte`` array up to (not including) its first NUL.

        Every byte maps to the code point of the same value (latin-1),
        which is what a per-byte ``chr()`` would produce.
        """
        return bytes(raw).split(b"\0", 1)[0].decode("latin-1")

    @property
    def hw_version(self):
        """Hardware version as a formatted string."""
        return self._version(self.hw_Version)

    @property
    def fw_version(self):
        """Firmware version as a formatted string."""
        return self._version(self.fw_Version)

    @property
    def dr_version(self):
        """Driver version as a formatted string."""
        return self._version(self.dr_Version)

    @property
    def in_version(self):
        """Interface-library version as a formatted string."""
        return self._version(self.in_Version)

    @property
    def irq_num(self):
        """Raw ``irq_Num`` field."""
        return self.irq_Num

    @property
    def can_num(self):
        """Raw ``can_Num`` field."""
        return self.can_Num

    @property
    def serial(self):
        """Serial number text (NUL-terminated ``str_Serial_Num``)."""
        return self._c_string(self.str_Serial_Num)

    @property
    def hw_type(self):
        """Hardware type text (NUL-terminated ``str_hw_Type``)."""
        return self._c_string(self.str_hw_Type)
|
|
|
class _USBCAN_CHANNEL_CAN_INIT_CONFIG(Structure):
|
_fields_ = [("acc_code", c_uint), ("acc_mask", c_uint),
|
("reserved", c_uint), ("filter", c_ubyte),
|
("timing0", c_ubyte), ("timing1", c_ubyte), ("mode", c_ubyte)]
|
|
|
class _USBCAN_CHANNEL_CANFD_INIT_CONFIG(Structure):
|
_fields_ = [("acc_code", c_uint), ("acc_mask", c_uint),
|
("abit_timing", c_uint), ("dbit_timing", c_uint),
|
("brp", c_uint), ("filter", c_ubyte), ("mode", c_ubyte),
|
("pad", c_ushort), ("reserved", c_uint)]
|
|
|
class _USBCAN_CHANNEL_INIT_CONFIG(Union):
    """Overlay of the classic-CAN and CAN-FD channel configurations."""

    _fields_ = [
        ("can", _USBCAN_CHANNEL_CAN_INIT_CONFIG),
        ("canfd", _USBCAN_CHANNEL_CANFD_INIT_CONFIG),
    ]
|
|
|
class USBCAN_CHANNEL_INIT_CONFIG(Structure):
    """Channel init record: a ``can_type`` word followed by the config union."""

    _fields_ = [
        ("can_type", c_uint),
        ("config", _USBCAN_CHANNEL_INIT_CONFIG),
    ]
|
|
|
class USBCAN_CHANNEL_ERR_INFO(Structure):
    """Channel error record: code word, three passive-error bytes, one arbitration-lost byte."""

    _fields_ = [
        ("error_code", c_uint),
        ("passive_ErrData", c_ubyte * 3),
        ("arLost_ErrData", c_ubyte),
    ]
|
|
|
class USBCAN_CHANNEL_STATUS(Structure):
    """Channel status record made of nine single-byte fields."""

    _fields_ = [
        (field_name, c_ubyte)
        for field_name in (
            "errInterrupt",
            "regMode",
            "regStatus",
            "regALCapture",
            "regECCapture",
            "regEWLimit",
            "regRECounter",
            "regTECounter",
            "Reserved",
        )
    ]
|
|
|
# Pick the DLL that matches the interpreter: the version banner contains
# '64 bit' on a 64-bit build.  The path is resolved against the current
# working directory at import time.
platformInfo = sys.version
_CanDLLName = os.getcwd() + (
    r'/config/ControlCANx64/ControlCAN.dll'
    if '64 bit' in platformInfo
    else r'/config/ControlCANx86/ControlCAN.dll'
)
USBCAN_DLL = windll.LoadLibrary(_CanDLLName)
|
|
|
class USBCAN():
    """Wrapper around the ControlCAN ``VCI_*`` API for one channel of one device.

    The instance remembers the device type, device index and channel index
    and passes them to the functions of the module-level ``USBCAN_DLL``.
    """

    # Set once the console/file handlers have been attached to the shared
    # logger, so that creating several instances does not duplicate every
    # log line.
    _handlers_installed = False

    def __init__(self,
                 device_type=4,
                 device_index=0,
                 can_index=0,
                 bitrate=None,
                 can_filters=None,
                 **kwargs):
        """Store the addressing parameters and build the init configuration.

        Arguments:
            device_type  -- numeric device type passed to every DLL call
            device_index -- index of the device
            can_index    -- channel index on the device
            bitrate      -- bit/s; must be a key of ``TIMING_DICT``
            can_filters  -- accepted but not used
            **kwargs     -- accepted but not used

        Raises:
            KeyError -- ``bitrate`` is not in ``TIMING_DICT`` (this includes
                        the default ``None``).
        """
        self.logger = logger
        self._install_log_handlers()

        self.device = device_type
        self.device_index = device_index
        self.channel = can_index

        try:
            timing0, timing1 = TIMING_DICT[bitrate]
        except KeyError:
            # Log before propagating so the failure is visible in the log
            # file as well as in the traceback.
            self.logger.error("Timing registers are not set")
            raise

        self.init_config = VCI_INIT_CONFIG(0, 0xFFFFFFFF, 0, 1, timing0,
                                           timing1, 0)

    def _install_log_handlers(self):
        """Attach the console and file handlers to the logger exactly once."""
        if USBCAN._handlers_installed:
            return

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(formatter)

        # File handler; FileHandler does not create missing directories.
        os.makedirs('./log', exist_ok=True)
        file_handler = logging.FileHandler('./log/USBCAN.log',
                                           mode='a',
                                           encoding='UTF-8')
        file_handler.setLevel(logging.NOTSET)
        file_handler.setFormatter(formatter)

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)
        USBCAN._handlers_installed = True

    def _call(self, api_name, error_message, *args):
        """Call ``USBCAN_DLL.<api_name>(*args)``; log and re-raise on exception."""
        try:
            return getattr(USBCAN_DLL, api_name)(*args)
        except Exception:
            self.logger.error(error_message)
            raise

    def InitAndStart(self):
        """Open the device, then initialise and start the channel.

        Returns True on success.  On failure the step is logged, the device
        is closed again if it had been opened, and False is returned.  Any
        result other than ``USBCAN_STATUS_OK`` counts as failure, so a
        negative return code is no longer mistaken for success.
        """
        if USBCAN_DLL.VCI_OpenDevice(self.device, self.device_index,
                                     0) != USBCAN_STATUS_OK:
            self.logger.error("VCI_OpenDevice Error")
            return False

        if USBCAN_DLL.VCI_InitCAN(self.device, self.device_index,
                                  self.channel,
                                  byref(self.init_config)) != USBCAN_STATUS_OK:
            self.logger.error("VCI_InitCAN Error")
            self.CloseDevice()
            return False

        if USBCAN_DLL.VCI_StartCAN(self.device, self.device_index,
                                   self.channel) != USBCAN_STATUS_OK:
            self.logger.error("VCI_StartCAN Error")
            self.CloseDevice()
            return False
        return True

    def OpenDevice(self, device_type, device_index, reserved):
        """Return the result of ``VCI_OpenDevice`` for the given arguments."""
        return self._call("VCI_OpenDevice", "Exception on OpenDevice!",
                          device_type, device_index, reserved)

    def CloseDevice(self):
        """Return the result of ``VCI_CloseDevice`` for this device."""
        return self._call("VCI_CloseDevice", "Exception on CloseDevice!",
                          self.device, self.device_index)

    def GetDeviceInf(self):
        """Return a filled ``USBCAN_DEVICE_INFO``, or None if the read fails."""
        info = USBCAN_DEVICE_INFO()
        ret = self._call("VCI_ReadBoardInfo",
                         "Exception on USBCAN_GetDeviceInf",
                         self.device, self.device_index, byref(info))
        return info if ret == USBCAN_STATUS_OK else None

    def InitCAN(self, device_type, device_index, can_index, init_config):
        """Return the result of ``VCI_InitCAN``; ``init_config`` goes by reference."""
        return self._call("VCI_InitCAN", "Exception on USBCAN_InitCAN!",
                          device_type, device_index, can_index,
                          byref(init_config))

    def StartCAN(self, device_type, device_index, can_index):
        """Return the result of ``VCI_StartCAN`` for the given channel."""
        return self._call("VCI_StartCAN", "Exception on USBCAN_StartCAN!",
                          device_type, device_index, can_index)

    def ResetCAN(self, device_type, device_index, can_index):
        """Return the result of ``VCI_ResetCAN`` for the given channel."""
        return self._call("VCI_ResetCAN", "Exception on USBCAN_ResetCAN!",
                          device_type, device_index, can_index)

    def ClearBuffer(self):
        """Return the result of ``VCI_ClearBuffer`` for this channel."""
        return self._call("VCI_ClearBuffer",
                          "Exception on USBCAN_ClearBuffer!",
                          self.device, self.device_index, self.channel)

    def GetReceiveNum(self):
        """Return the result of ``VCI_GetReceiveNum`` for this channel."""
        return self._call("VCI_GetReceiveNum",
                          "Exception on USBCAN_GetReceiveNum!",
                          self.device, self.device_index, self.channel)

    def send(self, msg: Message):
        """Transmit one message on this channel.

        Returns the value of ``VCI_Transmit`` (previously discarded); a
        result other than 1 is logged as an error.
        """
        raw_message = VCI_CAN_OBJ()
        raw_message.ID = msg.arbitration_id
        raw_message.TimeStamp = 0
        raw_message.TimeFlag = 0
        raw_message.SendType = 1
        raw_message.RemoteFlag = int(msg.is_remote_frame)
        raw_message.ExternFlag = int(msg.is_extended_id)
        raw_message.DataLen = msg.dlc
        raw_message.Data = (c_ubyte * 8)(*msg.data)

        sent = USBCAN_DLL.VCI_Transmit(self.device, self.device_index,
                                       self.channel, byref(raw_message), 1)
        if sent != 1:
            self.logger.error("VCI_Transmit Error")
        return sent

    def _to_message(self, frame):
        """Convert one received ``VCI_CAN_OBJ`` into a ``Message``."""
        # Only the first DataLen bytes of the fixed 8-byte buffer are payload.
        length = min(frame.DataLen, 8)
        return Message(
            timestamp=frame.TimeStamp if frame.TimeFlag else 0.0,
            arbitration_id=frame.ID,
            is_remote_frame=bool(frame.RemoteFlag),
            channel=self.channel,
            is_extended_id=bool(frame.ExternFlag),
            dlc=frame.DataLen,
            data=bytes(frame.Data[:length]),
        )

    def Receive(self, len=10, timeout=None):
        """Read up to ``len`` frames from this channel.

        Arguments:
            len     -- capacity of the receive buffer, in frames
            timeout -- seconds (int or float); None passes -1 to the DLL

        Returns ``(None, False)`` when nothing was received or the DLL
        reported an error, otherwise ``(messages, count)``.
        """
        raw_messages = (VCI_CAN_OBJ * len)()

        # The DLL takes whole milliseconds; int() also accepts float seconds.
        wait_ms = -1 if timeout is None else int(timeout * 1000)
        rtn = USBCAN_DLL.VCI_Receive(self.device,
                                     self.device_index, self.channel,
                                     byref(raw_messages), len, wait_ms)
        # With the default (signed) ctypes return type an error arrives as a
        # negative number; 0xFFFFFFFF covers an unsigned return type.
        if rtn <= 0 or rtn == 0xFFFFFFFF:
            return None, False

        msg = [self._to_message(raw_messages[i]) for i in range(rtn)]
        return (msg, rtn)

    def ListeningMsg(self, connectRet, needClose, msgQueue: Queue,
                     sendQueue: Queue):
        '''call by single process to deal can messages.

        Arguments:
            connectRet {Value} -- return value of connection
            needClose {Value} -- disconnect when needed
            msgQueue {Queue} -- received data
            sendQueue {Queue} -- currently unused

        Returns once a close was requested and CloseDevice reported 1;
        otherwise loops forever.
        '''
        while True:
            if connectRet.value == 1:
                if needClose.value == 1:
                    if self.CloseDevice() == 1:
                        needClose.value = 0
                        connectRet.value = 0
                        return

                received, num = self.Receive(len=10)
                if num:
                    for can_msg in received:
                        msgQueue.put(can_msg)
                        self.logger.info(
                            "received can message.it't id is:{}".format(
                                str(can_msg.arbitration_id)))
                    # More frames may be pending: poll again right away.
                    continue
            # Idle (not connected, or nothing received): back off briefly so
            # the loop never spins at full CPU.
            time.sleep(0.05)
|