Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_pus_tm_factory.py 7.06 KiB
# -*- coding: utf-8 -*-
from typing import Deque, List, Tuple
from tm.obsw_pus_tm_base import PusTelemetry, PusTmInfoT, TmDictionaryKeys
from tm.obsw_tm_service_1 import Service1TM
from tm.obsw_tm_service_3 import Service3TM
from tm.obsw_tm_service_5 import Service5TM
from utility.obsw_logger import get_logger
import struct

logger = get_logger()
PusRawTmList = List[bytearray]
PusRawTmQueue = Deque[bytearray]
PusTmTupleT = Tuple[PusTmInfoT, PusTelemetry]

PusTmListT = List[PusTelemetry]
PusTmQueueT = Deque[PusTmListT]

PusTmInfoQueueT = Deque[PusTmInfoT]
PusTmTupleQueueT = Deque[PusTmTupleT]


class PusTelemetryFactory(object):
    """
    Deserialize TM bytearrays into PUS TM Classes
    """
    @staticmethod
    def create(raw_tm_packet: bytearray) -> PusTelemetry:
        service_type = raw_tm_packet[7]
        if service_type == 1:
            return Service1TM(raw_tm_packet)
        if service_type == 2:
            return Service2TM(raw_tm_packet)
        if service_type == 3:
            return Service3TM(raw_tm_packet)
        if service_type == 5:
            return Service5TM(raw_tm_packet)
        if service_type == 8:
            return Service8TM(raw_tm_packet)
        if service_type == 17:
            return Service17TM(raw_tm_packet)
        if service_type == 20:
            return Service20TM(raw_tm_packet)
        if service_type == 200:
            return Service200TM(raw_tm_packet)
        print("The service " + str(service_type) + " is not implemented in Telemetry Factory")
        return PusTelemetry(raw_tm_packet)



class Service2TM(PusTelemetry):
    def __init__(self, byte_array: bytearray):
        super().__init__(byte_array)
        self.specify_packet_info("Raw Commanding Reply")

    def append_telemetry_content(self, array):
        super().append_telemetry_content(array)
        return

    def append_telemetry_column_headers(self, array):
        super().append_telemetry_column_headers(array)
        return


class Service8TM(PusTelemetry):
    def __init__(self, byte_array):
        super().__init__(byte_array)
        self.specify_packet_info("Functional Commanding Reply")

    def append_telemetry_content(self, array):
        super().append_telemetry_content(array)
        return

    def append_telemetry_column_headers(self, array):
        super().append_telemetry_column_headers(array)
        return
    
    
class Service9TM(PusTelemetry):
    def __init__(self, byte_array):
        super().__init__(byte_array)
        self.specify_packet_info("Time Service Reply")

    def append_telemetry_content(self, array: list):
        super().append_telemetry_content(array)
        return

    def append_telemetry_column_headers(self, column_header: list):
        super().append_telemetry_column_headers(column_header)
        return
    
    
class Service17TM(PusTelemetry):
    def __init__(self, byte_array):
        super().__init__(byte_array)
        self.specify_packet_info("Test Reply")

    def append_telemetry_content(self, array):
        super().append_telemetry_content(array)
        return

    def append_telemetry_column_headers(self, array):
        super().append_telemetry_column_headers(array)
        return

class Service20TM(PusTelemetry):
    def __init__(self, byte_array):
        super().__init__(byte_array)
        datasize = len(self._tm_data)
        self.objectId = struct.unpack('>I', self._tm_data[0:4])[0]
        self.parameter_id = struct.unpack('>I', self._tm_data[4:8])[0]
        if self.get_subservice() == 130:
            self.type = struct.unpack('>H', self._tm_data[8:10])[0]
            self.type_ptc = self._tm_data[8]
            self.type_pfc = self._tm_data[9]
            self.column = self._tm_data[10]
            self.row = self._tm_data[11]
            if self.type_ptc == 3 and self.type_pfc == 14:
                self.param = struct.unpack('>I', self._tm_data[12:datasize])[0]
            if self.type_ptc == 4 and self.type_pfc == 14:
                self.param = struct.unpack('>i', self._tm_data[12:datasize])[0]
            if self.type_ptc == 5 and self.type_pfc == 1:
                self.param = struct.unpack('>f', self._tm_data[12:datasize])[0]
        self.specify_packet_info("Functional Commanding Reply")

    def append_telemetry_content(self, array):
        super().append_telemetry_content(array)
        array.append(hex(self.objectId))
        array.append(self.parameter_id)
        if self.get_subservice() == 130:
            array.append("PTC: " + str(self.type_ptc) + " | PFC: " + str(self.type_pfc))
            array.append(str(self.column))
            array.append(str(self.row))
            if self.type_ptc == 5 and self.type_pfc == 1:
                array.append(str(float(self.param)))
            else:
                array.append(str(hex(self.param)))
        return

    def append_telemetry_column_headers(self, array):
        super().append_telemetry_column_headers(array)
        array.append("objectID")
        array.append("parameterID")
        if self.get_subservice() == 130:
            array.append("type")
            array.append("column")
            array.append("row")
            array.append("parameter")
        return

    def pack_tm_information(self) -> PusTmInfoT:
        tm_information = super().pack_tm_information()
        add_information = {
            TmDictionaryKeys.REPORTER_ID: self.objectId,
            TmDictionaryKeys.EVENT_ID: self.parameter_id,
            TmDictionaryKeys.EVENT_PARAM_1: self.param
        }
        tm_information.update(add_information)
        return tm_information

class Service200TM(PusTelemetry):
    def __init__(self, byte_array):
        super().__init__(byte_array)
        self.isCantReachModeReply = False
        self.isModeReply = False
        self.specify_packet_info("Mode Reply")
        self.objectId = struct.unpack('>I', self._tm_data[0:4])[0]
        if self.get_subservice() == 7:
            self.append_packet_info(": Can't reach mode")
            self.isCantReachModeReply = True
            self.returnValue = self._tm_data[4] << 8 | self._tm_data[5]
        elif self.get_subservice() == 6 or self.get_subservice() == 8:
            self.isModeReply = True
            if self.get_subservice() == 8:
                self.append_packet_info(": Wrong Mode")
            elif self.get_subservice() == 6:
                self.append_packet_info(": Mode reached")
            self.mode = struct.unpack('>I', self._tm_data[4:8])[0]
            self.submode = self._tm_data[8]

    def append_telemetry_content(self, array):
        super().append_telemetry_content(array)
        array.append(hex(self.objectId))
        if self.isCantReachModeReply:
            array.append(hex(self.returnValue))
        elif self.isModeReply:
            array.append(str(self.mode))
            array.append(str(self.submode))
        return

    def append_telemetry_column_headers(self, array):
        super().append_telemetry_column_headers(array)
        array.append("Object ID")
        if self.isCantReachModeReply:
            array.append("Return Value")
        elif self.isModeReply:
            array.append("Mode")
            array.append("Submode")
        return