Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_tm_service_23.py 3.01 KiB
import struct

from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger

LOGGER = get_logger()


class Service23TM(PusTelemetry):
    MAX_REPOSITORY_LENGTH = 64
    MAX_FILENAME_LENGTH = 12

    def __init__(self, byte_array):
        super().__init__(byte_array)
        if len(self.get_tm_data()) < 4:
            LOGGER.error("Service23TM: Invalid packet format!")
            return
        self.object_id = struct.unpack('!I', self._tm_data[0:4])[0]
        self.repo_path = ""
        self.filename = ""
        self.file_size = 0
        self.lock_status = False
        self.data_start_idx = 0
        if self.get_subservice() == 4:
            self.unpack_repo_and_filename()
            self.unpack_file_attributes()
        elif self.get_subservice() == 132:
            self.unpack_repo_and_filename()
            pass

    def unpack_repo_and_filename(self):
        repo_path_found = False
        path_idx_start = 0
        max_len_to_scan = len(self.get_tm_data()) - 4
        for idx in range(4, max_len_to_scan):
            if not repo_path_found and self._tm_data[idx] == 0:
                repo_bytes = self._tm_data[4:idx]
                self.repo_path = repo_bytes.decode('utf-8')
                path_idx_start = idx + 1
                idx += 1
                repo_path_found = True
            if repo_path_found:
                if self._tm_data[idx] == 0:
                    filename_bytes = self._tm_data[path_idx_start:idx]
                    self.filename = filename_bytes.decode('utf-8')
                    self.data_start_idx = idx + 1
                    break

    def unpack_file_attributes(self):
        # Size of file length (4) + lock status (1), adapt if more field are added!
        print(len(self.get_tm_data()) - self.data_start_idx)
        if len(self.get_tm_data()) - self.data_start_idx != 5:
            LOGGER.error("Service23TM: Invalid lenght of file attributes data")
            return
        self.file_size = struct.unpack('!I', self.get_tm_data()[
                                             self.data_start_idx: self.data_start_idx + 4])[0]
        self.lock_status = self.get_tm_data()[self.data_start_idx + 4]

    def append_telemetry_content(self, content_list: list):
        super().append_telemetry_content(content_list)
        content_list.append(hex(self.object_id))
        content_list.append(self.repo_path)
        content_list.append(self.filename)
        if self.get_subservice() == 4:
            content_list.append(self.file_size)
            if self.lock_status == 0:
                content_list.append("No")
            else:
                content_list.append("Yes")

    def append_telemetry_column_headers(self, header_list: list):
        super().append_telemetry_column_headers(header_list)
        header_list.append("Object ID")
        header_list.append("Repo Path")
        header_list.append("File Name")
        if self.get_subservice() == 4:
            header_list.append("File Size")
            header_list.append("Locked")