diff --git a/comIF/OBSW_Ethernet_ComIF.py b/comIF/OBSW_Ethernet_ComIF.py index d4de40ab9ffba23cf8b58ae1b86c33b8341ea383..010c0ab29b6b3eca2a41ae17bf96f5df25f67374 100644 --- a/comIF/OBSW_Ethernet_ComIF.py +++ b/comIF/OBSW_Ethernet_ComIF.py @@ -89,7 +89,7 @@ class EthernetComIF(CommunicationInterface): self.sockReceive.setblocking(False) except OSError: print("Socket already set-up.") - # print("Socket Receive Address: " + str(g.recAddress)) + # print("Socket Receive Address: " + str(g.G_REC_ADDRESS)) # logging.exception("Error: ") # exit() except TypeError: diff --git a/config/OBSW_Config.py b/config/OBSW_Config.py index 622fba063852befa4fa75448bf46b58c1b7e8b91..bf38c78b1601f61f68798952bcc2baeaa3115b61 100644 --- a/config/OBSW_Config.py +++ b/config/OBSW_Config.py @@ -51,28 +51,28 @@ All global variables, set in main program with arg parser # General Settings -scriptMode = 1 -modeId = 0 -service = 17 -displayMode = "long" +G_SCRIPT_MODE = 1 +G_MODE_ID = 0 +G_SERVICE = 17 +G_DISPLAY_MODE = "long" -comIF = 0 +G_COM_IF = 0 # COM Port for serial communication -comPort = 'COM0' -serial_timeout = 0.01 +G_COM_PORT = 'COM0' +G_SERIAL_TIMEOUT = 0.01 # Time related -tmTimeout = 10 -tcSendTimeoutFactor = 2.0 +G_TM_TIMEOUT = 10 +G_TC_SEND_TIMEOUT_FACTOR = 2.0 # Ethernet connection settings -recAddress = 0 -sendAddress = (0, 0) +G_REC_ADDRESS = 0 +G_SEND_ADDRESS = (0, 0) # Print Settings -G_PRINT_TO_FILE = False +G_PRINT_TO_FILE = True G_PRINT_HK_DATA = False -printRawTmData = False +G_PRINT_RAW_TM = False """ These objects are set for the Unit Test, no better solution found yet @@ -84,15 +84,15 @@ tmtcPrinter = None # noinspection PyUnusedLocal def setGlobals(args): - global recAddress, sendAddress, scriptMode, modeId, service, displayMode, comIF, comPort - global serial_timeout, tmTimeout, tcSendTimeoutFactor, G_PRINT_TO_FILE - global G_PRINT_HK_DATA, printRawTmData + global G_REC_ADDRESS, G_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, G_DISPLAY_MODE + global G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, G_TC_SEND_TIMEOUT_FACTOR, G_PRINT_TO_FILE + global G_PRINT_HK_DATA, G_PRINT_RAW_TM if args.mode == 0: print("GUI mode not implemented yet !") if args.shortDisplayMode: - displayMode = "short" + G_DISPLAY_MODE = "short" else: - displayMode = "long" + G_DISPLAY_MODE = "long" # Board IP address and ethernet port IP address can be passed optionally # by passing line parameter In PyCharm: Set parameters in run configuration # Add IP address of Ethernet port. Use command ipconfig in windows console or ifconfig in Linux. @@ -103,41 +103,41 @@ def setGlobals(args): sendAddressToSet = (args.boardIP, portSend) if 0 <= args.mode <= 5: if args.mode == 0: - modeId = ModeList.GUIMode + G_MODE_ID = ModeList.GUIMode elif args.mode == 1: - modeId = ModeList.ListenerMode + G_MODE_ID = ModeList.ListenerMode elif args.mode == 2: - modeId = ModeList.SingleCommandMode + G_MODE_ID = ModeList.SingleCommandMode elif args.mode == 3: - modeId = ModeList.ServiceTestMode + G_MODE_ID = ModeList.ServiceTestMode elif args.mode == 4: - modeId = ModeList.SoftwareTestMode + G_MODE_ID = ModeList.SoftwareTestMode elif args.mode == 5: - modeId = ModeList.UnitTest + G_MODE_ID = ModeList.UnitTest else: print("Invalid mode argument, setting default mode (1)") - modeId = ModeList[1] + G_MODE_ID = ModeList[1] if args.comIF == 0: - comIF = ComIF.Ethernet + G_COM_IF = ComIF.Ethernet elif args.comIF == 1: - comIF = ComIF.Serial + G_COM_IF = ComIF.Serial else: print("Invalid Communication Interface, setting serial (1)") - comIF = ComIF.Serial + G_COM_IF = ComIF.Serial - service = str(args.service) - if service.isdigit(): - service = int(args.service) + G_SERVICE = str(args.service) + if G_SERVICE.isdigit(): + G_SERVICE = int(args.service) else: - service = args.service - recAddress = recAddressToSet - sendAddress = sendAddressToSet - modeId = modeId - comPort = args.COM - G_PRINT_HK_DATA = args.hk - tmTimeout = args.tmTimeout - printRawTmData = args.rawDataPrint - displayMode = displayMode - service = service + G_SERVICE = args.service + G_REC_ADDRESS = recAddressToSet + G_SEND_ADDRESS = sendAddressToSet + G_MODE_ID = G_MODE_ID + G_PRINT_HK_DATA = args.print_hk G_PRINT_TO_FILE = args.printFile + G_PRINT_RAW_TM = args.rawDataPrint + G_SERVICE = G_SERVICE + G_COM_PORT = args.com_port + G_TM_TIMEOUT = args.tm_timeout + diff --git a/gui/OBSW_TmtcGUI.py b/gui/OBSW_TmtcGUI.py index b6684bba5d6993588d1db8a98a2a37fd965015ec..5a55c6b55f9e5242b49549b64402a1a3265eed12 100644 --- a/gui/OBSW_TmtcGUI.py +++ b/gui/OBSW_TmtcGUI.py @@ -27,7 +27,7 @@ import time # include a really nice source badge and make it large ! # plan this on paper first... # Step 1: Huge Mission Badge in Tkinter window because that is cool. -# Step 2: Simple buttons to run service test around the badge. +# Step 2: Simple buttons to run G_SERVICE test around the badge. class TmTcGUI(Process): def __init__(self): super(TmTcGUI, self).__init__() diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index b87a27fa7bae48c8e780924c3ffa4abed8a8fff4..7ec3c6338f3ddb830a0762ae07228347644e2fa5 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -20,13 +20,13 @@ GUI is work-in-progress It might be necessary to set board or PC IP address if using ethernet communication. Default values should work normally though. Use higher timeout value (-t Parameter) for STM32 -Example command to test service 17, +Example command to test G_SERVICE 17, assuming no set client IP (set manually to PC IP Address if necessary) and default board IP 169.254.1.38: obsw_tmtc_client.py -m 3 -s 17 Example to run Unit Test: obsw_tmtc_client.py -m 5 -Example to test service 17 with HK output and serial communication: +Example to test G_SERVICE 17 with HK output and serial communication: obsw_tmtc_client.py -m 3 -s 17 --hk -c 1 Get command line help: obsw_tmtc_client.py -h @@ -36,7 +36,7 @@ There are four different Modes: 1. Listener Mode: Only Listen for incoming TM packets 2. SingleCommandMode: Send Single Command repeatedly until answer is received, only listen after that - 3. ServiceTestMode: Send all Telecommands belonging to a certain service + 3. ServiceTestMode: Send all Telecommands belonging to a certain G_SERVICE and scan for replies for each telecommand. Listen after that 4. SoftwareTestMode: Send all services and perform reply scanning like mode 3. Listen after that @@ -87,8 +87,8 @@ def main(): """ args = parseInputArguments() setGlobals(args) - tmtc_printer = TmTcPrinter(g.displayMode, g.G_PRINT_TO_FILE, True) - if g.modeId == g.ModeList.GUIMode: + tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) + if g.G_MODE_ID == g.ModeList.GUIMode: backend = TmTcBackend() backend.start() gui = TmTcGUI() @@ -101,35 +101,35 @@ def main(): communication_interface = set_communication_interface(tmtc_printer) atexit.register(keyboardInterruptHandler, comInterface=communication_interface) tm_listener = TmListener( - comInterface=communication_interface, tmTimeout=g.tmTimeout, - tcTimeoutFactor=g.tcSendTimeoutFactor) + comInterface=communication_interface, tmTimeout=g.G_TM_TIMEOUT, + tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR) tm_listener.start() - if g.modeId == g.ModeList.ListenerMode: + if g.G_MODE_ID == g.ModeList.ListenerMode: print("Listening for packages...") - elif g.modeId == g.ModeList.SingleCommandMode: + elif g.G_MODE_ID == g.ModeList.SingleCommandMode: pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmListener=tm_listener, - pusPacketTuple=pus_packet_tuple, tmTimeout=g.tmTimeout, - tcTimeoutFactor=g.tcSendTimeoutFactor, doPrintToFile=g.G_PRINT_TO_FILE) + pusPacketTuple=pus_packet_tuple, tmTimeout=g.G_TM_TIMEOUT, + tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, doPrintToFile=g.G_PRINT_TO_FILE) sender_and_receiver.send_single_tc_and_receive_tm() - elif g.modeId == g.ModeList.ServiceTestMode: + elif g.G_MODE_ID == g.ModeList.ServiceTestMode: service_queue = deque() sender_and_receiver = SequentialCommandSenderReceiver( - comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmTimeout=g.tmTimeout, - tcQueue=serviceTestSelect(g.service, service_queue), - tcTimeoutFactor=g.tcSendTimeoutFactor, tmListener=tm_listener) + comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, + tcQueue=serviceTestSelect(g.G_SERVICE, service_queue), + tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, tmListener=tm_listener) sender_and_receiver.sendQueueTcAndReceiveTmSequentially() - elif g.modeId == g.ModeList.SoftwareTestMode: + elif g.G_MODE_ID == g.ModeList.SoftwareTestMode: all_tc_queue = createTotalTcQueue() sender_and_receiver = SequentialCommandSenderReceiver( - comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmTimeout=g.tmTimeout, - tcQueue=all_tc_queue, tcTimeoutFactor=g.tcSendTimeoutFactor, tmListener=tm_listener) + comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, + tcQueue=all_tc_queue, tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, tmListener=tm_listener) sender_and_receiver.sendQueueTcAndReceiveTmSequentially() - elif g.modeId == g.ModeList.UnitTest: + elif g.G_MODE_ID == g.ModeList.UnitTest: # Set up test suite and run it with runner. Verbosity specifies detail level g.tmListener = tm_listener g.comInterface = communication_interface @@ -158,7 +158,7 @@ def command_preparation(): command = PUSTelecommand(service=17, subservice=1, SSC=21) # command.print() # file = bytearray([1, 2, 3, 4, 5]) - # command = PUSTelecommand(service=23, subservice=1, SSC=21, data=file) + # command = PUSTelecommand(G_SERVICE=23, subservice=1, SSC=21, data=file) # command.packCommandTuple() return command.packCommandTuple() @@ -170,18 +170,18 @@ def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIF_T: :return: CommunicationInterface object """ try: - if g.comIF == g.ComIF.Ethernet: + if g.G_COM_IF == g.ComIF.Ethernet: communication_interface = EthernetComIF( - tmtcPrinter=tmtc_printer, tmTimeout=g.tmTimeout, - tcTimeoutFactor=g.tcSendTimeoutFactor, sendAddress=g.sendAddress, - receiveAddress=g.recAddress) + tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, + tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, sendAddress=g.G_SEND_ADDRESS, + receiveAddress=g.G_REC_ADDRESS) else: - com_port = g.comPort + com_port = g.G_COM_PORT baud_rate = 115200 - g.serial_timeout = 0.05 + g.G_SERIAL_TIMEOUT = 0.05 communication_interface = SerialComIF( tmtcPrinter=tmtc_printer, comPort=com_port, baudRate=baud_rate, - serialTimeout=g.serial_timeout) + serialTimeout=g.G_SERIAL_TIMEOUT) # else: # pass return communication_interface diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index 9447a275b301a83a3c9f354b59594ed2722884e9..2b86b4e1a229f20a9d5238ea81b5b5921eb66736 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -22,9 +22,9 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with wait time between the send bursts. This is generally done in the separate test classes in UnitTest """ - def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, tmListener: TmListenerT, - tmTimeout: float, waitIntervals: list, waitTime: Union[float, list], printTm: bool, - tcTimeoutFactor: float, doPrintToFile: bool): + def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, + tmListener: TmListenerT, tmTimeout: float, waitIntervals: list, + waitTime: Union[float, list], printTm: bool, tcTimeoutFactor: float): """ TCs are sent in burst when applicable. Wait intervals can be specified by supplying respective arguments @@ -57,10 +57,11 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self._tm_listener.modeChangeEvent.set() # TC info queue is set in this function self.sendAllQueue() - self.handleLastRepliesListening(self.tm_timeout / 2.0) + self.handleLastRepliesListening(self.tm_timeout / 1.5) self.tmInfoQueue = self.retrieveListenerTmInfoQueue() self._tm_listener.modeOpFinished.set() if g.G_PRINT_TO_FILE: + print("blub") self._tmtc_printer.print_to_file() except (KeyboardInterrupt, SystemExit): print("Keyboard Interrupt or System Exit detected") @@ -80,9 +81,11 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.sendAndPrintTc() def sendAndPrintTc(self): - if self.check_queue_entry(self._tcQueue.pop()): - self.tcInfoQueue.append(self.pusPacketInfo) - self._com_interface.sendTelecommand(self.pusPacket, self.pusPacketInfo) + tc_queue_tuple = self._tcQueue.pop() + if self.check_queue_entry(tc_queue_tuple): + pus_packet, pus_packet_info = tc_queue_tuple + self.tcInfoQueue.append(pus_packet_info) + self._com_interface.sendTelecommand(pus_packet, pus_packet_info) self.handleWaiting() def handleWaiting(self): diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py index 4c2ae27e0beb75717fe1ae0f9a57f875ec686ab8..ef73d6bc2f6a1d206cc02bab973bd56a79945e27 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -29,7 +29,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): :param tmListener: TmListener object which runs in the background and receives all Telemetry :param tmtcPrinter: TmTcPrinter object, passed on to CommandSenderReceiver :param tmTimeout: TM Timeout. After each sent telecommand, listen for TMs for this time period - :param tcTimeoutFactor: If TM is not received, resend TC after tmTimeout * tcSendTimeoutFactor + :param tcTimeoutFactor: If TM is not received, resend TC after G_TM_TIMEOUT * G_TC_SEND_TIMEOUT_FACTOR :param doPrintToFile: Specify whether output is also printed to a file """ super().__init__( diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py index 453caaf629f3ab887c8e1293554493e837086bda..36fcb7ad84ce26f9e65bedd764dbc58c72eb9b32 100644 --- a/sendreceive/obsw_command_sender_receiver.py +++ b/sendreceive/obsw_command_sender_receiver.py @@ -38,7 +38,7 @@ class CommandSenderReceiver: :param tmTimeout: TM Timeout. After each sent telecommand, listen for TMs for this time period :param tcSendTimeoutFactor: If TM is not received, - resend TC after tmTimeout * tcSendTimeoutFactor + resend TC after G_TM_TIMEOUT * G_TC_SEND_TIMEOUT_FACTOR """ self.tm_timeout = tmTimeout self.tc_send_timeout_factor = tcSendTimeoutFactor diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index 27be3c18ce50c958fa70e856d60ddff168043a11..238f0d35f58af6fb52960e4659facb686cfe5f7b 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -31,7 +31,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): :param tmTimeout: TM Timeout. After each sent telecommand, listen for TMs for this time period :param tcTimeoutFactor: If TM is not received, - resend TC after tmTimeout * tcSendTimeoutFactor + resend TC after G_TM_TIMEOUT * G_TC_SEND_TIMEOUT_FACTOR :param doPrintToFile: Specify whether output is also printed to a file """ super().__init__(comInterface=comInterface, tmListener=tmListener, tmtcPrinter=tmtcPrinter, diff --git a/sendreceive/obsw_tm_listener.py b/sendreceive/obsw_tm_listener.py index 0df22c0b7167ba7dd7fc276455276a62f662cb85..0155d7fed23a9bbea3d549da2f79f76ff61cb15c 100644 --- a/sendreceive/obsw_tm_listener.py +++ b/sendreceive/obsw_tm_listener.py @@ -123,6 +123,6 @@ class TmListener: elapsed_time = time.time() - start_time # the timeout value can be set by special TC queue entries if packet handling # takes longer, but it is reset here to the global value - if self.tmTimeout is not g.tmTimeout: - self.tmTimeout = g.tmTimeout + if self.tmTimeout is not g.G_TM_TIMEOUT: + self.tmTimeout = g.G_TM_TIMEOUT return True diff --git a/tc/OBSW_TcPacker.py b/tc/OBSW_TcPacker.py index fbab63c29f8ec46d0056042edc64955b63cf1eff..2bd476521d4ac6df437d5e48a905598aad985f6b 100644 --- a/tc/OBSW_TcPacker.py +++ b/tc/OBSW_TcPacker.py @@ -2,11 +2,11 @@ """ Program: OBSW_UnitTest.py Date: 01.11.2019 -Description: Packs the TC queue for specific service or device testing +Description: Packs the TC queue for specific G_SERVICE or device testing Manual: -Contains a service select factory which returns specific service or device tests. -Also contains an all service and all device tc queue packer. +Contains a G_SERVICE select factory which returns specific G_SERVICE or device tests. +Also contains an all G_SERVICE and all device tc queue packer. Run script with -s <Service or Device> -m 3 with optional -p parameter for file output inside the log folder @author: R. Mueller diff --git a/tc/OBSW_TcPacket.py b/tc/OBSW_TcPacket.py index fad67cbfa47a95144ae3cdb3a4e89406262934c4..9bd5e21aca0553c17651effb1c957614e4d3b209 100644 --- a/tc/OBSW_TcPacket.py +++ b/tc/OBSW_TcPacket.py @@ -70,7 +70,7 @@ class PUSTelecommand: def packInformation(self) -> pusTcInfoT: tcInformation = { - "service": self.service, + "G_SERVICE": self.service, "subservice": self.subservice, "ssc": self.ssc, "packetId": self.packetId, diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py index f0ca1543af99eed73efeb6151873d6274ffedef9..4b8d629e02e0e15d2d8410b2e161ede60b1ceab4 100644 --- a/tc/OBSW_TcService2.py +++ b/tc/OBSW_TcService2.py @@ -17,7 +17,7 @@ def packService2TestInto(tcQueue: Deque, calledExternally: bool = False) -> Dequ if calledExternally is False: tcQueue.appendleft(("print", "Testing Service 2")) objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device - # don't forget to set object mode raw (service 200) before calling this ! + # don't forget to set object mode raw (G_SERVICE 200) before calling this ! # Set Raw Mode tcQueue.appendleft(("print", "\r\nTesting Service 2: Setting Raw Mode")) modeData = packModeData(objectId, 3, 0) diff --git a/tc/OBSW_TcService200.py b/tc/OBSW_TcService200.py index 3abc8e9634485e1df489c6ef34616409594b6194..31105dbf455a85f22e1dde942e16f5410306b623 100644 --- a/tc/OBSW_TcService200.py +++ b/tc/OBSW_TcService200.py @@ -5,8 +5,8 @@ Date: 01.11.2019 Description: PUS Custom Service 200: Mode Commanding Manual: -Contains a service select factory which returns specific service or device tests. -Also contains an all service and all device tc queue packer. +Contains a G_SERVICE select factory which returns specific G_SERVICE or device tests. +Also contains an all G_SERVICE and all device tc queue packer. Run script with -s <Service or Device> -m 3 with optional -p parameter for file output inside the log folder @author: R. Mueller diff --git a/test/OBSW_PusServiceTest.py b/test/OBSW_PusServiceTest.py index b73404fd657d0751243e8b5b1ecf88c7fa0b000b..6770b1587292761407ef1be060172b29ccbbbf05 100644 --- a/test/OBSW_PusServiceTest.py +++ b/test/OBSW_PusServiceTest.py @@ -12,7 +12,7 @@ class TestService2(TestService): print("Testing Service 2") # all commands must be sent sequentially, not as a burst cls.waitIntervals = [1, 2, 3, 4] - cls.waitTime = [2, 2, 2, 2] + cls.waitTime = [2, 2, 2, 3] packService2TestInto(cls.testQueue) def test_Service2(self): @@ -25,20 +25,20 @@ class TestService2(TestService): while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass - if currentTmInfo["service"] == 1: + if currentTmInfo["G_SERVICE"] == 1: super().scanForRespectiveTc(currentTmInfo) # Here, the desired event Id or RID can be specified - if currentTmInfo["service"] == 5: + if currentTmInfo["G_SERVICE"] == 5: # mode change if currentTmInfo["EventID"] == 7401 and currentTmInfo["RID"] == 0x4400affe: self.eventCounter = self.eventCounter + 1 - if currentTmInfo["service"] == 200 and currentTmInfo["subservice"] == 6: + if currentTmInfo["G_SERVICE"] == 200 and currentTmInfo["subservice"] == 6: # mode change confirmation self.miscCounter = self.miscCounter + 1 # wiretapping messages - elif currentTmInfo["service"] == 2 and currentTmInfo["subservice"] == 130: + elif currentTmInfo["G_SERVICE"] == 2 and currentTmInfo["subservice"] == 130: self.miscCounter = self.miscCounter + 1 - elif currentTmInfo["service"] == 2 and currentTmInfo["subservice"] == 131: + elif currentTmInfo["G_SERVICE"] == 2 and currentTmInfo["subservice"] == 131: self.miscCounter = self.miscCounter + 1 if currentTmInfo["valid"] == 0: self.valid = False @@ -96,7 +96,7 @@ class TestService17(TestService): print("Testing Service 17") cls.waitIntervals = [2] cls.waitTime = 2 - cls.tmTimeout = g.tmTimeout + cls.tmTimeout = g.G_TM_TIMEOUT packService17TestInto(cls.testQueue) def test_Service17(self): diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 800559c78b3014f1d87c3fc6b5391521a524012a..40e8aab624e3453f63eeb9dc241e56627fd8ae45 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -13,7 +13,7 @@ For Developers: TestService is the template method, analyseTmInfo and analyseTcInfo are implemented for specific services or devices by setting up assertionDict entries which count received packets based on subservice or entry values. There is a default implementation provided for analyseTcInfo. -The log files generated by individual service test or the software test mode when providing +The log files generated by individual G_SERVICE test or the software test mode when providing the -p flag are very useful for extending the unit tester. To check the dictionary keys of the telecommand and telemetry information queue entries, go look up the packTmInformation() and packTcInformation() implementations in the TM and TC folder. @@ -45,7 +45,7 @@ TmInfoQueueService1T = Deque[pusPacketInfoService1T] class TestService(unittest.TestCase): """ - Generic service test class. + Generic G_SERVICE test class. See documentation: https://docs.python.org/3/library/unittest.html#unittest.TestCase.setUpClass """ @classmethod @@ -67,8 +67,8 @@ class TestService(unittest.TestCase): cls.testQueue = deque() # Extremely ugly solution so that we can use the Python Unit Test Framework # default timeout for receiving TM, set in subclass manually - cls.tmTimeout = g.tmTimeout - cls.tcTimeoutFactor = g.tcSendTimeoutFactor + cls.tmTimeout = g.G_TM_TIMEOUT + cls.tcTimeoutFactor = g.G_TC_SEND_TIMEOUT_FACTOR cls.printFile = g.G_PRINT_TO_FILE cls.tmtcPrinter = g.tmtcPrinter cls.tmListener = g.tmListener @@ -84,7 +84,7 @@ class TestService(unittest.TestCase): comInterface=self.communicationInterface, tmtcPrinter=self.tmtcPrinter, tmListener=self.tmListener, tcQueue=self.testQueue, tmTimeout=self.tmTimeout, waitIntervals=self.waitIntervals, waitTime=self.waitTime, printTm=self.printTm, - tcTimeoutFactor=self.tcTimeoutFactor, doPrintToFile=self.printFile) + tcTimeoutFactor=self.tcTimeoutFactor) (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnInfo() assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) return assertionDict @@ -134,7 +134,7 @@ class TestService(unittest.TestCase): # For commands with multiple steps, update this value manually ! self.tcVerifyStepCounter = self.tcVerifyCounter + 1 self.tcSscArray.append(currentTcInfo["ssc"]) - self.tcServiceArray.append(currentTcInfo["service"]) + self.tcServiceArray.append(currentTcInfo["G_SERVICE"]) self.tcSubserviceArray.append(currentTcInfo["subservice"]) # must be implemented by child test ! @@ -178,13 +178,13 @@ class TestService(unittest.TestCase): while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass - if currentTmInfo["service"] == 1: + if currentTmInfo["G_SERVICE"] == 1: self.scanForRespectiveTc(currentTmInfo) # Here, the desired event Id or RID can be specified - if currentTmInfo["service"] == 5: + if currentTmInfo["G_SERVICE"] == 5: if currentTmInfo["EventID"] == 8200 and currentTmInfo["RID"] == 0x51001700: self.eventCounter = self.eventCounter + 1 - if currentTmInfo["service"] == 17: + if currentTmInfo["G_SERVICE"] == 17: self.miscCounter = self.miscCounter + 1 if currentTmInfo["valid"] == 0: self.valid = False diff --git a/tm/OBSW_PusTm.py b/tm/OBSW_PusTm.py index 5cc6c574adaff48564e691ee868ca57bbaa264f0..cc82f7dac0bc76a4cacb66cdc845c43c5ef79348 100644 --- a/tm/OBSW_PusTm.py +++ b/tm/OBSW_PusTm.py @@ -34,7 +34,7 @@ class PUSTelemetry(OBSWPusPacket): def packTmInformation(self) -> pusTmInfoT: tmInformation = { - "service": self.getService(), + "G_SERVICE": self.getService(), "subservice": self.getSubservice(), "ssc": self.getSSC(), "data": self.byteArrayData, diff --git a/tm/OBSW_TmPacket.py b/tm/OBSW_TmPacket.py index a263d11d0fdb6cdf4a2d78bd57f0b36ba6f44f8e..e05f52e476ed6ae3d5c5150940fc909138aa026e 100644 --- a/tm/OBSW_TmPacket.py +++ b/tm/OBSW_TmPacket.py @@ -15,7 +15,7 @@ import struct def PUSTelemetryFactory(rawPacket): servicetype = rawPacket[7] - # extract service type from data + # extract G_SERVICE type from data if servicetype == 1: return Service1TM(rawPacket) elif servicetype == 2: @@ -31,7 +31,7 @@ def PUSTelemetryFactory(rawPacket): elif servicetype == 200: return Service200TM(rawPacket) else: - print("The service " + str(servicetype) + " is not implemented in Telemetry Factory") + print("The G_SERVICE " + str(servicetype) + " is not implemented in Telemetry Factory") return PUSTelemetry(rawPacket) diff --git a/utility/OBSW_ArgParser.py b/utility/OBSW_ArgParser.py index dba2a699879b81e13621ae95986668b62127181b..24c7700e2249e13aa2917d80513f0b5c9861827f 100644 --- a/utility/OBSW_ArgParser.py +++ b/utility/OBSW_ArgParser.py @@ -19,37 +19,51 @@ def parseInputArguments(): """ argParser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") - argParser.add_argument('-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), ' - '0: GUI Mode, 1:Listener Mode, ' - '2: Single Command Mode, 3: Service Test Mode, ' - '4: Software Test Mode, 5: Unit Test Mode ', default=0) - argParser.add_argument('-c', '--comIF', type=int, help='Communication Interface. 0 for Ethernet, 1 for Serial', - default=0) + argParser.add_argument( + '-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), ' + '0: GUI Mode, 1:Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' + '4: Software Test Mode, 5: Unit Test Mode ', default=0) + argParser.add_argument( + '-c', '--comIF', type=int, help='Communication Interface. 0 for Ethernet, 1 for Serial', + default=0) argParser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') - argParser.add_argument('--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38') + argParser.add_argument( + '--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38') argParser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) - argParser.add_argument('-t', '--tmTimeout', type=float, help='TM Timeout when listening to verification sequence.' - ' Default: 12, 6(Serial)', default=6.0) - argParser.add_argument('-p', '--printFile', help='Supply -p to print output to file. Default: False', - action='store_true') + argParser.add_argument('-t', '--tm_timeout', type=float, help='TM Timeout when listening to ' + 'verification sequence. Default: 12, 6(Serial)', default=6.0) + argParser.add_argument('--np', dest='printFile', help='Supply --np to suppress print ' + 'output to file.', action='store_false') argParser.add_argument('-o', '--tcTimeoutFactor', type=float, help='TC Timeout Factor. Multiplied with TM Timeout, ' 'TC sent again after this time period. Default: 3.5', default=3.5) - argParser.add_argument('-r', '--rawDataPrint', help='Supply -r to print all raw data directly', action='store_true') - argParser.add_argument('-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') - argParser.add_argument('-k', '--hk', help='Supply -k or --hk to print HK data', action='store_true') - argParser.add_argument('--COM', help='COM Port for serial communication') + argParser.add_argument('-r', '--rawDataPrint', help='Supply -r to print all raw data directly', + action='store_true') + argParser.add_argument('-d', '--shortDisplayMode', help='Supply -d to print short output', + action='store_true') + argParser.add_argument('--hk', dest='print_hk', help='Supply -k or --hk to print HK data', + action='store_true') + argParser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') if len(sys.argv) == 1: print("No Input Arguments specified.") argParser.print_help() - args = argParser.parse_args() - assignEmptyArgs(args) + args, unknown = argParser.parse_known_args() print(args) + handle_args(args, unknown) return args -def assignEmptyArgs(args): +def handle_args(args, unknown: list): + """ + Handles the parsed arguments. + :param args: Namespace objects + (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) + :param unknown: List of unknown parameters. + :return: + """ + if len(unknown) > 0: + print("Unknown arguments detected: " + str(unknown)) if len(sys.argv) > 1: handleUnspecifiedArgs(args) if len(sys.argv) == 1: @@ -57,10 +71,11 @@ def assignEmptyArgs(args): def handleUnspecifiedArgs(args): - if args.comIF == 1 and args.tmTimeout is None: + if args.comIF == 1 and args.tm_timeout is None: args.tmTimeout = 6.0 - if args.comIF == 1 and args.COM is None: - args.COM = input("Serial Commuinication specified without COM port. Please enter COM Port: ") + if args.comIF == 1 and args.com_port is None: + args.com_port = input("Serial Commuinication specified without COM port. " + "Please enter COM Port: ") if args.mode is None: print("No mode specified with -m Parameter.") print("Possible Modes: ") @@ -72,7 +87,7 @@ def handleUnspecifiedArgs(args): args.mode = input("Please enter Mode: ") if args.mode == 1 and args.service is None: args.service = input("No Service specified for Service Mode. " - "Please enter PUS service number: ") + "Please enter PUS G_SERVICE number: ") def handleEmptyArgs(args): @@ -82,15 +97,15 @@ def handleEmptyArgs(args): except TypeError: pass if printHk in ('y', 'yes', 1): - args.hk = True + args.print_hk = True else: - args.hk = False - printToLog = input("Export service test output to log files ? (y/n or yes/no)") + args.print_hk = False + printToLog = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") try: printToLog = printToLog.lower() except TypeError: pass - if printToLog == 'y' or printToLog == 'yes' or printHk == 1: - args.printFile = True - else: + if printToLog == 'n' or printToLog == 'no' or printToLog == 0: args.printFile = False + else: + args.printFile = True diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index d3822bc0258bc5d66db381709f13a6641d47be7f..3c65c1a4df3eb94713f325a2d26be724805fb1fa 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -55,7 +55,7 @@ class TmTcPrinter: if packet.getService() == 3 and \ (packet.getSubservice() == 10 or packet.getSubservice() == 12): self.__handle_hk_definition_print(packet) - if g.printRawTmData: + if g.G_PRINT_RAW_TM: self.print_buffer = "TM Data:" + "\n" + self.return_data_string(packet.data) print(self.print_buffer) self.add_print_buffer_to_file_buffer() @@ -275,7 +275,7 @@ class TmTcPrinter: :param tc_packet_info: :return: """ - self.print_buffer = "Sent TC[" + str(tc_packet_info["service"]) + "," + \ + self.print_buffer = "Sent TC[" + str(tc_packet_info["G_SERVICE"]) + "," + \ str(tc_packet_info["subservice"]) + "] " + " with SSC " + \ str(tc_packet_info["ssc"]) print(self.print_buffer) @@ -288,7 +288,7 @@ class TmTcPrinter: :return: """ try: - self.print_buffer = "Telecommand TC[" + str(tc_packet_info["service"]) + "," + \ + self.print_buffer = "Telecommand TC[" + str(tc_packet_info["G_SERVICE"]) + "," + \ str(tc_packet_info["subservice"]) + "] with SSC " + \ str(tc_packet_info["ssc"]) + " sent with data " + \ self.return_data_string(tc_packet_info["data"])