Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_ethernet_com_if.py 4.30 KiB
"""
@file   obsw_ethernet_com_if.py
@date   01.11.2019
@brief  Ethernet Communication Interface

@author R. Mueller
"""
import select
import socket
import sys
from typing import Tuple, Union

from comIF.obsw_com_interface import CommunicationInterface, PusTmListT, PusTmQueueT, \
    PusTmTupleQueueT, PusTmInfoQueueT
from tm.obsw_pus_tm_factory import pus_telemetry_factory
from tc.obsw_pus_tc_base import PusTcInfoT
from utility.obsw_tmtc_printer import TmTcPrinter
import config.obsw_config as g


# pylint: disable=abstract-method
# pylint: disable=arguments-differ
# pylint: disable=too-many-arguments
# TODO: decouple printing from receiving. Printing takes a long time and blocks!
class EthernetComIF(CommunicationInterface):
    """
    Communication interface for UDP communication.

    Two UDP sockets are used: ``sock_send`` transmits telecommands to
    ``send_address`` and ``sock_receive`` is bound to ``receive_address``
    to receive telemetry datagrams.
    """
    # Maximum number of bytes read from the receive socket per datagram.
    # Datagrams longer than this are not received completely; subclasses may
    # override the value if larger telemetry packets are expected.
    MAX_RECV_SIZE = 1024

    def __init__(self, tmtc_printer: TmTcPrinter, tmTimeout: float, tcTimeoutFactor: float,
                 receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT):
        """
        Create the send and receive sockets and bind the receive socket.
        :param tmtc_printer: Printer used to display telecommands and telemetry
        :param tmTimeout: Stored as ``tm_timeout`` (not used inside this class)
        :param tcTimeoutFactor: Stored as ``tc_timeout_factor`` (not used inside this class)
        :param receiveAddress: Address the receive socket is bound to
        :param sendAddress: Address telecommands are sent to
        """
        super().__init__(tmtc_printer)
        self.tm_timeout = tmTimeout
        self.tc_timeout_factor = tcTimeoutFactor
        self.sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.receive_address = receiveAddress
        self.send_address = sendAddress
        self.set_up_socket()

    def close(self) -> None:
        """
        Release both UDP sockets.
        Closing an already closed socket is harmless, so this may be called repeatedly.
        :return:
        """
        for sock in (self.sock_send, self.sock_receive):
            sock.close()

    def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None:
        """
        Print the telecommand and send it to the configured send address.
        :param tc_packet: Raw telecommand bytes
        :param tc_packet_info: Optional telecommand information passed on to the printer
        :return:
        """
        self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info)
        self.sock_send.sendto(tc_packet, self.send_address)

    def data_available(self, timeout: float = 0) -> bool:
        """
        Check whether the receive socket has data ready to be read.
        :param timeout: Time in seconds to wait for data, 0 returns immediately
        :return: True if the receive socket is readable, False otherwise
        """
        readable, _, _ = select.select([self.sock_receive], [], [], timeout)
        return bool(readable)

    def poll_interface(self, pollTimeout: float = 0) -> Tuple[bool, PusTmListT]:
        """
        Read at most one datagram from the receive socket, if one is available.
        The received packet is always printed with the telemetry printer.
        :param pollTimeout: Time in seconds to wait for data
        :return: Tuple (True, [packet]) if a datagram was read, (False, []) otherwise
        """
        if not self.data_available(pollTimeout):
            return False, []
        # recvfrom returns (data, sender address), only the payload is needed here.
        data = self.sock_receive.recvfrom(self.MAX_RECV_SIZE)[0]
        packet = pus_telemetry_factory(data)
        self.tmtc_printer.print_telemetry(packet)
        return True, [packet]

    def receive_telemetry(self, parameters: object = 0) -> list:
        """
        Poll the interface once without waiting.
        :param parameters: Unused by this implementation
        :return: List containing the received packet, or an empty list
        """
        # poll_interface already yields an empty list when nothing was received.
        _, packet_list = self.poll_interface()
        return packet_list

    def receive_telemetry_and_store_info(self, tmInfoQueue: PusTmInfoQueueT) -> \
            Union[None, PusTmInfoQueueT]:
        """
        Receive telemetry and append the information of each packet to the given queue.
        :param tmInfoQueue: Queue the packet information is appended to
        :return: The queue which was passed in
        """
        for packet in self.receive_telemetry():
            packet_info = packet.pack_tm_information()
            # NOTE(review): poll_interface has already printed this packet, so with
            # G_PRINT_TM set it appears to be printed twice - confirm this is intended.
            if g.G_PRINT_TM:
                self.tmtc_printer.print_telemetry(packet)
            tmInfoQueue.append(packet_info)
        return tmInfoQueue

    def receive_telemetry_and_store_tm(self, tmQueue: PusTmQueueT) -> Union[None, PusTmQueueT]:
        """
        Receive telemetry and append each packet to the given queue.
        :param tmQueue: Queue the packets are appended to
        :return: The queue which was passed in
        """
        for packet in self.receive_telemetry():
            tmQueue.append(packet)
        return tmQueue

    def receive_telemetry_and_store_tuple(self, tmTupleQueue: PusTmTupleQueueT) -> \
            Union[None, PusTmTupleQueueT]:
        """
        Receive telemetry and append a (packet information, packet) tuple
        for each packet to the given queue.
        :param tmTupleQueue: Queue the tuples are appended to
        :return: The queue which was passed in
        """
        for packet in self.receive_telemetry():
            packet_info = packet.pack_tm_information()
            tm_tuple = (packet_info, packet)
            # NOTE(review): poll_interface has already printed this packet, so with
            # G_PRINT_TM set it appears to be printed twice - confirm this is intended.
            if g.G_PRINT_TM:
                self.tmtc_printer.print_telemetry(packet)
            tmTupleQueue.append(tm_tuple)
        return tmTupleQueue

    def set_up_socket(self):
        """
        Sets up the sockets for the UDP communication.
        Binds the receive socket to the receive address and switches it to
        non-blocking mode. An OSError while binding is only reported, a
        TypeError (invalid address) terminates the program.
        :return:
        """
        try:
            self.sock_receive.bind(self.receive_address)
        except OSError:
            print("Socket already set-up.")
        except TypeError:
            print("Invalid Receive Address")
            sys.exit()
        else:
            # Only reached when binding succeeded.
            self.sock_receive.setblocking(False)

    def connect_to_board(self):
        """
        For UDP, this is used to initiate communication.
        An empty telecommand is sent to the send address.
        :return:
        """
        self.send_telecommand(bytearray())