diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index f567127224036bf814dfe2483d126e4e66811d27..7a9055950619156567ddbc6f37ff796db2172e2b 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -27,11 +27,11 @@ class SerialComIF(CommunicationInterface): Communication Interface to use serial communication. This requires the PySerial library on Windows. It has been tested only on Windows. """ - def __init__(self, tmtcPrinter: TmTcPrinterT, comPort: str, baudRate: int, - serialTimeout: float): - super().__init__(tmtcPrinter) + def __init__(self, tmtc_printer: TmTcPrinterT, com_port: str, baud_rate: int, + serial_timeout: float): + super().__init__(tmtc_printer) try: - self.serial = serial.Serial(port=comPort, baudrate=baudRate, timeout=serialTimeout) + self.serial = serial.Serial(port=com_port, baudrate=baud_rate, timeout=serial_timeout) except serial.SerialException: print("Serial Port can not be opened") logging.exception("Error") @@ -119,21 +119,21 @@ class SerialComIF(CommunicationInterface): self.number_of_packets = self.number_of_packets + 1 return end_index - def receive_telemetry_and_store_tm(self, tmQueue: PusTmQueueT) -> Union[None, PusTmQueueT]: + def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]: (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: - tmQueue.append(packet) - return tmQueue + tm_queue.append(packet) + return tm_queue - def receive_telemetry_and_store_info(self, tmInfoQueue: PusTmInfoQueueT) -> \ + def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \ Union[None, PusTmInfoQueueT]: (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: tm_info = packet.pack_tm_information() - tmInfoQueue.append(tm_info) - return tmInfoQueue + tm_info_queue.append(tm_info) + return tm_info_queue def receive_telemetry_and_store_tuple(self, tm_tuple_queue: PusTmTupleQueueT) -> \ Union[None, PusTmTupleQueueT]: diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 16087315ae3d045bf659b61b9a74f07b6e2d7249..1b4a46489f60df8a4fa4f22b872b628f614d58ac 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -184,8 +184,8 @@ def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: baud_rate = 115200 g.G_SERIAL_TIMEOUT = 0.05 communication_interface = SerialComIF( - tmtcPrinter=tmtc_printer, comPort=com_port, baudRate=baud_rate, - serialTimeout=g.G_SERIAL_TIMEOUT) + tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate, + serial_timeout=g.G_SERIAL_TIMEOUT) # else: # pass return communication_interface diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index b486490309fcc6c478ab8fea37cb9c55a72b3ffb..c0b896ccecc0a0363c6425b80d060e3a0794d406 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -64,7 +64,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): print("Sequential SenderReceiver: All replies received!") if g.G_PRINT_TO_FILE: print("Sequential SenderReceiver: Exporting output to log file.") - self._tmtc_printer.print_to_file() + self._tmtc_printer.print_to_file("log/tmtc_log.txt", True) def __perform_next_tc_send(self): if self._tm_listener.replyEvent.is_set(): diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index 310d5477d38864ef5b3bad0f86deab362e53130c..945b5e629f68c3e0db2cd75d497dbfd8886d592c 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -16,33 +16,33 @@ class TestService2(TestService): packService2TestInto(cls.test_queue) def test_Service2(self): - assertionDict = self.perform_testing_and_generate_assertion_dict() + assertion_dict = self.perform_testing_and_generate_assertion_dict() self.eventExpected = 1 self.miscExpected = 4 - super()._perform_generic_assertion_test(assertionDict) + super()._perform_generic_assertion_test(assertion_dict) - def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): - while not tmInfoQueue.__len__() == 0: - currentTmInfo = tmInfoQueue.pop() + def analyseTmInfo(self, tm_info_queue: PusTmInfoQueueT, assertion_dict: dict): + while not tm_info_queue.__len__() == 0: + current_tm_info = tm_info_queue.pop() # Tc verification scanning is generic and has been moved to the superclass - if currentTmInfo["service"] == 1: - super().scanForRespectiveTc(currentTmInfo) + if current_tm_info["service"] == 1: + super().scanForRespectiveTc(current_tm_info) # Here, the desired event Id or RID can be specified - if currentTmInfo["service"] == 5: + if current_tm_info["service"] == 5: # mode change - if currentTmInfo["EventID"] == 7401 and currentTmInfo["RID"] == 0x4400affe: + if current_tm_info["EventID"] == 7401 and current_tm_info["RID"] == 0x4400affe: self.eventCounter = self.eventCounter + 1 - if currentTmInfo["service"] == 200 and currentTmInfo["subservice"] == 6: + if current_tm_info["service"] == 200 and current_tm_info["subservice"] == 6: # mode change confirmation self.miscCounter = self.miscCounter + 1 # wiretapping messages - elif currentTmInfo["service"] == 2 and currentTmInfo["subservice"] == 130: + elif current_tm_info["service"] == 2 and current_tm_info["subservice"] == 130: self.miscCounter = self.miscCounter + 1 - elif currentTmInfo["service"] == 2 and currentTmInfo["subservice"] == 131: + elif current_tm_info["service"] == 2 and current_tm_info["subservice"] == 131: self.miscCounter = self.miscCounter + 1 - if currentTmInfo["valid"] == 0: + if current_tm_info["valid"] == 0: self.valid = False - assertionDict.update({"MiscCount": self.miscCounter}) + assertion_dict.update({"MiscCount": self.miscCounter}) class TestService5(TestService):