diff --git a/config/obsw_com_config.py b/config/obsw_com_config.py index 3de6c2863cc55bd60f83dc150043fe31ff972d53..66e5da5837784e1577612a39e0f19a1f4a01843a 100644 --- a/config/obsw_com_config.py +++ b/config/obsw_com_config.py @@ -40,9 +40,9 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio communication_interface.set_dle_settings( g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout) elif g.G_COM_IF == g.ComIF.QEMU: + serial_timeout = g.G_SERIAL_TIMEOUT communication_interface = QEMUComIF( - tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, - tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR) + tmtc_printer=tmtc_printer, serial_timeout=serial_timeout) else: communication_interface = DummyComIF(tmtc_printer=tmtc_printer) if not communication_interface.valid: diff --git a/gui/obsw_backend_test.py b/gui/obsw_backend_test.py deleted file mode 100644 index c21caeec5ea9d5dacbc119a6c49e66b27237083a..0000000000000000000000000000000000000000 --- a/gui/obsw_backend_test.py +++ /dev/null @@ -1,36 +0,0 @@ -from multiprocessing.connection import Listener -from multiprocessing import Process -from tmtc_core.utility.obsw_logger import get_logger -import logging - - -LOGGER = get_logger() - -class TmTcBackend(Process): - def __init__(self): - from obsw_tmtc_client import TmTcHandler - super(TmTcBackend, self).__init__() - self.address = ('localhost', 6000) # family is deduced to be 'AF_INET' - self.tmtc_backend = TmTcHandler() - self.listener = Listener(self.address, authkey=None) - self.conn = 0 - - def run(self): - self.listen() - - def listen(self): - self.conn = self.listener.accept() - LOGGER.info("TmTcBackend: Connection accepted from %s",str(self.listener.last_accepted)) - while True: - msg = self.conn.recv() - # do something with msg - # here, the core client could be called to perform operations based on received message - if msg == 'test': - LOGGER.info("TmTcBackend: Hallo Welt !") - elif msg == 'close': - try: - self.conn.close() - break - except OSError: - logging.exception("Error: ") - self.listener.close() diff --git a/gui/obsw_tmtc_gui.py b/gui/obsw_tmtc_gui.py index 8c322d5e8e7f54f2ba7a23b5dd9b6209624db546..39e9e3d8a69409aeca986ce181c9bdd7c33fcee1 100644 --- a/gui/obsw_tmtc_gui.py +++ b/gui/obsw_tmtc_gui.py @@ -13,11 +13,12 @@ @author: R. Mueller """ -import tkinter as tk -from multiprocessing.connection import Client -from multiprocessing import Process +from PyQt5.QtWidgets import * +from obsw_tmtc_client import TmTcHandler +from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tmtc_core.utility.obsw_logger import get_logger -import time +from config import obsw_config +import threading LOGGER = get_logger() @@ -30,32 +31,269 @@ LOGGER = get_logger() # plan this on paper first... # Step 1: Huge Mission Badge in Tkinter window because that is cool. # Step 2: Simple buttons to run servce tests around the badge. -class TmTcGUI(Process): + +class TmTcGUI: def __init__(self): super(TmTcGUI, self).__init__() - self.root = None - self.address = ('localhost', 6000) - self.conn = Client(self.address, authkey=None) - self.counter = 0 def run(self): - self.open() - - def open(self): - self.root = tk.Tk() - self.root.title("Hallo Welt") - while True: - LOGGER.info("TmTcGUI: Sending test message") - self.conn.send("test") - self.root.update() - time.sleep(0.1) - self.counter = self.counter + 1 - if self.counter == 50: - self.conn.send("close") - self.conn.close() - break - # self.window.mainloop() + application = Application() + # init the backend + application.init_tmtc_handler() + # init the frontend + application.init_ui() def close(self): pass # self.window.quit() + +class SingleCommandTable(QTableWidget): + def __init__(self): + super().__init__() + self.setRowCount(1) + self.setColumnCount(5) + self.setHorizontalHeaderItem(0, QTableWidgetItem("Service")) + self.setHorizontalHeaderItem(1, QTableWidgetItem("Subservice")) + self.setHorizontalHeaderItem(2, QTableWidgetItem("SSC")) + self.setHorizontalHeaderItem(3, QTableWidgetItem("Data")) + self.setHorizontalHeaderItem(4, QTableWidgetItem("CRC")) + self.setItem(0, 0, QTableWidgetItem("17")) + self.setItem(0, 1, QTableWidgetItem("1")) + self.setItem(0, 2, QTableWidgetItem("20")) + + +class Application: + tmtc_handler: TmTcHandler + + # TODO: this list should probably be inside an enum in the obsw_config.py + serviceList = [ 2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1" ] # , "Error"] + + serviceTest_button: QPushButton + singleCommand_button: QPushButton + commandTable: QTableWidget + + isBusy: bool + + def __init__(self): + obsw_config.G_SERVICE = 17 + obsw_config.G_COM_IF = obsw_config.ComIF.QEMU + + def init_tmtc_handler(self): + self.tmtc_handler = TmTcHandler() + + def com_if_index_changed(self, index: int): + obsw_config.G_COM_IF = obsw_config.ComIF(index) + LOGGER.info("com if updated: " + str(obsw_config.G_COM_IF)) + + def service_index_changed(self, index: int): + obsw_config.G_SERVICE = self.serviceList[index] + LOGGER.info("service_test_mode_selection updated: " + str(self.serviceList[index])) + + def checkbox_console_print(self, state: int): + LOGGER.info(["enabled", "disabled"][state == 0] + " console print") + obsw_config.G_PRINT_TM = state == 0 + + def checkbox_log_print(self, state: int): + LOGGER.info(["enabled", "disabled"][state == 0] + " print to log") + obsw_config.G_PRINT_TO_FILE = state == 0 + + def checkbox_print_raw_data(self, state: int): + LOGGER.info(["enabled", "disabled"][state == 0] + " printing of raw data") + obsw_config.G_PRINT_RAW_TM = state == 0 + + def checkbox_print_hk_data(self, state: int): + LOGGER.info(["enabled", "disabled"][state == 0] + " printing of hk data") + obsw_config.G_PRINT_HK_DATA = state == 0 + + def checkbox_short_display_mode(self, state: int): + LOGGER.info(["enabled", "disabled"][state == 0] + " short display mode") + obsw_config.G_DISPLAY_MODE = ["short", "long"][state == 0] + + def number_timeout(self, value: float): + LOGGER.info("tm timeout changed to: " + str(value)) + obsw_config.G_TM_TIMEOUT = value + + def number_timeout_factor(self, value: float): + LOGGER.info("tm timeout factor changed to: " + str(value)) + obsw_config.G_TC_SEND_TIMEOUT_FACTOR = value + + def ip_change_client(self, value): + LOGGER.info("client ip changed: " + value) + obsw_config.G_REC_ADDRESS = (value, 2008) + + def ip_change_board(self, value): + LOGGER.info("board ip changed: " + value) + obsw_config.G_SEND_ADDRESS = (value, 7) + + def start_service_test_clicked(self): + LOGGER.info("start service test button pressed") + LOGGER.info("start testing service: " + str(obsw_config.G_SERVICE)) + self.tmtc_handler.mode = obsw_config.ModeList.ServiceTestMode + # start the action in a new process + p = threading.Thread(target=self.handleTmTcAction) + p.start() + + def send_single_command_clicked(self, table): + LOGGER.info("send single command pressed") + + # parse the values from the table + service = int(self.commandTable.item(0, 0).text()) + subservice = int(self.commandTable.item(0, 1).text()) + ssc = int(self.commandTable.item(0, 2).text()) + # TODO: data needs to be parsed in a different way + # data = int(self.commandTable.item(0, 3).text()) + # crc = int(self.commandTable.item(0, 4).text()) + + # create a command out of the parsed table + command = PusTelecommand(service=service, subservice=subservice, ssc=ssc) + obsw_config.G_PUS_PACKET_TUPLE = command.pack_command_tuple() + + self.tmtc_handler.mode = obsw_config.ModeList.SingleCommandMode + # start the action in a new process + p = threading.Thread(target=self.handleTmTcAction) + p.start() + + def handleTmTcAction(self): + LOGGER.info("start tmtc_handler.handle_action") + self.isBusy = True + self.setSendButtons(False) + self.tmtc_handler.handle_action() + self.isBusy = False + self.setSendButtons(True) + LOGGER.info("finished tmtc_handler.handle_action") + + def setSendButtons(self, state: bool): + self.serviceTest_button.setEnabled(state) + self.singleCommand_button.setEnabled(state) + + def init_ui(self): + app = QApplication([]) + + win = QWidget() + grid = QGridLayout() + + row = 0 + grid.addWidget(QLabel("Configuration:"), row, 0, 1, 2) + row += 1 + + checkbox_console = QCheckBox("print output to console") + checkbox_console.setChecked(obsw_config.G_PRINT_TM) + checkbox_console.stateChanged.connect(self.checkbox_console_print) + + checkbox_log = QCheckBox("print output to log file") + checkbox_log.setChecked(obsw_config.G_PRINT_TO_FILE) + checkbox_log.stateChanged.connect(self.checkbox_log_print) + + checkbox_raw_tm = QCheckBox("print all raw TM data directly") + checkbox_raw_tm.setChecked(obsw_config.G_PRINT_RAW_TM) + checkbox_raw_tm.stateChanged.connect(self.checkbox_print_raw_data) + + checkbox_hk = QCheckBox("print hk data") + checkbox_hk.setChecked(obsw_config.G_PRINT_HK_DATA) + checkbox_hk.stateChanged.connect(self.checkbox_print_hk_data) + + checkbox_short = QCheckBox("short display mode") + checkbox_short.setChecked(obsw_config.G_DISPLAY_MODE == "short") + checkbox_short.stateChanged.connect(self.checkbox_short_display_mode) + + grid.addWidget(checkbox_log, row, 0, 1, 1) + grid.addWidget(checkbox_console, row, 1, 1, 1) + row += 1 + grid.addWidget(checkbox_raw_tm, row, 0, 1, 1) + grid.addWidget(checkbox_hk, row, 1, 1, 1) + row += 1 + grid.addWidget(checkbox_short, row, 0, 1, 1) + row += 1 + + grid.addWidget(QLabel("tm timeout:"), row, 0, 1, 1) + grid.addWidget(QLabel("tm timeout factor:"), row, 1, 1, 1) + row += 1 + + spin_timeout = QDoubleSpinBox() + spin_timeout.setValue(obsw_config.G_TM_TIMEOUT) + # TODO: set sensible min/max values + spin_timeout.setSingleStep(0.1) + spin_timeout.setMinimum(0.25) + spin_timeout.setMaximum(60) + spin_timeout.valueChanged.connect(self.number_timeout) + grid.addWidget(spin_timeout, row, 0, 1, 1) + + spin_timeout_factor = QDoubleSpinBox() + spin_timeout_factor.setValue(obsw_config.G_TC_SEND_TIMEOUT_FACTOR) + # TODO: set sensible min/max values + spin_timeout_factor.setSingleStep(0.1) + spin_timeout_factor.setMinimum(0.25) + spin_timeout_factor.setMaximum(10) + spin_timeout_factor.valueChanged.connect(self.number_timeout_factor) + grid.addWidget(spin_timeout_factor, row, 1, 1, 1) + row += 1 + + grid.addWidget(QLabel("Client IP:"), row, 0, 1, 1) + grid.addWidget(QLabel("Board IP:"), row, 1, 1, 1) + row += 1 + + spin_client_ip = QLineEdit() + # TODO: set sensible min/max values + spin_client_ip.setInputMask("000.000.000.000;_") + spin_client_ip.textChanged.connect(self.ip_change_client) + grid.addWidget(spin_client_ip, row, 0, 1, 1) + + spin_board_ip = QLineEdit() + # TODO: set sensible min/max values + spin_board_ip.setInputMask("000.000.000.000;_") + spin_board_ip.textChanged.connect(self.ip_change_board) + #spin_board_ip.setText(obsw_config.G_SEND_ADDRESS[0]) + grid.addWidget(spin_board_ip, row, 1, 1, 1) + + + row += 1 + # com if configuration + grid.addWidget(QLabel("COM IF:"), row, 0, 1, 1) + com_if_comboBox = QComboBox() + # add all possible ComIFs to the comboBox + for comIf in obsw_config.ComIF: + com_if_comboBox.addItem(comIf.name) + com_if_comboBox.setCurrentIndex(obsw_config.G_COM_IF.value) + com_if_comboBox.currentIndexChanged.connect(self.com_if_index_changed) + grid.addWidget(com_if_comboBox, row, 1, 1, 1) + row += 1 + + # service test mode gui + grid.addWidget(QLabel("Service Test Mode:"), row, 0, 1, 2) + row += 1 + + comboBox = QComboBox() + for service in self.serviceList: + comboBox.addItem("Service - " + str(service)) + comboBox.setCurrentIndex(self.serviceList.index(obsw_config.G_SERVICE)) + comboBox.currentIndexChanged.connect(self.service_index_changed) + grid.addWidget(comboBox, row, 0, 1, 1) + + self.serviceTest_button = QPushButton() + self.serviceTest_button.setText("Start Service Test") + self.serviceTest_button.clicked.connect(self.start_service_test_clicked) + grid.addWidget(self.serviceTest_button, row, 1, 1, 1) + row += 1 + + # single command operation + grid.addWidget(QLabel("Single Command Operation:"), row, 0, 1, 1) + row += 1 + + self.commandTable = SingleCommandTable() + grid.addWidget(self.commandTable, row, 0, 1, 2) + row += 1 + self.singleCommand_button = QPushButton() + self.singleCommand_button.setText("Send single command") + self.singleCommand_button.clicked.connect(self.send_single_command_clicked) + grid.addWidget(self.singleCommand_button, row, 0, 1, 2) + row += 1 + + win.setLayout(grid) + win.resize(900, 800) + win.show() + + # resize table columns to fill the window width + for i in range(0, 5): + self.commandTable.setColumnWidth(i, int(self.commandTable.width() / 5) - 3) + + app.exec_() \ No newline at end of file diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index d97dc3586a4c4345c9312a4803f0d644d7077a85..d57cc0109c03f5a885b4b2fb6698a7d4a82cf756 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -50,9 +50,8 @@ from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker from utility.obsw_args_parser import parse_input_arguments from utility.obsw_binary_uploader import perform_binary_upload -from gui.obsw_tmtc_gui import TmTcGUI -from gui.obsw_backend_test import TmTcBackend from obsw_user_code import command_preparation_hook +import gui.obsw_tmtc_gui LOGGER = get_logger() @@ -73,8 +72,7 @@ def main(): LOGGER.info("Starting TMTC Handler") if g.G_MODE_ID == g.ModeList.GUIMode: - do_gui_test() - + open_gui() else: tmtc_handler = TmTcHandler() tmtc_handler.perform_operation() @@ -86,16 +84,10 @@ def main(): pass -def do_gui_test(): - # Experimental - backend = TmTcBackend() - backend.start() - gui = TmTcGUI() - gui.start() - backend.join() - gui.join() - LOGGER.info("Both processes have closed") - sys.exit() +def open_gui(): + tmtcGui = gui.obsw_tmtc_gui.TmTcGUI() + tmtcGui.run() + # sys.exit() def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]: