Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
OBSW_UnitTest.py 9.49 KiB
# -*- coding: utf-8 -*-
"""
Program: OBSW_UnitTest.py
Date: 01.11.2019
Description: Unit test of on-board software, used by the UDP client in software test mode

Manual:
Set up the UDP client as specified in the header comment and use the unit testing mode

For Developers:
TestService is the template method, analyseTmInfo and analyseTcInfo are implemented for
specific services or devices by setting up assertionDict entries which count received packets
based on subservice or entry values.

Example Service 17:
TC[17,1] and TC[17,128] are sent, TM[1,1] and TM[1,7] (TC verification) are expected twice, TM[17,2] (ping reply)
is expected twice and TM[5,1] (test event) is expected once. Note that the TC verification counting is
done by the template class by comparing with TC source sequence count. For PUS standalone services (PUS Service Base),
only the start and completion success need to be asserted. For PUS gateway services (PUS Commanding Service Base),
step verification needs to be asserted additionally. If there are commands with multiple steps, this needs
to be specified in the analyseTcInfo method of the child test.

@author: R. Mueller
"""
import unittest
import queue
from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packDummyDeviceTestInto
from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver
from OBSW_UdpClient import connectToBoard
from utility.OBSW_TmTcPrinter import TmtcPrinter
from comIF.OBSW_Ethernet_ComIF import EthernetComIF
import OBSW_Config as g


class TestService(unittest.TestCase):
    testQueue = queue.Queue()

    @classmethod
    def setUpClass(cls):
        cls.displayMode = "long"

        # default timeout for receiving TM, set in subclass manually
        cls.tmTimeout = 3
        # wait intervals between tc send bursts.
        # Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again
        cls.waitIntervals = []
        cls.waitTime = 2
        cls.printTm = True
        cls.printTc = True
        # default wait time between tc send bursts
        cls.waitTime = 5.0
        cls.testQueue = queue.Queue()
        cls.tcTimeoutFactor = 3.0
        cls.printFile = True
        cls.tmtcPrinter = TmtcPrinter(cls.displayMode, cls.printFile, True)
        cls.communicationInterface = EthernetComIF(cls.tmtcPrinter, cls.tmTimeout, cls.tcTimeoutFactor, g.sockSend,
                                                   g.sockReceive, g.sendAddress)
        connectToBoard()

    def performTestingAndGenerateAssertionDict(self):
        UnitTester = MultipleCommandSenderReceiver(self.communicationInterface, self.tmtcPrinter, self.testQueue,
                                                   self.tmTimeout, self.waitIntervals, self.waitTime, self.printTm,
                                                   self.tcTimeoutFactor, self.printFile)
        (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo()
        assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue)
        return assertionDict

    def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue):
        self.tcSscArray = []
        self.tcServiceArray = []
        self.tcSubserviceArray = []

        # these number of TCs sent which need need to be verified
        # separate step counter if there is more than one step for a command !
        self.tcVerifyCounter = 0
        self.tcVerifyStepCounter = 0
        # child test implements this
        self.analyseTcInfo(tcInfoQueue)

        # These values are incremented in the analyseTmInfo function
        # and compared to the counter values
        self.tcVerifiedStart = 0
        self.tcVerifiedCompletion = 0
        self.tcVerifiedStep = 0
        # The expected values are set in child test
        self.eventCounter = 0
        self.miscCounter = 0
        self.valid = True

        assertionDict = {}
        # child test implements this and can add additional dict entries
        self.analyseTmInfo(tmInfoQueue, assertionDict)

        assertionDict.update({
            "TcStartCount": self.tcVerifiedStart,
            "TcCompletionCount": self.tcVerifiedCompletion,
            "TcStepCount": self.tcVerifiedStep,
            "EventCount": self.eventCounter,
            "MiscCount": self.miscCounter,
            "Valid": self.valid
        })

        return assertionDict

    def analyseTcInfo(self, tcInfoQueue):
        while not tcInfoQueue.empty():
            currentTcInfo = tcInfoQueue.get()
            self.tcVerifyCounter = self.tcVerifyCounter + 1
            # For commands with multiple steps, update this value manually !
            self.tcVerifyStepCounter = self.tcVerifyCounter + 1
            self.tcSscArray.append(currentTcInfo["ssc"])
            self.tcServiceArray.append(currentTcInfo["service"])
            self.tcSubserviceArray.append(currentTcInfo["subservice"])

    # this function looks whether the tc verification SSC matched
    # a SSC of the sent TM
    def scanForRespectiveTc(self, currentTmInfo):
        currentSubservice = currentTmInfo["subservice"]
        for possibleIndex, searchIndex in enumerate(self.tcSscArray):
            if searchIndex == currentTmInfo["tcSSC"]:
                if currentSubservice == 1:
                    self.tcVerifiedStart = self.tcVerifiedStart + 1
                elif currentSubservice == 5:
                    self.tcVerifiedStep = self.tcVerifiedStep + 1
                elif currentSubservice == 7:
                    self.tcVerifiedCompletion = self.tcVerifiedCompletion + 1

    # these tests are identical
    def performService5or17Test(self):
        assertionDict = self.performTestingAndGenerateAssertionDict()
        self.eventExpected = 1
        self.miscExpected = 2
        self.assertEqual(assertionDict["TcStartCount"], self.tcVerifyCounter)
        self.assertEqual(assertionDict["TcCompletionCount"], self.tcVerifyCounter)
        self.assertEqual(assertionDict["EventCount"], self.eventExpected)
        self.assertEqual(assertionDict["MiscCount"], self.miscExpected)
        self.assertTrue(assertionDict["Valid"])

    def analyseService5or17TM(self, tmInfoQueue, assertionDict):
        self.miscCounter = 0
        while not tmInfoQueue.empty():
            currentTmInfo = tmInfoQueue.get()
            # Tc verification scanning is generic and has been moved to the superclass
            if currentTmInfo["service"] == 1:
                self.scanForRespectiveTc(currentTmInfo)
            # Here, the desired event Id or RID can be specified
            if currentTmInfo["service"] == 5:
                if currentTmInfo["EventID"] == 8200 and currentTmInfo["RID"] == 0x51001700:
                    self.eventCounter = self.eventCounter + 1
            if currentTmInfo["service"] == 17:
                self.miscCounter = self.miscCounter + 1
            if currentTmInfo["valid"] == 0:
                self.valid = False
        assertionDict.update({"MiscCount": self.miscCounter})

    @classmethod
    def tearDownClass(cls):
        cls.eventCounter = 0
        cls.tcVerifyCounter = 0
        cls.testQueue.queue.clear()

    def analyseTmInfo(self, tmInfoQueue, assertionDict):
        pass


class TestService5(TestService):

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        print("Testing Service 5")
        cls.waitIntervals = [1, 2, 3]
        cls.waitTime = [1.2, 1.5, 1.2]
        packService5TestInto(cls.testQueue)

    def test_Service5(self):

        # analyseTmInfo() and analyseTcInfo are called here
        super().performService5or17Test()

    def analyseTcInfo(self, tcInfoQueue):
        super().analyseTcInfo(tcInfoQueue)

    def analyseTmInfo(self, tmInfoQueue, assertionDict):
        super().analyseService5or17TM(tmInfoQueue, assertionDict)

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()


class TestService6(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        print("Hallo Test 6!")


class TestService8(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        print("Hallo Test 2!")


class TestService9(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        print("Hallo Test 9!")


class TestService17(TestService):
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        print("Testing Service 17")
        cls.waitIntervals = [2]
        cls.waitTime = 2
        cls.tmTimeout = 2
        packService17TestInto(cls.testQueue)

    def test_Service17(self):
        super().performService5or17Test()

    def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue):
        assertionDict = super().analyseTmTcInfo(tmInfoQueue, tcInfoQueue)
        # add anything elsee other than tc verification counter and ssc that is needed for tm analysis
        return assertionDict

    def analyseTcInfo(self, tcInfoQueue):
        super().analyseTcInfo(tcInfoQueue)

    def analyseTmInfo(self, tmInfoQueue, assertionDict):
        super().analyseService5or17TM(tmInfoQueue, assertionDict)

    @classmethod
    def tearDownClass(cls):
        print("Testing Service 17 finished")
        super().tearDownClass()


class TestDummyDevice(TestService):
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        print("Testing Dummy Device")
        packDummyDeviceTestInto(cls.testQueue)


if __name__ == '__main__':
    unittest.main()