com_if_setup.py 4.02 KB
Newer Older
Robin Mueller's avatar
Robin Mueller committed
1
2
3
4
5
6
7
8
"""
@brief      This is the default configuration for the communication interface
@details    This file can serve as an example of how to set up the required communication interface.
            It is called by the default communication interface setup function.
"""
import sys
from typing import Union

Robin Mueller's avatar
Robin Mueller committed
9
10
11
12
13
14
15
16
17
from tmtccmd.core.definitions import ComInterfaces
from tmtccmd.com_if.tmtcc_com_interface_base import CommunicationInterface
from tmtccmd.com_if.tmtcc_dummy_com_if import DummyComIF
from tmtccmd.com_if.tmtcc_ethernet_com_if import EthernetComIF
from tmtccmd.com_if.tmtcc_qemu_com_if import QEMUComIF
from tmtccmd.com_if.tmtcc_serial_com_if import SerialCommunicationType, SerialComIF
from tmtccmd.com_if.tmtcc_serial_utilities import determine_com_port
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.tmtcc_tmtc_printer import TmTcPrinter
Robin Mueller's avatar
Robin Mueller committed
18
19
20
21

# Module-level logger, obtained from the tmtccmd logging utility and used by the
# interface setup function below for its warning and error messages.
LOGGER = get_logger()


Robin Mueller's avatar
Robin Mueller committed
22
def _apply_dle_settings(communication_interface, serial_cfg, serial_config_keys, serial_timeout):
    """
    Read the DLE queue length and maximum frame size from the serial configuration
    dictionary and hand them, together with the serial timeout, to the interface.
    :param communication_interface: Interface object offering set_dle_settings().
    :param serial_cfg: Serial configuration dictionary retrieved from the globals.
    :param serial_config_keys: Key definitions (SerialConfig) used to index serial_cfg.
    :param serial_timeout: Serial timeout forwarded to set_dle_settings().
    """
    dle_max_queue_len = serial_cfg[serial_config_keys.SERIAL_DLE_QUEUE_LEN]
    dle_max_frame_size = serial_cfg[serial_config_keys.SERIAL_DLE_MAX_FRAME_SIZE]
    communication_interface.set_dle_settings(
        dle_max_queue_len, dle_max_frame_size, serial_timeout)


def create_communication_interface_default(com_if: ComInterfaces, tmtc_printer: TmTcPrinter) -> \
        Union[CommunicationInterface, None]:
    """
    Create and initialize the communication interface object selected by com_if.

    The Ethernet (UDP), serial and QEMU interfaces are configured from values stored in the
    globals manager. Any other com_if value results in a DummyComIF.
    :param com_if: Selects which communication interface is created.
    :param tmtc_printer: TmTcPrinter object passed on to the created interface.
    :return: The initialized CommunicationInterface object
    :raises SystemExit: With exit code 1 if the created interface reports itself as invalid
        or if an IOError/OSError occurs while setting it up.
    """
    # Kept function-local as in the original code.
    # NOTE(review): presumably done to avoid an import cycle - confirm before moving to the top.
    from tmtccmd.core.definitions import CoreGlobalIds
    from tmtccmd.core.globals_manager import get_global

    try:
        if com_if == ComInterfaces.EthernetUDP:
            from config.tmtcc_definitions import EthernetConfig
            ethernet_cfg_dict = get_global(CoreGlobalIds.ETHERNET_CONFIG)
            send_addr = ethernet_cfg_dict[EthernetConfig.SEND_ADDRESS]
            rcv_addr = ethernet_cfg_dict[EthernetConfig.RECV_ADDRESS]
            communication_interface = EthernetComIF(
                tmtc_printer=tmtc_printer, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT),
                tc_timeout_factor=get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR),
                send_address=send_addr, receive_address=rcv_addr)
        elif com_if == ComInterfaces.Serial:
            from config.tmtcc_definitions import SerialConfig
            serial_cfg = get_global(CoreGlobalIds.SERIAL_CONFIG)
            serial_baudrate = serial_cfg[SerialConfig.SERIAL_BAUD_RATE]
            serial_timeout = serial_cfg[SerialConfig.SERIAL_TIMEOUT]
            # Determine COM port, either extract from JSON file or ask from user.
            com_port = determine_com_port()
            communication_interface = SerialComIF(
                tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=serial_baudrate,
                serial_timeout=serial_timeout,
                ser_com_type=SerialCommunicationType.DLE_ENCODING)
            _apply_dle_settings(communication_interface, serial_cfg, SerialConfig, serial_timeout)
        elif com_if == ComInterfaces.QEMU:
            from config.tmtcc_definitions import SerialConfig
            serial_cfg = get_global(CoreGlobalIds.SERIAL_CONFIG)
            serial_timeout = serial_cfg[SerialConfig.SERIAL_TIMEOUT]
            communication_interface = QEMUComIF(
                tmtc_printer=tmtc_printer, serial_timeout=serial_timeout,
                ser_com_type=SerialCommunicationType.DLE_ENCODING)
            _apply_dle_settings(communication_interface, serial_cfg, SerialConfig, serial_timeout)
        else:
            communication_interface = DummyComIF(tmtc_printer=tmtc_printer)
        if not communication_interface.valid:
            LOGGER.warning("Invalid communication interface!")
            # An unusable interface is a failure, so report a non-zero exit status.
            # SystemExit is not caught by the handler below and terminates the program.
            sys.exit(1)
        communication_interface.initialize()
        return communication_interface
    except (IOError, OSError) as e:
        LOGGER.error("Error setting up communication interface")
        # Send the exception details through the logger as well instead of a bare print.
        LOGGER.error(str(e))
        sys.exit(1)