Something went wrong on our end
Forked from an inaccessible project.
-
Robin Mueller authoredRobin Mueller authored
obsw_tmtc_printer.py 10.70 KiB
#!/usr/bin/python3.7
# -*- coding: utf-8 -*-
"""
@file
OBSW_Config.py
@date
01.11.2019
@brief
Class that performs all printing functionalities
"""
import os
import sys
from typing import TypeVar
from config import OBSW_Config as g
from tm.obsw_pus_tm_base import PusTmT, PusTelemetry
from tm.obsw_tm_service_3 import PusTm3T
from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT
TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter')
class TmTcPrinter:
"""
This class handles printing to the command line and to files.
"""
def __init__(self, display_mode: str = "long", do_print_to_file: bool = True,
print_tc: bool = True):
"""
:param display_mode: "long" or "short" TODO: replace by enum
:param do_print_to_file: if true, print to file
:param print_tc: if true, print TCs
"""
self.print_buffer = ""
# global print buffer which will be useful to print something to file
self.file_buffer = ""
self.display_mode = display_mode
self.do_print_to_file = do_print_to_file
self.print_tc = print_tc
def print_telemetry(self, packet: PusTelemetry):
"""
This function handles printing telemetry
:param packet:
:return:
"""
if self.display_mode == "short":
self.__handle_short_print(packet)
else:
self.__handle_long_print(packet)
self.__handle_wiretapping_packet(packet)
self.__handle_data_reply_packet(packet)
if packet.get_service() == 3 and \
(packet.get_subservice() == 25 or packet.get_subservice() == 26):
self.__handle_hk_print(packet)
if packet.get_service() == 3 and \
(packet.get_subservice() == 10 or packet.get_subservice() == 12):
self.__handle_hk_definition_print(packet)
if g.G_PRINT_RAW_TM:
self.print_buffer = "TM Data:" + "\n" + self.return_data_string(packet.get_tm_data())
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __handle_short_print(self, tm_packet: PusTelemetry):
self.print_buffer = "Received TM[" + str(tm_packet.get_service()) + "," + str(
tm_packet.get_subservice()) + "]"
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __handle_long_print(self, tm_packet: PusTelemetry):
self.print_buffer = "Received Telemetry: " + tm_packet.print_info
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
self.__handle_column_header_print(tm_packet)
self.__handle_tm_content_print(tm_packet)
def __handle_column_header_print(self, tm_packet: PusTelemetry):
rec_pus = []
tm_packet.append_telemetry_column_headers(rec_pus)
self.print_buffer = str(rec_pus)
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __handle_tm_content_print(self, tm_packet: PusTelemetry):
"""
:param tm_packet:
:return:
"""
rec_pus = []
tm_packet.append_telemetry_content(rec_pus)
self.print_buffer = str(rec_pus)
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __handle_hk_print(self, tm_packet: PusTmT):
"""
Prints HK _tm_data previously set by TM receiver
:param tm_packet:
:return:
"""
if g.G_PRINT_HK_DATA:
self.print_buffer = "HK Data from SID "
self.print_buffer = self.print_buffer + str(hex(tm_packet.sid)) + " :"
self.__print_hk(tm_packet)
self.__print_validity_buffer(tm_packet)
def __handle_hk_definition_print(self, tm_packet: PusTmT):
"""
:param tm_packet:
:return:
"""
if g.G_PRINT_HK_DATA:
self.print_buffer = "HK Definition from SID "
self.print_buffer = self.print_buffer + str(hex(tm_packet.sid)) + " :"
self.__print_hk(tm_packet)
def __print_hk(self, tm_packet: PusTm3T):
"""
:param tm_packet:
:return:
"""
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
self.print_buffer = str(tm_packet.hkHeader)
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
self.print_buffer = str(tm_packet.hkContent)
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __print_validity_buffer(self, tm_packet: PusTm3T):
"""
:param tm_packet:
:return:
"""
self.print_buffer = "Valid: "
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
self.__handle_validity_buffer_print(tm_packet.validity_buffer, tm_packet.numberOfParameters)
def __handle_validity_buffer_print(self, validity_buffer: bytearray, number_of_parameters):
"""
:param validity_buffer:
:param number_of_parameters:
:return:
"""
self.print_buffer = "["
for index, byte in enumerate(validity_buffer):
for bit in range(1, 9):
if self.bit_extractor(byte, bit) == 1:
self.print_buffer = self.print_buffer + "Yes"
else:
self.print_buffer = self.print_buffer + "No"
if index == number_of_parameters:
self.print_buffer = self.print_buffer + "]"
else:
self.print_buffer = self.print_buffer + ", "
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __handle_wiretapping_packet(self, packet):
"""
:param packet:
:return:
"""
if packet.get_service() == 2 and \
(packet.get_subservice() == 131 or packet.get_subservice() == 130):
self.print_buffer = "Wiretapping Packet or Raw Reply from TM [" + \
str(packet.get_service()) + "," + str(packet.get_subservice()) + "]:"
self.print_buffer = self.print_buffer + packet.return_data_string()
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __handle_data_reply_packet(self, tm_packet: PusTelemetry):
"""
Handles the PUS Service 8 _tm_data reply packets.
:param tm_packet:
:return:
"""
if tm_packet.get_service() == 8 and tm_packet.get_subservice() == 130:
self.print_buffer = "Service 8 Direct Command Reply TM[8,130] with _tm_data: " \
+ tm_packet.return_data_string()
print(self.print_buffer)
def print_data(self, byte_array: bytearray):
"""
:param byte_array:
:return: None
"""
string = self.return_data_string(byte_array)
print(string)
@staticmethod
def return_data_string(byte_array: bytearray) -> str:
"""
Converts a bytearray to string format for printing
:param byte_array:
:return:
"""
str_to_print = "["
for byte in byte_array:
str_to_print += str(hex(byte)) + " , "
str_to_print = str_to_print.rstrip(' ')
str_to_print = str_to_print.rstrip(',')
str_to_print += ']'
return str_to_print
def print_string(self, string: str):
"""
Print a string and adds it to the file buffer.
:param string:
:return:
"""
self.print_buffer = string
print(self.print_buffer)
if self.do_print_to_file:
self.add_print_buffer_to_file_buffer()
def add_print_buffer_to_file_buffer(self):
"""
:return:
"""
if self.do_print_to_file:
self.file_buffer = self.file_buffer + self.print_buffer + "\n"
def print_to_file(self, log_name: str = "log/tmtc_log.txt", clear_file_buffer: bool = False):
"""
:param log_name:
:param clear_file_buffer:
:return:
"""
try:
file = open(log_name, 'w')
except FileNotFoundError:
print("Log directory does not exists, creating log folder.")
os.mkdir('log')
file = open(log_name, 'w')
file.write(self.file_buffer)
if clear_file_buffer:
self.file_buffer = ""
print("Log file written to " + log_name)
file.close()
@staticmethod
def bit_extractor(byte: int, position: int):
"""
:param byte:
:param position:
:return:
"""
shift_number = position + (6 - 2 * (position - 1))
return (byte >> shift_number) & 1
def print_telecommand(self, tc_packet: PusTcT, tc_packet_info: PusTcInfoT = None):
"""
This function handles the printing of Telecommands
:param tc_packet:
:param tc_packet_info:
:return:
"""
if self.print_tc:
if len(tc_packet) == 0:
print("TMTC Printer: Empty packet was sent, configuration error")
sys.exit()
if tc_packet_info is None:
print("TMTC Printer: No packet info supplied to print")
return
if self.display_mode == "short":
self.__handle_short_tc_print(tc_packet_info)
else:
self.__handle_long_tc_print(tc_packet_info)
def __handle_short_tc_print(self, tc_packet_info: PusTcInfoT):
"""
Brief TC print
:param tc_packet_info:
:return:
"""
self.print_buffer = "Sent TC[" + str(tc_packet_info["service"]) + "," + \
str(tc_packet_info["subservice"]) + "] " + " with SSC " + \
str(tc_packet_info["ssc"])
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
def __handle_long_tc_print(self, tc_packet_info: PusTcInfoT):
"""
Long TC print
:param tc_packet_info:
:return:
"""
try:
self.print_buffer = "Telecommand TC[" + str(tc_packet_info["service"]) + "," + \
str(tc_packet_info["subservice"]) + "] with SSC " + \
str(tc_packet_info["ssc"]) + " sent with data " + \
self.return_data_string(tc_packet_info["data"])
print(self.print_buffer)
self.add_print_buffer_to_file_buffer()
except TypeError:
print("TMTC Printer: Type Error !")