|
import threading
|
from can import BusABC, Message
|
from udsoncan.client import Client
|
from udsoncan.exceptions import TimeoutException
|
import udsoncan
|
from udsoncan.connections import BaseConnection
|
from udsoncan import services, Response, MemoryLocation
|
import isotp
|
from isotp import CanMessage
|
import queue
|
from USBCAN import *
|
from Shifter import ShifterClass
|
from mainwindows import Ui_MainWindow
|
from PySide2 import QtWidgets, QtCore, QtGui
|
import struct
|
import time
|
import datetime
|
from multiprocessing import Array, Pool, Process, Queue, Value
|
|
# `logging` and `os` are needed by the statements below; importing them here
# (after the star import above) guarantees the names refer to the stdlib
# modules instead of depending on what another module happens to re-export.
import logging
import os

# Upper bound on how many received CAN frames are handled in one go.
# NOTE(review): no active code in this part of the file reads it - confirm it
# is still needed.
MAX_RCV_NUM = 20

# Root logger: records are sent to the console and to a log file, both using
# the same line format.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(logging.DEBUG)

# FileHandler opens its file right away and fails with FileNotFoundError when
# the directory does not exist, so create it first.
os.makedirs('./log', exist_ok=True)
fileHandler = logging.FileHandler(
    './log/ShiftTool.log', mode='a', encoding='UTF-8')
# NOTSET: the handler itself does not filter by level.
fileHandler.setLevel(logging.NOTSET)

consoleHandler.setFormatter(formatter)
fileHandler.setFormatter(formatter)

logger.addHandler(consoleHandler)
logger.addHandler(fileHandler)
|
|
|
class UISignals(QtCore.QObject):
    """Qt signals that ShifterTool.WidgetsInit wires to its UI slots."""

    # Carries a text; connected to ShifterTool.disp_string.
    sig_Disp_str = QtCore.Signal(str)

    # Carries an int; emitted by ShifterTool.can_thread after frames were
    # received and connected to ShifterTool._updateRootList.
    sig_MsgReceived = QtCore.Signal(int)
|
|
|
class HardwreDevice():
    """Plain holder for the CAN adapter selection made in the UI.

    Attributes:
        device_type: adapter model code; update_HardwareDevice stores
            ``combo index + 3`` here (3 = 'USBCAN-I', 4 = 'USBCAN-II').
        channel: CAN channel index on the adapter (can_index).
        device: adapter index (device_index).
        baudrate: position of the chosen entry in the baud-rate combo box,
            later used to index ``TIMING_DICT`` keys.
    """

    def __init__(self):
        # Defaults: model code 4, everything else at index 0.
        (self.device_type,
         self.channel,
         self.device,
         self.baudrate) = (4, 0, 0, 0)
|
|
|
class PeriodSendThread(object):
    """Worker thread that calls a function once and then periodically.

    Life cycle: ``start()`` launches the thread, which idles until
    ``send_start(period)`` is called.  It then calls the function immediately
    and again every ``period`` seconds until ``send_stop()``.  ``stop()``
    terminates the thread from any state.

    Args:
        period_func: callable to invoke.
        args: positional arguments for ``period_func`` (default: none).
        kwargs: keyword arguments for ``period_func`` (default: none).
    """

    def __init__(self, period_func, args=None, kwargs=None):
        self._thread = threading.Thread(target=self._run)
        self._function = period_func
        # ``None`` sentinels instead of ``[]`` / ``{}`` defaults: a mutable
        # default would be one object shared by every instance.
        self._args = [] if args is None else args
        self._kwargs = {} if kwargs is None else kwargs
        self._period = 0
        self._event = threading.Event()         # "begin / terminate" wake-up
        self._period_event = threading.Event()  # "leave the periodic loop"
        self._terminated = False

    def start(self):
        """Launch the worker thread (it idles until send_start is called)."""
        self._thread.start()

    def stop(self):
        """Terminate the worker thread and wait for it to finish."""
        self._terminated = True
        # Wake the worker whichever wait it is blocked in.  Setting only
        # ``_event`` left a worker that was sending periodically stuck in its
        # inner loop, so join() never returned.
        self._period_event.set()
        self._event.set()
        if self._thread.ident is not None:  # joining an unstarted thread raises
            self._thread.join()

    def send_start(self, period):
        """Begin calling the function every ``period`` seconds."""
        self._period = period
        self._event.set()

    def send_stop(self):
        """Stop the periodic calls; the thread stays alive for a restart."""
        self._period_event.set()

    def _run(self):
        """Thread body: wait for a start request, then send until stopped."""
        while True:
            self._event.wait()
            self._event.clear()
            if self._terminated:
                break
            self._function(*self._args, **self._kwargs)
            # wait() returns False on timeout (time for the next call) and
            # True once send_stop()/stop() set the event.
            while not self._period_event.wait(self._period):
                if self._terminated:
                    break
                self._function(*self._args, **self._kwargs)
            self._period_event.clear()
            if self._terminated:
                break
|
|
|
class ShifterTool():
|
|
class IsoTpConnection(BaseConnection):
    """udsoncan connection that runs an ``isotp.TransportLayer`` in a thread.

    Payloads travel through three queues:

    * ``UDStoIsoTPQueue``  - requests written by the UDS client,
    * ``IsoTPtoUDSQueue``  - reassembled responses for the UDS client,
    * ``CANtoIsoTPQueue``  - raw CAN frames fed in from outside this class.

    A worker thread (``rxthread_task``) moves data between the first two
    queues and the transport layer.
    """

    # Largest payload accepted in either direction; longer ones are truncated.
    mtu = 4095

    def __init__(self, isotp_layer, name=None):
        """Store the transport layer and create the (still closed) connection.

        Args:
            isotp_layer: the ``isotp.TransportLayer`` to drive.
            name: optional connection name passed to ``BaseConnection``.
        """
        BaseConnection.__init__(self, name)
        self.UDStoIsoTPQueue = queue.Queue()  # UDS --> ISO-TP
        self.IsoTPtoUDSQueue = queue.Queue()  # ISO-TP --> UDS
        self.CANtoIsoTPQueue = queue.Queue()  # CAN --> ISO-TP
        self._read_thread = None
        self.exit_requested = False
        self.opened = False
        self.isotp_layer = isotp_layer

        assert isinstance(
            self.isotp_layer, isotp.TransportLayer), 'isotp_layer must be a valid isotp.TransportLayer '

    def open(self):
        """Start the worker thread and mark the connection open.

        Returns:
            ``self``, so the call can be chained or used in a ``with``.
        """
        if self._read_thread is not None and self._read_thread.is_alive():
            # Already running: a second worker would drive the same
            # transport layer concurrently.
            self.opened = True
            return self
        self.exit_requested = False
        self._read_thread = threading.Thread(
            None, target=self.rxthread_task)
        self._read_thread.start()
        self.opened = True
        logger.info('Connection opened')
        return self

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def is_open(self):
        """Return True while the connection is marked open."""
        return self.opened

    def close(self):
        """Stop the worker thread, discard queued data and reset the layer."""
        self.exit_requested = True
        # _read_thread is None when close() is called without a prior open().
        if self._read_thread is not None:
            self._read_thread.join()
        # Drain only after the worker has stopped, otherwise it could refill
        # the receive queue right behind us.
        self.empty_rxqueue()
        self.empty_txqueue()
        self.isotp_layer.reset()
        self.opened = False
        logger.info('Connection closed')

    def specific_send(self, payload):
        """Queue ``payload`` for transmission, truncated to ``mtu`` bytes."""
        if self.mtu is not None:
            if len(payload) > self.mtu:
                logger.warning(
                    "Truncating payload to be set to a length of %d", self.mtu)
                payload = payload[0:self.mtu]

        # isotp's TransportLayer works with bytearray; udsoncan hands us bytes.
        self.UDStoIsoTPQueue.put(bytearray(payload))

    def specific_wait_frame(self, timeout=2):
        """Block until a reassembled payload arrives and return it as bytes.

        Args:
            timeout: maximum time to wait, in seconds.

        Raises:
            RuntimeError: the connection is not open.
            TimeoutException: nothing arrived within ``timeout``.
        """
        if not self.opened:
            raise RuntimeError("Connection is not open")

        try:
            frame = self.IsoTPtoUDSQueue.get(block=True, timeout=timeout)
        except queue.Empty:
            raise TimeoutException(
                "Did not receive frame IsoTP Transport layer in time (timeout=%s sec)" % timeout) from None

        if self.mtu is not None:
            if frame is not None and len(frame) > self.mtu:
                logger.warning(
                    "Truncating received payload to a length of %d", self.mtu)
                frame = frame[0:self.mtu]

        # isotp's TransportLayer returns bytearray; udsoncan expects bytes.
        return bytes(frame)

    @staticmethod
    def _drain(pending):
        """Remove every item from ``pending`` without ever blocking.

        ``empty()`` followed by ``get()`` can block forever if another thread
        takes the last item in between, hence ``get_nowait()``.
        """
        try:
            while True:
                pending.get_nowait()
        except queue.Empty:
            pass

    def empty_rxqueue(self):
        """Discard all received payloads not yet read by the UDS client."""
        self._drain(self.IsoTPtoUDSQueue)

    def empty_txqueue(self):
        """Discard all payloads not yet handed to the transport layer."""
        self._drain(self.UDStoIsoTPQueue)

    def rxthread_task(self):
        """Worker loop: feed the transport layer and collect its output.

        Runs until ``exit_requested`` is set; any exception ends the loop.
        """
        while not self.exit_requested:
            try:
                # Outgoing: UDS client -> transport layer.
                while True:
                    try:
                        payload = self.UDStoIsoTPQueue.get_nowait()
                    except queue.Empty:
                        break
                    self.isotp_layer.send(payload)

                self.isotp_layer.process()

                # Incoming: transport layer -> UDS client.
                while self.isotp_layer.available():
                    self.IsoTPtoUDSQueue.put(self.isotp_layer.recv())

                time.sleep(0.0001)

            except Exception as e:
                self.exit_requested = True
                # exception() also records the traceback.
                logger.exception(str(e))
                print("Error occurred while read CAN(FD) data!")
|
|
def __init__(self, window: QtWidgets.QMainWindow):
    """Build the tool: queues, signals, UI and the diagnostic stack.

    Args:
        window: main window the generated ``Ui_MainWindow`` is set up on.
    """
    super().__init__()

    # Frames exchanged with the CAN worker thread.
    self.msgQueue = Queue()   # received frames waiting to be displayed
    self.sendQueue = Queue()  # frames waiting to be transmitted
    self.signal = UISignals()

    self.shifter = ShifterClass()
    self.devicedescription = HardwreDevice()

    self.windows = QtWidgets.QMainWindow()
    self.UI = Ui_MainWindow()
    self.UI.setupUi(window)

    self._dev_info = None

    # The right-hand side is still the bound method at this point; after the
    # assignment the instance attribute ``can_thread`` is the Thread object
    # and hides the method of the same name on this instance.
    self.can_thread = threading.Thread(target=self.can_thread)

    self.DeviceInit()
    self.WidgetsInit()
    self.ChnInfoUpdate(self._isOpen)
|
|
def DeviceInit(self):
    """Reset the connection state and build the ISO-TP / UDS objects.

    Creates, in order: the adapter state flags, the ISO-TP parameters and
    addresses, the transport layer, the ``IsoTpConnection`` and the udsoncan
    ``Client`` with its configuration.
    """
    # ---- adapter / connection state ----
    self._usbcan = None
    self._isOpen = Value('i', 0)         # shared CAN connection state (1 = open)
    self._isChnOpen = False
    self.needdisconnect = Value('i', 0)  # 1 = the current connection must be closed

    # ---- current device info ----
    self._is_canfd = False
    self._res_support = False

    self._terminated = False
    self._lock = threading.RLock()

    # ---- ISO-TP transport parameters ----
    self.isotp_params = dict(
        # Ask the sender to wait 32 ms between consecutive frames
        # (0-127 = milliseconds, 0xF1-0xF9 = 100-900 microseconds).
        stmin=32,
        # Ask the sender for 8 consecutive frames per flow-control message.
        blocksize=8,
        # Number of wait frames allowed before an error is triggered.
        wftmax=0,
        # The CAN layer carries 8-byte payloads (CAN 2.0).
        tx_data_length=8,
        # Pad every transmitted frame with 0x00 (None would mean no padding).
        tx_padding=0,
        # Time out when a flow control is awaited for more than 1000 ms.
        rx_flowcontrol_timeout=1000,
        # Time out when a consecutive frame is awaited for more than 1000 ms.
        rx_consecutive_frame_timeout=1000,
        # False: respect the receiver's stmin when sending.
        squash_stmin_requirement=False,
    )

    # ---- addressing: we transmit on the ECU's rx id and listen on its tx id ----
    can_ids = self.shifter.canid
    mode = isotp.AddressingMode.Normal_11bits
    self._isotpaddr_PHYS = isotp.Address(
        mode, txid=can_ids.phy_rxId, rxid=can_ids.phy_txId)
    self._isotpaddr_FUNC = isotp.Address(
        mode, txid=can_ids.fun_rxId, rxid=can_ids.phy_txId)

    # ---- transport layer, connection and UDS client ----
    self.isotp_layer = isotp.TransportLayer(
        rxfn=self.isotp_rcv,
        txfn=self.isotp_send,
        address=self._isotpaddr_PHYS,
        params=self.isotp_params)
    self.conn = ShifterTool.IsoTpConnection(isotp_layer=self.isotp_layer)
    self.udsclient = Client(self.conn, request_timeout=2)

    client_options = {
        'security_algo': self.SecAlgo,
        # The four constants SecAlgo indexes for security level 0x11.
        'security_algo_params': [
            0x4FE87269, 0x6BC361D8, 0x9B127D51, 0x5BA41903],
        'data_identifiers': self.shifter.did_config,
        'server_address_format': 32,
        'server_memorysize_format': 32,
    }
    for option, setting in client_options.items():
        self.udsclient.config[option] = setting
|
|
def SecAlgo(self, level, seed, params):
    """Compute the 4-byte security key for ``seed``.

    Installed as the UDS client's ``security_algo`` callback in DeviceInit.

    Args:
        level: security level; 0x01 and 0x11 have dedicated algorithms, any
            other level returns the seed's first four bytes unchanged.
        seed: indexable sequence of at least four byte values.
        params: indexable sequence of four integers, only read for level 0x11.

    Returns:
        The key as 4 ``bytes``, most significant byte first.
    """
    mask32 = 0xFFFFFFFF

    def swap32(word):
        # Reverse the byte order of the low 32 bits.
        return (((word << 24) & 0xFF000000) | ((word << 8) & 0xFF0000)
                | ((word >> 8) & 0xFF00) | ((word >> 24) & 0xFF))

    def mix(value, round_key):
        # One shift/add/xor mixing step, truncated to 32 bits.
        return ((((value << 4) ^ (value >> 5)) + value) ^ round_key) & mask32

    # Big-endian 32-bit word built from the first four seed bytes.
    seed_word = (seed[0] << 24) | (seed[1] << 16) | (seed[2] << 8) | seed[3]

    if level == 0x01:
        key_word = ((((seed_word >> 4) ^ seed_word) << 3) ^ seed_word) & mask32
    elif level == 0x11:
        y = swap32(seed_word)
        z = 0
        running_sum = 0
        for _ in range(64):
            y = (y + mix(z, running_sum + params[running_sum & 0x3])) & mask32
            running_sum = (running_sum + 0x8F750A1D) & mask32
            z = (z + mix(y, running_sum
                         + params[(running_sum >> 11) & 0x3])) & mask32
        key_word = swap32(z)
    else:
        key_word = seed_word

    # Only the low 32 bits are emitted, most significant byte first.
    return struct.pack('>I', key_word & mask32)
|
|
def getDateTimeBytes(self):
    """Return the current local date and time as packed-BCD values.

    Each decimal number is re-read as hexadecimal, so its decimal digits
    become the hex digits of the result (e.g. year 2024 -> 0x20, 0x24 and
    minute 59 -> 0x59).

    Returns:
        tuple ``(year_high, year_low, month, day, hour, minute, second)``.
    """
    # Take ONE snapshot: reading the clock separately for every field could
    # mix values from both sides of a second/minute/day rollover.
    now = datetime.datetime.now()

    def to_bcd(number):
        return int(str(number), 16)

    year = to_bcd(now.year)
    return (year >> 8, year & 0xFF, to_bcd(now.month), to_bcd(now.day),
            to_bcd(now.hour), to_bcd(now.minute), to_bcd(now.second))
|
|
def isotp_rcv(self):
    """Return the next CAN frame queued for the ISO-TP layer, if any.

    Passed to ``isotp.TransportLayer`` as its ``rxfn`` in DeviceInit; the
    frames are put on ``conn.CANtoIsoTPQueue`` by the CAN worker thread.

    Returns:
        The oldest queued frame, or ``None`` when the queue is empty.
    """
    inbox = self.conn.CANtoIsoTPQueue
    return None if inbox.empty() else inbox.get()
|
|
def isotp_send(self, isotp_msg):
    """Queue a frame produced by the ISO-TP layer for transmission.

    Passed to ``isotp.TransportLayer`` as its ``txfn`` in DeviceInit.  The
    frame is copied into a ``Message`` with a standard (non-extended)
    identifier and put on ``sendQueue``.

    Args:
        isotp_msg: object providing ``arbitration_id``, ``dlc`` and ``data``.
    """
    frame = Message()
    frame.is_extended_id = False
    frame.arbitration_id = isotp_msg.arbitration_id
    frame.dlc = isotp_msg.dlc
    frame.data = isotp_msg.data
    self.sendQueue.put(frame)
|
|
def WidgetsInit(self):
    """Fill the connection widgets and connect signals to their slots."""
    ui = self.UI

    # Adapter models.
    for model in ('USBCAN-I', 'USBCAN-II'):
        ui.comboBox_2.addItem(model)

    # Baud rates: every TIMING_DICT key divided by 1000, suffixed with 'K'.
    baud_labels = ['%dK' % (rate / 1000) for rate in TIMING_DICT.keys()]
    ui.comboBox_3.addItems(baud_labels)

    # Channels.
    ui.comboBox_5.addItems(['CH1/3', 'CH2/4'])

    ui.pushButton.clicked.connect(self.open_close)

    # Re-read the hardware selection whenever one of these boxes is used.
    for selector in (ui.comboBox_3, ui.comboBox_5):
        selector.activated.connect(self.update_HardwareDevice)

    self.signal.sig_Disp_str.connect(self.disp_string)
    self.signal.sig_MsgReceived.connect(self._updateRootList)

    self.initUDSUI()
|
|
def initUDSUI(self):
    """Fill the UDS combo boxes and hook up the tester-present radio button.

    Each label starts with the UDS sub-function value it stands for.
    """
    ui = self.UI
    # Diagnostic sessions: default / programming / extended.  Sub-function
    # 0x03 is the extended diagnostic session; it used to carry the same
    # "programming mode" label as 0x02.
    ui.comboBox.addItems(['0x01默认模式', '0x02编程模式', '0x03扩展模式'])
    # ECU reset: hardware reset / software reset.
    ui.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位'])
    # Communication control: enable rx+tx / rx only / disable rx+tx.
    ui.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发'])
    ui.radioButton.toggled.connect(self.TestPresentEvent)
|
|
def _formatMsgData(self, index, item, received):
    """Turn a CAN message into the list of values shown in one table row.

    Args:
        index: running number of the message; only shown for received ones.
        item: message providing ``timestamp``, ``arbitration_id``, ``dlc``,
            ``is_extended_id`` and ``data``.
        received: True for a received message, False for a sent one.

    Returns:
        list ``[index, timestamp, direction, id, dlc, frame type, data]``
        where ``id`` looks like ``'0x77A'`` and ``data`` like
        ``'0x05 0xAB'``.
    """
    if received:
        row = ['{0:0>4}'.format(index), item.timestamp, '接收']
    else:
        row = ['', item.timestamp, '发送']

    # Lower-case '0x' prefix, upper-case hex digits.
    row.append('0x{:X}'.format(item.arbitration_id))
    row.append(item.dlc)
    row.append('扩展帧' if int(item.is_extended_id) == 1 else '标准帧')

    # '02X' zero-pads on the LEFT.  The former '0<2x' padded on the right,
    # which displayed byte 0x05 as 0x50.
    row.append(' '.join('0x{:02X}'.format(byte) for byte in item.data))
    return row
|
|
def can_thread_stop(self):
    """Wait for the CAN worker thread to finish.

    This only waits; it does not ask the worker to stop.  The worker loop
    (``can_thread``) ends once ``_isOpen`` is no longer 1.
    """
    worker = self.can_thread
    # join() on a thread that was never started raises RuntimeError.
    if worker.ident is None:
        return
    worker.join()
|
|
def can_thread_start(self):
    """Start the CAN worker thread; safe to call again after it has ended.

    Does nothing while the worker is already running.
    """
    worker = self.can_thread
    if worker.is_alive():
        return
    if worker.ident is not None:
        # The previous worker has run and ended.  A Thread object can only
        # be started once, so build a fresh one.  The worker function is
        # fetched from the class because the instance attribute
        # ``can_thread`` holds the Thread and hides the method.
        worker = threading.Thread(
            target=type(self).can_thread, args=(self,))
        self.can_thread = worker
    worker.start()
|
|
def can_thread(self):
    """CAN worker loop: receive, dispatch and transmit frames.

    Runs while ``_isOpen`` is 1.  Roughly every 10 ms it either

    * closes the device when ``needdisconnect`` is 1 (clearing both flags
      once ``CloseDevice()`` returns 1), or
    * reads up to 10 frames, forwards those with the ECU's physical tx id to
      the ISO-TP layer, queues every frame for display, and then sends all
      frames waiting on ``sendQueue``.
    """
    while self._isOpen.value == 1:
        time.sleep(0.01)

        if self.needdisconnect.value == 1:
            if self._usbcan.CloseDevice() == 1:
                self.needdisconnect.value = 0
                self._isOpen.value = 0
            continue

        # ---- receive ----
        frames, count = self._usbcan.Receive(len=10)
        for i in range(count):
            frame = frames[i]
            if frame.arbitration_id == self.shifter.canid.phy_txId:
                self.conn.CANtoIsoTPQueue.put(frame)  # hand over to ISO-TP
            self.msgQueue.put(frame)
            # '03x' / '02X' zero-pad on the left; the former '0<' alignment
            # padded on the right and logged e.g. 0x05 as 0x50.
            logger.info(
                'Rx: ID=0x{:03x} '.format(frame.arbitration_id)
                + ' '.join('0x{:02X}'.format(byte) for byte in frame.data))
            # Tell the UI thread to refresh the message table.
            # NOTE(review): the file's indentation was lost, so it is not
            # certain whether this was emitted per frame or once per batch;
            # per frame keeps the display queue drained - confirm.
            self.signal.sig_MsgReceived.emit(count)

        # ---- transmit ----
        for _ in range(self.sendQueue.qsize()):
            outgoing = self.sendQueue.get()
            logger.info(
                'Tx: ID=0x{:03x} '.format(outgoing.arbitration_id)
                + 'DLC={} '.format(outgoing.dlc)
                + 'externd flag ={} '.format(outgoing.is_extended_id)
                + 'remote frame:{} '.format(outgoing.is_remote_frame))
            self._usbcan.send(outgoing)
|
|
def open_close(self):
    """Toggle the CAN connection (slot of the open/close button).

    Open -> requests a disconnect; the worker thread closes the device.
    Closed -> creates the adapter on first use, (re)starts it and, only if
    that succeeded, starts the worker thread and the ISO-TP connection.
    """
    if self._isOpen.value != 0:
        # The worker thread sees the flag, closes the device and clears
        # both flags itself.
        self.needdisconnect.value = 1
        self._deviceOpenUpdate()
        return

    if self._usbcan is None:
        self.update_HardwareDevice()
        can_filters = [
            {'can_id': 0x420, 'can_mask': 0xFFFFFFFF}]
        bitrate = list(TIMING_DICT.keys())[
            self.devicedescription.baudrate]
        # NOTE(review): device type, device index and channel are fixed here
        # although update_HardwareDevice() has just read them from the UI -
        # confirm whether that is intended.
        self._usbcan = USBCAN(device_type=4, device_index=0,
                              can_index=0, bitrate=bitrate,
                              can_filters=can_filters)

    if not self._usbcan.InitAndStart():
        # Previously the failure was only logged and the tool carried on as
        # if the device were open.
        logger.error("Open usbcan device fail.")
        return

    previous = self.can_thread
    if previous.ident is not None:
        # Reopen: the old worker leaves its loop because _isOpen is 0.  Wait
        # for it, then replace it - a Thread cannot be started twice.  The
        # worker function comes from the class because the instance
        # attribute ``can_thread`` holds the Thread and hides the method.
        previous.join()
        self.can_thread = threading.Thread(
            target=type(self).can_thread, args=(self,))

    # Must be 1 before the worker starts, or its loop ends immediately.
    self._isOpen.value = 1
    self._deviceOpenUpdate()
    self.can_thread_start()
    if not self.conn.is_open():
        self.conn.open()  # start the ISO-TP thread
|
|
def update_HardwareDevice(self):
    """Copy the current combo-box selections into ``devicedescription``."""
    ui = self.UI
    selected = self.devicedescription
    # Combo index 0 / 1 ('USBCAN-I' / 'USBCAN-II') maps to type code 3 / 4.
    selected.device_type = ui.comboBox_2.currentIndex() + 3
    selected.channel = ui.comboBox_5.currentIndex()
    # Stored as the combo index, not as a rate.
    selected.baudrate = ui.comboBox_3.currentIndex()
|
|
def _deviceOpenUpdate(self):
    """Set the open/close button caption from the connection flags.

    The caption is "Close CAN" only while the device is open and no
    disconnect is pending; in every other state it is "Open CAN".
    """
    # A pending disconnect takes priority.  open_close() calls this right
    # after raising ``needdisconnect`` while ``_isOpen`` is still 1, so
    # testing ``_isOpen`` first left the caption at "Close CAN" after closing.
    connected = (self._isOpen.value == 1
                 and self.needdisconnect.value != 1)
    self.UI.pushButton.setText("Close CAN" if connected else "Open CAN")
|
# set ui info
|
|
def _StartlistenMsgProcess(self):
    """Run the adapter's listener in a child process and poll for display.

    Starts ``self._usbcan.ListeningMsg`` as a daemon process, handing it the
    two shared state flags and both message queues, then refreshes the
    message table every 0.1 s for as long as ``_isOpen`` is 1.  The call
    therefore blocks until the connection is closed.
    """
    listener = Process(
        name='pyUSBCANListener',
        target=self._usbcan.ListeningMsg,
        args=(self._isOpen, self.needdisconnect,
              self.msgQueue, self.sendQueue),
        daemon=True)
    self.msgProcess = listener
    listener.start()

    while self._isOpen.value == 1:
        self._updateRootList()
        time.sleep(0.1)
|
|
def disp_string(self, out):
    """Show ``out`` (converted with ``str``) in the window's status bar."""
    text = str(out)
    self.UI.statusbar.showMessage(text)
|
|
def _updateRootList(self):
    """Move up to five queued messages from ``msgQueue`` into the table.

    Each message is formatted as a received frame, numbered from 1 within
    this call, and the whole batch is passed to ``_insertDataSmooth``.
    """
    batch = min(self.msgQueue.qsize(), 5)
    rows = [
        self._formatMsgData(position, self.msgQueue.get(), True)
        for position in range(1, batch + 1)
    ]
    self._insertDataSmooth(data=rows, datasize=batch)
|
|
def _insertDataSmooth(self, data, datasize):
    """Write formatted message rows into the message table.

    Messages with id 0x420 always go to row 0 and messages with id 0x77A to
    row 1; any other message uses its position in ``data`` as the row.

    Args:
        data: list of rows as produced by ``_formatMsgData`` (element 3 is
            the id text, the first 7 elements are displayed).
        datasize: number of rows of ``data`` to write.
    """
    # Keys are lower-case and the id text is lower-cased before the lookup:
    # _formatMsgData writes hex digits in upper case ('0x77A'), so the
    # former comparison against "0x77a" could never match.
    fixed_rows = {'0x420': 0, '0x77a': 1}
    table = self.UI.tableWidget

    for position in range(datasize):
        fields = data[position]
        row = fixed_rows.get(str(fields[3]).lower(), position)
        for column in range(7):
            cell = QtWidgets.QTableWidgetItem()
            # 0x84 = Qt.AlignHCenter (0x04) | Qt.AlignVCenter (0x80).
            cell.setTextAlignment(0x84)
            cell.setText(str(fields[column]))
            table.setItem(row, column, cell)
|
|
def ChnInfoUpdate(self, openflag):
    """Placeholder that currently does nothing.

    Args:
        openflag: unused; ``__init__`` passes the shared ``_isOpen`` flag.
    """
|
|
def TestPresentEvent(self):
    """Send one UDS TesterPresent request through the UDS client.

    Connected to the radio button's ``toggled`` signal in initUDSUI.
    """
    client = self.udsclient
    client.tester_present()
|