Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_tmtc_printer.py 10.70 KiB
#!/usr/bin/python3.7
# -*- coding: utf-8 -*-
"""
@file
    obsw_tmtc_printer.py
@date
    01.11.2019
@brief
    Class that performs all printing functionalities
"""
import os
import sys
from typing import TypeVar
from config import OBSW_Config as g
from tm.obsw_pus_tm_base import PusTmT, PusTelemetry
from tm.obsw_tm_service_3 import PusTm3T
from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT

# Type variable bound to TmTcPrinter, usable in annotations that accept the printer class
# or a subclass of it.
TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter')


class TmTcPrinter:
    """
    This class handles printing to the command line and to files.
    """
    def __init__(self, display_mode: str = "long", do_print_to_file: bool = True,
                 print_tc: bool = True):
        """
        :param display_mode: "long" or "short" TODO: replace by enum
        :param do_print_to_file: if true, print to file
        :param print_tc: if true, print TCs
        """
        self.print_buffer = ""
        # global print buffer which will be useful to print something to file
        self.file_buffer = ""
        self.display_mode = display_mode
        self.do_print_to_file = do_print_to_file
        self.print_tc = print_tc

    def print_telemetry(self, packet: PusTelemetry):
        """
        Print a received telemetry packet.
        The packet is first printed in the short or long form depending on display_mode,
        then handed to the wiretapping and data reply handlers. Service 3 packets with
        subservice 25/26 are passed to the HK printout and those with subservice 10/12 to
        the HK definition printout. If g.G_PRINT_RAW_TM is set, the raw TM data is printed
        as well.
        :param packet: telemetry packet to print
        :return:
        """
        summary_printer = self.__handle_short_print if self.display_mode == "short" \
            else self.__handle_long_print
        summary_printer(packet)
        self.__handle_wiretapping_packet(packet)
        self.__handle_data_reply_packet(packet)
        if packet.get_service() == 3:
            if packet.get_subservice() in (25, 26):
                self.__handle_hk_print(packet)
            if packet.get_subservice() in (10, 12):
                self.__handle_hk_definition_print(packet)
        if not g.G_PRINT_RAW_TM:
            return
        self.print_buffer = "TM Data:\n" + self.return_data_string(packet.get_tm_data())
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_short_print(self, tm_packet: PusTelemetry):
        self.print_buffer = "Received TM[" + str(tm_packet.get_service()) + "," + str(
            tm_packet.get_subservice()) + "]"
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_long_print(self, tm_packet: PusTelemetry):
        self.print_buffer = "Received Telemetry: " + tm_packet.print_info
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.__handle_column_header_print(tm_packet)
        self.__handle_tm_content_print(tm_packet)

    def __handle_column_header_print(self, tm_packet: PusTelemetry):
        rec_pus = []
        tm_packet.append_telemetry_column_headers(rec_pus)
        self.print_buffer = str(rec_pus)
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_tm_content_print(self, tm_packet: PusTelemetry):
        """
        :param tm_packet:
        :return:
        """
        rec_pus = []
        tm_packet.append_telemetry_content(rec_pus)
        self.print_buffer = str(rec_pus)
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_hk_print(self, tm_packet: PusTmT):
        """
        Print the HK data of a packet together with its validity buffer.
        Nothing is printed unless g.G_PRINT_HK_DATA is set.
        :param tm_packet: HK packet providing sid, hkHeader, hkContent, validity_buffer
            and numberOfParameters
        :return:
        """
        if not g.G_PRINT_HK_DATA:
            return
        self.print_buffer = "HK Data from SID "
        # The SID is shown in hexadecimal notation.
        self.print_buffer += "{} :".format(hex(tm_packet.sid))
        self.__print_hk(tm_packet)
        self.__print_validity_buffer(tm_packet)

    def __handle_hk_definition_print(self, tm_packet: PusTmT):
        """
        Print the HK definition of a packet (headline, header and content, no validity).
        Nothing is printed unless g.G_PRINT_HK_DATA is set.
        :param tm_packet: HK definition packet providing sid, hkHeader and hkContent
        :return:
        """
        if not g.G_PRINT_HK_DATA:
            return
        self.print_buffer = "HK Definition from SID "
        # The SID is shown in hexadecimal notation.
        self.print_buffer += "{} :".format(hex(tm_packet.sid))
        self.__print_hk(tm_packet)

    def __print_hk(self, tm_packet: PusTm3T):
        """
        :param tm_packet:
        :return:
        """
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.print_buffer = str(tm_packet.hkHeader)
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.print_buffer = str(tm_packet.hkContent)
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __print_validity_buffer(self, tm_packet: PusTm3T):
        """
        :param tm_packet:
        :return:
        """
        self.print_buffer = "Valid: "
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()
        self.__handle_validity_buffer_print(tm_packet.validity_buffer, tm_packet.numberOfParameters)

    def __handle_validity_buffer_print(self, validity_buffer: bytearray, number_of_parameters):
        """
        :param validity_buffer:
        :param number_of_parameters:
        :return:
        """
        self.print_buffer = "["
        for index, byte in enumerate(validity_buffer):
            for bit in range(1, 9):
                if self.bit_extractor(byte, bit) == 1:
                    self.print_buffer = self.print_buffer + "Yes"
                else:
                    self.print_buffer = self.print_buffer + "No"
                if index == number_of_parameters:
                    self.print_buffer = self.print_buffer + "]"
                else:
                    self.print_buffer = self.print_buffer + ", "
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_wiretapping_packet(self, packet):
        """
        :param packet:
        :return:
        """
        if packet.get_service() == 2 and \
                (packet.get_subservice() == 131 or packet.get_subservice() == 130):
            self.print_buffer = "Wiretapping Packet or Raw Reply from TM [" + \
                                str(packet.get_service()) + "," + str(packet.get_subservice()) + "]:"
            self.print_buffer = self.print_buffer + packet.return_data_string()
            print(self.print_buffer)
            self.add_print_buffer_to_file_buffer()

    def __handle_data_reply_packet(self, tm_packet: PusTelemetry):
        """
        Handles the PUS Service 8 _tm_data reply packets.
        :param tm_packet:
        :return:
        """
        if tm_packet.get_service() == 8 and tm_packet.get_subservice() == 130:
            self.print_buffer = "Service 8 Direct Command Reply TM[8,130] with _tm_data: " \
                                + tm_packet.return_data_string()
            print(self.print_buffer)

    def print_data(self, byte_array: bytearray):
        """
        :param byte_array:
        :return: None
        """
        string = self.return_data_string(byte_array)
        print(string)

    @staticmethod
    def return_data_string(byte_array: bytearray) -> str:
        """
        Converts a bytearray to string format for printing
        :param byte_array:
        :return:
        """
        str_to_print = "["
        for byte in byte_array:
            str_to_print += str(hex(byte)) + " , "
        str_to_print = str_to_print.rstrip(' ')
        str_to_print = str_to_print.rstrip(',')
        str_to_print += ']'
        return str_to_print

    def print_string(self, string: str):
        """
        Print a string and adds it to the file buffer.
        :param string:
        :return:
        """
        self.print_buffer = string
        print(self.print_buffer)
        if self.do_print_to_file:
            self.add_print_buffer_to_file_buffer()

    def add_print_buffer_to_file_buffer(self):
        """
        :return:
        """
        if self.do_print_to_file:
            self.file_buffer = self.file_buffer + self.print_buffer + "\n"

    def print_to_file(self, log_name: str = "log/tmtc_log.txt", clear_file_buffer: bool = False):
        """
        :param log_name:
        :param clear_file_buffer:
        :return:
        """
        try:
            file = open(log_name, 'w')
        except FileNotFoundError:
            print("Log directory does not exists, creating log folder.")
            os.mkdir('log')
            file = open(log_name, 'w')
        file.write(self.file_buffer)
        if clear_file_buffer:
            self.file_buffer = ""
        print("Log file written to " + log_name)
        file.close()

    @staticmethod
    def bit_extractor(byte: int, position: int):
        """

        :param byte:
        :param position:
        :return:
        """
        shift_number = position + (6 - 2 * (position - 1))
        return (byte >> shift_number) & 1

    def print_telecommand(self, tc_packet: PusTcT, tc_packet_info: PusTcInfoT = None):
        """
        This function handles the printing of Telecommands
        :param tc_packet:
        :param tc_packet_info:
        :return:
        """
        if self.print_tc:
            if len(tc_packet) == 0:
                print("TMTC Printer: Empty packet was sent, configuration error")
                sys.exit()
            if tc_packet_info is None:
                print("TMTC Printer: No packet info supplied to print")
                return
            if self.display_mode == "short":
                self.__handle_short_tc_print(tc_packet_info)
            else:
                self.__handle_long_tc_print(tc_packet_info)

    def __handle_short_tc_print(self, tc_packet_info: PusTcInfoT):
        """
        Brief TC print
        :param tc_packet_info:
        :return:
        """
        self.print_buffer = "Sent TC[" + str(tc_packet_info["service"]) + "," + \
                            str(tc_packet_info["subservice"]) + "] " + " with SSC " + \
                            str(tc_packet_info["ssc"])
        print(self.print_buffer)
        self.add_print_buffer_to_file_buffer()

    def __handle_long_tc_print(self, tc_packet_info: PusTcInfoT):
        """
        Long TC print
        :param tc_packet_info:
        :return:
        """
        try:
            self.print_buffer = "Telecommand TC[" + str(tc_packet_info["service"]) + "," + \
                                str(tc_packet_info["subservice"]) + "] with SSC " + \
                                str(tc_packet_info["ssc"]) + " sent with data " + \
                                self.return_data_string(tc_packet_info["data"])
            print(self.print_buffer)
            self.add_print_buffer_to_file_buffer()
        except TypeError:
            print("TMTC Printer: Type Error !")