Something went wrong on our end
Forked from an inaccessible project.
-
Robin Mueller authoredRobin Mueller authored
OBSW_UdpClient.py 9.16 KiB
#!/usr/bin/python3.7
# -*- coding: utf-8 -*-
"""
@file
OBSW_UdpClient.py
@date
01.11.2019
@brief
This client was developed by KSat for the SOURCE project to test the on-board software.
It can be used to to send and receive TMTC packages.
@manual
Manual installation of crcmod might be needed
1. Install pip if it is not installed yet
2. Install crcmod and all other reqiored packages:
Command: pip install crcmod
or use IDE (interpreter settings -> pip in PyCharm)
The script can be used by specifying command line parameters. Please run this script with the -h flag
or without any command line parameters to display options. GUI is work-in-progress
It might be necessary to set board or PC IP address if using ethernet communication.
Default values should work normally though.
Example command to test service 17,
assuming no set client IP (set manually to PC IP Address if necessary) and default board IP 169.254.1.38:
OBSW_UdpClient.py -m 3 -s 17
Example to run Unit Test:
OBSW_UdpClient.py -m 5
There are four different Modes:
0. GUI Mode: Experimental mode, also called if no input parameter are specified
1. Listener Mode: Only Listen for incoming TM packets
2. SingleCommandMode: Send Single Command repeatedly until answer is received,
only listen after that
3. ServiceTestMode: Send all Telecommands belonging to a certain service
and scan for replies for each telecommand. Listen after that
4. SoftwareTestMode: Send all services and perform reply scanning like mode 3.
Listen after that
5. Unit Test Mode: Performs a unit test which returns a simple OK or NOT OK. This mode
has the capability to send TCs in bursts, where applicable
The TC timeout factor is mulitplied with the TM timeout to specifiy
when a TC is sent again, if no reply is received. The timeout factor is higher for the STM32.
If there are problems receiving packets, use the tool Wireshark to track ethernet communication
for UDP echo packets (requests and response).
If the packets appear, there might be a problematic firewall setting.
Please ensure that python.exe UDP packets are not blocked in advanced firewall settings
and create a rule to allow packets from port 2008.
@author:
S. Gaisser, J. Meier, R. Mueller
"""
import sys
import atexit
import signal
import queue
import socket
import unittest
import argparse
from test import OBSW_UnitTest
import OBSW_Config as g
from tc.OBSW_TcPacker import PUSTelecommand, createTotalTcQueue, serviceTestSelect
from sendreceive.OBSW_CommandSenderReceiver import CommandSenderReceiver, connectToBoard
from sendreceive.OBSW_SingleCommandSenderReceiver import SingleCommandSenderReceiver
from sendreceive.OBSW_SequentialSenderReceiver import SequentialCommandSenderReceiver
from utility.OBSW_TmTcPrinter import TmtcPrinter
from comIF.OBSW_Ethernet_ComIF import EthernetComIF
from comIF.OBSW_Serial_ComIF import SerialComIF
# Mode options, set by args parser
modeList = {
0: "GUIMode",
1: "ListenerMode",
2: "SingleCommandMode",
3: "ServiceTestMode",
4: "SoftwareTestMode",
5: "OBSWUnitTest"
}
# noinspection PyTypeChecker
def main():
args = parseInputArguments()
setGlobals(args)
if g.modeId == "GUIMode":
print("GUI not implemented yet")
exit()
setUpSocket()
atexit.register(g.keyboardInterruptHandler)
print("Attempting to connect")
tmtcPrinter = TmtcPrinter(g.displayMode, g.printToFile, True)
if g.comIF == 0:
communicationInterface = EthernetComIF(tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor,
g.sockSend, g.sockReceive, g.sendAddress)
else:
comPort = 'COM8'
baudRate = 9600
communicationInterface = SerialComIF(tmtcPrinter, comPort, baudRate, 0.2)
connectToBoard()
if g.modeId == "ListenerMode":
Receiver = CommandSenderReceiver(communicationInterface, tmtcPrinter, g.tmTimeout, g.tcSendTimeoutFactor,
g.printToFile)
Receiver.comInterface.performListenerMode()
elif g.modeId == "SingleCommandMode":
pusPacketTuple = commandPreparation()
SenderAndReceiver = SingleCommandSenderReceiver(communicationInterface, tmtcPrinter, pusPacketTuple,
g.tmTimeout, g.tcSendTimeoutFactor, g.printToFile)
SenderAndReceiver.sendSingleTcAndReceiveTm()
elif g.modeId == "ServiceTestMode":
serviceQueue = queue.Queue()
SenderAndReceiver = SequentialCommandSenderReceiver(communicationInterface, tmtcPrinter, g.tmTimeout,
serviceTestSelect(g.service, serviceQueue),
g.tcSendTimeoutFactor, g.printToFile)
SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()
elif g.modeId == "SoftwareTestMode":
allTcQueue = createTotalTcQueue()
SenderAndReceiver = SequentialCommandSenderReceiver(communicationInterface, tmtcPrinter, g.tmTimeout,
allTcQueue, g.tcSendTimeoutFactor, g.printToFile)
SenderAndReceiver.sendQueueTcAndReceiveTmSequentially()
elif g.modeId == "OBSWUnitTest":
# Set up test suite and run it with runner
# Verbosity specifies detail level
suite = unittest.TestLoader().loadTestsFromModule(OBSW_UnitTest)
unittest.TextTestRunner(verbosity=2).run(suite)
else:
print("Unknown Mode, Configuration error !")
exit()
# Prepare command for single command testing
def commandPreparation():
# Single Command Testing
command = PUSTelecommand(service=17, subservice=1, SSC=21)
command.packCommandTuple()
return command.packCommandTuple()
def parseInputArguments():
argParser = argparse.ArgumentParser(description="TMTC Client Command Line Interface")
argParser.add_argument('-m', '--mode', type=int, help='Target Mode. Default is 1(Listener Mode), '
'0: GUI Mode, 1:Listener Mode, '
'2: Single Command Mode, 3: Service Test Mode, '
'4: Software Test Mode, 5: Unit Test Mode ', default=0)
argParser.add_argument('-c', '--comIF', type=int, help='Communication Interface. 0 for Ethernet, 1 for Serial',
default=0)
argParser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='')
argParser.add_argument('--boardIP', help='Board IP. Default: 169.254.1.38', default='169.254.1.38')
argParser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17)
argParser.add_argument('-t', '--tmTimeout', type=float, help='TM Timeout. Default: 10)', default=10.0)
argParser.add_argument('-p', '--printFile', help='Supply -p to print output to file. Default: False',
action='store_true')
argParser.add_argument('-o', '--tcTimeoutFactor', type=float, help='TC Timeout Factor. Default: 2.0', default=2.0)
argParser.add_argument('-r', '--rawDataPrint', help='Supply -r to print all raw data directly', action='store_true')
argParser.add_argument('-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true')
if len(sys.argv) == 1:
print("No Input Arguments specified, setting default values.")
argParser.print_help()
args = argParser.parse_args()
print(args)
return args
def setGlobals(args):
if args.shortDisplayMode:
displayMode = "short"
else:
displayMode = "long"
# Board IP address and ethernet port IP address can be passed optionally
# by passing line parameter In PyCharm: Set parameters in run configuration
# Add IP address of Ethernet port. Use command ipconfig in windows console or ifconfig in Linux.
portReceive = 2008
recAddress = (args.clientIP, portReceive)
# Static IP of board
portSend = 7
sendAddress = (args.boardIP, portSend)
if 0 <= args.mode <= 5:
modeId = modeList[args.mode]
else:
print("Invalid mode argument, setting default mode (1)")
modeId = modeList[1]
service = str(args.service)
if service.isdigit():
service = int(args.service)
else:
service = args.service
g.recAddress = recAddress
g.sendAddress = sendAddress
g.comIF = args.comIF
g.sockReceive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
g.sockSend = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
g.modeId = modeId
g.tmTimeout = args.tmTimeout
g.printRawTmData = args.rawDataPrint
g.displayMode = displayMode
g.service = service
g.printToFile = args.printFile
def setUpSocket():
g.sockReceive.bind(g.recAddress)
g.sockReceive.setblocking(False)
class GracefulKiller:
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self):
self.kill_now = True
print("I was killed")
if __name__ == "__main__":
main()