Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
OBSW_TmPacket.py 8.57 KiB
# -*- coding: utf-8 -*-
"""
Program: OBSW_TmPacket.py
Date: 01.11.2019
Description: Deserialize TM byte array into PUS TM Class
Author: R.Mueller, S. Gaisser
"""

from tm.OBSW_PusTm import PUSTelemetry
from tm.OBSW_TmService3 import Service3TM
import struct
# TM Packets use the generic space packet structure provided by OBSWPusPacket to generate individual
# telemetry packets for all services
# TO DO: Different classes for different services?


def PUSTelemetryFactory(rawPacket):
    servicetype = rawPacket[7]
    # extract service type from data
    if servicetype == 1:
        return Service1TM(rawPacket)
    elif servicetype == 2:
        return Service2TM(rawPacket)
    elif servicetype == 3:
        return Service3TM(rawPacket)
    elif servicetype == 5:
        return Service5TM(rawPacket)
    elif servicetype == 8:
        return Service8TM(rawPacket)
    elif servicetype == 17:
        return Service17TM(rawPacket)
    elif servicetype == 200:
        return Service200TM(rawPacket)
    else:
        print("The service " + str(servicetype) + " is not implemented in Telemetry Factory")
        return PUSTelemetry(rawPacket)


class Service1TM(PUSTelemetry):
    def __init__(self, byteArray):
        super().__init__(byteArray)
        self.tcErrorCode = False
        self.isStep = False
        # Failure Reports with error code
        self.tcPacketId = self.byteArrayData[0] << 8 | self.byteArrayData[1]
        self.tcSSC = ((self.byteArrayData[2] & 0x3F) << 8) | self.byteArrayData[3]
        self.printPacketInfo("Success Verification")
        if self.dataFieldHeader.subtype % 2 == 0:
            self.printPacketInfo("Failure Verficiation")
            self.tcErrorCode = True
            if self.dataFieldHeader.subtype == 6:
                self.isStep = True
                self.appendPacketInfo(" : Step Failure")
                self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0]
                self.ErrCode = struct.unpack('>H', self.byteArrayData[5:7])[0]
                self.errorParam1 = struct.unpack('>I', self.byteArrayData[7:11])[0]
                self.errorParam2 = struct.unpack('>I', self.byteArrayData[11:15])[0]
            else:
                self.ErrCode = struct.unpack('>H', self.byteArrayData[4:6])[0]
                self.errorParam1 = struct.unpack('>I', self.byteArrayData[6:10])[0]
                self.errorParam2 = struct.unpack('>I', self.byteArrayData[10:14])[0]

        elif self.dataFieldHeader.subtype == 5:
            self.isStep = True
            self.appendPacketInfo(" : Step Success")
            self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0]
                
    def printTelemetryHeader(self, array):
        super().printTelemetryHeader(array)
        array.append(str(hex(self.tcPacketId)))
        array.append(str(self.tcSSC))
        if self.tcErrorCode:
            if self.isStep:
                array.append(str(self.stepNumber))
            array.append(str(hex(self.ErrCode)))
            array.append(str(hex(self.errorParam1)))
            array.append(str(hex(self.errorParam2)))
        elif self.isStep:
            array.append(str(self.stepNumber))
            
    def printTelemetryColumnHeaders(self, array):
        super().printTelemetryColumnHeaders(array)
        array.append("TC Packet ID")
        array.append("TC SSC")
        if self.tcErrorCode:
            if self.isStep:
                array.append("Step Number")
            array.append("Return Value")
            array.append("Error Param 1")
            array.append("Error Param 2")
        elif self.isStep:
            array.append("Step Number")

    def packTmInformation(self):
        tmInformation = super().packTmInformation()
        addInformation = {
            "tcPacketId": self.tcPacketId,
            "tcSSC": self.tcSSC,
        }
        tmInformation.update(addInformation)
        if self.tcErrorCode:
            tmInformation.update({"errCode": self.ErrCode})
        if self.isStep:
            tmInformation.update({"stepNumber": self.ErrCode})
        return tmInformation
    
    
class Service2TM(PUSTelemetry):
    def __init__(self, byteArray):
        super().__init__(byteArray)

    def printTelemetryHeader(self, array):
        super().printTelemetryHeader(array)
        return

    def printTelemetryColumnHeaders(self, array):
        super().printTelemetryColumnHeaders(array)
        return


class Service5TM(PUSTelemetry):
    def __init__(self, byteArray):
        super().__init__(byteArray)
        self.printPacketInfo("Event")
        if self.getSubservice() == 1:
            self.appendPacketInfo(" Info")
        elif self.getSubservice() == 2:
            self.appendPacketInfo(" Error Low Severity")
        elif self.getSubservice() == 3:
            self.appendPacketInfo(" Error Med Severity")
        elif self.getSubservice() == 4:
            self.appendPacketInfo(" Error High Severity")
        self.eventId = struct.unpack('>H', self.byteArrayData[0:2])[0]
        self.objectId = struct.unpack('>I', self.byteArrayData[2:6])[0]
        self.param1 = struct.unpack('>I', self.byteArrayData[6:10])[0]
        self.param2 = struct.unpack('>I', self.byteArrayData[10:14])[0]
        
    def printTelemetryHeader(self, array):
        super().printTelemetryHeader(array)
        array.append(str(self.eventId))
        array.append(hex(self.objectId))
        array.append(str(self.param1))
        array.append(str(self.param2))
        
    def printTelemetryColumnHeaders(self, array):
        super().printTelemetryColumnHeaders(array)
        array.append("Event ID")
        array.append("Reporter ID")
        array.append("Parameter 1")
        array.append("Parameter 2")

    def packTmInformation(self):
        tmInformation = super().packTmInformation()
        addInformation = {
            "RID": self.objectId,
            "EventID": self.eventId,
            "Param1": self.param1,
            "Param2": self.param2
        }
        tmInformation.update(addInformation)
        return tmInformation


class Service8TM(PUSTelemetry):
    def __init__(self, byteArray):
        super().__init__(byteArray)

    def printTelemetryHeader(self, array):
        super().printTelemetryHeader(array)
        return

    def printTelemetryColumnHeaders(self, array):
        super().printTelemetryColumnHeaders(array)
        return
    
    
class Service9TM(PUSTelemetry):
    def __init__(self, byteArray):
        super().__init__(byteArray)

    def printTelemetryHeader(self, array):
        super().printTelemetryHeader(array)
        return

    def printTelemetryColumnHeaders(self, array):
        super().printTelemetryColumnHeaders(array)
        return
    
    
class Service17TM(PUSTelemetry):
    def __init__(self, byteArray):
        super().__init__(byteArray)
        self.printPacketInfo("Test Reply")

    def printTelemetryHeader(self, array):
        super().printTelemetryHeader(array)
        return

    def printTelemetryColumnHeaders(self, array):
        super().printTelemetryColumnHeaders(array)
        return


class Service200TM(PUSTelemetry):
    def __init__(self, byteArray):
        super().__init__(byteArray)
        self.isCantReachModeReply = False
        self.isModeReply = False
        self.printPacketInfo("Mode Reply")
        self.objectId = struct.unpack('>I', self.byteArrayData[0:4])[0]
        if self.dataFieldHeader.subtype == 7:
            self.appendPacketInfo(": Can't reach mode")
            self.isCantReachModeReply = True
            self.returnValue = self.byteArrayData[4] << 8 | self.byteArrayData[5]
        elif self.dataFieldHeader.subtype == 6 or self.dataFieldHeader.subtype == 8:
            self.isModeReply = True
            if self.dataFieldHeader.subtype == 8:
                self.appendPacketInfo(": Wrong Mode")
            elif self.dataFieldHeader.subtype == 6:
                self.appendPacketInfo(": Mode reached")
            self.mode = struct.unpack('>I', self.byteArrayData[4:8])[0]
            self.submode = self.byteArrayData[8]

    def printTelemetryHeader(self, array):
        super().printTelemetryHeader(array)
        array.append(hex(self.objectId))
        if self.isCantReachModeReply:
            array.append(hex(self.returnValue))
        elif self.isModeReply:
            array.append(str(self.mode))
            array.append(str(self.submode))
        return

    def printTelemetryColumnHeaders(self, array):
        super().printTelemetryColumnHeaders(array)
        array.append("Object ID")
        if self.isCantReachModeReply:
            array.append("Return Value")
        elif self.isModeReply:
            array.append("Mode")
            array.append("Submode")
        return