Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
obsw_pus_tc_packer.py 9.37 KiB
# -*- coding: utf-8 -*-
"""
Program: obsw_module_test.py
Date: 01.11.2019
Description: Packs the TC queue for specific G_SERVICE or device testing

Manual:
Contains a G_SERVICE select factory which returns specific G_SERVICE or device tests.
Also contains an all G_SERVICE and all device tc queue packer.
Run script with -s <Service or Device> -m 3 with optional -p parameter for file output inside the log folder

@author: R. Mueller
"""
import os

from tc.obsw_pus_tc_base import PusTelecommand, TcQueueT
from tc.OBSW_TcService2 import packService2TestInto
from tc.OBSW_TcService3 import packService3TestInto
from tc.OBSW_TcService8 import packService8TestInto
from tc.OBSW_TcService200 import packModeData, packService200TestInto
import config.OBSW_Config as g
from datetime import datetime
from collections import deque
from typing import Union


def service_test_select(service: Union[int, str], serviceQueue: TcQueueT) -> TcQueueT:
    if service == 2:
        return packService2TestInto(serviceQueue)
    elif service == 3:
        return packService3TestInto(serviceQueue)
    elif service == 5:
        return packService5TestInto(serviceQueue)
    elif service == 8:
        return packService8TestInto(serviceQueue)
    elif service == 9:
        return packService9TestInto(serviceQueue)
    elif service == 17:
        return packService17TestInto(serviceQueue)
    elif service == 200:
        return packService200TestInto(serviceQueue)
    elif service == "Dummy":
        return packDummyDeviceTestInto(serviceQueue)
    elif service == "GPS0":
        # Object ID: GPS Device
        objectId = g.GPS0_ObjectId
        return packGpsTestInto(objectId, serviceQueue)
    elif service == "GPS1":
        # Object ID: GPS Device
        objectId = g.GPS1_ObjectId
        return packGpsTestInto(objectId, serviceQueue)
    elif service == "Error":
        return packErrorTestingInto(serviceQueue)
    else:
        print("Invalid Service !")
        exit()


def packService5TestInto(tcQueue: TcQueueT) -> TcQueueT:
    tcQueue.appendleft(("print", "Testing Service 5"))
    # disable events
    tcQueue.appendleft(("print", "\r\nTesting Service 5: Disable event"))
    command = PusTelecommand(service=5, subservice=6, SSC=500)
    tcQueue.appendleft(command.packCommandTuple())
    # trigger event
    tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger event"))
    command = PusTelecommand(service=17, subservice=128, SSC=510)
    tcQueue.appendleft(command.packCommandTuple())
    # enable event
    tcQueue.appendleft(("print", "\r\nTesting Service 5: Enable event"))
    command = PusTelecommand(service=5, subservice=5, SSC=520)
    tcQueue.appendleft(command.packCommandTuple())
    # trigger event
    tcQueue.appendleft(("print", "\r\nTesting Service 5: Trigger another event"))
    command = PusTelecommand(service=17, subservice=128, SSC=530)
    tcQueue.appendleft(command.packCommandTuple())
    tcQueue.appendleft(("print", "\r"))
    tcQueue.appendleft(("export", "log/tmtc_log_service5.txt"))
    return tcQueue


def packService9TestInto(tcQueue: TcQueueT) -> TcQueueT:
    tcQueue.appendleft(("print", "Testing Service 9"))
    timeTestASCIICodeA = '2019-08-30T20:50:33.892429Z' + '\0'
    timeTestASCIICodeB = '2019-270T05:50:33.002000Z' + '\0'
    timeTestCurrentTime = datetime.now().isoformat() + "Z" + '\0'
    timeTest1 = timeTestASCIICodeA.encode('ascii')
    timeTest2 = timeTestASCIICodeB.encode('ascii')
    timeTest3 = timeTestCurrentTime.encode('ascii')
    print("Time Code 1 :" + str(timeTest1))
    print("Time Code 2 :" + str(timeTest2))
    print("Time Code 3 :" + str(timeTest3) + "\r")
    # time setting
    tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode A"))
    command = PusTelecommand(service=9, subservice=128, SSC=900, data=timeTest1)
    tcQueue.appendleft(command.packCommandTuple())
    tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode B"))
    command = PusTelecommand(service=9, subservice=128, SSC=910, data=timeTest2)
    tcQueue.appendleft(command.packCommandTuple())
    tcQueue.appendleft(("print", "\r\nTesting Service 9: Testing timecode Current Time"))
    command = PusTelecommand(service=9, subservice=128, SSC=920, data=timeTest3)
    tcQueue.appendleft(command.packCommandTuple())
    # TODO: Add other time formats here
    tcQueue.appendleft(("print", "\r"))
    tcQueue.appendleft(("export", "log/tmtc_log_service9.txt"))
    return tcQueue


def packService17TestInto(tcQueue: TcQueueT) -> TcQueueT:
    tcQueue.appendleft(("print", "Testing Service 17"))
    # ping test
    tcQueue.appendleft(("print", "\n\rTesting Service 17: Ping Test"))
    command = PusTelecommand(service=17, subservice=1, SSC=1700)
    tcQueue.appendleft(command.packCommandTuple())
    # enable event
    tcQueue.appendleft(("print", "\n\rTesting Service 17: Enable Event"))
    command = PusTelecommand(service=5, subservice=5, SSC=52)
    tcQueue.appendleft(command.packCommandTuple())
    # test event
    tcQueue.appendleft(("print", "\n\rTesting Service 17: Trigger event"))
    command = PusTelecommand(service=17, subservice=128, SSC=1701)
    tcQueue.appendleft(command.packCommandTuple())
    tcQueue.appendleft(("print", "\r"))
    tcQueue.appendleft(("export", "log/tmtc_log_service17.txt"))
    return tcQueue


def packDummyDeviceTestInto(tcQueue: TcQueueT) -> TcQueueT:
    tcQueue.appendleft(("print", "Testing Dummy Device"))
    # Object ID: Dummy Device
    objectId = bytearray([0x44, 0x00, 0xAF, 0xFE])
    # Set On Mode
    tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Set On"))
    modeData = packModeData(objectId, 1, 0)
    command = PusTelecommand(service=200, subservice=1, SSC=1, data=modeData)
    tcQueue.appendleft(command.packCommandTuple())
    # Test Service 2 commands
    tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Service 2"))
    packService2TestInto(tcQueue, True)
    # Test Service 8
    tcQueue.appendleft(("print", "\n\rTesting Service Dummy: Service 8"))
    packService8TestInto(tcQueue, True)
    tcQueue.appendleft(("print", "\r"))
    tcQueue.appendleft(("export", "log/tmtc_log_service_dummy.txt"))
    return tcQueue


def packGpsTestInto(objectId: bytearray, tcQueue: TcQueueT) -> TcQueueT:
    if objectId == g.GPS0_ObjectId:
        gpsString = "GPS0"
    elif objectId == g.GPS1_ObjectId:
        gpsString = "GPS1"
    else:
        gpsString = "unknown"
    tcQueue.appendleft(("print", "Testing " + gpsString + " Device"))
    # Set Mode Off
    tcQueue.appendleft(("print", "\n\rTesting  " + gpsString + ": Set Off"))
    modeData = packModeData(objectId, 0, 0)
    command = PusTelecommand(service=200, subservice=1, SSC=11, data=modeData)
    tcQueue.appendleft(command.packCommandTuple())
    # Set Mode On
    tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set On"))
    modeData = packModeData(objectId, 1, 0)
    command = PusTelecommand(service=200, subservice=1, SSC=12, data=modeData)
    tcQueue.appendleft(command.packCommandTuple())
    # Enable HK report
    sidGps = 0
    if objectId == g.GPS0_ObjectId:
        sidGps = g.GPS0_SID
        tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting"))
        command = PusTelecommand(service=3, subservice=5, SSC=13, data=sidGps)
        tcQueue.appendleft(command.packCommandTuple())
    elif objectId == g.GPS1_ObjectId:
        sidGps = g.GPS1_SID
        tcQueue.appendleft(("print", "\r\nTesting " + gpsString + ": Enable HK Reporting"))
        command = PusTelecommand(service=3, subservice=5, SSC=14, data=sidGps)
        tcQueue.appendleft(command.packCommandTuple())
    # pack wait interval until mode is on and a few gps replies have been received
    tcQueue.appendleft(("wait", 5))
    # Disable HK reporting
    tcQueue.appendleft(("print", "\r\nTesting Service 3: Disable " + gpsString + " definition"))
    command = PusTelecommand(service=3, subservice=6, SSC=15, data=sidGps)
    tcQueue.appendleft(command.packCommandTuple())
    # Set Mode Off
    tcQueue.appendleft(("print", "\n\rTesting " + gpsString + ": Set Off"))
    modeData = packModeData(objectId, 0, 0)
    command = PusTelecommand(service=200, subservice=1, SSC=13, data=modeData)
    tcQueue.appendleft(command.packCommandTuple())
    tcQueue.appendleft(("print", "\r"))
    tcQueue.appendleft(("export", "log/tmtc_log_service_" + gpsString + ".txt"))
    return tcQueue


def packErrorTestingInto(tcQueue: TcQueueT) -> TcQueueT:
    # a lot of events
    command = PusTelecommand(service=17, subservice=129, SSC=2010)
    tcQueue.appendleft(command.packCommandTuple())
    # a lot of ping testing
    command = PusTelecommand(service=17, subservice=130, SSC=2020)
    tcQueue.appendleft(command.packCommandTuple())
    return tcQueue


def createTotalTcQueue() -> TcQueueT:
    if not os.path.exists("log"):
        os.mkdir("log")
    tcQueue = deque()
    tcQueue = packService2TestInto(tcQueue)
    tcQueue = packService5TestInto(tcQueue)
    tcQueue = packService8TestInto(tcQueue)
    tcQueue = packService9TestInto(tcQueue)
    tcQueue = packService17TestInto(tcQueue)
    tcQueue = packService200TestInto(tcQueue)
    tcQueue = packDummyDeviceTestInto(tcQueue)
    # objectId = bytearray([0x44, 0x00, 0x1F, 0x00])
    # tc_queue = packGpsTestInto(objectId, tc_queue)
    return tcQueue