Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
......@@ -122,8 +122,8 @@ class Service3TM(PusTelemetry):
# TODO: size check. sth not right with gps 0 test
self.number_of_parameters = 9
self.hk_header = ["Fix Mode", "SV in Fix", "GNSS Week", "Time of Week", "Latitude",
"Longitude", "Mean Sea Altitude", "Position X", "Position Y", "Position Z",
"Velocity X", "Velocity Y", "Velocity Z"]
"Longitude", "Mean Sea Altitude", "Position X", "Position Y",
"Position Z", "Velocity X", "Velocity Y", "Velocity Z"]
fix_mode = self._tm_data[4]
sv_in_fix = self._tm_data[5]
gnss_week = struct.unpack('>H', self._tm_data[6:8])[0]
......
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
"""
from core.tmtc_client_core import run_tmtc_client
def main():
run_tmtc_client(False)
if __name__ == "__main__":
main()
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
"""
from core.tmtc_client_core import run_tmtc_client
def main():
run_tmtc_client(True)
if __name__ == "__main__":
main()
\ No newline at end of file
Subproject commit 4d37769e7c8254bf7e20a016cf076ae48db31e57
Subproject commit 1e06be166b0bf47a2a921fd45eed49ea9f5782c3
import os
import struct
from glob import glob
def main():
"""
Was written to assign sixth arm vector to binary size for bootloader.
@return:
"""
# Parse for binaries
file_path = file_selector("../../_bin", '*.bin')
file_size = os.path.getsize(file_path)
print("File size: " + str(file_size))
size_uint32 = struct.pack("<I", file_size)
print("File size as uint32: " + ' '.join(map('{:02X}'.format, size_uint32)))
with open(file_path, "rb+") as file:
# Read the first seven ARM vectors. The sixth one will be replaced.
arm_vectors = file.read(4 * 7)
print("ARM vectors: ")
# print(''.join('{:02x}'.format(x) for x in arm_vectors))
for x in range(0, 28, 4):
print(' '.join(map('{:02X}'.format, arm_vectors[x:x + 4])))
file.seek(5*4)
file.write(size_uint32)
file.seek(0)
arm_vectors = file.read(4 * 7)
print("New ARM vector: ")
for x in range(0, 28, 4):
print(' '.join(map('{:02X}'.format, arm_vectors[x:x + 4])))
pass
def file_selector(path: str, pattern: str) -> str:
result = [y for x in os.walk(path) for y in glob(os.path.join(x[0], pattern))]
print("Files found in _bin folder: ")
for idx, path in enumerate(result):
print("Selection " + str(idx) + ": " + str(path))
selection = input("Please enter desired selection [c to cancel]: ")
while True:
if selection == 'c':
return ""
if not selection.isdigit():
selection = input("Invalid input, try again [c to cancel]: ")
if selection.isdigit():
if int(selection) < len(result):
return result[int(selection)]
else:
selection = input("Invalid input, try again [c to cancel]: ")
if __name__ == "__main__":
main()
......@@ -18,16 +18,23 @@ def parse_input_arguments():
arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
arg_parser.add_argument(
'-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), '
'0: GUI Mode, 1:Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Unit Test Mode ', default=0)
'-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), '
'0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0)
arg_parser.add_argument(
'-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, '
'2: QEMU, 3: UDP', default=2)
arg_parser.add_argument(
'-o', '--op_code', help='Operation code, which is passed to the TC '
'packer functions', default=0)
arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
arg_parser.add_argument(
'--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1")
arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
arg_parser.add_argument(
'-l','--listener', help='Determine whether the listener mode will be active after '
'performing the operation',
action='store_false')
arg_parser.add_argument(
'-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.'
' Default: 5 seconds', default=5.0)
......@@ -38,7 +45,7 @@ def parse_input_arguments():
'--np', dest='print_tm', help='Supply --np to suppress print output to console.',
action='store_false')
arg_parser.add_argument(
'-o', '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5)
arg_parser.add_argument(
'-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly',
......
......@@ -6,34 +6,241 @@ This module will be used to upload binaries to the OBC via a communication port,
a supplied binary. The binary will be sent via the specified communication interface.
It will be possible to encode the data (for example using DLE encoding)
"""
import os
import time
import tkinter as tk
from tkinter import filedialog
from collections import deque
from glob import glob
from typing import Deque
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from utility.obsw_file_transfer_helper import FileTransferHelper
import config.obsw_config as g
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.sendreceive.obsw_tm_listener import TmListener
LOGGER = get_logger()
def perform_binary_upload():
print("Please select file to upload")
root = tk.Tk()
root.withdraw()
root.wm_attributes('-topmost', 1)
file_path = filedialog.askopenfilename(parent=root)
print("File select: " + str(file_path))
calc_hamming_code = input("Calculate and send hamming code? [y/n]")
if calc_hamming_code in ['y', 'yes', 1]:
calc_hamming_code = True
print("Hamming code will be calculated and sent in tail packet")
else:
calc_hamming_code = False
print("Hamming code will not be calculated")
# Right now, the size of PUS packets is limited to 1024 bytes. Therefore, we split up the
# binary in 1000 byte packets
frame_length = g.G_MAX_BINARY_FRAME_LENGTH
class BinaryFileUploader:
def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter,
tm_listener: TmListener):
"""
Initializes the binary file uploader with the required components.
@param com_if:
@param tmtc_printer:
@param tm_listener:
"""
self.com_if = com_if
self.tmtc_printer = tmtc_printer
self.tm_listener = tm_listener
self.iobc = False
self.send_interval = 1.0
if calc_hamming_code:
# now we calculate the hamming code
pass
def perform_file_upload(self):
gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ")
if gui_cl_prompt == 0:
gui_cl_prompt = True
else:
gui_cl_prompt = False
print("Please select file to upload: ")
file_path = ""
if gui_cl_prompt:
root = tk.Tk()
root.withdraw()
root.wm_attributes('-topmost', 1)
file_path = filedialog.askopenfilename(parent=root)
print("File select: " + str(file_path))
if file_path == ():
LOGGER.warning("Invalid file path, exiting binary upload mode.")
return
else:
result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))]
print("Files found in _bin folder: ")
for idx, path in enumerate(result):
print("Selection " + str(idx) + ": " + str(path))
select_valid = False
selection = input("Please enter desired selection [c to cancel]: ")
while not select_valid:
if selection == 'c':
print("Exiting binary upload mode..")
return
if not selection.isdigit():
selection = input("Invalid input, try again [c to cancel]: ")
if selection.isdigit():
if int(selection) < len(result):
file_path = result[int(selection)]
select_valid = True
else:
selection = input("Invalid input, try again [c to cancel]: ")
# We have to split the binary here first
print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected."
LOGGER.info(print_string)
calc_hamming_code = input("Calculate and send hamming code? [y/n]: ")
if calc_hamming_code in ['y', 'yes', 1]:
calc_hamming_code = True
print("Hamming code will be calculated and sent in tail packet")
else:
calc_hamming_code = False
print("Hamming code will not be calculated")
iobc_prompt = input("iOBC? [y/n]: ")
if iobc_prompt in ['y', 'yes', 1]:
self.iobc = True
self.send_interval = 0.8
iobc_prompt = True
else:
self.iobc = False
self.send_interval = 0.6
iobc_prompt = False
bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ")
if str(bootloader_prompt) == "0":
bootloader_prompt = True
else:
bootloader_prompt = False
prompt_lock = input("Lock file with last packet? [y/n]: ")
if prompt_lock in ['n', "no", 0]:
prompt_lock = False
else:
prompt_lock = True
if bootloader_prompt:
file_name = "bl.bin"
else:
file_name = "obsw_up.bin"
if iobc_prompt:
if bootloader_prompt:
repository_name = "BIN/IOBC/BL"
else:
repository_name = "BIN/IOBC/OBSW"
else:
if bootloader_prompt:
repository_name = "BIN/AT91/BL"
else:
repository_name = "BIN/AT91/OBSW"
if calc_hamming_code:
pass
# Right now, the size of PUS packets is limited to 1500 bytes which also limits the app
# data length.
frame_length = g.G_MAX_APP_DATA_LENGTH
if calc_hamming_code:
# now we calculate the hamming code
pass
tc_queue = deque()
# Delete existing binary file first, otherwise data might be appended to otherwise
# valid file which already exists.
file_transfer_helper = FileTransferHelper(
tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name,
target_filename=file_name)
init_ssc = 0
self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
# Configure file transfer helper
file_transfer_helper.set_data_from_file(file_path)
file_transfer_helper.set_to_delete_old_file()
if prompt_lock:
file_transfer_helper.set_to_lock_file(prompt_lock)
else:
file_transfer_helper.set_to_lock_file(prompt_lock)
# Generate the packets.
file_transfer_helper.generate_packets(init_ssc)
self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL)
print_string = "BinaryUploader: Detected file size: " + str(
file_transfer_helper.file_size())
LOGGER.info(print_string)
total_num_packets = file_transfer_helper.get_number_of_packets_generated()
print_string = "BinaryUploader: " + str(total_num_packets) + \
" packets generated."
if prompt_lock:
print_string += " File will be locked."
else:
print_string += " File will not be locked."
LOGGER.info(print_string)
reception_deque = deque()
self.__perform_send_algorithm(tc_queue, total_num_packets, reception_deque)
print_string = "BinaryUploader: All binary packets were sent!"
LOGGER.info(print_string)
print_string = str(reception_deque.__len__()) + " replies received."
LOGGER.info(print_string)
time.sleep(15)
reception_deque.extend(self.tm_listener.retrieve_tm_packet_queue())
for tm_list in reception_deque:
for tm_packet in tm_list:
if tm_packet.get_service() == 23 and tm_packet.get_subservice() == 132:
# tmtc_printer.print_telemetry(tm_packet)
pass
self.tm_listener.clear_tm_packet_queue()
LOGGER.info("Transitioning back to listener mode..")
def __perform_send_algorithm(self, tc_queue: Deque, number_of_packets: int, reception_deque:
Deque):
last_check = time.time()
last_sent = time.time()
total_time = self.send_interval * number_of_packets
idx = 1
while tc_queue:
next_send = last_sent + self.send_interval
(tc_packet, tc_info) = tc_queue.pop()
if not isinstance(tc_packet, str):
# print_string = "Sending packet " + str(idx) + ".."
# LOGGER.info(print_string)
idx += 1
self.com_if.send_telecommand(tc_packet, tc_info)
self.tmtc_printer.print_telecommand(tc_packet, tc_info)
elif tc_packet == "print":
LOGGER.info(tc_info)
remaining_time_string = "Remaining time: " + \
str(round(total_time - (idx - 2) * self.send_interval, 2)) + \
" seconds"
print_progress_bar(idx - 2, number_of_packets, print_end="\n",
suffix=remaining_time_string)
# sys.stdout.write("\033[F") # Cursor up one line
packets_received = self.tm_listener.retrieve_tm_packet_queue()
reception_deque.extend(packets_received)
# Every 5 seconds, check whether any reply has been received. If not, cancel operation.
if time.time() - last_check > 5.0 and len(packets_received) == 0:
LOGGER.warning("No replies are being received, cancelling upload operation..")
time_to_sleep = next_send - time.time()
last_sent = next_send
time.sleep(time_to_sleep)
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
# Thank you Greensticks :-)
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
fill='', print_end="\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filled_length = int(length * iteration // total)
bar = fill * filled_length + '-' * (length - filled_length)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)
# Print New Line on Complete
if iteration == total:
print()
from enum import Enum
import math
from config.obsw_config import SD_CARD_HANDLER_ID
from tmtc_core.tc.obsw_pus_tc_base import TcQueueT, PusTelecommand
from tc.obsw_tc_service23_sdcard import \
calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \
generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \
generate_lock_file_srv23_5_6_packet
class FileTransferHelper:
"""
This helper class fills the provided TC queue with appropriate PUS telecommands
to transfer a file.
There are three modes which determine which telecommands will be generated:
1. NORMAL: Generate telecommand to create a new file and append data packets if
the file data is too large. This will be the default mode.
2. DELETE_OLD: Generate telecommand to delete old file and then perform same steps as the
normal mode
3. RENAME_OLD: Rename old file and then perform same steps as in normal mode.
Please note that the setter functions set_data have to be used to assign data, otherwise
an empty file will be created. The mode is set with setter commands as well.
"""
class TransferMode(Enum):
# Normal mode
NORMAL = 1
# Generate a command to delete the old file first
DELETE_OLD = 2
# Generate a command to rename the old file first.
RENAME_OLD = 3
def __init__(self, tc_queue: TcQueueT, max_size_of_app_data: int,
target_repository: str, target_filename: str,
object_id=SD_CARD_HANDLER_ID):
"""
@param tc_queue: TC queue which will be filled
@param max_size_of_app_data: Maximum allowed app data size. Number of generated packets
will depend on this value
@param target_repository: Repository path on target.
@param target_filename: Filename on target
@param object_id:
"""
self.object_id = object_id
self.max_size_of_app_data = max_size_of_app_data
self.allowed_file_data_size = calculate_allowed_file_data_size(
max_size_of_app_data, target_filename, target_repository)
self.target_filename = target_filename
self.target_repository = target_repository
self.tc_queue = tc_queue
self.__transfer_mode = self.TransferMode.NORMAL
self.__max_file_data_size = 0
self.__renamed_name = self.target_filename + "old"
self.__large_file = False
self.__number_of_packets = 0
self.__number_of_append_packets = 0
self.__number_of_create_packets = 1
self.__number_of_delete_packets = 0
self.__number_of_finish_packets = 1
self.__current_ssc = 0
self.__lock_file = True
self.__local_filename = ""
self.__file_data = bytearray()
# This will generate a telecommand to delete the old file, if it exists
self.delete_old_file = False
# This will generater a telecommand to rename the old file, if it exists
self.rename_old_file = False
def set_data_from_file(self, local_filename: str):
with open(local_filename, 'rb') as file:
self.__file_data = file.read()
def set_data_raw(self, tc_data: bytearray):
self.__file_data = tc_data
def set_to_delete_old_file(self):
self.__transfer_mode = self.TransferMode.DELETE_OLD
def set_to_rename_old_file(self, renamed_name: str):
self.__transfer_mode = self.TransferMode.RENAME_OLD
self.__renamed_name = renamed_name
def set_to_lock_file(self, lock_file: bool):
"""
Command will be sent to lock file after succesfull transfer
@param lock_file:
@return:
"""
self.__lock_file = lock_file
def get_number_of_packets_generated(self):
return self.__number_of_packets
def set_max_file_data_size(self, max_file_data_size: int):
"""
If this value is specified and the source file is large (larger than the maximum allowed
app data!), the file data size will be set to this value.
@param max_file_data_size:
@return:
"""
self.__max_file_data_size = max_file_data_size
def file_size(self):
return len(self.__file_data)
def generate_packets(self, ssc: int):
"""
Main function to generate all packets and fill them into the provided deque.
@param ssc:
@return:
"""
self.__current_ssc = ssc
self.__handle_delete_packet_generation()
if self.__transfer_mode == self.TransferMode.RENAME_OLD:
# not implemented yet
pass
self.__handle_create_file_packet_generation()
self.__handle_finish_and_lock_packet_generation()
self.__number_of_packets = \
self.__number_of_create_packets + self.__number_of_append_packets + \
self.__number_of_delete_packets + self.__number_of_finish_packets
def __handle_delete_packet_generation(self):
if self.__transfer_mode == self.TransferMode.DELETE_OLD:
command = generate_rm_file_srv23_2_packet(
filename=self.target_filename, repository_path=self.target_repository,
ssc=self.__current_ssc, object_id=self.object_id)
self.__number_of_delete_packets = 1
self.__current_ssc += 1
self.tc_queue.appendleft(command.pack_command_tuple())
def __handle_create_file_packet_generation(self):
if len(self.__file_data) > self.allowed_file_data_size:
# Large file, create file with init_data
if self.__max_file_data_size > 0:
init_data = self.__file_data[0:self.__max_file_data_size]
else:
init_data = self.__file_data[0:self.allowed_file_data_size]
self.__large_file = True
else:
init_data = self.__file_data
# Create file.
command = generate_create_file_srv23_1_packet(
self.target_filename, self.target_repository, ssc=self.__current_ssc,
max_size_of_app_data=self.max_size_of_app_data, initial_data=init_data)
self.__current_ssc += 1
self.tc_queue.appendleft(command.pack_command_tuple())
if not self.__large_file:
return
rest_of_data = self.__file_data[self.allowed_file_data_size:]
# Generate the rest of the packets to write to large file
if self.__max_file_data_size > 0:
self.__generate_append_to_file_packets_automatically(
data=rest_of_data, target_repository=self.target_repository,
target_filename=self.target_filename, size_of_data_blocks=self.__max_file_data_size,
init_ssc=self.__current_ssc)
else:
self.__generate_append_to_file_packets_automatically(
data=rest_of_data, target_repository=self.target_repository,
target_filename=self.target_filename, size_of_data_blocks=self.max_size_of_app_data,
init_ssc=self.__current_ssc)
self.__current_ssc += 1
def __generate_append_to_file_packets_automatically(
self, data: bytearray, target_repository: str, target_filename: str,
size_of_data_blocks: int, init_ssc: int):
"""
This function generates PUS packets which is used to write data in a file.
A new file will be created if not already existing. If the file already exists, this might
lead to
If the file data is larger than the maximum allowed size of application data, this function
will split the data into multiple packets and increment the initial SSC number by one for
each packet.
@param data: Data which will be split up.
@param init_ssc: First SSC, which will be incremented for each packet.
"""
header = bytearray(self.object_id)
header += target_repository.encode('utf-8')
# Add string terminator of repository path
header.append(0)
header += target_filename.encode('utf-8')
# Add string terminator of filename
header.append(0)
self.__split_large_file(header, size_of_data_blocks, data, init_ssc)
def __split_large_file(self, header: bytearray, size_of_data_blocks: int,
data: bytearray, init_ssc: int):
"""
This function splits a large file in multiple packets and packs the generated packets
into the member deque. This is necessary because the packet size is limited.
@param header: Repository and file name which will always stay the same
@param size_of_data_blocks: The file data blocks will have this size
@param data: The data to pack in multiple packets
@param init_ssc: The ssc of the first command, will be incremented by one for each packet.
"""
number_of_packets = math.floor(len(data) / size_of_data_blocks)
packet_sequence_number = 0
for i in range(number_of_packets):
header.append(packet_sequence_number >> 8)
header.append(0xFF & packet_sequence_number)
header += data[i * size_of_data_blocks:(i + 1) * size_of_data_blocks]
commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + i,
app_data=header)
self.tc_queue.appendleft(commands.pack_command_tuple())
# Remove everything except the header
header = header[:len(header) - size_of_data_blocks - 2]
packet_sequence_number = packet_sequence_number + 1
# Last packet will be subservice 131 to finish the append operation
header.append(packet_sequence_number >> 8)
header.append(0xFF & packet_sequence_number)
self.__number_of_append_packets += number_of_packets
header += data[number_of_packets * size_of_data_blocks:len(data)]
commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number,
app_data=header)
self.tc_queue.appendleft(commands.pack_command_tuple())
def __handle_finish_and_lock_packet_generation(self):
if self.__large_file:
last_command = generate_finish_append_to_file_srv23_131_packet(
filename=self.target_filename, repository_path=self.target_repository,
ssc=self.__current_ssc, lock_file=self.__lock_file)
else:
if self.__lock_file:
last_command = generate_lock_file_srv23_5_6_packet(
filename=self.target_filename, repository_path=self.target_repository,
object_id=self.object_id, lock=True, ssc=self.__current_ssc)
else:
self.__number_of_finish_packets = 0
return
self.tc_queue.appendleft(last_command.pack_command_tuple())
\ No newline at end of file