diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py index 0b271f219ca24a6393900cf574bd4bfa7d7b8886..8481faecf37ebf235b143be577284f7456c5a3e0 100644 --- a/comIF/obsw_com_interface.py +++ b/comIF/obsw_com_interface.py @@ -9,7 +9,8 @@ Description: Generic Communication Interface. Defines the syntax of the communic """ from abc import abstractmethod from typing import TypeVar, Tuple, Union -from tm.obsw_pus_tm_base import PusTmQueueT, PusTmTupleQueueT, PusTmInfoQueueT, PusTmListT +from tm.obsw_pus_tm_base import PusTmTupleQueueT, PusTmInfoQueueT, PusTmListT +from tm.obsw_pus_tm_factory import PusTmQueueT from utility.obsw_tmtc_printer import TmTcPrinterT from tc.obsw_pus_tc_base import PusTcInfoT diff --git a/comIF/obsw_dummy_com_if.py b/comIF/obsw_dummy_com_if.py index 0f3eacee482a8d898a704979ae7d9ecdabc7685e..ac20f17d11380fb9e675cb08de2e567358365f32 100644 --- a/comIF/obsw_dummy_com_if.py +++ b/comIF/obsw_dummy_com_if.py @@ -23,5 +23,5 @@ class DummyComIF(CommunicationInterface): def receive_telemetry(self, parameters: any = 0): pass - def send_telecommand(self, tcPacket, tcPacketInfo="") -> None: + def send_telecommand(self, tc_packet, tc_packet_info="") -> None: pass diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py index 94b19d6c55d6a88b6617a766528e6ec98e4218f1..14200fc3c793dad1e8885c3636e40e8fb48f691c 100644 --- a/comIF/obsw_ethernet_com_if.py +++ b/comIF/obsw_ethernet_com_if.py @@ -25,15 +25,15 @@ class EthernetComIF(CommunicationInterface): """ Communication interface for UDP communication. """ - def __init__(self, tmtc_printer: TmTcPrinterT, tmTimeout: float, tcTimeoutFactor: float, - receiveAddress: g.ethernetAddressT, sendAddress: g.ethernetAddressT): + def __init__(self, tmtc_printer: TmTcPrinterT, tm_timeout: float, tc_timeout_factor: float, + receive_address: g.ethernetAddressT, send_address: g.ethernetAddressT): super().__init__(tmtc_printer) - self.tm_timeout = tmTimeout - self.tc_timeout_factor = tcTimeoutFactor + self.tm_timeout = tm_timeout + self.tc_timeout_factor = tc_timeout_factor self.sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.receive_address = receiveAddress - self.send_address = sendAddress + self.receive_address = receive_address + self.send_address = send_address self.set_up_socket() def close(self) -> None: @@ -53,9 +53,9 @@ class EthernetComIF(CommunicationInterface): ready = self.data_available(pollTimeout) if ready: data = self.sock_receive.recvfrom(1024)[0] - packet = PusTelemetryFactory(data) - self.tmtc_printer.print_telemetry(packet) - packet_list = [packet] + tm_packet = PusTelemetryFactory.create(data) + self.tmtc_printer.print_telemetry(tm_packet) + packet_list = [tm_packet] return True, packet_list return False, [] diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index f567127224036bf814dfe2483d126e4e66811d27..f3df7345233326e0c75cecb67f4291cef163b4bf 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -27,11 +27,11 @@ class SerialComIF(CommunicationInterface): Communication Interface to use serial communication. This requires the PySerial library on Windows. It has been tested only on Windows. """ - def __init__(self, tmtcPrinter: TmTcPrinterT, comPort: str, baudRate: int, - serialTimeout: float): - super().__init__(tmtcPrinter) + def __init__(self, tmtc_printer: TmTcPrinterT, com_port: str, baud_rate: int, + serial_timeout: float): + super().__init__(tmtc_printer) try: - self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) + self.serial = serial.Serial(port=com_port, baudrate=baud_rate, timeout=serial_timeout) except serial.SerialException: print("Serial Port can not be opened") logging.exception("Error") @@ -62,7 +62,7 @@ class SerialComIF(CommunicationInterface): pus_data_list, number_of_packets = self.__poll_pus_packets() packet_list = [] for counter in range(0, number_of_packets): - packet = PusTelemetryFactory(pus_data_list[counter]) + packet = PusTelemetryFactory.create(pus_data_list[counter]) self.tmtc_printer.print_telemetry(packet) packet_list.append(packet) return True, packet_list @@ -119,21 +119,21 @@ class SerialComIF(CommunicationInterface): self.number_of_packets = self.number_of_packets + 1 return end_index - def receive_telemetry_and_store_tm(self, tmQueue: PusTmQueueT) -> Union[None, PusTmQueueT]: + def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]: (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: - tmQueue.append(packet) - return tmQueue + tm_queue.append(packet) + return tm_queue - def receive_telemetry_and_store_info(self, tmInfoQueue: PusTmInfoQueueT) -> \ + def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \ Union[None, PusTmInfoQueueT]: (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: tm_info = packet.pack_tm_information() - tmInfoQueue.append(tm_info) - return tmInfoQueue + tm_info_queue.append(tm_info) + return tm_info_queue def receive_telemetry_and_store_tuple(self, tm_tuple_queue: PusTmTupleQueueT) -> \ Union[None, PusTmTupleQueueT]: diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 909736ebace6cb6bf0865b2848bcc65fc77ea32d..bd41525fed75fd0d948dbed8e08a3957ea32cad2 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -107,8 +107,8 @@ def main(): communication_interface = set_communication_interface(tmtc_printer) atexit.register(keyboard_interrupt_handler, comInterface=communication_interface) tm_listener = TmListener( - comInterface=communication_interface, tmTimeout=g.G_TM_TIMEOUT, - tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR) + com_interface=communication_interface, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR) tm_listener.start() if g.G_MODE_ID == g.ModeList.ListenerMode: print("Listening for packages...") @@ -176,16 +176,16 @@ def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: try: if g.G_COM_IF == g.ComIF.Ethernet: communication_interface = EthernetComIF( - tmtc_printer=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, - tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, sendAddress=g.G_SEND_ADDRESS, - receiveAddress=g.G_REC_ADDRESS) + tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_SEND_ADDRESS, + receive_address=g.G_REC_ADDRESS) else: com_port = g.G_COM_PORT baud_rate = 115200 g.G_SERIAL_TIMEOUT = 0.05 communication_interface = SerialComIF( - tmtcPrinter=tmtc_printer, comPort=com_port, baudRate=baud_rate, - serialTimeout=g.G_SERIAL_TIMEOUT) + tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, + serial_timeout=g.G_SERIAL_TIMEOUT) # else: # pass return communication_interface diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py index 77b38a376654f445e510d4657efd75dc6a44098e..1bc839d20d6ef478cd1994ce159a86bbd985cdbb 100644 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ b/sendreceive/obsw_multiple_commands_sender_receiver.py @@ -51,13 +51,13 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): def send_tc_queue_and_return_info(self): try: - self._tm_listener.modeId = g.ModeList.UnitTest - self._tm_listener.modeChangeEvent.set() + self._tm_listener.mode_id = g.ModeList.UnitTest + self._tm_listener.mode_change_event.set() # TC info queue is set in this function self.__send_all_queue() self.wait_for_last_replies_listening(self._tm_timeout / 1.5) self.tmInfoQueue = self.__retrieve_listener_tm_info_queue() - self._tm_listener.modeOpFinished.set() + self._tm_listener.mode_op_finished.set() if g.G_PRINT_TO_FILE: self._tmtc_printer.print_to_file() except (KeyboardInterrupt, SystemExit): @@ -95,13 +95,13 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): def __retrieve_listener_tm_info_queue(self): if self._tm_listener.replyEvent.is_set(): - return self._tm_listener.tmInfoQueue + return self._tm_listener.tm_info_queue else: print("Multiple Command SenderReceiver: Configuration error," " reply event not set in TM listener") def __clear_listener_tm_info_queue(self): - self._tm_listener.tmInfoQueue.clear() + self._tm_listener.tm_info_queue.clear() @staticmethod def wait_for_last_replies_listening(waitTime: float): diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index b486490309fcc6c478ab8fea37cb9c55a72b3ffb..5a10b3bd9ba200fe50308f0335535a572b437e49 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -40,8 +40,8 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): Primary function which is called for sequential transfer. :return: """ - self._tm_listener.modeId = g.ModeList.ServiceTestMode - self._tm_listener.modeChangeEvent.set() + self._tm_listener.mode_id = g.ModeList.ServiceTestMode + self._tm_listener.mode_change_event.set() self.__send_and_receive_first_packet() # this flag is set in the separate thread ! try: @@ -59,7 +59,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): self._start_time = time.time() self._check_for_timeout() if not self.__mode_op_finished: - self._tm_listener.modeOpFinished.set() + self._tm_listener.mode_op_finished.set() self.__mode_op_finished = True print("Sequential SenderReceiver: All replies received!") if g.G_PRINT_TO_FILE: diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index 4fa2ea00adc32e9b3d37cef5c1a465ed8fc56d39..4fc0900128d3b676ef6e8027cf304fe132fa54e9 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -43,13 +43,13 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): :return: """ if not self.faulty_input: - self._tm_listener.modeId = g.ModeList.SingleCommandMode - self._tm_listener.modeChangeEvent.set() + self._tm_listener.mode_id = g.ModeList.SingleCommandMode + self._tm_listener.mode_change_event.set() self._com_interface.send_telecommand(self.pus_packet, self.pus_packet_info) while not self._reply_received: # wait until reply is received super()._check_for_first_reply() if self._reply_received: print("Single Command SenderReceiver: Reply received") - self._tm_listener.modeOpFinished.set() + self._tm_listener.mode_op_finished.set() print("Listening for packages ...") diff --git a/sendreceive/obsw_tm_listener.py b/sendreceive/obsw_tm_listener.py index 64a6e699ea0578a725c991fc12e5aa6419fbd629..3c4b109dd5172c8b4bd9b6e53b4ed6c2f477a836 100644 --- a/sendreceive/obsw_tm_listener.py +++ b/sendreceive/obsw_tm_listener.py @@ -15,6 +15,8 @@ from collections import deque from typing import TypeVar from comIF.obsw_com_interface import ComIfT +from tm.obsw_pus_tm_base import PusTmInfoQueueT +from tm.obsw_pus_tm_factory import PusTmQueueT import config.OBSW_Config as g TmListenerT = TypeVar('TmListenerT', bound='TmListener') @@ -27,102 +29,117 @@ class TmListener: and changing the mode to do special mode operations. The mode operation ends as soon the modeOpFinished Event is set() ! """ - def __init__(self, comInterface: ComIfT, tmTimeout: float, tcTimeoutFactor: float): - self.tmTimeout = tmTimeout - self.tcTimeoutFactor = tcTimeoutFactor - self.comInterface = comInterface + def __init__(self, com_interface: ComIfT, tm_timeout: float, tc_timeout_factor: float): + self.tm_timeout = tm_timeout + self.tc_timeout_factor = tc_timeout_factor + self.com_interface = com_interface # this will be the default mode (listener mode) - self.modeId = g.ModeList.ListenerMode + self.mode_id = g.ModeList.ListenerMode # TM Listener operations can be suspended by setting this flag - self.listenerActive = threading.Event() - self.listenerActive.set() + self.listener_active = threading.Event() + self.listener_active.set() # I don't think a listener is useful without the main program, # so we might just declare it daemonic. # UPDATE: Right now, the main program is not in a permanent loop and setting the # thread daemonic will cancel the program. # Solved for now by setting a permanent loop at the end of the main program - self.listenerThread = threading.Thread(target=self.performOperation, daemon=True) + self.listener_thread = threading.Thread(target=self.perform_operation, daemon=True) # This Event is set by sender objects to perform mode operations - self.modeChangeEvent = threading.Event() + self.mode_change_event = threading.Event() # This Event is set by sender objects if all necessary operations are done # to transition back to listener mode - self.modeOpFinished = threading.Event() + self.mode_op_finished = threading.Event() # maybe we will just make the thread daemonic... # self.terminationEvent = threading.Event() # This Event is set and cleared by the listener to inform the sender objects # if a reply has been received self.replyEvent = threading.Event() # Will be filled for the Unit Test - self.tmInfoQueue = deque() + self.tm_info_queue = deque() + self.tm_packet_queue = deque() def start(self): - self.listenerThread.start() + self.listener_thread.start() - def performOperation(self): + def perform_operation(self): while True: - if self.listenerActive.is_set(): - self.defaultOperation() + if self.listener_active.is_set(): + self.default_operation() else: time.sleep(1) - def defaultOperation(self): - self.comInterface.poll_interface() - if self.modeChangeEvent.is_set(): + def default_operation(self): + self.com_interface.poll_interface() + if self.mode_change_event.is_set(): # TODO: We should put this in a timeout.. Each mode operation up until now only takes # a maximum specified time (software test 5 minutes maybe?). # Otherwise, this is a permanent loop - self.modeChangeEvent.clear() - while not self.modeOpFinished.is_set(): - self.performModeOperation() - self.modeOpFinished.clear() + self.mode_change_event.clear() + while not self.mode_op_finished.is_set(): + self.perform_mode_operation() + self.mode_op_finished.clear() print("Transitioning to listener mode.") - self.modeId = g.ModeList.ListenerMode + self.mode_id = g.ModeList.ListenerMode - def performModeOperation(self): + def perform_mode_operation(self): """ By setting the modeChangeEvent with set() and specifying the mode variable, the TmListener is instructed to perform certain operations. :return: """ # Listener Mode - if self.modeId == g.ModeList.ListenerMode: + if self.mode_id == g.ModeList.ListenerMode: + # TODO: print packet list, this is not done automatically pass # Single Command Mode - elif self.modeId == g.ModeList.SingleCommandMode: + elif self.mode_id == g.ModeList.SingleCommandMode: # Listen for one reply sequence. - if self.checkForOneTelemetrySequence(): + if self.check_for_one_telemetry_sequence(): # Set reply event, will be cleared by checkForFirstReply() self.replyEvent.set() # Sequential Command Mode - elif self.modeId == g.ModeList.ServiceTestMode or \ - self.modeId == g.ModeList.SoftwareTestMode: - if self.checkForOneTelemetrySequence(): + elif self.mode_id == g.ModeList.ServiceTestMode or \ + self.mode_id == g.ModeList.SoftwareTestMode: + if self.check_for_one_telemetry_sequence(): print("TM Listener: Reply sequence received!") self.replyEvent.set() - elif self.modeId == g.ModeList.UnitTest: - self.tmInfoQueue = self.comInterface.receive_telemetry_and_store_info(self.tmInfoQueue) + elif self.mode_id == g.ModeList.UnitTest: + self.tm_info_queue = self.com_interface.receive_telemetry_and_store_info(self.tm_info_queue) self.replyEvent.set() - def checkForOneTelemetrySequence(self) -> bool: + def check_for_one_telemetry_sequence(self) -> bool: """ Receive all telemetry for a specified time period. This function prints the telemetry sequence but does not return it. :return: """ - tmReady = self.comInterface.data_available(self.tmTimeout * self.tcTimeoutFactor) - if tmReady is False: + tm_ready = self.com_interface.data_available(self.tm_timeout * self.tc_timeout_factor) + if tm_ready is False: return False else: - self.comInterface.receive_telemetry() + self.tm_packet_queue.append(self.com_interface.receive_telemetry()) start_time = time.time() elapsed_time = 0 - while elapsed_time < self.tmTimeout: - tmReady = self.comInterface.data_available(1.0) - if tmReady: - self.comInterface.receive_telemetry() + while elapsed_time < self.tm_timeout: + tm_ready = self.com_interface.data_available(1.0) + if tm_ready: + self.tm_packet_queue.append(self.com_interface.receive_telemetry()) elapsed_time = time.time() - start_time # the timeout value can be set by special TC queue entries if packet handling # takes longer, but it is reset here to the global value - if self.tmTimeout is not g.G_TM_TIMEOUT: - self.tmTimeout = g.G_TM_TIMEOUT + if self.tm_timeout is not g.G_TM_TIMEOUT: + self.tm_timeout = g.G_TM_TIMEOUT return True + + def retrieve_tm_packet_queue(self): + return self.tm_packet_queue.copy() + + @staticmethod + def retrieve_tm_info_queue(tm_info_queue_to_fill: PusTmInfoQueueT, tm_queue: PusTmQueueT): + while tm_queue.__len__() != 0: + tm_packet = tm_queue.pop() + tm_info_queue_to_fill.append(tm_packet.pack_tm_information()) + + def clear_tm_packet_queue(self): + self.tm_packet_queue.clear() + diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py index 012cc49fc744d84ee009668e98c3b52239396614..67823a9b392380da2b9c207a85078fba9fb69dec 100644 --- a/tm/obsw_pus_tm_base.py +++ b/tm/obsw_pus_tm_base.py @@ -13,7 +13,7 @@ PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') PusTmInfoT = Dict[str, any] PusTmTupleT = Tuple[PusTmInfoT, PusTmT] -PusTmQueueT = Deque[PusTmT] + PusTmListT = List[PusTmT] PusTmInfoQueueT = Deque[PusTmInfoT] PusTmTupleQueueT = Deque[PusTmTupleT] @@ -259,6 +259,7 @@ class ObswTimestamp: array.append("OBSWTime (s)") array.append("Time") + # pylint: disable=line-too-long # Structure of a PUS Packet : # A PUS packet consists of consecutive bits, the allocation and structure is standardised. diff --git a/tm/obsw_pus_tm_factory.py b/tm/obsw_pus_tm_factory.py index 125127e2633f13fa2bb73765913c2df00977cf7a..cc09041400b1dbf00c9c547fbc8e253bb41f1580 100644 --- a/tm/obsw_pus_tm_factory.py +++ b/tm/obsw_pus_tm_factory.py @@ -5,7 +5,7 @@ Date: 01.11.2019 Description: Deserialize TM byte header_list into PUS TM Class Author: R.Mueller, S. Gaisser """ - +from typing import Deque, Type from tm.obsw_pus_tm_base import PusTelemetry from tm.obsw_tm_service_1 import Service1TM from tm.obsw_tm_service_3 import Service3TM @@ -13,37 +13,60 @@ from tm.obsw_tm_service_5 import Service5TM import struct -def PusTelemetryFactory(raw_packet: bytes): - """ - Returns a PusTelemetry class instance by extracting the service directly from the raw packet. - Sneaky solution allows this function to immediately return - the right telemetry packet - :param raw_packet: - :return: - """ - service_type = raw_packet[7] - if service_type == 1: - return Service1TM(raw_packet) - elif service_type == 2: - return Service2TM(raw_packet) - elif service_type == 3: - return Service3TM(raw_packet) - elif service_type == 5: - return Service5TM(raw_packet) - elif service_type == 8: - return Service8TM(raw_packet) - elif service_type == 17: - return Service17TM(raw_packet) - elif service_type == 200: - return Service200TM(raw_packet) - else: +PusTmQueueT = Deque[PusTelemetry] + + +class PusTelemetryFactory(object): + @staticmethod + def create(raw_tm_packet: bytes) -> PusTelemetry: + service_type = raw_tm_packet[7] + if service_type == 1: + return Service1TM(raw_tm_packet) + if service_type == 2: + return Service2TM(raw_tm_packet) + if service_type == 3: + return Service3TM(raw_tm_packet) + if service_type == 5: + return Service5TM(raw_tm_packet) + if service_type == 8: + return Service8TM(raw_tm_packet) + if service_type == 17: + return Service17TM(raw_tm_packet) + if service_type == 200: + return Service200TM(raw_tm_packet) print("The service " + str(service_type) + " is not implemented in Telemetry Factory") - return PusTelemetry(raw_packet) - - + return PusTelemetry(raw_tm_packet) + + +# def pus_telemetry_factory(raw_packet: bytes): +# """ +# Returns a PusTelemetry class instance by extracting the service directly from the raw packet. +# Sneaky solution allows this function to immediately return +# the right telemetry packet +# :param raw_packet: +# :return: +# """ +# service_type = raw_packet[7] +# if service_type == 1: +# return Service1TM(raw_packet) +# elif service_type == 2: +# return Service2TM(raw_packet) +# elif service_type == 3: +# return Service3TM(raw_packet) +# elif service_type == 5: +# return Service5TM(raw_packet) +# elif service_type == 8: +# return Service8TM(raw_packet) +# elif service_type == 17: +# return Service17TM(raw_packet) +# elif service_type == 200: +# return Service200TM(raw_packet) +# else: +# print("The service " + str(service_type) + " is not implemented in Telemetry Factory") +# return PusTelemetry(raw_packet) class Service2TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -55,8 +78,9 @@ class Service2TM(PusTelemetry): class Service8TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) + self.specify_packet_info("Functional Commanding Reply") def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -68,8 +92,9 @@ class Service8TM(PusTelemetry): class Service9TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) + self.specify_packet_info("Time Service Reply") def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -95,8 +120,8 @@ class Service17TM(PusTelemetry): class Service200TM(PusTelemetry): - def __init__(self, byteArray): - super().__init__(byteArray) + def __init__(self, byte_array): + super().__init__(byte_array) self.isCantReachModeReply = False self.isModeReply = False self.specify_packet_info("Mode Reply") diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py index 71ab6a65e98ba82b0561913333f73a7489287bec..05f9c9a4bfe484b32bb14618a63402942ec52548 100644 --- a/utility/obsw_args_parser.py +++ b/utility/obsw_args_parser.py @@ -80,7 +80,7 @@ def handle_unspecified_args(args) -> None: :return: None """ if args.comIF == 1 and args.tm_timeout is None: - args.tmTimeout = 6.0 + args.tm_timeout = 6.0 if args.comIF == 1 and args.com_port is None: args.com_port = input("Serial Commuinication specified without COM port. " "Please enter COM Port: ")