From 72a935686fc605132d832085f3d087011cf1ba29 Mon Sep 17 00:00:00 2001 From: "Robin.Mueller" <robin.mueller.m@gmail.com> Date: Tue, 6 Oct 2020 16:06:12 +0200 Subject: [PATCH] file renamed --- tc/obsw_pus_tc_packer.py | 2 +- ...rvice23.py => obsw_tc_service23_sdcard.py} | 944 +++++++++--------- utility/obsw_binary_uploader.py | 2 +- 3 files changed, 474 insertions(+), 474 deletions(-) rename tc/{obsw_tc_service23.py => obsw_tc_service23_sdcard.py} (97%) diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index 5af7472..3dc996b 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -14,7 +14,7 @@ from tc.obsw_tc_service2 import pack_service2_test_into from tc.obsw_tc_service3 import pack_service3_test_into from tc.obsw_tc_service8 import pack_service8_test_into from tc.obsw_tc_service9 import pack_service9_test_into -from tc.obsw_tc_service23 import pack_service23_commands_into +from tc.obsw_tc_service23_sdcard import pack_service23_commands_into from tc.obsw_tc_service20 import pack_service20_test_into from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into diff --git a/tc/obsw_tc_service23.py b/tc/obsw_tc_service23_sdcard.py similarity index 97% rename from tc/obsw_tc_service23.py rename to tc/obsw_tc_service23_sdcard.py index 5db47d7..d149523 100644 --- a/tc/obsw_tc_service23.py +++ b/tc/obsw_tc_service23_sdcard.py @@ -1,472 +1,472 @@ -# -*- coding: utf-8 -*- -""" -Created: 21.01.2020 07:48 - -@author: Jakob Meier -""" -import math -from enum import Enum - -import config.obsw_config as g -from typing import Deque, Union - -from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT -from tc.obsw_tc_service8 import make_action_id -from tmtc_core.utility.obsw_logger import get_logger - -LOGGER = get_logger() - - -class FileTransferHelper: - """ - This helper class fills the provided TC queue with appropriate PUS telecommands - to transfer a large file. - There are three modes which determine which telecommands will be generated: - 1. NORMAL: Generate telecommand to create a new file and append data packets if - the file data is too large. This will be the default mode. - 2. DELETE_OLD: Generate telecommand to delete old file and then perform same steps as the - normal mode - 3. RENAME_OLD: Rename old file and then perform same steps as in normal mode. - - Please note that the setter functions set_data have to be used to assign data, otherwise - an empty file will be created. The mode is set with setter commands as well. - """ - class TransferMode(Enum): - # Normal mode - NORMAL = 1 - # Generate a command to delete the old file first - DELETE_OLD = 2 - # Generate a command to rename the old file first. - RENAME_OLD = 3 - - def __init__(self, tc_queue: TcQueueT, max_size_of_app_data: int, - target_repository: str, target_filename: str, - object_id=g.SD_CARD_HANDLER_ID): - """ - @param tc_queue: TC queue which will be filled - @param max_size_of_app_data: Maximum allowed app data size. Number of generated packets - will depend on this value - @param target_repository: Repository path on target. - @param target_filename: Filename on target - @param object_id: - """ - self.transfer_mode = self.TransferMode.NORMAL - self.max_size_of_app_data = max_size_of_app_data - self.max_file_data_size = 0 - self.allowed_file_data_size = calculate_allowed_file_data_size( - max_size_of_app_data, target_filename, target_repository) - self.target_filename = target_filename - self.target_repository = target_repository - self.renamed_name = self.target_filename + "old" - self.object_id = object_id - self.tc_queue = tc_queue - self.file_data = bytearray() - self.local_filename = "" - # This will generate a telecommand to delete the old file, if it exists - self.delete_old_file = False - # This will generater a telecommand to rename the old file, if it exists - self.rename_old_file = False - - def set_data_from_file(self, local_filename: str): - with open(local_filename, 'rb') as file: - self.file_data = file.read() - - def set_data_raw(self, tc_data: bytearray): - self.file_data = tc_data - - def set_to_delete_old_file(self): - self.transfer_mode = self.TransferMode.DELETE_OLD - - def set_to_rename_old_file(self, renamed_name: str): - self.transfer_mode = self.TransferMode.RENAME_OLD - self.renamed_name = renamed_name - - def set_max_file_data_size(self, max_file_data_size: int): - """ - If this value is specified and the source file is large (larger than the maximum allowed - app data!), the file data size will be set to this value. - @param max_file_data_size: - @return: - """ - self.max_file_data_size = max_file_data_size - - def queue_size(self): - return len(self.tc_queue) - - def generate_packets(self, ssc: int): - if self.transfer_mode == self.TransferMode.DELETE_OLD: - command = generate_rm_file_srv23_2_packet(self.target_filename, self.target_repository, - ssc, self.object_id) - ssc += 1 - self.tc_queue.appendleft(command.pack_command_tuple()) - elif self.transfer_mode == self.TransferMode.RENAME_OLD: - # not implemented yet - pass - if len(self.file_data) > self.allowed_file_data_size: - # Large file, create file with init_data - if self.max_file_data_size > 0: - init_data = self.file_data[0:self.max_file_data_size] - else: - init_data = self.file_data[0:self.allowed_file_data_size] - else: - # Small file, one packet for file creation sufficient - command = generate_create_file_srv23_1_packet( - self.target_filename, self.target_repository, ssc, self.max_size_of_app_data, - self.file_data) - ssc += 1 - self.tc_queue.appendleft(command.pack_command_tuple()) - return - - # Create large file. - command = generate_create_file_srv23_1_packet( - self.target_filename, self.target_repository, ssc, self.max_size_of_app_data, - init_data) - ssc += 1 - self.tc_queue.appendleft(command.pack_command_tuple()) - - rest_of_data = self.file_data[self.allowed_file_data_size:] - # Generate the rest of the packets to write to large file - if self.max_file_data_size > 0: - self.__generate_append_to_file_packets_automatically( - rest_of_data, self.max_file_data_size, ssc) - else: - self.__generate_append_to_file_packets_automatically( - rest_of_data, self.max_size_of_app_data, ssc) - - def __generate_append_to_file_packets_automatically( - self, data: bytearray, size_of_data_blocks: int, init_ssc: int): - """ - This function generates PUS packets which is used to write data in a file. - A new file will be created if not already existing. If the file already exists, this might - lead to - - If the file data is larger than the maximum allowed size of application data, this function - will split the data into multiple packets and increment the initial SSC number by one for - each packet. - @param data: Data which will be split up. - @param init_ssc: First SSC, which will be incremented for each packet. - """ - header = self.object_id - header += bytearray(self.target_repository, 'utf-8') - # Add string terminator of repository path - header.append(0) - header += bytearray(self.target_filename, 'utf-8') - # Add string terminator of filename - header.append(0) - self.__split_large_file(header, size_of_data_blocks, data, init_ssc) - - def __split_large_file(self, header: bytearray, size_of_data_blocks: int, - data: bytearray, init_ssc: int): - """ - This function splits a large file in multiple packets and packs the generated packets - into the member deque. This is necessary because the packet size is limited. - @param header: Repository and file name which will always stay the same - @param size_of_data_blocks: The file data blocks will have this size - @param data: The data to pack in multiple packets - @param init_ssc: The ssc of the first command, will be incremented by one for each packet. - """ - number_of_packets = math.floor(len(data) / size_of_data_blocks) - packet_sequence_number = 0 - - for i in range(number_of_packets): - header.append(packet_sequence_number >> 8) - header.append(0xFF & packet_sequence_number) - header += data[i * size_of_data_blocks:(i + 1) * size_of_data_blocks] - - commands = PusTelecommand(service=23, subservice=128, ssc=init_ssc + i, - app_data=header) - self.tc_queue.appendleft(commands.pack_command_tuple()) - - # Remove everything except the header - header = header[:len(header) - size_of_data_blocks - 2] - packet_sequence_number = packet_sequence_number + 1 - header.append(packet_sequence_number >> 8) - header.append(0xFF & packet_sequence_number) - header += data[number_of_packets * size_of_data_blocks:len(data)] - commands = PusTelecommand(service=23, subservice=128, ssc=init_ssc + packet_sequence_number, - app_data=header) - self.tc_queue.appendleft(commands.pack_command_tuple()) - - -def generate_print_sd_card_packet( - ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: - app_data = bytearray(object_id) - app_data += make_action_id(2) - return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data) - - -def generate_clear_sd_card_packet( - ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: - app_data = bytearray(object_id) - app_data += make_action_id(20) - return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data) - - -def generate_format_sd_card_packet( - ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: - app_data = bytearray(object_id) - app_data += make_action_id(21) - return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data) - - -def generate_generic_folder_structure( - tc_queue: Deque, init_ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID, - iobc: bool = False): - tc_queue.appendleft(("print", "Creating TC folder")) - command = generate_mkdir_srv23_9_packet("TC", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="TC", directory_name="LARGE", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="TC", directory_name="SMALL", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - - tc_queue.appendleft(("print", "Creating TM folder")) - command = generate_mkdir_srv23_9_packet("TM", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="TM", directory_name="HK", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="TM", directory_name="SC", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - command = generate_mkdir_srv23_9_packet( - repository_path="TM/SC", directory_name="LARGE", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="TM/SC", directory_name="SMALL", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - - tc_queue.appendleft(("print", "Creating BIN folder")) - command = generate_mkdir_srv23_9_packet("BIN", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - if iobc: - command = generate_mkdir_srv23_9_packet( - repository_path="BIN", directory_name="IOBC", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="BIN/IOBC", directory_name="BL", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="BIN/IOBC", directory_name="OBSW", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - else: - command = generate_mkdir_srv23_9_packet( - repository_path="BIN", directory_name="AT91", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="BIN/AT91", directory_name="BL", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - command = generate_mkdir_srv23_9_packet( - repository_path="BIN/AT91", directory_name="OBSW", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - init_ssc += 1 - - tc_queue.appendleft(("print", "Creating MISC folder")) - command = generate_mkdir_srv23_9_packet("MISC", ssc=init_ssc, object_id=object_id) - tc_queue.appendleft(command.pack_command_tuple()) - - -def generate_create_file_srv23_1_packet( - filename: str, repository_path: str, ssc: int, max_size_of_app_data: int, - initial_data: bytearray = bytearray([]), - object_id: bytearray = g.SD_CARD_HANDLER_ID) -> Union[PusTelecommand, None]: - if len(initial_data) > calculate_allowed_file_data_size( - max_size_of_app_data, filename, repository_path): - LOGGER.error("generate_create_file_srv23_1_packet: Initial data too large!") - return None - data_to_pack = bytearray(object_id) - data_to_pack += bytearray(repository_path, 'utf-8') - # Add string terminator to repository_path - data_to_pack.append(0) - data_to_pack += bytearray(filename, 'utf-8') - # Add string terminator to filename - data_to_pack.append(0) - - data_to_pack += initial_data - return PusTelecommand(service=23, subservice=1, ssc=ssc, app_data=data_to_pack) - - -def generate_rm_file_srv23_2_packet(filename: str, repository_path: str, - ssc: int, object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand: - """ - This function generates a packet which is used to delete a file on a - file-system-capable on-board memory. - @param filename: The name of the file to delete - @param repository_path: The path where the directory shall be created - @param ssc: source sequence count - @param object_id: The object ID of the memory handler which manages the file system - :return The TC[23,2] PUS packet - """ - data_to_pack = bytearray(object_id) - data_to_pack += bytearray(repository_path, 'utf-8') - # Add string terminator to repository_path - data_to_pack.append(0) - data_to_pack += bytearray(filename, 'utf-8') - # Add string terminator to filename - data_to_pack.append(0) - return PusTelecommand(service=23, subservice=2, ssc=ssc, app_data=data_to_pack) - - -def generate_mkdir_srv23_9_packet(directory_name: str, ssc: int, repository_path: str = "/", - object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: - """ - This function generates a packet which is used to create directories on file systems. - @param directory_name: The path where the directory shall be created - @param repository_path: The name of the directory to create - @param ssc: source sequence count - @param object_id: The object ID of the memory handler which manages the file system - :return - """ - data_to_pack = bytearray(object_id) - data_to_pack += bytearray(repository_path, 'utf-8') - # Add string terminator to repository path - data_to_pack.append(0) - data_to_pack += bytearray(directory_name, 'utf-8') - # Add string terminator to directory name - data_to_pack.append(0) - return PusTelecommand(service=23, subservice=9, ssc=ssc, app_data=data_to_pack) - - -def generate_rmdir_srv23_10_packet(directory_name: str, repository_path: str, ssc: int, - object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: - """ - This function generates a packet which deletes the a directory at the specified repository path. - @param directory_name: Name of the directory to delete - @param repository_path: Path to directory dirname - @param ssc: source sequence count - @param object_id: object ID of the memory handler (e.g. SD Card Handler) - @return The application data field of the (23,10) PUS packet - """ - data_to_pack = bytearray(object_id) - data_to_pack += bytearray(repository_path, 'utf-8') - # Add string terminator of repository path - data_to_pack.append(0) - data_to_pack += bytearray(directory_name, 'utf-8') - # Add string terminator of directory name - data_to_pack.append(0) - return PusTelecommand(service=23, subservice=10, ssc=ssc, app_data=data_to_pack) - - -def generate_append_to_file_srv23_128_packet( - filename: str, repository_path: str, packet_sequence_number: int, - ssc: int, file_data: bytearray = bytearray([]), - object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: - data_to_pack = bytearray(object_id) - data_to_pack += bytearray(repository_path, 'utf-8') - # Add string terminator of repository path - data_to_pack.append(0) - data_to_pack += bytearray(filename, 'utf-8') - # Add string terminator of filename - data_to_pack.append(0) - data_to_pack += file_data - data_to_pack.append(packet_sequence_number >> 8) - data_to_pack.append(packet_sequence_number & 0xff) - return PusTelecommand(service=23, subservice=128, ssc=ssc, app_data=data_to_pack) - - -def generate_read_file_srv23_129_packet( - repository_path: str, filename: str, ssc: int, - object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: - """ - This function generates the application data field for a PUS packet with service - 23 and subservie 129. Subservice 129 is a custom service to request the data of a file. - @param repository_path: The path of the target file - @param filename: Name of file from which the content shall be read - @param ssc: source sequence count - @param object_id: object ID of the memory handler (e.g. SD Card Handler) - @return: The application data field of the (23,129) PUS packet - """ - data_to_pack = bytearray(object_id) - data_to_pack += bytearray(repository_path, 'utf-8') - # Add string terminator of repository paht - data_to_pack.append(0) - data_to_pack += bytearray(filename, 'utf-8') - # Add string terminator of filename - data_to_pack.append(0) - return PusTelecommand(service=23, subservice=129, ssc=ssc, app_data=data_to_pack) - - -def pack_service23_commands_into(tc_queue: Deque, op_code: int) -> Deque: - # sd_handler_id = g.SD_CARD_HANDLER_ID - if op_code == 0: - tc_queue.appendleft(("print", "Testing Service 23")) - - tc_queue.append(("print", "Create directory 'test'")) - command = generate_mkdir_srv23_9_packet(directory_name="test", repository_path="/", ssc=2300) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.append(("print", "Create subdirectory 'subdir' in 'test'")) - command = generate_mkdir_srv23_9_packet(repository_path="test", ssc=2301, - directory_name="subdir") - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft(("print", "Create test.bin")) - command = generate_create_file_srv23_1_packet( - filename="test.bin", repository_path="test/subdir", ssc=2302, - initial_data=bytearray([0x01, 0x00, 0x01, 0x00]), - max_size_of_app_data=1024) - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft(("print", "Printing file structure.")) - command = generate_print_sd_card_packet(ssc=2300) - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft(("print", "Clearing SD card")) - command = generate_clear_sd_card_packet(ssc=2301) - tc_queue.appendleft(command.pack_command_tuple()) - - tc_queue.appendleft(("print", "Printing file structure")) - command = generate_print_sd_card_packet(ssc=2302) - tc_queue.appendleft(command.pack_command_tuple()) - - # tc_queue.appendleft(("print", "Read data of test.bin")) - # command = generate_read_file_srv23_129_packet("test/subdir", "test.bin", ssc=2303) - # tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft(("print", "Delete 'test.bin'")) - command = generate_rm_file_srv23_2_packet( - filename="test.bin", repository_path="test/subdir", ssc=2304) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft(("print", "Delete 'subdir' directory")) - command = generate_rmdir_srv23_10_packet(directory_name="subdir", repository_path="test", - ssc=2305) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft(("print", "Delete 'test' directory")) - command = generate_rmdir_srv23_10_packet(directory_name="test", repository_path="/", - ssc=2306) - tc_queue.appendleft(command.pack_command_tuple()) - elif op_code == "A2": - tc_queue.append(("print", "Printing active file system")) - command = generate_print_sd_card_packet(ssc=2300) - tc_queue.appendleft(command.pack_command_tuple()) - elif op_code == "A20": - tc_queue.append(("print", "Clearing active file system")) - command = generate_clear_sd_card_packet(ssc=2300) - tc_queue.appendleft(command.pack_command_tuple()) - elif op_code == "A21": - tc_queue.append(("print", "Formatting active file system")) - command = generate_format_sd_card_packet(ssc=2300) - tc_queue.appendleft(command.pack_command_tuple()) - elif op_code == "C0A": - generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=False) - elif op_code == "C0I": - generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=True) - return tc_queue - - -def calculate_allowed_file_data_size(max_app_data_size: int, filename: str, repository: str): - # Subtract two because of '\0' terminators - return max_app_data_size - len(filename) - len(repository) - 2 +# -*- coding: utf-8 -*- +""" +Created: 21.01.2020 07:48 + +@author: Jakob Meier +""" +import math +from enum import Enum + +import config.obsw_config as g +from typing import Deque, Union + +from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT +from tc.obsw_tc_service8 import make_action_id +from tmtc_core.utility.obsw_logger import get_logger + +LOGGER = get_logger() + + +class FileTransferHelper: + """ + This helper class fills the provided TC queue with appropriate PUS telecommands + to transfer a large file. + There are three modes which determine which telecommands will be generated: + 1. NORMAL: Generate telecommand to create a new file and append data packets if + the file data is too large. This will be the default mode. + 2. DELETE_OLD: Generate telecommand to delete old file and then perform same steps as the + normal mode + 3. RENAME_OLD: Rename old file and then perform same steps as in normal mode. + + Please note that the setter functions set_data have to be used to assign data, otherwise + an empty file will be created. The mode is set with setter commands as well. + """ + class TransferMode(Enum): + # Normal mode + NORMAL = 1 + # Generate a command to delete the old file first + DELETE_OLD = 2 + # Generate a command to rename the old file first. + RENAME_OLD = 3 + + def __init__(self, tc_queue: TcQueueT, max_size_of_app_data: int, + target_repository: str, target_filename: str, + object_id=g.SD_CARD_HANDLER_ID): + """ + @param tc_queue: TC queue which will be filled + @param max_size_of_app_data: Maximum allowed app data size. Number of generated packets + will depend on this value + @param target_repository: Repository path on target. + @param target_filename: Filename on target + @param object_id: + """ + self.transfer_mode = self.TransferMode.NORMAL + self.max_size_of_app_data = max_size_of_app_data + self.max_file_data_size = 0 + self.allowed_file_data_size = calculate_allowed_file_data_size( + max_size_of_app_data, target_filename, target_repository) + self.target_filename = target_filename + self.target_repository = target_repository + self.renamed_name = self.target_filename + "old" + self.object_id = object_id + self.tc_queue = tc_queue + self.file_data = bytearray() + self.local_filename = "" + # This will generate a telecommand to delete the old file, if it exists + self.delete_old_file = False + # This will generater a telecommand to rename the old file, if it exists + self.rename_old_file = False + + def set_data_from_file(self, local_filename: str): + with open(local_filename, 'rb') as file: + self.file_data = file.read() + + def set_data_raw(self, tc_data: bytearray): + self.file_data = tc_data + + def set_to_delete_old_file(self): + self.transfer_mode = self.TransferMode.DELETE_OLD + + def set_to_rename_old_file(self, renamed_name: str): + self.transfer_mode = self.TransferMode.RENAME_OLD + self.renamed_name = renamed_name + + def set_max_file_data_size(self, max_file_data_size: int): + """ + If this value is specified and the source file is large (larger than the maximum allowed + app data!), the file data size will be set to this value. + @param max_file_data_size: + @return: + """ + self.max_file_data_size = max_file_data_size + + def queue_size(self): + return len(self.tc_queue) + + def generate_packets(self, ssc: int): + if self.transfer_mode == self.TransferMode.DELETE_OLD: + command = generate_rm_file_srv23_2_packet(self.target_filename, self.target_repository, + ssc, self.object_id) + ssc += 1 + self.tc_queue.appendleft(command.pack_command_tuple()) + elif self.transfer_mode == self.TransferMode.RENAME_OLD: + # not implemented yet + pass + if len(self.file_data) > self.allowed_file_data_size: + # Large file, create file with init_data + if self.max_file_data_size > 0: + init_data = self.file_data[0:self.max_file_data_size] + else: + init_data = self.file_data[0:self.allowed_file_data_size] + else: + # Small file, one packet for file creation sufficient + command = generate_create_file_srv23_1_packet( + self.target_filename, self.target_repository, ssc, self.max_size_of_app_data, + self.file_data) + ssc += 1 + self.tc_queue.appendleft(command.pack_command_tuple()) + return + + # Create large file. + command = generate_create_file_srv23_1_packet( + self.target_filename, self.target_repository, ssc, self.max_size_of_app_data, + init_data) + ssc += 1 + self.tc_queue.appendleft(command.pack_command_tuple()) + + rest_of_data = self.file_data[self.allowed_file_data_size:] + # Generate the rest of the packets to write to large file + if self.max_file_data_size > 0: + self.__generate_append_to_file_packets_automatically( + rest_of_data, self.max_file_data_size, ssc) + else: + self.__generate_append_to_file_packets_automatically( + rest_of_data, self.max_size_of_app_data, ssc) + + def __generate_append_to_file_packets_automatically( + self, data: bytearray, size_of_data_blocks: int, init_ssc: int): + """ + This function generates PUS packets which is used to write data in a file. + A new file will be created if not already existing. If the file already exists, this might + lead to + + If the file data is larger than the maximum allowed size of application data, this function + will split the data into multiple packets and increment the initial SSC number by one for + each packet. + @param data: Data which will be split up. + @param init_ssc: First SSC, which will be incremented for each packet. + """ + header = self.object_id + header += bytearray(self.target_repository, 'utf-8') + # Add string terminator of repository path + header.append(0) + header += bytearray(self.target_filename, 'utf-8') + # Add string terminator of filename + header.append(0) + self.__split_large_file(header, size_of_data_blocks, data, init_ssc) + + def __split_large_file(self, header: bytearray, size_of_data_blocks: int, + data: bytearray, init_ssc: int): + """ + This function splits a large file in multiple packets and packs the generated packets + into the member deque. This is necessary because the packet size is limited. + @param header: Repository and file name which will always stay the same + @param size_of_data_blocks: The file data blocks will have this size + @param data: The data to pack in multiple packets + @param init_ssc: The ssc of the first command, will be incremented by one for each packet. + """ + number_of_packets = math.floor(len(data) / size_of_data_blocks) + packet_sequence_number = 0 + + for i in range(number_of_packets): + header.append(packet_sequence_number >> 8) + header.append(0xFF & packet_sequence_number) + header += data[i * size_of_data_blocks:(i + 1) * size_of_data_blocks] + + commands = PusTelecommand(service=23, subservice=128, ssc=init_ssc + i, + app_data=header) + self.tc_queue.appendleft(commands.pack_command_tuple()) + + # Remove everything except the header + header = header[:len(header) - size_of_data_blocks - 2] + packet_sequence_number = packet_sequence_number + 1 + header.append(packet_sequence_number >> 8) + header.append(0xFF & packet_sequence_number) + header += data[number_of_packets * size_of_data_blocks:len(data)] + commands = PusTelecommand(service=23, subservice=128, ssc=init_ssc + packet_sequence_number, + app_data=header) + self.tc_queue.appendleft(commands.pack_command_tuple()) + + +def generate_print_sd_card_packet( + ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: + app_data = bytearray(object_id) + app_data += make_action_id(2) + return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data) + + +def generate_clear_sd_card_packet( + ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: + app_data = bytearray(object_id) + app_data += make_action_id(20) + return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data) + + +def generate_format_sd_card_packet( + ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: + app_data = bytearray(object_id) + app_data += make_action_id(21) + return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data) + + +def generate_generic_folder_structure( + tc_queue: Deque, init_ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID, + iobc: bool = False): + tc_queue.appendleft(("print", "Creating TC folder")) + command = generate_mkdir_srv23_9_packet("TC", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="TC", directory_name="LARGE", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="TC", directory_name="SMALL", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + + tc_queue.appendleft(("print", "Creating TM folder")) + command = generate_mkdir_srv23_9_packet("TM", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="TM", directory_name="HK", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="TM", directory_name="SC", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + command = generate_mkdir_srv23_9_packet( + repository_path="TM/SC", directory_name="LARGE", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="TM/SC", directory_name="SMALL", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + + tc_queue.appendleft(("print", "Creating BIN folder")) + command = generate_mkdir_srv23_9_packet("BIN", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + if iobc: + command = generate_mkdir_srv23_9_packet( + repository_path="BIN", directory_name="IOBC", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="BIN/IOBC", directory_name="BL", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="BIN/IOBC", directory_name="OBSW", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + else: + command = generate_mkdir_srv23_9_packet( + repository_path="BIN", directory_name="AT91", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="BIN/AT91", directory_name="BL", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + command = generate_mkdir_srv23_9_packet( + repository_path="BIN/AT91", directory_name="OBSW", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + init_ssc += 1 + + tc_queue.appendleft(("print", "Creating MISC folder")) + command = generate_mkdir_srv23_9_packet("MISC", ssc=init_ssc, object_id=object_id) + tc_queue.appendleft(command.pack_command_tuple()) + + +def generate_create_file_srv23_1_packet( + filename: str, repository_path: str, ssc: int, max_size_of_app_data: int, + initial_data: bytearray = bytearray([]), + object_id: bytearray = g.SD_CARD_HANDLER_ID) -> Union[PusTelecommand, None]: + if len(initial_data) > calculate_allowed_file_data_size( + max_size_of_app_data, filename, repository_path): + LOGGER.error("generate_create_file_srv23_1_packet: Initial data too large!") + return None + data_to_pack = bytearray(object_id) + data_to_pack += bytearray(repository_path, 'utf-8') + # Add string terminator to repository_path + data_to_pack.append(0) + data_to_pack += bytearray(filename, 'utf-8') + # Add string terminator to filename + data_to_pack.append(0) + + data_to_pack += initial_data + return PusTelecommand(service=23, subservice=1, ssc=ssc, app_data=data_to_pack) + + +def generate_rm_file_srv23_2_packet(filename: str, repository_path: str, + ssc: int, object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand: + """ + This function generates a packet which is used to delete a file on a + file-system-capable on-board memory. + @param filename: The name of the file to delete + @param repository_path: The path where the directory shall be created + @param ssc: source sequence count + @param object_id: The object ID of the memory handler which manages the file system + :return The TC[23,2] PUS packet + """ + data_to_pack = bytearray(object_id) + data_to_pack += bytearray(repository_path, 'utf-8') + # Add string terminator to repository_path + data_to_pack.append(0) + data_to_pack += bytearray(filename, 'utf-8') + # Add string terminator to filename + data_to_pack.append(0) + return PusTelecommand(service=23, subservice=2, ssc=ssc, app_data=data_to_pack) + + +def generate_mkdir_srv23_9_packet(directory_name: str, ssc: int, repository_path: str = "/", + object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: + """ + This function generates a packet which is used to create directories on file systems. + @param directory_name: The path where the directory shall be created + @param repository_path: The name of the directory to create + @param ssc: source sequence count + @param object_id: The object ID of the memory handler which manages the file system + :return + """ + data_to_pack = bytearray(object_id) + data_to_pack += bytearray(repository_path, 'utf-8') + # Add string terminator to repository path + data_to_pack.append(0) + data_to_pack += bytearray(directory_name, 'utf-8') + # Add string terminator to directory name + data_to_pack.append(0) + return PusTelecommand(service=23, subservice=9, ssc=ssc, app_data=data_to_pack) + + +def generate_rmdir_srv23_10_packet(directory_name: str, repository_path: str, ssc: int, + object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: + """ + This function generates a packet which deletes the a directory at the specified repository path. + @param directory_name: Name of the directory to delete + @param repository_path: Path to directory dirname + @param ssc: source sequence count + @param object_id: object ID of the memory handler (e.g. SD Card Handler) + @return The application data field of the (23,10) PUS packet + """ + data_to_pack = bytearray(object_id) + data_to_pack += bytearray(repository_path, 'utf-8') + # Add string terminator of repository path + data_to_pack.append(0) + data_to_pack += bytearray(directory_name, 'utf-8') + # Add string terminator of directory name + data_to_pack.append(0) + return PusTelecommand(service=23, subservice=10, ssc=ssc, app_data=data_to_pack) + + +def generate_append_to_file_srv23_128_packet( + filename: str, repository_path: str, packet_sequence_number: int, + ssc: int, file_data: bytearray = bytearray([]), + object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: + data_to_pack = bytearray(object_id) + data_to_pack += bytearray(repository_path, 'utf-8') + # Add string terminator of repository path + data_to_pack.append(0) + data_to_pack += bytearray(filename, 'utf-8') + # Add string terminator of filename + data_to_pack.append(0) + data_to_pack += file_data + data_to_pack.append(packet_sequence_number >> 8) + data_to_pack.append(packet_sequence_number & 0xff) + return PusTelecommand(service=23, subservice=128, ssc=ssc, app_data=data_to_pack) + + +def generate_read_file_srv23_129_packet( + repository_path: str, filename: str, ssc: int, + object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand: + """ + This function generates the application data field for a PUS packet with service + 23 and subservie 129. Subservice 129 is a custom service to request the data of a file. + @param repository_path: The path of the target file + @param filename: Name of file from which the content shall be read + @param ssc: source sequence count + @param object_id: object ID of the memory handler (e.g. SD Card Handler) + @return: The application data field of the (23,129) PUS packet + """ + data_to_pack = bytearray(object_id) + data_to_pack += bytearray(repository_path, 'utf-8') + # Add string terminator of repository paht + data_to_pack.append(0) + data_to_pack += bytearray(filename, 'utf-8') + # Add string terminator of filename + data_to_pack.append(0) + return PusTelecommand(service=23, subservice=129, ssc=ssc, app_data=data_to_pack) + + +def pack_service23_commands_into(tc_queue: Deque, op_code: int) -> Deque: + # sd_handler_id = g.SD_CARD_HANDLER_ID + if op_code == 0: + tc_queue.appendleft(("print", "Testing Service 23")) + + tc_queue.append(("print", "Create directory 'test'")) + command = generate_mkdir_srv23_9_packet(directory_name="test", repository_path="/", ssc=2300) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.append(("print", "Create subdirectory 'subdir' in 'test'")) + command = generate_mkdir_srv23_9_packet(repository_path="test", ssc=2301, + directory_name="subdir") + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft(("print", "Create test.bin")) + command = generate_create_file_srv23_1_packet( + filename="test.bin", repository_path="test/subdir", ssc=2302, + initial_data=bytearray([0x01, 0x00, 0x01, 0x00]), + max_size_of_app_data=1024) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft(("print", "Printing file structure.")) + command = generate_print_sd_card_packet(ssc=2300) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft(("print", "Clearing SD card")) + command = generate_clear_sd_card_packet(ssc=2301) + tc_queue.appendleft(command.pack_command_tuple()) + + tc_queue.appendleft(("print", "Printing file structure")) + command = generate_print_sd_card_packet(ssc=2302) + tc_queue.appendleft(command.pack_command_tuple()) + + # tc_queue.appendleft(("print", "Read data of test.bin")) + # command = generate_read_file_srv23_129_packet("test/subdir", "test.bin", ssc=2303) + # tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "Delete 'test.bin'")) + command = generate_rm_file_srv23_2_packet( + filename="test.bin", repository_path="test/subdir", ssc=2304) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "Delete 'subdir' directory")) + command = generate_rmdir_srv23_10_packet(directory_name="subdir", repository_path="test", + ssc=2305) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "Delete 'test' directory")) + command = generate_rmdir_srv23_10_packet(directory_name="test", repository_path="/", + ssc=2306) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "A2": + tc_queue.append(("print", "Printing active file system")) + command = generate_print_sd_card_packet(ssc=2300) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "A20": + tc_queue.append(("print", "Clearing active file system")) + command = generate_clear_sd_card_packet(ssc=2300) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "A21": + tc_queue.append(("print", "Formatting active file system")) + command = generate_format_sd_card_packet(ssc=2300) + tc_queue.appendleft(command.pack_command_tuple()) + elif op_code == "C0A": + generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=False) + elif op_code == "C0I": + generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=True) + return tc_queue + + +def calculate_allowed_file_data_size(max_app_data_size: int, filename: str, repository: str): + # Subtract two because of '\0' terminators + return max_app_data_size - len(filename) - len(repository) - 2 diff --git a/utility/obsw_binary_uploader.py b/utility/obsw_binary_uploader.py index d6e4671..d5b394c 100644 --- a/utility/obsw_binary_uploader.py +++ b/utility/obsw_binary_uploader.py @@ -12,7 +12,7 @@ from tkinter import filedialog from collections import deque from tmtc_core.comIF.obsw_com_interface import CommunicationInterface -from tc.obsw_tc_service23 import FileTransferHelper, generate_generic_folder_structure +from tc.obsw_tc_service23_sdcard import FileTransferHelper, generate_generic_folder_structure import config.obsw_config as g from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode -- GitLab