Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
obsw_tc_service23_sdcard.py 18.38 KiB
# -*- coding: utf-8 -*-
"""
Created: 21.01.2020 07:48
@author: Jakob Meier
"""
import config.obsw_config as g
from typing import Deque, Union
from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT
from tc.obsw_tc_service8 import make_action_id
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
def generate_print_sd_card_packet(
ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
app_data = bytearray(object_id)
app_data += make_action_id(2)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_clear_sd_card_packet(
ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
app_data = bytearray(object_id)
app_data += make_action_id(20)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_format_sd_card_packet(
ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
app_data = bytearray(object_id)
app_data += make_action_id(21)
return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=app_data)
def generate_generic_folder_structure(
tc_queue: Deque, init_ssc: int, object_id: bytearray = g.SD_CARD_HANDLER_ID,
iobc: bool = False):
tc_queue.appendleft(("print", "Creating TC folder"))
command = generate_mkdir_srv23_9_packet("TC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TC", directory_name="LARGE", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TC", directory_name="SMALL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
tc_queue.appendleft(("print", "Creating TM folder"))
command = generate_mkdir_srv23_9_packet("TM", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TM", directory_name="HK", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TM", directory_name="SC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
command = generate_mkdir_srv23_9_packet(
repository_path="TM/SC", directory_name="LARGE", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="TM/SC", directory_name="SMALL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
tc_queue.appendleft(("print", "Creating BIN folder"))
command = generate_mkdir_srv23_9_packet("BIN", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
if iobc:
command = generate_mkdir_srv23_9_packet(
repository_path="BIN", directory_name="IOBC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/IOBC", directory_name="BL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/IOBC", directory_name="OBSW", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
else:
command = generate_mkdir_srv23_9_packet(
repository_path="BIN", directory_name="AT91", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/AT91", directory_name="BL", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
command = generate_mkdir_srv23_9_packet(
repository_path="BIN/AT91", directory_name="OBSW", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
init_ssc += 1
tc_queue.appendleft(("print", "Creating MISC folder"))
command = generate_mkdir_srv23_9_packet("MISC", ssc=init_ssc, object_id=object_id)
tc_queue.appendleft(command.pack_command_tuple())
def generate_create_file_srv23_1_packet(
filename: str, repository_path: str, ssc: int, max_size_of_app_data: int,
initial_data: bytearray = bytearray([]),
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> Union[PusTelecommand, None]:
if len(initial_data) > calculate_allowed_file_data_size(
max_size_of_app_data, filename, repository_path):
LOGGER.error("generate_create_file_srv23_1_packet: Initial data too large!")
return None
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
data_to_pack += initial_data
return PusTelecommand(service=23, subservice=1, ssc=ssc, app_data=data_to_pack)
def generate_rm_file_srv23_2_packet(filename: str, repository_path: str,
ssc: int, object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates a packet which is used to delete a file on a
file-system-capable on-board memory.
@param filename: The name of the file to delete
@param repository_path: The path where the directory shall be created
@param ssc: source sequence count
@param object_id: The object ID of the memory handler which manages the file system
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
return PusTelecommand(service=23, subservice=2, ssc=ssc, app_data=data_to_pack)
def generate_report_file_attr_srv23_3_packet(
filename: str, repository_path: str, ssc: int,
object_id=g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
@param filename: The name of the file to delete
@param repository_path: The path where the directory shall be created
@param ssc: source sequence count
@param object_id: The object ID of the memory handler which manages the file system
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
return PusTelecommand(service=23, subservice=3, ssc=ssc, app_data=data_to_pack)
def generate_mkdir_srv23_9_packet(directory_name: str, ssc: int, repository_path: str = "/",
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates a packet which is used to create directories on file systems.
@param directory_name: The path where the directory shall be created
@param repository_path: The name of the directory to create
@param ssc: source sequence count
@param object_id: The object ID of the memory handler which manages the file system
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=directory_name)
return PusTelecommand(service=23, subservice=9, ssc=ssc, app_data=data_to_pack)
def generate_rmdir_srv23_10_packet(directory_name: str, repository_path: str, ssc: int,
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates a packet which deletes the a directory at the specified repository path.
@param directory_name: Name of the directory to delete
@param repository_path: Path to directory dirname
@param ssc: source sequence count
@param object_id: object ID of the memory handler (e.g. SD Card Handler)
@return The telecommand.
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=directory_name)
return PusTelecommand(service=23, subservice=10, ssc=ssc, app_data=data_to_pack)
def generate_append_to_file_srv23_130_packet(
filename: str, repository_path: str, packet_sequence_number: int,
ssc: int, file_data: bytearray = bytearray([]),
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
data_to_pack += file_data
data_to_pack.append(packet_sequence_number >> 8)
data_to_pack.append(packet_sequence_number & 0xff)
return PusTelecommand(service=23, subservice=130, ssc=ssc, app_data=data_to_pack)
def generate_finish_append_to_file_srv23_131_packet(
filename: str, repository_path: str, ssc: int, lock_file: bool = False,
object_id: bytearray = g.SD_CARD_HANDLER_ID):
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
data_to_pack.append(lock_file)
return PusTelecommand(service=23, subservice=131, ssc=ssc, app_data=data_to_pack)
def generate_read_file_srv23_140_packet(
repository_path: str, filename: str, ssc: int,
object_id: bytearray = g.SD_CARD_HANDLER_ID) -> PusTelecommand:
"""
This function generates the application data field for a PUS packet with service
23 and subservie 140. Subservice 140 is a custom service to request the data of a file.
@param repository_path: The path of the target file
@param filename: Name of file from which the content shall be read
@param ssc: source sequence count
@param object_id: object ID of the memory handler (e.g. SD Card Handler)
@return: The application data field of the (23,129) PUS packet
"""
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
return PusTelecommand(service=23, subservice=140, ssc=ssc, app_data=data_to_pack)
def generate_lock_file_srv23_5_6_packet(ssc: int, lock: bool, repository_path: str,
filename: str, object_id: bytearray = g.SD_CARD_HANDLER_ID):
data_to_pack = pack_generic_file_command_header(object_id=object_id, first_path=repository_path,
second_path=filename)
if lock:
return PusTelecommand(service=23, subservice=5, ssc=ssc, app_data=data_to_pack)
else:
return PusTelecommand(service=23, subservice=6, ssc=ssc, app_data=data_to_pack)
def pack_service23_commands_into(tc_queue: TcQueueT, op_code: int) -> Deque:
# sd_handler_id = g.SD_CARD_HANDLER_ID
if op_code == 0:
generate_generic_service_23_test(tc_queue)
elif op_code == "3" or op_code == 3:
LOGGER.info("Press h in the following input requests")
LOGGER.info("to send a command to display the folder structure instead")
(repo_path, filename) = prompt_for_repo_filename()
if repo_path == "" and filename == "h":
tc_queue.appendleft(generate_print_sd_card_packet(ssc=0).pack_command_tuple())
elif repo_path == "" and filename == "c":
return tc_queue
else:
tc_queue.append(("print", "Requesting file attributes"))
command = generate_report_file_attr_srv23_3_packet(
ssc=0, filename=filename, repository_path=repo_path)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "5" or op_code == 5:
LOGGER.info("Press h in the following input requests")
LOGGER.info("to send a command to display the folder structure instead")
(repo_path, filename) = prompt_for_repo_filename()
if repo_path == "" and filename == "h":
tc_queue.append(("print", "Printing active file system"))
tc_queue.appendleft(generate_print_sd_card_packet(ssc=0).pack_command_tuple())
elif repo_path == "" and filename == "c":
return tc_queue
else:
tc_queue.append(("print", "Locking file"))
command = generate_lock_file_srv23_5_6_packet(ssc=0, repository_path=repo_path,
filename=filename, lock=True)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "6" or op_code == 6:
LOGGER.info("Press h in the following input requests to send a command to display the"
"folder structure instead")
(repo_path, filename) = prompt_for_repo_filename()
if repo_path == "" and filename == "h":
tc_queue.append(("print", "Printing active file system"))
tc_queue.appendleft(generate_print_sd_card_packet(ssc=0).pack_command_tuple())
elif repo_path == "" and filename == "c":
return tc_queue
else:
tc_queue.append(("print", "Unlocking file"))
command = generate_lock_file_srv23_5_6_packet(ssc=0, repository_path=repo_path,
filename=filename, lock=False)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "A2":
tc_queue.append(("print", "Printing active file system"))
command = generate_print_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "A20":
tc_queue.append(("print", "Clearing active file system"))
command = generate_clear_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "A21":
tc_queue.append(("print", "Formatting active file system"))
command = generate_format_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "C0A":
tc_queue.append(("print", "Generating generic folder structure on AT91"))
generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=False)
elif op_code == "C0I":
tc_queue.append(("print", "Generating generic folder structure on iOBC"))
generate_generic_folder_structure(tc_queue, init_ssc=0, iobc=True)
return tc_queue
def generate_generic_service_23_test(tc_queue: TcQueueT):
tc_queue.appendleft(("print", "Testing Service 23"))
tc_queue.append(("print", "Create directory 'test'"))
command = generate_mkdir_srv23_9_packet(directory_name="test", repository_path="/", ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.append(("print", "Create subdirectory 'subdir' in 'test'"))
command = generate_mkdir_srv23_9_packet(repository_path="test", ssc=2301,
directory_name="subdir")
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Create test.bin"))
command = generate_create_file_srv23_1_packet(
filename="test.bin", repository_path="test/subdir", ssc=2302,
initial_data=bytearray([0x01, 0x00, 0x01, 0x00]),
max_size_of_app_data=1024)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Printing file structure."))
command = generate_print_sd_card_packet(ssc=2300)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Clearing SD card"))
command = generate_clear_sd_card_packet(ssc=2301)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Printing file structure"))
command = generate_print_sd_card_packet(ssc=2302)
tc_queue.appendleft(command.pack_command_tuple())
# tc_queue.appendleft(("print", "Read data of test.bin"))
# command = generate_read_file_srv23_129_packet("test/subdir", "test.bin", ssc=2303)
# tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Delete 'test.bin'"))
command = generate_rm_file_srv23_2_packet(
filename="test.bin", repository_path="test/subdir", ssc=2304)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Delete 'subdir' directory"))
command = generate_rmdir_srv23_10_packet(directory_name="subdir", repository_path="test",
ssc=2305)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("print", "Delete 'test' directory"))
command = generate_rmdir_srv23_10_packet(directory_name="test", repository_path="/",
ssc=2306)
tc_queue.appendleft(command.pack_command_tuple())
def calculate_allowed_file_data_size(max_app_data_size: int, filename: str, repository: str):
# Subtract two because of '\0' terminators
return max_app_data_size - len(filename) - len(repository) - 2
def prompt_for_repo_filename():
input_confirmation = False
repo_path = ""
filename = ""
while not input_confirmation:
repo_path = input("Please type in repository path [c to cancel]: ")
if repo_path == "h":
return "", "h"
filename = input("Please type in filename path [c to cancel]: ")
print("Selection for repostiory path: " + str(repo_path))
print("Selection for filename: " + str(filename))
if repo_path == "c" or filename == "c":
return "", "c"
if repo_path == "h" or filename == "h":
return "", "h"
input_confirmation = input("Confirm selection [y/n]: ")
if input_confirmation in ['y', "yes", 1]:
input_confirmation = True
return repo_path, filename
def pack_generic_file_command_header(object_id: bytearray, first_path: str, second_path: str):
data_to_pack = bytearray(object_id)
data_to_pack += first_path.encode('utf-8')
# Add string terminator of repository paht
data_to_pack.append(0)
data_to_pack += second_path.encode('utf-8')
# Add string terminator of filename
data_to_pack.append(0)
return data_to_pack