# Skip to content
# Snippets Groups Projects
# Forked from an inaccessible project.
# obsw_tc_service23_sdcard.py 18.38 KiB
# -*- coding: utf-8 -*-
"""
Created: 21.01.2020 07:48

@author: Jakob Meier
"""
import config.obsw_config as g
from typing import Deque, Union

from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT
from tc.obsw_tc_service8 import make_action_id
from tmtc_core.utility.obsw_logger import get_logger

LOGGER = get_logger()


def generate_print_sd_card_packet(
        ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    Build the service 8, subservice 128 telecommand carrying action ID 2. This module
    queues it to print the active file system.
    @param ssc: source sequence count
    @param object_id: object ID placed in front of the action ID in the application data
    @return The telecommand.
    """
    action_payload = bytearray(object_id) + make_action_id(2)
    return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=action_payload)


def generate_clear_sd_card_packet(
        ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    Build the service 8, subservice 128 telecommand carrying action ID 20. This module
    queues it to clear the active file system.
    @param ssc: source sequence count
    @param object_id: object ID placed in front of the action ID in the application data
    @return The telecommand.
    """
    action_payload = bytearray(object_id) + make_action_id(20)
    return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=action_payload)


def generate_format_sd_card_packet(
        ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    Build the service 8, subservice 128 telecommand carrying action ID 21. This module
    queues it to format the active file system.
    @param ssc: source sequence count
    @param object_id: object ID placed in front of the action ID in the application data
    @return The telecommand.
    """
    action_payload = bytearray(object_id) + make_action_id(21)
    return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=action_payload)


def generate_generic_folder_structure(
        tc_queue: Deque, init_ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID,
        iobc: bool = False):
    """
    Queue the mkdir telecommands (service 23, subservice 9) which create the generic
    folder structure:

        TC/{LARGE,SMALL}
        TM/{HK,SC/{LARGE,SMALL}}
        BIN/<target>/{BL,OBSW}   with <target> = IOBC if iobc is set, AT91 otherwise
        MISC

    A ("print", ...) tuple is queued in front of each top-level group. Every entry is added
    with appendleft, and every telecommand receives its own source sequence count, counting
    up from init_ssc in the order listed above.
    @param tc_queue: Queue the print tuples and packed telecommands are added to
    @param init_ssc: source sequence count of the first telecommand
    @param object_id: The object ID of the memory handler which manages the file system
    @param iobc: Selects the IOBC sub-folder of BIN instead of the AT91 one
    """
    bin_target = "IOBC" if iobc else "AT91"
    bin_target_path = "BIN/" + bin_target
    # Each group: (print message, ((repository path, directory name), ...)).
    # A repository path of None means "use the default repository path of the mkdir generator".
    folder_groups = (
        ("Creating TC folder", (
            (None, "TC"), ("TC", "LARGE"), ("TC", "SMALL"))),
        ("Creating TM folder", (
            (None, "TM"), ("TM", "HK"), ("TM", "SC"),
            ("TM/SC", "LARGE"), ("TM/SC", "SMALL"))),
        ("Creating BIN folder", (
            (None, "BIN"), ("BIN", bin_target),
            (bin_target_path, "BL"), (bin_target_path, "OBSW"))),
        ("Creating MISC folder", (
            (None, "MISC"),)),
    )

    ssc = init_ssc
    for announcement, directories in folder_groups:
        tc_queue.appendleft(("print", announcement))
        for repository_path, directory_name in directories:
            if repository_path is None:
                command = generate_mkdir_srv23_9_packet(
                    directory_name, ssc=ssc, object_id=object_id)
            else:
                command = generate_mkdir_srv23_9_packet(
                    repository_path=repository_path, directory_name=directory_name,
                    ssc=ssc, object_id=object_id)
            tc_queue.appendleft(command.pack_command_tuple())
            # Advance after every packet so no two telecommands share a sequence count.
            ssc += 1


def generate_create_file_srv23_1_packet(
        filename: str, repository_path: str, ssc: int, max_size_of_app_data: int,
        initial_data: bytearray = bytearray([]),
        object_id: bytearray = g.SD_CARD_HANDLER_ID) -> Union[PusTelecommand, None]:
    """
    This function generates the service 23, subservice 1 packet. Its application data is the
    generic file command header followed by the initial file data.
    @param filename: Name of the file, packed as second path of the header
    @param repository_path: Repository path, packed as first path of the header
    @param ssc: source sequence count
    @param max_size_of_app_data: Application data size limit used to check the initial data
    @param initial_data: Data appended behind the header
    @param object_id: The object ID of the memory handler which manages the file system
    @return The telecommand, or None (after logging an error) if the initial data is larger
    than the size allowed by calculate_allowed_file_data_size.
    """
    initial_data_size = len(initial_data)
    allowed_size = calculate_allowed_file_data_size(
        max_size_of_app_data, filename, repository_path)
    if initial_data_size > allowed_size:
        LOGGER.error("generate_create_file_srv23_1_packet: Initial data too large!")
        return None

    header = pack_generic_file_command_header(
        object_id=object_id, first_path=repository_path, second_path=filename)
    return PusTelecommand(service=23, subservice=1, ssc=ssc, app_data=header + initial_data)


def generate_rm_file_srv23_2_packet(filename: str, repository_path: str,
                                    ssc: int, object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    This function generates the service 23, subservice 2 packet, which is used to delete a
    file on a file-system-capable on-board memory.
    @param filename: The name of the file to delete
    @param repository_path: The repository path of that file
    @param ssc: source sequence count
    @param object_id: The object ID of the memory handler which manages the file system
    @return The telecommand.
    """
    return PusTelecommand(
        service=23, subservice=2, ssc=ssc,
        app_data=pack_generic_file_command_header(
            object_id=object_id, first_path=repository_path, second_path=filename))


def generate_report_file_attr_srv23_3_packet(
        filename: str, repository_path: str, ssc: int,
        object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    This function generates the service 23, subservice 3 packet, which this module uses to
    request the attributes of a file.
    @param filename: The name of the file whose attributes are requested
    @param repository_path: The repository path of that file
    @param ssc: source sequence count
    @param object_id: The object ID of the memory handler which manages the file system
    @return The telecommand.
    """
    return PusTelecommand(
        service=23, subservice=3, ssc=ssc,
        app_data=pack_generic_file_command_header(
            object_id=object_id, first_path=repository_path, second_path=filename))


def generate_mkdir_srv23_9_packet(directory_name: str, ssc: int, repository_path: str = "/",
                                  object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    This function generates the service 23, subservice 9 packet, which is used to create
    directories on file systems.
    @param directory_name: The name of the directory to create
    @param ssc: source sequence count
    @param repository_path: The path where the directory shall be created
    @param object_id: The object ID of the memory handler which manages the file system
    @return The telecommand.
    """
    return PusTelecommand(
        service=23, subservice=9, ssc=ssc,
        app_data=pack_generic_file_command_header(
            object_id=object_id, first_path=repository_path, second_path=directory_name))


def generate_rmdir_srv23_10_packet(directory_name: str, repository_path: str, ssc: int,
                                   object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    This function generates the service 23, subservice 10 packet, which deletes a directory
    at the specified repository path.
    @param directory_name: Name of the directory to delete
    @param repository_path: Path to the directory named by directory_name
    @param ssc: source sequence count
    @param object_id: object ID of the memory handler (e.g. SD Card Handler)
    @return The telecommand.
    """
    return PusTelecommand(
        service=23, subservice=10, ssc=ssc,
        app_data=pack_generic_file_command_header(
            object_id=object_id, first_path=repository_path, second_path=directory_name))


def generate_append_to_file_srv23_130_packet(
        filename: str, repository_path: str, packet_sequence_number: int,
        ssc: int, file_data: bytearray = bytearray([]),
        object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    This function generates the service 23, subservice 130 packet. Its application data is
    the generic file command header, then the file data, then the packet sequence number as
    two bytes (high byte first).
    @param filename: Name of the file, packed as second path of the header
    @param repository_path: Repository path, packed as first path of the header
    @param packet_sequence_number: Number written into the last two bytes
    @param ssc: source sequence count
    @param file_data: Data placed between header and packet sequence number
    @param object_id: object ID of the memory handler (e.g. SD Card Handler)
    @return The telecommand.
    """
    header = pack_generic_file_command_header(
        object_id=object_id, first_path=repository_path, second_path=filename)
    payload = header + file_data
    sequence_high_byte = packet_sequence_number >> 8
    sequence_low_byte = packet_sequence_number & 0xff
    payload += bytes((sequence_high_byte, sequence_low_byte))
    return PusTelecommand(service=23, subservice=130, ssc=ssc, app_data=payload)


def generate_finish_append_to_file_srv23_131_packet(
        filename: str, repository_path: str,  ssc: int, lock_file: bool = False,
        object_id: bytearray = g.SD_CARD_HANDLER_ID):
    """
    This function generates the service 23, subservice 131 packet. Its application data is
    the generic file command header followed by one byte holding the lock_file flag.
    @param filename: Name of the file, packed as second path of the header
    @param repository_path: Repository path, packed as first path of the header
    @param ssc: source sequence count
    @param lock_file: Flag written into the last byte of the application data
    @param object_id: object ID of the memory handler (e.g. SD Card Handler)
    @return The telecommand.
    """
    payload = pack_generic_file_command_header(
        object_id=object_id, first_path=repository_path, second_path=filename)
    # The boolean is stored as a single trailing byte.
    payload.append(lock_file)
    return PusTelecommand(service=23, subservice=131, ssc=ssc, app_data=payload)


def generate_read_file_srv23_140_packet(
        repository_path: str, filename: str, ssc: int,
        object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
    """
    This function generates the PUS packet with service 23 and subservice 140.
    Subservice 140 is a custom subservice to request the data of a file.
    @param repository_path: The path of the target file
    @param filename: Name of file from which the content shall be read
    @param ssc: source sequence count
    @param object_id: object ID of the memory handler (e.g. SD Card Handler)
    @return: The (23,140) PUS telecommand
    """
    return PusTelecommand(
        service=23, subservice=140, ssc=ssc,
        app_data=pack_generic_file_command_header(
            object_id=object_id, first_path=repository_path, second_path=filename))


def generate_lock_file_srv23_5_6_packet(ssc: int, lock: bool, repository_path: str,
                                        filename: str, object_id: bytearray = g.SD_CARD_HANDLER_ID):
    """
    This function generates the service 23 packet with subservice 5 (lock is set) or
    subservice 6 (lock is not set). This module uses them to lock and unlock a file.
    @param ssc: source sequence count
    @param lock: Selects subservice 5 if truthy, subservice 6 otherwise
    @param repository_path: Repository path, packed as first path of the header
    @param filename: Name of the file, packed as second path of the header
    @param object_id: object ID of the memory handler (e.g. SD Card Handler)
    @return The telecommand.
    """
    header = pack_generic_file_command_header(
        object_id=object_id, first_path=repository_path, second_path=filename)
    chosen_subservice = 5 if lock else 6
    return PusTelecommand(service=23, subservice=chosen_subservice, ssc=ssc, app_data=header)


def _queue_prompted_file_command(tc_queue, help_print, action_print, make_command):
    """
    Prompt for a repository path and filename and queue the resulting command.
    @param tc_queue: Queue which receives the print tuple and the packed telecommand
    @param help_print: Print message queued in front of the "print file system" command if
    the user asked for help ("h"); None queues that command without a print message
    @param action_print: Print message queued in front of the file command
    @param make_command: Callable taking (repo_path, filename), returning the telecommand
    """
    (repo_path, filename) = prompt_for_repo_filename()
    if repo_path == "" and filename == "h":
        if help_print is not None:
            tc_queue.append(("print", help_print))
        tc_queue.appendleft(generate_print_sd_card_packet(ssc=0).pack_command_tuple())
    elif repo_path == "" and filename == "c":
        # Cancelled by the user: queue nothing.
        return
    else:
        tc_queue.append(("print", action_print))
        command = make_command(repo_path, filename)
        tc_queue.appendleft(command.pack_command_tuple())


def pack_service23_commands_into(tc_queue: TcQueueT, op_code: Union[int, str]) -> Deque:
    """
    Fill the queue with the service 23 commands selected by the operation code.

    Handled operation codes:
        0           generic service 23 test
        "3" / 3     prompt for a file and request its attributes
        "5" / 5     prompt for a file and lock it
        "6" / 6     prompt for a file and unlock it
        "A2"        print the active file system
        "A20"       clear the active file system
        "A21"       format the active file system
        "C0A"       generate the generic folder structure (AT91 variant)
        "C0I"       generate the generic folder structure (iOBC variant)
    Any other operation code leaves the queue untouched.

    NOTE(review): only the integer 0 starts the generic test, the string "0" does not,
    unlike the other numeric operation codes - confirm whether this is intended.
    @param tc_queue: Queue to fill
    @param op_code: Operation code, see the list above
    @return The passed queue
    """
    # sd_handler_id = g.SD_CARD_HANDLER_ID
    if op_code == 0:
        generate_generic_service_23_test(tc_queue)
    elif op_code in ("3", 3):
        LOGGER.info("Press h in the following input requests")
        LOGGER.info("to send a command to display the folder structure instead")
        _queue_prompted_file_command(
            tc_queue, help_print=None, action_print="Requesting file attributes",
            make_command=lambda repo_path, filename: generate_report_file_attr_srv23_3_packet(
                ssc=0, filename=filename, repository_path=repo_path))
    elif op_code in ("5", 5):
        LOGGER.info("Press h in the following input requests")
        LOGGER.info("to send a command to display the folder structure instead")
        _queue_prompted_file_command(
            tc_queue, help_print="Printing active file system", action_print="Locking file",
            make_command=lambda repo_path, filename: generate_lock_file_srv23_5_6_packet(
                ssc=0, repository_path=repo_path, filename=filename, lock=True))
    elif op_code in ("6", 6):
        # The trailing space keeps "the" and "folder" apart in the concatenated message.
        LOGGER.info("Press h in the following input requests to send a command to display the "
                    "folder structure instead")
        _queue_prompted_file_command(
            tc_queue, help_print="Printing active file system", action_print="Unlocking file",
            make_command=lambda repo_path, filename: generate_lock_file_srv23_5_6_packet(
                ssc=0, repository_path=repo_path, filename=filename, lock=False))
    elif op_code == "A2":
        tc_queue.append(("print", "Printing active file system"))
        command = generate_print_sd_card_packet(ssc=2300)
        tc_queue.appendleft(command.pack_command_tuple())
    elif op_code == "A20":
        tc_queue.append(("print", "Clearing active file system"))
        command = generate_clear_sd_card_packet(ssc=2300)
        tc_queue.appendleft(command.pack_command_tuple())
    elif op_code == "A21":
        tc_queue.append(("print", "Formatting active file system"))
        command = generate_format_sd_card_packet(ssc=2300)
        tc_queue.appendleft(command.pack_command_tuple())
    elif op_code == "C0A":
        tc_queue.append(("print", "Generating generic folder structure on AT91"))
        generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=False)
    elif op_code == "C0I":
        tc_queue.append(("print", "Generating generic folder structure on iOBC"))
        generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=True)

    return tc_queue


def generate_generic_service_23_test(tc_queue: TcQueueT):
    """
    Queue the generic service 23 test sequence: create a directory, a subdirectory and a
    file, print / clear / print the file system, then delete the file and both directories.
    Each step consists of a ("print", ...) tuple and the packed telecommand.
    @param tc_queue: Queue which receives the print tuples and packed telecommands
    """
    def queue_command(telecommand):
        # All telecommands of this sequence are added at the left end of the queue.
        tc_queue.appendleft(telecommand.pack_command_tuple())

    tc_queue.appendleft(("print", "Testing Service 23"))

    # The two directory creation messages are added with append, all others with appendleft.
    tc_queue.append(("print", "Create directory 'test'"))
    queue_command(generate_mkdir_srv23_9_packet(
        directory_name="test", repository_path="/", ssc=2300))
    tc_queue.append(("print", "Create subdirectory 'subdir' in 'test'"))
    queue_command(generate_mkdir_srv23_9_packet(
        repository_path="test", ssc=2301, directory_name="subdir"))

    tc_queue.appendleft(("print", "Create test.bin"))
    queue_command(generate_create_file_srv23_1_packet(
        filename="test.bin", repository_path="test/subdir", ssc=2302,
        initial_data=bytearray([0x01, 0x00, 0x01, 0x00]),
        max_size_of_app_data=1024))

    # NOTE(review): the next three commands reuse the sequence counts 2300 to 2302.
    tc_queue.appendleft(("print", "Printing file structure."))
    queue_command(generate_print_sd_card_packet(ssc=2300))

    tc_queue.appendleft(("print", "Clearing SD card"))
    queue_command(generate_clear_sd_card_packet(ssc=2301))

    tc_queue.appendleft(("print", "Printing file structure"))
    queue_command(generate_print_sd_card_packet(ssc=2302))

    # Reading back the data of test.bin (sequence count 2303) is currently not part of the test.
    tc_queue.appendleft(("print", "Delete 'test.bin'"))
    queue_command(generate_rm_file_srv23_2_packet(
        filename="test.bin", repository_path="test/subdir", ssc=2304))
    tc_queue.appendleft(("print", "Delete 'subdir' directory"))
    queue_command(generate_rmdir_srv23_10_packet(
        directory_name="subdir", repository_path="test", ssc=2305))
    tc_queue.appendleft(("print", "Delete 'test' directory"))
    queue_command(generate_rmdir_srv23_10_packet(
        directory_name="test", repository_path="/", ssc=2306))


def calculate_allowed_file_data_size(max_app_data_size: int, filename: str, repository: str):
    """
    Calculate how much file data fits next to the two path strings.
    @param max_app_data_size: Maximum size of the application data
    @param filename: Name of the file
    @param repository: Repository path of the file
    @return max_app_data_size reduced by the length of both strings and their terminators
    """
    # One '\0' terminator behind each of the two strings
    terminator_count = 2
    path_length = len(filename) + len(repository)
    return max_app_data_size - path_length - terminator_count


def prompt_for_repo_filename():
    """
    Interactively ask for a repository path and a filename until the user confirms the
    selection with "y" or "yes" (case-insensitive). Any other confirmation answer starts
    the prompts over again.
    @return Tuple (repository path, filename) of the confirmed selection,
    ("", "h") if "h" was entered for either value and
    ("", "c") if "c" was entered for either value.
    """
    while True:
        repo_path = input("Please type in repository path [c to cancel]: ")
        # Help and cancel requests take effect right away, without asking for a filename.
        if repo_path == "h":
            return "", "h"
        if repo_path == "c":
            return "", "c"
        filename = input("Please type in filename path [c to cancel]: ")
        print("Selection for repostiory path: " + str(repo_path))
        print("Selection for filename: " + str(filename))
        if filename == "c":
            return "", "c"
        if filename == "h":
            return "", "h"
        confirmation = input("Confirm selection [y/n]: ")
        # Only an explicit yes ends the loop; "n" or anything else asks again.
        if confirmation.strip().lower() in ("y", "yes"):
            return repo_path, filename


def pack_generic_file_command_header(object_id: bytearray, first_path: str, second_path: str):
    """
    Pack the header shared by the file commands of this module: the object ID followed by
    both paths, each UTF-8 encoded and terminated by a zero byte.
    @param object_id: Object ID placed at the start of the header
    @param first_path: First string, used by the callers for the repository path
    @param second_path: Second string, used by the callers for the file or directory name
    @return The packed header as a new bytearray
    """
    header = bytearray(object_id)
    for path in (first_path, second_path):
        header += path.encode('utf-8')
        # Add the string terminator behind the path
        header.append(0)
    return header