Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
# -*- coding: utf-8 -*-
"""
Program: obsw_tm_service_5.py
Date: 30.12.2019
Description: Deserialize PUS Event Report
Author: R. Mueller
"""
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoT
import struct
class Service5TM(PusTelemetry):
def __init__(self, byte_array):
super().__init__(byte_array)
self.specify_packet_info("Event")
if self.get_subservice() == 1:
self.append_packet_info(" Info")
elif self.get_subservice() == 2:
self.append_packet_info(" Error Low Severity")
elif self.get_subservice() == 3:
self.append_packet_info(" Error Med Severity")
elif self.get_subservice() == 4:
self.append_packet_info(" Error High Severity")
self.eventId = struct.unpack('>H', self._tm_data[0:2])[0]
self.objectId = struct.unpack('>I', self._tm_data[2:6])[0]
self.param1 = struct.unpack('>I', self._tm_data[6:10])[0]
self.param2 = struct.unpack('>I', self._tm_data[10:14])[0]
def append_telemetry_content(self, array):
super().append_telemetry_content(array)
array.append(str(self.eventId))
array.append(hex(self.objectId))
array.append(str(hex(self.param1)) + ", " + str(self.param1))
array.append(str(hex(self.param2)) + ", " + str(self.param2))
def append_telemetry_column_headers(self, array):
super().append_telemetry_column_headers(array)
array.append("Event ID")
array.append("Reporter ID")
array.append("Parameter 1")
array.append("Parameter 2")
def pack_tm_information(self) -> PusTmInfoT:
tm_information = super().pack_tm_information()
add_information = {
TmDictionaryKeys.REPORTER_ID: self.objectId,
TmDictionaryKeys.EVENT_ID: self.eventId,
TmDictionaryKeys.EVENT_PARAM_1: self.param1,
TmDictionaryKeys.EVENT_PARAM_2: self.param2
}
tm_information.update(add_information)
return tm_information
#!/usr/bin/python3 #!/usr/bin/python3
""" """
@brief This client was developed by KSat for the SOURCE project to test the on-board software. @brief TMTC Commander entry point for command line mode.
@details @details
This client features multiple sender/receiver modes and has been designed This client was developed by KSat for the SOURCE project to test the on-board software but
to be extensible and easy to use. This clien is based on the PUS standard for the format has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
of telecommands and telemetry. It can also send TMTC via different interfaces like the handling and testing via different communication interfaces. Currently, only the PUS standard is
serial interface (USB port) or ethernet interface. implemented as a packet standard.
Run this file with the -h flag to display options.
@license @license
Copyright 2020 KSat e.V. Stuttgart Copyright 2020 KSat e.V. Stuttgart
...@@ -22,13 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -22,13 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
@manual @author R. Mueller
Run this file with the -h flag to display options.
""" """
from core.tmtc_client_core import run_tmtc_client from core.tmtc_client_core import run_tmtc_client
def main(): def main():
run_tmtc_client(False) run_tmtc_client(False)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
#!/usr/bin/python3 #!/usr/bin/python3
""" """
@brief This client was developed by KSat for the SOURCE project to test the on-board software. @brief TMTC Commander entry point for GUI mode.
@details @details
This client features multiple sender/receiver modes and has been designed This client was developed by KSat for the SOURCE project to test the on-board software but
to be extensible and easy to use. This clien is based on the PUS standard for the format has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
of telecommands and telemetry. It can also send TMTC via different interfaces like the handling and testing via different communication interfaces. Currently, only the PUS standard is
serial interface (USB port) or ethernet interface. implemented as a packet standard.
Run this file with the -h flag to display options.
@license @license
Copyright 2020 KSat e.V. Stuttgart Copyright 2020 KSat e.V. Stuttgart
...@@ -22,12 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -22,12 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
@manual @author R. Mueller
""" """
from core.tmtc_client_core import run_tmtc_client from core.tmtc_client_core import run_tmtc_client
def main(): def main():
run_tmtc_client(True) run_tmtc_client(True)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
\ No newline at end of file
Subproject commit 1e06be166b0bf47a2a921fd45eed49ea9f5782c3 Subproject commit 3749440b9f1051b9d022f00012f1da417759fe32
#!/usr/bin/python3.8 #!/usr/bin/python3.8
""" """
Argument parser module. Argument parser module.
""" """
import argparse import argparse
import sys import sys
from tmtc_core.utility.obsw_logger import get_logger from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger() LOGGER = get_logger()
def parse_input_arguments(): def parse_input_arguments():
""" """
Parses all input arguments Parses all input arguments
:return: Input arguments contained in a special namespace and accessable by args.<variable> :return: Input arguments contained in a special namespace and accessable by args.<variable>
""" """
arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
arg_parser.add_argument( arg_parser.add_argument(
'-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), ' '-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), '
'0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' '1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0) '4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0)
arg_parser.add_argument( arg_parser.add_argument(
'-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, ' '-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, '
'2: QEMU, 3: UDP', default=2) '2: QEMU, 3: UDP', default=2)
arg_parser.add_argument( arg_parser.add_argument(
'-o', '--op_code', help='Operation code, which is passed to the TC ' '-o', '--op_code', help='Operation code, which is passed to the TC '
'packer functions', default=0) 'packer functions', default=0)
arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
arg_parser.add_argument( arg_parser.add_argument(
'--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1") '--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1")
arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
arg_parser.add_argument( arg_parser.add_argument(
'-l','--listener', help='Determine whether the listener mode will be active after ' '-l', '--listener', help='Determine whether the listener mode will be active '
'performing the operation', 'after performing the operation',
action='store_false') action='store_false')
arg_parser.add_argument( arg_parser.add_argument(
'-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.' '-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.'
' Default: 5 seconds', default=5.0) ' Default: 5 seconds', default=5.0)
arg_parser.add_argument( arg_parser.add_argument(
'--nl', dest='print_log', help='Supply --nl to suppress print output to log files.', '--nl', dest='print_log', help='Supply --nl to suppress print output to log files.',
action='store_false') action='store_false')
arg_parser.add_argument( arg_parser.add_argument(
'--np', dest='print_tm', help='Supply --np to suppress print output to console.', '--np', dest='print_tm', help='Supply --np to suppress print output to console.',
action='store_false') action='store_false')
arg_parser.add_argument( arg_parser.add_argument(
'--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5)
arg_parser.add_argument( arg_parser.add_argument(
'-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly', '-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly',
action='store_true') action='store_true')
arg_parser.add_argument( arg_parser.add_argument(
'-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
arg_parser.add_argument( arg_parser.add_argument(
'--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true')
arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') arg_parser.add_argument(
arg_parser.add_argument( '--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout',
'--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout', action='store_true')
action='store_true')
if len(sys.argv) == 1:
if len(sys.argv) == 1: print("No Input Arguments specified.")
print("No Input Arguments specified.") arg_parser.print_help()
arg_parser.print_help() args, unknown = arg_parser.parse_known_args()
args, unknown = arg_parser.parse_known_args() # for argument in vars(args):
# for argument in vars(args): # LOGGER.debug(argument + ": " + str(getattr(args, argument)))
# LOGGER.debug(argument + ": " + str(getattr(args, argument))) handle_args(args, unknown)
handle_args(args, unknown)
return args return args
def handle_args(args, unknown: list) -> None: def handle_args(args, unknown: list) -> None:
""" """
Handles the parsed arguments. Handles the parsed arguments.
:param args: Namespace objects :param args: Namespace objects
(see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace)
:param unknown: List of unknown parameters. :param unknown: List of unknown parameters.
:return: None :return: None
""" """
if len(unknown) > 0: if len(unknown) > 0:
print("Unknown arguments detected: " + str(unknown)) print("Unknown arguments detected: " + str(unknown))
if len(sys.argv) > 1: if len(sys.argv) > 1:
handle_unspecified_args(args) handle_unspecified_args(args)
if len(sys.argv) == 1: if len(sys.argv) == 1:
handle_empty_args(args) handle_empty_args(args)
def handle_unspecified_args(args) -> None: def handle_unspecified_args(args) -> None:
""" """
If some arguments are unspecified, they are set here with (variable) default values. If some arguments are unspecified, they are set here with (variable) default values.
:param args: :param args:
:return: None :return: None
""" """
if args.com_if == 1 and args.tm_timeout is None: if args.com_if == 1 and args.tm_timeout is None:
args.tm_timeout = 6.0 args.tm_timeout = 6.0
if args.mode is None: if args.mode is None:
print("No mode specified with -m Parameter.") print("No mode specified with -m Parameter.")
print("Possible Modes: ") print("Possible Modes: ")
print("1: Listener Mode") print("1: Listener Mode")
print("2: Single Command Mode with manual command") print("2: Single Command Mode with manual command")
print("3: Service Mode, Commands specified in tc folder") print("3: Service Mode, Commands specified in pus_tc folder")
print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py") print("4: Software Mode, runs all command specified in tmtcc_pus_tc_packer.py")
print("5: Unit Test, runs unit test specified in obsw_module_test.py") print("5: Unit Test, runs unit test specified in obsw_module_test.py")
args.mode = input("Please enter Mode: ") args.mode = input("Please enter Mode: ")
if args.mode == 1 and args.service is None: if args.mode == 1 and args.service is None:
args.service = input("No Service specified for Service Mode. " args.service = input("No Service specified for Service Mode. "
"Please enter PUS G_SERVICE number: ") "Please enter PUS G_SERVICE number: ")
def handle_empty_args(args) -> None: def handle_empty_args(args) -> None:
""" """
If no args were supplied, request input from user directly. If no args were supplied, request input from user directly.
TODO: This still needs to be extended. TODO: This still needs to be extended.
:param args: :param args:
:return: :return:
""" """
print_hk = input("Print HK packets ? (y/n or yes/no)") print_hk = input("Print HK packets ? (y/n or yes/no)")
try: try:
print_hk = print_hk.lower() print_hk = print_hk.lower()
except TypeError: except TypeError:
pass pass
if print_hk in ('y', 'yes', 1): if print_hk in ('y', 'yes', 1):
args.print_hk = True args.print_hk = True
else: else:
args.print_hk = False args.print_hk = False
print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)")
try: try:
print_to_log = print_to_log.lower() print_to_log = print_to_log.lower()
except TypeError: except TypeError:
pass pass
if print_to_log in ('n', 'no', 0): if print_to_log in ('n', 'no', 0):
args.printFile = False args.printFile = False
else: else:
args.printFile = True args.printFile = True
from enum import Enum from enum import Enum
import math import math
from config.obsw_config import SD_CARD_HANDLER_ID from config.tmtcc_config import SD_CARD_HANDLER_ID
from tmtc_core.tc.obsw_pus_tc_base import TcQueueT, PusTelecommand from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand
from tc.obsw_tc_service23_sdcard import \ from pus_tc.tmtcc_tc_service23_sdcard import \
calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \ calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \
generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \ generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \
generate_lock_file_srv23_5_6_packet generate_lock_file_srv23_5_6_packet
...@@ -223,6 +223,7 @@ class FileTransferHelper: ...@@ -223,6 +223,7 @@ class FileTransferHelper:
header += data[number_of_packets * size_of_data_blocks:len(data)] header += data[number_of_packets * size_of_data_blocks:len(data)]
commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number, commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number,
app_data=header) app_data=header)
self.__current_ssc = init_ssc + packet_sequence_number
self.tc_queue.appendleft(commands.pack_command_tuple()) self.tc_queue.appendleft(commands.pack_command_tuple())
def __handle_finish_and_lock_packet_generation(self): def __handle_finish_and_lock_packet_generation(self):
...@@ -230,12 +231,14 @@ class FileTransferHelper: ...@@ -230,12 +231,14 @@ class FileTransferHelper:
last_command = generate_finish_append_to_file_srv23_131_packet( last_command = generate_finish_append_to_file_srv23_131_packet(
filename=self.target_filename, repository_path=self.target_repository, filename=self.target_filename, repository_path=self.target_repository,
ssc=self.__current_ssc, lock_file=self.__lock_file) ssc=self.__current_ssc, lock_file=self.__lock_file)
self.__current_ssc += 1
else: else:
if self.__lock_file: if self.__lock_file:
last_command = generate_lock_file_srv23_5_6_packet( last_command = generate_lock_file_srv23_5_6_packet(
filename=self.target_filename, repository_path=self.target_repository, filename=self.target_filename, repository_path=self.target_repository,
object_id=self.object_id, lock=True, ssc=self.__current_ssc) object_id=self.object_id, lock=True, ssc=self.__current_ssc)
self.__current_ssc += 1
else: else:
self.__number_of_finish_packets = 0 self.__number_of_finish_packets = 0
return return
self.tc_queue.appendleft(last_command.pack_command_tuple()) self.tc_queue.appendleft(last_command.pack_command_tuple())
\ No newline at end of file