Skip to content
Snippets Groups Projects
Commit 1389bd50 authored by Robin.Mueller's avatar Robin.Mueller
Browse files

srv3 update

parent 8a4b1b11
Branches
No related tags found
No related merge requests found
......@@ -42,26 +42,30 @@ def pack_test_device_test(tc_queue: Deque):
def pack_custom_tests(tc_queue: Deque):
tc_queue.appendleft(("print", "Testing Service 3: "))
tc_queue.appendleft(("print", "Generate one thermal sensor packet: "))
command = PusTelecommand(service=3, subservice=27, ssc=3100, app_data=sid_thermalsensor)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Enabling periodic thermal sensor packet generation: "))
command = PusTelecommand(service=3, subservice=5, ssc=3101, app_data=sid_thermalsensor)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("wait", 6))
new_interval = 1.2
interval_app_data = sid_thermalsensor + make_interval(new_interval)
tc_queue.appendleft(("print", "Setting interval to one second: "))
command = PusTelecommand(service=3, subservice=31, ssc=3102, app_data=interval_app_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("wait", 3))
tc_queue.appendleft(("print", "Disabling periodic thermal sensor packet generation: "))
command = PusTelecommand(service=3, subservice=6, ssc=3103, app_data=sid_thermalsensor)
# tc_queue.appendleft(("print", "Generate one thermal sensor packet: "))
# command = PusTelecommand(service=3, subservice=27, ssc=3100, app_data=sid_thermalsensor)
# tc_queue.appendleft(command.pack_command_tuple())
#
# tc_queue.appendleft(("print", "Enabling periodic thermal sensor packet generation: "))
# command = PusTelecommand(service=3, subservice=5, ssc=3101, app_data=sid_thermalsensor)
# tc_queue.appendleft(command.pack_command_tuple())
#
# tc_queue.appendleft(("wait", 6))
#
# new_interval = 1.2
# interval_app_data = sid_thermalsensor + make_interval(new_interval)
# tc_queue.appendleft(("print", "Setting interval to one second: "))
# command = PusTelecommand(service=3, subservice=31, ssc=3102, app_data=interval_app_data)
# tc_queue.appendleft(command.pack_command_tuple())
#
# tc_queue.appendleft(("wait", 3))
#
# tc_queue.appendleft(("print", "Disabling periodic thermal sensor packet generation: "))
# command = PusTelecommand(service=3, subservice=6, ssc=3103, app_data=sid_thermalsensor)
# tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Generating structure report for thermal sensor packet: "))
command = PusTelecommand(service=3, subservice=9, ssc=3104, app_data=sid_thermalsensor)
tc_queue.appendleft(command.pack_command_tuple())
......
......@@ -16,9 +16,23 @@ LOGGER = get_logger()
class Service3TM(PusTelemetry):
"""
@brief This class encapsulates the format of Service 3 telemetry
"""
# Minimal packet contains SID, which consists of object ID(4) and set ID(4)
MINIMAL_PACKET_SIZE = 8
# Minimal structure report contains SID (8), reporting status(1), validity flag (1) and
# collection interval as float (4)
STRUCTURE_REPORT_FIXED_HEADER_SIZE = MINIMAL_PACKET_SIZE + 6
def __init__(self, byte_array: bytearray):
super().__init__(byte_array)
# print("Length of TM data: " + str(len(self._tm_data)))
if len(self._tm_data) < 8:
warning = "Service3TM: handle_filling_definition_arrays: Invalid Service 3 packet," \
" is too short!"
LOGGER.warning(warning)
return
self.objectId = struct.unpack('!I', self._tm_data[0:4])[0]
self.objectIdRaw = self._tm_data[0:4]
self.setId = struct.unpack('!I', self._tm_data[4:8])[0]
......@@ -48,24 +62,48 @@ class Service3TM(PusTelemetry):
array.append("HK Data Size")
def handle_filling_definition_arrays(self):
self.hkHeader = ["Object ID", "Set ID", "Report Status",
self.print_source_data()
if len(self._tm_data) < self.STRUCTURE_REPORT_FIXED_HEADER_SIZE:
warning = "Service3TM: handle_filling_definition_arrays: Invalid structure report " \
"from " + str(hex(self.objectId)) + ", is shorter than" + \
str(self.STRUCTURE_REPORT_FIXED_HEADER_SIZE) + "."
LOGGER.warning(warning)
return
self.hkHeader = ["Object ID", "Set ID", "Report Status", "Set Valid"
"Collection Interval", "Number Of IDs"]
reporting_enabled = self._tm_data[4]
collection_interval = struct.unpack('>f', self._tm_data[5:9])[0]
number_of_params = self._tm_data[9]
reporting_enabled = self._tm_data[8]
print(reporting_enabled)
set_valid = self._tm_data[9]
print(set_valid)
collection_interval = struct.unpack('>f', self._tm_data[10:14])[0]
print(collection_interval)
num_params = self._tm_data[14]
print(num_params)
if len(self._tm_data) < self.STRUCTURE_REPORT_FIXED_HEADER_SIZE + num_params * 4:
warning = "Service3TM: handle_filling_definition_arrays: Invalid structure report " \
"from " + str(hex(self.objectId)) + ", is shorter than " + \
str(self.STRUCTURE_REPORT_FIXED_HEADER_SIZE + num_params * 4) + "."
LOGGER.warning(warning)
return
parameters = []
index2 = 1
for index in range(10, (number_of_params * 4) + 10, 4):
parameter = struct.unpack('>I', self._tm_data[index:index + 4])[0]
self.hkHeader.append("Pool ID " + str(index2))
counter = 1
for array_index in range(self.STRUCTURE_REPORT_FIXED_HEADER_SIZE,
self.STRUCTURE_REPORT_FIXED_HEADER_SIZE + 4 * num_params, 4):
parameter = struct.unpack('>I', self._tm_data[array_index:array_index + 4])[0]
self.hkHeader.append("Pool ID " + str(counter))
parameters.append(str(hex(parameter)))
index2 = index2 + 1
counter = counter + 1
if reporting_enabled == 1:
status_string = "On"
else:
status_string = "Off"
self.hkContent = [hex(self.objectId), self.setId, status_string,
collection_interval, number_of_params]
if set_valid:
valid_string = "On"
else:
valid_string = "Off"
self.hkContent = [hex(self.objectId), self.setId, status_string, valid_string,
collection_interval, num_params]
self.hkContent.extend(parameters)
def handle_filling_hk_arrays(self):
......@@ -118,7 +156,8 @@ class Service3TM(PusTelemetry):
test_uint32 = struct.unpack('>I', self._tm_data[8:12])[0]
float_vector1 = struct.unpack('>f', self._tm_data[12:16])[0]
float_vector2 = struct.unpack('>f', self._tm_data[16:20])[0]
self.hkContent = [test_bool, test_uint8, test_uint16, test_uint32, float_vector1, float_vector2]
self.hkContent = [test_bool, test_uint8, test_uint16,
test_uint32, float_vector1, float_vector2]
self.validity_buffer = self._tm_data[20:]
# print(self.validityBuffer)
# print(str(format(self.validityBuffer[0], '#010b')))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment