Something went wrong on our end
Forked from an inaccessible project.
-
Robin Mueller authoredRobin Mueller authored
obsw_command_sender_receiver.py 5.36 KiB
"""
Program: obsw_module_test.py
Date: 01.11.2019
Description: All functions related to TmTc Sending and Receiving, used by UDP client
Manual:
Set up the UDP client as specified in the header comment and use the unit testing mode
A separate thread is used to listen for replies and send a new telecommand
if the first reply has not been received.
@author: R. Mueller
"""
import sys
import time
import config.OBSW_Config as g
from comIF.obsw_com_interface import CommunicationInterface
from utility.obsw_tmtc_printer import TmTcPrinter
from sendreceive.obsw_tm_listener import TmListener
from tc.obsw_pus_tc_base import TcQueueEntryT
# pylint: disable=too-many-instance-attributes
class CommandSenderReceiver:
"""
This is the generic CommandSenderReceiver object. All TMTC objects inherit this object,
for example specific implementations (e.g. SingleCommandSenderReceiver)
"""
def __init__(self, comInterface: CommunicationInterface, tmtcPrinter: TmTcPrinter,
tmListener: TmListener):
"""
:param comInterface: CommunicationInterface object. Instantiate the desired one
and pass it here
:param tmtcPrinter: TmTcPrinter object. Instantiate it and pass it here.
"""
self._tm_timeout = g.G_TM_TIMEOUT
self._tc_send_timeout_factor = g.G_TC_SEND_TIMEOUT_FACTOR
if isinstance(comInterface, CommunicationInterface):
self._com_interface = comInterface
else:
raise TypeError("Invalid communication interface service_type!")
if isinstance(tmtcPrinter, TmTcPrinter):
self._tmtc_printer = tmtcPrinter
else:
raise TypeError("Invalid TMTC Printer service_type!")
if isinstance(tmListener, TmListener):
self._tm_listener = tmListener
else:
raise TypeError("Invalid TM Listener service_type!")
self._reply_received = False
self._start_time = 0
self._elapsed_time = 0
self._timeout_counter = 0
# needed to store last actual tc packet from queue
self._last_tc = bytearray()
self._last_tc_info = dict()
def set_tm_timeout(self, tm_timeout: float = g.G_TM_TIMEOUT):
"""
Set the TM timeout. Usually, the global value set by the args parser is set,
but the TM timeout can be reset (e.g. for slower architectures)
:param tm_timeout: New TM timeout value as a float value in seconds
:return:
"""
self._tm_timeout = tm_timeout
def set_tc_send_timeout_factor(self, new_factor: float = g.G_TC_SEND_TIMEOUT_FACTOR):
"""
Set the TC resend timeout factor. After self._tm_timeout * new_factor seconds,
a telecommand will be resent again.
:param new_factor: Factor as a float number
:return:
"""
self._tc_send_timeout_factor = new_factor
def _check_for_first_reply(self) -> None:
"""
Checks for replies. If no reply is received, send telecommand again in checkForTimeout()
:return: None
"""
if self._tm_listener.replyEvent.is_set():
self._reply_received = True
self._tm_listener.replyEvent.clear()
else:
self._check_for_timeout()
def check_queue_entry(self, tc_queue_entry: TcQueueEntryT) -> bool:
"""
Checks whether the entry in the tc queue is a telecommand.
The last telecommand and respective information are stored in _last_tc
and _last_tc_info
:param tc_queue_entry:
:return: True if queue entry is telecommand, False if it is not
"""
queue_entry_first, queue_entry_second = tc_queue_entry
queue_entry_is_telecommand = False
if queue_entry_first == "wait":
wait_time = queue_entry_second
self._tm_timeout = self._tm_timeout + wait_time
time.sleep(wait_time)
elif queue_entry_first == "print":
print_string = queue_entry_second
self._tmtc_printer.print_string(print_string)
elif queue_entry_first == "export":
export_name = queue_entry_second
self._tmtc_printer.print_to_file(export_name, False)
elif queue_entry_first == "timeout":
self._tm_timeout = queue_entry_second
else:
self._last_tc, self._last_tc_info = (queue_entry_first, queue_entry_second)
return True
return queue_entry_is_telecommand
def _check_for_timeout(self):
"""
Checks whether a timeout after sending a telecommand has occured and sends telecommand
again. If resending reached certain counter, exit the program.
:return:
"""
if self._timeout_counter == 5:
print("Command Sender Receiver: No response from command !")
sys.exit()
if self._start_time != 0:
self._elapsed_time = time.time() - self._start_time
if self._elapsed_time > self._tm_timeout * self._tc_send_timeout_factor:
print("Command Sender Receiver: Timeout, sending TC again !")
self._com_interface.send_telecommand(self._last_tc, self._last_tc_info)
self._timeout_counter = self._timeout_counter + 1
self._start_time = time.time()
time.sleep(0.5)