Skip to content
Snippets Groups Projects
Forked from an inaccessible project.
OBSW_UdpClient.py 9.16 KiB
#!/usr/bin/python3.7
# -*- coding: utf-8 -*-
"""
@file
    OBSW_UdpClient.py
@date
    01.11.2019
@brief
    This client was developed by KSat for the SOURCE project to test the on-board software.
    It can be used to to send and receive TMTC packages.
@manual
    Manual installation of crcmod might be needed
        1. Install pip if it is not installed yet
        2. Install crcmod and all other reqiored packages:
           Command: pip install crcmod
           or use IDE (interpreter settings -> pip in PyCharm)

    The script can be used by specifying command line parameters. Please run this script with the -h flag
    or without any command line parameters to display options. GUI is work-in-progress
    It might be necessary to set board or PC IP address if using ethernet communication.
    Default values should work normally though.

    Example command to test service 17,
    assuming no set client IP (set manually to PC IP Address if necessary) and default board IP 169.254.1.38:
    OBSW_UdpClient.py -m 3 -s 17
    Example to run Unit Test:
    OBSW_UdpClient.py -m 5

    There are four different Modes:
        0. GUI Mode: Experimental mode, also called if no input parameter are specified
        1. Listener Mode: Only Listen for incoming TM packets
        2. SingleCommandMode: Send Single Command repeatedly until answer is received,
            only listen after that
        3. ServiceTestMode: Send all Telecommands belonging to a certain service
            and scan for replies for each telecommand. Listen after that
        4. SoftwareTestMode: Send all services and perform reply scanning like mode 3.
            Listen after that
        5. Unit Test Mode: Performs a unit test which returns a simple OK or NOT OK. This mode
            has the capability to send TCs in bursts, where applicable

    The TC timeout factor is mulitplied with the TM timeout to specifiy
    when a TC is sent again, if no reply is received. The timeout factor is higher for the STM32.

    If there are problems receiving packets, use the tool Wireshark to track ethernet communication
    for UDP echo packets (requests and response).
    If the packets appear, there might be a problematic firewall setting.
    Please ensure that python.exe UDP packets are not blocked in advanced firewall settings
    and create a rule to allow packets from port 2008.

@author:
    S. Gaisser, J. Meier, R. Mueller
"""

import sys
import atexit
import signal
import queue
import socket
import unittest
import argparse

from test import OBSW_UnitTest
import OBSW_Config as g
from tc.OBSW_TcPacker import PUSTelecommand, createTotalTcQueue, serviceTestSelect
from sendreceive.OBSW_CommandSenderReceiver import CommandSenderReceiver, connectToBoard
from sendreceive.OBSW_SingleCommandSenderReceiver import SingleCommandSenderReceiver
from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver
from utility.OBSW_TmTcPrinter import TmtcPrinter
from comIF.OBSW_Ethernet_ComIF import EthernetComIF
from comIF.OBSW_Serial_ComIF import SerialComIF

# Mode options, set by args parser
modeList = {
    0: "GUIMode",
    1: "ListenerMode",
    2: "SingleCommandMode",
    3: "ServiceTestMode",
    4: "SoftwareTestMode",
    5: "OBSWUnitTest"
}


# noinspection PyTypeChecker
def main():
    args = parseInputArguments()
    setGlobals(args)
    if g.modeId == "GUIMode":
        print("GUI not implemented yet")
        exit()

    setUpSocket()
    atexit.register(g.keyboardInterruptHandler)
    print("Attempting to connect")
    tmtcPrinter = TmtcPrinter(g.displayMode, g.printToFile, True)
    if g.comIF == 0:
        communicationInterface = EthernetComIF(tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor,
                                               g.sockSend, g.sockReceive, g.sendAddress)
    else:
        comPort = 'COM8'
        baudRate = 9600
        communicationInterface = SerialComIF(tmtcPrinter, comPort, baudRate, 0.2)
    connectToBoard()

    if g.modeId == "ListenerMode":
        Receiver = CommandSenderReceiver(communicationInterface, tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor,
                                         g.printToFile)
        Receiver.comInterface.performListenerMode()

    elif g.modeId == "SingleCommandMode":
        pusPacketTuple = commandPreparation()
        SenderAndReceiver = SingleCommandSenderReceiver(communicationInterface, tmtcPrinter, pusPacketTuple,
                                                        g.tmTimeout, g.tcSendTimeoutFactor, g.printToFile)
        SenderAndReceiver.sendSingleTcAndReceiveTm()

    elif g.modeId == "ServiceTestMode":
        serviceQueue = queue.Queue()
        SenderAndReceiver = SequentialCommandSenderReceiver(communicationInterface, tmtcPrinter, g.tmTimeout,
                                                            serviceTestSelect(g.service, serviceQueue),
                                                            g.tcSendTimeoutFactor, g.printToFile)
        SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()

    elif g.modeId == "SoftwareTestMode":
        allTcQueue = createTotalTcQueue()
        SenderAndReceiver = SequentialCommandSenderReceiver(communicationInterface, tmtcPrinter, g.tmTimeout,
                                                            allTcQueue, g.tcSendTimeoutFactor, g.printToFile)
        SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()

    elif g.modeId == "OBSWUnitTest":
        # Set up test suite and run it with runner
        # Verbosity specifies detail level
        suite = unittest.TestLoader().loadTestsFromModule(OBSW_UnitTest)
        unittest.TextTestRunner(verbosity=2).run(suite)

    else:
        print("Unknown Mode, Configuration error !")
        exit()


# Prepare command for single command testing
def commandPreparation():
    # Single Command Testing
    command = PUSTelecommand(service=17, subservice=1, SSC=21)
    command.packCommandTuple()
    return command.packCommandTuple()


def parseInputArguments():
    argParser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")

    argParser.add_argument('-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), '
                           '0: GUI Mode, 1:Listener Mode, '
                           '2: Single Command Mode, 3: Service Test Mode, '
                           '4: Software Test Mode, 5: Unit Test Mode ', default=0)
    argParser.add_argument('-c', '--comIF', type=int, help='Communication Interface. 0 for Ethernet, 1 for Serial',
                           default=0)
    argParser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
    argParser.add_argument('--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38')
    argParser.add_argument('-s', '--service',  help='Service to test. Default: 17', default=17)
    argParser.add_argument('-t', '--tmTimeout', type=float, help='TM Timeout. Default: 10)', default=10.0)
    argParser.add_argument('-p', '--printFile', help='Supply -p to print output to file. Default: False',
                           action='store_true')
    argParser.add_argument('-o', '--tcTimeoutFactor', type=float, help='TC Timeout Factor. Default: 2.0', default=2.0)
    argParser.add_argument('-r', '--rawDataPrint', help='Supply -r to print all raw data directly', action='store_true')
    argParser.add_argument('-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
    if len(sys.argv) == 1:
        print("No Input Arguments specified, setting default values.")
        argParser.print_help()
    args = argParser.parse_args()
    print(args)
    return args


def setGlobals(args):
    if args.shortDisplayMode:
        displayMode = "short"
    else:
        displayMode = "long"
    # Board IP address and ethernet port IP address can be passed optionally
    # by passing line parameter In PyCharm: Set parameters in run configuration
    # Add IP address of Ethernet port. Use command ipconfig in windows console or ifconfig in Linux.
    portReceive = 2008
    recAddress = (args.clientIP, portReceive)
    # Static IP of board
    portSend = 7
    sendAddress = (args.boardIP, portSend)
    if 0 <= args.mode <= 5:
        modeId = modeList[args.mode]
    else:
        print("Invalid mode argument, setting default mode (1)")
        modeId = modeList[1]
    service = str(args.service)
    if service.isdigit():
        service = int(args.service)
    else:
        service = args.service
    g.recAddress = recAddress
    g.sendAddress = sendAddress
    g.comIF = args.comIF
    g.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    g.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    g.modeId = modeId
    g.tmTimeout = args.tmTimeout
    g.printRawTmData = args.rawDataPrint
    g.displayMode = displayMode
    g.service = service
    g.printToFile = args.printFile


def setUpSocket():
    g.sockReceive.bind(g.recAddress)
    g.sockReceive.setblocking(False)


class GracefulKiller:
    kill_now = False

    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self):
        self.kill_now = True
        print("I was killed")


if __name__ == "__main__":
    main()