Commit 9c7cfc34 authored by Robin Mueller's avatar Robin Mueller
Browse files

performing major refactoring

parent 0fe730a3
......@@ -4,7 +4,7 @@ import sys
from collections import deque
from typing import Tuple, Union
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.core.definitions import ComInterfaces, CoreGlobalIds
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfo, PusTelecommand
from tmtc_core.sendreceive.tmtcc_single_command_sender_receiver import SingleCommandSenderReceiver
......@@ -85,17 +85,16 @@ class TmTcHandler:
executed_handler.start()
def initialize(self):
from config.tmtcc_globals import GlobalIds
from tmtc_core.core.tmtcc_globals_manager import get_global
from tmtc_core.core.globals_manager import get_global
"""
Perform initialization steps which might be necessary after class construction.
This has to be called at some point before using the class!
"""
com_if = get_global(GlobalIds.COM_IF)
tc_send_timeout_factor = get_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR)
tm_timeout = get_global(GlobalIds.TM_TIMEOUT)
display_mode = get_global(GlobalIds.DISPLAY_MODE)
print_to_file = get_global(GlobalIds.PRINT_TO_FILE)
com_if = get_global(CoreGlobalIds.COM_IF)
tc_send_timeout_factor = get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR)
tm_timeout = get_global(CoreGlobalIds.TM_TIMEOUT)
display_mode = get_global(CoreGlobalIds.DISPLAY_MODE)
print_to_file = get_global(CoreGlobalIds.PRINT_TO_FILE)
self.tmtc_printer = TmTcPrinter(display_mode, print_to_file, True)
self.communication_interface = create_communication_interface_user(com_if, self.tmtc_printer)
self.tm_listener = TmListener(
......@@ -158,8 +157,7 @@ class TmTcHandler:
self.mode = ModeList.PromptMode
elif self.mode == ModeList.ServiceTestMode:
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_globals import GlobalIds
from tmtc_core.core.globals_manager import get_global
service_queue = deque()
service_queue_packer = ServiceQueuePacker()
service_queue_packer.pack_service_queue_core(
......
......@@ -6,7 +6,7 @@
import sys
from typing import Union
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.core.definitions import ComInterfaces
from tmtc_core.com_if.tmtcc_com_interface_base import CommunicationInterface
from tmtc_core.com_if.tmtcc_dummy_com_if import DummyComIF
from tmtc_core.com_if.tmtcc_ethernet_com_if import EthernetComIF
......@@ -21,8 +21,8 @@ LOGGER = get_logger()
def create_communication_interface_default(com_if: ComInterfaces, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from config.tmtcc_globals import GlobalIds
from tmtc_core.core.tmtcc_globals_manager import get_global
from tmtc_core.core.definitions import CoreGlobalIds
from tmtc_core.core.globals_manager import get_global
"""
Return the desired communication interface object
:param tmtc_printer: TmTcPrinter object.
......@@ -31,16 +31,16 @@ def create_communication_interface_default(com_if: ComInterfaces, tmtc_printer:
try:
if com_if == ComInterfaces.EthernetUDP:
from config.tmtcc_definitions import EthernetConfig
ethernet_cfg_dict = get_global(GlobalIds.ETHERNET_CONFIG)
ethernet_cfg_dict = get_global(CoreGlobalIds.ETHERNET_CONFIG)
send_addr = ethernet_cfg_dict[EthernetConfig.SEND_ADDRESS]
rcv_addr = ethernet_cfg_dict[EthernetConfig.RECV_ADDRESS]
communication_interface = EthernetComIF(
tmtc_printer=tmtc_printer, tm_timeout=get_global(GlobalIds.TM_TIMEOUT),
tc_timeout_factor=get_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR),
tmtc_printer=tmtc_printer, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT),
tc_timeout_factor=get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR),
send_address=send_addr, receive_address=rcv_addr)
elif com_if == ComInterfaces.Serial:
from config.tmtcc_definitions import SerialConfig
serial_cfg = get_global(GlobalIds.SERIAL_CONFIG)
serial_cfg = get_global(CoreGlobalIds.SERIAL_CONFIG)
serial_baudrate = serial_cfg[SerialConfig.SERIAL_BAUD_RATE]
serial_timeout = serial_cfg[SerialConfig.SERIAL_TIMEOUT]
# Determine COM port, either extract from JSON file or ask from user.
......@@ -55,7 +55,7 @@ def create_communication_interface_default(com_if: ComInterfaces, tmtc_printer:
serial_timeout)
elif com_if == ComInterfaces.QEMU:
from config.tmtcc_definitions import SerialConfig
serial_cfg = get_global(GlobalIds.SERIAL_CONFIG)
serial_cfg = get_global(CoreGlobalIds.SERIAL_CONFIG)
serial_timeout = serial_cfg[SerialConfig.SERIAL_TIMEOUT]
communication_interface = QEMUComIF(
tmtc_printer=tmtc_printer, serial_timeout=serial_timeout,
......
......@@ -21,3 +21,42 @@ class QueueCommands(enum.Enum):
WAIT = enum.auto()
EXPORT_LOG = enum.auto()
SET_TIMEOUT = enum.auto()
class CoreGlobalIds(enum.IntEnum):
"""
Numbers from 128 to 200 are reserved for core globals
"""
# Object handles
TMTC_HOOK = 128
COM_INTERFACE_HANDLE = 129
TM_LISTENER_HANDLE = 130
TMTC_PRINTER_HANDLE = 131
PRETTY_PRINTER = 132
# Parameters
APID = 140
MODE = 141
SERVICE = 142
SERVICELIST = 143
COM_IF = 144
OP_CODE = 145
TM_TIMEOUT = 146
# Miscellaneous
DISPLAY_MODE = 150
USE_LISTENER_AFTER_OP = 151
PRINT_HK = 152
PRINT_TM = 153
PRINT_RAW_TM = 154
PRINT_TO_FILE = 155
RESEND_TC = 156
TC_SEND_TIMEOUT_FACTOR = 157
# Config dictionaries
USE_SERIAL = 160
SERIAL_CONFIG = 161
USE_ETHERNET = 162
ETHERNET_CONFIG = 163
......@@ -14,7 +14,7 @@ from PyQt5.QtWidgets import *
from PyQt5.QtGui import QPixmap, QIcon
from tmtc_core.core.tmtcc_backend import TmTcHandler
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.core.tmtccore_definitions import ComInterfaces
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtc_core.core.tmtcc_globals_manager import get_global, update_global
......
from config.tmtcc_globals import GlobalIds
class GlobalsManager:
......@@ -24,7 +23,7 @@ class GlobalsManager:
self.lock = Lock()
# noinspection PyUnresolvedReferences
def get_global(self, global_param_key: GlobalIds):
def get_global(self, global_param_key: int):
global_param = self.globals_dict.get(global_param_key)
if global_param is None:
try:
......@@ -38,7 +37,7 @@ class GlobalsManager:
else:
return global_param
def add_global(self, global_param_id: GlobalIds, parameter: any):
def add_global(self, global_param_id: int, parameter: any):
self.globals_dict.update({global_param_id: parameter})
def lock_global_pool(self, timeout_seconds: float = 1) -> bool:
......@@ -48,11 +47,11 @@ class GlobalsManager:
self.lock.release()
def get_global(global_param_id: GlobalIds):
def get_global(global_param_id: int):
return GlobalsManager.get_manager().get_global(global_param_id)
def update_global(global_param_id: GlobalIds, parameter: any):
def update_global(global_param_id: int, parameter: any):
return GlobalsManager.get_manager().add_global(global_param_id, parameter)
......
......@@ -5,11 +5,11 @@ from typing import Union, Dict
class TmTcHookBase:
from tmtc_core.com_if.tmtcc_com_interface_base import CommunicationInterface
from tmtc_core.core.tmtcc_backend import TmTcHandler
from tmtc_core.core.backend import TmTcHandler
from tmtc_core.utility.tmtcc_tmtc_printer import TmTcPrinter
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT
from tmtc_core.core.tmtc_core_definitions import ComInterfaces
from tmtc_core.core.definitions import ComInterfaces
from tmtcc_definitions import ModeList, ServiceList
def __init__(self):
......
import argparse
from tmtc_core.com_if.tmtcc_serial_com_if import SerialCommunicationType
from tmtc_core.com_if.tmtcc_serial_utilities import determine_com_port
from tmtc_core.core.definitions import CoreGlobalIds, ComInterfaces
from tmtcc_definitions import ModeList, ServiceList
def default_add_globals_pre_args_parsing(gui: bool = False):
from tmtc_core.core.globals_manager import update_global
import pprint
update_global(CoreGlobalIds.APID, 0xef)
update_global(CoreGlobalIds.COM_IF, ComInterfaces.EthernetUDP)
update_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR, 2)
update_global(CoreGlobalIds.TM_TIMEOUT, 4)
update_global(CoreGlobalIds.DISPLAY_MODE, "long")
update_global(CoreGlobalIds.PRINT_TO_FILE, True)
update_global(CoreGlobalIds.SERIAL_CONFIG, dict())
update_global(CoreGlobalIds.ETHERNET_CONFIG, dict())
pp = pprint.PrettyPrinter()
update_global(CoreGlobalIds.PRETTY_PRINTER, pp)
update_global(CoreGlobalIds.TM_LISTENER_HANDLE, None)
update_global(CoreGlobalIds.COM_INTERFACE_HANDLE, None)
update_global(CoreGlobalIds.TMTC_PRINTER_HANDLE, None)
update_global(CoreGlobalIds.PRINT_RAW_TM, False)
update_global(CoreGlobalIds.RESEND_TC, False)
update_global(CoreGlobalIds.OP_CODE, "0")
update_global(CoreGlobalIds.MODE, ModeList.ListenerMode)
if gui:
default_set_up_ethernet_cfg()
servicelist = dict()
servicelist[ServiceList.SERVICE_2] = ["Service 2 Raw Commanding"]
servicelist[ServiceList.SERVICE_3] = ["Service 3 Housekeeping"]
servicelist[ServiceList.SERVICE_5] = ["Service 5 Event"]
servicelist[ServiceList.SERVICE_8] = ["Service 8 Functional Commanding"]
servicelist[ServiceList.SERVICE_9] = ["Service 9 Time"]
servicelist[ServiceList.SERVICE_17] = ["Service 17 Test"]
servicelist[ServiceList.SERVICE_20] = ["Service 20 Parameters"]
servicelist[ServiceList.SERVICE_23] = ["Service 23 File Management"]
servicelist[ServiceList.SERVICE_200] = ["Service 200 Mode Management"]
update_global(CoreGlobalIds.SERVICE, ServiceList.SERVICE_17)
update_global(CoreGlobalIds.SERVICELIST, servicelist)
def default_add_globals_post_args_parsing(args: argparse.Namespace):
from tmtc_core.core.globals_manager import update_global
from tmtc_core.utility.tmtcc_logger import get_logger
from config.tmtcc_definitions import ModeList
logger = get_logger()
mode_param = ModeList.ListenerMode
if 0 <= args.mode <= 6:
if args.mode == 0:
mode_param = ModeList.GUIMode
elif args.mode == 1:
mode_param = ModeList.ListenerMode
elif args.mode == 2:
mode_param = ModeList.SingleCommandMode
elif args.mode == 3:
mode_param = ModeList.ServiceTestMode
elif args.mode == 4:
mode_param = ModeList.SoftwareTestMode
update_global(CoreGlobalIds.MODE, mode_param)
if args.com_if == ComInterfaces.EthernetUDP.value:
com_if = ComInterfaces.EthernetUDP
elif args.com_if == ComInterfaces.Serial.value:
com_if = ComInterfaces.Serial
elif args.com_if == ComInterfaces.Dummy.value:
com_if = ComInterfaces.Dummy
elif args.com_if == ComInterfaces.QEMU.value:
com_if = ComInterfaces.QEMU
else:
com_if = ComInterfaces.Serial
update_global(CoreGlobalIds.COM_IF, com_if)
if args.short_display_mode:
display_mode_param = "short"
else:
display_mode_param = "long"
update_global(CoreGlobalIds.DISPLAY_MODE, display_mode_param)
service = str(args.service).lower()
if service == "2":
service = ServiceList.SERVICE_2
elif service == "3":
service = ServiceList.SERVICE_3
elif service == "5":
service = ServiceList.SERVICE_5
elif service == "8":
service = ServiceList.SERVICE_8
elif service == "9":
service = ServiceList.SERVICE_9
elif service == "17":
service = ServiceList.SERVICE_17
elif service == "20":
service = ServiceList.SERVICE_20
elif service == "23":
service = ServiceList.SERVICE_23
else:
logger.warning("Service not known! Setting standard service 17")
service = ServiceList.SERVICE_17
update_global(CoreGlobalIds.SERVICE, service)
if args.op_code is None:
op_code = 0
else:
op_code = str(args.op_code).lower()
update_global(CoreGlobalIds.OP_CODE, op_code)
update_global(CoreGlobalIds.USE_LISTENER_AFTER_OP, args.listener)
update_global(CoreGlobalIds.TM_TIMEOUT, args.tm_timeout)
update_global(CoreGlobalIds.PRINT_HK, args.print_hk)
update_global(CoreGlobalIds.PRINT_TM, args.print_tm)
update_global(CoreGlobalIds.PRINT_RAW_TM, args.raw_data_print)
update_global(CoreGlobalIds.PRINT_TO_FILE, args.print_log)
update_global(CoreGlobalIds.RESEND_TC, args.resend_tc)
update_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR, 3)
use_serial_cfg = False
if com_if == ComInterfaces.Serial or com_if == ComInterfaces.QEMU:
use_serial_cfg = True
if use_serial_cfg:
default_set_up_serial_cfg(com_if)
use_ethernet_cfg = False
if com_if == ComInterfaces.EthernetUDP:
use_ethernet_cfg = True
if use_ethernet_cfg:
# TODO: Port and IP address can also be passed as CLI parameters. Use them here if applicable
default_set_up_ethernet_cfg()
def default_set_up_ethernet_cfg():
from tmtc_core.core.tmtcc_globals_manager import update_global
update_global(CoreGlobalIds.USE_ETHERNET, True)
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_definitions import EthernetConfig
from tmtc_core.com_if.tmtcc_ethernet_utilities import determine_ip_addresses
ethernet_cfg_dict = get_global(CoreGlobalIds.ETHERNET_CONFIG)
# This will either load the addresses from a JSON file or prompt them from the user.
send_addr, rcv_addr = determine_ip_addresses()
ethernet_cfg_dict.update({EthernetConfig.SEND_ADDRESS: send_addr})
ethernet_cfg_dict.update({EthernetConfig.RECV_ADDRESS: rcv_addr})
update_global(CoreGlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict)
def default_set_up_serial_cfg(com_if: ComInterfaces):
from tmtc_core.core.globals_manager import update_global
update_global(CoreGlobalIds.USE_SERIAL, True)
from tmtc_core.core.globals_manager import get_global
from config.tmtcc_definitions import SerialConfig
serial_cfg_dict = get_global(CoreGlobalIds.SERIAL_CONFIG)
if com_if == ComInterfaces.Serial:
com_port = determine_com_port()
else:
com_port = ""
serial_cfg_dict.update({SerialConfig.SERIAL_PORT: com_port})
serial_cfg_dict.update({SerialConfig.SERIAL_BAUD_RATE: 115200})
serial_cfg_dict.update({SerialConfig.SERIAL_TIMEOUT: 0.01})
serial_cfg_dict.update({SerialConfig.SERIAL_COMM_TYPE: SerialCommunicationType.DLE_ENCODING})
serial_cfg_dict.update({SerialConfig.SERIAL_FRAME_SIZE: 256})
serial_cfg_dict.update({SerialConfig.SERIAL_DLE_QUEUE_LEN: 25})
serial_cfg_dict.update({SerialConfig.SERIAL_DLE_MAX_FRAME_SIZE: 1024})
update_global(CoreGlobalIds.SERIAL_CONFIG, serial_cfg_dict)
\ No newline at end of file
......@@ -14,15 +14,14 @@ if the first reply has not been received.
import time
from tmtc_core.com_if.tmtcc_com_interface_base import CommunicationInterface
from tmtc_core.core.tmtc_core_definitions import QueueCommands
from tmtc_core.core.definitions import QueueCommands, CoreGlobalIds
from tmtc_core.utility.tmtcc_tmtc_printer import TmTcPrinter
from tmtc_core.utility.tmtcc_logger import get_logger
from tmtc_core.sendreceive.tmtcc_tm_listener import TmListener
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueEntryT
from tmtc_core.pus_tm.tmtcc_pus_tm_factory import PusTmQueueT
from config.tmtcc_globals import GlobalIds
from tmtc_core.core.tmtcc_globals_manager import get_global
from tmtc_core.core.globals_manager import get_global
LOGGER = get_logger()
......@@ -41,8 +40,8 @@ class CommandSenderReceiver:
and pass it here
:param tmtc_printer: TmTcPrinter object. Instantiate it and pass it here.
"""
self._tm_timeout = get_global(GlobalIds.TM_TIMEOUT)
self._tc_send_timeout_factor = get_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR)
self._tm_timeout = get_global(CoreGlobalIds.TM_TIMEOUT)
self._tc_send_timeout_factor = get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR)
if isinstance(com_interface, CommunicationInterface):
self._com_interface = com_interface
......@@ -86,7 +85,7 @@ class CommandSenderReceiver:
:return:
"""
if tm_timeout == -1:
tm_timeout = get_global(GlobalIds.TM_TIMEOUT)
tm_timeout = get_global(CoreGlobalIds.TM_TIMEOUT)
self._tm_timeout = tm_timeout
def set_tc_send_timeout_factor(
......@@ -98,7 +97,7 @@ class CommandSenderReceiver:
:return:
"""
if new_factor == -1:
new_factor = get_global(GlobalIds.TC_SEND_TIMEOUT_FACTOR)
new_factor = get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR)
self._tc_send_timeout_factor = new_factor
def _check_for_first_reply(self) -> None:
......@@ -201,8 +200,7 @@ class CommandSenderReceiver:
self._elapsed_time = time.time() - self._start_time
if self._elapsed_time >= self._tm_timeout * self._tc_send_timeout_factor:
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_globals import GlobalIds
if get_global(GlobalIds.RESEND_TC):
if get_global(CoreGlobalIds.RESEND_TC):
LOGGER.info("CommandSenderReceiver: Timeout, sending TC again !")
self._com_interface.send_telecommand(self._last_tc, self._last_tc_info)
self._timeout_counter = self._timeout_counter + 1
......
#!/usr/bin/python3
"""
@brief Core method called by entry point files to initiate the TMTC commander. The commander is started
by running the run_tmtc_client function.
@brief Core method called by entry point files to initiate the TMTC commander.
The commander is started by running the run_tmtc_client function.
@details
@manual
@author R. Mueller
"""
import sys
from tmtc_core.core.tmtcc_hook_base import TmTcHookBase
from tmtc_core.core.tmtccore_definitions import CoreGlobalIds
from tmtc_core.utility.tmtcc_core_args_parser import parse_input_arguments
from tmtc_core.utility.tmtcc_logger import set_tmtc_logger, get_logger
from config.tmtcc_globals import add_globals_post_args_parsing, add_globals_pre_args_parsing
logger = get_logger()
try:
......@@ -25,9 +25,22 @@ except ImportError as error:
logger.warning("TMTC Runner: No user versioning file found!")
def assign_tmtc_commander_hooks(hook_base: TmTcHookBase):
from tmtc_core.core.tmtcc_globals_manager import update_global
from tmtc_core.core.tmtccore_definitions import CoreGlobalIds
if hook_base is None:
logger.error("Passed hook base object handle is invalid! Terminating..")
sys.exit(-1)
update_global(CoreGlobalIds.TMTC_HOOK, hook_base)
def run_tmtc_client(use_gui: bool, reduced_printout: bool = False):
"""
Main method, reads input arguments, sets global variables and start TMTC handler.
This is the primary function to run the TMTC commander. Users should call this function to
start the TMTC commander.
:param use_gui: Specify whether the GUI is used or not
:param reduced_printout: It is possible to reduce the initial printout with this flag
:return:
"""
if not reduced_printout:
handle_init_printout(use_gui)
......@@ -36,6 +49,7 @@ def run_tmtc_client(use_gui: bool, reduced_printout: bool = False):
logger.info("Starting TMTC Client..")
if use_gui:
from config.tmtcc_globals import add_globals_pre_args_parsing
add_globals_pre_args_parsing(True)
else:
handle_cli_args_and_globals()
......@@ -58,6 +72,7 @@ def handle_init_printout(use_gui: bool):
def handle_cli_args_and_globals():
from config.tmtcc_globals import add_globals_pre_args_parsing, add_globals_post_args_parsing
logger.info("Setting up pre-globals..")
add_globals_pre_args_parsing(False)
logger.info("Parsing input arguments..")
......@@ -70,12 +85,12 @@ def handle_cli_args_and_globals():
def start_tmtc_commander_cli():
from tmtc_core.core.tmtcc_backend import TmTcHandler
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_globals import GlobalIds
service = get_global(GlobalIds.SERVICE)
op_code = get_global(GlobalIds.OP_CODE)
service = get_global(CoreGlobalIds.SERVICE)
op_code = get_global(CoreGlobalIds.OP_CODE)
# The global variables are set by the argument parser.
tmtc_handler = TmTcHandler(get_global(GlobalIds.COM_IF), get_global(GlobalIds.MODE), service, op_code)
tmtc_handler.set_one_shot_or_loop_handling(get_global(GlobalIds.USE_LISTENER_AFTER_OP))
tmtc_handler = TmTcHandler(get_global(CoreGlobalIds.COM_IF), get_global(CoreGlobalIds.MODE),
service, op_code)
tmtc_handler.set_one_shot_or_loop_handling(get_global(CoreGlobalIds.USE_LISTENER_AFTER_OP))
tmtc_handler.initialize()
tmtc_handler.start()
......@@ -83,15 +98,14 @@ def start_tmtc_commander_cli():
def start_tmtc_commander_qt_gui():
from tmtc_core.core.tmtc_frontend import TmTcFrontend
from tmtc_core.core.tmtcc_globals_manager import get_global
from config.tmtcc_globals import GlobalIds
try:
from PyQt5.QtWidgets import QApplication
except ImportError:
logger.error("PyQt5 module not installed, can't run GUI mode!")
sys.exit(1)
app = QApplication(["TMTC Commander"])
tmtc_gui = TmTcFrontend(get_global(GlobalIds.COM_IF), get_global(GlobalIds.MODE),
get_global(GlobalIds.SERVICE))
tmtc_gui = TmTcFrontend(get_global(CoreGlobalIds.COM_IF), get_global(CoreGlobalIds.MODE),
get_global(CoreGlobalIds.SERVICE))
tmtc_gui.start_ui()
sys.exit(app.exec_())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment