diff --git a/tc/obsw_tc_service20.py b/tc/obsw_tc_service20.py new file mode 100644 index 0000000000000000000000000000000000000000..e86b0cb859a9f48cc3e63ea3b283b5184d184f19 --- /dev/null +++ b/tc/obsw_tc_service20.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +""" +@file obsw_tc_service20.py +@brief PUS Service 20: Parameter management. +@author J. Gerhards +@date 30.06.2020 +""" +from typing import Deque + +import config.obsw_config as g +from tc.obsw_pus_tc_base import PusTelecommand +from tc.obsw_tc_service200 import pack_mode_data + + +def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: + if called_externally is False: + tc_queue.appendleft(("print", "Testing Service 20")) + object_id = g.DUMMY_DEVICE_ID + + #test invalid subservice + + #test invalid objectid //TODO: do we have an objectid known to be empty (even in future)? + + #test invalid parameterID for load + + #test invalid parameterID for dump + + #test checking Load for uint32_t + + #test checking Dump for uint32_t + + #test checking Load for int32_t + + #test checking Dump for int32_t + + #test checking Load for float + + #test checking Dump for float + + + + if called_externally is False: + tc_queue.appendleft(("export", "log/tmtc_log_service8.txt")) + return tc_queue + + + + + # set mode on + tc_queue.appendleft(("print", "Testing Service 8: Set On Mode")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode normal + tc_queue.appendleft(("print", "Testing Service 8: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers completion reply + tc_queue.appendleft(("print", "Testing Service 8: Trigger Completion Reply")) + action_id = g.DUMMY_COMMAND_1 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers _tm_data reply + tc_queue.appendleft(("print", "Testing Service 8: Trigger Data Reply")) + action_id = g.DUMMY_COMMAND_2 + command_param1 = g.DUMMY_COMMAND_2_PARAM_1 + command_param2 = g.DUMMY_COMMAND_2_PARAM_2 + direct_command = object_id + action_id + command_param1 + command_param2 + command = PusTelecommand(service=8, subservice=128, ssc=830, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # Direct command which triggers an additional step reply and one completion reply + tc_queue.appendleft(("print", "Testing Service 8: Trigger Step and Completion Reply")) + action_id = g.DUMMY_COMMAND_3 + direct_command = object_id + action_id + command = PusTelecommand(service=8, subservice=128, ssc=840, app_data=direct_command) + tc_queue.appendleft(command.pack_command_tuple()) + + # set mode off + tc_queue.appendleft(("print", "Testing Service 8: Set Off Mode")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("wait", 2)) + diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index 3aa61d557b36c27f7cb1a014500275de692db006..e546a92aafc466e9a2dafdf673537f92ce030826 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -185,6 +185,43 @@ class TestService17(TestService): print("Testing Service 17 finished") super().tearDownClass() +class TestService20(TestService): #TODO: implement correctly + + @classmethod + def setUpClass(cls: TestService): + super().setUpClass() + LOGGER.info("Testing Service 20") + cls.wait_intervals = [1, 2, 3, 4, 5] + cls.wait_time = [1.5, 1.5, 2.2, 2.2, 2.0] + cls.data_reply_count = 0 + pack_service20_test_into(cls.test_queue) + + def test_Service8(self): + + assertion_dict = self.perform_testing_and_generate_assertion_dict() + # 3 x Mode changes + self.misc_expected = 3 + # 2 x Event per mode change + self.event_expected = 6 + self.data_reply_expected = 1 + # One reply generates an additional step. + self.tc_verify_step_counter += 1 + self.assertEqual(assertion_dict[AssertionDictKeys.TC_STEP_COUNT], + self.tc_verify_step_counter) + + self.assertEqual(self.data_reply_count, self.data_reply_expected) + super()._perform_generic_assertion_test(assertion_dict) + + def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict): + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() + if current_tm_info[TmDictionaryKeys.SERVICE] == 1: + self.scan_for_respective_tc(current_tm_info) + if (current_tm_info[TmDictionaryKeys.SERVICE] == 8 and + current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130): + self.data_reply_count += 1 + self._generic_mode_tm_check(current_tm_info, g.DUMMY_DEVICE_ID, [7401, 7400]) + class TestService200(TestService): @classmethod