Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
Showing
with 247 additions and 226 deletions
from typing import Deque from typing import Deque
from config.obsw_config import CORE_CONTROLLER_ID from config.tmtcc_config import CORE_CONTROLLER_ID
from tc.obsw_tc_service8 import make_action_id from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
def pack_core_command(tc_queue: Deque, op_code, ssc: int = 0): def pack_core_command(tc_queue: Deque, op_code, ssc: int = 0):
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file obsw_tc_gps.py @file tmtcc_tc_gps.py
@brief GPS Device: GPS device testing @brief GPS Device: GPS device testing
@author R. Mueller @author R. Mueller
@date 02.05.2020 @date 02.05.2020
""" """
from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tc.obsw_tc_service2 import pack_mode_data from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from pus_tc.tmtcc_tc_service2_raw_cmd import pack_mode_data
import config.tmtcc_config as g
import config.obsw_config as g
def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
......
from typing import Union from typing import Union
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, Deque
from tc.obsw_tc_service8 import make_action_id from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id
from tc.obsw_tc_service20 import pack_boolean_parameter_setting from pus_tc.tmtcc_tc_service20_parameter import pack_boolean_parameter_setting
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
from config.obsw_config import SW_IMAGE_HANDLER_ID from config.tmtcc_config import SW_IMAGE_HANDLER_ID
LOGGER = get_logger() LOGGER = get_logger()
......
# -*- coding: utf-8 -*- import os
""" from collections import deque
@file obsw_tc_packer.py from typing import Union
@brief Packs the TC queue for specific G_SERVICE or device testing
@details import config.tmtcc_config as g
Contains the sevice packet which can pack specific service queues (hardcoded for now) from tmtc_core.utility.obsw_logger import get_logger
@author R. Mueller from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand
@date 01.11.2019
""" from pus_tc.tmtcc_tc_service2_raw_cmd import pack_service2_test_into
import os from pus_tc.tmtcc_tc_service3_housekeeping import pack_service3_test_into
from pus_tc.tmtcc_tc_service8_func_cmd import pack_service8_test_into
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT from pus_tc.tmtcc_tc_service9_time import pack_service9_test_into
from tc.obsw_tc_service2 import pack_service2_test_into from pus_tc.tmtcc_tc_service23_sdcard import pack_service23_commands_into
from tc.obsw_tc_service3 import pack_service3_test_into from pus_tc.tmtcc_tc_service20_parameter import pack_service20_test_into
from tc.obsw_tc_service8 import pack_service8_test_into from pus_tc.tmtcc_pus_tc_utility import pack_utility_command
from tc.obsw_tc_service9 import pack_service9_test_into from pus_tc.tmtcc_tc_service200_mode import pack_mode_data, pack_service200_test_into
from tc.obsw_tc_service23_sdcard import pack_service23_commands_into from pus_tc.tmtcc_tc_service5_event import pack_service5_test_into
from tc.obsw_tc_service20 import pack_service20_test_into from pus_tc.tmtcc_service17_test import pack_service17_test_into
from tc.obsw_tc_utility import pack_utility_command from pus_tc.tmtcc_tc_image_handler import generate_img_handler_packet
from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into from pus_tc.tmtcc_tc_gps import pack_gps_test_into
from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into from pus_tc.tmtcc_tc_core import pack_core_command
from tc.obsw_image_handler import generate_img_handler_packet
from tc.obsw_tc_gps import pack_gps_test_into
from tc.obsw_tc_core import pack_core_command LOGGER = get_logger()
from tmtc_core.utility.obsw_logger import get_logger def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT):
import config.obsw_config as g if service == 2:
from collections import deque return pack_service2_test_into(service_queue)
from typing import Union if service == 3:
return pack_service3_test_into(service_queue, op_code)
LOGGER = get_logger() if service == 5:
return pack_service5_test_into(service_queue)
if service == 8:
class ServiceQueuePacker: return pack_service8_test_into(service_queue)
def __init__(self): if service == 9:
pass return pack_service9_test_into(service_queue)
if service == 17:
@staticmethod return pack_service17_test_into(service_queue, op_code)
def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT): if service == 20:
if service == 2: return pack_service20_test_into(service_queue)
return pack_service2_test_into(service_queue) if service == 23 or service.lower() == "sd":
if service == 3: return pack_service23_commands_into(service_queue, op_code)
return pack_service3_test_into(service_queue) if service == 200:
if service == 5: return pack_service200_test_into(service_queue)
return pack_service5_test_into(service_queue) if service.lower() == "dummy":
if service == 8: return pack_dummy_device_test_into(service_queue)
return pack_service8_test_into(service_queue) if service.lower() == "img":
if service == 9: return generate_img_handler_packet(service_queue, op_code)
return pack_service9_test_into(service_queue) if service.lower() == "core":
if service == 17: return pack_core_command(service_queue, op_code)
return pack_service17_test_into(service_queue, op_code) if service.lower() == "led":
if service == 20: return pack_utility_command(service_queue, op_code)
return pack_service20_test_into(service_queue) if service.lower() == "gps0":
if service == 23 or service.lower() == "sd": # Object ID: GPS Device
return pack_service23_commands_into(service_queue, op_code) object_id = g.GPS0_DEVICE_ID
if service == 200: return pack_gps_test_into(object_id, service_queue)
return pack_service200_test_into(service_queue) if service.lower() == "gps1":
if service.lower() == "dummy": # Object ID: GPS Device
return pack_dummy_device_test_into(service_queue) object_id = g.GPS1_DEVICE_ID
if service.lower() == "img": return pack_gps_test_into(object_id, service_queue)
return generate_img_handler_packet(service_queue, op_code) if service.lower() == "Error":
if service.lower() == "core": return pack_error_testing_into(service_queue)
return pack_core_command(service_queue, op_code) LOGGER.warning("Invalid Service !")
if service.lower() == "led":
return pack_utility_command(service_queue, op_code) # TODO: a way to select certain services would be nice (by passing a dict or array maybe)
if service.lower() == "gps0": def create_total_tc_queue() -> TcQueueT:
# Object ID: GPS Device if not os.path.exists("log"):
object_id = g.GPS0_DEVICE_ID os.mkdir("log")
return pack_gps_test_into(object_id, service_queue) tc_queue = deque()
if service.lower() == "gps1": tc_queue = pack_service2_test_into(tc_queue)
# Object ID: GPS Device tc_queue = pack_service3_test_into(tc_queue)
object_id = g.GPS1_DEVICE_ID tc_queue = pack_service5_test_into(tc_queue)
return pack_gps_test_into(object_id, service_queue) tc_queue = pack_service8_test_into(tc_queue)
if service.lower() == "Error": tc_queue = pack_service9_test_into(tc_queue)
return pack_error_testing_into(service_queue) tc_queue = pack_service17_test_into(tc_queue)
LOGGER.warning("Invalid Service !") tc_queue = pack_service20_test_into(tc_queue)
tc_queue = pack_service200_test_into(tc_queue)
tc_queue = pack_dummy_device_test_into(tc_queue)
# TODO: a way to select certain services would be nice (by passing a dict or array maybe) object_id = g.GPS0_DEVICE_ID
def create_total_tc_queue() -> TcQueueT: tc_queue = pack_gps_test_into(object_id, tc_queue)
if not os.path.exists("log"): return tc_queue
os.mkdir("log")
tc_queue = deque()
tc_queue = pack_service2_test_into(tc_queue)
tc_queue = pack_service3_test_into(tc_queue)
tc_queue = pack_service5_test_into(tc_queue)
tc_queue = pack_service8_test_into(tc_queue) def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue = pack_service9_test_into(tc_queue) tc_queue.appendleft(("print", "Testing Dummy Device"))
tc_queue = pack_service17_test_into(tc_queue) # Object ID: Dummy Device
tc_queue = pack_service20_test_into(tc_queue) object_id = g.DUMMY_DEVICE_ID
tc_queue = pack_service200_test_into(tc_queue) # Set On Mode
tc_queue = pack_dummy_device_test_into(tc_queue) tc_queue.appendleft(("print", "Testing Service Dummy: Set On"))
object_id = g.GPS0_DEVICE_ID mode_data = pack_mode_data(object_id, 1, 0)
tc_queue = pack_gps_test_into(object_id, tc_queue) command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data)
return tc_queue tc_queue.appendleft(command.pack_command_tuple())
# Test Service 2 commands
tc_queue.appendleft(("print", "Testing Service Dummy: Service 2"))
def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: pack_service2_test_into(tc_queue, True)
tc_queue.appendleft(("print", "Testing Dummy Device")) # Test Service 8
# Object ID: Dummy Device tc_queue.appendleft(("print", "Testing Service Dummy: Service 8"))
object_id = g.DUMMY_DEVICE_ID pack_service8_test_into(tc_queue, True)
# Set On Mode tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt"))
tc_queue.appendleft(("print", "Testing Service Dummy: Set On")) return tc_queue
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple()) def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT:
# Test Service 2 commands # a lot of events
tc_queue.appendleft(("print", "Testing Service Dummy: Service 2")) command = PusTelecommand(service=17, subservice=129, ssc=2010)
pack_service2_test_into(tc_queue, True) tc_queue.appendleft(command.pack_command_tuple())
# Test Service 8 # a lot of ping testing
tc_queue.appendleft(("print", "Testing Service Dummy: Service 8")) command = PusTelecommand(service=17, subservice=130, ssc=2020)
pack_service8_test_into(tc_queue, True) tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) return tc_queue
return tc_queue \ No newline at end of file
def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT:
# a lot of events
command = PusTelecommand(service=17, subservice=129, ssc=2010)
tc_queue.appendleft(command.pack_command_tuple())
# a lot of ping testing
command = PusTelecommand(service=17, subservice=130, ssc=2020)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file obsw_tc_service200.py @file tmtcc_tc_service200_mode.py
@brief PUS Service 200: PUS custom service 200: Mode commanding @brief PUS Service 200: PUS custom service 200: Mode commanding
@author R. Mueller @author R. Mueller
@date 02.05.2020 @date 02.05.2020
""" """
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tc.obsw_pus_tc_packer import TcQueueT from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
import config.obsw_config as g import config.tmtcc_config as g
import struct import struct
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file obsw_tc_service20.py @file tmtcc_tc_service20_parameter.py
@brief PUS Service 20: Parameter management. @brief PUS Service 20: Parameter management.
@author J. Gerhards @author J. Gerhards
@date 30.06.2020 @date 30.06.2020
...@@ -8,10 +8,10 @@ ...@@ -8,10 +8,10 @@
import struct import struct
from typing import Deque from typing import Deque
import config.obsw_config as g import config.tmtcc_config as g
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, TcQueueT
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
from tc.obsw_tc_service200 import pack_mode_data from pus_tc.tmtcc_tc_service200_mode import pack_mode_data
LOGGER = get_logger() LOGGER = get_logger()
...@@ -52,9 +52,9 @@ def pack_boolean_parameter_setting(object_id: bytearray, domain_id: int, ...@@ -52,9 +52,9 @@ def pack_boolean_parameter_setting(object_id: bytearray, domain_id: int,
def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque:
# parameter IDs # parameter IDs
parameterID0 = 0 parameter_id0 = 0
parameterID1 = 256 parameter_id1 = 256
parameterID2 = 512 parameter_id2 = 512
if called_externally is False: if called_externally is False:
tc_queue.appendleft(("print", "Testing Service 20")) tc_queue.appendleft(("print", "Testing Service 20"))
...@@ -68,48 +68,48 @@ def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) - ...@@ -68,48 +68,48 @@ def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -
# test checking Load for uint32_t # test checking Load for uint32_t
tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t")) tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t"))
parameter_id = struct.pack(">I", parameterID0) parameter_id = struct.pack(">I", parameter_id0)
type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1) type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1)
parameter_data = struct.pack(">I", 42) parameter_data = struct.pack(">I", 42)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data payload = object_id + parameter_id + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload) command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for uint32_t # test checking Dump for uint32_t
tc_queue.appendleft(("print", "Testing Service 20: Dump uint32_t")) tc_queue.appendleft(("print", "Testing Service 20: Dump uint32_t"))
parameter_id = struct.pack(">I", parameterID0) parameter_id = struct.pack(">I", parameter_id0)
payload = object_id + parameter_id payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload) command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# test checking Load for int32_t # test checking Load for int32_t
tc_queue.appendleft(("print", "Testing Service 20: Load int32_t")) tc_queue.appendleft(("print", "Testing Service 20: Load int32_t"))
parameter_id = struct.pack(">I", parameterID1) parameter_id = struct.pack(">I", parameter_id1)
type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1) type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1)
parameter_data = struct.pack(">i", -42) parameter_data = struct.pack(">i", -42)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data payload = object_id + parameter_id + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload) command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for int32_t # test checking Dump for int32_t
tc_queue.appendleft(("print", "Testing Service 20: Dump int32_t")) tc_queue.appendleft(("print", "Testing Service 20: Dump int32_t"))
parameter_id = struct.pack(">I", parameterID1) parameter_id = struct.pack(">I", parameter_id1)
payload = object_id + parameter_id payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload) command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# test checking Load for float # test checking Load for float
tc_queue.appendleft(("print", "Testing Service 20: Load float")) tc_queue.appendleft(("print", "Testing Service 20: Load float"))
parameter_id = struct.pack(">I", parameterID2) parameter_id = struct.pack(">I", parameter_id2)
type_and_matrix_data = pack_type_and_matrix_data(5, 1, 1, 1) type_and_matrix_data = pack_type_and_matrix_data(5, 1, 1, 1)
parameter_data = struct.pack(">f", 4.2) parameter_data = struct.pack(">f", 4.2)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data payload = object_id + parameter_id + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload) command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for float # test checking Dump for float
tc_queue.appendleft(("print", "Testing Service 20: Dump float")) tc_queue.appendleft(("print", "Testing Service 20: Dump float"))
parameter_id = struct.pack(">I", parameterID2) parameter_id = struct.pack(">I", parameter_id2)
payload = object_id + parameter_id payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload) command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
......
...@@ -4,11 +4,12 @@ Created: 21.01.2020 07:48 ...@@ -4,11 +4,12 @@ Created: 21.01.2020 07:48
@author: Jakob Meier @author: Jakob Meier
""" """
import config.obsw_config as g import config.tmtcc_config as g
from typing import Deque, Union from typing import Deque, Union
from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tc.obsw_tc_service8 import make_action_id from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger() LOGGER = get_logger()
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file obsw_tc_service2.py @file tmtcc_tc_service2_raw_cmd.py
@brief PUS Service 2: Device Access, Native low-level commanding @brief PUS Service 2: Device Access, native low-level commanding
@author R. Mueller @author R. Mueller
@date 01.11.2019 @date 01.11.2019
""" """
import struct import struct
import config.obsw_config as g import config.tmtcc_config as g
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, Deque
from tc.obsw_tc_service200 import pack_mode_data from pus_tc.tmtcc_tc_service200_mode import pack_mode_data
def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque:
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file obsw_tc_service3.py @file tmtcc_tc_service3_housekeeping.py
@brief PUS Service 3: Housekeeping Service. @brief PUS Service 3: Housekeeping Service.
@author R. Mueller @author R. Mueller
@date 02.05.2020 @date 02.05.2020
""" """
import struct import struct
from typing import Deque from typing import Deque, Union
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
import config.obsw_config as g import config.tmtcc_config as g
def make_sid(object_id: bytearray, set_id: int) -> bytearray: def make_sid(object_id: bytearray, set_id: int) -> bytearray:
...@@ -23,12 +23,15 @@ def make_interval(interval_seconds: float) -> bytearray: ...@@ -23,12 +23,15 @@ def make_interval(interval_seconds: float) -> bytearray:
# adding custom defintion to hk using test pool variables # adding custom defintion to hk using test pool variables
def pack_service3_test_into(tc_queue: Deque) -> Deque: def pack_service3_test_into(tc_queue: Deque, op_code: Union[str, int] = 0) -> Deque:
tc_queue.appendleft(("print", "Testing Service 3")) if op_code.lower() == "ie":
# Predefined packet testing tc_queue.appendleft(("print", "Testing Internal Error Reporter"))
pack_test_device_test(tc_queue) pack_internal_error_reporter_tests(tc_queue)
pack_custom_tests(tc_queue) else:
pack_internal_error_reporter_tests(tc_queue) tc_queue.appendleft(("print", "Testing Service 3"))
# Predefined packet testing
pack_test_device_test(tc_queue)
# pack_thermal_sensor_tests(tc_queue)
tc_queue.appendleft(("export", "log/tmtc_log_service3.txt")) tc_queue.appendleft(("export", "log/tmtc_log_service3.txt"))
return tc_queue return tc_queue
...@@ -37,7 +40,7 @@ def pack_test_device_test(tc_queue: Deque): ...@@ -37,7 +40,7 @@ def pack_test_device_test(tc_queue: Deque):
pass pass
def pack_custom_tests(tc_queue: Deque): def pack_thermal_sensor_tests(tc_queue: Deque):
tc_queue.appendleft(("print", "Generate one thermal sensor packet: ")) tc_queue.appendleft(("print", "Generate one thermal sensor packet: "))
command = PusTelecommand(service=3, subservice=27, ssc=3100, app_data=sid_thermalsensor) command = PusTelecommand(service=3, subservice=27, ssc=3100, app_data=sid_thermalsensor)
tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(command.pack_command_tuple())
......
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_service5_event.py
@brief PUS Service 5: Event Service
PUS Service 17: Test Service
@author R. Mueller
@date 02.05.2020
"""
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_tc_service17_test import pack_service17_ping_command
def pack_service5_test_into(tc_queue: TcQueueT):
tc_queue.appendleft(("print", "Testing Service 5"))
# invalid subservice
tc_queue.appendleft(("print", "Testing Service 5: Invalid subservice"))
command = PusTelecommand(service=5, subservice=1, ssc=500)
tc_queue.appendleft(command.pack_command_tuple())
# disable events
tc_queue.appendleft(("print", "Testing Service 5: Disable event"))
command = PusTelecommand(service=5, subservice=6, ssc=500)
tc_queue.appendleft(command.pack_command_tuple())
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger event"))
command = PusTelecommand(service=17, subservice=128, ssc=510)
tc_queue.appendleft(command.pack_command_tuple())
# enable event
tc_queue.appendleft(("print", "Testing Service 5: Enable event"))
command = PusTelecommand(service=5, subservice=5, ssc=520)
tc_queue.appendleft(command.pack_command_tuple())
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger another event"))
command = PusTelecommand(service=17, subservice=128, ssc=530)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("export", "log/tmtc_log_service5.txt"))
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file obsw_tc_service8.py @file tmtcc_tc_service8_func_cmd.py
@brief PUS Service 8: High-level functional commanding. @brief PUS Service 8: High-level functional commanding.
@author R. Mueller @author R. Mueller
@date 01.11.2019 @date 01.11.2019
""" """
import struct
from typing import Deque from typing import Deque
import config.obsw_config as g import config.tmtcc_config as g
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tc.obsw_tc_service200 import pack_mode_data from pus_tc.tmtcc_tc_service200_mode import pack_mode_data
def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque:
...@@ -64,13 +63,3 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> ...@@ -64,13 +63,3 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) ->
tc_queue.appendleft(("export", "log/tmtc_log_service8.txt")) tc_queue.appendleft(("export", "log/tmtc_log_service8.txt"))
return tc_queue return tc_queue
def generate_action_command(object_id: bytearray, action_id: int, data: bytearray = bytearray([]),
ssc: int = 0):
data_to_pack = bytearray(object_id)
data_to_pack += make_action_id(action_id) + data
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=data_to_pack)
def make_action_id(action_id: int) -> bytearray:
return bytearray(struct.pack('!I', action_id))
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@file obsw_tc_service9.py @file tmtcc_tc_service9_time.py
@brief PUS Service 9: Time management. @brief PUS Service 9: Time management.
@author R. Mueller @author R. Mueller
@date 01.11.2019 @date 01.11.2019
""" """
from datetime import datetime from datetime import datetime
from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger() LOGGER = get_logger()
......
File moved
import struct import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
from tm.obsw_tm_service_1 import Service1TM from tmtc_core.pus_tm.tmtcc_tm_service_1 import Service1TM
from tm.obsw_tm_service_3 import Service3TM from pus_tm.obsw_tm_service_3 import Service3TM
from tm.obsw_tm_service_5 import Service5TM from tmtc_core.pus_tm.tmtcc_tm_service_5 import Service5TM
from tm.obsw_tm_service_20 import Service20TM from pus_tm.obsw_tm_service_20 import Service20TM
from tm.obsw_tm_service_23 import Service23TM from pus_tm.obsw_tm_service_23 import Service23TM
LOGGER = get_logger() LOGGER = get_logger()
......
import struct import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
class Service20TM(PusTelemetry): class Service20TM(PusTelemetry):
...@@ -22,7 +25,7 @@ class Service20TM(PusTelemetry): ...@@ -22,7 +25,7 @@ class Service20TM(PusTelemetry):
if self.type_ptc == 5 and self.type_pfc == 1: if self.type_ptc == 5 and self.type_pfc == 1:
self.param = struct.unpack('>f', self._tm_data[12:datasize])[0] self.param = struct.unpack('>f', self._tm_data[12:datasize])[0]
else: else:
logger.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \ LOGGER.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \
only known subservice") only known subservice")
self.specify_packet_info("Functional Commanding Reply") self.specify_packet_info("Functional Commanding Reply")
......
import struct import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger() LOGGER = get_logger()
......
...@@ -6,11 +6,11 @@ Description: Deserialize Housekeeping TM ...@@ -6,11 +6,11 @@ Description: Deserialize Housekeeping TM
Author: R. Mueller Author: R. Mueller
""" """
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from typing import Type from typing import Type
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
import struct import struct
import config.obsw_config as g import config.tmtcc_config as g
LOGGER = get_logger() LOGGER = get_logger()
......
...@@ -44,12 +44,12 @@ from abc import abstractmethod ...@@ -44,12 +44,12 @@ from abc import abstractmethod
from collections import deque from collections import deque
from typing import Deque from typing import Deque
from config import obsw_config as g from config import tmtcc_config as g
from tc.obsw_pus_tc_packer import pack_dummy_device_test_into from pus_tc.tmtcc_tc_packer_hook import pack_dummy_device_test_into
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys
from tm.obsw_tm_service_1 import PusPacketInfoService1T from tmtc_core.pus_tm.tmtcc_tm_service_1 import PusPacketInfoService1T
from tmtc_core.tm.obsw_pus_tm_base import TmDictionaryKeys from tmtc_core.pus_tm.tmtcc_pus_tm_base import TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT from tmtc_core.pus_tm.tmtcc_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT
from tmtc_core.sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver from tmtc_core.sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
...@@ -80,12 +80,12 @@ class TestService(unittest.TestCase): ...@@ -80,12 +80,12 @@ class TestService(unittest.TestCase):
:return: :return:
""" """
cls._displayMode = "long" cls._displayMode = "long"
# wait intervals between tc send bursts. # wait intervals between pus_tc send bursts.
# Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again # Example: [2,4] sends to send 2 pus_tc from queue and wait, then sends another 2 and wait again
cls.wait_intervals = [] cls.wait_intervals = []
cls.wait_time = 5.0 cls.wait_time = 5.0
cls.print_tc = True cls.print_tc = True
# default wait time between tc send bursts # default wait time between pus_tc send bursts
cls.test_queue = deque() cls.test_queue = deque()
# Extremely ugly solution so that we can use the Python Unit Test Framework # Extremely ugly solution so that we can use the Python Unit Test Framework
...@@ -127,7 +127,7 @@ class TestService(unittest.TestCase): ...@@ -127,7 +127,7 @@ class TestService(unittest.TestCase):
TODO: Maybe we should instantiate this once in the main and then reuse it instead TODO: Maybe we should instantiate this once in the main and then reuse it instead
of calling the constructor over and over again. of calling the constructor over and over again.
If done so we need a setter for: wait_time, wait_intervals, printTm, If done so we need a setter for: wait_time, wait_intervals, printTm,
tc_timeout_factor and the tc.queue. Furthermore, changing parameters should tc_timeout_factor and the pus_tc.queue. Furthermore, changing parameters should
only be allowed as long as the commander/receiver is not running by checking a flag only be allowed as long as the commander/receiver is not running by checking a flag
:return: :return:
""" """
...@@ -199,7 +199,7 @@ class TestService(unittest.TestCase): ...@@ -199,7 +199,7 @@ class TestService(unittest.TestCase):
def scan_for_respective_tc(self, current_tm_info: PusTmInfoT): def scan_for_respective_tc(self, current_tm_info: PusTmInfoT):
""" """
this function looks whether the tc verification SSC matched this function looks whether the pus_tc verification SSC matched
a source sequence count of the sent TM a source sequence count of the sent TM
""" """
current_subservice = current_tm_info[TmDictionaryKeys.SUBSERVICE] current_subservice = current_tm_info[TmDictionaryKeys.SUBSERVICE]
......
...@@ -9,10 +9,10 @@ import unittest ...@@ -9,10 +9,10 @@ import unittest
from typing import Deque from typing import Deque
from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfoQueueT
from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ from pus_tc.tmtcc_tc_packer_hook import pack_service17_test_into, pack_service5_test_into, \
pack_service2_test_into, pack_service8_test_into, pack_service200_test_into, pack_service20_test_into pack_service2_test_into, pack_service8_test_into, pack_service200_test_into
import config.obsw_config as g import config.tmtcc_config as g
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger() LOGGER = get_logger()
...@@ -183,8 +183,8 @@ class TestService17(TestService): ...@@ -183,8 +183,8 @@ class TestService17(TestService):
def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT):
assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue,
tc_info_queue=tc_info_queue) tc_info_queue=tc_info_queue)
# add anything elsee other than tc verification counter # add anything elsee other than pus_tc verification counter
# and ssc that is needed for tm analysis # and ssc that is needed for pus_tm analysis
return assertion_dict return assertion_dict
def analyse_tc_info(self, tc_info_queue): def analyse_tc_info(self, tc_info_queue):
...@@ -256,8 +256,8 @@ class TestService200(TestService): ...@@ -256,8 +256,8 @@ class TestService200(TestService):
def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT):
assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue,
tc_info_queue=tc_info_queue) tc_info_queue=tc_info_queue)
# add anything else other than tc verification counter # add anything else other than pus_tc verification counter
# and ssc that is needed for tm analysis # and ssc that is needed for pus_tm analysis
return assertion_dict return assertion_dict
def analyse_tc_info(self, tc_info_queue): def analyse_tc_info(self, tc_info_queue):
......
# -*- coding: utf-8 -*-
"""
Program: obsw_tm_service_1.py
Date: 30.12.2019
Description: Deserialize Pus Verification TM
Author: R. Mueller
"""
import struct
from typing import Dict
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_creator import PusTelemetryCreator
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
PusPacketInfoService1T = Dict[TmDictionaryKeys, any]
class Service1TM(PusTelemetry):
"""
Service 1 TM class representation. Can be used to deserialize raw service 1 packets.
"""
def __init__(self, byte_array: bytearray):
super().__init__(byte_array)
self.has_tc_error_code = False
self.is_step_reply = False
# Failure Reports with error code
self.err_code = 0
self.step_number = 0
self.error_param1 = 0
self.error_param2 = 0
self.tc_packet_id = self._tm_data[0] << 8 | self._tm_data[1]
self.tc_ssc = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3]
if self.get_subservice() % 2 == 0:
self.__handle_failure_verification()
else:
self.__handle_success_verification()
def append_telemetry_content(self, content_list: list):
super().append_telemetry_content(content_list)
content_list.append(str(hex(self.tc_packet_id)))
content_list.append(str(self.tc_ssc))
if self.has_tc_error_code:
if self.is_step_reply:
content_list.append(str(self.step_number))
content_list.append(str(hex(self.err_code)))
content_list.append(str(hex(self.error_param1)) + ", " + str(self.error_param1))
content_list.append(str(hex(self.error_param2)) + ", " + str(self.error_param2))
elif self.is_step_reply:
content_list.append(str(self.step_number))
def append_telemetry_column_headers(self, header_list: list):
super().append_telemetry_column_headers(header_list)
header_list.append("TC Packet ID")
header_list.append("TC SSC")
if self.has_tc_error_code:
if self.is_step_reply:
header_list.append("Step Number")
header_list.append("Return Value")
header_list.append("Error Param 1")
header_list.append("Error Param 2")
elif self.is_step_reply:
header_list.append("Step Number")
def __handle_failure_verification(self):
self.specify_packet_info("Failure Verficiation")
self.has_tc_error_code = True
if self.get_subservice() == 2:
self.append_packet_info(" : Acceptance failure")
elif self.get_subservice() == 4:
self.append_packet_info(" : Start failure")
elif self.get_subservice() == 6:
self.is_step_reply = True
self.append_packet_info(" : Step Failure")
self.step_number = struct.unpack('>B', self._tm_data[4:5])[0]
self.err_code = struct.unpack('>H', self._tm_data[5:7])[0]
self.error_param1 = struct.unpack('>I', self._tm_data[7:11])[0]
self.error_param2 = struct.unpack('>I', self._tm_data[11:15])[0]
elif self.get_subservice() == 8:
self.err_code = struct.unpack('>H', self._tm_data[4:6])[0]
self.error_param1 = struct.unpack('>I', self._tm_data[6:10])[0]
self.error_param2 = struct.unpack('>I', self._tm_data[10:14])[0]
else:
LOGGER.error("Service1TM: Invalid subservice")
def __handle_success_verification(self):
self.specify_packet_info("Success Verification")
if self.get_subservice() == 1:
self.append_packet_info(" : Acceptance success")
elif self.get_subservice() == 3:
self.append_packet_info(" : Start success")
elif self.get_subservice() == 5:
self.is_step_reply = True
self.append_packet_info(" : Step Success")
self.step_number = struct.unpack('>B', self._tm_data[4:5])[0]
elif self.get_subservice() == 7:
self.append_packet_info(" : Completion success")
else:
LOGGER.error("Service1TM: Invalid subservice")
def pack_tm_information(self) -> PusPacketInfoService1T:
tm_information = super().pack_tm_information()
add_information = {
TmDictionaryKeys.TC_PACKET_ID: self.tc_packet_id,
TmDictionaryKeys.TC_SSC: self.tc_ssc,
}
tm_information.update(add_information)
if self.has_tc_error_code:
tm_information.update({TmDictionaryKeys.ERROR_CODE: self.err_code})
if self.is_step_reply:
tm_information.update({TmDictionaryKeys.STEP_NUMBER: self.step_number})
return tm_information
class Service1TmPacked(PusTelemetryCreator):
"""
Class representation for Service 1 TM creation.
"""
def __init__(self, subservice: int, ssc: int = 0, tc_packet_id: int = 0, tc_ssc: int = 0):
source_data = bytearray()
source_data.append((tc_packet_id & 0xFF00) >> 8)
source_data.append(tc_packet_id & 0xFF)
tc_psc = (tc_ssc & 0x3FFF) | (0b11 << 16)
source_data.append((tc_psc & 0xFF00) >> 8)
source_data.append(tc_psc & 0xFF)
super().__init__(service=1, subservice=subservice, ssc=ssc, source_data=source_data)
def pack(self) -> bytearray:
return super().pack()