Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authored
OBSW_Ethernet_ComIF.py 3.89 KiB
"""
@file OBSW_Ethernet_ComIF.py
@date 01.11.2019
@brief Ethernet Communication Interface
@author R. Mueller
"""
import select
import socket
from typing import Tuple, Union
from comIF.OBSW_ComInterface import CommunicationInterface, pusTmListT, pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT
from tm.OBSW_TmPacket import PUSTelemetryFactory
from tc.OBSW_TcPacket import pusTcInfoT
from utility.OBSW_TmTcPrinter import TmTcPrinterT
import config.OBSW_Config as g
class EthernetComIF(CommunicationInterface):
    """
    @brief Communication interface which exchanges packets via UDP datagram sockets.
    @details
    One datagram socket is used to send telecommands to the send address, a second one
    is bound to the receive address and polled for telemetry.
    """
    # Maximum number of bytes read per received datagram. Longer datagrams are
    # truncated by recvfrom().
    MAX_DATAGRAM_SIZE = 1024

    def __init__(self, tmtcPrinter: TmTcPrinterT, tmTimeout: float, tcTimeoutFactor: float,
                 receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT):
        """
        @brief Create both UDP sockets and bind the receive socket.
        @param tmtcPrinter: Printer used for sent telecommands and received telemetry
        @param tmTimeout: Stored on the instance, not read by the methods of this class
        @param tcTimeoutFactor: Stored on the instance, not read by the methods of this class
        @param receiveAddress: Address the receive socket is bound to
        @param sendAddress: Destination address for telecommands
        """
        super().__init__(tmtcPrinter)
        self.tmTimeout = tmTimeout
        self.tcTimeoutFactor = tcTimeoutFactor
        self.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.receiveAddress = receiveAddress
        self.sendAddress = sendAddress
        self.setUpSocket()

    def close(self) -> None:
        """
        @brief Close the send and the receive socket.
        @details
        Releases the bound receive address. Calling this more than once is harmless
        because closing an already closed socket has no effect.
        """
        for sock in (self.sockSend, self.sockReceive):
            sock.close()

    def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None) -> None:
        """
        @brief Print a telecommand and send it as one datagram to the send address.
        @param tcPacket: Raw telecommand bytes
        @param tcPacketInfo: Optional information passed on to the printer
        """
        self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo)
        self.sockSend.sendto(tcPacket, self.sendAddress)

    def dataAvailable(self, timeout: float = 0) -> bool:
        """
        @brief Check whether the receive socket has data ready to be read.
        @param timeout: Maximum time in seconds to wait, 0 polls without blocking
        @return True if the receive socket is readable, False otherwise
        """
        # select() always returns three lists; only the readable list matters here.
        readable, _, _ = select.select([self.sockReceive], [], [], timeout)
        return bool(readable)

    def pollInterface(self, pollTimeout: float = 0) -> Tuple[bool, pusTmListT]:
        """
        @brief Read at most one datagram and convert it into a telemetry packet.
        @details The received packet is printed with the telemetry printer.
        @param pollTimeout: Maximum time in seconds to wait for data
        @return Tuple of a flag which is True if a packet was received, and a list
                which holds that packet (empty if nothing was received)
        """
        if not self.dataAvailable(pollTimeout):
            return False, []
        # recvfrom() returns (data, senderAddress); the sender address is not needed.
        data = self.sockReceive.recvfrom(self.MAX_DATAGRAM_SIZE)[0]
        packet = PUSTelemetryFactory(data)
        self.tmtcPrinter.printTelemetry(packet)
        return True, [packet]

    def receiveTelemetry(self, parameters: object = 0) -> list:
        """
        @brief Poll the interface once without waiting.
        @param parameters: Unused by this implementation
        @return List of received packets, empty if nothing was received
        """
        packetReceived, packetList = self.pollInterface()
        return packetList if packetReceived else []

    def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]:
        """
        @brief Poll once and append the packed information of each packet to the queue.
        @param tmInfoQueue: Queue the packet information is appended to
        @return The queue which was passed in
        """
        # No printing here: pollInterface() already printed every received packet.
        for packet in self.receiveTelemetry():
            tmInfoQueue.append(packet.packTmInformation())
        return tmInfoQueue

    def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]:
        """
        @brief Poll once and append each received packet to the queue.
        @param tmQueue: Queue the packets are appended to
        @return The queue which was passed in
        """
        for packet in self.receiveTelemetry():
            tmQueue.append(packet)
        return tmQueue

    def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]:
        """
        @brief Poll once and append a (packet information, packet) tuple per packet.
        @param tmTupleQueue: Queue the tuples are appended to
        @return The queue which was passed in
        """
        # No printing here: pollInterface() already printed every received packet.
        for packet in self.receiveTelemetry():
            tmTupleQueue.append((packet.packTmInformation(), packet))
        return tmTupleQueue

    def setUpSocket(self):
        """
        @brief Bind the receive socket to the receive address and make it non-blocking.
        @details
        An OSError raised while binding is reported and otherwise ignored. A TypeError
        (receive address of an unsuitable type) terminates the program via SystemExit.
        """
        try:
            self.sockReceive.bind(self.receiveAddress)
            self.sockReceive.setblocking(False)
        except OSError:
            # NOTE(review): every OSError is reported with this message, not only
            # the case of a socket that was bound before.
            print("Socket already set-up.")
        except TypeError:
            print("Invalid Receive Address")
            # Same exception and exit code as the site-provided exit(), but it does
            # not depend on the site module being loaded.
            raise SystemExit

    def connectToBoard(self):
        """
        @brief Send an empty telecommand to the send address.
        """
        # Maybe there is a cleaner way to start comm with udp server
        # so that it knows the ip address?
        test = bytearray([])
        self.sendTelecommand(test)
        # send multiple times to get connection if there are problems