Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_module_test.py 11.30 KiB
"""
@file   obsw_module_test.py
@date   01.11.2019
@brief  Module/High-Level test of on-board software, used by the TMTC client in
        software test mode.
@details
It is very difficult to execute Unit Tests on embedded hardware.
The most significant functonalities for satellites are commanding and TM reception.
As such, these functionalities will be tested by sending TCs and checking the expected Telemetry.
Still, we make use of the Unit Testing Framework provided by Python (with some hacks involved).

@manual
Set up the TMTC client as specified in the header comment and use the unit testing mode

For Developers:
TestService is the template method, analyseTmInfo and analyseTcInfo are implemented for
specific services or devices by setting up assertionDict entries which count received packets
based on subservice or entry values. There is a default implementation provided for analyseTcInfo.
The log files generated by individual G_SERVICE test or the software test mode when providing
the -p flag are very useful for extending the unit tester.
To check the dictionary keys of the telecommand and telemetry information queue entries,
go look up the packTmInformation() and packTcInformation() implementations in the TM and TC folder.

Example Service 17:
TC[17,1] and TC[17,128] are sent, TM[1,1] and TM[1,7] (TC verification) are expected twice,
TM[17,2] (ping reply) is expected twice and TM[5,1] (test event) is expected once.
Note that the TC verification counting is done by the template class by comparing with
TC source sequence count. For PUS standalone services (PUS Service Base),
only the start and completion success need to be asserted. For PUS gateway services
(PUS Commanding Service Base), step verification needs to be asserted additionally.
If there are commands with multiple steps, this needs
to be specified in the analyse_tc_info method of the child test.

@author: R. Mueller
"""
import sys
import unittest
from abc import abstractmethod
from collections import deque
from typing import Deque

from config import obsw_config as g
from tc.obsw_pus_tc_packer import pack_dummy_device_test_into
from tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys
from tm.obsw_tm_service_1 import PusPacketInfoService1T
from tm.obsw_pus_tm_base import TmDictionaryKeys
from tm.obsw_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT
from sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver
from utility.obsw_logger import get_logger

TmInfoQueueService1T = Deque[PusPacketInfoService1T]
LOGGER = get_logger()

class TestService(unittest.TestCase):
    """
    Generic service test class.
    See documentation: https://docs.python.org/3/library/unittest.html#unittest.TestCase.setUpClass
    """
    @classmethod
    def setUpClass(cls):
        """
        A class method which is run before every individually class module is run
        must be decorated by the @classmethod fixture
        :return:
        """
        cls._displayMode = "long"
        # wait intervals between tc send bursts.
        # Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again
        cls.wait_intervals = []
        cls.wait_time = 5.0
        cls.print_tc = True
        # default wait time between tc send bursts

        cls.test_queue = deque()
        # Extremely ugly solution so that we can use the Python Unit Test Framework
        cls.tm_timeout = g.G_TM_TIMEOUT
        cls.tc_timeout_factor = g.G_TC_SEND_TIMEOUT_FACTOR
        cls.print_file = g.G_PRINT_TO_FILE
        cls.tmtc_printer = g.G_TMTC_PRINTER
        cls.tm_listener = g.G_TM_LISTENER
        cls.communication_interface = g.G_COM_INTERFACE

        cls.tc_ssc_array = []
        cls.tc_service_array = []
        cls.tc_subservice_array = []
        # these number of TCs sent which need need to be verified
        # separate step counter if there is more than one step for a command !
        cls.tc_verify_counter = 0
        cls.tc_verify_step_counter = 0

        # These values are incremented in the analyseTmInfo function
        # and compared to the counter values
        cls.tc_verified_start = 0
        cls.tc_verified_completion = 0
        cls.tc_verified_step = 0

        # The expected values are set in child test
        cls.event_counter = 0
        cls.event_expected = 0
        cls.misc_expected = 0
        cls.misc_counter = 0
        cls.valid = True

    def perform_testing_and_generate_assertion_dict(self):
        """
        This function should be called in each individual test to send the actual telecommands
        which are stored inside test_queue
        TODO: Maybe we should instantiate this once in the main and then reuse it instead
               of calling the constructor over and over again.
               If done so we need a setter for: wait_time, wait_intervals, printTm,
               tc_timeout_factor and the tc.queue. Furthermore, changing parameters should
               only be allowed as long as the commander/receiver is not running by checking a flag
        :return:
        """
        module_tester = MultipleCommandSenderReceiver(
            com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
            tm_listener=self.tm_listener, tc_queue=self.test_queue,
            wait_intervals=self.wait_intervals, wait_time=self.wait_time, print_tm=g.G_PRINT_RAW_TM)
        tc_info_queue = deque()
        tm_packet_queue = deque()
        try:
            (tc_info_queue, tm_packet_queue) = module_tester.send_tc_queue_and_return_info()
        except (IOError, KeyboardInterrupt):
            LOGGER.info("Closing TMTC Handler")
        tm_info_queue = deque()
        while tm_packet_queue.__len__() != 0:
            tm_packet_list = tm_packet_queue.pop()
            for tm_packet in tm_packet_list:
                tm_info_queue.appendleft(tm_packet.pack_tm_information())
        assertion_dict = self._analyse_tm_tc_info(tm_info_queue, tc_info_queue)
        return assertion_dict

    def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT,
                            tc_info_queue: PusTcInfoQueueT) -> dict:
        # child test implements this
        # default implementation provided
        self.analyse_tc_info(tc_info_queue)

        assertion_dict = {}
        # child test implements this and can add additional dict entries
        self.analyse_tm_info(tm_info_queue, assertion_dict)

        assertion_dict.update({
            "TcStartCount": self.tc_verified_start,
            "TcCompletionCount": self.tc_verified_completion,
            "TcStepCount": self.tc_verified_step,
            "EventCount": self.event_counter,
            "MiscCount": self.misc_counter,
            "Valid": self.valid
        })

        return assertion_dict

    def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT):
        """
        Analyse the properties of the sent telecommands and fills them
        into a list which will be used for analysis later.
        """
        while not tc_info_queue.__len__() == 0:
            current_tc_info = tc_info_queue.pop()
            self.tc_verify_counter = self.tc_verify_counter + 1
            # For commands with multiple steps, update this value manually !
            # Only service 8 generates a step reply.
            if current_tc_info[TcDictionaryKeys.SERVICE] == 8:
                self.tc_verify_step_counter += 1
            self.tc_ssc_array.append(current_tc_info[TcDictionaryKeys.SSC])
            self.tc_service_array.append(current_tc_info[TcDictionaryKeys.SERVICE])
            self.tc_subservice_array.append(current_tc_info[TcDictionaryKeys.SUBSERVICE])

    @abstractmethod
    def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict):
        """
        In this function, the received TM information is analysed for correctness.
        Must be implemented by child test !
        """

    def scan_for_respective_tc(self, current_tm_info: PusTmInfoT):
        """
        this function looks whether the tc verification SSC matched
        a source sequence count of the sent TM
        """
        current_subservice = current_tm_info[TmDictionaryKeys.SUBSERVICE]
        for possible_index, search_index in enumerate(self.tc_ssc_array):
            if search_index == current_tm_info[TmDictionaryKeys.TC_SSC]:
                if current_subservice == 1:
                    self.tc_verified_start = self.tc_verified_start + 1
                elif current_subservice == 5:
                    self.tc_verified_step = self.tc_verified_step + 1
                elif current_subservice == 7:
                    self.tc_verified_completion = self.tc_verified_completion + 1

    def _perform_generic_assertion_test(self, assertion_dict: dict):
        if assertion_dict is None:
            print("Performing Generic Assertion Test: "
                  "Configuratrion error, asseretion dictionary was not passed properly")
            sys.exit()
        self.assertEqual(assertion_dict["TcStartCount"], self.tc_verify_counter)
        self.assertEqual(assertion_dict["TcCompletionCount"], self.tc_verify_counter)
        self.assertEqual(assertion_dict["EventCount"], self.event_expected)
        self.assertEqual(assertion_dict["MiscCount"], self.misc_expected)
        self.assertTrue(assertion_dict["Valid"])

    # these tests are identical
    def _perform_service5or17_test(self):
        assertion_dict = self.perform_testing_and_generate_assertion_dict()
        self.event_expected = 1
        self.misc_expected = 2
        self._perform_generic_assertion_test(assertion_dict)

    def _analyse_service5or17_tm(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict):
        self.misc_counter = 0
        while not tm_info_queue.__len__() == 0:
            current_tm_info = tm_info_queue.pop()
            # Tc verification scanning is generic and has been moved to the superclass
            if current_tm_info[TmDictionaryKeys.SERVICE] == 1:
                self.scan_for_respective_tc(current_tm_info)
            # Here, the desired event Id or RID can be specified
            if current_tm_info[TmDictionaryKeys.SERVICE] == 5:
                if (current_tm_info[TmDictionaryKeys.EVENT_ID] == 8200 and
                        current_tm_info[TmDictionaryKeys.REPORTER_ID] == 0x51001700):
                    self.event_counter = self.event_counter + 1
            if current_tm_info[TmDictionaryKeys.SERVICE] == 17:
                self.misc_counter = self.misc_counter + 1
            if current_tm_info[TmDictionaryKeys.VALID] == 0:
                self.valid = False
        assertion_dict.update({"MiscCount": self.misc_counter})

    @classmethod
    # we could print out everything here.
    def tearDownClass(cls):
        cls.event_counter = 0
        cls.tc_verify_counter = 0
        # noinspection PyUnresolvedReferences
        cls.test_queue.clear()


class TestDummyDevice(TestService):
    """
    Test for the dummy class.
    """
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        print("Testing Dummy Device")
        pack_dummy_device_test_into(super().test_queue)

    def analyse_tm_info(self, tm_info_queue, assertion_dict):
        pass


if __name__ == '__main__':
    unittest.main()