Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_binary_uploader.py 9.91 KiB
#!/usr/bin/python3.8
"""
@brief      Binary Uploader Module
@details
This module will be used to upload binaries to the OBC via a communication port, given
a supplied binary. The binary will be sent via the specified communication interface.
It will be possible to encode the data (for example using DLE encoding)
"""
import os
import time
import tkinter as tk
from tkinter import filedialog
from collections import deque
from glob import glob
from typing import Deque

from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from utility.obsw_file_transfer_helper import FileTransferHelper
import config.obsw_config as g
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.sendreceive.obsw_tm_listener import TmListener

LOGGER = get_logger()


class BinaryFileUploader:
    def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter,
                 tm_listener: TmListener):
        """
        Initializes the binary file uploader with the required components.
        @param com_if:
        @param tmtc_printer:
        @param tm_listener:
        """
        self.com_if = com_if
        self.tmtc_printer = tmtc_printer
        self.tm_listener = tm_listener

    def perform_file_upload(self):
        gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ")
        if gui_cl_prompt == 0:
            gui_cl_prompt = True
        else:
            gui_cl_prompt = False
        print("Please select file to upload: ")
        file_path = ""
        if gui_cl_prompt:
            root = tk.Tk()
            root.withdraw()
            root.wm_attributes('-topmost', 1)
            file_path = filedialog.askopenfilename(parent=root)
            print("File select: " + str(file_path))
            if file_path == ():
                LOGGER.warning("Invalid file path, exiting binary upload mode.")
                return
        else:
            result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))]
            print("Files found in _bin folder: ")
            for idx, path in enumerate(result):
                print("Selection " + str(idx) + ": " + str(path))
            select_valid = False
            selection = input("Please enter desired selection [c to cancel]: ")
            while not select_valid:
                if selection == 'c':
                    print("Exiting binary upload mode..")
                    return
                if not selection.isdigit():
                    selection = input("Invalid input, try again [c to cancel]: ")
                if selection.isdigit():
                    if int(selection) < len(result):
                        file_path = result[int(selection)]
                        select_valid = True
                    else:
                        selection = input("Invalid input, try again [c to cancel]: ")

        print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected."
        LOGGER.info(print_string)
        calc_hamming_code = input("Calculate and send hamming code? [y/n]: ")
        if calc_hamming_code in ['y', 'yes', 1]:
            calc_hamming_code = True
            print("Hamming code will be calculated and sent in tail packet")
        else:
            calc_hamming_code = False
            print("Hamming code will not be calculated")

        iobc_prompt = input("iOBC? [y/n]: ")
        if iobc_prompt in ['y', 'yes', 1]:
            iobc_prompt = True
        else:
            iobc_prompt = False

        bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ")
        if str(bootloader_prompt) == "0":
            bootloader_prompt = True
        else:
            bootloader_prompt = False

        prompt_lock = input("Lock file with last packet? [y/n]: ")
        if prompt_lock in ['n', "no", 0]:
            prompt_lock = False
        else:
            prompt_lock = True

        if bootloader_prompt:
            file_name = "bl.bin"
        else:
            file_name = "obsw_up.bin"

        if iobc_prompt:
            if bootloader_prompt:
                repository_name = "BIN/IOBC/BL"
            else:
                repository_name = "BIN/IOBC/OBSW"
        else:
            if bootloader_prompt:
                repository_name = "BIN/AT91/BL"
            else:
                repository_name = "BIN/AT91/OBSW"

        if calc_hamming_code:
            pass

        # Right now, the size of PUS packets is limited to 1500 bytes which also limits the app
        # data length.
        frame_length = g.G_MAX_APP_DATA_LENGTH

        if calc_hamming_code:
            # now we calculate the hamming code
            pass

        tc_queue = deque()

        # Delete existing binary file first, otherwise data might be appended to otherwise
        # valid file which already exists.
        file_transfer_helper = FileTransferHelper(
            tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name,
            target_filename=file_name)

        init_ssc = 0
        self.tmtc_printer.set_display_mode(DisplayMode.SHORT)

        # Configure file transfer helper
        file_transfer_helper.set_data_from_file(file_path)
        file_transfer_helper.set_to_delete_old_file()
        if prompt_lock:
            file_transfer_helper.set_to_lock_file(prompt_lock)
        else:
            file_transfer_helper.set_to_lock_file(prompt_lock)

        # Generate the packets.
        file_transfer_helper.generate_packets(init_ssc)

        self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL)
        print_string = "BinaryUploader: Detected file size: " + str(
            file_transfer_helper.file_size())
        LOGGER.info(print_string)
        total_num_packets = file_transfer_helper.get_number_of_packets_generated()
        print_string = "BinaryUploader: " + str(total_num_packets) + \
                       " packets generated."
        if prompt_lock:
            print_string += " File will be locked."
        else:
            print_string += " File will not be locked."
        LOGGER.info(print_string)

        reception_deque = deque()
        self.__perform_send_algorithm(tc_queue, total_num_packets, reception_deque)

        print_string = "BinaryUploader: All binary packets were sent!"
        LOGGER.info(print_string)
        print_string = str(reception_deque.__len__()) + " replies received."

        LOGGER.info(print_string)
        time.sleep(15)
        reception_deque.extend(self.tm_listener.retrieve_tm_packet_queue())
        for tm_list in reception_deque:
            for tm_packet in tm_list:
                if tm_packet.get_service() == 23 and tm_packet.get_subservice() == 132:
                    # tmtc_printer.print_telemetry(tm_packet)
                    pass
        self.tm_listener.clear_tm_packet_queue()
        LOGGER.info("Transitioning back to listener mode..")

    def __perform_send_algorithm(self, tc_queue: Deque, number_of_packets: int, reception_deque:
                                 Deque):
        interval = 0.6
        last_check = time.time()
        last_sent = time.time()
        total_time = interval * number_of_packets
        idx = 1
        while tc_queue:
            next_send = last_sent + interval
            (tc_packet, tc_info) = tc_queue.pop()
            if not isinstance(tc_packet, str):
                # print_string = "Sending packet " + str(idx) + ".."
                # LOGGER.info(print_string)
                idx += 1
                self.com_if.send_telecommand(tc_packet, tc_info)
                self.tmtc_printer.print_telecommand(tc_packet, tc_info)
            elif tc_packet == "print":
                LOGGER.info(tc_info)
            remaining_time_string = "Remaining time: " + \
                                    str(round(total_time - (idx - 2) * interval, 2)) + \
                                    " seconds"
            print_progress_bar(idx - 2, number_of_packets, print_end="\n",
                               suffix=remaining_time_string)
            # sys.stdout.write("\033[F")  # Cursor up one line
            packets_received = self.tm_listener.retrieve_tm_packet_queue()
            reception_deque.extend(packets_received)
            # Every 5 seconds, check whether any reply has been received. If not, cancel operation.
            if time.time() - last_check > 5.0 and len(packets_received) == 0:
                LOGGER.warning("No replies are being received, cancelling upload operation..")
            time_to_sleep = next_send - time.time()
            last_sent = next_send
            time.sleep(time_to_sleep)


# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
# Thank you Greensticks :-)
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
                       fill='', print_end="\r"):
    """
    Call in a loop to create terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
        print_end    - Optional  : end character (e.g. "\r", "\r\n") (Str)
    """
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + '-' * (length - filled_length)
    print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)
    # Print New Line on Complete
    if iteration == total:
        print()