Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
tmtc_frontend.py 13.55 KiB
#!/usr/bin/python3.7
"""
@file tmtc_frontend.py
@date
01.11.2019
@brief
This is part of the TMTC client developed by the SOURCE project by KSat
@description
GUI Testing for TMTC client
@manual
@author:
R. Mueller
"""
from multiprocessing import Process
from PyQt5.QtWidgets import *
from core.tmtc_backend import TmTcHandler
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from tmtc_core.utility.obsw_logger import get_logger
from config import obsw_config
import threading
LOGGER = get_logger()
# A first simple version has drop down menus to chose all necessary options
# which are normally handled by the args parser.
# when pressing save, all chosen values get passed to the globals file (OBSW_Config)
# To start the program, another button with start needs to be set up.
# A third button to perform a keyboard interrupt should be implemented
# include a really nice source badge and make it large !
# plan this on paper first...
# Step 1: Huge Mission Badge in Tkinter window because that is cool.
# Step 2: Simple buttons to run servce tests around the badge.
def initialize_frontend(tmtc_backend: TmTcHandler):
tmtc_frontend = TmTcFrontend(tmtc_backend)
frontend_process = Process(target=tmtc_frontend.init_ui())
frontend_process.start()
class SingleCommandTable(QTableWidget):
def __init__(self):
super().__init__()
self.setRowCount(1)
self.setColumnCount(5)
self.setHorizontalHeaderItem(0, QTableWidgetItem("Service"))
self.setHorizontalHeaderItem(1, QTableWidgetItem("Subservice"))
self.setHorizontalHeaderItem(2, QTableWidgetItem("SSC"))
self.setHorizontalHeaderItem(3, QTableWidgetItem("Data"))
self.setHorizontalHeaderItem(4, QTableWidgetItem("CRC"))
self.setItem(0, 0, QTableWidgetItem("17"))
self.setItem(0, 1, QTableWidgetItem("1"))
self.setItem(0, 2, QTableWidgetItem("20"))
class TmTcFrontend:
# TODO: this list should probably be inside an enum in the obsw_config.py
serviceList = [ 2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1" ] # , "Error"]
service_test_button: QPushButton
single_command_button: QPushButton
command_table: QTableWidget
single_command_service: int = 17
single_command_sub_service: int = 1
single_command_ssc: int = 20
single_command_data: bytearray = bytearray([])
isBusy: bool
def __init__(self, tmtc_handler: TmTcHandler):
self.tmtc_handler = tmtc_handler
obsw_config.G_SERVICE = 17
obsw_config.G_COM_IF = obsw_config.ComIF.QEMU
def service_index_changed(self, index: int):
obsw_config.G_SERVICE = self.serviceList[index]
LOGGER.info("service_test_mode_selection updated: " + str(self.serviceList[index]))
def single_command_set_service(self, value):
self.single_command_service = value
def single_command_set_sub_service(self, value):
self.single_command_sub_service = value
def single_command_set_ssc(self, value):
self.single_command_ssc = value
def start_service_test_clicked(self):
LOGGER.info("start service test button pressed")
LOGGER.info("start testing service: " + str(obsw_config.G_SERVICE))
self.tmtc_handler.mode = obsw_config.ModeList.ServiceTestMode
# start the action in a new process
p = threading.Thread(target=self.handle_tm_tc_action)
p.start()
def send_single_command_clicked(self, table):
LOGGER.info("send single command pressed")
# parse the values from the table
#service = int(self.commandTable.item(0, 0).text())
#subservice = int(self.commandTable.item(0, 1).text())
#ssc = int(self.commandTable.item(0, 2).text())
LOGGER.info("service: " + str(self.single_command_service) +
", subservice: " + str(self.single_command_sub_service) +
", ssc: " + str(self.single_command_ssc))
# TODO: data needs to be parsed in a different way
# data = int(self.commandTable.item(0, 3).text())
# crc = int(self.commandTable.item(0, 4).text())
# create a command out of the parsed table
command = PusTelecommand(
service=self.single_command_service, subservice=self.single_command_sub_service,
ssc=self.single_command_ssc)
self.tmtc_handler.single_command_package = command.pack_command_tuple()
self.tmtc_handler.mode = obsw_config.ModeList.SingleCommandMode
# start the action in a new process
p = threading.Thread(target=self.handle_tm_tc_action)
p.start()
def handle_tm_tc_action(self):
LOGGER.info("start tmtc_handler.handle_action")
self.isBusy = True
self.set_send_buttons(False)
self.tmtc_handler.handle_action()
self.isBusy = False
self.set_send_buttons(True)
LOGGER.info("finished tmtc_handler.handle_action")
def set_send_buttons(self, state: bool):
self.service_test_button.setEnabled(state)
self.single_command_button.setEnabled(state)
def init_ui(self):
app = QApplication([])
win = QWidget()
grid = QGridLayout()
row = 0
grid.addWidget(QLabel("Configuration:"), row, 0, 1, 2)
row += 1
checkbox_console = QCheckBox("print output to console")
checkbox_console.setChecked(obsw_config.G_PRINT_TM)
checkbox_console.stateChanged.connect(checkbox_console_print)
checkbox_log = QCheckBox("print output to log file")
checkbox_log.setChecked(obsw_config.G_PRINT_TO_FILE)
checkbox_log.stateChanged.connect(checkbox_log_print)
checkbox_raw_tm = QCheckBox("print all raw TM data directly")
checkbox_raw_tm.setChecked(obsw_config.G_PRINT_RAW_TM)
checkbox_raw_tm.stateChanged.connect(checkbox_print_raw_data)
checkbox_hk = QCheckBox("print hk data")
checkbox_hk.setChecked(obsw_config.G_PRINT_HK_DATA)
checkbox_hk.stateChanged.connect(checkbox_print_hk_data)
checkbox_short = QCheckBox("short display mode")
checkbox_short.setChecked(obsw_config.G_DISPLAY_MODE == "short")
checkbox_short.stateChanged.connect(checkbox_short_display_mode)
grid.addWidget(checkbox_log, row, 0, 1, 1)
grid.addWidget(checkbox_console, row, 1, 1, 1)
row += 1
grid.addWidget(checkbox_raw_tm, row, 0, 1, 1)
grid.addWidget(checkbox_hk, row, 1, 1, 1)
row += 1
grid.addWidget(checkbox_short, row, 0, 1, 1)
row += 1
grid.addWidget(QLabel("tm timeout:"), row, 0, 1, 1)
grid.addWidget(QLabel("tm timeout factor:"), row, 1, 1, 1)
row += 1
spin_timeout = QDoubleSpinBox()
spin_timeout.setValue(obsw_config.G_TM_TIMEOUT)
# TODO: set sensible min/max values
spin_timeout.setSingleStep(0.1)
spin_timeout.setMinimum(0.25)
spin_timeout.setMaximum(60)
spin_timeout.valueChanged.connect(number_timeout)
grid.addWidget(spin_timeout, row, 0, 1, 1)
spin_timeout_factor = QDoubleSpinBox()
spin_timeout_factor.setValue(obsw_config.G_TC_SEND_TIMEOUT_FACTOR)
# TODO: set sensible min/max values
spin_timeout_factor.setSingleStep(0.1)
spin_timeout_factor.setMinimum(0.25)
spin_timeout_factor.setMaximum(10)
spin_timeout_factor.valueChanged.connect(number_timeout_factor)
grid.addWidget(spin_timeout_factor, row, 1, 1, 1)
row += 1
grid.addWidget(QLabel("Client IP:"), row, 0, 1, 1)
grid.addWidget(QLabel("Board IP:"), row, 1, 1, 1)
row += 1
spin_client_ip = QLineEdit()
# TODO: set sensible min/max values
spin_client_ip.setInputMask("000.000.000.000;_")
spin_client_ip.textChanged.connect(ip_change_client)
grid.addWidget(spin_client_ip, row, 0, 1, 1)
spin_board_ip = QLineEdit()
# TODO: set sensible min/max values
spin_board_ip.setInputMask("000.000.000.000;_")
spin_board_ip.textChanged.connect(ip_change_board)
#spin_board_ip.setText(obsw_config.G_SEND_ADDRESS[0])
grid.addWidget(spin_board_ip, row, 1, 1, 1)
row += 1
# com if configuration
grid.addWidget(QLabel("COM IF:"), row, 0, 1, 1)
com_if_comboBox = QComboBox()
# add all possible ComIFs to the comboBox
for comIf in obsw_config.ComIF:
com_if_comboBox.addItem(comIf.name)
com_if_comboBox.setCurrentIndex(obsw_config.G_COM_IF.value)
com_if_comboBox.currentIndexChanged.connect(com_if_index_changed)
grid.addWidget(com_if_comboBox, row, 1, 1, 1)
row += 1
# service test mode gui
grid.addWidget(QLabel("Service Test Mode:"), row, 0, 1, 2)
row += 1
comboBox = QComboBox()
for service in self.serviceList:
comboBox.addItem("Service - " + str(service))
comboBox.setCurrentIndex(self.serviceList.index(obsw_config.G_SERVICE))
comboBox.currentIndexChanged.connect(self.service_index_changed)
grid.addWidget(comboBox, row, 0, 1, 1)
self.service_test_button = QPushButton()
self.service_test_button.setText("Start Service Test")
self.service_test_button.clicked.connect(self.start_service_test_clicked)
grid.addWidget(self.service_test_button, row, 1, 1, 1)
row += 1
# single command operation
grid.addWidget(QLabel("Single Command Operation:"), row, 0, 1, 1)
row += 1
singleCommandGrid = QGridLayout()
singleCommandGrid.setSpacing(5)
singleCommandGrid.addWidget(QLabel("Service:"), row, 0, 1, 1)
singleCommandGrid.addWidget(QLabel("SubService:"), row, 1, 1, 1)
singleCommandGrid.addWidget(QLabel("SSC:"), row, 2, 1, 1)
row += 1
spin_service = QSpinBox()
spin_service.setValue(self.single_command_service)
# TODO: set sensible min/max values
spin_service.setMinimum(0)
spin_service.setMaximum(99999)
spin_service.valueChanged.connect(self.single_command_set_service)
singleCommandGrid.addWidget(spin_service, row, 0, 1, 1)
spin_sub_service = QSpinBox()
spin_sub_service.setValue(self.single_command_sub_service)
# TODO: set sensible min/max values
spin_sub_service.setMinimum(0)
spin_sub_service.setMaximum(99999)
spin_sub_service.valueChanged.connect(self.single_command_set_sub_service)
singleCommandGrid.addWidget(spin_sub_service, row, 1, 1, 1)
spin_ssc = QSpinBox()
spin_ssc.setValue(self.single_command_ssc)
# TODO: set sensible min/max values
spin_ssc.setMinimum(0)
spin_ssc.setMaximum(99999)
spin_ssc.valueChanged.connect(self.single_command_set_ssc)
singleCommandGrid.addWidget(spin_ssc, row, 2, 1, 1)
row += 1
singleCommandGrid.addWidget(QLabel("Data:"), row, 0, 1, 3)
row += 1
# TODO: how should this be converted to the byte array?
singleCommandDataBox = QTextEdit()
singleCommandGrid.addWidget(singleCommandDataBox, row, 0, 1, 3)
grid.addItem(singleCommandGrid, row, 0, 1, 2)
row += 1
#self.commandTable = SingleCommandTable()
#grid.addWidget(self.commandTable, row, 0, 1, 2)
row += 1
self.single_command_button = QPushButton()
self.single_command_button.setText("Send single command")
self.single_command_button.clicked.connect(self.send_single_command_clicked)
grid.addWidget(self.single_command_button, row, 0, 1, 2)
row += 1
win.setLayout(grid)
win.resize(900, 800)
win.show()
# resize table columns to fill the window width
#for i in range(0, 5):
# self.commandTable.setColumnWidth(i, int(self.commandTable.width() / 5) - 3)
app.exec_()
def com_if_index_changed(index: int):
obsw_config.G_COM_IF = obsw_config.ComIF(index)
LOGGER.info("com if updated: " + str(obsw_config.G_COM_IF))
def checkbox_console_print(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " console print")
obsw_config.G_PRINT_TM = state == 0
def checkbox_log_print(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " print to log")
obsw_config.G_PRINT_TO_FILE = state == 0
def checkbox_print_raw_data(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " printing of raw data")
obsw_config.G_PRINT_RAW_TM = state == 0
def checkbox_print_hk_data(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " printing of hk data")
obsw_config.G_PRINT_HK_DATA = state == 0
def checkbox_short_display_mode(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " short display mode")
obsw_config.G_DISPLAY_MODE = ["short", "long"][state == 0]
def number_timeout(value: float):
LOGGER.info("tm timeout changed to: " + str(value))
obsw_config.G_TM_TIMEOUT = value
def number_timeout_factor(value: float):
LOGGER.info("tm timeout factor changed to: " + str(value))
obsw_config.G_TC_SEND_TIMEOUT_FACTOR = value
def ip_change_client(value):
LOGGER.info("client ip changed: " + value)
obsw_config.G_REC_ADDRESS = (value, 2008)
def ip_change_board(value):
LOGGER.info("board ip changed: " + value)
obsw_config.G_SEND_ADDRESS = (value, 7)