Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
tmtc_frontend.py 13.55 KiB
#!/usr/bin/python3.7
"""
@file   tmtc_frontend.py
@date
    01.11.2019
@brief
    This is part of the TMTC client developed by the SOURCE project by KSat
@description
    GUI Testing for TMTC client
@manual
@author:
    R. Mueller
"""
from multiprocessing import Process

from PyQt5.QtWidgets import *
from core.tmtc_backend import TmTcHandler
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from tmtc_core.utility.obsw_logger import get_logger
from config import obsw_config
import threading

LOGGER = get_logger()


# A first simple version has drop down menus to chose all necessary options
# which are normally handled by the args parser.
# when pressing save, all chosen values get passed to the globals file (OBSW_Config)
# To start the program, another button with start needs to be set up.
# A third button to perform a keyboard interrupt should be implemented
# include a really nice source badge and make it large !
# plan this on paper first...
# Step 1: Huge Mission Badge in Tkinter window because that is cool.
# Step 2: Simple buttons to run servce tests around the badge.

def initialize_frontend(tmtc_backend: TmTcHandler):
    tmtc_frontend = TmTcFrontend(tmtc_backend)
    frontend_process = Process(target=tmtc_frontend.init_ui())
    frontend_process.start()

class SingleCommandTable(QTableWidget):
    def __init__(self):
        super().__init__()
        self.setRowCount(1)
        self.setColumnCount(5)
        self.setHorizontalHeaderItem(0, QTableWidgetItem("Service"))
        self.setHorizontalHeaderItem(1, QTableWidgetItem("Subservice"))
        self.setHorizontalHeaderItem(2, QTableWidgetItem("SSC"))
        self.setHorizontalHeaderItem(3, QTableWidgetItem("Data"))
        self.setHorizontalHeaderItem(4, QTableWidgetItem("CRC"))
        self.setItem(0, 0, QTableWidgetItem("17"))
        self.setItem(0, 1, QTableWidgetItem("1"))
        self.setItem(0, 2, QTableWidgetItem("20"))


class TmTcFrontend:

    # TODO: this list should probably be inside an enum in the obsw_config.py
    serviceList = [ 2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1" ] # , "Error"]

    service_test_button: QPushButton
    single_command_button: QPushButton
    command_table: QTableWidget

    single_command_service: int = 17
    single_command_sub_service: int = 1
    single_command_ssc: int = 20
    single_command_data: bytearray = bytearray([])

    isBusy: bool

    def __init__(self, tmtc_handler: TmTcHandler):
        self.tmtc_handler = tmtc_handler
        obsw_config.G_SERVICE = 17
        obsw_config.G_COM_IF = obsw_config.ComIF.QEMU

    def service_index_changed(self, index: int):
        obsw_config.G_SERVICE = self.serviceList[index]
        LOGGER.info("service_test_mode_selection updated: " + str(self.serviceList[index]))

    def single_command_set_service(self, value):
        self.single_command_service = value

    def single_command_set_sub_service(self, value):
        self.single_command_sub_service = value

    def single_command_set_ssc(self, value):
        self.single_command_ssc = value

    def start_service_test_clicked(self):
        LOGGER.info("start service test button pressed")
        LOGGER.info("start testing service: " + str(obsw_config.G_SERVICE))
        self.tmtc_handler.mode = obsw_config.ModeList.ServiceTestMode
        # start the action in a new process
        p = threading.Thread(target=self.handle_tm_tc_action)
        p.start()

    def send_single_command_clicked(self, table):
        LOGGER.info("send single command pressed")

        # parse the values from the table
        #service = int(self.commandTable.item(0, 0).text())
        #subservice = int(self.commandTable.item(0, 1).text())
        #ssc = int(self.commandTable.item(0, 2).text())

        LOGGER.info("service: " + str(self.single_command_service) +
                    ", subservice: " + str(self.single_command_sub_service) +
                    ", ssc: " + str(self.single_command_ssc))

        # TODO: data needs to be parsed in a different way
        # data = int(self.commandTable.item(0, 3).text())
        # crc = int(self.commandTable.item(0, 4).text())

        # create a command out of the parsed table
        command = PusTelecommand(
            service=self.single_command_service, subservice=self.single_command_sub_service,
            ssc=self.single_command_ssc)
        self.tmtc_handler.single_command_package = command.pack_command_tuple()

        self.tmtc_handler.mode = obsw_config.ModeList.SingleCommandMode
        # start the action in a new process
        p = threading.Thread(target=self.handle_tm_tc_action)
        p.start()

    def handle_tm_tc_action(self):
        LOGGER.info("start tmtc_handler.handle_action")
        self.isBusy = True
        self.set_send_buttons(False)
        self.tmtc_handler.handle_action()
        self.isBusy = False
        self.set_send_buttons(True)
        LOGGER.info("finished tmtc_handler.handle_action")

    def set_send_buttons(self, state: bool):
        self.service_test_button.setEnabled(state)
        self.single_command_button.setEnabled(state)

    def init_ui(self):
        app = QApplication([])

        win = QWidget()
        grid = QGridLayout()

        row = 0
        grid.addWidget(QLabel("Configuration:"), row, 0, 1, 2)
        row += 1

        checkbox_console = QCheckBox("print output to console")
        checkbox_console.setChecked(obsw_config.G_PRINT_TM)
        checkbox_console.stateChanged.connect(checkbox_console_print)

        checkbox_log = QCheckBox("print output to log file")
        checkbox_log.setChecked(obsw_config.G_PRINT_TO_FILE)
        checkbox_log.stateChanged.connect(checkbox_log_print)

        checkbox_raw_tm = QCheckBox("print all raw TM data directly")
        checkbox_raw_tm.setChecked(obsw_config.G_PRINT_RAW_TM)
        checkbox_raw_tm.stateChanged.connect(checkbox_print_raw_data)

        checkbox_hk = QCheckBox("print hk data")
        checkbox_hk.setChecked(obsw_config.G_PRINT_HK_DATA)
        checkbox_hk.stateChanged.connect(checkbox_print_hk_data)

        checkbox_short = QCheckBox("short display mode")
        checkbox_short.setChecked(obsw_config.G_DISPLAY_MODE == "short")
        checkbox_short.stateChanged.connect(checkbox_short_display_mode)

        grid.addWidget(checkbox_log, row, 0, 1, 1)
        grid.addWidget(checkbox_console, row, 1, 1, 1)
        row += 1
        grid.addWidget(checkbox_raw_tm, row, 0, 1, 1)
        grid.addWidget(checkbox_hk, row, 1, 1, 1)
        row += 1
        grid.addWidget(checkbox_short, row, 0, 1, 1)
        row += 1

        grid.addWidget(QLabel("tm timeout:"), row, 0, 1, 1)
        grid.addWidget(QLabel("tm timeout factor:"), row, 1, 1, 1)
        row += 1

        spin_timeout = QDoubleSpinBox()
        spin_timeout.setValue(obsw_config.G_TM_TIMEOUT)
        # TODO: set sensible min/max values
        spin_timeout.setSingleStep(0.1)
        spin_timeout.setMinimum(0.25)
        spin_timeout.setMaximum(60)
        spin_timeout.valueChanged.connect(number_timeout)
        grid.addWidget(spin_timeout, row, 0, 1, 1)

        spin_timeout_factor = QDoubleSpinBox()
        spin_timeout_factor.setValue(obsw_config.G_TC_SEND_TIMEOUT_FACTOR)
        # TODO: set sensible min/max values
        spin_timeout_factor.setSingleStep(0.1)
        spin_timeout_factor.setMinimum(0.25)
        spin_timeout_factor.setMaximum(10)
        spin_timeout_factor.valueChanged.connect(number_timeout_factor)
        grid.addWidget(spin_timeout_factor, row, 1, 1, 1)
        row += 1

        grid.addWidget(QLabel("Client IP:"), row, 0, 1, 1)
        grid.addWidget(QLabel("Board IP:"), row, 1, 1, 1)
        row += 1

        spin_client_ip = QLineEdit()
        # TODO: set sensible min/max values
        spin_client_ip.setInputMask("000.000.000.000;_")
        spin_client_ip.textChanged.connect(ip_change_client)
        grid.addWidget(spin_client_ip, row, 0, 1, 1)

        spin_board_ip = QLineEdit()
        # TODO: set sensible min/max values
        spin_board_ip.setInputMask("000.000.000.000;_")
        spin_board_ip.textChanged.connect(ip_change_board)
        #spin_board_ip.setText(obsw_config.G_SEND_ADDRESS[0])
        grid.addWidget(spin_board_ip, row, 1, 1, 1)


        row += 1
        # com if configuration
        grid.addWidget(QLabel("COM IF:"), row, 0, 1, 1)
        com_if_comboBox = QComboBox()
        # add all possible ComIFs to the comboBox
        for comIf in obsw_config.ComIF:
            com_if_comboBox.addItem(comIf.name)
        com_if_comboBox.setCurrentIndex(obsw_config.G_COM_IF.value)
        com_if_comboBox.currentIndexChanged.connect(com_if_index_changed)
        grid.addWidget(com_if_comboBox, row, 1, 1, 1)
        row += 1

        # service test mode gui
        grid.addWidget(QLabel("Service Test Mode:"), row, 0, 1, 2)
        row += 1

        comboBox = QComboBox()
        for service in self.serviceList:
            comboBox.addItem("Service - " + str(service))
        comboBox.setCurrentIndex(self.serviceList.index(obsw_config.G_SERVICE))
        comboBox.currentIndexChanged.connect(self.service_index_changed)
        grid.addWidget(comboBox, row, 0, 1, 1)

        self.service_test_button = QPushButton()
        self.service_test_button.setText("Start Service Test")
        self.service_test_button.clicked.connect(self.start_service_test_clicked)
        grid.addWidget(self.service_test_button, row, 1, 1, 1)
        row += 1

        # single command operation
        grid.addWidget(QLabel("Single Command Operation:"), row, 0, 1, 1)
        row += 1

        singleCommandGrid = QGridLayout()
        singleCommandGrid.setSpacing(5)

        singleCommandGrid.addWidget(QLabel("Service:"), row, 0, 1, 1)
        singleCommandGrid.addWidget(QLabel("SubService:"), row, 1, 1, 1)
        singleCommandGrid.addWidget(QLabel("SSC:"), row, 2, 1, 1)

        row += 1

        spin_service = QSpinBox()
        spin_service.setValue(self.single_command_service)
        # TODO: set sensible min/max values
        spin_service.setMinimum(0)
        spin_service.setMaximum(99999)
        spin_service.valueChanged.connect(self.single_command_set_service)
        singleCommandGrid.addWidget(spin_service, row, 0, 1, 1)

        spin_sub_service = QSpinBox()
        spin_sub_service.setValue(self.single_command_sub_service)
        # TODO: set sensible min/max values
        spin_sub_service.setMinimum(0)
        spin_sub_service.setMaximum(99999)
        spin_sub_service.valueChanged.connect(self.single_command_set_sub_service)
        singleCommandGrid.addWidget(spin_sub_service, row, 1, 1, 1)

        spin_ssc = QSpinBox()
        spin_ssc.setValue(self.single_command_ssc)
        # TODO: set sensible min/max values
        spin_ssc.setMinimum(0)
        spin_ssc.setMaximum(99999)
        spin_ssc.valueChanged.connect(self.single_command_set_ssc)
        singleCommandGrid.addWidget(spin_ssc, row, 2, 1, 1)

        row += 1

        singleCommandGrid.addWidget(QLabel("Data:"), row, 0, 1, 3)

        row += 1

        # TODO: how should this be converted to the byte array?
        singleCommandDataBox = QTextEdit()
        singleCommandGrid.addWidget(singleCommandDataBox, row, 0, 1, 3)

        grid.addItem(singleCommandGrid, row, 0, 1, 2)

        row += 1

        #self.commandTable = SingleCommandTable()
        #grid.addWidget(self.commandTable, row, 0, 1, 2)
        row += 1
        self.single_command_button = QPushButton()
        self.single_command_button.setText("Send single command")
        self.single_command_button.clicked.connect(self.send_single_command_clicked)
        grid.addWidget(self.single_command_button, row, 0, 1, 2)
        row += 1

        win.setLayout(grid)
        win.resize(900, 800)
        win.show()

        # resize table columns to fill the window width
        #for i in range(0, 5):
        #    self.commandTable.setColumnWidth(i, int(self.commandTable.width() / 5) - 3)

        app.exec_()



def com_if_index_changed(index: int):
    obsw_config.G_COM_IF = obsw_config.ComIF(index)
    LOGGER.info("com if updated: " + str(obsw_config.G_COM_IF))


def checkbox_console_print(state: int):
    LOGGER.info(["enabled", "disabled"][state == 0] + " console print")
    obsw_config.G_PRINT_TM = state == 0


def checkbox_log_print(state: int):
    LOGGER.info(["enabled", "disabled"][state == 0] + " print to log")
    obsw_config.G_PRINT_TO_FILE = state == 0


def checkbox_print_raw_data(state: int):
    LOGGER.info(["enabled", "disabled"][state == 0] + " printing of raw data")
    obsw_config.G_PRINT_RAW_TM = state == 0


def checkbox_print_hk_data(state: int):
    LOGGER.info(["enabled", "disabled"][state == 0] + " printing of hk data")
    obsw_config.G_PRINT_HK_DATA = state == 0


def checkbox_short_display_mode(state: int):
    LOGGER.info(["enabled", "disabled"][state == 0] + " short display mode")
    obsw_config.G_DISPLAY_MODE = ["short", "long"][state == 0]


def number_timeout(value: float):
    LOGGER.info("tm timeout changed to: " + str(value))
    obsw_config.G_TM_TIMEOUT = value


def number_timeout_factor(value: float):
    LOGGER.info("tm timeout factor changed to: " + str(value))
    obsw_config.G_TC_SEND_TIMEOUT_FACTOR = value


def ip_change_client(value):
    LOGGER.info("client ip changed: " + value)
    obsw_config.G_REC_ADDRESS = (value, 2008)


def ip_change_board(value):
    LOGGER.info("board ip changed: " + value)
    obsw_config.G_SEND_ADDRESS = (value, 7)