From 640efd8de09292bc4255679d5e1ce1ef78d4b20e Mon Sep 17 00:00:00 2001 From: Robin Mueller <robin.mueller.m@gmail.com> Date: Thu, 12 Mar 2020 12:02:46 +0100 Subject: [PATCH] introducing strong typing hints --- comIF/OBSW_ComInterface.py | 54 +++++++++++++------ comIF/OBSW_DummyComIF.py | 4 +- comIF/OBSW_Serial_ComIF.py | 41 +++++++++----- .../OBSW_SingleCommandSenderReceiver.py | 2 +- sendreceive/OBSW_TmListener.py | 2 +- tc/OBSW_TcPacker.py | 24 +++++---- tc/OBSW_TcPacket.py | 15 ++++-- tc/OBSW_TcService200.py | 5 +- test/OBSW_UnitTest.py | 2 +- tm/OBSW_PusPacket.py | 5 +- tm/OBSW_PusTm.py | 13 +++-- utility/OBSW_TmTcPrinter.py | 11 ++-- 12 files changed, 116 insertions(+), 62 deletions(-) diff --git a/comIF/OBSW_ComInterface.py b/comIF/OBSW_ComInterface.py index 5121548..b5c8373 100644 --- a/comIF/OBSW_ComInterface.py +++ b/comIF/OBSW_ComInterface.py @@ -8,16 +8,18 @@ Description: Generic Communication Interface. Defines the syntax of the communic @author: R. Mueller """ from abc import abstractmethod -from typing import TypeVar, Tuple, Union, List, Deque -from tm.OBSW_PusTm import pusPacketT, pusPacketInfoT +from typing import TypeVar, Tuple, Union, List +from tm.OBSW_PusTm import PusTmT, pusTmQueueT, pusTmTupleQueueT, pusTmInfoQueueT +from utility.OBSW_TmTcPrinter import TmTcPrinterT +from tc.OBSW_TcPacket import PusTcT, pusTcInfoT ComIF_T = TypeVar('ComIF_T', bound='CommunicationInterface') -pusTupleQueueT = Deque[Tuple[pusPacketT, pusPacketInfoT]] -packetListT = List[pusPacketT] + +packetListT = List[PusTmT] class CommunicationInterface: - def __init__(self, tmtcPrinter): + def __init__(self, tmtcPrinter: TmTcPrinterT): self.tmtcPrinter = tmtcPrinter @abstractmethod @@ -25,7 +27,7 @@ class CommunicationInterface: pass @abstractmethod - def sendTelecommand(self, tcPacket, tcPacketInfo="") -> None: + def sendTelecommand(self, tcPacket: PusTcT, tcPacketInfo: pusTcInfoT = None) -> None: """ Send telecommands :param tcPacket: TC packet to send @@ -35,7 +37,7 @@ class CommunicationInterface: pass @abstractmethod - def receiveTelemetry(self, parameters=0): + def receiveTelemetry(self, parameters: any = 0): """ Returns a list of packets. Most of the time, this will simply call the pollInterface function :param parameters: @@ -45,7 +47,7 @@ class CommunicationInterface: return packetList @abstractmethod - def pollInterface(self, parameters) -> Tuple[bool, packetListT]: + def pollInterface(self, parameters: any = 0) -> Tuple[bool, packetListT]: """ Poll the interface and return a list of received packets :param parameters: @@ -55,7 +57,7 @@ class CommunicationInterface: pass @abstractmethod - def dataAvailable(self, parameters): + def dataAvailable(self, parameters: any): """ Check whether data is available :param parameters: @@ -63,13 +65,35 @@ class CommunicationInterface: """ pass - # Receive Telemetry and store it into a queue - def receiveTelemetryAndStoreIntoQueue(self, tmQueue): - pass + def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]: + """ + Receive telemetry and store packet information in dict format into a queue. + If no data is available, don't do anything. + :param tmInfoQueue: + :return: + """ + print("No child implementation provided yet!") + return None - # Receive Telemetry and store a tuple consisting of TM information and TM packet into queue - def receiveTelemetryAndStoreTuple(self, tmTupleQueue) -> Union[None, pusTupleQueueT]: - pass + def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: + """ + Receive telemetry packets and store TM object into a queue. + If no data is available, don't do anything. + :param tmQueue: + :return: + """ + print("No child implementation provided yet!") + return None + + def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]: + """ + Poll telemetry and store a tuple consisting of TM information and the TM packet into the queue. + If no data is available, don't do anything. + :param tmTupleQueue: + :return: + """ + print("No child implementation provided yet!") + return None diff --git a/comIF/OBSW_DummyComIF.py b/comIF/OBSW_DummyComIF.py index a5b8149..24795c9 100644 --- a/comIF/OBSW_DummyComIF.py +++ b/comIF/OBSW_DummyComIF.py @@ -17,10 +17,10 @@ class DummyComIF(CommunicationInterface): def dataAvailable(self, parameters): pass - def pollInterface(self, parameters) -> Tuple[bool, list]: + def pollInterface(self, parameters: any = 0) -> Tuple[bool, list]: pass - def receiveTelemetry(self, parameters=0): + def receiveTelemetry(self, parameters: any = 0): pass def sendTelecommand(self, tcPacket, tcPacketInfo="") -> None: diff --git a/comIF/OBSW_Serial_ComIF.py b/comIF/OBSW_Serial_ComIF.py index 41cd950..30d5be5 100644 --- a/comIF/OBSW_Serial_ComIF.py +++ b/comIF/OBSW_Serial_ComIF.py @@ -8,9 +8,13 @@ import time import serial import logging -from typing import Tuple, Union, Deque -from comIF.OBSW_ComInterface import CommunicationInterface, pusTupleQueueT, packetListT +from typing import Tuple, List, Union +from comIF.OBSW_ComInterface import CommunicationInterface, pusTmQueueT, packetListT from tm.OBSW_TmPacket import PUSTelemetryFactory +from tm.OBSW_PusTm import pusTmInfoQueueT, pusTmTupleQueueT +from tc.OBSW_TcPacket import pusTcInfoT + +SERIAL_PACKET_MAX_SIZE = 1024 class SerialComIF(CommunicationInterface): @@ -34,7 +38,7 @@ class SerialComIF(CommunicationInterface): logging.exception("Error") exit() - def sendTelecommand(self, tcPacket, tcPacketInfo=""): + def sendTelecommand(self, tcPacket: bytearray, tcPacketInfo: pusTcInfoT = None): self.tmtcPrinter.printTelecommand(tcPacket, tcPacketInfo) self.serial.write(tcPacket) @@ -45,7 +49,7 @@ class SerialComIF(CommunicationInterface): else: return [] - def pollInterface(self, parameter=0) -> Tuple[bool, packetListT]: + def pollInterface(self, parameter: int = 0) -> Tuple[bool, packetListT]: if self.dataAvailable(): pusDataList, numberOfPackets = self.pollPusPackets() packetList = [] @@ -57,7 +61,7 @@ class SerialComIF(CommunicationInterface): else: return False, [] - def dataAvailable(self, timeout=0): + def dataAvailable(self, timeout: float = 0) -> bool: if self.serial.in_waiting > 0: return True elif timeout > 0: @@ -69,7 +73,7 @@ class SerialComIF(CommunicationInterface): elapsed_time = time.time() - start_time return False - def pollPusPackets(self): + def pollPusPackets(self) -> Tuple[List[bytearray], int]: pusDataList = [] self.data = self.serial.read(1024) packetSize = (self.data[4] << 8 | self.data[5]) + 7 @@ -84,7 +88,7 @@ class SerialComIF(CommunicationInterface): pusDataList.append(self.data) return pusDataList, self.numberOfPackets - def handleMultiplePackets(self, packetSize, readSize, pusDataList): + def handleMultiplePackets(self, packetSize: int, readSize: int, pusDataList: list): endOfBuffer = readSize - 1 endIndex = packetSize startIndex = 0 @@ -93,32 +97,41 @@ class SerialComIF(CommunicationInterface): while endIndex < endOfBuffer: endIndex = self.parseNextPackets(endIndex, pusDataList) - def parseNextPackets(self, endIndex, pusDataList): + def parseNextPackets(self, endIndex: int, pusDataList: list) -> int: startIndex = endIndex endIndex = endIndex + 5 nextPacketSize = (self.data[endIndex - 1] << 8 | self.data[endIndex]) + 7 - if nextPacketSize > 1024: + if nextPacketSize > SERIAL_PACKET_MAX_SIZE: print("PUS Polling: Very Large packet detected, large packet reading not implemented yet !") print("Detected Size: " + str(nextPacketSize)) - return pusDataList, self.numberOfPackets + return SERIAL_PACKET_MAX_SIZE endIndex = startIndex + nextPacketSize pusData = self.data[startIndex:endIndex] pusDataList.append(pusData) self.numberOfPackets = self.numberOfPackets + 1 return endIndex - def receiveTelemetryAndStoreIntoQueue(self, tmQueue): + def receiveTelemetryAndStoreTm(self, tmQueue: pusTmQueueT) -> Union[None, pusTmQueueT]: + (packetReceived, pusPackets) = self.pollInterface() + if packetReceived: + for packet in pusPackets: + tmQueue.append(packet) + return tmQueue + + def receiveTelemetryAndStoreInfo(self, tmInfoQueue: pusTmInfoQueueT) -> Union[None, pusTmInfoQueueT]: (packetReceived, pusPackets) = self.pollInterface() if packetReceived: for packet in pusPackets: - tmQueue.put(packet) + tmInfo = packet.packTmInformation() + tmInfoQueue.append(tmInfo) + return tmInfoQueue - def receiveTelemetryAndStoreTuple(self, tmTupleQueue: Deque) -> pusTupleQueueT: + def receiveTelemetryAndStoreTuple(self, tmTupleQueue: pusTmTupleQueueT) -> Union[None, pusTmTupleQueueT]: (packetReceived, pusPackets) = self.pollInterface() if packetReceived: for packet in pusPackets: tmInfo = packet.packTmInformation() - tmTupleQueue.append((packet, tmInfo)) + tmTupleQueue.append((tmInfo, packet)) return tmTupleQueue diff --git a/sendreceive/OBSW_SingleCommandSenderReceiver.py b/sendreceive/OBSW_SingleCommandSenderReceiver.py index 4f0266a..0c670db 100644 --- a/sendreceive/OBSW_SingleCommandSenderReceiver.py +++ b/sendreceive/OBSW_SingleCommandSenderReceiver.py @@ -39,7 +39,7 @@ class SingleCommandSenderReceiver(CommandSenderReceiver): exit() def sendSingleTcAndReceiveTm(self): - self.tmListener.modeId = g.modeList[2] + self.tmListener.modeId = g.modeList.SingleCommandMode self.tmListener.modeChangeEvent.set() self.comInterface.sendTelecommand(self.pusPacket, self.pusPacketInfo) while not self.replyReceived: diff --git a/sendreceive/OBSW_TmListener.py b/sendreceive/OBSW_TmListener.py index 2c83842..01848bb 100644 --- a/sendreceive/OBSW_TmListener.py +++ b/sendreceive/OBSW_TmListener.py @@ -90,7 +90,7 @@ class TmListener: print("TM Listener: Reply sequence received!") self.replyEvent.set() elif self.modeId == g.modeList.UnitTest: - self.comInterface.receiveTelemetryAndStoreTuple(self.tmInfoQueue) + self.tmInfoQueue = self.comInterface.receiveTelemetryAndStoreTuple(self.tmInfoQueue) def checkForOneTelemetrySequence(self) -> bool: """ diff --git a/tc/OBSW_TcPacker.py b/tc/OBSW_TcPacker.py index 5d2142e..5f11e00 100644 --- a/tc/OBSW_TcPacker.py +++ b/tc/OBSW_TcPacker.py @@ -13,7 +13,7 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file """ import os -from tc.OBSW_TcPacket import PUSTelecommand +from tc.OBSW_TcPacket import PUSTelecommand, pusTcTupleT from tc.OBSW_TcService2 import packService2TestInto from tc.OBSW_TcService3 import packService3TestInto from tc.OBSW_TcService8 import packService8TestInto @@ -21,10 +21,14 @@ from tc.OBSW_TcService200 import packModeData, packService200TestInto import config.OBSW_Config as g from datetime import datetime from collections import deque -from typing import Deque,Union +from typing import Deque, Union, Tuple +tcAuxiliaryTupleT = Tuple[str, any] +# Union: Type can be either of those 2 +tcQueueT = Deque[Union[tcAuxiliaryTupleT, pusTcTupleT]] -def serviceTestSelect(service: Union[int, str], serviceQueue: Deque) -> Deque: + +def serviceTestSelect(service: Union[int, str], serviceQueue: tcQueueT) -> tcQueueT: if service == 2: return packService2TestInto(serviceQueue) elif service == 3: @@ -56,7 +60,7 @@ def serviceTestSelect(service: Union[int, str], serviceQueue: Deque) -> Deque: exit() -def packService5TestInto(tcQueue: Deque): +def packService5TestInto(tcQueue: tcQueueT) -> tcQueueT: tcQueue.append(("print", "Testing Service 5")) # disable events tcQueue.append(("print", "\r\nTesting Service 5: Disable event")) @@ -79,7 +83,7 @@ def packService5TestInto(tcQueue: Deque): return tcQueue -def packService9TestInto(tcQueue: Deque) -> Deque: +def packService9TestInto(tcQueue: tcQueueT) -> tcQueueT: tcQueue.append(("print", "Testing Service 9")) timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0' timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0' @@ -106,7 +110,7 @@ def packService9TestInto(tcQueue: Deque) -> Deque: return tcQueue -def packService17TestInto(tcQueue: Deque) -> Deque: +def packService17TestInto(tcQueue: tcQueueT) -> tcQueueT: tcQueue.append(("print", "Testing Service 17")) # ping test tcQueue.append(("print", "\n\rTesting Service 17: Ping Test")) @@ -125,7 +129,7 @@ def packService17TestInto(tcQueue: Deque) -> Deque: return tcQueue -def packDummyDeviceTestInto(tcQueue: Deque) -> Deque: +def packDummyDeviceTestInto(tcQueue: tcQueueT) -> tcQueueT: tcQueue.append(("print", "Testing Dummy Device")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) @@ -145,7 +149,7 @@ def packDummyDeviceTestInto(tcQueue: Deque) -> Deque: return tcQueue -def packGpsTestInto(objectId: bytearray, tcQueue: Deque) -> Deque: +def packGpsTestInto(objectId: bytearray, tcQueue: tcQueueT) -> tcQueueT: if objectId == g.GPS0_ObjectId: gpsString = "GPS0" elif objectId == g.GPS1_ObjectId: @@ -191,7 +195,7 @@ def packGpsTestInto(objectId: bytearray, tcQueue: Deque) -> Deque: return tcQueue -def packErrorTestingInto(tcQueue: Deque) -> Deque: +def packErrorTestingInto(tcQueue: tcQueueT) -> tcQueueT: # a lot of events command = PUSTelecommand(service=17, subservice=129, SSC=2010) tcQueue.append(command.packCommandTuple()) @@ -201,7 +205,7 @@ def packErrorTestingInto(tcQueue: Deque) -> Deque: return tcQueue -def createTotalTcQueue(): +def createTotalTcQueue() -> tcQueueT: if not os.path.exists("log"): os.mkdir("log") tcQueue = deque() diff --git a/tc/OBSW_TcPacket.py b/tc/OBSW_TcPacket.py index 02ae70d..bfa6e5a 100644 --- a/tc/OBSW_TcPacket.py +++ b/tc/OBSW_TcPacket.py @@ -7,13 +7,18 @@ Created on Wed Apr 4 11:43:00 2018 import crcmod import logging +from typing import TypeVar, Dict, Tuple, Union + +PusTcT = TypeVar('PusTcT', bound='PUSTelecommand') +pusTcInfoT = Union[Dict[str, any], None] +pusTcTupleT = Tuple[pusTcInfoT, bytearray] class PUSTelecommand: headerSize = 6 - def __init__(self, service: int, subservice: int, SSC=0, data=bytearray([]), sourceId=0, version=0, - packetType=1, dataFieldHeaderFlag=1, apid=0x73, length=0): + def __init__(self, service: int, subservice: int, SSC=0, data=bytearray([]), sourceId: int = 0, version: int = 0, + packetType: int = 1, dataFieldHeaderFlag: int = 1, apid: int = 0x73, length: int = 0): self.packetId = [0x0, 0x0] self.packetId[0] = ((version << 5) & 0xE0) | ((packetType & 0x01) << 4) | \ ((dataFieldHeaderFlag & 0x01) << 3) | ((apid & 0x700) >> 8) @@ -37,7 +42,7 @@ class PUSTelecommand: logging.exception("Error") exit() - def pack(self): + def pack(self) -> bytearray: dataToPack = bytearray() dataToPack.append(self.packetId[0]) dataToPack.append(self.packetId[1]) @@ -58,7 +63,7 @@ class PUSTelecommand: dataToPack.append(crc & 0xFF) return dataToPack - def packInformation(self): + def packInformation(self) -> pusTcInfoT: tcInformation = { "service": self.service, "subservice": self.subservice, @@ -68,7 +73,7 @@ class PUSTelecommand: } return tcInformation - def packCommandTuple(self): + def packCommandTuple(self) -> pusTcTupleT: commandTuple = (self.packInformation(), self.pack()) return commandTuple diff --git a/tc/OBSW_TcService200.py b/tc/OBSW_TcService200.py index a9c545c..b37942a 100644 --- a/tc/OBSW_TcService200.py +++ b/tc/OBSW_TcService200.py @@ -12,11 +12,12 @@ Run script with -s <Service or Device> -m 3 with optional -p parameter for file @author: R. Mueller """ from tc.OBSW_TcPacket import PUSTelecommand +from tc.OBSW_TcPacker import tcQueueT from typing import Deque import struct -def packService200TestInto(tcQueue: Deque) -> Deque: +def packService200TestInto(tcQueue: tcQueueT) -> tcQueueT: tcQueue.append(("print", "Testing Service 200")) # Object ID: Dummy Device objectId = bytearray([0x44, 0x00, 0xAF, 0xFE]) @@ -45,7 +46,7 @@ def packService200TestInto(tcQueue: Deque) -> Deque: # Mode 0: Off, Mode 1: Mode On, Mode 2: Mode Normal, Mode 3: Mode Raw -def packModeData(objectId, mode_, submode_): +def packModeData(objectId: bytearray, mode_: int, submode_: int) -> bytearray: # Normal mode mode = struct.pack(">I", mode_) # Submode default diff --git a/test/OBSW_UnitTest.py b/test/OBSW_UnitTest.py index 2c84fe5..1bc0960 100644 --- a/test/OBSW_UnitTest.py +++ b/test/OBSW_UnitTest.py @@ -181,7 +181,7 @@ class TestService(unittest.TestCase): def analyseService5or17TM(self, tmInfoQueue, assertionDict): self.miscCounter = 0 while not tmInfoQueue.empty(): - currentTmInfo = tmInfoQueue.get() + currentTmInfo = tmInfoQueue.pop() # Tc verification scanning is generic and has been moved to the superclass if currentTmInfo["service"] == 1: self.scanForRespectiveTc(currentTmInfo) diff --git a/tm/OBSW_PusPacket.py b/tm/OBSW_PusPacket.py index 000e8cc..dea7bf9 100644 --- a/tm/OBSW_PusPacket.py +++ b/tm/OBSW_PusPacket.py @@ -111,7 +111,10 @@ class OBSWPusPacket: byteArray = byteArray[12:] self.data = byteArray[:len(byteArray)-2] self.crc = byteArray[len(byteArray)-2] << 8 | byteArray[len(byteArray)-1] - + + def getRawPacket(self) -> bytearray: + return self.packetRaw + def printPusPacketHeader(self, array): self.dataFieldHeader.printDataFieldHeader(array) self.PUSHeader.printPusPacketHeader(array) diff --git a/tm/OBSW_PusTm.py b/tm/OBSW_PusTm.py index 591ff3b..7e622c1 100644 --- a/tm/OBSW_PusTm.py +++ b/tm/OBSW_PusTm.py @@ -1,8 +1,13 @@ from tm.OBSW_PusPacket import OBSWPusPacket -from typing import TypeVar, Dict +from typing import TypeVar, Dict, Tuple, Deque, Union -pusPacketT = TypeVar('pusPacketT', bound='PUSTelemetry') -pusPacketInfoT = Dict[str, any] +PusTmT = TypeVar('PusTmT', bound='PUSTelemetry') +pusTmInfoT = Dict[str, any] +pusTmTupleT = Tuple[pusTmInfoT, PusTmT] + +pusTmQueueT = Deque[PusTmT] +pusTmInfoQueueT = Deque[pusTmInfoT] +pusTmTupleQueueT = Deque[pusTmTupleT] class PUSTelemetry(OBSWPusPacket): @@ -26,7 +31,7 @@ class PUSTelemetry(OBSWPusPacket): def printTelemetryColumnHeaders(self, array): super().printPusPacketHeaderColumnHeaders(array) - def packTmInformation(self) -> pusPacketInfoT: + def packTmInformation(self) -> pusTmInfoT: tmInformation = { "service": self.getService(), "subservice": self.getSubservice(), diff --git a/utility/OBSW_TmTcPrinter.py b/utility/OBSW_TmTcPrinter.py index e7970ce..19b0812 100644 --- a/utility/OBSW_TmTcPrinter.py +++ b/utility/OBSW_TmTcPrinter.py @@ -170,12 +170,12 @@ class TmTcPrinter: return (byte >> shiftNumber) & 1 # This function handles the printing of Telecommands - def printTelecommand(self, pusPacket, pusPacketInfo=""): + def printTelecommand(self, pusPacket, pusPacketInfo=None): if self.printTc: if len(pusPacket) == 0: print("TMTC Printer: Empty packet was sent, configuration error") exit() - if pusPacketInfo == "": + if pusPacketInfo is None: print("TMTC Printer: No packet info supplied to print") return if self.displayMode == "short": @@ -191,10 +191,9 @@ class TmTcPrinter: def handleLongTcPrint(self, pusPacketInfo): try: - self.printBuffer = "Telecommand TC[" + str(pusPacketInfo["service"]) + "," + str( - pusPacketInfo["subservice"]) \ - + "] with SSC " + str(pusPacketInfo["ssc"]) + " sent with data " \ - + self.returnDataString(pusPacketInfo["data"]) + self.printBuffer = "Telecommand TC[" + str(pusPacketInfo["service"]) + "," + \ + str(pusPacketInfo["subservice"]) + "] with SSC " + str(pusPacketInfo["ssc"]) \ + + " sent with data " + self.returnDataString(pusPacketInfo["data"]) print(self.printBuffer) self.addPrintBufferToFileBuffer() except TypeError: -- GitLab