diff --git a/OBSW_TmTcClient.py b/OBSW_TmTcClient.py index 36d35f497285aba3120a1555c230cc15b663d65f..f15a84c5c31cec382957817c2492dd6a5b3fb351 100644 --- a/OBSW_TmTcClient.py +++ b/OBSW_TmTcClient.py @@ -80,7 +80,7 @@ def main(): tmtcPrinter = TmTcPrinter(g.displayMode, g.printToFile, True) tmListener = 0 communicationInterface = 0 - if g.modeId == g.modeList.GUIMode: + if g.modeId == g.ModeList.GUIMode: backend = TmTcBackend() backend.start() GUI = TmTcGUI() @@ -96,9 +96,9 @@ def main(): tmListener = TmListener( comInterface=communicationInterface, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor) tmListener.start() - if g.modeId == g.modeList.ListenerMode: + if g.modeId == g.ModeList.ListenerMode: print("Listening for packages...") - elif g.modeId == g.modeList.SingleCommandMode: + elif g.modeId == g.ModeList.SingleCommandMode: pusPacketTuple = commandPreparation() SenderAndReceiver = SingleCommandSenderReceiver( comInterface=communicationInterface, tmtcPrinter=tmtcPrinter, tmListener=tmListener, @@ -106,7 +106,7 @@ def main(): doPrintToFile=g.printToFile) SenderAndReceiver.sendSingleTcAndReceiveTm() - elif g.modeId == g.modeList.ServiceTestMode: + elif g.modeId == g.ModeList.ServiceTestMode: serviceQueue = deque() SenderAndReceiver = SequentialCommandSenderReceiver( comInterface=communicationInterface, tmtcPrinter=tmtcPrinter, tmTimeout=g.tmTimeout, @@ -114,7 +114,7 @@ def main(): doPrintToFile=g.printToFile, tmListener=tmListener) SenderAndReceiver.sendQueueTcAndReceiveTmSequentially() - elif g.modeId == g.modeList.SoftwareTestMode: + elif g.modeId == g.ModeList.SoftwareTestMode: allTcQueue = createTotalTcQueue() SenderAndReceiver = SequentialCommandSenderReceiver( comInterface=communicationInterface, tmtcPrinter=tmtcPrinter, tmTimeout=g.tmTimeout, @@ -122,7 +122,7 @@ def main(): tmListener=tmListener) SenderAndReceiver.sendQueueTcAndReceiveTmSequentially() - elif g.modeId == g.modeList.UnitTest: + elif g.modeId == g.ModeList.UnitTest: """ Set up test suite and run it with runner. Verbosity specifies detail level """ diff --git a/config/OBSW_Config.py b/config/OBSW_Config.py index 1466506e9f8f3f14294d392c948afc5d4e50784b..4bb665d56fd1ab8d9a0c1b16dd0471b844e7ecd4 100644 --- a/config/OBSW_Config.py +++ b/config/OBSW_Config.py @@ -17,7 +17,7 @@ ethernetAddressT = Tuple[str, int] # Mode options, set by args parser -class modeList(enum.Enum): +class ModeList(enum.Enum): GUIMode = 0 ListenerMode = 1 SingleCommandMode = 2 @@ -69,6 +69,13 @@ printToFile = False printHkData = False printRawTmData = False +""" +These objects are set for the Unit Test, no better solution found yet +""" +tmListener = None +comInterface = None +tmtcPrinter = None + # noinspection PyUnusedLocal def setGlobals(args): @@ -90,20 +97,20 @@ def setGlobals(args): sendAddressToSet = (args.boardIP, portSend) if 0 <= args.mode <= 5: if args.mode == 0: - modeId = modeList.GUIMode + modeId = ModeList.GUIMode elif args.mode == 1: - modeId = modeList.ListenerMode + modeId = ModeList.ListenerMode elif args.mode == 2: - modeId = modeList.SingleCommandMode + modeId = ModeList.SingleCommandMode elif args.mode == 3: - modeId = modeList.ServiceTestMode + modeId = ModeList.ServiceTestMode elif args.mode == 4: - modeId = modeList.SoftwareTestMode + modeId = ModeList.SoftwareTestMode elif args.mode == 5: - modeId = modeList.UnitTest + modeId = ModeList.UnitTest else: print("Invalid mode argument, setting default mode (1)") - modeId = modeList[1] + modeId = ModeList[1] service = str(args.service) if service.isdigit(): service = int(args.service) @@ -122,6 +129,3 @@ def setGlobals(args): displayMode = displayMode service = service printToFile = args.printFile - tmListener = None - comInterface = None - tmtcPrinter = None diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index affdd64e33cd67ca533c69dd6c9747547291f94b..4c5b4d106ef7e1f533c7bbe475c3a5f4279fc9c2 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -14,6 +14,7 @@ from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderRec from comIF.OBSW_ComInterface import ComIF_T from utility.OBSW_TmTcPrinter import TmTcPrinterT from sendreceive.OBSW_TmListener import TmListenerT +import config.OBSW_Config as g class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): @@ -48,21 +49,21 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.pusPacket = [] self.waitCounter = 0 - def sendTcQueueAndReturnTcInfo(self): - # receiverThread = threading.Thread(target=self.checkForMultipleReplies) - # receiverThread.start() - # time.sleep(0.5) + def sendTcQueueAndReturnInfo(self): try: + self._tmListener.modeId = g.ModeList.UnitTest + self._tmListener.modeChangeEvent.set() + # TC info queue is set in this function self.sendAllQueue() self.handleLastRepliesListening(self.tmTimeout/2.0) + self.tmInfoQueue = self.retrieveListenerTmInfoQueue() self._tmListener.modeOpFinished.set() - self.tmInfoQueue = self.retrieveTmInfoQueue() - # self.handleTcResending() Turned off for now, not needed if self.doPrintToFile: self._tmtcPrinter.printToFile() except (KeyboardInterrupt, SystemExit): print("Keyboard Interrupt or System Exit detected") exit() + return self.tcInfoQueue, self.tmInfoQueue def handleTcResending(self): @@ -91,41 +92,22 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): else: time.sleep(self.waitTime) - def retrieveTmInfoQueue(self): + def retrieveListenerTmInfoQueue(self): if self._tmListener.replyEvent.is_set(): return self._tmListener.tmInfoQueue else: print("Multiple Command SenderReceiver: Configuration error, reply event not set in TM listener") + def clearListenerTmInfoQueue(self): + self._tmListener.tmInfoQueue.clear() + @staticmethod def handleLastRepliesListening(waitTime: float): elapsed_time_seconds = 0 start_time = time.time() while elapsed_time_seconds < waitTime: elapsed_time_seconds = time.time() - start_time - # def checkForMultipleReplies(self): - # super().checkForMultipleReplies() - - # def checkForMultipleReplies(self): - # while not self.allRepliesReceived and self.run_event.is_set(): - # # listen for duration timeoutInSeconds for replies - # self.handleReplyListening() - - # def handleReplyListening(self): - # tmReady = self.comInterface.dataAvailable(2.0) - # if tmReady: - # self.receiveTelemetryAndStoreInformation() - # if self.tcQueue.empty(): - # print("TC queue empty. Listening for a few more seconds ... ") - # start_time = time.time() - # self.handleLastRepliesListening(start_time) - # self.allRepliesReceived = True - - # def receiveTelemetryAndStoreInformation(self): - # packetList = self.comInterface.receiveTelemetry() - # for counter in range(0, len(packetList)): - # tmInfo = packetList[counter].packTmInformation() - # self.tmInfoQueue.append(tmInfo) + diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py index a39966eeff11654cb1ba7fbaaf62994baa636661..9d09c1eabd1456dfc08108e0e83651d2472c408e 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -43,7 +43,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.abortFlag = False def sendQueueTcAndReceiveTmSequentially(self): - self._tmListener.modeId = g.modeList.ServiceTestMode + self._tmListener.modeId = g.ModeList.ServiceTestMode self._tmListener.modeChangeEvent.set() self.sendAndReceiveFirstPacket() # this flag is set in the separate thread ! diff --git a/sendreceive/OBSW_SingleCommandSenderReceiver.py b/sendreceive/OBSW_SingleCommandSenderReceiver.py index 8280333ac196c33893b8aaaa2b5866ed5af3a9d0..e95d9132c3fef6018f192467682fe34ca77caac9 100644 --- a/sendreceive/OBSW_SingleCommandSenderReceiver.py +++ b/sendreceive/OBSW_SingleCommandSenderReceiver.py @@ -39,7 +39,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): exit() def sendSingleTcAndReceiveTm(self): - self._tmListener.modeId = g.modeList.SingleCommandMode + self._tmListener.modeId = g.ModeList.SingleCommandMode self._tmListener.modeChangeEvent.set() self._comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) while not self.replyReceived: diff --git a/sendreceive/OBSW_TmListener.py b/sendreceive/OBSW_TmListener.py index 583130e7f05120a464e349720817311d3f04c218..6d8321c5ea61951e1b4f0a445850c23d41126830 100644 --- a/sendreceive/OBSW_TmListener.py +++ b/sendreceive/OBSW_TmListener.py @@ -31,7 +31,7 @@ class TmListener: self.tcTimeoutFactor = tcTimeoutFactor self.comInterface = comInterface # this will be the default mode (listener mode) - self.modeId = g.modeList.ListenerMode + self.modeId = g.ModeList.ListenerMode self.listenerActive = threading.Event() # TM Listener operations can be suspended by setting this flag self.listenerActive.set() # I don't think a listener is useful without the main program, so we might just declare it daemonic. @@ -67,7 +67,8 @@ class TmListener: self.modeChangeEvent.clear() while not self.modeOpFinished.is_set(): self.performModeOperation() - self.modeId = g.modeList.ListenerMode + self.modeOpFinished.clear() + self.modeId = g.ModeList.ListenerMode def performModeOperation(self): """ @@ -76,21 +77,21 @@ class TmListener: :return: """ # Listener Mode - if self.modeId == g.modeList.ListenerMode: + if self.modeId == g.ModeList.ListenerMode: pass # Single Command Mode - elif self.modeId == g.modeList.SingleCommandMode: + elif self.modeId == g.ModeList.SingleCommandMode: # Listen for one reply sequence. if self.checkForOneTelemetrySequence(): # Set reply event, will be cleared by checkForFirstReply() self.replyEvent.set() # Sequential Command Mode - elif self.modeId == g.modeList.ServiceTestMode or self.modeId == g.modeList.SoftwareTestMode: + elif self.modeId == g.ModeList.ServiceTestMode or self.modeId == g.ModeList.SoftwareTestMode: if self.checkForOneTelemetrySequence(): print("TM Listener: Reply sequence received!") self.replyEvent.set() - elif self.modeId == g.modeList.UnitTest: - self.tmInfoQueue = self.comInterface.receiveTelemetryAndStoreTuple(self.tmInfoQueue) + elif self.modeId == g.ModeList.UnitTest: + self.tmInfoQueue = self.comInterface.receiveTelemetryAndStoreInfo(self.tmInfoQueue) self.replyEvent.set() def checkForOneTelemetrySequence(self) -> bool: diff --git a/tc/OBSW_TcPacket.py b/tc/OBSW_TcPacket.py index cc810d8fc43c36db72cf07f3e738e4b96d782af7..f23cbf319da0fbd6d79778c6da0606c75903aad0 100644 --- a/tc/OBSW_TcPacket.py +++ b/tc/OBSW_TcPacket.py @@ -15,6 +15,7 @@ pusTcTupleT = Tuple[pusTcInfoT, bytearray] tcAuxiliaryTupleT = Tuple[str, any] # Union: Type can be either of those 2 tcQueueT = Deque[Union[tcAuxiliaryTupleT, pusTcTupleT]] +pusTcInfoQueueT = Deque[pusTcInfoT] class PUSTelecommand: diff --git a/test/OBSW_PusServiceTest.py b/test/OBSW_PusServiceTest.py index 937f4f1df030f51bab503dfe985c5cb6d92d7ff4..b73404fd657d0751243e8b5b1ecf88c7fa0b000b 100644 --- a/test/OBSW_PusServiceTest.py +++ b/test/OBSW_PusServiceTest.py @@ -1,6 +1,6 @@ import unittest -from test.OBSW_UnitTest import TestService -from tm.OBSW_PusTm import pusTmInfoQueueT +from test.OBSW_UnitTest import TestService, pusTmInfoQueueT +from tc.OBSW_TcPacket import pusTcInfoQueueT from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packService2TestInto import config.OBSW_Config as g @@ -60,10 +60,10 @@ class TestService5(TestService): # analyseTmInfo() and analyseTcInfo are called here super().performService5or17Test() - def analyseTcInfo(self, tcInfoQueue): + def analyseTcInfo(self, tcInfoQueue: pusTcInfoQueueT): super().analyseTcInfo(tcInfoQueue) - def analyseTmInfo(self, tmInfoQueue, assertionDict): + def analyseTmInfo(self, tmInfoQueue: pusTmInfoQueueT, assertionDict: dict): super().analyseService5or17TM(tmInfoQueue, assertionDict) @classmethod @@ -102,15 +102,15 @@ class TestService17(TestService): def test_Service17(self): super().performService5or17Test() - def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue): - assertionDict = super().analyseTmTcInfo(tmInfoQueue, tcInfoQueue) + def analyseTmTcInfo(self, tmInfoQueue: pusTmInfoQueueT, tcInfoQueue: pusTcInfoQueueT): + assertionDict = super().analyseTmTcInfo(tmInfoQueue=tmInfoQueue, tcInfoQueue=tcInfoQueue) # add anything elsee other than tc verification counter and ssc that is needed for tm analysis return assertionDict def analyseTcInfo(self, tcInfoQueue): super().analyseTcInfo(tcInfoQueue) - def analyseTmInfo(self, tmInfoQueue, assertionDict): + def analyseTmInfo(self, tmInfoQueue: pusTmInfoQueueT, assertionDict: dict): super().analyseService5or17TM(tmInfoQueue, assertionDict) @classmethod diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 074171517eb6d76cb5956d4fa9005380d9ee9b52..c4a4714619e761e9ed2429b1d5485484a05974b2 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -1,7 +1,10 @@ """ Program: OBSW_UnitTest.py Date: 01.11.2019 -Description: Unit test of on-board software, used by the TMTC client in software test mode +Description: Unit/High-Level test of on-board software, used by the TMTC client in software test mode +It is very difficult to execute Unit Tests on embedded hardware. The most significant functonalities for satellites +are commanding and TM reception. As such, these functionalities will be tested by sending TCs and checking +the expected Telemetry. Manual: Set up the TMTC client as specified in the header comment and use the unit testing mode @@ -34,6 +37,9 @@ from tm.OBSW_TmService1 import pusPacketInfoService1T from abc import abstractmethod from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver from config import OBSW_Config as g +from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmInfoT +from tc.OBSW_TcPacket import pusTcInfoQueueT +import pprint as pp TmInfoQueueService1T = Deque[pusPacketInfoService1T] @@ -52,9 +58,6 @@ class TestService(unittest.TestCase): :return: """ cls._displayMode = "long" - - # default timeout for receiving TM, set in subclass manually - cls.tmTimeout = g.tmTimeout # wait intervals between tc send bursts. # Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again cls.waitIntervals = [] @@ -64,18 +67,12 @@ class TestService(unittest.TestCase): # default wait time between tc send bursts cls.waitTime = 5.0 cls.testQueue = deque() + # Extremely ugly solution so that we can use the Python Unit Test Framework + # default timeout for receiving TM, set in subclass manually + cls.tmTimeout = g.tmTimeout cls.tcTimeoutFactor = g.tcSendTimeoutFactor cls.printFile = g.printToFile cls.tmtcPrinter = g.tmtcPrinter - # cls.communicationInterface = None - # if g.comIF == 0: - # cls.communicationInterface = EthernetComIF( - # tmtcPrinter=cls.tmtcPrinter, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor, - # sendAddress=g.sendAddress, receiveAddress=g.recAddress) - # else: - # baudRate = 115200 - # cls.communicationInterface = SerialComIF( - # tmtcPrinter=cls.tmtcPrinter, comPort=g.comPort, baudRate=baudRate, serialTimeout=g.serialTimeout) cls.tmListener = g.tmListener cls.communicationInterface = g.comInterface # connectToBoard() @@ -83,16 +80,16 @@ class TestService(unittest.TestCase): # This function should be called in each individual test to send the actual telecommands # which are stored inside testQueue def performTestingAndGenerateAssertionDict(self): - print(self.communicationInterface) UnitTester = MultipleCommandSenderReceiver( comInterface=self.communicationInterface, tmtcPrinter=self.tmtcPrinter, tmListener=self.tmListener, tcQueue=self.testQueue, tmTimeout=self.tmTimeout, waitIntervals=self.waitIntervals, waitTime=self.waitTime, printTm=self.printTm, tcTimeoutFactor=self.tcTimeoutFactor, doPrintToFile=self.printFile) - (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo() + (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnInfo() assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) + # UnitTester.clearListenerTmInfoQueue() return assertionDict - def analyseTmTcInfo(self, tmInfoQueue: Deque, tcInfoQueue: Deque) -> dict: + def analyseTmTcInfo(self, tmInfoQueue: pusTmInfoQueueT, tcInfoQueue: pusTcInfoQueueT) -> dict: self.tcSscArray = [] self.tcServiceArray = [] self.tcSubserviceArray = [] @@ -130,7 +127,7 @@ class TestService(unittest.TestCase): return assertionDict - def analyseTcInfo(self, tcInfoQueue: Deque): + def analyseTcInfo(self, tcInfoQueue: pusTcInfoQueueT): while not tcInfoQueue.__len__() == 0: currentTcInfo = tcInfoQueue.pop() self.tcVerifyCounter = self.tcVerifyCounter + 1 @@ -147,7 +144,7 @@ class TestService(unittest.TestCase): # this function looks whether the tc verification SSC matched # a SSC of the sent TM - def scanForRespectiveTc(self, currentTmInfo: dict): + def scanForRespectiveTc(self, currentTmInfo: pusTmInfoT): currentSubservice = currentTmInfo["subservice"] for possibleIndex, searchIndex in enumerate(self.tcSscArray): if searchIndex == currentTmInfo["tcSSC"]: @@ -176,7 +173,7 @@ class TestService(unittest.TestCase): self.miscExpected = 2 self.performGenericAssertionTest(assertionDict) - def analyseService5or17TM(self, tmInfoQueue: Deque, assertionDict: dict): + def analyseService5or17TM(self, tmInfoQueue: pusTmInfoQueueT, assertionDict: dict): self.miscCounter = 0 while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop()