diff --git a/.idea/runConfigurations/OBSW_TmTcClient_Service_9_Serial.xml b/.idea/runConfigurations/OBSW_TmTcClient_Service_9_Serial.xml index e608c4b749ac04f659876a79f5243c4738ca6829..9a4210466da0e27ab2d160ac11ac192294cd3184 100644 --- a/.idea/runConfigurations/OBSW_TmTcClient_Service_9_Serial.xml +++ b/.idea/runConfigurations/OBSW_TmTcClient_Service_9_Serial.xml @@ -12,8 +12,8 @@ <option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> - <option name="SCRIPT_NAME" value="C:\Users\Robin\NoSyncDokumente\sourceobsw\tmtc\obsw_tmtc_client.py" /> - <option name="PARAMETERS" value="-m 3 -s 9 -p -c 1" /> + <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> + <option name="PARAMETERS" value="-m 3 -s 9 -c 1" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="true" /> <option name="MODULE_MODE" value="false" /> diff --git a/archive/obsw_pus_tm_base.py b/archive/obsw_pus_tm_base.py new file mode 100644 index 0000000000000000000000000000000000000000..287c9cdf06a80daf381a63a58d6e20007e14f18b --- /dev/null +++ b/archive/obsw_pus_tm_base.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Apr 4 11:44:48 2018 +Generic PUS packet class to deserialize raw PUS telemetry. +@author: S. Gaisser +""" + +import crcmod +import datetime + + +class ObswPusPacket: + def __init__(self, byte_array: bytearray): + self.__packet_raw = byte_array + self.PUSHeader = PUSPacketHeader(byte_array) + byte_array = byte_array[6:] + self.dataFieldHeader = OBSWPUSPacketDataFieldHeader(byte_array) + byte_array = byte_array[12:] + self.data = byte_array[:len(byte_array) - 2] + self.crc = byte_array[len(byte_array) - 2] << 8 | byte_array[len(byte_array) - 1] + + def get_raw_packet(self) -> bytearray: + return self.__packet_raw + + def append_pus_packet_header(self, array): + self.dataFieldHeader.printDataFieldHeader(array) + self.PUSHeader.printPusPacketHeader(array) + + def append_pus_packet_header_column_headers(self, array): + self.dataFieldHeader.printDataFieldHeaderColumnHeader(array) + self.PUSHeader.printPusPacketHeaderColumnHeaders(array) + + def getPacketSize(self): + # PusHeader Size + data size + size = PUSPacketHeader.headerSize + self.PUSHeader.length + 1 + return size + + def getService(self): + return self.dataFieldHeader.type + + def getSubservice(self): + return self.dataFieldHeader.subtype + + def getSSC(self): + return self.PUSHeader.sourceSequenceCount + + def printData(self): + print(self.returnDataString()) + + def returnDataString(self): + strToPrint = "[" + for byte in self.data: + strToPrint += str(hex(byte)) + " , " + strToPrint = strToPrint.rstrip(' , ') + strToPrint += ']' + return strToPrint + + + diff --git a/comIF/obsw_com_interface.py b/comIF/obsw_com_interface.py index 916f95a9c0a30f69cfd225297ffdab55cccbe5c9..0b271f219ca24a6393900cf574bd4bfa7d7b8886 100644 --- a/comIF/obsw_com_interface.py +++ b/comIF/obsw_com_interface.py @@ -9,9 +9,9 @@ Description: Generic Communication Interface. Defines the syntax of the communic """ from abc import abstractmethod from typing import TypeVar, Tuple, Union -from tm.obsw_pus_tm_generic import pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT, pusTmListT +from tm.obsw_pus_tm_base import PusTmQueueT, PusTmTupleQueueT, PusTmInfoQueueT, PusTmListT from utility.obsw_tmtc_printer import TmTcPrinterT -from tc.OBSW_TcPacket import pusTcInfoT +from tc.obsw_pus_tc_base import PusTcInfoT ComIfT = TypeVar('ComIfT', bound='CommunicationInterface') @@ -35,7 +35,7 @@ class CommunicationInterface: """ @abstractmethod - def send_telecommand(self, tc_packet: bytearray, tc_packet_info: pusTcInfoT = None) -> None: + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: """ Send telecommands :param tc_packet: TC packet to send @@ -44,7 +44,7 @@ class CommunicationInterface: """ @abstractmethod - def receive_telemetry(self, parameters: any = 0) -> pusTmListT: + def receive_telemetry(self, parameters: any = 0) -> PusTmListT: """ Returns a list of packets. Most of the time, this will simply call the pollInterface function @@ -55,7 +55,7 @@ class CommunicationInterface: return packet_list @abstractmethod - def poll_interface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: + def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]: """ Poll the interface and return a list of received packets :param parameters: @@ -67,37 +67,37 @@ class CommunicationInterface: @abstractmethod def data_available(self, parameters: any) -> bool: """ - Check whether data is available + Check whether _tm_data is available :param parameters: :return: """ - def receive_telemetry_and_store_info(self, tm_info_queue: pusTmInfoQueueT) -> \ - Union[None, pusTmInfoQueueT]: + def receive_telemetry_and_store_info(self, tm_info_queue: PusTmInfoQueueT) -> \ + Union[None, PusTmInfoQueueT]: """ Receive telemetry and store packet information in dict format into a queue. - If no data is available, don't do anything. + If no _tm_data is available, don't do anything. :param tm_info_queue: :return: """ print("No child implementation provided yet!") return - def receive_telemetry_and_store_tm(self, tm_queue: pusTmQueueT) -> Union[None, pusTmQueueT]: + def receive_telemetry_and_store_tm(self, tm_queue: PusTmQueueT) -> Union[None, PusTmQueueT]: """ Receive telemetry packets and store TM object into a queue. - If no data is available, don't do anything. + If no _tm_data is available, don't do anything. :param tm_queue: :return: """ print("No child implementation provided yet!") return - def receive_telemetry_and_store_tuple(self, tm_tuple_queue: pusTmTupleQueueT) -> \ - Union[None, pusTmTupleQueueT]: + def receive_telemetry_and_store_tuple(self, tm_tuple_queue: PusTmTupleQueueT) -> \ + Union[None, PusTmTupleQueueT]: """ Poll telemetry and store a tuple consisting of TM information and the - TM packet into the queue. If no data is available, don't do anything. + TM packet into the queue. If no _tm_data is available, don't do anything. :param tm_tuple_queue: :return: """ diff --git a/comIF/obsw_ethernet_com_if.py b/comIF/obsw_ethernet_com_if.py index f4562712832deb6abd4b5b8d776d2cb4b1a2a597..94b19d6c55d6a88b6617a766528e6ec98e4218f1 100644 --- a/comIF/obsw_ethernet_com_if.py +++ b/comIF/obsw_ethernet_com_if.py @@ -10,10 +10,10 @@ import socket import sys from typing import Tuple, Union -from comIF.obsw_com_interface import CommunicationInterface, pusTmListT, pusTmQueueT, \ - pusTmTupleQueueT, pusTmInfoQueueT -from tm.obsw_pus_tm_factory import PUSTelemetryFactory -from tc.OBSW_TcPacket import pusTcInfoT +from comIF.obsw_com_interface import CommunicationInterface, PusTmListT, PusTmQueueT, \ + PusTmTupleQueueT, PusTmInfoQueueT +from tm.obsw_pus_tm_factory import PusTelemetryFactory +from tc.obsw_pus_tc_base import PusTcInfoT from utility.obsw_tmtc_printer import TmTcPrinterT import config.OBSW_Config as g @@ -39,7 +39,7 @@ class EthernetComIF(CommunicationInterface): def close(self) -> None: pass - def send_telecommand(self, tc_packet: bytearray, tc_packet_info: pusTcInfoT = None) -> None: + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info) self.sock_send.sendto(tc_packet, self.send_address) @@ -49,11 +49,11 @@ class EthernetComIF(CommunicationInterface): return False return ready - def poll_interface(self, pollTimeout: float = 0) -> Tuple[bool, pusTmListT]: + def poll_interface(self, pollTimeout: float = 0) -> Tuple[bool, PusTmListT]: ready = self.data_available(pollTimeout) if ready: data = self.sock_receive.recvfrom(1024)[0] - packet = PUSTelemetryFactory(data) + packet = PusTelemetryFactory(data) self.tmtc_printer.print_telemetry(packet) packet_list = [packet] return True, packet_list @@ -65,8 +65,8 @@ class EthernetComIF(CommunicationInterface): return packet_list return [] - def receive_telemetry_and_store_info(self, tmInfoQueue: pusTmInfoQueueT) -> \ - Union[None, pusTmInfoQueueT]: + def receive_telemetry_and_store_info(self, tmInfoQueue: PusTmInfoQueueT) -> \ + Union[None, PusTmInfoQueueT]: packets = self.receive_telemetry() for packet in packets: packet_info = packet.pack_tm_information() @@ -74,14 +74,14 @@ class EthernetComIF(CommunicationInterface): tmInfoQueue.append(packet_info) return tmInfoQueue - def receive_telemetry_and_store_tm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: + def receive_telemetry_and_store_tm(self, tmQueue: PusTmQueueT) -> Union[None, PusTmQueueT]: packets = self.receive_telemetry() for packet in packets: tmQueue.append(packet) return tmQueue - def receive_telemetry_and_store_tuple(self, tmTupleQueue: pusTmTupleQueueT) -> \ - Union[None, pusTmTupleQueueT]: + def receive_telemetry_and_store_tuple(self, tmTupleQueue: PusTmTupleQueueT) -> \ + Union[None, PusTmTupleQueueT]: packet_list = self.receive_telemetry() for packet in packet_list: packet_info = packet.pack_tm_information() diff --git a/comIF/obsw_serial_com_if.py b/comIF/obsw_serial_com_if.py index ca921e92524a7ff04d73ca2bd08de195b02fa05e..f567127224036bf814dfe2483d126e4e66811d27 100644 --- a/comIF/obsw_serial_com_if.py +++ b/comIF/obsw_serial_com_if.py @@ -11,11 +11,11 @@ import logging from typing import Tuple, List, Union, Optional import serial -from comIF.obsw_com_interface import CommunicationInterface, pusTmQueueT +from comIF.obsw_com_interface import CommunicationInterface, PusTmQueueT from utility.obsw_tmtc_printer import TmTcPrinterT -from tm.obsw_pus_tm_factory import PUSTelemetryFactory -from tm.obsw_pus_tm_generic import pusTmInfoQueueT, pusTmTupleQueueT, pusTmListT -from tc.OBSW_TcPacket import pusTcInfoT +from tm.obsw_pus_tm_factory import PusTelemetryFactory +from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmTupleQueueT, PusTmListT +from tc.obsw_pus_tc_base import PusTcInfoT SERIAL_PACKET_MAX_SIZE = 1024 HEADER_BYTES_BEFORE_SIZE = 5 @@ -47,7 +47,7 @@ class SerialComIF(CommunicationInterface): logging.exception("Error") sys.exit() - def send_telecommand(self, tc_packet: bytearray, tc_packet_info: pusTcInfoT = None) -> None: + def send_telecommand(self, tc_packet: bytearray, tc_packet_info: PusTcInfoT = None) -> None: self.tmtc_printer.print_telecommand(tc_packet, tc_packet_info) self.serial.write(tc_packet) @@ -57,12 +57,12 @@ class SerialComIF(CommunicationInterface): return packet_list return [] - def poll_interface(self, parameters: any = 0) -> Tuple[bool, pusTmListT]: + def poll_interface(self, parameters: any = 0) -> Tuple[bool, PusTmListT]: if self.data_available(): pus_data_list, number_of_packets = self.__poll_pus_packets() packet_list = [] for counter in range(0, number_of_packets): - packet = PUSTelemetryFactory(pus_data_list[counter]) + packet = PusTelemetryFactory(pus_data_list[counter]) self.tmtc_printer.print_telemetry(packet) packet_list.append(packet) return True, packet_list @@ -119,15 +119,15 @@ class SerialComIF(CommunicationInterface): self.number_of_packets = self.number_of_packets + 1 return end_index - def receive_telemetry_and_store_tm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: + def receive_telemetry_and_store_tm(self, tmQueue: PusTmQueueT) -> Union[None, PusTmQueueT]: (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: tmQueue.append(packet) return tmQueue - def receive_telemetry_and_store_info(self, tmInfoQueue: pusTmInfoQueueT) -> \ - Union[None, pusTmInfoQueueT]: + def receive_telemetry_and_store_info(self, tmInfoQueue: PusTmInfoQueueT) -> \ + Union[None, PusTmInfoQueueT]: (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: @@ -135,8 +135,8 @@ class SerialComIF(CommunicationInterface): tmInfoQueue.append(tm_info) return tmInfoQueue - def receive_telemetry_and_store_tuple(self, tm_tuple_queue: pusTmTupleQueueT) -> \ - Union[None, pusTmTupleQueueT]: + def receive_telemetry_and_store_tuple(self, tm_tuple_queue: PusTmTupleQueueT) -> \ + Union[None, PusTmTupleQueueT]: (packet_received, pus_packets) = self.poll_interface() if packet_received: for packet in pus_packets: diff --git a/config/OBSW_Config.py b/config/OBSW_Config.py index 804b566faebac033206f0047e59a083a37915ce9..ae1d4d2fd72281badaefc27d67522949d81660f1 100644 --- a/config/OBSW_Config.py +++ b/config/OBSW_Config.py @@ -10,7 +10,7 @@ import enum from typing import Tuple """ -Global type definitions +Global service_type definitions """ ethernetAddressT = Tuple[str, int] diff --git a/obsw_tmtc_client.py b/obsw_tmtc_client.py index 57171b64a53737d7dc26d625be4edb4701c8bf5c..5c314b77aed2e5eff19c9b1a6ad4cb465b654147 100644 --- a/obsw_tmtc_client.py +++ b/obsw_tmtc_client.py @@ -62,7 +62,7 @@ from collections import deque from test import OBSW_PusServiceTest from config import OBSW_Config as g from config.OBSW_Config import setGlobals -from tc.OBSW_TcPacker import PUSTelecommand, createTotalTcQueue, serviceTestSelect +from tc.obsw_pus_tc_packer import PusTelecommand, createTotalTcQueue, service_test_select from sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver @@ -118,7 +118,7 @@ def main(): service_queue = deque() sender_and_receiver = SequentialCommandSenderReceiver( comInterface=communication_interface, tmtcPrinter=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT, - tcQueue=serviceTestSelect(g.G_SERVICE, service_queue), + tcQueue=service_test_select(g.G_SERVICE, service_queue), tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, tmListener=tm_listener) sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() @@ -155,10 +155,10 @@ def command_preparation(): """ # Direct command which triggers an additional step reply and one completion reply # Single Command Testing - command = PUSTelecommand(service=17, subservice=1, SSC=21) + command = PusTelecommand(service=17, subservice=1, SSC=21) # command.print() # file = bytearray([1, 2, 3, 4, 5]) - # command = PUSTelecommand(G_SERVICE=23, subservice=1, SSC=21, data=file) + # command = PUSTelecommand(G_SERVICE=23, subservice=1, SSC=21, _tm_data=file) # command.packCommandTuple() return command.packCommandTuple() diff --git a/sendreceive/obsw_command_sender_receiver.py b/sendreceive/obsw_command_sender_receiver.py index baf97f243d3600c1ed9236efe41ae309891085ee..571ef366384b8346bb79d5387aa8e56b93e8c2dc 100644 --- a/sendreceive/obsw_command_sender_receiver.py +++ b/sendreceive/obsw_command_sender_receiver.py @@ -16,7 +16,7 @@ import time from comIF.obsw_com_interface import CommunicationInterface, ComIfT from utility.obsw_tmtc_printer import TmTcPrinterT, TmTcPrinter from sendreceive.obsw_tm_listener import TmListenerT, TmListener -from tc.OBSW_TcPacket import tcQueueEntryT +from tc.obsw_pus_tc_base import TcQueueEntryT # pylint: disable=too-few-public-methods @@ -46,17 +46,17 @@ class CommandSenderReceiver: if isinstance(comInterface, CommunicationInterface): self._com_interface = comInterface else: - raise TypeError("Invalid communication interface type!") + raise TypeError("Invalid communication interface service_type!") if isinstance(tmtcPrinter, TmTcPrinter): self._tmtc_printer = tmtcPrinter else: - raise TypeError("Invalid TMTC Printer type!") + raise TypeError("Invalid TMTC Printer service_type!") if isinstance(tmListener, TmListener): self._tm_listener = tmListener else: - raise TypeError("Invalid TM Listener type!") + raise TypeError("Invalid TM Listener service_type!") self._reply_received = False self._start_time = 0 @@ -78,7 +78,7 @@ class CommandSenderReceiver: else: self._check_for_timeout() - def check_queue_entry(self, tc_queue_entry: tcQueueEntryT) -> bool: + def check_queue_entry(self, tc_queue_entry: TcQueueEntryT) -> bool: """ Checks whether the entry in the tc queue is a telecommand. The last telecommand and respective information are stored in _last_tc diff --git a/sendreceive/OBSW_MultipleCommandsSenderReceiver.py b/sendreceive/obsw_multiple_commands_sender_receiver.py similarity index 100% rename from sendreceive/OBSW_MultipleCommandsSenderReceiver.py rename to sendreceive/obsw_multiple_commands_sender_receiver.py diff --git a/sendreceive/obsw_sequential_sender_receiver.py b/sendreceive/obsw_sequential_sender_receiver.py index 9afc4efdc1add327e463ff93f17f692346cce85c..9f9afccd5fd2f4b3304353b5cbff6f19d7ec85ea 100644 --- a/sendreceive/obsw_sequential_sender_receiver.py +++ b/sendreceive/obsw_sequential_sender_receiver.py @@ -12,7 +12,7 @@ from sendreceive.obsw_command_sender_receiver import CommandSenderReceiver from sendreceive.obsw_tm_listener import TmListenerT from comIF.obsw_com_interface import ComIfT from utility.obsw_tmtc_printer import TmTcPrinterT -from tc.OBSW_TcPacket import tcQueueT +from tc.obsw_pus_tc_base import TcQueueT class SequentialCommandSenderReceiver(CommandSenderReceiver): @@ -20,7 +20,7 @@ class SequentialCommandSenderReceiver(CommandSenderReceiver): Specific implementation of CommandSenderReceiver to send multiple telecommands in sequence """ def __init__(self, comInterface: ComIfT, tmtcPrinter: TmTcPrinterT, tmListener: TmListenerT, - tmTimeout: float, tcQueue: tcQueueT, tcTimeoutFactor: float): + tmTimeout: float, tcQueue: TcQueueT, tcTimeoutFactor: float): """ :param comInterface: CommunicationInterface object, passed on to CommandSenderReceiver :param tmListener: TmListener object which runs in the background and receives all Telemetry diff --git a/tc/OBSW_TcService2.py b/tc/OBSW_TcService2.py index 7b1697f6212cccf6c82626fad82f1de5a2a2e59e..305a7576bda5cf17dd6e102ff7cf3c4796ce3dd6 100644 --- a/tc/OBSW_TcService2.py +++ b/tc/OBSW_TcService2.py @@ -8,7 +8,7 @@ Description: PUS Custom Service 8: Device Access, Native low-level commanding """ import struct -from tc.OBSW_TcPacket import PUSTelecommand, Deque +from tc.obsw_pus_tc_base import PusTelecommand, Deque from tc.OBSW_TcService200 import packModeData @@ -20,29 +20,29 @@ def packService2TestInto(tcQueue: Deque, calledExternally: bool = False) -> Dequ # Set Raw Mode tcQueue.appendleft(("print", "\r\nTesting Service 2: Setting Raw Mode")) modeData = packModeData(objectId, 3, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=2020, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # toggle wiretapping raw tcQueue.appendleft(("print", "\r\nTesting Service 2: Toggling Wiretapping Raw")) wiretappingToggleData = packWiretappingMode(objectId, 1) - toggleWiretappingOnCommand = PUSTelecommand(service=2, subservice=129, SSC=200, + toggleWiretappingOnCommand = PusTelecommand(service=2, subservice=129, SSC=200, data=wiretappingToggleData) tcQueue.appendleft(toggleWiretappingOnCommand.packCommandTuple()) - # send raw command, data should be returned via TM[2,130] and TC[2,131] + # send raw command, _tm_data should be returned via TM[2,130] and TC[2,131] tcQueue.appendleft(("print", "\r\nTesting Service 2: Sending Raw Command")) rawCommand = struct.pack(">I", 666) rawData = objectId + rawCommand - rawCommand = PUSTelecommand(service=2, subservice=128, SSC=201, data=rawData) + rawCommand = PusTelecommand(service=2, subservice=128, SSC=201, data=rawData) tcQueue.appendleft(rawCommand.packCommandTuple()) # toggle wiretapping off tcQueue.appendleft(("print", "\r\nTesting Service 2: Toggle Wiretapping Off")) wiretappingToggleData = packWiretappingMode(objectId, 0) - toggleWiretappingOffCommand = PUSTelecommand(service=2, subservice=129, SSC=204, + toggleWiretappingOffCommand = PusTelecommand(service=2, subservice=129, SSC=204, data=wiretappingToggleData) tcQueue.appendleft(toggleWiretappingOffCommand.packCommandTuple()) # send raw command which should be returned via TM[2,130] tcQueue.appendleft(("print", "\r\nTesting Service 2: Send second raw command")) - command = PUSTelecommand(service=2, subservice=128, SSC=205, data=rawData) + command = PusTelecommand(service=2, subservice=128, SSC=205, data=rawData) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("print", "\r")) if calledExternally is False: diff --git a/tc/OBSW_TcService200.py b/tc/OBSW_TcService200.py index 1e2ddf5a4228a3ff0342be52f8b18e77544cddc2..2c9fe91c682209c69e6d3936b1f7a43d2bd48921 100644 --- a/tc/OBSW_TcService200.py +++ b/tc/OBSW_TcService200.py @@ -11,35 +11,35 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file @author: R. Mueller """ -from tc.OBSW_TcPacket import PUSTelecommand -from tc.OBSW_TcPacker import tcQueueT +from tc.obsw_pus_tc_base import PusTelecommand +from tc.obsw_pus_tc_packer import TcQueueT import config.OBSW_Config as g import struct -def packService200TestInto(tcQueue: tcQueueT) -> tcQueueT: +def packService200TestInto(tcQueue: TcQueueT) -> TcQueueT: tcQueue.appendleft(("print", "Testing Service 200")) # Object ID: Dummy Device objectId = g.DUMMY_DEVICE_ID # Set On Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode On")) modeData = packModeData(objectId, 1, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2000, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=2000, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # Set Normal mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Normal")) modeData = packModeData(objectId, 2, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2010, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=2010, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # Set Raw Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Raw")) modeData = packModeData(objectId, 3, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2020, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=2020, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # Set Off Mode tcQueue.appendleft(("print", "\r\nTesting Service 200: Set Mode Off")) modeData = packModeData(objectId, 0, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=2030, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=2030, data=modeData) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("export", "log/tmtc_log_service200.txt")) return tcQueue diff --git a/tc/OBSW_TcService3.py b/tc/OBSW_TcService3.py index 15aacab299c6df2037898f1c4807fc5cea0411f9..d42e71a2bfd8517a83ba3a78a2ceb45dd24f212a 100644 --- a/tc/OBSW_TcService3.py +++ b/tc/OBSW_TcService3.py @@ -8,7 +8,7 @@ Description: PUS Custom Service 8: Device Access, Native low-level commanding """ import struct from typing import Deque -from tc.OBSW_TcPacket import PUSTelecommand +from tc.obsw_pus_tc_base import PusTelecommand def packService3TestInto(tcQueue: Deque) -> Deque: @@ -30,30 +30,30 @@ def packService3TestInto(tcQueue: Deque) -> Deque: # deleting pre-defined test entry tcQueue.appendleft(("print", "\r\nTesting Service 3: Deleting pre-defined HK definition")) - command = PUSTelecommand(service=3, subservice=3, SSC=3000, data=sid1) + command = PusTelecommand(service=3, subservice=3, SSC=3000, data=sid1) tcQueue.appendleft(command.packCommandTuple()) # adding pre-defined definition to hk using test pool variables tcQueue.appendleft(("print", "\r\nTesting Service 3: Adding pre-defined HK definition")) - command = PUSTelecommand(service=3, subservice=1, SSC=3010, data=hkDefinition1) + command = PusTelecommand(service=3, subservice=1, SSC=3010, data=hkDefinition1) tcQueue.appendleft(command.packCommandTuple()) # adding custom definition to diagnostics using test pool variables tcQueue.appendleft(("print", "\r\nTesting Service 3: Adding custom diganostics definition")) - command = PUSTelecommand(service=3, subservice=2, SSC=3020, data=hkDefinition2) + command = PusTelecommand(service=3, subservice=2, SSC=3020, data=hkDefinition2) tcQueue.appendleft(command.packCommandTuple()) # enable custom hk definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable custom definition")) - command = PUSTelecommand(service=3, subservice=5, SSC=3030, data=sid1) + command = PusTelecommand(service=3, subservice=5, SSC=3030, data=sid1) tcQueue.appendleft(command.packCommandTuple()) # enable custom diag definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable custom diagnostics definition")) - command = PUSTelecommand(service=3, subservice=7, SSC=3040, data=sid2) + command = PusTelecommand(service=3, subservice=7, SSC=3040, data=sid2) tcQueue.appendleft(command.packCommandTuple()) # enable gps0 tcQueue.appendleft(("print", "\r\nTesting Service 3: Enable GPS definition")) - command = PUSTelecommand(service=3, subservice=5, SSC=3050, data=sidGps) + command = PusTelecommand(service=3, subservice=5, SSC=3050, data=sidGps) tcQueue.appendleft(command.packCommandTuple()) # maybe wait a bit to receive at least 2 packets.. @@ -61,61 +61,61 @@ def packService3TestInto(tcQueue: Deque) -> Deque: # Disable custom hk definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable custom definition")) - command = PUSTelecommand(service=3, subservice=6, SSC=3060, data=sid1) + command = PusTelecommand(service=3, subservice=6, SSC=3060, data=sid1) tcQueue.appendleft(command.packCommandTuple()) # Disable custom diag definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable custom diagnostics definition")) - command = PUSTelecommand(service=3, subservice=8, SSC=3070, data=sid2) + command = PusTelecommand(service=3, subservice=8, SSC=3070, data=sid2) tcQueue.appendleft(command.packCommandTuple()) # disable gps0 tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable GPS definition")) - command = PUSTelecommand(service=3, subservice=6, SSC=3080, data=sidGps) + command = PusTelecommand(service=3, subservice=6, SSC=3080, data=sidGps) tcQueue.appendleft(command.packCommandTuple()) # report custom Diag definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PUSTelecommand(service=3, subservice=11, SSC=3100, data=sid2) + command = PusTelecommand(service=3, subservice=11, SSC=3100, data=sid2) tcQueue.appendleft(command.packCommandTuple()) # report gps definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting GPS definition")) - command = PUSTelecommand(service=3, subservice=9, SSC=3110, data=sidGps) + command = PusTelecommand(service=3, subservice=9, SSC=3110, data=sidGps) tcQueue.appendleft(command.packCommandTuple()) # generate one custom hk definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one custom hk definition")) - command = PUSTelecommand(service=3, subservice=27, SSC=3120, data=sid1) + command = PusTelecommand(service=3, subservice=27, SSC=3120, data=sid1) tcQueue.appendleft(command.packCommandTuple()) # generate one custom diag definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one custom diagnostics definition")) - command = PUSTelecommand(service=3, subservice=28, SSC=3120, data=sid2) + command = PusTelecommand(service=3, subservice=28, SSC=3120, data=sid2) tcQueue.appendleft(command.packCommandTuple()) # generate one gps 0 definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Generate one gps 0 defintion")) - command = PUSTelecommand(service=3, subservice=27, SSC=3120, data=sidGps) + command = PusTelecommand(service=3, subservice=27, SSC=3120, data=sidGps) tcQueue.appendleft(command.packCommandTuple()) # modify custom hk definition interval newInterval = struct.pack('>f', 10.0) newIntervalCommand = sid1 + newInterval tcQueue.appendleft(("print", "\r\nTesting Service 3: Changing pre-defined HK definition interval")) - command = PUSTelecommand(service=3, subservice=31, SSC=3090, data=newIntervalCommand) + command = PusTelecommand(service=3, subservice=31, SSC=3090, data=newIntervalCommand) tcQueue.appendleft(command.packCommandTuple()) # report custom HK definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting pre-defined HK definition with changed interval")) - command = PUSTelecommand(service=3, subservice=9, SSC=3090, data=sid1) + command = PusTelecommand(service=3, subservice=9, SSC=3090, data=sid1) tcQueue.appendleft(command.packCommandTuple()) # modify custom diag definition interval newIntervalCommand = sid2 + newInterval tcQueue.appendleft(("print", "\r\nTesting Service 3: Changing custom diag HK definition interval")) - command = PUSTelecommand(service=3, subservice=32, SSC=3090, data=newIntervalCommand) + command = PusTelecommand(service=3, subservice=32, SSC=3090, data=newIntervalCommand) tcQueue.appendleft(command.packCommandTuple()) # report custom diag definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Reporting diag definition")) - command = PUSTelecommand(service=3, subservice=11, SSC=3100, data=sid2) + command = PusTelecommand(service=3, subservice=11, SSC=3100, data=sid2) tcQueue.appendleft(command.packCommandTuple()) # append parameter to custom hk definiton # append parameter to custom diag definition # delete custom diag definition tcQueue.appendleft(("print", "\r\nTesting Service 3: Deleting custom diagnostics definition")) - command = PUSTelecommand(service=3, subservice=4, SSC=3120, data=sid2) + command = PusTelecommand(service=3, subservice=4, SSC=3120, data=sid2) tcQueue.appendleft(command.packCommandTuple()) # do some basic testing on predefined structs too diff --git a/tc/OBSW_TcService8.py b/tc/OBSW_TcService8.py index 37be84832c760e47ec4bc389838e88580c7bcf61..ea008b48308ef8cc7525e28232e732f1a940ee51 100644 --- a/tc/OBSW_TcService8.py +++ b/tc/OBSW_TcService8.py @@ -9,7 +9,7 @@ Description: PUS Custom Service 8: Function Management, High-Level Commanding import struct from typing import Deque -from tc.OBSW_TcPacket import PUSTelecommand +from tc.obsw_pus_tc_base import PusTelecommand from tc.OBSW_TcService200 import packModeData @@ -20,32 +20,32 @@ def packService8TestInto(tcQueue: Deque, calledExternally: bool = False) -> Dequ # set mode on tcQueue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) modeData = packModeData(objectId, 1, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=800, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=800, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # set mode normal tcQueue.appendleft(("print", "\r\nTesting Service 8: Set Normal Mode")) modeData = packModeData(objectId, 2, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=810, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=810, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # Direct command which triggers completion reply tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Completion Reply")) actionId = struct.pack(">I", 666) directCommand = objectId + actionId - command = PUSTelecommand(service=8, subservice=128, SSC=820, data=directCommand) + command = PusTelecommand(service=8, subservice=128, SSC=820, data=directCommand) tcQueue.appendleft(command.packCommandTuple()) - # Direct command which triggers data reply + # Direct command which triggers _tm_data reply tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Data Reply")) actionId = bytearray([0xC0, 0xC0, 0xBA, 0xBE]) commandParam1 = bytearray([0xBA, 0xB0]) commandParam2 = bytearray([0x00, 0x00, 0x00, 0x52, 0x4F, 0x42, 0x49, 0x4E]) directCommand = objectId + actionId + commandParam1 + commandParam2 - command = PUSTelecommand(service=8, subservice=128, SSC=830, data=directCommand) + command = PusTelecommand(service=8, subservice=128, SSC=830, data=directCommand) tcQueue.appendleft(command.packCommandTuple()) # Direct command which triggers an additional step reply and one completion reply tcQueue.appendleft(("print", "\r\nTesting Service 8: Trigger Step and Completion Reply")) actionId = bytearray([0xBA, 0xDE, 0xAF, 0xFE]) directCommand = objectId + actionId - command = PUSTelecommand(service=8, subservice=128, SSC=840, data=directCommand) + command = PusTelecommand(service=8, subservice=128, SSC=840, data=directCommand) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("wait", 2)) tcQueue.appendleft(("print", "\r")) diff --git a/tc/OBSW_TcPacket.py b/tc/obsw_pus_tc_base.py similarity index 81% rename from tc/OBSW_TcPacket.py rename to tc/obsw_pus_tc_base.py index 3015e2f55ef1178e2a3336b758061510ae001de3..6b4858558bc94e83c485bf7a8f1b8d4a11a116f9 100644 --- a/tc/OBSW_TcPacket.py +++ b/tc/obsw_pus_tc_base.py @@ -10,24 +10,25 @@ import logging from typing import TypeVar, Dict, Union, Tuple, Deque PusTcT = TypeVar('PusTcT', bound='PUSTelecommand') -pusTcInfoT = Union[Dict[str, any], None] -pusTcTupleT = Tuple[bytearray, pusTcInfoT] -tcAuxiliaryTupleT = Tuple[str, any] +PusTcInfoT = Union[Dict[str, any], None] +PusTcTupleT = Tuple[bytearray, PusTcInfoT] +TcAuxiliaryTupleT = Tuple[str, any] # Union: Type can be either of those 2 -tcQueueEntryT = Union[tcAuxiliaryTupleT, pusTcTupleT] -tcQueueT = Deque[tcQueueEntryT] -pusTcInfoQueueT = Deque[pusTcInfoT] +TcQueueEntryT = Union[TcAuxiliaryTupleT, PusTcTupleT] +TcQueueT = Deque[TcQueueEntryT] +PusTcInfoQueueT = Deque[PusTcInfoT] -class PUSTelecommand: +class PusTelecommand: headerSize = 6 - def __init__(self, service: int, subservice: int, SSC=0, data=bytearray([]), sourceId: int = 0, version: int = 0, - packetType: int = 1, dataFieldHeaderFlag: int = 1, apid: int = 0x73, length: int = 0): - self.packetId = [0x0, 0x0] - self.packetId[0] = ((version << 5) & 0xE0) | ((packetType & 0x01) << 4) | \ - ((dataFieldHeaderFlag & 0x01) << 3) | ((apid & 0x700) >> 8) - self.packetId[1] = apid & 0xFF + def __init__(self, service: int, subservice: int, SSC=0, data=bytearray([]), sourceId: int = 0, + version: int = 0, packetType: int = 1, dataFieldHeaderFlag: int = 1, + apid: int = 0x73, length: int = 0): + self.packet_id = [0x0, 0x0] + self.packet_id[0] = ((version << 5) & 0xE0) | ((packetType & 0x01) << 4) | \ + ((dataFieldHeaderFlag & 0x01) << 3) | ((apid & 0x700) >> 8) + self.packet_id[1] = apid & 0xFF self.ssc = SSC self.psc = (SSC & 0x3FFF) | (0xC0 << 8) self.length = length @@ -36,21 +37,20 @@ class PUSTelecommand: self.subservice = subservice self.sourceId = sourceId self.data = data - # print("Apid:" + str(self.apid)) def getLength(self) -> int: try: length = 4 + len(self.data) + 1 return length except TypeError: - print("OBSW_TcPacket: Invalid type of data") + print("OBSW_TcPacket: Invalid service_type of data") logging.exception("Error") exit() def pack(self) -> bytearray: dataToPack = bytearray() - dataToPack.append(self.packetId[0]) - dataToPack.append(self.packetId[1]) + dataToPack.append(self.packet_id[0]) + dataToPack.append(self.packet_id[1]) dataToPack.append((self.psc & 0xFF00) >> 8) dataToPack.append(self.psc & 0xFF) length = self.getLength() @@ -68,17 +68,17 @@ class PUSTelecommand: dataToPack.append(crc & 0xFF) return dataToPack - def packInformation(self) -> pusTcInfoT: + def packInformation(self) -> PusTcInfoT: tcInformation = { - "G_SERVICE": self.service, + "service": self.service, "subservice": self.subservice, "ssc": self.ssc, - "packetId": self.packetId, + "packet_id": self.packet_id, "data": self.data } return tcInformation - def packCommandTuple(self) -> pusTcTupleT: + def packCommandTuple(self) -> PusTcTupleT: commandTuple = (self.pack(), self.packInformation()) return commandTuple @@ -113,6 +113,8 @@ def generateCRC(data: bytearray) -> bytearray: dataWithCRC.append(crc & 0xFF) return dataWithCRC +# pylint: disable=line-too-long + # Structure of a PUS TC Packet : # A PUS packet consists of consecutive bits, the allocation and structure is standardised. # Extended information can be found in ECSS-E-70-41A on p.42 @@ -148,4 +150,3 @@ def generateCRC(data: bytearray) -> bytearray: # 0 1 1111 | 17 | 1 | | | | | | | # # - The source ID is present as one byte. For now, ground = 0x00. - diff --git a/tc/OBSW_TcPacker.py b/tc/obsw_pus_tc_packer.py similarity index 79% rename from tc/OBSW_TcPacker.py rename to tc/obsw_pus_tc_packer.py index 2bd476521d4ac6df437d5e48a905598aad985f6b..9675f147ed151030da6ef9bf5e53c2be47f5be7c 100644 --- a/tc/OBSW_TcPacker.py +++ b/tc/obsw_pus_tc_packer.py @@ -13,7 +13,7 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file """ import os -from tc.OBSW_TcPacket import PUSTelecommand, tcQueueT +from tc.obsw_pus_tc_base import PusTelecommand, TcQueueT from tc.OBSW_TcService2 import packService2TestInto from tc.OBSW_TcService3 import packService3TestInto from tc.OBSW_TcService8 import packService8TestInto @@ -24,7 +24,7 @@ from collections import deque from typing import Union -def serviceTestSelect(service: Union[int, str], serviceQueue: tcQueueT) -> tcQueueT: +def service_test_select(service: Union[int, str], serviceQueue: TcQueueT) -> TcQueueT: if service == 2: return packService2TestInto(serviceQueue) elif service == 3: @@ -56,30 +56,30 @@ def serviceTestSelect(service: Union[int, str], serviceQueue: tcQueueT) -> tcQue exit() -def packService5TestInto(tcQueue: tcQueueT) -> tcQueueT: +def packService5TestInto(tcQueue: TcQueueT) -> TcQueueT: tcQueue.appendleft(("print", "Testing Service 5")) # disable events tcQueue.appendleft(("print", "\r\nTesting Service 5: Disable event")) - command = PUSTelecommand(service=5, subservice=6, SSC=500) + command = PusTelecommand(service=5, subservice=6, SSC=500) tcQueue.appendleft(command.packCommandTuple()) # trigger event tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger event")) - command = PUSTelecommand(service=17, subservice=128, SSC=510) + command = PusTelecommand(service=17, subservice=128, SSC=510) tcQueue.appendleft(command.packCommandTuple()) # enable event tcQueue.appendleft(("print", "\r\nTesting Service 5: Enable event")) - command = PUSTelecommand(service=5, subservice=5, SSC=520) + command = PusTelecommand(service=5, subservice=5, SSC=520) tcQueue.appendleft(command.packCommandTuple()) # trigger event tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger another event")) - command = PUSTelecommand(service=17, subservice=128, SSC=530) + command = PusTelecommand(service=17, subservice=128, SSC=530) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("print", "\r")) tcQueue.appendleft(("export", "log/tmtc_log_service5.txt")) return tcQueue -def packService9TestInto(tcQueue: tcQueueT) -> tcQueueT: +def packService9TestInto(tcQueue: TcQueueT) -> TcQueueT: tcQueue.appendleft(("print", "Testing Service 9")) timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' @@ -92,13 +92,13 @@ def packService9TestInto(tcQueue: tcQueueT) -> tcQueueT: print("Time Code 3 :" + str(timeTest3) + "\r") # time setting tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode A")) - command = PUSTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) + command = PusTelecommand(service=9, subservice=128, SSC=900, data=timeTest1) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode B")) - command = PUSTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) + command = PusTelecommand(service=9, subservice=128, SSC=910, data=timeTest2) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode Current Time")) - command = PUSTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) + command = PusTelecommand(service=9, subservice=128, SSC=920, data=timeTest3) tcQueue.appendleft(command.packCommandTuple()) # TODO: Add other time formats here tcQueue.appendleft(("print", "\r")) @@ -106,33 +106,33 @@ def packService9TestInto(tcQueue: tcQueueT) -> tcQueueT: return tcQueue -def packService17TestInto(tcQueue: tcQueueT) -> tcQueueT: +def packService17TestInto(tcQueue: TcQueueT) -> TcQueueT: tcQueue.appendleft(("print", "Testing Service 17")) # ping test tcQueue.appendleft(("print", "\n\rTesting Service 17: Ping Test")) - command = PUSTelecommand(service=17, subservice=1, SSC=1700) + command = PusTelecommand(service=17, subservice=1, SSC=1700) tcQueue.appendleft(command.packCommandTuple()) # enable event tcQueue.appendleft(("print", "\n\rTesting Service 17: Enable Event")) - command = PUSTelecommand(service=5, subservice=5, SSC=52) + command = PusTelecommand(service=5, subservice=5, SSC=52) tcQueue.appendleft(command.packCommandTuple()) # test event tcQueue.appendleft(("print", "\n\rTesting Service 17: Trigger event")) - command = PUSTelecommand(service=17, subservice=128, SSC=1701) + command = PusTelecommand(service=17, subservice=128, SSC=1701) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("print", "\r")) tcQueue.appendleft(("export", "log/tmtc_log_service17.txt")) return tcQueue -def packDummyDeviceTestInto(tcQueue: tcQueueT) -> tcQueueT: +def packDummyDeviceTestInto(tcQueue: TcQueueT) -> TcQueueT: tcQueue.appendleft(("print", "Testing Dummy Device")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) # Set On Mode tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Set On")) modeData = packModeData(objectId, 1, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=1, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=1, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # Test Service 2 commands tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Service 2")) @@ -145,7 +145,7 @@ def packDummyDeviceTestInto(tcQueue: tcQueueT) -> tcQueueT: return tcQueue -def packGpsTestInto(objectId: bytearray, tcQueue: tcQueueT) -> tcQueueT: +def packGpsTestInto(objectId: bytearray, tcQueue: TcQueueT) -> TcQueueT: if objectId == g.GPS0_ObjectId: gpsString = "GPS0" elif objectId == g.GPS1_ObjectId: @@ -156,52 +156,52 @@ def packGpsTestInto(objectId: bytearray, tcQueue: tcQueueT) -> tcQueueT: # Set Mode Off tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set Off")) modeData = packModeData(objectId, 0, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=11, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=11, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # Set Mode On tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set On")) modeData = packModeData(objectId, 1, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=12, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=12, data=modeData) tcQueue.appendleft(command.packCommandTuple()) # Enable HK report sidGps = 0 if objectId == g.GPS0_ObjectId: sidGps = g.GPS0_SID tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) - command = PUSTelecommand(service=3, subservice=5, SSC=13, data=sidGps) + command = PusTelecommand(service=3, subservice=5, SSC=13, data=sidGps) tcQueue.appendleft(command.packCommandTuple()) elif objectId == g.GPS1_ObjectId: sidGps = g.GPS1_SID tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting")) - command = PUSTelecommand(service=3, subservice=5, SSC=14, data=sidGps) + command = PusTelecommand(service=3, subservice=5, SSC=14, data=sidGps) tcQueue.appendleft(command.packCommandTuple()) # pack wait interval until mode is on and a few gps replies have been received tcQueue.appendleft(("wait", 5)) # Disable HK reporting tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition")) - command = PUSTelecommand(service=3, subservice=6, SSC=15, data=sidGps) + command = PusTelecommand(service=3, subservice=6, SSC=15, data=sidGps) tcQueue.appendleft(command.packCommandTuple()) # Set Mode Off tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set Off")) modeData = packModeData(objectId, 0, 0) - command = PUSTelecommand(service=200, subservice=1, SSC=13, data=modeData) + command = PusTelecommand(service=200, subservice=1, SSC=13, data=modeData) tcQueue.appendleft(command.packCommandTuple()) tcQueue.appendleft(("print", "\r")) tcQueue.appendleft(("export", "log/tmtc_log_service_" + gpsString + ".txt")) return tcQueue -def packErrorTestingInto(tcQueue: tcQueueT) -> tcQueueT: +def packErrorTestingInto(tcQueue: TcQueueT) -> TcQueueT: # a lot of events - command = PUSTelecommand(service=17, subservice=129, SSC=2010) + command = PusTelecommand(service=17, subservice=129, SSC=2010) tcQueue.appendleft(command.packCommandTuple()) # a lot of ping testing - command = PUSTelecommand(service=17, subservice=130, SSC=2020) + command = PusTelecommand(service=17, subservice=130, SSC=2020) tcQueue.appendleft(command.packCommandTuple()) return tcQueue -def createTotalTcQueue() -> tcQueueT: +def createTotalTcQueue() -> TcQueueT: if not os.path.exists("log"): os.mkdir("log") tcQueue = deque() diff --git a/test/OBSW_PusServiceTest.py b/test/OBSW_PusServiceTest.py index 2cc98e46d025cdcf9a2a28b3588f3aa739928e49..f173e2e1e6ecf012da6b1c06debc4a5566be54af 100644 --- a/test/OBSW_PusServiceTest.py +++ b/test/OBSW_PusServiceTest.py @@ -1,7 +1,7 @@ import unittest -from test.OBSW_UnitTest import TestService, pusTmInfoQueueT -from tc.OBSW_TcPacket import pusTcInfoQueueT -from tc.OBSW_TcPacker import packService17TestInto, packService5TestInto, packService2TestInto +from test.OBSW_UnitTest import TestService, PusTmInfoQueueT +from tc.obsw_pus_tc_base import PusTcInfoQueueT +from tc.obsw_pus_tc_packer import packService17TestInto, packService5TestInto, packService2TestInto import config.OBSW_Config as g @@ -21,7 +21,7 @@ class TestService2(TestService): self.miscExpected = 4 super().performGenericAssertionTest(assertionDict) - def analyseTmInfo(self, tmInfoQueue: pusTmInfoQueueT, assertionDict: dict): + def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass @@ -60,10 +60,10 @@ class TestService5(TestService): # analyseTmInfo() and analyseTcInfo are called here super().performService5or17Test() - def analyseTcInfo(self, tcInfoQueue: pusTcInfoQueueT): + def analyseTcInfo(self, tcInfoQueue: PusTcInfoQueueT): super().analyseTcInfo(tcInfoQueue) - def analyseTmInfo(self, tmInfoQueue: pusTmInfoQueueT, assertionDict: dict): + def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): super().analyseService5or17TM(tmInfoQueue, assertionDict) @classmethod @@ -102,7 +102,7 @@ class TestService17(TestService): def test_Service17(self): super().performService5or17Test() - def analyseTmTcInfo(self, tmInfoQueue: pusTmInfoQueueT, tcInfoQueue: pusTcInfoQueueT): + def analyseTmTcInfo(self, tmInfoQueue: PusTmInfoQueueT, tcInfoQueue: PusTcInfoQueueT): assertionDict = super().analyseTmTcInfo(tmInfoQueue=tmInfoQueue, tcInfoQueue=tcInfoQueue) # add anything elsee other than tc verification counter # and ssc that is needed for tm analysis @@ -111,7 +111,7 @@ class TestService17(TestService): def analyseTcInfo(self, tcInfoQueue): super().analyseTcInfo(tcInfoQueue) - def analyseTmInfo(self, tmInfoQueue: pusTmInfoQueueT, assertionDict: dict): + def analyseTmInfo(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): super().analyseService5or17TM(tmInfoQueue, assertionDict) @classmethod diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 15d31058bf472229f0b52520aba8c4064fb0a6f0..08904c0169315389437c06a6efa3b21f2f2a2cdf 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -32,13 +32,13 @@ import unittest from collections import deque from typing import Deque -from tc.OBSW_TcPacker import packDummyDeviceTestInto +from tc.obsw_pus_tc_packer import packDummyDeviceTestInto from tm.obsw_tm_service_1 import pusPacketInfoService1T from abc import abstractmethod -from sendreceive.OBSW_MultipleCommandsSenderReceiver import MultipleCommandSenderReceiver +from sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver from config import OBSW_Config as g -from tm.obsw_pus_tm_generic import pusTmInfoQueueT, pusTmInfoT -from tc.OBSW_TcPacket import pusTcInfoQueueT +from tm.obsw_pus_tm_base import PusTmInfoQueueT, PusTmInfoT +from tc.obsw_pus_tc_base import PusTcInfoQueueT TmInfoQueueService1T = Deque[pusPacketInfoService1T] @@ -89,7 +89,7 @@ class TestService(unittest.TestCase): assertionDict = self.analyseTmTcInfo(tmInfoQueue, tcInfoQueue) return assertionDict - def analyseTmTcInfo(self, tmInfoQueue: pusTmInfoQueueT, tcInfoQueue: pusTcInfoQueueT) -> dict: + def analyseTmTcInfo(self, tmInfoQueue: PusTmInfoQueueT, tcInfoQueue: PusTcInfoQueueT) -> dict: self.tcSscArray = [] self.tcServiceArray = [] self.tcSubserviceArray = [] @@ -127,7 +127,7 @@ class TestService(unittest.TestCase): return assertionDict - def analyseTcInfo(self, tcInfoQueue: pusTcInfoQueueT): + def analyseTcInfo(self, tcInfoQueue: PusTcInfoQueueT): while not tcInfoQueue.__len__() == 0: currentTcInfo = tcInfoQueue.pop() self.tcVerifyCounter = self.tcVerifyCounter + 1 @@ -144,7 +144,7 @@ class TestService(unittest.TestCase): # this function looks whether the tc verification SSC matched # a SSC of the sent TM - def scanForRespectiveTc(self, currentTmInfo: pusTmInfoT): + def scanForRespectiveTc(self, currentTmInfo: PusTmInfoT): currentSubservice = currentTmInfo["subservice"] for possibleIndex, searchIndex in enumerate(self.tcSscArray): if searchIndex == currentTmInfo["tcSSC"]: @@ -173,7 +173,7 @@ class TestService(unittest.TestCase): self.miscExpected = 2 self.performGenericAssertionTest(assertionDict) - def analyseService5or17TM(self, tmInfoQueue: pusTmInfoQueueT, assertionDict: dict): + def analyseService5or17TM(self, tmInfoQueue: PusTmInfoQueueT, assertionDict: dict): self.miscCounter = 0 while not tmInfoQueue.__len__() == 0: currentTmInfo = tmInfoQueue.pop() diff --git a/tm/obsw_pus_tm_base.py b/tm/obsw_pus_tm_base.py index 5817c0db820dfd7441569a6385e1f62c96160a2f..70f06ac5aaa2759184c9e25180ab9fa8e692c6e2 100644 --- a/tm/obsw_pus_tm_base.py +++ b/tm/obsw_pus_tm_base.py @@ -1,189 +1,297 @@ -# -*- coding: utf-8 -*- -""" -Created on Wed Apr 4 11:44:48 2018 -Generic PUS packet class to deserialize raw PUS telemetry. -@author: S. Gaisser -""" - -import crcmod -import datetime - - -class OBSWPusPacket: - def __init__(self, byteArray): - self.packetRaw = byteArray - self.PUSHeader = PUSPacketHeader(byteArray) - byteArray = byteArray[6:] - self.dataFieldHeader = OBSWPUSPacketDataFieldHeader(byteArray) - byteArray = byteArray[12:] - self.data = byteArray[:len(byteArray)-2] - self.crc = byteArray[len(byteArray)-2] << 8 | byteArray[len(byteArray)-1] - - def getRawPacket(self) -> bytearray: - return self.packetRaw - - def printPusPacketHeader(self, array): - self.dataFieldHeader.printDataFieldHeader(array) - self.PUSHeader.printPusPacketHeader(array) - - def printPusPacketHeaderColumnHeaders(self, array): - self.dataFieldHeader.printDataFieldHeaderColumnHeader(array) - self.PUSHeader.printPusPacketHeaderColumnHeaders(array) - - def getPacketSize(self): - # PusHeader Size + data size - size = PUSPacketHeader.headerSize + self.PUSHeader.length + 1 - return size - - def getService(self): - return self.dataFieldHeader.type - - def getSubservice(self): - return self.dataFieldHeader.subtype - - def getSSC(self): - return self.PUSHeader.sourceSequenceCount - - def printData(self): - print(self.returnDataString()) - - def returnDataString(self): - strToPrint = "[" - for byte in self.data: - strToPrint += str(hex(byte)) + " , " - strToPrint = strToPrint.rstrip(' , ') - strToPrint += ']' - return strToPrint - - -class PUSPacketHeader: - headerSize = 6 - - def __init__(self, bytesArray): - dataToCheck = bytesArray - if len(bytesArray) < PUSPacketHeader.headerSize: - self.version = 0 - self.type = 0 - self.dataFieldHeaderFlag = 0 - self.apid = 0 - self.segmentationFlag = 0 - self.sourceSequenceCount = 0 - self.length = 0 - self.valid = False - return - self.version = bytesArray[0] >> 5 - self.type = bytesArray[0] & 0x10 - self.dataFieldHeaderFlag = (bytesArray[0] & 0x8) >> 3 - self.apid = ((bytesArray[0] & 0x7) << 8) | bytesArray[1] - self.segmentationFlag = (bytesArray[2] & 0xC0) >> 6 - self.sourceSequenceCount = ((bytesArray[2] & 0x3F) << 8) | bytesArray[3] - self.length = bytesArray[4] << 8 | bytesArray[5] - self.valid = False - crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) - if len(dataToCheck) < ((self.length+1)+PUSPacketHeader.headerSize): - print("Invalid packet length") - return - dataToCheck = dataToCheck[0:(self.length+1)+PUSPacketHeader.headerSize] - crc = crc_func(dataToCheck) - if crc == 0: - self.valid = True - else: - print("Invalid CRC detected !") - - def printPusPacketHeader(self, array): - array.append(str(chr(self.apid))) - array.append(str(self.sourceSequenceCount)) - if self.valid: - array.append("Yes") - else: - array.append("No") - # array.append(str(self.valid)) - - @staticmethod - def printPusPacketHeaderColumnHeaders(array): - array.append("APID") - array.append("SSC") - array.append("Packet Valid") - - -class OBSWTimestamp: - def __init__(self, byteArray): - # pField = byteArray[0] - byteArray = byteArray[1:] - self.days = ((byteArray[0] << 8) | (byteArray[1])) - 4383 - self.seconds = self.days * (24*60*60) - sDay = ((byteArray[2] << 24) | (byteArray[3] << 16) | - (byteArray[4]) << 8 | byteArray[5])/1000 - self.seconds += sDay - self.time = self.seconds - self.datetime = str(datetime.datetime. - utcfromtimestamp(self.time).strftime("%Y-%m-%d %H:%M:%S.%f")) - - def printTime(self, array): - array.append(self.time) - array.append(self.datetime) - - @staticmethod - def printTimeHeaders(array): - array.append("OBSWTime (s)") - array.append("Time") - - -class OBSWPUSPacketDataFieldHeader: - def __init__(self, bytesArray): - self.pusVersionAndAckByte = (bytesArray[0] & 0x70) >> 4 - self.type = bytesArray[1] - self.subtype = bytesArray[2] - self.subcounter = bytesArray[3] - self.time = OBSWTimestamp(bytesArray[4:13]) - - def printDataFieldHeader(self, array): - array.append(str(self.type)) - array.append(str(self.subtype)) - array.append(str(self.subcounter)) - - self.time.printTime(array) - - def printDataFieldHeaderColumnHeader(self, array): - array.append("Service") - array.append("Subservice") - array.append("Subcounter") - self.time.printTimeHeaders(array) - - -# Structure of a PUS Packet : -# A PUS packet consists of consecutive bits, the allocation and structure is standardised. -# Extended information can be found in ECSS-E-70-41A on p.42 -# The easiest form to send a PUS Packet is in hexadecimal form. -# A two digit hexadecimal number equals one byte, 8 bits or one octet -# o = optional, Srv = Service -# -# The structure is shown as follows for TM[17,2] -# 1. Structure Header -# 2. Structure Subheader -# 3. Component (size in bits) -# 4. Hexadecimal number -# 5. Binary Number -# 6. Decimal Number -# -# -------------------------------------------Packet Header(48)------------------------------------------| Packet | -# ----------------Packet ID(16)----------------------|Packet Sequence Control (16)| Packet Length (16) | Data Field | -# Version | Type(1) |Data Field |APID(11) | SequenceFlags(2) |Sequence | | (Variable) | -# Number(3) | |Header Flag(1)| | |Count(14)| | | -# 0x18 | 0x73 | 0xc0 | 0x19 | 0x00 | 0x04 | | -# 000 1 0 000| 01110011 | 11 000000 | 00011001|00000000 | 0000100 | | -# 0 | 1 | 0 | 115(ASCII s) | 3 | 25 | 0 | 4 | | -# -# - Packet Length is an unsigned integer C = Number of Octets in Packet Data Field - 1 -# -# Packet Data Field Structure: -# -# ------------------------------------------------Packet Data Field------------------------------------------------- | -# ---------------------------------Data Field Header ---------------------------|AppData|Spare| PacketErrCtr | -# CCSDS(1)|TC PUS Ver.(3)|Ack(4)|SrvType (8)|SrvSubtype(8)|Source ID(o)|Spare(o)| (var)|(var)| (16) | -# 0x11 (0x1F) | 0x11 | 0x01 | | | | | Calc. | Calc. | -# 0 001 1111 |00010001 | 00000001 | | | | | | | -# 0 1 1111 | 17 | 2 | | | | | | | -# -# - The source ID is present as one byte. Is it necessary? For now, ground = 0x00. - +""" +@brief: Generic PUS packet class to deserialize raw PUS telemetry. +@date: 09.04.2020 +@author: R.Mueller, S. Gaisser +""" +import datetime +from typing import TypeVar, Dict, Tuple, Deque, List, Final + +from crcmod import crcmod + + +PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') +PusTmInfoT = Dict[str, any] +PusTmTupleT = Tuple[PusTmInfoT, PusTmT] + +PusTmQueueT = Deque[PusTmT] +PusTmListT = List[PusTmT] +PusTmInfoQueueT = Deque[PusTmInfoT] +PusTmTupleQueueT = Deque[PusTmTupleT] + + +class PusTelemetry: + """ + Generic PUS telemetry class. It is instantiated by passing the raw pus telemetry packet + (bytearray) to the constructor. It automatically deserializes the packet, exposing + various packet fields via getter functions. + """ + def __init__(self, byte_array: bytearray): + self._packet_raw: Final = byte_array + self._pus_header: Final = PusPacketHeader(byte_array) + byte_array = byte_array[6:] + self._data_field_header: Final = ObswPusPacketDataFieldHeader(byte_array) + byte_array = byte_array[12:] + self._tm_data: Final = byte_array[:len(byte_array) - 2] + self._crc: Final = byte_array[len(byte_array) - 2] << 8 | byte_array[len(byte_array) - 1] + self.print_info = "" + + def get_service(self): + """ + :return: Service ID + """ + return self._data_field_header.service_type + + def get_subservice(self): + """ + :return: Subservice ID + """ + return self._data_field_header.service_subtype + + def get_tm_data(self) -> bytearray: + """ + :return: TM application data (raw) + """ + return self._tm_data + + def pack_tm_information(self) -> PusTmInfoT: + """ + Packs important TM information needed for tests in a convenient dictionary + :return: TM dictionary + """ + tm_information = { + "service": self.get_service(), + "subservice": self.get_subservice(), + "ssc": self.get_ssc(), + "data": self._tm_data, + "crc": self._crc, + "valid": self._pus_header.valid + } + return tm_information + + def specify_packet_info(self, print_info: str): + """ + Caches a print information string for later printing + :param print_info: + :return: + """ + self.print_info = print_info + + def append_packet_info(self, print_info: str): + """ + Similar to the function above, but appends to the existing information string. + :param print_info: + :return: + """ + self.print_info = self.print_info + print_info + + def append_telemetry_content(self, content_list: list): + """ + Default implementation adds the PUS header content to the list which can then be + printed with a simple print() command. To add additional content, override this method + (don't forget to still call this function with super() if the header is required) + :param content_list: Header content will be appended to this list + :return: + """ + self._data_field_header.append_data_field_header(content_list) + self._pus_header.append_pus_packet_header(content_list) + + def append_telemetry_column_headers(self, header_list: list): + """ + Default implementation adds the PUS header content header (confusing, I know) + to the list which can then be printed with a simple print() command. + To add additional headers, override this method + (don't forget to still call this function with super() if the header is required) + :param header_list: Header content will be appended to this list + :return: + """ + self._data_field_header.append_data_field_header_column_header(header_list) + self._pus_header.append_pus_packet_header_column_headers(header_list) + + def get_raw_packet(self) -> bytearray: + """ + Get the whole TM packet as a bytearray (raw) + :return: TM packet + """ + return self._packet_raw + + def get_packet_size(self) -> int: + """ + :return: Size of the TM packet + """ + # PusHeader Size + _tm_data size + size = PusPacketHeader.PUS_HEADER_SIZE + self._pus_header.length + 1 + return size + + def get_ssc(self) -> int: + """ + Get the source sequence count + :return: Source Sequence Count (see below, or PUS documentation) + """ + return self._pus_header.source_sequence_count + + def return_data_string(self) -> str: + """ + Returns the TM data in a clean printable string format + :return: + """ + str_to_print = "[" + for byte in self._tm_data: + str_to_print += str(hex(byte)) + " , " + str_to_print = str_to_print.rstrip(',', ) + str_to_print += ']' + return str_to_print + + def print_data(self): + """ + Prints the TM data in a clean format + :return: + """ + print(self.return_data_string()) + + +# pylint: disable=too-many-instance-attributes +class PusPacketHeader: + """ + This class unnpacks the PUS packet header (see PUS structure below or PUS documentation) + """ + PUS_HEADER_SIZE = 6 + + def __init__(self, pus_packet_raw: bytearray): + data_to_check = pus_packet_raw + if len(pus_packet_raw) < PusPacketHeader.PUS_HEADER_SIZE: + self.version = 0 + self.type = 0 + self.data_field_header_flag = 0 + self.apid = 0 + self.segmentation_flag = 0 + self.source_sequence_count = 0 + self.length = 0 + self.valid = False + return + self.version = pus_packet_raw[0] >> 5 + self.type = pus_packet_raw[0] & 0x10 + self.data_field_header_flag = (pus_packet_raw[0] & 0x8) >> 3 + self.apid = ((pus_packet_raw[0] & 0x7) << 8) | pus_packet_raw[1] + self.segmentation_flag = (pus_packet_raw[2] & 0xC0) >> 6 + self.source_sequence_count = ((pus_packet_raw[2] & 0x3F) << 8) | pus_packet_raw[3] + self.length = pus_packet_raw[4] << 8 | pus_packet_raw[5] + self.valid = False + crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) + if len(data_to_check) < ((self.length + 1) + PusPacketHeader.PUS_HEADER_SIZE): + print("Invalid packet length") + return + data_to_check = data_to_check[0:(self.length + 1) + PusPacketHeader.PUS_HEADER_SIZE] + crc = crc_func(data_to_check) + if crc == 0: + self.valid = True + else: + print("Invalid CRC detected !") + + def append_pus_packet_header(self, array): + array.append(str(chr(self.apid))) + array.append(str(self.source_sequence_count)) + if self.valid: + array.append("Yes") + else: + array.append("No") + # header_list.append(str(self.valid)) + + @staticmethod + def append_pus_packet_header_column_headers(array): + array.append("APID") + array.append("SSC") + array.append("Packet Valid") + + +class ObswPusPacketDataFieldHeader: + def __init__(self, bytesArray): + self.pus_version_and_ack_byte = (bytesArray[0] & 0x70) >> 4 + self.service_type = bytesArray[1] + self.service_subtype = bytesArray[2] + self.subcounter = bytesArray[3] + self.time = ObswTimestamp(bytesArray[4:13]) + + def append_data_field_header(self, content_list: list): + """ + Append important data field header parameters to the passed content list. + :param content_list: + :return: + """ + content_list.append(str(self.service_type)) + content_list.append(str(self.service_subtype)) + content_list.append(str(self.subcounter)) + self.time.print_time(content_list) + + def append_data_field_header_column_header(self, header_list: list): + """ + Append important data field header column headers to the passed list. + :param header_list: + :return: + """ + header_list.append("Service") + header_list.append("Subservice") + header_list.append("Subcounter") + self.time.print_time_headers(header_list) + + +class ObswTimestamp: + """ + Unpacks the time datafield of the TM packet. + """ + def __init__(self, byteArray): + # pField = byte_array[0] + byteArray = byteArray[1:] + self.days = ((byteArray[0] << 8) | (byteArray[1])) - 4383 + self.seconds = self.days * (24 * 60 * 60) + s_day = ((byteArray[2] << 24) | (byteArray[3] << 16) | + (byteArray[4]) << 8 | byteArray[5]) / 1000 + self.seconds += s_day + self.time = self.seconds + self.datetime = str(datetime.datetime. + utcfromtimestamp(self.time).strftime("%Y-%m-%d %H:%M:%S.%f")) + + def print_time(self, array): + array.append(self.time) + array.append(self.datetime) + + @staticmethod + def print_time_headers(array): + array.append("OBSWTime (s)") + array.append("Time") + +# pylint: disable=line-too-long +# Structure of a PUS Packet : +# A PUS packet consists of consecutive bits, the allocation and structure is standardised. +# Extended information can be found in ECSS-E-70-41A on p.42 +# The easiest form to send a PUS Packet is in hexadecimal form. +# A two digit hexadecimal number equals one byte, 8 bits or one octet +# o = optional, Srv = Service +# +# The structure is shown as follows for TM[17,2] +# 1. Structure Header +# 2. Structure Subheader +# 3. Component (size in bits) +# 4. Hexadecimal number +# 5. Binary Number +# 6. Decimal Number +# +# -------------------------------------------Packet Header(48)------------------------------------------| Packet | +# ----------------Packet ID(16)----------------------|Packet Sequence Control (16)| Packet Length (16) | Data Field | +# Version | Type(1) |Data Field |APID(11) | SequenceFlags(2) |Sequence | | (Variable) | +# Number(3) | |Header Flag(1)| | |Count(14)| | | +# 0x18 | 0x73 | 0xc0 | 0x19 | 0x00 | 0x04 | | +# 000 1 0 000| 01110011 | 11 000000 | 00011001|00000000 | 0000100 | | +# 0 | 1 | 0 | 115(ASCII s) | 3 | 25 | 0 | 4 | | +# +# - Packet Length is an unsigned integer C = Number of Octets in Packet Data Field - 1 +# +# Packet Data Field Structure: +# +# ------------------------------------------------Packet Data Field------------------------------------------------- | +# ---------------------------------Data Field Header ---------------------------|AppData|Spare| PacketErrCtr | +# CCSDS(1)|TC PUS Ver.(3)|Ack(4)|SrvType (8)|SrvSubtype(8)|Source ID(o)|Spare(o)| (var)|(var)| (16) | +# 0x11 (0x1F) | 0x11 | 0x01 | | | | | Calc. | Calc. | +# 0 001 1111 |00010001 | 00000001 | | | | | | | +# 0 1 1111 | 17 | 2 | | | | | | | +# +# - The source ID is present as one byte. Is it necessary? For now, ground = 0x00. diff --git a/tm/obsw_pus_tm_factory.py b/tm/obsw_pus_tm_factory.py index f06330182d8b64d74a4a14ba630cd4d218959b12..5a1c4642445d6a0396ab5e3ff6700684df3694d0 100644 --- a/tm/obsw_pus_tm_factory.py +++ b/tm/obsw_pus_tm_factory.py @@ -2,114 +2,120 @@ """ Program: obsw_pus_tm_factory.py Date: 01.11.2019 -Description: Deserialize TM byte array into PUS TM Class +Description: Deserialize TM byte header_list into PUS TM Class Author: R.Mueller, S. Gaisser """ -from tm.obsw_pus_tm_generic import PUSTelemetry +from tm.obsw_pus_tm_base import PusTelemetry from tm.obsw_tm_service_1 import Service1TM from tm.obsw_tm_service_3 import Service3TM from tm.obsw_tm_service_5 import Service5TM import struct -def PUSTelemetryFactory(rawPacket): - servicetype = rawPacket[7] - # extract G_SERVICE type from data - if servicetype == 1: - return Service1TM(rawPacket) - elif servicetype == 2: - return Service2TM(rawPacket) - elif servicetype == 3: - return Service3TM(rawPacket) - elif servicetype == 5: - return Service5TM(rawPacket) - elif servicetype == 8: - return Service8TM(rawPacket) - elif servicetype == 17: - return Service17TM(rawPacket) - elif servicetype == 200: - return Service200TM(rawPacket) +def PusTelemetryFactory(raw_packet: bytearray): + """ + Returns a PusTelemetry class instance by extracting the service directly from the raw packet. + Sneaky solution allows this function to immediately return + the right telemetry packet + :param raw_packet: + :return: + """ + service_type = raw_packet[7] + if service_type == 1: + return Service1TM(raw_packet) + elif service_type == 2: + return Service2TM(raw_packet) + elif service_type == 3: + return Service3TM(raw_packet) + elif service_type == 5: + return Service5TM(raw_packet) + elif service_type == 8: + return Service8TM(raw_packet) + elif service_type == 17: + return Service17TM(raw_packet) + elif service_type == 200: + return Service200TM(raw_packet) else: - print("The G_SERVICE " + str(servicetype) + " is not implemented in Telemetry Factory") - return PUSTelemetry(rawPacket) + print("The service " + str(service_type) + " is not implemented in Telemetry Factory") + return PusTelemetry(raw_packet) -class Service2TM(PUSTelemetry): +class Service2TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) return - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) return -class Service8TM(PUSTelemetry): +class Service8TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) return - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) return -class Service9TM(PUSTelemetry): +class Service9TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) return - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) return -class Service17TM(PUSTelemetry): +class Service17TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - self.print_packet_info("Test Reply") + self.specify_packet_info("Test Reply") - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) return - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) return -class Service200TM(PUSTelemetry): +class Service200TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) self.isCantReachModeReply = False self.isModeReply = False - self.print_packet_info("Mode Reply") - self.objectId = struct.unpack('>I', self.byteArrayData[0:4])[0] - if self.dataFieldHeader.subtype == 7: + self.specify_packet_info("Mode Reply") + self.objectId = struct.unpack('>I', self._tm_data[0:4])[0] + if self.get_subservice() == 7: self.append_packet_info(": Can't reach mode") self.isCantReachModeReply = True - self.returnValue = self.byteArrayData[4] << 8 | self.byteArrayData[5] - elif self.dataFieldHeader.subtype == 6 or self.dataFieldHeader.subtype == 8: + self.returnValue = self._tm_data[4] << 8 | self._tm_data[5] + elif self.get_subservice() == 6 or self.get_subservice() == 8: self.isModeReply = True - if self.dataFieldHeader.subtype == 8: + if self.get_subservice() == 8: self.append_packet_info(": Wrong Mode") - elif self.dataFieldHeader.subtype == 6: + elif self.get_subservice() == 6: self.append_packet_info(": Mode reached") - self.mode = struct.unpack('>I', self.byteArrayData[4:8])[0] - self.submode = self.byteArrayData[8] + self.mode = struct.unpack('>I', self._tm_data[4:8])[0] + self.submode = self._tm_data[8] - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) array.append(hex(self.objectId)) if self.isCantReachModeReply: array.append(hex(self.returnValue)) @@ -118,8 +124,8 @@ class Service200TM(PUSTelemetry): array.append(str(self.submode)) return - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) array.append("Object ID") if self.isCantReachModeReply: array.append("Return Value") diff --git a/tm/obsw_pus_tm_generic.py b/tm/obsw_pus_tm_generic.py deleted file mode 100644 index af8376b54ec47da876326ecb58ddbda714835b1d..0000000000000000000000000000000000000000 --- a/tm/obsw_pus_tm_generic.py +++ /dev/null @@ -1,44 +0,0 @@ -from tm.obsw_pus_tm_base import OBSWPusPacket -from typing import TypeVar, Dict, Tuple, Deque, List - -PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') -pusTmInfoT = Dict[str, any] -pusTmTupleT = Tuple[pusTmInfoT, PusTmT] - -pusTmQueueT = Deque[PusTmT] -pusTmListT = List[PusTmT] -pusTmInfoQueueT = Deque[pusTmInfoT] -pusTmTupleQueueT = Deque[pusTmTupleT] - - -class PUSTelemetry(OBSWPusPacket): - def __init__(self, byteArray): - super().__init__(byteArray) - self.byteArrayData = self.data - self.printInfo = "" - - def unpack_telemetry(self): - pass - - def print_packet_info(self, printInfo): - self.printInfo = printInfo - - def append_packet_info(self, printInfo): - self.printInfo = self.printInfo + printInfo - - def print_telemetry_header(self, array): - super().printPusPacketHeader(array) - - def print_telemetry_column_headers(self, array): - super().printPusPacketHeaderColumnHeaders(array) - - def pack_tm_information(self) -> pusTmInfoT: - tmInformation = { - "service": self.getService(), - "subservice": self.getSubservice(), - "ssc": self.getSSC(), - "data": self.byteArrayData, - "crc": self.crc, - "valid": self.PUSHeader.valid - } - return tmInformation diff --git a/tm/obsw_tm_service_1.py b/tm/obsw_tm_service_1.py index 9ba912dec4a845ddfd36efec21d43ae4ac756b80..9b5b7ab7ff25cb205a4c75e12319ffa3d5b5b9f4 100644 --- a/tm/obsw_tm_service_1.py +++ b/tm/obsw_tm_service_1.py @@ -5,45 +5,45 @@ Date: 30.12.2019 Description: Deserialize Pus Verification TM Author: R. Mueller """ -from tm.obsw_pus_tm_generic import PUSTelemetry +from tm.obsw_pus_tm_base import PusTelemetry from typing import Dict import struct pusPacketInfoService1T = Dict[str, any] -class Service1TM(PUSTelemetry): +class Service1TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) self.tcErrorCode = False self.isStep = False # Failure Reports with error code - self.tcPacketId = self.byteArrayData[0] << 8 | self.byteArrayData[1] - self.tcSSC = ((self.byteArrayData[2] & 0x3F) << 8) | self.byteArrayData[3] - self.print_packet_info("Success Verification") - if self.dataFieldHeader.subtype % 2 == 0: - self.print_packet_info("Failure Verficiation") + self.tcPacketId = self._tm_data[0] << 8 | self._tm_data[1] + self.tcSSC = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3] + self.specify_packet_info("Success Verification") + if self.get_subservice() % 2 == 0: + self.specify_packet_info("Failure Verficiation") self.tcErrorCode = True - if self.dataFieldHeader.subtype == 6: + if self._data_field_header.subtype == 6: self.isStep = True self.append_packet_info(" : Step Failure") - self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0] - self.ErrCode = struct.unpack('>H', self.byteArrayData[5:7])[0] - self.errorParam1 = struct.unpack('>I', self.byteArrayData[7:11])[0] - self.errorParam2 = struct.unpack('>I', self.byteArrayData[11:15])[0] + self.stepNumber = struct.unpack('>B', self._tm_data[4:5])[0] + self.ErrCode = struct.unpack('>H', self._tm_data[5:7])[0] + self.errorParam1 = struct.unpack('>I', self._tm_data[7:11])[0] + self.errorParam2 = struct.unpack('>I', self._tm_data[11:15])[0] else: - self.printData() - self.ErrCode = struct.unpack('>H', self.byteArrayData[4:6])[0] - self.errorParam1 = struct.unpack('>I', self.byteArrayData[6:10])[0] - self.errorParam2 = struct.unpack('>I', self.byteArrayData[10:14])[0] + self.print_data() + self.ErrCode = struct.unpack('>H', self._tm_data[4:6])[0] + self.errorParam1 = struct.unpack('>I', self._tm_data[6:10])[0] + self.errorParam2 = struct.unpack('>I', self._tm_data[10:14])[0] - elif self.dataFieldHeader.subtype == 5: + elif self.get_subservice() == 5: self.isStep = True self.append_packet_info(" : Step Success") - self.stepNumber = struct.unpack('>B', self.byteArrayData[4:5])[0] + self.stepNumber = struct.unpack('>B', self._tm_data[4:5])[0] - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) array.append(str(hex(self.tcPacketId))) array.append(str(self.tcSSC)) if self.tcErrorCode: @@ -55,8 +55,8 @@ class Service1TM(PUSTelemetry): elif self.isStep: array.append(str(self.stepNumber)) - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) array.append("TC Packet ID") array.append("TC SSC") if self.tcErrorCode: diff --git a/tm/obsw_tm_service_3.py b/tm/obsw_tm_service_3.py index cb6bd0117f7aa67df06f4d4067f94cce85003e0e..b9426234beee18f5f44d284951f86dbd787c85c8 100644 --- a/tm/obsw_tm_service_3.py +++ b/tm/obsw_tm_service_3.py @@ -6,50 +6,50 @@ Description: Deserialize Housekeeping TM Author: R. Mueller """ -from tm.obsw_pus_tm_generic import PUSTelemetry +from tm.obsw_pus_tm_base import PusTelemetry from typing import TypeVar import struct PusTm3T = TypeVar('PusTm3T', bound='Service3TM') -class Service3TM(PUSTelemetry): +class Service3TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - print("Length of data: " + str(len(self.byteArrayData))) - self.sid = struct.unpack('>I', self.byteArrayData[0:4])[0] + print("Length of _tm_data: " + str(len(self._tm_data))) + self.sid = struct.unpack('>I', self._tm_data[0:4])[0] self.hkHeader = [] self.hkContent = [] self.hkDefinitionHeader = [] self.hkDefinition = [] self.numberOfParameters = 0 self.validity_buffer = [] - self.print_packet_info("Housekeeping Packet") + self.specify_packet_info("Housekeeping Packet") self.paramLength = 0 - if self.getSubservice() == 10 or self.getSubservice() == 12: + if self.get_subservice() == 10 or self.get_subservice() == 12: self.handleFillingDefinitionArrays() - if self.getSubservice() == 25 or self.getSubservice() == 26: + if self.get_subservice() == 25 or self.get_subservice() == 26: self.handleFillingHkArrays() - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) array.append(hex(self.sid)) array.append(int(self.paramLength)) - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) array.append("SID") array.append("HK Data Size") def handleFillingDefinitionArrays(self): self.hkHeader = ["SID", "Report Status", "Collection Interval", "Number Of IDs"] - reportingEnabled = self.byteArrayData[4] - collectionInterval = struct.unpack('>f', self.byteArrayData[5:9])[0] - numberOfParams = self.byteArrayData[9] + reportingEnabled = self._tm_data[4] + collectionInterval = struct.unpack('>f', self._tm_data[5:9])[0] + numberOfParams = self._tm_data[9] parameters = [] index2 = 1 for index in range(10, (numberOfParams * 4) + 10, 4): - parameter = struct.unpack('>I', self.byteArrayData[index:index + 4])[0] + parameter = struct.unpack('>I', self._tm_data[index:index + 4])[0] self.hkHeader.append("Pool ID " + str(index2)) parameters.append(str(hex(parameter))) index2 = index2 + 1 @@ -61,7 +61,7 @@ class Service3TM(PUSTelemetry): self.hkContent.extend(parameters) def handleFillingHkArrays(self): - self.paramLength = len(self.byteArrayData) - 4 + self.paramLength = len(self._tm_data) - 4 # TODO: This can be automated by using the MIB parser pool names and pool datatypes if self.sid == 0x1f00 or self.sid == 0x2f00: self.handleGpsHkData() @@ -73,22 +73,22 @@ class Service3TM(PUSTelemetry): self.hkHeader = ["Fix Mode", "SV in Fix", "GNSS Week", "Time of Week", "Latitude", "Longitude", "Mean Sea Altitude", "Position X", "Position Y", "Position Z", "Velocity X", "Velocity Y", "Velocity Z"] - fixMode = self.byteArrayData[4] - svInFix = self.byteArrayData[5] - gnssWeek = struct.unpack('>H', self.byteArrayData[6:8])[0] - timeOfWeek = struct.unpack('>I', self.byteArrayData[8:12])[0] - latitude = struct.unpack('>I', self.byteArrayData[12:16])[0] - longitude = struct.unpack('>I', self.byteArrayData[16:20])[0] - msa = struct.unpack('>I', self.byteArrayData[20:24])[0] - positionX = struct.unpack('>d', self.byteArrayData[24:32])[0] - positionY = struct.unpack('>d', self.byteArrayData[32:40])[0] - positionZ = struct.unpack('>d', self.byteArrayData[40:48])[0] - vx = struct.unpack('>d', self.byteArrayData[48:56])[0] - vy = struct.unpack('>d', self.byteArrayData[56:64])[0] - vz = struct.unpack('>d', self.byteArrayData[64:72])[0] + fixMode = self._tm_data[4] + svInFix = self._tm_data[5] + gnssWeek = struct.unpack('>H', self._tm_data[6:8])[0] + timeOfWeek = struct.unpack('>I', self._tm_data[8:12])[0] + latitude = struct.unpack('>I', self._tm_data[12:16])[0] + longitude = struct.unpack('>I', self._tm_data[16:20])[0] + msa = struct.unpack('>I', self._tm_data[20:24])[0] + positionX = struct.unpack('>d', self._tm_data[24:32])[0] + positionY = struct.unpack('>d', self._tm_data[32:40])[0] + positionZ = struct.unpack('>d', self._tm_data[40:48])[0] + vx = struct.unpack('>d', self._tm_data[48:56])[0] + vy = struct.unpack('>d', self._tm_data[56:64])[0] + vz = struct.unpack('>d', self._tm_data[64:72])[0] self.hkContent = [fixMode, svInFix, gnssWeek, timeOfWeek, latitude, longitude, msa, positionX, positionY, positionZ, vx, vy, vz] - self.validity_buffer = self.byteArrayData[72:] + self.validity_buffer = self._tm_data[72:] # print(self.validityBuffer) # print(str(format(self.validityBuffer[0], '#010b'))) # print(str(format(self.validityBuffer[1], '#010b'))) @@ -97,14 +97,14 @@ class Service3TM(PUSTelemetry): def handleTestHkData(self): self.numberOfParameters = 6 self.hkHeader = ["Bool", "UINT8", "UINT16", "UINT32", "FLOAT1", "FLOAT2"] - testBool = self.byteArrayData[4] - testUint8 = self.byteArrayData[5] - testUint16 = (self.byteArrayData[6] << 8) | self.byteArrayData[7] - testUint32 = struct.unpack('>I', self.byteArrayData[8:12])[0] - floatVector1 = struct.unpack('>f', self.byteArrayData[12:16])[0] - floatVector2 = struct.unpack('>f', self.byteArrayData[16:20])[0] + testBool = self._tm_data[4] + testUint8 = self._tm_data[5] + testUint16 = (self._tm_data[6] << 8) | self._tm_data[7] + testUint32 = struct.unpack('>I', self._tm_data[8:12])[0] + floatVector1 = struct.unpack('>f', self._tm_data[12:16])[0] + floatVector2 = struct.unpack('>f', self._tm_data[16:20])[0] self.hkContent = [testBool, testUint8, testUint16, testUint32, floatVector1, floatVector2] - self.validity_buffer = self.byteArrayData[20:] + self.validity_buffer = self._tm_data[20:] # print(self.validityBuffer) # print(str(format(self.validityBuffer[0], '#010b'))) # print("Validity Buffer Length: " + str(len(self.validityBuffer))) diff --git a/tm/obsw_tm_service_5.py b/tm/obsw_tm_service_5.py index 7c14811cc5e1dd22b9d7c8f736fe04df83025c2c..4df7aae5770e130bf0dcce11ef0f98d4a9e056c6 100644 --- a/tm/obsw_tm_service_5.py +++ b/tm/obsw_tm_service_5.py @@ -6,36 +6,36 @@ Description: Deserialize PUS Event Report Author: R. Mueller """ -from tm.obsw_pus_tm_generic import PUSTelemetry +from tm.obsw_pus_tm_base import PusTelemetry import struct -class Service5TM(PUSTelemetry): +class Service5TM(PusTelemetry): def __init__(self, byteArray): super().__init__(byteArray) - self.print_packet_info("Event") - if self.getSubservice() == 1: + self.specify_packet_info("Event") + if self.get_subservice() == 1: self.append_packet_info(" Info") - elif self.getSubservice() == 2: + elif self.get_subservice() == 2: self.append_packet_info(" Error Low Severity") - elif self.getSubservice() == 3: + elif self.get_subservice() == 3: self.append_packet_info(" Error Med Severity") - elif self.getSubservice() == 4: + elif self.get_subservice() == 4: self.append_packet_info(" Error High Severity") - self.eventId = struct.unpack('>H', self.byteArrayData[0:2])[0] - self.objectId = struct.unpack('>I', self.byteArrayData[2:6])[0] - self.param1 = struct.unpack('>I', self.byteArrayData[6:10])[0] - self.param2 = struct.unpack('>I', self.byteArrayData[10:14])[0] + self.eventId = struct.unpack('>H', self._tm_data[0:2])[0] + self.objectId = struct.unpack('>I', self._tm_data[2:6])[0] + self.param1 = struct.unpack('>I', self._tm_data[6:10])[0] + self.param2 = struct.unpack('>I', self._tm_data[10:14])[0] - def print_telemetry_header(self, array): - super().print_telemetry_header(array) + def append_telemetry_content(self, array): + super().append_telemetry_content(array) array.append(str(self.eventId)) array.append(hex(self.objectId)) array.append(str(hex(self.param1)) + ", " + str(self.param1)) array.append(str(hex(self.param2)) + ", " + str(self.param2)) - def print_telemetry_column_headers(self, array): - super().print_telemetry_column_headers(array) + def append_telemetry_column_headers(self, array): + super().append_telemetry_column_headers(array) array.append("Event ID") array.append("Reporter ID") array.append("Parameter 1") diff --git a/utility/obsw_args_parser.py b/utility/obsw_args_parser.py index 3ec95a74145228a08f5396196485460d9ccc5478..0da02b2c498658dbcd1e2981883bcc7cdf6f2a90 100644 --- a/utility/obsw_args_parser.py +++ b/utility/obsw_args_parser.py @@ -40,12 +40,12 @@ def parse_input_arguments(): '-o', '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) arg_parser.add_argument( - '-r', '--rawDataPrint', help='Supply -r to print all raw data directly', + '-r', '--rawDataPrint', help='Supply -r to print all raw _tm_data directly', action='store_true') arg_parser.add_argument( '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') arg_parser.add_argument( - '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') + '--hk', dest='print_hk', help='Supply -k or --hk to print HK _tm_data', action='store_true') arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') if len(sys.argv) == 1: @@ -90,7 +90,7 @@ def handle_unspecified_args(args) -> None: print("1: Listener Mode") print("2: Single Command Mode with manual command") print("3: Service Mode, Commands specified in tc folder") - print("4: Software Mode, runs all command specified in OBSW_TcPacker.py") + print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py") print("5: Unit Test, runs unit test specified in OBSW_UnitTest.py") args.mode = input("Please enter Mode: ") if args.mode == 1 and args.service is None: diff --git a/utility/obsw_tmtc_printer.py b/utility/obsw_tmtc_printer.py index 88240a8d628016d621ab20429530fc65fc6a693b..50593b1252a63268fb5364723cd651a5dcdc895e 100644 --- a/utility/obsw_tmtc_printer.py +++ b/utility/obsw_tmtc_printer.py @@ -12,9 +12,9 @@ import os import sys from typing import TypeVar from config import OBSW_Config as g -from tm.obsw_pus_tm_generic import PusTmT +from tm.obsw_pus_tm_base import PusTmT, PusTelemetry from tm.obsw_tm_service_3 import PusTm3T -from tc.OBSW_TcPacket import PusTcT, pusTcInfoT +from tc.obsw_pus_tc_base import PusTcT, PusTcInfoT TmTcPrinterT = TypeVar('TmTcPrinterT', bound='TmTcPrinter') @@ -37,7 +37,7 @@ class TmTcPrinter: self.do_print_to_file = doPrintToFile self.print_tc = printTc - def print_telemetry(self, packet: PusTmT): + def print_telemetry(self, packet: PusTelemetry): """ This function handles printing telemetry :param packet: @@ -49,51 +49,51 @@ class TmTcPrinter: self.__handle_long_print(packet) self.__handle_wiretapping_packet(packet) self.__handle_data_reply_packet(packet) - if packet.getService() == 3 and \ - (packet.getSubservice() == 25 or packet.getSubservice() == 26): + if packet.get_service() == 3 and \ + (packet.get_subservice() == 25 or packet.get_subservice() == 26): self.__handle_hk_print(packet) - if packet.getService() == 3 and \ - (packet.getSubservice() == 10 or packet.getSubservice() == 12): + if packet.get_service() == 3 and \ + (packet.get_subservice() == 10 or packet.get_subservice() == 12): self.__handle_hk_definition_print(packet) if g.G_PRINT_RAW_TM: - self.print_buffer = "TM Data:" + "\n" + self.return_data_string(packet.data) + self.print_buffer = "TM Data:" + "\n" + self.return_data_string(packet.get_tm_data()) print(self.print_buffer) self.add_print_buffer_to_file_buffer() - def __handle_short_print(self, tm_packet: PusTmT): - self.print_buffer = "Received TM[" + str(tm_packet.getService()) + "," + str( - tm_packet.getSubservice()) + "]" + def __handle_short_print(self, tm_packet: PusTelemetry): + self.print_buffer = "Received TM[" + str(tm_packet.get_service()) + "," + str( + tm_packet.get_subservice()) + "]" print(self.print_buffer) self.add_print_buffer_to_file_buffer() - def __handle_long_print(self, tm_packet: PusTmT): - self.print_buffer = "Received Telemetry: " + tm_packet.printInfo + def __handle_long_print(self, tm_packet: PusTelemetry): + self.print_buffer = "Received Telemetry: " + tm_packet.print_info print(self.print_buffer) self.add_print_buffer_to_file_buffer() self.__handle_column_header_print(tm_packet) - self.handle_tm_content_print(tm_packet) + self.__handle_tm_content_print(tm_packet) - def __handle_column_header_print(self, tm_packet: PusTmT): + def __handle_column_header_print(self, tm_packet: PusTelemetry): rec_pus = [] - tm_packet.print_telemetry_column_headers(rec_pus) + tm_packet.append_telemetry_column_headers(rec_pus) self.print_buffer = str(rec_pus) print(self.print_buffer) self.add_print_buffer_to_file_buffer() - def handle_tm_content_print(self, tm_packet: PusTmT): + def __handle_tm_content_print(self, tm_packet: PusTelemetry): """ :param tm_packet: :return: """ rec_pus = [] - tm_packet.print_telemetry_header(rec_pus) + tm_packet.append_telemetry_content(rec_pus) self.print_buffer = str(rec_pus) print(self.print_buffer) self.add_print_buffer_to_file_buffer() def __handle_hk_print(self, tm_packet: PusTmT): """ - Prints HK data previously set by TM receiver + Prints HK _tm_data previously set by TM receiver :param tm_packet: :return: """ @@ -162,23 +162,23 @@ class TmTcPrinter: :param packet: :return: """ - if packet.getService() == 2 and \ - (packet.getSubservice() == 131 or packet.getSubservice() == 130): + if packet.get_service() == 2 and \ + (packet.get_subservice() == 131 or packet.get_subservice() == 130): self.print_buffer = "Wiretapping Packet or Raw Reply from TM [" + \ - str(packet.getService()) + "," + str(packet.getSubservice()) + "]:" - self.print_buffer = self.print_buffer + self.return_data_string(packet.data) + str(packet.get_service()) + "," + str(packet.get_subservice()) + "]:" + self.print_buffer = self.print_buffer + packet.return_data_string() print(self.print_buffer) self.add_print_buffer_to_file_buffer() - def __handle_data_reply_packet(self, tm_packet): + def __handle_data_reply_packet(self, tm_packet: PusTelemetry): """ - Handles the PUS Service 8 data reply packets. + Handles the PUS Service 8 _tm_data reply packets. :param tm_packet: :return: """ - if tm_packet.getService() == 8 and tm_packet.getSubservice() == 130: - self.print_buffer = "Service 8 Direct Command Reply TM[8,130] with data: " \ - + self.return_data_string(tm_packet.data) + if tm_packet.get_service() == 8 and tm_packet.get_subservice() == 130: + self.print_buffer = "Service 8 Direct Command Reply TM[8,130] with _tm_data: " \ + + tm_packet.return_data_string() print(self.print_buffer) def print_data(self, byte_array: bytearray): @@ -250,7 +250,7 @@ class TmTcPrinter: shift_number = position + (6 - 2 * (position - 1)) return (byte >> shift_number) & 1 - def print_telecommand(self, tc_packet: PusTcT, tc_packet_info: pusTcInfoT = None): + def print_telecommand(self, tc_packet: PusTcT, tc_packet_info: PusTcInfoT = None): """ This function handles the printing of Telecommands :param tc_packet: @@ -269,26 +269,26 @@ class TmTcPrinter: else: self.__handle_long_tc_print(tc_packet_info) - def __handle_short_tc_print(self, tc_packet_info: pusTcInfoT): + def __handle_short_tc_print(self, tc_packet_info: PusTcInfoT): """ Brief TC print :param tc_packet_info: :return: """ - self.print_buffer = "Sent TC[" + str(tc_packet_info["G_SERVICE"]) + "," + \ + self.print_buffer = "Sent TC[" + str(tc_packet_info["service"]) + "," + \ str(tc_packet_info["subservice"]) + "] " + " with SSC " + \ str(tc_packet_info["ssc"]) print(self.print_buffer) self.add_print_buffer_to_file_buffer() - def __handle_long_tc_print(self, tc_packet_info: pusTcInfoT): + def __handle_long_tc_print(self, tc_packet_info: PusTcInfoT): """ Long TC print :param tc_packet_info: :return: """ try: - self.print_buffer = "Telecommand TC[" + str(tc_packet_info["G_SERVICE"]) + "," + \ + self.print_buffer = "Telecommand TC[" + str(tc_packet_info["service"]) + "," + \ str(tc_packet_info["subservice"]) + "] with SSC " + \ str(tc_packet_info["ssc"]) + " sent with data " + \ self.return_data_string(tc_packet_info["data"])