diff --git a/OBSW_Config.py b/OBSW_Config.py index fa099860a4dfc2e053cb8f7c2e748a0afcc7a656..24b7e34200ff8d78edaca0d3394ae03880dc2927 100644 --- a/OBSW_Config.py +++ b/OBSW_Config.py @@ -19,6 +19,8 @@ displayMode = "long" comIF = 0 +# COM Port for serial communication +comPort = 'COM0' # Time related tmTimeout = 10 tcSendTimeoutFactor = 2.0 diff --git a/OBSW_UdpClient.py b/OBSW_UdpClient.py index 67fe5a60f62fe8b2a64aa1f635de4f1cc4859a89..0c650aba05ce14e664df8cd7a93edd1fe6c2d477 100644 --- a/OBSW_UdpClient.py +++ b/OBSW_UdpClient.py @@ -96,7 +96,7 @@ def main(): communicationInterface = EthernetComIF(tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor, g.sockSend, g.sockReceive, g.sendAddress) else: - comPort = 'COM9' + comPort = g.comPort baudRate = 115200 communicationInterface = SerialComIF(tmtcPrinter, comPort, baudRate, 0.007) connectToBoard() @@ -126,6 +126,7 @@ def main(): SenderAndReceiver.sendQueueTcAndReceiveTmSequentially() elif g.modeId == "OBSWUnitTest": + communicationInterface.serial.close() # Set up test suite and run it with runner # Verbosity specifies detail level suite = unittest.TestLoader().loadTestsFromModule(OBSW_UnitTest) @@ -156,17 +157,22 @@ def parseInputArguments(): argParser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') argParser.add_argument('--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38') argParser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) - argParser.add_argument('-t', '--tmTimeout', type=float, help='TM Timeout. Default: 10)', default=10.0) + argParser.add_argument('-t', '--tmTimeout', type=float, help='TM Timeout. Default: 6, 3(Serial)', default=6.0) argParser.add_argument('-p', '--printFile', help='Supply -p to print output to file. Default: False', action='store_true') - argParser.add_argument('-o', '--tcTimeoutFactor', type=float, help='TC Timeout Factor. Default: 2.0', default=2.0) + argParser.add_argument('-o', '--tcTimeoutFactor', type=float, help='TC Timeout Factor. Default: 3.5', default=3.5) argParser.add_argument('-r', '--rawDataPrint', help='Supply -r to print all raw data directly', action='store_true') argParser.add_argument('-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') argParser.add_argument('-k', '--hk', help='Supply -k or --hk to print HK data', action='store_true') + argParser.add_argument('--COM', help='COM Port for serial communication', default='COM0') if len(sys.argv) == 1: print("No Input Arguments specified, setting default values.") argParser.print_help() args = argParser.parse_args() + if args.comIF == 1 and not args.COM: + print("No COM port provided, using COM0 !") + if args.comIF == 1 and args.tmTimeout == 6.0: + args.tmTimeout = 3 print(args) return args @@ -200,6 +206,7 @@ def setGlobals(args): g.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) g.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) g.modeId = modeId + g.comPort = args.COM g.printHkData = args.hk g.tmTimeout = args.tmTimeout g.printRawTmData = args.rawDataPrint diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py index b862bac56c1107fabeec33da9e01d326c791771c..7f88b4befc7050347fbf4f8eb9ab7b46960dd771 100644 --- a/comIF/OBSW_Serial_ComIF.py +++ b/comIF/OBSW_Serial_ComIF.py @@ -9,10 +9,8 @@ from tm.OBSW_TmPacket import PUSTelemetryFactory class SerialComIF(CommunicationInterface): def __init__(self, tmtcPrinter, comPort, baudRate, serialTimeout): super().__init__(tmtcPrinter) - self.comPort = comPort - self.baudRate = baudRate self.tmtcPrinter = tmtcPrinter - self.serial = serial.Serial(port=comPort, baudrate=self.baudRate, timeout=serialTimeout) + self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) # self.serial.open() def sendTelecommand(self, tcPacket, tcPacketInfo=""): diff --git a/sendreceive/OBSW_CommandSenderReceiver.py b/sendreceive/OBSW_CommandSenderReceiver.py index 8352a3b771dc0dba598b8c566cffeff00d21d6b4..e4da2682da9a3c1f8868ff74cee5daa5bc72a426 100644 --- a/sendreceive/OBSW_CommandSenderReceiver.py +++ b/sendreceive/OBSW_CommandSenderReceiver.py @@ -76,7 +76,7 @@ class CommandSenderReceiver: self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) self.timeoutCounter = self.timeoutCounter + 1 self.start_time = time.time() - time.sleep(3) + time.sleep(1) # check for one sequence of replies for a telecommand (e.g. TM[1,1] , TM[1,7] ...) # returns True on success diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index 7e4f38299ccca385675e73cfb38cb6d718d68967..68ac322660b7ee1dc4fa97d8594c24bded9f6398 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -53,6 +53,8 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): def sendAllQueue(self): while not self.tcQueue.empty(): + if g.comIF == 1: + time.sleep(0.01) # pause could be smaller, but causes overrun error on flashed board self.sendAndPrintTc() def sendAndPrintTc(self): @@ -74,8 +76,8 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): super().checkForMultipleReplies() def handleReplyListening(self): - self.tmReady = select.select([g.sockReceive], [], [], 2.0) - if self.tmReady[0]: + tmReady = self.comInterface.dataAvailable(2.0) + if tmReady: self.receiveTelemetryAndStoreInformation() if self.tcQueue.empty(): print("TC queue empty. Listening for a few more seconds ... ") @@ -92,8 +94,8 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): elapsed_time_seconds = 0 while elapsed_time_seconds < self.tmTimeout: elapsed_time_seconds = time.time() - start_time - self.tmReady = select.select([g.sockReceive], [], [], 2.0) - if self.tmReady[0]: + tmReady = self.comInterface.dataAvailable(2.0) + if tmReady: self.receiveTelemetryAndStoreInformation() diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py index d160c41081cd6de61d6a61a4228f92f8b1aedd08..8e326efc064e72fa7cf7434e69e1ca6b77ae5c27 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -29,7 +29,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): def sendQueueTcAndReceiveTmSequentially(self): receiverThread = threading.Thread(target=self.checkForMultipleReplies) receiverThread.start() - time.sleep(1) + time.sleep(0.5) self.sendAndReceiveFirstPacket() # this flag is set in the separate thread ! try: @@ -93,7 +93,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): def handleReplyListening(self): if self.firstReplyReceived: - tmReady = self.comInterface.dataAvailable(2.0) + tmReady = self.comInterface.dataAvailable(1.0) if tmReady: self.handleTelemetrySequence() else: diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 68f7fb51b719ea27d7a7490209c2889f697e7f52..bb698d7fe4997b68018fe589bd647e152b5d4d27 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -22,13 +22,17 @@ to be specified in the analyseTcInfo method of the child test. @author: R. Mueller """ +import time import unittest import queue + + from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packDummyDeviceTestInto from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver from OBSW_UdpClient import connectToBoard from utility.OBSW_TmTcPrinter import TmtcPrinter from comIF.OBSW_Ethernet_ComIF import EthernetComIF +from comIF.OBSW_Serial_ComIF import SerialComIF import OBSW_Config as g @@ -53,9 +57,13 @@ class TestService(unittest.TestCase): cls.tcTimeoutFactor = 3.0 cls.printFile = True cls.tmtcPrinter = TmtcPrinter(cls.displayMode, cls.printFile, True) - cls.communicationInterface = EthernetComIF(cls.tmtcPrinter, cls.tmTimeout, cls.tcTimeoutFactor, g.sockSend, - g.sockReceive, g.sendAddress) - connectToBoard() + if g.comIF == 0: + cls.communicationInterface = EthernetComIF(cls.tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor, + g.sockSend, g.sockReceive, g.sendAddress) + else: + baudRate = 115200 + cls.communicationInterface = SerialComIF(cls.tmtcPrinter, g.comPort, baudRate, 0.004) + # connectToBoard() def performTestingAndGenerateAssertionDict(self): UnitTester = MultipleCommandSenderReceiver(self.communicationInterface, self.tmtcPrinter, self.testQueue, @@ -158,6 +166,8 @@ class TestService(unittest.TestCase): cls.eventCounter = 0 cls.tcVerifyCounter = 0 cls.testQueue.queue.clear() + if g.comIF == 1: + cls.communicationInterface.serial.close() def analyseTmInfo(self, tmInfoQueue, assertionDict): pass