Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
Commits on Source (59)
Showing
with 499 additions and 269 deletions
_build/*
_dependencies/*
_bin/*
_bin
_obj
_dependencies
.settings/*
.settings
__pycache__
......@@ -17,6 +8,7 @@ __pycache__
*.tmp
*.open
*.ini
*.json
generators/*.csv
......
......@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s Img -o A4U -c 1 -t 4" />
<option name="PARAMETERS" value="-m 3 -s Img -o A4U -c 1 -t 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
......
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Disable LEDs" type="PythonConfigurationType" factoryName="Python" folderName="Serial Utility">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="PARAMETERS" value="-m 3 -s led -o A1 -c 1 -t 2.2" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Enable LEDs" type="PythonConfigurationType" factoryName="Python" folderName="Serial Utility">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="PARAMETERS" value="-m 3 -s led -o A0 -c 1 -t 2.2" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Report File Attributes" type="PythonConfigurationType" factoryName="Python" folderName="Serial FileManagement">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s SD -o 3 -c 1 -t 2.5" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Service 3 Serial " type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test">
<configuration default="false" name="tmtcclient Service 3 IntErrorReporter Test" type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
......@@ -13,7 +13,7 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s 3 -c 1 -t 3 --hk" />
<option name="PARAMETERS" value="-m 3 -s 3 -c 1 -t 3 -o ie --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
......
TMTC Client
====
This client was developed by KSat for the SOURCE project to test the on-board software but
has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand)
handling and testing via different communication interfaces. Currently, only the PUS standard is
implemented as a packet standard. This tool can be used either as a command line tool
or as a GUI tool.
This client currently supports the following communication interfaces:
1. Ethernet, UDP packets
2. Serial Communication
3. QEMU
## Cloning the repository
It is recommended to fork this repository for new missions.
The `tmtc_core` folder contains most generic software
components while this repository is based on the SOURCE mission.
However, this repostiory is a good starting point and example
implementation.
implementation which can be adapted easily to other projects.
Fork the repository or clone it with following command:
```sh
......@@ -26,19 +37,19 @@ pip install -r requirements.txt
Now the script can be tested by running
```sh
python obsw_tmtc_client.py -h
python tmtc_client_cli.py -h
```
It is recommended to use and setup PyCharm to also use the preconfigured
run configurations.
## Prerequisites
Runs with Python 3.8.
Tested with Python 3.8.
Don't use Python 2.x!
Manual installation of crcmod and pyserial might be needed.
It is recommended to install use PyCharm to run this client.
Students of the Uni Stuttgart usually have access to PyCharm Professional.
1. Install pip for used Python distribution if it is not installed yet
2. Install crcmod and all other required packages if using system python
compiler. If using the local venv as the compiler, these packages
......@@ -60,62 +71,80 @@ sudo apt-get install python-tk
On Windows, the package should be included.
## How To Use
It is recommended to use PyCharm and load the run configurations
to have a starting point. PyCharm also provided the option
of remote deployment, which allows TMTC testing with remote
setups (e.g. flatsat setup in a cleanroom).
Some configuration constants might be stored in a JSON file in the
config folder. To reset the configuration, delete the JSON file.
### Command line mode
The script can be used by specifying command line parameters.
Please run this script with the -h flag
or without any command line parameters to display options.
The client can be used to send via Ethernet or serial port.
GUI is work-in-progress.
It might be necessary to set board or PC IP address if using ethernet communication.
Default values should work normally though.
## Examples
Example command to test service 17,
assuming no set client IP (set manually to PC IP Address if necessary). IP Address or port numbers have to be
set manually in the global configuration file (IP address can be passed as a command line argument):
```sh
obsw_tmtc_client.py -m 3 -s 17
```
Example to run Unit Test:
Please run this script with the -h flag or without any command line parameters to
display options.
### Import run configurations in PyCharm
The PyCharm IDE can be used to comfortably manage a set of run configuations
(for example tests for different services). These configurations were shared
through the version control system git and should be imported automatically.
If these configurations dont show up, try to open the tmtc folder as
a new PyCharm project in a new window.
To add new configurations, go to Edit Configurations...
at the top right corner in the drop-down menu.
Specify the new run configurations and set a tick at Share through VCS.
### Examples
Example command to send a ping command. Specify the communication interface by
adding `-c <number>` to the command.
```sh
obsw_tmtc_client.py -m 5
tmtc_client_cli.py -m 3 -s 17
```
Example to test service 3 with serial communication, printing all housekeeping packets,
COM port needs to be typed in manually (or set with --COM \<COM PORT>):
Example to run listener mode
```sh
obsw_tmtc_client.py -m 3 -s 3 --hk -c
tmtc_client_cli.py -m 1
```
## Modes
There are different modes. Run the client with the `-h` flag
to display the modes.
## Architectural notes
Some additional information about the structure of this Python program
are provided here.
## Ethernet Communication
### Modes and the TMTC queue
There are different communication modes. Run the client with the `-h` flag
to display the modes. The TMTC commander is able to send multiple telecommands
sequentially by using a provided queue. The queue is filled by the
developer. Some examples can be found in the `tc` folder. The queue
can also be filled with special commands, for example a wait command or
a print command.
Port needs to be specified manually for now, IP adresses can be set in
command line.
This application is also able to listen to telemetry packets in a separate thread.
The specific TM handling is also implemented by the developer. Some
examples can be found in the `tm` folder.
### Issues
If there are problems receiving packets, use the tool Wireshark to track
ethernet communication for UDP echo packets (requests and response).
If the packets appear, there might be a problematic firewall setting.
Please ensure that python.exe UDP packets are not blocked in advanced firewall
settings and create a rule to allow packets from port 2008.
### Communication Interfaces
## Serial Communication
Serial communication was implemented and is tested for Windwos 10 and Ubuntu 20.04.
The communication interfaces decouple the used interface from the communication
logic. This enables to write the telecommand and telemetry specifications
without worrying about the used communication interface.
#### Serial Communication
Serial communication was implemented and is tested for Windows 10 and Ubuntu 20.04.
It requires the PySerial package installed.
It should be noted that there are several modes for the serial communication.
There is a fixed frame mode and a mode based on a simple DLE transport layer.
When using the DLE transport layer, sent packets are encoded with DLE while
received packets need to be DLE encoded.
## Module Test
Includes a moduel tester which sends TCs in a queue and automatically
analyzes the replies. This is the best way to test the functionality of the
software right now as a software internal TC injector has not been implemented
yet for the FSFW.
Some more information will follow on how to write Unit Tests.
## Issues
### Ethernet Communication
If there are issued with the Ethernet communcation,
there might be a problematic firewall setting.
It might be necessary to allow UDP packets on certain ports
## Developers Information
Code Style: [PEP8](https://www.python.org/dev/peps/pep-0008/).
......@@ -126,14 +155,3 @@ Install it with pip and then install and set-up the Pylint plugin in PyCharm.
There are a lot of features which would be nice, for example a GUI.
The architecture of the program should allow extension like that without
too many issues, as the sending and telemetry listening are decoupled.
## Import run configurations in PyCharm
The PyCharm IDE can be used to comfortably manage a set of run configuations
(for example tests for different services). These configurations were shared
through the version control system git and should be imported automatically.
If these configurations dont show up, try to open the tmtc folder as
a new PyCharm project in a new window.
To add new configurations, go to Edit Configurations...
at the top right corner in the drop-down menu.
Specify the new run configurations and set a tick at Share through VCS.
SOURCEbadge.png

313 KiB

"""
Set-up function. Initiates the communication interface.
"""
import json
import os
import sys
from typing import Union
import serial
import serial.tools.list_ports
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from tmtc_core.comIF.obsw_dummy_com_if import DummyComIF
from tmtc_core.comIF.obsw_ethernet_com_if import EthernetComIF
......@@ -13,7 +18,7 @@ from tmtc_core.comIF.obsw_qemu_com_if import QEMUComIF
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter
import config.obsw_config as g
import config.tmtcc_config as g
LOGGER = get_logger()
......@@ -28,13 +33,16 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio
if g.G_COM_IF == g.ComInterfaces.Ethernet:
communication_interface = EthernetComIF(
tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT,
tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_ETHERNET_SEND_ADDRESS,
tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR,
send_address=g.G_ETHERNET_SEND_ADDRESS,
receive_address=g.G_ETHERNET_RECV_ADDRESS)
elif g.G_COM_IF == g.ComInterfaces.Serial:
serial_baudrate = g.G_SERIAL_BAUDRATE
serial_timeout = g.G_SERIAL_TIMEOUT
# Determine COM port, either extract from JSON file or ask from user.
com_port = determine_com_port()
communication_interface = SerialComIF(
tmtc_printer=tmtc_printer, com_port=g.G_COM_PORT, baud_rate=serial_baudrate,
tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=serial_baudrate,
serial_timeout=serial_timeout,
ser_com_type=SerialCommunicationType.DLE_ENCODING)
communication_interface.set_dle_settings(
......@@ -58,3 +66,65 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio
LOGGER.error("Error setting up communication interface")
LOGGER.exception("Error")
sys.exit()
def determine_com_port() -> str:
reconfigure_com_port = False
com_port = ""
if os.path.isfile("config/tmtcc_config.json"):
with open("config/tmtcc_config.json", "r") as write:
load_data = json.load(write)
com_port = load_data["COM_PORT"]
if not check_port_validity(com_port):
reconfigure = input(
"COM port from configuration file not contained within serial"
"port list. Reconfigure serial port? [y/n]: ")
if reconfigure.lower() in ['y', "yes"]:
write.close()
os.remove("config/tmtcc_config.json")
reconfigure_com_port = True
else:
reconfigure_com_port = True
if reconfigure_com_port:
com_port = prompt_com_port()
save_to_json = input("Do you want to store serial port to "
"configuration? (y/n): ")
if save_to_json.lower() in ['y', "yes"]:
with open("config/tmtcc_config.json", "w") as write:
json.dump(dict(COM_PORT=com_port), write, indent=4)
return com_port
def prompt_com_port() -> str:
while True:
com_port = input(
"Configuring serial port. Please enter COM Port"
"(enter h to display list of COM ports): ")
if com_port == 'h':
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
print("{}: {} [{}]".format(port, desc, hwid))
else:
if not check_port_validity(com_port):
print("Serial port not in list of available serial ports. Try again? [y/n]")
try_again = input()
if try_again.lower() in ['y', "yes"]:
continue
else:
break
else:
break
return com_port
def check_port_validity(com_port_to_check: str) -> bool:
port_list = []
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
port_list.append(port)
if com_port_to_check not in port_list:
return False
return True
"""
@file
obsw_config.py
@date
01.11.2019
@brief
Global settings for UDP client
@file tmtcc_config.py
@date 01.11.2019
@brief Global settings for TMTC commander.
"""
import struct
import pprint
import logging
from config.obsw_definitions import ModeList, ComInterfaces
from config.tmtcc_definitions import ModeList
from tmtc_core.tmtc_core_definitions import ComInterfaces
"""
Mission/Device specific information.
"""
# TODO: Automate / Autofill this file with the MIB parser
# TODO: JSON file so we can store variables which are not tracked by version control but still
# remain the same on a machine (e.g. COM port or IP addresses)
# Object IDs
# Commands
......@@ -58,7 +60,6 @@ G_MAX_APP_DATA_LENGTH = G_MAX_BINARY_FRAME_LENGTH - 100
G_COM_IF: ComInterfaces = ComInterfaces.QEMU
# COM Port for serial communication
G_COM_PORT = 'COM0'
G_SERIAL_TIMEOUT = 0.01
G_SERIAL_BAUDRATE = 230400
G_SERIAL_FRAME_SIZE = 256
......@@ -94,10 +95,11 @@ G_TM_LISTENER = None
G_COM_INTERFACE = None
G_TMTC_PRINTER = None
# noinspection PyUnusedLocal
def set_globals(args):
global G_ETHERNET_RECV_ADDRESS, G_ETHERNET_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, \
G_DISPLAY_MODE, G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, \
G_DISPLAY_MODE, G_COM_IF, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, \
G_TC_SEND_TIMEOUT_FACTOR, G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM, G_PRINT_TM, \
G_OP_CODE, G_RESEND_TC, G_LISTENER_AFTER_OP
......@@ -149,11 +151,40 @@ def set_globals(args):
G_PRINT_TM = args.print_tm
G_PRINT_TO_FILE = args.print_log
G_PRINT_RAW_TM = args.rawDataPrint
G_COM_PORT = args.com_port
G_TM_TIMEOUT = args.tm_timeout
G_RESEND_TC = args.resend_tc
G_LISTENER_AFTER_OP = args.listener
from config.obsw_user_code import global_setup_hook
from config.tmtcc_user_code import global_setup_hook
global_setup_hook()
def set_glob_apid(new_apid: int):
global G_APID
G_APID = new_apid
def get_glob_apid():
global G_APID
return G_APID
def set_glob_com_if(new_com_if: ComInterfaces):
global G_COM_IF
G_COM_IF = new_com_if
def get_glob_com_if():
global G_COM_IF
return G_COM_IF
def set_glob_mode(new_mode : ModeList):
global G_MODE_ID
G_MODE_ID = new_mode
def get_glob_mode():
global G_MODE_ID
return G_MODE_ID
import enum
from typing import Tuple
ethernetAddressT = Tuple[str, int]
# Mode options, set by args parser
......@@ -17,8 +14,3 @@ class ModeList(enum.Enum):
PromptMode = 32
class ComInterfaces(enum.Enum):
Dummy = 0
Serial = 1
QEMU = 2
Ethernet = 3
......@@ -2,11 +2,14 @@
User defined code can be added here.
"""
from typing import Union, Tuple
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfo
from tmtc_core.tmtc_core_definitions import ComInterfaces
from enum import Enum
# Yeah, I did not have a better idea yet..
# TODO: We really need that JSON file which will not be tracked by version control.
# In that JSON file, we could set the current developer, so that developers can
# hook into the config.
class Developer(Enum):
Robin = 0
......@@ -22,24 +25,24 @@ def command_preparation_hook() -> Tuple[bytearray, Union[None, PusTcInfo]]:
return prepare_robins_commands()
def prepare_robins_commands():
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
command = PusTelecommand(service=17, subservice=1, ssc=20)
return command.pack_command_tuple()
def global_setup_hook():
"""
Can be used to alter the global variables in a custom defined way.
For example, device specific com ports or ethernet ports can be set here.
The global variables in the config.obsw_config file can be edited here
by using the handle.
For example: config.obsw_config.G_ETHERNET_SEND_ADDRESS = new_send_address
"""
if Developer == Developer.Robin:
global_setup_hook_robin()
def prepare_robins_commands():
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
command = PusTelecommand(service=17, subservice=1, ssc=20)
return command.pack_command_tuple()
def global_setup_hook_robin():
import config.obsw_config
pass
from config.tmtcc_config import get_glob_com_if, set_glob_apid
if get_glob_com_if() == ComInterfaces.Ethernet:
# Configure APID for FSFW example. Set this back to 0x73 for STM32!
set_glob_apid(0xEF)
"""
@brief Version file
"""
SW_NAME = "tmtcc"
SW_VERSION = 1
SW_SUBVERSION = 1
......@@ -6,41 +6,41 @@ from multiprocessing import Process
from collections import deque
from typing import Tuple, Union
from config import obsw_config as g
from config.obsw_definitions import ModeList
from config.obsw_user_code import command_preparation_hook
from tmtc_core.tmtc_core_definitions import ComInterfaces
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfo
from tmtc_core.sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver
from tmtc_core.sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver
from tmtc_core.sendreceive.obsw_tm_listener import TmListener
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter
from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler
from tmtc_core.pus_tc.tmtcc_pus_tc_packer import ServiceQueuePacker
from tc.obsw_pus_tc_packer import ServiceQueuePacker, create_total_tc_queue
from config.tmtcc_com_config import set_communication_interface
from config.tmtcc_definitions import ModeList
from pus_tc.tmtcc_tc_packer_hook import create_total_tc_queue
from test.obsw_pus_service_test import run_selected_pus_tests
from config.obsw_com_config import set_communication_interface
from utility.obsw_binary_uploader import BinaryFileUploader
from utility.tmtcc_binary_uploader import BinaryFileUploader
LOGGER = get_logger()
class TmTcHandler:
"""
This is the primary class which handles TMTC reception. This can be seen as the backend
in case a GUI or front-end is implemented.
"""
def __init__(self, init_mode: ModeList = ModeList.ListenerMode):
def __init__(self, init_com_if: ComInterfaces = ComInterfaces.Dummy,
init_mode: ModeList = ModeList.ListenerMode):
self.mode = init_mode
self.com_if = g.G_COM_IF
self.com_if = init_com_if
# This flag could be used later to command the TMTC Client with a front-end
self.one_shot_operation = True
self.tmtc_printer: Union[None, TmTcPrinter] = None
self.communication_interface: Union[None, CommunicationInterface] = None
self.tm_listener: Union[None, TmListener] = None
self.exit_on_com_if_init_failure = True
self.single_command_package: Tuple[bytearray, Union[None, PusTcInfo]] = bytearray(), None
......@@ -57,18 +57,27 @@ class TmTcHandler:
"""
self.mode = mode
def set_com_if(self, com_if: ComInterfaces):
self.com_if = com_if
@staticmethod
def prepare_tmtc_handler_start(init_mode: ModeList = g.ModeList.ListenerMode):
tmtc_handler = TmTcHandler(init_mode)
def prepare_tmtc_handler_start(
init_com_if: ComInterfaces = ComInterfaces.Dummy,
init_mode: ModeList = ModeList.ListenerMode):
tmtc_handler = TmTcHandler(init_com_if, init_mode)
tmtc_task = Process(target=TmTcHandler.start_handler, args=(tmtc_handler, ))
return tmtc_task
@staticmethod
def start_handler(executed_handler):
if not isinstance(executed_handler, TmTcHandler):
LOGGER.error("Unexpected argument, should be TmTcHandler!")
sys.exit(1)
executed_handler.initialize()
executed_handler.perform_operation()
executed_handler.start()
def initialize(self):
from config import tmtcc_config as g
"""
Perform initialization steps which might be necessary after class construction.
This has to be called at some point before using the class!
......@@ -79,12 +88,19 @@ class TmTcHandler:
com_interface=self.communication_interface, tm_timeout=g.G_TM_TIMEOUT,
tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR
)
if self.communication_interface.valid:
atexit.register(keyboard_interrupt_handler, com_interface=self.communication_interface)
def start(self):
try:
self.communication_interface.open()
self.tm_listener.start()
else:
LOGGER.info("No communication interface set for now")
except IOError:
LOGGER.error("Communication Interface could not be opened!")
LOGGER.info("TM listener will not be started")
atexit.register(keyboard_interrupt_handler, com_interface=self.communication_interface)
if self.exit_on_com_if_init_failure:
LOGGER.error("Closing TMTC commander..")
sys.exit(1)
self.perform_operation()
def perform_operation(self):
"""
......@@ -95,7 +111,7 @@ class TmTcHandler:
except KeyboardInterrupt:
LOGGER.info("Keyboard Interrupt.")
sys.exit()
except IOError as e:
except IOError:
LOGGER.error("IO Error occured!")
sys.exit()
......@@ -103,34 +119,35 @@ class TmTcHandler:
"""
Command handling.
"""
if self.mode == g.ModeList.PromptMode:
if self.mode == ModeList.PromptMode:
self.prompt_mode()
if self.mode == g.ModeList.ListenerMode:
if self.mode == ModeList.ListenerMode:
if self.tm_listener.reply_event():
LOGGER.info("TmTcHandler: Packets received.")
self.tmtc_printer.print_telemetry_queue(self.tm_listener.retrieve_tm_packet_queue())
self.tm_listener.clear_tm_packet_queue()
self.tm_listener.clear_reply_event()
elif self.mode == g.ModeList.SingleCommandMode:
elif self.mode == ModeList.SingleCommandMode:
if self.single_command_package is None:
pus_packet_tuple = command_preparation()
else:
LOGGER.info("send package from gui")
LOGGER.info("Sending single packet from GUI..")
pus_packet_tuple = self.single_command_package
sender_and_receiver = SingleCommandSenderReceiver(
com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
tm_listener=self.tm_listener)
LOGGER.info("Performing single command operation")
LOGGER.info("Performing single command operation..")
sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple)
self.mode = g.ModeList.PromptMode
self.mode = ModeList.PromptMode
elif self.mode == g.ModeList.ServiceTestMode:
elif self.mode == ModeList.ServiceTestMode:
from config import tmtcc_config as g
service_queue = deque()
service_queue_packer = ServiceQueuePacker()
op_code = g.G_OP_CODE
service_queue_packer.pack_service_queue(
service_queue_packer.pack_service_queue_core(
service=g.G_SERVICE, service_queue=service_queue, op_code=op_code)
if not self.communication_interface.valid:
return
......@@ -151,15 +168,16 @@ class TmTcHandler:
LOGGER.info("SequentialSenderReceiver: Exporting output to log file.")
self.tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True)
elif self.mode == g.ModeList.BinaryUploadMode:
elif self.mode == ModeList.BinaryUploadMode:
# Upload binary, prompt user for input, in the end prompt for new mode and enter that
# mode
file_uploader = BinaryFileUploader(self.communication_interface, self.tmtc_printer,
self.tm_listener)
file_uploader.perform_file_upload()
self.mode = g.ModeList.ListenerMode
self.mode = ModeList.ListenerMode
elif self.mode == g.ModeList.UnitTest:
elif self.mode == ModeList.UnitTest:
from config import tmtcc_config as g
# Set up test suite and run it with runner. Verbosity specifies detail level
g.G_TM_LISTENER = self.tm_listener
g.G_COM_INTERFACE = self.communication_interface
......@@ -172,15 +190,15 @@ class TmTcHandler:
sys.exit()
def __core_operation(self, one_shot):
if self.mode == g.ModeList.ListenerMode:
if self.mode == ModeList.ListenerMode:
one_shot = False
if not one_shot:
while True:
self.__handle_action()
if self.mode == g.ModeList.Idle:
if self.mode == ModeList.Idle:
LOGGER.info("TMTC Client in idle mode")
time.sleep(5)
elif self.mode == g.ModeList.ListenerMode:
elif self.mode == ModeList.ListenerMode:
time.sleep(1)
else:
self.__handle_action()
......@@ -219,4 +237,10 @@ def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]:
Prepare command for single command testing
:return:
"""
try:
from config.tmtcc_user_code import command_preparation_hook
except ImportError as e:
print(e)
LOGGER.error("Hook function for command application not implemented!")
sys.exit(1)
return command_preparation_hook()
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@brief Core method called by entry point files to initiate the TMTC commander
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
@author R. Mueller
"""
from multiprocessing import Process
import sys
from config.tmtcc_version import SW_VERSION, SW_SUBVERSION
from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger
from config.obsw_config import set_globals
import config.obsw_config as g
from config.tmtcc_config import set_globals
import config.tmtcc_config as g
from core.tmtc_backend import TmTcHandler
from core.tmtc_frontend import TmTcFrontend
from utility.obsw_args_parser import parse_input_arguments
from utility.tmtcc_args_parser import parse_input_arguments
LOGGER = get_logger()
def run_tmtc_client(use_gui: bool):
def run_tmtc_client(use_gui: bool, reduced_printout: bool = False):
"""
Main method, reads input arguments, sets global variables and start TMTC handler.
"""
if not reduced_printout:
print("-- Python TMTC Commander --")
if use_gui:
print("-- GUI mode --")
else:
print("-- Command line mode --")
print("-- Software version v" + str(SW_VERSION) + "." + str(SW_SUBVERSION) + " --")
set_tmtc_logger()
LOGGER.info("Starting TMTC Client")
LOGGER.info("Starting TMTC Client..")
if not use_gui:
LOGGER.info("Parsing input arguments")
LOGGER.info("Parsing input arguments..")
args = parse_input_arguments()
LOGGER.info("Setting global variables")
LOGGER.info("Setting global variables..")
set_globals(args)
LOGGER.info("Starting TMTC Handler")
LOGGER.info("Starting TMTC Handler..")
tmtc_frontend_task = Process
# Currently does not work, problems with QEMU / Asyncio
if not use_gui:
tmtc_handler = TmTcHandler(g.G_MODE_ID)
# The global variables are set by the argument parser.
tmtc_handler = TmTcHandler(g.get_glob_com_if(), g.get_glob_mode())
tmtc_handler.set_one_shot_or_loop_handling(g.G_LISTENER_AFTER_OP)
tmtc_handler.initialize()
tmtc_handler.perform_operation()
tmtc_handler.start()
else:
from PyQt5.QtWidgets import QApplication
app = QApplication(["TMTC Commander"])
tmtc_gui = TmTcFrontend()
tmtc_gui.start_ui()
# tmtc_handler_task = TmTcHandler.prepare_tmtc_handler_start()
# tmtc_frontend = TmTcFrontend()
# tmtc_frontend_task = tmtc_frontend.prepare_start(tmtc_frontend)
# tmtc_frontend_task.start()
# tmtc_handler_task.start()
# tmtc_handler_task.join()
# tmtc_frontend_task.join()
sys.exit(app.exec_())
......@@ -7,15 +7,18 @@
@manual
@author R. Mueller, P. Scheurenbrand
"""
import threading
from multiprocessing import Process
from PyQt5.QtWidgets import *
from PyQt5.QtGui import QPixmap, QIcon
from core.tmtc_backend import TmTcHandler
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand
from config import tmtcc_config
from tmtc_core.tmtc_core_definitions import ComInterfaces
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand
from tmtc_core.utility.obsw_logger import get_logger
from config import obsw_config
from config.obsw_definitions import ComInterfaces
import threading
LOGGER = get_logger()
......@@ -23,10 +26,11 @@ LOGGER = get_logger()
TODO: Make it look nicer. Add SOURCE or KSat logo.
"""
class TmTcFrontend:
# TODO: this list should probably be inside an enum in the obsw_config.py
serviceList = [ 2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1" ] # , "Error"]
class TmTcFrontend(QMainWindow):
# TODO: this list should probably be inside an enum in the tmtcc_config.py
serviceList = [2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1"] # , "Error"]
service_test_button: QPushButton
single_command_button: QPushButton
......@@ -40,19 +44,20 @@ class TmTcFrontend:
is_busy: bool
def __init__(self):
super(TmTcFrontend, self).__init__()
self.tmtc_handler = TmTcHandler()
# TODO: Perform initialization on button press with specified ComIF
# Also, when changing ComIF, ensure that old ComIF is closed (e.g. with printout)
# Lock all other elements while ComIF is invalid.
self.tmtc_handler.initialize()
obsw_config.G_SERVICE = 17
obsw_config.G_COM_IF = obsw_config.ComInterfaces.QEMU
tmtcc_config.G_SERVICE = 17
tmtcc_config.G_COM_IF = tmtcc_config.ComInterfaces.QEMU
def prepare_start(self, args: any) -> Process:
return Process(target=self.start_ui)
def service_index_changed(self, index: int):
obsw_config.G_SERVICE = self.serviceList[index]
tmtcc_config.G_SERVICE = self.serviceList[index]
LOGGER.info("service_test_mode_selection updated: " + str(self.serviceList[index]))
def single_command_set_service(self, value):
......@@ -66,8 +71,8 @@ class TmTcFrontend:
def start_service_test_clicked(self):
LOGGER.info("start service test button pressed")
LOGGER.info("start testing service: " + str(obsw_config.G_SERVICE))
self.tmtc_handler.mode = obsw_config.ModeList.ServiceTestMode
LOGGER.info("start testing service: " + str(tmtcc_config.G_SERVICE))
self.tmtc_handler.mode = tmtcc_config.ModeList.ServiceTestMode
# start the action in a new process
p = threading.Thread(target=self.handle_tm_tc_action)
p.start()
......@@ -76,9 +81,9 @@ class TmTcFrontend:
LOGGER.info("send single command pressed")
# parse the values from the table
#service = int(self.commandTable.item(0, 0).text())
#subservice = int(self.commandTable.item(0, 1).text())
#ssc = int(self.commandTable.item(0, 2).text())
# service = int(self.commandTable.item(0, 0).text())
# subservice = int(self.commandTable.item(0, 1).text())
# ssc = int(self.commandTable.item(0, 2).text())
LOGGER.info("service: " + str(self.single_command_service) +
", subservice: " + str(self.single_command_sub_service) +
......@@ -94,7 +99,7 @@ class TmTcFrontend:
ssc=self.single_command_ssc)
self.tmtc_handler.single_command_package = command.pack_command_tuple()
self.tmtc_handler.mode = obsw_config.ModeList.SingleCommandMode
self.tmtc_handler.mode = tmtcc_config.ModeList.SingleCommandMode
# start the action in a new process
p = threading.Thread(target=self.handle_tm_tc_action)
p.start()
......@@ -113,33 +118,40 @@ class TmTcFrontend:
self.single_command_button.setEnabled(state)
def start_ui(self):
app = QApplication([])
win = QWidget()
win = QWidget(self)
self.setCentralWidget(win)
grid = QGridLayout()
self.setWindowTitle("TMTC Commander")
label = QLabel(self)
pixmap = QPixmap("SOURCEbadge.png") # QPixmap is the class, easy to put pic on screen
label.setGeometry(720, 15, 110, 110)
label.setPixmap(pixmap)
self.setWindowIcon(QIcon("SOURCEbadge.png"))
label.setScaledContents(True)
row = 0
grid.addWidget(QLabel("Configuration:"), row, 0, 1, 2)
row += 1
checkbox_console = QCheckBox("print output to console")
checkbox_console.setChecked(obsw_config.G_PRINT_TM)
checkbox_console.setChecked(tmtcc_config.G_PRINT_TM)
checkbox_console.stateChanged.connect(checkbox_console_print)
checkbox_log = QCheckBox("print output to log file")
checkbox_log.setChecked(obsw_config.G_PRINT_TO_FILE)
checkbox_log.setChecked(tmtcc_config.G_PRINT_TO_FILE)
checkbox_log.stateChanged.connect(checkbox_log_print)
checkbox_raw_tm = QCheckBox("print all raw TM data directly")
checkbox_raw_tm.setChecked(obsw_config.G_PRINT_RAW_TM)
checkbox_raw_tm.setChecked(tmtcc_config.G_PRINT_RAW_TM)
checkbox_raw_tm.stateChanged.connect(checkbox_print_raw_data)
checkbox_hk = QCheckBox("print hk data")
checkbox_hk.setChecked(obsw_config.G_PRINT_HK_DATA)
checkbox_hk.setChecked(tmtcc_config.G_PRINT_HK_DATA)
checkbox_hk.stateChanged.connect(checkbox_print_hk_data)
checkbox_short = QCheckBox("short display mode")
checkbox_short.setChecked(obsw_config.G_DISPLAY_MODE == "short")
checkbox_short.setChecked(tmtcc_config.G_DISPLAY_MODE == "short")
checkbox_short.stateChanged.connect(checkbox_short_display_mode)
grid.addWidget(checkbox_log, row, 0, 1, 1)
......@@ -160,14 +172,14 @@ class TmTcFrontend:
# TODO: set sensible min/max values
spin_timeout.setSingleStep(0.1)
spin_timeout.setMinimum(0.25)
spin_timeout.setMaximum(60)#
spin_timeout.setMaximum(60)
# https://youtrack.jetbrains.com/issue/PY-22908
# Ignore those warnings for now.
spin_timeout.valueChanged.connect(number_timeout)
grid.addWidget(spin_timeout, row, 0, 1, 1)
spin_timeout_factor = QDoubleSpinBox()
spin_timeout_factor.setValue(obsw_config.G_TC_SEND_TIMEOUT_FACTOR)
spin_timeout_factor.setValue(tmtcc_config.G_TC_SEND_TIMEOUT_FACTOR)
# TODO: set sensible min/max values
spin_timeout_factor.setSingleStep(0.1)
spin_timeout_factor.setMinimum(0.25)
......@@ -197,25 +209,25 @@ class TmTcFrontend:
row += 1
# com if configuration
grid.addWidget(QLabel("Communication Interface:"), row, 0, 1, 1)
com_if_comboBox = QComboBox()
com_if_combo_box = QComboBox()
# add all possible ComIFs to the comboBox
for comIf in obsw_config.ComInterfaces:
com_if_comboBox.addItem(comIf.name)
com_if_comboBox.setCurrentIndex(obsw_config.G_COM_IF.value)
com_if_comboBox.currentIndexChanged.connect(com_if_index_changed)
grid.addWidget(com_if_comboBox, row, 1, 1, 1)
for comIf in tmtcc_config.ComInterfaces:
com_if_combo_box.addItem(comIf.name)
com_if_combo_box.setCurrentIndex(tmtcc_config.G_COM_IF.value)
com_if_combo_box.currentIndexChanged.connect(com_if_index_changed)
grid.addWidget(com_if_combo_box, row, 1, 1, 1)
row += 1
# service test mode gui
grid.addWidget(QLabel("Service Test Mode:"), row, 0, 1, 2)
row += 1
comboBox = QComboBox()
combo_box = QComboBox()
for service in self.serviceList:
comboBox.addItem("Service - " + str(service))
comboBox.setCurrentIndex(self.serviceList.index(obsw_config.G_SERVICE))
comboBox.currentIndexChanged.connect(self.service_index_changed)
grid.addWidget(comboBox, row, 0, 1, 1)
combo_box.addItem("Service - " + str(service))
combo_box.setCurrentIndex(self.serviceList.index(tmtcc_config.G_SERVICE))
combo_box.currentIndexChanged.connect(self.service_index_changed)
grid.addWidget(combo_box, row, 0, 1, 1)
self.service_test_button = QPushButton()
self.service_test_button.setText("Start Service Test")
......@@ -284,14 +296,13 @@ class TmTcFrontend:
row += 1
win.setLayout(grid)
win.resize(900, 800)
win.show()
self.resize(900, 800)
self.show()
# resize table columns to fill the window width
#for i in range(0, 5):
# self.commandTable.setColumnWidth(i, int(self.commandTable.width() / 5) - 3)
app.exec_()
class SingleCommandTable(QTableWidget):
def __init__(self):
......@@ -308,49 +319,49 @@ class SingleCommandTable(QTableWidget):
self.setItem(0, 2, QTableWidgetItem("20"))
def com_if_index_changed(index: int):
obsw_config.G_COM_IF = ComInterfaces(index)
LOGGER.info("com if updated: " + str(obsw_config.G_COM_IF))
tmtcc_config.G_COM_IF = ComInterfaces(index)
LOGGER.info("com if updated: " + str(tmtcc_config.G_COM_IF))
def checkbox_console_print(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " console print")
obsw_config.G_PRINT_TM = state == 0
tmtcc_config.G_PRINT_TM = state == 0
def checkbox_log_print(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " print to log")
obsw_config.G_PRINT_TO_FILE = state == 0
tmtcc_config.G_PRINT_TO_FILE = state == 0
def checkbox_print_raw_data(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " printing of raw data")
obsw_config.G_PRINT_RAW_TM = state == 0
tmtcc_config.G_PRINT_RAW_TM = state == 0
def checkbox_print_hk_data(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " printing of hk data")
obsw_config.G_PRINT_HK_DATA = state == 0
tmtcc_config.G_PRINT_HK_DATA = state == 0
def checkbox_short_display_mode(state: int):
LOGGER.info(["enabled", "disabled"][state == 0] + " short display mode")
obsw_config.G_DISPLAY_MODE = ["short", "long"][state == 0]
tmtcc_config.G_DISPLAY_MODE = ["short", "long"][state == 0]
def number_timeout(value: float):
LOGGER.info("tm timeout changed to: " + str(value))
obsw_config.G_TM_TIMEOUT = value
LOGGER.info("pus_tm timeout changed to: " + str(value))
tmtcc_config.G_TM_TIMEOUT = value
def number_timeout_factor(value: float):
LOGGER.info("tm timeout factor changed to: " + str(value))
obsw_config.G_TC_SEND_TIMEOUT_FACTOR = value
LOGGER.info("pus_tm timeout factor changed to: " + str(value))
tmtcc_config.G_TC_SEND_TIMEOUT_FACTOR = value
def ip_change_client(value):
LOGGER.info("client ip changed: " + value)
obsw_config.G_REC_ADDRESS = (value, 2008)
tmtcc_config.G_REC_ADDRESS = (value, 2008)
def ip_change_board(value):
LOGGER.info("board ip changed: " + value)
obsw_config.G_SEND_ADDRESS = (value, 7)
\ No newline at end of file
tmtcc_config.G_SEND_ADDRESS = (value, 7)
\ No newline at end of file
File moved
......@@ -2,7 +2,7 @@
@brief Helper module to pack telecommand frames.
"""
from typing import List, Tuple
from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, PusTcInfo
from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, PusTcInfo
from tmtc_core.utility.obsw_logger import get_logger
LOGGER = get_logger()
......
from typing import Union
from tmtc_core.tc.obsw_pus_tc_base import TcQueueT
from tc.obsw_tc_service8 import generate_action_command
from config.obsw_config import LED_TASK_ID
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT
from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import generate_action_command
from config.tmtcc_config import LED_TASK_ID
def pack_utility_command(service_queue: TcQueueT, op_code: Union[str, int]):
......
# -*- coding: utf-8 -*-
"""
@file obsw_tc_service5_17.py
@brief PUS Service 5: Event Service
PUS Service 17: Test Service
@author R. Mueller
@date 02.05.2020
"""
from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand
from tmtc_core.pus_tc.tmtcc_tc_service17_test import pack_service17_ping_command
from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand
def pack_service5_test_into(tc_queue: TcQueueT) -> TcQueueT:
tc_queue.appendleft(("print", "Testing Service 5"))
# invalid subservice
tc_queue.appendleft(("print", "Testing Service 5: Invalid subservice"))
command = PusTelecommand(service=5, subservice=1, ssc=500)
tc_queue.appendleft(command.pack_command_tuple())
# disable events
tc_queue.appendleft(("print", "Testing Service 5: Disable event"))
command = PusTelecommand(service=5, subservice=6, ssc=500)
tc_queue.appendleft(command.pack_command_tuple())
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger event"))
command = PusTelecommand(service=17, subservice=128, ssc=510)
tc_queue.appendleft(command.pack_command_tuple())
# enable event
tc_queue.appendleft(("print", "Testing Service 5: Enable event"))
command = PusTelecommand(service=5, subservice=5, ssc=520)
tc_queue.appendleft(command.pack_command_tuple())
# trigger event
tc_queue.appendleft(("print", "Testing Service 5: Trigger another event"))
command = PusTelecommand(service=17, subservice=128, ssc=530)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(("export", "log/tmtc_log_service5.txt"))
return tc_queue
def pack_service17_test_into(tc_queue: TcQueueT, op_code: int = 0) -> TcQueueT:
def pack_service17_test_into(tc_queue: TcQueueT, op_code: int = 0):
if op_code == 0:
tc_queue.appendleft(("print", "Testing Service 17"))
# ping test
tc_queue.appendleft(("print", "Testing Service 17: Ping Test"))
command = PusTelecommand(service=17, subservice=1, ssc=1700)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple())
# enable event
tc_queue.appendleft(("print", "Testing Service 17: Enable Event"))
command = PusTelecommand(service=5, subservice=5, ssc=52)
......@@ -77,4 +42,4 @@ def pack_enable_periodic_print_packet(tc_queue: TcQueueT, enable: bool, ssc: int
def pack_trigger_exception_packet(tc_queue: TcQueueT, ssc: int):
tc_queue.appendleft(("print", "Triggering software exception"))
command = PusTelecommand(service=17, subservice=150, ssc=ssc)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft(command.pack_command_tuple())
\ No newline at end of file