Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
obsw_pus_service_test.py 6.97 KiB
"""
@file obsw_pus_service_test.py
@date 01.11.2019
@brief Contains specific PUS service tests.
@author: R. Mueller
"""
import struct
import unittest
from typing import Deque
from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys
from tc.obsw_pus_tc_base import PusTcInfoQueueT
from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \
pack_service2_test_into, pack_service8_test_into
import config.obsw_config as g
class TestService2(TestService):
"""
Test raw commanding service.
"""
@classmethod
def setUpClass(cls: TestService):
super().setUpClass()
print("Testing Service 2")
# all commands must be sent sequentially, not as a burst
cls.wait_intervals = [1, 2, 3, 4]
cls.wait_time = [2, 2, 2, 3]
pack_service2_test_into(cls.test_queue)
def test_service2(self):
"""
Tests the raw commanding service.
"""
assertion_dict = self.perform_testing_and_generate_assertion_dict()
self.event_expected = 2
self.misc_expected = 4
super()._perform_generic_assertion_test(assertion_dict)
def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict):
while not tm_info_queue.__len__() == 0:
current_tm_info = tm_info_queue.pop()
# Tc verification scanning is generic and has been moved to the superclass
if current_tm_info[TmDictionaryKeys.SERVICE] == 1:
super().scan_for_respective_tc(current_tm_info)
# Here, the desired event Id or RID can be specified
if current_tm_info[TmDictionaryKeys.SERVICE] == 5:
# mode change
if (current_tm_info[TmDictionaryKeys.REPORTER_ID] ==
struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]) \
and (current_tm_info[TmDictionaryKeys.EVENT_ID] == 7401 or
current_tm_info[TmDictionaryKeys.EVENT_ID] == 7400):
self.event_counter = self.event_counter + 1
if current_tm_info[TmDictionaryKeys.SERVICE] == 200 and \
current_tm_info[TmDictionaryKeys.SUBSERVICE] == 6:
# mode change confirmation
self.misc_counter += 1
# wiretapping messages
elif current_tm_info[TmDictionaryKeys.SERVICE] == 2 and \
(current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130 or
current_tm_info[TmDictionaryKeys.SUBSERVICE] == 131):
self.misc_counter += 1
if current_tm_info[TmDictionaryKeys.VALID] == 0:
self.valid = False
assertion_dict.update({"MiscCount": self.misc_counter})
class TestService5(TestService):
@classmethod
def setUpClass(cls: TestService):
super().setUpClass()
print("Testing Service 5")
# Wait intervals after TC 1,2 and 3 with specified wait times
# This is required because the OBSW tasks runs with fixed sequences
cls.wait_intervals = [1, 2, 3]
cls.wait_time = [1.5, 2.0, 2.0]
pack_service5_test_into(cls.test_queue)
def test_Service5(self):
# analyseTmInfo() and analyseTcInfo are called here
super()._perform_service5or17_test()
def analyse_tc_info(self, tc_info_queue: PusTcInfoQueueT):
super().analyse_tc_info(tc_info_queue)
def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict):
super()._analyse_service5or17_tm(tm_info_queue, assertion_dict)
@classmethod
def tearDownClass(cls):
super().tearDownClass()
class TestService6(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("Hallo Test 6!")
class TestService8(TestService):
@classmethod
def setUpClass(cls: TestService):
super().setUpClass()
print("Testing Service 8")
cls.wait_intervals = [1, 2, 3, 4]
cls.wait_time = [1.5, 0.7, 1.0, 1.1]
cls.data_reply_count = 0
pack_service8_test_into(cls.test_queue)
def test_Service8(self):
assertion_dict = self.perform_testing_and_generate_assertion_dict()
self.event_expected = 4
self.misc_expected = 2
self.data_reply_expected = 1
# One reply generates an additional step.
self.tc_verify_step_counter += 1
self.assertEqual(assertion_dict["TcStepCount"], self.tc_verify_step_counter)
self.assertEqual(self.data_reply_count, self.data_reply_expected)
super()._perform_generic_assertion_test(assertion_dict)
def analyse_tm_info(self, tm_info_queue: Deque, assertion_dict: dict):
while not tm_info_queue.__len__() == 0:
current_tm_info = tm_info_queue.pop()
if current_tm_info[TmDictionaryKeys.SERVICE] == 1:
self.scan_for_respective_tc(current_tm_info)
if current_tm_info[TmDictionaryKeys.SERVICE] == 5:
if (current_tm_info[TmDictionaryKeys.REPORTER_ID] ==
struct.unpack('>I', g.DUMMY_DEVICE_ID)[0]) and \
(current_tm_info[TmDictionaryKeys.EVENT_ID] == 7401
or current_tm_info[TmDictionaryKeys.EVENT_ID] == 7400):
self.event_counter += 1
if current_tm_info[TmDictionaryKeys.SERVICE] == 200:
# mode change confirmation
self.misc_counter += 1
if (current_tm_info[TmDictionaryKeys.SERVICE] == 8 and
current_tm_info[TmDictionaryKeys.SUBSERVICE] == 130):
self.data_reply_count += 1
class TestService9(unittest.TestCase):
@classmethod
def setUpClass(cls):
print("Hallo Test 9!")
class TestService17(TestService):
@classmethod
def setUpClass(cls: TestService):
super().setUpClass()
print("Testing Service 17")
cls.wait_intervals = [2]
cls.wait_time = 2
cls.tm_timeout = g.G_TM_TIMEOUT
pack_service17_test_into(cls.test_queue)
def test_Service17(self):
super()._perform_service5or17_test()
def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT):
assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue,
tc_info_queue=tc_info_queue)
# add anything elsee other than tc verification counter
# and ssc that is needed for tm analysis
return assertion_dict
def analyse_tc_info(self, tc_info_queue):
super().analyse_tc_info(tc_info_queue)
def analyse_tm_info(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict):
super()._analyse_service5or17_tm(tm_info_queue, assertion_dict)
@classmethod
def tearDownClass(cls):
print("Testing Service 17 finished")
super().tearDownClass()