diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py index 7e8e69503533a79b5ecbdf3dd5c3d7343e79a76f..343c912868d5b30abbd92dbecdf7e7d2947aa9de 100644 --- a/comIF/obsw_ethernet_com_if.py +++ b/comIF/obsw_ethernet_com_if.py @@ -54,7 +54,7 @@ class EthernetComIF(CommunicationInterface): if ready: data = self.sock_receive.recvfrom(1024)[0] tm_packet = PusTelemetryFactory.create(data) - self.tmtc_printer.print_telemetry(tm_packet) + # self.tmtc_printer.print_telemetry(tm_packet) packet_list = [tm_packet] return True, packet_list return False, [] diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index 8e8ed275727ba6d273fc50a24f6b2a617e5c9ece..0bfafb7959c98fec36bd0c577239bb0242c1aff4 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -12,7 +12,7 @@ from typing import Tuple, List, Union, Optional import serial from comIF.obsw_com_interface import CommunicationInterface, PusTmQueueT -from utility.obsw_tmtc_printer import TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinter from tm.obsw_pus_tm_factory import PusTelemetryFactory, PusTmInfoQueueT, PusTmTupleQueueT, PusTmListT from tc.obsw_pus_tc_base import PusTcInfoT @@ -26,7 +26,7 @@ class SerialComIF(CommunicationInterface): Communication Interface to use serial communication. This requires the PySerial library on Windows. It has been tested only on Windows. """ - def __init__(self, tmtc_printer: TmTcPrinterT, com_port: str, baud_rate: int, + def __init__(self, tmtc_printer: TmTcPrinter, com_port: str, baud_rate: int, serial_timeout: float): super().__init__(tmtc_printer) try: diff --git a/config/obsw_config.py b/config/obsw_config.py index 81f87e6ba9785d2c34ae08a46c941ee630866beb..aab02521603c75ef78a89b2a8f8f31fe83ea869a 100644 --- a/config/obsw_config.py +++ b/config/obsw_config.py @@ -92,7 +92,7 @@ G_TMTC_PRINTER = None # noinspection PyUnusedLocal -def setGlobals(args): +def set_globals(args): global G_REC_ADDRESS, G_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, G_DISPLAY_MODE,\ G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, G_TC_SEND_TIMEOUT_FACTOR, \ G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM, G_PRINT_TM diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index aa30748a0a010ecec3e4376355730318408e5edb..0c5e73fecce298cbdb7b2967b12a11d107ad582a 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -61,7 +61,7 @@ from collections import deque from test import obsw_pus_service_test from config import obsw_config as g -from config.obsw_config import setGlobals +from config.obsw_config import set_globals from tc.obsw_pus_tc_packer import PusTelecommand, create_total_tc_queue, service_test_select from sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver @@ -69,7 +69,7 @@ from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderR from sendreceive.obsw_tm_listener import TmListener from utility.obsw_args_parser import parse_input_arguments -from utility.obsw_tmtc_printer import TmTcPrinter, TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinter from utility.obsw_exit_handler import keyboard_interrupt_handler from comIF.obsw_ethernet_com_if import EthernetComIF @@ -92,7 +92,7 @@ def main(): :return: """ args = parse_input_arguments() - setGlobals(args) + set_globals(args) tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) if g.G_MODE_ID == g.ModeList.GUIMode: backend = TmTcBackend() @@ -116,7 +116,7 @@ def main(): pus_packet_tuple = command_preparation() sender_and_receiver = SingleCommandSenderReceiver( com_interface=communication_interface, tmtc_printer=tmtc_printer, tm_listener=tm_listener, - pusPacketTuple=pus_packet_tuple) + pus_packet_tuple=pus_packet_tuple) sender_and_receiver.send_single_tc_and_receive_tm() elif g.G_MODE_ID == g.ModeList.ServiceTestMode: @@ -167,7 +167,7 @@ def command_preparation(): return command.pack_command_tuple() -def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT: +def set_communication_interface(tmtc_printer: TmTcPrinter) -> ComIfT: """ Return the desired communication interface object :param tmtc_printer: TmTcPrinter object. diff --git a/sendreceive/obsw_multiple_commands_sender_receiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py index fd81560047a94f173c53184ccdda508651c2cad3..8765d8ec25a82835d8d056c77ae68c7047c9f699 100644 --- a/sendreceive/obsw_multiple_commands_sender_receiver.py +++ b/sendreceive/obsw_multiple_commands_sender_receiver.py @@ -57,7 +57,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.__send_all_queue() self.wait_for_last_replies_listening(self._tm_timeout / 1.5) self.tmTupleQueue = self.__retrieve_listener_tm_tuple() - self._tm_listener.modeOpFinished.set() + self._tm_listener.mode_op_finished.set() if g.G_PRINT_TO_FILE: self._tmtc_printer.print_to_file() except (KeyboardInterrupt, SystemExit): diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index e15f5d4f93acc9075171765d3daf6c536bf36f51..64a5d8eeb0d8e858ce3b4282d251016f76c01908 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -11,7 +11,7 @@ import config.obsw_config as g from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT from comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_tmtc_printer import TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinter from tc.obsw_pus_tc_base import TcQueueT @@ -19,7 +19,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): """ Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence """ - def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinterT, + def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, tm_listener: TmListenerT, tc_queue: TcQueueT): """ :param com_interface: CommunicationInterface object, passed on to CommandSenderReceiver diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index cc72c8312f4e94d28428647ad37adf01c8c73536..8c3fc165365fdc4ee42f10835a6adcb463b66234 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -12,7 +12,7 @@ import logging from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT from comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_tmtc_printer import TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinter import config.obsw_config as g @@ -21,8 +21,8 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): Specific implementation of CommandSenderReceiver to send a single telecommand This object can be used by instantiating it and calling sendSingleTcAndReceiveTm() """ - def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinterT, - tm_listener: TmListenerT, pusPacketTuple: tuple): + def __init__(self, com_interface: CommunicationInterface, tmtc_printer: TmTcPrinter, + tm_listener: TmListenerT, pus_packet_tuple: tuple): """ :param com_interface: CommunicationInterface object, passed on to CommandSenderReceiver :param tm_listener: TmListener object which runs in the background and receives all TM @@ -32,7 +32,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): tmtc_printer=tmtc_printer) self.faulty_input = False try: - self.pus_packet, self.pus_packet_info = pusPacketTuple + self.pus_packet, self.pus_packet_info = pus_packet_tuple except TypeError: print("Invalid queue entry detected") logging.exception("Error") diff --git a/tc/obsw_tc_service2.py b/tc/obsw_tc_service2.py index b1c6151a487a903cc825a777571d2b459c27ad33..cf3b63613554a5759c71b9801a01f4da309fef43 100644 --- a/tc/obsw_tc_service2.py +++ b/tc/obsw_tc_service2.py @@ -56,8 +56,3 @@ def pack_wiretapping_mode(object_id, wiretapping_mode_): wiretapping_mode = struct.pack(">B", wiretapping_mode_) # MODE_OFF : 0x00, MODE_RAW: 0x01 wiretapping_toggle_data = object_id + wiretapping_mode return wiretapping_toggle_data - - -def pack_specific_service2_test_into(tcQueue, objectIdList, dataList, sscList, printStringList, - calledExternally=False): - pass diff --git a/tc/obsw_tc_service8.py b/tc/obsw_tc_service8.py index 7e2d2cdbcce89175ee23ec1d4f82bf9994d456f4..d0da23e443ddaf124f97f8addd216ed8243234f6 100644 --- a/tc/obsw_tc_service8.py +++ b/tc/obsw_tc_service8.py @@ -57,8 +57,3 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> if called_externally is False: tc_queue.appendleft(("export", "log/tmtc_log_service8.txt")) return tc_queue - - -def pack_specific_service8_test_into(tcQueue, objectIdList, actionIdList, dataList, - sscList, printStringList, calledExternally=False): - pass diff --git a/tm/obsw_tm_service_3.py b/tm/obsw_tm_service_3.py index 3a6791e2bda40abf8a7a35944ecf825d3598d29d..d5b0d0e8f18c9d319c559d08024e743a52d3a95a 100644 --- a/tm/obsw_tm_service_3.py +++ b/tm/obsw_tm_service_3.py @@ -7,6 +7,7 @@ Author: R. Mueller """ from tm.obsw_pus_tm_base import PusTelemetry +from typing import Type import struct @@ -24,9 +25,9 @@ class Service3TM(PusTelemetry): self.specify_packet_info("Housekeeping Packet") self.paramLength = 0 if self.get_subservice() == 10 or self.get_subservice() == 12: - self.handleFillingDefinitionArrays() + self.handle_filling_definition_arrays() if self.get_subservice() == 25 or self.get_subservice() == 26: - self.handleFillingHkArrays() + self.handle_filling_hk_arrays() def append_telemetry_content(self, array): super().append_telemetry_content(array) @@ -38,34 +39,34 @@ class Service3TM(PusTelemetry): array.append("SID") array.append("HK Data Size") - def handleFillingDefinitionArrays(self): + def handle_filling_definition_arrays(self): self.hkHeader = ["SID", "Report Status", "Collection Interval", "Number Of IDs"] - reportingEnabled = self._tm_data[4] - collectionInterval = struct.unpack('>f', self._tm_data[5:9])[0] - numberOfParams = self._tm_data[9] + reporting_enabled = self._tm_data[4] + collection_interval = struct.unpack('>f', self._tm_data[5:9])[0] + number_of_params = self._tm_data[9] parameters = [] index2 = 1 - for index in range(10, (numberOfParams * 4) + 10, 4): + for index in range(10, (number_of_params * 4) + 10, 4): parameter = struct.unpack('>I', self._tm_data[index:index + 4])[0] self.hkHeader.append("Pool ID " + str(index2)) parameters.append(str(hex(parameter))) index2 = index2 + 1 - if reportingEnabled == 1: - statusString = "On" + if reporting_enabled == 1: + status_string = "On" else: - statusString = "Off" - self.hkContent = [hex(self.sid), statusString, collectionInterval, numberOfParams] + status_string = "Off" + self.hkContent = [hex(self.sid), status_string, collection_interval, number_of_params] self.hkContent.extend(parameters) - def handleFillingHkArrays(self): + def handle_filling_hk_arrays(self): self.paramLength = len(self._tm_data) - 4 # TODO: This can be automated by using the MIB parser pool names and pool datatypes if self.sid == 0x1f00 or self.sid == 0x2f00: - self.handleGpsHkData() + self.handle_gps_hk_data() elif self.sid == 0x4300 or self.sid == 0x4400: - self.handleTestHkData() + self.handle_test_hk_data() - def handleGpsHkData(self): + def handle_gps_hk_data(self): self.numberOfParameters = 9 self.hkHeader = ["Fix Mode", "SV in Fix", "GNSS Week", "Time of Week", "Latitude", "Longitude", "Mean Sea Altitude", "Position X", "Position Y", "Position Z", @@ -91,7 +92,7 @@ class Service3TM(PusTelemetry): # print(str(format(self.validityBuffer[1], '#010b'))) # print("Validity Buffer Length: " + str(len(self.validityBuffer))) - def handleTestHkData(self): + def handle_test_hk_data(self): self.numberOfParameters = 6 self.hkHeader = ["Bool", "UINT8", "UINT16", "UINT32", "FLOAT1", "FLOAT2"] testBool = self._tm_data[4] @@ -107,3 +108,4 @@ class Service3TM(PusTelemetry): # print("Validity Buffer Length: " + str(len(self.validityBuffer))) +Service3TM: Type[PusTelemetry] diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 670bff0517ef9a60e3828a4b16e38f2b52ad609f..72e3a7ade6c6fb8a4762d1eace1c8d5721c91bc7 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -10,14 +10,11 @@ """ import os import sys -from typing import TypeVar from config import obsw_config as g from tm.obsw_pus_tm_base import PusTelemetry from tm.obsw_tm_service_3 import Service3TM from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT, TcDictionaryKeys -TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter') - class TmTcPrinter: """