diff --git a/comIF/OBSW_ComInterface.py b/comIF/OBSW_ComInterface.py index 5c971409cb82d0c0e3b1f4b8c207517c6971106a..49cab7f6cfdbe4ea75969473a83b0242857afa42 100644 --- a/comIF/OBSW_ComInterface.py +++ b/comIF/OBSW_ComInterface.py @@ -20,6 +20,10 @@ class CommunicationInterface: def __init__(self, tmtcPrinter): self.tmtcPrinter = tmtcPrinter + @abstractmethod + def close(self) -> None: + pass + @abstractmethod def sendTelecommand(self, tcPacket, tcPacketInfo="") -> None: """ diff --git a/comIF/OBSW_DummyComIF.py b/comIF/OBSW_DummyComIF.py index 15e5ea437902489e2d33a60e32cf9701076ef70d..a5b81497a902da40a79c52b932806f35fff8fc99 100644 --- a/comIF/OBSW_DummyComIF.py +++ b/comIF/OBSW_DummyComIF.py @@ -11,6 +11,9 @@ from comIF.OBSW_ComInterface import CommunicationInterface class DummyComIF(CommunicationInterface): + def close(self) -> None: + pass + def dataAvailable(self, parameters): pass diff --git a/comIF/OBSW_Ethernet_ComIF.py b/comIF/OBSW_Ethernet_ComIF.py index cc5a075dd8c72dcbebb2605365cdc07af87faa81..d49b1331e9179285b0f54b86284c2276b6332516 100644 --- a/comIF/OBSW_Ethernet_ComIF.py +++ b/comIF/OBSW_Ethernet_ComIF.py @@ -22,6 +22,9 @@ class EthernetComIF(CommunicationInterface): self.sendAddress = sendAddress self.setUpSocket() + def close(self) -> None: + pass + def sendTelecommand(self, tcPacket, tcPacketInfo=""): self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) self.sendSocket.sendto(tcPacket, self.sendAddress) diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py index 5f234eecc866500af2661aa4a18a18294e308095..5241e3cc5b8b7cbb4bacf710d9695780cdb5afd4 100644 --- a/comIF/OBSW_Serial_ComIF.py +++ b/comIF/OBSW_Serial_ComIF.py @@ -21,11 +21,19 @@ class SerialComIF(CommunicationInterface): self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) except serial.SerialException: print("Serial Port can not be opened") - logging.exception("Error: ") + logging.exception("Error") exit() self.data = bytearray() self.numberOfPackets = 0 + def close(self): + try: + self.serial.close() + except serial.SerialException: + print("Serial Port could not be closed !") + logging.exception("Error") + exit() + def sendTelecommand(self, tcPacket, tcPacketInfo=""): self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) self.serial.write(tcPacket) diff --git a/sendreceive/OBSW_CommandSenderReceiver.py b/sendreceive/OBSW_CommandSenderReceiver.py index 7100728b390d314276fbb10605edf5f6c0223f47..e50981c3df412b45f021908eb6c25386fe639646 100644 --- a/sendreceive/OBSW_CommandSenderReceiver.py +++ b/sendreceive/OBSW_CommandSenderReceiver.py @@ -12,17 +12,15 @@ if the first reply has not been received. @author: R. Mueller """ import time -from config import OBSW_Config as g from comIF.OBSW_ComInterface import CommunicationInterface, ComIF_T from utility.OBSW_TmTcPrinter import TmTcPrinterT, TmTcPrinter from sendreceive.OBSW_TmListener import TmListenerT, TmListener -# Generic TMTC SendReceive class which is implemented -# by specific implementations (e.g. SingleCommandSenderReceiver) class CommandSenderReceiver: """ - This is the generic CommandSenderReceiver object. All TMTC objects inherit this object + This is the generic CommandSenderReceiver object. All TMTC objects inherit this object, + for example specific implementations (e.g. SingleCommandSenderReceiver) """ def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, tmTimeout: float, tcSendTimeoutFactor: float, doPrintToFile: bool): @@ -121,26 +119,6 @@ class CommandSenderReceiver: self.start_time = time.time() time.sleep(1) - # check for sequence of replies for a telecommand (e.g. TM[1,1] , TM[1,7] ...) for tmTimeout value - # returns True on success - # def checkForOneTelemetrySequence(self): - # tmReady = self.comInterface.dataAvailable(self.tmTimeout * self.tcSendTimeoutFactor) - # if tmReady is False: - # return False - # else: - # self.comInterface.receiveTelemetry() - # start_time = time.time() - # elapsed_time = 0 - # while elapsed_time < self.tmTimeout: - # tmReady = self.comInterface.dataAvailable(1.0) - # if tmReady: - # self.comInterface.receiveTelemetry() - # elapsed_time = time.time() - start_time - # # the timeout value can be set by special TC queue entries if packet handling takes longer, - # # but it is reset here to the global value - # if self.tmTimeout is not g.tmTimeout: - # self.tmTimeout = g.tmTimeout - # return True diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index 2008cbe2289c44d376a1ebe47ccd61ce7180ba85..bc3ff58f61e9b872477e0f396b3a3f53ffc10050 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -6,12 +6,10 @@ @brief Used to send multiple TCs as bursts and listen for replies simultaneously. Used by UnitTester """ -import queue import time -import threading -from typing import Union +from typing import Union, Deque +from collections import deque -from config import OBSW_Config as g from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver from comIF.OBSW_ComInterface import ComIF_T from utility.OBSW_TmTcPrinter import TmTcPrinterT @@ -23,7 +21,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with wait time between the send bursts. This is generally done in the separate test classes in UnitTest """ - def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue, tmListener: TmListenerT, + def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, tmListener: TmListenerT, tmTimeout: float, waitIntervals: list, waitTime: Union[float, list], printTm: bool, tcTimeoutFactor: float, doPrintToFile: bool): """ @@ -44,16 +42,16 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.waitIntervals = waitIntervals self.waitTime = waitTime self.printTm = printTm - self.tmInfoQueue = queue.Queue() - self.tcInfoQueue = queue.Queue() + self.tmInfoQueue = deque() + self.tcInfoQueue = deque() self.pusPacketInfo = [] self.pusPacket = [] self.waitCounter = 0 def sendTcQueueAndReturnTcInfo(self): - receiverThread = threading.Thread(target=self.checkForMultipleReplies) - receiverThread.start() - time.sleep(0.5) + # receiverThread = threading.Thread(target=self.checkForMultipleReplies) + # receiverThread.start() + # time.sleep(0.5) try: self.sendAllQueue() while not self.allRepliesReceived: @@ -76,15 +74,12 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): def sendAllQueue(self): while not self.tcQueue.empty(): - if g.comIF == 1: - pass - # time.sleep(0.01) # pause could be smaller, but causes overrun error on flashed board self.sendAndPrintTc() def sendAndPrintTc(self): - self.checkQueueEntry(self.tcQueue.get()) + self.checkQueueEntry(self.tcQueue.pop()) if self.queueEntryIsTelecommand: - self.tcInfoQueue.put(self.pusPacketInfo) + self.tcInfoQueue.append(self.pusPacketInfo) self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) self.handleWaiting() @@ -102,8 +97,8 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): else: print("Multiple Command SenderReceiver: Configuration error, reply event not set in TM listener") - def checkForMultipleReplies(self): - super().checkForMultipleReplies() + # def checkForMultipleReplies(self): + # super().checkForMultipleReplies() # def checkForMultipleReplies(self): # while not self.allRepliesReceived and self.run_event.is_set(): @@ -124,7 +119,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): packetList = self.comInterface.receiveTelemetry() for counter in range(0, len(packetList)): tmInfo = packetList[counter].packTmInformation() - self.tmInfoQueue.put(tmInfo) + self.tmInfoQueue.append(tmInfo) def handleLastRepliesListening(self, start_time): elapsed_time_seconds = 0 diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py index baea1ec127cc684f8d9c3b9a2fb466d5ece94b83..c7c7e432357988473500e18fb0112eccc3c2c43f 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -8,7 +8,7 @@ @brief Used to send multiple TCs in sequence and listen for replies after each sent tc """ -import threading +from typing import Deque import time import config.OBSW_Config as g @@ -23,7 +23,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence """ def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, tmTimeout: float, - tcQueue, tcTimeoutFactor: float, doPrintToFile: bool): + tcQueue: Deque, tcTimeoutFactor: float, doPrintToFile: bool): """ :param comInterface: CommunicationInterface object, passed on to CommandSenderReceiver :param tmListener: TmListener object which runs in the background and receives all Telemetry @@ -43,7 +43,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.abortFlag = False def sendQueueTcAndReceiveTmSequentially(self): - self.tmListener.modeId = g.modeList[3] + self.tmListener.modeId = g.modeList.ServiceTestMode self.tmListener.modeChangeEvent.set() self.sendAndReceiveFirstPacket() # this flag is set in the separate thread ! @@ -56,9 +56,9 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): def handleTcSending(self): while not self.allRepliesReceived: - while not self.tcQueue.empty(): + while not self.tcQueue.__len__() == 0: self.performNextTcSend() - if self.tcQueue.empty(): + if self.tcQueue.__len__() == 0: self.start_time = time.time() self.checkForTimeout() print("Sequential SenderReceiver: All replies received!") @@ -81,18 +81,18 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.checkForTimeout() def sendAndReceiveFirstPacket(self): - self.checkQueueEntry(self.tcQueue.get()) + self.checkQueueEntry(self.tcQueue.pop()) if self.queueEntryIsTelecommand: self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) else: self.sendAndReceiveFirstPacket() def sendNextTelecommand(self): - self.checkQueueEntry(self.tcQueue.get()) + self.checkQueueEntry(self.tcQueue.pop()) if self.queueEntryIsTelecommand: self.start_time = time.time() self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) - elif self.tcQueue.empty(): + elif self.tcQueue.__len__() == 0: # Special case: Last queue entry is not a Telecommand self.allRepliesReceived = True else: diff --git a/sendreceive/OBSW_TmListener.py b/sendreceive/OBSW_TmListener.py index 7f2821c1556c32652c5006f1b8333a687ac42e8e..8fccb0cb0ccb3e2749652c67192e024d3ea57b77 100644 --- a/sendreceive/OBSW_TmListener.py +++ b/sendreceive/OBSW_TmListener.py @@ -10,7 +10,7 @@ """ import time import threading -import queue +from collections import deque from typing import TypeVar from comIF.OBSW_ComInterface import ComIF_T @@ -47,7 +47,7 @@ class TmListener: # This Event is set and cleared by the listener to inform the sender objects if a reply has been received self.replyEvent = threading.Event() # Will be filled for the Unit Test - self.tmInfoQueue = queue.Queue() + self.tmInfoQueue = deque() def start(self): self.listenerThread.start() diff --git a/tc/OBSW_TcPacker.py b/tc/OBSW_TcPacker.py index cc8dd5090ade2f240dcee9f33c8dab88ef9508e4..5d2142effd5d00d8000c9b2ac41b3d68b9fd5329 100644 --- a/tc/OBSW_TcPacker.py +++ b/tc/OBSW_TcPacker.py @@ -13,7 +13,7 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file """ import os -from tc.OBSW_TcPacket import * +from tc.OBSW_TcPacket import PUSTelecommand from tc.OBSW_TcService2 import packService2TestInto from tc.OBSW_TcService3 import packService3TestInto from tc.OBSW_TcService8 import packService8TestInto @@ -21,9 +21,10 @@ from tc.OBSW_TcService200 import packModeData, packService200TestInto import config.OBSW_Config as g from datetime import datetime from collections import deque +from typing import Deque,Union -def serviceTestSelect(service, serviceQueue): +def serviceTestSelect(service: Union[int, str], serviceQueue: Deque) -> Deque: if service == 2: return packService2TestInto(serviceQueue) elif service == 3: @@ -55,31 +56,31 @@ def serviceTestSelect(service, serviceQueue): exit() -def packService5TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 5")) +def packService5TestInto(tcQueue: Deque): + tcQueue.append(("print", "Testing Service 5")) # disable events - tcQueue.put(("print", "\r\nTesting Service 5: Disable event")) + tcQueue.append(("print", "\r\nTesting Service 5: Disable event")) command = PUSTelecommand(service=5, subservice=6, SSC=500) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # trigger event - tcQueue.put(("print", "\r\nTesting Service 5: Trigger event")) + tcQueue.append(("print", "\r\nTesting Service 5: Trigger event")) command = PUSTelecommand(service=17, subservice=128, SSC=510) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable event - tcQueue.put(("print", "\r\nTesting Service 5: Enable event")) + tcQueue.append(("print", "\r\nTesting Service 5: Enable event")) command = PUSTelecommand(service=5, subservice=5, SSC=520) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # trigger event - tcQueue.put(("print", "\r\nTesting Service 5: Trigger another event")) + tcQueue.append(("print", "\r\nTesting Service 5: Trigger another event")) command = PUSTelecommand(service=17, subservice=128, SSC=530) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service5.txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service5.txt")) return tcQueue -def packService9TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 9")) +def packService9TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 9")) timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' timeTestCurrentTime = datetime.now().isoformat() + "Z" + '\0' @@ -90,120 +91,120 @@ def packService9TestInto(tcQueue): print("Time Code 2 :" + str(timeTest2)) print("Time Code 3 :" + str(timeTest3) + "\r") # time setting - tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode A")) + tcQueue.append(("print", "\r\nTesting Service 9: Testing timecode A")) command = PUSTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode B")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r\nTesting Service 9: Testing timecode B")) command = PUSTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode Current Time")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r\nTesting Service 9: Testing timecode Current Time")) command = PUSTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # TODO: Add other time formats here - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service9.txt")) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service9.txt")) return tcQueue -def packService17TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 17")) +def packService17TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 17")) # ping test - tcQueue.put(("print", "\n\rTesting Service 17: Ping Test")) + tcQueue.append(("print", "\n\rTesting Service 17: Ping Test")) command = PUSTelecommand(service=17, subservice=1, SSC=1700) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable event - tcQueue.put(("print", "\n\rTesting Service 17: Enable Event")) + tcQueue.append(("print", "\n\rTesting Service 17: Enable Event")) command = PUSTelecommand(service=5, subservice=5, SSC=52) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # test event - tcQueue.put(("print", "\n\rTesting Service 17: Trigger event")) + tcQueue.append(("print", "\n\rTesting Service 17: Trigger event")) command = PUSTelecommand(service=17, subservice=128, SSC=1701) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service17.txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service17.txt")) return tcQueue -def packDummyDeviceTestInto(tcQueue): - tcQueue.put(("print", "Testing Dummy Device")) +def packDummyDeviceTestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Dummy Device")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode - tcQueue.put(("print", "\n\rTesting Service Dummy: Set On")) + tcQueue.append(("print", "\n\rTesting Service Dummy: Set On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=1, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Test Service 2 commands - tcQueue.put(("print", "\n\rTesting Service Dummy: Service 2")) + tcQueue.append(("print", "\n\rTesting Service Dummy: Service 2")) packService2TestInto(tcQueue, True) # Test Service 8 - tcQueue.put(("print", "\n\rTesting Service Dummy: Service 8")) + tcQueue.append(("print", "\n\rTesting Service Dummy: Service 8")) packService8TestInto(tcQueue, True) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service_dummy.txt")) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service_dummy.txt")) return tcQueue -def packGpsTestInto(objectId, tcQueue): +def packGpsTestInto(objectId: bytearray, tcQueue: Deque) -> Deque: if objectId == g.GPS0_ObjectId: gpsString = "GPS0" elif objectId == g.GPS1_ObjectId: gpsString = "GPS1" else: gpsString = "unknown" - tcQueue.put(("print", "Testing " + gpsString + " Device")) + tcQueue.append(("print", "Testing " + gpsString + " Device")) # Set Mode Off - tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set Off")) + tcQueue.append(("print", "\n\rTesting " + gpsString + ": Set Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=11, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Mode On - tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set On")) + tcQueue.append(("print", "\n\rTesting " + gpsString + ": Set On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=12, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Enable HK report sidGps = 0 if objectId == g.GPS0_ObjectId: sidGps = g.GPS0_SID - tcQueue.put(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) + tcQueue.append(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) command = PUSTelecommand(service=3, subservice=5, SSC=13, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) elif objectId == g.GPS1_ObjectId: sidGps = g.GPS1_SID - tcQueue.put(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) + tcQueue.append(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) command = PUSTelecommand(service=3, subservice=5, SSC=14, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # pack wait interval until mode is on and a few gps replies have been received - tcQueue.put(("wait", 5)) + tcQueue.append(("wait", 5)) # Disable HK reporting - tcQueue.put(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition")) command = PUSTelecommand(service=3, subservice=6, SSC=15, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Mode Off - tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set Off")) + tcQueue.append(("print", "\n\rTesting " + gpsString + ": Set Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=13, data=modeData) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service_" + gpsString + ".txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service_" + gpsString + ".txt")) return tcQueue -def packErrorTestingInto(tcQueue): +def packErrorTestingInto(tcQueue: Deque) -> Deque: # a lot of events command = PUSTelecommand(service=17, subservice=129, SSC=2010) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # a lot of ping testing command = PUSTelecommand(service=17, subservice=130, SSC=2020) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) return tcQueue def createTotalTcQueue(): if not os.path.exists("log"): os.mkdir("log") - tcQueue = queue.Queue() + tcQueue = deque() tcQueue = packService2TestInto(tcQueue) tcQueue = packService5TestInto(tcQueue) tcQueue = packService8TestInto(tcQueue) diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py index 5eae374c35ae1ed75308bf1bf7ef4dc9849ef107..84be166447cbdcccd3ae36a13811569d53b887a9 100644 --- a/tc/OBSW_TcService2.py +++ b/tc/OBSW_TcService2.py @@ -6,46 +6,46 @@ Description: PUS Custom Service 8: Device Access, Native low-level commanding @author: R. Mueller """ - import struct +from typing import Deque from tc.OBSW_TcPacket import * from tc.OBSW_TcService200 import packModeData -def packService2TestInto(tcQueue, calledExternally=False): +def packService2TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: if calledExternally is False: - tcQueue.put(("print", "Testing Service 2")) + tcQueue.append(("print", "Testing Service 2")) objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device # don't forget to set object mode raw (service 200) before calling this ! # Set Raw Mode - tcQueue.put(("print", "\r\nTesting Service 2: Setting Raw Mode")) + tcQueue.append(("print", "\r\nTesting Service 2: Setting Raw Mode")) modeData = packModeData(objectId, 3, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # toggle wiretapping raw - tcQueue.put(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) + tcQueue.append(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) wiretappingToggleData = packWiretappingMode(objectId, 1) toggleWiretappingOnCommand = PUSTelecommand(service=2, subservice=129, SSC=200, data=wiretappingToggleData) - tcQueue.put(toggleWiretappingOnCommand.packCommandTuple()) + tcQueue.append(toggleWiretappingOnCommand.packCommandTuple()) # send raw command, data should be returned via TM[2,130] and TC[2,131] - tcQueue.put(("print", "\r\nTesting Service 2: Sending Raw Command")) + tcQueue.append(("print", "\r\nTesting Service 2: Sending Raw Command")) rawCommand = struct.pack(">I", 666) rawData = objectId + rawCommand rawCommand = PUSTelecommand(service=2, subservice=128, SSC=201, data=rawData) - tcQueue.put(rawCommand.packCommandTuple()) + tcQueue.append(rawCommand.packCommandTuple()) # toggle wiretapping off - tcQueue.put(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) + tcQueue.append(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) wiretappingToggleData = packWiretappingMode(objectId, 0) toggleWiretappingOffCommand = PUSTelecommand(service=2, subservice=129, SSC=204, data=wiretappingToggleData) - tcQueue.put(toggleWiretappingOffCommand.packCommandTuple()) + tcQueue.append(toggleWiretappingOffCommand.packCommandTuple()) # send raw command which should be returned via TM[2,130] - tcQueue.put(("print", "\r\nTesting Service 2: Send second raw command")) + tcQueue.append(("print", "\r\nTesting Service 2: Send second raw command")) command = PUSTelecommand(service=2, subservice=128, SSC=205, data=rawData) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) if calledExternally is False: - tcQueue.put(("export", "log/tmtc_log_service2.txt")) + tcQueue.append(("export", "log/tmtc_log_service2.txt")) return tcQueue diff --git a/tc/OBSW_TcService200.py b/tc/OBSW_TcService200.py index ba4a2dc9b23daeed3c453909a76d7996a6f315c1..a9c545c7c8b22c9d56e0d0840d1a790c1757b2ba 100644 --- a/tc/OBSW_TcService200.py +++ b/tc/OBSW_TcService200.py @@ -11,35 +11,36 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file @author: R. Mueller """ -from tc.OBSW_TcPacket import * +from tc.OBSW_TcPacket import PUSTelecommand +from typing import Deque import struct -def packService200TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 200")) +def packService200TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 200")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode On")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2000, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Normal mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode Normal")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode Normal")) modeData = packModeData(objectId, 2, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2010, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Raw Mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode Raw")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode Raw")) modeData = packModeData(objectId, 3, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Off Mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode Off")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2030, data=modeData) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("export", "log/tmtc_log_service200.txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("export", "log/tmtc_log_service200.txt")) return tcQueue diff --git a/tc/OBSW_TcService3.py b/tc/OBSW_TcService3.py index 4f145bf7aeb22e541ae1512250f00601e7bab4da..5504f493f9f8b4c288f61c8330797587fd0a82be 100644 --- a/tc/OBSW_TcService3.py +++ b/tc/OBSW_TcService3.py @@ -7,11 +7,12 @@ Description: PUS Custom Service 8: Device Access, Native low-level commanding @author: R. Mueller """ import struct +from typing import Deque from tc.OBSW_TcPacket import PUSTelecommand -def packService3TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 3")) +def packService3TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 3")) # adding custom defintion to hk using test pool variables sid1 = bytearray([0x00, 0x00, 0x43, 0x00]) sid2 = bytearray([0x00, 0x00, 0x44, 0x00]) @@ -28,96 +29,96 @@ def packService3TestInto(tcQueue): hkDefinition2 = sid2 + collectionInterval + numberOfParameters + p1 + p2 + p3 + p4 + p5 # deleting pre-defined test entry - tcQueue.put(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) command = PUSTelecommand(service=3, subservice=3, SSC=3000, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # adding pre-defined definition to hk using test pool variables - tcQueue.put(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) command = PUSTelecommand(service=3, subservice=1, SSC=3010, data=hkDefinition1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # adding custom definition to diagnostics using test pool variables - tcQueue.put(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) command = PUSTelecommand(service=3, subservice=2, SSC=3020, data=hkDefinition2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable custom hk definition - tcQueue.put(("print", "\r\nTesting Service 3: Enable custom definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Enable custom definition")) command = PUSTelecommand(service=3, subservice=5, SSC=3030, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=7, SSC=3040, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable gps0 - tcQueue.put(("print", "\r\nTesting Service 3: Enable GPS definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Enable GPS definition")) command = PUSTelecommand(service=3, subservice=5, SSC=3050, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # maybe wait a bit to receive at least 2 packets.. - tcQueue.put(("wait", 3)) + tcQueue.append(("wait", 3)) # Disable custom hk definition - tcQueue.put(("print", "\r\nTesting Service 3: Disable custom definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable custom definition")) command = PUSTelecommand(service=3, subservice=6, SSC=3060, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Disable custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=8, SSC=3070, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # disable gps0 - tcQueue.put(("print", "\r\nTesting Service 3: Disable GPS definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable GPS definition")) command = PUSTelecommand(service=3, subservice=6, SSC=3080, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report custom Diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting diag definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting diag definition")) command = PUSTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report gps definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting GPS definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting GPS definition")) command = PUSTelecommand(service=3, subservice=9, SSC=3110, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # generate one custom hk definition - tcQueue.put(("print", "\r\nTesting Service 3: Generate one custom hk definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Generate one custom hk definition")) command = PUSTelecommand(service=3, subservice=27, SSC=3120, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # generate one custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=28, SSC=3120, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # generate one gps 0 definition - tcQueue.put(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) + tcQueue.append(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) command = PUSTelecommand(service=3, subservice=27, SSC=3120, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # modify custom hk definition interval newInterval = struct.pack('>f', 10.0) newIntervalCommand = sid1 + newInterval - tcQueue.put(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) + tcQueue.append(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) command = PUSTelecommand(service=3, subservice=31, SSC=3090, data=newIntervalCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report custom HK definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) command = PUSTelecommand(service=3, subservice=9, SSC=3090, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # modify custom diag definition interval newIntervalCommand = sid2 + newInterval - tcQueue.put(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) + tcQueue.append(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) command = PUSTelecommand(service=3, subservice=32, SSC=3090, data=newIntervalCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting diag definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting diag definition")) command = PUSTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # append parameter to custom hk definiton # append parameter to custom diag definition # delete custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=4, SSC=3120, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # do some basic testing on predefined structs too # e.g. add one variable, change interval, report them.... - tcQueue.put(("export", "log/tmtc_log_service3.txt")) + tcQueue.append(("export", "log/tmtc_log_service3.txt")) return tcQueue diff --git a/tc/OBSW_TcService8.py b/tc/OBSW_TcService8.py index e8ba84ae853a25adc0782a8b84c47c3c8adeb03c..2395d45b54711ded51b232ea024e4c869e89ba93 100644 --- a/tc/OBSW_TcService8.py +++ b/tc/OBSW_TcService8.py @@ -6,51 +6,51 @@ Description: PUS Custom Service 8: Function Management, High-Level Commanding @author: R. Mueller """ - import struct +from typing import Deque -from tc.OBSW_TcPacket import * +from tc.OBSW_TcPacket import PUSTelecommand from tc.OBSW_TcService200 import packModeData -def packService8TestInto(tcQueue, calledExternally=False): +def packService8TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: if calledExternally is False: - tcQueue.put(("print", "Testing Service 8")) + tcQueue.append(("print", "Testing Service 8")) objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # set mode on - tcQueue.put(("print", "\r\nTesting Service 8: Set Normal Mode")) + tcQueue.append(("print", "\r\nTesting Service 8: Set Normal Mode")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=800, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # set mode normal - tcQueue.put(("print", "\r\nTesting Service 8: Set Normal Mode")) + tcQueue.append(("print", "\r\nTesting Service 8: Set Normal Mode")) modeData = packModeData(objectId, 2, 0) command = PUSTelecommand(service=200, subservice=1, SSC=810, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Direct command which triggers completion reply - tcQueue.put(("print", "\r\nTesting Service 8: Trigger Completion Reply")) + tcQueue.append(("print", "\r\nTesting Service 8: Trigger Completion Reply")) actionId = struct.pack(">I", 666) directCommand = objectId + actionId command = PUSTelecommand(service=8, subservice=128, SSC=820, data=directCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Direct command which triggers data reply - tcQueue.put(("print", "\r\nTesting Service 8: Trigger Data Reply")) + tcQueue.append(("print", "\r\nTesting Service 8: Trigger Data Reply")) actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) commandParam1 = bytearray([0xBA, 0xB0]) commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) directCommand = objectId + actionId + commandParam1 + commandParam2 command = PUSTelecommand(service=8, subservice=128, SSC=830, data=directCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Direct command which triggers an additional step reply and one completion reply - tcQueue.put(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) + tcQueue.append(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) directCommand = objectId + actionId command = PUSTelecommand(service=8, subservice=128, SSC=840, data=directCommand) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("wait", 2)) - tcQueue.put(("print", "\r")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("wait", 2)) + tcQueue.append(("print", "\r")) if calledExternally is False: - tcQueue.put(("export", "log/tmtc_log_service8.txt")) + tcQueue.append(("export", "log/tmtc_log_service8.txt")) return tcQueue diff --git a/test/OBSW_PusServiceTest.py b/test/OBSW_PusServiceTest.py index 34d28fcbae0b45fbaba862c3e0331953f39f45b0..3e9310d230fada96f365f1639603ce758f116cb1 100644 --- a/test/OBSW_PusServiceTest.py +++ b/test/OBSW_PusServiceTest.py @@ -1,5 +1,5 @@ import unittest -from test.OBSW_UnitTest import TestService +from test.OBSW_UnitTest import TestService, TmInfoQueueT from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packService2TestInto import config.OBSW_Config as g @@ -12,7 +12,7 @@ class TestService2(TestService): # all commands must be sent sequentially, not as a burst cls.waitIntervals = [1, 2, 3, 4] cls.waitTime = [2, 2, 2, 2] - packService2TestInto(cls.testQueue) + packService2TestInto(super().testQueue) def test_Service2(self): assertionDict = self.performTestingAndGenerateAssertionDict() @@ -20,9 +20,9 @@ class TestService2(TestService): self.miscExpected = 4 super().performGenericAssertionTest(assertionDict) - def analyseTmInfo(self, tmInfoQueue, assertionDict): - while not tmInfoQueue.empty(): - currentTmInfo = tmInfoQueue.get() + def analyseTmInfo(self, tmInfoQueue: TmInfoQueueT, assertionDict: dict): + while not tmInfoQueue.__len__() == 0: + currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass if currentTmInfo["service"] == 1: super().scanForRespectiveTc(currentTmInfo) @@ -53,7 +53,7 @@ class TestService5(TestService): # This is required because the OBSW tasks runs with fixed sequences cls.waitIntervals = [1, 2, 3] cls.waitTime = [1.5, 2.0, 2.0] - packService5TestInto(cls.testQueue) + packService5TestInto(super().testQueue) def test_Service5(self): # analyseTmInfo() and analyseTcInfo are called here @@ -96,7 +96,7 @@ class TestService17(TestService): cls.waitIntervals = [2] cls.waitTime = 2 cls.tmTimeout = g.tmTimeout - packService17TestInto(cls.testQueue) + packService17TestInto(super().testQueue) def test_Service17(self): super().performService5or17Test() diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 01011029d21e133f63c73c082ac7db16e9d8b545..2c84fe5673467279b1379db4891e7f96b27111b1 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -26,10 +26,13 @@ to be specified in the analyseTcInfo method of the child test. @author: R. Mueller """ import unittest -import queue - +from collections import deque +from typing import Deque from tc.OBSW_TcPacker import packDummyDeviceTestInto +from tm.OBSW_PusTm import pusPacketInfoT +from tm.OBSW_TmService1 import pusPacketInfoService1T +from sendreceive.OBSW_TmListener import TmListener from abc import abstractmethod from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver from utility.OBSW_TmTcPrinter import TmTcPrinter @@ -37,17 +40,24 @@ from comIF.OBSW_Ethernet_ComIF import EthernetComIF from comIF.OBSW_Serial_ComIF import SerialComIF from config import OBSW_Config as g +TmInfoQueueT = Deque[pusPacketInfoT] +TmInfoQueueService1T = Deque[pusPacketInfoService1T] + -# documentation: https://docs.python.org/3/library/unittest.html#unittest.TestCase.setUpClass class TestService(unittest.TestCase): - testQueue = queue.Queue() - communicationInterface = 0 + """ + Generic service test class. + See documentation: https://docs.python.org/3/library/unittest.html#unittest.TestCase.setUpClass + """ - # A class method which is run before every individually class module is run - # must be decorated by the @classmethod fixture @classmethod def setUpClass(cls): - cls.displayMode = "long" + """ + A class method which is run before every individually class module is run + must be decorated by the @classmethod fixture + :return: + """ + cls._displayMode = "long" # default timeout for receiving TM, set in subclass manually cls.tmTimeout = g.tmTimeout @@ -59,29 +69,32 @@ class TestService(unittest.TestCase): cls.printTc = True # default wait time between tc send bursts cls.waitTime = 5.0 - cls.testQueue = queue.Queue() + cls.testQueue = deque() cls.tcTimeoutFactor = g.tcSendTimeoutFactor cls.printFile = g.printToFile - cls.tmtcPrinter = TmTcPrinter(cls.displayMode, cls.printFile, True) + cls.tmtcPrinter = TmTcPrinter(cls._displayMode, cls.printFile, True) + cls.communicationInterface = None if g.comIF == 0: cls.communicationInterface = EthernetComIF(cls.tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor, g.sockSend, g.sockReceive, g.sendAddress) else: baudRate = 115200 cls.communicationInterface = SerialComIF(cls.tmtcPrinter, g.comPort, baudRate, g.serialTimeout) + cls.tmListener = TmListener(cls.communicationInterface, g.tmTimeout, cls.tcTimeoutFactor) # connectToBoard() # This function should be called in each individual test to send the actual telecommands # which are stored inside testQueue def performTestingAndGenerateAssertionDict(self): - UnitTester = MultipleCommandSenderReceiver(self.communicationInterface, self.tmtcPrinter, self.testQueue, - self.tmTimeout, self.waitIntervals, self.waitTime, self.printTm, - self.tcTimeoutFactor, self.printFile) + UnitTester = MultipleCommandSenderReceiver( + comInterface=self.communicationInterface, tmtcPrinter=self.tmtcPrinter,tmListener=self.tmListener, + tcQueue=self.testQueue,tmTimeout=self.tmTimeout, waitIntervals=self.waitIntervals, waitTime=self.waitTime, + printTm=self.printTm, tcTimeoutFactor=self.tcTimeoutFactor, doPrintToFile=self.printFile) (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo() assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) return assertionDict - def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue): + def analyseTmTcInfo(self, tmInfoQueue: Deque, tcInfoQueue: Deque) -> dict: self.tcSscArray = [] self.tcServiceArray = [] self.tcSubserviceArray = [] @@ -119,9 +132,9 @@ class TestService(unittest.TestCase): return assertionDict - def analyseTcInfo(self, tcInfoQueue): - while not tcInfoQueue.empty(): - currentTcInfo = tcInfoQueue.get() + def analyseTcInfo(self, tcInfoQueue: Deque): + while not tcInfoQueue.__len__() == 0: + currentTcInfo = tcInfoQueue.pop() self.tcVerifyCounter = self.tcVerifyCounter + 1 # For commands with multiple steps, update this value manually ! self.tcVerifyStepCounter = self.tcVerifyCounter + 1 @@ -131,12 +144,12 @@ class TestService(unittest.TestCase): # must be implemented by child test ! @abstractmethod - def analyseTmInfo(self, tmInfoQueue, assertionDict): + def analyseTmInfo(self, tmInfoQueue: Deque, assertionDict: dict): pass # this function looks whether the tc verification SSC matched # a SSC of the sent TM - def scanForRespectiveTc(self, currentTmInfo): + def scanForRespectiveTc(self, currentTmInfo: dict): currentSubservice = currentTmInfo["subservice"] for possibleIndex, searchIndex in enumerate(self.tcSscArray): if searchIndex == currentTmInfo["tcSSC"]: @@ -147,7 +160,7 @@ class TestService(unittest.TestCase): elif currentSubservice == 7: self.tcVerifiedCompletion = self.tcVerifiedCompletion + 1 - def performGenericAssertionTest(self, assertionDict): + def performGenericAssertionTest(self, assertionDict: dict): if assertionDict is None: print("Performing Generic Assertion Test: " "Configuratrion error, asseretion dictionary was not passed properly") @@ -186,9 +199,10 @@ class TestService(unittest.TestCase): def tearDownClass(cls): cls.eventCounter = 0 cls.tcVerifyCounter = 0 - cls.testQueue.queue.clear() + # noinspection PyUnresolvedReferences + cls.testQueue.clear() if g.comIF == 1: - cls.communicationInterface.serial.close() + cls.communicationInterface.close() class TestDummyDevice(TestService): @@ -196,7 +210,7 @@ class TestDummyDevice(TestService): def setUpClass(cls): super().setUpClass() print("Testing Dummy Device") - packDummyDeviceTestInto(cls.testQueue) + packDummyDeviceTestInto(super().testQueue) def analyseTmInfo(self, tmInfoQueue, assertionDict): pass diff --git a/tm/OBSW_TmPacket.py b/tm/OBSW_TmPacket.py index d37864dbfa0748939d738debac2415e94fd649b7..dbeabddbbf25d87919eb7a206e473b8964a117ac 100644 --- a/tm/OBSW_TmPacket.py +++ b/tm/OBSW_TmPacket.py @@ -11,9 +11,6 @@ from tm.OBSW_TmService1 import Service1TM from tm.OBSW_TmService3 import Service3TM from tm.OBSW_TmService5 import Service5TM import struct -# TM Packets use the generic space packet structure provided by OBSWPusPacket to generate individual -# telemetry packets for all services -# TO DO: Different classes for different services? def PUSTelemetryFactory(rawPacket): diff --git a/tm/OBSW_TmService1.py b/tm/OBSW_TmService1.py index 969aaa173c465adbbf43e545cd1f7add2527f0e2..59d68d2c7c74b411fa68e6d1a80abf18566941e4 100644 --- a/tm/OBSW_TmService1.py +++ b/tm/OBSW_TmService1.py @@ -5,10 +5,12 @@ Date: 30.12.2019 Description: Deserialize Pus Verification TM Author: R. Mueller """ - from tm.OBSW_PusTm import PUSTelemetry +from typing import Dict import struct +pusPacketInfoService1T = Dict[str, any] + class Service1TM(PUSTelemetry): def __init__(self, byteArray): @@ -66,7 +68,7 @@ class Service1TM(PUSTelemetry): elif self.isStep: array.append("Step Number") - def packTmInformation(self): + def packTmInformation(self) -> pusPacketInfoService1T: tmInformation = super().packTmInformation() addInformation = { "tcPacketId": self.tcPacketId,