diff --git a/OBSW_TmTcClient.py b/OBSW_TmTcClient.py index a1c9c9069007ed9e0505c3db211f338a98ca9e4d..36d35f497285aba3120a1555c230cc15b663d65f 100644 --- a/OBSW_TmTcClient.py +++ b/OBSW_TmTcClient.py @@ -52,6 +52,7 @@ S. Gaisser, J. Meier, R. Mueller """ import atexit +import time from collections import deque import unittest import logging @@ -90,6 +91,7 @@ def main(): exit() else: communicationInterface = setCommunicationInterface(tmtcPrinter) + print(communicationInterface) atexit.register(keyboardInterruptHandler, comInterface=communicationInterface) tmListener = TmListener( comInterface=communicationInterface, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor) @@ -125,6 +127,9 @@ def main(): Set up test suite and run it with runner. Verbosity specifies detail level """ # noinspection PyTypeChecker + g.tmListener = tmListener + g.comInterface = communicationInterface + g.tmtcPrinter = tmtcPrinter suite = unittest.TestLoader().loadTestsFromModule(OBSW_PusServiceTest) unittest.TextTestRunner(verbosity=2).run(suite) @@ -155,20 +160,19 @@ def setCommunicationInterface(tmtcPrinter: TmTcPrinterT) -> ComIF_T: :param tmtcPrinter: TmTcPrinter object. :return: CommunicationInterface object """ - communicationInterface = None try: - if g.comIF == 0 and g.modeId != g.modeList.UnitTest: + if g.comIF == 0: communicationInterface = EthernetComIF( tmtcPrinter=tmtcPrinter, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor, sendAddress=g.sendAddress, receiveAddress=g.recAddress) - elif g.modeId != g.modeList.UnitTest: + else: comPort = g.comPort baudRate = 115200 g.serialTimeout = 0.05 communicationInterface = SerialComIF(tmtcPrinter=tmtcPrinter, comPort=comPort, baudRate=baudRate, serialTimeout=g.serialTimeout) - else: - pass + # else: + # pass return communicationInterface except (IOError, OSError): print("Error setting up communication interface") diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py index 57a38672ac87e510475122b52cb6629a460c5bed..ce14c6c9ca5fb2989f27aacfd428cf5327a4ff27 100644 --- a/comIF/OBSW_Serial_ComIF.py +++ b/comIF/OBSW_Serial_ComIF.py @@ -8,7 +8,7 @@ import time import serial import logging -from typing import Tuple, List, Union +from typing import Tuple, List, Union, Optional from comIF.OBSW_ComInterface import CommunicationInterface, pusTmQueueT from utility.OBSW_TmTcPrinter import TmTcPrinterT from tm.OBSW_TmPacket import PUSTelemetryFactory @@ -73,7 +73,7 @@ class SerialComIF(CommunicationInterface): elapsed_time = time.time() - start_time return False - def pollPusPackets(self) -> Tuple[List[bytearray], int]: + def pollPusPackets(self) -> Tuple[List[Optional[bytes]], int]: pusDataList = [] self.data = self.serial.read(1024) packetSize = (self.data[4] << 8 | self.data[5]) + 7 diff --git a/config/OBSW_Config.py b/config/OBSW_Config.py index 4088c2699469d0859ff2f5cdc914b0724d315b25..1466506e9f8f3f14294d392c948afc5d4e50784b 100644 --- a/config/OBSW_Config.py +++ b/config/OBSW_Config.py @@ -122,3 +122,6 @@ def setGlobals(args): displayMode = displayMode service = service printToFile = args.printFile + tmListener = None + comInterface = None + tmtcPrinter = None diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index f9243b3123fc8b4b3303c5845319590eb049ee85..affdd64e33cd67ca533c69dd6c9747547291f94b 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -37,8 +37,8 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): :param tcTimeoutFactor: :param doPrintToFile: """ - super().__init__(comInterface=comInterface, tmtcPrinter=tmtcPrinter, tmListener=tmListener,tmTimeout=tmTimeout, - tcQueue=tcQueue,tcTimeoutFactor=tcTimeoutFactor, doPrintToFile=doPrintToFile) + super().__init__(comInterface=comInterface, tmtcPrinter=tmtcPrinter, tmListener=tmListener, tmTimeout=tmTimeout, + tcQueue=tcQueue, tcTimeoutFactor=tcTimeoutFactor, doPrintToFile=doPrintToFile) self.waitIntervals = waitIntervals self.waitTime = waitTime self.printTm = printTm @@ -56,7 +56,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.sendAllQueue() self.handleLastRepliesListening(self.tmTimeout/2.0) self._tmListener.modeOpFinished.set() - self.tmInfoQueue = self._tmListener.retrieveTmInfoQueue() + self.tmInfoQueue = self.retrieveTmInfoQueue() # self.handleTcResending() Turned off for now, not needed if self.doPrintToFile: self._tmtcPrinter.printToFile() @@ -73,7 +73,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.checkForTimeout() def sendAllQueue(self): - while not self.tcQueue.empty(): + while not self.tcQueue.__len__() == 0: self.sendAndPrintTc() def sendAndPrintTc(self): diff --git a/sendreceive/OBSW_TmListener.py b/sendreceive/OBSW_TmListener.py index 22e356a35e93c441e49a9e03c6fbdb3e53db372a..583130e7f05120a464e349720817311d3f04c218 100644 --- a/sendreceive/OBSW_TmListener.py +++ b/sendreceive/OBSW_TmListener.py @@ -91,6 +91,7 @@ class TmListener: self.replyEvent.set() elif self.modeId == g.modeList.UnitTest: self.tmInfoQueue = self.comInterface.receiveTelemetryAndStoreTuple(self.tmInfoQueue) + self.replyEvent.set() def checkForOneTelemetrySequence(self) -> bool: """ diff --git a/test/OBSW_PusServiceTest.py b/test/OBSW_PusServiceTest.py index 0583ce939cb0c33cf8a55cd66222ef1ae8bc6db7..937f4f1df030f51bab503dfe985c5cb6d92d7ff4 100644 --- a/test/OBSW_PusServiceTest.py +++ b/test/OBSW_PusServiceTest.py @@ -13,7 +13,7 @@ class TestService2(TestService): # all commands must be sent sequentially, not as a burst cls.waitIntervals = [1, 2, 3, 4] cls.waitTime = [2, 2, 2, 2] - packService2TestInto(super().testQueue) + packService2TestInto(cls.testQueue) def test_Service2(self): assertionDict = self.performTestingAndGenerateAssertionDict() @@ -54,7 +54,7 @@ class TestService5(TestService): # This is required because the OBSW tasks runs with fixed sequences cls.waitIntervals = [1, 2, 3] cls.waitTime = [1.5, 2.0, 2.0] - packService5TestInto(super().testQueue) + packService5TestInto(cls.testQueue) def test_Service5(self): # analyseTmInfo() and analyseTcInfo are called here @@ -97,7 +97,7 @@ class TestService17(TestService): cls.waitIntervals = [2] cls.waitTime = 2 cls.tmTimeout = g.tmTimeout - packService17TestInto(super().testQueue) + packService17TestInto(cls.testQueue) def test_Service17(self): super().performService5or17Test() diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index fbbdc7e44153f065afcacd35fef2e390a2d07dac..074171517eb6d76cb5956d4fa9005380d9ee9b52 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -31,12 +31,8 @@ from typing import Deque from tc.OBSW_TcPacker import packDummyDeviceTestInto from tm.OBSW_TmService1 import pusPacketInfoService1T -from sendreceive.OBSW_TmListener import TmListener from abc import abstractmethod from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver -from utility.OBSW_TmTcPrinter import TmTcPrinter -from comIF.OBSW_Ethernet_ComIF import EthernetComIF -from comIF.OBSW_Serial_ComIF import SerialComIF from config import OBSW_Config as g TmInfoQueueService1T = Deque[pusPacketInfoService1T] @@ -70,26 +66,27 @@ class TestService(unittest.TestCase): cls.testQueue = deque() cls.tcTimeoutFactor = g.tcSendTimeoutFactor cls.printFile = g.printToFile - cls.tmtcPrinter = TmTcPrinter(cls._displayMode, cls.printFile, True) - cls.communicationInterface = None - if g.comIF == 0: - cls.communicationInterface = EthernetComIF( - tmtcPrinter=cls.tmtcPrinter, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor, - sendAddress=g.sendAddress, receiveAddress=g.recAddress) - else: - baudRate = 115200 - cls.communicationInterface = SerialComIF( - tmtcPrinter=cls.tmtcPrinter, comPort=g.comPort, baudRate=baudRate, serialTimeout=g.serialTimeout) - cls.tmListener = TmListener( - comInterface=cls.communicationInterface, tmTimeout=g.tmTimeout, tcTimeoutFactor=cls.tcTimeoutFactor) + cls.tmtcPrinter = g.tmtcPrinter + # cls.communicationInterface = None + # if g.comIF == 0: + # cls.communicationInterface = EthernetComIF( + # tmtcPrinter=cls.tmtcPrinter, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor, + # sendAddress=g.sendAddress, receiveAddress=g.recAddress) + # else: + # baudRate = 115200 + # cls.communicationInterface = SerialComIF( + # tmtcPrinter=cls.tmtcPrinter, comPort=g.comPort, baudRate=baudRate, serialTimeout=g.serialTimeout) + cls.tmListener = g.tmListener + cls.communicationInterface = g.comInterface # connectToBoard() # This function should be called in each individual test to send the actual telecommands # which are stored inside testQueue def performTestingAndGenerateAssertionDict(self): + print(self.communicationInterface) UnitTester = MultipleCommandSenderReceiver( - comInterface=self.communicationInterface, tmtcPrinter=self.tmtcPrinter,tmListener=self.tmListener, - tcQueue=self.testQueue,tmTimeout=self.tmTimeout, waitIntervals=self.waitIntervals, waitTime=self.waitTime, + comInterface=self.communicationInterface, tmtcPrinter=self.tmtcPrinter, tmListener=self.tmListener, + tcQueue=self.testQueue, tmTimeout=self.tmTimeout, waitIntervals=self.waitIntervals, waitTime=self.waitTime, printTm=self.printTm, tcTimeoutFactor=self.tcTimeoutFactor, doPrintToFile=self.printFile) (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo() assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) @@ -179,9 +176,9 @@ class TestService(unittest.TestCase): self.miscExpected = 2 self.performGenericAssertionTest(assertionDict) - def analyseService5or17TM(self, tmInfoQueue, assertionDict): + def analyseService5or17TM(self, tmInfoQueue: Deque, assertionDict: dict): self.miscCounter = 0 - while not tmInfoQueue.empty(): + while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass if currentTmInfo["service"] == 1: @@ -202,8 +199,8 @@ class TestService(unittest.TestCase): cls.tcVerifyCounter = 0 # noinspection PyUnresolvedReferences cls.testQueue.clear() - if g.comIF == 1: - cls.communicationInterface.close() + # if g.comIF == 1: + # cls.communicationInterface.close() class TestDummyDevice(TestService): diff --git a/utility/OBSW_ExitHandler.py b/utility/OBSW_ExitHandler.py index 94d5817b3ec8ea1582491c6d4426d3fc36f0f6df..4fa3792448cd3900ce21f85deddbdb14439654f3 100644 --- a/utility/OBSW_ExitHandler.py +++ b/utility/OBSW_ExitHandler.py @@ -18,5 +18,5 @@ def keyboardInterruptHandler(comInterface: ComIF_T): print("Disconnect registered") # Unit Test closes Serial Port at the end # We could do some optional stuff here - if comInterface != 0: + if comInterface is not None: comInterface.sendTelecommand(bytearray([0, 0, 0, 0, 0]))