Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Select Git revision
Show changes
# -*- coding: utf-8 -*-
"""
Program: obsw_tm_service_5.py
Date: 30.12.2019
Description: Deserialize PUS Event Report
Author: R. Mueller
"""
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoT
import struct
class Service5TM(PusTelemetry):
def __init__(self, byte_array):
super().__init__(byte_array)
self.specify_packet_info("Event")
if self.get_subservice() == 1:
self.append_packet_info(" Info")
elif self.get_subservice() == 2:
self.append_packet_info(" Error Low Severity")
elif self.get_subservice() == 3:
self.append_packet_info(" Error Med Severity")
elif self.get_subservice() == 4:
self.append_packet_info(" Error High Severity")
self.eventId = struct.unpack('>H', self._tm_data[0:2])[0]
self.objectId = struct.unpack('>I', self._tm_data[2:6])[0]
self.param1 = struct.unpack('>I', self._tm_data[6:10])[0]
self.param2 = struct.unpack('>I', self._tm_data[10:14])[0]
def append_telemetry_content(self, array):
super().append_telemetry_content(array)
array.append(str(self.eventId))
array.append(hex(self.objectId))
array.append(str(hex(self.param1)) + ", " + str(self.param1))
array.append(str(hex(self.param2)) + ", " + str(self.param2))
def append_telemetry_column_headers(self, array):
super().append_telemetry_column_headers(array)
array.append("Event ID")
array.append("Reporter ID")
array.append("Parameter 1")
array.append("Parameter 2")
def pack_tm_information(self) -> PusTmInfoT:
tm_information = super().pack_tm_information()
add_information = {
TmDictionaryKeys.REPORTER_ID: self.objectId,
TmDictionaryKeys.EVENT_ID: self.eventId,
TmDictionaryKeys.EVENT_PARAM_1: self.param1,
TmDictionaryKeys.EVENT_PARAM_2: self.param2
}
tm_information.update(add_information)
return tm_information
#!/usr/bin/python3 #!/usr/bin/python3
""" """
@brief This client was developed by KSat for the SOURCE project to test the on-board software. @brief TMTC Commander entry point for command line mode.
@details @details
This client features multiple sender/receiver modes and has been designed This client was developed by KSat for the SOURCE project to test the on-board software but
to be extensible and easy to use. This clien is based on the PUS standard for the format has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
of telecommands and telemetry. It can also send TMTC via different interfaces like the handling and testing via different communication interfaces. Currently, only the PUS standard is
serial interface (USB port) or ethernet interface. implemented as a packet standard.
Run this file with the -h flag to display options.
@license @license
Copyright 2020 KSat e.V. Stuttgart Copyright 2020 KSat e.V. Stuttgart
...@@ -22,13 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -22,13 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
@manual @author R. Mueller
Run this file with the -h flag to display options.
""" """
from core.tmtc_client_core import run_tmtc_client from core.tmtc_client_core import run_tmtc_client
def main(): def main():
run_tmtc_client(False) run_tmtc_client(False)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
#!/usr/bin/python3 #!/usr/bin/python3
""" """
@brief This client was developed by KSat for the SOURCE project to test the on-board software. @brief TMTC Commander entry point for GUI mode.
@details @details
This client features multiple sender/receiver modes and has been designed This client was developed by KSat for the SOURCE project to test the on-board software but
to be extensible and easy to use. This clien is based on the PUS standard for the format has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
of telecommands and telemetry. It can also send TMTC via different interfaces like the handling and testing via different communication interfaces. Currently, only the PUS standard is
serial interface (USB port) or ethernet interface. implemented as a packet standard.
Run this file with the -h flag to display options.
@license @license
Copyright 2020 KSat e.V. Stuttgart Copyright 2020 KSat e.V. Stuttgart
...@@ -22,12 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -22,12 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
@manual @author R. Mueller
""" """
from core.tmtc_client_core import run_tmtc_client from core.tmtc_client_core import run_tmtc_client
def main(): def main():
run_tmtc_client(True) run_tmtc_client(True)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
\ No newline at end of file
Subproject commit 1e06be166b0bf47a2a921fd45eed49ea9f5782c3 Subproject commit 3749440b9f1051b9d022f00012f1da417759fe32
#!/usr/bin/python3.8 #!/usr/bin/python3.8
""" """
Argument parser module. Argument parser module.
""" """
import argparse import argparse
import sys import sys
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger() LOGGER = get_logger()
def parse_input_arguments(): def parse_input_arguments():
""" """
Parses all input arguments Parses all input arguments
:return: Input arguments contained in a special namespace and accessable by args.<variable> :return: Input arguments contained in a special namespace and accessable by args.<variable>
""" """
arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
arg_parser.add_argument( arg_parser.add_argument(
'-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), ' '-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), '
'0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' '1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0) '4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0)
arg_parser.add_argument( arg_parser.add_argument(
'-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, ' '-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, '
'2: QEMU, 3: UDP', default=2) '2: QEMU, 3: UDP', default=2)
arg_parser.add_argument( arg_parser.add_argument(
'-o', '--op_code', help='Operation code, which is passed to the TC ' '-o', '--op_code', help='Operation code, which is passed to the TC '
'packer functions', default=0) 'packer functions', default=0)
arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
arg_parser.add_argument( arg_parser.add_argument(
'--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1") '--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1")
arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
arg_parser.add_argument( arg_parser.add_argument(
'-l','--listener', help='Determine whether the listener mode will be active after ' '-l', '--listener', help='Determine whether the listener mode will be active '
'performing the operation', 'after performing the operation',
action='store_false') action='store_false')
arg_parser.add_argument( arg_parser.add_argument(
'-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.' '-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.'
' Default: 5 seconds', default=5.0) ' Default: 5 seconds', default=5.0)
arg_parser.add_argument( arg_parser.add_argument(
'--nl', dest='print_log', help='Supply --nl to suppress print output to log files.', '--nl', dest='print_log', help='Supply --nl to suppress print output to log files.',
action='store_false') action='store_false')
arg_parser.add_argument( arg_parser.add_argument(
'--np', dest='print_tm', help='Supply --np to suppress print output to console.', '--np', dest='print_tm', help='Supply --np to suppress print output to console.',
action='store_false') action='store_false')
arg_parser.add_argument( arg_parser.add_argument(
'--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5)
arg_parser.add_argument( arg_parser.add_argument(
'-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly', '-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly',
action='store_true') action='store_true')
arg_parser.add_argument( arg_parser.add_argument(
'-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
arg_parser.add_argument( arg_parser.add_argument(
'--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true')
arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') arg_parser.add_argument(
arg_parser.add_argument( '--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout',
'--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout', action='store_true')
action='store_true')
if len(sys.argv) == 1:
if len(sys.argv) == 1: print("No Input Arguments specified.")
print("No Input Arguments specified.") arg_parser.print_help()
arg_parser.print_help() args, unknown = arg_parser.parse_known_args()
args, unknown = arg_parser.parse_known_args() # for argument in vars(args):
# for argument in vars(args): # LOGGER.debug(argument + ": " + str(getattr(args, argument)))
# LOGGER.debug(argument + ": " + str(getattr(args, argument))) handle_args(args, unknown)
handle_args(args, unknown)
return args return args
def handle_args(args, unknown: list) -> None: def handle_args(args, unknown: list) -> None:
""" """
Handles the parsed arguments. Handles the parsed arguments.
:param args: Namespace objects :param args: Namespace objects
(see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace)
:param unknown: List of unknown parameters. :param unknown: List of unknown parameters.
:return: None :return: None
""" """
if len(unknown) > 0: if len(unknown) > 0:
print("Unknown arguments detected: " + str(unknown)) print("Unknown arguments detected: " + str(unknown))
if len(sys.argv) > 1: if len(sys.argv) > 1:
handle_unspecified_args(args) handle_unspecified_args(args)
if len(sys.argv) == 1: if len(sys.argv) == 1:
handle_empty_args(args) handle_empty_args(args)
def handle_unspecified_args(args) -> None: def handle_unspecified_args(args) -> None:
""" """
If some arguments are unspecified, they are set here with (variable) default values. If some arguments are unspecified, they are set here with (variable) default values.
:param args: :param args:
:return: None :return: None
""" """
if args.com_if == 1 and args.tm_timeout is None: if args.com_if == 1 and args.tm_timeout is None:
args.tm_timeout = 6.0 args.tm_timeout = 6.0
if args.mode is None: if args.mode is None:
print("No mode specified with -m Parameter.") print("No mode specified with -m Parameter.")
print("Possible Modes: ") print("Possible Modes: ")
print("1: Listener Mode") print("1: Listener Mode")
print("2: Single Command Mode with manual command") print("2: Single Command Mode with manual command")
print("3: Service Mode, Commands specified in tc folder") print("3: Service Mode, Commands specified in pus_tc folder")
print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py") print("4: Software Mode, runs all command specified in tmtcc_pus_tc_packer.py")
print("5: Unit Test, runs unit test specified in obsw_module_test.py") print("5: Unit Test, runs unit test specified in obsw_module_test.py")
args.mode = input("Please enter Mode: ") args.mode = input("Please enter Mode: ")
if args.mode == 1 and args.service is None: if args.mode == 1 and args.service is None:
args.service = input("No Service specified for Service Mode. " args.service = input("No Service specified for Service Mode. "
"Please enter PUS G_SERVICE number: ") "Please enter PUS G_SERVICE number: ")
def handle_empty_args(args) -> None: def handle_empty_args(args) -> None:
""" """
If no args were supplied, request input from user directly. If no args were supplied, request input from user directly.
TODO: This still needs to be extended. TODO: This still needs to be extended.
:param args: :param args:
:return: :return:
""" """
print_hk = input("Print HK packets ? (y/n or yes/no)") print_hk = input("Print HK packets ? (y/n or yes/no)")
try: try:
print_hk = print_hk.lower() print_hk = print_hk.lower()
except TypeError: except TypeError:
pass pass
if print_hk in ('y', 'yes', 1): if print_hk in ('y', 'yes', 1):
args.print_hk = True args.print_hk = True
else: else:
args.print_hk = False args.print_hk = False
print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)")
try: try:
print_to_log = print_to_log.lower() print_to_log = print_to_log.lower()
except TypeError: except TypeError:
pass pass
if print_to_log in ('n', 'no', 0): if print_to_log in ('n', 'no', 0):
args.printFile = False args.printFile = False
else: else:
args.printFile = True args.printFile = True
#!/usr/bin/python3.8 #!/usr/bin/python3.8
""" """
@brief Binary Uploader Module @brief Binary Uploader Module
@details @details
This module will be used to upload binaries to the OBC via a communication port, given This module will be used to upload binaries to the OBC via a communication port, given
a supplied binary. The binary will be sent via the specified communication interface. a supplied binary. The binary will be sent via the specified communication interface.
It will be possible to encode the data (for example using DLE encoding) It will be possible to encode the data (for example using DLE encoding)
""" """
import os import os
import time import time
import tkinter as tk import tkinter as tk
from tkinter import filedialog from tkinter import filedialog
from collections import deque from collections import deque
from glob import glob from glob import glob
from typing import Deque
from tmtc_core.pus_tm.tmtcc_tm_service_1 import Service1TM
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from utility.obsw_file_transfer_helper import FileTransferHelper from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, TcDictionaryKeys
import config.obsw_config as g from utility.tmtcc_file_transfer_helper import FileTransferHelper
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode import config.tmtcc_config as g
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode
from tmtc_core.sendreceive.obsw_tm_listener import TmListener from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.sendreceive.obsw_tm_listener import TmListener
LOGGER = get_logger()
LOGGER = get_logger()
class BinaryFileUploader:
def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter, class BinaryFileUploader:
tm_listener: TmListener): def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter,
""" tm_listener: TmListener):
Initializes the binary file uploader with the required components. """
@param com_if: Initializes the binary file uploader with the required components.
@param tmtc_printer: @param com_if:
@param tm_listener: @param tmtc_printer:
""" @param tm_listener:
self.com_if = com_if """
self.tmtc_printer = tmtc_printer self.com_if = com_if
self.tm_listener = tm_listener self.tmtc_printer = tmtc_printer
self.iobc = False self.tm_listener = tm_listener
self.send_interval = 1.0 self.iobc = False
# Will be set later depending on board.
def perform_file_upload(self): self.send_interval = 0
gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ")
if gui_cl_prompt == 0: def perform_file_upload(self):
gui_cl_prompt = True gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ")
else: if gui_cl_prompt == 0:
gui_cl_prompt = False gui_cl_prompt = True
print("Please select file to upload: ") else:
file_path = "" gui_cl_prompt = False
if gui_cl_prompt: print("Please select file to upload: ")
root = tk.Tk() file_path = ""
root.withdraw() if gui_cl_prompt:
root.wm_attributes('-topmost', 1) root = tk.Tk()
file_path = filedialog.askopenfilename(parent=root) root.withdraw()
print("File select: " + str(file_path)) root.wm_attributes('-topmost', 1)
if file_path == (): file_path = filedialog.askopenfilename(parent=root)
LOGGER.warning("Invalid file path, exiting binary upload mode.") print("File select: " + str(file_path))
return if file_path == ():
else: LOGGER.warning("Invalid file path, exiting binary upload mode.")
result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))] return
print("Files found in _bin folder: ") else:
for idx, path in enumerate(result): result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))]
print("Selection " + str(idx) + ": " + str(path)) print("Files found in _bin folder: ")
select_valid = False for idx, path in enumerate(result):
selection = input("Please enter desired selection [c to cancel]: ") print("Selection " + str(idx) + ": " + str(path))
while not select_valid: select_valid = False
if selection == 'c': selection = input("Please enter desired selection [c to cancel]: ")
print("Exiting binary upload mode..") while not select_valid:
return if selection == 'c':
if not selection.isdigit(): print("Exiting binary upload mode..")
selection = input("Invalid input, try again [c to cancel]: ") return
if selection.isdigit(): if not selection.isdigit():
if int(selection) < len(result): selection = input("Invalid input, try again [c to cancel]: ")
file_path = result[int(selection)] if selection.isdigit():
select_valid = True if int(selection) < len(result):
else: file_path = result[int(selection)]
selection = input("Invalid input, try again [c to cancel]: ") select_valid = True
else:
print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected." selection = input("Invalid input, try again [c to cancel]: ")
LOGGER.info(print_string)
calc_hamming_code = input("Calculate and send hamming code? [y/n]: ") print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected."
if calc_hamming_code in ['y', 'yes', 1]: LOGGER.info(print_string)
calc_hamming_code = True calc_hamming_code = input("Calculate and send hamming code? [y/n]: ")
print("Hamming code will be calculated and sent in tail packet") if calc_hamming_code in ['y', 'yes', 1]:
else: calc_hamming_code = True
calc_hamming_code = False print("Hamming code will be calculated and sent in tail packet")
print("Hamming code will not be calculated") else:
calc_hamming_code = False
iobc_prompt = input("iOBC? [y/n]: ") print("Hamming code will not be calculated")
if iobc_prompt in ['y', 'yes', 1]:
self.iobc = True iobc_prompt = input("iOBC? [y/n]: ")
self.send_interval = 0.8 if iobc_prompt in ['y', 'yes', 1]:
iobc_prompt = True self.iobc = True
else: self.send_interval = 1.4
self.iobc = False iobc_prompt = True
self.send_interval = 0.6 else:
iobc_prompt = False self.iobc = False
self.send_interval = 0.6
bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ") iobc_prompt = False
if str(bootloader_prompt) == "0":
bootloader_prompt = True bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ")
else: if str(bootloader_prompt) == "0":
bootloader_prompt = False bootloader_prompt = True
else:
prompt_lock = input("Lock file with last packet? [y/n]: ") bootloader_prompt = False
if prompt_lock in ['n', "no", 0]:
prompt_lock = False prompt_lock = input("Lock file with last packet? [y/n]: ")
else: if prompt_lock in ['n', "no", 0]:
prompt_lock = True prompt_lock = False
else:
if bootloader_prompt: prompt_lock = True
file_name = "bl.bin"
else: if bootloader_prompt:
file_name = "obsw_up.bin" file_name = "bl.bin"
else:
if iobc_prompt: file_name = "obsw_up.bin"
if bootloader_prompt:
repository_name = "BIN/IOBC/BL" if iobc_prompt:
else: if bootloader_prompt:
repository_name = "BIN/IOBC/OBSW" repository_name = "BIN/IOBC/BL"
else: else:
if bootloader_prompt: repository_name = "BIN/IOBC/OBSW"
repository_name = "BIN/AT91/BL" else:
else: if bootloader_prompt:
repository_name = "BIN/AT91/OBSW" repository_name = "BIN/AT91/BL"
else:
if calc_hamming_code: repository_name = "BIN/AT91/OBSW"
pass
if calc_hamming_code:
# Right now, the size of PUS packets is limited to 1500 bytes which also limits the app pass
# data length.
frame_length = g.G_MAX_APP_DATA_LENGTH # Right now, the size of PUS packets is limited to 1500 bytes which also limits the app
# data length.
if calc_hamming_code: frame_length = g.G_MAX_APP_DATA_LENGTH
# now we calculate the hamming code
pass if calc_hamming_code:
# now we calculate the hamming code
tc_queue = deque() pass
# Delete existing binary file first, otherwise data might be appended to otherwise tc_queue = deque()
# valid file which already exists.
file_transfer_helper = FileTransferHelper( # Delete existing binary file first, otherwise data might be appended to otherwise
tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name, # valid file which already exists.
target_filename=file_name) file_transfer_helper = FileTransferHelper(
tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name,
init_ssc = 0 target_filename=file_name)
self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
init_ssc = 0
# Configure file transfer helper self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
file_transfer_helper.set_data_from_file(file_path)
file_transfer_helper.set_to_delete_old_file() # Configure file transfer helper
if prompt_lock: file_transfer_helper.set_data_from_file(file_path)
file_transfer_helper.set_to_lock_file(prompt_lock) file_transfer_helper.set_to_delete_old_file()
else: if prompt_lock:
file_transfer_helper.set_to_lock_file(prompt_lock) file_transfer_helper.set_to_lock_file(prompt_lock)
else:
# Generate the packets. file_transfer_helper.set_to_lock_file(prompt_lock)
file_transfer_helper.generate_packets(init_ssc)
# Generate the packets.
self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL) file_transfer_helper.generate_packets(init_ssc)
print_string = "BinaryUploader: Detected file size: " + str(
file_transfer_helper.file_size()) self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL)
LOGGER.info(print_string) print_string = "BinaryUploader: Detected file size: " + str(
total_num_packets = file_transfer_helper.get_number_of_packets_generated() file_transfer_helper.file_size())
print_string = "BinaryUploader: " + str(total_num_packets) + \ LOGGER.info(print_string)
" packets generated." total_num_packets = file_transfer_helper.get_number_of_packets_generated()
if prompt_lock: print_string = "BinaryUploader: " + str(total_num_packets) + \
print_string += " File will be locked." " packets generated."
else: if prompt_lock:
print_string += " File will not be locked." print_string += " File will be locked."
LOGGER.info(print_string) else:
print_string += " File will not be locked."
reception_deque = deque() LOGGER.info(print_string)
self.__perform_send_algorithm(tc_queue, total_num_packets, reception_deque)
packets_received = self.__perform_send_algorithm(tc_queue, total_num_packets)
print_string = "BinaryUploader: All binary packets were sent!"
LOGGER.info(print_string) LOGGER.info("BinaryUploader: All binary packets were sent!")
print_string = str(reception_deque.__len__()) + " replies received." print_string = str(packets_received) + " replies received."
LOGGER.info(print_string) LOGGER.info(print_string)
time.sleep(15) self.tm_listener.clear_tm_packet_queue()
reception_deque.extend(self.tm_listener.retrieve_tm_packet_queue()) LOGGER.info("Upload operation finished successfully.")
for tm_list in reception_deque:
for tm_packet in tm_list: def __perform_send_algorithm(self, tc_queue: TcQueueT, number_of_packets: int) -> int:
if tm_packet.get_service() == 23 and tm_packet.get_subservice() == 132: last_check = time.time()
# tmtc_printer.print_telemetry(tm_packet) last_sent = time.time()
pass reception_dict = dict()
self.tm_listener.clear_tm_packet_queue() total_time = self.send_interval * number_of_packets
LOGGER.info("Transitioning back to listener mode..") idx = 1
new_packets_received = 0
def __perform_send_algorithm(self, tc_queue: Deque, number_of_packets: int, reception_deque: num_packets_received = 0
Deque): while tc_queue:
last_check = time.time() next_send = last_sent + self.send_interval
last_sent = time.time() tc_packet, tc_info = tc_queue.pop()
total_time = self.send_interval * number_of_packets if not isinstance(tc_packet, str):
idx = 1 # print_string = "Sending packet " + str(idx) + ".."
while tc_queue: # LOGGER.info(print_string)
next_send = last_sent + self.send_interval idx += 1
(tc_packet, tc_info) = tc_queue.pop() self.com_if.send_telecommand(tc_packet, tc_info)
if not isinstance(tc_packet, str): self.tmtc_printer.print_telecommand(tc_packet, tc_info)
# print_string = "Sending packet " + str(idx) + ".." # Store a list which tracks whether acceptance messages were received
# LOGGER.info(print_string) # The first entry is a simply counter.
idx += 1 reception_dict.update({tc_info[TcDictionaryKeys.SSC]: [0, False, False, False]})
self.com_if.send_telecommand(tc_packet, tc_info) elif tc_packet == "print":
self.tmtc_printer.print_telecommand(tc_packet, tc_info) LOGGER.info(tc_info)
elif tc_packet == "print": remaining_time_string = "Remaining time: " + \
LOGGER.info(tc_info) str(round(total_time - (idx - 2) * self.send_interval, 2)) + \
remaining_time_string = "Remaining time: " + \ " seconds"
str(round(total_time - (idx - 2) * self.send_interval, 2)) + \ print_progress_bar(idx - 2, number_of_packets, print_end="\n",
" seconds" suffix=remaining_time_string)
print_progress_bar(idx - 2, number_of_packets, print_end="\n", # sys.stdout.write("\033[F") # Cursor up one line
suffix=remaining_time_string) new_packets_received += self.__handle_tm_queue(reception_dict)
# sys.stdout.write("\033[F") # Cursor up one line
packets_received = self.tm_listener.retrieve_tm_packet_queue() # Every 5 seconds, check whether any reply has been received. If not, cancel operation.
reception_deque.extend(packets_received) if time.time() - last_check > 5.0 and new_packets_received == 0:
# Every 5 seconds, check whether any reply has been received. If not, cancel operation. LOGGER.warning("No replies are being received, cancelling upload operation..")
if time.time() - last_check > 5.0 and len(packets_received) == 0: elif time.time() - last_check > 5.0:
LOGGER.warning("No replies are being received, cancelling upload operation..") num_packets_received += new_packets_received
time_to_sleep = next_send - time.time() new_packets_received = 0
last_sent = next_send last_check = time.time()
time.sleep(time_to_sleep)
time_to_sleep = next_send - time.time()
last_sent = next_send
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console time.sleep(time_to_sleep)
# Thank you Greensticks :-)
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100, # handle last telemetry packets coming in.
fill='', print_end="\r"): wait_time = 4
""" LOGGER.info("Waiting " + str(wait_time) + " more seconds for TM packets..")
Call in a loop to create terminal progress bar time_till_wake = time.time() + wait_time
@params: while time.time() < time_till_wake:
iteration - Required : current iteration (Int) new_packets_received += self.__handle_tm_queue(reception_dict)
total - Required : total iterations (Int) time.sleep(1)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str) num_packets_received += new_packets_received
decimals - Optional : positive number of decimals in percent complete (Int) return num_packets_received
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str) def __handle_tm_queue(self, reception_dict: dict) -> int:
print_end - Optional : end character (e.g. "\r", "\r\n") (Str) num_packets_received = 0
""" packets_received = self.tm_listener.retrieve_tm_packet_queue()
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) self.tm_listener.clear_tm_packet_queue()
filled_length = int(length * iteration // total) # reception_deque.extend(packets_received)
bar = fill * filled_length + '-' * (length - filled_length) for packet_list in packets_received:
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end) for packet in packet_list:
# Print New Line on Complete num_packets_received += 1
if iteration == total: if packet.get_service() == 1:
print() packet: Service1TM
tc_ssc = packet.get_tc_ssc()
acceptance_list = reception_dict.get(tc_ssc)
if acceptance_list is None:
LOGGER.warning("Invalid TC SSC with number " + str(tc_ssc) +
" detected!")
continue
if packet.get_subservice() == 1:
acceptance_list[1] = True
if packet.get_subservice() == 3:
acceptance_list[2] = True
if packet.get_subservice() == 7:
acceptance_list[3] = True
reception_dict.update({tc_ssc: acceptance_list})
elif packet.get_service() == 5:
# TODO: print event
LOGGER.info("Event received!")
else:
print_string = "Other TM packet with service ID " + str(packet.get_service()) + \
" received!"
LOGGER.info(print_string)
clear_list = []
for key, reception_list in reception_dict.items():
if reception_list[1] and reception_list[2] and reception_list[3]:
# All replies received, we can delete the entry and confirm succesfull handling
print_string = "All replies received for upload packet with SSC " + str(key) + "."
LOGGER.info(print_string)
clear_list.append(key)
elif reception_list[0] > 5:
# No replies for telecommand.
LOGGER.warning("No reply received for TC SSC " + str(key) + "!")
# Continue nonetheless
clear_list.append(key)
for key in clear_list:
del reception_dict[key]
return num_packets_received
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
# Thank you Greensticks :-)
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
fill='', print_end="\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filled_length = int(length * iteration // total)
bar = fill * filled_length + '-' * (length - filled_length)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)
# Print New Line on Complete
if iteration == total:
print()
from enum import Enum from enum import Enum
import math import math
from config.obsw_config import SD_CARD_HANDLER_ID from config.tmtcc_config import SD_CARD_HANDLER_ID
from tmtc_core.tc.obsw_pus_tc_base import TcQueueT, PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand
from tc.obsw_tc_service23_sdcard import \ from pus_tc.tmtcc_tc_service23_sdcard import \
calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \ calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \
generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \ generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \
generate_lock_file_srv23_5_6_packet generate_lock_file_srv23_5_6_packet
...@@ -223,6 +223,7 @@ class FileTransferHelper: ...@@ -223,6 +223,7 @@ class FileTransferHelper:
header += data[number_of_packets * size_of_data_blocks:len(data)] header += data[number_of_packets * size_of_data_blocks:len(data)]
commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number, commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number,
app_data=header) app_data=header)
self.__current_ssc = init_ssc + packet_sequence_number
self.tc_queue.appendleft(commands.pack_command_tuple()) self.tc_queue.appendleft(commands.pack_command_tuple())
def __handle_finish_and_lock_packet_generation(self): def __handle_finish_and_lock_packet_generation(self):
...@@ -230,12 +231,14 @@ class FileTransferHelper: ...@@ -230,12 +231,14 @@ class FileTransferHelper:
last_command = generate_finish_append_to_file_srv23_131_packet( last_command = generate_finish_append_to_file_srv23_131_packet(
filename=self.target_filename, repository_path=self.target_repository, filename=self.target_filename, repository_path=self.target_repository,
ssc=self.__current_ssc, lock_file=self.__lock_file) ssc=self.__current_ssc, lock_file=self.__lock_file)
self.__current_ssc += 1
else: else:
if self.__lock_file: if self.__lock_file:
last_command = generate_lock_file_srv23_5_6_packet( last_command = generate_lock_file_srv23_5_6_packet(
filename=self.target_filename, repository_path=self.target_repository, filename=self.target_filename, repository_path=self.target_repository,
object_id=self.object_id, lock=True, ssc=self.__current_ssc) object_id=self.object_id, lock=True, ssc=self.__current_ssc)
self.__current_ssc += 1
else: else:
self.__number_of_finish_packets = 0 self.__number_of_finish_packets = 0
return return
self.tc_queue.appendleft(last_command.pack_command_tuple()) self.tc_queue.appendleft(last_command.pack_command_tuple())
\ No newline at end of file