From ae4024e22249726ec8d1a2b4954999492f0271b0 Mon Sep 17 00:00:00 2001 From: Robin Mueller <robin.mueller.m@gmail.com> Date: Sun, 29 Nov 2020 19:15:46 +0100 Subject: [PATCH] improved tm checking for bin upload --- core/tmtc_backend.py | 2 +- core/tmtc_client_core.py | 2 +- tm/obsw_tm_service_1.py | 3 + ...sw_args_parser.py => tmtcc_args_parser.py} | 268 ++++----- ...y_uploader.py => tmtcc_binary_uploader.py} | 544 ++++++++++-------- ...inary_writer.py => tmtcc_binary_writer.py} | 0 ...elper.py => tmtcc_file_transfer_helper.py} | 0 7 files changed, 443 insertions(+), 376 deletions(-) rename utility/{obsw_args_parser.py => tmtcc_args_parser.py} (97%) rename utility/{obsw_binary_uploader.py => tmtcc_binary_uploader.py} (70%) rename utility/{binary_writer.py => tmtcc_binary_writer.py} (100%) rename utility/{obsw_file_transfer_helper.py => tmtcc_file_transfer_helper.py} (100%) diff --git a/core/tmtc_backend.py b/core/tmtc_backend.py index 89fa323..4e8860f 100644 --- a/core/tmtc_backend.py +++ b/core/tmtc_backend.py @@ -23,7 +23,7 @@ from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler from tc.obsw_pus_tc_packer import ServiceQueuePacker, create_total_tc_queue from test.obsw_pus_service_test import run_selected_pus_tests from config.tmtcc_com_config import set_communication_interface -from utility.obsw_binary_uploader import BinaryFileUploader +from utility.tmtcc_binary_uploader import BinaryFileUploader LOGGER = get_logger() diff --git a/core/tmtc_client_core.py b/core/tmtc_client_core.py index 534ea1b..4ef4aec 100755 --- a/core/tmtc_client_core.py +++ b/core/tmtc_client_core.py @@ -14,7 +14,7 @@ from config.tmtcc_config import set_globals import config.tmtcc_config as g from core.tmtc_backend import TmTcHandler from core.tmtc_frontend import TmTcFrontend -from utility.obsw_args_parser import parse_input_arguments +from utility.tmtcc_args_parser import parse_input_arguments LOGGER = get_logger() diff --git a/tm/obsw_tm_service_1.py b/tm/obsw_tm_service_1.py index 8060704..2cfa94c 100644 --- a/tm/obsw_tm_service_1.py +++ b/tm/obsw_tm_service_1.py @@ -111,6 +111,9 @@ class Service1TM(PusTelemetry): tm_information.update({TmDictionaryKeys.STEP_NUMBER: self.step_number}) return tm_information + def get_tc_ssc(self): + return self.tc_ssc + class Service1TmPacked(PusTelemetryCreator): """ diff --git a/utility/obsw_args_parser.py b/utility/tmtcc_args_parser.py similarity index 97% rename from utility/obsw_args_parser.py rename to utility/tmtcc_args_parser.py index 1093ff7..777f7e6 100644 --- a/utility/obsw_args_parser.py +++ b/utility/tmtcc_args_parser.py @@ -1,134 +1,134 @@ -#!/usr/bin/python3.8 -""" -Argument parser module. -""" -import argparse -import sys -from tmtc_core.utility.obsw_logger import get_logger - - -LOGGER = get_logger() - - -def parse_input_arguments(): - """ - Parses all input arguments - :return: Input arguments contained in a special namespace and accessable by args.<variable> - """ - arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") - - arg_parser.add_argument( - '-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), ' - '0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' - '4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0) - arg_parser.add_argument( - '-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, ' - '2: QEMU, 3: UDP', default=2) - arg_parser.add_argument( - '-o', '--op_code', help='Operation code, which is passed to the TC ' - 'packer functions', default=0) - arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') - arg_parser.add_argument( - '--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1") - arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) - arg_parser.add_argument( - '-l', '--listener', help='Determine whether the listener mode will be active ' - 'after performing the operation', - action='store_false') - arg_parser.add_argument( - '-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.' - ' Default: 5 seconds', default=5.0) - arg_parser.add_argument( - '--nl', dest='print_log', help='Supply --nl to suppress print output to log files.', - action='store_false') - arg_parser.add_argument( - '--np', dest='print_tm', help='Supply --np to suppress print output to console.', - action='store_false') - arg_parser.add_argument( - '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' - 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) - arg_parser.add_argument( - '-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly', - action='store_true') - arg_parser.add_argument( - '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') - arg_parser.add_argument( - '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') - arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') - arg_parser.add_argument( - '--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout', - action='store_true') - - if len(sys.argv) == 1: - print("No Input Arguments specified.") - arg_parser.print_help() - args, unknown = arg_parser.parse_known_args() - # for argument in vars(args): - # LOGGER.debug(argument + ": " + str(getattr(args, argument))) - handle_args(args, unknown) - return args - - -def handle_args(args, unknown: list) -> None: - """ - Handles the parsed arguments. - :param args: Namespace objects - (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) - :param unknown: List of unknown parameters. - :return: None - """ - if len(unknown) > 0: - print("Unknown arguments detected: " + str(unknown)) - if len(sys.argv) > 1: - handle_unspecified_args(args) - if len(sys.argv) == 1: - handle_empty_args(args) - - -def handle_unspecified_args(args) -> None: - """ - If some arguments are unspecified, they are set here with (variable) default values. - :param args: - :return: None - """ - if args.com_if == 1 and args.tm_timeout is None: - args.tm_timeout = 6.0 - if args.mode is None: - print("No mode specified with -m Parameter.") - print("Possible Modes: ") - print("1: Listener Mode") - print("2: Single Command Mode with manual command") - print("3: Service Mode, Commands specified in tc folder") - print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py") - print("5: Unit Test, runs unit test specified in obsw_module_test.py") - args.mode = input("Please enter Mode: ") - if args.mode == 1 and args.service is None: - args.service = input("No Service specified for Service Mode. " - "Please enter PUS G_SERVICE number: ") - - -def handle_empty_args(args) -> None: - """ - If no args were supplied, request input from user directly. - TODO: This still needs to be extended. - :param args: - :return: - """ - print_hk = input("Print HK packets ? (y/n or yes/no)") - try: - print_hk = print_hk.lower() - except TypeError: - pass - if print_hk in ('y', 'yes', 1): - args.print_hk = True - else: - args.print_hk = False - print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") - try: - print_to_log = print_to_log.lower() - except TypeError: - pass - if print_to_log in ('n', 'no', 0): - args.printFile = False - else: - args.printFile = True +#!/usr/bin/python3.8 +""" +Argument parser module. +""" +import argparse +import sys +from tmtc_core.utility.obsw_logger import get_logger + + +LOGGER = get_logger() + + +def parse_input_arguments(): + """ + Parses all input arguments + :return: Input arguments contained in a special namespace and accessable by args.<variable> + """ + arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") + + arg_parser.add_argument( + '-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), ' + '0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' + '4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0) + arg_parser.add_argument( + '-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, ' + '2: QEMU, 3: UDP', default=2) + arg_parser.add_argument( + '-o', '--op_code', help='Operation code, which is passed to the TC ' + 'packer functions', default=0) + arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') + arg_parser.add_argument( + '--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1") + arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) + arg_parser.add_argument( + '-l', '--listener', help='Determine whether the listener mode will be active ' + 'after performing the operation', + action='store_false') + arg_parser.add_argument( + '-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.' + ' Default: 5 seconds', default=5.0) + arg_parser.add_argument( + '--nl', dest='print_log', help='Supply --nl to suppress print output to log files.', + action='store_false') + arg_parser.add_argument( + '--np', dest='print_tm', help='Supply --np to suppress print output to console.', + action='store_false') + arg_parser.add_argument( + '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' + 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) + arg_parser.add_argument( + '-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly', + action='store_true') + arg_parser.add_argument( + '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') + arg_parser.add_argument( + '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') + arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') + arg_parser.add_argument( + '--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout', + action='store_true') + + if len(sys.argv) == 1: + print("No Input Arguments specified.") + arg_parser.print_help() + args, unknown = arg_parser.parse_known_args() + # for argument in vars(args): + # LOGGER.debug(argument + ": " + str(getattr(args, argument))) + handle_args(args, unknown) + return args + + +def handle_args(args, unknown: list) -> None: + """ + Handles the parsed arguments. + :param args: Namespace objects + (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) + :param unknown: List of unknown parameters. + :return: None + """ + if len(unknown) > 0: + print("Unknown arguments detected: " + str(unknown)) + if len(sys.argv) > 1: + handle_unspecified_args(args) + if len(sys.argv) == 1: + handle_empty_args(args) + + +def handle_unspecified_args(args) -> None: + """ + If some arguments are unspecified, they are set here with (variable) default values. + :param args: + :return: None + """ + if args.com_if == 1 and args.tm_timeout is None: + args.tm_timeout = 6.0 + if args.mode is None: + print("No mode specified with -m Parameter.") + print("Possible Modes: ") + print("1: Listener Mode") + print("2: Single Command Mode with manual command") + print("3: Service Mode, Commands specified in tc folder") + print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py") + print("5: Unit Test, runs unit test specified in obsw_module_test.py") + args.mode = input("Please enter Mode: ") + if args.mode == 1 and args.service is None: + args.service = input("No Service specified for Service Mode. " + "Please enter PUS G_SERVICE number: ") + + +def handle_empty_args(args) -> None: + """ + If no args were supplied, request input from user directly. + TODO: This still needs to be extended. + :param args: + :return: + """ + print_hk = input("Print HK packets ? (y/n or yes/no)") + try: + print_hk = print_hk.lower() + except TypeError: + pass + if print_hk in ('y', 'yes', 1): + args.print_hk = True + else: + args.print_hk = False + print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") + try: + print_to_log = print_to_log.lower() + except TypeError: + pass + if print_to_log in ('n', 'no', 0): + args.printFile = False + else: + args.printFile = True diff --git a/utility/obsw_binary_uploader.py b/utility/tmtcc_binary_uploader.py similarity index 70% rename from utility/obsw_binary_uploader.py rename to utility/tmtcc_binary_uploader.py index f27ded9..eb132be 100644 --- a/utility/obsw_binary_uploader.py +++ b/utility/tmtcc_binary_uploader.py @@ -1,240 +1,304 @@ -#!/usr/bin/python3.8 -""" -@brief Binary Uploader Module -@details -This module will be used to upload binaries to the OBC via a communication port, given -a supplied binary. The binary will be sent via the specified communication interface. -It will be possible to encode the data (for example using DLE encoding) -""" -import os -import time -import tkinter as tk -from tkinter import filedialog -from collections import deque -from glob import glob -from typing import Deque - -from tmtc_core.comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_file_transfer_helper import FileTransferHelper -import config.tmtcc_config as g -from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode -from tmtc_core.utility.obsw_logger import get_logger -from tmtc_core.sendreceive.obsw_tm_listener import TmListener - -LOGGER = get_logger() - - -class BinaryFileUploader: - def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter, - tm_listener: TmListener): - """ - Initializes the binary file uploader with the required components. - @param com_if: - @param tmtc_printer: - @param tm_listener: - """ - self.com_if = com_if - self.tmtc_printer = tmtc_printer - self.tm_listener = tm_listener - self.iobc = False - # Will be set later depending on board. - self.send_interval = 0 - - def perform_file_upload(self): - gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ") - if gui_cl_prompt == 0: - gui_cl_prompt = True - else: - gui_cl_prompt = False - print("Please select file to upload: ") - file_path = "" - if gui_cl_prompt: - root = tk.Tk() - root.withdraw() - root.wm_attributes('-topmost', 1) - file_path = filedialog.askopenfilename(parent=root) - print("File select: " + str(file_path)) - if file_path == (): - LOGGER.warning("Invalid file path, exiting binary upload mode.") - return - else: - result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))] - print("Files found in _bin folder: ") - for idx, path in enumerate(result): - print("Selection " + str(idx) + ": " + str(path)) - select_valid = False - selection = input("Please enter desired selection [c to cancel]: ") - while not select_valid: - if selection == 'c': - print("Exiting binary upload mode..") - return - if not selection.isdigit(): - selection = input("Invalid input, try again [c to cancel]: ") - if selection.isdigit(): - if int(selection) < len(result): - file_path = result[int(selection)] - select_valid = True - else: - selection = input("Invalid input, try again [c to cancel]: ") - - print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected." - LOGGER.info(print_string) - calc_hamming_code = input("Calculate and send hamming code? [y/n]: ") - if calc_hamming_code in ['y', 'yes', 1]: - calc_hamming_code = True - print("Hamming code will be calculated and sent in tail packet") - else: - calc_hamming_code = False - print("Hamming code will not be calculated") - - iobc_prompt = input("iOBC? [y/n]: ") - if iobc_prompt in ['y', 'yes', 1]: - self.iobc = True - self.send_interval = 1.4 - iobc_prompt = True - else: - self.iobc = False - self.send_interval = 0.6 - iobc_prompt = False - - bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ") - if str(bootloader_prompt) == "0": - bootloader_prompt = True - else: - bootloader_prompt = False - - prompt_lock = input("Lock file with last packet? [y/n]: ") - if prompt_lock in ['n', "no", 0]: - prompt_lock = False - else: - prompt_lock = True - - if bootloader_prompt: - file_name = "bl.bin" - else: - file_name = "obsw_up.bin" - - if iobc_prompt: - if bootloader_prompt: - repository_name = "BIN/IOBC/BL" - else: - repository_name = "BIN/IOBC/OBSW" - else: - if bootloader_prompt: - repository_name = "BIN/AT91/BL" - else: - repository_name = "BIN/AT91/OBSW" - - if calc_hamming_code: - pass - - # Right now, the size of PUS packets is limited to 1500 bytes which also limits the app - # data length. - frame_length = g.G_MAX_APP_DATA_LENGTH - - if calc_hamming_code: - # now we calculate the hamming code - pass - - tc_queue = deque() - - # Delete existing binary file first, otherwise data might be appended to otherwise - # valid file which already exists. - file_transfer_helper = FileTransferHelper( - tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name, - target_filename=file_name) - - init_ssc = 0 - self.tmtc_printer.set_display_mode(DisplayMode.SHORT) - - # Configure file transfer helper - file_transfer_helper.set_data_from_file(file_path) - file_transfer_helper.set_to_delete_old_file() - if prompt_lock: - file_transfer_helper.set_to_lock_file(prompt_lock) - else: - file_transfer_helper.set_to_lock_file(prompt_lock) - - # Generate the packets. - file_transfer_helper.generate_packets(init_ssc) - - self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL) - print_string = "BinaryUploader: Detected file size: " + str( - file_transfer_helper.file_size()) - LOGGER.info(print_string) - total_num_packets = file_transfer_helper.get_number_of_packets_generated() - print_string = "BinaryUploader: " + str(total_num_packets) + \ - " packets generated." - if prompt_lock: - print_string += " File will be locked." - else: - print_string += " File will not be locked." - LOGGER.info(print_string) - - reception_deque = deque() - self.__perform_send_algorithm(tc_queue, total_num_packets, reception_deque) - - print_string = "BinaryUploader: All binary packets were sent!" - LOGGER.info(print_string) - print_string = str(reception_deque.__len__()) + " replies received." - - LOGGER.info(print_string) - self.tm_listener.clear_tm_packet_queue() - LOGGER.info("Upload operation finished successfully.") - - def __perform_send_algorithm(self, tc_queue: Deque, number_of_packets: int, reception_deque: - Deque): - last_check = time.time() - last_sent = time.time() - total_time = self.send_interval * number_of_packets - idx = 1 - while tc_queue: - next_send = last_sent + self.send_interval - (tc_packet, tc_info) = tc_queue.pop() - if not isinstance(tc_packet, str): - # print_string = "Sending packet " + str(idx) + ".." - # LOGGER.info(print_string) - idx += 1 - self.com_if.send_telecommand(tc_packet, tc_info) - self.tmtc_printer.print_telecommand(tc_packet, tc_info) - elif tc_packet == "print": - LOGGER.info(tc_info) - remaining_time_string = "Remaining time: " + \ - str(round(total_time - (idx - 2) * self.send_interval, 2)) + \ - " seconds" - print_progress_bar(idx - 2, number_of_packets, print_end="\n", - suffix=remaining_time_string) - # sys.stdout.write("\033[F") # Cursor up one line - packets_received = self.tm_listener.retrieve_tm_packet_queue() - reception_deque.extend(packets_received) - # Every 5 seconds, check whether any reply has been received. If not, cancel operation. - if time.time() - last_check > 5.0 and len(packets_received) == 0: - LOGGER.warning("No replies are being received, cancelling upload operation..") - time_to_sleep = next_send - time.time() - last_sent = next_send - time.sleep(time_to_sleep) - - -# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console -# Thank you Greensticks :-) -def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100, - fill='â–ˆ', print_end="\r"): - """ - Call in a loop to create terminal progress bar - @params: - iteration - Required : current iteration (Int) - total - Required : total iterations (Int) - prefix - Optional : prefix string (Str) - suffix - Optional : suffix string (Str) - decimals - Optional : positive number of decimals in percent complete (Int) - length - Optional : character length of bar (Int) - fill - Optional : bar fill character (Str) - print_end - Optional : end character (e.g. "\r", "\r\n") (Str) - """ - percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) - filled_length = int(length * iteration // total) - bar = fill * filled_length + '-' * (length - filled_length) - print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end) - # Print New Line on Complete - if iteration == total: - print() +#!/usr/bin/python3.8 +""" +@brief Binary Uploader Module +@details +This module will be used to upload binaries to the OBC via a communication port, given +a supplied binary. The binary will be sent via the specified communication interface. +It will be possible to encode the data (for example using DLE encoding) +""" +import os +import time +import tkinter as tk +from tkinter import filedialog +from collections import deque +from glob import glob +from typing import Deque, List + +from tm.obsw_tm_service_1 import Service1TM +from tmtc_core.comIF.obsw_com_interface import CommunicationInterface +from tmtc_core.tc.obsw_pus_tc_base import TcQueueT, PusTcTupleT, TcDictionaryKeys +from tmtc_core.tm.obsw_pus_tm_factory import PusTmQueueT +from utility.tmtcc_file_transfer_helper import FileTransferHelper +import config.tmtcc_config as g +from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode +from tmtc_core.utility.obsw_logger import get_logger +from tmtc_core.sendreceive.obsw_tm_listener import TmListener + +LOGGER = get_logger() + + +class BinaryFileUploader: + def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter, + tm_listener: TmListener): + """ + Initializes the binary file uploader with the required components. + @param com_if: + @param tmtc_printer: + @param tm_listener: + """ + self.com_if = com_if + self.tmtc_printer = tmtc_printer + self.tm_listener = tm_listener + self.iobc = False + # Will be set later depending on board. + self.send_interval = 0 + + def perform_file_upload(self): + gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ") + if gui_cl_prompt == 0: + gui_cl_prompt = True + else: + gui_cl_prompt = False + print("Please select file to upload: ") + file_path = "" + if gui_cl_prompt: + root = tk.Tk() + root.withdraw() + root.wm_attributes('-topmost', 1) + file_path = filedialog.askopenfilename(parent=root) + print("File select: " + str(file_path)) + if file_path == (): + LOGGER.warning("Invalid file path, exiting binary upload mode.") + return + else: + result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))] + print("Files found in _bin folder: ") + for idx, path in enumerate(result): + print("Selection " + str(idx) + ": " + str(path)) + select_valid = False + selection = input("Please enter desired selection [c to cancel]: ") + while not select_valid: + if selection == 'c': + print("Exiting binary upload mode..") + return + if not selection.isdigit(): + selection = input("Invalid input, try again [c to cancel]: ") + if selection.isdigit(): + if int(selection) < len(result): + file_path = result[int(selection)] + select_valid = True + else: + selection = input("Invalid input, try again [c to cancel]: ") + + print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected." + LOGGER.info(print_string) + calc_hamming_code = input("Calculate and send hamming code? [y/n]: ") + if calc_hamming_code in ['y', 'yes', 1]: + calc_hamming_code = True + print("Hamming code will be calculated and sent in tail packet") + else: + calc_hamming_code = False + print("Hamming code will not be calculated") + + iobc_prompt = input("iOBC? [y/n]: ") + if iobc_prompt in ['y', 'yes', 1]: + self.iobc = True + self.send_interval = 1.4 + iobc_prompt = True + else: + self.iobc = False + self.send_interval = 0.6 + iobc_prompt = False + + bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ") + if str(bootloader_prompt) == "0": + bootloader_prompt = True + else: + bootloader_prompt = False + + prompt_lock = input("Lock file with last packet? [y/n]: ") + if prompt_lock in ['n', "no", 0]: + prompt_lock = False + else: + prompt_lock = True + + if bootloader_prompt: + file_name = "bl.bin" + else: + file_name = "obsw_up.bin" + + if iobc_prompt: + if bootloader_prompt: + repository_name = "BIN/IOBC/BL" + else: + repository_name = "BIN/IOBC/OBSW" + else: + if bootloader_prompt: + repository_name = "BIN/AT91/BL" + else: + repository_name = "BIN/AT91/OBSW" + + if calc_hamming_code: + pass + + # Right now, the size of PUS packets is limited to 1500 bytes which also limits the app + # data length. + frame_length = g.G_MAX_APP_DATA_LENGTH + + if calc_hamming_code: + # now we calculate the hamming code + pass + + tc_queue = deque() + + # Delete existing binary file first, otherwise data might be appended to otherwise + # valid file which already exists. + file_transfer_helper = FileTransferHelper( + tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name, + target_filename=file_name) + + init_ssc = 0 + self.tmtc_printer.set_display_mode(DisplayMode.SHORT) + + # Configure file transfer helper + file_transfer_helper.set_data_from_file(file_path) + file_transfer_helper.set_to_delete_old_file() + if prompt_lock: + file_transfer_helper.set_to_lock_file(prompt_lock) + else: + file_transfer_helper.set_to_lock_file(prompt_lock) + + # Generate the packets. + file_transfer_helper.generate_packets(init_ssc) + + self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL) + print_string = "BinaryUploader: Detected file size: " + str( + file_transfer_helper.file_size()) + LOGGER.info(print_string) + total_num_packets = file_transfer_helper.get_number_of_packets_generated() + print_string = "BinaryUploader: " + str(total_num_packets) + \ + " packets generated." + if prompt_lock: + print_string += " File will be locked." + else: + print_string += " File will not be locked." + LOGGER.info(print_string) + + packets_received = self.__perform_send_algorithm(tc_queue, total_num_packets) + + LOGGER.info("BinaryUploader: All binary packets were sent!") + print_string = str(packets_received) + " replies received." + + LOGGER.info(print_string) + self.tm_listener.clear_tm_packet_queue() + LOGGER.info("Upload operation finished successfully.") + + def __perform_send_algorithm(self, tc_queue: TcQueueT, number_of_packets: int) -> int: + last_check = time.time() + last_sent = time.time() + reception_dict = dict() + total_time = self.send_interval * number_of_packets + idx = 1 + new_packets_received = 0 + num_packets_received = 0 + while tc_queue: + next_send = last_sent + self.send_interval + tc_packet, tc_info = tc_queue.pop() + if not isinstance(tc_packet, str): + # print_string = "Sending packet " + str(idx) + ".." + # LOGGER.info(print_string) + idx += 1 + self.com_if.send_telecommand(tc_packet, tc_info) + self.tmtc_printer.print_telecommand(tc_packet, tc_info) + # Store a list which tracks whether acceptance messages were received + # The first entry is a simply counter. + reception_dict.update({tc_info[TcDictionaryKeys.SSC]: [0, False, False, False]}) + elif tc_packet == "print": + LOGGER.info(tc_info) + remaining_time_string = "Remaining time: " + \ + str(round(total_time - (idx - 2) * self.send_interval, 2)) + \ + " seconds" + print_progress_bar(idx - 2, number_of_packets, print_end="\n", + suffix=remaining_time_string) + # sys.stdout.write("\033[F") # Cursor up one line + new_packets_received += self.__handle_tm_queue(reception_dict) + + # Every 5 seconds, check whether any reply has been received. If not, cancel operation. + if time.time() - last_check > 5.0 and new_packets_received == 0: + LOGGER.warning("No replies are being received, cancelling upload operation..") + elif time.time() - last_check > 5.0: + num_packets_received += new_packets_received + new_packets_received = 0 + last_check = time.time() + + time_to_sleep = next_send - time.time() + last_sent = next_send + time.sleep(time_to_sleep) + + # handle last telemetry packets coming in. + time_till_wake = time.time() + 5 + while time.time() < time_till_wake: + new_packets_received += self.__handle_tm_queue(reception_dict) + time.sleep(1) + + num_packets_received += new_packets_received + return num_packets_received + + def __handle_tm_queue(self, reception_dict: dict) -> int: + num_packets_received = 0 + packets_received = self.tm_listener.retrieve_tm_packet_queue() + self.tm_listener.clear_tm_packet_queue() + # reception_deque.extend(packets_received) + for packet_list in packets_received: + for packet in packet_list: + num_packets_received += 1 + if packet.get_service() == 1: + packet: Service1TM + tc_ssc = packet.get_tc_ssc() + acceptance_list = reception_dict.get(tc_ssc) + if acceptance_list is None: + LOGGER.warning("Invalid TC SSC with number " + str(tc_ssc) + + " detected!") + continue + if packet.get_subservice() == 1: + acceptance_list[1] = True + if packet.get_subservice() == 3: + acceptance_list[2] = True + if packet.get_subservice() == 7: + acceptance_list[3] = True + reception_dict.update({tc_ssc: acceptance_list}) + elif packet.get_service() == 5: + # TODO: print event + print("Event received!") + else: + print("Other TM packet with service ID" + str(packet.get_service()) + + " received!") + clear_list = [] + for key, reception_list in reception_dict.items(): + if reception_list[1] and reception_list[2] and reception_list[3]: + # All replies received, we can delete the entry and confirm succesfull handling + print("All replies received for upload packet with SSC " + str(key) + ".") + clear_list.append(key) + elif reception_list[0] > 5: + # No replies for telecommand. + LOGGER.warning("No reply received for TC SSC " + str(key) + "!") + # Continue nonetheless + clear_list.append(key) + for key in clear_list: + del reception_dict[key] + return num_packets_received + + +# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console +# Thank you Greensticks :-) +def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100, + fill='â–ˆ', print_end="\r"): + """ + Call in a loop to create terminal progress bar + @params: + iteration - Required : current iteration (Int) + total - Required : total iterations (Int) + prefix - Optional : prefix string (Str) + suffix - Optional : suffix string (Str) + decimals - Optional : positive number of decimals in percent complete (Int) + length - Optional : character length of bar (Int) + fill - Optional : bar fill character (Str) + print_end - Optional : end character (e.g. "\r", "\r\n") (Str) + """ + percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + filled_length = int(length * iteration // total) + bar = fill * filled_length + '-' * (length - filled_length) + print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end) + # Print New Line on Complete + if iteration == total: + print() diff --git a/utility/binary_writer.py b/utility/tmtcc_binary_writer.py similarity index 100% rename from utility/binary_writer.py rename to utility/tmtcc_binary_writer.py diff --git a/utility/obsw_file_transfer_helper.py b/utility/tmtcc_file_transfer_helper.py similarity index 100% rename from utility/obsw_file_transfer_helper.py rename to utility/tmtcc_file_transfer_helper.py -- GitLab