# OBSW_Ethernet_ComIF.py
# (Forked from an inaccessible project.)
"""
@file   OBSW_Ethernet_ComIF.py
@date   01.11.2019
@brief  Ethernet Communication Interface

@author R. Mueller
"""
import select
import socket
from typing import Tuple, Union

from comIF.OBSW_ComInterface import CommunicationInterface, pusTmListT, pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT
from tm.OBSW_TmPacket import PUSTelemetryFactory
from tc.OBSW_TcPacket import pusTcInfoT
from utility.OBSW_TmTcPrinter import TmTcPrinterT
import config.OBSW_Config as g


class EthernetComIF(CommunicationInterface):
    """
    @brief  Communication interface which exchanges telecommands and telemetry via UDP.

    Telecommands are sent as datagrams to sendAddress through sockSend. Telemetry is
    read from sockReceive, which is bound to receiveAddress during construction.
    """

    # Maximum number of bytes read from the receive socket for one datagram.
    RECEIVE_BUFFER_SIZE = 1024

    def __init__(self, tmtcPrinter: TmTcPrinterT, tmTimeout: float, tcTimeoutFactor: float,
                 receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT):
        """
        @brief  Create both UDP sockets and bind the receive socket.
        @param tmtcPrinter:     Printer forwarded to the base class constructor
        @param tmTimeout:       Stored as attribute, not used by this class itself
        @param tcTimeoutFactor: Stored as attribute, not used by this class itself
        @param receiveAddress:  Address the receive socket is bound to
        @param sendAddress:     Destination address for telecommands
        """
        super().__init__(tmtcPrinter)
        self.tmTimeout = tmTimeout
        self.tcTimeoutFactor = tcTimeoutFactor
        self.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.receiveAddress = receiveAddress
        self.sendAddress = sendAddress
        self.setUpSocket()

    def close(self) -> None:
        """
        @brief  Close both UDP sockets so their descriptors and the bound
                receive address are released. Calling it repeatedly is harmless
                because closing an already closed socket does nothing.
        """
        self.sockSend.close()
        self.sockReceive.close()

    def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None) -> None:
        """
        @brief  Print the telecommand and send it as one datagram to sendAddress.
        @param tcPacket:        Raw telecommand bytes
        @param tcPacketInfo:    Additional information handed to the printer only
        """
        self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo)
        self.sockSend.sendto(tcPacket, self.sendAddress)

    def dataAvailable(self, timeout: float = 0) -> bool:
        """
        @brief  Check whether the receive socket has a datagram ready to be read.
        @param timeout: Maximum time in seconds to wait for data, 0 polls without waiting
        @return True if the receive socket is readable, False otherwise
        """
        # select() always returns three lists, only the readable one is relevant here.
        readable, _, _ = select.select([self.sockReceive], [], [], timeout)
        return bool(readable)

    def pollInterface(self, pollTimeout: float = 0) -> Tuple[bool, pusTmListT]:
        """
        @brief  Read at most one datagram, convert it to a telemetry packet and print it.
        @param pollTimeout: Maximum time in seconds to wait for data
        @return (True, [packet]) if a datagram was received, (False, []) otherwise
        """
        if not self.dataAvailable(pollTimeout):
            return False, []
        # recvfrom returns (data, senderAddress), the sender address is not needed.
        data = self.sockReceive.recvfrom(self.RECEIVE_BUFFER_SIZE)[0]
        packet = PUSTelemetryFactory(data)
        self.tmtcPrinter.printTelemetry(packet)
        return True, [packet]

    def receiveTelemetry(self, parameters: any = 0) -> list:
        """
        @brief  Poll the interface once without waiting.
        @param parameters:  Not used by this implementation
        @return List with the received packet, empty list if nothing was received
        """
        # pollInterface already supplies an empty list when nothing was received.
        return self.pollInterface()[1]

    def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]:
        """
        @brief  Receive telemetry and append the packed information of each packet.
        @param tmInfoQueue: Queue the packet information is appended to
        @return The queue which was passed in
        """
        for packet in self.receiveTelemetry():
            # No printTelemetry call here: pollInterface has printed the packet already,
            # printing again would output every packet twice.
            tmInfoQueue.append(packet.packTmInformation())
        return tmInfoQueue

    def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]:
        """
        @brief  Receive telemetry and append each packet to the queue.
        @param tmQueue: Queue the packets are appended to
        @return The queue which was passed in
        """
        for packet in self.receiveTelemetry():
            tmQueue.append(packet)
        return tmQueue

    def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]:
        """
        @brief  Receive telemetry and append (packet information, packet) tuples.
        @param tmTupleQueue:    Queue the tuples are appended to
        @return The queue which was passed in
        """
        for packet in self.receiveTelemetry():
            # No printTelemetry call here: pollInterface has printed the packet already,
            # printing again would output every packet twice.
            tmTupleQueue.append((packet.packTmInformation(), packet))
        return tmTupleQueue

    def setUpSocket(self):
        """
        @brief  Bind the receive socket to receiveAddress and make it non-blocking.

        An OSError raised while binding is only reported, the interface stays usable
        for sending. A TypeError (address of the wrong type) terminates the program
        with exit status 1.
        """
        try:
            self.sockReceive.bind(self.receiveAddress)
            self.sockReceive.setblocking(False)
        except OSError:
            # Deliberately not fatal, see docstring.
            print("Socket already set-up.")
        except TypeError:
            print("Invalid Receive Address")
            # exit() is a helper installed by the site module for interactive use,
            # raise SystemExit directly and signal the failure with a non-zero status.
            raise SystemExit(1)

    def connectToBoard(self):
        """
        @brief  Send an empty telecommand to sendAddress to start the communication.
        """
        # Maybe there is a cleaner way to start comm with udp server
        # so that it knows the ip address?
        test = bytearray([])
        self.sendTelecommand(test)
        # send multiple times to get connection if there are problems