# obsw_tc_service23.py (forked from an inaccessible project)
# -*- coding: utf-8 -*-
"""
Created: 21.01.2020 07:48

@author: Jakob Meier
"""
import math
import config.obsw_config as g
from typing import Deque

from tc.obsw_pus_tc_packer import PusTelecommand


class Service23WriteToFile:
    """ This class generates the telecommand packets for the custom PUS service [23, 128]

    Every packet carries the same header followed by one block of the file data:
        header: object ID | repository path length (2 bytes, high byte first) | repository path
                | filename length (1 byte) | filename
        block:  block length (1 byte) | packet number (2 bytes, high byte first) | data block
    The generated packets are available in the attribute pus_packets.

    @param repository_path(str): The directory of the file
    @param filename(str): The name of the file (e.g. boot.bin)
    @param size_of_data_blocks(int): Number of file data bytes per packet
    @param object_id(bytearray): The objectID of the filesystem handler
        (e.g. SD_CARD_HANDLER_ID, PLOCHandler)
    @param file_data(bytearray): The data to write to the file
    """
    def __init__(self, repository_path: str, filename: str, size_of_data_blocks: int,
                 object_id: bytearray = bytearray([]), file_data: bytearray = bytearray([])):
        """
        Build the common packet header and split file_data into PUS packets.

        @param repository_path: The directory of the file
        @param filename: The name of the file
        @param size_of_data_blocks: Number of file data bytes per packet
        @param object_id: Object ID of the filesystem handler. It is copied, so neither the
            caller's bytearray nor the shared default value is modified.
        @param file_data: The data to write to the file. It is only read, never modified.
        """
        self.number_of_packets = 0
        self.file_data = file_data
        # Copy instead of aliasing: the header fields appended below would otherwise end up in
        # the caller's object ID (or in the shared default argument) and leak into later calls.
        self.data_to_pack = bytearray(object_id)
        encoded_path = repository_path.encode('utf-8')
        encoded_filename = filename.encode('utf-8')
        # Repository path is prefixed with its length as 16 bit value, high byte first
        self.data_to_pack.append(len(encoded_path) >> 8)
        self.data_to_pack.append(0x00FF & len(encoded_path))
        self.data_to_pack += encoded_path
        # Filename is prefixed with its length as a single byte
        self.data_to_pack.append(len(encoded_filename))
        self.data_to_pack += encoded_filename
        self.pus_packets = self.split_large_file(size_of_data_blocks)

    def split_large_file(self, size_of_data_blocks):
        """ This function splits a large file in multiple packets
            This is necessary because the packet size is limited

        One packet is generated per full data block, followed by one final packet holding the
        remaining bytes. The final packet is always generated, also when no bytes remain.
        self.data_to_pack is used as header of every packet and is left unchanged, so the
        function can be called repeatedly.

        :param size_of_data_blocks: The file is split in data blocks of this size
        :return: List containing the PUS packets generated for each data block
        """
        self.number_of_packets = len(self.file_data) // size_of_data_blocks

        commands = []
        # The additional iteration produces the final packet with the remaining bytes
        for packet_number in range(self.number_of_packets + 1):
            start = packet_number * size_of_data_blocks
            # Slicing clamps at the end of the data, so the last block is just the remainder
            data_block = self.file_data[start:start + size_of_data_blocks]
            # Every packet gets its own buffer built from a fresh copy of the header
            app_data = bytearray(self.data_to_pack)
            app_data.append(len(data_block))
            app_data.append(packet_number >> 8)
            app_data.append(0xFF & packet_number)
            app_data += data_block
            commands.append(PusTelecommand(service=23, subservice=128, ssc=21, app_data=app_data))
        return commands

    def get_number_of_packets(self):
        """ Return the number of full data blocks the file was split into.
            The final packet with the remaining bytes is not counted, so this is one less
            than the number of entries in pus_packets.
        """
        return self.number_of_packets

# def sendSoftwareUpdate(comInterface, tmtcPrinter, tmListener, tmTimeout, tcTimeoutFactor, doPrintToFile):
#     """ This function sends all packets to upload a new version of the software image
#         The new image is assumed to be stored in the tmtc directory
#     """
#     image = open("sourceobsw-at91sam9g20_ek-sdram.bin", "rb").read()
#     update = Service23WriteToFile("", "boot.bin", objectID=[0x4D, 0x00, 0x73, 0xAD], file_data=image)
#     updateQueue = deque()
#     updateQueue.appendleft(("print", "Service 23 Software Update"))
#     i = 1
#     for packet in update.pus_packets:
#         updateQueue.appendleft(
#             ("print", "Packet " + str(i) + "/" + str(len(update.pus_packets)) + " of software update sent"))
#         updateQueue.appendleft(packet.packCommandTuple())
#         i = i + 1
#     updateQueue.appendleft(("print", "Transmission of software update complete"))
#     SenderAndReceiver = SequentialCommandSenderReceiver(comInterface, tmtcPrinter, tmListener, tmTimeout, updateQueue,
#                                                         tcTimeoutFactor, doPrintToFile)
#     SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()


def generate_service23_subservice2_packet(filename: str, repositoryPath: str = "",
                                          object_id=bytearray([])):
    """ This function generates the service 23/subservice 2 telecommand to
        delete a file on a file-system-capable on-board memory.
        The application data consists of the object ID, the repository path and the
        filename. Both strings are UTF-8 encoded and terminated with a 0 byte.
    @param filename: The name of the file to delete
    @param repositoryPath: The path of the directory which contains the file
    @param object_id: The object ID of the memory handler which manages the file system.
        It is copied, so the passed bytearray is not modified.
    @return The TC[23,2] PUS packet
    """
    # Copy instead of aliasing: "+=" and append() work in place on a bytearray and would
    # otherwise grow the caller's object ID (or the shared default argument) on every call.
    data_to_pack = bytearray(object_id)
    data_to_pack += bytearray(repositoryPath, 'utf-8')
    # Add string terminator to repository path
    data_to_pack.append(0)
    data_to_pack += bytearray(filename, 'utf-8')
    # Add string terminator to filename
    data_to_pack.append(0)
    return PusTelecommand(service=23, subservice=2, ssc=21, app_data=data_to_pack)


def generate_service23_subservice9_packet(directory_name: str, repository_path: str = "",
                                          object_id=bytearray([])):
    """ This function generates the service 23/subservice 9 telecommand.
        This service can be used to create directories on file systems.
        The application data consists of the object ID, the repository path and the
        directory name. Both strings are UTF-8 encoded and terminated with a 0 byte.
    @param directory_name: The name of the directory to create
    @param repository_path: The path where the directory shall be created
    @param object_id: The object ID of the memory handler which manages the file system.
        It is copied, so the passed bytearray is not modified.
    @return The TC[23,9] PUS packet
    """
    # Copy instead of aliasing: "+=" and append() work in place on a bytearray and would
    # otherwise grow the caller's object ID (or the shared default argument) on every call.
    data_to_pack = bytearray(object_id)
    data_to_pack += bytearray(repository_path, 'utf-8')
    # Add string terminator to repository path
    data_to_pack.append(0)
    data_to_pack += bytearray(directory_name, 'utf-8')
    # Add string terminator to directory name
    data_to_pack.append(0)
    return PusTelecommand(service=23, subservice=9, ssc=21, app_data=data_to_pack)


def generateService23Subservice10Packet(directory_name: str, repository_path: str = "",
                                        object_id=bytearray([])):
    """ This function generates the PUS packet with service 23 and subservice 10.
        This service deletes the directory directory_name.
        The application data consists of the object ID, the repository path and the
        directory name. Both strings are UTF-8 encoded and terminated with a 0 byte.
    @param directory_name: Name of the directory to delete
    @param repository_path: Path to the directory directory_name
    @param object_id: object ID of the memory handler (e.g. SD Card Handler).
        It is copied, so the passed bytearray is not modified.
    @return The (23,10) PUS packet
    """
    # Copy instead of aliasing: "+=" and append() work in place on a bytearray and would
    # otherwise grow the caller's object ID (or the shared default argument) on every call.
    data_to_pack = bytearray(object_id)
    data_to_pack += bytearray(repository_path, 'utf-8')
    # Add string terminator of repository path
    data_to_pack.append(0)
    data_to_pack += bytearray(directory_name, 'utf-8')
    # Add string terminator of directory name
    data_to_pack.append(0)
    return PusTelecommand(service=23, subservice=10, ssc=21, app_data=data_to_pack)


def generate_service23_subservice128_packet(repository_path: str, filename: str,
                                            object_id: bytearray = bytearray([]),
                                            file_data: bytearray = bytearray([])):
    """ This function generates the PUS packets with service 23 and subservice 128.
        Subservice 128 is a custom service to write data in a file.
        Additionally file is created if not already existing.
        The common part of the application data consists of the object ID, the repository
        path and the filename. Both strings are UTF-8 encoded and terminated with a 0 byte.
        The file data is split in blocks of 236 bytes by split_large_file.
    @param repository_path: The path of the target file
    @param filename: Name of the file to write to
    @param object_id: object ID of the memory handler (e.g. SD Card Handler).
        It is copied, so the passed bytearray is not modified.
    @param file_data: The data to write in the file
    @return: The list of (23,128) PUS packets returned by split_large_file
    """
    # Copy instead of aliasing: "+=" and append() work in place on a bytearray and would
    # otherwise grow the caller's object ID (or the shared default argument) on every call.
    data_to_pack = bytearray(object_id)
    data_to_pack += bytearray(repository_path, 'utf-8')
    # Add string terminator of repository path
    data_to_pack.append(0)
    data_to_pack += bytearray(filename, 'utf-8')
    # Add string terminator of filename
    data_to_pack.append(0)
    return split_large_file(data_to_pack, 236, file_data)


def split_large_file(data_to_pack: bytearray, size_of_data_blocks, data: bytearray):
    """
    This function splits a large file in multiple packets.
    This is necessary because the packet size is limited.

    The application data of every packet consists of data_to_pack, the packet number
    (2 bytes, high byte first) and one data block. One packet is generated per full data
    block, followed by one final packet holding the remaining bytes. The final packet is
    always generated, also when no bytes remain.
    @param data_to_pack: bytearray which is put in front of every data block. It is only
        read; the passed bytearray is not modified.
    @param size_of_data_blocks: The file is split in data blocks of this size
    @param data: The data to pack in multiple packets
    @return List containing the PUS packets generated for each data block
    """
    # Snapshot of the common part, so that every packet starts from the same bytes and the
    # caller's buffer is never touched
    header = bytes(data_to_pack)
    number_of_full_blocks = len(data) // size_of_data_blocks

    commands = []
    # The additional iteration produces the final packet with the remaining bytes
    for packet_number in range(number_of_full_blocks + 1):
        start = packet_number * size_of_data_blocks
        app_data = bytearray(header)
        app_data.append(packet_number >> 8)
        app_data.append(0xFF & packet_number)
        # Slicing clamps at the end of the data, so the last block is just the remainder
        app_data += data[start:start + size_of_data_blocks]
        commands.append(PusTelecommand(service=23, subservice=128, ssc=21, app_data=app_data))
    return commands


def generate_service23_subservice129_packet(repository_path: str, filename: str,
                                            object_id: bytearray = bytearray([])):
    """
    This function generates the PUS packet with service 23 and subservice 129.
    Subservice 129 is a custom service to request the data of a file.
    The application data consists of the object ID, the repository path and the
    filename. Both strings are UTF-8 encoded and terminated with a 0 byte.
    @param repository_path: The path of the target file
    @param filename: Name of file from which the content shall be read
    @param object_id: object ID of the memory handler (e.g. SD Card Handler).
        It is copied, so the passed bytearray is not modified.
    @return: The (23,129) PUS packet
    """
    # Copy instead of aliasing: "+=" and append() work in place on a bytearray and would
    # otherwise grow the caller's object ID (or the shared default argument) on every call.
    data_to_pack = bytearray(object_id)
    data_to_pack += bytearray(repository_path, 'utf-8')
    # Add string terminator of repository path
    data_to_pack.append(0)
    data_to_pack += bytearray(filename, 'utf-8')
    # Add string terminator of filename
    data_to_pack.append(0)
    return PusTelecommand(service=23, subservice=129, ssc=21, app_data=data_to_pack)


def pack_service23_test_into(tc_queue: Deque) -> Deque:
    """
    Pack the service 23 test sequence into the given telecommand queue.
    Currently only the creation of the directory 'test' is active, the remaining steps
    are commented out.
    All entries are added with appendleft so that they keep their relative order in the
    queue. NOTE(review): this presumes the queue is consumed from the right end - confirm
    against the sender.
    @param tc_queue: Queue to which the print entries and packed commands are added
    @return The same queue object which was passed in
    """
    sd_handler_id = g.SD_CARD_HANDLER_ID
    tc_queue.appendleft(("print", "Testing Service 23"))
    # Was added with append() before, which put this entry at the opposite end of the queue
    # than the command it announces
    tc_queue.appendleft(("print", "Create directory 'test'"))
    command = generate_service23_subservice9_packet(directory_name="test", object_id=sd_handler_id)
    tc_queue.appendleft(command.pack_command_tuple())
    # tc_queue.appendleft(("print", "Create subdirectory 'subdir' in 'test'"))
    # command = generate_service23_subservice9_packet(repository_path="test", directory_name="subdir",
    #                                                 object_id=sd_handler_id)
    # tc_queue.appendleft(command.pack_command_tuple())
    # tc_queue.appendleft(("print", "Create and write in test.bin"))
    # command = generate_service23_subservice128_packet(
    #     "test/subdir", "test.bin", sd_handler_id, file_data=bytearray([0x01, 0x00, 0x01, 0x00]))
    # tc_queue.appendleft(command[0].pack_command_tuple())
    # tc_queue.appendleft(("print", "Read data of test.bin"))
    # command = generate_service23_subservice129_packet("test/subdir", "test.bin", sd_handler_id)
    # tc_queue.appendleft(command.pack_command_tuple())
    # tc_queue.appendleft(("print", "Delete 'test.bin'"))
    # command = generate_service23_subservice2_packet("test.bin", "test/subdir", object_id=sd_handler_id)
    # tc_queue.appendleft(command.pack_command_tuple())
    # tc_queue.appendleft(("print", "Delete 'subdir' directory"))
    # command = generateService23Subservice10Packet("subdir", "test", object_id=sd_handler_id)
    # tc_queue.appendleft(command.pack_command_tuple())
    # tc_queue.appendleft(("print", "Delete 'test' directory"))
    # command = generateService23Subservice10Packet(directory_name="test", object_id=sd_handler_id)
    # tc_queue.appendleft(command.pack_command_tuple())
    tc_queue.appendleft(("print", "\r"))
    return tc_queue