diff --git a/comIF/OBSW_ComInterface.py b/comIF/OBSW_ComInterface.py deleted file mode 100644 index 7f9af565b35566e8ac2acfed04f8a1a87854cf2a..0000000000000000000000000000000000000000 --- a/comIF/OBSW_ComInterface.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: OBSW_ComInterface.py -Date: 01.11.2019 -Description: Generic Communication Interface. Defines the syntax of the communication functions. - Abstract methods must be implemented by child class (e.g. Ethernet Com IF) - -@author: R. Mueller -""" -from abc import abstractmethod -from typing import TypeVar, Tuple, Union -from tm.OBSW_PusTm import pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT, pusTmListT -from utility.obsw_tmtc_printer import TmTcPrinterT -from tc.OBSW_TcPacket import pusTcInfoT - -ComIF_T = TypeVar('ComIF_T', bound='CommunicationInterface') - - -class CommunicationInterface: - def __init__(self, tmtcPrinter: TmTcPrinterT): - self.tmtcPrinter = tmtcPrinter - - @abstractmethod - def close(self) -> None: - pass - - @abstractmethod - def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None) -> None: - """ - Send telecommands - :param tcPacket: TC packet to send - :param tcPacketInfo: TC packet information - :return: None for now - """ - pass - - @abstractmethod - def receiveTelemetry(self, parameters: any = 0) -> pusTmListT: - """ - Returns a list of packets. Most of the time, this will simply call the pollInterface function - :param parameters: - :return: - """ - packetList = [] - return packetList - - @abstractmethod - def pollInterface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: - """ - Poll the interface and return a list of received packets - :param parameters: - :return: Tuple: boolean which specifies wheather a packet was received, and the packet list containing - Tm packets - """ - pass - - @abstractmethod - def dataAvailable(self, parameters: any): - """ - Check whether data is available - :param parameters: - :return: - """ - pass - - def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]: - """ - Receive telemetry and store packet information in dict format into a queue. - If no data is available, don't do anything. - :param tmInfoQueue: - :return: - """ - print("No child implementation provided yet!") - return None - - def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: - """ - Receive telemetry packets and store TM object into a queue. - If no data is available, don't do anything. - :param tmQueue: - :return: - """ - print("No child implementation provided yet!") - return None - - def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]: - """ - Poll telemetry and store a tuple consisting of TM information and the TM packet into the queue. - If no data is available, don't do anything. - :param tmTupleQueue: - :return: - """ - print("No child implementation provided yet!") - return None - - - - - diff --git a/comIF/OBSW_DummyComIF.py b/comIF/OBSW_DummyComIF.py deleted file mode 100644 index 24795c98aea233a2dbfdc104946a34d16435697d..0000000000000000000000000000000000000000 --- a/comIF/OBSW_DummyComIF.py +++ /dev/null @@ -1,27 +0,0 @@ -""" -@file OBSW_DummyComIF.py -@date 09.03.2020 -@brief Dummy Communication Interface - -@author R. Mueller -""" -from typing import Tuple - -from comIF.OBSW_ComInterface import CommunicationInterface - - -class DummyComIF(CommunicationInterface): - def close(self) -> None: - pass - - def dataAvailable(self, parameters): - pass - - def pollInterface(self, parameters: any = 0) -> Tuple[bool, list]: - pass - - def receiveTelemetry(self, parameters: any = 0): - pass - - def sendTelecommand(self, tcPacket, tcPacketInfo="") -> None: - pass diff --git a/comIF/OBSW_Ethernet_ComIF.py b/comIF/OBSW_Ethernet_ComIF.py deleted file mode 100644 index 010c0ab29b6b3eca2a41ae17bf96f5df25f67374..0000000000000000000000000000000000000000 --- a/comIF/OBSW_Ethernet_ComIF.py +++ /dev/null @@ -1,105 +0,0 @@ -""" -@file OBSW_Ethernet_ComIF.py -@date 01.11.2019 -@brief Ethernet Communication Interface - -@author R. Mueller -""" -import select -import socket -from typing import Tuple, Union - -from comIF.OBSW_ComInterface import CommunicationInterface, pusTmListT, pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT -from tm.OBSW_TmPacket import PUSTelemetryFactory -from tc.OBSW_TcPacket import pusTcInfoT -from utility.obsw_tmtc_printer import TmTcPrinterT -import config.OBSW_Config as g - - -class EthernetComIF(CommunicationInterface): - def __init__(self, tmtcPrinter: TmTcPrinterT, tmTimeout: float, tcTimeoutFactor: float, - receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT): - super().__init__(tmtcPrinter) - self.tmTimeout = tmTimeout - self.tcTimeoutFactor = tcTimeoutFactor - self.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.receiveAddress = receiveAddress - self.sendAddress = sendAddress - self.setUpSocket() - - def close(self) -> None: - pass - - def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None) -> None: - self.tmtcPrinter.print_telecommand(tcPacket, tcPacketInfo) - self.sockSend.sendto(tcPacket, self.sendAddress) - - def dataAvailable(self, timeout: float = 0) -> bool: - ready = select.select([self.sockReceive], [], [], timeout) - if ready is None: - return False - elif ready[0]: - return ready - - def pollInterface(self, pollTimeout: float = 0) -> Tuple[bool, pusTmListT]: - ready = self.dataAvailable(pollTimeout) - if ready: - data = self.sockReceive.recvfrom(1024)[0] - packet = PUSTelemetryFactory(data) - self.tmtcPrinter.print_telemetry(packet) - packetList = [packet] - return True, packetList - else: - return False, [] - - def receiveTelemetry(self, parameters: any = 0) -> list: - (packetReceived, packetList) = self.pollInterface() - if packetReceived: - return packetList - else: - return [] - - def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]: - packets = self.receiveTelemetry() - for packet in packets: - packetInfo = packet.packTmInformation() - self.tmtcPrinter.print_telemetry(packet) - tmInfoQueue.append(packetInfo) - return tmInfoQueue - - def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: - packets = self.receiveTelemetry() - for packet in packets: - tmQueue.append(packet) - return tmQueue - - def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]: - packetList = self.receiveTelemetry() - for packet in packetList: - packetInfo = packet.packTmInformation() - tmTuple = (packetInfo, packet) - self.tmtcPrinter.print_telemetry(packet) - tmTupleQueue.append(tmTuple) - return tmTupleQueue - - def setUpSocket(self): - try: - self.sockReceive.bind(self.receiveAddress) - self.sockReceive.setblocking(False) - except OSError: - print("Socket already set-up.") - # print("Socket Receive Address: " + str(g.G_REC_ADDRESS)) - # logging.exception("Error: ") - # exit() - except TypeError: - print("Invalid Receive Address") - exit() - - def connectToBoard(self): - # Maybe there is a cleaner way to start comm with udp server - # so that it knows the ip address? - test = bytearray([]) - self.sendTelecommand(test) - # send multiple times to get connection if there are problems - diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py deleted file mode 100644 index a721ff5227b0e9e86a90dc73d74d846136a13ca9..0000000000000000000000000000000000000000 --- a/comIF/OBSW_Serial_ComIF.py +++ /dev/null @@ -1,137 +0,0 @@ -""" -@file OBSW_Serial_ComIF.py -@date 01.11.2019 -@brief Serial Communication Interface - -@author R. Mueller -""" -import time -import serial -import logging -from typing import Tuple, List, Union, Optional -from comIF.OBSW_ComInterface import CommunicationInterface, pusTmQueueT -from utility.obsw_tmtc_printer import TmTcPrinterT -from tm.OBSW_TmPacket import PUSTelemetryFactory -from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmTupleQueueT, pusTmListT -from tc.OBSW_TcPacket import pusTcInfoT - -SERIAL_PACKET_MAX_SIZE = 1024 - - -class SerialComIF(CommunicationInterface): - def __init__(self, tmtcPrinter: TmTcPrinterT, comPort: str, baudRate: int, serialTimeout: float): - super().__init__(tmtcPrinter) - try: - self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) - except serial.SerialException: - print("Serial Port can not be opened") - logging.exception("Error") - exit() - self.data = bytearray() - self.numberOfPackets = 0 - - def close(self): - try: - self.serial.close() - except serial.SerialException: - print("Serial Port could not be closed !") - logging.exception("Error") - exit() - - def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None): - self.tmtcPrinter.print_telecommand(tcPacket, tcPacketInfo) - self.serial.write(tcPacket) - - def receiveTelemetry(self, parameters: any = 0) -> list: - (packetReceived, packetList) = self.pollInterface() - if packetReceived: - return packetList - else: - return [] - - def pollInterface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: - if self.dataAvailable(): - pusDataList, numberOfPackets = self.pollPusPackets() - packetList = [] - for counter in range(0, numberOfPackets): - packet = PUSTelemetryFactory(pusDataList[counter]) - self.tmtcPrinter.print_telemetry(packet) - packetList.append(packet) - return True, packetList - else: - return False, [] - - def dataAvailable(self, timeout: float = 0) -> bool: - if self.serial.in_waiting > 0: - return True - elif timeout > 0: - start_time = time.time() - elapsed_time = 0 - while elapsed_time < timeout: - if self.serial.in_waiting > 0: - return True - elapsed_time = time.time() - start_time - return False - - def pollPusPackets(self) -> Tuple[List[Optional[bytes]], int]: - pusDataList = [] - self.data = self.serial.read(1024) - packetSize = (self.data[4] << 8 | self.data[5]) + 7 - readSize = len(self.data) - self.numberOfPackets = 1 - if readSize < packetSize: - print("Serial Com IF: Size missmatch when polling PUS packet. Packet Size: " + - str(packetSize) + ". Read Size: " + str(readSize) + ". Check timeout too") - if readSize > packetSize: - self.handleMultiplePackets(packetSize, readSize, pusDataList) - else: - pusDataList.append(self.data) - return pusDataList, self.numberOfPackets - - def handleMultiplePackets(self, packetSize: int, readSize: int, pusDataList: list): - endOfBuffer = readSize - 1 - endIndex = packetSize - startIndex = 0 - pusData = self.data[startIndex:endIndex] - pusDataList.append(pusData) - while endIndex < endOfBuffer: - endIndex = self.parseNextPackets(endIndex, pusDataList) - - def parseNextPackets(self, endIndex: int, pusDataList: list) -> int: - startIndex = endIndex - endIndex = endIndex + 5 - nextPacketSize = (self.data[endIndex - 1] << 8 | self.data[endIndex]) + 7 - if nextPacketSize > SERIAL_PACKET_MAX_SIZE: - print("PUS Polling: Very Large packet detected, large packet reading not implemented yet !") - print("Detected Size: " + str(nextPacketSize)) - return SERIAL_PACKET_MAX_SIZE - endIndex = startIndex + nextPacketSize - pusData = self.data[startIndex:endIndex] - pusDataList.append(pusData) - self.numberOfPackets = self.numberOfPackets + 1 - return endIndex - - def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: - (packetReceived, pusPackets) = self.pollInterface() - if packetReceived: - for packet in pusPackets: - tmQueue.append(packet) - return tmQueue - - def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]: - (packetReceived, pusPackets) = self.pollInterface() - if packetReceived: - for packet in pusPackets: - tmInfo = packet.packTmInformation() - tmInfoQueue.append(tmInfo) - return tmInfoQueue - - def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]: - (packetReceived, pusPackets) = self.pollInterface() - if packetReceived: - for packet in pusPackets: - tmInfo = packet.packTmInformation() - tmTupleQueue.append((tmInfo, packet)) - return tmTupleQueue - - diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..d6761c3393ff2a8303c3830ac97a407501ea5341 --- /dev/null +++ b/comIF/obsw_com_interface.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +""" +Program: obsw_com_interface.py +Date: 01.11.2019 +Description: Generic Communication Interface. Defines the syntax of the communication functions. + Abstract methods must be implemented by child class (e.g. Ethernet Com IF) + +@author: R. Mueller +""" +from abc import abstractmethod +from typing import TypeVar, Tuple, Union +from tm.OBSW_PusTm import pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT, pusTmListT +from utility.obsw_tmtc_printer import TmTcPrinterT +from tc.OBSW_TcPacket import pusTcInfoT + +ComIfT = TypeVar('ComIfT', bound='CommunicationInterface') + + +# pylint: disable=useless-return +# pylint: disable=no-self-use +# pylint: disable=unused-argument +class CommunicationInterface: + """ + Generic form of a communication interface to separate communication logic from + the underlying interface. + """ + def __init__(self, tmtc_printer: TmTcPrinterT): + self.tmtc_printer = tmtc_printer + + @abstractmethod + def close(self) -> None: + """ + Closes the ComIF and releases any held resources (for example a Communication Port) + :return: + """ + + @abstractmethod + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: pusTcInfoT = None) -> None: + """ + Send telecommands + :param tc_packet: TC packet to send + :param tc_packet_info: TC packet information + :return: None for now + """ + + @abstractmethod + def receive_telemetry(self, parameters: any = 0) -> pusTmListT: + """ + Returns a list of packets. Most of the time, + this will simply call the pollInterface function + :param parameters: + :return: + """ + packet_list = [] + return packet_list + + @abstractmethod + def poll_interface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: + """ + Poll the interface and return a list of received packets + :param parameters: + :return: Tuple: boolean which specifies wheather a packet was received, + and the packet list containing + Tm packets + """ + + @abstractmethod + def data_available(self, parameters: any) -> bool: + """ + Check whether data is available + :param parameters: + :return: + """ + + def receive_telemetry_and_store_info(self, tm_info_queue: pusTmInfoQueueT) -> \ + Union[None, pusTmInfoQueueT]: + """ + Receive telemetry and store packet information in dict format into a queue. + If no data is available, don't do anything. + :param tm_info_queue: + :return: + """ + print("No child implementation provided yet!") + return + + def receive_telemetry_and_store_tm(self, tm_queue: pusTmQueueT) -> Union[None, pusTmQueueT]: + """ + Receive telemetry packets and store TM object into a queue. + If no data is available, don't do anything. + :param tm_queue: + :return: + """ + print("No child implementation provided yet!") + return + + def receive_telemetry_and_store_tuple(self, tm_tuple_queue: pusTmTupleQueueT) -> \ + Union[None, pusTmTupleQueueT]: + """ + Poll telemetry and store a tuple consisting of TM information and the + TM packet into the queue. If no data is available, don't do anything. + :param tm_tuple_queue: + :return: + """ + print("No child implementation provided yet!") + return diff --git a/comIF/obsw_dummy_com_if.py b/comIF/obsw_dummy_com_if.py new file mode 100644 index 0000000000000000000000000000000000000000..0f3eacee482a8d898a704979ae7d9ecdabc7685e --- /dev/null +++ b/comIF/obsw_dummy_com_if.py @@ -0,0 +1,27 @@ +""" +@file obsw_dummy_com_if.py +@date 09.03.2020 +@brief Dummy Communication Interface + +@author R. Mueller +""" +from typing import Tuple + +from comIF.obsw_com_interface import CommunicationInterface + + +class DummyComIF(CommunicationInterface): + def close(self) -> None: + pass + + def data_available(self, parameters): + pass + + def poll_interface(self, parameters: any = 0) -> Tuple[bool, list]: + pass + + def receive_telemetry(self, parameters: any = 0): + pass + + def send_telecommand(self, tcPacket, tcPacketInfo="") -> None: + pass diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py new file mode 100644 index 0000000000000000000000000000000000000000..c17faf08e6b46df7b9751495f3fbb686f242741f --- /dev/null +++ b/comIF/obsw_ethernet_com_if.py @@ -0,0 +1,116 @@ +""" +@file obsw_ethernet_com_if.py +@date 01.11.2019 +@brief Ethernet Communication Interface + +@author R. Mueller +""" +import select +import socket +import sys +from typing import Tuple, Union + +from comIF.obsw_com_interface import CommunicationInterface, pusTmListT, pusTmQueueT, \ + pusTmTupleQueueT, pusTmInfoQueueT +from tm.OBSW_TmPacket import PUSTelemetryFactory +from tc.OBSW_TcPacket import pusTcInfoT +from utility.obsw_tmtc_printer import TmTcPrinterT +import config.OBSW_Config as g + + +# pylint: disable=abstract-method +# pylint: disable=arguments-differ +# pylint: disable=too-many-arguments +class EthernetComIF(CommunicationInterface): + """ + Communication interface for UDP communication. + """ + def __init__(self, tmtc_printer: TmTcPrinterT, tmTimeout: float, tcTimeoutFactor: float, + receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT): + super().__init__(tmtc_printer) + self.tm_timeout = tmTimeout + self.tc_timeout_factor = tcTimeoutFactor + self.sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.receive_address = receiveAddress + self.send_address = sendAddress + self.set_up_socket() + + def close(self) -> None: + pass + + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: pusTcInfoT = None) -> None: + self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info) + self.sock_send.sendto(tc_packet, self.send_address) + + def data_available(self, timeout: float = 0) -> bool: + ready = select.select([self.sock_receive], [], [], timeout) + if ready is None: + return False + return ready + + def poll_interface(self, pollTimeout: float = 0) -> Tuple[bool, pusTmListT]: + ready = self.data_available(pollTimeout) + if ready: + data = self.sock_receive.recvfrom(1024)[0] + packet = PUSTelemetryFactory(data) + self.tmtc_printer.print_telemetry(packet) + packet_list = [packet] + return True, packet_list + return False, [] + + def receive_telemetry(self, parameters: any = 0) -> list: + (packet_received, packet_list) = self.poll_interface() + if packet_received: + return packet_list + return [] + + def receive_telemetry_and_store_info(self, tmInfoQueue: pusTmInfoQueueT) -> \ + Union[None, pusTmInfoQueueT]: + packets = self.receive_telemetry() + for packet in packets: + packet_info = packet.packTmInformation() + self.tmtc_printer.print_telemetry(packet) + tmInfoQueue.append(packet_info) + return tmInfoQueue + + def receive_telemetry_and_store_tm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: + packets = self.receive_telemetry() + for packet in packets: + tmQueue.append(packet) + return tmQueue + + def receive_telemetry_and_store_tuple(self, tmTupleQueue: pusTmTupleQueueT) -> \ + Union[None, pusTmTupleQueueT]: + packet_list = self.receive_telemetry() + for packet in packet_list: + packet_info = packet.packTmInformation() + tm_tuple = (packet_info, packet) + self.tmtc_printer.print_telemetry(packet) + tmTupleQueue.append(tm_tuple) + return tmTupleQueue + + def set_up_socket(self): + """ + Sets up the sockets for the UDP communication. + :return: + """ + try: + self.sock_receive.bind(self.receive_address) + self.sock_receive.setblocking(False) + except OSError: + print("Socket already set-up.") + # print("Socket Receive Address: " + str(g.G_REC_ADDRESS)) + # logging.exception("Error: ") + # exit() + except TypeError: + print("Invalid Receive Address") + sys.exit() + + def connect_to_board(self): + """ + For UDP, this is used to initiate communication. + :return: + """ + test = bytearray([]) + self.send_telecommand(test) diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py new file mode 100644 index 0000000000000000000000000000000000000000..93799a2eedeb5df8e0a9782fbf24d55be77ffd25 --- /dev/null +++ b/comIF/obsw_serial_com_if.py @@ -0,0 +1,145 @@ +""" +@file obsw_serial_com_if.py +@date 01.11.2019 +@brief Serial Communication Interface + +@author R. Mueller +""" +import sys +import time +import logging +from typing import Tuple, List, Union, Optional + +import serial +from comIF.obsw_com_interface import CommunicationInterface, pusTmQueueT +from utility.obsw_tmtc_printer import TmTcPrinterT +from tm.OBSW_TmPacket import PUSTelemetryFactory +from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmTupleQueueT, pusTmListT +from tc.OBSW_TcPacket import pusTcInfoT + +SERIAL_PACKET_MAX_SIZE = 1024 +HEADER_BYTES_BEFORE_SIZE = 5 + + +# pylint: disable=arguments-differ +class SerialComIF(CommunicationInterface): + """ + Communication Interface to use serial communication. This requires the PySerial library + on Windows. It has been tested only on Windows. + """ + def __init__(self, tmtcPrinter: TmTcPrinterT, comPort: str, baudRate: int, + serialTimeout: float): + super().__init__(tmtcPrinter) + try: + self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) + except serial.SerialException: + print("Serial Port can not be opened") + logging.exception("Error") + sys.exit() + self.data = bytearray() + self.number_of_packets = 0 + + def close(self): + try: + self.serial.close() + except serial.SerialException: + print("Serial Port could not be closed !") + logging.exception("Error") + sys.exit() + + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: pusTcInfoT = None) -> None: + self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info) + self.serial.write(tc_packet) + + def receive_telemetry(self, parameters: any = 0) -> list: + (packet_received, packet_list) = self.poll_interface() + if packet_received: + return packet_list + return [] + + def poll_interface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: + if self.data_available(): + pus_data_list, number_of_packets = self.__poll_pus_packets() + packet_list = [] + for counter in range(0, number_of_packets): + packet = PUSTelemetryFactory(pus_data_list[counter]) + self.tmtc_printer.print_telemetry(packet) + packet_list.append(packet) + return True, packet_list + return False, [] + + def data_available(self, timeout: float = 0) -> bool: + if self.serial.in_waiting > 0: + return True + if timeout > 0: + start_time = time.time() + elapsed_time = 0 + while elapsed_time < timeout: + if self.serial.in_waiting > 0: + return True + elapsed_time = time.time() - start_time + return False + + def __poll_pus_packets(self) -> Tuple[List[Optional[bytes]], int]: + pus_data_list = [] + self.data = self.serial.read(1024) + packet_size = (self.data[4] << 8 | self.data[5]) + 7 + read_size = len(self.data) + self.number_of_packets = 1 + if read_size < packet_size: + print("Serial Com IF: Size missmatch when polling PUS packet. Packet Size: " + + str(packet_size) + ". Read Size: " + str(read_size) + ". Check timeout too") + if read_size > packet_size: + self.__handle_multiple_packets(packet_size, read_size, pus_data_list) + else: + pus_data_list.append(self.data) + return pus_data_list, self.number_of_packets + + def __handle_multiple_packets(self, packet_size: int, read_size: int, pus_data_list: list): + end_of_buffer = read_size - 1 + end_index = packet_size + start_index = 0 + pus_data = self.data[start_index:end_index] + pus_data_list.append(pus_data) + while end_index < end_of_buffer: + end_index = self.__parse_next_packets(end_index, pus_data_list) + + def __parse_next_packets(self, end_index: int, pus_data_list: list) -> int: + start_index = end_index + end_index = end_index + 5 + next_packet_size = (self.data[end_index - 1] << 8 | self.data[end_index]) + 7 + if next_packet_size > SERIAL_PACKET_MAX_SIZE: + print("PUS Polling: Very Large packet detected, " + "large packet reading not implemented yet !") + print("Detected Size: " + str(next_packet_size)) + return SERIAL_PACKET_MAX_SIZE + end_index = start_index + next_packet_size + pus_data = self.data[start_index:end_index] + pus_data_list.append(pus_data) + self.number_of_packets = self.number_of_packets + 1 + return end_index + + def receive_telemetry_and_store_tm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: + (packet_received, pus_packets) = self.poll_interface() + if packet_received: + for packet in pus_packets: + tmQueue.append(packet) + return tmQueue + + def receive_telemetry_and_store_info(self, tmInfoQueue: pusTmInfoQueueT) -> \ + Union[None, pusTmInfoQueueT]: + (packet_received, pus_packets) = self.poll_interface() + if packet_received: + for packet in pus_packets: + tm_info = packet.packTmInformation() + tmInfoQueue.append(tm_info) + return tmInfoQueue + + def receive_telemetry_and_store_tuple(self, tm_tuple_queue: pusTmTupleQueueT) -> \ + Union[None, pusTmTupleQueueT]: + (packet_received, pus_packets) = self.poll_interface() + if packet_received: + for packet in pus_packets: + tm_info = packet.packTmInformation() + tm_tuple_queue.append((tm_info, packet)) + return tm_tuple_queue diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 5dab45b0fff247c5d3a183e50ae6896271838dea..57171b64a53737d7dc26d625be4edb4701c8bf5c 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -65,16 +65,16 @@ from config.OBSW_Config import setGlobals from tc.OBSW_TcPacker import PUSTelecommand, createTotalTcQueue, serviceTestSelect from sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver -from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver +from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver from sendreceive.obsw_tm_listener import TmListener -from utility.OBSW_ArgParser import parseInputArguments +from utility.obsw_args_parser import parse_input_arguments from utility.obsw_tmtc_printer import TmTcPrinter, TmTcPrinterT -from utility.OBSW_ExitHandler import keyboardInterruptHandler +from utility.obsw_exit_handler import keyboard_interrupt_handler -from comIF.OBSW_Ethernet_ComIF import EthernetComIF -from comIF.OBSW_Serial_ComIF import SerialComIF -from comIF.OBSW_ComInterface import ComIF_T +from comIF.obsw_ethernet_com_if import EthernetComIF +from comIF.obsw_serial_com_if import SerialComIF +from comIF.obsw_com_interface import ComIfT from gui.OBSW_TmtcGUI import TmTcGUI from gui.OBSW_BackendTest import TmTcBackend @@ -85,7 +85,7 @@ def main(): Runs the TMTC client, depending on input arguments :return: """ - args = parseInputArguments() + args = parse_input_arguments() setGlobals(args) tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) if g.G_MODE_ID == g.ModeList.GUIMode: @@ -99,7 +99,7 @@ def main(): sys.exit() else: communication_interface = set_communication_interface(tmtc_printer) - atexit.register(keyboardInterruptHandler, comInterface=communication_interface) + atexit.register(keyboard_interrupt_handler, comInterface=communication_interface) tm_listener = TmListener( comInterface=communication_interface, tmTimeout=g.G_TM_TIMEOUT, tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR) @@ -120,14 +120,14 @@ def main(): comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, tcQueue=serviceTestSelect(g.G_SERVICE, service_queue), tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, tmListener=tm_listener) - sender_and_receiver.sendQueueTcAndReceiveTmSequentially() + sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() elif g.G_MODE_ID == g.ModeList.SoftwareTestMode: all_tc_queue = createTotalTcQueue() sender_and_receiver = SequentialCommandSenderReceiver( comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, tcQueue=all_tc_queue, tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, tmListener=tm_listener) - sender_and_receiver.sendQueueTcAndReceiveTmSequentially() + sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() elif g.G_MODE_ID == g.ModeList.UnitTest: # Set up test suite and run it with runner. Verbosity specifies detail level @@ -163,7 +163,7 @@ def command_preparation(): return command.packCommandTuple() -def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIF_T: +def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: """ Return the desired communication interface object :param tmtc_printer: TmTcPrinter object. @@ -172,7 +172,7 @@ def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIF_T: try: if g.G_COM_IF == g.ComIF.Ethernet: communication_interface = EthernetComIF( - tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, + tmtc_printer=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, sendAddress=g.G_SEND_ADDRESS, receiveAddress=g.G_REC_ADDRESS) else: diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index 2b86b4e1a229f20a9d5238ea81b5b5921eb66736..4b730e240e4fb0547c85abc82cfcc08451908bd0 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -10,8 +10,8 @@ import time from typing import Union, Deque from collections import deque -from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver -from comIF.OBSW_ComInterface import ComIF_T +from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver +from comIF.obsw_com_interface import ComIfT from utility.obsw_tmtc_printer import TmTcPrinterT from sendreceive.obsw_tm_listener import TmListenerT import config.OBSW_Config as g @@ -22,7 +22,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with wait time between the send bursts. This is generally done in the separate test classes in UnitTest """ - def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, + def __init__(self, comInterface: ComIfT, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, tmListener: TmListenerT, tmTimeout: float, waitIntervals: list, waitTime: Union[float, list], printTm: bool, tcTimeoutFactor: float): """ @@ -38,7 +38,6 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): :param waitTime: List of wait times or uniform wait time as float :param printTm: :param tcTimeoutFactor: - :param doPrintToFile: """ super().__init__(comInterface=comInterface, tmtcPrinter=tmtcPrinter, tmListener=tmListener, tmTimeout=tmTimeout, tcQueue=tcQueue, tcTimeoutFactor=tcTimeoutFactor) @@ -51,7 +50,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.pusPacket = [] self.waitCounter = 0 - def sendTcQueueAndReturnInfo(self): + def send_tc_queue_and_return_info(self): try: self._tm_listener.modeId = g.ModeList.UnitTest self._tm_listener.modeChangeEvent.set() @@ -61,7 +60,6 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.tmInfoQueue = self.retrieveListenerTmInfoQueue() self._tm_listener.modeOpFinished.set() if g.G_PRINT_TO_FILE: - print("blub") self._tmtc_printer.print_to_file() except (KeyboardInterrupt, SystemExit): print("Keyboard Interrupt or System Exit detected") @@ -69,23 +67,23 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): return self.tcInfoQueue, self.tmInfoQueue - def handleTcResending(self): - while not self.__allRepliesReceived: - if self._tcQueue.empty(): + def __handle_tc_resending(self): + while not self.__all_replies_received: + if self._tc_queue.empty(): if self._start_time == 0: self._start_time = time.time() self._check_for_timeout() def sendAllQueue(self): - while not self._tcQueue.__len__() == 0: + while not self._tc_queue.__len__() == 0: self.sendAndPrintTc() def sendAndPrintTc(self): - tc_queue_tuple = self._tcQueue.pop() + tc_queue_tuple = self._tc_queue.pop() if self.check_queue_entry(tc_queue_tuple): pus_packet, pus_packet_info = tc_queue_tuple self.tcInfoQueue.append(pus_packet_info) - self._com_interface.sendTelecommand(pus_packet, pus_packet_info) + self._com_interface.send_telecommand(pus_packet, pus_packet_info) self.handleWaiting() def handleWaiting(self): diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py index 36fcb7ad84ce26f9e65bedd764dbc58c72eb9b32..37ce5d9b801a921e06dd032de9925cda5614902b 100644 --- a/sendreceive/obsw_command_sender_receiver.py +++ b/sendreceive/obsw_command_sender_receiver.py @@ -13,7 +13,7 @@ if the first reply has not been received. """ import sys import time -from comIF.OBSW_ComInterface import CommunicationInterface, ComIF_T +from comIF.obsw_com_interface import CommunicationInterface, ComIfT from utility.obsw_tmtc_printer import TmTcPrinterT, TmTcPrinter from sendreceive.obsw_tm_listener import TmListenerT, TmListener from tc.OBSW_TcPacket import tcQueueEntryT @@ -29,7 +29,7 @@ class CommandSenderReceiver: This is the generic CommandSenderReceiver object. All TMTC objects inherit this object, for example specific implementations (e.g. SingleCommandSenderReceiver) """ - def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, + def __init__(self, comInterface: ComIfT, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, tmTimeout: float, tcSendTimeoutFactor: float): """ :param comInterface: CommunicationInterface object. Instantiate the desired one @@ -118,7 +118,7 @@ class CommandSenderReceiver: self._elapsed_time = time.time() - self._start_time if self._elapsed_time > self.tm_timeout * self.tc_send_timeout_factor: print("Command Sender Receiver: Timeout, sending TC again !") - self._com_interface.sendTelecommand(self._last_tc, self._last_tc_info) + self._com_interface.send_telecommand(self._last_tc, self._last_tc_info) self._timeout_counter = self._timeout_counter + 1 self._start_time = time.time() time.sleep(0.5) diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/obsw_sequential_sender_receiver.py similarity index 58% rename from sendreceive/OBSW_SequentialSenderReceiver.py rename to sendreceive/obsw_sequential_sender_receiver.py index ef73d6bc2f6a1d206cc02bab973bd56a79945e27..9afc4efdc1add327e463ff93f17f692346cce85c 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -1,19 +1,16 @@ -#!/usr/bin/python3.7 -# -*- coding: utf-8 -*- +#!/usr/bin/python3.8 """ -@file - OBSW_Config.py -@date - 01.11.2019 -@brief - Used to send multiple TCs in sequence and listen for replies after each sent tc +@file obsw_sequential_sender_receiver.py +@date 01.11.2019 +@brief Used to send multiple TCs in sequence and listen for replies after each sent TC """ +import sys import time import config.OBSW_Config as g from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT -from comIF.OBSW_ComInterface import ComIF_T +from comIF.obsw_com_interface import ComIfT from utility.obsw_tmtc_printer import TmTcPrinterT from tc.OBSW_TcPacket import tcQueueT @@ -22,81 +19,85 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): """ Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence """ - def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, + def __init__(self, comInterface: ComIfT, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, tmTimeout: float, tcQueue: tcQueueT, tcTimeoutFactor: float): """ :param comInterface: CommunicationInterface object, passed on to CommandSenderReceiver :param tmListener: TmListener object which runs in the background and receives all Telemetry :param tmtcPrinter: TmTcPrinter object, passed on to CommandSenderReceiver - :param tmTimeout: TM Timeout. After each sent telecommand, listen for TMs for this time period - :param tcTimeoutFactor: If TM is not received, resend TC after G_TM_TIMEOUT * G_TC_SEND_TIMEOUT_FACTOR - :param doPrintToFile: Specify whether output is also printed to a file + :param tmTimeout: TM Timeout. After each sent telecommand, listen for TMs + for this time period + :param tcTimeoutFactor: If TM is not received, resend TC after + G_TM_TIMEOUT * G_TC_SEND_TIMEOUT_FACTOR """ super().__init__( comInterface=comInterface, tmtcPrinter=tmtcPrinter, tmListener=tmListener, tmTimeout=tmTimeout, tcSendTimeoutFactor=tcTimeoutFactor) - self._tcQueue = tcQueue - self.__firstReplyReceived = False - self.__allRepliesReceived = False - self.__modeOpFinished = False + self._tc_queue = tcQueue + self.__first_reply_received = False + self.__all_replies_received = False + self.__mode_op_finished = False - def sendQueueTcAndReceiveTmSequentially(self): + def send_queue_tc_and_receive_tm_sequentially(self): + """ + Primary function which is called for sequential transfer. + :return: + """ self._tm_listener.modeId = g.ModeList.ServiceTestMode self._tm_listener.modeChangeEvent.set() - self.__sendAndReceiveFirstPacket() + self.__send_and_receive_first_packet() # this flag is set in the separate thread ! try: - self.handleTcSending() + self.__handle_tc_sending() except (KeyboardInterrupt, SystemExit): print("Keyboard Interrupt or System Exit detected") - exit() + sys.exit() # self.handleInterrupt(receiverThread) - def handleTcSending(self): - while not self.__allRepliesReceived: - while not self._tcQueue.__len__() == 0: - self.__performNextTcSend() - if self._tcQueue.__len__() == 0: + def __handle_tc_sending(self): + while not self.__all_replies_received: + while not self._tc_queue.__len__() == 0: + self.__perform_next_tc_send() + if self._tc_queue.__len__() == 0: self._start_time = time.time() self._check_for_timeout() - if not self.__modeOpFinished: + if not self.__mode_op_finished: self._tm_listener.modeOpFinished.set() - self.__modeOpFinished = True + self.__mode_op_finished = True print("Sequential SenderReceiver: All replies received!") if g.G_PRINT_TO_FILE: print("Sequential SenderReceiver: Exporting output to log file.") self._tmtc_printer.print_to_file() - def __performNextTcSend(self): + def __perform_next_tc_send(self): if self._tm_listener.replyEvent.is_set(): self._reply_received = True self._tm_listener.replyEvent.clear() # this flag is set in the separate receiver thread too if self._reply_received: - self.__sendNextTelecommand() + self.__send_next_telecommand() self._reply_received = False # just calculate elapsed time if start time has already been set (= command has been sent) else: self._check_for_timeout() - def __sendAndReceiveFirstPacket(self): - tc_queue_tuple = self._tcQueue.pop() + def __send_and_receive_first_packet(self): + tc_queue_tuple = self._tc_queue.pop() if self.check_queue_entry(tc_queue_tuple): pus_packet, pus_packet_info = tc_queue_tuple - self._com_interface.sendTelecommand(pus_packet, pus_packet_info) + self._com_interface.send_telecommand(pus_packet, pus_packet_info) else: - self.__sendAndReceiveFirstPacket() + self.__send_and_receive_first_packet() - def __sendNextTelecommand(self): - tc_queue_tuple = self._tcQueue.pop() + def __send_next_telecommand(self): + tc_queue_tuple = self._tc_queue.pop() if self.check_queue_entry(tc_queue_tuple): self._start_time = time.time() pus_packet, pus_packet_info = tc_queue_tuple - self._com_interface.sendTelecommand(pus_packet, pus_packet_info) - elif self._tcQueue.__len__() == 0: + self._com_interface.send_telecommand(pus_packet, pus_packet_info) + elif self._tc_queue.__len__() == 0: # Special case: Last queue entry is not a Telecommand - self.__allRepliesReceived = True + self.__all_replies_received = True else: # If the queue entry was not a telecommand, send next telecommand - self.__performNextTcSend() - + self.__perform_next_tc_send() diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index b7c9d299bb9c61036f5e7441632aed1356400db6..1c81caad9cb75431f772aedfa995ef4c19424a52 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -11,7 +11,7 @@ import logging from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT -from comIF.OBSW_ComInterface import ComIF_T +from comIF.obsw_com_interface import ComIfT from utility.obsw_tmtc_printer import TmTcPrinterT import config.OBSW_Config as g @@ -21,7 +21,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): Specific implementation of CommandSenderReceiver to send a single telecommand This object can be used by instantiating it and calling sendSingleTcAndReceiveTm() """ - def __init__(self, comInterface: ComIF_T, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, + def __init__(self, comInterface: ComIfT, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, pusPacketTuple: tuple, tmTimeout: float, tcTimeoutFactor: float): """ :param comInterface: CommunicationInterface object, passed on to CommandSenderReceiver @@ -51,7 +51,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): if not self.faulty_input: self._tm_listener.modeId = g.ModeList.SingleCommandMode self._tm_listener.modeChangeEvent.set() - self._com_interface.sendTelecommand(self.pus_packet, self.pus_packet_info) + self._com_interface.send_telecommand(self.pus_packet, self.pus_packet_info) while not self._reply_received: # wait until reply is received super()._check_for_first_reply() diff --git a/sendreceive/obsw_tm_listener.py b/sendreceive/obsw_tm_listener.py index 0155d7fed23a9bbea3d549da2f79f76ff61cb15c..64a6e699ea0578a725c991fc12e5aa6419fbd629 100644 --- a/sendreceive/obsw_tm_listener.py +++ b/sendreceive/obsw_tm_listener.py @@ -14,7 +14,7 @@ import threading from collections import deque from typing import TypeVar -from comIF.OBSW_ComInterface import ComIF_T +from comIF.obsw_com_interface import ComIfT import config.OBSW_Config as g TmListenerT = TypeVar('TmListenerT', bound='TmListener') @@ -27,7 +27,7 @@ class TmListener: and changing the mode to do special mode operations. The mode operation ends as soon the modeOpFinished Event is set() ! """ - def __init__(self, comInterface: ComIF_T, tmTimeout: float, tcTimeoutFactor: float): + def __init__(self, comInterface: ComIfT, tmTimeout: float, tcTimeoutFactor: float): self.tmTimeout = tmTimeout self.tcTimeoutFactor = tcTimeoutFactor self.comInterface = comInterface @@ -66,7 +66,7 @@ class TmListener: time.sleep(1) def defaultOperation(self): - self.comInterface.pollInterface() + self.comInterface.poll_interface() if self.modeChangeEvent.is_set(): # TODO: We should put this in a timeout.. Each mode operation up until now only takes # a maximum specified time (software test 5 minutes maybe?). @@ -100,7 +100,7 @@ class TmListener: print("TM Listener: Reply sequence received!") self.replyEvent.set() elif self.modeId == g.ModeList.UnitTest: - self.tmInfoQueue = self.comInterface.receiveTelemetryAndStoreInfo(self.tmInfoQueue) + self.tmInfoQueue = self.comInterface.receive_telemetry_and_store_info(self.tmInfoQueue) self.replyEvent.set() def checkForOneTelemetrySequence(self) -> bool: @@ -109,17 +109,17 @@ class TmListener: This function prints the telemetry sequence but does not return it. :return: """ - tmReady = self.comInterface.dataAvailable(self.tmTimeout * self.tcTimeoutFactor) + tmReady = self.comInterface.data_available(self.tmTimeout * self.tcTimeoutFactor) if tmReady is False: return False else: - self.comInterface.receiveTelemetry() + self.comInterface.receive_telemetry() start_time = time.time() elapsed_time = 0 while elapsed_time < self.tmTimeout: - tmReady = self.comInterface.dataAvailable(1.0) + tmReady = self.comInterface.data_available(1.0) if tmReady: - self.comInterface.receiveTelemetry() + self.comInterface.receive_telemetry() elapsed_time = time.time() - start_time # the timeout value can be set by special TC queue entries if packet handling # takes longer, but it is reset here to the global value diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 40e8aab624e3453f63eeb9dc241e56627fd8ae45..49184ca4ad96bfc29618602ef4ecded7b94b788a 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -85,7 +85,7 @@ class TestService(unittest.TestCase): tmListener=self.tmListener, tcQueue=self.testQueue, tmTimeout=self.tmTimeout, waitIntervals=self.waitIntervals, waitTime=self.waitTime, printTm=self.printTm, tcTimeoutFactor=self.tcTimeoutFactor) - (tcInfoQueue, tmInfoQueue) = UnitTester.sendTcQueueAndReturnInfo() + (tcInfoQueue, tmInfoQueue) = UnitTester.send_tc_queue_and_return_info() assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) return assertionDict diff --git a/utility/OBSW_ArgParser.py b/utility/OBSW_ArgParser.py deleted file mode 100644 index 24c7700e2249e13aa2917d80513f0b5c9861827f..0000000000000000000000000000000000000000 --- a/utility/OBSW_ArgParser.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/python3.7 -""" -@file - OBSW_ArgParser.py -@date - 11.01.2020 -@brief - Reads all input arguments or requests them from user. -""" - -import argparse -import sys - - -def parseInputArguments(): - """ - Parses all input arguments - :return: Input arguments - """ - argParser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") - - argParser.add_argument( - '-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), ' - '0: GUI Mode, 1:Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' - '4: Software Test Mode, 5: Unit Test Mode ', default=0) - argParser.add_argument( - '-c', '--comIF', type=int, help='Communication Interface. 0 for Ethernet, 1 for Serial', - default=0) - argParser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') - argParser.add_argument( - '--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38') - argParser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) - argParser.add_argument('-t', '--tm_timeout', type=float, help='TM Timeout when listening to ' - 'verification sequence. Default: 12, 6(Serial)', default=6.0) - argParser.add_argument('--np', dest='printFile', help='Supply --np to suppress print ' - 'output to file.', action='store_false') - argParser.add_argument('-o', '--tcTimeoutFactor', type=float, - help='TC Timeout Factor. Multiplied with TM Timeout, ' - 'TC sent again after this time period. Default: 3.5', default=3.5) - argParser.add_argument('-r', '--rawDataPrint', help='Supply -r to print all raw data directly', - action='store_true') - argParser.add_argument('-d', '--shortDisplayMode', help='Supply -d to print short output', - action='store_true') - argParser.add_argument('--hk', dest='print_hk', help='Supply -k or --hk to print HK data', - action='store_true') - argParser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') - - if len(sys.argv) == 1: - print("No Input Arguments specified.") - argParser.print_help() - args, unknown = argParser.parse_known_args() - print(args) - handle_args(args, unknown) - return args - - -def handle_args(args, unknown: list): - """ - Handles the parsed arguments. - :param args: Namespace objects - (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) - :param unknown: List of unknown parameters. - :return: - """ - if len(unknown) > 0: - print("Unknown arguments detected: " + str(unknown)) - if len(sys.argv) > 1: - handleUnspecifiedArgs(args) - if len(sys.argv) == 1: - handleEmptyArgs(args) - - -def handleUnspecifiedArgs(args): - if args.comIF == 1 and args.tm_timeout is None: - args.tmTimeout = 6.0 - if args.comIF == 1 and args.com_port is None: - args.com_port = input("Serial Commuinication specified without COM port. " - "Please enter COM Port: ") - if args.mode is None: - print("No mode specified with -m Parameter.") - print("Possible Modes: ") - print("1: Listener Mode") - print("2: Single Command Mode with manual command") - print("3: Service Mode, Commands specified in tc folder") - print("4: Software Mode, runs all command specified in OBSW_TcPacker.py") - print("5: Unit Test, runs unit test specified in OBSW_UnitTest.py") - args.mode = input("Please enter Mode: ") - if args.mode == 1 and args.service is None: - args.service = input("No Service specified for Service Mode. " - "Please enter PUS G_SERVICE number: ") - - -def handleEmptyArgs(args): - printHk = input("Print HK packets ? (y/n or yes/no)") - try: - printHk = printHk.lower() - except TypeError: - pass - if printHk in ('y', 'yes', 1): - args.print_hk = True - else: - args.print_hk = False - printToLog = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") - try: - printToLog = printToLog.lower() - except TypeError: - pass - if printToLog == 'n' or printToLog == 'no' or printToLog == 0: - args.printFile = False - else: - args.printFile = True diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..022910fa003f9c14b468b2fd89f4fa8b97184ff4 --- /dev/null +++ b/utility/obsw_args_parser.py @@ -0,0 +1,125 @@ +#!/usr/bin/python3.7 +""" +@file + obsw_args_parser.py +@date + 11.01.2020 +@brief + Reads all input arguments or requests them from user. +""" + +import argparse +import sys + + +def parse_input_arguments(): + """ + Parses all input arguments + :return: Input arguments contained in a special namespace and accessable by args.<variable> + """ + arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") + + arg_parser.add_argument( + '-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), ' + '0: GUI Mode, 1:Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' + '4: Software Test Mode, 5: Unit Test Mode ', default=0) + arg_parser.add_argument( + '-c', '--comIF', type=int, help='Communication Interface. 0 for Ethernet, 1 for Serial', + default=0) + arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') + arg_parser.add_argument( + '--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38') + arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) + arg_parser.add_argument( + '-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.' + ' Default: 12, 6(Serial)', default=6.0) + arg_parser.add_argument( + '--np', dest='printFile', help='Supply --np to suppress print output to file.', + action='store_false') + arg_parser.add_argument( + '-o', '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with TM Timeout,' + ' TC sent again after this time period. Default: 3.5', default=3.5) + arg_parser.add_argument( + '-r', '--rawDataPrint', help='Supply -r to print all raw data directly', + action='store_true') + arg_parser.add_argument( + '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') + arg_parser.add_argument( + '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') + arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') + + if len(sys.argv) == 1: + print("No Input Arguments specified.") + arg_parser.print_help() + args, unknown = arg_parser.parse_known_args() + print(args) + handle_args(args, unknown) + return args + + +def handle_args(args, unknown: list) -> None: + """ + Handles the parsed arguments. + :param args: Namespace objects + (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) + :param unknown: List of unknown parameters. + :return: None + """ + if len(unknown) > 0: + print("Unknown arguments detected: " + str(unknown)) + if len(sys.argv) > 1: + handle_unspecified_args(args) + if len(sys.argv) == 1: + handle_empty_args(args) + + +def handle_unspecified_args(args) -> None: + """ + If some arguments are unspecified, they are set here with (variable) default values. + :param args: + :return: None + """ + if args.comIF == 1 and args.tm_timeout is None: + args.tmTimeout = 6.0 + if args.comIF == 1 and args.com_port is None: + args.com_port = input("Serial Commuinication specified without COM port. " + "Please enter COM Port: ") + if args.mode is None: + print("No mode specified with -m Parameter.") + print("Possible Modes: ") + print("1: Listener Mode") + print("2: Single Command Mode with manual command") + print("3: Service Mode, Commands specified in tc folder") + print("4: Software Mode, runs all command specified in OBSW_TcPacker.py") + print("5: Unit Test, runs unit test specified in OBSW_UnitTest.py") + args.mode = input("Please enter Mode: ") + if args.mode == 1 and args.service is None: + args.service = input("No Service specified for Service Mode. " + "Please enter PUS G_SERVICE number: ") + + +def handle_empty_args(args) -> None: + """ + If no args were supplied, request input from user directly. + TODO: This still needs to be extended. + :param args: + :return: + """ + print_hk = input("Print HK packets ? (y/n or yes/no)") + try: + print_hk = print_hk.lower() + except TypeError: + pass + if print_hk in ('y', 'yes', 1): + args.print_hk = True + else: + args.print_hk = False + print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") + try: + print_to_log = print_to_log.lower() + except TypeError: + pass + if print_to_log in ('n', 'no', 0): + args.printFile = False + else: + args.printFile = True diff --git a/utility/OBSW_ExitHandler.py b/utility/obsw_exit_handler.py similarity index 81% rename from utility/OBSW_ExitHandler.py rename to utility/obsw_exit_handler.py index 4fa3792448cd3900ce21f85deddbdb14439654f3..118ba24c29f1508433e7c19cd8c02a24ac317716 100644 --- a/utility/OBSW_ExitHandler.py +++ b/utility/obsw_exit_handler.py @@ -1,5 +1,5 @@ import signal -from comIF.OBSW_ComInterface import ComIF_T +from comIF.obsw_com_interface import ComIfT class GracefulKiller: @@ -14,7 +14,7 @@ class GracefulKiller: print("I was killed") -def keyboardInterruptHandler(comInterface: ComIF_T): +def keyboard_interrupt_handler(comInterface: ComIfT): print("Disconnect registered") # Unit Test closes Serial Port at the end # We could do some optional stuff here