diff --git a/comIF/OBSW_ComInterface.py b/comIF/OBSW_ComInterface.py index b5c8373a307c98e8341528d7f4a9fe711850b6a5..21a90ebae70eaf7e943a145fe791557f75f28fb9 100644 --- a/comIF/OBSW_ComInterface.py +++ b/comIF/OBSW_ComInterface.py @@ -8,15 +8,13 @@ Description: Generic Communication Interface. Defines the syntax of the communic @author: R. Mueller """ from abc import abstractmethod -from typing import TypeVar, Tuple, Union, List -from tm.OBSW_PusTm import PusTmT, pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT +from typing import TypeVar, Tuple, Union +from tm.OBSW_PusTm import pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT, pusTmListT from utility.OBSW_TmTcPrinter import TmTcPrinterT -from tc.OBSW_TcPacket import PusTcT, pusTcInfoT +from tc.OBSW_TcPacket import pusTcInfoT ComIF_T = TypeVar('ComIF_T', bound='CommunicationInterface') -packetListT = List[PusTmT] - class CommunicationInterface: def __init__(self, tmtcPrinter: TmTcPrinterT): @@ -27,7 +25,7 @@ class CommunicationInterface: pass @abstractmethod - def sendTelecommand(self, tcPacket: PusTcT, tcPacketInfo: pusTcInfoT = None) -> None: + def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None) -> None: """ Send telecommands :param tcPacket: TC packet to send @@ -37,7 +35,7 @@ class CommunicationInterface: pass @abstractmethod - def receiveTelemetry(self, parameters: any = 0): + def receiveTelemetry(self, parameters: any = 0) -> pusTmListT: """ Returns a list of packets. Most of the time, this will simply call the pollInterface function :param parameters: @@ -47,7 +45,7 @@ class CommunicationInterface: return packetList @abstractmethod - def pollInterface(self, parameters: any = 0) -> Tuple[bool, packetListT]: + def pollInterface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: """ Poll the interface and return a list of received packets :param parameters: diff --git a/comIF/OBSW_Ethernet_ComIF.py b/comIF/OBSW_Ethernet_ComIF.py index d49b1331e9179285b0f54b86284c2276b6332516..5e3aded6e24ad9a07577fbc0eed7733343d354ff 100644 --- a/comIF/OBSW_Ethernet_ComIF.py +++ b/comIF/OBSW_Ethernet_ComIF.py @@ -6,44 +6,43 @@ @author R. Mueller """ import select +import socket +from typing import Tuple, Union -from comIF.OBSW_ComInterface import CommunicationInterface +from comIF.OBSW_ComInterface import CommunicationInterface, pusTmListT, pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT from tm.OBSW_TmPacket import PUSTelemetryFactory +from tc.OBSW_TcPacket import pusTcInfoT +from utility.OBSW_TmTcPrinter import TmTcPrinterT import config.OBSW_Config as g class EthernetComIF(CommunicationInterface): - def __init__(self, tmtcPrinter, tmTimeout, tcTimeoutFactor, sendSocket, recvSocket, sendAddress): + def __init__(self, tmtcPrinter: TmTcPrinterT, tmTimeout: float, tcTimeoutFactor: float, + receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT): super().__init__(tmtcPrinter) self.tmTimeout = tmTimeout self.tcTimeoutFactor = tcTimeoutFactor - self.sendSocket = sendSocket - self.recvSocket = recvSocket + self.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.receiveAddress = receiveAddress self.sendAddress = sendAddress self.setUpSocket() def close(self) -> None: pass - def sendTelecommand(self, tcPacket, tcPacketInfo=""): + def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None) -> None: self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) - self.sendSocket.sendto(tcPacket, self.sendAddress) + self.sockSend.sendto(tcPacket, self.sendAddress) - def performListenerMode(self): - pollTimeout = 10 - print("Listening for packages ...") - while True: - self.pollInterface(pollTimeout) - - def dataAvailable(self, timeout): - ready = select.select([self.recvSocket], [], [], timeout) + def dataAvailable(self, timeout: float = 0) -> bool: + ready = select.select([self.sockReceive], [], [], timeout) if ready is None: return False elif ready[0]: return ready - # TODO: This still needs to be fixed, this function should return a list of telemetry packets - def pollInterface(self, pollTimeout): + def pollInterface(self, pollTimeout: float = 0) -> Tuple[bool, pusTmListT]: ready = self.dataAvailable(pollTimeout) if ready is False: return False, [] @@ -51,31 +50,40 @@ class EthernetComIF(CommunicationInterface): packetList = self.receiveTelemetry() return True, packetList - def receiveTelemetry(self, parameters=0): - data = self.recvSocket.recvfrom(1024)[0] + def receiveTelemetry(self, parameters: any = 0) -> list: + data = self.sockReceive.recvfrom(1024)[0] packet = PUSTelemetryFactory(data) self.tmtcPrinter.printTelemetry(packet) packetList = [packet] return packetList - def receiveTelemetryAndStoreIntoQueue(self, tmQueue): - packet = self.receiveTelemetry() - tmQueue.put(packet) + def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]: + packets = self.receiveTelemetry() + for packet in packets: + packetInfo = packet.packTmInformation() + self.tmtcPrinter.printTelemetry(packet) + tmInfoQueue.append(packetInfo) + return tmInfoQueue + + def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: + packets = self.receiveTelemetry() + for packet in packets: + tmQueue.append(packet) return tmQueue - def receiveTelemetryAndStoreTuple(self, tmTupleQueue): - data = self.recvSocket.recvfrom(1024)[0] - tmInfo = PUSTelemetryFactory(data).packTmInformation() - packet = PUSTelemetryFactory(data) - self.tmtcPrinter.printTelemetry(packet) - tmTuple = (packet, tmInfo) - tmTupleQueue.put(tmTuple) - return tmTuple + def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]: + packetList = self.receiveTelemetry() + for packet in packetList: + packetInfo = packet.packTmInformation() + tmTuple = (packetInfo, packet) + self.tmtcPrinter.printTelemetry(packet) + tmTupleQueue.append(tmTuple) + return tmTupleQueue def setUpSocket(self): try: - self.recvSocket.bind(g.recAddress) - self.recvSocket.setblocking(False) + self.sockReceive.bind(self.receiveAddress) + self.sockReceive.setblocking(False) except OSError: print("Socket already set-up.") # print("Socket Receive Address: " + str(g.recAddress)) diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py index 30d5be5b893e52f1a18043b19cdb5f4429f0c5c0..57a38672ac87e510475122b52cb6629a460c5bed 100644 --- a/comIF/OBSW_Serial_ComIF.py +++ b/comIF/OBSW_Serial_ComIF.py @@ -9,18 +9,18 @@ import time import serial import logging from typing import Tuple, List, Union -from comIF.OBSW_ComInterface import CommunicationInterface, pusTmQueueT, packetListT +from comIF.OBSW_ComInterface import CommunicationInterface, pusTmQueueT +from utility.OBSW_TmTcPrinter import TmTcPrinterT from tm.OBSW_TmPacket import PUSTelemetryFactory -from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmTupleQueueT +from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmTupleQueueT, pusTmListT from tc.OBSW_TcPacket import pusTcInfoT SERIAL_PACKET_MAX_SIZE = 1024 class SerialComIF(CommunicationInterface): - def __init__(self, tmtcPrinter, comPort, baudRate, serialTimeout): + def __init__(self, tmtcPrinter: TmTcPrinterT, comPort: str, baudRate: int, serialTimeout: float): super().__init__(tmtcPrinter) - self.tmtcPrinter = tmtcPrinter try: self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) except serial.SerialException: @@ -49,7 +49,7 @@ class SerialComIF(CommunicationInterface): else: return [] - def pollInterface(self, parameter: int = 0) -> Tuple[bool, packetListT]: + def pollInterface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: if self.dataAvailable(): pusDataList, numberOfPackets = self.pollPusPackets() packetList = [] diff --git a/config/OBSW_Config.py b/config/OBSW_Config.py index 06f4c0106152826850cd17596c0c2f9cad722ab4..3060081c817e31d0674de14e9489728d952beb7b 100644 --- a/config/OBSW_Config.py +++ b/config/OBSW_Config.py @@ -8,6 +8,11 @@ """ import socket import enum +from typing import Tuple, TypeVar + +# Contains IP address as string and port as int +ethernetAddressT = Tuple[str, int] +socketType = TypeVar['socketType'] """ @@ -57,8 +62,6 @@ tcSendTimeoutFactor = 2.0 # Ethernet connection settings recAddress = 0 sendAddress = (0, 0) -sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Print Settings printToFile = False diff --git a/tm/OBSW_PusTm.py b/tm/OBSW_PusTm.py index 7e622c1627eb8ef0981fe55def6ba6b5839c5c62..797ab6cd797c32a0e6a41bad429a40c1801b9aa5 100644 --- a/tm/OBSW_PusTm.py +++ b/tm/OBSW_PusTm.py @@ -1,11 +1,12 @@ from tm.OBSW_PusPacket import OBSWPusPacket -from typing import TypeVar, Dict, Tuple, Deque, Union +from typing import TypeVar, Dict, Tuple, Deque, List PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') pusTmInfoT = Dict[str, any] pusTmTupleT = Tuple[pusTmInfoT, PusTmT] pusTmQueueT = Deque[PusTmT] +pusTmListT = List[PusTmT] pusTmInfoQueueT = Deque[pusTmInfoT] pusTmTupleQueueT = Deque[pusTmTupleT]