Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_tm_service_20.py 2.60 KiB
import struct

from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT


class Service20TM(PusTelemetry):
    def __init__(self, byte_array):
        super().__init__(byte_array)
        datasize = len(self._tm_data)
        self.objectId = struct.unpack('>I', self._tm_data[0:4])[0]
        self.parameter_id = struct.unpack('>I', self._tm_data[4:8])[0]
        if self.get_subservice() == 130:
            self.type = struct.unpack('>H', self._tm_data[8:10])[0]
            self.type_ptc = self._tm_data[8]
            self.type_pfc = self._tm_data[9]
            self.column = self._tm_data[10]
            self.row = self._tm_data[11]
            if self.type_ptc == 3 and self.type_pfc == 14:
                self.param = struct.unpack('>I', self._tm_data[12:datasize])[0]
            if self.type_ptc == 4 and self.type_pfc == 14:
                self.param = struct.unpack('>i', self._tm_data[12:datasize])[0]
            if self.type_ptc == 5 and self.type_pfc == 1:
                self.param = struct.unpack('>f', self._tm_data[12:datasize])[0]
        else:
            logger.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \
                         only known subservice")
        self.specify_packet_info("Functional Commanding Reply")

    def append_telemetry_content(self, array):
        super().append_telemetry_content(array)
        array.append(hex(self.objectId))
        array.append(self.parameter_id)
        if self.get_subservice() == 130:
            array.append("PTC: " + str(self.type_ptc) + " | PFC: " + str(self.type_pfc))
            array.append(str(self.column))
            array.append(str(self.row))
            if self.type_ptc == 5 and self.type_pfc == 1:
                array.append(str(float(self.param)))
            else:
                array.append(str(hex(self.param)))
        return

    def append_telemetry_column_headers(self, array):
        super().append_telemetry_column_headers(array)
        array.append("objectID")
        array.append("parameterID")
        if self.get_subservice() == 130:
            array.append("type")
            array.append("column")
            array.append("row")
            array.append("parameter")
        return

    def pack_tm_information(self) -> PusTmInfoT:
        tm_information = super().pack_tm_information()
        add_information = {
            TmDictionaryKeys.REPORTER_ID: self.objectId,
            TmDictionaryKeys.EVENT_ID: self.parameter_id,
            TmDictionaryKeys.EVENT_PARAM_1: self.param
        }
        tm_information.update(add_information)
        return tm_information