Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_pus_service_test.py 6.97 KiB
"""
@file   obsw_pus_service_test.py
@date   01.11.2019
@brief  Contains specific PUS service tests.
@author: R. Mueller
"""
import struct
import unittest
from typing import Deque

from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys
from tc.obsw_pus_tc_base import PusTcInfoQueueT
from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \
    pack_service2_test_into, pack_service8_test_into
import config.obsw_config as g


class TestService2(TestService):
    """
    Test case for PUS service 2 (raw commanding).
    """
    @classmethod
    def setUpClass(cls: TestService):
        """
        Run the superclass setup, configure the waits and pack the
        service 2 test telecommands into the class test queue.
        """
        super().setUpClass()
        print("Testing Service 2")
        # all commands must be sent sequentially, not as a burst
        cls.wait_time = [2, 2, 2, 3]
        cls.wait_intervals = [1, 2, 3, 4]
        pack_service2_test_into(cls.test_queue)

    def test_service2(self):
        """
        Perform the service 2 test run and hand the resulting assertion
        dictionary to the generic assertion test of the superclass.
        """
        assertion_dict = self.perform_testing_and_generate_assertion_dict()
        # expected values are set after the run and before the generic assertions
        self.event_expected, self.misc_expected = 2, 4
        super()._perform_generic_assertion_test(assertion_dict)

    def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict):
        """
        Pop telemetry info entries until the queue is empty and update the
        counters of this test case.

        - Service 1 entries are passed to the generic TC verification scan
          of the superclass.
        - Service 5 entries increment the event counter if the reporter ID
          equals the big-endian uint32 unpacked from g.DUMMY_DEVICE_ID and
          the event ID is 7400 or 7401.
        - Service 200 subservice 6 entries (mode change confirmation) and
          service 2 subservice 130/131 entries (wiretapping messages)
          increment the misc counter.
        - Any entry whose VALID field is 0 marks the test as invalid.

        Finally the misc counter is stored in assertion_dict as "MiscCount".
        """
        while tm_info_queue:
            tm_info = tm_info_queue.pop()
            service = tm_info[TmDictionaryKeys.SERVICE]
            if service == 1:
                # Tc verification scanning is generic and lives in the superclass
                super().scan_for_respective_tc(tm_info)
            elif service == 5:
                # Here, the desired event Id or RID can be specified (mode change)
                reporter_matches = (
                    tm_info[TmDictionaryKeys.REPORTER_ID] ==
                    struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]
                )
                if reporter_matches and \
                        tm_info[TmDictionaryKeys.EVENT_ID] in (7401, 7400):
                    self.event_counter += 1
            elif service == 200:
                # mode change confirmation
                if tm_info[TmDictionaryKeys.SUBSERVICE] == 6:
                    self.misc_counter += 1
            elif service == 2:
                # wiretapping messages
                if tm_info[TmDictionaryKeys.SUBSERVICE] in (130, 131):
                    self.misc_counter += 1
            if tm_info[TmDictionaryKeys.VALID] == 0:
                self.valid = False
        assertion_dict["MiscCount"] = self.misc_counter


class TestService5(TestService):
    """
    Test case for PUS service 5. Test execution and analysis are delegated
    to the shared service 5/17 helpers of the superclass.
    """
    @classmethod
    def setUpClass(cls: TestService):
        """
        Run the superclass setup, configure the waits and pack the
        service 5 test telecommands into the class test queue.
        """
        super().setUpClass()
        print("Testing Service 5")
        # Wait after TC 1, 2 and 3 for the specified wait times.
        # This is required because the OBSW tasks run with fixed sequences.
        cls.wait_time = [1.5, 2.0, 2.0]
        cls.wait_intervals = [1, 2, 3]
        pack_service5_test_into(cls.test_queue)

    def test_Service5(self):
        """
        Run the shared service 5/17 test procedure of the superclass.
        """
        # analyse_tm_info() and analyse_tc_info() are called here
        super()._perform_service5or17_test()

    def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT):
        """
        Forward the telecommand info queue to the superclass analysis.
        """
        super().analyse_tc_info(tc_info_queue)

    def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict):
        """
        Forward the telemetry info queue and the assertion dictionary to the
        shared service 5/17 telemetry analysis of the superclass.
        """
        super()._analyse_service5or17_tm(tm_info_queue, assertion_dict)

    @classmethod
    def tearDownClass(cls):
        """
        Run the superclass teardown.
        """
        super().tearDownClass()


class TestService6(unittest.TestCase):
    """
    Placeholder test case for service 6.

    No test methods are defined yet; only the class-level setup hook exists.
    """
    @classmethod
    def setUpClass(cls) -> None:
        """
        Run the base-class setup hook, then print a greeting to stdout.
        """
        # Chain to the parent hook, as the TestService-based cases in this file do,
        # so that setup keeps working if a base class or mixin adds its own setup.
        super().setUpClass()
        print("Hallo Test 6!")


class TestService8(TestService):
    """
    Test case for PUS service 8.
    """

    @classmethod
    def setUpClass(cls: TestService):
        """
        Run the superclass setup, configure the waits, reset the data reply
        counter and pack the service 8 test telecommands into the class
        test queue.
        """
        super().setUpClass()
        print("Testing Service 8")
        cls.data_reply_count = 0
        cls.wait_time = [1.5, 0.7, 1.0, 1.1]
        cls.wait_intervals = [1, 2, 3, 4]
        pack_service8_test_into(cls.test_queue)

    def test_Service8(self):
        """
        Perform the service 8 test run, check the TC step count and the
        number of data replies, then run the generic assertion test of the
        superclass.
        """
        assertion_dict = self.perform_testing_and_generate_assertion_dict()
        self.event_expected, self.misc_expected = 4, 2
        self.data_reply_expected = 1
        # One reply generates an additional step.
        self.tc_verify_step_counter += 1
        self.assertEqual(assertion_dict["TcStepCount"], self.tc_verify_step_counter)

        self.assertEqual(self.data_reply_count, self.data_reply_expected)
        super()._perform_generic_assertion_test(assertion_dict)

    def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict):
        """
        Pop telemetry info entries until the queue is empty and update the
        counters of this test case.

        - Service 1 entries are passed to scan_for_respective_tc().
        - Service 5 entries increment the event counter if the reporter ID
          equals the big-endian uint32 unpacked from g.DUMMY_DEVICE_ID and
          the event ID is 7400 or 7401.
        - Every service 200 entry (mode change confirmation) increments the
          misc counter.
        - Service 8 subservice 130 entries increment the data reply counter.

        The assertion dictionary is not modified here.
        """
        while tm_info_queue:
            tm_info = tm_info_queue.pop()
            service = tm_info[TmDictionaryKeys.SERVICE]
            if service == 1:
                self.scan_for_respective_tc(tm_info)
            elif service == 5:
                reporter_matches = (
                    tm_info[TmDictionaryKeys.REPORTER_ID] ==
                    struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]
                )
                if reporter_matches and \
                        tm_info[TmDictionaryKeys.EVENT_ID] in (7401, 7400):
                    self.event_counter += 1
            elif service == 200:
                # mode change confirmation
                self.misc_counter += 1
            elif service == 8:
                if tm_info[TmDictionaryKeys.SUBSERVICE] == 130:
                    self.data_reply_count += 1


class TestService9(unittest.TestCase):
    """
    Placeholder test case for service 9.

    No test methods are defined yet; only the class-level setup hook exists.
    """
    @classmethod
    def setUpClass(cls) -> None:
        """
        Run the base-class setup hook, then print a greeting to stdout.
        """
        # Chain to the parent hook, as the TestService-based cases in this file do,
        # so that setup keeps working if a base class or mixin adds its own setup.
        super().setUpClass()
        print("Hallo Test 9!")


class TestService17(TestService):
    """
    Test case for PUS service 17. Test execution and analysis are delegated
    to the shared service 5/17 helpers of the superclass.
    """
    @classmethod
    def setUpClass(cls: TestService):
        """
        Run the superclass setup, configure wait and timeout values and pack
        the service 17 test telecommands into the class test queue.
        """
        super().setUpClass()
        print("Testing Service 17")
        cls.tm_timeout = g.G_TM_TIMEOUT
        cls.wait_time = 2
        cls.wait_intervals = [2]
        pack_service17_test_into(cls.test_queue)

    def test_Service17(self):
        """
        Run the shared service 5/17 test procedure of the superclass.
        """
        super()._perform_service5or17_test()

    def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT):
        """
        Return the assertion dictionary produced by the superclass analysis
        of the telemetry and telecommand info queues.
        """
        # add anything else other than tc verification counter
        # and ssc that is needed for tm analysis
        return super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue,
                                           tc_info_queue=tc_info_queue)

    def analyse_tc_info(self, tc_info_queue):
        """
        Forward the telecommand info queue to the superclass analysis.
        """
        super().analyse_tc_info(tc_info_queue)

    def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict):
        """
        Forward the telemetry info queue and the assertion dictionary to the
        shared service 5/17 telemetry analysis of the superclass.
        """
        super()._analyse_service5or17_tm(tm_info_queue, assertion_dict)

    @classmethod
    def tearDownClass(cls):
        """
        Announce the end of the service 17 test, then run the superclass
        teardown.
        """
        print("Testing Service 17 finished")
        super().tearDownClass()