Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_ethernet_com_if.py 4.27 KiB
"""
@file   obsw_ethernet_com_if.py
@date   01.11.2019
@brief  Ethernet Communication Interface

@author R. Mueller
"""
import select
import socket
import sys
from typing import Tuple

from comIF.obsw_com_interface import CommunicationInterface, PusTmListT
from tm.obsw_pus_tm_factory import PusTelemetryFactory
from tc.obsw_pus_tc_base import PusTcInfoT
from utility.obsw_tmtc_printer import TmTcPrinter
import config.obsw_config as g


# pylint: disable=abstract-method
# pylint: disable=arguments-differ
# pylint: disable=too-many-arguments
# TODO: decouple printing from receiving. Printing takes a long time and blocks!
class EthernetComIF(CommunicationInterface):
    """
    Communication interface for UDP communication.
    """
    def __init__(self, tmtc_printer: TmTcPrinter, tm_timeout: float, tc_timeout_factor: float,
                 receive_address: g.ethernetAddressT, send_address: g.ethernetAddressT):
        super().__init__(tmtc_printer)
        self.tm_timeout = tm_timeout
        self.tc_timeout_factor = tc_timeout_factor
        self.sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.receive_address = receive_address
        self.send_address = send_address
        self.set_up_socket()

    def close(self) -> None:
        pass

    def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None:
        self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info)
        self.sock_send.sendto(tc_packet, self.send_address)

    def data_available(self, timeout: float = 0) -> bool:
        ready = select.select([self.sock_receive], [], [], timeout)
        if ready[0]:
            return True
        return False

    def poll_interface(self, poll_timeout: float = 0) -> Tuple[bool, PusTmListT]:
        ready = self.data_available(poll_timeout)
        if ready:
            data = self.sock_receive.recvfrom(1024)[0]
            tm_packet = PusTelemetryFactory.create(data)
            # self.tmtc_printer.print_telemetry(tm_packet)
            packet_list = [tm_packet]
            return True, packet_list
        return False, []

    def receive_telemetry(self, parameters: any = 0) -> list:
        (packet_received, packet_list) = self.poll_interface()
        if packet_received:
            return packet_list
        return []

    def set_up_socket(self):
        """
        Sets up the sockets for the UDP communication.
        :return:
        """
        try:
            self.sock_receive.bind(self.receive_address)
            self.sock_receive.setblocking(False)
        except OSError:
            print("Socket already set-up.")
            # print("Socket Receive Address: " + str(g.G_REC_ADDRESS))
            # logging.exception("Error: ")
            # exit()
        except TypeError:
            print("Invalid Receive Address")
            sys.exit()

    def connect_to_board(self):
        """
        For UDP, this is used to initiate communication.
        :return:
        """
        test = bytearray([])
        self.send_telecommand(test)

    # def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \
    #         Union[None, PusTmInfoQueueT]:
    #     packets = self.receive_telemetry()
    #     for packet in packets:
    #         packet_info = packet.pack_tm_information()
    #         if g.G_PRINT_TM:
    #             self.tmtc_printer.print_telemetry(packet)
    #         tm_info_queue.append(packet_info)
    #     return tm_info_queue
    #
    # def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]:
    #     packets = self.receive_telemetry()
    #     for packet in packets:
    #         tm_queue.append(packet)
    #     return tm_queue
    #
    # def receive_telemetry_and_store_tuple(self, tm_tuple_queue):
    #     packet_list = self.receive_telemetry()
    #     for packet in packet_list:
    #         packet_info = packet.pack_tm_information()
    #         tm_tuple = (packet_info, packet)
    #         if g.G_PRINT_TM:
    #             self.tmtc_printer.print_telemetry(packet)
    #         tm_tuple_queue.append(tm_tuple)
    #     return tm_tuple_queue