| | |
| | | from ast import Not, Pass |
| | | from asyncio.windows_events import NULL |
| | | from concurrent.futures import thread |
| | | from curses import flash |
| | | from logging import exception |
| | | import threading |
| | | from typing import List |
| | | from can import BusABC, Message |
| | | from can import BusABC, Message, Logger, Listener |
| | | from udsoncan.client import Client |
| | | from udsoncan.exceptions import TimeoutException |
| | | import udsoncan |
| | |
| | | import datetime |
| | | from ShifterDefine import * |
| | | from multiprocessing import Process, Queue, Value, Pipe |
| | | from hexread import * |
| | | |
| | | MAX_RCV_NUM = 20 |
| | | APP_ADDR_LOCATION_OFFSET = 8 |
| | |
| | | self.sendQueue = Queue() # can layer send queue |
| | | self.CANtoIsoTPQueue = Queue() # CAN --> isotp |
| | | self.shifter = ShifterClass() |
| | | self.Vehicle = VehicleClass() |
| | | self.devicedescription = HardwreDevice() |
| | | self.windows = QtWidgets.QMainWindow() |
| | | self.UI = Ui_MainWindow() |
| | | self.UI.setupUi(window) |
| | | self._dev_info = None |
| | | self.dbc = cantools.database.load_file("DBC/SX7H.dbc") |
| | | self.can_thread = threading.Thread(target=self.can_thread) |
| | | self.TestPresentTimer = QtCore.QTimer() |
| | | self.TestPresentTimer.timeout.connect(self.creat_testpresentReq) |
| | |
| | | 0x1c, 0x01, 0x06, 0x80, 0x1f, 0x01, 0x06, 0x80, 0xfb, 0x0a |
| | | ] |
| | | self.app_data: bytes = None |
| | | self.start_timestamp = time.time() |
| | | self.boot_logger = None |
| | | |
| | | def DeviceInit(self): |
| | | self._usbcan = None |
| | |
| | | msg.channel = 0 |
| | | msg.data = isotp_msg.data # bytearray |
| | | msg.is_extended_id = False |
| | | msg.timestamp = time.time() - self.start_timestamp |
| | | msg.is_error_frame = False |
| | | msg.is_remote_frame = False |
| | | self.Tool_send(msg) |
| | | |
def Tool_send(self, msg: Message):
    """Transmit ``msg`` on the CAN device and mirror it to the boot logger.

    The frame is first handed to ``self._usbcan.send``. If a boot logger is
    configured (``self.boot_logger`` is not ``None``), the same frame is then
    passed to its ``on_message_received`` so the logger records it as well.

    Args:
        msg: Frame to transmit.
    """
    self._usbcan.send(msg)

    boot_log = self.boot_logger
    if boot_log is None:
        # Logging is optional; nothing more to do without a logger.
        return
    boot_log.on_message_received(msg)
| | | |
| | | def WidgetsInit(self): |
| | | # self.UI.setupUi(self) |
| | |
| | | self.UI.pushButton_48.clicked.connect(self.start_programming) |
| | | self.UI.pushButton_46.clicked.connect(self.loadAppfile) |
| | | self.UI.radioButton_2.toggled.connect(self.SetFromBootFlag) |
| | | |
| | | self.UI.pushButton_47.clicked.connect(self.SetBootLog) |
| | | # self.UI.comboBox_10.activated.connect(self.ReportByMask) |
| | | |
| | | self.UI.comboBox_11.addItems(list(ReportBymask_DTC.keys())) |
| | | self.UI.comboBox_12.addItems(list(DTC_Dic.keys())) |
| | |
| | | msg[i]) # send msg to isotp |
| | | |
| | | self.msgQueue.put(msg[i]) |
| | | if self.boot_logger is not None: |
| | | self.boot_logger.on_message_received(msg[i]) |
| | | # logger.info('time:'.format(msg[i].timestamp)+'Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | |
| | | # send signal to update screen |
| | | |
| | | g_signal.sig_MsgReceived.emit(num) |
| | | |
| | | # msgToSendCnt = self.sendQueue.qsize() |
| | | # if msgToSendCnt > 0: |
| | | # msg = Message() |
| | |
| | | # msg = self.sendQueue.get() |
| | | # # logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | # # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | # self._usbcan.send(msg) |
| | | # self.Tool_send(msg) |
def send_dump(self):
    """Send one fixed 8-byte frame to ``self.shifter.canid.phy_rxId``.

    The payload is ``02 3E 80`` followed by five zero bytes.
    NOTE(review): this looks like a UDS TesterPresent single frame with the
    suppress-positive-response bit set -- confirm against the ECU spec.

    The frame is fully populated (flags, DLC, timestamp relative to
    ``self.start_timestamp``) and then transmitted exactly once through
    ``Tool_send``, which also mirrors it to the boot logger when one is
    active. Previously the frame was additionally pushed straight to
    ``self._usbcan.send`` before it was complete, so it went out twice.
    """
    msg = Message()
    msg.arbitration_id = self.shifter.canid.phy_rxId
    # Set the DLC explicitly, as send_VehiclePosition does: assigning
    # ``data`` after construction does not update ``dlc``.
    msg.dlc = 8
    msg.channel = 0
    msg.data = [0x02, 0x3e, 0x80, 0, 0, 0, 0, 0]
    msg.is_extended_id = False
    msg.is_remote_frame = False
    msg.is_error_frame = False
    msg.timestamp = time.time() - self.start_timestamp
    # Single transmission path: Tool_send sends and logs the frame.
    self.Tool_send(msg)
| | | |
def send_VehiclePosition(self, data=None):
    """Send a standard-ID data frame with arbitration ID 0x10 via ``Tool_send``.

    Args:
        data: Payload to place in the frame. When omitted, an empty payload
            is used. The DLC is always set to 8, regardless of the payload
            length.
    """
    msg = Message()
    msg.arbitration_id = 0x10
    msg.dlc = 8
    msg.channel = 0
    # Build a fresh list per call instead of a shared ``data=[]`` default,
    # so one message's payload can never leak into a later call.
    msg.data = [] if data is None else data
    msg.is_extended_id = False
    msg.is_remote_frame = False
    msg.is_error_frame = False
    # Timestamp is relative to the moment the tool was started.
    msg.timestamp = time.time() - self.start_timestamp
    self.Tool_send(msg)
| | | |
| | | def open_close(self): |
| | | if self._isOpen.value == 0: |
| | |
| | | TIMING_DICT.keys())[self.devicedescription.baudrate] |
| | | self._usbcan = USBCAN(device_type=4, |
| | | device_index=0, |
| | | can_index=0, |
| | | can_index=self.devicedescription.channel, |
| | | bitrate=bitrate, |
| | | can_filters=can_filters) |
| | | if not self._usbcan.InitAndStart(): |
| | |
| | | self.needdisconnect.value = 1 |
| | | self._deviceOpenUpdate() |
| | | self.TestPresentTimer.stop() |
| | | |
def closeEvent(self, e):
    """Stop the boot logger, if one was created, when the tool is closed.

    ``self.boot_logger`` stays ``None`` until a log file has been selected,
    so it is checked before ``stop()`` is called; an unconditional call
    would raise ``AttributeError`` when closing without a log file.

    Args:
        e: Close event object passed by the caller; not inspected here.
    """
    boot_log = self.boot_logger
    if boot_log is None:
        return
    # Drop the reference first so the other ``is not None`` checks on
    # ``self.boot_logger`` stop forwarding frames to a stopped logger.
    self.boot_logger = None
    boot_log.stop()
| | | |
| | | def update_HardwareDevice(self): |
| | | |
| | |
| | | self.UI.pushButton_41.setStyleSheet( |
| | | Style_dic[self.shifter.UnlockButton]) |
| | | self.UI.pushButton_40.setStyleSheet(Style_dic[self.shifter.Pbutton]) |
| | | #X2 |
| | | self.UI.pushButton_37.setStyleSheet( |
| | | Style_dic[self.shifter.position == 2]) |
| | | #X1 |
| | | self.UI.pushButton_38.setStyleSheet( |
| | | Style_dic[self.shifter.position == 3]) |
| | | #Z |
| | | self.UI.pushButton_30.setStyleSheet( |
| | | Style_dic[self.shifter.position == 0]) |
| | | #Y1 |
| | | self.UI.pushButton_33.setStyleSheet( |
| | | Style_dic[self.shifter.position == 4]) |
| | | #Y2 |
| | | self.UI.pushButton_34.setStyleSheet( |
| | | Style_dic[self.shifter.position == 5]) |
| | | #M |
| | | self.UI.pushButton_39.setStyleSheet( |
| | | Style_dic[self.shifter.position == 0xe]) |
| | | #M+ |
| | | self.UI.pushButton_35.setStyleSheet( |
| | | Style_dic[self.shifter.position == 0xd]) |
| | | #M- |
| | | self.UI.pushButton_36.setStyleSheet( |
| | | Style_dic[self.shifter.position == 0xc]) |
| | | self.UI.pushButton_42.setStyleSheet( |
| | | ColorStyle_dic[self.Vehicle.ShiftLeverPos == "P"]) |
| | | self.UI.pushButton_43.setStyleSheet( |
| | | ColorStyle_dic[self.Vehicle.ShiftLeverPos == "D"]) |
| | | self.UI.pushButton_44.setStyleSheet( |
| | | ColorStyle_dic[self.Vehicle.ShiftLeverPos == "N"]) |
| | | self.UI.pushButton_45.setStyleSheet( |
| | | ColorStyle_dic[self.Vehicle.ShiftLeverPos == "R"]) |
| | | |
| | | def _updateRootList(self): |
| | | _dataSize = self.msgQueue.qsize() |
| | |
| | | for i in range(_dataSize): |
| | | receiveNum += 1 |
| | | msg = self.msgQueue.get() |
| | | # self.shifter.FramUnpack(msg.arbitration_id, msg.data) |
| | | if msg.arbitration_id == 0x420: |
| | | self.shifter.FramUnpack(msg.arbitration_id, msg.data) |
| | | resp_data = self.Vehicle.ShiftLogic(self.shifter) |
| | | self.send_VehiclePosition(resp_data) |
| | | formateddata.append(self._formatMsgData( |
| | | receiveNum, msg, True)) # return a data list |
| | | self._insertDataSmooth(data=formateddata, datasize=_dataSize) |
| | | # self.dispShiftstatus() |
| | | self.dispShiftstatus() |
| | | |
| | | def _insertDataSmooth(self, data, datasize): |
| | | # row = 6-datasize |
| | |
| | | self.UI.lineEdit_2.clear() |
| | | data = self.ReadByDID(tempdid) |
| | | if data is not None and len(str(data[tempdid])): |
| | | # print(data[tempdid]) |
| | | self.UI.lineEdit_2.setText(data[tempdid]) |
| | | if '\x00' in data[tempdid]: |
| | | out1, out2 = data[tempdid].split('\x00', 1) |
| | | self.UI.lineEdit_2.setText(out1) |
| | | else: |
| | | self.UI.lineEdit_2.setText(data[tempdid]) |
| | | |
| | | def WriteDataByID(self): |
| | | tempdid = DID_dic[self.UI.comboBox_6.currentText()] |
| | |
| | | if fileName != '': |
| | | self.UI.comboBox_4.addItem(fileName) |
| | | self.UI.comboBox_4.setCurrentText(fileName) |
| | | with open(fileName, 'rb') as fd: |
| | | self.app_data = fd.read() |
| | | drv_data_len = len( |
| | | self.drv_data) if self.drv_data is not None else 0 |
| | | app_data_len = len( |
| | | self.app_data) if self.app_data is not None else 0 |
| | | total_len = drv_data_len + app_data_len |
| | | self.UI.progressBar.setRange(0, total_len) |
| | | |
| | | # with open(fileName, 'rb') as fd: |
| | | # self.app_data = fd.read() |
| | | # drv_data_len = len( |
| | | # self.drv_data) if self.drv_data is not None else 0 |
| | | # app_data_len = len( |
| | | # self.app_data) if self.app_data is not None else 0 |
| | | # total_len = drv_data_len + app_data_len |
| | | # self.UI.progressBar.setRange(0, total_len) |
| | | |
| | | # print(self.app_data) |
| | | # print("===============================================") |
| | | hex_read = Hex_read() |
| | | hex_read.Open_file(fileName) |
| | | self.app_data = hex_read.data |
| | | drv_data_len = len( |
| | | self.drv_data) if self.drv_data is not None else 0 |
| | | app_data_len = len( |
| | | self.app_data) if self.app_data is not None else 0 |
| | | total_len = drv_data_len + app_data_len |
| | | self.UI.progressBar.setRange(0, total_len) |
| | | |
def SetBootLog(self):
    """Ask the user for a log file and create the boot logger for it.

    Does nothing except show a notice when ``self.in_programming`` is set.
    Otherwise a save-file dialog is shown; if a file is chosen, its name is
    added to ``comboBox_10`` and selected there, and ``self.boot_logger`` is
    replaced by a new ``Logger`` writing to that file. Any logger that was
    already active is stopped first so it is not leaked.
    """
    if self.in_programming:
        # Notice text means "programming in progress".
        self.disp_string('## 正在编程中')
        return

    fileName, _ = QtWidgets.QFileDialog.getSaveFileName(
        None, 'comboBox_10', '.',
        'All Files (*);;ASC Files (*.asc);;Blf Files (*.blf)')

    if fileName != '':
        self.UI.comboBox_10.addItem(fileName)
        self.UI.comboBox_10.setCurrentText(fileName)
        print(fileName)
        # Stop the logger from an earlier selection before replacing it.
        # The attribute is cleared first so nothing keeps forwarding frames
        # to a logger that has already been stopped.
        previous = self.boot_logger
        if previous is not None:
            self.boot_logger = None
            previous.stop()
        self.boot_logger = Logger(fileName)
| | | # self.listener = [self.boot_logger] |
| | | # with open(fileName, 'rb') as fd: |
| | | # self.app_data = fd.read() |
| | | # drv_data_len = len( |
| | | # self.drv_data) if self.drv_data is not None else 0 |
| | | # app_data_len = len( |
| | | # self.app_data) if self.app_data is not None else 0 |
| | | # total_len = drv_data_len + app_data_len |
| | | # self.UI.progressBar.setRange(0, total_len) |