diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Test.xml b/.idea/runConfigurations/OBSW_TmTcClient_Test.xml deleted file mode 100644 index 5962769e4ef805d011cf162f3f98f4265b33c95f..0000000000000000000000000000000000000000 --- a/.idea/runConfigurations/OBSW_TmTcClient_Test.xml +++ /dev/null @@ -1,24 +0,0 @@ -<component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_TmTcClient Test" type="PythonConfigurationType" factoryName="Python"> - <module name="tmtc" /> - <option name="INTERPRETER_OPTIONS" value="" /> - <option name="PARENT_ENVS" value="true" /> - <envs> - <env name="PYTHONUNBUFFERED" value="1" /> - </envs> - <option name="SDK_HOME" value="" /> - <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" /> - <option name="IS_MODULE_SDK" value="true" /> - <option name="ADD_CONTENT_ROOTS" value="true" /> - <option name="ADD_SOURCE_ROOTS" value="true" /> - <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> - <option name="SCRIPT_NAME" value="C:\Users\Robin\NoSyncDokumente\sourceobsw\tmtc\obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-h" /> - <option name="SHOW_COMMAND_LINE" value="false" /> - <option name="EMULATE_TERMINAL" value="false" /> - <option name="MODULE_MODE" value="false" /> - <option name="REDIRECT_INPUT" value="false" /> - <option name="INPUT_FILE" value="" /> - <method v="2" /> - </configuration> -</component> \ No newline at end of file diff --git a/.idea/runConfigurations/OBSW_TmTcClient_GUI.xml b/.idea/runConfigurations/tmtcclient_GUI.xml similarity index 89% rename from .idea/runConfigurations/OBSW_TmTcClient_GUI.xml rename to .idea/runConfigurations/tmtcclient_GUI.xml index 571fa9aa2fbe026458906f0fd04b1c0c78ad3038..6ae7919b09fd2b1514488e8ab73c7b964ebf0c64 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_GUI.xml +++ b/.idea/runConfigurations/tmtcclient_GUI.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_TmTcClient GUI" type="PythonConfigurationType" factoryName="Python"> + <configuration default="false" name="tmtcclient GUI" type="PythonConfigurationType" factoryName="Python"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Help.xml b/.idea/runConfigurations/tmtcclient_Help.xml similarity index 80% rename from .idea/runConfigurations/OBSW_TmTcClient_Help.xml rename to .idea/runConfigurations/tmtcclient_Help.xml index 0568677bccad9898e4a78bd0f89940f361c7f402..683cda5aa7276a7e5321b3ba9c6028681505b480 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_Help.xml +++ b/.idea/runConfigurations/tmtcclient_Help.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_TmTcClient Help" type="PythonConfigurationType" factoryName="Python"> + <configuration default="false" name="tmtcclient Help" type="PythonConfigurationType" factoryName="Python"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> @@ -12,7 +12,7 @@ <option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> - <option name="SCRIPT_NAME" value="C:\Users\Robin\NoSyncDokumente\sourceobsw\tmtc\obsw_tmtc_client.py" /> + <option name="SCRIPT_NAME" value="C:/Users/Robin/NoSyncDokumente/sourceobsw/tmtc/obsw_tmtc_client.py" /> <option name="PARAMETERS" value="-h" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="false" /> diff --git a/.idea/runConfigurations/OBSW_UdpClient_Software_Testmode.xml b/.idea/runConfigurations/tmtcclient_Moduletest_UDP.xml similarity index 77% rename from .idea/runConfigurations/OBSW_UdpClient_Software_Testmode.xml rename to .idea/runConfigurations/tmtcclient_Moduletest_UDP.xml index 7342a05ba2a74ed1ac28499da17c04c41e724789..e9db0cc4278724bf230dedc643bf96f0c0949106 100644 --- a/.idea/runConfigurations/OBSW_UdpClient_Software_Testmode.xml +++ b/.idea/runConfigurations/tmtcclient_Moduletest_UDP.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_UdpClient Software Testmode" type="PythonConfigurationType" factoryName="Python" folderName="UDP Communication"> + <configuration default="false" name="tmtcclient Moduletest UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP Communication"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> @@ -12,7 +12,7 @@ <option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> - <option name="SCRIPT_NAME" value="C:\Users\Robin\NoSyncDokumente\sourceobsw\tmtc\obsw_tmtc_client.py" /> + <option name="SCRIPT_NAME" value="C:/Users/Robin/NoSyncDokumente/sourceobsw/tmtc/obsw_tmtc_client.py" /> <option name="PARAMETERS" value="-m 4 -p" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="false" /> diff --git a/.idea/runConfigurations/OBSW_UdpClient_Service_17_Test.xml b/.idea/runConfigurations/tmtcclient_Service_17_UDP.xml similarity index 78% rename from .idea/runConfigurations/OBSW_UdpClient_Service_17_Test.xml rename to .idea/runConfigurations/tmtcclient_Service_17_UDP.xml index b882acf9c17dc569a53194046d63ae570f51c104..22544e608f0a2880084fd6eb8202eaaf8c8c8c0a 100644 --- a/.idea/runConfigurations/OBSW_UdpClient_Service_17_Test.xml +++ b/.idea/runConfigurations/tmtcclient_Service_17_UDP.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_UdpClient Service 17 Test" type="PythonConfigurationType" factoryName="Python" folderName="UDP Communication"> + <configuration default="false" name="tmtcclient Service 17 UDP" type="PythonConfigurationType" factoryName="Python" folderName="UDP Communication"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> @@ -12,7 +12,7 @@ <option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> - <option name="SCRIPT_NAME" value="C:\Users\Robin\NoSyncDokumente\sourceobsw\tmtc\obsw_tmtc_client.py" /> + <option name="SCRIPT_NAME" value="C:/Users/Robin/NoSyncDokumente/sourceobsw/tmtc/obsw_tmtc_client.py" /> <option name="PARAMETERS" value="-m 3 -s 17 -p -t 5" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="true" /> diff --git a/.idea/runConfigurations/OBSW_TmTcClient__Service_200_Serial.xml b/.idea/runConfigurations/tmtcclient__Service_200_Serial.xml similarity index 80% rename from .idea/runConfigurations/OBSW_TmTcClient__Service_200_Serial.xml rename to .idea/runConfigurations/tmtcclient__Service_200_Serial.xml index 3ece0d3eaf7e29f8e12d3315e5112a8c94130d53..ec2175bfadade9795139b22a62d815dd0ffa752e 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient__Service_200_Serial.xml +++ b/.idea/runConfigurations/tmtcclient__Service_200_Serial.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="OBSW_TmTcClient Service 200 Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> + <configuration default="false" name="tmtcclient Service 200 Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> @@ -13,7 +13,7 @@ <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-m 3 -s 200 -c 1 --hk -t 5" /> + <option name="PARAMETERS" value="-m 3 -s 200 -c 1 --hk -t 3" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="false" /> <option name="MODULE_MODE" value="false" /> diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 0dd7b1bdee03ac7da5bc4930f7e934d0289aa184..b31983de2ffabefff6e49af57e331f0882c88e8e 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -195,7 +195,7 @@ class TmTcHandler: elif self.mode == g.ModeList.SoftwareTestMode: all_tc_queue = create_total_tc_queue() - LOGGER.info("Performing multjple service commands operation") + LOGGER.info("Performing multiple service commands operation") sender_and_receiver = SequentialCommandSenderReceiver( com_interface=communication_interface, tmtc_printer=tmtc_printer, tc_queue=all_tc_queue, tm_listener=tm_listener) @@ -217,5 +217,6 @@ class TmTcHandler: logging.error("Unknown Mode, Configuration error !") sys.exit() + if __name__ == "__main__": main() diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py index 8531800d3531a5ee391a4c5a7a180eb234a6198a..274204c3d182ca459b708717e17a8ac044bd5cf5 100644 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ b/sendreceive/obsw_multiple_commands_sender_receiver.py @@ -16,6 +16,7 @@ from utility.obsw_tmtc_printer import get_logger LOGGER = get_logger() + class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): """ Difference to seqential sender: This class can send TCs in bursts. @@ -43,7 +44,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.waitTime = wait_time self.printTm = print_tm self.tm_packet_queue = deque() - self.tcInfoQueue = deque() + self.tc_info_queue = deque() self.pusPacketInfo = [] self.pusPacket = [] self.waitCounter = 0 @@ -55,17 +56,19 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): # TC info queue is set in this function self.__send_all_queue() self.wait_for_last_replies_listening(self._tm_timeout / 1.4) - self.tm_packet_queue = self._tm_listener.retrieve_tm_packet_queue() + # Get a copy of the queue, otherwise we will lose the data. + tm_packet_queue_list = self._tm_listener.retrieve_tm_packet_queue().copy() + if g.G_PRINT_TM: + self.print_tm_queue(self._tm_listener.retrieve_tm_packet_queue()) self._tm_listener.clear_tm_packet_queue() self._tm_listener.event_mode_op_finished.set() if g.G_PRINT_TO_FILE: self._tmtc_printer.print_to_file() + return self.tc_info_queue, tm_packet_queue_list except (KeyboardInterrupt, SystemExit): LOGGER.info("Closing TMTC Client") sys.exit() - return self.tcInfoQueue, self.tm_packet_queue - def __handle_tc_resending(self): while not self.__all_replies_received: if self._tc_queue.__len__ == 0: @@ -81,7 +84,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): tc_queue_tuple = self._tc_queue.pop() if self.check_queue_entry(tc_queue_tuple): pus_packet, pus_packet_info = tc_queue_tuple - self.tcInfoQueue.append(pus_packet_info) + self.tc_info_queue.append(pus_packet_info) self._com_interface.send_telecommand(pus_packet, pus_packet_info) self.__handle_waiting() diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index 782c41a08023ebe3b9dc8629660a4d633795c4a6..1d107ce7281549071c3078daffc98082debc68b1 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -67,7 +67,6 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.__mode_op_finished = True LOGGER.info("SequentialSenderReceiver: All replies received!") - def __perform_next_tc_send(self): if self._tm_listener.event_reply_received.is_set(): self._reply_received = True diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py index cdd54b6a55beda096cf243c5423bd0b422ac694a..670442ff612e49d6c3cfa4f5111588da2a378374 100644 --- a/tc/obsw_tc_service2.py +++ b/tc/obsw_tc_service2.py @@ -28,7 +28,7 @@ def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> toggle_wiretapping_on_command = PusTelecommand(service=2, subservice=129, ssc=200, app_data=wiretapping_toggle_data) tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) - # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] + # send raw command, wiretapping should be returned via TM[2,130] and TC[2,131] tc_queue.appendleft(("print", "Testing Service 2: Sending Raw Command")) raw_command = g.DUMMY_COMMAND_1 raw_data = object_id + raw_command diff --git a/tc/obsw_tc_service5_17.py b/tc/obsw_tc_service5_17.py index 69c7f30fdb18dc61210626f2065f6908a2eb28b8..9045f08e163c5778014c80d116ac8721ed4f7699 100644 --- a/tc/obsw_tc_service5_17.py +++ b/tc/obsw_tc_service5_17.py @@ -52,7 +52,7 @@ def pack_service17_test_into(tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft(command.pack_command_tuple()) # invalid subservice tc_queue.appendleft(("print", "Testing Service 17: Invalid subservice")) - command = PusTelecommand(service=17, subservice=243, ssc=1700) + command = PusTelecommand(service=17, subservice=243, ssc=1702) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("export", "log/tmtc_log_service17.txt")) return tc_queue diff --git a/test/obsw_module_test.py b/test/obsw_module_test.py index 25d856064108bc477a1477166e595fd409516b60..ba476d98049c01f97c47945acc4aed5cc8916396 100644 --- a/test/obsw_module_test.py +++ b/test/obsw_module_test.py @@ -4,9 +4,11 @@ @brief Module/High-Level test of on-board software, used by the TMTC client in software test mode. @details +Supply --np to suppress output of TM, ommit for debugging! + It is very difficult to execute Unit Tests on embedded hardware. -The most significant functonalities for satellites are commanding and TM reception. -As such, these functionalities will be tested by sending TCs and checking the expected Telemetry. +The most significant software components for satellites are commanding and TM reception. +As such, these components will be tested by sending TCs and checking the expected Telemetry. Still, we make use of the Unit Testing Framework provided by Python (with some hacks involved). @manual @@ -33,8 +35,11 @@ to be specified in the analyse_tc_info method of the child test. @author: R. Mueller """ +import struct + import sys import unittest +from enum import Enum from abc import abstractmethod from collections import deque from typing import Deque @@ -51,6 +56,17 @@ from utility.obsw_logger import get_logger TmInfoQueueService1T = Deque[PusPacketInfoService1T] LOGGER = get_logger() + +class AssertionDictKeys(Enum): + TC_START_COUNT = 1, + TC_COMPLETION_COUNT = 2, + TC_STEP_COUNT = 3, + EVENT_COUNT = 4, + MISC_COUNT = 5, + FAIL_COUNT = 6, + VALID = 7 + + class TestService(unittest.TestCase): """ Generic service test class. @@ -85,7 +101,8 @@ class TestService(unittest.TestCase): cls.tc_subservice_array = [] # these number of TCs sent which need need to be verified # separate step counter if there is more than one step for a command ! - cls.tc_verify_counter = 0 + cls.tc_start_counter = 0 + cls.tc_complete_counter = 0 cls.tc_verify_step_counter = 0 # These values are incremented in the analyseTmInfo function @@ -99,6 +116,8 @@ class TestService(unittest.TestCase): cls.event_expected = 0 cls.misc_expected = 0 cls.misc_counter = 0 + cls.fail_counter = 0 + cls.fail_expected = 0 cls.valid = True def perform_testing_and_generate_assertion_dict(self): @@ -127,6 +146,7 @@ class TestService(unittest.TestCase): tm_packet_list = tm_packet_queue.pop() for tm_packet in tm_packet_list: tm_info_queue.appendleft(tm_packet.pack_tm_information()) + assertion_dict = self._analyse_tm_tc_info(tm_info_queue, tc_info_queue) return assertion_dict @@ -141,14 +161,14 @@ class TestService(unittest.TestCase): self.analyse_tm_info(tm_info_queue, assertion_dict) assertion_dict.update({ - "TcStartCount": self.tc_verified_start, - "TcCompletionCount": self.tc_verified_completion, - "TcStepCount": self.tc_verified_step, - "EventCount": self.event_counter, - "MiscCount": self.misc_counter, - "Valid": self.valid + AssertionDictKeys.TC_START_COUNT: self.tc_verified_start, + AssertionDictKeys.TC_COMPLETION_COUNT: self.tc_verified_completion, + AssertionDictKeys.TC_STEP_COUNT: self.tc_verified_step, + AssertionDictKeys.EVENT_COUNT: self.event_counter, + AssertionDictKeys.MISC_COUNT: self.misc_counter, + AssertionDictKeys.FAIL_COUNT: self.fail_counter, + AssertionDictKeys.VALID: self.valid }) - return assertion_dict def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): @@ -156,9 +176,10 @@ class TestService(unittest.TestCase): Analyse the properties of the sent telecommands and fills them into a list which will be used for analysis later. """ + # pprint.pprint(tc_info_queue) while not tc_info_queue.__len__() == 0: current_tc_info = tc_info_queue.pop() - self.tc_verify_counter = self.tc_verify_counter + 1 + self.tc_start_counter += 1 # For commands with multiple steps, update this value manually ! # Only service 8 generates a step reply. if current_tc_info[TcDictionaryKeys.SERVICE] == 8: @@ -166,6 +187,8 @@ class TestService(unittest.TestCase): self.tc_ssc_array.append(current_tc_info[TcDictionaryKeys.SSC]) self.tc_service_array.append(current_tc_info[TcDictionaryKeys.SERVICE]) self.tc_subservice_array.append(current_tc_info[TcDictionaryKeys.SUBSERVICE]) + # Default completion counter is equla to start counter. + self.tc_complete_counter = self.tc_start_counter @abstractmethod def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): @@ -183,48 +206,65 @@ class TestService(unittest.TestCase): for possible_index, search_index in enumerate(self.tc_ssc_array): if search_index == current_tm_info[TmDictionaryKeys.TC_SSC]: if current_subservice == 1: - self.tc_verified_start = self.tc_verified_start + 1 + self.tc_verified_start += 1 + return elif current_subservice == 5: - self.tc_verified_step = self.tc_verified_step + 1 + self.tc_verified_step += 1 + return elif current_subservice == 7: - self.tc_verified_completion = self.tc_verified_completion + 1 + self.tc_verified_completion += 1 + return + elif current_subservice == 8: + self.fail_counter += 1 + return def _perform_generic_assertion_test(self, assertion_dict: dict): if assertion_dict is None: print("Performing Generic Assertion Test: " - "Configuratrion error, asseretion dictionary was not passed properly") + "Configuration error, assertion dictionary was not passed properly!") sys.exit() - self.assertEqual(assertion_dict["TcStartCount"], self.tc_verify_counter) - self.assertEqual(assertion_dict["TcCompletionCount"], self.tc_verify_counter) - self.assertEqual(assertion_dict["EventCount"], self.event_expected) - self.assertEqual(assertion_dict["MiscCount"], self.misc_expected) - self.assertTrue(assertion_dict["Valid"]) - - # these tests are identical - def _perform_service5or17_test(self): - assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.event_expected = 1 - self.misc_expected = 2 - self._perform_generic_assertion_test(assertion_dict) + self.assertEqual(assertion_dict[AssertionDictKeys.TC_START_COUNT], self.tc_start_counter) + self.assertEqual(assertion_dict[AssertionDictKeys.TC_COMPLETION_COUNT], + self.tc_complete_counter) + self.assertEqual(assertion_dict[AssertionDictKeys.EVENT_COUNT], self.event_expected) + self.assertEqual(assertion_dict[AssertionDictKeys.MISC_COUNT], self.misc_expected) + self.assertEqual(assertion_dict[AssertionDictKeys.FAIL_COUNT], self.fail_expected) + self.assertTrue(assertion_dict[AssertionDictKeys.VALID]) def _analyse_service5or17_tm(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): self.misc_counter = 0 + # pprint.pprint(tm_info_queue) while not tm_info_queue.__len__() == 0: current_tm_info = tm_info_queue.pop() # Tc verification scanning is generic and has been moved to the superclass if current_tm_info[TmDictionaryKeys.SERVICE] == 1: self.scan_for_respective_tc(current_tm_info) # Here, the desired event Id or RID can be specified - if current_tm_info[TmDictionaryKeys.SERVICE] == 5: + elif current_tm_info[TmDictionaryKeys.SERVICE] == 5: if (current_tm_info[TmDictionaryKeys.EVENT_ID] == 8200 and current_tm_info[TmDictionaryKeys.REPORTER_ID] == 0x51001700): self.event_counter = self.event_counter + 1 - if current_tm_info[TmDictionaryKeys.SERVICE] == 17: + elif current_tm_info[TmDictionaryKeys.SERVICE] == 17: self.misc_counter = self.misc_counter + 1 + if current_tm_info[TmDictionaryKeys.VALID] == 0: self.valid = False assertion_dict.update({"MiscCount": self.misc_counter}) + def _generic_mode_tm_check(self, current_tm_info: dict, + reporter_id: bytearray, event_id_list: list): + # mode change + if current_tm_info[TmDictionaryKeys.SERVICE] == 5: + if (current_tm_info[TmDictionaryKeys.REPORTER_ID] == + struct.unpack('>I', reporter_id)[0]) \ + and (current_tm_info[TmDictionaryKeys.EVENT_ID] in event_id_list): + self.event_counter = self.event_counter + 1 + if current_tm_info[TmDictionaryKeys.SERVICE] == 200 and \ + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 6: + # mode change confirmation + self.misc_counter += 1 + + @classmethod # we could print out everything here. def tearDownClass(cls): diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index ec29e2d3aee69bcc1782c35d90eb67ed109e93a7..3aa61d557b36c27f7cb1a014500275de692db006 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -8,10 +8,10 @@ import struct import unittest from typing import Deque -from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys +from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys from tc.obsw_pus_tc_base import PusTcInfoQueueT from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ - pack_service2_test_into, pack_service8_test_into + pack_service2_test_into, pack_service8_test_into, pack_service200_test_into import config.obsw_config as g from utility.obsw_logger import get_logger @@ -28,7 +28,7 @@ class TestService2(TestService): print("Testing Service 2") # all commands must be sent sequentially, not as a burst cls.wait_intervals = [1, 2, 3, 4] - cls.wait_time = [1.7, 2.5, 1.7, 2.5] + cls.wait_time = [2.5, 2.5, 2.5, 2.5] pack_service2_test_into(cls.test_queue) def test_service2(self): @@ -36,8 +36,8 @@ class TestService2(TestService): Tests the raw commanding service. """ assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.event_expected = 2 - self.misc_expected = 4 + self.event_expected = 3 + self.misc_expected = 5 super()._perform_generic_assertion_test(assertion_dict) def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): @@ -75,13 +75,18 @@ class TestService5(TestService): LOGGER.info("Testing Service 5") # Wait intervals after TC 1,2 and 3 with specified wait times # This is required because the OBSW tasks runs with fixed sequences - cls.wait_intervals = [1, 2, 3, 4] - cls.wait_time = [0.7, 1.5, 0.7, 1.5] + cls.wait_intervals = [1, 2, 3] + cls.wait_time = [2.0, 2.0, 2.0] pack_service5_test_into(cls.test_queue) def test_Service5(self): # analyseTmInfo() and analyseTcInfo are called here - super()._perform_service5or17_test() + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 1 + self.misc_expected = 2 + self.fail_expected = 1 + self.tc_complete_counter -= 1 + self._perform_generic_assertion_test(assertion_dict) def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): super().analyse_tc_info(tc_info_queue) @@ -106,20 +111,23 @@ class TestService8(TestService): def setUpClass(cls: TestService): super().setUpClass() LOGGER.info("Testing Service 8") - cls.wait_intervals = [1, 2, 3, 4] - cls.wait_time = [1.5, 1.2, 1.2, 1.5] + cls.wait_intervals = [1, 2, 3, 4, 5] + cls.wait_time = [1.5, 1.5, 2.2, 2.2, 2.0] cls.data_reply_count = 0 pack_service8_test_into(cls.test_queue) def test_Service8(self): assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.event_expected = 4 - self.misc_expected = 2 + # 3 x Mode changes + self.misc_expected = 3 + # 2 x Event per mode change + self.event_expected = 6 self.data_reply_expected = 1 # One reply generates an additional step. self.tc_verify_step_counter += 1 - self.assertEqual(assertion_dict["TcStepCount"], self.tc_verify_step_counter) + self.assertEqual(assertion_dict[AssertionDictKeys.TC_STEP_COUNT], + self.tc_verify_step_counter) self.assertEqual(self.data_reply_count, self.data_reply_expected) super()._perform_generic_assertion_test(assertion_dict) @@ -129,18 +137,10 @@ class TestService8(TestService): current_tm_info = tm_info_queue.pop() if current_tm_info[TmDictionaryKeys.SERVICE] == 1: self.scan_for_respective_tc(current_tm_info) - if current_tm_info[TmDictionaryKeys.SERVICE] == 5: - if (current_tm_info[TmDictionaryKeys.REPORTER_ID] == - struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]) and \ - (current_tm_info[TmDictionaryKeys.EVENT_ID] == 7401 - or current_tm_info[TmDictionaryKeys.EVENT_ID] == 7400): - self.event_counter += 1 - if current_tm_info[TmDictionaryKeys.SERVICE] == 200: - # mode change confirmation - self.misc_counter += 1 if (current_tm_info[TmDictionaryKeys.SERVICE] == 8 and current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130): self.data_reply_count += 1 + self._generic_mode_tm_check(current_tm_info, g.DUMMY_DEVICE_ID, [7401, 7400]) class TestService9(unittest.TestCase): @@ -154,13 +154,18 @@ class TestService17(TestService): def setUpClass(cls: TestService): super().setUpClass() LOGGER.info("Testing Service 17") - cls.wait_intervals = [2,3] - cls.wait_time = [2,1] + cls.wait_intervals = [2, 3] + cls.wait_time = [2, 1] cls.tm_timeout = g.G_TM_TIMEOUT pack_service17_test_into(cls.test_queue) def test_Service17(self): - super()._perform_service5or17_test() + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 1 + self.misc_expected = 2 + self.fail_expected = 1 + self.tc_complete_counter -= 1 + self._perform_generic_assertion_test(assertion_dict) def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, @@ -179,3 +184,40 @@ class TestService17(TestService): def tearDownClass(cls): print("Testing Service 17 finished") super().tearDownClass() + + +class TestService200(TestService): + @classmethod + def setUpClass(cls: TestService): + super().setUpClass() + LOGGER.info("Testing Service 200") + cls.wait_intervals = [1, 2, 3] + cls.wait_time = [2, 2, 2] + cls.tm_timeout = g.G_TM_TIMEOUT + pack_service200_test_into(cls.test_queue) + + def test_Service200(self): + assertion_dict = self.perform_testing_and_generate_assertion_dict() + # 4 x Mode change with 2 events for each + self.event_expected = 8 + self.misc_expected = 4 + self._perform_generic_assertion_test(assertion_dict) + + def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): + assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, + tc_info_queue=tc_info_queue) + # add anything else other than tc verification counter + # and ssc that is needed for tm analysis + return assertion_dict + + def analyse_tc_info(self, tc_info_queue): + super().analyse_tc_info(tc_info_queue) + + def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() + # Tc verification scanning is generic and has been moved to the superclass + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + self.scan_for_respective_tc(current_tm_info) + self._generic_mode_tm_check(current_tm_info, g.DUMMY_DEVICE_ID, [7401, 7400]) + diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py index 37b50e60c26c03e96715e971d1a4000b5aae09be..9eeadf7b806174793b6f1cb2340a5230fd4d1421 100644 --- a/tm/obsw_pus_tm_base.py +++ b/tm/obsw_pus_tm_base.py @@ -10,6 +10,7 @@ from crcmod import crcmod logger = get_logger() + class TmDictionaryKeys(Enum): SERVICE = auto() SUBSERVICE = auto()