from ast import Not, Pass
|
from asyncio.windows_events import NULL
|
from concurrent.futures import thread
|
from curses import flash
|
from logging import exception
|
import threading
|
from typing import List
|
from can import BusABC, Message, Logger, Listener
|
from udsoncan.client import Client
|
from udsoncan.exceptions import TimeoutException
|
import udsoncan
|
from udsoncan.connections import BaseConnection
|
from udsoncan import services, Response, MemoryLocation
|
import isotp
|
from isotp import CanMessage
|
import queue
|
from USBCAN import *
|
from Shifter import *
|
from styleSheet import *
|
from mainwindows import Ui_MainWindow
|
from PySide2 import QtWidgets, QtCore, QtGui
|
import struct
|
import time
|
import datetime
|
from ShifterDefine import *
|
from multiprocessing import Process, Queue, Value, Pipe
|
from hexread import *
|
|
MAX_RCV_NUM = 20
|
APP_ADDR_LOCATION_OFFSET = 8
|
APP_LENGTH_LOCATION_OFFSET = 12
|
logger = logging.getLogger()
|
logger.setLevel(logging.DEBUG)
|
formatter = logging.Formatter(
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
consoleHandler = logging.StreamHandler()
|
consoleHandler.setLevel(logging.DEBUG)
|
|
fileHandler = logging.FileHandler('./log/ShiftTool.log',
|
mode='a',
|
encoding='UTF-8')
|
fileHandler.setLevel(logging.DEBUG)
|
|
consoleHandler.setFormatter(formatter)
|
fileHandler.setFormatter(formatter)
|
|
logger.addHandler(consoleHandler)
|
logger.addHandler(fileHandler)
|
|
|
class UISignals(QtCore.QObject):
|
sig_Disp_str = QtCore.Signal(str)
|
sig_MsgReceived = QtCore.Signal(int)
|
|
|
g_signal = UISignals()
|
uds_conn, isotp_conn = Pipe()
|
user_conn, drive_conn = Pipe()
|
|
|
class HardwreDevice():
|
|
def __init__(self):
|
self.device_type = 4
|
self.channel = 0 # can_index
|
self.device = 0 # device_index
|
self.baudrate = 0
|
|
|
class PeriodSendThread(object):
|
|
def __init__(self, period_func, args=[], kwargs={}):
|
self._thread = threading.Thread(target=self._run)
|
self._function = period_func
|
self._args = args
|
self._kwargs = kwargs
|
self._period = 0
|
self._event = threading.Event()
|
self._period_event = threading.Event()
|
self._terminated = False
|
|
def start(self):
|
self._thread.start()
|
|
def stop(self):
|
self._terminated = True
|
self._event.set()
|
self._thread.join()
|
|
def send_start(self, period):
|
self._period = period
|
self._event.set()
|
|
def send_stop(self):
|
self._period_event.set()
|
|
def _run(self):
|
while True:
|
self._event.wait()
|
self._event.clear()
|
if self._terminated:
|
break
|
self._function(*self._args, **self._kwargs)
|
while not self._period_event.wait(self._period):
|
self._function(*self._args, **self._kwargs)
|
self._period_event.clear()
|
|
|
class ShifterTool():
|
|
class IsoTpConnection(BaseConnection):
|
|
mtu = 4095
|
|
def __init__(self, isotp_layer, name=None):
|
BaseConnection.__init__(self, name)
|
self.UDStoIsoTPQueue = queue.Queue() # USD --> isotp
|
self.IsoTPtoUDSQueue = queue.Queue() # ISOTP --> UDS
|
# self.IsoTPtoCANQueue = queue.Queue() # ISOTP--> CAN
|
self._read_thread = None
|
self.exit_requested = False
|
self.opened = False
|
self.isotp_layer = isotp_layer
|
|
assert isinstance(
|
self.isotp_layer, isotp.TransportLayer
|
), 'isotp_layer must be a valid isotp.TransportLayer '
|
|
def open(self):
|
self.exit_requested = False
|
self._read_thread = threading.Thread(None,
|
target=self.rxthread_task)
|
self._read_thread.start()
|
self.opened = True
|
logger.info('Connection opened')
|
return self
|
|
def __enter__(self):
|
return self
|
|
def __exit__(self, type, value, traceback):
|
self.close()
|
|
def is_open(self):
|
return self.opened
|
|
def close(self):
|
self.empty_rxqueue()
|
self.empty_txqueue()
|
self.exit_requested = True
|
self._read_thread.join()
|
self.isotp_layer.reset()
|
self.opened = False
|
logger.info('Connection closed')
|
|
def specific_send(self, payload):
|
if self.mtu is not None:
|
if len(payload) > self.mtu:
|
logger.warning(
|
"Truncating payload to be set to a length of %d" %
|
(self.mtu))
|
payload = payload[0:self.mtu]
|
|
# isotp.protocol.TransportLayer uses byte array. udsoncan is strict on bytes format
|
self.isotp_layer.send(bytearray(payload))
|
# self.UDStoIsoTPQueue.put(bytearray(payload))
|
|
def specific_wait_frame(self, timeout=2):
|
if not self.opened:
|
raise RuntimeError("Connection is not open")
|
|
timedout = False
|
frame = None
|
# frame = uds_conn.recv()
|
try:
|
frame = self.IsoTPtoUDSQueue.get(block=True, timeout=5)
|
except queue.Empty:
|
timedout = True
|
# frame = self.CANtoIsoTPQueue.get(block=True, timeout=timeout)
|
|
if timedout:
|
raise TimeoutException(
|
"Did not receive frame IsoTP Transport layer in time (timeout=%s sec)"
|
% timeout)
|
|
if self.mtu is not None:
|
if frame is not None and len(frame) > self.mtu:
|
logger.warning(
|
"Truncating received payload to a length of %d" %
|
(self.mtu))
|
frame = frame[0:self.mtu]
|
|
# isotp.protocol.TransportLayer uses bytearray. udsoncan is strict on bytes format
|
return bytes(frame)
|
|
def empty_rxqueue(self):
|
while not self.IsoTPtoUDSQueue.empty():
|
self.IsoTPtoUDSQueue.get()
|
|
def empty_txqueue(self):
|
while not self.UDStoIsoTPQueue.empty():
|
self.UDStoIsoTPQueue.get()
|
|
def rxthread_task(self):
|
while not self.exit_requested:
|
try:
|
# # self.logger.debug("UDStoIsoTPQueue queue size is now %d" % (
|
# # self.UDStoIsoTPQueue.qsize()))
|
# while not self.UDStoIsoTPQueue.empty():
|
# self.isotp_layer.send(self.UDStoIsoTPQueue.get())
|
|
self.isotp_layer.process()
|
|
while self.isotp_layer.available():
|
# isotp_conn.send(self.isotp_layer.recv())
|
self.IsoTPtoUDSQueue.put(self.isotp_layer.recv())
|
self.logger.debug(
|
"IsoTPtoUDSQueue queue size is now %d" %
|
(self.IsoTPtoUDSQueue.qsize()))
|
|
# time.sleep(self.isotp_layer.sleep_time())
|
time.sleep(0.001)
|
|
except Exception as e:
|
self.exit_requested = True
|
logger.error(str(e))
|
print("Error occurred while read CAN(FD) data!")
|
|
def __init__(self, window: QtWidgets.QMainWindow):
|
super(ShifterTool, self).__init__()
|
# self.title("CCDiag")
|
# self.resizable(False, False)
|
# self.geometry(str(WIDGHT_WIDTH) + "x" +
|
# str(WIDGHT_HEIGHT) + '+200+100')
|
# self.protocol("WM_DELETE_WINDOW", self.Form_OnClosing)
|
self.msgQueue = Queue() # can layer receive queue
|
self.sendQueue = Queue() # can layer send queue
|
self.CANtoIsoTPQueue = Queue() # CAN --> isotp
|
self.shifter = ShifterClass()
|
self.Vehicle = VehicleClass()
|
self.devicedescription = HardwreDevice()
|
self.windows = QtWidgets.QMainWindow()
|
self.UI = Ui_MainWindow()
|
self.UI.setupUi(window)
|
self._dev_info = None
|
self.dbc = cantools.database.load_file("DBC/SX7H.dbc")
|
self.can_thread = threading.Thread(target=self.can_thread)
|
self.TestPresentTimer = QtCore.QTimer()
|
self.TestPresentTimer.timeout.connect(self.creat_testpresentReq)
|
self.DeviceInit()
|
self.WidgetsInit()
|
self.ChnInfoUpdate(self._isOpen)
|
self.in_programming = False
|
self.startfromboot = False
|
self.drv_data = [
|
0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a
|
]
|
self.app_data: bytes = None
|
self.start_timestamp = time.time()
|
self.boot_logger = None
|
|
def DeviceInit(self):
|
self._usbcan = None
|
self._isOpen = Value('i', 0) # 进程之间交换can连接状态
|
self._isChnOpen = False
|
self.needdisconnect = Value('i', 0) # 当前连接需要断开
|
|
# current device info
|
self._is_canfd = False
|
self._res_support = False
|
|
# read can/canfd message thread
|
# self._read_thread = None
|
self._terminated = False
|
self._lock = threading.RLock()
|
|
self.isotp_params = {
|
# Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
|
'stmin': 32,
|
# Request the sender to send 8 consecutives frames before sending a new flow control message
|
'blocksize': 8,
|
# Number of wait frame allowed before triggering an error
|
'wftmax': 0,
|
# Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
|
'tx_data_length': 8,
|
# Will pad all transmitted CAN messages with byte 0x00. None means no padding
|
'tx_padding': 0,
|
# Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
|
'rx_flowcontrol_timeout': 1000,
|
# Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
|
'rx_consecutive_frame_timeout': 1000,
|
# When sending, respect the stmin requirement of the receiver. If set to True, go as fast as possible.
|
'squash_stmin_requirement': False
|
}
|
self._isotpaddr_PHYS = isotp.Address(
|
isotp.AddressingMode.Normal_11bits,
|
txid=self.shifter.canid.phy_rxId,
|
rxid=self.shifter.canid.phy_txId)
|
self._isotpaddr_FUNC = isotp.Address(
|
isotp.AddressingMode.Normal_11bits,
|
txid=self.shifter.canid.fun_rxId,
|
rxid=self.shifter.canid.phy_txId)
|
# self._isotpaddr_EPS = isotp.Address(
|
# isotp.AddressingMode.Normal_11bits, txid=EPS_RX_ID_PHYS, rxid=EPS_TX_ID)
|
# self._isotpaddr_EPS4wd = isotp.Address(
|
# isotp.AddressingMode.Normal_11bits, txid=EPS4wd_RX_ID_PHYS, rxid=EPS4wd_TX_ID)
|
self.isotp_layer = isotp.TransportLayer(rxfn=self.isotp_rcv,
|
txfn=self.isotp_send,
|
address=self._isotpaddr_PHYS,
|
params=self.isotp_params)
|
self.conn = ShifterTool.IsoTpConnection(isotp_layer=self.isotp_layer)
|
self.udsclient = Client(self.conn, request_timeout=2)
|
self.udsclient.config['security_algo'] = self.SecAlgo
|
self.udsclient.config['security_algo_params'] = [
|
0x11223344, 0x20AA097B, 0x11223344, 0x70237577
|
]
|
self.udsclient.config['data_identifiers'] = self.shifter.did_config
|
|
self.udsclient.config['server_address_format'] = 32
|
self.udsclient.config['server_memorysize_format'] = 32
|
|
def SecAlgo(self, level, seed, params=None):
|
"""
|
Builds the security key to unlock a security level.
|
|
temp_key = bytearray(seed)
|
self.output_key = bytearray(seed)
|
xorkey = bytearray(params['xorkey'])
|
|
for i in range(len(temp_key)):
|
temp_key[i] = temp_key[i] ^ xorkey[i]
|
|
self.output_key[0] = (temp_key[3] & 0x0F) | (temp_key[2] & 0xF0)
|
self.output_key[1] = ((temp_key[2] & 0x1F) << 3) | (
|
(temp_key[1] & 0xF8) >> 3)
|
self.output_key[2] = ((temp_key[1] & 0xFC) >> 2) | (temp_key[0] & 0xC0)
|
self.output_key[3] = ((temp_key[0] & 0x0F) << 4) | (temp_key[3] & 0x0F)
|
"""
|
temp_key = (seed[0] << 24) | (seed[1] << 16) | (seed[2] << 8) | (
|
seed[3])
|
if level == 0x01:
|
output_key_temp = 0x20AA097B
|
output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF,
|
(output_key_temp >> 16) & 0xFF,
|
(output_key_temp >> 8) & 0xFF,
|
output_key_temp & 0xFF)
|
# output_key_temp = ((((temp_key >> 4) ^ temp_key)
|
# << 3) ^ temp_key) & 0xFFFFFFFF
|
elif level == 0x09:
|
output_key_temp = 0x70237577
|
output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF,
|
(output_key_temp >> 16) & 0xFF,
|
(output_key_temp >> 8) & 0xFF,
|
output_key_temp & 0xFF)
|
# _temp_y = ((temp_key << 24) & 0xFF000000) + ((temp_key << 8) &
|
# 0xFF0000) + ((temp_key >> 8) & 0xFF00) + ((temp_key >> 24) & 0xFF)
|
# _temp_z = 0
|
# _temp_sum = 0
|
# for i in range(64):
|
# _temp_y += ((((_temp_z << 4) ^ (_temp_z >> 5)) + _temp_z)
|
# ^ (_temp_sum + params[_temp_sum & 0x3])) & 0xFFFFFFFF
|
# _temp_y = _temp_y & 0xFFFFFFFF
|
# _temp_sum += 0x8F750A1D
|
# _temp_sum = _temp_sum & 0xFFFFFFFF
|
# _temp_z += ((((_temp_y << 4) ^ (_temp_y >> 5)) + _temp_y) ^
|
# (_temp_sum + params[(_temp_sum >> 11) & 0x3])) & 0xFFFFFFFF
|
# _temp_z = _temp_z & 0xFFFFFFFF
|
# output_key_temp = (((_temp_z << 24) & 0xFF000000) | ((_temp_z << 8) & 0xFF0000) | (
|
# (_temp_z >> 8) & 0xFF00) | ((_temp_z >> 24) & 0xFF))
|
elif level == 0x7D:
|
output_key_temp = 0xAB2F9F36099D81F3
|
output_key = struct.pack(
|
'BBBBBBBB', (output_key_temp >> 56) & 0xFF,
|
(output_key_temp >> 48) & 0xFF, (output_key_temp >> 40) & 0xFF,
|
(output_key_temp >> 32) & 0xFF, (output_key_temp >> 24) & 0xFF,
|
(output_key_temp >> 16) & 0xFF, (output_key_temp >> 8) & 0xFF,
|
output_key_temp & 0xFF)
|
else:
|
output_key_temp = temp_key
|
|
output_key = struct.pack('BBBB', (output_key_temp >> 24) & 0xFF,
|
(output_key_temp >> 16) & 0xFF,
|
(output_key_temp >> 8) & 0xFF,
|
output_key_temp & 0xFF)
|
|
return output_key
|
|
def getDateTimeBytes(self):
|
"""
|
get year/month/day and convert into bytes
|
"""
|
_year_high = int(str(datetime.datetime.now().year), 16) >> 8
|
_year_low = int(str(datetime.datetime.now().year), 16) & 0xFF
|
_month = int(str(datetime.datetime.now().month), 16)
|
_day = int(str(datetime.datetime.now().day), 16)
|
_hour = int(str(datetime.datetime.now().hour), 16)
|
_minute = int(str(datetime.datetime.now().minute), 16)
|
_second = int(str(datetime.datetime.now().second), 16)
|
|
return (_year_high, _year_low, _month, _day, _hour, _minute, _second)
|
|
def isotp_rcv(self):
|
'''receive msg from can layer
|
use queue read from can layer listen'''
|
can_msgs = None
|
if not self.CANtoIsoTPQueue.empty():
|
can_msgs = self.CANtoIsoTPQueue.get()
|
return can_msgs
|
|
def isotp_send(self, isotp_msg):
|
'''send isotp message to can layer
|
use queue send'''
|
msg = Message()
|
msg.arbitration_id = isotp_msg.arbitration_id
|
msg.dlc = isotp_msg.dlc
|
msg.channel = 0
|
msg.data = isotp_msg.data # bytearray
|
msg.is_extended_id = False
|
msg.timestamp = time.time() - self.start_timestamp
|
msg.is_error_frame = False
|
msg.is_remote_frame = False
|
self.Tool_send(msg)
|
|
def Tool_send(self, msg: Message):
|
self._usbcan.send(msg)
|
if self.boot_logger is not None:
|
self.boot_logger.on_message_received(msg)
|
|
def WidgetsInit(self):
|
# self.UI.setupUi(self)
|
self.UI.comboBox_2.addItem('USBCAN-I')
|
self.UI.comboBox_2.addItem('USBCAN-II')
|
|
# 波特率
|
self.UI.comboBox_3.addItems([('%dK' % (i / 1000))
|
for i in TIMING_DICT.keys()])
|
# CHANNEL
|
self.UI.comboBox_5.addItems(['CH1/3', 'CH2/4'])
|
|
self.UI.pushButton.clicked.connect(self.open_close)
|
|
self.UI.comboBox_3.activated.connect(self.update_HardwareDevice)
|
self.UI.comboBox_5.activated.connect(self.update_HardwareDevice)
|
|
g_signal.sig_Disp_str.connect(self.disp_string)
|
g_signal.sig_MsgReceived.connect(self._updateRootList)
|
|
self.initUDSUI()
|
|
def initUDSUI(self):
|
self.UI.comboBox.addItems(['0x01默认模式', '0x02编程模式', '0x03扩展模式'])
|
self.UI.comboBox.activated.connect(self.sessioncontrol_req)
|
|
self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位'])
|
self.UI.comboBox_8.activated.connect(self.MCUReset_req)
|
|
self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发'])
|
self.UI.comboBox_9.activated.connect(self.communicationControl_req)
|
self.UI.comboBox_6.addItems(list(DID_dic.keys()))
|
self.DID_display()
|
self.UI.comboBox_6.activated.connect(self.DID_display)
|
self.UI.comboBox_7.addItems(list(DTCGroup_dic.keys()))
|
self.UI.comboBox_7.activated.connect(self.ClearDTC)
|
|
self.UI.pushButton_31.clicked.connect(self.SecurityUnlockLevel_1)
|
self.UI.pushButton_32.clicked.connect(self.SecurityUnlockLevel_2)
|
|
self.UI.pushButton_12.clicked.connect(self.ReadSupplyID)
|
self.UI.pushButton_4.clicked.connect(self.ReadVIN)
|
self.UI.pushButton_6.clicked.connect(self.ReadMfgDate)
|
self.UI.pushButton_14.clicked.connect(self.ReadDataByID)
|
self.UI.pushButton_3.clicked.connect(self.WriteDataByID)
|
self.UI.pushButton_9.clicked.connect(self.StartCalibraiton)
|
self.UI.pushButton_11.clicked.connect(self.Calibraiton_Z)
|
self.UI.pushButton_13.clicked.connect(self.Calibraiton_M)
|
self.UI.pushButton_18.clicked.connect(self.Calibraiton_MP)
|
self.UI.pushButton_22.clicked.connect(self.Calibraiton_MN)
|
self.UI.pushButton_24.clicked.connect(self.Calibraiton_X2)
|
self.UI.pushButton_26.clicked.connect(self.Calibraiton_X1)
|
self.UI.pushButton_27.clicked.connect(self.Calibraiton_Y1)
|
self.UI.pushButton_28.clicked.connect(self.Calibraiton_Y2)
|
self.UI.pushButton_29.clicked.connect(self.Calibraiton_GAP)
|
|
self.UI.radioButton.toggled.connect(self.TestPresentEvent)
|
|
self.UI.pushButton_48.clicked.connect(self.start_programming)
|
self.UI.pushButton_46.clicked.connect(self.loadAppfile)
|
self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag)
|
|
self.UI.pushButton_47.clicked.connect(self.SetBootLog)
|
# self.UI.comboBox_10.activated.connect(self.ReportByMask)
|
|
self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys()))
|
self.UI.comboBox_12.addItems(list(DTC_Dic.keys()))
|
self.UI.comboBox_13.addItems(list(DTC_Control_dic.keys()))
|
self.UI.comboBox_13.activated.connect(self.DTC_Control)
|
|
self.UI.pushButton_49.clicked.connect(self.ReportSupportDTC)
|
self.UI.comboBox_11.activated.connect(self.ReportByMask)
|
self.UI.comboBox_12.activated.connect(self.ReportSnapshotByDTC)
|
|
def _formatMsgData(self, index, item, received):
|
'''msg data to list
|
|
Arguments:
|
index {int} -- msg index
|
item {} -- recevive
|
received {-bool} -- always true
|
|
Returns:
|
[list] -- data list:
|
[index, received, TimeStamp, id,
|
RemoteFlag, ExternFlag, DataLen, data]
|
'''
|
|
data = []
|
if received:
|
data.append('{0:0>4}'.format(index))
|
data.append(item.timestamp)
|
data.append('接收')
|
else:
|
data.append('')
|
data.append(item.timestamp)
|
data.append('发送')
|
data.append(str.upper(str(hex(item.arbitration_id))).replace('X', 'x'))
|
|
data.append(item.dlc)
|
if int(item.is_extended_id) == 1:
|
data.append('扩展帧')
|
else:
|
data.append('标准帧')
|
|
data.append(' '.join(
|
['0x' + '{:0>2x}'.format(a).upper() for a in list(item.data)]))
|
return data
|
|
def can_thread_stop(self):
|
self.can_thread.join()
|
|
def can_thread_start(self):
|
self.can_thread.start()
|
|
def can_thread(self):
|
while True:
|
time.sleep(0.01)
|
while self._isOpen.value == 1:
|
time.sleep(0.01)
|
if self.needdisconnect.value == 1:
|
ret = self._usbcan.CloseDevice()
|
if ret == 1:
|
self.needdisconnect.value = 0
|
self._isOpen.value = 0
|
else:
|
msg, num = self._usbcan.Receive(len=1)
|
if not num == 0:
|
for i in range(num):
|
if msg[i].arbitration_id == self.shifter.canid.phy_txId:
|
# conn.send(msg[i]) # pipe connection send
|
self.CANtoIsoTPQueue.put(
|
msg[i]) # send msg to isotp
|
|
self.msgQueue.put(msg[i])
|
if self.boot_logger is not None:
|
self.boot_logger.on_message_received(msg[i])
|
# logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' +
|
# '{:0<2x}'.format(a).upper() for a in list(msg[i].data)]))
|
|
# send signal to update screen
|
|
g_signal.sig_MsgReceived.emit(num)
|
|
# msgToSendCnt = self.sendQueue.qsize()
|
# if msgToSendCnt > 0:
|
# msg = Message()
|
# for i in range(msgToSendCnt):
|
|
# msg = self.sendQueue.get()
|
# # logger.info('Tx: ID=0x{:0<3x} '.format(
|
# # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame))
|
# self.Tool_send(msg)
|
def send_dump(self):
|
msg = Message()
|
msg.arbitration_id = self.shifter.canid.phy_rxId
|
msg.dlc = 8
|
msg.channel = 0
|
msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0]
|
msg.is_extended_id = False
|
msg.is_remote_frame = False
|
msg.is_error_frame = False
|
msg.timestamp = time.time() - self.start_timestamp
|
self.Tool_send(msg)
|
|
def send_VehiclePosition(self, data=[]):
|
msg = Message()
|
msg.arbitration_id = 0x10
|
msg.dlc = 8
|
msg.channel = 0
|
msg.data = data
|
msg.is_extended_id = False
|
msg.is_remote_frame = False
|
msg.is_error_frame = False
|
msg.timestamp = time.time() - self.start_timestamp
|
self.Tool_send(msg)
|
|
def open_close(self):
|
if self._isOpen.value == 0:
|
if self._usbcan is None:
|
self.update_HardwareDevice()
|
can_filters = [{'can_id': 0x420, 'can_mask': 0xFFFFFFFF}]
|
|
bitrate = list(
|
TIMING_DICT.keys())[self.devicedescription.baudrate]
|
self._usbcan = USBCAN(device_type=4,
|
device_index=0,
|
can_index=self.devicedescription.channel,
|
bitrate=bitrate,
|
can_filters=can_filters)
|
if not self._usbcan.InitAndStart():
|
logger.info("Open usbcan device fail.")
|
|
self._isOpen.value = 1
|
self._deviceOpenUpdate()
|
self.can_thread_start()
|
self.conn.open() # start iso tp thread
|
self.send_dump()
|
self.send_dump()
|
|
else:
|
self._usbcan.InitAndStart()
|
self._isOpen.value = 1
|
self._deviceOpenUpdate()
|
else:
|
self.needdisconnect.value = 1
|
self._deviceOpenUpdate()
|
self.TestPresentTimer.stop()
|
|
def closeEvent(self, e):
|
self.boot_logger.stop()
|
|
def update_HardwareDevice(self):
|
|
self.devicedescription.device_type = self.UI.comboBox_2.currentIndex(
|
) + 3
|
self.devicedescription.channel = self.UI.comboBox_5.currentIndex()
|
self.devicedescription.baudrate = self.UI.comboBox_3.currentIndex()
|
|
def _deviceOpenUpdate(self):
|
if self._isOpen.value == 1:
|
self.UI.pushButton.setText("Close CAN")
|
elif self.needdisconnect.value == 1:
|
self.UI.pushButton.setText("Open CAN")
|
|
# set ui info
|
|
def _StartlistenMsgProcess(self):
|
self.msgProcess = Process(name='pyUSBCANListener',
|
target=self._usbcan.ListeningMsg,
|
args=(self._isOpen, self.needdisconnect,
|
self.msgQueue, self.sendQueue))
|
self.msgProcess.daemon = True
|
self.msgProcess.start()
|
# 1.5s后检测连接状态,该值可能需要标定
|
# self.root.after(1500, func=self._checkConnectStatus)
|
while self._isOpen.value == 1:
|
self._updateRootList()
|
time.sleep(0.1)
|
|
def disp_string(self, out_s):
|
dt = datetime.datetime.now()
|
nowtime_str = dt.strftime('%I:%M:%S') # time
|
self.UI.textEdit_2.insertPlainText(nowtime_str + ': ')
|
cursor = self.UI.textEdit_2.textCursor()
|
cursor.movePosition(QtGui.QTextCursor.End)
|
self.UI.textEdit_2.setTextCursor(cursor)
|
self.UI.textEdit_2.insertPlainText(str(out_s + "\t\n"))
|
|
# self.statusbar.showMessage(str(out))
|
def dispShiftstatus(self):
|
self.UI.pushButton_41.setStyleSheet(
|
Style_dic[self.shifter.UnlockButton])
|
self.UI.pushButton_40.setStyleSheet(Style_dic[self.shifter.Pbutton])
|
#X2
|
self.UI.pushButton_37.setStyleSheet(
|
Style_dic[self.shifter.position == 2])
|
#X1
|
self.UI.pushButton_38.setStyleSheet(
|
Style_dic[self.shifter.position == 3])
|
#Z
|
self.UI.pushButton_30.setStyleSheet(
|
Style_dic[self.shifter.position == 0])
|
#Y1
|
self.UI.pushButton_33.setStyleSheet(
|
Style_dic[self.shifter.position == 4])
|
#Y2
|
self.UI.pushButton_34.setStyleSheet(
|
Style_dic[self.shifter.position == 5])
|
#M
|
self.UI.pushButton_39.setStyleSheet(
|
Style_dic[self.shifter.position == 0xe])
|
#M+
|
self.UI.pushButton_35.setStyleSheet(
|
Style_dic[self.shifter.position == 0xd])
|
#M-
|
self.UI.pushButton_36.setStyleSheet(
|
Style_dic[self.shifter.position == 0xc])
|
self.UI.pushButton_42.setStyleSheet(
|
ColorStyle_dic[self.Vehicle.ShiftLeverPos == "P"])
|
self.UI.pushButton_43.setStyleSheet(
|
ColorStyle_dic[self.Vehicle.ShiftLeverPos == "D"])
|
self.UI.pushButton_44.setStyleSheet(
|
ColorStyle_dic[self.Vehicle.ShiftLeverPos == "N"])
|
self.UI.pushButton_45.setStyleSheet(
|
ColorStyle_dic[self.Vehicle.ShiftLeverPos == "R"])
|
|
def _updateRootList(self):
|
_dataSize = self.msgQueue.qsize()
|
receiveNum = 0
|
formateddata = list()
|
if _dataSize > 5:
|
_dataSize = 5
|
for i in range(_dataSize):
|
receiveNum += 1
|
msg = self.msgQueue.get()
|
if msg.arbitration_id == 0x420:
|
self.shifter.FramUnpack(msg.arbitration_id, msg.data)
|
resp_data = self.Vehicle.ShiftLogic(self.shifter)
|
self.send_VehiclePosition(resp_data)
|
formateddata.append(self._formatMsgData(
|
receiveNum, msg, True)) # return a data list
|
self._insertDataSmooth(data=formateddata, datasize=_dataSize)
|
self.dispShiftstatus()
|
|
def _insertDataSmooth(self, data, datasize):
|
# row = 6-datasize
|
# for row in range(datasize):
|
for row in range(datasize):
|
insertdata = data[row]
|
if insertdata[3] == '0x420':
|
row = 0
|
elif insertdata[3] == "0x77a":
|
row = 1
|
for column in range(7):
|
item = QtWidgets.QTableWidgetItem()
|
item.setTextAlignment(0x84) # 文本显示位置
|
item.setText(str(insertdata[column]))
|
|
self.UI.tableWidget.setItem(row, column, item)
|
|
def ChnInfoUpdate(self, openflag):
|
pass
|
|
def TestPresentEvent(self):
|
if self.UI.radioButton.isChecked():
|
self.TestPresentTimer.start(3000)
|
else:
|
self.TestPresentTimer.stop()
|
|
def creat_testpresentReq(self):
|
self.send_dump()
|
|
def sessioncontrol_req(self):
|
sesson = self.UI.comboBox.currentIndex() + 1
|
self.udsclient.change_session(sesson)
|
|
# self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位'])
|
def MCUReset_req(self):
|
req = 0x03 if self.UI.comboBox_8.currentText() == '0x01硬件复位' else 0x01
|
self.udsclient.ecu_reset(req)
|
|
def ClearDTC(self):
|
self.udsclient.clear_dtc(
|
group=DTCGroup_dic[self.UI.comboBox_7.currentText()])
|
|
def DTC_Control(self):
|
self.udsclient.control_dtc_setting(
|
setting_type=DTC_Control_dic[self.UI.comboBox_13.currentText()])
|
|
def Security_req(self):
|
self.udsclient.unlock_security_access(1)
|
# Security_dic[self.UI.comboBox_8.currentData()]
|
|
def SecurityUnlockLevel_1(self):
|
data = self.udsclient.unlock_security_access(1)
|
if data.positive:
|
self.UI.pushButton_31.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def SecurityUnlockLevel_2(self):
|
data = self.udsclient.unlock_security_access(9)
|
if data.positive:
|
self.UI.pushButton_32.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def SecurityUnlockLevel_3(self):
|
data = self.udsclient.unlock_security_access(0x7d)
|
if data.positive:
|
self.UI.pushButton_32.setStyleSheet(
|
"background-color:rgb(0, 128, 0)")
|
return data
|
|
def ReadSupplyID(self):
|
data = self.ReadByDID(0xF18A)
|
if data is not None:
|
self.UI.lineEdit_8.setText(str(data[0xF18A]))
|
|
def ReadVIN(self):
|
data = self.ReadByDID(0xF190)
|
if data is not None:
|
self.UI.lineEdit_3.setText(str(data[0xF190]))
|
|
def ReadMfgDate(self):
|
data = self.ReadByDID(0x210B)
|
if data is not None:
|
self.UI.lineEdit_5.setText(str(data[0x210B]))
|
|
def ReadByDID(self, didlist: List):
|
values = {}
|
try:
|
response = self.udsclient.read_data_by_identifier(didlist)
|
if response.positive:
|
values = response.service_data.values
|
return values
|
except Exception as e:
|
g_signal.sig_Disp_str.emit(e)
|
|
def DID_display(self):
|
# print(self.UI.comboBox_6.currentText())
|
self.UI.lineEdit.setText("0x{:0<4x}".format(
|
(DID_dic[self.UI.comboBox_6.currentText()])))
|
|
def ReadDataByID(self):
|
tempdid = DID_dic[self.UI.comboBox_6.currentText()]
|
self.UI.lineEdit_2.clear()
|
data = self.ReadByDID(tempdid)
|
if data is not None and len(str(data[tempdid])):
|
if '\x00' in data[tempdid]:
|
out1, out2 = data[tempdid].split('\x00', 1)
|
self.UI.lineEdit_2.setText(out1)
|
else:
|
self.UI.lineEdit_2.setText(data[tempdid])
|
|
def WriteDataByID(self):
|
tempdid = DID_dic[self.UI.comboBox_6.currentText()]
|
writedata = self.UI.lineEdit_2.text()
|
try:
|
response = self.udsclient.write_data_by_identifier(
|
tempdid, writedata)
|
if response.positive:
|
values = response.service_data.values
|
except Exception as e:
|
g_signal.sig_Disp_str.emit(e)
|
|
def communicationControl_req(self):
|
req = 0x00
|
select = self.UI.comboBox_9.currentText()
|
if select == '0x00使能收发':
|
req = 0
|
elif select == '0x01能收禁发':
|
req = 1
|
else:
|
req = 3
|
try:
|
self.udsclient.communication_control(control_type=req,
|
communication_type=1)
|
except Exception as e:
|
self.disp_string('%s' % e)
|
|
def StartCalibraiton(self):
|
sesson = 3
|
self.UI.comboBox.setCurrentIndex(sesson - 1)
|
response = self.udsclient.change_session(sesson)
|
if response.positive:
|
response = self.SecurityUnlockLevel_3()
|
if response.positive:
|
startcmd = bytearray([0x8, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(startcmd)):
|
self.UI.pushButton_9.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_Z(self):
|
calcmd = bytearray([0x1, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_11.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_M(self):
|
calcmd = bytearray([0x0, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_13.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_MP(self):
|
calcmd = bytearray([0x2, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_18.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_MN(self):
|
calcmd = bytearray([0x3, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_22.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_X2(self):
|
calcmd = bytearray([0x4, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_24.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_X1(self):
|
calcmd = bytearray([0x5, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_26.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_Y1(self):
|
calcmd = bytearray([0x6, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_27.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_Y2(self):
|
calcmd = bytearray([0x7, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_28.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def Calibraiton_GAP(self):
|
calcmd = bytearray([0x9, 0x0, 0x0, 0x0])
|
if self.RequestRoutControl(0xFFAA, bytes(calcmd)):
|
self.UI.pushButton_29.setStyleSheet(
|
"background-color:rgb(255, 85, 127)")
|
|
def RequestRoutControl(self, routine_id, data):
|
resp_1 = self.udsclient.start_routine(routine_id=routine_id, data=data)
|
return resp_1.positive
|
|
def ReportSupportDTC(self):
|
resp_1 = self.udsclient.get_supported_dtc()
|
if resp_1.positive:
|
for i in range(len(resp_1.data) // 4):
|
a = i * 4 + 2
|
b = i * 4 + 3
|
c = i * 4 + 4
|
d = i * 4 + 5
|
dtc_hex = resp_1.data[a] << 16 | resp_1.data[
|
b] << 8 | resp_1.data[c] << 0
|
log = 'DTC:0x%.6x Status:0x%.2x ' % (
|
dtc_hex, resp_1.data[d]) + DTC_DescriptionDic[dtc_hex]
|
for j in range(8):
|
if resp_1.data[d] >> j & 0x1:
|
log += ' bit%d' % j
|
self.disp_string(log)
|
|
def ReportByMask(self):
|
mask = ReportBymask_DTC[self.UI.comboBox_11.currentText()]
|
resp_1 = self.udsclient.get_dtc_by_status_mask(status_mask=mask)
|
if resp_1.positive:
|
for i in range(len(resp_1.data) // 4):
|
a = i * 4 + 2
|
b = i * 4 + 3
|
c = i * 4 + 4
|
d = i * 4 + 5
|
dtc_hex = resp_1.data[a] << 16 | resp_1.data[
|
b] << 8 | resp_1.data[c] << 0
|
log = 'DTC:0x%.2x%.2x%.2x Status:0x%.2x %s ' % (
|
resp_1.data[a], resp_1.data[b], resp_1.data[c],
|
resp_1.data[d], DTC_DescriptionDic[dtc_hex])
|
for j in range(8):
|
if resp_1.data[d] >> j & 0x1:
|
log += 'bit%d ' % j
|
self.disp_string(log)
|
|
def ReportSnapshotByDTC(self):
|
dtc = DTC_Dic[self.UI.comboBox_12.currentText()][
|
self.UI.comboBox_12.currentIndex()]
|
try:
|
resp = self.udsclient.get_dtc_snapshot_by_dtc_number(
|
dtc=dtc, record_number=1)
|
if resp.positive:
|
number_of_did = resp.data[6]
|
status = resp.data[4]
|
print(status)
|
dtc = resp.service_data.dtcs[0]
|
snapshots = dtc.snapshots
|
# print(dtc.id)
|
# print(dtc.status.get_byte())
|
log = "DTC:0x%.6x Status:0x%.2x" % (dtc.id, status) + ' '
|
# print(type(snapshots[0].did))
|
# print(type(snapshots[0].data))
|
for i in range(number_of_did):
|
log += (DTC_SanpshotDescriptionDic[snapshots[i].did] +
|
':' + snapshots[i].data + ' ')
|
self.disp_string(log)
|
|
# print(snapshots[i].did, snapshots[i].data)
|
except Exception as e:
|
self.disp_string('%s' % e)
|
|
def SetFromBootFlag(self):
|
self.startfromboot = self.UI.radioButton_2.isChecked()
|
|
def pre_programming(self):
|
if not self.startfromboot:
|
self.disp_string('# 预编程步骤')
|
|
# 进入extended session
|
self.disp_string('>>> 进入扩展模式')
|
response = self.udsclient.change_session(3)
|
if response.positive:
|
self.disp_string('>>> 解锁安全访问')
|
response = self.udsclient.unlock_security_access(9)
|
|
if response.positive:
|
# 检查编程条件
|
self.disp_string('>>> 检查编程条件')
|
response = self.udsclient.start_routine(0xff02)
|
if response.positive:
|
# 关闭DTC的存储
|
self.disp_string('>>> 关闭DTC的存储')
|
response = self.udsclient.control_dtc_setting(2)
|
# print(response)
|
if response.positive:
|
# 关闭与诊断无关的报文
|
self.disp_string('>>> 关闭与诊断无关的报文')
|
response = self.udsclient.communication_control(0x03, 0x01)
|
else:
|
self.disp_string('>>> 预编程失败')
|
|
return response.positive
|
# print(response)
|
|
def main_programming(self):
|
self.disp_string('# 主编程步骤')
|
|
if self.startfromboot:
|
self.disp_string('>>> 进入扩展模式')
|
response = self.udsclient.change_session(3)
|
|
# 进入programming session
|
self.disp_string('>>> 进入编程模式')
|
response = self.udsclient.change_session(2)
|
# print(response)
|
if response.positive:
|
# 安全访问
|
self.disp_string('>>> 安全访问')
|
response = self.udsclient.unlock_security_access(9)
|
# print(response)
|
if response.positive:
|
self.disp_string('>>> 请求下载驱动文件')
|
address = 0x20004600
|
memorysize = 10
|
memory_location = MemoryLocation(address, memorysize, 32, 32)
|
response = self.udsclient.request_download(memory_location)
|
# print(response)
|
if response.positive:
|
# 发送driver文件
|
self.disp_string('>>> 发送driver文件')
|
drv_file = self.drv_data
|
response = self.udsclient.transfer_data(1, bytes(drv_file))
|
self.set_progressbar_pos(len(drv_file))
|
# print(response)
|
if response.positive:
|
self.disp_string('>>> 发送驱动结束,请求推出')
|
response = self.udsclient.request_transfer_exit()
|
# print(response)
|
if response.positive:
|
# driver文件完整性检验
|
self.disp_string('>>> driver文件完整性检验')
|
drivecrc = bytes([0x9B, 0x39, 0xf2, 0xec])
|
response = self.udsclient.start_routine(0xf001, drivecrc)
|
# print(response)
|
if response.positive:
|
app_file = self.app_data
|
address = app_file[APP_ADDR_LOCATION_OFFSET] << 24 | app_file[
|
APP_ADDR_LOCATION_OFFSET + 1] << 16 | app_file[
|
APP_ADDR_LOCATION_OFFSET +
|
2] << 8 | app_file[APP_ADDR_LOCATION_OFFSET + 3] << 0
|
memorysize = app_file[APP_LENGTH_LOCATION_OFFSET] << 24 | app_file[
|
APP_LENGTH_LOCATION_OFFSET + 1] << 16 | app_file[
|
APP_LENGTH_LOCATION_OFFSET +
|
2] << 8 | app_file[APP_LENGTH_LOCATION_OFFSET + 3] << 0
|
memory_location = MemoryLocation(address, memorysize, 32, 32)
|
|
# 删除app存储空间
|
self.disp_string('>>> 删除app存储空间')
|
data = b''
|
data += memory_location.alfid.get_byte()
|
data += memory_location.get_address_bytes()
|
data += memory_location.get_memorysize_bytes()
|
response = self.udsclient.start_routine(0xff00, data)
|
# print(response)
|
if response.positive:
|
# 发送app文件
|
self.disp_string('>>> 发送app文件')
|
response = self.udsclient.request_download(memory_location)
|
# print(response)
|
|
max_length = response.service_data.max_length
|
|
# 有效数据长度, 去除sid和sequence两个字节
|
payload_length = max_length - 2
|
|
count = (len(app_file) + payload_length - 1) // payload_length
|
|
base = self.get_progressbar_pos()
|
for i in range(count):
|
start = i * payload_length
|
end = start + payload_length
|
response = self.udsclient.transfer_data((i + 1) % 256,
|
app_file[start:end])
|
self.set_progressbar_pos(base + end)
|
# print(response)
|
if response.positive:
|
response = self.udsclient.request_transfer_exit()
|
# print(response)
|
if response.positive:
|
# app文件完整性检验
|
self.disp_string('>>> app文件完整性检验')
|
response = self.udsclient.start_routine(0xf001, app_file[0:4])
|
# print(response)
|
return response.positive
|
|
def post_programming(self):
|
|
self.disp_string('# 后编程步骤')
|
|
# 回到default session
|
self.disp_string('>>> 回到默认模式')
|
|
response = self.udsclient.change_session(1)
|
return response.positive
|
|
def start_programming(self):
|
self.UI.pushButton_48.setDisabled(True)
|
self.set_progressbar_pos(0)
|
|
t1 = time.time()
|
|
try:
|
|
self.pre_programming()
|
self.main_programming()
|
self.post_programming()
|
except Exception as e:
|
|
self.disp_string('%s' % e)
|
# self.UI.pushButton_48.setDisabled(False)
|
|
t2 = time.time()
|
|
self.disp_string('finished in %.2f sec' % (t2 - t1))
|
self.in_programming = False
|
self.UI.pushButton_48.setDisabled(False)
|
|
def set_progressbar_pos(self, pos):
|
|
drv_data_len = len(self.drv_data) if self.drv_data is not None else 0
|
app_data_len = len(self.app_data) if self.app_data is not None else 0
|
total_len = drv_data_len + app_data_len
|
if pos > total_len:
|
pos = total_len
|
elif pos < 0:
|
pos = 0
|
|
self.UI.progressBar.setValue(pos)
|
|
def get_progressbar_pos(self):
|
return self.UI.progressBar.value()
|
|
def loadAppfile(self):
|
if self.in_programming:
|
self.disp_string('## 正在编程中')
|
return
|
|
fileName, _ = QtWidgets.QFileDialog.getOpenFileName(
|
None, 'comboBox_4', '.',
|
'All Files (*);;Bin Files (*.bin);;Hex Files (*.hex)')
|
|
if fileName != '':
|
self.UI.comboBox_4.addItem(fileName)
|
self.UI.comboBox_4.setCurrentText(fileName)
|
|
# with open(fileName, 'rb') as fd:
|
# self.app_data = fd.read()
|
# drv_data_len = len(
|
# self.drv_data) if self.drv_data is not None else 0
|
# app_data_len = len(
|
# self.app_data) if self.app_data is not None else 0
|
# total_len = drv_data_len + app_data_len
|
# self.UI.progressBar.setRange(0, total_len)
|
|
# print(self.app_data)
|
# print("===============================================")
|
hex_read = Hex_read()
|
hex_read.Open_file(fileName)
|
self.app_data = hex_read.data
|
drv_data_len = len(
|
self.drv_data) if self.drv_data is not None else 0
|
app_data_len = len(
|
self.app_data) if self.app_data is not None else 0
|
total_len = drv_data_len + app_data_len
|
self.UI.progressBar.setRange(0, total_len)
|
|
def SetBootLog(self):
|
if self.in_programming:
|
self.disp_string('## 正在编程中')
|
return
|
|
fileName, _ = QtWidgets.QFileDialog.getSaveFileName(
|
None, 'comboBox_10', '.',
|
'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)')
|
|
if fileName != '':
|
self.UI.comboBox_10.addItem(fileName)
|
self.UI.comboBox_10.setCurrentText(fileName)
|
print(fileName)
|
self.boot_logger = Logger(fileName)
|
# self.listener = [self.boot_logger]
|
# with open(fileName, 'rb') as fd:
|
# self.app_data = fd.read()
|
# drv_data_len = len(
|
# self.drv_data) if self.drv_data is not None else 0
|
# app_data_len = len(
|
# self.app_data) if self.app_data is not None else 0
|
# total_len = drv_data_len + app_data_len
|
# self.UI.progressBar.setRange(0, total_len)
|