Skip to content
Snippets Groups Projects
Commit 59a7593e authored by Robin Mueller's avatar Robin Mueller
Browse files

file uploader is class now

parent 9af59fda
No related branches found
No related tags found
No related merge requests found
...@@ -48,7 +48,7 @@ from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger ...@@ -48,7 +48,7 @@ from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger
from test.obsw_pus_service_test import run_selected_pus_tests from test.obsw_pus_service_test import run_selected_pus_tests
from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker
from utility.obsw_args_parser import parse_input_arguments from utility.obsw_args_parser import parse_input_arguments
from utility.obsw_binary_uploader import perform_file_upload from utility.obsw_binary_uploader import BinaryFileUploader
from gui.obsw_tmtc_gui import TmTcGUI from gui.obsw_tmtc_gui import TmTcGUI
from gui.obsw_backend_test import TmTcBackend from gui.obsw_backend_test import TmTcBackend
...@@ -205,7 +205,9 @@ class TmTcHandler: ...@@ -205,7 +205,9 @@ class TmTcHandler:
elif self.mode == g.ModeList.BinaryUploadMode: elif self.mode == g.ModeList.BinaryUploadMode:
# Upload binary, prompt user for input, in the end prompt for new mode and enter that # Upload binary, prompt user for input, in the end prompt for new mode and enter that
# mode # mode
perform_file_upload(self.communication_interface, self.tmtc_printer, self.tm_listener) file_uploader = BinaryFileUploader(self.communication_interface, self.tmtc_printer,
self.tm_listener)
file_uploader.perform_file_upload()
self.command_received = True self.command_received = True
self.mode = g.ModeList.ListenerMode self.mode = g.ModeList.ListenerMode
......
...@@ -25,193 +25,192 @@ LOGGER = get_logger() ...@@ -25,193 +25,192 @@ LOGGER = get_logger()
class BinaryFileUploader: class BinaryFileUploader:
def __init__(self): def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter,
pass tm_listener: TmListener):
"""
Initializes the binary file uploader with the required components.
def perform_file_upload(com_if: CommunicationInterface, tmtc_printer: TmTcPrinter, @param com_if:
tm_listener: TmListener): @param tmtc_printer:
""" @param tm_listener:
TODO: This should be a class.. """
@param com_if: self.com_if = com_if
@param tmtc_printer: self.tmtc_printer = tmtc_printer
@param tm_listener: self.tm_listener = tm_listener
@return:
""" def perform_file_upload(self):
gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ") gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ")
if gui_cl_prompt == 0: if gui_cl_prompt == 0:
gui_cl_prompt = True gui_cl_prompt = True
else: else:
gui_cl_prompt = False gui_cl_prompt = False
print("Please select file to upload: ") print("Please select file to upload: ")
file_path = "" file_path = ""
if gui_cl_prompt: if gui_cl_prompt:
root = tk.Tk() root = tk.Tk()
root.withdraw() root.withdraw()
root.wm_attributes('-topmost', 1) root.wm_attributes('-topmost', 1)
file_path = filedialog.askopenfilename(parent=root) file_path = filedialog.askopenfilename(parent=root)
print("File select: " + str(file_path)) print("File select: " + str(file_path))
if file_path == (): if file_path == ():
LOGGER.warning("Invalid file path, exiting binary upload mode.") LOGGER.warning("Invalid file path, exiting binary upload mode.")
return
else:
result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))]
print("Files found in _bin folder: ")
for idx, path in enumerate(result):
print("Selection " + str(idx) + ": " + str(path))
select_valid = False
selection = input("Please enter desired selection [c to cancel]: ")
while not select_valid:
if selection == 'c':
print("Exiting binary upload mode..")
return return
if not selection.isdigit(): else:
selection = input("Invalid input, try again [c to cancel]: ") result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))]
if selection.isdigit(): print("Files found in _bin folder: ")
if int(selection) < len(result): for idx, path in enumerate(result):
file_path = result[int(selection)] print("Selection " + str(idx) + ": " + str(path))
select_valid = True select_valid = False
else: selection = input("Please enter desired selection [c to cancel]: ")
while not select_valid:
if selection == 'c':
print("Exiting binary upload mode..")
return
if not selection.isdigit():
selection = input("Invalid input, try again [c to cancel]: ") selection = input("Invalid input, try again [c to cancel]: ")
if selection.isdigit():
if int(selection) < len(result):
file_path = result[int(selection)]
select_valid = True
else:
selection = input("Invalid input, try again [c to cancel]: ")
print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected."
LOGGER.info(print_string)
calc_hamming_code = input("Calculate and send hamming code? [y/n]: ")
if calc_hamming_code in ['y', 'yes', 1]:
calc_hamming_code = True
print("Hamming code will be calculated and sent in tail packet")
else:
calc_hamming_code = False
print("Hamming code will not be calculated")
print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected." iobc_prompt = input("iOBC? [y/n]: ")
LOGGER.info(print_string) if iobc_prompt in ['y', 'yes', 1]:
calc_hamming_code = input("Calculate and send hamming code? [y/n]: ") iobc_prompt = True
if calc_hamming_code in ['y', 'yes', 1]:
calc_hamming_code = True
print("Hamming code will be calculated and sent in tail packet")
else:
calc_hamming_code = False
print("Hamming code will not be calculated")
iobc_prompt = input("iOBC? [y/n]: ")
if iobc_prompt in ['y', 'yes', 1]:
iobc_prompt = True
else:
iobc_prompt = False
bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ")
if str(bootloader_prompt) == "0":
bootloader_prompt = True
else:
bootloader_prompt = False
prompt_lock = input("Lock file with last packet? [y/n]: ")
if prompt_lock in ['n', "no", 0]:
prompt_lock = False
else:
prompt_lock = True
if bootloader_prompt:
file_name = "bl.bin"
else:
file_name = "obsw_up.bin"
if iobc_prompt:
if bootloader_prompt:
repository_name = "BIN/IOBC/BL"
else: else:
repository_name = "BIN/IOBC/OBSW" iobc_prompt = False
else:
bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ")
if str(bootloader_prompt) == "0":
bootloader_prompt = True
else:
bootloader_prompt = False
prompt_lock = input("Lock file with last packet? [y/n]: ")
if prompt_lock in ['n', "no", 0]:
prompt_lock = False
else:
prompt_lock = True
if bootloader_prompt: if bootloader_prompt:
repository_name = "BIN/AT91/BL" file_name = "bl.bin"
else:
file_name = "obsw_up.bin"
if iobc_prompt:
if bootloader_prompt:
repository_name = "BIN/IOBC/BL"
else:
repository_name = "BIN/IOBC/OBSW"
else:
if bootloader_prompt:
repository_name = "BIN/AT91/BL"
else:
repository_name = "BIN/AT91/OBSW"
# Right now, the size of PUS packets is limited to 1500 bytes which also limits the app
# data length.
frame_length = g.G_MAX_APP_DATA_LENGTH
if calc_hamming_code:
# now we calculate the hamming code
pass
tc_queue = deque()
# Delete existing binary file first, otherwise data might be appended to otherwise
# valid file which already exists.
file_transfer_helper = FileTransferHelper(
tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name,
target_filename=file_name)
init_ssc = 0
self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
# Configure file transfer helper
file_transfer_helper.set_data_from_file(file_path)
file_transfer_helper.set_to_delete_old_file()
if prompt_lock:
file_transfer_helper.set_to_lock_file(prompt_lock)
else:
file_transfer_helper.set_to_lock_file(prompt_lock)
# Generate the packets.
file_transfer_helper.generate_packets(init_ssc)
self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL)
print_string = "BinaryUploader: Detected file size: " + str(
file_transfer_helper.file_size())
LOGGER.info(print_string)
total_num_packets = file_transfer_helper.get_number_of_packets_generated()
print_string = "BinaryUploader: " + str(total_num_packets) + \
" packets generated."
if prompt_lock:
print_string += " File will be locked."
else: else:
repository_name = "BIN/AT91/OBSW" print_string += " File will not be locked."
LOGGER.info(print_string)
# Right now, the size of PUS packets is limited to 1500 bytes which also limits the app
# data length. reception_deque = deque()
frame_length = g.G_MAX_APP_DATA_LENGTH self.__perform_send_algorithm(tc_queue, total_num_packets, reception_deque)
if calc_hamming_code: print_string = "BinaryUploader: All binary packets were sent!"
# now we calculate the hamming code LOGGER.info(print_string)
pass print_string = str(reception_deque.__len__()) + " replies received."
tc_queue = deque() LOGGER.info(print_string)
time.sleep(15)
# Delete existing binary file first, otherwise data might be appended to otherwise reception_deque.extend(self.tm_listener.retrieve_tm_packet_queue())
# valid file which already exists. for tm_list in reception_deque:
file_transfer_helper = FileTransferHelper( for tm_packet in tm_list:
tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name, if tm_packet.get_service() == 23 and tm_packet.get_subservice() == 132:
target_filename=file_name) # tmtc_printer.print_telemetry(tm_packet)
pass
init_ssc = 0 self.tm_listener.clear_tm_packet_queue()
tmtc_printer.set_display_mode(DisplayMode.SHORT) LOGGER.info("Transitioning back to listener mode..")
# Configure file transfer helper def __perform_send_algorithm(self, tc_queue: Deque, number_of_packets: int, reception_deque:
file_transfer_helper.set_data_from_file(file_path) Deque):
file_transfer_helper.set_to_delete_old_file() interval = 0.6
if prompt_lock: last_check = time.time()
file_transfer_helper.set_to_lock_file(prompt_lock) last_sent = time.time()
else: total_time = interval * number_of_packets
file_transfer_helper.set_to_lock_file(prompt_lock) idx = 1
while tc_queue:
# Generate the packets. next_send = last_sent + interval
file_transfer_helper.generate_packets(init_ssc) (tc_packet, tc_info) = tc_queue.pop()
if not isinstance(tc_packet, str):
tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL) # print_string = "Sending packet " + str(idx) + ".."
print_string = "BinaryUploader: Detected file size: " + str(file_transfer_helper.file_size()) # LOGGER.info(print_string)
LOGGER.info(print_string) idx += 1
total_num_packets = file_transfer_helper.get_number_of_packets_generated() self.com_if.send_telecommand(tc_packet, tc_info)
print_string = "BinaryUploader: " + str(total_num_packets) + \ self.tmtc_printer.print_telecommand(tc_packet, tc_info)
" packets generated." elif tc_packet == "print":
if prompt_lock: LOGGER.info(tc_info)
print_string += " File will be locked." remaining_time_string = "Remaining time: " + \
else: str(round(total_time - (idx - 2) * interval, 2)) + \
print_string += " File will not be locked." " seconds"
LOGGER.info(print_string) print_progress_bar(idx - 2, number_of_packets, print_end="\n",
suffix=remaining_time_string)
reception_deque = deque() # sys.stdout.write("\033[F") # Cursor up one line
perform_send_algorithm(tc_queue, total_num_packets, com_if, tmtc_printer, tm_listener, packets_received = self.tm_listener.retrieve_tm_packet_queue()
reception_deque) reception_deque.extend(packets_received)
# Every 5 seconds, check whether any reply has been received. If not, cancel operation.
print_string = "BinaryUploader: All binary packets were sent!" if time.time() - last_check > 5.0 and len(packets_received) == 0:
LOGGER.info(print_string) LOGGER.warning("No replies are being received, cancelling upload operation..")
print_string = str(reception_deque.__len__()) + " replies received." time_to_sleep = next_send - time.time()
last_sent = next_send
LOGGER.info(print_string) time.sleep(time_to_sleep)
time.sleep(15)
reception_deque.extend(tm_listener.retrieve_tm_packet_queue())
for tm_list in reception_deque:
for tm_packet in tm_list:
if tm_packet.get_service() == 23 and tm_packet.get_subservice() == 132:
# tmtc_printer.print_telemetry(tm_packet)
pass
tm_listener.clear_tm_packet_queue()
LOGGER.info("Transitioning back to listener mode..")
def perform_send_algorithm(tc_queue: Deque, number_of_packets: int, com_if: CommunicationInterface,
tmtc_printer: TmTcPrinter, tm_listener: TmListener,
reception_deque: Deque):
interval = 0.6
last_check = time.time()
last_sent = time.time()
total_time = interval * number_of_packets
idx = 1
while tc_queue:
next_send = last_sent + interval
(tc_packet, tc_info) = tc_queue.pop()
if not isinstance(tc_packet, str):
# print_string = "Sending packet " + str(idx) + ".."
# LOGGER.info(print_string)
idx += 1
com_if.send_telecommand(tc_packet, tc_info)
tmtc_printer.print_telecommand(tc_packet, tc_info)
elif tc_packet == "print":
LOGGER.info(tc_info)
remaining_time_string = "Remaining time: " + \
str(round(total_time - (idx - 2) * interval, 2)) + \
" seconds"
print_progress_bar(idx - 2, number_of_packets, print_end="\n", suffix=remaining_time_string)
# sys.stdout.write("\033[F") # Cursor up one line
packets_received = tm_listener.retrieve_tm_packet_queue()
reception_deque.extend(packets_received)
# Every 5 seconds, check whether any reply has been received. If not, cancel operation.
if time.time() - last_check > 5.0 and len(packets_received) == 0:
LOGGER.warning("No replies are being received, cancelling upload operation..")
time_to_sleep = next_send - time.time()
last_sent = next_send
time.sleep(time_to_sleep)
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console # https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment