Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_tmtc_printer.py 12.94 KiB
#!/usr/bin/python3.7
# -*- coding: utf-8 -*-
"""
@file
    obsw_config.py
@date
    01.11.2019
@brief
    Class that performs all printing functionalities
"""
import os
import sys
import enum
from config import obsw_config as g
from tm.obsw_pus_tm_base import PusTelemetry
from tm.obsw_tm_service_3 import Service3TM
from tc.obsw_pus_tc_base import PusTcInfoT, TcDictionaryKeys
from utility.obsw_logger import get_logger

logger = get_logger()


class DisplayMode(enum.Enum):
    SHORT = enum.auto()
    LONG = enum.auto()


class TmTcPrinter:
    """
    This class handles printing to the command line and to files.
    """
    def __init__(self, display_mode: DisplayMode.LONG, do_print_to_file: bool = True,
                 print_tc: bool = True):
        """
        :param display_mode:
        :param do_print_to_file: if true, print to file
        :param print_tc: if true, print TCs
        """
        # TODO: we should implement a list of strings here. each service test string
        #       is written into a list entry. For the file output, the list entries are concatenated
        #       and put into the main log file.
        self.display_mode = display_mode
        self.do_print_to_file = do_print_to_file
        self.print_tc = print_tc
        self.__print_buffer = ""
        # global print buffer which will be useful to print something to file
        self.__file_buffer = ""
        # List implementation to store
        self.file_buffer_list = []


    def print_telemetry(self, packet: PusTelemetry):
        """
        This function handles printing telemetry
        :param packet:
        :return:
        """
        if self.display_mode == DisplayMode.SHORT:
            self.__handle_short_print(packet)
        else:
            self.__handle_long_print(packet)
        self.__handle_wiretapping_packet(packet)
        self.__handle_data_reply_packet(packet)
        if packet.get_service() == 3 and \
                (packet.get_subservice() == 25 or packet.get_subservice() == 26):
            self.__handle_hk_print(packet)
        if packet.get_service() == 3 and \
                (packet.get_subservice() == 10 or packet.get_subservice() == 12):
            self.__handle_hk_definition_print(packet)
        if g.G_PRINT_RAW_TM:
            self.__print_buffer = "TM Data:" + "\n" + self.return_data_string(packet.get_tm_data())
            logger.info(self.__print_buffer)
            self.add_print_buffer_to_file_buffer()

    def __handle_short_print(self, tm_packet: PusTelemetry):
        self.__print_buffer = "Received TM[" + str(tm_packet.get_service()) + "," + str(
            tm_packet.get_subservice()) + "]"
        logger.info(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_long_print(self, tm_packet: PusTelemetry):
        self.__print_buffer = "Received Telemetry: " + tm_packet.print_info
        logger.info(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.__handle_column_header_print(tm_packet)
        self.__handle_tm_content_print(tm_packet)

    def __handle_column_header_print(self, tm_packet: PusTelemetry):
        rec_pus = []
        tm_packet.append_telemetry_column_headers(rec_pus)
        self.__print_buffer = str(rec_pus)
        logger.info(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_tm_content_print(self, tm_packet: PusTelemetry):
        """
        :param tm_packet:
        :return:
        """
        rec_pus = []
        tm_packet.append_telemetry_content(rec_pus)
        self.__print_buffer = str(rec_pus)
        logger.info(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_hk_print(self, tm_packet: Service3TM):
        """
        Prints HK _tm_data previously set by TM receiver
        :param tm_packet:
        :return:
        """
        if g.G_PRINT_HK_DATA:
            self.__print_buffer = "HK Data from SID "
            self.__print_buffer = self.__print_buffer + str(hex(tm_packet.sid)) + " :"
            self.__print_hk(tm_packet)
            self.__print_validity_buffer(tm_packet)

    def __handle_hk_definition_print(self, tm_packet: Service3TM):
        """
        :param tm_packet:
        :return:
        """
        if g.G_PRINT_HK_DATA:
            self.__print_buffer = "HK Definition from SID "
            self.__print_buffer = self.__print_buffer + str(hex(tm_packet.sid)) + " :"
            self.__print_hk(tm_packet)

    def __print_hk(self, tm_packet: Service3TM):
        """
        :param tm_packet:
        :return:
        """
        print(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.__print_buffer = str(tm_packet.hkHeader)
        print(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.__print_buffer = str(tm_packet.hkContent)
        print(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __print_validity_buffer(self, tm_packet: Service3TM):
        """
        :param tm_packet:
        :return:
        """
        self.__print_buffer = "Valid: "
        print(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.__handle_validity_buffer_print(tm_packet.validity_buffer, tm_packet.numberOfParameters)

    def __handle_validity_buffer_print(self, validity_buffer: bytearray, number_of_parameters):
        """
        :param validity_buffer:
        :param number_of_parameters:
        :return:
        """
        self.__print_buffer = "["
        counter = 0
        for index, byte in enumerate(validity_buffer):
            for bit in range(1, 9):
                if self.bit_extractor(byte, bit) == 1:
                    self.__print_buffer = self.__print_buffer + "Yes"
                else:
                    self.__print_buffer = self.__print_buffer + "No"
                counter += 1
                if counter == number_of_parameters:
                    self.__print_buffer = self.__print_buffer + "]"
                    break
                else:
                    self.__print_buffer = self.__print_buffer + ", "
        logger.info(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_wiretapping_packet(self, wiretapping_packet: PusTelemetry):
        """
        :param wiretapping_packet:
        :return:
        """
        if wiretapping_packet.get_service() == 2 and (wiretapping_packet.get_subservice() == 131 or
                                                      wiretapping_packet.get_subservice() == 130):
            self.__print_buffer = "Wiretapping Packet or Raw Reply from TM [" + \
                                  str(wiretapping_packet.get_service()) + "," + \
                                  str(wiretapping_packet.get_subservice()) + "]: "
            self.__print_buffer = self.__print_buffer + wiretapping_packet.return_source_data()
            logger.info(self.__print_buffer)
            self.add_print_buffer_to_file_buffer()

    def __handle_data_reply_packet(self, tm_packet: PusTelemetry):
        """
        Handles the PUS Service 8 _tm_data reply packets.
        :param tm_packet:
        :return:
        """
        if tm_packet.get_service() == 8 and tm_packet.get_subservice() == 130:
            self.__print_buffer = "Service 8 Direct Command Reply TM[8,130] with TM data: " \
                                  + tm_packet.return_source_data()
            logger.info(self.__print_buffer)

    def print_data(self, byte_array: bytearray):
        """
        :param byte_array:
        :return: None
        """
        string = self.return_data_string(byte_array)
        logger.info(string)

    @staticmethod
    def return_data_string(byte_array: bytearray) -> str:
        """
        Converts a bytearray to string format for printing
        :param byte_array:
        :return:
        """
        str_to_print = "["
        for byte in byte_array:
            str_to_print += str(hex(byte)) + " , "
        str_to_print = str_to_print.rstrip(' ')
        str_to_print = str_to_print.rstrip(',')
        str_to_print += ']'
        return str_to_print

    def print_string(self, string: str, add_cr_to_file_buffer: bool = False):
        """
        Print a string and adds it to the file buffer.
        :param string:
        :param add_cr_to_file_buffer:
        :return:
        """
        self.__print_buffer = string
        logger.info(self.__print_buffer)
        if self.do_print_to_file:
            self.add_print_buffer_to_file_buffer(add_cr_to_file_buffer)

    def add_to_print_string(self, string_to_add: str=""):
        self.__print_buffer += string_to_add

    def add_print_buffer_to_file_buffer(self, add_cr_to_file_buffer: bool = False):
        """
        :return:
        """
        if self.do_print_to_file:
            if add_cr_to_file_buffer:
                self.__file_buffer += self.__print_buffer + "\r\n\r\n"
            else:
                self.__file_buffer += self.__print_buffer + "\r\n"



    def add_print_buffer_to_buffer_list(self):
        self.file_buffer_list.append(self.__file_buffer)

    def clear_file_buffer(self):
        self.__file_buffer = ""

    def print_to_file(self, log_name: str = "log/tmtc_log.txt", clear_file_buffer: bool = False):
        """
        :param log_name:
        :param clear_file_buffer:
        :return:
        """
        try:
            file = open(log_name, 'w')
        except FileNotFoundError:
            logger.info("Log directory does not exists, creating log folder.")
            os.mkdir('log')
            file = open(log_name, 'w')
        file.write(self.__file_buffer)
        if clear_file_buffer:
            self.__file_buffer = ""
        logger.info("Log file written to " + log_name)
        file.close()

    def print_file_buffer_list_to_file(self, log_name: str = "log/tmtc_log.txt",
                                       clear_list: bool = True):
        """
        Joins the string list and prints it to an output file.
        :param log_name:
        :param clear_list:
        :return:
        """
        try:
            file = open(log_name, 'w')
        except FileNotFoundError:
            logger.info("Log directory does not exists, creating log folder.")
            os.mkdir('log')
            file = open(log_name, 'w')
        file_buffer = ''.join(self.file_buffer_list)
        file.write(file_buffer)
        if clear_list:
            self.file_buffer_list = []
        logger.info("Log file written to " + log_name)
        file.close()



    @staticmethod
    def bit_extractor(byte: int, position: int):
        """

        :param byte:
        :param position:
        :return:
        """
        shift_number = position + (6 - 2 * (position - 1))
        return (byte >> shift_number) & 1

    def print_telecommand(self, tc_packet: bytes, tc_packet_info: PusTcInfoT = None):
        """
        This function handles the printing of Telecommands
        :param tc_packet:
        :param tc_packet_info:
        :return:
        """
        if self.print_tc:
            if len(tc_packet) == 0:
                logger.error("TMTC Printer: Empty packet was sent, configuration error")
                return
            if tc_packet_info is None:
                logger.error("TMTC Printer: No packet info supplied to print")
                return
            if self.display_mode == DisplayMode.SHORT:
                self.__handle_short_tc_print(tc_packet_info)
            else:
                self.__handle_long_tc_print(tc_packet_info)

    def __handle_short_tc_print(self, tc_packet_info: PusTcInfoT):
        """
        Brief TC print
        :param tc_packet_info:
        :return:
        """
        self.__print_buffer = "Sent TC[" + str(tc_packet_info[TcDictionaryKeys.SERVICE]) + "," + \
                              str(tc_packet_info[TcDictionaryKeys.SUBSERVICE]) + "] " + " with SSC " + \
                              str(tc_packet_info[TcDictionaryKeys.SSC])
        logger.info(self.__print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_long_tc_print(self, tc_packet_info: PusTcInfoT):
        """
        Long TC print
        :param tc_packet_info:
        :return:
        """
        try:
            self.__print_buffer = \
                "Telecommand TC[" + str(tc_packet_info[TcDictionaryKeys.SERVICE]) + "," + \
                str(tc_packet_info[TcDictionaryKeys.SUBSERVICE]) + "] with SSC " + \
                str(tc_packet_info[TcDictionaryKeys.SSC]) + " sent with data " + \
                self.return_data_string(tc_packet_info[TcDictionaryKeys.DATA])

            logger.info(self.__print_buffer)
            self.add_print_buffer_to_file_buffer()
        except TypeError as e:
            logger.error("TMTC Printer: Type Error! Traceback: " + str(e))