diff --git a/comIF/OBSW_ComInterface.py b/comIF/OBSW_ComInterface.py index 21a90ebae70eaf7e943a145fe791557f75f28fb9..7f9af565b35566e8ac2acfed04f8a1a87854cf2a 100644 --- a/comIF/OBSW_ComInterface.py +++ b/comIF/OBSW_ComInterface.py @@ -10,7 +10,7 @@ Description: Generic Communication Interface. Defines the syntax of the communic from abc import abstractmethod from typing import TypeVar, Tuple, Union from tm.OBSW_PusTm import pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT, pusTmListT -from utility.OBSW_TmTcPrinter import TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinterT from tc.OBSW_TcPacket import pusTcInfoT ComIF_T = TypeVar('ComIF_T', bound='CommunicationInterface') diff --git a/comIF/OBSW_Ethernet_ComIF.py b/comIF/OBSW_Ethernet_ComIF.py index 762de6830c387cca631a308855a6b390f698b203..d4de40ab9ffba23cf8b58ae1b86c33b8341ea383 100644 --- a/comIF/OBSW_Ethernet_ComIF.py +++ b/comIF/OBSW_Ethernet_ComIF.py @@ -12,7 +12,7 @@ from typing import Tuple, Union from comIF.OBSW_ComInterface import CommunicationInterface, pusTmListT, pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT from tm.OBSW_TmPacket import PUSTelemetryFactory from tc.OBSW_TcPacket import pusTcInfoT -from utility.OBSW_TmTcPrinter import TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinterT import config.OBSW_Config as g @@ -32,7 +32,7 @@ class EthernetComIF(CommunicationInterface): pass def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None) -> None: - self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) + self.tmtcPrinter.print_telecommand(tcPacket, tcPacketInfo) self.sockSend.sendto(tcPacket, self.sendAddress) def dataAvailable(self, timeout: float = 0) -> bool: @@ -47,7 +47,7 @@ class EthernetComIF(CommunicationInterface): if ready: data = self.sockReceive.recvfrom(1024)[0] packet = PUSTelemetryFactory(data) - self.tmtcPrinter.printTelemetry(packet) + self.tmtcPrinter.print_telemetry(packet) packetList = [packet] return True, packetList else: @@ -64,7 +64,7 @@ class EthernetComIF(CommunicationInterface): packets = self.receiveTelemetry() for packet in packets: packetInfo = packet.packTmInformation() - self.tmtcPrinter.printTelemetry(packet) + self.tmtcPrinter.print_telemetry(packet) tmInfoQueue.append(packetInfo) return tmInfoQueue @@ -79,7 +79,7 @@ class EthernetComIF(CommunicationInterface): for packet in packetList: packetInfo = packet.packTmInformation() tmTuple = (packetInfo, packet) - self.tmtcPrinter.printTelemetry(packet) + self.tmtcPrinter.print_telemetry(packet) tmTupleQueue.append(tmTuple) return tmTupleQueue diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py index d7a5ed7b1c53ee0157efec1c905e0400c8d3555a..a721ff5227b0e9e86a90dc73d74d846136a13ca9 100644 --- a/comIF/OBSW_Serial_ComIF.py +++ b/comIF/OBSW_Serial_ComIF.py @@ -10,7 +10,7 @@ import serial import logging from typing import Tuple, List, Union, Optional from comIF.OBSW_ComInterface import CommunicationInterface, pusTmQueueT -from utility.OBSW_TmTcPrinter import TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinterT from tm.OBSW_TmPacket import PUSTelemetryFactory from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmTupleQueueT, pusTmListT from tc.OBSW_TcPacket import pusTcInfoT @@ -39,7 +39,7 @@ class SerialComIF(CommunicationInterface): exit() def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None): - self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) + self.tmtcPrinter.print_telecommand(tcPacket, tcPacketInfo) self.serial.write(tcPacket) def receiveTelemetry(self, parameters: any = 0) -> list: @@ -55,7 +55,7 @@ class SerialComIF(CommunicationInterface): packetList = [] for counter in range(0, numberOfPackets): packet = PUSTelemetryFactory(pusDataList[counter]) - self.tmtcPrinter.printTelemetry(packet) + self.tmtcPrinter.print_telemetry(packet) packetList.append(packet) return True, packetList else: diff --git a/config/OBSW_Config.py b/config/OBSW_Config.py index 06d0df50ccf03dc13fcc2b92f1e15d0e9a1fddd8..1597c0cc738dad3e18108b37e2b4fe7b00af9c3b 100644 --- a/config/OBSW_Config.py +++ b/config/OBSW_Config.py @@ -71,7 +71,7 @@ sendAddress = (0, 0) # Print Settings printToFile = False -printHkData = False +G_PRINT_HK_DATA = False printRawTmData = False """ @@ -85,7 +85,7 @@ tmtcPrinter = None # noinspection PyUnusedLocal def setGlobals(args): global recAddress, sendAddress, scriptMode, modeId, service, displayMode, comIF, comPort, serial_timeout - global tmTimeout, tcSendTimeoutFactor, printToFile, printHkData, printRawTmData + global tmTimeout, tcSendTimeoutFactor, printToFile, G_PRINT_HK_DATA, printRawTmData if args.mode == 0: print("GUI mode not implemented yet !") if args.shortDisplayMode: @@ -134,7 +134,7 @@ def setGlobals(args): sendAddress = sendAddressToSet modeId = modeId comPort = args.COM - printHkData = args.hk + G_PRINT_HK_DATA = args.hk tmTimeout = args.tmTimeout printRawTmData = args.rawDataPrint displayMode = displayMode diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index dcac207aa53cdc5ab5b7f0f7f4d986bfa73d317a..6e947273375d0094183be3cbe5d13044b3db8296 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -66,10 +66,10 @@ from tc.OBSW_TcPacker import PUSTelecommand, createTotalTcQueue, serviceTestSele from sendreceive.OBSW_SingleCommandSenderReceiver import SingleCommandSenderReceiver from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver -from sendreceive.OBSW_TmListener import TmListener +from sendreceive.obsw_tm_listener import TmListener from utility.OBSW_ArgParser import parseInputArguments -from utility.OBSW_TmTcPrinter import TmTcPrinter, TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinter, TmTcPrinterT from utility.OBSW_ExitHandler import keyboardInterruptHandler from comIF.OBSW_Ethernet_ComIF import EthernetComIF diff --git a/sendreceive/OBSW_CommandSenderReceiver.py b/sendreceive/OBSW_CommandSenderReceiver.py index 152f2f37ef469833034826d03e54a2f1fc8ce7d9..aaacb4e3c4b27932be65b9474d33fc267e9edee1 100644 --- a/sendreceive/OBSW_CommandSenderReceiver.py +++ b/sendreceive/OBSW_CommandSenderReceiver.py @@ -13,8 +13,8 @@ if the first reply has not been received. """ import time from comIF.OBSW_ComInterface import CommunicationInterface, ComIF_T -from utility.OBSW_TmTcPrinter import TmTcPrinterT, TmTcPrinter -from sendreceive.OBSW_TmListener import TmListenerT, TmListener +from utility.obsw_tmtc_printer import TmTcPrinterT, TmTcPrinter +from sendreceive.obsw_tm_listener import TmListenerT, TmListener class CommandSenderReceiver: @@ -90,11 +90,11 @@ class CommandSenderReceiver: time.sleep(waitTime) elif self.pusPacketInfo == "print": printString = self.pusPacket - self._tmtcPrinter.printString(printString) + self._tmtcPrinter.print_string(printString) elif self.pusPacketInfo == "export": exportName = self.pusPacket if self.doPrintToFile: - self._tmtcPrinter.printToFile(exportName, True) + self._tmtcPrinter.print_to_file(exportName, True) elif self.pusPacketInfo == "timeout": self.tmTimeout = self.pusPacket else: diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index d94544dbca3dd0d9cabe4ff01e42444742ebc421..9db8d22b57361b09e644fbf101d08d212528865d 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -12,8 +12,8 @@ from collections import deque from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver from comIF.OBSW_ComInterface import ComIF_T -from utility.OBSW_TmTcPrinter import TmTcPrinterT -from sendreceive.OBSW_TmListener import TmListenerT +from utility.obsw_tmtc_printer import TmTcPrinterT +from sendreceive.obsw_tm_listener import TmListenerT import config.OBSW_Config as g @@ -62,7 +62,7 @@ class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): self.tmInfoQueue = self.retrieveListenerTmInfoQueue() self._tmListener.modeOpFinished.set() if self.doPrintToFile: - self._tmtcPrinter.printToFile() + self._tmtcPrinter.print_to_file() except (KeyboardInterrupt, SystemExit): print("Keyboard Interrupt or System Exit detected") exit() diff --git a/sendreceive/OBSW_SequentialSenderReceiver.py b/sendreceive/OBSW_SequentialSenderReceiver.py index 78ece60dca3f629820da590aee167bc8b97123a7..7a940abd954ed14aa995ac1a7edcfd1b4fdb619a 100644 --- a/sendreceive/OBSW_SequentialSenderReceiver.py +++ b/sendreceive/OBSW_SequentialSenderReceiver.py @@ -13,9 +13,9 @@ import time import config.OBSW_Config as g from sendreceive.OBSW_CommandSenderReceiver import CommandSenderReceiver -from sendreceive.OBSW_TmListener import TmListenerT +from sendreceive.obsw_tm_listener import TmListenerT from comIF.OBSW_ComInterface import ComIF_T -from utility.OBSW_TmTcPrinter import TmTcPrinterT +from utility.obsw_tmtc_printer import TmTcPrinterT class SequentialCommandSenderReceiver(CommandSenderReceiver): @@ -65,7 +65,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): print("Sequential SenderReceiver: All replies received!") if self.doPrintToFile: print("Sequential SenderReceiver: Exporting output to log file.") - self._tmtcPrinter.printToFile() + self._tmtcPrinter.print_to_file() def __performNextTcSend(self): if self._tmListener.replyEvent.is_set(): diff --git a/sendreceive/OBSW_SingleCommandSenderReceiver.py b/sendreceive/OBSW_SingleCommandSenderReceiver.py index 4e3f5f53bcc91b0fdaa1ca0c3c80ea6b586bb957..26f794ac3827920caf3878cee2471c8ccc200c46 100644 --- a/sendreceive/OBSW_SingleCommandSenderReceiver.py +++ b/sendreceive/OBSW_SingleCommandSenderReceiver.py @@ -9,8 +9,8 @@ """ from sendreceive.OBSW_CommandSenderReceiver import CommandSenderReceiver from comIF.OBSW_ComInterface import ComIF_T -from utility.OBSW_TmTcPrinter import TmTcPrinterT -from sendreceive.OBSW_TmListener import TmListenerT +from utility.obsw_tmtc_printer import TmTcPrinterT +from sendreceive.obsw_tm_listener import TmListenerT import config.OBSW_Config as g import logging diff --git a/sendreceive/OBSW_TmListener.py b/sendreceive/obsw_tm_listener.py similarity index 96% rename from sendreceive/OBSW_TmListener.py rename to sendreceive/obsw_tm_listener.py index 5e77eaea3918f5ac215c1ec1c8b6e880ec3e3dde..0df22c0b7167ba7dd7fc276455276a62f662cb85 100644 --- a/sendreceive/OBSW_TmListener.py +++ b/sendreceive/obsw_tm_listener.py @@ -1,12 +1,13 @@ """ @file - OBSW_TmListener.py + obsw_tm_listener.py @date 01.11.2019 @brief Separate class to listen to telecommands. This will enable to run the listener in a separate thread later. - This Listener will propably have some kind of mode. In default configuration, it will just listen for packets + This Listener will propably have some kind of mode. In default configuration, + it will just listen for packets """ import time import threading diff --git a/tm/OBSW_PusTm.py b/tm/OBSW_PusTm.py index 797ab6cd797c32a0e6a41bad429a40c1801b9aa5..5cc6c574adaff48564e691ee868ca57bbaa264f0 100644 --- a/tm/OBSW_PusTm.py +++ b/tm/OBSW_PusTm.py @@ -26,10 +26,10 @@ class PUSTelemetry(OBSWPusPacket): def appendPacketInfo(self, printInfo): self.printInfo = self.printInfo + printInfo - def printTelemetryHeader(self, array): + def print_telemetry_header(self, array): super().printPusPacketHeader(array) - def printTelemetryColumnHeaders(self, array): + def print_telemetry_column_headers(self, array): super().printPusPacketHeaderColumnHeaders(array) def packTmInformation(self) -> pusTmInfoT: diff --git a/tm/OBSW_TmPacket.py b/tm/OBSW_TmPacket.py index dbeabddbbf25d87919eb7a206e473b8964a117ac..a263d11d0fdb6cdf4a2d78bd57f0b36ba6f44f8e 100644 --- a/tm/OBSW_TmPacket.py +++ b/tm/OBSW_TmPacket.py @@ -39,12 +39,12 @@ class Service2TM(PUSTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) return - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) return @@ -52,12 +52,12 @@ class Service8TM(PUSTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) return - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) return @@ -65,12 +65,12 @@ class Service9TM(PUSTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) return - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) return @@ -79,12 +79,12 @@ class Service17TM(PUSTelemetry): super().__init__(byteArray) self.printPacketInfo("Test Reply") - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) return - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) return @@ -108,8 +108,8 @@ class Service200TM(PUSTelemetry): self.mode = struct.unpack('>I', self.byteArrayData[4:8])[0] self.submode = self.byteArrayData[8] - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) array.append(hex(self.objectId)) if self.isCantReachModeReply: array.append(hex(self.returnValue)) @@ -118,8 +118,8 @@ class Service200TM(PUSTelemetry): array.append(str(self.submode)) return - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) array.append("Object ID") if self.isCantReachModeReply: array.append("Return Value") diff --git a/tm/OBSW_TmService1.py b/tm/OBSW_TmService1.py index 59d68d2c7c74b411fa68e6d1a80abf18566941e4..32616ebd602e140b3f5a073ded113551f384c52f 100644 --- a/tm/OBSW_TmService1.py +++ b/tm/OBSW_TmService1.py @@ -42,8 +42,8 @@ class Service1TM(PUSTelemetry): self.appendPacketInfo(" : Step Success") self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0] - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) array.append(str(hex(self.tcPacketId))) array.append(str(self.tcSSC)) if self.tcErrorCode: @@ -55,8 +55,8 @@ class Service1TM(PUSTelemetry): elif self.isStep: array.append(str(self.stepNumber)) - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) array.append("TC Packet ID") array.append("TC SSC") if self.tcErrorCode: diff --git a/tm/OBSW_TmService3.py b/tm/OBSW_TmService3.py index 6a9e61002b0c8176c83f19ea88c0bfe148deb71c..1d01629cef015533aded7babd9c5767eb14cb787 100644 --- a/tm/OBSW_TmService3.py +++ b/tm/OBSW_TmService3.py @@ -7,8 +7,11 @@ Author: R. Mueller """ from tm.OBSW_PusTm import PUSTelemetry +from typing import TypeVar import struct +PusTm3T = TypeVar('PusTm3T', bound='Service3TM') + class Service3TM(PUSTelemetry): def __init__(self, byteArray): @@ -20,7 +23,7 @@ class Service3TM(PUSTelemetry): self.hkDefinitionHeader = [] self.hkDefinition = [] self.numberOfParameters = 0 - self.validityBuffer = [] + self.validity_buffer = [] self.printPacketInfo("Housekeeping Packet") self.paramLength = 0 if self.getSubservice() == 10 or self.getSubservice() == 12: @@ -28,17 +31,15 @@ class Service3TM(PUSTelemetry): if self.getSubservice() == 25 or self.getSubservice() == 26: self.handleFillingHkArrays() - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) array.append(hex(self.sid)) array.append(int(self.paramLength)) - return - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) array.append("SID") array.append("HK Data Size") - return def handleFillingDefinitionArrays(self): self.hkHeader = ["SID", "Report Status", "Collection Interval", "Number Of IDs"] @@ -69,8 +70,8 @@ class Service3TM(PUSTelemetry): def handleGpsHkData(self): self.numberOfParameters = 9 - self.hkHeader = ["Fix Mode", "SV in Fix", "GNSS Week", "Time of Week", "Latitude", "Longitude", - "Mean Sea Altitude", "Position X", "Position Y", "Position Z", + self.hkHeader = ["Fix Mode", "SV in Fix", "GNSS Week", "Time of Week", "Latitude", + "Longitude", "Mean Sea Altitude", "Position X", "Position Y", "Position Z", "Velocity X", "Velocity Y", "Velocity Z"] fixMode = self.byteArrayData[4] svInFix = self.byteArrayData[5] @@ -87,7 +88,7 @@ class Service3TM(PUSTelemetry): vz = struct.unpack('>d', self.byteArrayData[64:72])[0] self.hkContent = [fixMode, svInFix, gnssWeek, timeOfWeek, latitude, longitude, msa, positionX, positionY, positionZ, vx, vy, vz] - self.validityBuffer = self.byteArrayData[72:] + self.validity_buffer = self.byteArrayData[72:] # print(self.validityBuffer) # print(str(format(self.validityBuffer[0], '#010b'))) # print(str(format(self.validityBuffer[1], '#010b'))) @@ -103,7 +104,7 @@ class Service3TM(PUSTelemetry): floatVector1 = struct.unpack('>f', self.byteArrayData[12:16])[0] floatVector2 = struct.unpack('>f', self.byteArrayData[16:20])[0] self.hkContent = [testBool, testUint8, testUint16, testUint32, floatVector1, floatVector2] - self.validityBuffer = self.byteArrayData[20:] + self.validity_buffer = self.byteArrayData[20:] # print(self.validityBuffer) # print(str(format(self.validityBuffer[0], '#010b'))) # print("Validity Buffer Length: " + str(len(self.validityBuffer))) diff --git a/tm/OBSW_TmService5.py b/tm/OBSW_TmService5.py index 8dd25e2f80777cd47d8ca6c5587b6376248ed26c..c308a9297cbb7dffd40830bc853b22d509715e11 100644 --- a/tm/OBSW_TmService5.py +++ b/tm/OBSW_TmService5.py @@ -27,15 +27,15 @@ class Service5TM(PUSTelemetry): self.param1 = struct.unpack('>I', self.byteArrayData[6:10])[0] self.param2 = struct.unpack('>I', self.byteArrayData[10:14])[0] - def printTelemetryHeader(self, array): - super().printTelemetryHeader(array) + def print_telemetry_header(self, array): + super().print_telemetry_header(array) array.append(str(self.eventId)) array.append(hex(self.objectId)) array.append(str(hex(self.param1)) + ", " + str(self.param1)) array.append(str(hex(self.param2)) + ", " + str(self.param2)) - def printTelemetryColumnHeaders(self, array): - super().printTelemetryColumnHeaders(array) + def print_telemetry_column_headers(self, array): + super().print_telemetry_column_headers(array) array.append("Event ID") array.append("Reporter ID") array.append("Parameter 1") diff --git a/utility/OBSW_TmTcPrinter.py b/utility/OBSW_TmTcPrinter.py deleted file mode 100644 index cc9b1e50244b2b22105fd439f6da1928274e2388..0000000000000000000000000000000000000000 --- a/utility/OBSW_TmTcPrinter.py +++ /dev/null @@ -1,200 +0,0 @@ -#!/usr/bin/python3.7 -# -*- coding: utf-8 -*- -""" -@file - OBSW_Config.py -@date - 01.11.2019 -@brief - Class that performs all printing functionalities -""" -import os -from typing import TypeVar -from config import OBSW_Config as g - -TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter') - - -class TmTcPrinter: - def __init__(self, displayMode="long", doPrintToFile=False, printTc=True): - self.printBuffer = "" - # global print buffer which will be useful to print something to file - self.fileBuffer = "" - self.displayMode = displayMode - self.doPrintToFile = doPrintToFile - self.printTc = printTc - - def printTelemetry(self, packet): - if self.displayMode == "short": - self.handleShortPrint(packet) - else: - self.handleLongPrint(packet) - self.handleWiretappingPacket(packet) - self.handleDataReplyPacket(packet) - if packet.getService() == 3 and (packet.getSubservice() == 25 or packet.getSubservice() == 26): - self.handleHkPrint(packet) - if packet.getService() == 3 and (packet.getSubservice() == 10 or packet.getSubservice() == 12): - self.handleHkDefinitionPrint(packet) - if g.printRawTmData: - self.printBuffer = "TM Data:" + "\n" + self.returnDataString(packet.data) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def handleShortPrint(self, packet): - self.printBuffer = "Received TM[" + str(packet.getService()) + "," + str( - packet.getSubservice()) + "]" - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def handleLongPrint(self, packet): - self.printBuffer = "Received Telemetry: " + packet.printInfo - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - self.handleColumnHeaderPrint(packet) - self.handleTmContentPrint(packet) - - def handleColumnHeaderPrint(self, packet): - recPus = [] - packet.printTelemetryColumnHeaders(recPus) - self.printBuffer = str(recPus) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def handleTmContentPrint(self, packet): - recPus = [] - packet.printTelemetryHeader(recPus) - self.printBuffer = str(recPus) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def handleHkPrint(self, packet): - if g.printHkData: - self.printBuffer = "HK Data from SID " - self.printBuffer = self.printBuffer + str(hex(packet.sid)) + " :" - self.printHk(packet) - self.printValidityBuffer(packet) - - def handleHkDefinitionPrint(self, packet): - if g.printHkData: - self.printBuffer = "HK Definition from SID " - self.printBuffer = self.printBuffer + str(hex(packet.sid)) + " :" - self.printHk(packet) - - def printHk(self, packet): - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - self.printBuffer = str(packet.hkHeader) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - self.printBuffer = str(packet.hkContent) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def printValidityBuffer(self, packet): - self.printBuffer = "Valid: " - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - self.handleValidityBufferPrint(packet.validityBuffer, packet.numberOfParameters) - - def handleValidityBufferPrint(self, validityBuffer, numberOfParameters): - self.printBuffer = "[" - parameterCount = 0 - for count in range(len(validityBuffer)): - for bit in range(1, 9): - if self.bitExtractor(validityBuffer[count], bit) == 1: - self.printBuffer = self.printBuffer + "Yes" - else: - self.printBuffer = self.printBuffer + "No" - parameterCount = parameterCount + 1 - if parameterCount == numberOfParameters: - self.printBuffer = self.printBuffer + "]" - break - else: - self.printBuffer = self.printBuffer + ", " - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def handleWiretappingPacket(self, packet): - if packet.getService() == 2 and (packet.getSubservice() == 131 or packet.getSubservice() == 130): - self.printBuffer = "Wiretapping Packet or Raw Reply from TM [" + \ - str(packet.getService()) + "," + str(packet.getSubservice()) + "]:" - self.printBuffer = self.printBuffer + self.returnDataString(packet.data) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def handleDataReplyPacket(self, packet): - if packet.getService() == 8 and packet.getSubservice() == 130: - self.printBuffer = "Service 8 Direct Command Reply TM[8,130] with data: " \ - + self.returnDataString(packet.data) - print(self.printBuffer) - - def printData(self, byteArray): - string = self.returnDataString(byteArray) - print(string) - - @staticmethod - def returnDataString(byteArray): - strToPrint = "[" - for byte in byteArray: - strToPrint += str(hex(byte)) + " , " - strToPrint = strToPrint.rstrip(' , ') - strToPrint += ']' - return strToPrint - - def printString(self, string): - self.printBuffer = string - print(self.printBuffer) - if self.doPrintToFile: - self.addPrintBufferToFileBuffer() - - def addPrintBufferToFileBuffer(self): - if self.doPrintToFile: - self.fileBuffer = self.fileBuffer + self.printBuffer + "\n" - - def printToFile(self, logName: str = "log/tmtc_log.txt", clearFileBuffer: bool = False): - try: - file = open(logName, 'w') - except FileNotFoundError: - print("Log directory does not exists, creating log folder.") - os.mkdir('log') - file = open(logName, 'w') - file.write(self.fileBuffer) - if clearFileBuffer: - self.fileBuffer = "" - print("Log file written to " + logName) - file.close() - - @staticmethod - def bitExtractor(byte, position): - shiftNumber = position + (6 - 2 * (position - 1)) - return (byte >> shiftNumber) & 1 - - # This function handles the printing of Telecommands - def printTelecommand(self, pusPacket, pusPacketInfo=None): - if self.printTc: - if len(pusPacket) == 0: - print("TMTC Printer: Empty packet was sent, configuration error") - exit() - if pusPacketInfo is None: - print("TMTC Printer: No packet info supplied to print") - return - if self.displayMode == "short": - self.handleShortTcPrint(pusPacketInfo) - else: - self.handleLongTcPrint(pusPacketInfo) - - def handleShortTcPrint(self, pusPacketInfo): - self.printBuffer = "Sent TC[" + str(pusPacketInfo["service"]) + "," + str(pusPacketInfo["subservice"]) \ - + "] " + " with SSC " + str(pusPacketInfo["ssc"]) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - - def handleLongTcPrint(self, pusPacketInfo): - try: - self.printBuffer = "Telecommand TC[" + str(pusPacketInfo["service"]) + "," + \ - str(pusPacketInfo["subservice"]) + "] with SSC " + str(pusPacketInfo["ssc"]) \ - + " sent with data " + self.returnDataString(pusPacketInfo["data"]) - print(self.printBuffer) - self.addPrintBufferToFileBuffer() - except TypeError: - print("TMTC Printer: Type Error !") diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py new file mode 100644 index 0000000000000000000000000000000000000000..6ccf78cdaa5172e0e9660b2d05ccfac715deebf5 --- /dev/null +++ b/utility/obsw_tmtc_printer.py @@ -0,0 +1,297 @@ +#!/usr/bin/python3.7 +# -*- coding: utf-8 -*- +""" +@file + OBSW_Config.py +@date + 01.11.2019 +@brief + Class that performs all printing functionalities +""" +import os +import sys +from typing import TypeVar +from config import OBSW_Config as g +from tm.OBSW_PusTm import PusTmT +from tm.OBSW_TmService3 import PusTm3T +from tc.OBSW_TcPacket import PusTcT, pusTcInfoT + +TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter') + + +class TmTcPrinter: + """ + This class handles printing to the command line and to files. + """ + def __init__(self, displayMode: str = "long", doPrintToFile: bool = False, + printTc: bool = True): + """ + :param displayMode: "long" or "short" TODO: replace by enum + :param doPrintToFile: if true, print to file + :param printTc: if true, print TCs + """ + self.print_buffer = "" + # global print buffer which will be useful to print something to file + self.file_buffer = "" + self.display_mode = displayMode + self.do_print_to_file = doPrintToFile + self.print_tc = printTc + + def print_telemetry(self, packet: PusTmT): + """ + This function handles printing telemetry + :param packet: + :return: + """ + if self.display_mode == "short": + self.__handle_short_print(packet) + else: + self.__handle_long_print(packet) + self.handle_wiretapping_packet(packet) + self.handle_data_reply_packet(packet) + if packet.getService() == 3 and \ + (packet.getSubservice() == 25 or packet.getSubservice() == 26): + self.__handle_hk_print(packet) + if packet.getService() == 3 and \ + (packet.getSubservice() == 10 or packet.getSubservice() == 12): + self.__handle_hk_definition_print(packet) + if g.printRawTmData: + self.print_buffer = "TM Data:" + "\n" + self.return_data_string(packet.data) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def __handle_short_print(self, tm_packet: PusTmT): + self.print_buffer = "Received TM[" + str(tm_packet.getService()) + "," + str( + tm_packet.getSubservice()) + "]" + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def __handle_long_print(self, tm_packet: PusTmT): + self.print_buffer = "Received Telemetry: " + tm_packet.printInfo + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + self.__handle_column_header_print(tm_packet) + self.handle_tm_content_print(tm_packet) + + def __handle_column_header_print(self, tm_packet: PusTmT): + rec_pus = [] + tm_packet.print_telemetry_column_headers(rec_pus) + self.print_buffer = str(rec_pus) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def handle_tm_content_print(self, tm_packet: PusTmT): + """ + :param tm_packet: + :return: + """ + rec_pus = [] + tm_packet.print_telemetry_header(rec_pus) + self.print_buffer = str(rec_pus) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def __handle_hk_print(self, tm_packet: PusTmT): + """ + Prints HK data previously set by TM receiver + :param tm_packet: + :return: + """ + if g.G_PRINT_HK_DATA: + self.print_buffer = "HK Data from SID " + self.print_buffer = self.print_buffer + str(hex(tm_packet.sid)) + " :" + self.print_hk(tm_packet) + self.print_validity_buffer(tm_packet) + + def __handle_hk_definition_print(self, tm_packet: PusTmT): + """ + :param tm_packet: + :return: + """ + if g.G_PRINT_HK_DATA: + self.print_buffer = "HK Definition from SID " + self.print_buffer = self.print_buffer + str(hex(tm_packet.sid)) + " :" + self.print_hk(tm_packet) + + def print_hk(self, tm_packet: PusTm3T): + """ + :param tm_packet: + :return: + """ + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + self.print_buffer = str(tm_packet.hkHeader) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + self.print_buffer = str(tm_packet.hkContent) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def print_validity_buffer(self, tm_packet: PusTm3T): + """ + :param tm_packet: + :return: + """ + self.print_buffer = "Valid: " + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + self.handle_validity_buffer_print(tm_packet.validity_buffer, tm_packet.numberOfParameters) + + def handle_validity_buffer_print(self, validity_buffer: bytearray, number_of_parameters): + """ + :param validity_buffer: + :param number_of_parameters: + :return: + """ + self.print_buffer = "[" + for index, byte in enumerate(validity_buffer): + for bit in range(1, 9): + if self.bit_extractor(byte, bit) == 1: + self.print_buffer = self.print_buffer + "Yes" + else: + self.print_buffer = self.print_buffer + "No" + if index == number_of_parameters: + self.print_buffer = self.print_buffer + "]" + else: + self.print_buffer = self.print_buffer + ", " + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def handle_wiretapping_packet(self, packet): + """ + :param packet: + :return: + """ + if packet.getService() == 2 and \ + (packet.getSubservice() == 131 or packet.getSubservice() == 130): + self.print_buffer = "Wiretapping Packet or Raw Reply from TM [" + \ + str(packet.getService()) + "," + str(packet.getSubservice()) + "]:" + self.print_buffer = self.print_buffer + self.return_data_string(packet.data) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def handle_data_reply_packet(self, packet): + """ + + :param packet: + :return: + """ + if packet.getService() == 8 and packet.getSubservice() == 130: + self.print_buffer = "Service 8 Direct Command Reply TM[8,130] with data: " \ + + self.return_data_string(packet.data) + print(self.print_buffer) + + def print_data(self, byte_array: bytearray): + """ + :param byte_array: + :return: None + """ + string = self.return_data_string(byte_array) + print(string) + + @staticmethod + def return_data_string(byte_array: bytearray) -> str: + """ + :param byte_array: + :return: + """ + str_to_print = "[" + for byte in byte_array: + str_to_print += str(hex(byte)) + " , " + str_to_print = str_to_print.rstrip(',') + str_to_print += ']' + return str_to_print + + def print_string(self, string: str): + """ + :param string: + :return: + """ + self.print_buffer = string + print(self.print_buffer) + if self.do_print_to_file: + self.add_print_buffer_to_file_buffer() + + def add_print_buffer_to_file_buffer(self): + """ + :return: + """ + if self.do_print_to_file: + self.file_buffer = self.file_buffer + self.print_buffer + "\n" + + def print_to_file(self, log_name: str = "log/tmtc_log.txt", clear_file_buffer: bool = False): + """ + + :param log_name: + :param clear_file_buffer: + :return: + """ + try: + file = open(log_name, 'w') + except FileNotFoundError: + print("Log directory does not exists, creating log folder.") + os.mkdir('log') + file = open(log_name, 'w') + file.write(self.file_buffer) + if clear_file_buffer: + self.file_buffer = "" + print("Log file written to " + log_name) + file.close() + + @staticmethod + def bit_extractor(byte: int, position: int): + """ + + :param byte: + :param position: + :return: + """ + shift_number = position + (6 - 2 * (position - 1)) + return (byte >> shift_number) & 1 + + def print_telecommand(self, tc_packet: PusTcT, tc_packet_info: pusTcInfoT = None): + """ + This function handles the printing of Telecommands + :param tc_packet: + :param tc_packet_info: + :return: + """ + if self.print_tc: + if len(tc_packet) == 0: + print("TMTC Printer: Empty packet was sent, configuration error") + sys.exit() + if tc_packet_info is None: + print("TMTC Printer: No packet info supplied to print") + return + if self.display_mode == "short": + self.handle_short_tc_print(tc_packet_info) + else: + self.handle_long_tc_print(tc_packet_info) + + def handle_short_tc_print(self, tc_packet_info: pusTcInfoT): + """ + Brief TC print + :param tc_packet_info: + :return: + """ + self.print_buffer = "Sent TC[" + str(tc_packet_info["service"]) + "," + \ + str(tc_packet_info["subservice"]) + "] " + " with SSC " + \ + str(tc_packet_info["ssc"]) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + + def handle_long_tc_print(self, tc_packet_info: pusTcInfoT): + """ + Long TC print + :param tc_packet_info: + :return: + """ + try: + self.print_buffer = "Telecommand TC[" + str(tc_packet_info["service"]) + "," + \ + str(tc_packet_info["subservice"]) + "] with SSC " + \ + str(tc_packet_info["ssc"]) + " sent with data " + \ + self.return_data_string(tc_packet_info["data"]) + print(self.print_buffer) + self.add_print_buffer_to_file_buffer() + except TypeError: + print("TMTC Printer: Type Error !")