Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
OBSW_TmTcClient.py 7.75 KiB
#!/usr/bin/env python3
"""
@file
    OBSW_TmTcClient.py
@date
    01.11.2019
@brief
    This client was developed by KSat for the SOURCE project to test the on-board software.
    It can be used to send and receive TMTC packets and TMTC sequences.
@manual
    Manual installation of crcmod might be needed
        1. Install pip if it is not installed yet
        2. Install crcmod and all other required packages:
           Command: pip install crcmod
           or use IDE (interpreter settings -> pip in PyCharm)

    The script can be used by specifying command line parameters. Please run this script with the -h flag
    or without any command line parameters to display options. GUI is work-in-progress
    It might be necessary to set board or PC IP address if using ethernet communication.
    Default values should work normally though. Use higher timeout value (-t Parameter) for STM32

    Example command to test service 17,
    assuming no set client IP (set manually to PC IP Address if necessary) and default board IP 169.254.1.38:
    OBSW_TmTcClient.py -m 3 -s 17
    Example to run Unit Test:
    OBSW_TmTcClient.py -m 5
    Example to test service 17 with HK output and serial communication:
    OBSW_TmTcClient.py -m 3 -s 17 --hk -c 1
    Get command line help:
    OBSW_TmTcClient.py -h

    There are six different Modes:
        0. GUI Mode: Experimental mode, also called if no input parameters are specified
        1. Listener Mode: Only Listen for incoming TM packets
        2. SingleCommandMode: Send Single Command repeatedly until answer is received,
            only listen after that
        3. ServiceTestMode: Send all Telecommands belonging to a certain service
            and scan for replies for each telecommand. Listen after that
        4. SoftwareTestMode: Send all services and perform reply scanning like mode 3.
            Listen after that
        5. Unit Test Mode: Performs a unit test which returns a simple OK or NOT OK. This mode
            has the capability to send TCs in bursts, where applicable

    If there are problems receiving packets with Ethernet Communication,
    use the tool Wireshark to track ethernet communication
    for UDP echo packets (requests and response).
    If the packets appear, there might be a problematic firewall setting.
    Please ensure that python.exe UDP packets are not blocked in advanced firewall settings
    and create a rule to allow packets from port 2008.

@author:
    S. Gaisser, J. Meier, R. Mueller
"""
import atexit
import logging
import sys
import time
import unittest
from collections import deque

from test import OBSW_PusServiceTest
from config import OBSW_Config as g
from config.OBSW_Config import setGlobals
from tc.OBSW_TcPacker import PUSTelecommand, createTotalTcQueue, serviceTestSelect
from sendreceive.OBSW_SingleCommandSenderReceiver import SingleCommandSenderReceiver
from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver
from sendreceive.OBSW_TmListener import TmListener
from utility.OBSW_ArgParser import parseInputArguments
from utility.OBSW_TmTcPrinter import TmTcPrinter, TmTcPrinterT
from comIF.OBSW_Ethernet_ComIF import EthernetComIF
from comIF.OBSW_Serial_ComIF import SerialComIF
from gui.OBSW_TmtcGUI import TmTcGUI
from gui.OBSW_BackendTest import TmTcBackend
from utility.OBSW_ExitHandler import keyboardInterruptHandler
from comIF.OBSW_ComInterface import ComIF_T


def main():
    """
    Entry point of the TMTC client.

    Parses the command line arguments, stores them in the global configuration (g) and
    runs the mode selected by g.modeId:
        - GUIMode: start the TmTcBackend and TmTcGUI objects, wait for both to finish, then exit.
        - All other modes: set up the communication interface, register the
          keyboardInterruptHandler via atexit and start a TmListener. After that:
            - ListenerMode: only print a notice, nothing is sent.
            - SingleCommandMode: send the command built by commandPreparation().
            - ServiceTestMode: send the queue built by serviceTestSelect() for g.service.
            - SoftwareTestMode: send the queue built by createTotalTcQueue().
            - UnitTest: load the tests of OBSW_PusServiceTest and run them with verbosity 2.
            - Anything else: print an error and exit.
    After the selected mode has been handled, the function idles forever and never returns
    normally; the program has to be terminated externally (e.g. keyboard interrupt).
    """
    args = parseInputArguments()
    setGlobals(args)
    tmtcPrinter = TmTcPrinter(g.displayMode, g.printToFile, True)
    tmListener = None
    communicationInterface = None
    if g.modeId == g.modeList.GUIMode:
        backend = TmTcBackend()
        backend.start()
        GUI = TmTcGUI()
        GUI.start()
        backend.join()
        GUI.join()
        print("Both processes have closed")
        # sys.exit() instead of the site-provided exit(), which is intended for interactive use
        sys.exit()
    else:
        communicationInterface = setCommunicationInterface(tmtcPrinter)
        # Hand the interface to the exit handler so it is available when the interpreter exits
        atexit.register(keyboardInterruptHandler, comInterface=communicationInterface)
        tmListener = TmListener(
            comInterface=communicationInterface, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor)
        tmListener.start()
    if g.modeId == g.modeList.ListenerMode:
        print("Listening for packages...")
    elif g.modeId == g.modeList.SingleCommandMode:
        pusPacketTuple = commandPreparation()
        SenderAndReceiver = SingleCommandSenderReceiver(
            comInterface=communicationInterface, tmtcPrinter=tmtcPrinter, tmListener=tmListener,
            pusPacketTuple=pusPacketTuple, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor,
            doPrintToFile=g.printToFile)
        SenderAndReceiver.sendSingleTcAndReceiveTm()

    elif g.modeId == g.modeList.ServiceTestMode:
        serviceQueue = deque()
        SenderAndReceiver = SequentialCommandSenderReceiver(
            comInterface=communicationInterface, tmtcPrinter=tmtcPrinter, tmTimeout=g.tmTimeout,
            tcQueue=serviceTestSelect(g.service, serviceQueue), tcTimeoutFactor=g.tcSendTimeoutFactor,
            doPrintToFile=g.printToFile, tmListener=tmListener)
        SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()

    elif g.modeId == g.modeList.SoftwareTestMode:
        allTcQueue = createTotalTcQueue()
        SenderAndReceiver = SequentialCommandSenderReceiver(
            comInterface=communicationInterface, tmtcPrinter=tmtcPrinter, tmTimeout=g.tmTimeout,
            tcQueue=allTcQueue, tcTimeoutFactor=g.tcSendTimeoutFactor, doPrintToFile=g.printToFile,
            tmListener=tmListener)
        SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()

    elif g.modeId == g.modeList.UnitTest:
        # Set up test suite and run it with runner. Verbosity specifies detail level.
        # NOTE(review): setCommunicationInterface() exits the program when g.modeId is UnitTest,
        # so this branch looks unreachable at the moment - confirm the intended behaviour.
        # noinspection PyTypeChecker
        suite = unittest.TestLoader().loadTestsFromModule(OBSW_PusServiceTest)
        unittest.TextTestRunner(verbosity=2).run(suite)

    else:
        print("Unknown Mode, Configuration error !")
        sys.exit()
    # At some later point, the program will run permanently and be able to take commands. For now
    # we keep the main thread alive so the program doesnt exit automatically (TM Listener is daemonic).
    # Sleeping instead of spinning keeps the CPU free for the listener.
    while True:
        time.sleep(1)


# Prepare command for single command testing
def commandPreparation():
    """
    Build the telecommand used for single command testing.

    The command is a PUSTelecommand with service 17, subservice 1 and SSC 21. It is a direct
    command which triggers an additional step reply and one completion reply.
    :return: Result of packCommandTuple() called on that telecommand
    """
    # To test another command, adapt the constructor arguments. Example with application data:
    #     PUSTelecommand(service=23, subservice=1, SSC=21, data=bytearray([1, 2, 3, 4, 5]))
    # The command can be inspected with its print() method before packing.
    singleCommand = PUSTelecommand(service=17, subservice=1, SSC=21)
    packedCommand = singleCommand.packCommandTuple()
    return packedCommand


def setCommunicationInterface(tmtcPrinter: TmTcPrinterT) -> ComIF_T:
    """
    Return the desired communication interface object.

    An EthernetComIF is created if g.comIF is 0, otherwise a SerialComIF on g.comPort with a
    baud rate of 115200 is created. For the serial interface, g.serialTimeout is set to 0.05
    as a side effect.
    The program is terminated (SystemExit) instead of returning if g.modeId is the unit test
    mode or if creating the interface raises an OSError.
    :param tmtcPrinter: TmTcPrinter object.
    :return: CommunicationInterface object
    """
    if g.modeId == g.modeList.UnitTest:
        # NOTE(review): no interface is created in unit test mode and the program exits here,
        # which also prevents the unit test branch in main() from running - confirm intent.
        print("Could not set up communication interface !")
        # sys.exit() instead of the site-provided exit(), which is intended for interactive use
        sys.exit()
    try:
        if g.comIF == 0:
            return EthernetComIF(
                tmtcPrinter=tmtcPrinter, tmTimeout=g.tmTimeout, tcTimeoutFactor=g.tcSendTimeoutFactor,
                sendAddress=g.sendAddress, receiveAddress=g.recAddress)
        comPort = g.comPort
        baudRate = 115200
        g.serialTimeout = 0.05
        return SerialComIF(tmtcPrinter=tmtcPrinter, comPort=comPort, baudRate=baudRate,
                           serialTimeout=g.serialTimeout)
    except OSError:
        # IOError is an alias of OSError in Python 3, so this covers both
        print("Error setting up communication interface")
        logging.exception("Error")
        sys.exit()


# Run the client only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()