From ba9f898f9144bc4a483ffa5a4dba18f26f30d622 Mon Sep 17 00:00:00 2001 From: Robin Mueller <robin.mueller.m@gmail.com> Date: Wed, 11 Mar 2020 18:24:41 +0100 Subject: [PATCH] replace queue by deque, lots of small stuff --- comIF/OBSW_ComInterface.py | 4 + comIF/OBSW_DummyComIF.py | 3 + comIF/OBSW_Ethernet_ComIF.py | 3 + comIF/OBSW_Serial_ComIF.py | 10 +- sendreceive/OBSW_CommandSenderReceiver.py | 26 +--- .../OBSW_MultipleCommandsSenderReceiver.py | 31 ++--- sendreceive/OBSW_SequentialSenderReceiver.py | 16 +-- sendreceive/OBSW_TmListener.py | 4 +- tc/OBSW_TcPacker.py | 127 +++++++++--------- tc/OBSW_TcService2.py | 30 ++--- tc/OBSW_TcService200.py | 25 ++-- tc/OBSW_TcService3.py | 85 ++++++------ tc/OBSW_TcService8.py | 34 ++--- test/OBSW_PusServiceTest.py | 14 +- test/OBSW_UnitTest.py | 60 +++++---- tm/OBSW_TmPacket.py | 3 - tm/OBSW_TmService1.py | 6 +- 17 files changed, 244 insertions(+), 237 deletions(-) diff --git a/comIF/OBSW_ComInterface.py b/comIF/OBSW_ComInterface.py index 5c97140..49cab7f 100644 --- a/comIF/OBSW_ComInterface.py +++ b/comIF/OBSW_ComInterface.py @@ -20,6 +20,10 @@ class CommunicationInterface: def __init__(self, tmtcPrinter): self.tmtcPrinter = tmtcPrinter + @abstractmethod + def close(self) -> None: + pass + @abstractmethod def sendTelecommand(self, tcPacket, tcPacketInfo="") -> None: """ diff --git a/comIF/OBSW_DummyComIF.py b/comIF/OBSW_DummyComIF.py index 15e5ea4..a5b8149 100644 --- a/comIF/OBSW_DummyComIF.py +++ b/comIF/OBSW_DummyComIF.py @@ -11,6 +11,9 @@ from comIF.OBSW_ComInterface import CommunicationInterface class DummyComIF(CommunicationInterface): + def close(self) -> None: + pass + def dataAvailable(self, parameters): pass diff --git a/comIF/OBSW_Ethernet_ComIF.py b/comIF/OBSW_Ethernet_ComIF.py index cc5a075..d49b133 100644 --- a/comIF/OBSW_Ethernet_ComIF.py +++ b/comIF/OBSW_Ethernet_ComIF.py @@ -22,6 +22,9 @@ class EthernetComIF(CommunicationInterface): self.sendAddress = sendAddress self.setUpSocket() + def close(self) -> None: + pass + def sendTelecommand(self, tcPacket, tcPacketInfo=""): self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) self.sendSocket.sendto(tcPacket, self.sendAddress) diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py index 5f234ee..5241e3c 100644 --- a/comIF/OBSW_Serial_ComIF.py +++ b/comIF/OBSW_Serial_ComIF.py @@ -21,11 +21,19 @@ class SerialComIF(CommunicationInterface): self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) except serial.SerialException: print("Serial Port can not be opened") - logging.exception("Error: ") + logging.exception("Error") exit() self.data = bytearray() self.numberOfPackets = 0 + def close(self): + try: + self.serial.close() + except serial.SerialException: + print("Serial Port could not be closed !") + logging.exception("Error") + exit() + def sendTelecommand(self, tcPacket, tcPacketInfo=""): self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) self.serial.write(tcPacket) diff --git a/sendreceive/OBSW_CommandSenderReceiver.py b/sendreceive/OBSW_CommandSenderReceiver.py index 7100728..e50981c 100644 --- a/sendreceive/OBSW_CommandSenderReceiver.py +++ b/sendreceive/OBSW_CommandSenderReceiver.py @@ -12,17 +12,15 @@ if the first reply has not been received. @author: R. Mueller """ import time -from config import OBSW_Config as g from comIF.OBSW_ComInterface import CommunicationInterface, ComIF_T from utility.OBSW_TmTcPrinter import TmTcPrinterT, TmTcPrinter from sendreceive.OBSW_TmListener import TmListenerT, TmListener -# Generic TMTC SendReceive class which is implemented -# by specific implementations (e.g. SingleCommandSenderReceiver) class CommandSenderReceiver: """ - This is the generic CommandSenderReceiver object. All TMTC objects inherit this object + This is the generic CommandSenderReceiver object. All TMTC objects inherit this object, + for example specific implementations (e.g. SingleCommandSenderReceiver) """ def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, tmTimeout: float, tcSendTimeoutFactor: float, doPrintToFile: bool): @@ -121,26 +119,6 @@ class CommandSenderReceiver: self.start_time = time.time() time.sleep(1) - # check for sequence of replies for a telecommand (e.g. TM[1,1] , TM[1,7] ...) for tmTimeout value - # returns True on success - # def checkForOneTelemetrySequence(self): - # tmReady = self.comInterface.dataAvailable(self.tmTimeout * self.tcSendTimeoutFactor) - # if tmReady is False: - # return False - # else: - # self.comInterface.receiveTelemetry() - # start_time = time.time() - # elapsed_time = 0 - # while elapsed_time < self.tmTimeout: - # tmReady = self.comInterface.dataAvailable(1.0) - # if tmReady: - # self.comInterface.receiveTelemetry() - # elapsed_time = time.time() - start_time - # # the timeout value can be set by special TC queue entries if packet handling takes longer, - # # but it is reset here to the global value - # if self.tmTimeout is not g.tmTimeout: - # self.tmTimeout = g.tmTimeout - # return True diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index 2008cbe..bc3ff58 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -6,12 +6,10 @@ @brief Used to send multiple TCs as bursts and listen for replies simultaneously. Used by UnitTester """ -import queue import time -import threading -from typing import Union +from typing import Union, Deque +from collections import deque -from config import OBSW_Config as g from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver from comIF.OBSW_ComInterface import ComIF_T from utility.OBSW_TmTcPrinter import TmTcPrinterT @@ -23,7 +21,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with wait time between the send bursts. This is generally done in the separate test classes in UnitTest """ - def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue, tmListener: TmListenerT, + def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, tmListener: TmListenerT, tmTimeout: float, waitIntervals: list, waitTime: Union[float, list], printTm: bool, tcTimeoutFactor: float, doPrintToFile: bool): """ @@ -44,16 +42,16 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.waitIntervals = waitIntervals self.waitTime = waitTime self.printTm = printTm - self.tmInfoQueue = queue.Queue() - self.tcInfoQueue = queue.Queue() + self.tmInfoQueue = deque() + self.tcInfoQueue = deque() self.pusPacketInfo = [] self.pusPacket = [] self.waitCounter = 0 def sendTcQueueAndReturnTcInfo(self): - receiverThread = threading.Thread(target=self.checkForMultipleReplies) - receiverThread.start() - time.sleep(0.5) + # receiverThread = threading.Thread(target=self.checkForMultipleReplies) + # receiverThread.start() + # time.sleep(0.5) try: self.sendAllQueue() while not self.allRepliesReceived: @@ -76,15 +74,12 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): def sendAllQueue(self): while not self.tcQueue.empty(): - if g.comIF == 1: - pass - # time.sleep(0.01) # pause could be smaller, but causes overrun error on flashed board self.sendAndPrintTc() def sendAndPrintTc(self): - self.checkQueueEntry(self.tcQueue.get()) + self.checkQueueEntry(self.tcQueue.pop()) if self.queueEntryIsTelecommand: - self.tcInfoQueue.put(self.pusPacketInfo) + self.tcInfoQueue.append(self.pusPacketInfo) self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) self.handleWaiting() @@ -102,8 +97,8 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): else: print("Multiple Command SenderReceiver: Configuration error, reply event not set in TM listener") - def checkForMultipleReplies(self): - super().checkForMultipleReplies() + # def checkForMultipleReplies(self): + # super().checkForMultipleReplies() # def checkForMultipleReplies(self): # while not self.allRepliesReceived and self.run_event.is_set(): @@ -124,7 +119,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): packetList = self.comInterface.receiveTelemetry() for counter in range(0, len(packetList)): tmInfo = packetList[counter].packTmInformation() - self.tmInfoQueue.put(tmInfo) + self.tmInfoQueue.append(tmInfo) def handleLastRepliesListening(self, start_time): elapsed_time_seconds = 0 diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py index baea1ec..c7c7e43 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -8,7 +8,7 @@ @brief Used to send multiple TCs in sequence and listen for replies after each sent tc """ -import threading +from typing import Deque import time import config.OBSW_Config as g @@ -23,7 +23,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence """ def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, tmTimeout: float, - tcQueue, tcTimeoutFactor: float, doPrintToFile: bool): + tcQueue: Deque, tcTimeoutFactor: float, doPrintToFile: bool): """ :param comInterface: CommunicationInterface object, passed on to CommandSenderReceiver :param tmListener: TmListener object which runs in the background and receives all Telemetry @@ -43,7 +43,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.abortFlag = False def sendQueueTcAndReceiveTmSequentially(self): - self.tmListener.modeId = g.modeList[3] + self.tmListener.modeId = g.modeList.ServiceTestMode self.tmListener.modeChangeEvent.set() self.sendAndReceiveFirstPacket() # this flag is set in the separate thread ! @@ -56,9 +56,9 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): def handleTcSending(self): while not self.allRepliesReceived: - while not self.tcQueue.empty(): + while not self.tcQueue.__len__() == 0: self.performNextTcSend() - if self.tcQueue.empty(): + if self.tcQueue.__len__() == 0: self.start_time = time.time() self.checkForTimeout() print("Sequential SenderReceiver: All replies received!") @@ -81,18 +81,18 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.checkForTimeout() def sendAndReceiveFirstPacket(self): - self.checkQueueEntry(self.tcQueue.get()) + self.checkQueueEntry(self.tcQueue.pop()) if self.queueEntryIsTelecommand: self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) else: self.sendAndReceiveFirstPacket() def sendNextTelecommand(self): - self.checkQueueEntry(self.tcQueue.get()) + self.checkQueueEntry(self.tcQueue.pop()) if self.queueEntryIsTelecommand: self.start_time = time.time() self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) - elif self.tcQueue.empty(): + elif self.tcQueue.__len__() == 0: # Special case: Last queue entry is not a Telecommand self.allRepliesReceived = True else: diff --git a/sendreceive/OBSW_TmListener.py b/sendreceive/OBSW_TmListener.py index 7f2821c..8fccb0c 100644 --- a/sendreceive/OBSW_TmListener.py +++ b/sendreceive/OBSW_TmListener.py @@ -10,7 +10,7 @@ """ import time import threading -import queue +from collections import deque from typing import TypeVar from comIF.OBSW_ComInterface import ComIF_T @@ -47,7 +47,7 @@ class TmListener: # This Event is set and cleared by the listener to inform the sender objects if a reply has been received self.replyEvent = threading.Event() # Will be filled for the Unit Test - self.tmInfoQueue = queue.Queue() + self.tmInfoQueue = deque() def start(self): self.listenerThread.start() diff --git a/tc/OBSW_TcPacker.py b/tc/OBSW_TcPacker.py index cc8dd50..5d2142e 100644 --- a/tc/OBSW_TcPacker.py +++ b/tc/OBSW_TcPacker.py @@ -13,7 +13,7 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file """ import os -from tc.OBSW_TcPacket import * +from tc.OBSW_TcPacket import PUSTelecommand from tc.OBSW_TcService2 import packService2TestInto from tc.OBSW_TcService3 import packService3TestInto from tc.OBSW_TcService8 import packService8TestInto @@ -21,9 +21,10 @@ from tc.OBSW_TcService200 import packModeData, packService200TestInto import config.OBSW_Config as g from datetime import datetime from collections import deque +from typing import Deque,Union -def serviceTestSelect(service, serviceQueue): +def serviceTestSelect(service: Union[int, str], serviceQueue: Deque) -> Deque: if service == 2: return packService2TestInto(serviceQueue) elif service == 3: @@ -55,31 +56,31 @@ def serviceTestSelect(service, serviceQueue): exit() -def packService5TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 5")) +def packService5TestInto(tcQueue: Deque): + tcQueue.append(("print", "Testing Service 5")) # disable events - tcQueue.put(("print", "\r\nTesting Service 5: Disable event")) + tcQueue.append(("print", "\r\nTesting Service 5: Disable event")) command = PUSTelecommand(service=5, subservice=6, SSC=500) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # trigger event - tcQueue.put(("print", "\r\nTesting Service 5: Trigger event")) + tcQueue.append(("print", "\r\nTesting Service 5: Trigger event")) command = PUSTelecommand(service=17, subservice=128, SSC=510) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable event - tcQueue.put(("print", "\r\nTesting Service 5: Enable event")) + tcQueue.append(("print", "\r\nTesting Service 5: Enable event")) command = PUSTelecommand(service=5, subservice=5, SSC=520) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # trigger event - tcQueue.put(("print", "\r\nTesting Service 5: Trigger another event")) + tcQueue.append(("print", "\r\nTesting Service 5: Trigger another event")) command = PUSTelecommand(service=17, subservice=128, SSC=530) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service5.txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service5.txt")) return tcQueue -def packService9TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 9")) +def packService9TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 9")) timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' timeTestCurrentTime = datetime.now().isoformat() + "Z" + '\0' @@ -90,120 +91,120 @@ def packService9TestInto(tcQueue): print("Time Code 2 :" + str(timeTest2)) print("Time Code 3 :" + str(timeTest3) + "\r") # time setting - tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode A")) + tcQueue.append(("print", "\r\nTesting Service 9: Testing timecode A")) command = PUSTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode B")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r\nTesting Service 9: Testing timecode B")) command = PUSTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode Current Time")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r\nTesting Service 9: Testing timecode Current Time")) command = PUSTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # TODO: Add other time formats here - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service9.txt")) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service9.txt")) return tcQueue -def packService17TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 17")) +def packService17TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 17")) # ping test - tcQueue.put(("print", "\n\rTesting Service 17: Ping Test")) + tcQueue.append(("print", "\n\rTesting Service 17: Ping Test")) command = PUSTelecommand(service=17, subservice=1, SSC=1700) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable event - tcQueue.put(("print", "\n\rTesting Service 17: Enable Event")) + tcQueue.append(("print", "\n\rTesting Service 17: Enable Event")) command = PUSTelecommand(service=5, subservice=5, SSC=52) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # test event - tcQueue.put(("print", "\n\rTesting Service 17: Trigger event")) + tcQueue.append(("print", "\n\rTesting Service 17: Trigger event")) command = PUSTelecommand(service=17, subservice=128, SSC=1701) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service17.txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service17.txt")) return tcQueue -def packDummyDeviceTestInto(tcQueue): - tcQueue.put(("print", "Testing Dummy Device")) +def packDummyDeviceTestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Dummy Device")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode - tcQueue.put(("print", "\n\rTesting Service Dummy: Set On")) + tcQueue.append(("print", "\n\rTesting Service Dummy: Set On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=1, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Test Service 2 commands - tcQueue.put(("print", "\n\rTesting Service Dummy: Service 2")) + tcQueue.append(("print", "\n\rTesting Service Dummy: Service 2")) packService2TestInto(tcQueue, True) # Test Service 8 - tcQueue.put(("print", "\n\rTesting Service Dummy: Service 8")) + tcQueue.append(("print", "\n\rTesting Service Dummy: Service 8")) packService8TestInto(tcQueue, True) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service_dummy.txt")) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service_dummy.txt")) return tcQueue -def packGpsTestInto(objectId, tcQueue): +def packGpsTestInto(objectId: bytearray, tcQueue: Deque) -> Deque: if objectId == g.GPS0_ObjectId: gpsString = "GPS0" elif objectId == g.GPS1_ObjectId: gpsString = "GPS1" else: gpsString = "unknown" - tcQueue.put(("print", "Testing " + gpsString + " Device")) + tcQueue.append(("print", "Testing " + gpsString + " Device")) # Set Mode Off - tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set Off")) + tcQueue.append(("print", "\n\rTesting " + gpsString + ": Set Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=11, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Mode On - tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set On")) + tcQueue.append(("print", "\n\rTesting " + gpsString + ": Set On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=12, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Enable HK report sidGps = 0 if objectId == g.GPS0_ObjectId: sidGps = g.GPS0_SID - tcQueue.put(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) + tcQueue.append(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) command = PUSTelecommand(service=3, subservice=5, SSC=13, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) elif objectId == g.GPS1_ObjectId: sidGps = g.GPS1_SID - tcQueue.put(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) + tcQueue.append(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) command = PUSTelecommand(service=3, subservice=5, SSC=14, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # pack wait interval until mode is on and a few gps replies have been received - tcQueue.put(("wait", 5)) + tcQueue.append(("wait", 5)) # Disable HK reporting - tcQueue.put(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition")) command = PUSTelecommand(service=3, subservice=6, SSC=15, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Mode Off - tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set Off")) + tcQueue.append(("print", "\n\rTesting " + gpsString + ": Set Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=13, data=modeData) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) - tcQueue.put(("export", "log/tmtc_log_service_" + gpsString + ".txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) + tcQueue.append(("export", "log/tmtc_log_service_" + gpsString + ".txt")) return tcQueue -def packErrorTestingInto(tcQueue): +def packErrorTestingInto(tcQueue: Deque) -> Deque: # a lot of events command = PUSTelecommand(service=17, subservice=129, SSC=2010) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # a lot of ping testing command = PUSTelecommand(service=17, subservice=130, SSC=2020) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) return tcQueue def createTotalTcQueue(): if not os.path.exists("log"): os.mkdir("log") - tcQueue = queue.Queue() + tcQueue = deque() tcQueue = packService2TestInto(tcQueue) tcQueue = packService5TestInto(tcQueue) tcQueue = packService8TestInto(tcQueue) diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py index 5eae374..84be166 100644 --- a/tc/OBSW_TcService2.py +++ b/tc/OBSW_TcService2.py @@ -6,46 +6,46 @@ Description: PUS Custom Service 8: Device Access, Native low-level commanding @author: R. Mueller """ - import struct +from typing import Deque from tc.OBSW_TcPacket import * from tc.OBSW_TcService200 import packModeData -def packService2TestInto(tcQueue, calledExternally=False): +def packService2TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: if calledExternally is False: - tcQueue.put(("print", "Testing Service 2")) + tcQueue.append(("print", "Testing Service 2")) objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device # don't forget to set object mode raw (service 200) before calling this ! # Set Raw Mode - tcQueue.put(("print", "\r\nTesting Service 2: Setting Raw Mode")) + tcQueue.append(("print", "\r\nTesting Service 2: Setting Raw Mode")) modeData = packModeData(objectId, 3, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # toggle wiretapping raw - tcQueue.put(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) + tcQueue.append(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) wiretappingToggleData = packWiretappingMode(objectId, 1) toggleWiretappingOnCommand = PUSTelecommand(service=2, subservice=129, SSC=200, data=wiretappingToggleData) - tcQueue.put(toggleWiretappingOnCommand.packCommandTuple()) + tcQueue.append(toggleWiretappingOnCommand.packCommandTuple()) # send raw command, data should be returned via TM[2,130] and TC[2,131] - tcQueue.put(("print", "\r\nTesting Service 2: Sending Raw Command")) + tcQueue.append(("print", "\r\nTesting Service 2: Sending Raw Command")) rawCommand = struct.pack(">I", 666) rawData = objectId + rawCommand rawCommand = PUSTelecommand(service=2, subservice=128, SSC=201, data=rawData) - tcQueue.put(rawCommand.packCommandTuple()) + tcQueue.append(rawCommand.packCommandTuple()) # toggle wiretapping off - tcQueue.put(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) + tcQueue.append(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) wiretappingToggleData = packWiretappingMode(objectId, 0) toggleWiretappingOffCommand = PUSTelecommand(service=2, subservice=129, SSC=204, data=wiretappingToggleData) - tcQueue.put(toggleWiretappingOffCommand.packCommandTuple()) + tcQueue.append(toggleWiretappingOffCommand.packCommandTuple()) # send raw command which should be returned via TM[2,130] - tcQueue.put(("print", "\r\nTesting Service 2: Send second raw command")) + tcQueue.append(("print", "\r\nTesting Service 2: Send second raw command")) command = PUSTelecommand(service=2, subservice=128, SSC=205, data=rawData) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("print", "\r")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("print", "\r")) if calledExternally is False: - tcQueue.put(("export", "log/tmtc_log_service2.txt")) + tcQueue.append(("export", "log/tmtc_log_service2.txt")) return tcQueue diff --git a/tc/OBSW_TcService200.py b/tc/OBSW_TcService200.py index ba4a2dc..a9c545c 100644 --- a/tc/OBSW_TcService200.py +++ b/tc/OBSW_TcService200.py @@ -11,35 +11,36 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file @author: R. Mueller """ -from tc.OBSW_TcPacket import * +from tc.OBSW_TcPacket import PUSTelecommand +from typing import Deque import struct -def packService200TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 200")) +def packService200TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 200")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode On")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2000, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Normal mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode Normal")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode Normal")) modeData = packModeData(objectId, 2, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2010, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Raw Mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode Raw")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode Raw")) modeData = packModeData(objectId, 3, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Set Off Mode - tcQueue.put(("print", "\r\nTesting Service 200: Set Mode Off")) + tcQueue.append(("print", "\r\nTesting Service 200: Set Mode Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=2030, data=modeData) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("export", "log/tmtc_log_service200.txt")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("export", "log/tmtc_log_service200.txt")) return tcQueue diff --git a/tc/OBSW_TcService3.py b/tc/OBSW_TcService3.py index 4f145bf..5504f49 100644 --- a/tc/OBSW_TcService3.py +++ b/tc/OBSW_TcService3.py @@ -7,11 +7,12 @@ Description: PUS Custom Service 8: Device Access, Native low-level commanding @author: R. Mueller """ import struct +from typing import Deque from tc.OBSW_TcPacket import PUSTelecommand -def packService3TestInto(tcQueue): - tcQueue.put(("print", "Testing Service 3")) +def packService3TestInto(tcQueue: Deque) -> Deque: + tcQueue.append(("print", "Testing Service 3")) # adding custom defintion to hk using test pool variables sid1 = bytearray([0x00, 0x00, 0x43, 0x00]) sid2 = bytearray([0x00, 0x00, 0x44, 0x00]) @@ -28,96 +29,96 @@ def packService3TestInto(tcQueue): hkDefinition2 = sid2 + collectionInterval + numberOfParameters + p1 + p2 + p3 + p4 + p5 # deleting pre-defined test entry - tcQueue.put(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) command = PUSTelecommand(service=3, subservice=3, SSC=3000, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # adding pre-defined definition to hk using test pool variables - tcQueue.put(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) command = PUSTelecommand(service=3, subservice=1, SSC=3010, data=hkDefinition1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # adding custom definition to diagnostics using test pool variables - tcQueue.put(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) command = PUSTelecommand(service=3, subservice=2, SSC=3020, data=hkDefinition2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable custom hk definition - tcQueue.put(("print", "\r\nTesting Service 3: Enable custom definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Enable custom definition")) command = PUSTelecommand(service=3, subservice=5, SSC=3030, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=7, SSC=3040, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # enable gps0 - tcQueue.put(("print", "\r\nTesting Service 3: Enable GPS definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Enable GPS definition")) command = PUSTelecommand(service=3, subservice=5, SSC=3050, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # maybe wait a bit to receive at least 2 packets.. - tcQueue.put(("wait", 3)) + tcQueue.append(("wait", 3)) # Disable custom hk definition - tcQueue.put(("print", "\r\nTesting Service 3: Disable custom definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable custom definition")) command = PUSTelecommand(service=3, subservice=6, SSC=3060, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Disable custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=8, SSC=3070, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # disable gps0 - tcQueue.put(("print", "\r\nTesting Service 3: Disable GPS definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Disable GPS definition")) command = PUSTelecommand(service=3, subservice=6, SSC=3080, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report custom Diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting diag definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting diag definition")) command = PUSTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report gps definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting GPS definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting GPS definition")) command = PUSTelecommand(service=3, subservice=9, SSC=3110, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # generate one custom hk definition - tcQueue.put(("print", "\r\nTesting Service 3: Generate one custom hk definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Generate one custom hk definition")) command = PUSTelecommand(service=3, subservice=27, SSC=3120, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # generate one custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=28, SSC=3120, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # generate one gps 0 definition - tcQueue.put(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) + tcQueue.append(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) command = PUSTelecommand(service=3, subservice=27, SSC=3120, data=sidGps) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # modify custom hk definition interval newInterval = struct.pack('>f', 10.0) newIntervalCommand = sid1 + newInterval - tcQueue.put(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) + tcQueue.append(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) command = PUSTelecommand(service=3, subservice=31, SSC=3090, data=newIntervalCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report custom HK definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) command = PUSTelecommand(service=3, subservice=9, SSC=3090, data=sid1) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # modify custom diag definition interval newIntervalCommand = sid2 + newInterval - tcQueue.put(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) + tcQueue.append(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) command = PUSTelecommand(service=3, subservice=32, SSC=3090, data=newIntervalCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # report custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Reporting diag definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Reporting diag definition")) command = PUSTelecommand(service=3, subservice=11, SSC=3100, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # append parameter to custom hk definiton # append parameter to custom diag definition # delete custom diag definition - tcQueue.put(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) + tcQueue.append(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) command = PUSTelecommand(service=3, subservice=4, SSC=3120, data=sid2) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # do some basic testing on predefined structs too # e.g. add one variable, change interval, report them.... - tcQueue.put(("export", "log/tmtc_log_service3.txt")) + tcQueue.append(("export", "log/tmtc_log_service3.txt")) return tcQueue diff --git a/tc/OBSW_TcService8.py b/tc/OBSW_TcService8.py index e8ba84a..2395d45 100644 --- a/tc/OBSW_TcService8.py +++ b/tc/OBSW_TcService8.py @@ -6,51 +6,51 @@ Description: PUS Custom Service 8: Function Management, High-Level Commanding @author: R. Mueller """ - import struct +from typing import Deque -from tc.OBSW_TcPacket import * +from tc.OBSW_TcPacket import PUSTelecommand from tc.OBSW_TcService200 import packModeData -def packService8TestInto(tcQueue, calledExternally=False): +def packService8TestInto(tcQueue: Deque, calledExternally: bool = False) -> Deque: if calledExternally is False: - tcQueue.put(("print", "Testing Service 8")) + tcQueue.append(("print", "Testing Service 8")) objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # set mode on - tcQueue.put(("print", "\r\nTesting Service 8: Set Normal Mode")) + tcQueue.append(("print", "\r\nTesting Service 8: Set Normal Mode")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=800, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # set mode normal - tcQueue.put(("print", "\r\nTesting Service 8: Set Normal Mode")) + tcQueue.append(("print", "\r\nTesting Service 8: Set Normal Mode")) modeData = packModeData(objectId, 2, 0) command = PUSTelecommand(service=200, subservice=1, SSC=810, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Direct command which triggers completion reply - tcQueue.put(("print", "\r\nTesting Service 8: Trigger Completion Reply")) + tcQueue.append(("print", "\r\nTesting Service 8: Trigger Completion Reply")) actionId = struct.pack(">I", 666) directCommand = objectId + actionId command = PUSTelecommand(service=8, subservice=128, SSC=820, data=directCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Direct command which triggers data reply - tcQueue.put(("print", "\r\nTesting Service 8: Trigger Data Reply")) + tcQueue.append(("print", "\r\nTesting Service 8: Trigger Data Reply")) actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) commandParam1 = bytearray([0xBA, 0xB0]) commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) directCommand = objectId + actionId + commandParam1 + commandParam2 command = PUSTelecommand(service=8, subservice=128, SSC=830, data=directCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.append(command.packCommandTuple()) # Direct command which triggers an additional step reply and one completion reply - tcQueue.put(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) + tcQueue.append(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) directCommand = objectId + actionId command = PUSTelecommand(service=8, subservice=128, SSC=840, data=directCommand) - tcQueue.put(command.packCommandTuple()) - tcQueue.put(("wait", 2)) - tcQueue.put(("print", "\r")) + tcQueue.append(command.packCommandTuple()) + tcQueue.append(("wait", 2)) + tcQueue.append(("print", "\r")) if calledExternally is False: - tcQueue.put(("export", "log/tmtc_log_service8.txt")) + tcQueue.append(("export", "log/tmtc_log_service8.txt")) return tcQueue diff --git a/test/OBSW_PusServiceTest.py b/test/OBSW_PusServiceTest.py index 34d28fc..3e9310d 100644 --- a/test/OBSW_PusServiceTest.py +++ b/test/OBSW_PusServiceTest.py @@ -1,5 +1,5 @@ import unittest -from test.OBSW_UnitTest import TestService +from test.OBSW_UnitTest import TestService, TmInfoQueueT from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packService2TestInto import config.OBSW_Config as g @@ -12,7 +12,7 @@ class TestService2(TestService): # all commands must be sent sequentially, not as a burst cls.waitIntervals = [1, 2, 3, 4] cls.waitTime = [2, 2, 2, 2] - packService2TestInto(cls.testQueue) + packService2TestInto(super().testQueue) def test_Service2(self): assertionDict = self.performTestingAndGenerateAssertionDict() @@ -20,9 +20,9 @@ class TestService2(TestService): self.miscExpected = 4 super().performGenericAssertionTest(assertionDict) - def analyseTmInfo(self, tmInfoQueue, assertionDict): - while not tmInfoQueue.empty(): - currentTmInfo = tmInfoQueue.get() + def analyseTmInfo(self, tmInfoQueue: TmInfoQueueT, assertionDict: dict): + while not tmInfoQueue.__len__() == 0: + currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass if currentTmInfo["service"] == 1: super().scanForRespectiveTc(currentTmInfo) @@ -53,7 +53,7 @@ class TestService5(TestService): # This is required because the OBSW tasks runs with fixed sequences cls.waitIntervals = [1, 2, 3] cls.waitTime = [1.5, 2.0, 2.0] - packService5TestInto(cls.testQueue) + packService5TestInto(super().testQueue) def test_Service5(self): # analyseTmInfo() and analyseTcInfo are called here @@ -96,7 +96,7 @@ class TestService17(TestService): cls.waitIntervals = [2] cls.waitTime = 2 cls.tmTimeout = g.tmTimeout - packService17TestInto(cls.testQueue) + packService17TestInto(super().testQueue) def test_Service17(self): super().performService5or17Test() diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 0101102..2c84fe5 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -26,10 +26,13 @@ to be specified in the analyseTcInfo method of the child test. @author: R. Mueller """ import unittest -import queue - +from collections import deque +from typing import Deque from tc.OBSW_TcPacker import packDummyDeviceTestInto +from tm.OBSW_PusTm import pusPacketInfoT +from tm.OBSW_TmService1 import pusPacketInfoService1T +from sendreceive.OBSW_TmListener import TmListener from abc import abstractmethod from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver from utility.OBSW_TmTcPrinter import TmTcPrinter @@ -37,17 +40,24 @@ from comIF.OBSW_Ethernet_ComIF import EthernetComIF from comIF.OBSW_Serial_ComIF import SerialComIF from config import OBSW_Config as g +TmInfoQueueT = Deque[pusPacketInfoT] +TmInfoQueueService1T = Deque[pusPacketInfoService1T] + -# documentation: https://docs.python.org/3/library/unittest.html#unittest.TestCase.setUpClass class TestService(unittest.TestCase): - testQueue = queue.Queue() - communicationInterface = 0 + """ + Generic service test class. + See documentation: https://docs.python.org/3/library/unittest.html#unittest.TestCase.setUpClass + """ - # A class method which is run before every individually class module is run - # must be decorated by the @classmethod fixture @classmethod def setUpClass(cls): - cls.displayMode = "long" + """ + A class method which is run before every individually class module is run + must be decorated by the @classmethod fixture + :return: + """ + cls._displayMode = "long" # default timeout for receiving TM, set in subclass manually cls.tmTimeout = g.tmTimeout @@ -59,29 +69,32 @@ class TestService(unittest.TestCase): cls.printTc = True # default wait time between tc send bursts cls.waitTime = 5.0 - cls.testQueue = queue.Queue() + cls.testQueue = deque() cls.tcTimeoutFactor = g.tcSendTimeoutFactor cls.printFile = g.printToFile - cls.tmtcPrinter = TmTcPrinter(cls.displayMode, cls.printFile, True) + cls.tmtcPrinter = TmTcPrinter(cls._displayMode, cls.printFile, True) + cls.communicationInterface = None if g.comIF == 0: cls.communicationInterface = EthernetComIF(cls.tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor, g.sockSend, g.sockReceive, g.sendAddress) else: baudRate = 115200 cls.communicationInterface = SerialComIF(cls.tmtcPrinter, g.comPort, baudRate, g.serialTimeout) + cls.tmListener = TmListener(cls.communicationInterface, g.tmTimeout, cls.tcTimeoutFactor) # connectToBoard() # This function should be called in each individual test to send the actual telecommands # which are stored inside testQueue def performTestingAndGenerateAssertionDict(self): - UnitTester = MultipleCommandSenderReceiver(self.communicationInterface, self.tmtcPrinter, self.testQueue, - self.tmTimeout, self.waitIntervals, self.waitTime, self.printTm, - self.tcTimeoutFactor, self.printFile) + UnitTester = MultipleCommandSenderReceiver( + comInterface=self.communicationInterface, tmtcPrinter=self.tmtcPrinter,tmListener=self.tmListener, + tcQueue=self.testQueue,tmTimeout=self.tmTimeout, waitIntervals=self.waitIntervals, waitTime=self.waitTime, + printTm=self.printTm, tcTimeoutFactor=self.tcTimeoutFactor, doPrintToFile=self.printFile) (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo() assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) return assertionDict - def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue): + def analyseTmTcInfo(self, tmInfoQueue: Deque, tcInfoQueue: Deque) -> dict: self.tcSscArray = [] self.tcServiceArray = [] self.tcSubserviceArray = [] @@ -119,9 +132,9 @@ class TestService(unittest.TestCase): return assertionDict - def analyseTcInfo(self, tcInfoQueue): - while not tcInfoQueue.empty(): - currentTcInfo = tcInfoQueue.get() + def analyseTcInfo(self, tcInfoQueue: Deque): + while not tcInfoQueue.__len__() == 0: + currentTcInfo = tcInfoQueue.pop() self.tcVerifyCounter = self.tcVerifyCounter + 1 # For commands with multiple steps, update this value manually ! self.tcVerifyStepCounter = self.tcVerifyCounter + 1 @@ -131,12 +144,12 @@ class TestService(unittest.TestCase): # must be implemented by child test ! @abstractmethod - def analyseTmInfo(self, tmInfoQueue, assertionDict): + def analyseTmInfo(self, tmInfoQueue: Deque, assertionDict: dict): pass # this function looks whether the tc verification SSC matched # a SSC of the sent TM - def scanForRespectiveTc(self, currentTmInfo): + def scanForRespectiveTc(self, currentTmInfo: dict): currentSubservice = currentTmInfo["subservice"] for possibleIndex, searchIndex in enumerate(self.tcSscArray): if searchIndex == currentTmInfo["tcSSC"]: @@ -147,7 +160,7 @@ class TestService(unittest.TestCase): elif currentSubservice == 7: self.tcVerifiedCompletion = self.tcVerifiedCompletion + 1 - def performGenericAssertionTest(self, assertionDict): + def performGenericAssertionTest(self, assertionDict: dict): if assertionDict is None: print("Performing Generic Assertion Test: " "Configuratrion error, asseretion dictionary was not passed properly") @@ -186,9 +199,10 @@ class TestService(unittest.TestCase): def tearDownClass(cls): cls.eventCounter = 0 cls.tcVerifyCounter = 0 - cls.testQueue.queue.clear() + # noinspection PyUnresolvedReferences + cls.testQueue.clear() if g.comIF == 1: - cls.communicationInterface.serial.close() + cls.communicationInterface.close() class TestDummyDevice(TestService): @@ -196,7 +210,7 @@ class TestDummyDevice(TestService): def setUpClass(cls): super().setUpClass() print("Testing Dummy Device") - packDummyDeviceTestInto(cls.testQueue) + packDummyDeviceTestInto(super().testQueue) def analyseTmInfo(self, tmInfoQueue, assertionDict): pass diff --git a/tm/OBSW_TmPacket.py b/tm/OBSW_TmPacket.py index d37864d..dbeabdd 100644 --- a/tm/OBSW_TmPacket.py +++ b/tm/OBSW_TmPacket.py @@ -11,9 +11,6 @@ from tm.OBSW_TmService1 import Service1TM from tm.OBSW_TmService3 import Service3TM from tm.OBSW_TmService5 import Service5TM import struct -# TM Packets use the generic space packet structure provided by OBSWPusPacket to generate individual -# telemetry packets for all services -# TO DO: Different classes for different services? def PUSTelemetryFactory(rawPacket): diff --git a/tm/OBSW_TmService1.py b/tm/OBSW_TmService1.py index 969aaa1..59d68d2 100644 --- a/tm/OBSW_TmService1.py +++ b/tm/OBSW_TmService1.py @@ -5,10 +5,12 @@ Date: 30.12.2019 Description: Deserialize Pus Verification TM Author: R. Mueller """ - from tm.OBSW_PusTm import PUSTelemetry +from typing import Dict import struct +pusPacketInfoService1T = Dict[str, any] + class Service1TM(PUSTelemetry): def __init__(self, byteArray): @@ -66,7 +68,7 @@ class Service1TM(PUSTelemetry): elif self.isStep: array.append("Step Number") - def packTmInformation(self): + def packTmInformation(self) -> pusPacketInfoService1T: tmInformation = super().packTmInformation() addInformation = { "tcPacketId": self.tcPacketId, -- GitLab