diff --git a/comIF/obsw_com_config.py b/comIF/obsw_com_config.py index 4e50bf104b231f8f25418d75eb076d3bbb213bc7..c50524a8997cb085db1dede620f21e535ca5b775 100644 --- a/comIF/obsw_com_config.py +++ b/comIF/obsw_com_config.py @@ -30,16 +30,15 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> CommunicationInter receive_address=g.G_REC_ADDRESS) elif g.G_COM_IF == g.ComIF.Serial: com_port = g.G_COM_PORT - baud_rate = 250000 - g.G_SERIAL_TIMEOUT = 1 + serial_baudrate = g.G_SERIAL_BAUDRATE + serial_timeout = g.G_SERIAL_TIMEOUT communication_interface = SerialComIF( - tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, - serial_timeout=g.G_SERIAL_TIMEOUT) + tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=serial_baudrate, + serial_timeout=serial_timeout) elif g.G_COM_IF == g.ComIF.QEMU: communication_interface = QEMUComIF( tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR) - else: communication_interface = DummyComIF(tmtc_printer=tmtc_printer) return communication_interface diff --git a/config/obsw_config.py b/config/obsw_config.py index 8a675a103b028d149362f4f1dde627c10498ea24..3ca8f26d2a4c51b0d8406269353c137c8eacb439 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -75,9 +75,10 @@ G_DISPLAY_MODE = "long" G_COM_IF = 2 # COM Port for serial communication G_COM_PORT = 'COM0' -G_SERIAL_TIMEOUT = 0.01 +G_SERIAL_TIMEOUT = 1 +G_SERIAL_BAUDRATE = 250000 # Time related -G_TM_TIMEOUT = 10 +G_TM_TIMEOUT = 6 G_TC_SEND_TIMEOUT_FACTOR = 2.0 # Ethernet connection settings diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index 91970e3f7c8b1579a549d07a6fcad8f0e4d76d70..5b0c9dd260ce4d46c53ced1a06f58c22ee5092cd 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -15,7 +15,7 @@ from utility.obsw_tmtc_printer import TmTcPrinter from utility.obsw_logger import get_logger from tc.obsw_pus_tc_base import TcQueueT -logger = get_logger() +LOGGER = get_logger() class SequentialCommandSenderReceiver(CommandSenderReceiver): @@ -50,7 +50,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): try: self.__handle_tc_sending() except (KeyboardInterrupt, SystemExit): - logger.info("Closing TMTC Client") + LOGGER.info("Closing TMTC Client") sys.exit() def __handle_tc_sending(self): @@ -63,9 +63,9 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): if not self.__mode_op_finished: self._tm_listener.event_mode_op_finished.set() self.__mode_op_finished = True - logger.info("SequentialSenderReceiver: All replies received!") + LOGGER.info("SequentialSenderReceiver: All replies received!") if g.G_PRINT_TO_FILE: - logger.info("SequentialSenderReceiver: Exporting output to log file.") + LOGGER.info("SequentialSenderReceiver: Exporting output to log file.") self._tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True) def __perform_next_tc_send(self): diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py index 9f114fe84a0df33bb7052f1915f3abda609e33e3..b8966b42fd158313720fef01ea4828b2d76f779b 100644 --- a/tc/obsw_pus_tc_base.py +++ b/tc/obsw_pus_tc_base.py @@ -1,13 +1,16 @@ -import sys -import crcmod -import logging +""" +@brief This module contains the PUS telemetry class representation to + deserialize raw PUS telemetry packets. +""" from enum import Enum from typing import Dict, Union, Tuple, Deque - +import crcmod from utility.obsw_logger import get_logger -logger = get_logger() + +LOGGER = get_logger() class TcDictionaryKeys(Enum): + """ Keys for telecommand dictionary """ SERVICE = 1 SUBSERVICE = 2 SSC = 3 @@ -23,14 +26,18 @@ TcQueueT = Deque[TcQueueEntryT] PusTcInfoQueueT = Deque[PusTcInfoT] +# pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-arguments class PusTelecommand: - HEADER_SIZE = 6 """ - Class representation of a PUS telecommand. It can be used to pack a raw telecommand from - input parameters. The structure of a PUS telecommand is specified in ECSS-E-70-41A on p.42 + @brief Class representation of a PUS telecommand. + @details + It can be used to pack a raw telecommand from + input parameters. The structure of a PUS telecommand is specified in ECSS-E-70-41A on p.42 and is also shown below (bottom) """ - def __init__(self, service: int, subservice: int, ssc=0, app_data: bytearray=bytearray([]), + HEADER_SIZE = 6 + def __init__(self, service: int, subservice: int, ssc=0, app_data: bytearray = bytearray([]), source_id: int = 0, version: int = 0, apid: int = 0x73): """ Initiates a telecommand with the given parameters. @@ -75,7 +82,7 @@ class PusTelecommand: data_length = 4 + len(self.app_data) + 1 return data_length except TypeError: - logger.error("PusTelecommand: Invalid type of application data!") + LOGGER.error("PusTelecommand: Invalid type of application data!") return 0 def get_total_length(self): @@ -123,10 +130,12 @@ class PusTelecommand: return tc_information def pack_command_tuple(self) -> PusTcTupleT: + """ Pack a tuple consisting of the raw packet and an information dictionary """ command_tuple = (self.pack(), self.pack_information()) return command_tuple def print(self): + """ Print the raw command in a clean format. """ packet = self.pack() print("Command in Hexadecimal: [", end="") for counter in range(len(packet)): diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index 9dcd2c0374e6dfa390947e18ed9c8063c806490b..274263651369269a80f26f739d31711caac99a46 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -17,7 +17,7 @@ from tc.obsw_pus_tc_base import PusTelecommand, TcQueueT from tc.obsw_tc_service2 import pack_service2_test_into from tc.obsw_tc_service3 import pack_service3_test_into from tc.obsw_tc_service8 import pack_service8_test_into -from tc.obsw_tc_service200 import packModeData, pack_service200_test_into +from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into from utility.obsw_logger import get_logger import config.obsw_config as g from datetime import datetime @@ -76,7 +76,6 @@ def pack_service5_test_into(tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft(("print", "\r\nTesting Service 5: Trigger another event")) command = PusTelecommand(service=17, subservice=128, ssc=530) tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft(("print", "\r")) tc_queue.appendleft(("export", "log/tmtc_log_service5.txt")) return tc_queue @@ -103,7 +102,6 @@ def pack_service9_test_into(tc_queue: TcQueueT) -> TcQueueT: command = PusTelecommand(service=9, subservice=128, ssc=920, app_data=bytearray(time_test3)) tc_queue.appendleft(command.pack_command_tuple()) # TODO: Add other time formats here - tc_queue.appendleft(("print", "\r")) tc_queue.appendleft(("export", "log/tmtc_log_service9.txt")) return tc_queue @@ -132,7 +130,7 @@ def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: object_id = g.DUMMY_DEVICE_ID # Set On Mode tc_queue.appendleft(("print", "Testing Service Dummy: Set On")) - mode_data = packModeData(object_id, 1, 0) + mode_data = pack_mode_data(object_id, 1, 0) command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Test Service 2 commands @@ -141,7 +139,6 @@ def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: # Test Service 8 tc_queue.appendleft(("print", "Testing Service Dummy: Service 8")) pack_service8_test_into(tc_queue, True) - tc_queue.appendleft(("print", "\r")) tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) return tc_queue @@ -156,12 +153,12 @@ def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft(("print", "Testing " + gps_string + " Device")) # Set Mode Off tc_queue.appendleft(("print", "Testing " + gps_string + ": Set Off")) - mode_data = packModeData(object_id, 0, 0) + mode_data = pack_mode_data(object_id, 0, 0) command = PusTelecommand(service=200, subservice=1, ssc=11, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Set Mode On tc_queue.appendleft(("print", "Testing " + gps_string + ": Set On")) - mode_data = packModeData(object_id, 1, 0) + mode_data = pack_mode_data(object_id, 1, 0) command = PusTelecommand(service=200, subservice=1, ssc=12, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Enable HK report @@ -184,7 +181,7 @@ def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: tc_queue.appendleft(command.pack_command_tuple()) # Set Mode Off tc_queue.appendleft(("print", "Testing " + gps_string + ": Set Off")) - mode_data = packModeData(object_id, 0, 0) + mode_data = pack_mode_data(object_id, 0, 0) command = PusTelecommand(service=200, subservice=1, ssc=13, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) tc_queue.appendleft(("print", "\r")) diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py index 75bceaa4225a49a7ff2076d9da9a24846fd28931..5e04926afdcea39008c71158e373ef10dc75c326 100644 --- a/tc/obsw_tc_service2.py +++ b/tc/obsw_tc_service2.py @@ -10,7 +10,7 @@ import struct import config.obsw_config as g from tc.obsw_pus_tc_base import PusTelecommand, Deque -from tc.obsw_tc_service200 import packModeData +from tc.obsw_tc_service200 import pack_mode_data def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: @@ -20,7 +20,7 @@ def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> # don't forget to set object mode raw (G_SERVICE 200) before calling this ! # Set Raw Mode tc_queue.appendleft(("print", "Testing Service 2: Setting Raw Mode")) - mode_data = packModeData(object_id, 3, 0) + mode_data = pack_mode_data(object_id, 3, 0) command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # toggle wiretapping raw @@ -45,7 +45,6 @@ def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> tc_queue.appendleft(("print", "Testing Service 2: Send second raw command")) command = PusTelecommand(service=2, subservice=128, ssc=205, app_data=raw_data) tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft(("print", "\r")) if called_externally is False: tc_queue.appendleft(("export", "log/tmtc_log_service2.txt")) return tc_queue diff --git a/tc/obsw_tc_service200.py b/tc/obsw_tc_service200.py index 6862d3e90cb038921f1605eae46f8c5cf41b4eaa..cd91a2d525525923c0206b39c759b111014b84b1 100644 --- a/tc/obsw_tc_service200.py +++ b/tc/obsw_tc_service200.py @@ -17,39 +17,39 @@ import config.obsw_config as g import struct -def pack_service200_test_into(tcQueue: TcQueueT) -> TcQueueT: - tcQueue.appendleft(("print", "Testing Service 200")) +def pack_service200_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Service 200")) # Object ID: Dummy Device - objectId = g.DUMMY_DEVICE_ID + object_id = g.DUMMY_DEVICE_ID # Set On Mode - tcQueue.appendleft(("print", "Testing Service 200: Set Mode On")) - modeData = packModeData(objectId, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=modeData) - tcQueue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "Testing Service 200: Set Mode On")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) # Set Normal mode - tcQueue.appendleft(("print", "Testing Service 200: Set Mode Normal")) - modeData = packModeData(objectId, 2, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=modeData) - tcQueue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "Testing Service 200: Set Mode Normal")) + mode_data = pack_mode_data(object_id, 2, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) # Set Raw Mode - tcQueue.appendleft(("print", "Testing Service 200: Set Mode Raw")) - modeData = packModeData(objectId, 3, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=modeData) - tcQueue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("print", "Testing Service 200: Set Mode Raw")) + mode_data = pack_mode_data(object_id, 3, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) # Set Off Mode - tcQueue.appendleft(("print", "Testing Service 200: Set Mode Off")) - modeData = packModeData(objectId, 0, 0) - command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=modeData) - tcQueue.appendleft(command.pack_command_tuple()) - tcQueue.appendleft(("export", "log/tmtc_log_service200.txt")) - return tcQueue + tc_queue.appendleft(("print", "Testing Service 200: Set Mode Off")) + mode_data = pack_mode_data(object_id, 0, 0) + command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("export", "log/tmtc_log_service200.txt")) + return tc_queue # Mode 0: Off, Mode 1: Mode On, Mode 2: Mode Normal, Mode 3: Mode Raw -def packModeData(objectId: bytearray, mode_: int, submode_: int) -> bytearray: +def pack_mode_data(object_id: bytearray, mode_: int, submode_: int) -> bytearray: # Normal mode mode = struct.pack(">I", mode_) # Submode default submode = struct.pack('B', submode_) - modeData = objectId + mode + submode - return modeData + mode_data = object_id + mode + submode + return mode_data diff --git a/tc/obsw_tc_service3.py b/tc/obsw_tc_service3.py index c94856a94cbea3157ddcd8a3c94025c1cfb9549f..60df9ac3b6d958ec14de0bfd9c3b0346d065050b 100644 --- a/tc/obsw_tc_service3.py +++ b/tc/obsw_tc_service3.py @@ -24,9 +24,9 @@ def pack_service3_test_into(tc_queue: Deque) -> Deque: p3 = bytearray([0x03, 0x03, 0x03, 0x03]) p4 = bytearray([0x04, 0x04, 0x04, 0x04]) p5 = bytearray([0x05, 0x05, 0x05, 0x05]) - hkDefinition1 = sid1 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 + hk_definition1 = sid1 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 collection_interval = struct.pack('>f', 6) - hkDefinition2 = sid2 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 + hk_definition2 = sid2 + collection_interval + number_of_parameters + p1 + p2 + p3 + p4 + p5 # deleting pre-defined test entry tc_queue.appendleft(("print", "Testing Service 3: Deleting pre-defined HK definition")) @@ -35,12 +35,12 @@ def pack_service3_test_into(tc_queue: Deque) -> Deque: # adding pre-defined definition to hk using test pool variables tc_queue.appendleft(("print", "Testing Service 3: Adding pre-defined HK definition")) - command = PusTelecommand(service=3, subservice=1, ssc=3010, app_data=hkDefinition1) + command = PusTelecommand(service=3, subservice=1, ssc=3010, app_data=hk_definition1) tc_queue.appendleft(command.pack_command_tuple()) # adding custom definition to diagnostics using test pool variables tc_queue.appendleft(("print", "Testing Service 3: Adding custom diganostics definition")) - command = PusTelecommand(service=3, subservice=2, ssc=3020, app_data=hkDefinition2) + command = PusTelecommand(service=3, subservice=2, ssc=3020, app_data=hk_definition2) tc_queue.appendleft(command.pack_command_tuple()) # enable custom hk definition @@ -84,30 +84,30 @@ def pack_service3_test_into(tc_queue: Deque) -> Deque: command = PusTelecommand(service=3, subservice=27, ssc=3120, app_data=sid1) tc_queue.appendleft(command.pack_command_tuple()) # generate one custom diag definition - tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) + tc_queue.appendleft(("print", "Testing Service 3: Generate one custom diagnostics definition")) command = PusTelecommand(service=3, subservice=28, ssc=3120, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # generate one gps 0 definition - tc_queue.appendleft(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) + tc_queue.appendleft(("print", "Testing Service 3: Generate one gps 0 defintion")) command = PusTelecommand(service=3, subservice=27, ssc=3120, app_data=sid_gps) tc_queue.appendleft(command.pack_command_tuple()) # modify custom hk definition interval - newInterval = struct.pack('>f', 10.0) - newIntervalCommand = sid1 + newInterval - tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) - command = PusTelecommand(service=3, subservice=31, ssc=3090, app_data=newIntervalCommand) + new_interval = struct.pack('>f', 10.0) + new_interval_command = sid1 + new_interval + tc_queue.appendleft(("print", "Testing Service 3: Changing pre-defined HK definition interval")) + command = PusTelecommand(service=3, subservice=31, ssc=3090, app_data=new_interval_command) tc_queue.appendleft(command.pack_command_tuple()) # report custom HK definition - tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) + tc_queue.appendleft(("print", "Testing Service 3: Reporting pre-defined HK definition with changed interval")) command = PusTelecommand(service=3, subservice=9, ssc=3090, app_data=sid1) tc_queue.appendleft(command.pack_command_tuple()) # modify custom diag definition interval - newIntervalCommand = sid2 + newInterval - tc_queue.appendleft(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) - command = PusTelecommand(service=3, subservice=32, ssc=3090, app_data=newIntervalCommand) + new_interval_command = sid2 + new_interval + tc_queue.appendleft(("print", "Testing Service 3: Changing custom diag HK definition interval")) + command = PusTelecommand(service=3, subservice=32, ssc=3090, app_data=new_interval_command) tc_queue.appendleft(command.pack_command_tuple()) # report custom diag definition - tc_queue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) + tc_queue.appendleft(("print", "Testing Service 3: Reporting diag definition")) command = PusTelecommand(service=3, subservice=11, ssc=3100, app_data=sid2) tc_queue.appendleft(command.pack_command_tuple()) # append parameter to custom hk definiton diff --git a/tc/obsw_tc_service8.py b/tc/obsw_tc_service8.py index 54594081ea7adb9557b680566ff1f0555c7cd2cb..29f49e76b6030674c1fbb8167fa2dedab41617ef 100644 --- a/tc/obsw_tc_service8.py +++ b/tc/obsw_tc_service8.py @@ -10,7 +10,7 @@ from typing import Deque import config.obsw_config as g from tc.obsw_pus_tc_base import PusTelecommand -from tc.obsw_tc_service200 import packModeData +from tc.obsw_tc_service200 import pack_mode_data def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: @@ -19,26 +19,26 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> object_id = g.DUMMY_DEVICE_ID # set mode on - tc_queue.appendleft(("print", "\r\nTesting Service 8: Set On Mode")) - mode_data = packModeData(object_id, 1, 0) + tc_queue.appendleft(("print", "Testing Service 8: Set On Mode")) + mode_data = pack_mode_data(object_id, 1, 0) command = PusTelecommand(service=200, subservice=1, ssc=800, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # set mode normal - tc_queue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) - mode_data = packModeData(object_id, 2, 0) + tc_queue.appendleft(("print", "Testing Service 8: Set Normal Mode")) + mode_data = pack_mode_data(object_id, 2, 0) command = PusTelecommand(service=200, subservice=1, ssc=810, app_data=mode_data) tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers completion reply - tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Completion Reply")) + tc_queue.appendleft(("print", "Testing Service 8: Trigger Completion Reply")) action_id = g.DUMMY_COMMAND_1 direct_command = object_id + action_id command = PusTelecommand(service=8, subservice=128, ssc=820, app_data=direct_command) tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers _tm_data reply - tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Data Reply")) + tc_queue.appendleft(("print", "Testing Service 8: Trigger Data Reply")) action_id = g.DUMMY_COMMAND_2 command_param1 = g.DUMMY_COMMAND_2_PARAM_1 command_param2 = g.DUMMY_COMMAND_2_PARAM_2 @@ -47,7 +47,7 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> tc_queue.appendleft(command.pack_command_tuple()) # Direct command which triggers an additional step reply and one completion reply - tc_queue.appendleft(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) + tc_queue.appendleft(("print", "Testing Service 8: Trigger Step and Completion Reply")) action_id = g.DUMMY_COMMAND_3 direct_command = object_id + action_id command = PusTelecommand(service=8, subservice=128, ssc=840, app_data=direct_command)