Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
Showing
with 247 additions and 226 deletions
from typing import Deque
from config.obsw_config import CORE_CONTROLLER_ID
from tc.obsw_tc_service8 import make_action_id
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from config.tmtcc_config import CORE_CONTROLLER_ID
from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
def pack_core_command(tc_queue: Deque, op_code, ssc: int = 0):
......
# -*- coding: utf-8 -*-
"""
@file obsw_tc_gps.py
@file tmtcc_tc_gps.py
@brief GPS Device: GPS device testing
@author R. Mueller
@date 02.05.2020
"""
from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand
from tc.obsw_tc_service2 import pack_mode_data
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from pus_tc.tmtcc_tc_service2_raw_cmd import pack_mode_data
import config.tmtcc_config as g
import config.obsw_config as g
def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT:
......
from typing import Union
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque
from tc.obsw_tc_service8 import make_action_id
from tc.obsw_tc_service20 import pack_boolean_parameter_setting
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, Deque
from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id
from pus_tc.tmtcc_tc_service20_parameter import pack_boolean_parameter_setting
from tmtc_core.utility.obsw_logger import get_logger
from config.obsw_config import SW_IMAGE_HANDLER_ID
from config.tmtcc_config import SW_IMAGE_HANDLER_ID
LOGGER = get_logger()
......
# -*- coding: utf-8 -*-
"""
@file obsw_tc_packer.py
@brief Packs the TC queue for specific G_SERVICE or device testing
@details
Contains the sevice packet which can pack specific service queues (hardcoded for now)
@author R. Mueller
@date 01.11.2019
"""
import os
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT
from tc.obsw_tc_service2 import pack_service2_test_into
from tc.obsw_tc_service3 import pack_service3_test_into
from tc.obsw_tc_service8 import pack_service8_test_into
from tc.obsw_tc_service9 import pack_service9_test_into
from tc.obsw_tc_service23_sdcard import pack_service23_commands_into
from tc.obsw_tc_service20 import pack_service20_test_into
from tc.obsw_tc_utility import pack_utility_command
from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into
from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into
from tc.obsw_image_handler import generate_img_handler_packet
from tc.obsw_tc_gps import pack_gps_test_into
from tc.obsw_tc_core import pack_core_command
from tmtc_core.utility.obsw_logger import get_logger
import config.obsw_config as g
from collections import deque
from typing import Union
LOGGER = get_logger()
class ServiceQueuePacker:
def __init__(self):
pass
@staticmethod
def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT):
if service == 2:
return pack_service2_test_into(service_queue)
if service == 3:
return pack_service3_test_into(service_queue)
if service == 5:
return pack_service5_test_into(service_queue)
if service == 8:
return pack_service8_test_into(service_queue)
if service == 9:
return pack_service9_test_into(service_queue)
if service == 17:
return pack_service17_test_into(service_queue, op_code)
if service == 20:
return pack_service20_test_into(service_queue)
if service == 23 or service.lower() == "sd":
return pack_service23_commands_into(service_queue, op_code)
if service == 200:
return pack_service200_test_into(service_queue)
if service.lower() == "dummy":
return pack_dummy_device_test_into(service_queue)
if service.lower() == "img":
return generate_img_handler_packet(service_queue, op_code)
if service.lower() == "core":
return pack_core_command(service_queue, op_code)
if service.lower() == "led":
return pack_utility_command(service_queue, op_code)
if service.lower() == "gps0":
# Object ID: GPS Device
object_id = g.GPS0_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service.lower() == "gps1":
# Object ID: GPS Device
object_id = g.GPS1_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service.lower() == "Error":
return pack_error_testing_into(service_queue)
LOGGER.warning("Invalid Service !")
# TODO: a way to select certain services would be nice (by passing a dict or array maybe)
def create_total_tc_queue() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
tc_queue = pack_service2_test_into(tc_queue)
tc_queue = pack_service3_test_into(tc_queue)
tc_queue = pack_service5_test_into(tc_queue)
tc_queue = pack_service8_test_into(tc_queue)
tc_queue = pack_service9_test_into(tc_queue)
tc_queue = pack_service17_test_into(tc_queue)
tc_queue = pack_service20_test_into(tc_queue)
tc_queue = pack_service200_test_into(tc_queue)
tc_queue = pack_dummy_device_test_into(tc_queue)
object_id = g.GPS0_DEVICE_ID
tc_queue = pack_gps_test_into(object_id, tc_queue)
return tc_queue
def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(("print", "Testing Dummy Device"))
# Object ID: Dummy Device
object_id = g.DUMMY_DEVICE_ID
# Set On Mode
tc_queue.appendleft(("print", "Testing Service Dummy: Set On"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Test Service 2 commands
tc_queue.appendleft(("print", "Testing Service Dummy: Service 2"))
pack_service2_test_into(tc_queue, True)
# Test Service 8
tc_queue.appendleft(("print", "Testing Service Dummy: Service 8"))
pack_service8_test_into(tc_queue, True)
tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt"))
return tc_queue
def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT:
# a lot of events
command = PusTelecommand(service=17, subservice=129, ssc=2010)
tc_queue.appendleft(command.pack_command_tuple())
# a lot of ping testing
command = PusTelecommand(service=17, subservice=130, ssc=2020)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
import os
from collections import deque
from typing import Union
import config.tmtcc_config as g
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand
from pus_tc.tmtcc_tc_service2_raw_cmd import pack_service2_test_into
from pus_tc.tmtcc_tc_service3_housekeeping import pack_service3_test_into
from pus_tc.tmtcc_tc_service8_func_cmd import pack_service8_test_into
from pus_tc.tmtcc_tc_service9_time import pack_service9_test_into
from pus_tc.tmtcc_tc_service23_sdcard import pack_service23_commands_into
from pus_tc.tmtcc_tc_service20_parameter import pack_service20_test_into
from pus_tc.tmtcc_pus_tc_utility import pack_utility_command
from pus_tc.tmtcc_tc_service200_mode import pack_mode_data, pack_service200_test_into
from pus_tc.tmtcc_tc_service5_event import pack_service5_test_into
from pus_tc.tmtcc_service17_test import pack_service17_test_into
from pus_tc.tmtcc_tc_image_handler import generate_img_handler_packet
from pus_tc.tmtcc_tc_gps import pack_gps_test_into
from pus_tc.tmtcc_tc_core import pack_core_command
LOGGER = get_logger()
def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT):
if service == 2:
return pack_service2_test_into(service_queue)
if service == 3:
return pack_service3_test_into(service_queue, op_code)
if service == 5:
return pack_service5_test_into(service_queue)
if service == 8:
return pack_service8_test_into(service_queue)
if service == 9:
return pack_service9_test_into(service_queue)
if service == 17:
return pack_service17_test_into(service_queue, op_code)
if service == 20:
return pack_service20_test_into(service_queue)
if service == 23 or service.lower() == "sd":
return pack_service23_commands_into(service_queue, op_code)
if service == 200:
return pack_service200_test_into(service_queue)
if service.lower() == "dummy":
return pack_dummy_device_test_into(service_queue)
if service.lower() == "img":
return generate_img_handler_packet(service_queue, op_code)
if service.lower() == "core":
return pack_core_command(service_queue, op_code)
if service.lower() == "led":
return pack_utility_command(service_queue, op_code)
if service.lower() == "gps0":
# Object ID: GPS Device
object_id = g.GPS0_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service.lower() == "gps1":
# Object ID: GPS Device
object_id = g.GPS1_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service.lower() == "Error":
return pack_error_testing_into(service_queue)
LOGGER.warning("Invalid Service !")
# TODO: a way to select certain services would be nice (by passing a dict or array maybe)
def create_total_tc_queue() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
tc_queue = pack_service2_test_into(tc_queue)
tc_queue = pack_service3_test_into(tc_queue)
tc_queue = pack_service5_test_into(tc_queue)
tc_queue = pack_service8_test_into(tc_queue)
tc_queue = pack_service9_test_into(tc_queue)
tc_queue = pack_service17_test_into(tc_queue)
tc_queue = pack_service20_test_into(tc_queue)
tc_queue = pack_service200_test_into(tc_queue)
tc_queue = pack_dummy_device_test_into(tc_queue)
object_id = g.GPS0_DEVICE_ID
tc_queue = pack_gps_test_into(object_id, tc_queue)
return tc_queue
def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(("print", "Testing Dummy Device"))
# Object ID: Dummy Device
object_id = g.DUMMY_DEVICE_ID
# Set On Mode
tc_queue.appendleft(("print", "Testing Service Dummy: Set On"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Test Service 2 commands
tc_queue.appendleft(("print", "Testing Service Dummy: Service 2"))
pack_service2_test_into(tc_queue, True)
# Test Service 8
tc_queue.appendleft(("print", "Testing Service Dummy: Service 8"))
pack_service8_test_into(tc_queue, True)
tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt"))
return tc_queue
def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT:
# a lot of events
command = PusTelecommand(service=17, subservice=129, ssc=2010)
tc_queue.appendleft(command.pack_command_tuple())
# a lot of ping testing
command = PusTelecommand(service=17, subservice=130, ssc=2020)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
@file obsw_tc_service200.py
@file tmtcc_tc_service200_mode.py
@brief PUS Service 200: PUS custom service 200: Mode commanding
@author R. Mueller
@date 02.05.2020
"""
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from tc.obsw_pus_tc_packer import TcQueueT
import config.obsw_config as g
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
import config.tmtcc_config as g
import struct
......
# -*- coding: utf-8 -*-
"""
@file obsw_tc_service20.py
@file tmtcc_tc_service20_parameter.py
@brief PUS Service 20: Parameter management.
@author J. Gerhards
@date 30.06.2020
......@@ -8,10 +8,10 @@
import struct
from typing import Deque
import config.obsw_config as g
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT
import config.tmtcc_config as g
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, TcQueueT
from tmtc_core.utility.obsw_logger import get_logger
from tc.obsw_tc_service200 import pack_mode_data
from pus_tc.tmtcc_tc_service200_mode import pack_mode_data
LOGGER = get_logger()
......@@ -52,9 +52,9 @@ def pack_boolean_parameter_setting(object_id: bytearray, domain_id: int,
def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque:
# parameter IDs
parameterID0 = 0
parameterID1 = 256
parameterID2 = 512
parameter_id0 = 0
parameter_id1 = 256
parameter_id2 = 512
if called_externally is False:
tc_queue.appendleft(("print", "Testing Service 20"))
......@@ -68,48 +68,48 @@ def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -
# test checking Load for uint32_t
tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t"))
parameter_id = struct.pack(">I", parameterID0)
parameter_id = struct.pack(">I", parameter_id0)
type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1)
parameter_data = struct.pack(">I", 42)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data
payload = object_id + parameter_id + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for uint32_t
tc_queue.appendleft(("print", "Testing Service 20: Dump uint32_t"))
parameter_id = struct.pack(">I", parameterID0)
parameter_id = struct.pack(">I", parameter_id0)
payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Load for int32_t
tc_queue.appendleft(("print", "Testing Service 20: Load int32_t"))
parameter_id = struct.pack(">I", parameterID1)
parameter_id = struct.pack(">I", parameter_id1)
type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1)
parameter_data = struct.pack(">i", -42)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data
payload = object_id + parameter_id + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for int32_t
tc_queue.appendleft(("print", "Testing Service 20: Dump int32_t"))
parameter_id = struct.pack(">I", parameterID1)
parameter_id = struct.pack(">I", parameter_id1)
payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Load for float
tc_queue.appendleft(("print", "Testing Service 20: Load float"))
parameter_id = struct.pack(">I", parameterID2)
parameter_id = struct.pack(">I", parameter_id2)
type_and_matrix_data = pack_type_and_matrix_data(5, 1, 1, 1)
parameter_data = struct.pack(">f", 4.2)
payload = object_id + parameter_id+ type_and_matrix_data + parameter_data
payload = object_id + parameter_id + type_and_matrix_data + parameter_data
command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
# test checking Dump for float
tc_queue.appendleft(("print", "Testing Service 20: Dump float"))
parameter_id = struct.pack(">I", parameterID2)
parameter_id = struct.pack(">I", parameter_id2)
payload = object_id + parameter_id
command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload)
tc_queue.appendleft(command.pack_command_tuple())
......
......@@ -4,11 +4,12 @@ Created: 21.01.2020 07:48
@author: Jakob Meier
"""
import config.obsw_config as g
import config.tmtcc_config as g
from typing import Deque, Union
from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT
from tc.obsw_tc_service8 import make_action_id
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
......
# -*- coding: utf-8 -*-
"""
@file obsw_tc_service2.py
@brief PUS Service 2: Device Access, Native low-level commanding
@file tmtcc_tc_service2_raw_cmd.py
@brief PUS Service 2: Device Access, native low-level commanding
@author R. Mueller
@date 01.11.2019
"""
import struct
import config.obsw_config as g
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque
from tc.obsw_tc_service200 import pack_mode_data
import config.tmtcc_config as g
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, Deque
from pus_tc.tmtcc_tc_service200_mode import pack_mode_data
def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque:
......
# -*- coding: utf-8 -*-
"""
@file obsw_tc_service3.py
@file tmtcc_tc_service3_housekeeping.py
@brief PUS Service 3: Housekeeping Service.
@author R. Mueller
@date 02.05.2020
"""
import struct
from typing import Deque
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
import config.obsw_config as g
from typing import Deque, Union
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
import config.tmtcc_config as g
def make_sid(object_id: bytearray, set_id: int) -> bytearray:
......@@ -23,12 +23,15 @@ def make_interval(interval_seconds: float) -> bytearray:
# adding custom defintion to hk using test pool variables
def pack_service3_test_into(tc_queue: Deque) -> Deque:
tc_queue.appendleft(("print", "Testing Service 3"))
# Predefined packet testing
pack_test_device_test(tc_queue)
pack_custom_tests(tc_queue)
pack_internal_error_reporter_tests(tc_queue)
def pack_service3_test_into(tc_queue: Deque, op_code: Union[str, int] = 0) -> Deque:
if op_code.lower() == "ie":
tc_queue.appendleft(("print", "Testing Internal Error Reporter"))
pack_internal_error_reporter_tests(tc_queue)
else:
tc_queue.appendleft(("print", "Testing Service 3"))
# Predefined packet testing
pack_test_device_test(tc_queue)
# pack_thermal_sensor_tests(tc_queue)
tc_queue.appendleft(("export", "log/tmtc_log_service3.txt"))
return tc_queue
......@@ -37,7 +40,7 @@ def pack_test_device_test(tc_queue: Deque):
pass
def pack_custom_tests(tc_queue: Deque):
def pack_thermal_sensor_tests(tc_queue: Deque):
tc_queue.appendleft(("print", "Generate one thermal sensor packet: "))
command = PusTelecommand(service=3, subservice=27, ssc=3100, app_data=sid_thermalsensor)
tc_queue.appendleft(command.pack_command_tuple())
......
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_service5_event.py
@brief PUS Service 5: Event Service
PUS Service 17: Test Service
@author R. Mueller
@date 02.05.2020
"""
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.pus_tc.tmtcc_tc_service17_test import pack_service17_ping_command
def pack_service5_test_into(tc_queue: TcQueueT):
tc_queue.appendleft(("print", "Testing Service 5"))
# invalid subservice
tc_queue.appendleft(("print", "Testing Service 5: Invalid subservice"))
command = PusTelecommand(service=5, subservice=1, ssc=500)
tc_queue.appendleft(command.pack_command_tuple())
# disable events
tc_queue.appendleft(("print", "Testing Service 5: Disable event"))
command = PusTelecommand(service=5, subservice=6, ssc=500)
tc_queue.appendleft(command.pack_command_tuple())
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger event"))
command = PusTelecommand(service=17, subservice=128, ssc=510)
tc_queue.appendleft(command.pack_command_tuple())
# enable event
tc_queue.appendleft(("print", "Testing Service 5: Enable event"))
command = PusTelecommand(service=5, subservice=5, ssc=520)
tc_queue.appendleft(command.pack_command_tuple())
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger another event"))
command = PusTelecommand(service=17, subservice=128, ssc=530)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("export", "log/tmtc_log_service5.txt"))
# -*- coding: utf-8 -*-
"""
@file obsw_tc_service8.py
@file tmtcc_tc_service8_func_cmd.py
@brief PUS Service 8: High-level functional commanding.
@author R. Mueller
@date 01.11.2019
"""
import struct
from typing import Deque
import config.obsw_config as g
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from tc.obsw_tc_service200 import pack_mode_data
import config.tmtcc_config as g
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from pus_tc.tmtcc_tc_service200_mode import pack_mode_data
def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque:
......@@ -64,13 +63,3 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) ->
tc_queue.appendleft(("export", "log/tmtc_log_service8.txt"))
return tc_queue
def generate_action_command(object_id: bytearray, action_id: int, data: bytearray = bytearray([]),
ssc: int = 0):
data_to_pack = bytearray(object_id)
data_to_pack += make_action_id(action_id) + data
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=data_to_pack)
def make_action_id(action_id: int) -> bytearray:
return bytearray(struct.pack('!I', action_id))
# -*- coding: utf-8 -*-
"""
@file obsw_tc_service9.py
@file tmtcc_tc_service9_time.py
@brief PUS Service 9: Time management.
@author R. Mueller
@date 01.11.2019
"""
from datetime import datetime
from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
......
File moved
import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger
from tm.obsw_tm_service_1 import Service1TM
from tm.obsw_tm_service_3 import Service3TM
from tm.obsw_tm_service_5 import Service5TM
from tm.obsw_tm_service_20 import Service20TM
from tm.obsw_tm_service_23 import Service23TM
from tmtc_core.pus_tm.tmtcc_tm_service_1 import Service1TM
from pus_tm.obsw_tm_service_3 import Service3TM
from tmtc_core.pus_tm.tmtcc_tm_service_5 import Service5TM
from pus_tm.obsw_tm_service_20 import Service20TM
from pus_tm.obsw_tm_service_23 import Service23TM
LOGGER = get_logger()
......
import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
class Service20TM(PusTelemetry):
......@@ -22,7 +25,7 @@ class Service20TM(PusTelemetry):
if self.type_ptc == 5 and self.type_pfc == 1:
self.param = struct.unpack('>f', self._tm_data[12:datasize])[0]
else:
logger.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \
LOGGER.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \
only known subservice")
self.specify_packet_info("Functional Commanding Reply")
......
import struct
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
......
......@@ -6,11 +6,11 @@ Description: Deserialize Housekeeping TM
Author: R. Mueller
"""
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry
from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry
from typing import Type
from tmtc_core.utility.obsw_logger import get_logger
import struct
import config.obsw_config as g
import config.tmtcc_config as g
LOGGER = get_logger()
......
......@@ -44,12 +44,12 @@ from abc import abstractmethod
from collections import deque
from typing import Deque
from config import obsw_config as g
from tc.obsw_pus_tc_packer import pack_dummy_device_test_into
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys
from tm.obsw_tm_service_1 import PusPacketInfoService1T
from tmtc_core.tm.obsw_pus_tm_base import TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT
from config import tmtcc_config as g
from pus_tc.tmtcc_tc_packer_hook import pack_dummy_device_test_into
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys
from tmtc_core.pus_tm.tmtcc_tm_service_1 import PusPacketInfoService1T
from tmtc_core.pus_tm.tmtcc_pus_tm_base import TmDictionaryKeys
from tmtc_core.pus_tm.tmtcc_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT
from tmtc_core.sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver
from tmtc_core.utility.obsw_logger import get_logger
......@@ -80,12 +80,12 @@ class TestService(unittest.TestCase):
:return:
"""
cls._displayMode = "long"
# wait intervals between tc send bursts.
# Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again
# wait intervals between pus_tc send bursts.
# Example: [2,4] sends to send 2 pus_tc from queue and wait, then sends another 2 and wait again
cls.wait_intervals = []
cls.wait_time = 5.0
cls.print_tc = True
# default wait time between tc send bursts
# default wait time between pus_tc send bursts
cls.test_queue = deque()
# Extremely ugly solution so that we can use the Python Unit Test Framework
......@@ -127,7 +127,7 @@ class TestService(unittest.TestCase):
TODO: Maybe we should instantiate this once in the main and then reuse it instead
of calling the constructor over and over again.
If done so we need a setter for: wait_time, wait_intervals, printTm,
tc_timeout_factor and the tc.queue. Furthermore, changing parameters should
tc_timeout_factor and the pus_tc.queue. Furthermore, changing parameters should
only be allowed as long as the commander/receiver is not running by checking a flag
:return:
"""
......@@ -199,7 +199,7 @@ class TestService(unittest.TestCase):
def scan_for_respective_tc(self, current_tm_info: PusTmInfoT):
"""
this function looks whether the tc verification SSC matched
this function looks whether the pus_tc verification SSC matched
a source sequence count of the sent TM
"""
current_subservice = current_tm_info[TmDictionaryKeys.SUBSERVICE]
......
......@@ -9,10 +9,10 @@ import unittest
from typing import Deque
from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT
from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \
pack_service2_test_into, pack_service8_test_into, pack_service200_test_into, pack_service20_test_into
import config.obsw_config as g
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfoQueueT
from pus_tc.tmtcc_tc_packer_hook import pack_service17_test_into, pack_service5_test_into, \
pack_service2_test_into, pack_service8_test_into, pack_service200_test_into
import config.tmtcc_config as g
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
......@@ -183,8 +183,8 @@ class TestService17(TestService):
def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT):
assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue,
tc_info_queue=tc_info_queue)
# add anything elsee other than tc verification counter
# and ssc that is needed for tm analysis
# add anything elsee other than pus_tc verification counter
# and ssc that is needed for pus_tm analysis
return assertion_dict
def analyse_tc_info(self, tc_info_queue):
......@@ -256,8 +256,8 @@ class TestService200(TestService):
def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT):
assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue,
tc_info_queue=tc_info_queue)
# add anything else other than tc verification counter
# and ssc that is needed for tm analysis
# add anything else other than pus_tc verification counter
# and ssc that is needed for pus_tm analysis
return assertion_dict
def analyse_tc_info(self, tc_info_queue):
......
# -*- coding: utf-8 -*-
"""
Program: obsw_tm_service_1.py
Date: 30.12.2019
Description: Deserialize Pus Verification TM
Author: R. Mueller
"""
import struct
from typing import Dict
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_creator import PusTelemetryCreator
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
PusPacketInfoService1T = Dict[TmDictionaryKeys, any]
class Service1TM(PusTelemetry):
"""
Service 1 TM class representation. Can be used to deserialize raw service 1 packets.
"""
def __init__(self, byte_array: bytearray):
super().__init__(byte_array)
self.has_tc_error_code = False
self.is_step_reply = False
# Failure Reports with error code
self.err_code = 0
self.step_number = 0
self.error_param1 = 0
self.error_param2 = 0
self.tc_packet_id = self._tm_data[0] << 8 | self._tm_data[1]
self.tc_ssc = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3]
if self.get_subservice() % 2 == 0:
self.__handle_failure_verification()
else:
self.__handle_success_verification()
def append_telemetry_content(self, content_list: list):
super().append_telemetry_content(content_list)
content_list.append(str(hex(self.tc_packet_id)))
content_list.append(str(self.tc_ssc))
if self.has_tc_error_code:
if self.is_step_reply:
content_list.append(str(self.step_number))
content_list.append(str(hex(self.err_code)))
content_list.append(str(hex(self.error_param1)) + ", " + str(self.error_param1))
content_list.append(str(hex(self.error_param2)) + ", " + str(self.error_param2))
elif self.is_step_reply:
content_list.append(str(self.step_number))
def append_telemetry_column_headers(self, header_list: list):
super().append_telemetry_column_headers(header_list)
header_list.append("TC Packet ID")
header_list.append("TC SSC")
if self.has_tc_error_code:
if self.is_step_reply:
header_list.append("Step Number")
header_list.append("Return Value")
header_list.append("Error Param 1")
header_list.append("Error Param 2")
elif self.is_step_reply:
header_list.append("Step Number")
def __handle_failure_verification(self):
self.specify_packet_info("Failure Verficiation")
self.has_tc_error_code = True
if self.get_subservice() == 2:
self.append_packet_info(" : Acceptance failure")
elif self.get_subservice() == 4:
self.append_packet_info(" : Start failure")
elif self.get_subservice() == 6:
self.is_step_reply = True
self.append_packet_info(" : Step Failure")
self.step_number = struct.unpack('>B', self._tm_data[4:5])[0]
self.err_code = struct.unpack('>H', self._tm_data[5:7])[0]
self.error_param1 = struct.unpack('>I', self._tm_data[7:11])[0]
self.error_param2 = struct.unpack('>I', self._tm_data[11:15])[0]
elif self.get_subservice() == 8:
self.err_code = struct.unpack('>H', self._tm_data[4:6])[0]
self.error_param1 = struct.unpack('>I', self._tm_data[6:10])[0]
self.error_param2 = struct.unpack('>I', self._tm_data[10:14])[0]
else:
LOGGER.error("Service1TM: Invalid subservice")
def __handle_success_verification(self):
self.specify_packet_info("Success Verification")
if self.get_subservice() == 1:
self.append_packet_info(" : Acceptance success")
elif self.get_subservice() == 3:
self.append_packet_info(" : Start success")
elif self.get_subservice() == 5:
self.is_step_reply = True
self.append_packet_info(" : Step Success")
self.step_number = struct.unpack('>B', self._tm_data[4:5])[0]
elif self.get_subservice() == 7:
self.append_packet_info(" : Completion success")
else:
LOGGER.error("Service1TM: Invalid subservice")
def pack_tm_information(self) -> PusPacketInfoService1T:
tm_information = super().pack_tm_information()
add_information = {
TmDictionaryKeys.TC_PACKET_ID: self.tc_packet_id,
TmDictionaryKeys.TC_SSC: self.tc_ssc,
}
tm_information.update(add_information)
if self.has_tc_error_code:
tm_information.update({TmDictionaryKeys.ERROR_CODE: self.err_code})
if self.is_step_reply:
tm_information.update({TmDictionaryKeys.STEP_NUMBER: self.step_number})
return tm_information
class Service1TmPacked(PusTelemetryCreator):
"""
Class representation for Service 1 TM creation.
"""
def __init__(self, subservice: int, ssc: int = 0, tc_packet_id: int = 0, tc_ssc: int = 0):
source_data = bytearray()
source_data.append((tc_packet_id & 0xFF00) >> 8)
source_data.append(tc_packet_id & 0xFF)
tc_psc = (tc_ssc & 0x3FFF) | (0b11 << 16)
source_data.append((tc_psc & 0xFF00) >> 8)
source_data.append(tc_psc & 0xFF)
super().__init__(service=1, subservice=subservice, ssc=ssc, source_data=source_data)
def pack(self) -> bytearray:
return super().pack()