Commit 2ccc8aa9 authored by Robin Mueller's avatar Robin Mueller
Browse files

Merge branch 'mueller/master' into 'development'

bugfixes for hk service 3

See merge request source/tmtc_core!11
parents e9b79801 59049c23
......@@ -31,6 +31,7 @@ class Service3TM(Service3Base):
STRUCTURE_REPORT_FIXED_HEADER_SIZE = MINIMAL_PACKET_SIZE + 7
def __init__(self, byte_array: bytearray):
from tmtc_core.core.tmtcc_object_id_manager import get_key_from_raw_object_id
super().__init__(byte_array)
if len(self._tm_data) < 8:
warning = "Service3TM: handle_filling_definition_arrays: Invalid Service 3 packet," \
......@@ -39,7 +40,7 @@ class Service3TM(Service3Base):
return
self.object_id = struct.unpack('!I', self._tm_data[0:4])[0]
self.object_id_raw = self._tm_data[0:4]
self.object_id_key = get_key_from_raw_object_id(self._tm_data[0:4])
self.set_id = struct.unpack('!I', self._tm_data[4:8])[0]
self.specify_packet_info("Housekeeping Packet")
......@@ -49,17 +50,17 @@ class Service3TM(Service3Base):
if self.get_subservice() == 25 or self.get_subservice() == 26:
self.handle_filling_hk_arrays()
def append_telemetry_content(self, array):
super().append_telemetry_content(array)
array.append(hex(self.object_id))
array.append(hex(self.set_id))
array.append(int(self.param_length))
def append_telemetry_content(self, content_list: list):
super().append_telemetry_content(content_list=content_list)
content_list.append(hex(self.object_id))
content_list.append(hex(self.set_id))
content_list.append(int(self.param_length))
def append_telemetry_column_headers(self, array):
super().append_telemetry_column_headers(array)
array.append("Object ID")
array.append("Set ID")
array.append("HK Data Size")
def append_telemetry_column_headers(self, header_list: list):
super().append_telemetry_column_headers(header_list=header_list)
header_list.append("Object ID")
header_list.append("Set ID")
header_list.append("HK Data Size")
def handle_filling_definition_arrays(self):
if len(self._tm_data) < self.STRUCTURE_REPORT_FIXED_HEADER_SIZE:
......@@ -103,14 +104,14 @@ class Service3TM(Service3Base):
def handle_filling_hk_arrays(self):
try:
from pus_tm.tmtcc_pus_hk_handling import handle_user_hk_packet
from pus_tm.tmtcc_pus_hk_handling_hook import handle_user_hk_packet
except ImportError:
LOGGER.warning("Service3TM: User HK handling file missing!")
return
self.param_length = len(self._tm_data) - 8
(self.hk_header, self.hk_content, self.validity_buffer) = \
handle_user_hk_packet(self.object_id_raw, self._tm_data[8:], self)
(self.hk_header, self.hk_content, self.validity_buffer, self.number_of_parameters) = \
handle_user_hk_packet(self.object_id_key, self._tm_data[8:], self)
Service3TM: Type[PusTelemetry]
......@@ -5,7 +5,7 @@
it to your needs.
"""
from typing import Tuple
from config.tmtcc_object_ids import ObjectIds
from tmtc_core.pus_tm.tmtcc_pus_service_3 import Service3Base
from tmtc_core.utility.tmtcc_logger import get_logger
......@@ -13,14 +13,25 @@ LOGGER = get_logger()
def handle_user_hk_packet(
object_id: bytearray, hk_data: bytearray,
service3_packet: Service3Base) -> Tuple[list, list, bytearray]:
object_id: ObjectIds, hk_data: bytearray, service3_packet: Service3Base
) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
Please note that the object IDs should be compared by value because direct comparison of
enumerations does not work in Python. For example use:
if object_id.value == ObjectIds.TEST_OBJECT.value
to test equality based on the object ID list.
@param object_id:
@param hk_data:
@param service3_packet:
@return:
@return: Expects a tuple, consisting of two lists, a bytearray and an integer
The first list contains the header columns, the second list the list with
the corresponding values. The bytearray is the validity buffer, which is usually appended
at the end of the housekeeping packet. The last value is the number of parameters.
"""
LOGGER.info("Service3TM: Parsing for this SID has not been implemented.")
return [], [], bytearray()
return [], [], bytearray(), 0
......@@ -149,9 +149,8 @@ class TmTcPrinter:
from config.tmtcc_globals import GlobalIds
print_hk = get_global(GlobalIds.PRINT_HK)
if print_hk:
self.__print_buffer = "HK Data from Object ID "
self.__print_buffer += str(hex(tm_packet.object_id)) + " and set ID "
self.__print_buffer += str(hex(tm_packet.set_id)) + ":"
self.__print_buffer = f"HK Data from Object ID {tm_packet.object_id:#010x} and " \
f"set ID {tm_packet.set_id}:"
self.__print_hk(tm_packet)
self.__print_validity_buffer(tm_packet)
......@@ -164,9 +163,8 @@ class TmTcPrinter:
from config.tmtcc_globals import GlobalIds
print_hk = get_global(GlobalIds.PRINT_HK)
if print_hk:
self.__print_buffer = "HK Definition from Object ID "
self.__print_buffer += str(hex(tm_packet.object_id)) + " and set ID "
self.__print_buffer += str(tm_packet.set_id) + ":"
self.__print_buffer = f"HK Definition from Object ID {tm_packet.object_id:#010x} " \
f"and set ID {tm_packet.set_id}:"
self.__print_hk(tm_packet)
def __print_hk(self, tm_packet: Service3Base):
......@@ -199,8 +197,9 @@ class TmTcPrinter:
self.__print_buffer = "Valid: "
LOGGER.info(self.__print_buffer)
self.add_print_buffer_to_file_buffer()
self.__handle_validity_buffer_print(tm_packet.validity_buffer,
tm_packet.number_of_parameters)
self.__handle_validity_buffer_print(
tm_packet.validity_buffer, tm_packet.number_of_parameters
)
def __handle_validity_buffer_print(self, validity_buffer: bytearray, number_of_parameters):
"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment