diff --git a/.gitignore b/.gitignore index e79337e8feef5d626089a78b32af08acea338276..83e1518665dce88453afcc1a0d3f025daa293703 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,6 @@ __pycache__ *.ini generators/*.csv -tmtc_log.txt \ No newline at end of file + +*tmtc*.txt +log \ No newline at end of file diff --git a/comIF/OBSW_ComInterface.py b/comIF/OBSW_ComInterface.py new file mode 100644 index 0000000000000000000000000000000000000000..5b3ca257f8ccd8fc5d747551a44abac1794035e5 --- /dev/null +++ b/comIF/OBSW_ComInterface.py @@ -0,0 +1,10 @@ +class CommunicationInterface(): + def __init__(self): + pass + + def sendTelecommand(self, tcPacket): + pass + + def receiveTelemetry(self, tmPacket): + pass + diff --git a/comIF/OBSW_Ethernet_ComIF.py b/comIF/OBSW_Ethernet_ComIF.py new file mode 100644 index 0000000000000000000000000000000000000000..f15ec6e249500f4871dbe87affa9ee327de51985 --- /dev/null +++ b/comIF/OBSW_Ethernet_ComIF.py @@ -0,0 +1,6 @@ +from comIF.OBSW_ComInterface import CommunicationInterface + + +class EthernetComIF(CommunicationInterface): + def __init__(self): + pass diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py new file mode 100644 index 0000000000000000000000000000000000000000..2aaa3690ddf5cb97d6cf1037b57e376080499dac --- /dev/null +++ b/comIF/OBSW_Serial_ComIF.py @@ -0,0 +1,7 @@ +from comIF.OBSW_ComInterface import CommunicationInterface + + +class SerialComIF(CommunicationInterface): + def __init__(self): + pass + diff --git a/comIF/__init__.py b/comIF/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/sendreceive/OBSW_CommandSenderReceiver.py b/sendreceive/OBSW_CommandSenderReceiver.py index b127803f6e121ee241d6db2e4873f8849b0a796d..e9c6844d6cb765c3be3e84b260654968ba94a2c3 100644 --- a/sendreceive/OBSW_CommandSenderReceiver.py +++ b/sendreceive/OBSW_CommandSenderReceiver.py @@ -23,7 +23,7 @@ from tm.OBSW_TmPacket import PUSTelemetryFactory # Generic TMTC SendReceive class which is implemented # by specific implementations (e.g. SingleCommandSenderReceiver) class CommandSenderReceiver: - def __init__(self, displayMode, tmTimeout, tcSendTimeoutFactor, printTc, doPrintToFile): + def __init__(self, comInterface, displayMode, tmTimeout, tcSendTimeoutFactor, printTc, doPrintToFile): self.displayMode = displayMode self.tmTimeout = tmTimeout self.printTc = printTc @@ -59,6 +59,7 @@ class CommandSenderReceiver: self.tmtcPrinter.printTelemetry(packet) self.tmReady = select.select([g.sockReceive], [], [], self.tmTimeout / 1.5) + # Check for special queue entries. def checkQueueEntry(self, tcQueueEntry): (self.pusPacketInfo, self.pusPacket) = tcQueueEntry self.queueEntryIsTelecommand = False @@ -68,6 +69,10 @@ class CommandSenderReceiver: elif self.pusPacketInfo == "print": printString = self.pusPacket self.tmtcPrinter.printString(printString) + elif self.pusPacketInfo == "export": + exportName = self.pusPacket + if self.doPrintToFile: + self.tmtcPrinter.printToFile(exportName, True) else: self.queueEntryIsTelecommand = True diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index 1b0ca479638db99644630120a426844f6b639901..47f28f55ae33fb9ed6f7175123670cc81ce3ae51 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -17,9 +17,9 @@ import select # Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with # wait time between the send bursts. This is generally done in the separate test classes in UnitTest class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): - def __init__(self, displayMode, tcQueue, tmTimeout, waitIntervals, + def __init__(self, comInterface, displayMode, tcQueue, tmTimeout, waitIntervals, waitTime, printTm, printTc, tcTimeoutFactor, doPrintToFile): - super().__init__(displayMode, tmTimeout, tcQueue, tcTimeoutFactor, printTc, doPrintToFile) + super().__init__(displayMode, comInterface, tmTimeout, tcQueue, tcTimeoutFactor, printTc, doPrintToFile) self.waitIntervals = waitIntervals self.waitTime = waitTime self.printTm = printTm @@ -35,7 +35,12 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): time.sleep(1) try: self.sendAllQueue() - self.handleTcResending() + while not self.allRepliesReceived: + time.sleep(1) + # self.handleTcResending() Turned off for now, not needed + if self.doPrintToFile: + print("HELLOOOO !") + self.tmtcPrinter.printToFile() except (KeyboardInterrupt, SystemExit): super().handleInterrupt(receiverThread) return self.tcInfoQueue, self.tmInfoQueue @@ -46,8 +51,6 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): if self.start_time == 0: self.start_time = time.time() self.checkForTimeout() - if self.doPrintToFile: - self.tmtcPrinter.printToFile() def sendAllQueue(self): waitCounter = 0 @@ -75,7 +78,6 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.tmReady = select.select([g.sockReceive], [], [], 2.0) if self.tmReady[0]: self.receiveTelemetryAndStoreInformation() - time.sleep(0.5) if self.tcQueue.empty(): print("TC queue empty. Listening for a few more seconds ... ") start_time = time.time() diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py index 10eef52789d1c73150f24f54218ffbc7460841ab..91135a4a80dcb4d66474e49fc90113429637fa95 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -17,8 +17,8 @@ import select # Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence class SequentialCommandSenderReceiver(CommandSenderReceiver): - def __init__(self, displayMode, tmTimeout, tcQueue, tcTimeoutFactor, printTc, doPrintToFile): - super().__init__(displayMode, tmTimeout, tcTimeoutFactor, printTc, doPrintToFile) + def __init__(self, comInterface, displayMode, tmTimeout, tcQueue, tcTimeoutFactor, printTc, doPrintToFile): + super().__init__(displayMode, comInterface, tmTimeout, tcTimeoutFactor, printTc, doPrintToFile) self.tcQueue = tcQueue self.firstReplyReceived = False self.allRepliesReceived = False @@ -61,6 +61,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): if self.replyReceived: self.start_time = time.time() self.sendNextTelecommand() + self.replyReceived = False # just calculate elapsed time if start time has already been set (= command has been sent) else: self.checkForTimeout() @@ -76,9 +77,12 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self.checkQueueEntry(self.tcQueue.get()) if self.queueEntryIsTelecommand: self.sendTelecommand() - self.replyReceived = False + elif self.tcQueue.empty(): + # Special case: Last queue entry is not a Telecommand + self.allRepliesReceived = True else: - self.sendNextTelecommand() + self.replyReceived = True + self.performNextTcSend() # this function runs is a separate thread and checks for replies. If the first reply has not been received, # it attempts to send the telecommand again. diff --git a/sendreceive/OBSW_SingleCommandSenderReceiver.py b/sendreceive/OBSW_SingleCommandSenderReceiver.py index be0950b0644b9a071357cd33b2e700fc7a26ef4c..4964cb3ca403b0321d1448bb1f7c99742917fb67 100644 --- a/sendreceive/OBSW_SingleCommandSenderReceiver.py +++ b/sendreceive/OBSW_SingleCommandSenderReceiver.py @@ -16,8 +16,8 @@ import time # Specific implementation of CommandSenderReceiver to send a single telecommand class SingleCommandSenderReceiver(CommandSenderReceiver): - def __init__(self,displayMode, pusPacketTuple, tmTimeout, tcTimeoutFactor, doPrintToFile): - super().__init__(displayMode, tmTimeout, tcTimeoutFactor, doPrintToFile) + def __init__(self, comInterface, displayMode, pusPacketTuple, tmTimeout, tcTimeoutFactor, doPrintToFile): + super().__init__(comInterface, displayMode, tmTimeout, tcTimeoutFactor, doPrintToFile) self.pusPacketTuple = pusPacketTuple (self.pusPacketInfo, self.pusPacket) = self.pusPacketTuple diff --git a/tc/OBSW_TcPacker.py b/tc/OBSW_TcPacker.py index a6a8bda55d6bb698199817718d45ac915dc00714..6a11214f25d7cc63ae1d31c2d627704d27f6b681 100644 --- a/tc/OBSW_TcPacker.py +++ b/tc/OBSW_TcPacker.py @@ -1,6 +1,23 @@ # -*- coding: utf-8 -*- +""" +Program: OBSW_UnitTest.py +Date: 01.11.2019 +Description: Packs the TC queue for specific service or device testing + +Manual: +Contains a service select factory which returns specific service or device tests. +Also contains an all service and all device tc queue packer. +Run script with -s <Service or Device> -m 3 with optional -p parameter for file output inside the log folder + +@author: R. Mueller +""" +import os + from tc.OBSW_TcPacket import * -import struct +from tc.OBSW_TcService2 import packService2TestInto +from tc.OBSW_TcService8 import packService8TestInto +from tc.OBSW_TcService200 import packModeData, packService200TestInto + from datetime import datetime import queue @@ -35,56 +52,31 @@ def serviceTestSelect(service, serviceQueue): exit() -def packService2TestInto(tcQueue, builderQueue=0): - objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device - # don't forget to set object mode raw (service 200) before calling this ! - # Set Raw Mode - modeData = packModeData(objectId, 3, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.put(command.packCommandTuple()) - # toggle wiretapping raw - wiretappingToggleData = packWiretappingMode(objectId, 1) - toggleWiretappingOnCommand = PUSTelecommand(service=2, subservice=129, SSC=200, data=wiretappingToggleData) - tcQueue.put(toggleWiretappingOnCommand.packCommandTuple()) - # send raw command, data should be returned via TM[2,130] and TC[2,131] - rawCommand = struct.pack(">I", 666) - rawData = objectId + rawCommand - rawCommand = PUSTelecommand(service=2, subservice=128, SSC=201, data=rawData) - tcQueue.put(rawCommand.packCommandTuple()) - # toggle wiretapping off - wiretappingToggleData = packWiretappingMode(objectId, 0) - toggleWiretappingOffCommand = PUSTelecommand(service=2, subservice=129, SSC=204, data=wiretappingToggleData) - tcQueue.put(toggleWiretappingOffCommand.packCommandTuple()) - # send raw command which should be returned via TM[2,130] - command = PUSTelecommand(service=2, subservice=128, SSC=205, data=rawData) - tcQueue.put(command.packCommandTuple()) - return tcQueue - - -# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW -def packWiretappingMode(objectId, wiretappingMode_): - wiretappingMode = struct.pack(">B", wiretappingMode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 - wiretappingToggleData = objectId + wiretappingMode - return wiretappingToggleData - - def packService5TestInto(tcQueue): + tcQueue.put(("print", "Testing Service 5")) # disable events + tcQueue.put(("print", "\r\nTesting Service 5: Disable event")) command = PUSTelecommand(service=5, subservice=6, SSC=500) tcQueue.put(command.packCommandTuple()) # trigger event + tcQueue.put(("print", "\r\nTesting Service 5: Trigger event")) command = PUSTelecommand(service=17, subservice=128, SSC=510) tcQueue.put(command.packCommandTuple()) # enable event + tcQueue.put(("print", "\r\nTesting Service 5: Enable event")) command = PUSTelecommand(service=5, subservice=5, SSC=520) tcQueue.put(command.packCommandTuple()) # trigger event + tcQueue.put(("print", "\r\nTesting Service 5: Trigger another event")) command = PUSTelecommand(service=17, subservice=128, SSC=530) tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r")) + tcQueue.put(("export", "log/tmtc_log_service5.txt")) return tcQueue def packService9TestInto(tcQueue): + tcQueue.put(("print", "Testing Service 9")) timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' timeTestCurrentTime = datetime.now().isoformat() + "Z" + '\0' @@ -93,124 +85,86 @@ def packService9TestInto(tcQueue): timeTest3 = timeTestCurrentTime.encode('ascii') print("Time Code 1 :" + str(timeTest1)) print("Time Code 2 :" + str(timeTest2)) - print("Time Code 3 :" + str(timeTest3)) + print("Time Code 3 :" + str(timeTest3) + "\r") # time setting + tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode A")) command = PUSTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode B")) command = PUSTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode Current Time")) command = PUSTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) tcQueue.put(command.packCommandTuple()) # TODO: Add other time formats here - return tcQueue - - -def packService8TestInto(tcQueue): - objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) - # set mode normal - modeData = packModeData(objectId, 2, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=3, data=modeData) - tcQueue.put(command.packCommandTuple()) - # Direct command which triggers completion reply - actionId = struct.pack(">I", 666) - directCommand = objectId + actionId - command = PUSTelecommand(service=8, subservice=128, SSC=810, data=directCommand) - tcQueue.put(command.packCommandTuple()) - # Direct command which triggers data reply - actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) - commandParam1 = bytearray([0xBA, 0xB0]) - commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) - directCommand = objectId + actionId + commandParam1 + commandParam2 - command = PUSTelecommand(service=8, subservice=128, SSC=820, data=directCommand) - tcQueue.put(command.packCommandTuple()) - # Direct command which triggers an additional step reply and one completion reply - actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) - directCommand = objectId + actionId - command = PUSTelecommand(service=8, subservice=128, SSC=830, data=directCommand) - tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r")) + tcQueue.put(("export", "log/tmtc_log_service9.txt")) return tcQueue def packService17TestInto(tcQueue): + tcQueue.put(("print", "Testing Service 17")) # ping test - tcQueue.put(("print", "Testing Service 17: Ping Test")) + tcQueue.put(("print", "\n\rTesting Service 17: Ping Test")) command = PUSTelecommand(service=17, subservice=1, SSC=1700) tcQueue.put(command.packCommandTuple()) # enable event - tcQueue.put(("print", "Testing Service 17: Enable Event")) + tcQueue.put(("print", "\n\rTesting Service 17: Enable Event")) command = PUSTelecommand(service=5, subservice=5, SSC=52) tcQueue.put(command.packCommandTuple()) # test event - tcQueue.put(("print", "Testing Service 17: Trigger event")) + tcQueue.put(("print", "\n\rTesting Service 17: Trigger event")) command = PUSTelecommand(service=17, subservice=128, SSC=1701) tcQueue.put(command.packCommandTuple()) - return tcQueue - - -def packService200TestInto(tcQueue): - # Object ID: Dummy Device - objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) - # Set On Mode - modeData = packModeData(objectId, 1, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2000, data=modeData) - tcQueue.put(command.packCommandTuple()) - # Set Normal mode - modeData = packModeData(objectId, 2, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2010, data=modeData) - tcQueue.put(command.packCommandTuple()) - # Set Raw Mode - modeData = packModeData(objectId, 3, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) - tcQueue.put(command.packCommandTuple()) - # Set Off Mode - modeData = packModeData(objectId, 0, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2030, data=modeData) - tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r")) + tcQueue.put(("export", "log/tmtc_log_service17.txt")) return tcQueue def packDummyDeviceTestInto(tcQueue): + tcQueue.put(("print", "Testing Dummy Device")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode + tcQueue.put(("print", "\n\rTesting Service Dummy: Set On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=1, data=modeData) tcQueue.put(command.packCommandTuple()) # Test Service 2 commands - packService2TestInto(tcQueue) + tcQueue.put(("print", "\n\rTesting Service Dummy: Service 2")) + packService2TestInto(tcQueue, True) # Test Service 8 - packService8TestInto(tcQueue) + tcQueue.put(("print", "\n\rTesting Service Dummy: Service 8")) + packService8TestInto(tcQueue, True) + tcQueue.put(("print", "\r")) + tcQueue.put(("export", "log/tmtc_log_service_dummy.txt")) return tcQueue def packGpsTestInto(objectId, tcQueue): + tcQueue.put(("print", "Testing GPS Device")) # Set Mode Off + tcQueue.put(("print", "\n\rTesting GPS: Set Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=11, data=modeData) tcQueue.put(command.packCommandTuple()) # Set Mode On + tcQueue.put(("print", "\n\rTesting GPS: Set On")) modeData = packModeData(objectId, 1, 0) command = PUSTelecommand(service=200, subservice=1, SSC=12, data=modeData) tcQueue.put(command.packCommandTuple()) # pack wait interval until mode is one - tcQueue.put(("wait", 5)) + tcQueue.put(("wait", 3)) # Set Mode Off + tcQueue.put(("print", "\n\rTesting GPS: Set Off")) modeData = packModeData(objectId, 0, 0) command = PUSTelecommand(service=200, subservice=1, SSC=13, data=modeData) tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r")) + tcQueue.put(("export", "log/tmtc_log_service_gps.txt")) return tcQueue -# Mode 0: Off, Mode 1: Mode On, Mode 2: Mode Normal, Mode 3: Mode Raw -def packModeData(objectId, mode_, submode_): - # Normal mode - mode = struct.pack(">I", mode_) - # Submode default - submode = struct.pack('B', submode_) - modeData = objectId + mode + submode - return modeData - - def packErrorTestingInto(tcQueue): # a lot of events command = PUSTelecommand(service=17, subservice=129, SSC=2010) @@ -222,6 +176,8 @@ def packErrorTestingInto(tcQueue): def createTotalTcQueue(): + if not os.path.exists("log"): + os.mkdir("log") tcQueue = queue.Queue() tcQueue = packService2TestInto(tcQueue) tcQueue = packService5TestInto(tcQueue) diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py new file mode 100644 index 0000000000000000000000000000000000000000..5eae374c35ae1ed75308bf1bf7ef4dc9849ef107 --- /dev/null +++ b/tc/OBSW_TcService2.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +""" +Program: OBSW_UnitTest.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Device Access, Native low-level commanding + +@author: R. Mueller +""" + +import struct + +from tc.OBSW_TcPacket import * +from tc.OBSW_TcService200 import packModeData + + +def packService2TestInto(tcQueue, calledExternally=False): + if calledExternally is False: + tcQueue.put(("print", "Testing Service 2")) + objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device + # don't forget to set object mode raw (service 200) before calling this ! + # Set Raw Mode + tcQueue.put(("print", "\r\nTesting Service 2: Setting Raw Mode")) + modeData = packModeData(objectId, 3, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) + tcQueue.put(command.packCommandTuple()) + # toggle wiretapping raw + tcQueue.put(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) + wiretappingToggleData = packWiretappingMode(objectId, 1) + toggleWiretappingOnCommand = PUSTelecommand(service=2, subservice=129, SSC=200, data=wiretappingToggleData) + tcQueue.put(toggleWiretappingOnCommand.packCommandTuple()) + # send raw command, data should be returned via TM[2,130] and TC[2,131] + tcQueue.put(("print", "\r\nTesting Service 2: Sending Raw Command")) + rawCommand = struct.pack(">I", 666) + rawData = objectId + rawCommand + rawCommand = PUSTelecommand(service=2, subservice=128, SSC=201, data=rawData) + tcQueue.put(rawCommand.packCommandTuple()) + # toggle wiretapping off + tcQueue.put(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) + wiretappingToggleData = packWiretappingMode(objectId, 0) + toggleWiretappingOffCommand = PUSTelecommand(service=2, subservice=129, SSC=204, data=wiretappingToggleData) + tcQueue.put(toggleWiretappingOffCommand.packCommandTuple()) + # send raw command which should be returned via TM[2,130] + tcQueue.put(("print", "\r\nTesting Service 2: Send second raw command")) + command = PUSTelecommand(service=2, subservice=128, SSC=205, data=rawData) + tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r")) + if calledExternally is False: + tcQueue.put(("export", "log/tmtc_log_service2.txt")) + return tcQueue + + +# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW +def packWiretappingMode(objectId, wiretappingMode_): + wiretappingMode = struct.pack(">B", wiretappingMode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 + wiretappingToggleData = objectId + wiretappingMode + return wiretappingToggleData + + +def packSpecificService2TestInto(tcQueue, objectIdList, dataList, sscList, printStringList, calledExternally=False): + pass diff --git a/tc/OBSW_TcService200.py b/tc/OBSW_TcService200.py new file mode 100644 index 0000000000000000000000000000000000000000..995fcd3a198e063839ccf24d94bb0c575a52ecc3 --- /dev/null +++ b/tc/OBSW_TcService200.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +""" +Program: OBSW_UnitTest.py +Date: 01.11.2019 +Description: PUS Custom Service 200: Mode Commanding + +Manual: +Contains a service select factory which returns specific service or device tests. +Also contains an all service and all device tc queue packer. +Run script with -s <Service or Device> -m 3 with optional -p parameter for file output inside the log folder + +@author: R. Mueller +""" +from tc.OBSW_TcPacket import * +import struct + + +def packService200TestInto(tcQueue): + # Object ID: Dummy Device + objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) + # Set On Mode + modeData = packModeData(objectId, 1, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2000, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Set Normal mode + modeData = packModeData(objectId, 2, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2010, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Set Raw Mode + modeData = packModeData(objectId, 3, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Set Off Mode + modeData = packModeData(objectId, 0, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2030, data=modeData) + tcQueue.put(command.packCommandTuple()) + tcQueue.put(("export", "log/tmtc_log_service200.txt")) + return tcQueue + + +# Mode 0: Off, Mode 1: Mode On, Mode 2: Mode Normal, Mode 3: Mode Raw +def packModeData(objectId, mode_, submode_): + # Normal mode + mode = struct.pack(">I", mode_) + # Submode default + submode = struct.pack('B', submode_) + modeData = objectId + mode + submode + return modeData diff --git a/tc/OBSW_TcService8.py b/tc/OBSW_TcService8.py new file mode 100644 index 0000000000000000000000000000000000000000..6768e4cda2b2b26166019536773d8105be9836fc --- /dev/null +++ b/tc/OBSW_TcService8.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +""" +Program: OBSW_UnitTest.py +Date: 01.11.2019 +Description: PUS Custom Service 8: Function Management, High-Level Commanding + +@author: R. Mueller +""" + +import struct + +from tc.OBSW_TcPacket import * +from tc.OBSW_TcService200 import packModeData + + +def packService8TestInto(tcQueue, calledExternally=False): + if calledExternally is False: + tcQueue.put(("print", "Testing Service 8")) + objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) + # set mode normal + tcQueue.put(("print", "\r\nTesting Service 8: Set Normal Mode")) + modeData = packModeData(objectId, 2, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=3, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Direct command which triggers completion reply + tcQueue.put(("print", "\r\nTesting Service 8: Trigger Completion Reply")) + actionId = struct.pack(">I", 666) + directCommand = objectId + actionId + command = PUSTelecommand(service=8, subservice=128, SSC=810, data=directCommand) + tcQueue.put(command.packCommandTuple()) + # Direct command which triggers data reply + tcQueue.put(("print", "\r\nTesting Service 8: Trigger Data Reply")) + actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) + commandParam1 = bytearray([0xBA, 0xB0]) + commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) + directCommand = objectId + actionId + commandParam1 + commandParam2 + command = PUSTelecommand(service=8, subservice=128, SSC=820, data=directCommand) + tcQueue.put(command.packCommandTuple()) + # Direct command which triggers an additional step reply and one completion reply + tcQueue.put(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) + actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) + directCommand = objectId + actionId + command = PUSTelecommand(service=8, subservice=128, SSC=830, data=directCommand) + tcQueue.put(command.packCommandTuple()) + tcQueue.put(("print", "\r")) + if calledExternally is False: + tcQueue.put(("export", "log/tmtc_log_service8.txt")) + return tcQueue + + +def packSpecificService8TestInto(tcQueue, objectIdList, actionIdList, dataList, + sscList, printStringList, calledExternally=False): + pass diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index de96f5fa5575d0f2b4e350cf28fc2086b6b64bf7..531a2aa7822d5e7fa4a38c4590c0af0590574063 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -44,12 +44,13 @@ class TestService(unittest.TestCase): cls.waitTime = 5.0 cls.testQueue = queue.Queue() cls.tcTimeoutFactor = 3.0 + cls.printFile = True connectToBoard() def performTestingAndGenerateAssertionDict(self): UnitTester = MultipleCommandSenderReceiver(self.displayMode, self.testQueue, self.tmTimeout, self.waitIntervals, self.waitTime, self.printTm, - self.printTc, self.tcTimeoutFactor, False) + self.printTc, self.tcTimeoutFactor, self.printFile) (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo() assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) return assertionDict diff --git a/utility/OBSW_TmTcPrinter.py b/utility/OBSW_TmTcPrinter.py index b61113d02ceb275f9b63bddc1c267d1ae3a7895a..4c7af73cbd7b3425e1b0a5be671c10f1f02bd577 100644 --- a/utility/OBSW_TmTcPrinter.py +++ b/utility/OBSW_TmTcPrinter.py @@ -115,8 +115,10 @@ class TmtcPrinter: if self.doPrintToFile: self.fileBuffer = self.fileBuffer + self.printBuffer + "\n" - def printToFile(self): - file = open("tmtc_log.txt", 'w') + def printToFile(self, logName="tmtc_log.txt", clearFileBuffer=False): + file = open(logName, 'w') file.write(self.fileBuffer) + if clearFileBuffer: + self.fileBuffer = "" file.close()