Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
obsw_pus_tc_packer.py 5.09 KiB
# -*- coding: utf-8 -*-
"""
@file obsw_tc_packer.py
@brief Packs the TC queue for specific G_SERVICE or device testing
@details
Contains the sevice packet which can pack specific service queues (hardcoded for now)
@author R. Mueller
@date 01.11.2019
"""
import os
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT
from tc.obsw_tc_service2 import pack_service2_test_into
from tc.obsw_tc_service3 import pack_service3_test_into
from tc.obsw_tc_service8 import pack_service8_test_into
from tc.obsw_tc_service9 import pack_service9_test_into
from tc.obsw_tc_service23_sdcard import pack_service23_commands_into
from tc.obsw_tc_service20 import pack_service20_test_into
from tc.obsw_tc_utility import pack_utility_command
from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into
from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into
from tc.obsw_image_handler import generate_img_handler_packet
from tc.obsw_tc_gps import pack_gps_test_into
from tc.obsw_tc_core import pack_core_command
from tmtc_core.utility.obsw_logger import get_logger
import config.obsw_config as g
from collections import deque
from typing import Union
LOGGER = get_logger()
class ServiceQueuePacker:
def __init__(self):
pass
@staticmethod
def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT):
if service == 2:
return pack_service2_test_into(service_queue)
if service == 3:
return pack_service3_test_into(service_queue)
if service == 5:
return pack_service5_test_into(service_queue)
if service == 8:
return pack_service8_test_into(service_queue)
if service == 9:
return pack_service9_test_into(service_queue)
if service == 17:
return pack_service17_test_into(service_queue, op_code)
if service == 20:
return pack_service20_test_into(service_queue)
if service == 23 or service.lower() == "sd":
return pack_service23_commands_into(service_queue, op_code)
if service == 200:
return pack_service200_test_into(service_queue)
if service.lower() == "dummy":
return pack_dummy_device_test_into(service_queue)
if service.lower() == "img":
return generate_img_handler_packet(service_queue, op_code)
if service.lower() == "core":
return pack_core_command(service_queue, op_code)
if service.lower() == "led":
return pack_utility_command(service_queue, op_code)
if service.lower() == "gps0":
# Object ID: GPS Device
object_id = g.GPS0_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service.lower() == "gps1":
# Object ID: GPS Device
object_id = g.GPS1_DEVICE_ID
return pack_gps_test_into(object_id, service_queue)
if service.lower() == "Error":
return pack_error_testing_into(service_queue)
LOGGER.warning("Invalid Service !")
# TODO: a way to select certain services would be nice (by passing a dict or array maybe)
def create_total_tc_queue() -> TcQueueT:
if not os.path.exists("log"):
os.mkdir("log")
tc_queue = deque()
tc_queue = pack_service2_test_into(tc_queue)
tc_queue = pack_service3_test_into(tc_queue)
tc_queue = pack_service5_test_into(tc_queue)
tc_queue = pack_service8_test_into(tc_queue)
tc_queue = pack_service9_test_into(tc_queue)
tc_queue = pack_service17_test_into(tc_queue)
tc_queue = pack_service20_test_into(tc_queue)
tc_queue = pack_service200_test_into(tc_queue)
tc_queue = pack_dummy_device_test_into(tc_queue)
object_id = g.GPS0_DEVICE_ID
tc_queue = pack_gps_test_into(object_id, tc_queue)
return tc_queue
def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(("print", "Testing Dummy Device"))
# Object ID: Dummy Device
object_id = g.DUMMY_DEVICE_ID
# Set On Mode
tc_queue.appendleft(("print", "Testing Service Dummy: Set On"))
mode_data = pack_mode_data(object_id, 1, 0)
command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Test Service 2 commands
tc_queue.appendleft(("print", "Testing Service Dummy: Service 2"))
pack_service2_test_into(tc_queue, True)
# Test Service 8
tc_queue.appendleft(("print", "Testing Service Dummy: Service 8"))
pack_service8_test_into(tc_queue, True)
tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt"))
return tc_queue
def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT:
# a lot of events
command = PusTelecommand(service=17, subservice=129, ssc=2010)
tc_queue.appendleft(command.pack_command_tuple())
# a lot of ping testing
command = PusTelecommand(service=17, subservice=130, ssc=2020)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue