diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index af185553ca87016c31d7cffbc6bad8809a612d85..4e73036583a1500e8782313c2d203bf515a3e31e 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -14,6 +14,7 @@ from tc.obsw_tc_service2 import pack_service2_test_into from tc.obsw_tc_service3 import pack_service3_test_into from tc.obsw_tc_service8 import pack_service8_test_into from tc.obsw_tc_service9 import pack_service9_test_into +from tc.obsw_tc_service20 import pack_service20_test_into from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into from tc.obsw_tc_gps import pack_gps_test_into @@ -38,6 +39,8 @@ def pack_service_queue(service: Union[int, str], service_queue: TcQueueT): return pack_service9_test_into(service_queue) if service == 17: return pack_service17_test_into(service_queue) + if service == 20: + return pack_service20_test_into(service_queue) if service == 200: return pack_service200_test_into(service_queue) if service == "Dummy": diff --git a/tc/obsw_tc_service20.py b/tc/obsw_tc_service20.py new file mode 100644 index 0000000000000000000000000000000000000000..2d8756960696d35ab68eda4966833241f761311c --- /dev/null +++ b/tc/obsw_tc_service20.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +""" +@file obsw_tc_service20.py +@brief PUS Service 20: Parameter management. +@author J. Gerhards +@date 30.06.2020 +""" +import struct +from typing import Deque + +import config.obsw_config as g +from tc.obsw_pus_tc_base import PusTelecommand +from tc.obsw_tc_service200 import pack_mode_data + + +def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: + #parameter IDs + parameterID0 = 0 + parameterID1 = 1 + parameterID2 = 2 + + if called_externally is False: + tc_queue.appendleft(("print", "Testing Service 20")) + object_id = g.DUMMY_DEVICE_ID + + # set mode normal + tc_queue.appendleft(("print", "Testing Service 20: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + #test invalid subservice + #use subservice 130 for invalid subservice check, as this is in use for dump reply + #(and therefore will never be a valid subservice) + #tc_queue.appendleft(("print", "Testing Service 20: Invalid subservice")) + #mode_data = pack_mode_data(object_id, 2, 0) + #command = PusTelecommand(service=20, subservice=130, ssc=810, app_data=mode_data) + #tc_queue.appendleft(command.pack_command_tuple()) + + #test invalid objectid //TODO: do we have an objectid known to be empty (even in future)? + #tc_queue.appendleft(("print", "Testing Service 20: Invalid object ID")) + #mode_data = pack_mode_data(object_id, 2, 0) + #command = PusTelecommand(service=20, subservice=128, ssc=810, app_data=mode_data) + #tc_queue.appendleft(command.pack_command_tuple()) + + #test invalid parameterID for load + #tc_queue.appendleft(("print", "Testing Service 20: Invalid parameter ID for load")) + #mode_data = pack_mode_data(object_id, 2, 0) + #command = PusTelecommand(service=20, subservice=128, ssc=810, app_data=mode_data) + #tc_queue.appendleft(command.pack_command_tuple()) + + #test invalid parameterID for dump + #tc_queue.appendleft(("print", "Testing Service 20: Invalid parameter ID for dump")) + #mode_data = pack_mode_data(object_id, 2, 0) + #command = PusTelecommand(service=20, subservice=129, ssc=810, app_data=mode_data) + #tc_queue.appendleft(command.pack_command_tuple()) + + #test checking Load for uint32_t + tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t")) + parameter_id = struct.pack(">I", parameterID0) + parameter_data = struct.pack(">I", 42) + payload = object_id + parameter_id + parameter_data + command = PusTelecommand(service=20, subservice=128, ssc=2001, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + #test checking Dump for uint32_t + tc_queue.appendleft(("print", "Testing Service 20: Dump uint32_t")) + parameter_id = struct.pack(">I", parameterID0) + payload = object_id + parameter_id + command = PusTelecommand(service=20, subservice=129, ssc=2001, app_data=payload) + tc_queue.appendleft(command.pack_command_tuple()) + + if called_externally is False: + tc_queue.appendleft(("export", "log/tmtc_log_service20.txt")) + return tc_queue + + +""" + #test checking Load for int32_t + tc_queue.appendleft(("print", "Testing Service 20: Load int32_t")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=20, subservice=128, ssc=2003, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + #test checking Dump for int32_t + tc_queue.appendleft(("print", "Testing Service 20: Dump int32_t")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=20, subservice=129, ssc=2004, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + #test checking Load for float + tc_queue.appendleft(("print", "Testing Service 20: Load float")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=20, subservice=128, ssc=2005, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + #test checking Dump for float + tc_queue.appendleft(("print", "Testing Service 20: Dump float")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=20, subservice=129, ssc=2006, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) +""" + + + + +""" + # set mode on + tc_queue.appendleft(("print", "Testing Service 8: Set On Mode")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode normal + tc_queue.appendleft(("print", "Testing Service 8: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers completion reply + tc_queue.appendleft(("print", "Testing Service 8: Trigger Completion Reply")) + action_id = g.DUMMY_COMMAND_1 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers _tm_data reply + tc_queue.appendleft(("print", "Testing Service 8: Trigger Data Reply")) + action_id = g.DUMMY_COMMAND_2 + command_param1 = g.DUMMY_COMMAND_2_PARAM_1 + command_param2 = g.DUMMY_COMMAND_2_PARAM_2 + direct_command = object_id + action_id + command_param1 + command_param2 + command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers an additional step reply and one completion reply + tc_queue.appendleft(("print", "Testing Service 8: Trigger Step and Completion Reply")) + action_id = g.DUMMY_COMMAND_3 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=840, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode off + tc_queue.appendleft(("print", "Testing Service 8: Set Off Mode")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("wait", 2)) +""" diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index f314c2acaae20024e71bc5954ff14b7e65b83385..9ac82556edad1fe24c275e8f45aa7ffd89680064 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -185,6 +185,43 @@ class TestService17(TestService): print("Testing Service 17 finished") super().tearDownClass() +class TestService20(TestService): #TODO: implement correctly + + @classmethod + def setUpClass(cls: TestService): + super().setUpClass() + LOGGER.info("Testing Service 20") + cls.wait_intervals = [1, 2, 3, 4, 5] + cls.wait_time = [1.5, 1.5, 2.2, 2.2, 2.0] + cls.data_reply_count = 0 + pack_service20_test_into(cls.test_queue) + + def test_Service20(self): + + assertion_dict = self.perform_testing_and_generate_assertion_dict() + # 3 x Mode changes + self.misc_expected = 3 + # 2 x Event per mode change + self.event_expected = 6 + self.data_reply_expected = 1 + # One reply generates an additional step. + self.tc_verify_step_counter += 1 + self.assertEqual(assertion_dict[AssertionDictKeys.TC_STEP_COUNT], + self.tc_verify_step_counter) + + self.assertEqual(self.data_reply_count, self.data_reply_expected) + super()._perform_generic_assertion_test(assertion_dict) + + def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + self.scan_for_respective_tc(current_tm_info) + if (current_tm_info[TmDictionaryKeys.SERVICE] == 8 and + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130): + self.data_reply_count += 1 + self._generic_mode_tm_check(current_tm_info, g.DUMMY_DEVICE_ID, [7401, 7400]) + class TestService200(TestService): @classmethod diff --git a/tm/obsw_pus_tm_factory.py b/tm/obsw_pus_tm_factory.py index f6b9be1758556e93072c073790a71bca8ad6027a..fe6857acd498416d6538690fcc59076be1be1f63 100644 --- a/tm/obsw_pus_tm_factory.py +++ b/tm/obsw_pus_tm_factory.py @@ -38,6 +38,8 @@ class PusTelemetryFactory(object): return Service8TM(raw_tm_packet) if service_type == 17: return Service17TM(raw_tm_packet) + if service_type == 20: + return Service20TM(raw_tm_packet) if service_type == 200: return Service200TM(raw_tm_packet) print("The service " + str(service_type) + " is not implemented in Telemetry Factory") @@ -100,6 +102,21 @@ class Service17TM(PusTelemetry): super().append_telemetry_column_headers(array) return +class Service20TM(PusTelemetry): + def __init__(self, byte_array): + super().__init__(byte_array) + self.parameter_id = struct.unpack('>I', self._tm_data[0:4])[0] + self.specify_packet_info("Functional Commanding Reply") + + def append_telemetry_content(self, array): + super().append_telemetry_content(array) + array.append(self.parameter_id) + return + + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) + array.append("param0_dump_repl") + return class Service200TM(PusTelemetry): def __init__(self, byte_array):