Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
# -*- coding: utf-8 -*-
"""
Program: obsw_tm_service_5.py
Date: 30.12.2019
Description: Deserialize PUS Event Report
Author: R. Mueller
"""
from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys
from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoT
import struct
class Service5TM(PusTelemetry):
def __init__(self, byte_array):
super().__init__(byte_array)
self.specify_packet_info("Event")
if self.get_subservice() == 1:
self.append_packet_info(" Info")
elif self.get_subservice() == 2:
self.append_packet_info(" Error Low Severity")
elif self.get_subservice() == 3:
self.append_packet_info(" Error Med Severity")
elif self.get_subservice() == 4:
self.append_packet_info(" Error High Severity")
self.eventId = struct.unpack('>H', self._tm_data[0:2])[0]
self.objectId = struct.unpack('>I', self._tm_data[2:6])[0]
self.param1 = struct.unpack('>I', self._tm_data[6:10])[0]
self.param2 = struct.unpack('>I', self._tm_data[10:14])[0]
def append_telemetry_content(self, array):
super().append_telemetry_content(array)
array.append(str(self.eventId))
array.append(hex(self.objectId))
array.append(str(hex(self.param1)) + ", " + str(self.param1))
array.append(str(hex(self.param2)) + ", " + str(self.param2))
def append_telemetry_column_headers(self, array):
super().append_telemetry_column_headers(array)
array.append("Event ID")
array.append("Reporter ID")
array.append("Parameter 1")
array.append("Parameter 2")
def pack_tm_information(self) -> PusTmInfoT:
tm_information = super().pack_tm_information()
add_information = {
TmDictionaryKeys.REPORTER_ID: self.objectId,
TmDictionaryKeys.EVENT_ID: self.eventId,
TmDictionaryKeys.EVENT_PARAM_1: self.param1,
TmDictionaryKeys.EVENT_PARAM_2: self.param2
}
tm_information.update(add_information)
return tm_information
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@brief TMTC Commander entry point for command line mode.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
This client was developed by KSat for the SOURCE project to test the on-board software but
has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
handling and testing via different communication interfaces. Currently, only the PUS standard is
implemented as a packet standard.
Run this file with the -h flag to display options.
@license
Copyright 2020 KSat e.V. Stuttgart
......@@ -22,13 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
@author R. Mueller
"""
from core.tmtc_client_core import run_tmtc_client
def main():
run_tmtc_client(False)
if __name__ == "__main__":
main()
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@brief TMTC Commander entry point for GUI mode.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
This client was developed by KSat for the SOURCE project to test the on-board software but
has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
handling and testing via different communication interfaces. Currently, only the PUS standard is
implemented as a packet standard.
Run this file with the -h flag to display options.
@license
Copyright 2020 KSat e.V. Stuttgart
......@@ -22,12 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
@author R. Mueller
"""
from core.tmtc_client_core import run_tmtc_client
def main():
run_tmtc_client(True)
if __name__ == "__main__":
main()
\ No newline at end of file
main()
Subproject commit 1e06be166b0bf47a2a921fd45eed49ea9f5782c3
Subproject commit 3749440b9f1051b9d022f00012f1da417759fe32
#!/usr/bin/python3.8
"""
Argument parser module.
"""
import argparse
import sys
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
def parse_input_arguments():
"""
Parses all input arguments
:return: Input arguments contained in a special namespace and accessable by args.<variable>
"""
arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
arg_parser.add_argument(
'-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), '
'0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0)
arg_parser.add_argument(
'-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, '
'2: QEMU, 3: UDP', default=2)
arg_parser.add_argument(
'-o', '--op_code', help='Operation code, which is passed to the TC '
'packer functions', default=0)
arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
arg_parser.add_argument(
'--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1")
arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
arg_parser.add_argument(
'-l','--listener', help='Determine whether the listener mode will be active after '
'performing the operation',
action='store_false')
arg_parser.add_argument(
'-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.'
' Default: 5 seconds', default=5.0)
arg_parser.add_argument(
'--nl', dest='print_log', help='Supply --nl to suppress print output to log files.',
action='store_false')
arg_parser.add_argument(
'--np', dest='print_tm', help='Supply --np to suppress print output to console.',
action='store_false')
arg_parser.add_argument(
'--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5)
arg_parser.add_argument(
'-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly',
action='store_true')
arg_parser.add_argument(
'-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
arg_parser.add_argument(
'--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true')
arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication')
arg_parser.add_argument(
'--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout',
action='store_true')
if len(sys.argv) == 1:
print("No Input Arguments specified.")
arg_parser.print_help()
args, unknown = arg_parser.parse_known_args()
# for argument in vars(args):
# LOGGER.debug(argument + ": " + str(getattr(args, argument)))
handle_args(args, unknown)
return args
def handle_args(args, unknown: list) -> None:
"""
Handles the parsed arguments.
:param args: Namespace objects
(see https://docs.python.org/dev/library/argparse.html#argparse.Namespace)
:param unknown: List of unknown parameters.
:return: None
"""
if len(unknown) > 0:
print("Unknown arguments detected: " + str(unknown))
if len(sys.argv) > 1:
handle_unspecified_args(args)
if len(sys.argv) == 1:
handle_empty_args(args)
def handle_unspecified_args(args) -> None:
"""
If some arguments are unspecified, they are set here with (variable) default values.
:param args:
:return: None
"""
if args.com_if == 1 and args.tm_timeout is None:
args.tm_timeout = 6.0
if args.mode is None:
print("No mode specified with -m Parameter.")
print("Possible Modes: ")
print("1: Listener Mode")
print("2: Single Command Mode with manual command")
print("3: Service Mode, Commands specified in tc folder")
print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py")
print("5: Unit Test, runs unit test specified in obsw_module_test.py")
args.mode = input("Please enter Mode: ")
if args.mode == 1 and args.service is None:
args.service = input("No Service specified for Service Mode. "
"Please enter PUS G_SERVICE number: ")
def handle_empty_args(args) -> None:
"""
If no args were supplied, request input from user directly.
TODO: This still needs to be extended.
:param args:
:return:
"""
print_hk = input("Print HK packets ? (y/n or yes/no)")
try:
print_hk = print_hk.lower()
except TypeError:
pass
if print_hk in ('y', 'yes', 1):
args.print_hk = True
else:
args.print_hk = False
print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)")
try:
print_to_log = print_to_log.lower()
except TypeError:
pass
if print_to_log in ('n', 'no', 0):
args.printFile = False
else:
args.printFile = True
#!/usr/bin/python3.8
"""
Argument parser module.
"""
import argparse
import sys
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
def parse_input_arguments():
"""
Parses all input arguments
:return: Input arguments contained in a special namespace and accessable by args.<variable>
"""
arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
arg_parser.add_argument(
'-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), '
'1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0)
arg_parser.add_argument(
'-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, '
'2: QEMU, 3: UDP', default=2)
arg_parser.add_argument(
'-o', '--op_code', help='Operation code, which is passed to the TC '
'packer functions', default=0)
arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
arg_parser.add_argument(
'--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1")
arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
arg_parser.add_argument(
'-l', '--listener', help='Determine whether the listener mode will be active '
'after performing the operation',
action='store_false')
arg_parser.add_argument(
'-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.'
' Default: 5 seconds', default=5.0)
arg_parser.add_argument(
'--nl', dest='print_log', help='Supply --nl to suppress print output to log files.',
action='store_false')
arg_parser.add_argument(
'--np', dest='print_tm', help='Supply --np to suppress print output to console.',
action='store_false')
arg_parser.add_argument(
'--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with '
'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5)
arg_parser.add_argument(
'-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly',
action='store_true')
arg_parser.add_argument(
'-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
arg_parser.add_argument(
'--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true')
arg_parser.add_argument(
'--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout',
action='store_true')
if len(sys.argv) == 1:
print("No Input Arguments specified.")
arg_parser.print_help()
args, unknown = arg_parser.parse_known_args()
# for argument in vars(args):
# LOGGER.debug(argument + ": " + str(getattr(args, argument)))
handle_args(args, unknown)
return args
def handle_args(args, unknown: list) -> None:
"""
Handles the parsed arguments.
:param args: Namespace objects
(see https://docs.python.org/dev/library/argparse.html#argparse.Namespace)
:param unknown: List of unknown parameters.
:return: None
"""
if len(unknown) > 0:
print("Unknown arguments detected: " + str(unknown))
if len(sys.argv) > 1:
handle_unspecified_args(args)
if len(sys.argv) == 1:
handle_empty_args(args)
def handle_unspecified_args(args) -> None:
"""
If some arguments are unspecified, they are set here with (variable) default values.
:param args:
:return: None
"""
if args.com_if == 1 and args.tm_timeout is None:
args.tm_timeout = 6.0
if args.mode is None:
print("No mode specified with -m Parameter.")
print("Possible Modes: ")
print("1: Listener Mode")
print("2: Single Command Mode with manual command")
print("3: Service Mode, Commands specified in pus_tc folder")
print("4: Software Mode, runs all command specified in tmtcc_pus_tc_packer.py")
print("5: Unit Test, runs unit test specified in obsw_module_test.py")
args.mode = input("Please enter Mode: ")
if args.mode == 1 and args.service is None:
args.service = input("No Service specified for Service Mode. "
"Please enter PUS G_SERVICE number: ")
def handle_empty_args(args) -> None:
"""
If no args were supplied, request input from user directly.
TODO: This still needs to be extended.
:param args:
:return:
"""
print_hk = input("Print HK packets ? (y/n or yes/no)")
try:
print_hk = print_hk.lower()
except TypeError:
pass
if print_hk in ('y', 'yes', 1):
args.print_hk = True
else:
args.print_hk = False
print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)")
try:
print_to_log = print_to_log.lower()
except TypeError:
pass
if print_to_log in ('n', 'no', 0):
args.printFile = False
else:
args.printFile = True
from enum import Enum
import math
from config.obsw_config import SD_CARD_HANDLER_ID
from tmtc_core.tc.obsw_pus_tc_base import TcQueueT, PusTelecommand
from tc.obsw_tc_service23_sdcard import \
from config.tmtcc_config import SD_CARD_HANDLER_ID
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand
from pus_tc.tmtcc_tc_service23_sdcard import \
calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \
generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \
generate_lock_file_srv23_5_6_packet
......@@ -223,6 +223,7 @@ class FileTransferHelper:
header += data[number_of_packets * size_of_data_blocks:len(data)]
commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number,
app_data=header)
self.__current_ssc = init_ssc + packet_sequence_number
self.tc_queue.appendleft(commands.pack_command_tuple())
def __handle_finish_and_lock_packet_generation(self):
......@@ -230,12 +231,14 @@ class FileTransferHelper:
last_command = generate_finish_append_to_file_srv23_131_packet(
filename=self.target_filename, repository_path=self.target_repository,
ssc=self.__current_ssc, lock_file=self.__lock_file)
self.__current_ssc += 1
else:
if self.__lock_file:
last_command = generate_lock_file_srv23_5_6_packet(
filename=self.target_filename, repository_path=self.target_repository,
object_id=self.object_id, lock=True, ssc=self.__current_ssc)
self.__current_ssc += 1
else:
self.__number_of_finish_packets = 0
return
self.tc_queue.appendleft(last_command.pack_command_tuple())
\ No newline at end of file
self.tc_queue.appendleft(last_command.pack_command_tuple())