diff --git a/comIF/obsw_com_config.py b/comIF/obsw_com_config.py new file mode 100644 index 0000000000000000000000000000000000000000..8824fa5e9813b2cad47585bc5cc852ae3bfd34e2 --- /dev/null +++ b/comIF/obsw_com_config.py @@ -0,0 +1,38 @@ +import sys + +from utility.obsw_tmtc_printer import TmTcPrinter +from comIF.obsw_com_interface import CommunicationInterface +from comIF.obsw_dummy_com_if import DummyComIF +from comIF.obsw_ethernet_com_if import EthernetComIF +from comIF.obsw_serial_com_if import SerialComIF +from utility.obsw_logger import get_logger +import config.obsw_config as g + +logger = get_logger() + +def set_communication_interface(tmtc_printer: TmTcPrinter) -> CommunicationInterface: + """ + Return the desired communication interface object + :param tmtc_printer: TmTcPrinter object. + :return: CommunicationInterface object + """ + try: + if g.G_COM_IF == g.ComIF.Ethernet: + communication_interface = EthernetComIF( + tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_SEND_ADDRESS, + receive_address=g.G_REC_ADDRESS) + elif g.G_COM_IF == g.ComIF.Serial: + com_port = g.G_COM_PORT + baud_rate = 250000 + g.G_SERIAL_TIMEOUT = 1 + communication_interface = SerialComIF( + tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, + serial_timeout=g.G_SERIAL_TIMEOUT) + else: + communication_interface = DummyComIF(tmtc_printer=tmtc_printer) + return communication_interface + except (IOError, OSError): + logger.error("Error setting up communication interface") + logger.exception("Error") + sys.exit() \ No newline at end of file diff --git a/comIF/obsw_dummy_com_if.py b/comIF/obsw_dummy_com_if.py index 653b536f6782979e7ea06bd9f569c18253e20e4c..995e6f40f8942a835113f1bb45dc5c8fa95ba520 100644 --- a/comIF/obsw_dummy_com_if.py +++ b/comIF/obsw_dummy_com_if.py @@ -31,5 +31,3 @@ class DummyComIF(CommunicationInterface): def send_telecommand(self, tc_packet: PusTelecommand, tc_packet_info: PusTcInfoT = None) -> None: if isinstance(tc_packet_info, dict) and tc_packet_info.__len__() > 0: self.service_sent = tc_packet_info[TcDictionaryKeys.SERVICE] - print("Send function accessed") - print(self.service_sent) diff --git a/config/obsw_config.py b/config/obsw_config.py index c76d2c5f81b37d9b5eaf00131fa5f66cfb952dce..a54a77667d1c8ab076f1656d388b6c94dca7164d 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -8,6 +8,7 @@ """ import enum import struct +import pprint from typing import Tuple """ @@ -58,6 +59,10 @@ GPS1_SID = bytearray([0x00, 0x00, 0x2f, 0x00]) All global variables, set in main program with arg parser """ +# TMTC Client +G_TMTC_LOGGER_NAME = "TMTC Logger" +G_ERROR_LOG_FILE_NAME = "tmtc_error.log" +G_PP = pprint.PrettyPrinter # General Settings G_SCRIPT_MODE = 1 @@ -125,16 +130,12 @@ def set_globals(args): elif args.mode == 5: G_MODE_ID = ModeList.UnitTest else: - print("Invalid mode argument, setting default mode (1)") G_MODE_ID = ModeList[1] if args.comIF == 0: - print("Setting ethernet communication interface") G_COM_IF = ComIF.Ethernet elif args.comIF == 1: - print("Setting serial communication interface") G_COM_IF = ComIF.Serial else: - print("Setting dummy communication interface") G_COM_IF = ComIF.Dummy G_SERVICE = str(args.service) diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 3c549b8d3c11b6093554255217b319deeeae1bb8..b2b06c2ac01addd2015ae52f2148e27825392f01 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -1,58 +1,6 @@ #!/usr/bin/python3.8 -""" -@file - obsw_tmtc_client.py -@date - 01.11.2019 -@brief - This client was developed by KSat for the SOURCE project to test the on-board software. - It can be used to to send and receive TMTC packets and TMTC sequences. -@manual -Manual installation of crcmod and pyserial might be needed - 1. Install pip if it is not installed yet - 2. Install crcmod and all other required packages: - Command: python3.8 -m pip install crcmod - or use IDE (interpreter settings -> pip in PyCharm) - -The script can be used by specifying command line parameters. -Please run this script with the -h flag or without any command line parameters to display options. -GUI is work-in-progress -It might be necessary to set board or PC IP address if using ethernet communication. -Default values should work normally though. Use higher timeout value (-t Parameter) for STM32 - -Example command to test G_SERVICE 17, -assuming no set client IP (set manually to PC IP Address if necessary) -and default board IP 169.254.1.38: - obsw_tmtc_client.py -m 3 -s 17 -Example to run Unit Test: - obsw_tmtc_client.py -m 5 -Example to test G_SERVICE 17 with HK output and serial communication: - obsw_tmtc_client.py -m 3 -s 17 --hk -c 1 -Get command line help: - obsw_tmtc_client.py -h - -There are four different Modes: - 0. GUI Mode: Experimental mode, also called if no input parameter are specified - 1. Listener Mode: Only Listen for incoming TM packets - 2. SingleCommandMode: Send Single Command repeatedly until answer is received, - only listen after that - 3. ServiceTestMode: Send all Telecommands belonging to a certain G_SERVICE - and scan for replies for each telecommand. Listen after that - 4. SoftwareTestMode: Send all services and perform reply scanning like mode 3. - Listen after that - 5. Unit Test Mode: Performs a unit test which returns a simple OK or NOT OK. This mode - has the capability to send TCs in bursts, where applicable - -If there are problems receiving packets with Ethernet Communication, -use the tool Wireshark to track ethernet communication -for UDP echo packets (requests and response). -If the packets appear, there might be a problematic firewall setting. -Please ensure that python.exe UDP packets are not blocked in advanced firewall settings -and create a rule to allow packets from port 2008. - -@author: - S. Gaisser, J. Meier, R. Mueller -""" +# -*- coding: utf-8 -*- + import atexit import unittest import logging @@ -72,28 +20,77 @@ from utility.obsw_args_parser import parse_input_arguments from utility.obsw_tmtc_printer import TmTcPrinter from utility.obsw_exit_handler import keyboard_interrupt_handler -from comIF.obsw_ethernet_com_if import EthernetComIF -from comIF.obsw_serial_com_if import SerialComIF -from comIF.obsw_dummy_com_if import DummyComIF -from comIF.obsw_com_interface import ComIfT +from comIF.obsw_com_config import set_communication_interface from gui.obsw_tmtc_gui import TmTcGUI from gui.obsw_backend_test import TmTcBackend +from utility.obsw_logger import set_tmtc_logger +from utility.obsw_logger import get_logger + def main(): """ - Runs the TMTC client, depending on input arguments. + This client was developed by KSat for the SOURCE project to test the on-board software. + It can be used to to send and receive TMTC packets and TMTC sequences. + + @manual + Manual installation of crcmod and pyserial might be needed + 1. Install pip if it is not installed yet + 2. Install crcmod and all other required packages: + Command: python3.8 -m pip install crcmod + or use IDE (interpreter settings -> pip in PyCharm) + + The script can be used by specifying command line parameters. + Please run this script with the -h flag or without any command line parameters to display options. + GUI is work-in-progress + It might be necessary to set board or PC IP address if using ethernet communication. + Default values should work normally though. Use higher timeout value (-t Parameter) for STM32 + + Example command to test G_SERVICE 17, + assuming no set client IP (set manually to PC IP Address if necessary) + and default board IP 169.254.1.38: + obsw_tmtc_client.py -m 3 -s 17 + Example to run Unit Test: + obsw_tmtc_client.py -m 5 + Example to test G_SERVICE 17 with HK output and serial communication: + obsw_tmtc_client.py -m 3 -s 17 --hk -c 1 + Get command line help: + obsw_tmtc_client.py -h + + There are four different Modes: + 0. GUI Mode: Experimental mode, also called if no input parameter are specified + 1. Listener Mode: Only Listen for incoming TM packets + 2. SingleCommandMode: Send Single Command repeatedly until answer is received, + only listen after that + 3. ServiceTestMode: Send all Telecommands belonging to a certain G_SERVICE + and scan for replies for each telecommand. Listen after that + 4. SoftwareTestMode: Send all services and perform reply scanning like mode 3. + Listen after that + 5. Unit Test Mode: Performs a unit test which returns a simple OK or NOT OK. This mode + has the capability to send TCs in bursts, where applicable + + If there are problems receiving packets with Ethernet Communication, + use the tool Wireshark to track ethernet communication + for UDP echo packets (requests and response). + If the packets appear, there might be a problematic firewall setting. + Please ensure that python.exe UDP packets are not blocked in advanced firewall settings + and create a rule to allow packets from port 2008. TODO: The whole code below could ba packed into a class too. If a backend/frontend architecture is to be implemented, this will make things easier We propably instantiate all sender/receiver objects (or maybe instantiate them once we need them) on object construction. The main could spawn a process for the frontend (e.g. GUI or web interface) and a process for the backend (the sender/receiver classes, passing values to a SQL database, loading the MIB database...) - :return: """ + set_tmtc_logger() + logger = get_logger() + logger.info("Starting TMTC Client") + logger.info("Parsing input arguments") args = parse_input_arguments() + logger.info("Setting global variables") set_globals(args) + tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) if g.G_MODE_ID == g.ModeList.GUIMode: backend = TmTcBackend() @@ -102,7 +99,7 @@ def main(): gui.start() backend.join() gui.join() - print("Both processes have closed") + logger.info("Both processes have closed") sys.exit() else: communication_interface = set_communication_interface(tmtc_printer) @@ -112,16 +109,18 @@ def main(): tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR) tm_listener.start() if g.G_MODE_ID == g.ModeList.ListenerMode: - print("Listening for packages...") + logger.info("Listening for packages...") elif g.G_MODE_ID == g.ModeList.SingleCommandMode: pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( com_interface=communication_interface, tmtc_printer=tmtc_printer, tm_listener=tm_listener, pus_packet_tuple=pus_packet_tuple) + logger.info("Performing single command operation") sender_and_receiver.send_single_tc_and_receive_tm() elif g.G_MODE_ID == g.ModeList.ServiceTestMode: service_queue = deque() + logger.info("Performing service command operation") sender_and_receiver = SequentialCommandSenderReceiver( com_interface=communication_interface, tmtc_printer=tmtc_printer, tm_listener=tm_listener, tc_queue=service_test_select(g.G_SERVICE, service_queue)) @@ -129,6 +128,7 @@ def main(): elif g.G_MODE_ID == g.ModeList.SoftwareTestMode: all_tc_queue = create_total_tc_queue() + logger.info("Performing multjple service commands operation") sender_and_receiver = SequentialCommandSenderReceiver( com_interface=communication_interface, tmtc_printer=tmtc_printer, tc_queue=all_tc_queue, tm_listener=tm_listener) @@ -139,12 +139,13 @@ def main(): g.G_TM_LISTENER = tm_listener g.G_COM_INTERFACE = communication_interface g.G_TMTC_PRINTER = tmtc_printer + logger.info("Performing module tests") # noinspection PyTypeChecker suite = unittest.TestLoader().loadTestsFromModule(obsw_pus_service_test) unittest.TextTestRunner(verbosity=2).run(suite) else: - print("Unknown Mode, Configuration error !") + logging.error("Unknown Mode, Configuration error !") sys.exit() # At some later point, the program will run permanently and be able to take commands. # For now we put a permanent loop here so the program @@ -161,40 +162,8 @@ def command_preparation(): # Direct command which triggers an additional step reply and one completion reply # Single Command Testing command = PusTelecommand(service=17, subservice=1, ssc=21) - # command.print() - # file = bytearray([1, 2, 3, 4, 5]) - # command = PUSTelecommand(G_SERVICE=23, subservice=1, SSC=21, _tm_data=file) - # command.packCommandTuple() return command.pack_command_tuple() -def set_communication_interface(tmtc_printer: TmTcPrinter) -> ComIfT: - """ - Return the desired communication interface object - :param tmtc_printer: TmTcPrinter object. - :return: CommunicationInterface object - """ - try: - if g.G_COM_IF == g.ComIF.Ethernet: - communication_interface = EthernetComIF( - tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, - tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_SEND_ADDRESS, - receive_address=g.G_REC_ADDRESS) - elif g.G_COM_IF == g.ComIF.Serial: - com_port = g.G_COM_PORT - baud_rate = 250000 - g.G_SERIAL_TIMEOUT = 1 - communication_interface = SerialComIF( - tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, - serial_timeout=g.G_SERIAL_TIMEOUT) - else: - communication_interface = DummyComIF(tmtc_printer=tmtc_printer) - return communication_interface - except (IOError, OSError): - print("Error setting up communication interface") - logging.exception("Error") - sys.exit() - - if __name__ == "__main__": main() diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py index 2684765da460f9f3c2a5234e1a5cc127f2a5bb20..c09a01fe96bfffe752396425ecf595661f5ba3f2 100644 --- a/sendreceive/obsw_command_sender_receiver.py +++ b/sendreceive/obsw_command_sender_receiver.py @@ -20,6 +20,8 @@ from utility.obsw_tmtc_printer import TmTcPrinter from sendreceive.obsw_tm_listener import TmListener from tc.obsw_pus_tc_base import TcQueueEntryT from tm.obsw_pus_tm_factory import PusTmQueueT +from utility.obsw_logger import get_logger +logger = get_logger() # pylint: disable=too-many-instance-attributes @@ -127,12 +129,12 @@ class CommandSenderReceiver: if self._start_time == 0: self._start_time = time.time() if self._timeout_counter == 5: - print("Command Sender Receiver: No response from command !") + logger.info("Command Sender Receiver: No response from command !") sys.exit() if self._start_time != 0: self._elapsed_time = time.time() - self._start_time if self._elapsed_time >= self._tm_timeout * self._tc_send_timeout_factor: - print("Command Sender Receiver: Timeout, sending TC again !") + logger.info("CommandSenderReceiver: Timeout, sending TC again !") self._com_interface.send_telecommand(self._last_tc, self._last_tc_info) self._timeout_counter = self._timeout_counter + 1 self._start_time = time.time() diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py index ba25669bda2be2ab821d751e222dd96fa081292a..d8186cf433bfadb98b4fc2364e1634f6a5c2320d 100644 --- a/tc/obsw_pus_tc_base.py +++ b/tc/obsw_pus_tc_base.py @@ -1,14 +1,8 @@ -# -*- coding: utf-8 -*- -""" -Created on Wed Apr 4 11:43:00 2018 - -@author: S. Gaisser, R. Mueller -""" import sys import crcmod import logging from enum import Enum -from typing import TypeVar, Dict, Union, Tuple, Deque +from typing import Dict, Union, Tuple, Deque class TcDictionaryKeys(Enum): @@ -27,37 +21,69 @@ TcQueueT = Deque[TcQueueEntryT] PusTcInfoQueueT = Deque[PusTcInfoT] -class PusTelecommand: - headerSize = 6 - def __init__(self, service: int, subservice: int, ssc=0, data=bytearray([]), source_id: int = 0, - version: int = 0, packet_type: int = 1, data_field_header_flag: int = 1, - apid: int = 0x73, length: int = 0): +class PusTelecommand: + HEADER_SIZE = 6 + """ + Class representation of a PUS telecommand. It can be used to pack a raw telecommand from + input parameters. The structure of a PUS telecommand is specified in ECSS-E-70-41A on p.42 + and is also shown below (bottom) + """ + def __init__(self, service: int, subservice: int, ssc=0, app_data=bytearray([]), + source_id: int = 0, version: int = 0, apid: int = 0x73): + """ + Initiates a telecommand with the given parameters. + """ + packet_type = 1 + data_field_header_flag = 1 self.packet_id = [0x0, 0x0] self.packet_id[0] = ((version << 5) & 0xE0) | ((packet_type & 0x01) << 4) | \ ((data_field_header_flag & 0x01) << 3) | ((apid & 0x700) >> 8) self.packet_id[1] = apid & 0xFF self.ssc = ssc self.psc = (ssc & 0x3FFF) | (0xC0 << 8) - self.length = length self.pus_version_and_ack_byte = 0b00011111 self.service = service self.subservice = subservice self.source_id = source_id - self.data = data + self.app_data = app_data + + def __repr__(self): + """ + Returns the representation of a class instance. + TODO: Maybe a custom ID or dict consisting of SSC, Service and Subservice would be better. + """ + return self.ssc + + def __str__(self): + """ + Returns string representation of a class instance. + """ + return "TC[" + str(self.service) + "," + str(self.subservice) + "] " + " with SSC " + \ + str(self.ssc) - def get_length(self) -> int: + def get_data_length(self) -> int: """ Retrieve size of TC packet in bytes. + Formula according to PUS Standard: C = (Number of octets in packet data field) - 1. + The size of the TC packet is the size of the packet secondary header with + source ID + the length of the application data + length of the CRC16 checksum - 1 """ try: - length = 4 + len(self.data) + 1 - return length + data_length = 4 + len(self.app_data) + 1 + return data_length except TypeError: print("OBSW_TcPacket: Invalid service_type of data") logging.exception("Error") sys.exit() + def get_total_length(self): + """ + Length of full packet in bytes. + The header length is 6 bytes and the data length + 1 is the size of the data field. + """ + return self.get_data_length() + PusTelecommand.HEADER_SIZE + 1 + def pack(self) -> bytearray: """ Serializes the TC data fields into a bytearray. @@ -67,14 +93,14 @@ class PusTelecommand: data_to_pack.append(self.packet_id[1]) data_to_pack.append((self.psc & 0xFF00) >> 8) data_to_pack.append(self.psc & 0xFF) - length = self.get_length() + length = self.get_data_length() data_to_pack.append((length & 0xFF00) >> 8) data_to_pack.append(length & 0xFF) data_to_pack.append(self.pus_version_and_ack_byte) data_to_pack.append(self.service) data_to_pack.append(self.subservice) data_to_pack.append(self.source_id) - data_to_pack += self.data + data_to_pack += self.app_data crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) crc = crc_func(data_to_pack) @@ -91,7 +117,7 @@ class PusTelecommand: TcDictionaryKeys.SUBSERVICE: self.subservice, TcDictionaryKeys.SSC: self.ssc, TcDictionaryKeys.PACKED_ID: self.packet_id, - TcDictionaryKeys.DATA: self.data + TcDictionaryKeys.DATA: self.app_data } return tc_information @@ -110,10 +136,12 @@ class PusTelecommand: print("]") -# Takes pusPackets, removes current Packet Error Control, -# calculates new CRC (16 bits at wiretapping_packet end) and -# adds it as correct Packet Error Control Code. Reference: ECSS-E70-41A p. 207-212 -def generate_packet_crc(tc_packet: PusTelecommand) -> PusTelecommand: +def generate_packet_crc(tc_packet: bytearray) -> bytearray: + """ + Takes pusPackets, removes current Packet Error Control, calculates new + CRC16 checksum and adds it as correct Packet Error Control Code. + Reference: ECSS-E70-41A p. 207-212 + """ crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) crc = crc_func(bytearray(tc_packet[0:len(tc_packet) - 2])) tc_packet[len(tc_packet) - 2] = (crc & 0xFF00) >> 8 @@ -122,6 +150,9 @@ def generate_packet_crc(tc_packet: PusTelecommand) -> PusTelecommand: def generate_crc(data: bytearray) -> bytearray: + """ + Takes the application data, appends the CRC16 checksum and returns resulting bytearray + """ data_with_crc = bytearray() data_with_crc += data crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index 0df207e8a1d44337674cf524f4964ba438b52cdc..8a68bd7fe7c91d06b85efe99e23639153b9bd028 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -92,13 +92,13 @@ def pack_service9_test_into(tc_queue: TcQueueT) -> TcQueueT: print("Time Code 3 :" + str(time_test3) + "\r") # time setting tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode A")) - command = PusTelecommand(service=9, subservice=128, ssc=900, data=time_test1) + command = PusTelecommand(service=9, subservice=128, ssc=900, app_data=time_test1) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode B")) - command = PusTelecommand(service=9, subservice=128, ssc=910, data=time_test2) + command = PusTelecommand(service=9, subservice=128, ssc=910, app_data=time_test2) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode Current Time")) - command = PusTelecommand(service=9, subservice=128, ssc=920, data=time_test3) + command = PusTelecommand(service=9, subservice=128, ssc=920, app_data=time_test3) tc_queue.appendleft(command.pack_command_tuple()) # TODO: Add other time formats here tc_queue.appendleft(("print", "\r")) @@ -132,7 +132,7 @@ def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: # Set On Mode tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Set On")) mode_data = packModeData(object_id, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=1, data=mode_data) + command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Test Service 2 commands tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Service 2")) @@ -156,35 +156,35 @@ def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: # Set Mode Off tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set Off")) mode_data = packModeData(object_id, 0, 0) - command = PusTelecommand(service=200, subservice=1, ssc=11, data=mode_data) + command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Set Mode On tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set On")) mode_data = packModeData(object_id, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=12, data=mode_data) + command = PusTelecommand(service=200, subservice=1, ssc=12, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Enable HK report sid_gps = 0 if object_id == g.GPS0_ObjectId: sid_gps = g.GPS0_SID tc_queue.appendleft(("print", "\r\nTesting " + gps_string + ": Enable HK Reporting")) - command = PusTelecommand(service=3, subservice=5, ssc=13, data=sid_gps) + command = PusTelecommand(service=3, subservice=5, ssc=13, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) elif object_id == g.GPS1_ObjectId: sid_gps = g.GPS1_SID tc_queue.appendleft(("print", "\r\nTesting " + gps_string + ": Enable HK Reporting")) - command = PusTelecommand(service=3, subservice=5, ssc=14, data=sid_gps) + command = PusTelecommand(service=3, subservice=5, ssc=14, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) # pack wait interval until mode is on and a few gps replies have been received tc_queue.appendleft(("wait", 5)) # Disable HK reporting tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable " + gps_string + " definition")) - command = PusTelecommand(service=3, subservice=6, ssc=15, data=sid_gps) + command = PusTelecommand(service=3, subservice=6, ssc=15, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) # Set Mode Off tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set Off")) mode_data = packModeData(object_id, 0, 0) - command = PusTelecommand(service=200, subservice=1, ssc=13, data=mode_data) + command = PusTelecommand(service=200, subservice=1, ssc=13, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("print", "\r")) tc_queue.appendleft(("export", "log/tmtc_log_service_" + gps_string + ".txt")) diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py index cf3b63613554a5759c71b9801a01f4da309fef43..717c88f7190009bf03f3ce04d8a8eaa665f81524 100644 --- a/tc/obsw_tc_service2.py +++ b/tc/obsw_tc_service2.py @@ -21,29 +21,29 @@ def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> # Set Raw Mode tc_queue.appendleft(("print", "\r\nTesting Service 2: Setting Raw Mode")) mode_data = packModeData(object_id, 3, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2020, data=mode_data) + command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # toggle wiretapping raw tc_queue.appendleft(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) toggle_wiretapping_on_command = PusTelecommand(service=2, subservice=129, ssc=200, - data=wiretapping_toggle_data) + app_data=wiretapping_toggle_data) tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] tc_queue.appendleft(("print", "\r\nTesting Service 2: Sending Raw Command")) raw_command = g.DUMMY_COMMAND_1 raw_data = object_id + raw_command - raw_command = PusTelecommand(service=2, subservice=128, ssc=201, data=raw_data) + raw_command = PusTelecommand(service=2, subservice=128, ssc=201, app_data=raw_data) tc_queue.appendleft(raw_command.pack_command_tuple()) # toggle wiretapping off tc_queue.appendleft(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=204, - data=wiretapping_toggle_data) + app_data=wiretapping_toggle_data) tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) # send raw command which should be returned via TM[2,130] tc_queue.appendleft(("print", "\r\nTesting Service 2: Send second raw command")) - command = PusTelecommand(service=2, subservice=128, ssc=205, data=raw_data) + command = PusTelecommand(service=2, subservice=128, ssc=205, app_data=raw_data) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("print", "\r")) if called_externally is False: diff --git a/tc/obsw_tc_service200.py b/tc/obsw_tc_service200.py index 654116acfbafebf62772d5d9d433f7d95f5b3df3..9b08445b1c42f89698ebc949f62eb8698cbed49d 100644 --- a/tc/obsw_tc_service200.py +++ b/tc/obsw_tc_service200.py @@ -24,22 +24,22 @@ def pack_service200_test_into(tcQueue: TcQueueT) -> TcQueueT: # Set On Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode On")) modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2000, data=modeData) + command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=modeData) tcQueue.appendleft(command.pack_command_tuple()) # Set Normal mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Normal")) modeData = packModeData(objectId, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2010, data=modeData) + command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=modeData) tcQueue.appendleft(command.pack_command_tuple()) # Set Raw Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Raw")) modeData = packModeData(objectId, 3, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2020, data=modeData) + command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=modeData) tcQueue.appendleft(command.pack_command_tuple()) # Set Off Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Off")) modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2030, data=modeData) + command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=modeData) tcQueue.appendleft(command.pack_command_tuple()) tcQueue.appendleft(("export", "log/tmtc_log_service200.txt")) return tcQueue diff --git a/tc/obsw_tc_service3.py b/tc/obsw_tc_service3.py index c23309cfc30e0184ceb7ca016a5142336a32a127..43a72e2254f3faac5dfb7f2f864473b189b29cc7 100644 --- a/tc/obsw_tc_service3.py +++ b/tc/obsw_tc_service3.py @@ -30,30 +30,30 @@ def pack_service3_test_into(tc_queue: Deque) -> Deque: # deleting pre-defined test entry tc_queue.appendleft(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) - command = PusTelecommand(service=3, subservice=3, ssc=3000, data=sid1) + command = PusTelecommand(service=3, subservice=3, ssc=3000, app_data=sid1) tc_queue.appendleft(command.pack_command_tuple()) # adding pre-defined definition to hk using test pool variables tc_queue.appendleft(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) - command = PusTelecommand(service=3, subservice=1, ssc=3010, data=hkDefinition1) + command = PusTelecommand(service=3, subservice=1, ssc=3010, app_data=hkDefinition1) tc_queue.appendleft(command.pack_command_tuple()) # adding custom definition to diagnostics using test pool variables tc_queue.appendleft(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) - command = PusTelecommand(service=3, subservice=2, ssc=3020, data=hkDefinition2) + command = PusTelecommand(service=3, subservice=2, ssc=3020, app_data=hkDefinition2) tc_queue.appendleft(command.pack_command_tuple()) # enable custom hk definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable custom definition")) - command = PusTelecommand(service=3, subservice=5, ssc=3030, data=sid1) + command = PusTelecommand(service=3, subservice=5, ssc=3030, app_data=sid1) tc_queue.appendleft(command.pack_command_tuple()) # enable custom diag definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=7, ssc=3040, data=sid2) + command = PusTelecommand(service=3, subservice=7, ssc=3040, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # enable gps0 tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable GPS definition")) - command = PusTelecommand(service=3, subservice=5, ssc=3050, data=sid_gps) + command = PusTelecommand(service=3, subservice=5, ssc=3050, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) # maybe wait a bit to receive at least 2 packets.. @@ -61,61 +61,61 @@ def pack_service3_test_into(tc_queue: Deque) -> Deque: # Disable custom hk definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable custom definition")) - command = PusTelecommand(service=3, subservice=6, ssc=3060, data=sid1) + command = PusTelecommand(service=3, subservice=6, ssc=3060, app_data=sid1) tc_queue.appendleft(command.pack_command_tuple()) # Disable custom diag definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=8, ssc=3070, data=sid2) + command = PusTelecommand(service=3, subservice=8, ssc=3070, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # disable gps0 tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable GPS definition")) - command = PusTelecommand(service=3, subservice=6, ssc=3080, data=sid_gps) + command = PusTelecommand(service=3, subservice=6, ssc=3080, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) # report custom Diag definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PusTelecommand(service=3, subservice=11, ssc=3100, data=sid2) + command = PusTelecommand(service=3, subservice=11, ssc=3100, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # report gps definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting GPS definition")) - command = PusTelecommand(service=3, subservice=9, ssc=3110, data=sid_gps) + command = PusTelecommand(service=3, subservice=9, ssc=3110, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) # generate one custom hk definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one custom hk definition")) - command = PusTelecommand(service=3, subservice=27, ssc=3120, data=sid1) + command = PusTelecommand(service=3, subservice=27, ssc=3120, app_data=sid1) tc_queue.appendleft(command.pack_command_tuple()) # generate one custom diag definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=28, ssc=3120, data=sid2) + command = PusTelecommand(service=3, subservice=28, ssc=3120, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # generate one gps 0 definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) - command = PusTelecommand(service=3, subservice=27, ssc=3120, data=sid_gps) + command = PusTelecommand(service=3, subservice=27, ssc=3120, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) # modify custom hk definition interval newInterval = struct.pack('>f', 10.0) newIntervalCommand = sid1 + newInterval tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) - command = PusTelecommand(service=3, subservice=31, ssc=3090, data=newIntervalCommand) + command = PusTelecommand(service=3, subservice=31, ssc=3090, app_data=newIntervalCommand) tc_queue.appendleft(command.pack_command_tuple()) # report custom HK definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) - command = PusTelecommand(service=3, subservice=9, ssc=3090, data=sid1) + command = PusTelecommand(service=3, subservice=9, ssc=3090, app_data=sid1) tc_queue.appendleft(command.pack_command_tuple()) # modify custom diag definition interval newIntervalCommand = sid2 + newInterval tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) - command = PusTelecommand(service=3, subservice=32, ssc=3090, data=newIntervalCommand) + command = PusTelecommand(service=3, subservice=32, ssc=3090, app_data=newIntervalCommand) tc_queue.appendleft(command.pack_command_tuple()) # report custom diag definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PusTelecommand(service=3, subservice=11, ssc=3100, data=sid2) + command = PusTelecommand(service=3, subservice=11, ssc=3100, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # append parameter to custom hk definiton # append parameter to custom diag definition # delete custom diag definition tc_queue.appendleft(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=4, ssc=3120, data=sid2) + command = PusTelecommand(service=3, subservice=4, ssc=3120, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # do some basic testing on predefined structs too diff --git a/tc/obsw_tc_service8.py b/tc/obsw_tc_service8.py index d0da23e443ddaf124f97f8addd216ed8243234f6..54594081ea7adb9557b680566ff1f0555c7cd2cb 100644 --- a/tc/obsw_tc_service8.py +++ b/tc/obsw_tc_service8.py @@ -21,20 +21,20 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> # set mode on tc_queue.appendleft(("print", "\r\nTesting Service 8: Set On Mode")) mode_data = packModeData(object_id, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=800, data=mode_data) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # set mode normal tc_queue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) mode_data = packModeData(object_id, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=810, data=mode_data) + command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers completion reply tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Completion Reply")) action_id = g.DUMMY_COMMAND_1 direct_command = object_id + action_id - command = PusTelecommand(service=8, subservice=128, ssc=820, data=direct_command) + command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers _tm_data reply @@ -43,14 +43,14 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> command_param1 = g.DUMMY_COMMAND_2_PARAM_1 command_param2 = g.DUMMY_COMMAND_2_PARAM_2 direct_command = object_id + action_id + command_param1 + command_param2 - command = PusTelecommand(service=8, subservice=128, ssc=830, data=direct_command) + command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers an additional step reply and one completion reply tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) action_id = g.DUMMY_COMMAND_3 direct_command = object_id + action_id - command = PusTelecommand(service=8, subservice=128, ssc=840, data=direct_command) + command = PusTelecommand(service=8, subservice=128, ssc=840, app_data=direct_command) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("wait", 2)) tc_queue.appendleft(("print", "\r")) diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py index f2b4666df0ab0d3e31930b197774e08fa30c3522..b18784c7e8bc85ee062d79e2cf6e5b0b35526c83 100644 --- a/tm/obsw_pus_tm_base.py +++ b/tm/obsw_pus_tm_base.py @@ -1,8 +1,3 @@ -""" -@brief: Generic PUS wiretapping_packet class to deserialize raw PUS telemetry. -@date: 09.04.2020 -@author: R.Mueller, S. Gaisser -""" import datetime from enum import Enum, auto from typing import Final, Dict @@ -34,15 +29,18 @@ PusTmInfoT = Dict[TmDictionaryKeys, any] class PusTelemetry: """ - Generic PUS telemetry class. It is instantiated by passing the raw pus telemetry wiretapping_packet - (bytearray) to the constructor. It automatically deserializes the wiretapping_packet, exposing - various wiretapping_packet fields via getter functions. + Generic PUS telemetry class representation. + It is instantiated by passing the raw pus telemetry packet (bytearray) to the constructor. + It automatically deserializes the packet, exposing various packet fields via getter functions. + PUS Telemetry structure according to ECSS-E-70-41A p.46. Also see structure below (bottom). """ - def __init__(self, byte_array: bytes): + def __init__(self, byte_array: bytearray = bytearray()): + if byte_array == bytearray(): + return self._packet_raw: Final = byte_array self._pus_header: Final = PusPacketHeader(byte_array) byte_array = byte_array[6:] - self._data_field_header: Final = ObswPusPacketDataFieldHeader(byte_array) + self._data_field_header: Final = PusPacketDataFieldHeader(byte_array) byte_array = byte_array[12:] self._tm_data: Final = byte_array[:len(byte_array) - 2] self._crc: Final = byte_array[len(byte_array) - 2] << 8 | byte_array[len(byte_array) - 1] @@ -163,10 +161,37 @@ class PusTelemetry: print(self.return_data_string()) +class PusTelemetryPacked(PusTelemetry): + """ + Alternative way to create a PUS Telemetry packet by specifying telemetry parameters, + similarly to the way telecommands are created. This can be used to create telemetry + directly in the software. + """ + def __init__(self, service: int, subservice: int, ssc: int = 0, apid: int = 0x73, version: int = 0, + packet_type: int = 0, data_field_header_flag: int = 1): + super().__init__() + self.packet_id = [0x0, 0x0] + self.packet_id[0] = ((version << 5) & 0xE0) | ((packet_type & 0x01) << 4) | \ + ((data_field_header_flag & 0x01) << 3) | ((apid & 0x700) >> 8) + self.ssc = ssc + self.psc = (ssc & 0x3FFF) | (0b11 << 16) + self.pus_version_and_ack_byte = 0b00011111 + + # NOTE: In PUS-C, the PUS Version is 2 and specified for the first 4 bits. + # The other 4 bits of the first byte are the spacecraft time reference status + # To change to PUS-C, set 0b00100000 + self.data_field_version = 0b00010000 + self.service = service + self.subservice = subservice + self.pack_subcounter = 0 + self.time = 0 + + + # pylint: disable=too-many-instance-attributes class PusPacketHeader: """ - This class unnpacks the PUS wiretapping_packet header (see PUS structure below or PUS documentation) + This class unnpacks the PUS packet header, also see PUS structure below or PUS documentation. """ PUS_HEADER_SIZE = 6 @@ -208,7 +233,6 @@ class PusPacketHeader: array.append("Yes") else: array.append("No") - # header_list.append(str(self.valid)) @staticmethod def append_pus_packet_header_column_headers(array): @@ -217,13 +241,16 @@ class PusPacketHeader: array.append("Packet Valid") -class ObswPusPacketDataFieldHeader: +class PusPacketDataFieldHeader: + """ + Unpacks the PUS packet data field header. + """ def __init__(self, bytes_array): self.pus_version_and_ack_byte = (bytes_array[0] & 0x70) >> 4 self.service_type = bytes_array[1] self.service_subtype = bytes_array[2] self.subcounter = bytes_array[3] - self.time = ObswTimestamp(bytes_array[4:13]) + self.time = PusTelemetryTimestamp(bytes_array[4:13]) def append_data_field_header(self, content_list: list): """ @@ -248,9 +275,9 @@ class ObswPusPacketDataFieldHeader: self.time.print_time_headers(header_list) -class ObswTimestamp: +class PusTelemetryTimestamp: """ - Unpacks the time datafield of the TM wiretapping_packet. + Unpacks the time datafield of the TM packet. """ def __init__(self, byte_array): # pField = byte_array[0] @@ -276,8 +303,8 @@ class ObswTimestamp: # pylint: disable=line-too-long # Structure of a PUS Packet : -# A PUS wiretapping_packet consists of consecutive bits, the allocation and structure is standardised. -# Extended information can be found in ECSS-E-70-41A on p.42 +# A PUS packet consists of consecutive bits, the allocation and structure is standardised. +# Extended information can be found in ECSS-E-70-41A on p.46 # The easiest form to send a PUS Packet is in hexadecimal form. # A two digit hexadecimal number equals one byte, 8 bits or one octet # o = optional, Srv = Service diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py index a98b306c7600cdea058ebf812696b918202e6793..33803a47ea1201805c3f1d4d7d5b160ecd5785f7 100644 --- a/utility/obsw_args_parser.py +++ b/utility/obsw_args_parser.py @@ -10,6 +10,11 @@ import argparse import sys +import logging +from utility.obsw_logger import get_logger + + +logger = get_logger() def parse_input_arguments(): @@ -55,7 +60,9 @@ def parse_input_arguments(): print("No Input Arguments specified.") arg_parser.print_help() args, unknown = arg_parser.parse_known_args() - print(args) + for argument in vars(args): + # logger.debug(argument + ": " + str(getattr(args, argument))) + pass handle_args(args, unknown) return args diff --git a/utility/obsw_logger.py b/utility/obsw_logger.py new file mode 100644 index 0000000000000000000000000000000000000000..96e9511452e4fd44dd7201c60d34ae0e529ce229 --- /dev/null +++ b/utility/obsw_logger.py @@ -0,0 +1,49 @@ +import logging +import os +import sys +import config.obsw_config as g + + +class InfoFilter(logging.Filter): + def filter(self, rec): + if rec.levelno == logging.INFO or rec.levelno == logging.DEBUG: + return rec.levelno + + +def set_tmtc_logger() -> logging.Logger: + logger = logging.getLogger(g.G_TMTC_LOGGER_NAME) + logger.setLevel(level=logging.DEBUG) + generic_format = logging.Formatter( + fmt='%(levelname)-8s: %(asctime)s.%(msecs)03d | %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + fault_format = logging.Formatter( + fmt='%(levelname)-8s: %(asctime)s.%(msecs)03d | [%(filename)s:%(lineno)d] | %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + console_info_handler = logging.StreamHandler(stream=sys.stdout) + console_info_handler.setLevel(logging.DEBUG) + console_info_handler.addFilter(InfoFilter()) + + console_error_handler = logging.StreamHandler(stream=sys.stderr) + console_error_handler.setLevel(logging.WARNING) + try: + file_handler = logging.FileHandler( + filename="log/" + g.G_ERROR_LOG_FILE_NAME, encoding='utf-8', mode='w') + except FileNotFoundError: + os.mkdir("log") + file_handler = logging.FileHandler( + filename="log/" + g.G_ERROR_LOG_FILE_NAME, encoding='utf-8', mode='w') + + file_handler.setLevel(level=logging.WARNING) + + file_handler.setFormatter(generic_format) + console_info_handler.setFormatter(generic_format) + console_error_handler.setFormatter(fault_format) + logger.addHandler(file_handler) + logger.addHandler(console_info_handler) + logger.addHandler(console_error_handler) + return logger + +def get_logger() -> logging.Logger: + logger = logging.getLogger(g.G_TMTC_LOGGER_NAME) + return logger \ No newline at end of file diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 22381c1dd74a47dda13da72e6f483f8092477260..6e8a972a1ff59380849ca343568df29f8ff17d68 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -10,17 +10,27 @@ """ import os import sys +import enum +import logging from config import obsw_config as g from tm.obsw_pus_tm_base import PusTelemetry from tm.obsw_tm_service_3 import Service3TM -from tc.obsw_pus_tc_base import PusTelecommand, PusTcInfoT, TcDictionaryKeys +from tc.obsw_pus_tc_base import PusTcInfoT, TcDictionaryKeys +from utility.obsw_logger import get_logger + +logger = get_logger() + + +class DisplayMode(enum.Enum): + SHORT = enum.auto() + LONG = enum.auto() class TmTcPrinter: """ This class handles printing to the command line and to files. """ - def __init__(self, display_mode: str = "long", do_print_to_file: bool = True, + def __init__(self, display_mode: DisplayMode.LONG, do_print_to_file: bool = True, print_tc: bool = True): """ :param display_mode: "long" or "short" TODO: replace by enum @@ -43,7 +53,7 @@ class TmTcPrinter: :param packet: :return: """ - if self.display_mode == "short": + if self.display_mode == DisplayMode.SHORT: self.__handle_short_print(packet) else: self.__handle_long_print(packet) @@ -269,7 +279,7 @@ class TmTcPrinter: if tc_packet_info is None: print("TMTC Printer: No packet info supplied to print") return - if self.display_mode == "short": + if self.display_mode == DisplayMode.SHORT: self.__handle_short_tc_print(tc_packet_info) else: self.__handle_long_tc_print(tc_packet_info) @@ -283,7 +293,7 @@ class TmTcPrinter: self.print_buffer = "Sent TC[" + str(tc_packet_info[TcDictionaryKeys.SERVICE]) + "," + \ str(tc_packet_info[TcDictionaryKeys.SUBSERVICE]) + "] " + " with SSC " + \ str(tc_packet_info[TcDictionaryKeys.SSC]) - print(self.print_buffer) + logger.info(self.print_buffer) self.add_print_buffer_to_file_buffer() def __handle_long_tc_print(self, tc_packet_info: PusTcInfoT): @@ -299,7 +309,7 @@ class TmTcPrinter: str(tc_packet_info[TcDictionaryKeys.SSC]) + " sent with data " + \ self.return_data_string(tc_packet_info[TcDictionaryKeys.DATA]) - print(self.print_buffer) + logger.info(self.print_buffer) self.add_print_buffer_to_file_buffer() except TypeError: print("TMTC Printer: Type Error !")