diff --git a/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml b/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml index edc30694c5b1a4ed02b2d773a89fef28ccb4c638..61aa71dc850e5caf9c5bc54cb29b45ff3b63c73e 100644 --- a/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml +++ b/.idea/runConfigurations/tmtcclient_Module_Test_Serial.xml @@ -13,7 +13,7 @@ <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-m 5 -c 1 --hk" /> + <option name="PARAMETERS" value="-m 6 -c 1 --hk" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="true" /> <option name="MODULE_MODE" value="false" /> diff --git a/config/obsw_config.py b/config/obsw_config.py index e6edf5751fac6de88a582a4020ac32316b790608..e9fbfc09cfcd9739d2cdc156a35309019c8e8713 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -143,6 +143,8 @@ def set_globals(args): elif args.mode == 4: G_MODE_ID = ModeList.SoftwareTestMode elif args.mode == 5: + G_MODE_ID = ModeList.BinaryUploadMode + elif args.mode == 6: G_MODE_ID = ModeList.UnitTest else: G_MODE_ID = ModeList[1] diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 27a6d6a8e61fce6b1562c2416be7f6853e157362..b78431fe455510aab06dc5facfe113e4df423562 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -75,6 +75,7 @@ from utility.obsw_args_parser import parse_input_arguments from utility.obsw_tmtc_printer import TmTcPrinter from utility.obsw_exit_handler import keyboard_interrupt_handler from utility.obsw_logger import set_tmtc_logger, get_logger +from utility.obsw_binary_uploader import perform_binary_upload from comIF.obsw_com_config import set_communication_interface @@ -139,6 +140,14 @@ class TmTcHandler: self.com_if = g.G_COM_IF # This flag could be used later to command the TMTC Client with a front-end self.command_received = True + self.tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) + self.communication_interface = set_communication_interface(self.tmtc_printer) + self.tm_listener = TmListener( + com_interface=self.communication_interface, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR + ) + atexit.register(keyboard_interrupt_handler, com_interface=self.communication_interface) + self.tm_listener.start() def perform_operation(self): """ @@ -159,27 +168,17 @@ class TmTcHandler: """ Command handling. """ - tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) - communication_interface = set_communication_interface(tmtc_printer) - atexit.register(keyboard_interrupt_handler, com_interface=communication_interface) - - tm_listener = TmListener( - com_interface=communication_interface, tm_timeout=g.G_TM_TIMEOUT, - tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR - ) - tm_listener.start() - if self.mode == g.ModeList.ListenerMode: - if tm_listener.event_reply_received: + if self.tm_listener.event_reply_received: # TODO: Test this. LOGGER.info("TmTcHandler: Packets received.") - tmtc_printer.print_telemetry_queue(tm_listener.retrieve_tm_packet_queue()) + self.tmtc_printer.print_telemetry_queue(self.tm_listener.retrieve_tm_packet_queue()) self.command_received = True elif self.mode == g.ModeList.SingleCommandMode: pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( - com_interface=communication_interface, tmtc_printer=tmtc_printer, - tm_listener=tm_listener) + com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, + tm_listener=self.tm_listener) LOGGER.info("Performing single command operation") sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple) @@ -188,35 +187,36 @@ class TmTcHandler: pack_service_queue(g.G_SERVICE, service_queue) LOGGER.info("Performing service command operation") sender_and_receiver = SequentialCommandSenderReceiver( - com_interface=communication_interface, tmtc_printer=tmtc_printer, - tm_listener=tm_listener, tc_queue=service_queue) + com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, + tm_listener=self.tm_listener, tc_queue=service_queue) sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() elif self.mode == g.ModeList.SoftwareTestMode: all_tc_queue = create_total_tc_queue() LOGGER.info("Performing multiple service commands operation") sender_and_receiver = SequentialCommandSenderReceiver( - com_interface=communication_interface, tmtc_printer=tmtc_printer, - tc_queue=all_tc_queue, tm_listener=tm_listener) + com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, + tc_queue=all_tc_queue, tm_listener=self.tm_listener) sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() LOGGER.info("SequentialSenderReceiver: Exporting output to log file.") - tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True) + self.tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True) + + elif self.mode == g.ModeList.BinaryUploadMode: + # Upload binary, prompt user for input, in the end prompt for new mode and enter that + # mode + perform_binary_upload() + self.command_received = True elif self.mode == g.ModeList.UnitTest: # Set up test suite and run it with runner. Verbosity specifies detail level - g.G_TM_LISTENER = tm_listener - g.G_COM_INTERFACE = communication_interface - g.G_TMTC_PRINTER = tmtc_printer + g.G_TM_LISTENER = self.tm_listener + g.G_COM_INTERFACE = self.communication_interface + g.G_TMTC_PRINTER = self.tmtc_printer LOGGER.info("Performing module tests") # noinspection PyTypeChecker suite = unittest.TestLoader().loadTestsFromModule(obsw_pus_service_test) unittest.TextTestRunner(verbosity=2).run(suite) - elif self.mode == g.ModeList.BinaryUploadMode: - # Upload binary, prompt user for input, in the end prompt for new mode and enter that - # mode - - self.command_received = True else: logging.error("Unknown Mode, Configuration error !") sys.exit() diff --git a/utility/obsw_binary_uploader.py b/utility/obsw_binary_uploader.py index f507f12ae52bb5518d692c9b2f461c78c7295ca5..d0872b97790882b1f6bd0d8ef052799a1e5ae145 100644 --- a/utility/obsw_binary_uploader.py +++ b/utility/obsw_binary_uploader.py @@ -7,3 +7,6 @@ a supplied binary. The binary will be sent via the specified communication inter It will be possible to encode the data (for example using DLE encoding) """ + +def perform_binary_upload(): + print("Test!")