Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
obsw_ethernet_com_if.py 4.27 KiB
"""
@file obsw_ethernet_com_if.py
@date 01.11.2019
@brief Ethernet Communication Interface
@author R. Mueller
"""
import select
import socket
import sys
from typing import Tuple
from comIF.obsw_com_interface import CommunicationInterface, PusTmListT
from tm.obsw_pus_tm_factory import PusTelemetryFactory
from tc.obsw_pus_tc_base import PusTcInfoT
from utility.obsw_tmtc_printer import TmTcPrinter
import config.obsw_config as g
# pylint: disable=abstract-method
# pylint: disable=arguments-differ
# pylint: disable=too-many-arguments
# TODO: decouple printing from receiving. Printing takes a long time and blocks!
class EthernetComIF(CommunicationInterface):
"""
Communication interface for UDP communication.
"""
def __init__(self, tmtc_printer: TmTcPrinter, tm_timeout: float, tc_timeout_factor: float,
receive_address: g.ethernetAddressT, send_address: g.ethernetAddressT):
super().__init__(tmtc_printer)
self.tm_timeout = tm_timeout
self.tc_timeout_factor = tc_timeout_factor
self.sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.receive_address = receive_address
self.send_address = send_address
self.set_up_socket()
def close(self) -> None:
pass
def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None:
self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info)
self.sock_send.sendto(tc_packet, self.send_address)
def data_available(self, timeout: float = 0) -> bool:
ready = select.select([self.sock_receive], [], [], timeout)
if ready[0]:
return True
return False
def poll_interface(self, poll_timeout: float = 0) -> Tuple[bool, PusTmListT]:
ready = self.data_available(poll_timeout)
if ready:
data = self.sock_receive.recvfrom(1024)[0]
tm_packet = PusTelemetryFactory.create(data)
# self.tmtc_printer.print_telemetry(tm_packet)
packet_list = [tm_packet]
return True, packet_list
return False, []
def receive_telemetry(self, parameters: any = 0) -> list:
(packet_received, packet_list) = self.poll_interface()
if packet_received:
return packet_list
return []
def set_up_socket(self):
"""
Sets up the sockets for the UDP communication.
:return:
"""
try:
self.sock_receive.bind(self.receive_address)
self.sock_receive.setblocking(False)
except OSError:
print("Socket already set-up.")
# print("Socket Receive Address: " + str(g.G_REC_ADDRESS))
# logging.exception("Error: ")
# exit()
except TypeError:
print("Invalid Receive Address")
sys.exit()
def connect_to_board(self):
"""
For UDP, this is used to initiate communication.
:return:
"""
test = bytearray([])
self.send_telecommand(test)
# def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \
# Union[None, PusTmInfoQueueT]:
# packets = self.receive_telemetry()
# for packet in packets:
# packet_info = packet.pack_tm_information()
# if g.G_PRINT_TM:
# self.tmtc_printer.print_telemetry(packet)
# tm_info_queue.append(packet_info)
# return tm_info_queue
#
# def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]:
# packets = self.receive_telemetry()
# for packet in packets:
# tm_queue.append(packet)
# return tm_queue
#
# def receive_telemetry_and_store_tuple(self, tm_tuple_queue):
# packet_list = self.receive_telemetry()
# for packet in packet_list:
# packet_info = packet.pack_tm_information()
# tm_tuple = (packet_info, packet)
# if g.G_PRINT_TM:
# self.tmtc_printer.print_telemetry(packet)
# tm_tuple_queue.append(tm_tuple)
# return tm_tuple_queue