From bff668eb2e42ada2e90d71f6bf71849bc4cf371f Mon Sep 17 00:00:00 2001 From: "Robin.Mueller" <robin.mueller.m@gmail.com> Date: Thu, 16 Apr 2020 20:47:24 +0200 Subject: [PATCH] various improvements, dict keys are enums now Additionally service 8 unit test added --- ...=> OBSW_TmTcClient_Module_Test_Serial.xml} | 2 +- .../OBSW_TmTcClient_Service_8_Serial.xml | 2 +- comIF/obsw_ethernet_com_if.py | 6 +- comIF/obsw_serial_com_if.py | 4 +- config/{OBSW_Config.py => obsw_config.py} | 10 +- ...SW_BackendTest.py => obsw_backend_test.py} | 0 gui/{OBSW_TmtcGUI.py => obsw_tmtc_gui.py} | 2 +- obsw_tmtc_client.py | 20 +- sendreceive/obsw_command_sender_receiver.py | 22 +- .../obsw_multiple_commands_sender_receiver.py | 4 +- .../obsw_sequential_sender_receiver.py | 6 +- .../obsw_single_command_sender_receiver.py | 16 +- sendreceive/obsw_tm_listener.py | 2 +- tc/OBSW_TcService2.py | 62 ---- tc/OBSW_TcService3.py | 124 -------- tc/OBSW_TcService8.py | 59 ---- tc/obsw_pus_tc_base.py | 127 ++++---- tc/obsw_pus_tc_packer.py | 284 +++++++++--------- tc/obsw_tc_service2.py | 63 ++++ ..._TcService200.py => obsw_tc_service200.py} | 22 +- tc/obsw_tc_service3.py | 124 ++++++++ tc/obsw_tc_service8.py | 64 ++++ test/obsw_module_test.py | 221 ++++++++------ test/obsw_pus_service_test.py | 136 ++++++--- tm/obsw_pus_tm_base.py | 37 ++- tm/obsw_pus_tm_factory.py | 51 ++-- tm/obsw_tm_service_1.py | 70 +++-- tm/obsw_tm_service_5.py | 24 +- utility/obsw_tmtc_printer.py | 25 +- 29 files changed, 867 insertions(+), 722 deletions(-) rename .idea/runConfigurations/{OBSW_TmTcClient_Unit_Test_Serial.xml => OBSW_TmTcClient_Module_Test_Serial.xml} (86%) rename config/{OBSW_Config.py => obsw_config.py} (88%) rename gui/{OBSW_BackendTest.py => obsw_backend_test.py} (100%) rename gui/{OBSW_TmtcGUI.py => obsw_tmtc_gui.py} (95%) delete mode 100644 tc/OBSW_TcService2.py delete mode 100644 tc/OBSW_TcService3.py delete mode 100644 tc/OBSW_TcService8.py create mode 100644 tc/obsw_tc_service2.py rename tc/{OBSW_TcService200.py => obsw_tc_service200.py} (71%) create mode 100644 tc/obsw_tc_service3.py create mode 100644 tc/obsw_tc_service8.py diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Unit_Test_Serial.xml b/.idea/runConfigurations/OBSW_TmTcClient_Module_Test_Serial.xml similarity index 86% rename from .idea/runConfigurations/OBSW_TmTcClient_Unit_Test_Serial.xml rename to .idea/runConfigurations/OBSW_TmTcClient_Module_Test_Serial.xml index 7d146a9..9d3a170 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_Unit_Test_Serial.xml +++ b/.idea/runConfigurations/OBSW_TmTcClient_Module_Test_Serial.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_TmTcClient Unit Test Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> + <configuration default="false" name="OBSW_TmTcClient Module Test Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml b/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml index f1df18f..77f41e7 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml +++ b/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml @@ -13,7 +13,7 @@ <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-m 3 -s 8 -p -c 1" /> + <option name="PARAMETERS" value="-m 3 -s 8 -c 1" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="true" /> <option name="MODULE_MODE" value="false" /> diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py index 94b19d6..21b833c 100644 --- a/comIF/obsw_ethernet_com_if.py +++ b/comIF/obsw_ethernet_com_if.py @@ -12,10 +12,10 @@ from typing import Tuple, Union from comIF.obsw_com_interface import CommunicationInterface, PusTmListT, PusTmQueueT, \ PusTmTupleQueueT, PusTmInfoQueueT -from tm.obsw_pus_tm_factory import PusTelemetryFactory +from tm.obsw_pus_tm_factory import pus_telemetry_factory from tc.obsw_pus_tc_base import PusTcInfoT from utility.obsw_tmtc_printer import TmTcPrinterT -import config.OBSW_Config as g +import config.obsw_config as g # pylint: disable=abstract-method @@ -53,7 +53,7 @@ class EthernetComIF(CommunicationInterface): ready = self.data_available(pollTimeout) if ready: data = self.sock_receive.recvfrom(1024)[0] - packet = PusTelemetryFactory(data) + packet = pus_telemetry_factory(data) self.tmtc_printer.print_telemetry(packet) packet_list = [packet] return True, packet_list diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index cd25af9..9be78d7 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -13,7 +13,7 @@ from typing import Tuple, List, Union, Optional import serial from comIF.obsw_com_interface import CommunicationInterface, PusTmQueueT from utility.obsw_tmtc_printer import TmTcPrinterT -from tm.obsw_pus_tm_factory import PusTelemetryFactory +from tm.obsw_pus_tm_factory import pus_telemetry_factory from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmTupleQueueT, PusTmListT from tc.obsw_pus_tc_base import PusTcInfoT @@ -62,7 +62,7 @@ class SerialComIF(CommunicationInterface): pus_data_list, number_of_packets = self.__poll_pus_packets() packet_list = [] for counter in range(0, number_of_packets): - packet = PusTelemetryFactory(pus_data_list[counter]) + packet = pus_telemetry_factory(pus_data_list[counter]) self.tmtc_printer.print_telemetry(packet) packet_list.append(packet) return True, packet_list diff --git a/config/OBSW_Config.py b/config/obsw_config.py similarity index 88% rename from config/OBSW_Config.py rename to config/obsw_config.py index 67ae5b3..09923ff 100644 --- a/config/OBSW_Config.py +++ b/config/obsw_config.py @@ -1,12 +1,13 @@ """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief Global settings for UDP client """ import enum +import struct from typing import Tuple """ @@ -41,6 +42,13 @@ GPS0_ObjectId = bytearray([0x44, 0x10, 0x1F, 0x00]) GPS1_ObjectId = bytearray([0x44, 0x20, 0x20, 0x00]) DUMMY_DEVICE_ID = bytearray([0x44, 0x00, 0xAF, 0xFE]) +# Commands +DUMMY_COMMAND_1 = struct.pack(">I", 666) +DUMMY_COMMAND_2 = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) +DUMMY_COMMAND_2_PARAM_1 = bytearray([0xBA, 0xB0]) +DUMMY_COMMAND_2_PARAM_2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) +DUMMY_COMMAND_3 = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) + # SIDs GPS0_SID = bytearray([0x00, 0x00, 0x1f, 0x00]) GPS1_SID = bytearray([0x00, 0x00, 0x2f, 0x00]) diff --git a/gui/OBSW_BackendTest.py b/gui/obsw_backend_test.py similarity index 100% rename from gui/OBSW_BackendTest.py rename to gui/obsw_backend_test.py diff --git a/gui/OBSW_TmtcGUI.py b/gui/obsw_tmtc_gui.py similarity index 95% rename from gui/OBSW_TmtcGUI.py rename to gui/obsw_tmtc_gui.py index 5a55c6b..7bf34bc 100644 --- a/gui/OBSW_TmtcGUI.py +++ b/gui/obsw_tmtc_gui.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ @file - OBSW_TmtcGUI.py + obsw_tmtc_gui.py @date 01.11.2019 @brief diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 1b4a464..7fd1a4a 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -60,9 +60,9 @@ import sys from collections import deque from test import obsw_pus_service_test -from config import OBSW_Config as g -from config.OBSW_Config import setGlobals -from tc.obsw_pus_tc_packer import PusTelecommand, createTotalTcQueue, service_test_select +from config import obsw_config as g +from config.obsw_config import setGlobals +from tc.obsw_pus_tc_packer import PusTelecommand, create_total_tc_queue, service_test_select from sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver @@ -76,8 +76,8 @@ from comIF.obsw_ethernet_com_if import EthernetComIF from comIF.obsw_serial_com_if import SerialComIF from comIF.obsw_com_interface import ComIfT -from gui.OBSW_TmtcGUI import TmTcGUI -from gui.OBSW_BackendTest import TmTcBackend +from gui.obsw_tmtc_gui import TmTcGUI +from gui.obsw_backend_test import TmTcBackend def main(): @@ -115,7 +115,7 @@ def main(): elif g.G_MODE_ID == g.ModeList.SingleCommandMode: pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( - comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmListener=tm_listener, + com_interface=communication_interface, tmtc_printer=tmtc_printer, tm_listener=tm_listener, pusPacketTuple=pus_packet_tuple) sender_and_receiver.send_single_tc_and_receive_tm() @@ -127,7 +127,7 @@ def main(): sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() elif g.G_MODE_ID == g.ModeList.SoftwareTestMode: - all_tc_queue = createTotalTcQueue() + all_tc_queue = create_total_tc_queue() sender_and_receiver = SequentialCommandSenderReceiver( com_interface=communication_interface, tmtc_printer=tmtc_printer, tc_queue=all_tc_queue, tm_listener=tm_listener) @@ -159,12 +159,12 @@ def command_preparation(): """ # Direct command which triggers an additional step reply and one completion reply # Single Command Testing - command = PusTelecommand(service=17, subservice=1, SSC=21) + command = PusTelecommand(service=17, subservice=1, ssc=21) # command.print() # file = bytearray([1, 2, 3, 4, 5]) # command = PUSTelecommand(G_SERVICE=23, subservice=1, SSC=21, _tm_data=file) # command.packCommandTuple() - return command.packCommandTuple() + return command.pack_command_tuple() def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: @@ -182,7 +182,7 @@ def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: else: com_port = g.G_COM_PORT baud_rate = 115200 - g.G_SERIAL_TIMEOUT = 0.05 + g.G_SERIAL_TIMEOUT = 0.03 communication_interface = SerialComIF( tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, serial_timeout=g.G_SERIAL_TIMEOUT) diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py index 6456c17..f6ddc2d 100644 --- a/sendreceive/obsw_command_sender_receiver.py +++ b/sendreceive/obsw_command_sender_receiver.py @@ -14,7 +14,7 @@ if the first reply has not been received. import sys import time -import config.OBSW_Config as g +import config.obsw_config as g from comIF.obsw_com_interface import CommunicationInterface from utility.obsw_tmtc_printer import TmTcPrinter from sendreceive.obsw_tm_listener import TmListener @@ -27,28 +27,28 @@ class CommandSenderReceiver: This is the generic CommandSenderReceiver object. All TMTC objects inherit this object, for example specific implementations (e.g. SingleCommandSenderReceiver) """ - def __init__(self, comInterface: CommunicationInterface, tmtcPrinter: TmTcPrinter, - tmListener: TmListener): + def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, + tm_listener: TmListener): """ - :param comInterface: CommunicationInterface object. Instantiate the desired one + :param com_interface: CommunicationInterface object. Instantiate the desired one and pass it here - :param tmtcPrinter: TmTcPrinter object. Instantiate it and pass it here. + :param tmtc_printer: TmTcPrinter object. Instantiate it and pass it here. """ self._tm_timeout = g.G_TM_TIMEOUT self._tc_send_timeout_factor = g.G_TC_SEND_TIMEOUT_FACTOR - if isinstance(comInterface, CommunicationInterface): - self._com_interface = comInterface + if isinstance(com_interface, CommunicationInterface): + self._com_interface = com_interface else: raise TypeError("Invalid communication interface service_type!") - if isinstance(tmtcPrinter, TmTcPrinter): - self._tmtc_printer = tmtcPrinter + if isinstance(tmtc_printer, TmTcPrinter): + self._tmtc_printer = tmtc_printer else: raise TypeError("Invalid TMTC Printer service_type!") - if isinstance(tmListener, TmListener): - self._tm_listener = tmListener + if isinstance(tm_listener, TmListener): + self._tm_listener = tm_listener else: raise TypeError("Invalid TM Listener service_type!") self._reply_received = False diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py index c3b7652..7cc029e 100644 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ b/sendreceive/obsw_multiple_commands_sender_receiver.py @@ -1,6 +1,6 @@ """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief @@ -14,7 +14,7 @@ from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderR from comIF.obsw_com_interface import CommunicationInterface from utility.obsw_tmtc_printer import TmTcPrinter from sendreceive.obsw_tm_listener import TmListenerT -import config.OBSW_Config as g +import config.obsw_config as g class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index c0b896c..9cd2b22 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -7,7 +7,7 @@ import sys import time -import config.OBSW_Config as g +import config.obsw_config as g from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT from comIF.obsw_com_interface import CommunicationInterface @@ -28,8 +28,8 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): :param tmtc_printer: TmTcPrinter object, passed on to CommandSenderReceiver for this time period """ - super().__init__(comInterface=com_interface, tmtcPrinter=tmtc_printer, - tmListener=tm_listener) + super().__init__(com_interface=com_interface, tmtc_printer=tmtc_printer, + tm_listener=tm_listener) self._tc_queue = tc_queue self.__first_reply_received = False self.__all_replies_received = False diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index 4fa2ea0..4bb1674 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -1,7 +1,7 @@ #!/usr/bin/python3.8 """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief @@ -13,7 +13,7 @@ from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT from comIF.obsw_com_interface import CommunicationInterface from utility.obsw_tmtc_printer import TmTcPrinterT -import config.OBSW_Config as g +import config.obsw_config as g class SingleCommandSenderReceiver(CommandSenderReceiver): @@ -21,14 +21,14 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): Specific implementation of CommandSenderReceiver to send a single telecommand This object can be used by instantiating it and calling sendSingleTcAndReceiveTm() """ - def __init__(self, comInterface: CommunicationInterface, tmtcPrinter: TmTcPrinterT, - tmListener: TmListenerT, pusPacketTuple: tuple): + def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinterT, + tm_listener: TmListenerT, pusPacketTuple: tuple): """ - :param comInterface: CommunicationInterface object, passed on to CommandSenderReceiver - :param tmListener: TmListener object which runs in the background and receives all TM - :param tmtcPrinter: TmTcPrinter object, passed on to CommandSenderReceiver + :param com_interface: CommunicationInterface object, passed on to CommandSenderReceiver + :param tm_listener: TmListener object which runs in the background and receives all TM + :param tmtc_printer: TmTcPrinter object, passed on to CommandSenderReceiver """ - super().__init__(comInterface=comInterface, tmListener=tmListener, tmtcPrinter=tmtcPrinter) + super().__init__(com_interface=com_interface, tm_listener=tm_listener, tmtc_printer=tmtc_printer) self.faulty_input = False try: self.pus_packet, self.pus_packet_info = pusPacketTuple diff --git a/sendreceive/obsw_tm_listener.py b/sendreceive/obsw_tm_listener.py index cd935aa..391dab0 100644 --- a/sendreceive/obsw_tm_listener.py +++ b/sendreceive/obsw_tm_listener.py @@ -15,7 +15,7 @@ from collections import deque from typing import TypeVar from comIF.obsw_com_interface import ComIfT -import config.OBSW_Config as g +import config.obsw_config as g TmListenerT = TypeVar('TmListenerT', bound='TmListener') diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py deleted file mode 100644 index cdc1826..0000000 --- a/tc/OBSW_TcService2.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_module_test.py -Date: 01.11.2019 -Description: PUS Custom Service 8: Device Access, Native low-level commanding - -@author: R. Mueller -""" -import struct - -from tc.obsw_pus_tc_base import PusTelecommand, Deque -from tc.OBSW_TcService200 import packModeData - - -def packService2TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: - if calledExternally is False: - tcQueue.appendleft(("print", "Testing Service 2")) - objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device - # don't forget to set object mode raw (G_SERVICE 200) before calling this ! - # Set Raw Mode - tcQueue.appendleft(("print", "\r\nTesting Service 2: Setting Raw Mode")) - modeData = packModeData(objectId, 3, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - # toggle wiretapping raw - tcQueue.appendleft(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) - wiretappingToggleData = packWiretappingMode(objectId, 1) - toggleWiretappingOnCommand = PusTelecommand(service=2, subservice=129, SSC=200, - data=wiretappingToggleData) - tcQueue.appendleft(toggleWiretappingOnCommand.packCommandTuple()) - # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] - tcQueue.appendleft(("print", "\r\nTesting Service 2: Sending Raw Command")) - rawCommand = struct.pack(">I", 666) - rawData = objectId + rawCommand - rawCommand = PusTelecommand(service=2, subservice=128, SSC=201, data=rawData) - tcQueue.appendleft(rawCommand.packCommandTuple()) - # toggle wiretapping off - tcQueue.appendleft(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) - wiretappingToggleData = packWiretappingMode(objectId, 0) - toggleWiretappingOffCommand = PusTelecommand(service=2, subservice=129, SSC=204, - data=wiretappingToggleData) - tcQueue.appendleft(toggleWiretappingOffCommand.packCommandTuple()) - # send raw command which should be returned via TM[2,130] - tcQueue.appendleft(("print", "\r\nTesting Service 2: Send second raw command")) - command = PusTelecommand(service=2, subservice=128, SSC=205, data=rawData) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - if calledExternally is False: - tcQueue.appendleft(("export", "log/tmtc_log_service2.txt")) - return tcQueue - - -# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW -def packWiretappingMode(objectId, wiretappingMode_): - wiretappingMode = struct.pack(">B", wiretappingMode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 - wiretappingToggleData = objectId + wiretappingMode - return wiretappingToggleData - - -def packSpecificService2TestInto(tcQueue, objectIdList, dataList, sscList, printStringList, - calledExternally=False): - pass diff --git a/tc/OBSW_TcService3.py b/tc/OBSW_TcService3.py deleted file mode 100644 index 1c9de0a..0000000 --- a/tc/OBSW_TcService3.py +++ /dev/null @@ -1,124 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_module_test.py -Date: 01.11.2019 -Description: PUS Custom Service 8: Device Access, Native low-level commanding - -@author: R. Mueller -""" -import struct -from typing import Deque -from tc.obsw_pus_tc_base import PusTelecommand - - -def packService3TestInto(tcQueue: Deque) -> Deque: - tcQueue.appendleft(("print", "Testing Service 3")) - # adding custom defintion to hk using test pool variables - sid1 = bytearray([0x00, 0x00, 0x43, 0x00]) - sid2 = bytearray([0x00, 0x00, 0x44, 0x00]) - sidGps = bytearray([0x00, 0x00, 0x1f, 0x00]) - collectionInterval = struct.pack('>f', 3) - numberOfParameters = struct.pack('B', 5) - p1 = bytearray([0x01, 0x01, 0x01, 0x01]) - p2 = bytearray([0x02, 0x02, 0x02, 0x02]) - p3 = bytearray([0x03, 0x03, 0x03, 0x03]) - p4 = bytearray([0x04, 0x04, 0x04, 0x04]) - p5 = bytearray([0x05, 0x05, 0x05, 0x05]) - hkDefinition1 = sid1 + collectionInterval + numberOfParameters + p1 + p2 + p3 + p4 + p5 - collectionInterval = struct.pack('>f', 6) - hkDefinition2 = sid2 + collectionInterval + numberOfParameters + p1 + p2 + p3 + p4 + p5 - - # deleting pre-defined test entry - tcQueue.appendleft(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) - command = PusTelecommand(service=3, subservice=3, SSC=3000, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - - # adding pre-defined definition to hk using test pool variables - tcQueue.appendleft(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) - command = PusTelecommand(service=3, subservice=1, SSC=3010, data=hkDefinition1) - tcQueue.appendleft(command.packCommandTuple()) - - # adding custom definition to diagnostics using test pool variables - tcQueue.appendleft(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) - command = PusTelecommand(service=3, subservice=2, SSC=3020, data=hkDefinition2) - tcQueue.appendleft(command.packCommandTuple()) - - # enable custom hk definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable custom definition")) - command = PusTelecommand(service=3, subservice=5, SSC=3030, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # enable custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=7, SSC=3040, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # enable gps0 - tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable GPS definition")) - command = PusTelecommand(service=3, subservice=5, SSC=3050, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - - # maybe wait a bit to receive at least 2 packets.. - tcQueue.appendleft(("wait", 3)) - - # Disable custom hk definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable custom definition")) - command = PusTelecommand(service=3, subservice=6, SSC=3060, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # Disable custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=8, SSC=3070, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # disable gps0 - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable GPS definition")) - command = PusTelecommand(service=3, subservice=6, SSC=3080, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - # report custom Diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PusTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # report gps definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting GPS definition")) - command = PusTelecommand(service=3, subservice=9, SSC=3110, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - # generate one custom hk definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one custom hk definition")) - command = PusTelecommand(service=3, subservice=27, SSC=3120, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # generate one custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=28, SSC=3120, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # generate one gps 0 definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) - command = PusTelecommand(service=3, subservice=27, SSC=3120, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - # modify custom hk definition interval - newInterval = struct.pack('>f', 10.0) - newIntervalCommand = sid1 + newInterval - tcQueue.appendleft(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) - command = PusTelecommand(service=3, subservice=31, SSC=3090, data=newIntervalCommand) - tcQueue.appendleft(command.packCommandTuple()) - # report custom HK definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) - command = PusTelecommand(service=3, subservice=9, SSC=3090, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # modify custom diag definition interval - newIntervalCommand = sid2 + newInterval - tcQueue.appendleft(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) - command = PusTelecommand(service=3, subservice=32, SSC=3090, data=newIntervalCommand) - tcQueue.appendleft(command.packCommandTuple()) - # report custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PusTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # append parameter to custom hk definiton - # append parameter to custom diag definition - - # delete custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=4, SSC=3120, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - - # do some basic testing on predefined structs too - # e.g. add one variable, change interval, report them.... - tcQueue.appendleft(("export", "log/tmtc_log_service3.txt")) - return tcQueue diff --git a/tc/OBSW_TcService8.py b/tc/OBSW_TcService8.py deleted file mode 100644 index 98e8d2c..0000000 --- a/tc/OBSW_TcService8.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_module_test.py -Date: 01.11.2019 -Description: PUS Custom Service 8: Function Management, High-Level Commanding - -@author: R. Mueller -""" -import struct -from typing import Deque - -from tc.obsw_pus_tc_base import PusTelecommand -from tc.OBSW_TcService200 import packModeData - - -def packService8TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: - if calledExternally is False: - tcQueue.appendleft(("print", "Testing Service 8")) - objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) - # set mode on - tcQueue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) - modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=800, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - # set mode normal - tcQueue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) - modeData = packModeData(objectId, 2, 0) - command = PusTelecommand(service=200, subservice=1, SSC=810, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - # Direct command which triggers completion reply - tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Completion Reply")) - actionId = struct.pack(">I", 666) - directCommand = objectId + actionId - command = PusTelecommand(service=8, subservice=128, SSC=820, data=directCommand) - tcQueue.appendleft(command.packCommandTuple()) - # Direct command which triggers _tm_data reply - tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Data Reply")) - actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) - commandParam1 = bytearray([0xBA, 0xB0]) - commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) - directCommand = objectId + actionId + commandParam1 + commandParam2 - command = PusTelecommand(service=8, subservice=128, SSC=830, data=directCommand) - tcQueue.appendleft(command.packCommandTuple()) - # Direct command which triggers an additional step reply and one completion reply - tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) - actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) - directCommand = objectId + actionId - command = PusTelecommand(service=8, subservice=128, SSC=840, data=directCommand) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("wait", 2)) - tcQueue.appendleft(("print", "\r")) - if calledExternally is False: - tcQueue.appendleft(("export", "log/tmtc_log_service8.txt")) - return tcQueue - - -def packSpecificService8TestInto(tcQueue, objectIdList, actionIdList, dataList, - sscList, printStringList, calledExternally=False): - pass diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py index 473f4c2..22061af 100644 --- a/tc/obsw_pus_tc_base.py +++ b/tc/obsw_pus_tc_base.py @@ -4,16 +4,26 @@ Created on Wed Apr 4 11:43:00 2018 @author: S. Gaisser, R. Mueller """ - +import sys import crcmod import logging +from enum import Enum from typing import TypeVar, Dict, Union, Tuple, Deque PusTcT = TypeVar('PusTcT', bound='PUSTelecommand') -PusTcInfoT = Union[Dict[str, any], None] + + +class TcDictionaryKeys(Enum): + SERVICE = 1 + SUBSERVICE = 2 + SSC = 3 + PACKED_ID = 4 + DATA = 5 + + +PusTcInfoT = Union[Dict[TcDictionaryKeys, any], None] PusTcTupleT = Tuple[bytearray, PusTcInfoT] TcAuxiliaryTupleT = Tuple[str, any] -# Union: Type can be either of those 2 TcQueueEntryT = Union[TcAuxiliaryTupleT, PusTcTupleT] TcQueueT = Deque[TcQueueEntryT] PusTcInfoQueueT = Deque[PusTcInfoT] @@ -22,65 +32,74 @@ PusTcInfoQueueT = Deque[PusTcInfoT] class PusTelecommand: headerSize = 6 - def __init__(self, service: int, subservice: int, SSC=0, data=bytearray([]), sourceId: int = 0, - version: int = 0, packetType: int = 1, dataFieldHeaderFlag: int = 1, + def __init__(self, service: int, subservice: int, ssc=0, data=bytearray([]), source_id: int = 0, + version: int = 0, packet_type: int = 1, data_field_header_flag: int = 1, apid: int = 0x73, length: int = 0): self.packet_id = [0x0, 0x0] - self.packet_id[0] = ((version << 5) & 0xE0) | ((packetType & 0x01) << 4) | \ - ((dataFieldHeaderFlag & 0x01) << 3) | ((apid & 0x700) >> 8) + self.packet_id[0] = ((version << 5) & 0xE0) | ((packet_type & 0x01) << 4) | \ + ((data_field_header_flag & 0x01) << 3) | ((apid & 0x700) >> 8) self.packet_id[1] = apid & 0xFF - self.ssc = SSC - self.psc = (SSC & 0x3FFF) | (0xC0 << 8) + self.ssc = ssc + self.psc = (ssc & 0x3FFF) | (0xC0 << 8) self.length = length - self.pusVersionAndAckByte = 0b00011111 + self.pus_version_and_ack_byte = 0b00011111 self.service = service self.subservice = subservice - self.sourceId = sourceId + self.source_id = source_id self.data = data - def getLength(self) -> int: + def get_length(self) -> int: + """ + Retrieve size of TC packet in bytes. + """ try: length = 4 + len(self.data) + 1 return length except TypeError: print("OBSW_TcPacket: Invalid service_type of data") logging.exception("Error") - exit() + sys.exit() def pack(self) -> bytearray: - dataToPack = bytearray() - dataToPack.append(self.packet_id[0]) - dataToPack.append(self.packet_id[1]) - dataToPack.append((self.psc & 0xFF00) >> 8) - dataToPack.append(self.psc & 0xFF) - length = self.getLength() - dataToPack.append((length & 0xFF00) >> 8) - dataToPack.append(length & 0xFF) - dataToPack.append(self.pusVersionAndAckByte) - dataToPack.append(self.service) - dataToPack.append(self.subservice) - dataToPack.append(self.sourceId) - dataToPack += self.data + """ + Serializes the TC data fields into a bytearray. + """ + data_to_pack = bytearray() + data_to_pack.append(self.packet_id[0]) + data_to_pack.append(self.packet_id[1]) + data_to_pack.append((self.psc & 0xFF00) >> 8) + data_to_pack.append(self.psc & 0xFF) + length = self.get_length() + data_to_pack.append((length & 0xFF00) >> 8) + data_to_pack.append(length & 0xFF) + data_to_pack.append(self.pus_version_and_ack_byte) + data_to_pack.append(self.service) + data_to_pack.append(self.subservice) + data_to_pack.append(self.source_id) + data_to_pack += self.data crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc = crc_func(dataToPack) - - dataToPack.append((crc & 0xFF00) >> 8) - dataToPack.append(crc & 0xFF) - return dataToPack - - def packInformation(self) -> PusTcInfoT: - tcInformation = { - "service": self.service, - "subservice": self.subservice, - "ssc": self.ssc, - "packet_id": self.packet_id, - "data": self.data + crc = crc_func(data_to_pack) + + data_to_pack.append((crc & 0xFF00) >> 8) + data_to_pack.append(crc & 0xFF) + return data_to_pack + + def pack_information(self) -> PusTcInfoT: + """ + Packs TM information into a dictionary. + """ + tc_information = { + TcDictionaryKeys.SERVICE: self.service, + TcDictionaryKeys.SUBSERVICE: self.subservice, + TcDictionaryKeys.SSC: self.ssc, + TcDictionaryKeys.PACKED_ID: self.packet_id, + TcDictionaryKeys.DATA: self.data } - return tcInformation + return tc_information - def packCommandTuple(self) -> PusTcTupleT: - commandTuple = (self.pack(), self.packInformation()) - return commandTuple + def pack_command_tuple(self) -> PusTcTupleT: + command_tuple = (self.pack(), self.pack_information()) + return command_tuple def print(self): packet = self.pack() @@ -96,22 +115,22 @@ class PusTelecommand: # Takes pusPackets, removes current Packet Error Control, # calculates new CRC (16 bits at wiretapping_packet end) and # adds it as correct Packet Error Control Code. Reference: ECSS-E70-41A p. 207-212 -def generatePacketCRC(TCPacket: PusTcT) -> PusTcT: +def generate_packet_crc(tc_packet: PusTcT) -> PusTcT: crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc = crc_func(bytearray(TCPacket[0:len(TCPacket) - 2])) - TCPacket[len(TCPacket) - 2] = (crc & 0xFF00) >> 8 - TCPacket[len(TCPacket) - 1] = crc & 0xFF - return TCPacket + crc = crc_func(bytearray(tc_packet[0:len(tc_packet) - 2])) + tc_packet[len(tc_packet) - 2] = (crc & 0xFF00) >> 8 + tc_packet[len(tc_packet) - 1] = crc & 0xFF + return tc_packet -def generateCRC(data: bytearray) -> bytearray: - dataWithCRC = bytearray() - dataWithCRC += data +def generate_crc(data: bytearray) -> bytearray: + data_with_crc = bytearray() + data_with_crc += data crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) crc = crc_func(data) - dataWithCRC.append((crc & 0xFF00) >> 8) - dataWithCRC.append(crc & 0xFF) - return dataWithCRC + data_with_crc.append((crc & 0xFF00) >> 8) + data_with_crc.append(crc & 0xFF) + return data_with_crc # pylint: disable=line-too-long diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index 5ec05f6..2f378a4 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -14,204 +14,204 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file import os from tc.obsw_pus_tc_base import PusTelecommand, TcQueueT -from tc.OBSW_TcService2 import packService2TestInto -from tc.OBSW_TcService3 import packService3TestInto -from tc.OBSW_TcService8 import packService8TestInto -from tc.OBSW_TcService200 import packModeData, packService200TestInto -import config.OBSW_Config as g +from tc.obsw_tc_service2 import pack_service2_test_into +from tc.obsw_tc_service3 import pack_service3_test_into +from tc.obsw_tc_service8 import pack_service8_test_into +from tc.obsw_tc_service200 import packModeData, pack_service200_test_into +import config.obsw_config as g from datetime import datetime from collections import deque from typing import Union -def service_test_select(service: Union[int, str], serviceQueue: TcQueueT) -> TcQueueT: +def service_test_select(service: Union[int, str], service_queue: TcQueueT) -> TcQueueT: if service == 2: - return packService2TestInto(serviceQueue) + return pack_service2_test_into(service_queue) elif service == 3: - return packService3TestInto(serviceQueue) + return pack_service3_test_into(service_queue) elif service == 5: - return packService5TestInto(serviceQueue) + return pack_service5_test_into(service_queue) elif service == 8: - return packService8TestInto(serviceQueue) + return pack_service8_test_into(service_queue) elif service == 9: - return packService9TestInto(serviceQueue) + return pack_service9_test_into(service_queue) elif service == 17: - return packService17TestInto(serviceQueue) + return pack_service17_test_into(service_queue) elif service == 200: - return packService200TestInto(serviceQueue) + return pack_service200_test_into(service_queue) elif service == "Dummy": - return packDummyDeviceTestInto(serviceQueue) + return pack_dummy_device_test_into(service_queue) elif service == "GPS0": # Object ID: GPS Device - objectId = g.GPS0_ObjectId - return packGpsTestInto(objectId, serviceQueue) + object_id = g.GPS0_ObjectId + return pack_gps_test_into(object_id, service_queue) elif service == "GPS1": # Object ID: GPS Device - objectId = g.GPS1_ObjectId - return packGpsTestInto(objectId, serviceQueue) + object_id = g.GPS1_ObjectId + return pack_gps_test_into(object_id, service_queue) elif service == "Error": - return packErrorTestingInto(serviceQueue) + return pack_error_testing_into(service_queue) else: print("Invalid Service !") exit() -def packService5TestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Service 5")) +def pack_service5_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Service 5")) # disable events - tcQueue.appendleft(("print", "\r\nTesting Service 5: Disable event")) - command = PusTelecommand(service=5, subservice=6, SSC=500) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 5: Disable event")) + command = PusTelecommand(service=5, subservice=6, ssc=500) + tc_queue.appendleft(command.pack_command_tuple()) # trigger event - tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger event")) - command = PusTelecommand(service=17, subservice=128, SSC=510) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 5: Trigger event")) + command = PusTelecommand(service=17, subservice=128, ssc=510) + tc_queue.appendleft(command.pack_command_tuple()) # enable event - tcQueue.appendleft(("print", "\r\nTesting Service 5: Enable event")) - command = PusTelecommand(service=5, subservice=5, SSC=520) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 5: Enable event")) + command = PusTelecommand(service=5, subservice=5, ssc=520) + tc_queue.appendleft(command.pack_command_tuple()) # trigger event - tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger another event")) - command = PusTelecommand(service=17, subservice=128, SSC=530) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service5.txt")) - return tcQueue - - -def packService9TestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Service 9")) - timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' - timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' - timeTestCurrentTime = datetime.now().isoformat() + "Z" + '\0' - timeTest1 = timeTestASCIICodeA.encode('ascii') - timeTest2 = timeTestASCIICodeB.encode('ascii') - timeTest3 = timeTestCurrentTime.encode('ascii') - print("Time Code 1 :" + str(timeTest1)) - print("Time Code 2 :" + str(timeTest2)) - print("Time Code 3 :" + str(timeTest3) + "\r") + tc_queue.appendleft(("print", "\r\nTesting Service 5: Trigger another event")) + command = PusTelecommand(service=17, subservice=128, ssc=530) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service5.txt")) + return tc_queue + + +def pack_service9_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Service 9")) + time_test_ascii_code_a = '2019-08-30T20:50:33.892429Z' + '\0' + time_test_ascii_code_b = '2019-270T05:50:33.002000Z' + '\0' + time_test_current_time = datetime.now().isoformat() + "Z" + '\0' + time_test1 = time_test_ascii_code_a.encode('ascii') + time_test2 = time_test_ascii_code_b.encode('ascii') + time_test3 = time_test_current_time.encode('ascii') + print("Time Code 1 :" + str(time_test1)) + print("Time Code 2 :" + str(time_test2)) + print("Time Code 3 :" + str(time_test3) + "\r") # time setting - tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode A")) - command = PusTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode B")) - command = PusTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode Current Time")) - command = PusTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode A")) + command = PusTelecommand(service=9, subservice=128, ssc=900, data=time_test1) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode B")) + command = PusTelecommand(service=9, subservice=128, ssc=910, data=time_test2) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode Current Time")) + command = PusTelecommand(service=9, subservice=128, ssc=920, data=time_test3) + tc_queue.appendleft(command.pack_command_tuple()) # TODO: Add other time formats here - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service9.txt")) - return tcQueue + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service9.txt")) + return tc_queue -def packService17TestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Service 17")) +def pack_service17_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Service 17")) # ping test - tcQueue.appendleft(("print", "\n\rTesting Service 17: Ping Test")) - command = PusTelecommand(service=17, subservice=1, SSC=1700) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting Service 17: Ping Test")) + command = PusTelecommand(service=17, subservice=1, ssc=1700) + tc_queue.appendleft(command.pack_command_tuple()) # enable event - tcQueue.appendleft(("print", "\n\rTesting Service 17: Enable Event")) - command = PusTelecommand(service=5, subservice=5, SSC=52) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting Service 17: Enable Event")) + command = PusTelecommand(service=5, subservice=5, ssc=52) + tc_queue.appendleft(command.pack_command_tuple()) # test event - tcQueue.appendleft(("print", "\n\rTesting Service 17: Trigger event")) - command = PusTelecommand(service=17, subservice=128, SSC=1701) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service17.txt")) - return tcQueue + tc_queue.appendleft(("print", "\n\rTesting Service 17: Trigger event")) + command = PusTelecommand(service=17, subservice=128, ssc=1701) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service17.txt")) + return tc_queue -def packDummyDeviceTestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Dummy Device")) +def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Dummy Device")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode - tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Set On")) + tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Set On")) modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=1, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=1, data=modeData) + tc_queue.appendleft(command.pack_command_tuple()) # Test Service 2 commands - tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Service 2")) - packService2TestInto(tcQueue, True) + tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Service 2")) + pack_service2_test_into(tc_queue, True) # Test Service 8 - tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Service 8")) - packService8TestInto(tcQueue, True) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) - return tcQueue - - -def packGpsTestInto(objectId: bytearray, tcQueue: TcQueueT) -> TcQueueT: - if objectId == g.GPS0_ObjectId: - gpsString = "GPS0" - elif objectId == g.GPS1_ObjectId: - gpsString = "GPS1" + tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Service 8")) + pack_service8_test_into(tc_queue, True) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) + return tc_queue + + +def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: + if object_id == g.GPS0_ObjectId: + gps_string = "GPS0" + elif object_id == g.GPS1_ObjectId: + gps_string = "GPS1" else: - gpsString = "unknown" - tcQueue.appendleft(("print", "Testing " + gpsString + " Device")) + gps_string = "unknown" + tc_queue.appendleft(("print", "Testing " + gps_string + " Device")) # Set Mode Off - tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set Off")) - modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, SSC=11, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set Off")) + mode_data = packModeData(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=11, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) # Set Mode On - tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set On")) - modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=12, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set On")) + mode_data = packModeData(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=12, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) # Enable HK report - sidGps = 0 - if objectId == g.GPS0_ObjectId: - sidGps = g.GPS0_SID - tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) - command = PusTelecommand(service=3, subservice=5, SSC=13, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - elif objectId == g.GPS1_ObjectId: - sidGps = g.GPS1_SID - tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) - command = PusTelecommand(service=3, subservice=5, SSC=14, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) + sid_gps = 0 + if object_id == g.GPS0_ObjectId: + sid_gps = g.GPS0_SID + tc_queue.appendleft(("print", "\r\nTesting " + gps_string + ": Enable HK Reporting")) + command = PusTelecommand(service=3, subservice=5, ssc=13, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + elif object_id == g.GPS1_ObjectId: + sid_gps = g.GPS1_SID + tc_queue.appendleft(("print", "\r\nTesting " + gps_string + ": Enable HK Reporting")) + command = PusTelecommand(service=3, subservice=5, ssc=14, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) # pack wait interval until mode is on and a few gps replies have been received - tcQueue.appendleft(("wait", 5)) + tc_queue.appendleft(("wait", 5)) # Disable HK reporting - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition")) - command = PusTelecommand(service=3, subservice=6, SSC=15, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable " + gps_string + " definition")) + command = PusTelecommand(service=3, subservice=6, ssc=15, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) # Set Mode Off - tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set Off")) - modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, SSC=13, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service_" + gpsString + ".txt")) - return tcQueue + tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set Off")) + mode_data = packModeData(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=13, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service_" + gps_string + ".txt")) + return tc_queue -def packErrorTestingInto(tcQueue: TcQueueT) -> TcQueueT: +def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT: # a lot of events - command = PusTelecommand(service=17, subservice=129, SSC=2010) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=17, subservice=129, ssc=2010) + tc_queue.appendleft(command.pack_command_tuple()) # a lot of ping testing - command = PusTelecommand(service=17, subservice=130, SSC=2020) - tcQueue.appendleft(command.packCommandTuple()) - return tcQueue + command = PusTelecommand(service=17, subservice=130, ssc=2020) + tc_queue.appendleft(command.pack_command_tuple()) + return tc_queue -def createTotalTcQueue() -> TcQueueT: +def create_total_tc_queue() -> TcQueueT: if not os.path.exists("log"): os.mkdir("log") - tcQueue = deque() - tcQueue = packService2TestInto(tcQueue) - tcQueue = packService5TestInto(tcQueue) - tcQueue = packService8TestInto(tcQueue) - tcQueue = packService9TestInto(tcQueue) - tcQueue = packService17TestInto(tcQueue) - tcQueue = packService200TestInto(tcQueue) - tcQueue = packDummyDeviceTestInto(tcQueue) + tc_queue = deque() + tc_queue = pack_service2_test_into(tc_queue) + tc_queue = pack_service5_test_into(tc_queue) + tc_queue = pack_service8_test_into(tc_queue) + tc_queue = pack_service9_test_into(tc_queue) + tc_queue = pack_service17_test_into(tc_queue) + tc_queue = pack_service200_test_into(tc_queue) + tc_queue = pack_dummy_device_test_into(tc_queue) # objectId = bytearray([0x44, 0x00, 0x1F, 0x00]) # tc_queue = packGpsTestInto(objectId, tc_queue) - return tcQueue + return tc_queue diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py new file mode 100644 index 0000000..b1c6151 --- /dev/null +++ b/tc/obsw_tc_service2.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +""" +Program: obsw_module_test.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Device Access, Native low-level commanding + +@author: R. Mueller +""" +import struct + +import config.obsw_config as g +from tc.obsw_pus_tc_base import PusTelecommand, Deque +from tc.obsw_tc_service200 import packModeData + + +def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: + if called_externally is False: + tc_queue.appendleft(("print", "Testing Service 2")) + object_id = g.DUMMY_DEVICE_ID # dummy device + # don't forget to set object mode raw (G_SERVICE 200) before calling this ! + # Set Raw Mode + tc_queue.appendleft(("print", "\r\nTesting Service 2: Setting Raw Mode")) + mode_data = packModeData(object_id, 3, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2020, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + # toggle wiretapping raw + tc_queue.appendleft(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) + toggle_wiretapping_on_command = PusTelecommand(service=2, subservice=129, ssc=200, + data=wiretapping_toggle_data) + tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) + # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] + tc_queue.appendleft(("print", "\r\nTesting Service 2: Sending Raw Command")) + raw_command = g.DUMMY_COMMAND_1 + raw_data = object_id + raw_command + raw_command = PusTelecommand(service=2, subservice=128, ssc=201, data=raw_data) + tc_queue.appendleft(raw_command.pack_command_tuple()) + # toggle wiretapping off + tc_queue.appendleft(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) + toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=204, + data=wiretapping_toggle_data) + tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) + # send raw command which should be returned via TM[2,130] + tc_queue.appendleft(("print", "\r\nTesting Service 2: Send second raw command")) + command = PusTelecommand(service=2, subservice=128, ssc=205, data=raw_data) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + if called_externally is False: + tc_queue.appendleft(("export", "log/tmtc_log_service2.txt")) + return tc_queue + + +# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW +def pack_wiretapping_mode(object_id, wiretapping_mode_): + wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 + wiretapping_toggle_data = object_id + wiretapping_mode + return wiretapping_toggle_data + + +def pack_specific_service2_test_into(tcQueue, objectIdList, dataList, sscList, printStringList, + calledExternally=False): + pass diff --git a/tc/OBSW_TcService200.py b/tc/obsw_tc_service200.py similarity index 71% rename from tc/OBSW_TcService200.py rename to tc/obsw_tc_service200.py index 2c9fe91..654116a 100644 --- a/tc/OBSW_TcService200.py +++ b/tc/obsw_tc_service200.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -Program: OBSW_TcService200.py +Program: obsw_tc_service200.py Date: 01.11.2019 Description: PUS Custom Service 200: Mode Commanding @@ -13,34 +13,34 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file """ from tc.obsw_pus_tc_base import PusTelecommand from tc.obsw_pus_tc_packer import TcQueueT -import config.OBSW_Config as g +import config.obsw_config as g import struct -def packService200TestInto(tcQueue: TcQueueT) -> TcQueueT: +def pack_service200_test_into(tcQueue: TcQueueT) -> TcQueueT: tcQueue.appendleft(("print", "Testing Service 200")) # Object ID: Dummy Device objectId = g.DUMMY_DEVICE_ID # Set On Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode On")) modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2000, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2000, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) # Set Normal mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Normal")) modeData = packModeData(objectId, 2, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2010, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2010, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) # Set Raw Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Raw")) modeData = packModeData(objectId, 3, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2020, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) # Set Off Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Off")) modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2030, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2030, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) tcQueue.appendleft(("export", "log/tmtc_log_service200.txt")) return tcQueue diff --git a/tc/obsw_tc_service3.py b/tc/obsw_tc_service3.py new file mode 100644 index 0000000..c23309c --- /dev/null +++ b/tc/obsw_tc_service3.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +""" +Program: obsw_module_test.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Device Access, Native low-level commanding + +@author: R. Mueller +""" +import struct +from typing import Deque +from tc.obsw_pus_tc_base import PusTelecommand + + +def pack_service3_test_into(tc_queue: Deque) -> Deque: + tc_queue.appendleft(("print", "Testing Service 3")) + # adding custom defintion to hk using test pool variables + sid1 = bytearray([0x00, 0x00, 0x43, 0x00]) + sid2 = bytearray([0x00, 0x00, 0x44, 0x00]) + sid_gps = bytearray([0x00, 0x00, 0x1f, 0x00]) + collection_interval = struct.pack('>f', 3) + number_of_parameters = struct.pack('B', 5) + p1 = bytearray([0x01, 0x01, 0x01, 0x01]) + p2 = bytearray([0x02, 0x02, 0x02, 0x02]) + p3 = bytearray([0x03, 0x03, 0x03, 0x03]) + p4 = bytearray([0x04, 0x04, 0x04, 0x04]) + p5 = bytearray([0x05, 0x05, 0x05, 0x05]) + hkDefinition1 = sid1 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 + collection_interval = struct.pack('>f', 6) + hkDefinition2 = sid2 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 + + # deleting pre-defined test entry + tc_queue.appendleft(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) + command = PusTelecommand(service=3, subservice=3, ssc=3000, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + + # adding pre-defined definition to hk using test pool variables + tc_queue.appendleft(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) + command = PusTelecommand(service=3, subservice=1, ssc=3010, data=hkDefinition1) + tc_queue.appendleft(command.pack_command_tuple()) + + # adding custom definition to diagnostics using test pool variables + tc_queue.appendleft(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) + command = PusTelecommand(service=3, subservice=2, ssc=3020, data=hkDefinition2) + tc_queue.appendleft(command.pack_command_tuple()) + + # enable custom hk definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable custom definition")) + command = PusTelecommand(service=3, subservice=5, ssc=3030, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # enable custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=7, ssc=3040, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # enable gps0 + tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable GPS definition")) + command = PusTelecommand(service=3, subservice=5, ssc=3050, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + + # maybe wait a bit to receive at least 2 packets.. + tc_queue.appendleft(("wait", 3)) + + # Disable custom hk definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable custom definition")) + command = PusTelecommand(service=3, subservice=6, ssc=3060, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # Disable custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=8, ssc=3070, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # disable gps0 + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable GPS definition")) + command = PusTelecommand(service=3, subservice=6, ssc=3080, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + # report custom Diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) + command = PusTelecommand(service=3, subservice=11, ssc=3100, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # report gps definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting GPS definition")) + command = PusTelecommand(service=3, subservice=9, ssc=3110, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + # generate one custom hk definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one custom hk definition")) + command = PusTelecommand(service=3, subservice=27, ssc=3120, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # generate one custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=28, ssc=3120, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # generate one gps 0 definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) + command = PusTelecommand(service=3, subservice=27, ssc=3120, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + # modify custom hk definition interval + newInterval = struct.pack('>f', 10.0) + newIntervalCommand = sid1 + newInterval + tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) + command = PusTelecommand(service=3, subservice=31, ssc=3090, data=newIntervalCommand) + tc_queue.appendleft(command.pack_command_tuple()) + # report custom HK definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) + command = PusTelecommand(service=3, subservice=9, ssc=3090, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # modify custom diag definition interval + newIntervalCommand = sid2 + newInterval + tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) + command = PusTelecommand(service=3, subservice=32, ssc=3090, data=newIntervalCommand) + tc_queue.appendleft(command.pack_command_tuple()) + # report custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) + command = PusTelecommand(service=3, subservice=11, ssc=3100, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # append parameter to custom hk definiton + # append parameter to custom diag definition + + # delete custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=4, ssc=3120, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + + # do some basic testing on predefined structs too + # e.g. add one variable, change interval, report them.... + tc_queue.appendleft(("export", "log/tmtc_log_service3.txt")) + return tc_queue diff --git a/tc/obsw_tc_service8.py b/tc/obsw_tc_service8.py new file mode 100644 index 0000000..7e2d2cd --- /dev/null +++ b/tc/obsw_tc_service8.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +""" +Program: obsw_module_test.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Function Management, High-Level Commanding + +@author: R. Mueller +""" +from typing import Deque + +import config.obsw_config as g +from tc.obsw_pus_tc_base import PusTelecommand +from tc.obsw_tc_service200 import packModeData + + +def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: + if called_externally is False: + tc_queue.appendleft(("print", "Testing Service 8")) + object_id = g.DUMMY_DEVICE_ID + + # set mode on + tc_queue.appendleft(("print", "\r\nTesting Service 8: Set On Mode")) + mode_data = packModeData(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode normal + tc_queue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) + mode_data = packModeData(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=810, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers completion reply + tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Completion Reply")) + action_id = g.DUMMY_COMMAND_1 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=820, data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers _tm_data reply + tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Data Reply")) + action_id = g.DUMMY_COMMAND_2 + command_param1 = g.DUMMY_COMMAND_2_PARAM_1 + command_param2 = g.DUMMY_COMMAND_2_PARAM_2 + direct_command = object_id + action_id + command_param1 + command_param2 + command = PusTelecommand(service=8, subservice=128, ssc=830, data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers an additional step reply and one completion reply + tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) + action_id = g.DUMMY_COMMAND_3 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=840, data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("wait", 2)) + tc_queue.appendleft(("print", "\r")) + if called_externally is False: + tc_queue.appendleft(("export", "log/tmtc_log_service8.txt")) + return tc_queue + + +def pack_specific_service8_test_into(tcQueue, objectIdList, actionIdList, dataList, + sscList, printStringList, calledExternally=False): + pass diff --git a/test/obsw_module_test.py b/test/obsw_module_test.py index e194aef..39b6d08 100644 --- a/test/obsw_module_test.py +++ b/test/obsw_module_test.py @@ -1,13 +1,15 @@ """ -Program: obsw_module_test.py -Date: 01.11.2019 -Description: Module/High-Level test of on-board software, used by the TMTC client in -software test mode. It is very difficult to execute Unit Tests on embedded hardware. +@file obsw_module_test.py +@date 01.11.2019 +@brief Module/High-Level test of on-board software, used by the TMTC client in + software test mode. +@details +It is very difficult to execute Unit Tests on embedded hardware. The most significant functonalities for satellites are commanding and TM reception. As such, these functionalities will be tested by sending TCs and checking the expected Telemetry. Still, we make use of the Unit Testing Framework provided by Python (with some hacks involved). -Manual: +@manual Set up the TMTC client as specified in the header comment and use the unit testing mode For Developers: @@ -27,21 +29,23 @@ TC source sequence count. For PUS standalone services (PUS Service Base), only the start and completion success need to be asserted. For PUS gateway services (PUS Commanding Service Base), step verification needs to be asserted additionally. If there are commands with multiple steps, this needs -to be specified in the analyseTcInfo method of the child test. +to be specified in the analyse_tc_info method of the child test. @author: R. Mueller """ +import sys import unittest +from abc import abstractmethod from collections import deque from typing import Deque -from tc.obsw_pus_tc_packer import packDummyDeviceTestInto +from config import obsw_config as g +from tc.obsw_pus_tc_packer import pack_dummy_device_test_into +from tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys from tm.obsw_tm_service_1 import pusPacketInfoService1T -from abc import abstractmethod +from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmInfoT, TmDictionaryKeys from sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver -from config import OBSW_Config as g -from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmInfoT -from tc.obsw_pus_tc_base import PusTcInfoQueueT + TmInfoQueueService1T = Deque[pusPacketInfoService1T] @@ -74,7 +78,27 @@ class TestService(unittest.TestCase): cls.tmtc_printer = g.G_TMTC_PRINTER cls.tm_listener = g.G_TM_LISTENER cls.communication_interface = g.G_COM_INTERFACE - # connectToBoard() + + cls.tc_ssc_array = [] + cls.tc_service_array = [] + cls.tc_subservice_array = [] + # these number of TCs sent which need need to be verified + # separate step counter if there is more than one step for a command ! + cls.tc_verify_counter = 0 + cls.tc_verify_step_counter = 0 + + # These values are incremented in the analyseTmInfo function + # and compared to the counter values + cls.tc_verified_start = 0 + cls.tc_verified_completion = 0 + cls.tc_verified_step = 0 + + # The expected values are set in child test + cls.event_counter = 0 + cls.event_expected = 0 + cls.misc_expected = 0 + cls.misc_counter = 0 + cls.valid = True def perform_testing_and_generate_assertion_dict(self): """ @@ -89,130 +113,127 @@ class TestService(unittest.TestCase): """ module_tester = MultipleCommandSenderReceiver( com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, - tm_listener=self.tm_listener, tc_queue=self.test_queue, wait_intervals=self.wait_intervals, - wait_time=self.wait_time, print_tm=g.G_PRINT_RAW_TM) + tm_listener=self.tm_listener, tc_queue=self.test_queue, + wait_intervals=self.wait_intervals, wait_time=self.wait_time, print_tm=g.G_PRINT_RAW_TM) (tc_info_queue, tm_info_queue) = module_tester.send_tc_queue_and_return_info() assertion_dict = self._analyse_tm_tc_info(tm_info_queue, tc_info_queue) return assertion_dict - def _analyse_tm_tc_info( - self, tm_info_queue: PusTmInfoQueueT, tcInfoQueue: PusTcInfoQueueT) -> dict: - self.tcSscArray = [] - self.tcServiceArray = [] - self.tcSubserviceArray = [] - - # these number of TCs sent which need need to be verified - # separate step counter if there is more than one step for a command ! - self.tcVerifyCounter = 0 - self.tcVerifyStepCounter = 0 + def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, + tc_info_queue: PusTcInfoQueueT) -> dict: # child test implements this # default implementation provided - self.analyseTcInfo(tcInfoQueue) + self.analyse_tc_info(tc_info_queue) - # These values are incremented in the analyseTmInfo function - # and compared to the counter values - self.tcVerifiedStart = 0 - self.tcVerifiedCompletion = 0 - self.tcVerifiedStep = 0 - # The expected values are set in child test - self.eventCounter = 0 - self.miscCounter = 0 - self.valid = True - - assertionDict = {} + assertion_dict = {} # child test implements this and can add additional dict entries - self.analyseTmInfo(tm_info_queue, assertionDict) - - assertionDict.update({ - "TcStartCount": self.tcVerifiedStart, - "TcCompletionCount": self.tcVerifiedCompletion, - "TcStepCount": self.tcVerifiedStep, - "EventCount": self.eventCounter, - "MiscCount": self.miscCounter, + self.analyse_tm_info(tm_info_queue, assertion_dict) + + assertion_dict.update({ + "TcStartCount": self.tc_verified_start, + "TcCompletionCount": self.tc_verified_completion, + "TcStepCount": self.tc_verified_step, + "EventCount": self.event_counter, + "MiscCount": self.misc_counter, "Valid": self.valid }) - return assertionDict + return assertion_dict - def analyseTcInfo(self, tcInfoQueue: PusTcInfoQueueT): - while not tcInfoQueue.__len__() == 0: - currentTcInfo = tcInfoQueue.pop() - self.tcVerifyCounter = self.tcVerifyCounter + 1 + def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): + """ + Analyse the properties of the sent telecommands and fills them + into a list which will be used for analysis later. + """ + while not tc_info_queue.__len__() == 0: + current_tc_info = tc_info_queue.pop() + self.tc_verify_counter = self.tc_verify_counter + 1 # For commands with multiple steps, update this value manually ! - self.tcVerifyStepCounter = self.tcVerifyCounter + 1 - self.tcSscArray.append(currentTcInfo["ssc"]) - self.tcServiceArray.append(currentTcInfo["service"]) - self.tcSubserviceArray.append(currentTcInfo["subservice"]) + # Only service 8 generates a step reply. + if current_tc_info[TcDictionaryKeys.SERVICE] == 8: + self.tc_verify_step_counter += 1 + self.tc_ssc_array.append(current_tc_info[TcDictionaryKeys.SSC]) + self.tc_service_array.append(current_tc_info[TcDictionaryKeys.SERVICE]) + self.tc_subservice_array.append(current_tc_info[TcDictionaryKeys.SUBSERVICE]) - # must be implemented by child test ! @abstractmethod - def analyseTmInfo(self, tmInfoQueue: Deque, assertionDict: dict): - pass + def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): + """ + In this function, the received TM information is analysed for correctness. + Must be implemented by child test ! + """ - # this function looks whether the tc verification SSC matched - # a SSC of the sent TM - def scanForRespectiveTc(self, currentTmInfo: PusTmInfoT): - currentSubservice = currentTmInfo["subservice"] - for possibleIndex, searchIndex in enumerate(self.tcSscArray): - if searchIndex == currentTmInfo["tcSSC"]: - if currentSubservice == 1: - self.tcVerifiedStart = self.tcVerifiedStart + 1 - elif currentSubservice == 5: - self.tcVerifiedStep = self.tcVerifiedStep + 1 - elif currentSubservice == 7: - self.tcVerifiedCompletion = self.tcVerifiedCompletion + 1 - - def _perform_generic_assertion_test(self, assertionDict: dict): - if assertionDict is None: + def scan_for_respective_tc(self, current_tm_info: PusTmInfoT): + """ + this function looks whether the tc verification SSC matched + a source sequence count of the sent TM + """ + current_subservice = current_tm_info[TmDictionaryKeys.SUBSERVICE] + for possible_index, search_index in enumerate(self.tc_ssc_array): + if search_index == current_tm_info[TmDictionaryKeys.TC_SSC]: + if current_subservice == 1: + self.tc_verified_start = self.tc_verified_start + 1 + elif current_subservice == 5: + self.tc_verified_step = self.tc_verified_step + 1 + elif current_subservice == 7: + self.tc_verified_completion = self.tc_verified_completion + 1 + + def _perform_generic_assertion_test(self, assertion_dict: dict): + if assertion_dict is None: print("Performing Generic Assertion Test: " "Configuratrion error, asseretion dictionary was not passed properly") - exit() - self.assertEqual(assertionDict["TcStartCount"], self.tcVerifyCounter) - self.assertEqual(assertionDict["TcCompletionCount"], self.tcVerifyCounter) - self.assertEqual(assertionDict["EventCount"], self.eventExpected) - self.assertEqual(assertionDict["MiscCount"], self.miscExpected) - self.assertTrue(assertionDict["Valid"]) + sys.exit() + self.assertEqual(assertion_dict["TcStartCount"], self.tc_verify_counter) + self.assertEqual(assertion_dict["TcCompletionCount"], self.tc_verify_counter) + self.assertEqual(assertion_dict["EventCount"], self.event_expected) + self.assertEqual(assertion_dict["MiscCount"], self.misc_expected) + self.assertTrue(assertion_dict["Valid"]) # these tests are identical - def performService5or17Test(self): - assertionDict = self.perform_testing_and_generate_assertion_dict() - self.eventExpected = 1 - self.miscExpected = 2 - self._perform_generic_assertion_test(assertionDict) - - def analyseService5or17TM(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): - self.miscCounter = 0 - while not tmInfoQueue.__len__() == 0: - currentTmInfo = tmInfoQueue.pop() + def _perform_service5or17_test(self): + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 1 + self.misc_expected = 2 + self._perform_generic_assertion_test(assertion_dict) + + def _analyse_service5or17_tm(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + self.misc_counter = 0 + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() # Tc verification scanning is generic and has been moved to the superclass - if currentTmInfo["service"] == 1: - self.scanForRespectiveTc(currentTmInfo) + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + self.scan_for_respective_tc(current_tm_info) # Here, the desired event Id or RID can be specified - if currentTmInfo["service"] == 5: - if currentTmInfo["EventID"] == 8200 and currentTmInfo["RID"] == 0x51001700: - self.eventCounter = self.eventCounter + 1 - if currentTmInfo["service"] == 17: - self.miscCounter = self.miscCounter + 1 - if currentTmInfo["valid"] == 0: + if current_tm_info[TmDictionaryKeys.SERVICE] == 5: + if (current_tm_info[TmDictionaryKeys.EVENT_ID] == 8200 and + current_tm_info[TmDictionaryKeys.REPORTER_ID] == 0x51001700): + + self.event_counter = self.event_counter + 1 + if current_tm_info[TmDictionaryKeys.SERVICE] == 17: + self.misc_counter = self.misc_counter + 1 + if current_tm_info[TmDictionaryKeys.VALID] == 0: self.valid = False - assertionDict.update({"MiscCount": self.miscCounter}) + assertion_dict.update({"MiscCount": self.misc_counter}) @classmethod def tearDownClass(cls): - cls.eventCounter = 0 - cls.tcVerifyCounter = 0 + cls.event_counter = 0 + cls.tc_verify_counter = 0 # noinspection PyUnresolvedReferences cls.test_queue.clear() class TestDummyDevice(TestService): + """ + Test for the dummy class. + """ @classmethod def setUpClass(cls): super().setUpClass() print("Testing Dummy Device") - packDummyDeviceTestInto(super().test_queue) + pack_dummy_device_test_into(super().test_queue) - def analyseTmInfo(self, tmInfoQueue, assertionDict): + def analyse_tm_info(self, tm_info_queue, assertion_dict): pass diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index 945b5e6..e59002d 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -1,11 +1,24 @@ +""" +@file obsw_pus_service_test.py +@date 01.11.2019 +@brief Contains specific PUS service tests. +@author: R. Mueller +""" +import struct import unittest -from test.obsw_module_test import TestService, PusTmInfoQueueT +from typing import Deque + +from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys from tc.obsw_pus_tc_base import PusTcInfoQueueT -from tc.obsw_pus_tc_packer import packService17TestInto, packService5TestInto, packService2TestInto -import config.OBSW_Config as g +from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ + pack_service2_test_into, pack_service8_test_into +import config.obsw_config as g class TestService2(TestService): + """ + Test raw commanding service. + """ @classmethod def setUpClass(cls: TestService): super().setUpClass() @@ -13,36 +26,43 @@ class TestService2(TestService): # all commands must be sent sequentially, not as a burst cls.wait_intervals = [1, 2, 3, 4] cls.wait_time = [2, 2, 2, 3] - packService2TestInto(cls.test_queue) + pack_service2_test_into(cls.test_queue) - def test_Service2(self): + def test_service2(self): + """ + Tests the raw commanding service. + """ assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.eventExpected = 1 - self.miscExpected = 4 + self.event_expected = 2 + self.misc_expected = 4 super()._perform_generic_assertion_test(assertion_dict) - def analyseTmInfo(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): while not tm_info_queue.__len__() == 0: current_tm_info = tm_info_queue.pop() # Tc verification scanning is generic and has been moved to the superclass - if current_tm_info["service"] == 1: - super().scanForRespectiveTc(current_tm_info) + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + super().scan_for_respective_tc(current_tm_info) # Here, the desired event Id or RID can be specified - if current_tm_info["service"] == 5: + if current_tm_info[TmDictionaryKeys.SERVICE] == 5: # mode change - if current_tm_info["EventID"] == 7401 and current_tm_info["RID"] == 0x4400affe: - self.eventCounter = self.eventCounter + 1 - if current_tm_info["service"] == 200 and current_tm_info["subservice"] == 6: + if (current_tm_info[TmDictionaryKeys.REPORTER_ID] == + struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]) \ + and (current_tm_info[TmDictionaryKeys.EVENT_ID] == 7401 or + current_tm_info[TmDictionaryKeys.EVENT_ID] == 7400): + self.event_counter = self.event_counter + 1 + if current_tm_info[TmDictionaryKeys.SERVICE] == 200 and \ + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 6: # mode change confirmation - self.miscCounter = self.miscCounter + 1 + self.misc_counter += 1 # wiretapping messages - elif current_tm_info["service"] == 2 and current_tm_info["subservice"] == 130: - self.miscCounter = self.miscCounter + 1 - elif current_tm_info["service"] == 2 and current_tm_info["subservice"] == 131: - self.miscCounter = self.miscCounter + 1 - if current_tm_info["valid"] == 0: + elif current_tm_info[TmDictionaryKeys.SERVICE] == 2 and \ + (current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130 or + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 131): + self.misc_counter += 1 + if current_tm_info[TmDictionaryKeys.VALID] == 0: self.valid = False - assertion_dict.update({"MiscCount": self.miscCounter}) + assertion_dict.update({"MiscCount": self.misc_counter}) class TestService5(TestService): @@ -54,17 +74,17 @@ class TestService5(TestService): # This is required because the OBSW tasks runs with fixed sequences cls.wait_intervals = [1, 2, 3] cls.wait_time = [1.5, 2.0, 2.0] - packService5TestInto(cls.test_queue) + pack_service5_test_into(cls.test_queue) def test_Service5(self): # analyseTmInfo() and analyseTcInfo are called here - super().performService5or17Test() + super()._perform_service5or17_test() - def analyseTcInfo(self, tcInfoQueue: PusTcInfoQueueT): - super().analyseTcInfo(tcInfoQueue) + def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): + super().analyse_tc_info(tc_info_queue) - def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): - super().analyseService5or17TM(tmInfoQueue, assertionDict) + def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + super()._analyse_service5or17_tm(tm_info_queue, assertion_dict) @classmethod def tearDownClass(cls): @@ -77,10 +97,47 @@ class TestService6(unittest.TestCase): print("Hallo Test 6!") -class TestService8(unittest.TestCase): +class TestService8(TestService): + @classmethod - def setUpClass(cls): - print("Hallo Test 2!") + def setUpClass(cls: TestService): + super().setUpClass() + print("Testing Service 8") + cls.wait_intervals = [1, 2, 3, 4] + cls.wait_time = [1.5, 0.7, 0.7, 1.0] + cls.data_reply_count = 0 + pack_service8_test_into(cls.test_queue) + + def test_Service8(self): + + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 4 + self.misc_expected = 2 + self.data_reply_expected = 1 + # One reply generates an additional step. + self.tc_verify_step_counter += 1 + self.assertEqual(assertion_dict["TcStepCount"], self.tc_verify_step_counter) + + self.assertEqual(self.data_reply_count, self.data_reply_expected) + super()._perform_generic_assertion_test(assertion_dict) + + def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + self.scan_for_respective_tc(current_tm_info) + if current_tm_info[TmDictionaryKeys.SERVICE] == 5: + if (current_tm_info[TmDictionaryKeys.REPORTER_ID] == + struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]) and \ + (current_tm_info[TmDictionaryKeys.EVENT_ID] == 7401 + or current_tm_info[TmDictionaryKeys.EVENT_ID] == 7400): + self.event_counter += 1 + if current_tm_info[TmDictionaryKeys.SERVICE] == 200: + # mode change confirmation + self.misc_counter += 1 + if (current_tm_info[TmDictionaryKeys.SERVICE] == 8 and + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130): + self.data_reply_count += 1 class TestService9(unittest.TestCase): @@ -97,22 +154,23 @@ class TestService17(TestService): cls.wait_intervals = [2] cls.wait_time = 2 cls.tm_timeout = g.G_TM_TIMEOUT - packService17TestInto(cls.test_queue) + pack_service17_test_into(cls.test_queue) def test_Service17(self): - super().performService5or17Test() + super()._perform_service5or17_test() - def _analyse_tm_tc_info(self, tmInfoQueue: PusTmInfoQueueT, tcInfoQueue: PusTcInfoQueueT): - assertionDict = super()._analyse_tm_tc_info(tm_info_queue=tmInfoQueue, tcInfoQueue=tcInfoQueue) + def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): + assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, + tc_info_queue=tc_info_queue) # add anything elsee other than tc verification counter # and ssc that is needed for tm analysis - return assertionDict + return assertion_dict - def analyseTcInfo(self, tcInfoQueue): - super().analyseTcInfo(tcInfoQueue) + def analyse_tc_info(self, tc_info_queue): + super().analyse_tc_info(tc_info_queue) - def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): - super().analyseService5or17TM(tmInfoQueue, assertionDict) + def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + super()._analyse_service5or17_tm(tm_info_queue, assertion_dict) @classmethod def tearDownClass(cls): diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py index 60a4735..0915f8e 100644 --- a/tm/obsw_pus_tm_base.py +++ b/tm/obsw_pus_tm_base.py @@ -4,13 +4,34 @@ @author: R.Mueller, S. Gaisser """ import datetime +from enum import Enum, auto from typing import TypeVar, Dict, Tuple, Deque, List, Final from crcmod import crcmod +class TmDictionaryKeys(Enum): + SERVICE = auto() + SUBSERVICE = auto() + SUBCOUNTER = auto() + SSC = auto() + DATA = auto() + CRC = auto() + VALID = auto() + # Service 1 + TC_PACKET_ID = auto() + TC_SSC = auto() + ERROR_CODE = auto() + STEP_NUMBER = auto() + # Service 5 + EVENT_ID = auto() + REPORTER_ID = auto() + EVENT_PARAM_1 = auto() + EVENT_PARAM_2 = auto() + + PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') -PusTmInfoT = Dict[str, any] +PusTmInfoT = Dict[TmDictionaryKeys, any] PusTmTupleT = Tuple[PusTmInfoT, PusTmT] PusTmQueueT = Deque[PusTmT] @@ -59,12 +80,12 @@ class PusTelemetry: :return: TM dictionary """ tm_information = { - "service": self.get_service(), - "subservice": self.get_subservice(), - "ssc": self.get_ssc(), - "data": self._tm_data, - "crc": self._crc, - "valid": self._pus_header.valid + TmDictionaryKeys.SERVICE: self.get_service(), + TmDictionaryKeys.SUBSERVICE: self.get_subservice(), + TmDictionaryKeys.SSC: self.get_ssc(), + TmDictionaryKeys.DATA: self._tm_data, + TmDictionaryKeys.CRC: self._crc, + TmDictionaryKeys.VALID: self._pus_header.valid } return tm_information @@ -107,7 +128,7 @@ class PusTelemetry: self._data_field_header.append_data_field_header_column_header(header_list) self._pus_header.append_pus_packet_header_column_headers(header_list) - def get_raw_packet(self) -> bytearray: + def get_raw_packet(self) -> bytes: """ Get the whole TM wiretapping_packet as a bytearray (raw) :return: TM wiretapping_packet diff --git a/tm/obsw_pus_tm_factory.py b/tm/obsw_pus_tm_factory.py index 55bbc7c..66c70ef 100644 --- a/tm/obsw_pus_tm_factory.py +++ b/tm/obsw_pus_tm_factory.py @@ -13,10 +13,10 @@ from tm.obsw_tm_service_5 import Service5TM import struct -def PusTelemetryFactory(raw_packet: bytes): +def pus_telemetry_factory(raw_packet: bytes): """ - Returns a PusTelemetry class instance by extracting the service directly from the raw wiretapping_packet. - Sneaky solution allows this function to immediately return + Returns a PusTelemetry class instance by extracting the service directly from the + raw wiretapping_packet. Sneaky solution allows this function to immediately return the right telemetry wiretapping_packet :param raw_packet: :return: @@ -24,26 +24,27 @@ def PusTelemetryFactory(raw_packet: bytes): service_type = raw_packet[7] if service_type == 1: return Service1TM(raw_packet) - elif service_type == 2: + if service_type == 2: return Service2TM(raw_packet) - elif service_type == 3: + if service_type == 3: return Service3TM(raw_packet) - elif service_type == 5: + if service_type == 5: return Service5TM(raw_packet) - elif service_type == 8: + if service_type == 8: return Service8TM(raw_packet) - elif service_type == 17: + if service_type == 17: return Service17TM(raw_packet) - elif service_type == 200: + if service_type == 200: return Service200TM(raw_packet) - else: - print("The service " + str(service_type) + " is not implemented in Telemetry Factory") - return PusTelemetry(raw_packet) - + print("The service " + str(service_type) + " is not implemented in Telemetry Factory") + return PusTelemetry(raw_packet) + + class Service2TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array: bytes): + super().__init__(byte_array) + self.specify_packet_info("Raw Commanding Reply") def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -57,6 +58,7 @@ class Service2TM(PusTelemetry): class Service8TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) + self.specify_packet_info("Direct Commanding Reply") def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -68,21 +70,22 @@ class Service8TM(PusTelemetry): class Service9TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) + self.specify_packet_info("Time Service Reply") - def append_telemetry_content(self, array): + def append_telemetry_content(self, array: list): super().append_telemetry_content(array) return - def append_telemetry_column_headers(self, array): - super().append_telemetry_column_headers(array) + def append_telemetry_column_headers(self, column_header: list): + super().append_telemetry_column_headers(column_header) return class Service17TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) self.specify_packet_info("Test Reply") def append_telemetry_content(self, array): @@ -95,8 +98,8 @@ class Service17TM(PusTelemetry): class Service200TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) self.isCantReachModeReply = False self.isModeReply = False self.specify_packet_info("Mode Reply") diff --git a/tm/obsw_tm_service_1.py b/tm/obsw_tm_service_1.py index 9b5b7ab..ec4211b 100644 --- a/tm/obsw_tm_service_1.py +++ b/tm/obsw_tm_service_1.py @@ -5,78 +5,82 @@ Date: 30.12.2019 Description: Deserialize Pus Verification TM Author: R. Mueller """ -from tm.obsw_pus_tm_base import PusTelemetry -from typing import Dict import struct +from typing import Dict + +from tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys + -pusPacketInfoService1T = Dict[str, any] +pusPacketInfoService1T = Dict[TmDictionaryKeys, any] class Service1TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) - self.tcErrorCode = False - self.isStep = False + def __init__(self, byte_array: bytes): + super().__init__(byte_array) + self.has_tc_error_code = False + self.is_step_reply = False # Failure Reports with error code + self.err_code = 0 + self.step_number = 0 self.tcPacketId = self._tm_data[0] << 8 | self._tm_data[1] self.tcSSC = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3] self.specify_packet_info("Success Verification") if self.get_subservice() % 2 == 0: self.specify_packet_info("Failure Verficiation") - self.tcErrorCode = True + self.has_tc_error_code = True if self._data_field_header.subtype == 6: - self.isStep = True + self.is_step_reply = True self.append_packet_info(" : Step Failure") - self.stepNumber = struct.unpack('>B', self._tm_data[4:5])[0] - self.ErrCode = struct.unpack('>H', self._tm_data[5:7])[0] + self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] + self.err_code = struct.unpack('>H', self._tm_data[5:7])[0] self.errorParam1 = struct.unpack('>I', self._tm_data[7:11])[0] self.errorParam2 = struct.unpack('>I', self._tm_data[11:15])[0] else: self.print_data() - self.ErrCode = struct.unpack('>H', self._tm_data[4:6])[0] + self.err_code = struct.unpack('>H', self._tm_data[4:6])[0] self.errorParam1 = struct.unpack('>I', self._tm_data[6:10])[0] self.errorParam2 = struct.unpack('>I', self._tm_data[10:14])[0] elif self.get_subservice() == 5: - self.isStep = True + self.is_step_reply = True self.append_packet_info(" : Step Success") - self.stepNumber = struct.unpack('>B', self._tm_data[4:5])[0] + self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] def append_telemetry_content(self, array): super().append_telemetry_content(array) array.append(str(hex(self.tcPacketId))) array.append(str(self.tcSSC)) - if self.tcErrorCode: - if self.isStep: - array.append(str(self.stepNumber)) - array.append(str(hex(self.ErrCode))) + if self.has_tc_error_code: + if self.is_step_reply: + array.append(str(self.step_number)) + array.append(str(hex(self.err_code))) array.append(str(hex(self.errorParam1)) + ", " + str(self.errorParam1)) array.append(str(hex(self.errorParam2)) + ", " + str(self.errorParam2)) - elif self.isStep: - array.append(str(self.stepNumber)) + elif self.is_step_reply: + array.append(str(self.step_number)) def append_telemetry_column_headers(self, array): super().append_telemetry_column_headers(array) array.append("TC Packet ID") array.append("TC SSC") - if self.tcErrorCode: - if self.isStep: + if self.has_tc_error_code: + if self.is_step_reply: array.append("Step Number") array.append("Return Value") array.append("Error Param 1") array.append("Error Param 2") - elif self.isStep: + elif self.is_step_reply: array.append("Step Number") def pack_tm_information(self) -> pusPacketInfoService1T: - tmInformation = super().pack_tm_information() - addInformation = { - "tcPacketId": self.tcPacketId, - "tcSSC": self.tcSSC, + tm_information = super().pack_tm_information() + add_information = { + TmDictionaryKeys.TC_PACKET_ID: self.tcPacketId, + TmDictionaryKeys.TC_SSC: self.tcSSC, } - tmInformation.update(addInformation) - if self.tcErrorCode: - tmInformation.update({"errCode": self.ErrCode}) - if self.isStep: - tmInformation.update({"stepNumber": self.ErrCode}) - return tmInformation + tm_information.update(add_information) + if self.has_tc_error_code: + tm_information.update({TmDictionaryKeys.ERROR_CODE: self.err_code}) + if self.is_step_reply: + tm_information.update({TmDictionaryKeys.STEP_NUMBER: self.step_number}) + return tm_information diff --git a/tm/obsw_tm_service_5.py b/tm/obsw_tm_service_5.py index 4df7aae..ad44eae 100644 --- a/tm/obsw_tm_service_5.py +++ b/tm/obsw_tm_service_5.py @@ -6,13 +6,13 @@ Description: Deserialize PUS Event Report Author: R. Mueller """ -from tm.obsw_pus_tm_base import PusTelemetry +from tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT import struct class Service5TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) self.specify_packet_info("Event") if self.get_subservice() == 1: self.append_packet_info(" Info") @@ -41,13 +41,13 @@ class Service5TM(PusTelemetry): array.append("Parameter 1") array.append("Parameter 2") - def pack_tm_information(self): - tmInformation = super().pack_tm_information() - addInformation = { - "RID": self.objectId, - "EventID": self.eventId, - "Param1": self.param1, - "Param2": self.param2 + def pack_tm_information(self) -> PusTmInfoT: + tm_information = super().pack_tm_information() + add_information = { + TmDictionaryKeys.REPORTER_ID: self.objectId, + TmDictionaryKeys.EVENT_ID: self.eventId, + TmDictionaryKeys.EVENT_PARAM_1: self.param1, + TmDictionaryKeys.EVENT_PARAM_2: self.param2 } - tmInformation.update(addInformation) - return tmInformation + tm_information.update(add_information) + return tm_information diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 74aeef3..477a9dc 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief @@ -11,10 +11,10 @@ import os import sys from typing import TypeVar -from config import OBSW_Config as g +from config import obsw_config as g from tm.obsw_pus_tm_base import PusTmT, PusTelemetry from tm.obsw_tm_service_3 import PusTm3T -from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT +from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT, TcDictionaryKeys TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter') @@ -30,6 +30,9 @@ class TmTcPrinter: :param do_print_to_file: if true, print to file :param print_tc: if true, print TCs """ + # TODO: we should implement a list of strings here. each service test string + # is written into a list entry. For the file output, the list entries are concatenated + # and put into the main log file. self.print_buffer = "" # global print buffer which will be useful to print something to file self.file_buffer = "" @@ -277,9 +280,9 @@ class TmTcPrinter: :param tc_packet_info: :return: """ - self.print_buffer = "Sent TC[" + str(tc_packet_info["service"]) + "," + \ - str(tc_packet_info["subservice"]) + "] " + " with SSC " + \ - str(tc_packet_info["ssc"]) + self.print_buffer = "Sent TC[" + str(tc_packet_info[TcDictionaryKeys.SERVICE]) + "," + \ + str(tc_packet_info[TcDictionaryKeys.SUBSERVICE]) + "] " + " with SSC " + \ + str(tc_packet_info[TcDictionaryKeys.SSC]) print(self.print_buffer) self.add_print_buffer_to_file_buffer() @@ -290,10 +293,12 @@ class TmTcPrinter: :return: """ try: - self.print_buffer = "Telecommand TC[" + str(tc_packet_info["service"]) + "," + \ - str(tc_packet_info["subservice"]) + "] with SSC " + \ - str(tc_packet_info["ssc"]) + " sent with data " + \ - self.return_data_string(tc_packet_info["data"]) + self.print_buffer = \ + "Telecommand TC[" + str(tc_packet_info[TcDictionaryKeys.SERVICE]) + "," + \ + str(tc_packet_info[TcDictionaryKeys.SUBSERVICE]) + "] with SSC " + \ + str(tc_packet_info[TcDictionaryKeys.SSC]) + " sent with data " + \ + self.return_data_string(tc_packet_info[TcDictionaryKeys.DATA]) + print(self.print_buffer) self.add_print_buffer_to_file_buffer() except TypeError: -- GitLab