Something went wrong on our end
Forked from an inaccessible project.
-
Robin.Mueller authoredRobin.Mueller authored
obsw_com_interface.py 3.67 KiB
# -*- coding: utf-8 -*-
"""
Program: obsw_com_interface.py
Date: 01.11.2019
Description: Generic Communication Interface. Defines the syntax of the communication functions.
Abstract methods must be implemented by child class (e.g. Ethernet Com IF)
@author: R. Mueller
"""
from abc import abstractmethod
from typing import Tuple
from tm.obsw_pus_tm_factory import PusTmListT
from utility.obsw_tmtc_printer import TmTcPrinter
from tc.obsw_pus_tc_base import PusTcInfoT
# pylint: disable=useless-return
# pylint: disable=no-self-use
# pylint: disable=unused-argument
class CommunicationInterface:
"""
Generic form of a communication interface to separate communication logic from
the underlying interface.
"""
def __init__(self, tmtc_printer: TmTcPrinter):
self.tmtc_printer = tmtc_printer
@abstractmethod
def close(self) -> None:
"""
Closes the ComIF and releases any held resources (for example a Communication Port)
:return:
"""
@abstractmethod
def send_data(self, data: bytearray):
"""
Send data, for example a frame containing packets.
"""
@abstractmethod
def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None:
"""
Send telecommands
:param tc_packet: TC wiretapping_packet to send
:param tc_packet_info: TC wiretapping_packet information
:return: None for now
"""
@abstractmethod
def receive_telemetry(self, parameters: any = 0) -> PusTmListT:
"""
Returns a list of packets. Most of the time,
this will simply call the pollInterface function
:param parameters:
:return:
"""
packet_list = []
return packet_list
@abstractmethod
def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]:
"""
Poll the interface and return a list of received packets
:param parameters:
:return: Tuple: boolean which specifies wheather a wiretapping_packet was received,
and the wiretapping_packet list containing
Tm packets
"""
@abstractmethod
def data_available(self, parameters: any) -> bool:
"""
Check whether TM data is available
:param parameters:
:return:
"""
# def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \
# Union[None, PusTmInfoQueueT]:
# """
# Receive telemetry and store wiretapping_packet information in dict format into a queue.
# If no _tm_data is available, don't do anything.
# :param tm_info_queue:
# :return:
# """
# print("No child implementation provided yet!")
# return
#
# def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]:
# """
# Receive telemetry packets and store TM object into a queue.
# If no _tm_data is available, don't do anything.
# :param tm_queue:
# :return:
# """
# print("No child implementation provided yet!")
# return
#
# def receive_telemetry_and_store_tuple(self, tm_tuple_queue: PusTmTupleQueueT) -> \
# Union[None, PusTmTupleQueueT]:
# """
# Poll telemetry and store a tuple consisting of TM information and the
# TM wiretapping_packet into the queue. If no _tm_data is available, don't do anything.
# :param tm_tuple_queue:
# :return:
# """
# print("No child implementation provided yet!")
# return