diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 0a8bb7f7af5dcb9cb5c79575892f50758d21c757..b31983de2ffabefff6e49af57e331f0882c88e8e 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -156,7 +156,6 @@ class TmTcHandler: except (IOError, KeyboardInterrupt): LOGGER.info("Closing TMTC client.") - def handle_action(self): """ Command handling. @@ -218,5 +217,6 @@ class TmTcHandler: logging.error("Unknown Mode, Configuration error !") sys.exit() + if __name__ == "__main__": main() diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py index 8531800d3531a5ee391a4c5a7a180eb234a6198a..7c8bfd79a41ce817590da24f49369da37eed9a08 100644 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ b/sendreceive/obsw_multiple_commands_sender_receiver.py @@ -16,6 +16,7 @@ from utility.obsw_tmtc_printer import get_logger LOGGER = get_logger() + class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): """ Difference to seqential sender: This class can send TCs in bursts. @@ -43,7 +44,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.waitTime = wait_time self.printTm = print_tm self.tm_packet_queue = deque() - self.tcInfoQueue = deque() + self.tc_info_queue = deque() self.pusPacketInfo = [] self.pusPacket = [] self.waitCounter = 0 @@ -55,17 +56,17 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): # TC info queue is set in this function self.__send_all_queue() self.wait_for_last_replies_listening(self._tm_timeout / 1.4) - self.tm_packet_queue = self._tm_listener.retrieve_tm_packet_queue() + # Get a copy of the queue, otherwise we will lose the data. + tm_packet_queue_list = self._tm_listener.retrieve_tm_packet_queue().copy() self._tm_listener.clear_tm_packet_queue() self._tm_listener.event_mode_op_finished.set() if g.G_PRINT_TO_FILE: self._tmtc_printer.print_to_file() + return self.tc_info_queue, tm_packet_queue_list except (KeyboardInterrupt, SystemExit): LOGGER.info("Closing TMTC Client") sys.exit() - return self.tcInfoQueue, self.tm_packet_queue - def __handle_tc_resending(self): while not self.__all_replies_received: if self._tc_queue.__len__ == 0: @@ -81,7 +82,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): tc_queue_tuple = self._tc_queue.pop() if self.check_queue_entry(tc_queue_tuple): pus_packet, pus_packet_info = tc_queue_tuple - self.tcInfoQueue.append(pus_packet_info) + self.tc_info_queue.append(pus_packet_info) self._com_interface.send_telecommand(pus_packet, pus_packet_info) self.__handle_waiting() diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index 782c41a08023ebe3b9dc8629660a4d633795c4a6..1d107ce7281549071c3078daffc98082debc68b1 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -67,7 +67,6 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.__mode_op_finished = True LOGGER.info("SequentialSenderReceiver: All replies received!") - def __perform_next_tc_send(self): if self._tm_listener.event_reply_received.is_set(): self._reply_received = True diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py index cdd54b6a55beda096cf243c5423bd0b422ac694a..670442ff612e49d6c3cfa4f5111588da2a378374 100644 --- a/tc/obsw_tc_service2.py +++ b/tc/obsw_tc_service2.py @@ -28,7 +28,7 @@ def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> toggle_wiretapping_on_command = PusTelecommand(service=2, subservice=129, ssc=200, app_data=wiretapping_toggle_data) tc_queue.appendleft(toggle_wiretapping_on_command.pack_command_tuple()) - # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] + # send raw command, wiretapping should be returned via TM[2,130] and TC[2,131] tc_queue.appendleft(("print", "Testing Service 2: Sending Raw Command")) raw_command = g.DUMMY_COMMAND_1 raw_data = object_id + raw_command diff --git a/tc/obsw_tc_service5_17.py b/tc/obsw_tc_service5_17.py index 69c7f30fdb18dc61210626f2065f6908a2eb28b8..9045f08e163c5778014c80d116ac8721ed4f7699 100644 --- a/tc/obsw_tc_service5_17.py +++ b/tc/obsw_tc_service5_17.py @@ -52,7 +52,7 @@ def pack_service17_test_into(tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft(command.pack_command_tuple()) # invalid subservice tc_queue.appendleft(("print", "Testing Service 17: Invalid subservice")) - command = PusTelecommand(service=17, subservice=243, ssc=1700) + command = PusTelecommand(service=17, subservice=243, ssc=1702) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("export", "log/tmtc_log_service17.txt")) return tc_queue diff --git a/test/obsw_module_test.py b/test/obsw_module_test.py index 25d856064108bc477a1477166e595fd409516b60..6cb4d373a5294332cd96cda2485563512d04f81d 100644 --- a/test/obsw_module_test.py +++ b/test/obsw_module_test.py @@ -35,9 +35,11 @@ to be specified in the analyse_tc_info method of the child test. """ import sys import unittest +from enum import Enum from abc import abstractmethod from collections import deque from typing import Deque +import pprint from config import obsw_config as g from tc.obsw_pus_tc_packer import pack_dummy_device_test_into @@ -51,6 +53,17 @@ from utility.obsw_logger import get_logger TmInfoQueueService1T = Deque[PusPacketInfoService1T] LOGGER = get_logger() + +class AssertionDictKeys(Enum): + TC_START_COUNT = 1, + TC_COMPLETION_COUNT = 2, + TC_STEP_COUNT = 3, + EVENT_COUNT = 4, + MISC_COUNT = 5, + FAIL_COUNT = 6, + VALID = 7 + + class TestService(unittest.TestCase): """ Generic service test class. @@ -85,7 +98,8 @@ class TestService(unittest.TestCase): cls.tc_subservice_array = [] # these number of TCs sent which need need to be verified # separate step counter if there is more than one step for a command ! - cls.tc_verify_counter = 0 + cls.tc_start_counter = 0 + cls.tc_complete_counter = 0 cls.tc_verify_step_counter = 0 # These values are incremented in the analyseTmInfo function @@ -99,6 +113,8 @@ class TestService(unittest.TestCase): cls.event_expected = 0 cls.misc_expected = 0 cls.misc_counter = 0 + cls.fail_counter = 0 + cls.fail_expected = 0 cls.valid = True def perform_testing_and_generate_assertion_dict(self): @@ -120,6 +136,7 @@ class TestService(unittest.TestCase): tm_packet_queue = deque() try: (tc_info_queue, tm_packet_queue) = module_tester.send_tc_queue_and_return_info() + module_tester.print_tm_queue(tm_packet_queue) except (IOError, KeyboardInterrupt): LOGGER.info("Closing TMTC Handler") tm_info_queue = deque() @@ -127,6 +144,7 @@ class TestService(unittest.TestCase): tm_packet_list = tm_packet_queue.pop() for tm_packet in tm_packet_list: tm_info_queue.appendleft(tm_packet.pack_tm_information()) + assertion_dict = self._analyse_tm_tc_info(tm_info_queue, tc_info_queue) return assertion_dict @@ -141,14 +159,14 @@ class TestService(unittest.TestCase): self.analyse_tm_info(tm_info_queue, assertion_dict) assertion_dict.update({ - "TcStartCount": self.tc_verified_start, - "TcCompletionCount": self.tc_verified_completion, - "TcStepCount": self.tc_verified_step, - "EventCount": self.event_counter, - "MiscCount": self.misc_counter, - "Valid": self.valid + AssertionDictKeys.TC_START_COUNT: self.tc_verified_start, + AssertionDictKeys.TC_COMPLETION_COUNT: self.tc_verified_completion, + AssertionDictKeys.TC_STEP_COUNT: self.tc_verified_step, + AssertionDictKeys.EVENT_COUNT: self.event_counter, + AssertionDictKeys.MISC_COUNT: self.misc_counter, + AssertionDictKeys.FAIL_COUNT: self.fail_counter, + AssertionDictKeys.VALID: self.valid }) - return assertion_dict def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): @@ -156,9 +174,10 @@ class TestService(unittest.TestCase): Analyse the properties of the sent telecommands and fills them into a list which will be used for analysis later. """ + # pprint.pprint(tc_info_queue) while not tc_info_queue.__len__() == 0: current_tc_info = tc_info_queue.pop() - self.tc_verify_counter = self.tc_verify_counter + 1 + self.tc_start_counter += 1 # For commands with multiple steps, update this value manually ! # Only service 8 generates a step reply. if current_tc_info[TcDictionaryKeys.SERVICE] == 8: @@ -166,6 +185,8 @@ class TestService(unittest.TestCase): self.tc_ssc_array.append(current_tc_info[TcDictionaryKeys.SSC]) self.tc_service_array.append(current_tc_info[TcDictionaryKeys.SERVICE]) self.tc_subservice_array.append(current_tc_info[TcDictionaryKeys.SUBSERVICE]) + # Default completion counter is equla to start counter. + self.tc_complete_counter = self.tc_start_counter @abstractmethod def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): @@ -183,44 +204,47 @@ class TestService(unittest.TestCase): for possible_index, search_index in enumerate(self.tc_ssc_array): if search_index == current_tm_info[TmDictionaryKeys.TC_SSC]: if current_subservice == 1: - self.tc_verified_start = self.tc_verified_start + 1 + self.tc_verified_start += 1 + return elif current_subservice == 5: - self.tc_verified_step = self.tc_verified_step + 1 + self.tc_verified_step += 1 + return elif current_subservice == 7: - self.tc_verified_completion = self.tc_verified_completion + 1 + self.tc_verified_completion += 1 + return + elif current_subservice == 8: + self.fail_counter += 1 + return def _perform_generic_assertion_test(self, assertion_dict: dict): if assertion_dict is None: print("Performing Generic Assertion Test: " - "Configuratrion error, asseretion dictionary was not passed properly") + "Configuration error, assertion dictionary was not passed properly!") sys.exit() - self.assertEqual(assertion_dict["TcStartCount"], self.tc_verify_counter) - self.assertEqual(assertion_dict["TcCompletionCount"], self.tc_verify_counter) - self.assertEqual(assertion_dict["EventCount"], self.event_expected) - self.assertEqual(assertion_dict["MiscCount"], self.misc_expected) - self.assertTrue(assertion_dict["Valid"]) - - # these tests are identical - def _perform_service5or17_test(self): - assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.event_expected = 1 - self.misc_expected = 2 - self._perform_generic_assertion_test(assertion_dict) + self.assertEqual(assertion_dict[AssertionDictKeys.TC_START_COUNT], self.tc_start_counter) + self.assertEqual(assertion_dict[AssertionDictKeys.TC_COMPLETION_COUNT], + self.tc_complete_counter) + self.assertEqual(assertion_dict[AssertionDictKeys.EVENT_COUNT], self.event_expected) + self.assertEqual(assertion_dict[AssertionDictKeys.MISC_COUNT], self.misc_expected) + self.assertEqual(assertion_dict[AssertionDictKeys.FAIL_COUNT], self.fail_expected) + self.assertTrue(assertion_dict[AssertionDictKeys.VALID]) def _analyse_service5or17_tm(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): self.misc_counter = 0 + # pprint.pprint(tm_info_queue) while not tm_info_queue.__len__() == 0: current_tm_info = tm_info_queue.pop() # Tc verification scanning is generic and has been moved to the superclass if current_tm_info[TmDictionaryKeys.SERVICE] == 1: self.scan_for_respective_tc(current_tm_info) # Here, the desired event Id or RID can be specified - if current_tm_info[TmDictionaryKeys.SERVICE] == 5: + elif current_tm_info[TmDictionaryKeys.SERVICE] == 5: if (current_tm_info[TmDictionaryKeys.EVENT_ID] == 8200 and current_tm_info[TmDictionaryKeys.REPORTER_ID] == 0x51001700): self.event_counter = self.event_counter + 1 - if current_tm_info[TmDictionaryKeys.SERVICE] == 17: + elif current_tm_info[TmDictionaryKeys.SERVICE] == 17: self.misc_counter = self.misc_counter + 1 + if current_tm_info[TmDictionaryKeys.VALID] == 0: self.valid = False assertion_dict.update({"MiscCount": self.misc_counter}) diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index ec29e2d3aee69bcc1782c35d90eb67ed109e93a7..a7e3b42497c6a57e268ba0ef9e117bf4fcbc5db9 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -8,7 +8,7 @@ import struct import unittest from typing import Deque -from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys +from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys from tc.obsw_pus_tc_base import PusTcInfoQueueT from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ pack_service2_test_into, pack_service8_test_into @@ -28,7 +28,7 @@ class TestService2(TestService): print("Testing Service 2") # all commands must be sent sequentially, not as a burst cls.wait_intervals = [1, 2, 3, 4] - cls.wait_time = [1.7, 2.5, 1.7, 2.5] + cls.wait_time = [2.5, 2.5, 2.5, 2.5] pack_service2_test_into(cls.test_queue) def test_service2(self): @@ -36,8 +36,8 @@ class TestService2(TestService): Tests the raw commanding service. """ assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.event_expected = 2 - self.misc_expected = 4 + self.event_expected = 3 + self.misc_expected = 5 super()._perform_generic_assertion_test(assertion_dict) def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): @@ -76,12 +76,17 @@ class TestService5(TestService): # Wait intervals after TC 1,2 and 3 with specified wait times # This is required because the OBSW tasks runs with fixed sequences cls.wait_intervals = [1, 2, 3, 4] - cls.wait_time = [0.7, 1.5, 0.7, 1.5] + cls.wait_time = [2.0, 2.0, 2.0, 2.0] pack_service5_test_into(cls.test_queue) def test_Service5(self): # analyseTmInfo() and analyseTcInfo are called here - super()._perform_service5or17_test() + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 1 + self.misc_expected = 2 + self.fail_expected = 1 + self.tc_complete_counter -= 1 + self._perform_generic_assertion_test(assertion_dict) def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT): super().analyse_tc_info(tc_info_queue) @@ -107,19 +112,22 @@ class TestService8(TestService): super().setUpClass() LOGGER.info("Testing Service 8") cls.wait_intervals = [1, 2, 3, 4] - cls.wait_time = [1.5, 1.2, 1.2, 1.5] + cls.wait_time = [2.0, 2.0, 2.0, 2.0] cls.data_reply_count = 0 pack_service8_test_into(cls.test_queue) def test_Service8(self): assertion_dict = self.perform_testing_and_generate_assertion_dict() - self.event_expected = 4 - self.misc_expected = 2 + # 3 x Mode changes + self.misc_expected = 3 + # 2 x Event per mode change + self.event_expected = 6 self.data_reply_expected = 1 # One reply generates an additional step. self.tc_verify_step_counter += 1 - self.assertEqual(assertion_dict["TcStepCount"], self.tc_verify_step_counter) + self.assertEqual(assertion_dict[AssertionDictKeys.TC_STEP_COUNT], + self.tc_verify_step_counter) self.assertEqual(self.data_reply_count, self.data_reply_expected) super()._perform_generic_assertion_test(assertion_dict) @@ -154,13 +162,18 @@ class TestService17(TestService): def setUpClass(cls: TestService): super().setUpClass() LOGGER.info("Testing Service 17") - cls.wait_intervals = [2,3] - cls.wait_time = [2,1] + cls.wait_intervals = [2, 3] + cls.wait_time = [2, 1] cls.tm_timeout = g.G_TM_TIMEOUT pack_service17_test_into(cls.test_queue) def test_Service17(self): - super()._perform_service5or17_test() + assertion_dict = self.perform_testing_and_generate_assertion_dict() + self.event_expected = 1 + self.misc_expected = 2 + self.fail_expected = 1 + self.tc_complete_counter -= 1 + self._perform_generic_assertion_test(assertion_dict) def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py index 37b50e60c26c03e96715e971d1a4000b5aae09be..9eeadf7b806174793b6f1cb2340a5230fd4d1421 100644 --- a/tm/obsw_pus_tm_base.py +++ b/tm/obsw_pus_tm_base.py @@ -10,6 +10,7 @@ from crcmod import crcmod logger = get_logger() + class TmDictionaryKeys(Enum): SERVICE = auto() SUBSERVICE = auto()