diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Unit_Test_Serial.xml b/.idea/runConfigurations/OBSW_TmTcClient_Module_Test_Serial.xml similarity index 86% rename from .idea/runConfigurations/OBSW_TmTcClient_Unit_Test_Serial.xml rename to .idea/runConfigurations/OBSW_TmTcClient_Module_Test_Serial.xml index 7d146a93a1e68d68a60ad4144136339141103314..9d3a170984f7bfcf98901cb9311ab024498ebe60 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_Unit_Test_Serial.xml +++ b/.idea/runConfigurations/OBSW_TmTcClient_Module_Test_Serial.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_TmTcClient Unit Test Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> + <configuration default="false" name="OBSW_TmTcClient Module Test Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml b/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml index f1df18fdf5489ab6968dc070e07ef8c711d006f7..77f41e73d5d86f6fe56b1675ea7b4e5342f4ecf1 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml +++ b/.idea/runConfigurations/OBSW_TmTcClient_Service_8_Serial.xml @@ -13,7 +13,7 @@ <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-m 3 -s 8 -p -c 1" /> + <option name="PARAMETERS" value="-m 3 -s 8 -c 1" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="true" /> <option name="MODULE_MODE" value="false" /> diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py index 94b19d6c55d6a88b6617a766528e6ec98e4218f1..21b833cd641dfc6beb231f4119545aab48c4327a 100644 --- a/comIF/obsw_ethernet_com_if.py +++ b/comIF/obsw_ethernet_com_if.py @@ -12,10 +12,10 @@ from typing import Tuple, Union from comIF.obsw_com_interface import CommunicationInterface, PusTmListT, PusTmQueueT, \ PusTmTupleQueueT, PusTmInfoQueueT -from tm.obsw_pus_tm_factory import PusTelemetryFactory +from tm.obsw_pus_tm_factory import pus_telemetry_factory from tc.obsw_pus_tc_base import PusTcInfoT from utility.obsw_tmtc_printer import TmTcPrinterT -import config.OBSW_Config as g +import config.obsw_config as g # pylint: disable=abstract-method @@ -53,7 +53,7 @@ class EthernetComIF(CommunicationInterface): ready = self.data_available(pollTimeout) if ready: data = self.sock_receive.recvfrom(1024)[0] - packet = PusTelemetryFactory(data) + packet = pus_telemetry_factory(data) self.tmtc_printer.print_telemetry(packet) packet_list = [packet] return True, packet_list diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index cd25af9e5a86c3779747e628a3a7ca854165fa1c..9be78d7550a616471d7a335efac7f40a7e373bde 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -13,7 +13,7 @@ from typing import Tuple, List, Union, Optional import serial from comIF.obsw_com_interface import CommunicationInterface, PusTmQueueT from utility.obsw_tmtc_printer import TmTcPrinterT -from tm.obsw_pus_tm_factory import PusTelemetryFactory +from tm.obsw_pus_tm_factory import pus_telemetry_factory from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmTupleQueueT, PusTmListT from tc.obsw_pus_tc_base import PusTcInfoT @@ -62,7 +62,7 @@ class SerialComIF(CommunicationInterface): pus_data_list, number_of_packets = self.__poll_pus_packets() packet_list = [] for counter in range(0, number_of_packets): - packet = PusTelemetryFactory(pus_data_list[counter]) + packet = pus_telemetry_factory(pus_data_list[counter]) self.tmtc_printer.print_telemetry(packet) packet_list.append(packet) return True, packet_list diff --git a/config/OBSW_Config.py b/config/obsw_config.py similarity index 88% rename from config/OBSW_Config.py rename to config/obsw_config.py index 67ae5b3a71e877a714d640115693b334699b60d9..09923ffd82c975dce92452bd119a29bf6c681844 100644 --- a/config/OBSW_Config.py +++ b/config/obsw_config.py @@ -1,12 +1,13 @@ """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief Global settings for UDP client """ import enum +import struct from typing import Tuple """ @@ -41,6 +42,13 @@ GPS0_ObjectId = bytearray([0x44, 0x10, 0x1F, 0x00]) GPS1_ObjectId = bytearray([0x44, 0x20, 0x20, 0x00]) DUMMY_DEVICE_ID = bytearray([0x44, 0x00, 0xAF, 0xFE]) +# Commands +DUMMY_COMMAND_1 = struct.pack(">I", 666) +DUMMY_COMMAND_2 = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) +DUMMY_COMMAND_2_PARAM_1 = bytearray([0xBA, 0xB0]) +DUMMY_COMMAND_2_PARAM_2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) +DUMMY_COMMAND_3 = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) + # SIDs GPS0_SID = bytearray([0x00, 0x00, 0x1f, 0x00]) GPS1_SID = bytearray([0x00, 0x00, 0x2f, 0x00]) diff --git a/gui/OBSW_BackendTest.py b/gui/obsw_backend_test.py similarity index 100% rename from gui/OBSW_BackendTest.py rename to gui/obsw_backend_test.py diff --git a/gui/OBSW_TmtcGUI.py b/gui/obsw_tmtc_gui.py similarity index 95% rename from gui/OBSW_TmtcGUI.py rename to gui/obsw_tmtc_gui.py index 5a55c6b55f9e5242b49549b64402a1a3265eed12..7bf34bc2510bbaae1e88b8b66076cadbbea45b4f 100644 --- a/gui/OBSW_TmtcGUI.py +++ b/gui/obsw_tmtc_gui.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ @file - OBSW_TmtcGUI.py + obsw_tmtc_gui.py @date 01.11.2019 @brief diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 1b4a46489f60df8a4fa4f22b872b628f614d58ac..7fd1a4a67a73cf30aec7ac897ae37c71a2d7e798 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -60,9 +60,9 @@ import sys from collections import deque from test import obsw_pus_service_test -from config import OBSW_Config as g -from config.OBSW_Config import setGlobals -from tc.obsw_pus_tc_packer import PusTelecommand, createTotalTcQueue, service_test_select +from config import obsw_config as g +from config.obsw_config import setGlobals +from tc.obsw_pus_tc_packer import PusTelecommand, create_total_tc_queue, service_test_select from sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver @@ -76,8 +76,8 @@ from comIF.obsw_ethernet_com_if import EthernetComIF from comIF.obsw_serial_com_if import SerialComIF from comIF.obsw_com_interface import ComIfT -from gui.OBSW_TmtcGUI import TmTcGUI -from gui.OBSW_BackendTest import TmTcBackend +from gui.obsw_tmtc_gui import TmTcGUI +from gui.obsw_backend_test import TmTcBackend def main(): @@ -115,7 +115,7 @@ def main(): elif g.G_MODE_ID == g.ModeList.SingleCommandMode: pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( - comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmListener=tm_listener, + com_interface=communication_interface, tmtc_printer=tmtc_printer, tm_listener=tm_listener, pusPacketTuple=pus_packet_tuple) sender_and_receiver.send_single_tc_and_receive_tm() @@ -127,7 +127,7 @@ def main(): sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() elif g.G_MODE_ID == g.ModeList.SoftwareTestMode: - all_tc_queue = createTotalTcQueue() + all_tc_queue = create_total_tc_queue() sender_and_receiver = SequentialCommandSenderReceiver( com_interface=communication_interface, tmtc_printer=tmtc_printer, tc_queue=all_tc_queue, tm_listener=tm_listener) @@ -159,12 +159,12 @@ def command_preparation(): """ # Direct command which triggers an additional step reply and one completion reply # Single Command Testing - command = PusTelecommand(service=17, subservice=1, SSC=21) + command = PusTelecommand(service=17, subservice=1, ssc=21) # command.print() # file = bytearray([1, 2, 3, 4, 5]) # command = PUSTelecommand(G_SERVICE=23, subservice=1, SSC=21, _tm_data=file) # command.packCommandTuple() - return command.packCommandTuple() + return command.pack_command_tuple() def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: @@ -182,7 +182,7 @@ def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: else: com_port = g.G_COM_PORT baud_rate = 115200 - g.G_SERIAL_TIMEOUT = 0.05 + g.G_SERIAL_TIMEOUT = 0.03 communication_interface = SerialComIF( tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, serial_timeout=g.G_SERIAL_TIMEOUT) diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py index 6456c171d27f06871c450a9ef1654db8ebb915dd..f6ddc2d8411aaad5c1acb5796b139b97aba83f5e 100644 --- a/sendreceive/obsw_command_sender_receiver.py +++ b/sendreceive/obsw_command_sender_receiver.py @@ -14,7 +14,7 @@ if the first reply has not been received. import sys import time -import config.OBSW_Config as g +import config.obsw_config as g from comIF.obsw_com_interface import CommunicationInterface from utility.obsw_tmtc_printer import TmTcPrinter from sendreceive.obsw_tm_listener import TmListener @@ -27,28 +27,28 @@ class CommandSenderReceiver: This is the generic CommandSenderReceiver object. All TMTC objects inherit this object, for example specific implementations (e.g. SingleCommandSenderReceiver) """ - def __init__(self, comInterface: CommunicationInterface, tmtcPrinter: TmTcPrinter, - tmListener: TmListener): + def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, + tm_listener: TmListener): """ - :param comInterface: CommunicationInterface object. Instantiate the desired one + :param com_interface: CommunicationInterface object. Instantiate the desired one and pass it here - :param tmtcPrinter: TmTcPrinter object. Instantiate it and pass it here. + :param tmtc_printer: TmTcPrinter object. Instantiate it and pass it here. """ self._tm_timeout = g.G_TM_TIMEOUT self._tc_send_timeout_factor = g.G_TC_SEND_TIMEOUT_FACTOR - if isinstance(comInterface, CommunicationInterface): - self._com_interface = comInterface + if isinstance(com_interface, CommunicationInterface): + self._com_interface = com_interface else: raise TypeError("Invalid communication interface service_type!") - if isinstance(tmtcPrinter, TmTcPrinter): - self._tmtc_printer = tmtcPrinter + if isinstance(tmtc_printer, TmTcPrinter): + self._tmtc_printer = tmtc_printer else: raise TypeError("Invalid TMTC Printer service_type!") - if isinstance(tmListener, TmListener): - self._tm_listener = tmListener + if isinstance(tm_listener, TmListener): + self._tm_listener = tm_listener else: raise TypeError("Invalid TM Listener service_type!") self._reply_received = False diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py index c3b7652bba8c47d21e89799e97c7591945a31294..7cc029eae50f6afd338c461a6b711da7250f4693 100644 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ b/sendreceive/obsw_multiple_commands_sender_receiver.py @@ -1,6 +1,6 @@ """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief @@ -14,7 +14,7 @@ from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderR from comIF.obsw_com_interface import CommunicationInterface from utility.obsw_tmtc_printer import TmTcPrinter from sendreceive.obsw_tm_listener import TmListenerT -import config.OBSW_Config as g +import config.obsw_config as g class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index c0b896ccecc0a0363c6425b80d060e3a0794d406..9cd2b22eb058ab156c37f6f3b2d147bd69d328a2 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -7,7 +7,7 @@ import sys import time -import config.OBSW_Config as g +import config.obsw_config as g from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT from comIF.obsw_com_interface import CommunicationInterface @@ -28,8 +28,8 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): :param tmtc_printer: TmTcPrinter object, passed on to CommandSenderReceiver for this time period """ - super().__init__(comInterface=com_interface, tmtcPrinter=tmtc_printer, - tmListener=tm_listener) + super().__init__(com_interface=com_interface, tmtc_printer=tmtc_printer, + tm_listener=tm_listener) self._tc_queue = tc_queue self.__first_reply_received = False self.__all_replies_received = False diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index 4fa2ea00adc32e9b3d37cef5c1a465ed8fc56d39..4bb1674db51f535892f0215c84f19948767e94e6 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -1,7 +1,7 @@ #!/usr/bin/python3.8 """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief @@ -13,7 +13,7 @@ from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT from comIF.obsw_com_interface import CommunicationInterface from utility.obsw_tmtc_printer import TmTcPrinterT -import config.OBSW_Config as g +import config.obsw_config as g class SingleCommandSenderReceiver(CommandSenderReceiver): @@ -21,14 +21,14 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): Specific implementation of CommandSenderReceiver to send a single telecommand This object can be used by instantiating it and calling sendSingleTcAndReceiveTm() """ - def __init__(self, comInterface: CommunicationInterface, tmtcPrinter: TmTcPrinterT, - tmListener: TmListenerT, pusPacketTuple: tuple): + def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinterT, + tm_listener: TmListenerT, pusPacketTuple: tuple): """ - :param comInterface: CommunicationInterface object, passed on to CommandSenderReceiver - :param tmListener: TmListener object which runs in the background and receives all TM - :param tmtcPrinter: TmTcPrinter object, passed on to CommandSenderReceiver + :param com_interface: CommunicationInterface object, passed on to CommandSenderReceiver + :param tm_listener: TmListener object which runs in the background and receives all TM + :param tmtc_printer: TmTcPrinter object, passed on to CommandSenderReceiver """ - super().__init__(comInterface=comInterface, tmListener=tmListener, tmtcPrinter=tmtcPrinter) + super().__init__(com_interface=com_interface, tm_listener=tm_listener, tmtc_printer=tmtc_printer) self.faulty_input = False try: self.pus_packet, self.pus_packet_info = pusPacketTuple diff --git a/sendreceive/obsw_tm_listener.py b/sendreceive/obsw_tm_listener.py index cd935aabb750fef37d79c6e3dbc6cc8a25f9222c..391dab0c990620bfc4595884ace60b273669f4dd 100644 --- a/sendreceive/obsw_tm_listener.py +++ b/sendreceive/obsw_tm_listener.py @@ -15,7 +15,7 @@ from collections import deque from typing import TypeVar from comIF.obsw_com_interface import ComIfT -import config.OBSW_Config as g +import config.obsw_config as g TmListenerT = TypeVar('TmListenerT', bound='TmListener') diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py deleted file mode 100644 index cdc1826e126e7413b51d6a9d89b901b0d35682af..0000000000000000000000000000000000000000 --- a/tc/OBSW_TcService2.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_module_test.py -Date: 01.11.2019 -Description: PUS Custom Service 8: Device Access, Native low-level commanding - -@author: R. Mueller -""" -import struct - -from tc.obsw_pus_tc_base import PusTelecommand, Deque -from tc.OBSW_TcService200 import packModeData - - -def packService2TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: - if calledExternally is False: - tcQueue.appendleft(("print", "Testing Service 2")) - objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device - # don't forget to set object mode raw (G_SERVICE 200) before calling this ! - # Set Raw Mode - tcQueue.appendleft(("print", "\r\nTesting Service 2: Setting Raw Mode")) - modeData = packModeData(objectId, 3, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - # toggle wiretapping raw - tcQueue.appendleft(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) - wiretappingToggleData = packWiretappingMode(objectId, 1) - toggleWiretappingOnCommand = PusTelecommand(service=2, subservice=129, SSC=200, - data=wiretappingToggleData) - tcQueue.appendleft(toggleWiretappingOnCommand.packCommandTuple()) - # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] - tcQueue.appendleft(("print", "\r\nTesting Service 2: Sending Raw Command")) - rawCommand = struct.pack(">I", 666) - rawData = objectId + rawCommand - rawCommand = PusTelecommand(service=2, subservice=128, SSC=201, data=rawData) - tcQueue.appendleft(rawCommand.packCommandTuple()) - # toggle wiretapping off - tcQueue.appendleft(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) - wiretappingToggleData = packWiretappingMode(objectId, 0) - toggleWiretappingOffCommand = PusTelecommand(service=2, subservice=129, SSC=204, - data=wiretappingToggleData) - tcQueue.appendleft(toggleWiretappingOffCommand.packCommandTuple()) - # send raw command which should be returned via TM[2,130] - tcQueue.appendleft(("print", "\r\nTesting Service 2: Send second raw command")) - command = PusTelecommand(service=2, subservice=128, SSC=205, data=rawData) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - if calledExternally is False: - tcQueue.appendleft(("export", "log/tmtc_log_service2.txt")) - return tcQueue - - -# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW -def packWiretappingMode(objectId, wiretappingMode_): - wiretappingMode = struct.pack(">B", wiretappingMode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 - wiretappingToggleData = objectId + wiretappingMode - return wiretappingToggleData - - -def packSpecificService2TestInto(tcQueue, objectIdList, dataList, sscList, printStringList, - calledExternally=False): - pass diff --git a/tc/OBSW_TcService3.py b/tc/OBSW_TcService3.py deleted file mode 100644 index 1c9de0a30e342bd5b3454bd50958f157b57ac2e6..0000000000000000000000000000000000000000 --- a/tc/OBSW_TcService3.py +++ /dev/null @@ -1,124 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_module_test.py -Date: 01.11.2019 -Description: PUS Custom Service 8: Device Access, Native low-level commanding - -@author: R. Mueller -""" -import struct -from typing import Deque -from tc.obsw_pus_tc_base import PusTelecommand - - -def packService3TestInto(tcQueue: Deque) -> Deque: - tcQueue.appendleft(("print", "Testing Service 3")) - # adding custom defintion to hk using test pool variables - sid1 = bytearray([0x00, 0x00, 0x43, 0x00]) - sid2 = bytearray([0x00, 0x00, 0x44, 0x00]) - sidGps = bytearray([0x00, 0x00, 0x1f, 0x00]) - collectionInterval = struct.pack('>f', 3) - numberOfParameters = struct.pack('B', 5) - p1 = bytearray([0x01, 0x01, 0x01, 0x01]) - p2 = bytearray([0x02, 0x02, 0x02, 0x02]) - p3 = bytearray([0x03, 0x03, 0x03, 0x03]) - p4 = bytearray([0x04, 0x04, 0x04, 0x04]) - p5 = bytearray([0x05, 0x05, 0x05, 0x05]) - hkDefinition1 = sid1 + collectionInterval + numberOfParameters + p1 + p2 + p3 + p4 + p5 - collectionInterval = struct.pack('>f', 6) - hkDefinition2 = sid2 + collectionInterval + numberOfParameters + p1 + p2 + p3 + p4 + p5 - - # deleting pre-defined test entry - tcQueue.appendleft(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) - command = PusTelecommand(service=3, subservice=3, SSC=3000, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - - # adding pre-defined definition to hk using test pool variables - tcQueue.appendleft(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) - command = PusTelecommand(service=3, subservice=1, SSC=3010, data=hkDefinition1) - tcQueue.appendleft(command.packCommandTuple()) - - # adding custom definition to diagnostics using test pool variables - tcQueue.appendleft(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) - command = PusTelecommand(service=3, subservice=2, SSC=3020, data=hkDefinition2) - tcQueue.appendleft(command.packCommandTuple()) - - # enable custom hk definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable custom definition")) - command = PusTelecommand(service=3, subservice=5, SSC=3030, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # enable custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=7, SSC=3040, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # enable gps0 - tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable GPS definition")) - command = PusTelecommand(service=3, subservice=5, SSC=3050, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - - # maybe wait a bit to receive at least 2 packets.. - tcQueue.appendleft(("wait", 3)) - - # Disable custom hk definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable custom definition")) - command = PusTelecommand(service=3, subservice=6, SSC=3060, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # Disable custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=8, SSC=3070, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # disable gps0 - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable GPS definition")) - command = PusTelecommand(service=3, subservice=6, SSC=3080, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - # report custom Diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PusTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # report gps definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting GPS definition")) - command = PusTelecommand(service=3, subservice=9, SSC=3110, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - # generate one custom hk definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one custom hk definition")) - command = PusTelecommand(service=3, subservice=27, SSC=3120, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # generate one custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=28, SSC=3120, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # generate one gps 0 definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) - command = PusTelecommand(service=3, subservice=27, SSC=3120, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - # modify custom hk definition interval - newInterval = struct.pack('>f', 10.0) - newIntervalCommand = sid1 + newInterval - tcQueue.appendleft(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) - command = PusTelecommand(service=3, subservice=31, SSC=3090, data=newIntervalCommand) - tcQueue.appendleft(command.packCommandTuple()) - # report custom HK definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) - command = PusTelecommand(service=3, subservice=9, SSC=3090, data=sid1) - tcQueue.appendleft(command.packCommandTuple()) - # modify custom diag definition interval - newIntervalCommand = sid2 + newInterval - tcQueue.appendleft(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) - command = PusTelecommand(service=3, subservice=32, SSC=3090, data=newIntervalCommand) - tcQueue.appendleft(command.packCommandTuple()) - # report custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PusTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - # append parameter to custom hk definiton - # append parameter to custom diag definition - - # delete custom diag definition - tcQueue.appendleft(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) - command = PusTelecommand(service=3, subservice=4, SSC=3120, data=sid2) - tcQueue.appendleft(command.packCommandTuple()) - - # do some basic testing on predefined structs too - # e.g. add one variable, change interval, report them.... - tcQueue.appendleft(("export", "log/tmtc_log_service3.txt")) - return tcQueue diff --git a/tc/OBSW_TcService8.py b/tc/OBSW_TcService8.py deleted file mode 100644 index 98e8d2c7e19bd02260cfdfb51017776e043a0b2d..0000000000000000000000000000000000000000 --- a/tc/OBSW_TcService8.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_module_test.py -Date: 01.11.2019 -Description: PUS Custom Service 8: Function Management, High-Level Commanding - -@author: R. Mueller -""" -import struct -from typing import Deque - -from tc.obsw_pus_tc_base import PusTelecommand -from tc.OBSW_TcService200 import packModeData - - -def packService8TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: - if calledExternally is False: - tcQueue.appendleft(("print", "Testing Service 8")) - objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) - # set mode on - tcQueue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) - modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=800, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - # set mode normal - tcQueue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) - modeData = packModeData(objectId, 2, 0) - command = PusTelecommand(service=200, subservice=1, SSC=810, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - # Direct command which triggers completion reply - tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Completion Reply")) - actionId = struct.pack(">I", 666) - directCommand = objectId + actionId - command = PusTelecommand(service=8, subservice=128, SSC=820, data=directCommand) - tcQueue.appendleft(command.packCommandTuple()) - # Direct command which triggers _tm_data reply - tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Data Reply")) - actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) - commandParam1 = bytearray([0xBA, 0xB0]) - commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) - directCommand = objectId + actionId + commandParam1 + commandParam2 - command = PusTelecommand(service=8, subservice=128, SSC=830, data=directCommand) - tcQueue.appendleft(command.packCommandTuple()) - # Direct command which triggers an additional step reply and one completion reply - tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) - actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) - directCommand = objectId + actionId - command = PusTelecommand(service=8, subservice=128, SSC=840, data=directCommand) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("wait", 2)) - tcQueue.appendleft(("print", "\r")) - if calledExternally is False: - tcQueue.appendleft(("export", "log/tmtc_log_service8.txt")) - return tcQueue - - -def packSpecificService8TestInto(tcQueue, objectIdList, actionIdList, dataList, - sscList, printStringList, calledExternally=False): - pass diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py index 473f4c2345dad769a94650c5f1af7cf4629d4fd6..22061afc7c0328a2a9aec63a36d817c539a7a9b2 100644 --- a/tc/obsw_pus_tc_base.py +++ b/tc/obsw_pus_tc_base.py @@ -4,16 +4,26 @@ Created on Wed Apr 4 11:43:00 2018 @author: S. Gaisser, R. Mueller """ - +import sys import crcmod import logging +from enum import Enum from typing import TypeVar, Dict, Union, Tuple, Deque PusTcT = TypeVar('PusTcT', bound='PUSTelecommand') -PusTcInfoT = Union[Dict[str, any], None] + + +class TcDictionaryKeys(Enum): + SERVICE = 1 + SUBSERVICE = 2 + SSC = 3 + PACKED_ID = 4 + DATA = 5 + + +PusTcInfoT = Union[Dict[TcDictionaryKeys, any], None] PusTcTupleT = Tuple[bytearray, PusTcInfoT] TcAuxiliaryTupleT = Tuple[str, any] -# Union: Type can be either of those 2 TcQueueEntryT = Union[TcAuxiliaryTupleT, PusTcTupleT] TcQueueT = Deque[TcQueueEntryT] PusTcInfoQueueT = Deque[PusTcInfoT] @@ -22,65 +32,74 @@ PusTcInfoQueueT = Deque[PusTcInfoT] class PusTelecommand: headerSize = 6 - def __init__(self, service: int, subservice: int, SSC=0, data=bytearray([]), sourceId: int = 0, - version: int = 0, packetType: int = 1, dataFieldHeaderFlag: int = 1, + def __init__(self, service: int, subservice: int, ssc=0, data=bytearray([]), source_id: int = 0, + version: int = 0, packet_type: int = 1, data_field_header_flag: int = 1, apid: int = 0x73, length: int = 0): self.packet_id = [0x0, 0x0] - self.packet_id[0] = ((version << 5) & 0xE0) | ((packetType & 0x01) << 4) | \ - ((dataFieldHeaderFlag & 0x01) << 3) | ((apid & 0x700) >> 8) + self.packet_id[0] = ((version << 5) & 0xE0) | ((packet_type & 0x01) << 4) | \ + ((data_field_header_flag & 0x01) << 3) | ((apid & 0x700) >> 8) self.packet_id[1] = apid & 0xFF - self.ssc = SSC - self.psc = (SSC & 0x3FFF) | (0xC0 << 8) + self.ssc = ssc + self.psc = (ssc & 0x3FFF) | (0xC0 << 8) self.length = length - self.pusVersionAndAckByte = 0b00011111 + self.pus_version_and_ack_byte = 0b00011111 self.service = service self.subservice = subservice - self.sourceId = sourceId + self.source_id = source_id self.data = data - def getLength(self) -> int: + def get_length(self) -> int: + """ + Retrieve size of TC packet in bytes. + """ try: length = 4 + len(self.data) + 1 return length except TypeError: print("OBSW_TcPacket: Invalid service_type of data") logging.exception("Error") - exit() + sys.exit() def pack(self) -> bytearray: - dataToPack = bytearray() - dataToPack.append(self.packet_id[0]) - dataToPack.append(self.packet_id[1]) - dataToPack.append((self.psc & 0xFF00) >> 8) - dataToPack.append(self.psc & 0xFF) - length = self.getLength() - dataToPack.append((length & 0xFF00) >> 8) - dataToPack.append(length & 0xFF) - dataToPack.append(self.pusVersionAndAckByte) - dataToPack.append(self.service) - dataToPack.append(self.subservice) - dataToPack.append(self.sourceId) - dataToPack += self.data + """ + Serializes the TC data fields into a bytearray. + """ + data_to_pack = bytearray() + data_to_pack.append(self.packet_id[0]) + data_to_pack.append(self.packet_id[1]) + data_to_pack.append((self.psc & 0xFF00) >> 8) + data_to_pack.append(self.psc & 0xFF) + length = self.get_length() + data_to_pack.append((length & 0xFF00) >> 8) + data_to_pack.append(length & 0xFF) + data_to_pack.append(self.pus_version_and_ack_byte) + data_to_pack.append(self.service) + data_to_pack.append(self.subservice) + data_to_pack.append(self.source_id) + data_to_pack += self.data crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc = crc_func(dataToPack) - - dataToPack.append((crc & 0xFF00) >> 8) - dataToPack.append(crc & 0xFF) - return dataToPack - - def packInformation(self) -> PusTcInfoT: - tcInformation = { - "service": self.service, - "subservice": self.subservice, - "ssc": self.ssc, - "packet_id": self.packet_id, - "data": self.data + crc = crc_func(data_to_pack) + + data_to_pack.append((crc & 0xFF00) >> 8) + data_to_pack.append(crc & 0xFF) + return data_to_pack + + def pack_information(self) -> PusTcInfoT: + """ + Packs TM information into a dictionary. + """ + tc_information = { + TcDictionaryKeys.SERVICE: self.service, + TcDictionaryKeys.SUBSERVICE: self.subservice, + TcDictionaryKeys.SSC: self.ssc, + TcDictionaryKeys.PACKED_ID: self.packet_id, + TcDictionaryKeys.DATA: self.data } - return tcInformation + return tc_information - def packCommandTuple(self) -> PusTcTupleT: - commandTuple = (self.pack(), self.packInformation()) - return commandTuple + def pack_command_tuple(self) -> PusTcTupleT: + command_tuple = (self.pack(), self.pack_information()) + return command_tuple def print(self): packet = self.pack() @@ -96,22 +115,22 @@ class PusTelecommand: # Takes pusPackets, removes current Packet Error Control, # calculates new CRC (16 bits at wiretapping_packet end) and # adds it as correct Packet Error Control Code. Reference: ECSS-E70-41A p. 207-212 -def generatePacketCRC(TCPacket: PusTcT) -> PusTcT: +def generate_packet_crc(tc_packet: PusTcT) -> PusTcT: crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - crc = crc_func(bytearray(TCPacket[0:len(TCPacket) - 2])) - TCPacket[len(TCPacket) - 2] = (crc & 0xFF00) >> 8 - TCPacket[len(TCPacket) - 1] = crc & 0xFF - return TCPacket + crc = crc_func(bytearray(tc_packet[0:len(tc_packet) - 2])) + tc_packet[len(tc_packet) - 2] = (crc & 0xFF00) >> 8 + tc_packet[len(tc_packet) - 1] = crc & 0xFF + return tc_packet -def generateCRC(data: bytearray) -> bytearray: - dataWithCRC = bytearray() - dataWithCRC += data +def generate_crc(data: bytearray) -> bytearray: + data_with_crc = bytearray() + data_with_crc += data crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) crc = crc_func(data) - dataWithCRC.append((crc & 0xFF00) >> 8) - dataWithCRC.append(crc & 0xFF) - return dataWithCRC + data_with_crc.append((crc & 0xFF00) >> 8) + data_with_crc.append(crc & 0xFF) + return data_with_crc # pylint: disable=line-too-long diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index 5ec05f6a3b38fbf241319e2849e0054dd3dc461f..2f378a42cfbb20519246ce95ff37780dc47965b7 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -14,204 +14,204 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file import os from tc.obsw_pus_tc_base import PusTelecommand, TcQueueT -from tc.OBSW_TcService2 import packService2TestInto -from tc.OBSW_TcService3 import packService3TestInto -from tc.OBSW_TcService8 import packService8TestInto -from tc.OBSW_TcService200 import packModeData, packService200TestInto -import config.OBSW_Config as g +from tc.obsw_tc_service2 import pack_service2_test_into +from tc.obsw_tc_service3 import pack_service3_test_into +from tc.obsw_tc_service8 import pack_service8_test_into +from tc.obsw_tc_service200 import packModeData, pack_service200_test_into +import config.obsw_config as g from datetime import datetime from collections import deque from typing import Union -def service_test_select(service: Union[int, str], serviceQueue: TcQueueT) -> TcQueueT: +def service_test_select(service: Union[int, str], service_queue: TcQueueT) -> TcQueueT: if service == 2: - return packService2TestInto(serviceQueue) + return pack_service2_test_into(service_queue) elif service == 3: - return packService3TestInto(serviceQueue) + return pack_service3_test_into(service_queue) elif service == 5: - return packService5TestInto(serviceQueue) + return pack_service5_test_into(service_queue) elif service == 8: - return packService8TestInto(serviceQueue) + return pack_service8_test_into(service_queue) elif service == 9: - return packService9TestInto(serviceQueue) + return pack_service9_test_into(service_queue) elif service == 17: - return packService17TestInto(serviceQueue) + return pack_service17_test_into(service_queue) elif service == 200: - return packService200TestInto(serviceQueue) + return pack_service200_test_into(service_queue) elif service == "Dummy": - return packDummyDeviceTestInto(serviceQueue) + return pack_dummy_device_test_into(service_queue) elif service == "GPS0": # Object ID: GPS Device - objectId = g.GPS0_ObjectId - return packGpsTestInto(objectId, serviceQueue) + object_id = g.GPS0_ObjectId + return pack_gps_test_into(object_id, service_queue) elif service == "GPS1": # Object ID: GPS Device - objectId = g.GPS1_ObjectId - return packGpsTestInto(objectId, serviceQueue) + object_id = g.GPS1_ObjectId + return pack_gps_test_into(object_id, service_queue) elif service == "Error": - return packErrorTestingInto(serviceQueue) + return pack_error_testing_into(service_queue) else: print("Invalid Service !") exit() -def packService5TestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Service 5")) +def pack_service5_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Service 5")) # disable events - tcQueue.appendleft(("print", "\r\nTesting Service 5: Disable event")) - command = PusTelecommand(service=5, subservice=6, SSC=500) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 5: Disable event")) + command = PusTelecommand(service=5, subservice=6, ssc=500) + tc_queue.appendleft(command.pack_command_tuple()) # trigger event - tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger event")) - command = PusTelecommand(service=17, subservice=128, SSC=510) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 5: Trigger event")) + command = PusTelecommand(service=17, subservice=128, ssc=510) + tc_queue.appendleft(command.pack_command_tuple()) # enable event - tcQueue.appendleft(("print", "\r\nTesting Service 5: Enable event")) - command = PusTelecommand(service=5, subservice=5, SSC=520) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 5: Enable event")) + command = PusTelecommand(service=5, subservice=5, ssc=520) + tc_queue.appendleft(command.pack_command_tuple()) # trigger event - tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger another event")) - command = PusTelecommand(service=17, subservice=128, SSC=530) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service5.txt")) - return tcQueue - - -def packService9TestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Service 9")) - timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' - timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' - timeTestCurrentTime = datetime.now().isoformat() + "Z" + '\0' - timeTest1 = timeTestASCIICodeA.encode('ascii') - timeTest2 = timeTestASCIICodeB.encode('ascii') - timeTest3 = timeTestCurrentTime.encode('ascii') - print("Time Code 1 :" + str(timeTest1)) - print("Time Code 2 :" + str(timeTest2)) - print("Time Code 3 :" + str(timeTest3) + "\r") + tc_queue.appendleft(("print", "\r\nTesting Service 5: Trigger another event")) + command = PusTelecommand(service=17, subservice=128, ssc=530) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service5.txt")) + return tc_queue + + +def pack_service9_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Service 9")) + time_test_ascii_code_a = '2019-08-30T20:50:33.892429Z' + '\0' + time_test_ascii_code_b = '2019-270T05:50:33.002000Z' + '\0' + time_test_current_time = datetime.now().isoformat() + "Z" + '\0' + time_test1 = time_test_ascii_code_a.encode('ascii') + time_test2 = time_test_ascii_code_b.encode('ascii') + time_test3 = time_test_current_time.encode('ascii') + print("Time Code 1 :" + str(time_test1)) + print("Time Code 2 :" + str(time_test2)) + print("Time Code 3 :" + str(time_test3) + "\r") # time setting - tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode A")) - command = PusTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode B")) - command = PusTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode Current Time")) - command = PusTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode A")) + command = PusTelecommand(service=9, subservice=128, ssc=900, data=time_test1) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode B")) + command = PusTelecommand(service=9, subservice=128, ssc=910, data=time_test2) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 9: Testing timecode Current Time")) + command = PusTelecommand(service=9, subservice=128, ssc=920, data=time_test3) + tc_queue.appendleft(command.pack_command_tuple()) # TODO: Add other time formats here - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service9.txt")) - return tcQueue + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service9.txt")) + return tc_queue -def packService17TestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Service 17")) +def pack_service17_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Service 17")) # ping test - tcQueue.appendleft(("print", "\n\rTesting Service 17: Ping Test")) - command = PusTelecommand(service=17, subservice=1, SSC=1700) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting Service 17: Ping Test")) + command = PusTelecommand(service=17, subservice=1, ssc=1700) + tc_queue.appendleft(command.pack_command_tuple()) # enable event - tcQueue.appendleft(("print", "\n\rTesting Service 17: Enable Event")) - command = PusTelecommand(service=5, subservice=5, SSC=52) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting Service 17: Enable Event")) + command = PusTelecommand(service=5, subservice=5, ssc=52) + tc_queue.appendleft(command.pack_command_tuple()) # test event - tcQueue.appendleft(("print", "\n\rTesting Service 17: Trigger event")) - command = PusTelecommand(service=17, subservice=128, SSC=1701) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service17.txt")) - return tcQueue + tc_queue.appendleft(("print", "\n\rTesting Service 17: Trigger event")) + command = PusTelecommand(service=17, subservice=128, ssc=1701) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service17.txt")) + return tc_queue -def packDummyDeviceTestInto(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Dummy Device")) +def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Dummy Device")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode - tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Set On")) + tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Set On")) modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=1, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=1, data=modeData) + tc_queue.appendleft(command.pack_command_tuple()) # Test Service 2 commands - tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Service 2")) - packService2TestInto(tcQueue, True) + tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Service 2")) + pack_service2_test_into(tc_queue, True) # Test Service 8 - tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Service 8")) - packService8TestInto(tcQueue, True) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) - return tcQueue - - -def packGpsTestInto(objectId: bytearray, tcQueue: TcQueueT) -> TcQueueT: - if objectId == g.GPS0_ObjectId: - gpsString = "GPS0" - elif objectId == g.GPS1_ObjectId: - gpsString = "GPS1" + tc_queue.appendleft(("print", "\n\rTesting Service Dummy: Service 8")) + pack_service8_test_into(tc_queue, True) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) + return tc_queue + + +def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: + if object_id == g.GPS0_ObjectId: + gps_string = "GPS0" + elif object_id == g.GPS1_ObjectId: + gps_string = "GPS1" else: - gpsString = "unknown" - tcQueue.appendleft(("print", "Testing " + gpsString + " Device")) + gps_string = "unknown" + tc_queue.appendleft(("print", "Testing " + gps_string + " Device")) # Set Mode Off - tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set Off")) - modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, SSC=11, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set Off")) + mode_data = packModeData(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=11, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) # Set Mode On - tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set On")) - modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=12, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set On")) + mode_data = packModeData(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=12, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) # Enable HK report - sidGps = 0 - if objectId == g.GPS0_ObjectId: - sidGps = g.GPS0_SID - tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) - command = PusTelecommand(service=3, subservice=5, SSC=13, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) - elif objectId == g.GPS1_ObjectId: - sidGps = g.GPS1_SID - tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) - command = PusTelecommand(service=3, subservice=5, SSC=14, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) + sid_gps = 0 + if object_id == g.GPS0_ObjectId: + sid_gps = g.GPS0_SID + tc_queue.appendleft(("print", "\r\nTesting " + gps_string + ": Enable HK Reporting")) + command = PusTelecommand(service=3, subservice=5, ssc=13, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + elif object_id == g.GPS1_ObjectId: + sid_gps = g.GPS1_SID + tc_queue.appendleft(("print", "\r\nTesting " + gps_string + ": Enable HK Reporting")) + command = PusTelecommand(service=3, subservice=5, ssc=14, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) # pack wait interval until mode is on and a few gps replies have been received - tcQueue.appendleft(("wait", 5)) + tc_queue.appendleft(("wait", 5)) # Disable HK reporting - tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition")) - command = PusTelecommand(service=3, subservice=6, SSC=15, data=sidGps) - tcQueue.appendleft(command.packCommandTuple()) + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable " + gps_string + " definition")) + command = PusTelecommand(service=3, subservice=6, ssc=15, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) # Set Mode Off - tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set Off")) - modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, SSC=13, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) - tcQueue.appendleft(("print", "\r")) - tcQueue.appendleft(("export", "log/tmtc_log_service_" + gpsString + ".txt")) - return tcQueue + tc_queue.appendleft(("print", "\n\rTesting " + gps_string + ": Set Off")) + mode_data = packModeData(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=13, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + tc_queue.appendleft(("export", "log/tmtc_log_service_" + gps_string + ".txt")) + return tc_queue -def packErrorTestingInto(tcQueue: TcQueueT) -> TcQueueT: +def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT: # a lot of events - command = PusTelecommand(service=17, subservice=129, SSC=2010) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=17, subservice=129, ssc=2010) + tc_queue.appendleft(command.pack_command_tuple()) # a lot of ping testing - command = PusTelecommand(service=17, subservice=130, SSC=2020) - tcQueue.appendleft(command.packCommandTuple()) - return tcQueue + command = PusTelecommand(service=17, subservice=130, ssc=2020) + tc_queue.appendleft(command.pack_command_tuple()) + return tc_queue -def createTotalTcQueue() -> TcQueueT: +def create_total_tc_queue() -> TcQueueT: if not os.path.exists("log"): os.mkdir("log") - tcQueue = deque() - tcQueue = packService2TestInto(tcQueue) - tcQueue = packService5TestInto(tcQueue) - tcQueue = packService8TestInto(tcQueue) - tcQueue = packService9TestInto(tcQueue) - tcQueue = packService17TestInto(tcQueue) - tcQueue = packService200TestInto(tcQueue) - tcQueue = packDummyDeviceTestInto(tcQueue) + tc_queue = deque() + tc_queue = pack_service2_test_into(tc_queue) + tc_queue = pack_service5_test_into(tc_queue) + tc_queue = pack_service8_test_into(tc_queue) + tc_queue = pack_service9_test_into(tc_queue) + tc_queue = pack_service17_test_into(tc_queue) + tc_queue = pack_service200_test_into(tc_queue) + tc_queue = pack_dummy_device_test_into(tc_queue) # objectId = bytearray([0x44, 0x00, 0x1F, 0x00]) # tc_queue = packGpsTestInto(objectId, tc_queue) - return tcQueue + return tc_queue diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py new file mode 100644 index 0000000000000000000000000000000000000000..b1c6151a487a903cc825a777571d2b459c27ad33 --- /dev/null +++ b/tc/obsw_tc_service2.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +""" +Program: obsw_module_test.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Device Access, Native low-level commanding + +@author: R. Mueller +""" +import struct + +import config.obsw_config as g +from tc.obsw_pus_tc_base import PusTelecommand, Deque +from tc.obsw_tc_service200 import packModeData + + +def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: + if called_externally is False: + tc_queue.appendleft(("print", "Testing Service 2")) + object_id = g.DUMMY_DEVICE_ID # dummy device + # don't forget to set object mode raw (G_SERVICE 200) before calling this ! + # Set Raw Mode + tc_queue.appendleft(("print", "\r\nTesting Service 2: Setting Raw Mode")) + mode_data = packModeData(object_id, 3, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2020, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + # toggle wiretapping raw + tc_queue.appendleft(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 1) + toggle_wiretapping_on_command = PusTelecommand(service=2, subservice=129, ssc=200, + data=wiretapping_toggle_data) + tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) + # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] + tc_queue.appendleft(("print", "\r\nTesting Service 2: Sending Raw Command")) + raw_command = g.DUMMY_COMMAND_1 + raw_data = object_id + raw_command + raw_command = PusTelecommand(service=2, subservice=128, ssc=201, data=raw_data) + tc_queue.appendleft(raw_command.pack_command_tuple()) + # toggle wiretapping off + tc_queue.appendleft(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) + wiretapping_toggle_data = pack_wiretapping_mode(object_id, 0) + toggle_wiretapping_off_command = PusTelecommand(service=2, subservice=129, ssc=204, + data=wiretapping_toggle_data) + tc_queue.appendleft(toggle_wiretapping_off_command.pack_command_tuple()) + # send raw command which should be returned via TM[2,130] + tc_queue.appendleft(("print", "\r\nTesting Service 2: Send second raw command")) + command = PusTelecommand(service=2, subservice=128, ssc=205, data=raw_data) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "\r")) + if called_externally is False: + tc_queue.appendleft(("export", "log/tmtc_log_service2.txt")) + return tc_queue + + +# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW +def pack_wiretapping_mode(object_id, wiretapping_mode_): + wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 + wiretapping_toggle_data = object_id + wiretapping_mode + return wiretapping_toggle_data + + +def pack_specific_service2_test_into(tcQueue, objectIdList, dataList, sscList, printStringList, + calledExternally=False): + pass diff --git a/tc/OBSW_TcService200.py b/tc/obsw_tc_service200.py similarity index 71% rename from tc/OBSW_TcService200.py rename to tc/obsw_tc_service200.py index 2c9fe91c682209c69e6d3936b1f7a43d2bd48921..654116acfbafebf62772d5d9d433f7d95f5b3df3 100644 --- a/tc/OBSW_TcService200.py +++ b/tc/obsw_tc_service200.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -Program: OBSW_TcService200.py +Program: obsw_tc_service200.py Date: 01.11.2019 Description: PUS Custom Service 200: Mode Commanding @@ -13,34 +13,34 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file """ from tc.obsw_pus_tc_base import PusTelecommand from tc.obsw_pus_tc_packer import TcQueueT -import config.OBSW_Config as g +import config.obsw_config as g import struct -def packService200TestInto(tcQueue: TcQueueT) -> TcQueueT: +def pack_service200_test_into(tcQueue: TcQueueT) -> TcQueueT: tcQueue.appendleft(("print", "Testing Service 200")) # Object ID: Dummy Device objectId = g.DUMMY_DEVICE_ID # Set On Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode On")) modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2000, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2000, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) # Set Normal mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Normal")) modeData = packModeData(objectId, 2, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2010, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2010, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) # Set Raw Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Raw")) modeData = packModeData(objectId, 3, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2020, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) # Set Off Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Off")) modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, SSC=2030, data=modeData) - tcQueue.appendleft(command.packCommandTuple()) + command = PusTelecommand(service=200, subservice=1, ssc=2030, data=modeData) + tcQueue.appendleft(command.pack_command_tuple()) tcQueue.appendleft(("export", "log/tmtc_log_service200.txt")) return tcQueue diff --git a/tc/obsw_tc_service3.py b/tc/obsw_tc_service3.py new file mode 100644 index 0000000000000000000000000000000000000000..c23309cfc30e0184ceb7ca016a5142336a32a127 --- /dev/null +++ b/tc/obsw_tc_service3.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +""" +Program: obsw_module_test.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Device Access, Native low-level commanding + +@author: R. Mueller +""" +import struct +from typing import Deque +from tc.obsw_pus_tc_base import PusTelecommand + + +def pack_service3_test_into(tc_queue: Deque) -> Deque: + tc_queue.appendleft(("print", "Testing Service 3")) + # adding custom defintion to hk using test pool variables + sid1 = bytearray([0x00, 0x00, 0x43, 0x00]) + sid2 = bytearray([0x00, 0x00, 0x44, 0x00]) + sid_gps = bytearray([0x00, 0x00, 0x1f, 0x00]) + collection_interval = struct.pack('>f', 3) + number_of_parameters = struct.pack('B', 5) + p1 = bytearray([0x01, 0x01, 0x01, 0x01]) + p2 = bytearray([0x02, 0x02, 0x02, 0x02]) + p3 = bytearray([0x03, 0x03, 0x03, 0x03]) + p4 = bytearray([0x04, 0x04, 0x04, 0x04]) + p5 = bytearray([0x05, 0x05, 0x05, 0x05]) + hkDefinition1 = sid1 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 + collection_interval = struct.pack('>f', 6) + hkDefinition2 = sid2 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 + + # deleting pre-defined test entry + tc_queue.appendleft(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) + command = PusTelecommand(service=3, subservice=3, ssc=3000, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + + # adding pre-defined definition to hk using test pool variables + tc_queue.appendleft(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) + command = PusTelecommand(service=3, subservice=1, ssc=3010, data=hkDefinition1) + tc_queue.appendleft(command.pack_command_tuple()) + + # adding custom definition to diagnostics using test pool variables + tc_queue.appendleft(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) + command = PusTelecommand(service=3, subservice=2, ssc=3020, data=hkDefinition2) + tc_queue.appendleft(command.pack_command_tuple()) + + # enable custom hk definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable custom definition")) + command = PusTelecommand(service=3, subservice=5, ssc=3030, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # enable custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=7, ssc=3040, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # enable gps0 + tc_queue.appendleft(("print", "\r\nTesting Service 3: Enable GPS definition")) + command = PusTelecommand(service=3, subservice=5, ssc=3050, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + + # maybe wait a bit to receive at least 2 packets.. + tc_queue.appendleft(("wait", 3)) + + # Disable custom hk definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable custom definition")) + command = PusTelecommand(service=3, subservice=6, ssc=3060, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # Disable custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=8, ssc=3070, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # disable gps0 + tc_queue.appendleft(("print", "\r\nTesting Service 3: Disable GPS definition")) + command = PusTelecommand(service=3, subservice=6, ssc=3080, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + # report custom Diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) + command = PusTelecommand(service=3, subservice=11, ssc=3100, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # report gps definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting GPS definition")) + command = PusTelecommand(service=3, subservice=9, ssc=3110, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + # generate one custom hk definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one custom hk definition")) + command = PusTelecommand(service=3, subservice=27, ssc=3120, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # generate one custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=28, ssc=3120, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # generate one gps 0 definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) + command = PusTelecommand(service=3, subservice=27, ssc=3120, data=sid_gps) + tc_queue.appendleft(command.pack_command_tuple()) + # modify custom hk definition interval + newInterval = struct.pack('>f', 10.0) + newIntervalCommand = sid1 + newInterval + tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) + command = PusTelecommand(service=3, subservice=31, ssc=3090, data=newIntervalCommand) + tc_queue.appendleft(command.pack_command_tuple()) + # report custom HK definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) + command = PusTelecommand(service=3, subservice=9, ssc=3090, data=sid1) + tc_queue.appendleft(command.pack_command_tuple()) + # modify custom diag definition interval + newIntervalCommand = sid2 + newInterval + tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) + command = PusTelecommand(service=3, subservice=32, ssc=3090, data=newIntervalCommand) + tc_queue.appendleft(command.pack_command_tuple()) + # report custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) + command = PusTelecommand(service=3, subservice=11, ssc=3100, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + # append parameter to custom hk definiton + # append parameter to custom diag definition + + # delete custom diag definition + tc_queue.appendleft(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) + command = PusTelecommand(service=3, subservice=4, ssc=3120, data=sid2) + tc_queue.appendleft(command.pack_command_tuple()) + + # do some basic testing on predefined structs too + # e.g. add one variable, change interval, report them.... + tc_queue.appendleft(("export", "log/tmtc_log_service3.txt")) + return tc_queue diff --git a/tc/obsw_tc_service8.py b/tc/obsw_tc_service8.py new file mode 100644 index 0000000000000000000000000000000000000000..7e2d2cdbcce89175ee23ec1d4f82bf9994d456f4 --- /dev/null +++ b/tc/obsw_tc_service8.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +""" +Program: obsw_module_test.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Function Management, High-Level Commanding + +@author: R. Mueller +""" +from typing import Deque + +import config.obsw_config as g +from tc.obsw_pus_tc_base import PusTelecommand +from tc.obsw_tc_service200 import packModeData + + +def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: + if called_externally is False: + tc_queue.appendleft(("print", "Testing Service 8")) + object_id = g.DUMMY_DEVICE_ID + + # set mode on + tc_queue.appendleft(("print", "\r\nTesting Service 8: Set On Mode")) + mode_data = packModeData(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode normal + tc_queue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) + mode_data = packModeData(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=810, data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers completion reply + tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Completion Reply")) + action_id = g.DUMMY_COMMAND_1 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=820, data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers _tm_data reply + tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Data Reply")) + action_id = g.DUMMY_COMMAND_2 + command_param1 = g.DUMMY_COMMAND_2_PARAM_1 + command_param2 = g.DUMMY_COMMAND_2_PARAM_2 + direct_command = object_id + action_id + command_param1 + command_param2 + command = PusTelecommand(service=8, subservice=128, ssc=830, data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers an additional step reply and one completion reply + tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) + action_id = g.DUMMY_COMMAND_3 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=840, data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("wait", 2)) + tc_queue.appendleft(("print", "\r")) + if called_externally is False: + tc_queue.appendleft(("export", "log/tmtc_log_service8.txt")) + return tc_queue + + +def pack_specific_service8_test_into(tcQueue, objectIdList, actionIdList, dataList, + sscList, printStringList, calledExternally=False): + pass diff --git a/test/obsw_module_test.py b/test/obsw_module_test.py index e194aef52caf5989e24b037f9c2d0de117e95777..39b6d088366f989b4ce2b90498147f33232dfb95 100644 --- a/test/obsw_module_test.py +++ b/test/obsw_module_test.py @@ -1,13 +1,15 @@ """ -Program: obsw_module_test.py -Date: 01.11.2019 -Description: Module/High-Level test of on-board software, used by the TMTC client in -software test mode. It is very difficult to execute Unit Tests on embedded hardware. +@file obsw_module_test.py +@date 01.11.2019 +@brief Module/High-Level test of on-board software, used by the TMTC client in + software test mode. +@details +It is very difficult to execute Unit Tests on embedded hardware. The most significant functonalities for satellites are commanding and TM reception. As such, these functionalities will be tested by sending TCs and checking the expected Telemetry. Still, we make use of the Unit Testing Framework provided by Python (with some hacks involved). -Manual: +@manual Set up the TMTC client as specified in the header comment and use the unit testing mode For Developers: @@ -27,21 +29,23 @@ TC source sequence count. For PUS standalone services (PUS Service Base), only the start and completion success need to be asserted. For PUS gateway services (PUS Commanding Service Base), step verification needs to be asserted additionally. If there are commands with multiple steps, this needs -to be specified in the analyseTcInfo method of the child test. +to be specified in the analyse_tc_info method of the child test. @author: R. Mueller """ +import sys import unittest +from abc import abstractmethod from collections import deque from typing import Deque -from tc.obsw_pus_tc_packer import packDummyDeviceTestInto +from config import obsw_config as g +from tc.obsw_pus_tc_packer import pack_dummy_device_test_into +from tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys from tm.obsw_tm_service_1 import pusPacketInfoService1T -from abc import abstractmethod +from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmInfoT, TmDictionaryKeys from sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver -from config import OBSW_Config as g -from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmInfoT -from tc.obsw_pus_tc_base import PusTcInfoQueueT + TmInfoQueueService1T = Deque[pusPacketInfoService1T] @@ -74,7 +78,27 @@ class TestService(unittest.TestCase): cls.tmtc_printer = g.G_TMTC_PRINTER cls.tm_listener = g.G_TM_LISTENER cls.communication_interface = g.G_COM_INTERFACE - # connectToBoard() + + cls.tc_ssc_array = [] + cls.tc_service_array = [] + cls.tc_subservice_array = [] + # these number of TCs sent which need need to be verified + # separate step counter if there is more than one step for a command ! + cls.tc_verify_counter = 0 + cls.tc_verify_step_counter = 0 + + # These values are incremented in the analyseTmInfo function + # and compared to the counter values + cls.tc_verified_start = 0 + cls.tc_verified_completion = 0 + cls.tc_verified_step = 0 + + # The expected values are set in child test + cls.event_counter = 0 + cls.event_expected = 0 + cls.misc_expected = 0 + cls.misc_counter = 0 + cls.valid = True def perform_testing_and_generate_assertion_dict(self): """ @@ -89,130 +113,127 @@ class TestService(unittest.TestCase): """ module_tester = MultipleCommandSenderReceiver( com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, - tm_listener=self.tm_listener, tc_queue=self.test_queue, wait_intervals=self.wait_intervals, - wait_time=self.wait_time, print_tm=g.G_PRINT_RAW_TM) + tm_listener=self.tm_listener, tc_queue=self.test_queue, + wait_intervals=self.wait_intervals, wait_time=self.wait_time, print_tm=g.G_PRINT_RAW_TM) (tc_info_queue, tm_info_queue) = module_tester.send_tc_queue_and_return_info() assertion_dict = self._analyse_tm_tc_info(tm_info_queue, tc_info_queue) return assertion_dict - def _analyse_tm_tc_info( - self, tm_info_queue: PusTmInfoQueueT, tcInfoQueue: PusTcInfoQueueT) -> dict: - self.tcSscArray = [] - self.tcServiceArray = [] - self.tcSubserviceArray = [] - - # these number of TCs sent which need need to be verified - # separate step counter if there is more than one step for a command ! - self.tcVerifyCounter = 0 - self.tcVerifyStepCounter = 0 + def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, + tc_info_queue: PusTcInfoQueueT) -> dict: # child test implements this # default implementation provided - self.analyseTcInfo(tcInfoQueue) + self.analyse_tc_info(tc_info_queue) - # These values are incremented in the analyseTmInfo function - # and compared to the counter values - self.tcVerifiedStart = 0 - self.tcVerifiedCompletion = 0 - self.tcVerifiedStep = 0 - # The expected values are set in child test - self.eventCounter = 0 - self.miscCounter = 0 - self.valid = True - - assertionDict = {} + assertion_dict = {} # child test implements this and can add additional dict entries - self.analyseTmInfo(tm_info_queue, assertionDict) - - assertionDict.update({ - "TcStartCount": self.tcVerifiedStart, - "TcCompletionCount": self.tcVerifiedCompletion, - "TcStepCount": self.tcVerifiedStep, - "EventCount": self.eventCounter, - "MiscCount": self.miscCounter, + self.analyse_tm_info(tm_info_queue, assertion_dict) + + assertion_dict.update({ + "TcStartCount": self.tc_verified_start, + "TcCompletionCount": self.tc_verified_completion, + "TcStepCount": self.tc_verified_step, + "EventCount": self.event_counter, + "MiscCount": self.misc_counter, "Valid": self.valid }) - return assertionDict + return assertion_dict - def analyseTcInfo(self, tcInfoQueue: PusTcInfoQueueT): - while not tcInfoQueue.__len__() == 0: - currentTcInfo = tcInfoQueue.pop() - self.tcVerifyCounter = self.tcVerifyCounter + 1 + def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): + """ + Analyse the properties of the sent telecommands and fills them + into a list which will be used for analysis later. + """ + while not tc_info_queue.__len__() == 0: + current_tc_info = tc_info_queue.pop() + self.tc_verify_counter = self.tc_verify_counter + 1 # For commands with multiple steps, update this value manually ! - self.tcVerifyStepCounter = self.tcVerifyCounter + 1 - self.tcSscArray.append(currentTcInfo["ssc"]) - self.tcServiceArray.append(currentTcInfo["service"]) - self.tcSubserviceArray.append(currentTcInfo["subservice"]) + # Only service 8 generates a step reply. + if current_tc_info[TcDictionaryKeys.SERVICE] == 8: + self.tc_verify_step_counter += 1 + self.tc_ssc_array.append(current_tc_info[TcDictionaryKeys.SSC]) + self.tc_service_array.append(current_tc_info[TcDictionaryKeys.SERVICE]) + self.tc_subservice_array.append(current_tc_info[TcDictionaryKeys.SUBSERVICE]) - # must be implemented by child test ! @abstractmethod - def analyseTmInfo(self, tmInfoQueue: Deque, assertionDict: dict): - pass + def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): + """ + In this function, the received TM information is analysed for correctness. + Must be implemented by child test ! + """ - # this function looks whether the tc verification SSC matched - # a SSC of the sent TM - def scanForRespectiveTc(self, currentTmInfo: PusTmInfoT): - currentSubservice = currentTmInfo["subservice"] - for possibleIndex, searchIndex in enumerate(self.tcSscArray): - if searchIndex == currentTmInfo["tcSSC"]: - if currentSubservice == 1: - self.tcVerifiedStart = self.tcVerifiedStart + 1 - elif currentSubservice == 5: - self.tcVerifiedStep = self.tcVerifiedStep + 1 - elif currentSubservice == 7: - self.tcVerifiedCompletion = self.tcVerifiedCompletion + 1 - - def _perform_generic_assertion_test(self, assertionDict: dict): - if assertionDict is None: + def scan_for_respective_tc(self, current_tm_info: PusTmInfoT): + """ + this function looks whether the tc verification SSC matched + a source sequence count of the sent TM + """ + current_subservice = current_tm_info[TmDictionaryKeys.SUBSERVICE] + for possible_index, search_index in enumerate(self.tc_ssc_array): + if search_index == current_tm_info[TmDictionaryKeys.TC_SSC]: + if current_subservice == 1: + self.tc_verified_start = self.tc_verified_start + 1 + elif current_subservice == 5: + self.tc_verified_step = self.tc_verified_step + 1 + elif current_subservice == 7: + self.tc_verified_completion = self.tc_verified_completion + 1 + + def _perform_generic_assertion_test(self, assertion_dict: dict): + if assertion_dict is None: print("Performing Generic Assertion Test: " "Configuratrion error, asseretion dictionary was not passed properly") - exit() - self.assertEqual(assertionDict["TcStartCount"], self.tcVerifyCounter) - self.assertEqual(assertionDict["TcCompletionCount"], self.tcVerifyCounter) - self.assertEqual(assertionDict["EventCount"], self.eventExpected) - self.assertEqual(assertionDict["MiscCount"], self.miscExpected) - self.assertTrue(assertionDict["Valid"]) + sys.exit() + self.assertEqual(assertion_dict["TcStartCount"], self.tc_verify_counter) + self.assertEqual(assertion_dict["TcCompletionCount"], self.tc_verify_counter) + self.assertEqual(assertion_dict["EventCount"], self.event_expected) + self.assertEqual(assertion_dict["MiscCount"], self.misc_expected) + self.assertTrue(assertion_dict["Valid"]) # these tests are identical - def performService5or17Test(self): - assertionDict = self.perform_testing_and_generate_assertion_dict() - self.eventExpected = 1 - self.miscExpected = 2 - self._perform_generic_assertion_test(assertionDict) - - def analyseService5or17TM(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): - self.miscCounter = 0 - while not tmInfoQueue.__len__() == 0: - currentTmInfo = tmInfoQueue.pop() + def _perform_service5or17_test(self): + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 1 + self.misc_expected = 2 + self._perform_generic_assertion_test(assertion_dict) + + def _analyse_service5or17_tm(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + self.misc_counter = 0 + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() # Tc verification scanning is generic and has been moved to the superclass - if currentTmInfo["service"] == 1: - self.scanForRespectiveTc(currentTmInfo) + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + self.scan_for_respective_tc(current_tm_info) # Here, the desired event Id or RID can be specified - if currentTmInfo["service"] == 5: - if currentTmInfo["EventID"] == 8200 and currentTmInfo["RID"] == 0x51001700: - self.eventCounter = self.eventCounter + 1 - if currentTmInfo["service"] == 17: - self.miscCounter = self.miscCounter + 1 - if currentTmInfo["valid"] == 0: + if current_tm_info[TmDictionaryKeys.SERVICE] == 5: + if (current_tm_info[TmDictionaryKeys.EVENT_ID] == 8200 and + current_tm_info[TmDictionaryKeys.REPORTER_ID] == 0x51001700): + + self.event_counter = self.event_counter + 1 + if current_tm_info[TmDictionaryKeys.SERVICE] == 17: + self.misc_counter = self.misc_counter + 1 + if current_tm_info[TmDictionaryKeys.VALID] == 0: self.valid = False - assertionDict.update({"MiscCount": self.miscCounter}) + assertion_dict.update({"MiscCount": self.misc_counter}) @classmethod def tearDownClass(cls): - cls.eventCounter = 0 - cls.tcVerifyCounter = 0 + cls.event_counter = 0 + cls.tc_verify_counter = 0 # noinspection PyUnresolvedReferences cls.test_queue.clear() class TestDummyDevice(TestService): + """ + Test for the dummy class. + """ @classmethod def setUpClass(cls): super().setUpClass() print("Testing Dummy Device") - packDummyDeviceTestInto(super().test_queue) + pack_dummy_device_test_into(super().test_queue) - def analyseTmInfo(self, tmInfoQueue, assertionDict): + def analyse_tm_info(self, tm_info_queue, assertion_dict): pass diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index 945b5e629f68c3e0db2cd75d497dbfd8886d592c..e59002dccb6a18f6b510aa4d09939536cb18b6ed 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -1,11 +1,24 @@ +""" +@file obsw_pus_service_test.py +@date 01.11.2019 +@brief Contains specific PUS service tests. +@author: R. Mueller +""" +import struct import unittest -from test.obsw_module_test import TestService, PusTmInfoQueueT +from typing import Deque + +from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys from tc.obsw_pus_tc_base import PusTcInfoQueueT -from tc.obsw_pus_tc_packer import packService17TestInto, packService5TestInto, packService2TestInto -import config.OBSW_Config as g +from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ + pack_service2_test_into, pack_service8_test_into +import config.obsw_config as g class TestService2(TestService): + """ + Test raw commanding service. + """ @classmethod def setUpClass(cls: TestService): super().setUpClass() @@ -13,36 +26,43 @@ class TestService2(TestService): # all commands must be sent sequentially, not as a burst cls.wait_intervals = [1, 2, 3, 4] cls.wait_time = [2, 2, 2, 3] - packService2TestInto(cls.test_queue) + pack_service2_test_into(cls.test_queue) - def test_Service2(self): + def test_service2(self): + """ + Tests the raw commanding service. + """ assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.eventExpected = 1 - self.miscExpected = 4 + self.event_expected = 2 + self.misc_expected = 4 super()._perform_generic_assertion_test(assertion_dict) - def analyseTmInfo(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): while not tm_info_queue.__len__() == 0: current_tm_info = tm_info_queue.pop() # Tc verification scanning is generic and has been moved to the superclass - if current_tm_info["service"] == 1: - super().scanForRespectiveTc(current_tm_info) + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + super().scan_for_respective_tc(current_tm_info) # Here, the desired event Id or RID can be specified - if current_tm_info["service"] == 5: + if current_tm_info[TmDictionaryKeys.SERVICE] == 5: # mode change - if current_tm_info["EventID"] == 7401 and current_tm_info["RID"] == 0x4400affe: - self.eventCounter = self.eventCounter + 1 - if current_tm_info["service"] == 200 and current_tm_info["subservice"] == 6: + if (current_tm_info[TmDictionaryKeys.REPORTER_ID] == + struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]) \ + and (current_tm_info[TmDictionaryKeys.EVENT_ID] == 7401 or + current_tm_info[TmDictionaryKeys.EVENT_ID] == 7400): + self.event_counter = self.event_counter + 1 + if current_tm_info[TmDictionaryKeys.SERVICE] == 200 and \ + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 6: # mode change confirmation - self.miscCounter = self.miscCounter + 1 + self.misc_counter += 1 # wiretapping messages - elif current_tm_info["service"] == 2 and current_tm_info["subservice"] == 130: - self.miscCounter = self.miscCounter + 1 - elif current_tm_info["service"] == 2 and current_tm_info["subservice"] == 131: - self.miscCounter = self.miscCounter + 1 - if current_tm_info["valid"] == 0: + elif current_tm_info[TmDictionaryKeys.SERVICE] == 2 and \ + (current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130 or + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 131): + self.misc_counter += 1 + if current_tm_info[TmDictionaryKeys.VALID] == 0: self.valid = False - assertion_dict.update({"MiscCount": self.miscCounter}) + assertion_dict.update({"MiscCount": self.misc_counter}) class TestService5(TestService): @@ -54,17 +74,17 @@ class TestService5(TestService): # This is required because the OBSW tasks runs with fixed sequences cls.wait_intervals = [1, 2, 3] cls.wait_time = [1.5, 2.0, 2.0] - packService5TestInto(cls.test_queue) + pack_service5_test_into(cls.test_queue) def test_Service5(self): # analyseTmInfo() and analyseTcInfo are called here - super().performService5or17Test() + super()._perform_service5or17_test() - def analyseTcInfo(self, tcInfoQueue: PusTcInfoQueueT): - super().analyseTcInfo(tcInfoQueue) + def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): + super().analyse_tc_info(tc_info_queue) - def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): - super().analyseService5or17TM(tmInfoQueue, assertionDict) + def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + super()._analyse_service5or17_tm(tm_info_queue, assertion_dict) @classmethod def tearDownClass(cls): @@ -77,10 +97,47 @@ class TestService6(unittest.TestCase): print("Hallo Test 6!") -class TestService8(unittest.TestCase): +class TestService8(TestService): + @classmethod - def setUpClass(cls): - print("Hallo Test 2!") + def setUpClass(cls: TestService): + super().setUpClass() + print("Testing Service 8") + cls.wait_intervals = [1, 2, 3, 4] + cls.wait_time = [1.5, 0.7, 0.7, 1.0] + cls.data_reply_count = 0 + pack_service8_test_into(cls.test_queue) + + def test_Service8(self): + + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 4 + self.misc_expected = 2 + self.data_reply_expected = 1 + # One reply generates an additional step. + self.tc_verify_step_counter += 1 + self.assertEqual(assertion_dict["TcStepCount"], self.tc_verify_step_counter) + + self.assertEqual(self.data_reply_count, self.data_reply_expected) + super()._perform_generic_assertion_test(assertion_dict) + + def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + self.scan_for_respective_tc(current_tm_info) + if current_tm_info[TmDictionaryKeys.SERVICE] == 5: + if (current_tm_info[TmDictionaryKeys.REPORTER_ID] == + struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]) and \ + (current_tm_info[TmDictionaryKeys.EVENT_ID] == 7401 + or current_tm_info[TmDictionaryKeys.EVENT_ID] == 7400): + self.event_counter += 1 + if current_tm_info[TmDictionaryKeys.SERVICE] == 200: + # mode change confirmation + self.misc_counter += 1 + if (current_tm_info[TmDictionaryKeys.SERVICE] == 8 and + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130): + self.data_reply_count += 1 class TestService9(unittest.TestCase): @@ -97,22 +154,23 @@ class TestService17(TestService): cls.wait_intervals = [2] cls.wait_time = 2 cls.tm_timeout = g.G_TM_TIMEOUT - packService17TestInto(cls.test_queue) + pack_service17_test_into(cls.test_queue) def test_Service17(self): - super().performService5or17Test() + super()._perform_service5or17_test() - def _analyse_tm_tc_info(self, tmInfoQueue: PusTmInfoQueueT, tcInfoQueue: PusTcInfoQueueT): - assertionDict = super()._analyse_tm_tc_info(tm_info_queue=tmInfoQueue, tcInfoQueue=tcInfoQueue) + def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): + assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, + tc_info_queue=tc_info_queue) # add anything elsee other than tc verification counter # and ssc that is needed for tm analysis - return assertionDict + return assertion_dict - def analyseTcInfo(self, tcInfoQueue): - super().analyseTcInfo(tcInfoQueue) + def analyse_tc_info(self, tc_info_queue): + super().analyse_tc_info(tc_info_queue) - def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): - super().analyseService5or17TM(tmInfoQueue, assertionDict) + def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + super()._analyse_service5or17_tm(tm_info_queue, assertion_dict) @classmethod def tearDownClass(cls): diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py index 60a47357465d28070e0ab5827674816a460b4400..0915f8e5d7bad4d7e20b00c68a700f9527a0bf89 100644 --- a/tm/obsw_pus_tm_base.py +++ b/tm/obsw_pus_tm_base.py @@ -4,13 +4,34 @@ @author: R.Mueller, S. Gaisser """ import datetime +from enum import Enum, auto from typing import TypeVar, Dict, Tuple, Deque, List, Final from crcmod import crcmod +class TmDictionaryKeys(Enum): + SERVICE = auto() + SUBSERVICE = auto() + SUBCOUNTER = auto() + SSC = auto() + DATA = auto() + CRC = auto() + VALID = auto() + # Service 1 + TC_PACKET_ID = auto() + TC_SSC = auto() + ERROR_CODE = auto() + STEP_NUMBER = auto() + # Service 5 + EVENT_ID = auto() + REPORTER_ID = auto() + EVENT_PARAM_1 = auto() + EVENT_PARAM_2 = auto() + + PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') -PusTmInfoT = Dict[str, any] +PusTmInfoT = Dict[TmDictionaryKeys, any] PusTmTupleT = Tuple[PusTmInfoT, PusTmT] PusTmQueueT = Deque[PusTmT] @@ -59,12 +80,12 @@ class PusTelemetry: :return: TM dictionary """ tm_information = { - "service": self.get_service(), - "subservice": self.get_subservice(), - "ssc": self.get_ssc(), - "data": self._tm_data, - "crc": self._crc, - "valid": self._pus_header.valid + TmDictionaryKeys.SERVICE: self.get_service(), + TmDictionaryKeys.SUBSERVICE: self.get_subservice(), + TmDictionaryKeys.SSC: self.get_ssc(), + TmDictionaryKeys.DATA: self._tm_data, + TmDictionaryKeys.CRC: self._crc, + TmDictionaryKeys.VALID: self._pus_header.valid } return tm_information @@ -107,7 +128,7 @@ class PusTelemetry: self._data_field_header.append_data_field_header_column_header(header_list) self._pus_header.append_pus_packet_header_column_headers(header_list) - def get_raw_packet(self) -> bytearray: + def get_raw_packet(self) -> bytes: """ Get the whole TM wiretapping_packet as a bytearray (raw) :return: TM wiretapping_packet diff --git a/tm/obsw_pus_tm_factory.py b/tm/obsw_pus_tm_factory.py index 55bbc7c7b6b2e4ffa65ef3ae7893bb6466fa7b2f..66c70eff2e44a756d1624886cdc5673fe10b9096 100644 --- a/tm/obsw_pus_tm_factory.py +++ b/tm/obsw_pus_tm_factory.py @@ -13,10 +13,10 @@ from tm.obsw_tm_service_5 import Service5TM import struct -def PusTelemetryFactory(raw_packet: bytes): +def pus_telemetry_factory(raw_packet: bytes): """ - Returns a PusTelemetry class instance by extracting the service directly from the raw wiretapping_packet. - Sneaky solution allows this function to immediately return + Returns a PusTelemetry class instance by extracting the service directly from the + raw wiretapping_packet. Sneaky solution allows this function to immediately return the right telemetry wiretapping_packet :param raw_packet: :return: @@ -24,26 +24,27 @@ def PusTelemetryFactory(raw_packet: bytes): service_type = raw_packet[7] if service_type == 1: return Service1TM(raw_packet) - elif service_type == 2: + if service_type == 2: return Service2TM(raw_packet) - elif service_type == 3: + if service_type == 3: return Service3TM(raw_packet) - elif service_type == 5: + if service_type == 5: return Service5TM(raw_packet) - elif service_type == 8: + if service_type == 8: return Service8TM(raw_packet) - elif service_type == 17: + if service_type == 17: return Service17TM(raw_packet) - elif service_type == 200: + if service_type == 200: return Service200TM(raw_packet) - else: - print("The service " + str(service_type) + " is not implemented in Telemetry Factory") - return PusTelemetry(raw_packet) - + print("The service " + str(service_type) + " is not implemented in Telemetry Factory") + return PusTelemetry(raw_packet) + + class Service2TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array: bytes): + super().__init__(byte_array) + self.specify_packet_info("Raw Commanding Reply") def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -57,6 +58,7 @@ class Service2TM(PusTelemetry): class Service8TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) + self.specify_packet_info("Direct Commanding Reply") def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -68,21 +70,22 @@ class Service8TM(PusTelemetry): class Service9TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) + self.specify_packet_info("Time Service Reply") - def append_telemetry_content(self, array): + def append_telemetry_content(self, array: list): super().append_telemetry_content(array) return - def append_telemetry_column_headers(self, array): - super().append_telemetry_column_headers(array) + def append_telemetry_column_headers(self, column_header: list): + super().append_telemetry_column_headers(column_header) return class Service17TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) self.specify_packet_info("Test Reply") def append_telemetry_content(self, array): @@ -95,8 +98,8 @@ class Service17TM(PusTelemetry): class Service200TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) self.isCantReachModeReply = False self.isModeReply = False self.specify_packet_info("Mode Reply") diff --git a/tm/obsw_tm_service_1.py b/tm/obsw_tm_service_1.py index 9b5b7ab7ff25cb205a4c75e12319ffa3d5b5b9f4..ec4211ba2aec5076fcc4ddfddaa8d46a7574f481 100644 --- a/tm/obsw_tm_service_1.py +++ b/tm/obsw_tm_service_1.py @@ -5,78 +5,82 @@ Date: 30.12.2019 Description: Deserialize Pus Verification TM Author: R. Mueller """ -from tm.obsw_pus_tm_base import PusTelemetry -from typing import Dict import struct +from typing import Dict + +from tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys + -pusPacketInfoService1T = Dict[str, any] +pusPacketInfoService1T = Dict[TmDictionaryKeys, any] class Service1TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) - self.tcErrorCode = False - self.isStep = False + def __init__(self, byte_array: bytes): + super().__init__(byte_array) + self.has_tc_error_code = False + self.is_step_reply = False # Failure Reports with error code + self.err_code = 0 + self.step_number = 0 self.tcPacketId = self._tm_data[0] << 8 | self._tm_data[1] self.tcSSC = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3] self.specify_packet_info("Success Verification") if self.get_subservice() % 2 == 0: self.specify_packet_info("Failure Verficiation") - self.tcErrorCode = True + self.has_tc_error_code = True if self._data_field_header.subtype == 6: - self.isStep = True + self.is_step_reply = True self.append_packet_info(" : Step Failure") - self.stepNumber = struct.unpack('>B', self._tm_data[4:5])[0] - self.ErrCode = struct.unpack('>H', self._tm_data[5:7])[0] + self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] + self.err_code = struct.unpack('>H', self._tm_data[5:7])[0] self.errorParam1 = struct.unpack('>I', self._tm_data[7:11])[0] self.errorParam2 = struct.unpack('>I', self._tm_data[11:15])[0] else: self.print_data() - self.ErrCode = struct.unpack('>H', self._tm_data[4:6])[0] + self.err_code = struct.unpack('>H', self._tm_data[4:6])[0] self.errorParam1 = struct.unpack('>I', self._tm_data[6:10])[0] self.errorParam2 = struct.unpack('>I', self._tm_data[10:14])[0] elif self.get_subservice() == 5: - self.isStep = True + self.is_step_reply = True self.append_packet_info(" : Step Success") - self.stepNumber = struct.unpack('>B', self._tm_data[4:5])[0] + self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] def append_telemetry_content(self, array): super().append_telemetry_content(array) array.append(str(hex(self.tcPacketId))) array.append(str(self.tcSSC)) - if self.tcErrorCode: - if self.isStep: - array.append(str(self.stepNumber)) - array.append(str(hex(self.ErrCode))) + if self.has_tc_error_code: + if self.is_step_reply: + array.append(str(self.step_number)) + array.append(str(hex(self.err_code))) array.append(str(hex(self.errorParam1)) + ", " + str(self.errorParam1)) array.append(str(hex(self.errorParam2)) + ", " + str(self.errorParam2)) - elif self.isStep: - array.append(str(self.stepNumber)) + elif self.is_step_reply: + array.append(str(self.step_number)) def append_telemetry_column_headers(self, array): super().append_telemetry_column_headers(array) array.append("TC Packet ID") array.append("TC SSC") - if self.tcErrorCode: - if self.isStep: + if self.has_tc_error_code: + if self.is_step_reply: array.append("Step Number") array.append("Return Value") array.append("Error Param 1") array.append("Error Param 2") - elif self.isStep: + elif self.is_step_reply: array.append("Step Number") def pack_tm_information(self) -> pusPacketInfoService1T: - tmInformation = super().pack_tm_information() - addInformation = { - "tcPacketId": self.tcPacketId, - "tcSSC": self.tcSSC, + tm_information = super().pack_tm_information() + add_information = { + TmDictionaryKeys.TC_PACKET_ID: self.tcPacketId, + TmDictionaryKeys.TC_SSC: self.tcSSC, } - tmInformation.update(addInformation) - if self.tcErrorCode: - tmInformation.update({"errCode": self.ErrCode}) - if self.isStep: - tmInformation.update({"stepNumber": self.ErrCode}) - return tmInformation + tm_information.update(add_information) + if self.has_tc_error_code: + tm_information.update({TmDictionaryKeys.ERROR_CODE: self.err_code}) + if self.is_step_reply: + tm_information.update({TmDictionaryKeys.STEP_NUMBER: self.step_number}) + return tm_information diff --git a/tm/obsw_tm_service_5.py b/tm/obsw_tm_service_5.py index 4df7aae5770e130bf0dcce11ef0f98d4a9e056c6..ad44eaeb2fb0e4de3e60c1dd320b06af46b3b443 100644 --- a/tm/obsw_tm_service_5.py +++ b/tm/obsw_tm_service_5.py @@ -6,13 +6,13 @@ Description: Deserialize PUS Event Report Author: R. Mueller """ -from tm.obsw_pus_tm_base import PusTelemetry +from tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT import struct class Service5TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) self.specify_packet_info("Event") if self.get_subservice() == 1: self.append_packet_info(" Info") @@ -41,13 +41,13 @@ class Service5TM(PusTelemetry): array.append("Parameter 1") array.append("Parameter 2") - def pack_tm_information(self): - tmInformation = super().pack_tm_information() - addInformation = { - "RID": self.objectId, - "EventID": self.eventId, - "Param1": self.param1, - "Param2": self.param2 + def pack_tm_information(self) -> PusTmInfoT: + tm_information = super().pack_tm_information() + add_information = { + TmDictionaryKeys.REPORTER_ID: self.objectId, + TmDictionaryKeys.EVENT_ID: self.eventId, + TmDictionaryKeys.EVENT_PARAM_1: self.param1, + TmDictionaryKeys.EVENT_PARAM_2: self.param2 } - tmInformation.update(addInformation) - return tmInformation + tm_information.update(add_information) + return tm_information diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 74aeef33632c198bb88e95ae4e31251221204c8e..477a9dc21af7de9b639824c0a4fe0a3fc1d0bedc 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ @file - OBSW_Config.py + obsw_config.py @date 01.11.2019 @brief @@ -11,10 +11,10 @@ import os import sys from typing import TypeVar -from config import OBSW_Config as g +from config import obsw_config as g from tm.obsw_pus_tm_base import PusTmT, PusTelemetry from tm.obsw_tm_service_3 import PusTm3T -from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT +from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT, TcDictionaryKeys TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter') @@ -30,6 +30,9 @@ class TmTcPrinter: :param do_print_to_file: if true, print to file :param print_tc: if true, print TCs """ + # TODO: we should implement a list of strings here. each service test string + # is written into a list entry. For the file output, the list entries are concatenated + # and put into the main log file. self.print_buffer = "" # global print buffer which will be useful to print something to file self.file_buffer = "" @@ -277,9 +280,9 @@ class TmTcPrinter: :param tc_packet_info: :return: """ - self.print_buffer = "Sent TC[" + str(tc_packet_info["service"]) + "," + \ - str(tc_packet_info["subservice"]) + "] " + " with SSC " + \ - str(tc_packet_info["ssc"]) + self.print_buffer = "Sent TC[" + str(tc_packet_info[TcDictionaryKeys.SERVICE]) + "," + \ + str(tc_packet_info[TcDictionaryKeys.SUBSERVICE]) + "] " + " with SSC " + \ + str(tc_packet_info[TcDictionaryKeys.SSC]) print(self.print_buffer) self.add_print_buffer_to_file_buffer() @@ -290,10 +293,12 @@ class TmTcPrinter: :return: """ try: - self.print_buffer = "Telecommand TC[" + str(tc_packet_info["service"]) + "," + \ - str(tc_packet_info["subservice"]) + "] with SSC " + \ - str(tc_packet_info["ssc"]) + " sent with data " + \ - self.return_data_string(tc_packet_info["data"]) + self.print_buffer = \ + "Telecommand TC[" + str(tc_packet_info[TcDictionaryKeys.SERVICE]) + "," + \ + str(tc_packet_info[TcDictionaryKeys.SUBSERVICE]) + "] with SSC " + \ + str(tc_packet_info[TcDictionaryKeys.SSC]) + " sent with data " + \ + self.return_data_string(tc_packet_info[TcDictionaryKeys.DATA]) + print(self.print_buffer) self.add_print_buffer_to_file_buffer() except TypeError: