From bc709025545e4ab75816d424e482de3501ae934e Mon Sep 17 00:00:00 2001 From: "Robin.Mueller" <robin.mueller.m@gmail.com> Date: Wed, 19 Aug 2020 20:54:28 +0200 Subject: [PATCH] structure improved --- .../tmtcclient_Module_Test_Serial.xml | 2 +- config/obsw_config.py | 2 + obsw_tmtc_client.py | 54 +++++++++---------- utility/obsw_binary_uploader.py | 3 ++ 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml b/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml index edc3069..61aa71d 100644 --- a/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml +++ b/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml @@ -13,7 +13,7 @@ <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-m 5 -c 1 --hk" /> + <option name="PARAMETERS" value="-m 6 -c 1 --hk" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="true" /> <option name="MODULE_MODE" value="false" /> diff --git a/config/obsw_config.py b/config/obsw_config.py index e6edf57..e9fbfc0 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -143,6 +143,8 @@ def set_globals(args): elif args.mode == 4: G_MODE_ID = ModeList.SoftwareTestMode elif args.mode == 5: + G_MODE_ID = ModeList.BinaryUploadMode + elif args.mode == 6: G_MODE_ID = ModeList.UnitTest else: G_MODE_ID = ModeList[1] diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 27a6d6a..b78431f 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -75,6 +75,7 @@ from utility.obsw_args_parser import parse_input_arguments from utility.obsw_tmtc_printer import TmTcPrinter from utility.obsw_exit_handler import keyboard_interrupt_handler from utility.obsw_logger import set_tmtc_logger, get_logger +from utility.obsw_binary_uploader import perform_binary_upload from comIF.obsw_com_config import set_communication_interface @@ -139,6 +140,14 @@ class TmTcHandler: self.com_if = g.G_COM_IF # This flag could be used later to command the TMTC Client with a front-end self.command_received = True + self.tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) + self.communication_interface = set_communication_interface(self.tmtc_printer) + self.tm_listener = TmListener( + com_interface=self.communication_interface, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR + ) + atexit.register(keyboard_interrupt_handler, com_interface=self.communication_interface) + self.tm_listener.start() def perform_operation(self): """ @@ -159,27 +168,17 @@ class TmTcHandler: """ Command handling. """ - tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) - communication_interface = set_communication_interface(tmtc_printer) - atexit.register(keyboard_interrupt_handler, com_interface=communication_interface) - - tm_listener = TmListener( - com_interface=communication_interface, tm_timeout=g.G_TM_TIMEOUT, - tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR - ) - tm_listener.start() - if self.mode == g.ModeList.ListenerMode: - if tm_listener.event_reply_received: + if self.tm_listener.event_reply_received: # TODO: Test this. LOGGER.info("TmTcHandler: Packets received.") - tmtc_printer.print_telemetry_queue(tm_listener.retrieve_tm_packet_queue()) + self.tmtc_printer.print_telemetry_queue(self.tm_listener.retrieve_tm_packet_queue()) self.command_received = True elif self.mode == g.ModeList.SingleCommandMode: pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( - com_interface=communication_interface, tmtc_printer=tmtc_printer, - tm_listener=tm_listener) + com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, + tm_listener=self.tm_listener) LOGGER.info("Performing single command operation") sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple) @@ -188,35 +187,36 @@ class TmTcHandler: pack_service_queue(g.G_SERVICE, service_queue) LOGGER.info("Performing service command operation") sender_and_receiver = SequentialCommandSenderReceiver( - com_interface=communication_interface, tmtc_printer=tmtc_printer, - tm_listener=tm_listener, tc_queue=service_queue) + com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, + tm_listener=self.tm_listener, tc_queue=service_queue) sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() elif self.mode == g.ModeList.SoftwareTestMode: all_tc_queue = create_total_tc_queue() LOGGER.info("Performing multiple service commands operation") sender_and_receiver = SequentialCommandSenderReceiver( - com_interface=communication_interface, tmtc_printer=tmtc_printer, - tc_queue=all_tc_queue, tm_listener=tm_listener) + com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, + tc_queue=all_tc_queue, tm_listener=self.tm_listener) sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() LOGGER.info("SequentialSenderReceiver: Exporting output to log file.") - tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True) + self.tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True) + + elif self.mode == g.ModeList.BinaryUploadMode: + # Upload binary, prompt user for input, in the end prompt for new mode and enter that + # mode + perform_binary_upload() + self.command_received = True elif self.mode == g.ModeList.UnitTest: # Set up test suite and run it with runner. Verbosity specifies detail level - g.G_TM_LISTENER = tm_listener - g.G_COM_INTERFACE = communication_interface - g.G_TMTC_PRINTER = tmtc_printer + g.G_TM_LISTENER = self.tm_listener + g.G_COM_INTERFACE = self.communication_interface + g.G_TMTC_PRINTER = self.tmtc_printer LOGGER.info("Performing module tests") # noinspection PyTypeChecker suite = unittest.TestLoader().loadTestsFromModule(obsw_pus_service_test) unittest.TextTestRunner(verbosity=2).run(suite) - elif self.mode == g.ModeList.BinaryUploadMode: - # Upload binary, prompt user for input, in the end prompt for new mode and enter that - # mode - - self.command_received = True else: logging.error("Unknown Mode, Configuration error !") sys.exit() diff --git a/utility/obsw_binary_uploader.py b/utility/obsw_binary_uploader.py index f507f12..d0872b9 100644 --- a/utility/obsw_binary_uploader.py +++ b/utility/obsw_binary_uploader.py @@ -7,3 +7,6 @@ a supplied binary. The binary will be sent via the specified communication inter It will be possible to encode the data (for example using DLE encoding) """ + +def perform_binary_upload(): + print("Test!") -- GitLab