diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index fc3730e791470f58dc3af867795abe584143d855..9a7f76c678f6aebe51b690ef3c7e2e28cf24bd27 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -130,8 +130,8 @@ class TmTcHandler: while True: try: if self.command_received: - self.handle_action() self.command_received = False + self.handle_action() LOGGER.info("TMTC Client in idle mode") time.sleep(5) except (IOError, KeyboardInterrupt): @@ -160,7 +160,11 @@ class TmTcHandler: tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR) tm_listener.start() if self.mode == g.ModeList.ListenerMode: - LOGGER.info("Listening for packages...") + if tm_listener.event_reply_received: + # TODO: Test this. + LOGGER.info("TmTcHandler: Packets received.") + tmtc_printer.print_telemetry_queue(tm_listener.retrieve_tm_packet_queue()) + self.command_received = True elif self.mode == g.ModeList.SingleCommandMode: pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py index 0e5f11f57779b680b938765c5844e97fcdf04611..c72f573e53c36e96bf1051e4ae2d059980290010 100644 --- a/sendreceive/obsw_command_sender_receiver.py +++ b/sendreceive/obsw_command_sender_receiver.py @@ -96,10 +96,10 @@ class CommandSenderReceiver: Checks for replies. If no reply is received, send telecommand again in checkForTimeout() :return: None """ - if self._tm_listener.replyEvent.is_set(): + if self._tm_listener.event_reply_received.is_set(): self._reply_received = True self._operation_pending = False - self._tm_listener.replyEvent.clear() + self._tm_listener.event_reply_received.clear() else: self._check_for_timeout() diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py index 8e7f67dd5979544eec9e91d9d981a16c8d699b9d..7fe0377161b33a3fb1abdb7b86513ca0837f6392 100644 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ b/sendreceive/obsw_multiple_commands_sender_receiver.py @@ -54,12 +54,12 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): def send_tc_queue_and_return_info(self): try: self._tm_listener.mode_id = g.ModeList.UnitTest - self._tm_listener.mode_change_event.set() + self._tm_listener.event_mode_change.set() # TC info queue is set in this function self.__send_all_queue() self.wait_for_last_replies_listening(self._tm_timeout / 1.5) self.tmTupleQueue = self.__retrieve_listener_tm_tuple() - self._tm_listener.mode_op_finished.set() + self._tm_listener.event_mode_op_finished.set() if g.G_PRINT_TO_FILE: self._tmtc_printer.print_to_file() except (KeyboardInterrupt, SystemExit): @@ -96,7 +96,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): time.sleep(self.waitTime) def __retrieve_listener_tm_tuple(self): - if self._tm_listener.replyEvent.is_set(): + if self._tm_listener.event_reply_received.is_set(): return self._tm_listener.__tm_info_queue else: print("Multiple Command SenderReceiver: Configuration error," diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index 83ba8d49b0007463dbbf1780cc1bf869a2e9cb9a..91970e3f7c8b1579a549d07a6fcad8f0e4d76d70 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -44,7 +44,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): :return: """ self._tm_listener.mode_id = g.ModeList.ServiceTestMode - self._tm_listener.mode_change_event.set() + self._tm_listener.event_mode_change.set() self.__send_and_receive_first_packet() # this flag is set in the separate thread ! try: @@ -61,7 +61,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self._start_time = time.time() self._check_for_timeout() if not self.__mode_op_finished: - self._tm_listener.mode_op_finished.set() + self._tm_listener.event_mode_op_finished.set() self.__mode_op_finished = True logger.info("SequentialSenderReceiver: All replies received!") if g.G_PRINT_TO_FILE: @@ -69,9 +69,9 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self._tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True) def __perform_next_tc_send(self): - if self._tm_listener.replyEvent.is_set(): + if self._tm_listener.event_reply_received.is_set(): self._reply_received = True - self._tm_listener.replyEvent.clear() + self._tm_listener.event_reply_received.clear() # this flag is set in the separate receiver thread too if self._reply_received: self.print_tm_queue(self._tm_listener.retrieve_tm_packet_queue()) diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index 1cdb00449d36b6f7b73b3535db5a9f9c2ad2e2cd..88f271648ca176823a7b5d190e410da44068acb9 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -51,7 +51,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): return self._operation_pending = True self._tm_listener.mode_id = g.ModeList.SingleCommandMode - self._tm_listener.mode_change_event.set() + self._tm_listener.event_mode_change.set() self._tmtc_printer.print_telecommand(pus_packet, pus_packet_info) self._com_interface.send_telecommand(pus_packet, pus_packet_info) self._last_tc = pus_packet @@ -60,7 +60,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): # wait until reply is received super()._check_for_first_reply() if self._reply_received: - self._tm_listener.mode_op_finished.set() + self._tm_listener.event_mode_op_finished.set() self.print_tm_queue(self._tm_listener.retrieve_tm_packet_queue()) logger.info("SingleCommandSenderReceiver: Reply received") logger.info("Listening for packages ...") diff --git a/sendreceive/obsw_tm_listener.py b/sendreceive/obsw_tm_listener.py index 43ab5ec2e99276872247a10f682efd585b7618a0..0906aaacb5727034621d535911c571eb3dcb1cad 100644 --- a/sendreceive/obsw_tm_listener.py +++ b/sendreceive/obsw_tm_listener.py @@ -8,6 +8,9 @@ This will enable to run the listener in a separate thread later. This Listener will propably have some kind of mode. In default configuration, it will just listen for packets + TODO: Have a look at the new asyncio library. Pyserial has an asyncio extension. + See: https://pyserial-asyncio.readthedocs.io/en/latest/index.html + Maybe this would simplify things here. """ import sys import time @@ -24,6 +27,7 @@ TmListenerT = TypeVar('TmListenerT', bound='TmListener') class TmListener: + MODE_OPERATION_TIMEOUT = 300 """ Performs all TM listening operations. This listener can be used by setting the modeChangeEvent Event with the set() function @@ -38,26 +42,31 @@ class TmListener: # this will be the default mode (listener mode) self.mode_id = g.ModeList.ListenerMode # TM Listener operations can be suspended by setting this flag - self.listener_active = threading.Event() - self.listener_active.set() + self.event_listener_active = threading.Event() + self.event_listener_active.set() # I don't think a listener is useful without the main program, # so we might just declare it daemonic. # UPDATE: Right now, the main program is not in a permanent loop and setting the # thread daemonic will cancel the program. # Solved for now by setting a permanent loop at the end of the main program self.listener_thread = threading.Thread(target=self.perform_operation, daemon=True) + self.lock_listener = threading.Lock() # This Event is set by sender objects to perform mode operations - self.mode_change_event = threading.Event() + self.event_mode_change = threading.Event() # This Event is set by sender objects if all necessary operations are done # to transition back to listener mode - self.mode_op_finished = threading.Event() + self.event_mode_op_finished = threading.Event() # maybe we will just make the thread daemonic... # self.terminationEvent = threading.Event() # This Event is set and cleared by the listener to inform the sender objects # if a reply has been received - self.replyEvent = threading.Event() + self.event_reply_received = threading.Event() # Will be filled for the Unit Test - self.__tm_info_queue = deque() + # TODO: Maybe its better to use a normal queue, which (by-design) offers more thread-safety + # Then we also don't need a lock. But I think its okay to treat the tm packet queue + # as a regular data structure for now. + # Then we propably have to pop the queue entries and put them into a list + # in the main program self.__tm_packet_queue = deque() def start(self): @@ -65,23 +74,26 @@ class TmListener: def perform_operation(self): while True: - if self.listener_active.is_set(): + if self.event_listener_active.is_set(): self.default_operation() else: time.sleep(1) def default_operation(self): self.com_interface.poll_interface() - if self.mode_change_event.is_set(): - # TODO: We should put this in a timeout.. Each mode operation up until now only takes - # a maximum specified time (software test 5 minutes maybe?). - # Otherwise, this is a permanent loop - self.mode_change_event.clear() - while not self.mode_op_finished.is_set(): + if self.event_mode_change.is_set(): + self.event_mode_change.clear() + start_time = time.time() + while not self.event_mode_op_finished.is_set(): + elapsed_time = time.time() - start_time + if elapsed_time < TmListener.MODE_OPERATION_TIMEOUT: self.perform_mode_operation() - self.mode_op_finished.clear() - logger.info("TmListener: Transitioning to listener mode.") - self.mode_id = g.ModeList.ListenerMode + else: + logger.warning("TmListener: Mode operation timeout occured!") + break + self.event_mode_op_finished.clear() + logger.info("TmListener: Transitioning to listener mode.") + self.mode_id = g.ModeList.ListenerMode def perform_mode_operation(self): """ @@ -91,24 +103,24 @@ class TmListener: """ # Listener Mode if self.mode_id == g.ModeList.ListenerMode: - # TODO: print packet list, this is not done automatically pass # Single Command Mode elif self.mode_id == g.ModeList.SingleCommandMode: # Listen for one reply sequence. if self.check_for_one_telemetry_sequence(): # Set reply event, will be cleared by checkForFirstReply() - self.replyEvent.set() + self.event_reply_received.set() # Sequential Command Mode elif self.mode_id == g.ModeList.ServiceTestMode or \ self.mode_id == g.ModeList.SoftwareTestMode: if self.check_for_one_telemetry_sequence(): logger.info("TmListener: Reply sequence received!") - self.replyEvent.set() + self.event_reply_received.set() elif self.mode_id == g.ModeList.UnitTest: - # TODO: needs to be reworked. + # TODO: needs to be reworked. Info queue is stupid. just pop the normal queue and + # pack the information manually # self.__tm_info_queue = self.com_interface.receive_telemetry_and_store_info(self.__tm_info_queue) - self.replyEvent.set() + self.event_reply_received.set() def check_for_one_telemetry_sequence(self) -> bool: """ @@ -120,26 +132,43 @@ class TmListener: if tm_ready is False: return False elif tm_ready is True: - self.__tm_packet_queue.append(self.com_interface.receive_telemetry()) - start_time = time.time() - elapsed_time = 0 - logger.info("TmListener: Listening for " + str(self.tm_timeout) + " seconds") - while elapsed_time < self.tm_timeout: - tm_ready = self.com_interface.data_available(1.0) - if tm_ready: - self.__tm_packet_queue.append(self.com_interface.receive_telemetry()) - elapsed_time = time.time() - start_time - # the timeout value can be set by special TC queue entries if wiretapping_packet handling - # takes longer, but it is reset here to the global value - if self.tm_timeout is not g.G_TM_TIMEOUT: - self.tm_timeout = g.G_TM_TIMEOUT - return True + return self.__read_telemetry_sequence() else: logger.error("TmListener: Configuration error in communication interface!") sys.exit() + def __read_telemetry_sequence(self): + """ + Thread-safe implementation for reading a telemetry sequence. + """ + if not self.lock_listener.acquire(True, timeout=1): + logger.error("TmListener: Blocked on lock acquisition!") + self.__tm_packet_queue.append(self.com_interface.receive_telemetry()) + self.lock_listener.release() + start_time = time.time() + elapsed_time = 0 + logger.info("TmListener: Listening for " + str(self.tm_timeout) + " seconds") + while elapsed_time < self.tm_timeout: + tm_ready = self.com_interface.data_available(1.0) + if tm_ready: + if not self.lock_listener.acquire(True, timeout=1): + logger.error("TmListener: Blocked on lock acquisition!") + self.__tm_packet_queue.append(self.com_interface.receive_telemetry()) + self.lock_listener.release() + elapsed_time = time.time() - start_time + # the timeout value can be set by special TC queue entries if wiretapping_packet handling + # takes longer, but it is reset here to the global value + if self.tm_timeout is not g.G_TM_TIMEOUT: + self.tm_timeout = g.G_TM_TIMEOUT + return True + + # TODO: Not thread-safe, implement lock def retrieve_tm_packet_queue(self) -> PusTmQueueT: - return self.__tm_packet_queue.copy() + if not self.lock_listener.acquire(True, timeout=1): + logger.error("TmListener: Blocked on lock acquisition!") + tm_queue_copy = self.__tm_packet_queue.copy() + self.lock_listener.release() + return tm_queue_copy @staticmethod def retrieve_info_queue_from_packet_queue( diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index aecd05ab88ea1961b66b3a4b0fe4eddd7112d531..e887ae948f851fe615f5f5c8edac6fb6b473e267 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -12,6 +12,7 @@ import os import enum from config import obsw_config as g from tm.obsw_pus_tm_base import PusTelemetry +from tm.obsw_pus_tm_factory import PusTmQueueT from tm.obsw_tm_service_3 import Service3TM from tc.obsw_pus_tc_base import PusTcInfoT, TcDictionaryKeys from utility.obsw_logger import get_logger @@ -48,6 +49,10 @@ class TmTcPrinter: # List implementation to store self.file_buffer_list = [] + def print_telemetry_queue(self, tm_queue: PusTmQueueT): + for tm_list in tm_queue: + for tm_packet in tm_list: + self.print_telemetry(tm_packet) def print_telemetry(self, packet: PusTelemetry): """ @@ -55,6 +60,7 @@ class TmTcPrinter: :param packet: :return: """ + print() if self.display_mode == DisplayMode.SHORT: self.__handle_short_print(packet) else: