import struct

from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger

LOGGER = get_logger()


class Service23TM(PusTelemetry):
    """Telemetry packet of PUS service 23.

    Layout of the TM data as parsed by this class:

    * bytes 0-3: object ID, unsigned 32-bit, network byte order
    * null-terminated repository path (UTF-8)
    * null-terminated file name (UTF-8)
    * subservice 4 only: file size (unsigned 32-bit, network byte order)
      followed by one lock status byte

    The repository path and file name are only parsed for subservices 4
    and 132. For every other subservice just the object ID is extracted.
    """

    # Length limits for the path fields. They are not referenced inside this
    # class; kept because external code may read them.
    MAX_REPOSITORY_LENGTH = 64
    MAX_FILENAME_LENGTH = 12

    def __init__(self, byte_array):
        """Parse the raw packet.

        :param byte_array: Raw telemetry packet, forwarded unchanged to the
            ``PusTelemetry`` constructor.
        """
        super().__init__(byte_array)
        # Assign defaults before validating, so every attribute exists even
        # for a malformed packet. Otherwise append_telemetry_content() would
        # raise AttributeError after the early return below.
        self.object_id = 0
        self.repo_path = ""
        self.filename = ""
        self.file_size = 0
        self.lock_status = False
        # Index of the first TM data byte after the file name terminator.
        self.data_start_idx = 0

        if len(self.get_tm_data()) < 4:
            LOGGER.error("Service23TM: Invalid packet format!")
            return
        self.object_id = struct.unpack('!I', self._tm_data[0:4])[0]

        subservice = self.get_subservice()
        if subservice == 4:
            self.unpack_repo_and_filename()
            self.unpack_file_attributes()
        elif subservice == 132:
            self.unpack_repo_and_filename()

    def unpack_repo_and_filename(self):
        """Extract the repository path and the file name from the TM data.

        Both strings start right after the 4-byte object ID and are each
        terminated by a zero byte. On success ``repo_path``, ``filename`` and
        ``data_start_idx`` are set. If a terminator is missing, the fields
        that could not be parsed keep their previous values.

        The whole remainder of the packet is searched. The previous
        implementation stopped four bytes before the end and therefore missed
        terminators located in the last four bytes.
        """
        tm_data = self._tm_data
        terminator = b"\x00"

        repo_end = tm_data.find(terminator, 4)
        if repo_end == -1:
            return
        self.repo_path = tm_data[4:repo_end].decode('utf-8')

        # The file name starts directly behind the repository terminator.
        name_start = repo_end + 1
        name_end = tm_data.find(terminator, name_start)
        if name_end == -1:
            return
        self.filename = tm_data[name_start:name_end].decode('utf-8')
        self.data_start_idx = name_end + 1

    def unpack_file_attributes(self):
        """Extract file size and lock status located behind the file name.

        Exactly five bytes must remain after ``data_start_idx``. Otherwise an
        error is logged and ``file_size`` / ``lock_status`` stay unchanged.
        """
        tm_data = self.get_tm_data()
        start = self.data_start_idx
        # Size of file length (4) + lock status (1), adapt if more fields are added!
        if len(tm_data) - start != 5:
            LOGGER.error("Service23TM: Invalid lenght of file attributes data")
            return
        # '!IB': unsigned 32-bit file size in network byte order, followed by
        # the lock status byte.
        self.file_size, self.lock_status = struct.unpack_from('!IB', tm_data, start)

    def append_telemetry_content(self, content_list: list):
        """Append the printable values of this packet to ``content_list``.

        The order matches :meth:`append_telemetry_column_headers`.

        :param content_list: List that is extended in place.
        """
        super().append_telemetry_content(content_list)
        content_list.append(hex(self.object_id))
        content_list.append(self.repo_path)
        content_list.append(self.filename)
        if self.get_subservice() == 4:
            content_list.append(self.file_size)
            # A lock status of zero (or the default False) means "not locked".
            content_list.append("Yes" if self.lock_status else "No")

    def append_telemetry_column_headers(self, header_list: list):
        """Append the column titles of this packet to ``header_list``.

        The order matches :meth:`append_telemetry_content`.

        :param header_list: List that is extended in place.
        """
        super().append_telemetry_column_headers(header_list)
        header_list.append("Object ID")
        header_list.append("Repo Path")
        header_list.append("File Name")
        if self.get_subservice() == 4:
            header_list.append("File Size")
            header_list.append("Locked")