Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
OBSW_TcPacker.py 8.80 KiB
# -*- coding: utf-8 -*-
"""
Program: OBSW_TcPacker.py
Date: 01.11.2019
Description: Packs the TC queue for specific service or device testing

Manual:
Contains a service select factory which returns specific service or device tests.
Also contains an all service and all device tc queue packer.
Run script with -s <Service or Device> -m 3 with optional -p parameter for file output inside the log folder

@author: R. Mueller
"""
import os

from tc.OBSW_TcPacket import *
from tc.OBSW_TcService2 import packService2TestInto
from tc.OBSW_TcService3 import packService3TestInto
from tc.OBSW_TcService8 import packService8TestInto
from tc.OBSW_TcService200 import packModeData, packService200TestInto

from datetime import datetime
import queue


def serviceTestSelect(service, serviceQueue):
    """Dispatch to the packer function that matches the requested test.

    :param service: Test selector. The integers 2, 3, 5, 8, 9, 17 and 200
        select the matching service test; the strings "Dummy", "GPS0",
        "GPS1" and "Error" select the device and error tests.
    :param serviceQueue: Queue object handed to the selected packer.
    :return: Whatever the selected packer function returns.
    :raises SystemExit: If the selector is not recognised. The message
        "Invalid Service !" is printed before exiting.
    """
    # sys.exit() replaces the builtin exit(): the latter is only injected by
    # the site module (absent with "python -S" or in frozen builds) and is
    # meant for interactive sessions. Imported locally so this function does
    # not depend on the module header.
    import sys

    if service == 2:
        return packService2TestInto(serviceQueue)
    elif service == 3:
        return packService3TestInto(serviceQueue)
    elif service == 5:
        return packService5TestInto(serviceQueue)
    elif service == 8:
        return packService8TestInto(serviceQueue)
    elif service == 9:
        return packService9TestInto(serviceQueue)
    elif service == 17:
        return packService17TestInto(serviceQueue)
    elif service == 200:
        return packService200TestInto(serviceQueue)
    elif service == "Dummy":
        return packDummyDeviceTestInto(serviceQueue)
    elif service == "GPS0":
        # Object ID: GPS Device 0
        objectId = bytearray([0x44, 0x00, 0x1F, 0x00])
        return packGpsTestInto(objectId, serviceQueue)
    elif service == "GPS1":
        # Object ID: GPS Device 1
        objectId = bytearray([0x44, 0x00, 0x20, 0x00])
        return packGpsTestInto(objectId, serviceQueue)
    elif service == "Error":
        return packErrorTestingInto(serviceQueue)
    else:
        print("Invalid Service !")
        # Same exit status as before (no explicit code), just a reliable call.
        sys.exit()


def packService5TestInto(tcQueue):
    """Append the Service 5 test sequence to tcQueue.

    The sequence is: disable event, trigger event, enable event, trigger
    another event. Every telecommand tuple is preceded by a ("print", text)
    entry announcing it, and the sequence ends with an ("export", path) entry
    naming the Service 5 log file.

    :param tcQueue: Queue-like object with a put() method.
    :return: The same tcQueue object.
    """
    tcQueue.put(("print", "Testing Service 5"))
    # One row per telecommand, in queue order:
    # (announcement, service, subservice, source sequence count)
    testSteps = (
        ("\r\nTesting Service 5: Disable event", 5, 6, 500),
        ("\r\nTesting Service 5: Trigger event", 17, 128, 510),
        ("\r\nTesting Service 5: Enable event", 5, 5, 520),
        ("\r\nTesting Service 5: Trigger another event", 17, 128, 530),
    )
    for announcement, serviceId, subserviceId, sequenceCount in testSteps:
        tcQueue.put(("print", announcement))
        telecommand = PUSTelecommand(
            service=serviceId, subservice=subserviceId, SSC=sequenceCount
        )
        tcQueue.put(telecommand.packCommandTuple())
    tcQueue.put(("print", "\r"))
    tcQueue.put(("export", "log/tmtc_log_service5.txt"))
    return tcQueue


def packService9TestInto(tcQueue):
    """Append the Service 9 (time setting) test sequence to tcQueue.

    Three null-terminated ASCII time strings are sent with service 9,
    subservice 128: a fixed calendar-date string ("timecode A"), a fixed
    day-of-year string ("timecode B") and the current time. All three strings
    are also printed to stdout while the queue is being packed.

    The current time is taken in UTC, because the string carries the "Z"
    suffix (the ISO 8601 UTC designator). It is always formatted with six
    fractional digits so its length does not depend on the clock value.

    :param tcQueue: Queue-like object with a put() method.
    :return: The same tcQueue object.
    """
    # Local import: the module header only imports the datetime class.
    from datetime import timezone

    tcQueue.put(("print", "Testing Service 9"))
    timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0'
    timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0'
    # datetime.now() without a timezone returns local time, which must not be
    # labelled "Z". strftime with %f always emits the microsecond field,
    # unlike isoformat(), which drops it when it is zero.
    utcNow = datetime.now(timezone.utc)
    timeTestCurrentTime = utcNow.strftime('%Y-%m-%dT%H:%M:%S.%f') + "Z" + '\0'
    timeTest1 = timeTestASCIICodeA.encode('ascii')
    timeTest2 = timeTestASCIICodeB.encode('ascii')
    timeTest3 = timeTestCurrentTime.encode('ascii')
    print("Time Code 1 :" + str(timeTest1))
    print("Time Code 2 :" + str(timeTest2))
    print("Time Code 3 :" + str(timeTest3) + "\r")
    # time setting
    tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode A"))
    command = PUSTelecommand(service=9, subservice=128, SSC=900, data=timeTest1)
    tcQueue.put(command.packCommandTuple())
    tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode B"))
    command = PUSTelecommand(service=9, subservice=128, SSC=910, data=timeTest2)
    tcQueue.put(command.packCommandTuple())
    tcQueue.put(("print", "\r\nTesting Service 9: Testing timecode Current Time"))
    command = PUSTelecommand(service=9, subservice=128, SSC=920, data=timeTest3)
    tcQueue.put(command.packCommandTuple())
    # TODO: Add other time formats here
    tcQueue.put(("print", "\r"))
    tcQueue.put(("export", "log/tmtc_log_service9.txt"))
    return tcQueue


def packService17TestInto(tcQueue):
    """Append the Service 17 test sequence to tcQueue.

    The sequence is a ping test, an enable-event command (service 5) and a
    trigger-event command. Each telecommand tuple is preceded by a
    ("print", text) entry, and the sequence ends with an ("export", path)
    entry naming the Service 17 log file.

    :param tcQueue: Queue-like object with a put() method.
    :return: The same tcQueue object.
    """

    def announceAndQueue(text, **telecommandFields):
        # Queue the announcement first, then the packed telecommand.
        tcQueue.put(("print", text))
        tcQueue.put(PUSTelecommand(**telecommandFields).packCommandTuple())

    tcQueue.put(("print", "Testing Service 17"))
    announceAndQueue("\n\rTesting Service 17: Ping Test",
                     service=17, subservice=1, SSC=1700)
    announceAndQueue("\n\rTesting Service 17: Enable Event",
                     service=5, subservice=5, SSC=52)
    announceAndQueue("\n\rTesting Service 17: Trigger event",
                     service=17, subservice=128, SSC=1701)
    tcQueue.put(("print", "\r"))
    tcQueue.put(("export", "log/tmtc_log_service17.txt"))
    return tcQueue


def packDummyDeviceTestInto(tcQueue):
    """Append the dummy device test sequence to tcQueue.

    The sequence switches the dummy device on with a service 200 mode
    command, then appends the Service 2 and Service 8 tests. It ends with an
    ("export", path) entry naming the dummy device log file.

    :param tcQueue: Queue-like object with a put() method.
    :return: The same tcQueue object.
    """
    # Object ID: Dummy Device
    dummyObjectId = bytearray([0x44, 0x00, 0xAF, 0xFE])
    tcQueue.put(("print", "Testing Dummy Device"))

    # Set On Mode
    tcQueue.put(("print", "\n\rTesting Service Dummy: Set On"))
    setOnCommand = PUSTelecommand(
        service=200, subservice=1, SSC=1,
        data=packModeData(dummyObjectId, 1, 0),
    )
    tcQueue.put(setOnCommand.packCommandTuple())

    # Service 2 and Service 8 tests. The second argument (True) is passed
    # through unchanged; its meaning is defined by the imported packers.
    subTests = (
        ("\n\rTesting Service Dummy: Service 2", packService2TestInto),
        ("\n\rTesting Service Dummy: Service 8", packService8TestInto),
    )
    for banner, packInto in subTests:
        tcQueue.put(("print", banner))
        packInto(tcQueue, True)

    tcQueue.put(("print", "\r"))
    tcQueue.put(("export", "log/tmtc_log_service_dummy.txt"))
    return tcQueue


def packGpsTestInto(objectId, tcQueue):
    """Append the test sequence for one GPS device to tcQueue.

    The sequence is: set mode off, set mode on, enable housekeeping (HK)
    reporting, wait 5 (a ("wait", 5) entry), disable HK reporting, set mode
    off. It ends with an ("export", path) entry naming the device log file.

    :param objectId: Four-byte object ID of the GPS device, as any bytes-like
        object or sequence of ints. Known IDs are 0x44001F00 (GPS0) and
        0x44002000 (GPS1).
    :param tcQueue: Queue-like object with a put() method.
    :return: The same tcQueue object.
    :raises ValueError: If objectId is not one of the known GPS object IDs.
        Nothing is queued in that case.
    """
    # object ID -> (device name, HK set ID, SSC of the HK enable command)
    # NOTE(review): GPS0 reuses byte 0x1f in its set ID, but GPS1 (object ID
    # byte 0x20) uses 0x2f. Kept as in the original - confirm it is intended.
    gpsDevices = {
        bytes([0x44, 0x00, 0x1F, 0x00]):
            ("GPS0", bytearray([0x00, 0x00, 0x1f, 0x00]), 13),
        bytes([0x44, 0x00, 0x20, 0x00]):
            ("GPS1", bytearray([0x00, 0x00, 0x2f, 0x00]), 14),
    }
    # Normalising to bytes makes name lookup and HK selection agree for every
    # input type. Unknown IDs fail here with a clear error instead of an
    # unbound-variable error further down.
    objectIdKey = bytes(objectId)
    try:
        gpsString, sidGps, enableHkSsc = gpsDevices[objectIdKey]
    except KeyError:
        raise ValueError(
            "Unknown GPS object ID: 0x" + objectIdKey.hex()
        ) from None

    tcQueue.put(("print", "Testing " + gpsString + " Device"))
    # Set Mode Off
    tcQueue.put(("print", "\n\rTesting  " + gpsString + ": Set Off"))
    modeData = packModeData(objectId, 0, 0)
    command = PUSTelecommand(service=200, subservice=1, SSC=11, data=modeData)
    tcQueue.put(command.packCommandTuple())
    # Set Mode On
    tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set On"))
    modeData = packModeData(objectId, 1, 0)
    command = PUSTelecommand(service=200, subservice=1, SSC=12, data=modeData)
    tcQueue.put(command.packCommandTuple())
    # Enable HK report
    tcQueue.put(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting"))
    command = PUSTelecommand(service=3, subservice=5, SSC=enableHkSsc, data=sidGps)
    tcQueue.put(command.packCommandTuple())
    # pack wait interval until mode is on and a few gps replies have been received
    tcQueue.put(("wait", 5))
    # Disable HK reporting
    tcQueue.put(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition"))
    command = PUSTelecommand(service=3, subservice=6, SSC=15, data=sidGps)
    tcQueue.put(command.packCommandTuple())
    # Set Mode Off
    # NOTE(review): SSC 13 is also used by the GPS0 HK enable command above.
    # Kept as in the original - confirm whether a distinct SSC is wanted.
    tcQueue.put(("print", "\n\rTesting " + gpsString + ": Set Off"))
    modeData = packModeData(objectId, 0, 0)
    command = PUSTelecommand(service=200, subservice=1, SSC=13, data=modeData)
    tcQueue.put(command.packCommandTuple())
    tcQueue.put(("print", "\r"))
    tcQueue.put(("export", "log/tmtc_log_service_" + gpsString + ".txt"))
    return tcQueue


def packErrorTestingInto(tcQueue):
    """Append the two error-testing telecommands to tcQueue.

    Both are service 17 commands: subservice 129 ("a lot of events") followed
    by subservice 130 ("a lot of ping testing"). No print or export entries
    are queued.

    :param tcQueue: Queue-like object with a put() method.
    :return: The same tcQueue object.
    """
    # (subservice, source sequence count), in queue order
    stressCommands = ((129, 2010), (130, 2020))
    for subserviceId, sequenceCount in stressCommands:
        telecommand = PUSTelecommand(
            service=17, subservice=subserviceId, SSC=sequenceCount
        )
        tcQueue.put(telecommand.packCommandTuple())
    return tcQueue


def createTotalTcQueue():
    """Build one queue containing the combined service and device tests.

    Creates the "log" directory in the current working directory if it is
    missing, then runs the packers in this order: Service 2, 5, 8, 9, 17,
    200, the dummy device and GPS0.

    NOTE(review): the Service 3 and GPS1 packers are not part of this
    sequence - confirm that the omission is intended.

    :return: A queue.Queue filled by the packers listed above.
    :raises OSError: If the "log" directory cannot be created, for example
        because a regular file with that name already exists.
    """
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs("log", exist_ok=True)

    tcQueue = queue.Queue()
    servicePackers = (
        packService2TestInto,
        packService5TestInto,
        packService8TestInto,
        packService9TestInto,
        packService17TestInto,
        packService200TestInto,
        packDummyDeviceTestInto,
    )
    for packInto in servicePackers:
        # Use the returned queue, as each packer returns the queue it filled.
        tcQueue = packInto(tcQueue)

    # Object ID: GPS Device 0
    objectId = bytearray([0x44, 0x00, 0x1F, 0x00])
    tcQueue = packGpsTestInto(objectId, tcQueue)
    return tcQueue