diff --git a/test/obsw_module_test.py b/test/obsw_module_test.py index 53bee92cde04f0a964c906295c11bbe3d93a2c32..25d856064108bc477a1477166e595fd409516b60 100644 --- a/test/obsw_module_test.py +++ b/test/obsw_module_test.py @@ -42,13 +42,13 @@ from typing import Deque from config import obsw_config as g from tc.obsw_pus_tc_packer import pack_dummy_device_test_into from tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys -from tm.obsw_tm_service_1 import pusPacketInfoService1T +from tm.obsw_tm_service_1 import PusPacketInfoService1T from tm.obsw_pus_tm_base import TmDictionaryKeys from tm.obsw_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT from sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver from utility.obsw_logger import get_logger -TmInfoQueueService1T = Deque[pusPacketInfoService1T] +TmInfoQueueService1T = Deque[PusPacketInfoService1T] LOGGER = get_logger() class TestService(unittest.TestCase): diff --git a/tm/obsw_tm_service_1.py b/tm/obsw_tm_service_1.py index b856d398df5126d9f921ccd48cfcdfbf85af5fdf..8faddf294fd0e1690a20611f097f063501cf7bcb 100644 --- a/tm/obsw_tm_service_1.py +++ b/tm/obsw_tm_service_1.py @@ -13,10 +13,13 @@ from tm.obsw_pus_tm_creator import PusTelemetryCreator from utility.obsw_logger import get_logger LOGGER = get_logger() -pusPacketInfoService1T = Dict[TmDictionaryKeys, any] +PusPacketInfoService1T = Dict[TmDictionaryKeys, any] class Service1TM(PusTelemetry): + """ + Service 1 TM class representation. Can be used to deserialize raw service 1 packets. + """ def __init__(self, byte_array: bytearray): super().__init__(byte_array) self.has_tc_error_code = False @@ -24,61 +27,81 @@ class Service1TM(PusTelemetry): # Failure Reports with error code self.err_code = 0 self.step_number = 0 - self.tcPacketId = self._tm_data[0] << 8 | self._tm_data[1] - self.tcSSC = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3] - self.specify_packet_info("Success Verification") + self.tc_packet_id = self._tm_data[0] << 8 | self._tm_data[1] + self.tc_ssc = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3] if self.get_subservice() % 2 == 0: - self.specify_packet_info("Failure Verficiation") - self.has_tc_error_code = True - if self.get_subservice() == 6: - self.is_step_reply = True - self.append_packet_info(" : Step Failure") - self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] - self.err_code = struct.unpack('>H', self._tm_data[5:7])[0] - self.errorParam1 = struct.unpack('>I', self._tm_data[7:11])[0] - self.errorParam2 = struct.unpack('>I', self._tm_data[11:15])[0] - else: - self.print_source_data() - self.err_code = struct.unpack('>H', self._tm_data[4:6])[0] - self.errorParam1 = struct.unpack('>I', self._tm_data[6:10])[0] - self.errorParam2 = struct.unpack('>I', self._tm_data[10:14])[0] - - elif self.get_subservice() == 5: - self.is_step_reply = True - self.append_packet_info(" : Step Success") - self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] + self.__handle_failure_verification() + else: + self.__handle_success_verification() - def append_telemetry_content(self, array): - super().append_telemetry_content(array) - array.append(str(hex(self.tcPacketId))) - array.append(str(self.tcSSC)) + def append_telemetry_content(self, content_list: list): + super().append_telemetry_content(content_list) + content_list.append(str(hex(self.tc_packet_id))) + content_list.append(str(self.tc_ssc)) if self.has_tc_error_code: if self.is_step_reply: - array.append(str(self.step_number)) - array.append(str(hex(self.err_code))) - array.append(str(hex(self.errorParam1)) + ", " + str(self.errorParam1)) - array.append(str(hex(self.errorParam2)) + ", " + str(self.errorParam2)) + content_list.append(str(self.step_number)) + content_list.append(str(hex(self.err_code))) + content_list.append(str(hex(self.error_param1)) + ", " + str(self.error_param1)) + content_list.append(str(hex(self.error_param2)) + ", " + str(self.error_param2)) elif self.is_step_reply: - array.append(str(self.step_number)) + content_list.append(str(self.step_number)) - def append_telemetry_column_headers(self, array): - super().append_telemetry_column_headers(array) - array.append("TC Packet ID") - array.append("TC SSC") + def append_telemetry_column_headers(self, header_list: list): + super().append_telemetry_column_headers(header_list) + header_list.append("TC Packet ID") + header_list.append("TC SSC") if self.has_tc_error_code: if self.is_step_reply: - array.append("Step Number") - array.append("Return Value") - array.append("Error Param 1") - array.append("Error Param 2") + header_list.append("Step Number") + header_list.append("Return Value") + header_list.append("Error Param 1") + header_list.append("Error Param 2") elif self.is_step_reply: - array.append("Step Number") + header_list.append("Step Number") + + def __handle_failure_verification(self): + self.specify_packet_info("Failure Verficiation") + self.has_tc_error_code = True + if self.get_subservice() == 2: + self.append_packet_info(" : Acceptance failure") + elif self.get_subservice() == 4: + self.append_packet_info(" : Start failure") + elif self.get_subservice() == 6: + self.is_step_reply = True + self.append_packet_info(" : Step Failure") + self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] + self.err_code = struct.unpack('>H', self._tm_data[5:7])[0] + self.error_param1 = struct.unpack('>I', self._tm_data[7:11])[0] + self.error_param2 = struct.unpack('>I', self._tm_data[11:15])[0] + elif self.get_subservice() == 8: + self.print_source_data() + self.err_code = struct.unpack('>H', self._tm_data[4:6])[0] + self.error_param1 = struct.unpack('>I', self._tm_data[6:10])[0] + self.error_param2 = struct.unpack('>I', self._tm_data[10:14])[0] + else: + LOGGER.error("Service1TM: Invalid subservice") + + def __handle_success_verification(self): + self.specify_packet_info("Success Verification") + if self.get_subservice() == 1: + self.append_packet_info(" : Acceptance success") + elif self.get_subservice() == 3: + self.append_packet_info(" : Start success") + elif self.get_subservice() == 5: + self.is_step_reply = True + self.append_packet_info(" : Step Success") + self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] + elif self.get_subservice() == 7: + self.append_packet_info(" : Completion success") + else: + LOGGER.error("Service1TM: Invalid subservice") - def pack_tm_information(self) -> pusPacketInfoService1T: + def pack_tm_information(self) -> PusPacketInfoService1T: tm_information = super().pack_tm_information() add_information = { - TmDictionaryKeys.TC_PACKET_ID: self.tcPacketId, - TmDictionaryKeys.TC_SSC: self.tcSSC, + TmDictionaryKeys.TC_PACKET_ID: self.tc_packet_id, + TmDictionaryKeys.TC_SSC: self.tc_ssc, } tm_information.update(add_information) if self.has_tc_error_code: @@ -89,11 +112,14 @@ class Service1TM(PusTelemetry): class Service1TmPacked(PusTelemetryCreator): - def __init__(self, subservice: int, ssc: int=0, tc_packet_id: int=0, tc_ssc: int=0): + """ + Class representation for Service 1 TM creation. + """ + def __init__(self, subservice: int, ssc: int = 0, tc_packet_id: int = 0, tc_ssc: int = 0): source_data = bytearray() source_data.append((tc_packet_id & 0xFF00) >> 8) source_data.append(tc_packet_id & 0xFF) - tc_psc = (tc_ssc & 0x3FFF) | (0b11 << 16) + tc_psc = (tc_ssc & 0x3FFF) | (0b11 << 16) source_data.append((tc_psc & 0xFF00) >> 8) source_data.append(tc_psc & 0xFF) super().__init__(service=1, subservice=subservice, ssc=ssc, source_data=source_data)