Something went wrong on our end
Forked from an inaccessible project.
-
Robin Mueller authoredRobin Mueller authored
OBSW_MultipleCommandsSenderReceiver.py 5.34 KiB
"""
@file
OBSW_Config.py
@date
01.11.2019
@brief
Used to send multiple TCs as bursts and listen for replies simultaneously. Used by UnitTester
"""
import time
from typing import Union, Deque
from collections import deque
from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver
from comIF.OBSW_ComInterface import ComIF_T
from utility.OBSW_TmTcPrinter import TmTcPrinterT
from sendreceive.OBSW_TmListener import TmListenerT
class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver):
"""
Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with
wait time between the send bursts. This is generally done in the separate test classes in UnitTest
"""
def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, tmListener: TmListenerT,
tmTimeout: float, waitIntervals: list, waitTime: Union[float, list], printTm: bool,
tcTimeoutFactor: float, doPrintToFile: bool):
"""
TCs are sent in burst when applicable. Wait intervals can be specified by supplying respective arguments
:param comInterface:
:param tmtcPrinter:
:param tcQueue:
:param tmTimeout:
:param waitIntervals: List of pause intervals. For example [1,3] means that a waitTime is applied after
sendinf the first and the third telecommand
:param waitTime: List of wait times or uniform wait time as float
:param printTm:
:param tcTimeoutFactor:
:param doPrintToFile:
"""
super().__init__(comInterface=comInterface, tmtcPrinter=tmtcPrinter, tmListener=tmListener,tmTimeout=tmTimeout,
tcQueue=tcQueue,tcTimeoutFactor=tcTimeoutFactor, doPrintToFile=doPrintToFile)
self.waitIntervals = waitIntervals
self.waitTime = waitTime
self.printTm = printTm
self.tmInfoQueue = deque()
self.tcInfoQueue = deque()
self.pusPacketInfo = []
self.pusPacket = []
self.waitCounter = 0
def sendTcQueueAndReturnTcInfo(self):
# receiverThread = threading.Thread(target=self.checkForMultipleReplies)
# receiverThread.start()
# time.sleep(0.5)
try:
self.sendAllQueue()
self.handleLastRepliesListening(self.tmTimeout/2.0)
self._tmListener.modeOpFinished.set()
self.tmInfoQueue = self._tmListener.retrieveTmInfoQueue()
# self.handleTcResending() Turned off for now, not needed
if self.doPrintToFile:
self._tmtcPrinter.printToFile()
except (KeyboardInterrupt, SystemExit):
print("Keyboard Interrupt or System Exit detected")
exit()
return self.tcInfoQueue, self.tmInfoQueue
def handleTcResending(self):
while not self.allRepliesReceived:
if self.tcQueue.empty():
if self.start_time == 0:
self.start_time = time.time()
self.checkForTimeout()
def sendAllQueue(self):
while not self.tcQueue.empty():
self.sendAndPrintTc()
def sendAndPrintTc(self):
self.checkQueueEntry(self.tcQueue.pop())
if self.queueEntryIsTelecommand:
self.tcInfoQueue.append(self.pusPacketInfo)
self._comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo)
self.handleWaiting()
def handleWaiting(self):
self.waitCounter = self.waitCounter + 1
if self.waitCounter in self.waitIntervals:
if isinstance(self.waitTime, list):
time.sleep(self.waitTime[self.waitIntervals.index(self.waitCounter)])
else:
time.sleep(self.waitTime)
def retrieveTmInfoQueue(self):
if self._tmListener.replyEvent.is_set():
return self._tmListener.tmInfoQueue
else:
print("Multiple Command SenderReceiver: Configuration error, reply event not set in TM listener")
@staticmethod
def handleLastRepliesListening(waitTime: float):
elapsed_time_seconds = 0
start_time = time.time()
while elapsed_time_seconds < waitTime:
elapsed_time_seconds = time.time() - start_time
# def checkForMultipleReplies(self):
# super().checkForMultipleReplies()
# def checkForMultipleReplies(self):
# while not self.allRepliesReceived and self.run_event.is_set():
# # listen for duration timeoutInSeconds for replies
# self.handleReplyListening()
# def handleReplyListening(self):
# tmReady = self.comInterface.dataAvailable(2.0)
# if tmReady:
# self.receiveTelemetryAndStoreInformation()
# if self.tcQueue.empty():
# print("TC queue empty. Listening for a few more seconds ... ")
# start_time = time.time()
# self.handleLastRepliesListening(start_time)
# self.allRepliesReceived = True
# def receiveTelemetryAndStoreInformation(self):
# packetList = self.comInterface.receiveTelemetry()
# for counter in range(0, len(packetList)):
# tmInfo = packetList[counter].packTmInformation()
# self.tmInfoQueue.append(tmInfo)