| | |
| | | |
| | | |
| | | # # def send_msg(): |
| | | # # meg = Message() |
| | | # # msg = VCI_CAN_OBJ() |
| | | from ast import arg |
| | | import encodings |
| | | import threading |
| | | from tkinter.tix import Tree |
| | | from USBCAN import * |
| | |
| | | from mainwindows import * |
| | | from multiprocessing import Process, Queue, Value, Pipe |
| | | import datetime |
| | | from ShifterDefine import * |
| | | import cantools |
| | | from pprint import pprint |
| | | # def rece_msg(bus): |
| | | # msg = bus.Receive(0.1) |
| | | # if msg[0] is not None: |
| | |
| | | # print(conn.recv()) |
| | | # conn.close() |
| | | |
| | | |
| | | # if __name__ == "__main__": |
| | | # parent_conn, child_conn = Pipe() # returns two connection objects, one for each end of the pipe |
| | | # p = Process(target=f, args=(child_conn,)) |
| | |
| | | # ) |
| | | # bus.send(msg) |
| | | |
| | | |
| | | # # bus = open_device() |
| | | # # while True: |
| | | # # send_msg(bus) |
| | | # # # rece_msg(bus) |
| | | |
| | | |
| | | # device_type = DEVICETYPE['USBCAN-II'] |
| | | # device_index = 0 |
| | |
| | | # data=bytearray([0x02, 0x3e, 0, 0, 0, 0, 0, 0]) |
| | | # ) |
| | | |
| | | |
| | | # q_rx = Queue() |
| | | # q_tx = Queue() |
| | | # connect = Value('i', 1) |
| | |
| | | |
| | | # # def start(): |
| | | |
| | | |
| | | # # def rece_msg(): |
| | | # # # num = bus.GetReceiveNum(device_type, device_index, can_channel) |
| | | # # print(num) |
| | |
| | | # # # while not q_rx.empty(): |
| | | # # print(q_rx.get()) |
| | | # # # time.sleep(0.1) |
| | | |
| | | |
| | | # # bus.CloseDevice() |
| | | # # start() |
| | |
| | | # data.append(' '.join(['{:0<2x}'.format(a) for a in list(item.data)])) |
| | | # return data |
| | | |
| | | |
| | | # def rece_msg(): |
| | | # # msg, num = bus.Receive(len=10) |
| | | # num = q_rx.qsize() |
| | |
| | | # # print(msg[0].arbitration_id) |
| | | # print(formatedata(i, q_rx.get(), True)) |
| | | |
| | | |
| | | # def tx_task(bus: USBCAN): |
| | | # while True: |
| | | # bus.send(msg=msg) |
| | | # time.sleep(0.1) |
| | | |
| | | |
| | | # def rx_task(bus: USBCAN): |
| | | # while True: |
| | |
| | | # if msg[i].arbitration_id == 0x77a: |
| | | # # print(formatedata(i, msg[i], True)) |
| | | # print(msg[i].data) |
| | | |
| | | |
| | | # if __name__ == '__main__': |
| | | # bus.InitAndStart() |
| | |
| | | # item.setFont(QFont("微软雅黑", 12, QFont.Black)) # set the font |
| | | # table.setItem(0, column, item) |
| | | |
| | | |
| | | # if __name__ == '__main__': |
| | | |
| | | # app = QApplication(sys.argv) |
| | |
| | | # # w.show() |
| | | |
| | | # sys.exit(app.exec_()) |
| | | |
# Scratch script: load the SX7H CAN database, decode one sample payload and
# encode one SA1 payload from raw signal values.
dbc = cantools.database.load_file("DBC/SX7H.dbc")
sa_message = dbc.get_message_by_name('SA1')

# Sample 8-byte payload, decoded below under frame ID 0x420
# (presumably the SA1 frame ID -- confirm against the DBC).
frame = [0x00, 0x0A, 0x00, 0xF0, 0, 0, 0, 0]

# cantools expects the payload as a bytes-like object; a plain list of ints
# is not one, so convert it before decoding. `frame` itself stays a list.
input_signal = dbc.decode_message(0x420, bytes(frame))

# Show the decoded park-button request and flag the 'No request' choice.
part_str = str(input_signal['SA1_Status_ParkButtonReq'])
pprint(part_str)
if input_signal['SA1_Status_ParkButtonReq'] == 'No request':
    print('yes')

# Build an SA1 payload from raw (numeric) signal values.
data = sa_message.encode({
    'SA1_Status_PRNDL': 0,
    # Original note gives the choice name for 0 as 'Shifter position Zero'.
    'SA1_Status_GearShftPosReq': 0,
    'SA1_Status_ParkButtonReq': 2,
    "SA1_ShifterManualSignal": 0,
    "SA1_ShifterModeSignal": 1,
    "SA1_Status_ShftSensSng_Fault": 0,
    "SA1_IND_ShifterMisUsd": 0,
    "SA1_Live_counter": 1,
    "SA1_Status_UnlockButtonReq": 1,
    "SA1_Status_EcoShifterModeReq": 0,
    "SA1_Status_RqGearPosInV": 0xf,
    "SA1_Status_ShiftPosValidFlag": 0,
})
| | | # print(data) |
| | | # print(SA1_Status_ParkButtonReq_dic['No request']) |