Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
# -*- coding: utf-8 -*-
"""
Program: obsw_tm_service_5.py
Date: 30.12.2019
Description: Deserialize PUS Event Report
Author: R. Mueller
"""
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoT
import struct
class Service5TM(PusTelemetry):
def __init__(self, byte_array):
super().__init__(byte_array)
self.specify_packet_info("Event")
if self.get_subservice() == 1:
self.append_packet_info(" Info")
elif self.get_subservice() == 2:
self.append_packet_info(" Error Low Severity")
elif self.get_subservice() == 3:
self.append_packet_info(" Error Med Severity")
elif self.get_subservice() == 4:
self.append_packet_info(" Error High Severity")
self.eventId = struct.unpack('>H', self._tm_data[0:2])[0]
self.objectId = struct.unpack('>I', self._tm_data[2:6])[0]
self.param1 = struct.unpack('>I', self._tm_data[6:10])[0]
self.param2 = struct.unpack('>I', self._tm_data[10:14])[0]
def append_telemetry_content(self, array):
super().append_telemetry_content(array)
array.append(str(self.eventId))
array.append(hex(self.objectId))
array.append(str(hex(self.param1)) + ", " + str(self.param1))
array.append(str(hex(self.param2)) + ", " + str(self.param2))
def append_telemetry_column_headers(self, array):
super().append_telemetry_column_headers(array)
array.append("Event ID")
array.append("Reporter ID")
array.append("Parameter 1")
array.append("Parameter 2")
def pack_tm_information(self) -> PusTmInfoT:
tm_information = super().pack_tm_information()
add_information = {
TmDictionaryKeys.REPORTER_ID: self.objectId,
TmDictionaryKeys.EVENT_ID: self.eventId,
TmDictionaryKeys.EVENT_PARAM_1: self.param1,
TmDictionaryKeys.EVENT_PARAM_2: self.param2
}
tm_information.update(add_information)
return tm_information
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@brief TMTC Commander entry point for command line mode.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
This client was developed by KSat for the SOURCE project to test the on-board software but
has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
handling and testing via different communication interfaces. Currently, only the PUS standard is
implemented as a packet standard.
Run this file with the -h flag to display options.
@license
Copyright 2020 KSat e.V. Stuttgart
......@@ -22,13 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
@author R. Mueller
"""
from core.tmtc_client_core import run_tmtc_client
def main():
run_tmtc_client(False)
if __name__ == "__main__":
main()
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@brief TMTC Commander entry point for GUI mode.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
This client was developed by KSat for the SOURCE project to test the on-board software but
has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
handling and testing via different communication interfaces. Currently, only the PUS standard is
implemented as a packet standard.
Run this file with the -h flag to display options.
@license
Copyright 2020 KSat e.V. Stuttgart
......@@ -22,12 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
@author R. Mueller
"""
from core.tmtc_client_core import run_tmtc_client
def main():
run_tmtc_client(True)
if __name__ == "__main__":
main()
\ No newline at end of file
main()
Subproject commit 1e06be166b0bf47a2a921fd45eed49ea9f5782c3
Subproject commit 3749440b9f1051b9d022f00012f1da417759fe32
#!/usr/bin/python3.8
"""
Argument parser module.
"""
import argparse
import sys
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
def parse_input_arguments():
"""
Parses all input arguments
:return: Input arguments contained in a special namespace and accessable by args.<variable>
"""
arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
arg_parser.add_argument(
'-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), '
'0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0)
arg_parser.add_argument(
'-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, '
'2: QEMU, 3: UDP', default=2)
arg_parser.add_argument(
'-o', '--op_code', help='Operation code, which is passed to the TC '
'packer functions', default=0)
arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
arg_parser.add_argument(
'--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1")
arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
arg_parser.add_argument(
'-l','--listener', help='Determine whether the listener mode will be active after '
'performing the operation',
action='store_false')
arg_parser.add_argument(
'-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.'
' Default: 5 seconds', default=5.0)
arg_parser.add_argument(
'--nl', dest='print_log', help='Supply --nl to suppress print output to log files.',
action='store_false')
arg_parser.add_argument(
'--np', dest='print_tm', help='Supply --np to suppress print output to console.',
action='store_false')
arg_parser.add_argument(
'--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5)
arg_parser.add_argument(
'-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly',
action='store_true')
arg_parser.add_argument(
'-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
arg_parser.add_argument(
'--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true')
arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication')
arg_parser.add_argument(
'--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout',
action='store_true')
if len(sys.argv) == 1:
print("No Input Arguments specified.")
arg_parser.print_help()
args, unknown = arg_parser.parse_known_args()
# for argument in vars(args):
# LOGGER.debug(argument + ": " + str(getattr(args, argument)))
handle_args(args, unknown)
return args
def handle_args(args, unknown: list) -> None:
"""
Handles the parsed arguments.
:param args: Namespace objects
(see https://docs.python.org/dev/library/argparse.html#argparse.Namespace)
:param unknown: List of unknown parameters.
:return: None
"""
if len(unknown) > 0:
print("Unknown arguments detected: " + str(unknown))
if len(sys.argv) > 1:
handle_unspecified_args(args)
if len(sys.argv) == 1:
handle_empty_args(args)
def handle_unspecified_args(args) -> None:
"""
If some arguments are unspecified, they are set here with (variable) default values.
:param args:
:return: None
"""
if args.com_if == 1 and args.tm_timeout is None:
args.tm_timeout = 6.0
if args.mode is None:
print("No mode specified with -m Parameter.")
print("Possible Modes: ")
print("1: Listener Mode")
print("2: Single Command Mode with manual command")
print("3: Service Mode, Commands specified in tc folder")
print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py")
print("5: Unit Test, runs unit test specified in obsw_module_test.py")
args.mode = input("Please enter Mode: ")
if args.mode == 1 and args.service is None:
args.service = input("No Service specified for Service Mode. "
"Please enter PUS G_SERVICE number: ")
def handle_empty_args(args) -> None:
"""
If no args were supplied, request input from user directly.
TODO: This still needs to be extended.
:param args:
:return:
"""
print_hk = input("Print HK packets ? (y/n or yes/no)")
try:
print_hk = print_hk.lower()
except TypeError:
pass
if print_hk in ('y', 'yes', 1):
args.print_hk = True
else:
args.print_hk = False
print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)")
try:
print_to_log = print_to_log.lower()
except TypeError:
pass
if print_to_log in ('n', 'no', 0):
args.printFile = False
else:
args.printFile = True
#!/usr/bin/python3.8
"""
Argument parser module.
"""
import argparse
import sys
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
def parse_input_arguments():
"""
Parses all input arguments
:return: Input arguments contained in a special namespace and accessable by args.<variable>
"""
arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
arg_parser.add_argument(
'-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), '
'1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0)
arg_parser.add_argument(
'-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, '
'2: QEMU, 3: UDP', default=2)
arg_parser.add_argument(
'-o', '--op_code', help='Operation code, which is passed to the TC '
'packer functions', default=0)
arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
arg_parser.add_argument(
'--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1")
arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
arg_parser.add_argument(
'-l', '--listener', help='Determine whether the listener mode will be active '
'after performing the operation',
action='store_false')
arg_parser.add_argument(
'-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.'
' Default: 5 seconds', default=5.0)
arg_parser.add_argument(
'--nl', dest='print_log', help='Supply --nl to suppress print output to log files.',
action='store_false')
arg_parser.add_argument(
'--np', dest='print_tm', help='Supply --np to suppress print output to console.',
action='store_false')
arg_parser.add_argument(
'--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5)
arg_parser.add_argument(
'-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly',
action='store_true')
arg_parser.add_argument(
'-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
arg_parser.add_argument(
'--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true')
arg_parser.add_argument(
'--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout',
action='store_true')
if len(sys.argv) == 1:
print("No Input Arguments specified.")
arg_parser.print_help()
args, unknown = arg_parser.parse_known_args()
# for argument in vars(args):
# LOGGER.debug(argument + ": " + str(getattr(args, argument)))
handle_args(args, unknown)
return args
def handle_args(args, unknown: list) -> None:
"""
Handles the parsed arguments.
:param args: Namespace objects
(see https://docs.python.org/dev/library/argparse.html#argparse.Namespace)
:param unknown: List of unknown parameters.
:return: None
"""
if len(unknown) > 0:
print("Unknown arguments detected: " + str(unknown))
if len(sys.argv) > 1:
handle_unspecified_args(args)
if len(sys.argv) == 1:
handle_empty_args(args)
def handle_unspecified_args(args) -> None:
"""
If some arguments are unspecified, they are set here with (variable) default values.
:param args:
:return: None
"""
if args.com_if == 1 and args.tm_timeout is None:
args.tm_timeout = 6.0
if args.mode is None:
print("No mode specified with -m Parameter.")
print("Possible Modes: ")
print("1: Listener Mode")
print("2: Single Command Mode with manual command")
print("3: Service Mode, Commands specified in pus_tc folder")
print("4: Software Mode, runs all command specified in tmtcc_pus_tc_packer.py")
print("5: Unit Test, runs unit test specified in obsw_module_test.py")
args.mode = input("Please enter Mode: ")
if args.mode == 1 and args.service is None:
args.service = input("No Service specified for Service Mode. "
"Please enter PUS G_SERVICE number: ")
def handle_empty_args(args) -> None:
"""
If no args were supplied, request input from user directly.
TODO: This still needs to be extended.
:param args:
:return:
"""
print_hk = input("Print HK packets ? (y/n or yes/no)")
try:
print_hk = print_hk.lower()
except TypeError:
pass
if print_hk in ('y', 'yes', 1):
args.print_hk = True
else:
args.print_hk = False
print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)")
try:
print_to_log = print_to_log.lower()
except TypeError:
pass
if print_to_log in ('n', 'no', 0):
args.printFile = False
else:
args.printFile = True
#!/usr/bin/python3.8
"""
@brief Binary Uploader Module
@details
This module will be used to upload binaries to the OBC via a communication port, given
a supplied binary. The binary will be sent via the specified communication interface.
It will be possible to encode the data (for example using DLE encoding)
"""
import os
import time
import tkinter as tk
from tkinter import filedialog
from collections import deque
from glob import glob
from typing import Deque
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from utility.obsw_file_transfer_helper import FileTransferHelper
import config.obsw_config as g
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.sendreceive.obsw_tm_listener import TmListener
LOGGER = get_logger()
class BinaryFileUploader:
def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter,
tm_listener: TmListener):
"""
Initializes the binary file uploader with the required components.
@param com_if:
@param tmtc_printer:
@param tm_listener:
"""
self.com_if = com_if
self.tmtc_printer = tmtc_printer
self.tm_listener = tm_listener
self.iobc = False
self.send_interval = 1.0
def perform_file_upload(self):
gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ")
if gui_cl_prompt == 0:
gui_cl_prompt = True
else:
gui_cl_prompt = False
print("Please select file to upload: ")
file_path = ""
if gui_cl_prompt:
root = tk.Tk()
root.withdraw()
root.wm_attributes('-topmost', 1)
file_path = filedialog.askopenfilename(parent=root)
print("File select: " + str(file_path))
if file_path == ():
LOGGER.warning("Invalid file path, exiting binary upload mode.")
return
else:
result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))]
print("Files found in _bin folder: ")
for idx, path in enumerate(result):
print("Selection " + str(idx) + ": " + str(path))
select_valid = False
selection = input("Please enter desired selection [c to cancel]: ")
while not select_valid:
if selection == 'c':
print("Exiting binary upload mode..")
return
if not selection.isdigit():
selection = input("Invalid input, try again [c to cancel]: ")
if selection.isdigit():
if int(selection) < len(result):
file_path = result[int(selection)]
select_valid = True
else:
selection = input("Invalid input, try again [c to cancel]: ")
print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected."
LOGGER.info(print_string)
calc_hamming_code = input("Calculate and send hamming code? [y/n]: ")
if calc_hamming_code in ['y', 'yes', 1]:
calc_hamming_code = True
print("Hamming code will be calculated and sent in tail packet")
else:
calc_hamming_code = False
print("Hamming code will not be calculated")
iobc_prompt = input("iOBC? [y/n]: ")
if iobc_prompt in ['y', 'yes', 1]:
self.iobc = True
self.send_interval = 0.8
iobc_prompt = True
else:
self.iobc = False
self.send_interval = 0.6
iobc_prompt = False
bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ")
if str(bootloader_prompt) == "0":
bootloader_prompt = True
else:
bootloader_prompt = False
prompt_lock = input("Lock file with last packet? [y/n]: ")
if prompt_lock in ['n', "no", 0]:
prompt_lock = False
else:
prompt_lock = True
if bootloader_prompt:
file_name = "bl.bin"
else:
file_name = "obsw_up.bin"
if iobc_prompt:
if bootloader_prompt:
repository_name = "BIN/IOBC/BL"
else:
repository_name = "BIN/IOBC/OBSW"
else:
if bootloader_prompt:
repository_name = "BIN/AT91/BL"
else:
repository_name = "BIN/AT91/OBSW"
if calc_hamming_code:
pass
# Right now, the size of PUS packets is limited to 1500 bytes which also limits the app
# data length.
frame_length = g.G_MAX_APP_DATA_LENGTH
if calc_hamming_code:
# now we calculate the hamming code
pass
tc_queue = deque()
# Delete existing binary file first, otherwise data might be appended to otherwise
# valid file which already exists.
file_transfer_helper = FileTransferHelper(
tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name,
target_filename=file_name)
init_ssc = 0
self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
# Configure file transfer helper
file_transfer_helper.set_data_from_file(file_path)
file_transfer_helper.set_to_delete_old_file()
if prompt_lock:
file_transfer_helper.set_to_lock_file(prompt_lock)
else:
file_transfer_helper.set_to_lock_file(prompt_lock)
# Generate the packets.
file_transfer_helper.generate_packets(init_ssc)
self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL)
print_string = "BinaryUploader: Detected file size: " + str(
file_transfer_helper.file_size())
LOGGER.info(print_string)
total_num_packets = file_transfer_helper.get_number_of_packets_generated()
print_string = "BinaryUploader: " + str(total_num_packets) + \
" packets generated."
if prompt_lock:
print_string += " File will be locked."
else:
print_string += " File will not be locked."
LOGGER.info(print_string)
reception_deque = deque()
self.__perform_send_algorithm(tc_queue, total_num_packets, reception_deque)
print_string = "BinaryUploader: All binary packets were sent!"
LOGGER.info(print_string)
print_string = str(reception_deque.__len__()) + " replies received."
LOGGER.info(print_string)
time.sleep(15)
reception_deque.extend(self.tm_listener.retrieve_tm_packet_queue())
for tm_list in reception_deque:
for tm_packet in tm_list:
if tm_packet.get_service() == 23 and tm_packet.get_subservice() == 132:
# tmtc_printer.print_telemetry(tm_packet)
pass
self.tm_listener.clear_tm_packet_queue()
LOGGER.info("Transitioning back to listener mode..")
def __perform_send_algorithm(self, tc_queue: Deque, number_of_packets: int, reception_deque:
Deque):
last_check = time.time()
last_sent = time.time()
total_time = self.send_interval * number_of_packets
idx = 1
while tc_queue:
next_send = last_sent + self.send_interval
(tc_packet, tc_info) = tc_queue.pop()
if not isinstance(tc_packet, str):
# print_string = "Sending packet " + str(idx) + ".."
# LOGGER.info(print_string)
idx += 1
self.com_if.send_telecommand(tc_packet, tc_info)
self.tmtc_printer.print_telecommand(tc_packet, tc_info)
elif tc_packet == "print":
LOGGER.info(tc_info)
remaining_time_string = "Remaining time: " + \
str(round(total_time - (idx - 2) * self.send_interval, 2)) + \
" seconds"
print_progress_bar(idx - 2, number_of_packets, print_end="\n",
suffix=remaining_time_string)
# sys.stdout.write("\033[F") # Cursor up one line
packets_received = self.tm_listener.retrieve_tm_packet_queue()
reception_deque.extend(packets_received)
# Every 5 seconds, check whether any reply has been received. If not, cancel operation.
if time.time() - last_check > 5.0 and len(packets_received) == 0:
LOGGER.warning("No replies are being received, cancelling upload operation..")
time_to_sleep = next_send - time.time()
last_sent = next_send
time.sleep(time_to_sleep)
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
# Thank you Greensticks :-)
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
fill='', print_end="\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filled_length = int(length * iteration // total)
bar = fill * filled_length + '-' * (length - filled_length)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)
# Print New Line on Complete
if iteration == total:
print()
#!/usr/bin/python3.8
"""
@brief Binary Uploader Module
@details
This module will be used to upload binaries to the OBC via a communication port, given
a supplied binary. The binary will be sent via the specified communication interface.
It will be possible to encode the data (for example using DLE encoding)
"""
import os
import time
import tkinter as tk
from tkinter import filedialog
from collections import deque
from glob import glob
from tmtc_core.pus_tm.tmtcc_tm_service_1 import Service1TM
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, TcDictionaryKeys
from utility.tmtcc_file_transfer_helper import FileTransferHelper
import config.tmtcc_config as g
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.sendreceive.obsw_tm_listener import TmListener
LOGGER = get_logger()
class BinaryFileUploader:
def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter,
tm_listener: TmListener):
"""
Initializes the binary file uploader with the required components.
@param com_if:
@param tmtc_printer:
@param tm_listener:
"""
self.com_if = com_if
self.tmtc_printer = tmtc_printer
self.tm_listener = tm_listener
self.iobc = False
# Will be set later depending on board.
self.send_interval = 0
def perform_file_upload(self):
gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ")
if gui_cl_prompt == 0:
gui_cl_prompt = True
else:
gui_cl_prompt = False
print("Please select file to upload: ")
file_path = ""
if gui_cl_prompt:
root = tk.Tk()
root.withdraw()
root.wm_attributes('-topmost', 1)
file_path = filedialog.askopenfilename(parent=root)
print("File select: " + str(file_path))
if file_path == ():
LOGGER.warning("Invalid file path, exiting binary upload mode.")
return
else:
result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))]
print("Files found in _bin folder: ")
for idx, path in enumerate(result):
print("Selection " + str(idx) + ": " + str(path))
select_valid = False
selection = input("Please enter desired selection [c to cancel]: ")
while not select_valid:
if selection == 'c':
print("Exiting binary upload mode..")
return
if not selection.isdigit():
selection = input("Invalid input, try again [c to cancel]: ")
if selection.isdigit():
if int(selection) < len(result):
file_path = result[int(selection)]
select_valid = True
else:
selection = input("Invalid input, try again [c to cancel]: ")
print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected."
LOGGER.info(print_string)
calc_hamming_code = input("Calculate and send hamming code? [y/n]: ")
if calc_hamming_code in ['y', 'yes', 1]:
calc_hamming_code = True
print("Hamming code will be calculated and sent in tail packet")
else:
calc_hamming_code = False
print("Hamming code will not be calculated")
iobc_prompt = input("iOBC? [y/n]: ")
if iobc_prompt in ['y', 'yes', 1]:
self.iobc = True
self.send_interval = 1.4
iobc_prompt = True
else:
self.iobc = False
self.send_interval = 0.6
iobc_prompt = False
bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ")
if str(bootloader_prompt) == "0":
bootloader_prompt = True
else:
bootloader_prompt = False
prompt_lock = input("Lock file with last packet? [y/n]: ")
if prompt_lock in ['n', "no", 0]:
prompt_lock = False
else:
prompt_lock = True
if bootloader_prompt:
file_name = "bl.bin"
else:
file_name = "obsw_up.bin"
if iobc_prompt:
if bootloader_prompt:
repository_name = "BIN/IOBC/BL"
else:
repository_name = "BIN/IOBC/OBSW"
else:
if bootloader_prompt:
repository_name = "BIN/AT91/BL"
else:
repository_name = "BIN/AT91/OBSW"
if calc_hamming_code:
pass
# Right now, the size of PUS packets is limited to 1500 bytes which also limits the app
# data length.
frame_length = g.G_MAX_APP_DATA_LENGTH
if calc_hamming_code:
# now we calculate the hamming code
pass
tc_queue = deque()
# Delete existing binary file first, otherwise data might be appended to otherwise
# valid file which already exists.
file_transfer_helper = FileTransferHelper(
tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name,
target_filename=file_name)
init_ssc = 0
self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
# Configure file transfer helper
file_transfer_helper.set_data_from_file(file_path)
file_transfer_helper.set_to_delete_old_file()
if prompt_lock:
file_transfer_helper.set_to_lock_file(prompt_lock)
else:
file_transfer_helper.set_to_lock_file(prompt_lock)
# Generate the packets.
file_transfer_helper.generate_packets(init_ssc)
self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL)
print_string = "BinaryUploader: Detected file size: " + str(
file_transfer_helper.file_size())
LOGGER.info(print_string)
total_num_packets = file_transfer_helper.get_number_of_packets_generated()
print_string = "BinaryUploader: " + str(total_num_packets) + \
" packets generated."
if prompt_lock:
print_string += " File will be locked."
else:
print_string += " File will not be locked."
LOGGER.info(print_string)
packets_received = self.__perform_send_algorithm(tc_queue, total_num_packets)
LOGGER.info("BinaryUploader: All binary packets were sent!")
print_string = str(packets_received) + " replies received."
LOGGER.info(print_string)
self.tm_listener.clear_tm_packet_queue()
LOGGER.info("Upload operation finished successfully.")
def __perform_send_algorithm(self, tc_queue: TcQueueT, number_of_packets: int) -> int:
last_check = time.time()
last_sent = time.time()
reception_dict = dict()
total_time = self.send_interval * number_of_packets
idx = 1
new_packets_received = 0
num_packets_received = 0
while tc_queue:
next_send = last_sent + self.send_interval
tc_packet, tc_info = tc_queue.pop()
if not isinstance(tc_packet, str):
# print_string = "Sending packet " + str(idx) + ".."
# LOGGER.info(print_string)
idx += 1
self.com_if.send_telecommand(tc_packet, tc_info)
self.tmtc_printer.print_telecommand(tc_packet, tc_info)
# Store a list which tracks whether acceptance messages were received
# The first entry is a simply counter.
reception_dict.update({tc_info[TcDictionaryKeys.SSC]: [0, False, False, False]})
elif tc_packet == "print":
LOGGER.info(tc_info)
remaining_time_string = "Remaining time: " + \
str(round(total_time - (idx - 2) * self.send_interval, 2)) + \
" seconds"
print_progress_bar(idx - 2, number_of_packets, print_end="\n",
suffix=remaining_time_string)
# sys.stdout.write("\033[F") # Cursor up one line
new_packets_received += self.__handle_tm_queue(reception_dict)
# Every 5 seconds, check whether any reply has been received. If not, cancel operation.
if time.time() - last_check > 5.0 and new_packets_received == 0:
LOGGER.warning("No replies are being received, cancelling upload operation..")
elif time.time() - last_check > 5.0:
num_packets_received += new_packets_received
new_packets_received = 0
last_check = time.time()
time_to_sleep = next_send - time.time()
last_sent = next_send
time.sleep(time_to_sleep)
# handle last telemetry packets coming in.
wait_time = 4
LOGGER.info("Waiting " + str(wait_time) + " more seconds for TM packets..")
time_till_wake = time.time() + wait_time
while time.time() < time_till_wake:
new_packets_received += self.__handle_tm_queue(reception_dict)
time.sleep(1)
num_packets_received += new_packets_received
return num_packets_received
def __handle_tm_queue(self, reception_dict: dict) -> int:
num_packets_received = 0
packets_received = self.tm_listener.retrieve_tm_packet_queue()
self.tm_listener.clear_tm_packet_queue()
# reception_deque.extend(packets_received)
for packet_list in packets_received:
for packet in packet_list:
num_packets_received += 1
if packet.get_service() == 1:
packet: Service1TM
tc_ssc = packet.get_tc_ssc()
acceptance_list = reception_dict.get(tc_ssc)
if acceptance_list is None:
LOGGER.warning("Invalid TC SSC with number " + str(tc_ssc) +
" detected!")
continue
if packet.get_subservice() == 1:
acceptance_list[1] = True
if packet.get_subservice() == 3:
acceptance_list[2] = True
if packet.get_subservice() == 7:
acceptance_list[3] = True
reception_dict.update({tc_ssc: acceptance_list})
elif packet.get_service() == 5:
# TODO: print event
LOGGER.info("Event received!")
else:
print_string = "Other TM packet with service ID " + str(packet.get_service()) + \
" received!"
LOGGER.info(print_string)
clear_list = []
for key, reception_list in reception_dict.items():
if reception_list[1] and reception_list[2] and reception_list[3]:
# All replies received, we can delete the entry and confirm succesfull handling
print_string = "All replies received for upload packet with SSC " + str(key) + "."
LOGGER.info(print_string)
clear_list.append(key)
elif reception_list[0] > 5:
# No replies for telecommand.
LOGGER.warning("No reply received for TC SSC " + str(key) + "!")
# Continue nonetheless
clear_list.append(key)
for key in clear_list:
del reception_dict[key]
return num_packets_received
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
# Thank you Greensticks :-)
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
fill='', print_end="\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filled_length = int(length * iteration // total)
bar = fill * filled_length + '-' * (length - filled_length)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)
# Print New Line on Complete
if iteration == total:
print()
from enum import Enum
import math
from config.obsw_config import SD_CARD_HANDLER_ID
from tmtc_core.tc.obsw_pus_tc_base import TcQueueT, PusTelecommand
from tc.obsw_tc_service23_sdcard import \
from config.tmtcc_config import SD_CARD_HANDLER_ID
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand
from pus_tc.tmtcc_tc_service23_sdcard import \
calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \
generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \
generate_lock_file_srv23_5_6_packet
......@@ -223,6 +223,7 @@ class FileTransferHelper:
header += data[number_of_packets * size_of_data_blocks:len(data)]
commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number,
app_data=header)
self.__current_ssc = init_ssc + packet_sequence_number
self.tc_queue.appendleft(commands.pack_command_tuple())
def __handle_finish_and_lock_packet_generation(self):
......@@ -230,12 +231,14 @@ class FileTransferHelper:
last_command = generate_finish_append_to_file_srv23_131_packet(
filename=self.target_filename, repository_path=self.target_repository,
ssc=self.__current_ssc, lock_file=self.__lock_file)
self.__current_ssc += 1
else:
if self.__lock_file:
last_command = generate_lock_file_srv23_5_6_packet(
filename=self.target_filename, repository_path=self.target_repository,
object_id=self.object_id, lock=True, ssc=self.__current_ssc)
self.__current_ssc += 1
else:
self.__number_of_finish_packets = 0
return
self.tc_queue.appendleft(last_command.pack_command_tuple())
\ No newline at end of file
self.tc_queue.appendleft(last_command.pack_command_tuple())