diff --git a/OBSW_Config.py b/OBSW_Config.py new file mode 100644 index 0000000000000000000000000000000000000000..7dfe133b9cbf5e65e1cd12f5e6e090cd42d405cf --- /dev/null +++ b/OBSW_Config.py @@ -0,0 +1,33 @@ +""" +@file + OBSW_Config.py +@date + 01.11.2019 +@brief + Global settings for UDP client +""" +import socket + +""" +All global variables, set in main program with arg parser +""" +# General Settings +scriptMode = 1 +modeId = 0 +service = 17 +displayMode = "long" + +# Time related +tmTimeout = 10 +tcSendTimeoutFactor = 2.0 + +# Ethernet connection settings +recAddress = 0 +sendAddress = 0 +sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + +# Print Settings +printToFile = False +printRawTmData = False + diff --git a/OBSW_UdpClient.py b/OBSW_UdpClient.py new file mode 100644 index 0000000000000000000000000000000000000000..5323f9e06f16a7ea40a28beb26b1b26ec16a6a7a --- /dev/null +++ b/OBSW_UdpClient.py @@ -0,0 +1,205 @@ +#!/usr/bin/python3.7 +# -*- coding: utf-8 -*- +""" +@file + OBSW_UdpClient.py +@date + 01.11.2019 +@brief + This client was developed by KSat for the SOURCE project to test the on-board software. + It can be used to to send and receive TMTC packages. + TODO: Decouple communication interface (Serial, Ethernet) from SendReceiveClasses +@manual + Manual installation of crcmod might be needed + 1. Install pip if it is not installed yet + 2. Install crcmod and all other reqiored packages: + Command: pip install crcmod + or use IDE (interpreter settings -> pip in PyCharm) + + The script can be used by specifying command line parameters. Please run this script with the -h flag + or without any command line parameters to display options. GUI is work-in-progress + It might be necessary to set board or PC IP address if using ethernet communication. + Default values should work normally though. + + Example command to test service 17, + assuming no set client IP (set manually to PC IP Address if necessary) and default board IP 169.254.1.38: + OBSW_UdpClient.py -m 3 -s 17 + Example to run Unit Test: + OBSW_UdpClient.py -m 5 + + There are four different Modes: + 0. GUI Mode: Experimental mode, also called if no input parameter are specified + 1. Listener Mode: Only Listen for incoming TM packets + 2. SingleCommandMode: Send Single Command repeatedly until answer is received, + only listen after that + 3. ServiceTestMode: Send all Telecommands belonging to a certain service + and scan for replies for each telecommand. Listen after that + 4. SoftwareTestMode: Send all services and perform reply scanning like mode 3. + Listen after that + 5. Unit Test Mode: Performs a unit test which returns a simple OK or NOT OK. This mode + has the capability to send TCs in bursts, where applicable + + The TC timeout factor is mulitplied with the TM timeout to specifiy + when a TC is sent again, if no reply is received. The timeout factor is higher for the STM32. + + If there are problems receiving packets, use the tool Wireshark to track ethernet communication + for UDP echo packets (requests and response). + If the packets appear, there might be a problematic firewall setting. + Please ensure that python.exe UDP packets are not blocked in advanced firewall settings + and create a rule to allow packets from port 2008. + +@author: + S. Gaisser, J. Meier, R. Mueller +""" + +import sys +import atexit +import queue +import socket +import unittest +import argparse + +from test import OBSW_UnitTest +import OBSW_Config as g +from tc.OBSW_TcPacker import PUSTelecommand, createTotalTcQueue, serviceTestSelect +from sendreceive.OBSW_CommandSenderReceiver import CommandSenderReceiver, connectToBoard +from sendreceive.OBSW_SingleCommandSenderReceiver import SingleCommandSenderReceiver +from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver + +# Mode options, set by args parser +modeList = { + 0: "GUIMode", + 1: "ListenerMode", + 2: "SingleCommandMode", + 3: "ServiceTestMode", + 4: "SoftwareTestMode", + 5: "OBSWUnitTest" +} + + + +def main(): + args = parseInputArguments() + setGlobals(args) + setUpSocket() + print("Attempting to connect") + connectToBoard() + + if g.modeId == "GUIMode": + pass + elif g.modeId == "ListenerMode": + Receiver = CommandSenderReceiver(g.displayMode, g.tmTimeout, g.tcSendTimeoutFactor, g.printToFile) + Receiver.performListenerMode() + + elif g.modeId == "SingleCommandMode": + pusPacketTuple = commandPreparation() + SenderAndReceiver = SingleCommandSenderReceiver(g.displayMode, pusPacketTuple, g.tmTimeout, + g.tcSendTimeoutFactor, g.printToFile) + SenderAndReceiver.sendSingleTcAndReceiveTm() + + elif g.modeId == "ServiceTestMode": + serviceQueue = queue.Queue() + SenderAndReceiver = SequentialCommandSenderReceiver(g.displayMode, g.tmTimeout, + serviceTestSelect(g.service, serviceQueue), + g.tcSendTimeoutFactor, g.printToFile) + SenderAndReceiver.sendQueueTcAndReceiveTmSequentially() + + elif g.modeId == "SoftwareTestMode": + allTcQueue = createTotalTcQueue() + SenderAndReceiver = SequentialCommandSenderReceiver(g.displayMode, g.tmTimeout, + allTcQueue, g.tcSendTimeoutFactor, g.printToFile) + SenderAndReceiver.sendQueueTcAndReceiveTmSequentially() + + elif g.modeId == "OBSWUnitTest": + # Set up test suite and run it with runner + # Verbosity specifies detail level + suite = unittest.TestLoader().loadTestsFromModule(OBSW_UnitTest) + unittest.TextTestRunner(verbosity=2).run(suite) + + else: + print("Unknown Mode, Configuration error !") + exit() + + +# Prepare command for single command testing +def commandPreparation(): + # Single Command Testing + command = PUSTelecommand(service=17, subservice=1, SSC=21) + command.packCommandTuple() + return command.packCommandTuple() + + +def parseInputArguments(): + argParser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") + + argParser.add_argument('-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), ' + '0: GUI Mode, 1:Listener Mode, ' + '2: Single Command Mode, 3: Service Test Mode, ' + '4: Software Test Mode, 5: Unit Test Mode ', default=0) + argParser.add_argument('-c', '--clientIP', help='Client(Computer) IP. Default:\'\'', default='') + argParser.add_argument('-b', '--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38') + argParser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) + argParser.add_argument('-t', '--tmTimeout', type=float, help='TM Timeout. Default: 10)', default=10.0) + argParser.add_argument('-p', '--printFile', help='Supply -p to print output to file. Default: False', + action='store_true') + argParser.add_argument('-o', '--tcTimeoutFactor', type=float, help='TC Timeout Factor. Default: 2.0', default=2.0) + argParser.add_argument('-r', '--rawDataPrint', help='Supply -r to print all raw data directly', action='store_true') + argParser.add_argument('-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') + if len(sys.argv) == 1: + print("No Input Arguments specified, setting default values.") + argParser.print_help() + args = argParser.parse_args() + print(args) + return args + + +def setGlobals(args): + if args.shortDisplayMode: + displayMode = "short" + else: + displayMode = "long" + # Board IP address and ethernet port IP address can be passed optionally + # by passing line parameter In PyCharm: Set parameters in run configuration + # Add IP address of Ethernet port. Use command ipconfig in windows console or ifconfig in Linux. + portReceive = 2008 + recAddress = (args.clientIP, portReceive) + # Static IP of board + portSend = 7 + sendAddress = (args.boardIP, portSend) + if 0 <= args.mode <= 5: + modeId = modeList[args.mode] + else: + print("Invalid mode argument, setting default mode (1)") + modeId = modeList[1] + service = str(args.service) + if service.isdigit(): + service = int(args.service) + else: + service = args.service + g.recAddress = recAddress + g.sendAddress = sendAddress + g.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + g.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + g.modeId = modeId + g.tmTimeout = args.tmTimeout + g.printRawTmData = args.rawDataPrint + g.displayMode = displayMode + g.service = service + g.printToFile = args.printFile + + +def setUpSocket(): + g.sockReceive.bind(g.recAddress) + g.sockReceive.setblocking(False) + + +def keyboardInterruptHandler(): + print("The End !") + disconnect = bytearray([0, 0, 0, 0, 0]) + g.sockSend.sendto(disconnect, g.sendAddress) + + +if __name__ == "__main__": + atexit.register(keyboardInterruptHandler) + main() + diff --git a/gui/OBSW_TmtcGUI.py b/gui/OBSW_TmtcGUI.py new file mode 100644 index 0000000000000000000000000000000000000000..09599a96f96d213b85d7e79c1171c60430b43d7b --- /dev/null +++ b/gui/OBSW_TmtcGUI.py @@ -0,0 +1,28 @@ +#!/usr/bin/python3.7 +# -*- coding: utf-8 -*- +""" +@file + OBSW_TmtcGUI.py +@date + 01.11.2019 +@brief + This is part of the TMTC client developed by the SOURCE project by KSat +@description + GUI Testing for TMTC client +@manual +@author: + R. Mueller +""" +from tkinter import * +import OBSW_Config as g + +# A first simple version has drop down menus to chose all necessary options +# which are normally handled by the args parser. +# when pressing save, all chosen values get passed to the globals file (OBSW_Config) +# To start the program, another button with start needs to be set up. +# A third button to perform a keyboard interrupt should be implemented +# include a really nice source badge and make it large ! +# plan this on paper first... +window = Tk() +window.title("Hallo Welt") +window.mainloop() diff --git a/gui/__init__.py b/gui/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/sendreceive/OBSW_CommandSenderReceiver.py b/sendreceive/OBSW_CommandSenderReceiver.py new file mode 100644 index 0000000000000000000000000000000000000000..7c8259b3abde80d4bbacb47cb314489a0b297130 --- /dev/null +++ b/sendreceive/OBSW_CommandSenderReceiver.py @@ -0,0 +1,115 @@ +""" +Program: OBSW_UnitTest.py +Date: 01.11.2019 +Description: All functions related to TmTc Sending and Receiving, used by UDP client + +Manual: +Set up the UDP client as specified in the header comment and use the unit testing mode + +A separate thread is used to listen for replies and send a new telecommand +if the first reply has not been received. +This is still experimental. + +@author: R. Mueller +""" +import OBSW_Config as g +import select +import time +from utility.OBSW_TmTcPrinter import TmtcPrinter +from tm.OBSW_TmPacket import PUSTelemetryFactory + + +# Generic TMTC SendReceive class which is implemented +# by specific implementations (e.g. SingleCommandSenderReceiver) +class CommandSenderReceiver: + def __init__(self, displayMode, tmTimeout, tcSendTimeoutFactor, doPrintToFile): + self.displayMode = displayMode + # TODO: Make this variable modifiable ! + self.timeoutInSeconds = tmTimeout + self.replyReceived = False + self.tmReady = False + self.pusPacketInfo = [] + self.pusPacket = [] + self.tmtcPrinter = TmtcPrinter(self.displayMode, doPrintToFile) + self.start_time = 0 + self.elapsed_time = 0 + self.timeoutCounter = 0 + self.tcSendTimeoutFactor = tcSendTimeoutFactor + self.doPrintToFile = doPrintToFile + + # checks for replies. if no reply is received, send telecommand again + def checkForFirstReply(self): + self.tmReady = select.select([g.sockReceive], [], [], self.timeoutInSeconds * self.tcSendTimeoutFactor) + if self.tmReady[0]: + self.checkForOneTelemetrySequence() + self.replyReceived = True + else: + if len(self.pusPacket) == 0: + print("No command has been sent yet") + else: + self.sendTelecommand() + + # check for one sequence of replies for a telecommand (e.g. TM[1,1] , TM[1,7] ...) + def checkForOneTelemetrySequence(self): + while self.tmReady[0]: + packet = self.receiveTelemetry() + self.tmtcPrinter.printTelemetry(packet) + self.tmReady = select.select([g.sockReceive], [], [], self.timeoutInSeconds / 1.5) + + def sendTelecommand(self): + self.tmtcPrinter.displaySentCommand(self.pusPacketInfo, self.pusPacket, self.displayMode) + g.sockSend.sendto(self.pusPacket, g.sendAddress) + + def performListenerMode(self): + timeoutInSeconds = 10 + while True: + print("Listening for packages ...") + ready = select.select([g.sockReceive], [], [], timeoutInSeconds) + if ready[0]: + packet = self.receiveTelemetry() + self.tmtcPrinter.printTelemetry(packet) + + def checkForTimeout(self): + if self.timeoutCounter == 5: + print("No response from command !") + exit() + if self.start_time != 0: + self.elapsed_time = time.time() - self.start_time + if self.elapsed_time > self.timeoutInSeconds * self.tcSendTimeoutFactor: + print("Timeout ! Sending Telecommand again") + self.sendTelecommand() + self.timeoutCounter = self.timeoutCounter + 1 + self.start_time = time.time() + time.sleep(3) + + @staticmethod + def receiveTelemetry(): + data = g.sockReceive.recvfrom(1024)[0] + packet = PUSTelemetryFactory(data) + return packet + + @staticmethod + def receiveTelemetryDeserializeAndStore(tmQueue): + data = g.sockReceive.recvfrom(1024)[0] + packet = PUSTelemetryFactory(data) + tmQueue.put(packet) + return tmQueue + + @staticmethod + def receiveTelemetryAndStoreTuple(tmTupleQueue): + data = g.sockReceive.recvfrom(1024)[0] + tmInfo = PUSTelemetryFactory(data).packTmInformation() + tmTuple = (data, tmInfo) + tmTupleQueue.put(tmTuple) + return tmTuple + + +def connectToBoard(): + # Maybe there is a cleaner way to start comm with udp server + # so that it knows the ip address? + test = bytearray([]) + g.sockSend.sendto(test, g.sendAddress) + # send multiple times to get connection if there are problems + + + diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py new file mode 100644 index 0000000000000000000000000000000000000000..55c62d0f03d1240802d73f3e4d7fb6cbc724f973 --- /dev/null +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -0,0 +1,104 @@ +""" +@file + OBSW_Config.py +@date + 01.11.2019 +@brief + Used to send multiple TCs as bursts and listen for replies simultaneously. Used by UnitTester +""" +import OBSW_Config as g +from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver +import queue +import time +import threading +import select + + +# Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with +# wait time between the send bursts. This is generally done in the separate test classes in UnitTest +class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): + def __init__(self, displayMode, tcQueue, tmTimeout, waitIntervals, + waitTime, printTm, printTc, tcTimeoutFactor, doPrintToFile): + super().__init__(displayMode, tmTimeout, tcQueue, tcTimeoutFactor, doPrintToFile) + self.waitIntervals = waitIntervals + self.waitTime = waitTime + self.printTm = printTm + self.printTc = printTc + self.tmInfoQueue = queue.Queue() + self.tcInfoQueue = queue.Queue() + self.pusPacketInfo = [] + self.pusPacket = [] + + def sendTcQueueAndReturnTcInfo(self): + receiverThread = threading.Thread(target=self.checkForMultipleReplies) + receiverThread.start() + self.sendAllQueue() + try: + self.handleTcSending() + except (KeyboardInterrupt, SystemExit): + super().handleInterrupt(receiverThread) + return self.tcInfoQueue, self.tmInfoQueue + + def handleTcSending(self): + while not self.allRepliesReceived: + if self.tcQueue.empty(): + if self.start_time == 0: + print("TC queue empty") + self.start_time = time.time() + self.checkForTimeout() + if self.doPrintToFile: + self.tmtcPrinter.printToFile() + + def sendAllQueue(self): + waitCounter = 0 + while not self.tcQueue.empty(): + self.sendAndPrintTc() + waitCounter = self.handleWaiting(waitCounter) + + def sendAndPrintTc(self): + (self.pusPacketInfo, self.pusPacket) = self.tcQueue.get() + if self.printTc: + self.tmtcPrinter.displaySentCommand(self.pusPacketInfo, self.pusPacket, self.displayMode) + self.tcInfoQueue.put(self.pusPacketInfo) + g.sockSend.sendto(self.pusPacket, g.sendAddress) + + def handleWaiting(self, waitCounter): + time.sleep(0.5) + waitCounter = waitCounter + 1 + if waitCounter in self.waitIntervals: + time.sleep(self.waitTime) + return waitCounter + + def checkForMultipleReplies(self): + super().checkForMultipleReplies() + + def handleReplyListening(self): + super().handleReplyListening() + + def handleTelemetrySequence(self): + if self.run_event.is_set(): + self.checkForOneTelemetrySequence() + if self.tcQueue.empty(): + print("Setting flag all reply received in handleTelemetrySequence") + self.allRepliesReceived = True + + def handleFirstReplyListening(self): + super().handleFirstReplyListening() + + # check for one sequence of replies for a telecommand (e.g. TM[1,1] , TM[1,7] ...) + def checkForOneTelemetrySequence(self): + while self.tmReady[0]: + self.receiveTelemetryAndStoreInformation() + self.tmReady = select.select([g.sockReceive], [], [], self.timeoutInSeconds / 1.5) + print("I checked the sequence") + if self.tcQueue.empty(): + print("After checking, the TC queue is empty") + self.allRepliesReceived = True + + def receiveTelemetryAndStoreInformation(self): + packet = self.receiveTelemetry() + if self.printTm: + self.tmtcPrinter.printTelemetry(packet) + tmInfo = packet.packTmInformation() + self.tmInfoQueue.put(tmInfo) + diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py new file mode 100644 index 0000000000000000000000000000000000000000..0ec83305fbc44d57616c98e29e723afcdb479413 --- /dev/null +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -0,0 +1,112 @@ +#!/usr/bin/python3.7 +# -*- coding: utf-8 -*- +""" +@file + OBSW_Config.py +@date + 01.11.2019 +@brief + Used to send multiple TCs in sequence and listen for replies after each sent tc +""" +from sendreceive.OBSW_CommandSenderReceiver import CommandSenderReceiver, connectToBoard +import threading +import OBSW_Config as g +import time +import select + + +# Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence +class SequentialCommandSenderReceiver(CommandSenderReceiver): + def __init__(self, displayMode, tmTimeout, tcQueue, tcTimeoutFactor, doPrintToFile): + super().__init__(displayMode, tmTimeout, tcTimeoutFactor, doPrintToFile) + self.tcQueue = tcQueue + self.firstReplyReceived = False + self.allRepliesReceived = False + self.start_time = 0 + self.elapsed_time = 0 + self.abortFlag = False + self.run_event = threading.Event() + self.run_event.set() + + def sendQueueTcAndReceiveTmSequentially(self): + receiverThread = threading.Thread(target=self.checkForMultipleReplies) + receiverThread.start() + time.sleep(1) + self.sendAndReceiveFirstPacket() + # this flag is set in the separate thread ! + try: + self.handleTcSending() + except (KeyboardInterrupt, SystemExit): + self.handleInterrupt(receiverThread) + + def handleTcSending(self): + while not self.allRepliesReceived: + while not self.tcQueue.empty(): + self.performNextTcSend() + if self.tcQueue.empty(): + print("TC queue empty") + self.checkForTimeout() + if self.doPrintToFile: + print("Exporting output to log file") + self.tmtcPrinter.printToFile() + self.performListenerMode() + + def handleInterrupt(self, receiverThread): + print("Keyboard Interrupt detected") + self.run_event.clear() + receiverThread.join() + + def performNextTcSend(self): + # this flag is set in the separate receiver thread too + if self.replyReceived: + self.start_time = time.time() + self.sendNextTelecommand() + # just calculate elapsed time if start time has already been set (= command has been sent) + else: + self.checkForTimeout() + + def sendAndReceiveFirstPacket(self): + (self.pusPacketInfo, self.pusPacket) = self.tcQueue.get() + self.sendTelecommand() + + def sendNextTelecommand(self): + (self.pusPacketInfo, self.pusPacket) = self.tcQueue.get() + if self.pusPacketInfo == "wait": + waitTime = self.pusPacket + time.sleep(waitTime) + else: + self.tmtcPrinter.displaySentCommand(self.pusPacketInfo, self.pusPacket, self.displayMode) + self.replyReceived = False + g.sockSend.sendto(self.pusPacket, g.sendAddress) + + # this function runs is a separate thread andchecks for replies. If the first reply has not been received, + # it attempts to send the telecommand again. + # if the tc queue is empty and the last telemetry sequence has been received, + # a flag is set to transition into listener mode + def checkForMultipleReplies(self): + while not self.allRepliesReceived and self.run_event.is_set(): + # listen for duration timeoutInSeconds for replies + self.handleReplyListening() + self.tmReady = select.select([g.sockReceive], [], [], 5.0) + + def handleReplyListening(self): + if self.firstReplyReceived: + if self.tmReady[0]: + self.handleTelemetrySequence() + else: + self.handleFirstReplyListening() + + def handleTelemetrySequence(self): + if self.run_event.is_set(): + self.checkForOneTelemetrySequence() + # set this flag so the other thread can send the next telecommand + self.replyReceived = True + if self.tcQueue.empty(): + self.allRepliesReceived = True + + def handleFirstReplyListening(self): + while not self.firstReplyReceived and self.run_event.is_set(): + # this function sends the telecommand again if no reply is received + self.checkForFirstReply() + if self.replyReceived: + self.firstReplyReceived = True diff --git a/sendreceive/OBSW_SingleCommandSenderReceiver.py b/sendreceive/OBSW_SingleCommandSenderReceiver.py new file mode 100644 index 0000000000000000000000000000000000000000..be0950b0644b9a071357cd33b2e700fc7a26ef4c --- /dev/null +++ b/sendreceive/OBSW_SingleCommandSenderReceiver.py @@ -0,0 +1,38 @@ +#!/usr/bin/python3.7 +# -*- coding: utf-8 -*- +""" +@file + OBSW_Config.py +@date + 01.11.2019 +@brief + Used to send single tcs and listen for replies after that +""" +from sendreceive.OBSW_CommandSenderReceiver import CommandSenderReceiver +import OBSW_Config as g +import threading +import time + + +# Specific implementation of CommandSenderReceiver to send a single telecommand +class SingleCommandSenderReceiver(CommandSenderReceiver): + def __init__(self,displayMode, pusPacketTuple, tmTimeout, tcTimeoutFactor, doPrintToFile): + super().__init__(displayMode, tmTimeout, tcTimeoutFactor, doPrintToFile) + self.pusPacketTuple = pusPacketTuple + (self.pusPacketInfo, self.pusPacket) = self.pusPacketTuple + + def sendSingleTcAndReceiveTm(self): + self.tmtcPrinter.displaySentCommand(self.pusPacketInfo, self.pusPacket, self.displayMode) + print("Starting listener thread") + threading.Thread(target=self.receiveReply).start() + g.sockSend.sendto(self.pusPacket, g.sendAddress) + while not self.replyReceived: + # wait until reply is received + time.sleep(3) + if self.replyReceived: + self.performListenerMode() + + # runs in separate thread. sends TC again if no TM is received after timeout + def receiveReply(self): + while not self.replyReceived: + super().checkForFirstReply() diff --git a/tc/OBSW_TcPacker.py b/tc/OBSW_TcPacker.py new file mode 100644 index 0000000000000000000000000000000000000000..d011e3be70bbeaa76f5bb2d404dedcee0ffb7a3a --- /dev/null +++ b/tc/OBSW_TcPacker.py @@ -0,0 +1,231 @@ +# -*- coding: utf-8 -*- +from tc.OBSW_TcPacket import * +import struct +from datetime import datetime +import queue + + +def serviceTestSelect(service, serviceQueue): + if service == 2: + return packService2TestInto(serviceQueue) + elif service == 5: + return packService5TestInto(serviceQueue) + elif service == 8: + return packService8TestInto(serviceQueue) + elif service == 9: + return packService9TestInto(serviceQueue) + elif service == 17: + return packService17TestInto(serviceQueue) + elif service == 200: + return packService200TestInto(serviceQueue) + elif service == "Dummy": + return packDummyDeviceTestInto(serviceQueue) + elif service == "GPS": + # Object ID: GPS Device + objectId = bytearray([0x44, 0x00, 0x1F, 0x00]) + return packGpsTestInto(objectId, serviceQueue) + elif service == "rGPS": + # Object ID: GPS Device + objectId = bytearray([0x44, 0x00, 0x20, 0x00]) + return packGpsTestInto(objectId, serviceQueue) + elif service == "Error": + return packErrorTestingInto(serviceQueue) + else: + print("Invalid Service !") + exit() + + +def packService2TestInto(tcQueue, builderQueue=0): + objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # dummy device + # don't forget to set object mode raw (service 200) before calling this ! + # Set Raw Mode + modeData = packModeData(objectId, 3, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) + tcQueue.put(command.packCommandTuple()) + # toggle wiretapping raw + wiretappingToggleData = packWiretappingMode(objectId, 1) + toggleWiretappingOnCommand = PUSTelecommand(service=2, subservice=129, SSC=200, data=wiretappingToggleData) + tcQueue.put(toggleWiretappingOnCommand.packCommandTuple()) + # send raw command, data should be returned via TM[2,130] and TC[2,131] + rawCommand = struct.pack(">I", 666) + rawData = objectId + rawCommand + rawCommand = PUSTelecommand(service=2, subservice=128, SSC=201, data=rawData) + tcQueue.put(rawCommand.packCommandTuple()) + # toggle wiretapping off + wiretappingToggleData = packWiretappingMode(objectId, 0) + toggleWiretappingOffCommand = PUSTelecommand(service=2, subservice=129, SSC=204, data=wiretappingToggleData) + tcQueue.put(toggleWiretappingOffCommand.packCommandTuple()) + # send raw command which should be returned via TM[2,130] + command = PUSTelecommand(service=2, subservice=128, SSC=205, data=rawData) + tcQueue.put(command.packCommandTuple()) + return tcQueue + + +# wiretappingMode = 0: MODE_OFF, wiretappingMode = 1: MODE_RAW +def packWiretappingMode(objectId, wiretappingMode_): + wiretappingMode = struct.pack(">B", wiretappingMode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 + wiretappingToggleData = objectId + wiretappingMode + return wiretappingToggleData + + +def packService5TestInto(tcQueue): + # disable events + command = PUSTelecommand(service=5, subservice=6, SSC=500) + tcQueue.put(command.packCommandTuple()) + # trigger event + command = PUSTelecommand(service=17, subservice=128, SSC=510) + tcQueue.put(command.packCommandTuple()) + # enable event + command = PUSTelecommand(service=5, subservice=5, SSC=520) + tcQueue.put(command.packCommandTuple()) + # trigger event + command = PUSTelecommand(service=17, subservice=128, SSC=530) + tcQueue.put(command.packCommandTuple()) + return tcQueue + + +def packService9TestInto(tcQueue): + timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' + timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' + timeTestCurrentTime = datetime.now().isoformat() + "Z" + '\0' + timeTest1 = timeTestASCIICodeA.encode('ascii') + timeTest2 = timeTestASCIICodeB.encode('ascii') + timeTest3 = timeTestCurrentTime.encode('ascii') + print("Time Code 1 :" + str(timeTest1)) + print("Time Code 2 :" + str(timeTest2)) + print("Time Code 3 :" + str(timeTest3)) + # time setting + command = PUSTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) + tcQueue.put(command.packCommandTuple()) + command = PUSTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) + tcQueue.put(command.packCommandTuple()) + command = PUSTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) + tcQueue.put(command.packCommandTuple()) + # TODO: Add other time formats here + return tcQueue + + +def packService8TestInto(tcQueue): + objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) + # set mode normal + modeData = packModeData(objectId, 2, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=3, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Direct command which triggers completion reply + actionId = struct.pack(">I", 666) + directCommand = objectId + actionId + command = PUSTelecommand(service=8, subservice=128, SSC=810, data=directCommand) + tcQueue.put(command.packCommandTuple()) + # Direct command which triggers data reply + actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) + commandParam1 = bytearray([0xBA, 0xB0]) + commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) + directCommand = objectId + actionId + commandParam1 + commandParam2 + command = PUSTelecommand(service=8, subservice=128, SSC=820, data=directCommand) + tcQueue.put(command.packCommandTuple()) + # Direct command which triggers an additional step reply and one completion reply + actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) + directCommand = objectId + actionId + command = PUSTelecommand(service=8, subservice=128, SSC=830, data=directCommand) + tcQueue.put(command.packCommandTuple()) + return tcQueue + + +def packService17TestInto(tcQueue): + # ping test + command = PUSTelecommand(service=17, subservice=1, SSC=1700) + tcQueue.put(command.packCommandTuple()) + # enable event + command = PUSTelecommand(service=5, subservice=5, SSC=52) + tcQueue.put(command.packCommandTuple()) + # test event + command = PUSTelecommand(service=17, subservice=128, SSC=1701) + tcQueue.put(command.packCommandTuple()) + return tcQueue + + +def packService200TestInto(tcQueue): + # Object ID: Dummy Device + objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) + # Set On Mode + modeData = packModeData(objectId, 1, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2000, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Set Normal mode + modeData = packModeData(objectId, 2, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2010, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Set Raw Mode + modeData = packModeData(objectId, 3, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Set Off Mode + modeData = packModeData(objectId, 0, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=2030, data=modeData) + tcQueue.put(command.packCommandTuple()) + return tcQueue + + +def packDummyDeviceTestInto(tcQueue): + # Object ID: Dummy Device + objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) + # Set On Mode + modeData = packModeData(objectId, 1, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=1, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Test Service 2 commands + packService2TestInto(tcQueue) + # Test Service 8 + packService8TestInto(tcQueue) + return tcQueue + + +def packGpsTestInto(objectId, tcQueue): + # Set Mode Off + modeData = packModeData(objectId, 0, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=11, data=modeData) + tcQueue.put(command.packCommandTuple()) + # Set Mode On + modeData = packModeData(objectId, 1, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=12, data=modeData) + tcQueue.put(command.packCommandTuple()) + # pack wait interval until mode is one + tcQueue.put(("wait", 5)) + # Set Mode Off + modeData = packModeData(objectId, 0, 0) + command = PUSTelecommand(service=200, subservice=1, SSC=13, data=modeData) + tcQueue.put(command.packCommandTuple()) + return tcQueue + + +# Mode 0: Off, Mode 1: Mode On, Mode 2: Mode Normal, Mode 3: Mode Raw +def packModeData(objectId, mode_, submode_): + # Normal mode + mode = struct.pack(">I", mode_) + # Submode default + submode = struct.pack('B', submode_) + modeData = objectId + mode + submode + return modeData + + +def packErrorTestingInto(tcQueue): + # a lot of events + command = PUSTelecommand(service=17, subservice=129, SSC=2010) + tcQueue.put(command.packCommandTuple()) + # a lot of ping testing + command = PUSTelecommand(service=17, subservice=130, SSC=2020) + tcQueue.put(command.packCommandTuple()) + return tcQueue + + +def createTotalTcQueue(): + tcQueue = queue.Queue() + tcQueue = packService2TestInto(tcQueue) + tcQueue = packService5TestInto(tcQueue) + tcQueue = packService8TestInto(tcQueue) + tcQueue = packService9TestInto(tcQueue) + tcQueue = packService17TestInto(tcQueue) + tcQueue = packService200TestInto(tcQueue) + tcQueue = packDummyDeviceTestInto(tcQueue) + tcQueue = packDummyGpsTestInto(tcQueue) + return tcQueue diff --git a/tc/OBSW_TcPacket.py b/tc/OBSW_TcPacket.py new file mode 100644 index 0000000000000000000000000000000000000000..9548dad4c5ea656650bd28fdb5b712ad7f05c72d --- /dev/null +++ b/tc/OBSW_TcPacket.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Apr 4 11:43:00 2018 + +@author: S. Gaisser, R. Mueller +""" + +import crcmod + + +class PUSTelecommand: + headerSize = 6 + + def __init__(self, service: int, subservice: int, SSC=0, data=bytearray([]), sourceId=0, version=0, + packetType=1, dataFieldHeaderFlag=1, apid=0x73, length=0): + self.packetId = [0x0, 0x0] + self.packetId[0] = ((version << 5) & 0xE0) | ((packetType & 0x01) << 4) | \ + ((dataFieldHeaderFlag & 0x01) << 3) | ((apid & 0x700) >> 8) + self.packetId[1] = apid & 0xFF + self.ssc = SSC + self.psc = (SSC & 0x3FFF) | (0xC0 << 8) + self.length = length + self.pusVersionAndAckByte = 0b00011111 + self.service = service + self.subservice = subservice + self.sourceId = sourceId + self.data = data + # print("Apid:" + str(self.apid)) + + def getLength(self): + return 4 + len(self.data) + 1 + + def pack(self): + dataToPack = bytearray() + dataToPack.append(self.packetId[0]) + dataToPack.append(self.packetId[1]) + dataToPack.append((self.psc & 0xFF00) >> 8) + dataToPack.append(self.psc & 0xFF) + length = self.getLength() + dataToPack.append((length & 0xFF00) >> 8) + dataToPack.append(length & 0xFF) + dataToPack.append(self.pusVersionAndAckByte) + dataToPack.append(self.service) + dataToPack.append(self.subservice) + dataToPack.append(self.sourceId) + dataToPack += self.data + crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) + crc = crc_func(dataToPack) + + dataToPack.append((crc & 0xFF00) >> 8) + dataToPack.append(crc & 0xFF) + return dataToPack + + def packInformation(self): + tcInformation = { + "service": self.service, + "subservice": self.subservice, + "ssc": self.ssc, + "packetId": self.packetId, + "data": self.data + } + return tcInformation + + def packCommandTuple(self): + commandTuple = (self.packInformation(), self.pack()) + return commandTuple + + +# Takes pusPackets, removes current Packet Error Control, calculates new CRC (16 bits at packet end) and +# adds it as correct Packet Error Control Code. Reference: ECSS-E70-41A p. 207-212 +def generatePacketCRC(TCPacket): + crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) + crc = crc_func(bytearray(TCPacket[0:len(TCPacket) - 2])) + TCPacket[len(TCPacket) - 2] = (crc & 0xFF00) >> 8 + TCPacket[len(TCPacket) - 1] = crc & 0xFF + + +def generateCRC(data): + dataWithCRC = bytearray() + dataWithCRC += data + crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) + crc = crc_func(data) + dataWithCRC.append((crc & 0xFF00) >> 8) + dataWithCRC.append(crc & 0xFF) + return dataWithCRC + +# Structure of a PUS TC Packet : +# A PUS packet consists of consecutive bits, the allocation and structure is standardised. +# Extended information can be found in ECSS-E-70-41A on p.42 +# The easiest form to send a PUS Packet is in hexadecimal form. +# A two digit hexadecimal number equals one byte, 8 bits or one octet +# o = optional, Srv = Service +# +# The structure is shown as follows for TC[17,1] +# 1. Structure Header +# 2. Structure Subheader +# 3. Component (size in bits) +# 4. Hexadecimal number +# 5. Binary Number +# 6. Decimal Number +# +# -------------------------------------------Packet Header(48)------------------------------------------| Packet | +# ----------------Packet ID(16)----------------------|Packet Sequence Control (16)| Packet Length (16) | Data Field | +# Version | Type(1) |Data Field |APID(11) | SequenceFlags(2) |Sequence | | (Variable) | +# Number(3) | |Header Flag(1)| | |Count(14)| | | +# 0x18 | 0x73 | 0xc0 | 0x19 | 0x00 | 0x04 | | +# 000 1 1 000| 01110011 | 11 000000 | 00011001|00000000 | 0000100 | | +# 0 | 1 | 1 | 115(ASCII s) | 3 | 25 | 0 | 4 | | +# +# - Packet Length is an unsigned integer C = Number of Octets in Packet Data Field - 1 +# +# Packet Data Field Structure: +# +# ------------------------------------------------Packet Data Field------------------------------------------------- | +# ---------------------------------Data Field Header ---------------------------|AppData|Spare| PacketErrCtr | +# CCSDS(1)|TC PUS Ver.(3)|Ack(4)|SrvType (8)|SrvSubtype(8)|Source ID(o)|Spare(o)| (var)|(var)| (16) | +# 0x11 (0x1F) | 0x11 | 0x01 | | | | | 0xA0 | 0xB8 | +# 0 001 1111 |00010001 | 00000001 | | | | | | | +# 0 1 1111 | 17 | 1 | | | | | | | +# +# - The source ID is present as one byte. For now, ground = 0x00. + diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py new file mode 100644 index 0000000000000000000000000000000000000000..52633cbe65efffad8cc704004af18f2cbdb7f84e --- /dev/null +++ b/test/OBSW_UnitTest.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +""" +Program: OBSW_UnitTest.py +Date: 01.11.2019 +Description: Unit test of on-board software, used by the UDP client in software test mode + +Manual: +Set up the UDP client as specified in the header comment and use the unit testing mode + +For Developers: +TestService is the template method, analyseTmInfo and analyseTcInfo are implemented for +specific services or devices by setting up assertionDict entries which count received packets +based on subservice or entry values. + +Example Service 17: +TC[17,1] and TC[17,128] are sent, TM[1,1] and TM[1,7] (TC verification) are expected twice, TM[17,2] (ping reply) +is expected twice and TM[5,1] (test event) is expected once. Note that the TC verification counting is +done by the template class by comparing with TC source sequence count. For PUS standalone services (PUS Service Base), +only the start and completion success need to be asserted. For PUS gateway services (PUS Commanding Service Base), +step verification needs to be asserted additionally. If there are commands with multiple steps, this needs +to be specified in the analyseTcInfo method of the child test. + +@author: R. Mueller +""" +import unittest +import queue +from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packDummyDeviceTestInto +from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver +from OBSW_UdpClient import connectToBoard + + +class TestService(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.displayMode = "long" + # default timeout for receiving TM, set in subclass manually + cls.tmTimeout = 15 + # wait intervals between tc send bursts. + # Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again + cls.waitIntervals = [] + cls.printTm = True + cls.printTc = True + # default wait time between tc send bursts + cls.waitTime = 5.0 + cls.testQueue = queue.Queue() + cls.tcTimeoutFactor = 2.0 + connectToBoard() + + def performTestingAndGenerateAssertionDict(self): + UnitTester = MultipleCommandSenderReceiver(self.displayMode, self.testQueue, self.tmTimeout, + self.waitIntervals, self.waitTime, self.printTm, + self.printTc, self.tcTimeoutFactor, False) + (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnTcInfo() + assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) + return assertionDict + + def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue): + self.tcSscArray = [] + self.tcServiceArray = [] + self.tcSubserviceArray = [] + + # these number of TCs sent which need need to be verified + # separate step counter if there is more than one step for a command ! + self.tcVerifyCounter = 0 + self.tcVerifyStepCounter = 0 + # child test implements this + self.analyseTcInfo(tcInfoQueue) + + # These values are incremented in the analyseTmInfo function + # and compared to the counter values + self.tcVerifiedStart = 0 + self.tcVerifiedCompletion = 0 + self.tcVerifiedStep = 0 + # The expected values are set in child test + self.eventCounter = 0 + self.miscCounter = 0 + self.valid = True + + assertionDict = {} + # child test implements this and can add additional dict entries + self.analyseTmInfo(tmInfoQueue, assertionDict) + + assertionDict.update({ + "TcStartCount": self.tcVerifiedStart, + "TcCompletionCount": self.tcVerifiedCompletion, + "TcStepCount": self.tcVerifiedStep, + "EventCount": self.eventCounter, + "MiscCount": self.miscCounter, + "Valid": self.valid + }) + + return assertionDict + + def analyseTcInfo(self, tcInfoQueue): + while not tcInfoQueue.empty(): + currentTcInfo = tcInfoQueue.get() + self.tcVerifyCounter = self.tcVerifyCounter + 1 + # For commands with multiple steps, update this value manually ! + self.tcVerifyStepCounter = self.tcVerifyCounter + 1 + self.tcSscArray.append(currentTcInfo["ssc"]) + self.tcServiceArray.append(currentTcInfo["service"]) + self.tcSubserviceArray.append(currentTcInfo["subservice"]) + + # this function looks whether the tc verification SSC matched + # a SSC of the sent TM + def scanForRespectiveTc(self, currentTmInfo): + currentSubservice = currentTmInfo["subservice"] + for possibleIndex, searchIndex in enumerate(self.tcSscArray): + if searchIndex == currentTmInfo["tcSSC"]: + if currentSubservice == 1: + self.tcVerifiedStart = self.tcVerifiedStart + 1 + elif currentSubservice == 5: + self.tcVerifiedStep = self.tcVerifiedStep + 1 + elif currentSubservice == 7: + self.tcVerifiedCompletion = self.tcVerifiedCompletion + 1 + + # these tests are identical + def performService5or17Test(self): + assertionDict = self.performTestingAndGenerateAssertionDict() + self.eventExpected = 1 + self.miscExpected = 2 + self.assertEqual(assertionDict["TcStartCount"], self.tcVerifyCounter) + self.assertEqual(assertionDict["TcCompletionCount"], self.tcVerifyCounter) + self.assertEqual(assertionDict["EventCount"], self.eventExpected) + self.assertEqual(assertionDict["MiscCount"], self.miscExpected) + self.assertTrue(assertionDict["Valid"]) + + def analyseService5or17TM(self, tmInfoQueue, assertionDict): + self.miscCounter = 0 + while not tmInfoQueue.empty(): + currentTmInfo = tmInfoQueue.get() + # Tc verification scanning is generic and has been moved to the superclass + if currentTmInfo["service"] == 1: + self.scanForRespectiveTc(currentTmInfo) + # Here, the desired event Id or RID can be specified + if currentTmInfo["service"] == 5: + if currentTmInfo["EventID"] == 8200 and currentTmInfo["RID"] == 0x51001700: + self.eventCounter = self.eventCounter + 1 + if currentTmInfo["service"] == 17: + self.miscCounter = self.miscCounter + 1 + if currentTmInfo["valid"] == 0: + self.valid = False + assertionDict.update({"MiscCount": self.miscCounter}) + + @classmethod + def tearDownClass(cls): + cls.eventCounter = 0 + cls.tcVerifyCounter = 0 + cls.testQueue.queue.clear() + + def analyseTmInfo(self, tmInfoQueue, assertionDict): + pass + + +class TestService5(TestService): + + @classmethod + def setUpClass(cls): + super().setUpClass() + print("Testing Service 5") + cls.waitIntervals = [2] + cls.waitTime = 7 + packService5TestInto(cls.testQueue) + + def test_Service5(self): + + # analyseTmInfo() and analyseTcInfo are called here + super().performService5or17Test() + + def analyseTcInfo(self, tcInfoQueue): + super().analyseTcInfo(tcInfoQueue) + + def analyseTmInfo(self, tmInfoQueue, assertionDict): + super().analyseService5or17TM(tmInfoQueue, assertionDict) + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + + +class TestService6(unittest.TestCase): + @classmethod + def setUpClass(cls): + print("Hallo Test 6!") + + +class TestService8(unittest.TestCase): + @classmethod + def setUpClass(cls): + print("Hallo Test 2!") + + +class TestService9(unittest.TestCase): + @classmethod + def setUpClass(cls): + print("Hallo Test 9!") + + +class TestService17(TestService): + @classmethod + def setUpClass(cls): + super().setUpClass() + print("Testing Service 17") + cls.waitIntervals = [2] + cls.waitTime = 2 + packService17TestInto(cls.testQueue) + + def test_Service17(self): + super().performService5or17Test() + + def analyseTmTcInfo(self, tmInfoQueue, tcInfoQueue): + assertionDict = super().analyseTmTcInfo(tmInfoQueue, tcInfoQueue) + # add anything elsee other than tc verification counter and ssc that is needed for tm analysis + return assertionDict + + def analyseTcInfo(self, tcInfoQueue): + super().analyseTcInfo(tcInfoQueue) + + def analyseTmInfo(self, tmInfoQueue, assertionDict): + super().analyseService5or17TM(tmInfoQueue, assertionDict) + + @classmethod + def tearDownClass(cls): + print("Testing Service 17 finished") + super().tearDownClass() + + +class TestDummyDevice(TestService): + @classmethod + def setUpClass(cls): + super().setUpClass() + print("Testing Dummy Device") + packDummyDeviceTestInto(cls.testQueue) + + +if __name__ == '__main__': + unittest.main() + + + + + + + + + + + + + + diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tm/OBSW_PusPacket.py b/tm/OBSW_PusPacket.py new file mode 100644 index 0000000000000000000000000000000000000000..d664adc5894f9678a3def40221bbc053d61c993b --- /dev/null +++ b/tm/OBSW_PusPacket.py @@ -0,0 +1,172 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Apr 4 11:44:48 2018 + +@author: gaisser +""" + +import crcmod +import datetime + + +class PUSPacketHeader: + headerSize = 6 + + def __init__(self, bytesArray): + dataToCheck = bytesArray + if len(bytesArray) < PUSPacketHeader.headerSize: + self.version = 0 + self.type = 0 + self.dataFieldHeaderFlag = 0 + self.apid = 0 + self.segmentationFlag = 0 + self.sourceSequenceCount = 0 + self.length = 0 + self.valid = False + return + self.version = bytesArray[0] >> 5 + self.type = bytesArray[0] & 0x10 + self.dataFieldHeaderFlag = (bytesArray[0] & 0x8) >> 3 + self.apid = ((bytesArray[0] & 0x7) << 8) | bytesArray[1] + self.segmentationFlag = (bytesArray[2] & 0xC0) >> 6 + self.sourceSequenceCount = ((bytesArray[2] & 0x3F) << 8) | bytesArray[3] + self.length = bytesArray[4] << 8 | bytesArray[5] + self.valid = False + crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) + if len(dataToCheck) < ((self.length+1)+PUSPacketHeader.headerSize): + print("Invalid packet length") + return + dataToCheck = dataToCheck[0:(self.length+1)+PUSPacketHeader.headerSize] + crc = crc_func(dataToCheck) + if crc == 0: + self.valid = True + else: + print("Invalid CRC detected !") + + def printPusPacketHeader(self, array): + array.append(str(chr(self.apid))) + array.append(str(self.sourceSequenceCount)) + if self.valid: + array.append("Yes") + else: + array.append("No") + # array.append(str(self.valid)) + + @staticmethod + def printPusPacketHeaderColumnHeaders(array): + array.append("APID") + array.append("SSC") + array.append("Packet Valid") + + +class OBSWTimestamp: + def __init__(self, byteArray): + # pField = byteArray[0] + byteArray = byteArray[1:] + self.days = ((byteArray[0] << 8) | (byteArray[1])) - 4383 + self.seconds = self.days * (24*60*60) + sDay = ((byteArray[2] << 24) | (byteArray[3] << 16) | (byteArray[4]) << 8 | byteArray[5])/1000 + self.seconds += sDay + self.time = self.seconds + self.datetime = str(datetime.datetime.utcfromtimestamp(self.time).strftime("%Y-%m-%d %H:%M:%S.%f")) + + def printTime(self, array): + array.append(self.time) + array.append(self.datetime) + + @staticmethod + def printTimeHeaders(array): + array.append("OBSWTime (s)") + array.append("Time") + + +class OBSWPUSPacketDataFieldHeader: + def __init__(self, bytesArray): + self.pusVersionAndAckByte = (bytesArray[0] & 0x70) >> 4 + self.type = bytesArray[1] + self.subtype = bytesArray[2] + self.subcounter = bytesArray[3] + self.time = OBSWTimestamp(bytesArray[4:13]) + + def printDataFieldHeader(self, array): + array.append(str(self.type)) + array.append(str(self.subtype)) + array.append(str(self.subcounter)) + + self.time.printTime(array) + + def printDataFieldHeaderColumnHeader(self, array): + array.append("Service") + array.append("Subservice") + array.append("Subcounter") + self.time.printTimeHeaders(array) + + +class OBSWPusPacket: + def __init__(self, byteArray): + self.packetRaw = byteArray + self.PUSHeader = PUSPacketHeader(byteArray) + byteArray = byteArray[6:] + self.dataFieldHeader = OBSWPUSPacketDataFieldHeader(byteArray) + byteArray = byteArray[12:] + self.data = byteArray[:len(byteArray)-2] + self.crc = byteArray[len(byteArray)-2] << 8 | byteArray[len(byteArray)-1] + + def printPusPacketHeader(self, array): + self.dataFieldHeader.printDataFieldHeader(array) + self.PUSHeader.printPusPacketHeader(array) + + def printPusPacketHeaderColumnHeaders(self, array): + self.dataFieldHeader.printDataFieldHeaderColumnHeader(array) + self.PUSHeader.printPusPacketHeaderColumnHeaders(array) + + def getPacketSize(self): + # PusHeader Size + data size + size = PUSPacketHeader.headerSize + self.PUSHeader.length + 1 + return size + + def getService(self): + return self.dataFieldHeader.type + + def getSubservice(self): + return self.dataFieldHeader.subtype + + def getSSC(self): + return self.PUSHeader.sourceSequenceCount + +# Structure of a PUS Packet : +# A PUS packet consists of consecutive bits, the allocation and structure is standardised. +# Extended information can be found in ECSS-E-70-41A on p.42 +# The easiest form to send a PUS Packet is in hexadecimal form. +# A two digit hexadecimal number equals one byte, 8 bits or one octet +# o = optional, Srv = Service +# +# The structure is shown as follows for TM[17,2] +# 1. Structure Header +# 2. Structure Subheader +# 3. Component (size in bits) +# 4. Hexadecimal number +# 5. Binary Number +# 6. Decimal Number +# +# -------------------------------------------Packet Header(48)------------------------------------------| Packet | +# ----------------Packet ID(16)----------------------|Packet Sequence Control (16)| Packet Length (16) | Data Field | +# Version | Type(1) |Data Field |APID(11) | SequenceFlags(2) |Sequence | | (Variable) | +# Number(3) | |Header Flag(1)| | |Count(14)| | | +# 0x18 | 0x73 | 0xc0 | 0x19 | 0x00 | 0x04 | | +# 000 1 0 000| 01110011 | 11 000000 | 00011001|00000000 | 0000100 | | +# 0 | 1 | 0 | 115(ASCII s) | 3 | 25 | 0 | 4 | | +# +# - Packet Length is an unsigned integer C = Number of Octets in Packet Data Field - 1 +# +# Packet Data Field Structure: +# +# ------------------------------------------------Packet Data Field------------------------------------------------- | +# ---------------------------------Data Field Header ---------------------------|AppData|Spare| PacketErrCtr | +# CCSDS(1)|TC PUS Ver.(3)|Ack(4)|SrvType (8)|SrvSubtype(8)|Source ID(o)|Spare(o)| (var)|(var)| (16) | +# 0x11 (0x1F) | 0x11 | 0x01 | | | | | Calc. | Calc. | +# 0 001 1111 |00010001 | 00000001 | | | | | | | +# 0 1 1111 | 17 | 2 | | | | | | | +# +# - The source ID is present as one byte. Is it necessary? For now, ground = 0x00. + diff --git a/tm/OBSW_TmPacket.py b/tm/OBSW_TmPacket.py new file mode 100644 index 0000000000000000000000000000000000000000..a77235c295dd9d77d266f663c7ecb3c9156fa021 --- /dev/null +++ b/tm/OBSW_TmPacket.py @@ -0,0 +1,236 @@ +# -*- coding: utf-8 -*- +""" +Program: OBSW_TmPacket.py +Date: 01.11.2019 +Description: Deserialize TM byte array into PUS TM Class +Author: R.Mueller, S. Gaisser +""" + +from tm.OBSW_PusPacket import OBSWPusPacket +import struct +# TM Packets use the generic space packet structure provided by OBSWPusPacket to generate individual +# telemetry packets for all services +# TO DO: Different classes for different services? + + +def PUSTelemetryFactory(rawPacket): + servicetype = rawPacket[7] + # extract service type from data + if servicetype == 1: + return Service1TM(rawPacket) + elif servicetype == 2: + return Service2TM(rawPacket) + elif servicetype == 5: + return Service5TM(rawPacket) + elif servicetype == 8: + return Service8TM(rawPacket) + elif servicetype == 17: + return Service17TM(rawPacket) + elif servicetype == 200: + return Service200TM(rawPacket) + else: + print("The service " + str(servicetype) + " is not implemented in Telemetry Factory") + return PUSTelemetry(rawPacket) + + +class PUSTelemetry(OBSWPusPacket): + def __init__(self, byteArray): + super().__init__(byteArray) + self.byteArrayData = self.data + + def unpackTelemetry(self): + pass + + def printTelemetryHeader(self, array): + super().printPusPacketHeader(array) + + def printTelemetryColumnHeaders(self, array): + super().printPusPacketHeaderColumnHeaders(array) + + def packTmInformation(self): + tmInformation = { + "service": self.getService(), + "subservice": self.getSubservice(), + "ssc": self.getSSC(), + "data": self.byteArrayData, + "crc": self.crc, + "valid": self.PUSHeader.valid + } + return tmInformation + + +class Service1TM(PUSTelemetry): + def __init__(self, byteArray): + super().__init__(byteArray) + self.tcErrorCode = False + self.isStep = False + # Failure Reports with error code + self.tcPacketId = self.byteArrayData[0] << 8 | self.byteArrayData[1] + self.tcSSC = ((self.byteArrayData[2] & 0x3F) << 8) | self.byteArrayData[3] + if self.dataFieldHeader.subtype % 2 == 0: + self.tcErrorCode = True + if self.dataFieldHeader.subtype == 6: + self.isStep = True + self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0] + self.ErrCode = struct.unpack('>H', self.byteArrayData[5:7])[0] + else: + self.ErrCode = struct.unpack('>H', self.byteArrayData[4:6])[0] + elif self.dataFieldHeader.subtype == 5: + self.isStep = True + self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0] + + def printTelemetryHeader(self, array): + super().printTelemetryHeader(array) + array.append(str(hex(self.tcPacketId))) + array.append(str(self.tcSSC)) + if self.tcErrorCode: + if self.isStep: + array.append(str(self.stepNumber)) + array.append(str(hex(self.ErrCode))) + elif self.isStep: + array.append(str(self.stepNumber)) + + def printTelemetryColumnHeaders(self, array): + super().printTelemetryColumnHeaders(array) + array.append("TC Packet ID") + array.append("TC SSC") + if self.tcErrorCode: + if self.isStep: + array.append("Step Number") + array.append("Err. Code") + elif self.isStep: + array.append("Step Number") + + def packTmInformation(self): + tmInformation = super().packTmInformation() + addInformation = { + "tcPacketId": self.tcPacketId, + "tcSSC": self.tcSSC, + } + tmInformation.update(addInformation) + if self.tcErrorCode: + tmInformation.update({"errCode": self.ErrCode}) + if self.isStep: + tmInformation.update({"stepNumber": self.ErrCode}) + return tmInformation + + +class Service2TM(PUSTelemetry): + def __init__(self, byteArray): + super().__init__(byteArray) + + def printTelemetryHeader(self, array): + super().printTelemetryHeader(array) + return + + def printTelemetryColumnHeaders(self, array): + super().printTelemetryColumnHeaders(array) + return + + +class Service5TM(PUSTelemetry): + def __init__(self, byteArray): + super().__init__(byteArray) + self.eventId = struct.unpack('>H', self.byteArrayData[0:2])[0] + self.objectId = struct.unpack('>I', self.byteArrayData[2:6])[0] + self.param1 = struct.unpack('>I', self.byteArrayData[6:10])[0] + self.param2 = struct.unpack('>I', self.byteArrayData[10:14])[0] + + def printTelemetryHeader(self, array): + super().printTelemetryHeader(array) + array.append(str(self.eventId)) + array.append(hex(self.objectId)) + array.append(str(self.param1)) + array.append(str(self.param2)) + + def printTelemetryColumnHeaders(self, array): + super().printTelemetryColumnHeaders(array) + array.append("Event ID") + array.append("Reporter ID") + array.append("Parameter 1") + array.append("Parameter 2") + + def packTmInformation(self): + tmInformation = super().packTmInformation() + addInformation = { + "RID": self.objectId, + "EventID": self.eventId, + "Param1": self.param1, + "Param2": self.param2 + } + tmInformation.update(addInformation) + return tmInformation + + +class Service8TM(PUSTelemetry): + def __init__(self, byteArray): + super().__init__(byteArray) + + def printTelemetryHeader(self, array): + super().printTelemetryHeader(array) + return + + def printTelemetryColumnHeaders(self, array): + super().printTelemetryColumnHeaders(array) + return + + +class Service9TM(PUSTelemetry): + def __init__(self, byteArray): + super().__init__(byteArray) + + def printTelemetryHeader(self, array): + super().printTelemetryHeader(array) + return + + def printTelemetryColumnHeaders(self, array): + super().printTelemetryColumnHeaders(array) + return + + +class Service17TM(PUSTelemetry): + def __init__(self, byteArray): + super().__init__(byteArray) + + def printTelemetryHeader(self, array): + super().printTelemetryHeader(array) + return + + def printTelemetryColumnHeaders(self, array): + super().printTelemetryColumnHeaders(array) + return + + +class Service200TM(PUSTelemetry): + def __init__(self, byteArray): + super().__init__(byteArray) + self.isCantReachModeReply = False + self.isModeReply = False + self.objectId = struct.unpack('>I', self.byteArrayData[0:4])[0] + if self.dataFieldHeader.subtype == 7: + self.isCantReachModeReply = True + self.returnValue = self.byteArrayData[4] << 8 | self.byteArrayData[5] + elif self.dataFieldHeader.subtype == 6 or self.dataFieldHeader.subtype == 8: + self.isModeReply = True + self.mode = struct.unpack('>I', self.byteArrayData[4:8])[0] + self.submode = self.byteArrayData[8] + + def printTelemetryHeader(self, array): + super().printTelemetryHeader(array) + array.append(hex(self.objectId)) + if self.isCantReachModeReply: + array.append(hex(self.returnValue)) + elif self.isModeReply: + array.append(str(self.mode)) + array.append(str(self.submode)) + return + + def printTelemetryColumnHeaders(self, array): + super().printTelemetryColumnHeaders(array) + array.append("Object ID") + if self.isCantReachModeReply: + array.append("Return Value") + elif self.isModeReply: + array.append("Mode") + array.append("Submode") + return diff --git a/utility/OBSW_TmTcPrinter.py b/utility/OBSW_TmTcPrinter.py new file mode 100644 index 0000000000000000000000000000000000000000..8e41f0013cda5b975de8ea4581dcb81f814587e1 --- /dev/null +++ b/utility/OBSW_TmTcPrinter.py @@ -0,0 +1,116 @@ +#!/usr/bin/python3.7 +# -*- coding: utf-8 -*- +""" +@file + OBSW_Config.py +@date + 01.11.2019 +@brief + Class that performs all printing functionalities +""" +import OBSW_Config as g + + +# TODO: Print everything in a file +class TmtcPrinter: + def __init__(self, displayMode, doPrintToFile): + self.printBuffer = "" + # global print buffer which will be useful to print something to file + self.fileBuffer = "" + self.displayMode = displayMode + self.doPrintToFile = doPrintToFile + + def printTelemetry(self, packet): + if self.displayMode == "short": + self.handleShortPrint(packet) + else: + self.handleLongPrint(packet) + self.handleWiretappingPacket(packet) + self.handleDataReplyPacket(packet) + if g.printRawTmData: + self.printBuffer = "TM Data:" + "\n" + self.returnDataString(packet.data) + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + + def handleShortPrint(self, packet): + self.printBuffer = "Received TM[" + str(packet.getService()) + "," + str( + packet.getSubservice()) + "]" + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + + def handleLongPrint(self, packet): + self.printBuffer = "Received Telemetry:" + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + self.handleColumnHeaderPrint(packet) + self.handleTmContentPrint(packet) + + def handleColumnHeaderPrint(self, packet): + recPus = [] + packet.printTelemetryColumnHeaders(recPus) + self.printBuffer = str(recPus) + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + + def handleTmContentPrint(self, packet): + recPus = [] + packet.printTelemetryHeader(recPus) + self.printBuffer = str(recPus) + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + + def handleWiretappingPacket(self, packet): + if packet.getService() == 2 and (packet.getSubservice() == 131 or packet.getSubservice() == 130): + self.printBuffer = "Wiretapping Packet or Raw Reply from TM [" + \ + str(packet.getService()) + "," + str(packet.getSubservice()) + "]:" + self.printBuffer = self.printBuffer + self.returnDataString(packet.data) + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + + def handleDataReplyPacket(self, packet): + if packet.getService() == 8 and packet.getSubservice() == 130: + self.printBuffer = "Service 8 Direct Command Reply TM[8,130] with data: " \ + + self.returnDataString(packet.data) + print(self.printBuffer) + + # This function handles the printing of Telecommands + def displaySentCommand(self, pusPacketInfo, pusPacket, displayMode): + if len(pusPacket) == 0: + print("Empty packet was sent, configuration error") + exit() + if displayMode == "short": + self.handleShortTcPrint(pusPacketInfo) + else: + self.handleLongTcPrint(pusPacketInfo) + + def handleShortTcPrint(self, pusPacketInfo): + self.printBuffer = "Sent TC[" + str(pusPacketInfo["service"]) + "," + str(pusPacketInfo["subservice"]) \ + + "] " + " with SSC " + str(pusPacketInfo["ssc"]) + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + + def handleLongTcPrint(self, pusPacketInfo): + self.printBuffer = "Telecommand TC[" + str(pusPacketInfo["service"]) + "," + str(pusPacketInfo["subservice"]) \ + + "] with SSC " + str(pusPacketInfo["ssc"]) + " sent with data " \ + + self.returnDataString(pusPacketInfo["data"]) + print(self.printBuffer) + self.addPrintBufferToFileBuffer() + + @staticmethod + def returnDataString(byteArray): + strToPrint = "[" + for byte in byteArray: + strToPrint += str(hex(byte)) + " , " + strToPrint = strToPrint.rstrip(' , ') + strToPrint += ']' + return strToPrint + + def addPrintBufferToFileBuffer(self): + if self.doPrintToFile: + self.fileBuffer = self.fileBuffer + self.printBuffer + "\n" + + def printToFile(self): + file = open("tmtc_log.txt", 'w') + file.write(self.fileBuffer) + file.close() +