diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index 4e73036583a1500e8782313c2d203bf515a3e31e..5d28ea0e0aed75927b85a7ee5fae6c2e1333ea51 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -69,6 +69,7 @@ def create_total_tc_queue() -> TcQueueT: tc_queue = pack_service8_test_into(tc_queue) tc_queue = pack_service9_test_into(tc_queue) tc_queue = pack_service17_test_into(tc_queue) + tc_queue = pack_service20_test_into(tc_queue) tc_queue = pack_service200_test_into(tc_queue) tc_queue = pack_dummy_device_test_into(tc_queue) object_id = g.GPS0_ObjectId diff --git a/tc/obsw_tc_service20.py b/tc/obsw_tc_service20.py index 1f607ca42c394fd3730a2afa9511de3255ac155f..51925b12188b07e3afcff1c73ea651ef1c647228 100644 --- a/tc/obsw_tc_service20.py +++ b/tc/obsw_tc_service20.py @@ -56,12 +56,12 @@ def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) - #tc_queue.appendleft(command.pack_command_tuple()) #test checking Load for uint32_t - tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t")) - parameter_id = struct.pack(">I", parameterID0) - parameter_data = struct.pack(">I", 42) - payload = object_id + parameter_id + parameter_data - command = PusTelecommand(service=20, subservice=128, ssc=2001, app_data=payload) - tc_queue.appendleft(command.pack_command_tuple()) + #tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t")) + #parameter_id = struct.pack(">I", parameterID0) + #parameter_data = struct.pack(">I", 42) + #payload = object_id + parameter_id + parameter_data + #command = PusTelecommand(service=20, subservice=128, ssc=2001, app_data=payload) + #tc_queue.appendleft(command.pack_command_tuple()) #test checking Dump for uint32_t tc_queue.appendleft(("print", "Testing Service 20: Dump uint32_t")) diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index 9ac82556edad1fe24c275e8f45aa7ffd89680064..07e9ff8f1c857602c9a846415a4b90a29c6bc508 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -11,7 +11,7 @@ from typing import Deque from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys from tc.obsw_pus_tc_base import PusTcInfoQueueT from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ - pack_service2_test_into, pack_service8_test_into, pack_service200_test_into + pack_service2_test_into, pack_service8_test_into, pack_service200_test_into, pack_service20_test_into import config.obsw_config as g from utility.obsw_logger import get_logger diff --git a/tm/obsw_pus_tm_factory.py b/tm/obsw_pus_tm_factory.py index f33e0340081795dc079f146e57f43190a5864c95..9ae03c4e5398b545b6714014cef3e01888ea65f5 100644 --- a/tm/obsw_pus_tm_factory.py +++ b/tm/obsw_pus_tm_factory.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- from typing import Deque, List, Tuple -from tm.obsw_pus_tm_base import PusTelemetry, PusTmInfoT +from tm.obsw_pus_tm_base import PusTelemetry, PusTmInfoT, TmDictionaryKeys from tm.obsw_tm_service_1 import Service1TM from tm.obsw_tm_service_3 import Service3TM from tm.obsw_tm_service_5 import Service5TM @@ -105,25 +105,50 @@ class Service17TM(PusTelemetry): class Service20TM(PusTelemetry): def __init__(self, byte_array): super().__init__(byte_array) + datasize = len(self._tm_data) self.objectId = struct.unpack('>I', self._tm_data[0:4])[0] self.parameter_id = struct.unpack('>I', self._tm_data[4:8])[0] - self.param = struct.unpack('>I', self._tm_data[8:12])[0] + if self.get_subservice() == 130: + self.type = struct.unpack('>H', self._tm_data[8:10])[0] + self.type_ptc = self._tm_data[8] + self.type_pfc = self._tm_data[9] + self.column = self._tm_data[10] + self.row = self._tm_data[11] + self.param = struct.unpack('>I', self._tm_data[12:datasize])[0] self.specify_packet_info("Functional Commanding Reply") def append_telemetry_content(self, array): super().append_telemetry_content(array) array.append(hex(self.objectId)) array.append(self.parameter_id) - array.append(str(hex(self.param))) + if self.get_subservice() == 130: + array.append(str("ptc: " + self.type_ptc + "| pfc: " + self.type_pfc)) + array.append(str(self.column)) + array.append(str(self.row)) + array.append(str(hex(self.param))) return def append_telemetry_column_headers(self, array): super().append_telemetry_column_headers(array) array.append("objectID") array.append("parameterID") - array.append("parameter") + if self.get_subservice() == 130: + array.append("type") + array.append("column") + array.append("row") + array.append("parameter") return + def pack_tm_information(self) -> PusTmInfoT: + tm_information = super().pack_tm_information() + add_information = { + TmDictionaryKeys.REPORTER_ID: self.objectId, + TmDictionaryKeys.EVENT_ID: self.parameter_id, + TmDictionaryKeys.EVENT_PARAM_1: self.param + } + tm_information.update(add_information) + return tm_information + class Service200TM(PusTelemetry): def __init__(self, byte_array): super().__init__(byte_array)