From 6edd2fbafbd367d653c26110a79ce9f5c2102dd3 Mon Sep 17 00:00:00 2001 From: "Robin.Mueller" <robin.mueller.m@gmail.com> Date: Wed, 8 Apr 2020 23:32:51 +0200 Subject: [PATCH] some more pep adaptions --- .../OBSW_TmTcClient_Software_Serial.xml | 4 ++-- comIF/obsw_com_interface.py | 2 +- comIF/obsw_ethernet_com_if.py | 6 +++--- comIF/obsw_serial_com_if.py | 8 ++++---- config/OBSW_Config.py | 9 ++++----- .../OBSW_MultipleCommandsSenderReceiver.py | 5 +++-- .../obsw_single_command_sender_receiver.py | 1 - tc/OBSW_TcPacket.py | 4 +++- tc/OBSW_TcService2.py | 3 +-- test/OBSW_PusServiceTest.py | 13 ++++++------ test/OBSW_UnitTest.py | 12 +++++------ tm/{OBSW_PusPacket.py => obsw_pus_tm_base.py} | 6 ++++-- ...BSW_TmPacket.py => obsw_pus_tm_factory.py} | 20 +++++++++---------- tm/{OBSW_PusTm.py => obsw_pus_tm_generic.py} | 12 +++++------ ...BSW_TmService1.py => obsw_tm_service_1.py} | 16 +++++++-------- ...BSW_TmService3.py => obsw_tm_service_3.py} | 6 +++--- ...BSW_TmService5.py => obsw_tm_service_5.py} | 18 ++++++++--------- utility/obsw_args_parser.py | 4 ++-- utility/obsw_tmtc_printer.py | 4 ++-- 19 files changed, 78 insertions(+), 75 deletions(-) rename tm/{OBSW_PusPacket.py => obsw_pus_tm_base.py} (96%) rename tm/{OBSW_TmPacket.py => obsw_pus_tm_factory.py} (88%) rename tm/{OBSW_PusTm.py => obsw_pus_tm_generic.py} (77%) rename tm/{OBSW_TmService1.py => obsw_tm_service_1.py} (85%) rename tm/{OBSW_TmService3.py => obsw_tm_service_3.py} (95%) rename tm/{OBSW_TmService5.py => obsw_tm_service_5.py} (75%) diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Software_Serial.xml b/.idea/runConfigurations/OBSW_TmTcClient_Software_Serial.xml index 1fad219..2088585 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_Software_Serial.xml +++ b/.idea/runConfigurations/OBSW_TmTcClient_Software_Serial.xml @@ -12,8 +12,8 @@ <option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> - <option name="SCRIPT_NAME" value="C:\Users\Robin\NoSyncDokumente\sourceobsw\tmtc\obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-m 4 -p -c 1 --hk -t 5" /> + <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> + <option name="PARAMETERS" value="-m 4 -c 1 --hk -t 5" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="false" /> <option name="MODULE_MODE" value="false" /> diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py index d6761c3..916f95a 100644 --- a/comIF/obsw_com_interface.py +++ b/comIF/obsw_com_interface.py @@ -9,7 +9,7 @@ Description: Generic Communication Interface. Defines the syntax of the communic """ from abc import abstractmethod from typing import TypeVar, Tuple, Union -from tm.OBSW_PusTm import pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT, pusTmListT +from tm.obsw_pus_tm_generic import pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT, pusTmListT from utility.obsw_tmtc_printer import TmTcPrinterT from tc.OBSW_TcPacket import pusTcInfoT diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py index c17faf0..f456271 100644 --- a/comIF/obsw_ethernet_com_if.py +++ b/comIF/obsw_ethernet_com_if.py @@ -12,7 +12,7 @@ from typing import Tuple, Union from comIF.obsw_com_interface import CommunicationInterface, pusTmListT, pusTmQueueT, \ pusTmTupleQueueT, pusTmInfoQueueT -from tm.OBSW_TmPacket import PUSTelemetryFactory +from tm.obsw_pus_tm_factory import PUSTelemetryFactory from tc.OBSW_TcPacket import pusTcInfoT from utility.obsw_tmtc_printer import TmTcPrinterT import config.OBSW_Config as g @@ -69,7 +69,7 @@ class EthernetComIF(CommunicationInterface): Union[None, pusTmInfoQueueT]: packets = self.receive_telemetry() for packet in packets: - packet_info = packet.packTmInformation() + packet_info = packet.pack_tm_information() self.tmtc_printer.print_telemetry(packet) tmInfoQueue.append(packet_info) return tmInfoQueue @@ -84,7 +84,7 @@ class EthernetComIF(CommunicationInterface): Union[None, pusTmTupleQueueT]: packet_list = self.receive_telemetry() for packet in packet_list: - packet_info = packet.packTmInformation() + packet_info = packet.pack_tm_information() tm_tuple = (packet_info, packet) self.tmtc_printer.print_telemetry(packet) tmTupleQueue.append(tm_tuple) diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index 93799a2..ca921e9 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -13,8 +13,8 @@ from typing import Tuple, List, Union, Optional import serial from comIF.obsw_com_interface import CommunicationInterface, pusTmQueueT from utility.obsw_tmtc_printer import TmTcPrinterT -from tm.OBSW_TmPacket import PUSTelemetryFactory -from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmTupleQueueT, pusTmListT +from tm.obsw_pus_tm_factory import PUSTelemetryFactory +from tm.obsw_pus_tm_generic import pusTmInfoQueueT, pusTmTupleQueueT, pusTmListT from tc.OBSW_TcPacket import pusTcInfoT SERIAL_PACKET_MAX_SIZE = 1024 @@ -131,7 +131,7 @@ class SerialComIF(CommunicationInterface): (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: - tm_info = packet.packTmInformation() + tm_info = packet.pack_tm_information() tmInfoQueue.append(tm_info) return tmInfoQueue @@ -140,6 +140,6 @@ class SerialComIF(CommunicationInterface): (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: - tm_info = packet.packTmInformation() + tm_info = packet.pack_tm_information() tm_tuple_queue.append((tm_info, packet)) return tm_tuple_queue diff --git a/config/OBSW_Config.py b/config/OBSW_Config.py index c5ab216..804b566 100644 --- a/config/OBSW_Config.py +++ b/config/OBSW_Config.py @@ -7,13 +7,12 @@ Global settings for UDP client """ import enum -from typing import Tuple, TypeVar +from typing import Tuple """ Global type definitions """ ethernetAddressT = Tuple[str, int] -# socketType = TypeVar['socketType'] # Mode options, set by args parser @@ -85,9 +84,9 @@ tmtcPrinter = None # noinspection PyUnusedLocal def setGlobals(args): - global G_REC_ADDRESS, G_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, G_DISPLAY_MODE - global G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, G_TC_SEND_TIMEOUT_FACTOR, G_PRINT_TO_FILE - global G_PRINT_HK_DATA, G_PRINT_RAW_TM + global G_REC_ADDRESS, G_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, G_DISPLAY_MODE,\ + G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, G_TC_SEND_TIMEOUT_FACTOR, \ + G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM if args.mode == 0: print("GUI mode not implemented yet !") if args.shortDisplayMode: diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py index 4b730e2..55bfafd 100644 --- a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py +++ b/sendreceive/OBSW_MultipleCommandsSenderReceiver.py @@ -19,8 +19,9 @@ import config.OBSW_Config as g class MultipleCommandSenderReceiver(SequentialCommandSenderReceiver): """ - Difference to seqential sender: This class can send TCs in bursts. Wait intervals can be specified with - wait time between the send bursts. This is generally done in the separate test classes in UnitTest + Difference to seqential sender: This class can send TCs in bursts. + Wait intervals can be specified with wait time between the send bursts. + This is generally done in the separate test classes in UnitTest """ def __init__(self, comInterface: ComIfT, tmtcPrinter: TmTcPrinterT, tcQueue: Deque, tmListener: TmListenerT, tmTimeout: float, waitIntervals: list, diff --git a/sendreceive/obsw_single_command_sender_receiver.py b/sendreceive/obsw_single_command_sender_receiver.py index 1c81caa..9918ebf 100644 --- a/sendreceive/obsw_single_command_sender_receiver.py +++ b/sendreceive/obsw_single_command_sender_receiver.py @@ -31,7 +31,6 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): listen for TMs for this time period :param tcTimeoutFactor: If TM is not received, resend TC after G_TM_TIMEOUT * G_TC_SEND_TIMEOUT_FACTOR - :param doPrintToFile: Specify whether output is also printed to a file """ super().__init__(comInterface=comInterface, tmListener=tmListener, tmtcPrinter=tmtcPrinter, tmTimeout=tmTimeout, tcSendTimeoutFactor=tcTimeoutFactor) diff --git a/tc/OBSW_TcPacket.py b/tc/OBSW_TcPacket.py index 9bd5e21..3015e2f 100644 --- a/tc/OBSW_TcPacket.py +++ b/tc/OBSW_TcPacket.py @@ -93,13 +93,15 @@ class PUSTelecommand: print("]") -# Takes pusPackets, removes current Packet Error Control, calculates new CRC (16 bits at packet end) and +# Takes pusPackets, removes current Packet Error Control, +# calculates new CRC (16 bits at packet end) and # adds it as correct Packet Error Control Code. Reference: ECSS-E70-41A p. 207-212 def generatePacketCRC(TCPacket: PusTcT) -> PusTcT: crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) crc = crc_func(bytearray(TCPacket[0:len(TCPacket) - 2])) TCPacket[len(TCPacket) - 2] = (crc & 0xFF00) >> 8 TCPacket[len(TCPacket) - 1] = crc & 0xFF + return TCPacket def generateCRC(data: bytearray) -> bytearray: diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py index 4b8d629..7b1697f 100644 --- a/tc/OBSW_TcService2.py +++ b/tc/OBSW_TcService2.py @@ -7,9 +7,8 @@ Description: PUS Custom Service 8: Device Access, Native low-level commanding @author: R. Mueller """ import struct -from typing import Deque -from tc.OBSW_TcPacket import * +from tc.OBSW_TcPacket import PUSTelecommand, Deque from tc.OBSW_TcService200 import packModeData diff --git a/test/OBSW_PusServiceTest.py b/test/OBSW_PusServiceTest.py index 6770b15..2cc98e4 100644 --- a/test/OBSW_PusServiceTest.py +++ b/test/OBSW_PusServiceTest.py @@ -25,20 +25,20 @@ class TestService2(TestService): while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass - if currentTmInfo["G_SERVICE"] == 1: + if currentTmInfo["service"] == 1: super().scanForRespectiveTc(currentTmInfo) # Here, the desired event Id or RID can be specified - if currentTmInfo["G_SERVICE"] == 5: + if currentTmInfo["service"] == 5: # mode change if currentTmInfo["EventID"] == 7401 and currentTmInfo["RID"] == 0x4400affe: self.eventCounter = self.eventCounter + 1 - if currentTmInfo["G_SERVICE"] == 200 and currentTmInfo["subservice"] == 6: + if currentTmInfo["service"] == 200 and currentTmInfo["subservice"] == 6: # mode change confirmation self.miscCounter = self.miscCounter + 1 # wiretapping messages - elif currentTmInfo["G_SERVICE"] == 2 and currentTmInfo["subservice"] == 130: + elif currentTmInfo["service"] == 2 and currentTmInfo["subservice"] == 130: self.miscCounter = self.miscCounter + 1 - elif currentTmInfo["G_SERVICE"] == 2 and currentTmInfo["subservice"] == 131: + elif currentTmInfo["service"] == 2 and currentTmInfo["subservice"] == 131: self.miscCounter = self.miscCounter + 1 if currentTmInfo["valid"] == 0: self.valid = False @@ -104,7 +104,8 @@ class TestService17(TestService): def analyseTmTcInfo(self, tmInfoQueue: pusTmInfoQueueT, tcInfoQueue: pusTcInfoQueueT): assertionDict = super().analyseTmTcInfo(tmInfoQueue=tmInfoQueue, tcInfoQueue=tcInfoQueue) - # add anything elsee other than tc verification counter and ssc that is needed for tm analysis + # add anything elsee other than tc verification counter + # and ssc that is needed for tm analysis return assertionDict def analyseTcInfo(self, tcInfoQueue): diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 49184ca..15d3105 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -33,11 +33,11 @@ from collections import deque from typing import Deque from tc.OBSW_TcPacker import packDummyDeviceTestInto -from tm.OBSW_TmService1 import pusPacketInfoService1T +from tm.obsw_tm_service_1 import pusPacketInfoService1T from abc import abstractmethod from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver from config import OBSW_Config as g -from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmInfoT +from tm.obsw_pus_tm_generic import pusTmInfoQueueT, pusTmInfoT from tc.OBSW_TcPacket import pusTcInfoQueueT TmInfoQueueService1T = Deque[pusPacketInfoService1T] @@ -134,7 +134,7 @@ class TestService(unittest.TestCase): # For commands with multiple steps, update this value manually ! self.tcVerifyStepCounter = self.tcVerifyCounter + 1 self.tcSscArray.append(currentTcInfo["ssc"]) - self.tcServiceArray.append(currentTcInfo["G_SERVICE"]) + self.tcServiceArray.append(currentTcInfo["service"]) self.tcSubserviceArray.append(currentTcInfo["subservice"]) # must be implemented by child test ! @@ -178,13 +178,13 @@ class TestService(unittest.TestCase): while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass - if currentTmInfo["G_SERVICE"] == 1: + if currentTmInfo["service"] == 1: self.scanForRespectiveTc(currentTmInfo) # Here, the desired event Id or RID can be specified - if currentTmInfo["G_SERVICE"] == 5: + if currentTmInfo["service"] == 5: if currentTmInfo["EventID"] == 8200 and currentTmInfo["RID"] == 0x51001700: self.eventCounter = self.eventCounter + 1 - if currentTmInfo["G_SERVICE"] == 17: + if currentTmInfo["service"] == 17: self.miscCounter = self.miscCounter + 1 if currentTmInfo["valid"] == 0: self.valid = False diff --git a/tm/OBSW_PusPacket.py b/tm/obsw_pus_tm_base.py similarity index 96% rename from tm/OBSW_PusPacket.py rename to tm/obsw_pus_tm_base.py index 8caa234..5817c0d 100644 --- a/tm/OBSW_PusPacket.py +++ b/tm/obsw_pus_tm_base.py @@ -112,10 +112,12 @@ class OBSWTimestamp: byteArray = byteArray[1:] self.days = ((byteArray[0] << 8) | (byteArray[1])) - 4383 self.seconds = self.days * (24*60*60) - sDay = ((byteArray[2] << 24) | (byteArray[3] << 16) | (byteArray[4]) << 8 | byteArray[5])/1000 + sDay = ((byteArray[2] << 24) | (byteArray[3] << 16) | + (byteArray[4]) << 8 | byteArray[5])/1000 self.seconds += sDay self.time = self.seconds - self.datetime = str(datetime.datetime.utcfromtimestamp(self.time).strftime("%Y-%m-%d %H:%M:%S.%f")) + self.datetime = str(datetime.datetime. + utcfromtimestamp(self.time).strftime("%Y-%m-%d %H:%M:%S.%f")) def printTime(self, array): array.append(self.time) diff --git a/tm/OBSW_TmPacket.py b/tm/obsw_pus_tm_factory.py similarity index 88% rename from tm/OBSW_TmPacket.py rename to tm/obsw_pus_tm_factory.py index e05f52e..f063301 100644 --- a/tm/OBSW_TmPacket.py +++ b/tm/obsw_pus_tm_factory.py @@ -1,15 +1,15 @@ # -*- coding: utf-8 -*- """ -Program: OBSW_TmPacket.py +Program: obsw_pus_tm_factory.py Date: 01.11.2019 Description: Deserialize TM byte array into PUS TM Class Author: R.Mueller, S. Gaisser """ -from tm.OBSW_PusTm import PUSTelemetry -from tm.OBSW_TmService1 import Service1TM -from tm.OBSW_TmService3 import Service3TM -from tm.OBSW_TmService5 import Service5TM +from tm.obsw_pus_tm_generic import PUSTelemetry +from tm.obsw_tm_service_1 import Service1TM +from tm.obsw_tm_service_3 import Service3TM +from tm.obsw_tm_service_5 import Service5TM import struct @@ -77,7 +77,7 @@ class Service9TM(PUSTelemetry): class Service17TM(PUSTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - self.printPacketInfo("Test Reply") + self.print_packet_info("Test Reply") def print_telemetry_header(self, array): super().print_telemetry_header(array) @@ -93,18 +93,18 @@ class Service200TM(PUSTelemetry): super().__init__(byteArray) self.isCantReachModeReply = False self.isModeReply = False - self.printPacketInfo("Mode Reply") + self.print_packet_info("Mode Reply") self.objectId = struct.unpack('>I', self.byteArrayData[0:4])[0] if self.dataFieldHeader.subtype == 7: - self.appendPacketInfo(": Can't reach mode") + self.append_packet_info(": Can't reach mode") self.isCantReachModeReply = True self.returnValue = self.byteArrayData[4] << 8 | self.byteArrayData[5] elif self.dataFieldHeader.subtype == 6 or self.dataFieldHeader.subtype == 8: self.isModeReply = True if self.dataFieldHeader.subtype == 8: - self.appendPacketInfo(": Wrong Mode") + self.append_packet_info(": Wrong Mode") elif self.dataFieldHeader.subtype == 6: - self.appendPacketInfo(": Mode reached") + self.append_packet_info(": Mode reached") self.mode = struct.unpack('>I', self.byteArrayData[4:8])[0] self.submode = self.byteArrayData[8] diff --git a/tm/OBSW_PusTm.py b/tm/obsw_pus_tm_generic.py similarity index 77% rename from tm/OBSW_PusTm.py rename to tm/obsw_pus_tm_generic.py index cc82f7d..af8376b 100644 --- a/tm/OBSW_PusTm.py +++ b/tm/obsw_pus_tm_generic.py @@ -1,4 +1,4 @@ -from tm.OBSW_PusPacket import OBSWPusPacket +from tm.obsw_pus_tm_base import OBSWPusPacket from typing import TypeVar, Dict, Tuple, Deque, List PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') @@ -17,13 +17,13 @@ class PUSTelemetry(OBSWPusPacket): self.byteArrayData = self.data self.printInfo = "" - def unpackTelemetry(self): + def unpack_telemetry(self): pass - def printPacketInfo(self, printInfo): + def print_packet_info(self, printInfo): self.printInfo = printInfo - def appendPacketInfo(self, printInfo): + def append_packet_info(self, printInfo): self.printInfo = self.printInfo + printInfo def print_telemetry_header(self, array): @@ -32,9 +32,9 @@ class PUSTelemetry(OBSWPusPacket): def print_telemetry_column_headers(self, array): super().printPusPacketHeaderColumnHeaders(array) - def packTmInformation(self) -> pusTmInfoT: + def pack_tm_information(self) -> pusTmInfoT: tmInformation = { - "G_SERVICE": self.getService(), + "service": self.getService(), "subservice": self.getSubservice(), "ssc": self.getSSC(), "data": self.byteArrayData, diff --git a/tm/OBSW_TmService1.py b/tm/obsw_tm_service_1.py similarity index 85% rename from tm/OBSW_TmService1.py rename to tm/obsw_tm_service_1.py index 32616eb..9ba912d 100644 --- a/tm/OBSW_TmService1.py +++ b/tm/obsw_tm_service_1.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- """ -Program: OBSW_TmService1.py +Program: obsw_tm_service_1.py Date: 30.12.2019 Description: Deserialize Pus Verification TM Author: R. Mueller """ -from tm.OBSW_PusTm import PUSTelemetry +from tm.obsw_pus_tm_generic import PUSTelemetry from typing import Dict import struct @@ -20,13 +20,13 @@ class Service1TM(PUSTelemetry): # Failure Reports with error code self.tcPacketId = self.byteArrayData[0] << 8 | self.byteArrayData[1] self.tcSSC = ((self.byteArrayData[2] & 0x3F) << 8) | self.byteArrayData[3] - self.printPacketInfo("Success Verification") + self.print_packet_info("Success Verification") if self.dataFieldHeader.subtype % 2 == 0: - self.printPacketInfo("Failure Verficiation") + self.print_packet_info("Failure Verficiation") self.tcErrorCode = True if self.dataFieldHeader.subtype == 6: self.isStep = True - self.appendPacketInfo(" : Step Failure") + self.append_packet_info(" : Step Failure") self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0] self.ErrCode = struct.unpack('>H', self.byteArrayData[5:7])[0] self.errorParam1 = struct.unpack('>I', self.byteArrayData[7:11])[0] @@ -39,7 +39,7 @@ class Service1TM(PUSTelemetry): elif self.dataFieldHeader.subtype == 5: self.isStep = True - self.appendPacketInfo(" : Step Success") + self.append_packet_info(" : Step Success") self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0] def print_telemetry_header(self, array): @@ -68,8 +68,8 @@ class Service1TM(PUSTelemetry): elif self.isStep: array.append("Step Number") - def packTmInformation(self) -> pusPacketInfoService1T: - tmInformation = super().packTmInformation() + def pack_tm_information(self) -> pusPacketInfoService1T: + tmInformation = super().pack_tm_information() addInformation = { "tcPacketId": self.tcPacketId, "tcSSC": self.tcSSC, diff --git a/tm/OBSW_TmService3.py b/tm/obsw_tm_service_3.py similarity index 95% rename from tm/OBSW_TmService3.py rename to tm/obsw_tm_service_3.py index 1d01629..cb6bd01 100644 --- a/tm/OBSW_TmService3.py +++ b/tm/obsw_tm_service_3.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- """ -Program: OBSW_TmService3.py +Program: obsw_tm_service_3.py Date: 30.12.2019 Description: Deserialize Housekeeping TM Author: R. Mueller """ -from tm.OBSW_PusTm import PUSTelemetry +from tm.obsw_pus_tm_generic import PUSTelemetry from typing import TypeVar import struct @@ -24,7 +24,7 @@ class Service3TM(PUSTelemetry): self.hkDefinition = [] self.numberOfParameters = 0 self.validity_buffer = [] - self.printPacketInfo("Housekeeping Packet") + self.print_packet_info("Housekeeping Packet") self.paramLength = 0 if self.getSubservice() == 10 or self.getSubservice() == 12: self.handleFillingDefinitionArrays() diff --git a/tm/OBSW_TmService5.py b/tm/obsw_tm_service_5.py similarity index 75% rename from tm/OBSW_TmService5.py rename to tm/obsw_tm_service_5.py index c308a92..7c14811 100644 --- a/tm/OBSW_TmService5.py +++ b/tm/obsw_tm_service_5.py @@ -1,27 +1,27 @@ # -*- coding: utf-8 -*- """ -Program: OBSW_TmService5.py +Program: obsw_tm_service_5.py Date: 30.12.2019 Description: Deserialize PUS Event Report Author: R. Mueller """ -from tm.OBSW_PusTm import PUSTelemetry +from tm.obsw_pus_tm_generic import PUSTelemetry import struct class Service5TM(PUSTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - self.printPacketInfo("Event") + self.print_packet_info("Event") if self.getSubservice() == 1: - self.appendPacketInfo(" Info") + self.append_packet_info(" Info") elif self.getSubservice() == 2: - self.appendPacketInfo(" Error Low Severity") + self.append_packet_info(" Error Low Severity") elif self.getSubservice() == 3: - self.appendPacketInfo(" Error Med Severity") + self.append_packet_info(" Error Med Severity") elif self.getSubservice() == 4: - self.appendPacketInfo(" Error High Severity") + self.append_packet_info(" Error High Severity") self.eventId = struct.unpack('>H', self.byteArrayData[0:2])[0] self.objectId = struct.unpack('>I', self.byteArrayData[2:6])[0] self.param1 = struct.unpack('>I', self.byteArrayData[6:10])[0] @@ -41,8 +41,8 @@ class Service5TM(PUSTelemetry): array.append("Parameter 1") array.append("Parameter 2") - def packTmInformation(self): - tmInformation = super().packTmInformation() + def pack_tm_information(self): + tmInformation = super().pack_tm_information() addInformation = { "RID": self.objectId, "EventID": self.eventId, diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py index 022910f..3ec95a7 100644 --- a/utility/obsw_args_parser.py +++ b/utility/obsw_args_parser.py @@ -37,8 +37,8 @@ def parse_input_arguments(): '--np', dest='printFile', help='Supply --np to suppress print output to file.', action='store_false') arg_parser.add_argument( - '-o', '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with TM Timeout,' - ' TC sent again after this time period. Default: 3.5', default=3.5) + '-o', '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' + 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) arg_parser.add_argument( '-r', '--rawDataPrint', help='Supply -r to print all raw data directly', action='store_true') diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 3c65c1a..88240a8 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -12,8 +12,8 @@ import os import sys from typing import TypeVar from config import OBSW_Config as g -from tm.OBSW_PusTm import PusTmT -from tm.OBSW_TmService3 import PusTm3T +from tm.obsw_pus_tm_generic import PusTmT +from tm.obsw_tm_service_3 import PusTm3T from tc.OBSW_TcPacket import PusTcT, pusTcInfoT TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter') -- GitLab