Something went wrong on our end
Forked from an inaccessible project.
-
Robin Mueller authored
obsw_tm_service_23.py 3.01 KiB
import struct
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger
# Module-level logger used for all diagnostics in this file; obtained from
# tmtc_core's get_logger() helper.
LOGGER = get_logger()
class Service23TM(PusTelemetry):
    """Telemetry packet parser for service 23 (file related) reports.

    The source data is interpreted as follows:

    * bytes 0-3: object ID, big-endian unsigned 32-bit integer.
    * subservices 4 and 132: two NUL-terminated UTF-8 strings follow, the
      repository path and then the file name.
    * subservice 4 only: exactly five more bytes follow, a big-endian
      unsigned 32-bit file size and a one-byte lock status.

    Parsing problems are reported through the module logger; the affected
    attributes then keep their defaults (``0``, ``""`` or ``False``).
    """

    # NOTE(review): not referenced anywhere in this class; presumably limits
    # imposed by the on-board software - confirm before relying on them.
    MAX_REPOSITORY_LENGTH = 64
    MAX_FILENAME_LENGTH = 12

    # Size in bytes of the object ID at the start of the source data.
    _OBJECT_ID_LEN = 4
    # Size of file length (4) + lock status (1), adapt if more fields are
    # added!
    _FILE_ATTRIBUTES_LEN = 5

    def __init__(self, byte_array):
        """Parse a raw telemetry packet.

        :param byte_array: Raw packet, passed on unchanged to the base class.
        :raises UnicodeDecodeError: If the repository path or file name is
            not valid UTF-8.
        """
        super().__init__(byte_array)
        # Assign every attribute before any early return so that the
        # append_* methods never hit a missing attribute on a short packet.
        self.object_id = 0
        self.repo_path = ""
        self.filename = ""
        self.file_size = 0
        self.lock_status = False
        # Index of the first byte after the file name terminator; stays 0
        # until unpack_repo_and_filename() succeeds.
        self.data_start_idx = 0

        tm_data = self.get_tm_data()
        if len(tm_data) < self._OBJECT_ID_LEN:
            LOGGER.error("Service23TM: Invalid packet format!")
            return
        self.object_id = struct.unpack_from('!I', tm_data, 0)[0]

        subservice = self.get_subservice()
        if subservice == 4:
            self.unpack_repo_and_filename()
            self.unpack_file_attributes()
        elif subservice == 132:
            self.unpack_repo_and_filename()

    def unpack_repo_and_filename(self) -> None:
        """Extract the repository path and file name from the source data.

        Both strings are NUL-terminated and start right after the object ID.
        The whole remaining payload is searched for the terminators. On
        success ``repo_path``, ``filename`` and ``data_start_idx`` are set.
        If the file name terminator is missing, only ``repo_path`` is set.

        :raises UnicodeDecodeError: If either string is not valid UTF-8.
        """
        tm_data = self.get_tm_data()
        repo_start = self._OBJECT_ID_LEN
        repo_end = tm_data.find(b"\x00", repo_start)
        if repo_end < 0:
            LOGGER.error("Service23TM: Repository path is not terminated!")
            return
        self.repo_path = tm_data[repo_start:repo_end].decode('utf-8')

        name_start = repo_end + 1
        name_end = tm_data.find(b"\x00", name_start)
        if name_end < 0:
            LOGGER.error("Service23TM: File name is not terminated!")
            return
        self.filename = tm_data[name_start:name_end].decode('utf-8')
        self.data_start_idx = name_end + 1

    def unpack_file_attributes(self) -> None:
        """Extract file size and lock status following the file name.

        Expects exactly five bytes after ``data_start_idx``; otherwise an
        error is logged and ``file_size`` / ``lock_status`` stay unchanged.
        """
        tm_data = self.get_tm_data()
        attr_start = self.data_start_idx
        # attr_start == 0 means the names were never parsed, so there is no
        # valid offset to read the attributes from.
        if (attr_start == 0
                or len(tm_data) - attr_start != self._FILE_ATTRIBUTES_LEN):
            # NOTE(review): log text (including its spelling) kept verbatim.
            LOGGER.error("Service23TM: Invalid lenght of file attributes data")
            return
        self.file_size = struct.unpack_from('!I', tm_data, attr_start)[0]
        self.lock_status = tm_data[attr_start + 4]

    def append_telemetry_content(self, content_list: list):
        """Append this packet's fields to ``content_list`` in place.

        Adds, after whatever the base class adds: the object ID as a hex
        string, the repository path and the file name. For subservice 4 the
        file size and ``"Yes"``/``"No"`` for the lock status are added too.
        """
        super().append_telemetry_content(content_list)
        content_list.append(hex(self.object_id))
        content_list.append(self.repo_path)
        content_list.append(self.filename)
        if self.get_subservice() == 4:
            content_list.append(self.file_size)
            content_list.append("No" if self.lock_status == 0 else "Yes")

    def append_telemetry_column_headers(self, header_list: list):
        """Append the column headers matching append_telemetry_content().

        The order of the headers mirrors the order of the content entries.
        """
        super().append_telemetry_column_headers(header_list)
        header_list.append("Object ID")
        header_list.append("Repo Path")
        header_list.append("File Name")
        if self.get_subservice() == 4:
            header_list.append("File Size")
            header_list.append("Locked")