import atexit
import time
import logging
import sys
from collections import deque
from config import obsw_config as g
from typing import Tuple, Union

from tc.obsw_pus_tc_packer import ServiceQueuePacker, create_total_tc_queue
from test.obsw_pus_service_test import run_selected_pus_tests
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo
from tmtc_core.sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver
from tmtc_core.sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver
from tmtc_core.sendreceive.obsw_tm_listener import TmListener
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter
from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler
from config.obsw_com_config import set_communication_interface
from utility.obsw_binary_uploader import BinaryFileUploader
from config.obsw_user_code import command_preparation_hook

LOGGER = get_logger()


class TmTcHandler:
    """
    This is the primary class which handles TMTC reception. This can be seen as the backend
    in case a GUI or front-end is implemented.

    The handler keeps a current mode (one of ``g.ModeList``) and a ``command_received`` flag.
    :meth:`perform_operation` loops forever and calls :meth:`handle_action` whenever the flag
    is set.
    """

    # Delay in seconds between polls of ``command_received`` while no command is pending and
    # the current mode has no sleep of its own. Prevents the main loop from spinning.
    _NO_COMMAND_POLL_DELAY = 0.2

    def __init__(self):
        """
        Read mode and communication interface from the global configuration, create the
        printer, the communication interface and the TM listener, and register the exit
        handler.
        """
        self.mode = g.G_MODE_ID
        self.com_if = g.G_COM_IF
        # This flag could be used later to command the TMTC Client with a front-end
        self.command_received = True
        self.tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True)
        self.communication_interface = set_communication_interface(self.tmtc_printer)
        self.tm_listener = TmListener(
            com_interface=self.communication_interface, tm_timeout=g.G_TM_TIMEOUT,
            tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR
        )
        # Package sent in single command mode. An empty bytearray means that no package was
        # supplied (for example by a GUI); the user preparation hook is used in that case.
        self.single_command_package: Tuple[bytearray, Union[None, PusTcInfo]] = bytearray(), None
        # The listener is only started when the communication interface reports it is valid.
        if self.communication_interface.valid:
            self.tm_listener.start()
        else:
            LOGGER.info("No communication interface set for now")
            LOGGER.info("TM listener will not be started")
        atexit.register(keyboard_interrupt_handler, com_interface=self.communication_interface)

    def perform_operation(self):
        """
        Periodic operation. Runs until the process exits.

        Handles a pending command, then sleeps depending on the current mode. A
        KeyboardInterrupt or an IOError closes the client via ``sys.exit()``.
        """
        while True:
            try:
                if self.command_received:
                    self.command_received = False
                    self.handle_action()
                if self.mode == g.ModeList.Idle:
                    LOGGER.info("TMTC Client in idle mode")
                    time.sleep(5)
                elif self.mode == g.ModeList.ListenerMode:
                    time.sleep(1)
                elif not self.command_received:
                    # Nothing to do in this mode until the flag is set again. Without this
                    # pause the loop would busy-wait at full CPU load.
                    time.sleep(self._NO_COMMAND_POLL_DELAY)
            except KeyboardInterrupt:
                LOGGER.info("Closing TMTC client.")
                sys.exit()
            except IOError as e:
                LOGGER.exception(e)
                LOGGER.info("Closing TMTC client.")
                sys.exit()

    def handle_action(self):
        """
        Command handling. Performs the action which belongs to the current mode.

        In prompt mode, the user is asked for the next mode first and that mode is then
        handled in the same call. An unknown mode is treated as a configuration error and
        terminates the client.
        """
        if self.mode == g.ModeList.PromptMode:
            self.prompt_mode()

        if self.mode == g.ModeList.ListenerMode:
            if self.tm_listener.reply_event():
                LOGGER.info("TmTcHandler: Packets received.")
                self.tmtc_printer.print_telemetry_queue(
                    self.tm_listener.retrieve_tm_packet_queue())
                self.tm_listener.clear_tm_packet_queue()
                self.tm_listener.clear_reply_event()
            self.command_received = True

        elif self.mode == g.ModeList.Idle:
            # Idle is a valid mode: nothing to do here, perform_operation() sleeps.
            pass

        elif self.mode == g.ModeList.SingleCommandMode:
            package = self.single_command_package
            # The package defaults to an empty bytearray, so checking for None alone would
            # never select the user preparation hook.
            if package is None or not package[0]:
                pus_packet_tuple = command_preparation()
            else:
                LOGGER.info("send package from gui")
                pus_packet_tuple = package
            sender_and_receiver = SingleCommandSenderReceiver(
                com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
                tm_listener=self.tm_listener)
            LOGGER.info("Performing single command operation")
            sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple)
            self.command_received = True
            self.mode = g.ModeList.PromptMode

        elif self.mode == g.ModeList.ServiceTestMode:
            service_queue = deque()
            service_queue_packer = ServiceQueuePacker()
            op_code = g.G_OP_CODE
            service_queue_packer.pack_service_queue(
                service=g.G_SERVICE, service_queue=service_queue, op_code=op_code)
            if not self.communication_interface.valid:
                return
            LOGGER.info("Performing service command operation")
            sender_and_receiver = SequentialCommandSenderReceiver(
                com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
                tm_listener=self.tm_listener, tc_queue=service_queue)
            sender_and_receiver.send_queue_tc_and_receive_tm_sequentially()
            self.command_received = True
            self.mode = g.ModeList.ListenerMode

        elif self.mode == g.ModeList.SoftwareTestMode:
            all_tc_queue = create_total_tc_queue()
            LOGGER.info("Performing multiple service commands operation")
            sender_and_receiver = SequentialCommandSenderReceiver(
                com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
                tc_queue=all_tc_queue, tm_listener=self.tm_listener)
            sender_and_receiver.send_queue_tc_and_receive_tm_sequentially()
            LOGGER.info("SequentialSenderReceiver: Exporting output to log file.")
            self.tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True)

        elif self.mode == g.ModeList.BinaryUploadMode:
            # Upload binary, prompt user for input, in the end prompt for new mode and enter
            # that mode
            file_uploader = BinaryFileUploader(
                self.communication_interface, self.tmtc_printer, self.tm_listener)
            file_uploader.perform_file_upload()
            self.command_received = True
            self.mode = g.ModeList.ListenerMode

        elif self.mode == g.ModeList.UnitTest:
            # Set up test suite and run it with runner. Verbosity specifies detail level
            # The handler's objects are published through the global configuration module
            # before the tests are run.
            g.G_TM_LISTENER = self.tm_listener
            g.G_COM_INTERFACE = self.communication_interface
            g.G_TMTC_PRINTER = self.tmtc_printer
            LOGGER.info("Performing module tests")
            run_selected_pus_tests()

        else:
            LOGGER.error("Unknown Mode, Configuration error !")
            sys.exit()

    def prompt_mode(self):
        """
        Ask the user for the next mode and store it in ``self.mode``.

        Entering ``h`` prints the list of modes and asks again. The numbers shown in that list
        select the corresponding mode; any other input selects the listener mode.
        """
        # input() returns a string, so the keys have to be strings as well.
        modes_by_input = {
            '0': g.ModeList.Idle,
            '1': g.ModeList.ListenerMode,
            '2': g.ModeList.SingleCommandMode,
            '3': g.ModeList.ServiceTestMode,
            '4': g.ModeList.SoftwareTestMode,
            '5': g.ModeList.BinaryUploadMode,
            '6': g.ModeList.UnitTest,
        }
        while True:
            next_mode = input("Please enter next mode (enter h for list of modes): ").strip()
            if next_mode != 'h':
                break
            print("Mode 0: Idle mode")
            print("Mode 1: Listener mode")
            print("Mode 2: Single Command mode")
            print("Mode 3: Service mode")
            print("Mode 4: Software mode")
            print("Mode 5: Binary upload mode")
            print("Mode 6: Module test mode")
        self.mode = modes_by_input.get(next_mode, g.ModeList.ListenerMode)


def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]:
    """
    Prepare command for single command testing. Delegates to the user-defined
    ``command_preparation_hook``.
    :return: Whatever the hook returns; annotated as a tuple of the raw command and an
        optional PusTcInfo object.
    """
    return command_preparation_hook()