from ShifterDefine import *
|
import struct
|
from udsoncan import DidCodec
|
|
import configparser
|
import cantools
|
|
ProjectDBC = cantools.database.load_file("DBC/SX7H.dbc")
|
|
|
class AsciiCodec(DidCodec):
|
|
def __init__(self, string_len=None):
|
if string_len is None:
|
raise ValueError("You must provide a length to the AsciiCodec")
|
self.string_len = string_len
|
|
def encode(self, string_ascii):
|
if len(string_ascii) > self.string_len:
|
string_ascii = string_ascii[0:self.string_len]
|
|
elif len(string_ascii) < self.string_len:
|
for i in range(self.string_len - len(string_ascii)):
|
string_ascii += ' '
|
|
return string_ascii.encode('ascii')
|
|
def decode(self, string_bin):
|
string_ascii = string_bin.decode('ascii')
|
if len(string_ascii) > self.string_len:
|
string_ascii = string_ascii[0:self.string_len]
|
|
return string_ascii.strip()
|
|
def __len__(self):
|
return self.string_len
|
|
|
class PartNumberCodec(DidCodec):
|
|
def __init__(self, byte_len=None):
|
if byte_len is None:
|
raise ValueError(
|
"You must provide a length to the PartNumberCodec")
|
self.byte_len = byte_len
|
|
def encode(self, string_ascii):
|
if self.byte_len == 1:
|
data = struct.pack('>B', int(string_ascii))
|
elif self.byte_len == 2:
|
data = struct.pack('>H', int(string_ascii))
|
elif self.byte_len == 4:
|
data = struct.pack('>L', int(string_ascii))
|
elif self.byte_len == 8:
|
data = struct.pack('>Q', int(string_ascii))
|
|
if len(data) > self.byte_len:
|
data = data[0:self.byte_len]
|
|
return data
|
|
def decode(self, byte_data):
|
data = 0
|
for i in byte_data:
|
data = data << 8 | i
|
return str(data)
|
|
def __len__(self):
|
return self.byte_len
|
|
|
class PartNumberVersionCodec(DidCodec):
|
|
def __init__(self, byte_len=None):
|
if byte_len is None:
|
raise ValueError(
|
"You must provide a length to the PartNumberCodec")
|
self.byte_len = byte_len
|
|
def encode(self, string_ascii):
|
if self.byte_len == 1:
|
data = struct.pack('>B', int(string_ascii))
|
elif self.byte_len == 2:
|
data = struct.pack('>H', int(string_ascii))
|
elif self.byte_len == 4:
|
data = struct.pack('>L', int(string_ascii))
|
elif self.byte_len == 8:
|
data = struct.pack('>Q', int(string_ascii))
|
|
if len(data) > self.byte_len:
|
data = data[0:self.byte_len]
|
|
return data
|
|
def decode(self, byte_data):
|
ret = ''
|
for i in byte_data:
|
ret += '%.1x' % i
|
return ret
|
|
def __len__(self):
|
return self.byte_len
|
|
|
class BCDCodec(DidCodec):
|
|
def __init__(self, byte_len=None):
|
self.byte_len = byte_len
|
|
def encode(self, string_ascii):
|
string_ascii = string_ascii.strip()
|
|
if len(string_ascii) > self.byte_len * 2:
|
string_ascii = string_ascii[0:self.byte_len * 2]
|
elif len(string_ascii) < self.byte_len * 2:
|
for i in range(self.byte_len * 2 - len(string_ascii)):
|
string_ascii += '0'
|
|
data = bytearray()
|
for i in range(self.byte_len):
|
part = string_ascii[2 * i:2 * i + 2]
|
data.append(int(part, 16))
|
|
return data
|
|
def decode(self, byte_data):
|
ret = ''
|
for i in byte_data:
|
ret += '%.2x' % i
|
|
return ret
|
|
def __len__(self):
|
return self.byte_len
|
|
|
UnlockButton_dic = {"Unlock Button No Request": 0, "Unlock Button Request": 1}
|
|
|
class ShifterClass():
|
|
def __init__(self):
|
self.position = None
|
self.Pbutton = None
|
self.UnlockButton = None
|
self.Voltage = 0
|
self.canid = ShifterCANID()
|
|
self.did_config = {
|
0x2100: PartNumberCodec(1),
|
0x2101: PartNumberCodec(1),
|
0x2103: AsciiCodec(8),
|
0x2104: AsciiCodec(4),
|
0x2105: AsciiCodec(16),
|
0x2106: PartNumberCodec(1),
|
0xF187: AsciiCodec(16),
|
0x2108: AsciiCodec(18),
|
0xF18A: AsciiCodec(10),
|
0x210B: BCDCodec(4),
|
0xF18C: AsciiCodec(14),
|
0xF190: AsciiCodec(17),
|
0xF193: AsciiCodec(26),
|
0xF195: AsciiCodec(28),
|
0x2110: AsciiCodec(8),
|
0x2111: AsciiCodec(16),
|
0x2112: BCDCodec(4),
|
0x2113: BCDCodec(4),
|
0x2116: PartNumberCodec(2),
|
0x2118: PartNumberCodec(1),
|
0x2119: PartNumberCodec(1),
|
0x211A: PartNumberCodec(1),
|
0x211B: PartNumberCodec(1),
|
0x211c: PartNumberCodec(2),
|
0xF197: AsciiCodec(8),
|
0xF15a: AsciiCodec(10),
|
0xF15B: AsciiCodec(10),
|
0xDF00: PartNumberCodec(1),
|
0xDF01: PartNumberCodec(2),
|
0xDF02: PartNumberCodec(1),
|
0xDF03: PartNumberCodec(3),
|
0xDF04: PartNumberCodec(3),
|
0x1000: PartNumberCodec(2),
|
0xF010: AsciiCodec(16),
|
0xF019: AsciiCodec(16),
|
0x2102: AsciiCodec(64)
|
}
|
try:
|
config = configparser.ConfigParser()
|
config.read('config.ini')
|
self.canid.phy_txId = int(config['CAN_ID']['tx_id'], 16)
|
self.canid.phy_rxId = int(config['CAN_ID']['rx_id'], 16)
|
self.canid.fun_rxId = int(config['CAN_ID']['fnrx_id'], 16)
|
self.canid.normalid = int(config['CAN_ID']['normal_tx_id'], 16)
|
except Exception as e:
|
self.canid.phy_txId = 0x77A
|
self.canid.phy_rxId = 0x742
|
self.canid.fun_rxId = 0x7df
|
self.canid.normalid = 0x420
|
self.SA1_message = ProjectDBC.get_message_by_name('SA1')
|
|
def FramUnpack(self, id=0x420, frame=[]):
|
input_signal = ProjectDBC.decode_message(id, frame)
|
# part_str = str(input_signal['SA1_Status_ParkButtonReq'])
|
# print(type(part_str))
|
# if input_signal['SA1_Status_ParkButtonReq'] == 'No request':
|
# print('yes')
|
# print(SA1_Status_ParkButtonReq_dic[part_str])
|
self.Pbutton = SA1_Status_ParkButtonReq_dic[str(
|
input_signal['SA1_Status_ParkButtonReq'])]
|
self.UnlockButton = SA1_Status_UnlockButtonReq_dic[str(
|
input_signal['SA1_Status_UnlockButtonReq'])]
|
self.position = SA1_Status_GearShftPosReq_dic[str(
|
input_signal['SA1_Status_GearShftPosReq'])]
|
|
def PackFrame(self, dic):
|
data = self.SA1_message.encode({
|
'SA1_Status_PRNDL':
|
0,
|
'SA1_Status_GearShftPosReq':
|
SA1_Status_GearShftPosReq_dic[self.position],
|
'SA1_Status_ParkButtonReq':
|
SA1_Status_ParkButtonReq_dic[self.Pbutton],
|
"SA1_ShifterManualSignal":
|
0,
|
"SA1_ShifterModeSignal":
|
0,
|
"SA1_Status_ShftSensSng_Fault":
|
0,
|
"SA1_IND_ShifterMisUsd":
|
0,
|
"SA1_Live_counter":
|
1,
|
"SA1_Status_UnlockButtonReq":
|
SA1_Status_UnlockButtonReq_dic[self.UnlockButton],
|
"SA1_Status_EcoShifterModeReq":
|
0,
|
"SA1_Status_RqGearPosInV":
|
0xf,
|
"SA1_Status_ShiftPosValidFlag":
|
0,
|
})
|
return data
|
|
|
class VehicleClass():
|
|
def __init__(self):
|
self.Odm = 0
|
self.speed = 0
|
self.brake = 0
|
self.ShiftLeverPos = 'P'
|
self.pre_ShiftLeverPos = 'P'
|
self.max_pos = 'Shifter position Zero'
|
self.IC1_message = ProjectDBC.get_message_by_name('IC1')
|
self.TCU2_message = ProjectDBC.get_message_by_name('TCU2')
|
self.ABS1_message = ProjectDBC.get_message_by_name('ABS1')
|
self.Engine2_message = ProjectDBC.get_message_by_name('Engine2')
|
|
def pack_IC1(self, odm):
|
self.Odm = odm
|
data = self.IC1_message.encode({"odometer_value": odm})
|
return data
|
|
def pack_TCU2(self, shiftpos):
|
#from value get key
|
self.ShiftLeverPos = [
|
k for k, v in TCU2_ShiterLevel_dic.items() if v == shiftpos
|
][0]
|
data = self.TCU2_message.encode({
|
"ShiftLeverPos": shiftpos,
|
"TCUDrivingMode": 0,
|
"ActualGear": 0
|
})
|
return data
|
|
def pack_ABS1(self, speed):
|
self.speed = speed
|
data = self.ABS1_message.encode({"vehicle_speed": speed})
|
return data
|
|
def pack_Engine2(self, brake):
|
self.brake = brake
|
data = self.Engine2_message.encode({"brake_pedal_status ": brake})
|
return data
|
|
def ShiftLogic(self, shift: ShifterClass):
|
reqpos = 0
|
if shift.Pbutton == SA1_Status_ParkButtonReq_dic[
|
'Driver request park button']:
|
reqpos = TCU2_ShiterLevel_dic['P']
|
self.pre_ShiftLeverPos = "P"
|
self.max_pos = 'Shifter position Zero'
|
elif (shift.position >= SA1_Status_GearShftPosReq_dic['M-']) and (
|
shift.position <= SA1_Status_GearShftPosReq_dic['M']):
|
reqpos = TCU2_ShiterLevel_dic['M']
|
self.pre_ShiftLeverPos = "P"
|
self.max_pos = 'Shifter position Zero'
|
elif shift.position is SA1_Status_GearShftPosReq_dic[
|
'Shifter not initialized']:
|
pass
|
else:
|
if shift.position is not SA1_Status_GearShftPosReq_dic[
|
'Shifter position Zero']:
|
self.max_pos = ShiftMaxPosMap_dic[self.max_pos][shift.position]
|
reqpos = GearShiftMap_dic[self.pre_ShiftLeverPos][self.max_pos]
|
else:
|
reqpos = GearShiftMap_dic[self.pre_ShiftLeverPos][self.max_pos]
|
self.max_pos = 'Shifter position Zero'
|
self.pre_ShiftLeverPos = [
|
k for k, v in TCU2_ShiterLevel_dic.items() if v == reqpos
|
][0]
|
|
data = self.pack_TCU2(reqpos)
|
return data
|