diff --git a/config/obsw_com_config.py b/config/obsw_com_config.py new file mode 100644 index 0000000000000000000000000000000000000000..7d9a8b9124a21ef04db90253c07a569e8cc279a2 --- /dev/null +++ b/config/obsw_com_config.py @@ -0,0 +1,56 @@ +""" +Set-up function. Initiates the communication interface. +""" +import sys +from typing import Union + +from tmtc_core.comIF.obsw_com_interface import CommunicationInterface +from tmtc_core.comIF.obsw_dummy_com_if import DummyComIF +from tmtc_core.comIF.obsw_ethernet_com_if import EthernetComIF +from tmtc_core.comIF.obsw_serial_com_if import SerialComIF, SerialCommunicationType +from tmtc_core.comIF.obsw_qemu_com_if import QEMUComIF + +from tmtc_core.utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter + +import config.obsw_config as g + +LOGGER = get_logger() + + +def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[CommunicationInterface, None]: + """ + Return the desired communication interface object + :param tmtc_printer: TmTcPrinter object. + :return: CommunicationInterface object + """ + try: + if g.G_COM_IF == g.ComIF.Ethernet: + communication_interface = EthernetComIF( + tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_SEND_ADDRESS, + receive_address=g.G_REC_ADDRESS) + elif g.G_COM_IF == g.ComIF.Serial: + serial_baudrate = g.G_SERIAL_BAUDRATE + serial_timeout = g.G_SERIAL_TIMEOUT + communication_interface = SerialComIF( + tmtc_printer=tmtc_printer, com_port=g.G_COM_PORT, baud_rate=serial_baudrate, + serial_timeout=serial_timeout, + ser_com_type=SerialCommunicationType.DLE_ENCODING) + communication_interface.set_dle_settings( + g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout) + elif g.G_COM_IF == g.ComIF.QEMU: + communication_interface = QEMUComIF( + tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR) + else: + communication_interface = DummyComIF(tmtc_printer=tmtc_printer) + if not communication_interface.valid: + LOGGER.warning("Invalid communication interface!") + return None + communication_interface.initialize() + return communication_interface + except (IOError, OSError): + LOGGER.error("Error setting up communication interface") + LOGGER.exception("Error") + sys.exit() diff --git a/config/obsw_config.py b/config/obsw_config.py index d212346d7c0da07866f0c53a7ecc42251a5bbe66..db122b02ed89247983fbb65ea1fd40edac256b56 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -66,6 +66,10 @@ G_COM_PORT = 'COM0' G_SERIAL_TIMEOUT = 0.01 G_SERIAL_BAUDRATE = 230400 G_SERIAL_FRAME_SIZE = 256 + +G_SERIAL_DLE_MAX_QUEUE_LEN = 25 +G_SERIAL_DLE_MAX_FRAME_SIZE = 1024 + # Time related G_TM_TIMEOUT = 6 G_TC_SEND_TIMEOUT_FACTOR = 2.0 diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index a55d1629d1d0123d550fb06a71d48d5b3e9f5da3..018d2e82dce8243a638ba0c2586a179021ac9e19 100755 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -44,7 +44,7 @@ from tmtc_core.sendreceive.obsw_tm_listener import TmListener from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger -from tmtc_core.comIF.obsw_com_config import set_communication_interface +from config.obsw_com_config import set_communication_interface from test import obsw_pus_service_test from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker diff --git a/tmtc_core b/tmtc_core index 4c402373e4edb9463296c9f76d8281f94153bdf6..9c509ddce9474dd29debfa9467386c0a32f46636 160000 --- a/tmtc_core +++ b/tmtc_core @@ -1 +1 @@ -Subproject commit 4c402373e4edb9463296c9f76d8281f94153bdf6 +Subproject commit 9c509ddce9474dd29debfa9467386c0a32f46636