#!/usr/bin/python3.8
"""
@file
obsw_tmtc_client.py
@date
01.11.2019
@brief
This client was developed by KSat for the SOURCE project to test the on-board software.
It can be used to to send and receive TMTC packets and TMTC sequences.
@manual
Manual installation of crcmod and pyserial might be needed
1. Install pip if it is not installed yet
2. Install crcmod and all other required packages:
Command: python3.8 -m pip install crcmod
or use IDE (interpreter settings -> pip in PyCharm)
The script can be used by specifying command line parameters.
Please run this script with the -h flag or without any command line parameters to display options.
GUI is work-in-progress
It might be necessary to set board or PC IP address if using ethernet communication.
Default values should work normally though. Use higher timeout value (-t Parameter) for STM32
Example command to test G_SERVICE 17,
assuming no set client IP (set manually to PC IP Address if necessary)
and default board IP 169.254.1.38:
obsw_tmtc_client.py -m 3 -s 17
Example to run Unit Test:
obsw_tmtc_client.py -m 5
Example to test G_SERVICE 17 with HK output and serial communication:
obsw_tmtc_client.py -m 3 -s 17 --hk -c 1
Get command line help:
obsw_tmtc_client.py -h
There are four different Modes:
0. GUI Mode: Experimental mode, also called if no input parameter are specified
1. Listener Mode: Only Listen for incoming TM packets
2. SingleCommandMode: Send Single Command repeatedly until answer is received,
only listen after that
3. ServiceTestMode: Send all Telecommands belonging to a certain G_SERVICE
and scan for replies for each telecommand. Listen after that
4. SoftwareTestMode: Send all services and perform reply scanning like mode 3.
Listen after that
5. Unit Test Mode: Performs a unit test which returns a simple OK or NOT OK. This mode
has the capability to send TCs in bursts, where applicable
If there are problems receiving packets with Ethernet Communication,
use the tool Wireshark to track ethernet communication
for UDP echo packets (requests and response).
If the packets appear, there might be a problematic firewall setting.
Please ensure that python.exe UDP packets are not blocked in advanced firewall settings
and create a rule to allow packets from port 2008.
@author:
S. Gaisser, J. Meier, R. Mueller
"""
import atexit
import unittest
import logging
import sys
from collections import deque
from test import obsw_pus_service_test
from config import obsw_config as g
from config.obsw_config import setGlobals
from tc.obsw_pus_tc_packer import PusTelecommand, create_total_tc_queue, service_test_select
from sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver
from sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver
from sendreceive.obsw_tm_listener import TmListener
from utility.obsw_args_parser import parse_input_arguments
from utility.obsw_tmtc_printer import TmTcPrinter, TmTcPrinterT
from utility.obsw_exit_handler import keyboard_interrupt_handler
from comIF.obsw_ethernet_com_if import EthernetComIF
from comIF.obsw_serial_com_if import SerialComIF
from comIF.obsw_com_interface import ComIfT
from gui.obsw_tmtc_gui import TmTcGUI
from gui.obsw_backend_test import TmTcBackend
def main():
"""
Runs the TMTC client, depending on input arguments.
TODO: The whole code below could ba packed into a class too.
If a backend/frontend architecture is to be implemented, this will make things easier
We propably instantiate all sender/receiver objects (or maybe instantiate them once
we need them) on object construction. The main could spawn a process for the frontend
(e.g. GUI or web interface) and a process for the backend (the sender/receiver classes,
passing values to a SQL database, loading the MIB database...)
:return:
"""
args = parse_input_arguments()
setGlobals(args)
tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True)
if g.G_MODE_ID == g.ModeList.GUIMode:
backend = TmTcBackend()
backend.start()
gui = TmTcGUI()
gui.start()
backend.join()
gui.join()
print("Both processes have closed")
sys.exit()
else:
communication_interface = set_communication_interface(tmtc_printer)
atexit.register(keyboard_interrupt_handler, comInterface=communication_interface)
tm_listener = TmListener(
comInterface=communication_interface, tmTimeout=g.G_TM_TIMEOUT,
tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR)
tm_listener.start()
if g.G_MODE_ID == g.ModeList.ListenerMode:
print("Listening for packages...")
elif g.G_MODE_ID == g.ModeList.SingleCommandMode:
pus_packet_tuple = command_preparation()
sender_and_receiver = SingleCommandSenderReceiver(
com_interface=communication_interface, tmtc_printer=tmtc_printer, tm_listener=tm_listener,
pusPacketTuple=pus_packet_tuple)
sender_and_receiver.send_single_tc_and_receive_tm()
elif g.G_MODE_ID == g.ModeList.ServiceTestMode:
service_queue = deque()
sender_and_receiver = SequentialCommandSenderReceiver(
com_interface=communication_interface, tmtc_printer=tmtc_printer, tm_listener=tm_listener,
tc_queue=service_test_select(g.G_SERVICE, service_queue))
sender_and_receiver.send_queue_tc_and_receive_tm_sequentially()
elif g.G_MODE_ID == g.ModeList.SoftwareTestMode:
all_tc_queue = create_total_tc_queue()
sender_and_receiver = SequentialCommandSenderReceiver(
com_interface=communication_interface, tmtc_printer=tmtc_printer,
tc_queue=all_tc_queue, tm_listener=tm_listener)
sender_and_receiver.send_queue_tc_and_receive_tm_sequentially()
elif g.G_MODE_ID == g.ModeList.UnitTest:
# Set up test suite and run it with runner. Verbosity specifies detail level
g.G_TM_LISTENER = tm_listener
g.G_COM_INTERFACE = communication_interface
g.G_TMTC_PRINTER = tmtc_printer
# noinspection PyTypeChecker
suite = unittest.TestLoader().loadTestsFromModule(obsw_pus_service_test)
unittest.TextTestRunner(verbosity=2).run(suite)
else:
print("Unknown Mode, Configuration error !")
sys.exit()
# At some later point, the program will run permanently and be able to take commands.
# For now we put a permanent loop here so the program
# doesn't exit automatically (TM Listener is daemonic)
while True:
pass
def command_preparation():
"""
Prepare command for single command testing
:return:
"""
# Direct command which triggers an additional step reply and one completion reply
# Single Command Testing
command = PusTelecommand(service=17, subservice=1, ssc=21)
# command.print()
# file = bytearray([1, 2, 3, 4, 5])
# command = PUSTelecommand(G_SERVICE=23, subservice=1, SSC=21, _tm_data=file)
# command.packCommandTuple()
return command.pack_command_tuple()
def set_communication_interface(tmtc_printer: TmTcPrinterT) -> ComIfT:
"""
Return the desired communication interface object
:param tmtc_printer: TmTcPrinter object.
:return: CommunicationInterface object
"""
try:
if g.G_COM_IF == g.ComIF.Ethernet:
communication_interface = EthernetComIF(
tmtc_printer=tmtc_printer, tmTimeout=g.G_TM_TIMEOUT,
tcTimeoutFactor=g.G_TC_SEND_TIMEOUT_FACTOR, sendAddress=g.G_SEND_ADDRESS,
receiveAddress=g.G_REC_ADDRESS)
else:
com_port = g.G_COM_PORT
baud_rate = 115200
g.G_SERIAL_TIMEOUT = 0.03
communication_interface = SerialComIF(
tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=baud_rate,
serial_timeout=g.G_SERIAL_TIMEOUT)
# else:
# pass
return communication_interface
except (IOError, OSError):
print("Error setting up communication interface")
logging.exception("Error")
sys.exit()
if __name__ == "__main__":
main()