Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authored
obsw_ethernet_com_if.py 4.30 KiB
"""
@file obsw_ethernet_com_if.py
@date 01.11.2019
@brief Ethernet Communication Interface
@author R. Mueller
"""
import select
import socket
import sys
from typing import Tuple, Union
from comIF.obsw_com_interface import CommunicationInterface, PusTmListT, PusTmQueueT, \
PusTmTupleQueueT, PusTmInfoQueueT
from tm.obsw_pus_tm_factory import pus_telemetry_factory
from tc.obsw_pus_tc_base import PusTcInfoT
from utility.obsw_tmtc_printer import TmTcPrinter
import config.obsw_config as g
# pylint: disable=abstract-method
# pylint: disable=arguments-differ
# pylint: disable=too-many-arguments
# TODO: decouple printing from receiving. Printing takes a long time and blocks!
class EthernetComIF(CommunicationInterface):
    """
    Communication interface for UDP communication.

    Two datagram sockets are used: one is bound to the receive address and
    polled for telemetry, the other one is used to send telecommands to the
    send address.
    """

    def __init__(self, tmtc_printer: TmTcPrinter, tmTimeout: float, tcTimeoutFactor: float,
                 receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT,
                 max_recv_size: int = 1024):
        """
        Create both UDP sockets and bind the receive socket.

        :param tmtc_printer: Printer used to print telecommands and telemetry.
        :param tmTimeout: Stored as tm_timeout. Not used by the methods of this class.
        :param tcTimeoutFactor: Stored as tc_timeout_factor. Not used by the methods
            of this class.
        :param receiveAddress: Address the receive socket is bound to.
        :param sendAddress: Address telecommands are sent to.
        :param max_recv_size: Maximum number of bytes read per datagram. Larger
            datagrams are truncated by the socket layer. Defaults to the previously
            hard-coded value of 1024.
        """
        super().__init__(tmtc_printer)
        self.tm_timeout = tmTimeout
        self.tc_timeout_factor = tcTimeoutFactor
        self.max_recv_size = max_recv_size
        self.sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.receive_address = receiveAddress
        self.send_address = sendAddress
        self.set_up_socket()

    def close(self) -> None:
        """
        Release both UDP sockets.
        Closing an already closed socket is a no-op, so calling this method
        more than once is safe.
        :return:
        """
        self.sock_send.close()
        self.sock_receive.close()

    def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None:
        """
        Print the telecommand and send it to the send address.
        :param tc_packet: Raw telecommand which is sent as one datagram.
        :param tc_packet_info: Additional information, only passed on to the printer.
        :return:
        """
        self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info)
        self.sock_send.sendto(tc_packet, self.send_address)

    def data_available(self, timeout: float = 0) -> bool:
        """
        Check whether the receive socket is readable.
        :param timeout: Maximum time in seconds to wait inside select.
            The default of 0 polls without waiting.
        :return: True if select reports the receive socket as readable.
        """
        readable = select.select([self.sock_receive], [], [], timeout)[0]
        return bool(readable)

    def poll_interface(self, pollTimeout: float = 0) -> Tuple[bool, PusTmListT]:
        """
        Read at most one datagram, convert it to a telemetry packet and print it.
        :param pollTimeout: Maximum time in seconds to wait for data.
        :return: (True, [packet]) if a packet was received, (False, []) otherwise.
        """
        if not self.data_available(pollTimeout):
            return False, []
        try:
            data = self.sock_receive.recvfrom(self.max_recv_size)[0]
        except BlockingIOError:
            # select can report readiness spuriously (e.g. the datagram was dropped
            # after the check). Treat this like "no data" instead of crashing.
            return False, []
        packet = pus_telemetry_factory(data)
        self.tmtc_printer.print_telemetry(packet)
        return True, [packet]

    def receive_telemetry(self, parameters: object = 0) -> PusTmListT:
        """
        Poll the interface once without waiting.
        :param parameters: Unused by this implementation.
        :return: List with the received packet or an empty list.
        """
        # poll_interface already returns an empty list if nothing was received.
        _, packet_list = self.poll_interface()
        return packet_list

    def receive_telemetry_and_store_info(self, tmInfoQueue: PusTmInfoQueueT) -> \
            Union[None, PusTmInfoQueueT]:
        """
        Receive telemetry and append the packed information of each packet to the queue.
        :param tmInfoQueue: Queue the packet information is appended to.
        :return: The queue which was passed in.
        """
        for packet in self.receive_telemetry():
            packet_info = packet.pack_tm_information()
            # NOTE(review): poll_interface already prints every received packet, so
            # with G_PRINT_TM set the packet is printed a second time here.
            # Confirm whether this is intended.
            if g.G_PRINT_TM:
                self.tmtc_printer.print_telemetry(packet)
            tmInfoQueue.append(packet_info)
        return tmInfoQueue

    def receive_telemetry_and_store_tm(self, tmQueue: PusTmQueueT) -> Union[None, PusTmQueueT]:
        """
        Receive telemetry and append each packet to the queue.
        :param tmQueue: Queue the packets are appended to.
        :return: The queue which was passed in.
        """
        for packet in self.receive_telemetry():
            tmQueue.append(packet)
        return tmQueue

    def receive_telemetry_and_store_tuple(self, tmTupleQueue: PusTmTupleQueueT) -> \
            Union[None, PusTmTupleQueueT]:
        """
        Receive telemetry and append a (packet information, packet) tuple for
        each packet to the queue.
        :param tmTupleQueue: Queue the tuples are appended to.
        :return: The queue which was passed in.
        """
        for packet in self.receive_telemetry():
            packet_info = packet.pack_tm_information()
            tm_tuple = (packet_info, packet)
            # NOTE(review): poll_interface already prints every received packet, so
            # with G_PRINT_TM set the packet is printed a second time here.
            # Confirm whether this is intended.
            if g.G_PRINT_TM:
                self.tmtc_printer.print_telemetry(packet)
            tmTupleQueue.append(tm_tuple)
        return tmTupleQueue

    def set_up_socket(self) -> None:
        """
        Sets up the sockets for the UDP communication.
        The receive socket is switched to non-blocking mode and bound to the
        receive address. A failed bind is reported but not fatal, an invalid
        address terminates the program with exit status 1.
        :return:
        """
        # Set non-blocking mode first so it is applied even if binding fails.
        self.sock_receive.setblocking(False)
        try:
            self.sock_receive.bind(self.receive_address)
        except OSError as error:
            # Covers every bind failure (e.g. address in use), so report the cause.
            print(f"Socket already set-up. Bind error: {error}")
        except TypeError:
            print("Invalid Receive Address")
            # Non-zero status: this is an error exit, not a regular termination.
            sys.exit(1)

    def connect_to_board(self) -> None:
        """
        For UDP, this is used to initiate communication.
        An empty telecommand is sent to the send address.
        :return:
        """
        self.send_telecommand(bytearray())