# obsw_pus_tm_creator.py
# (Forked from an inaccessible project.)
from tm.obsw_pus_tm_base import PusTelemetry, PusTelemetryTimestamp
from utility.obsw_logger import get_logger
import crcmod


# Module-level logger shared by everything in this file; obtained from the
# project's logging helper (utility.obsw_logger.get_logger).
logger = get_logger()


class PusTelemetryCreator(PusTelemetry):
    """
    Alternative way to create a PUS Telemetry packet by specifying telemetry parameters,
    similarly to the way telecommands are created. This can be used to create telemetry
    directly in the software.

    Typical usage: construct the object with the desired service/subservice and source
    data, then call :meth:`pack` to obtain the raw packet including the CRC16 checksum.
    """
    def __init__(self, service: int, subservice: int, ssc: int = 0,
                 source_data: bytearray = None, apid: int = 0x73, version: int = 0):
        """
        Set up all header fields required to pack the telemetry packet.

        :param service: PUS service number, packed as a single byte
        :param subservice: PUS subservice number, packed as a single byte
        :param ssc: Source sequence count. Only the lowest 14 bits are used.
        :param source_data: Source data appended after the timestamp. If omitted (None),
            a new empty bytearray is created for this instance.
        :param apid: Application process ID. Only the lowest 11 bits are used.
        :param version: Packet version number. Only the lowest 3 bits are used.
        """
        super().__init__()
        # packet type for telemetry is 0 as specified in standard
        packet_type = 0
        # specified in standard
        data_field_header_flag = 1
        # First packet ID byte: version (3 bits) | packet type (1 bit) |
        # data field header flag (1 bit) | upper 3 bits of the 11 bit APID
        self.packet_id = [0x0, 0x0]
        self.packet_id[0] = ((version << 5) & 0xE0) | ((packet_type & 0x01) << 4) | \
                            ((data_field_header_flag & 0x01) << 3) | ((apid & 0x700) >> 8)
        # Second packet ID byte: lower 8 bits of the APID
        self.packet_id[1] = apid & 0xFF
        self.ssc = ssc
        # Packet sequence control is a 16 bit field: the two sequence flag bits occupy
        # bits 15 and 14 (0b11 marks an unsegmented packet), the remaining 14 bits hold
        # the sequence count. The flags must therefore be shifted by 14; shifting by 16
        # would move them out of the 16 bit field so pack() would silently drop them.
        self.psc = (ssc & 0x3FFF) | (0b11 << 14)
        self.pus_version_and_ack_byte = 0b00011111

        # NOTE: In PUS-C, the PUS Version is 2 and specified for the first 4 bits.
        # The other 4 bits of the first byte are the spacecraft time reference status
        # To change to PUS-C, set 0b00100000
        self.data_field_version = 0b00010000
        self.service = service
        self.subservice = subservice
        self.pack_subcounter = 0
        # NOTE: The original author assumed that the time field consists of 8 bytes.
        # The size actually used for the length field is PusTelemetry.PUS_TIMESTAMP_SIZE,
        # and the timestamp itself is generated in pack().

        # Never share one default bytearray between instances: create a fresh one
        # whenever the caller did not supply any source data.
        self.source_data = bytearray() if source_data is None else source_data

    def pack(self) -> bytearray:
        """
        Serialize the telemetry packet into its raw byte representation.

        The layout is: 2 bytes packet ID, 2 bytes packet sequence control, 2 bytes packet
        data length, 4 bytes data field header (version, service, subservice, subcounter),
        the current timestamp, the source data and a 2 byte CRC16 over everything before it.
        All multi-byte fields are packed most significant byte first.

        :return: The raw telemetry packet
        """
        tm_packet_raw = bytearray()
        # PUS Header
        tm_packet_raw.extend(self.packet_id)
        tm_packet_raw.append((self.psc & 0xFF00) >> 8)
        tm_packet_raw.append(self.psc & 0xFF)
        source_length = self.get_source_data_length()
        tm_packet_raw.append((source_length & 0xFF00) >> 8)
        tm_packet_raw.append(source_length & 0xFF)
        # PUS Source Data Field
        tm_packet_raw.append(self.data_field_version)
        tm_packet_raw.append(self.service)
        tm_packet_raw.append(self.subservice)
        tm_packet_raw.append(self.pack_subcounter)
        tm_packet_raw.extend(PusTelemetryTimestamp.pack_current_time())
        # Source Data
        tm_packet_raw.extend(self.source_data)
        # CRC16 checksum (polynomial 0x11021, initial value 0xFFFF, not reflected,
        # no final XOR), calculated over the whole packet assembled so far
        crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000)
        crc16 = crc_func(tm_packet_raw)
        tm_packet_raw.append((crc16 & 0xFF00) >> 8)
        tm_packet_raw.append(crc16 & 0xFF)
        return tm_packet_raw

    def get_source_data_length(self) -> int:
        """
        Retrieve the value of the packet data length field of the TM packet.
        Formula according to PUS Standard: C = (Number of octets in packet data field) - 1.
        The packet data field consists of the 4 bytes packed in front of the timestamp
        (version, service, subservice, subcounter) + the PUS timestamp
        (PusTelemetry.PUS_TIMESTAMP_SIZE) + the source data + the 2 byte CRC16 checksum.
        Subtracting 1 from the CRC16 size yields the trailing "+ 1" in the calculation.

        :return: Value for the packet data length field, or 0 if the source data has
            no length (the error is logged in that case)
        """
        try:
            source_data_size = len(self.source_data)
        except TypeError:
            logger.error("PusTelemetryCreator: Invalid type of source data!")
            return 0
        return 4 + PusTelemetry.PUS_TIMESTAMP_SIZE + source_data_size + 1