diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py index 3239f7a4c0e7642904fd6aedd8d8d4f4b79d3ac1..ae7efef1fd79f7bb6d694dd5138c1b7774090d89 100644 --- a/comIF/obsw_ethernet_com_if.py +++ b/comIF/obsw_ethernet_com_if.py @@ -14,7 +14,7 @@ from comIF.obsw_com_interface import CommunicationInterface, PusTmListT from tm.obsw_pus_tm_factory import PusTelemetryFactory from tc.obsw_pus_tc_base import PusTcInfoT from utility.obsw_tmtc_printer import TmTcPrinter -import config.obsw_config as g +from config.obsw_definitions import ethernetAddressT # pylint: disable=abstract-method @@ -29,7 +29,7 @@ class EthernetComIF(CommunicationInterface): self.udp_socket.sendto(data, self.destination_address) def __init__(self, tmtc_printer: TmTcPrinter, tm_timeout: float, tc_timeout_factor: float, - receive_address: g.ethernetAddressT, send_address: g.ethernetAddressT): + receive_address: ethernetAddressT, send_address: ethernetAddressT): super().__init__(tmtc_printer) self.tm_timeout = tm_timeout self.tc_timeout_factor = tc_timeout_factor diff --git a/config/obsw_config.py b/config/obsw_config.py index 4e0d327b48ea713abe36a1f33dce2b44ae4ab083..0af50405301e326c1517f0eba4a1f56fff6b18bc 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -6,37 +6,11 @@ @brief Global settings for UDP client """ -import enum + import struct import pprint -from typing import Tuple import logging - -""" -Global service_type definitions -""" -ethernetAddressT = Tuple[str, int] - - -# Mode options, set by args parser -class ModeList(enum.Enum): - GUIMode = 0 - ListenerMode = 1 - SingleCommandMode = 2 - ServiceTestMode = 3 - SoftwareTestMode = 4 - BinaryUploadMode = 5 - UnitTest = 6 - PromptMode = 32 - - -class ComIF(enum.Enum): - Dummy = 0 - Serial = 1 - QEMU = 2 - Ethernet = 3 - - +from config.obsw_definitions import ModeList, ComIF """ Mission/Device specific information. diff --git a/config/obsw_definitions.py b/config/obsw_definitions.py new file mode 100644 index 0000000000000000000000000000000000000000..bfa7674b22288d7558282e64901b93f9908cbb19 --- /dev/null +++ b/config/obsw_definitions.py @@ -0,0 +1,23 @@ +import enum +from typing import Tuple + +ethernetAddressT = Tuple[str, int] + + +# Mode options, set by args parser +class ModeList(enum.Enum): + GUIMode = 0 + ListenerMode = 1 + SingleCommandMode = 2 + ServiceTestMode = 3 + SoftwareTestMode = 4 + BinaryUploadMode = 5 + UnitTest = 6 + PromptMode = 32 + + +class ComIF(enum.Enum): + Dummy = 0 + Serial = 1 + QEMU = 2 + Ethernet = 3 diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index f3bbcd76686705f7ee813ba596aafb9c688cf5ac..75e7dafc2d23c236189dc51943a9bddc9a18f0ae 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -63,7 +63,7 @@ from typing import Tuple, Union from test import obsw_pus_service_test from config import obsw_config as g from config.obsw_config import set_globals -from tc.obsw_pus_tc_packer import create_total_tc_queue, pack_service_queue +from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker from tc.obsw_pus_tc_base import PusTcInfo from obsw_user_code import command_preparation_hook @@ -198,7 +198,8 @@ class TmTcHandler: elif self.mode == g.ModeList.ServiceTestMode: service_queue = deque() - pack_service_queue(g.G_SERVICE, service_queue) + service_queue_packer = ServiceQueuePacker() + service_queue_packer.pack_service_queue(g.G_SERVICE, service_queue) LOGGER.info("Performing service command operation") sender_and_receiver = SequentialCommandSenderReceiver( com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, diff --git a/tc/obsw_pus_tc_base.py b/tc/obsw_pus_tc_base.py index c539b14fc86710179be01412f8dd9fbd8ccaa74a..bac256cffac9b41db52da4c30fb98044f69a7a40 100644 --- a/tc/obsw_pus_tc_base.py +++ b/tc/obsw_pus_tc_base.py @@ -11,6 +11,7 @@ from utility.obsw_logger import get_logger LOGGER = get_logger() + class TcDictionaryKeys(Enum): """ Keys for telecommand dictionary """ SERVICE = 1 @@ -40,6 +41,7 @@ class PusTelecommand: and is also shown below (bottom) """ HEADER_SIZE = 6 + def __init__(self, service: int, subservice: int, ssc=0, app_data: bytearray = bytearray([]), source_id: int = 0, version: int = 0, apid: int = 0x73): """ diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py index c6ebe8dafe5606d42096d5c4077ce18ca43e7f54..b3f3d61883a7a13c89d6eed61e1636fe1ca48160 100644 --- a/tc/obsw_pus_tc_packer.py +++ b/tc/obsw_pus_tc_packer.py @@ -27,36 +27,40 @@ from typing import Union LOGGER = get_logger() -def pack_service_queue(service: Union[int, str], service_queue: TcQueueT): - if service == 2: - return pack_service2_test_into(service_queue) - if service == 3: - return pack_service3_test_into(service_queue) - if service == 5: - return pack_service5_test_into(service_queue) - if service == 8: - return pack_service8_test_into(service_queue) - if service == 9: - return pack_service9_test_into(service_queue) - if service == 17: - return pack_service17_test_into(service_queue) - if service == 20: - return pack_service20_test_into(service_queue) - if service == 200: - return pack_service200_test_into(service_queue) - if service == "Dummy": - return pack_dummy_device_test_into(service_queue) - if service == "GPS0": - # Object ID: GPS Device - object_id = g.GPS0_ObjectId - return pack_gps_test_into(object_id, service_queue) - if service == "GPS1": - # Object ID: GPS Device - object_id = g.GPS1_ObjectId - return pack_gps_test_into(object_id, service_queue) - if service == "Error": - return pack_error_testing_into(service_queue) - LOGGER.warning("Invalid Service !") +class ServiceQueuePacker: + def __init__(self): + pass + + def pack_service_queue(self, service: Union[int, str], service_queue: TcQueueT): + if service == 2: + return pack_service2_test_into(service_queue) + if service == 3: + return pack_service3_test_into(service_queue) + if service == 5: + return pack_service5_test_into(service_queue) + if service == 8: + return pack_service8_test_into(service_queue) + if service == 9: + return pack_service9_test_into(service_queue) + if service == 17: + return pack_service17_test_into(service_queue) + if service == 20: + return pack_service20_test_into(service_queue) + if service == 200: + return pack_service200_test_into(service_queue) + if service == "Dummy": + return pack_dummy_device_test_into(service_queue) + if service == "GPS0": + # Object ID: GPS Device + object_id = g.GPS0_ObjectId + return pack_gps_test_into(object_id, service_queue) + if service == "GPS1": + # Object ID: GPS Device + object_id = g.GPS1_ObjectId + return pack_gps_test_into(object_id, service_queue) + if service == "Error": + return pack_error_testing_into(service_queue) + LOGGER.warning("Invalid Service !") # TODO: a way to select certain services would be nice (by passing a dict or array maybe)