Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
OBSW_UnitTest.py 9.49 KiB
# -*- coding: utf-8 -*-
"""
Program: OBSW_UnitTest.py
Date: 01.11.2019
Description: Unit test of on-board software, used by the UDP client in software test mode
Manual:
Set up the UDP client as specified in the header comment and use the unit testing mode
For Developers:
TestService is the template method, analyseTmInfo and analyseTcInfo are implemented for
specific services or devices by setting up assertionDict entries which count received packets
based on subservice or entry values.
Example Service 17:
TC[17,1] and TC[17,128] are sent, TM[1,1] and TM[1,7] (TC verification) are expected twice, TM[17,2] (ping reply)
is expected twice and TM[5,1] (test event) is expected once. Note that the TC verification counting is
done by the template class by comparing with TC source sequence count. For PUS standalone services (PUS Service Base),
only the start and completion success need to be asserted. For PUS gateway services (PUS Commanding Service Base),
step verification needs to be asserted additionally. If there are commands with multiple steps, this needs
to be specified in the analyseTcInfo method of the child test.
@author: R. Mueller
"""
import unittest
import queue
from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packDummyDeviceTestInto
from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver
from OBSW_UdpClient import connectToBoard
from utility.OBSW_TmTcPrinter import TmtcPrinter
from comIF.OBSW_Ethernet_ComIF import EthernetComIF
import OBSW_Config as g
class TestService(unittest.TestCase):
testQueue = queue.Queue()
@classmethod
def setUpClass(cls):
cls.displayMode = "long"
# default timeout for receiving TM, set in subclass manually
cls.tmTimeout = 3
# wait intervals between tc send bursts.
# Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again
cls.waitIntervals = []
cls.waitTime = 2
cls.printTm = True
cls.printTc = True
# default wait time between tc send bursts
cls.waitTime = 5.0
cls.testQueue = queue.Queue()
cls.tcTimeoutFactor = 3.0
cls.printFile = True
cls.tmtcPrinter = TmtcPrinter(cls.displayMode, cls.printFile, True)
cls.communicationInterface = EthernetComIF(cls.tmtcPrinter, cls.tmTimeout, cls.tcTimeoutFactor, g.sockSend,
g.sockReceive, g.sendAddress)
connectToBoard()
def performTestingAndGenerateAssertionDict(self):
UnitTester = MultipleCommandSenderReceiver(self.communicationInterface, self.tmtcPrinter, self.testQueue,
self.tmTimeout, self.waitIntervals, self.waitTime, self.printTm,
self.tcTimeoutFactor, self.printFile)
(tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo()
assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue)
return assertionDict
def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue):
self.tcSscArray = []
self.tcServiceArray = []
self.tcSubserviceArray = []
# these number of TCs sent which need need to be verified
# separate step counter if there is more than one step for a command !
self.tcVerifyCounter = 0
self.tcVerifyStepCounter = 0
# child test implements this
self.analyseTcInfo(tcInfoQueue)
# These values are incremented in the analyseTmInfo function
# and compared to the counter values
self.tcVerifiedStart = 0
self.tcVerifiedCompletion = 0
self.tcVerifiedStep = 0
# The expected values are set in child test
self.eventCounter = 0
self.miscCounter = 0
self.valid = True
assertionDict = {}
# child test implements this and can add additional dict entries
self.analyseTmInfo(tmInfoQueue, assertionDict)
assertionDict.update({
"TcStartCount": self.tcVerifiedStart,
"TcCompletionCount": self.tcVerifiedCompletion,
"TcStepCount": self.tcVerifiedStep,
"EventCount": self.eventCounter,
"MiscCount": self.miscCounter,
"Valid": self.valid
})
return assertionDict
def analyseTcInfo(self, tcInfoQueue):
while not tcInfoQueue.empty():
currentTcInfo = tcInfoQueue.get()
self.tcVerifyCounter = self.tcVerifyCounter + 1
# For commands with multiple steps, update this value manually !
self.tcVerifyStepCounter = self.tcVerifyCounter + 1
self.tcSscArray.append(currentTcInfo["ssc"])
self.tcServiceArray.append(currentTcInfo["service"])
self.tcSubserviceArray.append(currentTcInfo["subservice"])
# this function looks whether the tc verification SSC matched
# a SSC of the sent TM
def scanForRespectiveTc(self, currentTmInfo):
currentSubservice = currentTmInfo["subservice"]
for possibleIndex, searchIndex in enumerate(self.tcSscArray):
if searchIndex == currentTmInfo["tcSSC"]:
if currentSubservice == 1:
self.tcVerifiedStart = self.tcVerifiedStart + 1
elif currentSubservice == 5:
self.tcVerifiedStep = self.tcVerifiedStep + 1
elif currentSubservice == 7:
self.tcVerifiedCompletion = self.tcVerifiedCompletion + 1
# these tests are identical
def performService5or17Test(self):
assertionDict = self.performTestingAndGenerateAssertionDict()
self.eventExpected = 1
self.miscExpected = 2
self.assertEqual(assertionDict["TcStartCount"], self.tcVerifyCounter)
self.assertEqual(assertionDict["TcCompletionCount"], self.tcVerifyCounter)
self.assertEqual(assertionDict["EventCount"], self.eventExpected)
self.assertEqual(assertionDict["MiscCount"], self.miscExpected)
self.assertTrue(assertionDict["Valid"])
def analyseService5or17TM(self, tmInfoQueue, assertionDict):
self.miscCounter = 0
while not tmInfoQueue.empty():
currentTmInfo = tmInfoQueue.get()
# Tc verification scanning is generic and has been moved to the superclass
if currentTmInfo["service"] == 1:
self.scanForRespectiveTc(currentTmInfo)
# Here, the desired event Id or RID can be specified
if currentTmInfo["service"] == 5:
if currentTmInfo["EventID"] == 8200 and currentTmInfo["RID"] == 0x51001700:
self.eventCounter = self.eventCounter + 1
if currentTmInfo["service"] == 17:
self.miscCounter = self.miscCounter + 1
if currentTmInfo["valid"] == 0:
self.valid = False
assertionDict.update({"MiscCount": self.miscCounter})
@classmethod
def tearDownClass(cls):
cls.eventCounter = 0
cls.tcVerifyCounter = 0
cls.testQueue.queue.clear()
def analyseTmInfo(self, tmInfoQueue, assertionDict):
pass
class TestService5(TestService):
@classmethod
def setUpClass(cls):
super().setUpClass()
print("Testing Service 5")
cls.waitIntervals = [1, 2, 3]
cls.waitTime = [1.2, 1.5, 1.2]
packService5TestInto(cls.testQueue)
def test_Service5(self):
# analyseTmInfo() and analyseTcInfo are called here
super().performService5or17Test()
def analyseTcInfo(self, tcInfoQueue):
super().analyseTcInfo(tcInfoQueue)
def analyseTmInfo(self, tmInfoQueue, assertionDict):
super().analyseService5or17TM(tmInfoQueue, assertionDict)
@classmethod
def tearDownClass(cls):
super().tearDownClass()
class TestService6(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("Hallo Test 6!")
class TestService8(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("Hallo Test 2!")
class TestService9(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("Hallo Test 9!")
class TestService17(TestService):
@classmethod
def setUpClass(cls):
super().setUpClass()
print("Testing Service 17")
cls.waitIntervals = [2]
cls.waitTime = 2
cls.tmTimeout = 2
packService17TestInto(cls.testQueue)
def test_Service17(self):
super().performService5or17Test()
def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue):
assertionDict = super().analyseTmTcInfo(tmInfoQueue, tcInfoQueue)
# add anything elsee other than tc verification counter and ssc that is needed for tm analysis
return assertionDict
def analyseTcInfo(self, tcInfoQueue):
super().analyseTcInfo(tcInfoQueue)
def analyseTmInfo(self, tmInfoQueue, assertionDict):
super().analyseService5or17TM(tmInfoQueue, assertionDict)
@classmethod
def tearDownClass(cls):
print("Testing Service 17 finished")
super().tearDownClass()
class TestDummyDevice(TestService):
@classmethod
def setUpClass(cls):
super().setUpClass()
print("Testing Dummy Device")
packDummyDeviceTestInto(cls.testQueue)
if __name__ == '__main__':
unittest.main()