| | |
| | | |
| | | from ast import Pass |
| | | import threading |
| | | from can import BusABC, Message |
| | | from udsoncan.client import Client |
| | |
class UISignals(QtCore.QObject):
    """Holder for the Qt signals this module uses to reach the UI.

    Signals:
        sig_Disp_str: carries a single ``str`` argument.
        sig_MsgReceived: carries a single ``int`` argument.
    """

    # Text payload signal.
    sig_Disp_str = QtCore.Signal(str)
    # Integer payload signal.
    sig_MsgReceived = QtCore.Signal(int)


# Shared module-level instance of the signal holder.
g_signal = UISignals()
| | | |
| | | |
| | | class HardwreDevice(): |
| | |
| | | # self.IsoTPtoUDSQueue.qsize())) |
| | | |
| | | # time.sleep(self.isotp_layer.sleep_time()) |
| | | time.sleep(0.0001) |
| | | time.sleep(0.001) |
| | | |
| | | except Exception as e: |
| | | self.exit_requested = True |
| | |
| | | # self.protocol("WM_DELETE_WINDOW", self.Form_OnClosing) |
| | | self.msgQueue = Queue() # can layer receive queue |
| | | self.sendQueue = Queue() # can layer send queue |
| | | self.signal = UISignals() |
| | | |
| | | self.shifter = ShifterClass() |
| | | self.devicedescription = HardwreDevice() |
| | |
| | | self.UI.setupUi(window) |
| | | self._dev_info = None |
| | | self.can_thread = threading.Thread(target=self.can_thread) |
| | | |
| | | self.TestPresentTimer = QtCore.QTimer() |
| | | self.TestPresentTimer.timeout.connect(self.creat_testpresentReq) |
| | | self.DeviceInit() |
| | | self.WidgetsInit() |
| | | self.ChnInfoUpdate(self._isOpen) |
| | |
| | | msg = Message() |
| | | msg.arbitration_id = isotp_msg.arbitration_id |
| | | msg.dlc = isotp_msg.dlc |
| | | msg.data = isotp_msg.data |
| | | msg.channel = 0 |
| | | msg.data = isotp_msg.data # bytearray |
| | | msg.is_extended_id = False |
| | | # for i in range(isotp_msg.dlc): |
| | | # msg.data[i] = isotp_msg.data[i] |
| | |
| | | self.UI.comboBox_3.activated.connect(self.update_HardwareDevice) |
| | | self.UI.comboBox_5.activated.connect(self.update_HardwareDevice) |
| | | |
| | | self.signal.sig_Disp_str.connect(self.disp_string) |
| | | self.signal.sig_MsgReceived.connect(self._updateRootList) |
| | | g_signal.sig_Disp_str.connect(self.disp_string) |
| | | g_signal.sig_MsgReceived.connect(self._updateRootList) |
| | | |
| | | self.initUDSUI() |
| | | |
def initUDSUI(self):
    """Populate the UDS widgets and connect them to their request handlers.

    Fills the session, ECU-reset and communication-control combo boxes and
    hooks each one's ``activated`` signal to the matching ``*_req`` method.
    The radio button's ``toggled`` signal is connected to
    ``TestPresentEvent``.
    """
    ui = self.UI

    # Session selector: exactly one entry per session, in ascending order.
    # ``sessioncontrol_req`` derives the session id from the row index
    # (index + 1), so adding the list twice (as was done before, once with
    # 0x03 mislabelled) would expose rows that map to sessions 4..6.
    ui.comboBox.addItems(['0x01默认模式', '0x02编程模式', '0x03扩展模式'])
    ui.comboBox.activated.connect(self.sessioncontrol_req)

    # ECU reset selector.
    ui.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位'])
    ui.comboBox_8.activated.connect(self.MCUReset_req)

    # Communication control selector.
    ui.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发'])
    ui.comboBox_9.activated.connect(self.communicationControl_req)

    # Radio button switches the periodic TesterPresent on and off.
    ui.radioButton.toggled.connect(self.TestPresentEvent)
| | | |
| | | def _formatMsgData(self, index, item, received): |
| | |
| | | msg[i]) # send msg to isotp |
| | | self.msgQueue.put(msg[i]) |
| | | # send signal to update screen |
| | | logger.info('Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | self.signal.sig_MsgReceived.emit(num) |
| | | # logger.info('Rx: ID=0x{:0<3x} '.format(msg[i].arbitration_id) + ' '.join(['0x' + |
| | | # '{:0<2x}'.format(a).upper() for a in list(msg[i].data)])) |
| | | g_signal.sig_MsgReceived.emit(num) |
| | | msgToSendCnt = self.sendQueue.qsize() |
| | | if msgToSendCnt > 0: |
| | | msg = Message() |
| | | for i in range(msgToSendCnt): |
| | | |
| | | msg = self.sendQueue.get() |
| | | logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | # logger.info('Tx: ID=0x{:0<3x} '.format( |
| | | # msg.arbitration_id)+'DLC={} '.format(msg.dlc)+'externd flag ={} '.format(msg.is_extended_id)+'remote frame:{} '.format(msg.is_remote_frame)) |
| | | self._usbcan.send(msg) |
| | | |
| | | def open_close(self): |
| | |
| | | pass |
| | | |
def TestPresentEvent(self):
    """Run the TesterPresent timer only while the radio button is checked.

    When the button is unchecked the timer is stopped; otherwise it is
    (re)started with the value 4000 passed to ``start``.
    """
    if not self.UI.radioButton.isChecked():
        self.TestPresentTimer.stop()
        return
    self.TestPresentTimer.start(4000)
| | | |
def creat_testpresentReq(self):
    """Issue a single TesterPresent request via ``self.udsclient``.

    The return value of the client call is discarded.
    """
    uds = self.udsclient
    uds.tester_present()
| | | |
def sessioncontrol_req(self):
    """Request the diagnostic session selected in ``comboBox``.

    The session id sent to the UDS client is the combo box's current row
    index plus one.
    """
    row = self.UI.comboBox.currentIndex()
    session_id = row + 1
    self.udsclient.change_session(session_id)
| | | |
| | | # self.UI.comboBox_8.addItems(['0x01硬件复位', '0x03软件复位']) |
def MCUReset_req(self):
    """Send an ECUReset request matching the entry selected in ``comboBox_8``.

    The hard-reset entry sends reset type 0x01; any other entry (the
    soft-reset one) sends 0x03, matching the codes shown in the labels.
    """
    # Items are added with ``addItems`` (text only), so they carry no user
    # data and ``currentData()`` cannot be compared against the label.
    # The visible text is the only thing that identifies the selection.
    label = self.UI.comboBox_8.currentText()
    reset_type = 0x01 if label == '0x01硬件复位' else 0x03
    self.udsclient.ecu_reset(reset_type)
| | | |
| | | # self.UI.comboBox_9.addItems(['0x00使能收发', '0x01能收禁发', '0x03禁止收发']) |
def communicationControl_req(self):
    """Send a CommunicationControl request for the ``comboBox_9`` selection.

    The selected entry chooses the control type (0x00, 0x01 or 0x03, as
    shown in the labels). An unrecognised label falls back to 0x03, as the
    original ``else`` branch did.
    """
    control_types = {
        '0x00使能收发': 0x00,
        '0x01能收禁发': 0x01,
        '0x03禁止收发': 0x03,
    }
    # Items are added with ``addItems`` (text only), so ``currentData()``
    # never equals a label; the visible text identifies the selection.
    label = self.UI.comboBox_9.currentText()
    control = control_types.get(label, 0x03)

    # The selection is a control type, so it goes in ``control_type``
    # (previously it was passed as ``communication_type`` while
    # ``control_type`` was fixed at 0).
    # NOTE(review): communication_type is kept at 0x03, the value the old
    # code always ended up sending — confirm this is the intended
    # message-type selection for the target ECU.
    self.udsclient.communication_control(
        control_type=control, communication_type=0x03)