diff --git a/.gitignore b/.gitignore index ff456d6912b537e7a9fb03f28635b2d78c449818..27cf7b0f5c713747eb3ed7a971a58e1f349097e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,3 @@ -_build/* -_dependencies/* -_bin/* - -_bin -_obj -_dependencies - -.settings/* .settings __pycache__ @@ -17,6 +8,7 @@ __pycache__ *.tmp *.open *.ini +*.json generators/*.csv diff --git a/.idea/runConfigurations/tmtcclient_Copy_OBSW_Update.xml b/.idea/runConfigurations/tmtcclient_Copy_OBSW_Update.xml index 5da1e90b560d3818ec94438c15516f7203be48b3..b07441d03a83ada94617bcd82d942312e7f6dac5 100644 --- a/.idea/runConfigurations/tmtcclient_Copy_OBSW_Update.xml +++ b/.idea/runConfigurations/tmtcclient_Copy_OBSW_Update.xml @@ -13,7 +13,7 @@ <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" /> - <option name="PARAMETERS" value="-m 3 -s Img -o A4U -c 1 -t 4" /> + <option name="PARAMETERS" value="-m 3 -s Img -o A4U -c 1 -t 6" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="true" /> <option name="MODULE_MODE" value="false" /> diff --git a/.idea/runConfigurations/tmtcclient_Disable_LEDs.xml b/.idea/runConfigurations/tmtcclient_Disable_LEDs.xml new file mode 100644 index 0000000000000000000000000000000000000000..d045fd7a55f6f7e37e684d06a4f2c1f40d928311 --- /dev/null +++ b/.idea/runConfigurations/tmtcclient_Disable_LEDs.xml @@ -0,0 +1,24 @@ +<component name="ProjectRunConfigurationManager"> + <configuration default="false" name="tmtcclient Disable LEDs" type="PythonConfigurationType" factoryName="Python" folderName="Serial Utility"> + <module name="tmtc" /> + <option name="INTERPRETER_OPTIONS" value="" /> + <option name="PARENT_ENVS" value="true" /> + <envs> + <env name="PYTHONUNBUFFERED" value="1" /> + </envs> + <option name="SDK_HOME" value="" /> + <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" /> + <option name="IS_MODULE_SDK" value="true" /> + <option name="ADD_CONTENT_ROOTS" value="true" /> + <option name="ADD_SOURCE_ROOTS" value="true" /> + <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> + <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> + <option name="PARAMETERS" value="-m 3 -s led -o A1 -c 1 -t 2.2" /> + <option name="SHOW_COMMAND_LINE" value="false" /> + <option name="EMULATE_TERMINAL" value="true" /> + <option name="MODULE_MODE" value="false" /> + <option name="REDIRECT_INPUT" value="false" /> + <option name="INPUT_FILE" value="" /> + <method v="2" /> + </configuration> +</component> \ No newline at end of file diff --git a/.idea/runConfigurations/tmtcclient_Enable_LEDs.xml b/.idea/runConfigurations/tmtcclient_Enable_LEDs.xml new file mode 100644 index 0000000000000000000000000000000000000000..710c1251fd43b3df5bcd8ac92a1b1688c51a54c7 --- /dev/null +++ b/.idea/runConfigurations/tmtcclient_Enable_LEDs.xml @@ -0,0 +1,24 @@ +<component name="ProjectRunConfigurationManager"> + <configuration default="false" name="tmtcclient Enable LEDs" type="PythonConfigurationType" factoryName="Python" folderName="Serial Utility"> + <module name="tmtc" /> + <option name="INTERPRETER_OPTIONS" value="" /> + <option name="PARENT_ENVS" value="true" /> + <envs> + <env name="PYTHONUNBUFFERED" value="1" /> + </envs> + <option name="SDK_HOME" value="" /> + <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" /> + <option name="IS_MODULE_SDK" value="true" /> + <option name="ADD_CONTENT_ROOTS" value="true" /> + <option name="ADD_SOURCE_ROOTS" value="true" /> + <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> + <option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> + <option name="PARAMETERS" value="-m 3 -s led -o A0 -c 1 -t 2.2" /> + <option name="SHOW_COMMAND_LINE" value="false" /> + <option name="EMULATE_TERMINAL" value="true" /> + <option name="MODULE_MODE" value="false" /> + <option name="REDIRECT_INPUT" value="false" /> + <option name="INPUT_FILE" value="" /> + <method v="2" /> + </configuration> +</component> \ No newline at end of file diff --git a/.idea/runConfigurations/tmtcclient_Report_File_Attributes.xml b/.idea/runConfigurations/tmtcclient_Report_File_Attributes.xml new file mode 100644 index 0000000000000000000000000000000000000000..db863595c539f7dcd16ed7aea1dbaf1f4684622e --- /dev/null +++ b/.idea/runConfigurations/tmtcclient_Report_File_Attributes.xml @@ -0,0 +1,24 @@ +<component name="ProjectRunConfigurationManager"> + <configuration default="false" name="tmtcclient Report File Attributes" type="PythonConfigurationType" factoryName="Python" folderName="Serial FileManagement"> + <module name="tmtc" /> + <option name="INTERPRETER_OPTIONS" value="" /> + <option name="PARENT_ENVS" value="true" /> + <envs> + <env name="PYTHONUNBUFFERED" value="1" /> + </envs> + <option name="SDK_HOME" value="" /> + <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" /> + <option name="IS_MODULE_SDK" value="true" /> + <option name="ADD_CONTENT_ROOTS" value="true" /> + <option name="ADD_SOURCE_ROOTS" value="true" /> + <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> + <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" /> + <option name="PARAMETERS" value="-m 3 -s SD -o 3 -c 1 -t 2.5" /> + <option name="SHOW_COMMAND_LINE" value="false" /> + <option name="EMULATE_TERMINAL" value="true" /> + <option name="MODULE_MODE" value="false" /> + <option name="REDIRECT_INPUT" value="false" /> + <option name="INPUT_FILE" value="" /> + <method v="2" /> + </configuration> +</component> \ No newline at end of file diff --git a/.idea/runConfigurations/tmtcclient_Service_3_Serial_.xml b/.idea/runConfigurations/tmtcclient_Service_3_IntErrorReporter_Test.xml similarity index 79% rename from .idea/runConfigurations/tmtcclient_Service_3_Serial_.xml rename to .idea/runConfigurations/tmtcclient_Service_3_IntErrorReporter_Test.xml index 15d51da4d1d1c373b763e2928324f7070bd1a14f..77e6aebdc72bfe6a1e11723e119564e132ddeb6a 100644 --- a/.idea/runConfigurations/tmtcclient_Service_3_Serial_.xml +++ b/.idea/runConfigurations/tmtcclient_Service_3_IntErrorReporter_Test.xml @@ -1,5 +1,5 @@ <component name="ProjectRunConfigurationManager"> - <configuration default="false" name="tmtcclient Service 3 Serial " type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test"> + <configuration default="false" name="tmtcclient Service 3 IntErrorReporter Test" type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test"> <module name="tmtc" /> <option name="INTERPRETER_OPTIONS" value="" /> <option name="PARENT_ENVS" value="true" /> @@ -13,7 +13,7 @@ <option name="ADD_SOURCE_ROOTS" value="true" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" /> - <option name="PARAMETERS" value="-m 3 -s 3 -c 1 -t 3 --hk" /> + <option name="PARAMETERS" value="-m 3 -s 3 -c 1 -t 3 -o ie --hk" /> <option name="SHOW_COMMAND_LINE" value="false" /> <option name="EMULATE_TERMINAL" value="false" /> <option name="MODULE_MODE" value="false" /> diff --git a/README.md b/README.md index 0c0ec34d7c3969a442abbceaff67a769d5fc6e3a..d915114e9c9c66a1c727688847b817903c0560aa 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,23 @@ TMTC Client ==== +This client was developed by KSat for the SOURCE project to test the on-board software but +has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand) +handling and testing via different communication interfaces. Currently, only the PUS standard is +implemented as a packet standard. This tool can be used either as a command line tool +or as a GUI tool. + +This client currently supports the following communication interfaces: +1. Ethernet, UDP packets +2. Serial Communication +3. QEMU + ## Cloning the repository It is recommended to fork this repository for new missions. The `tmtc_core` folder contains most generic software components while this repository is based on the SOURCE mission. However, this repostiory is a good starting point and example -implementation. +implementation which can be adapted easily to other projects. Fork the repository or clone it with following command: ```sh @@ -26,19 +37,19 @@ pip install -r requirements.txt Now the script can be tested by running ```sh -python obsw_tmtc_client.py -h +python tmtc_client_cli.py -h ``` It is recommended to use and setup PyCharm to also use the preconfigured run configurations. - ## Prerequisites -Runs with Python 3.8. +Tested with Python 3.8. Don't use Python 2.x! Manual installation of crcmod and pyserial might be needed. It is recommended to install use PyCharm to run this client. Students of the Uni Stuttgart usually have access to PyCharm Professional. + 1. Install pip for used Python distribution if it is not installed yet 2. Install crcmod and all other required packages if using system python compiler. If using the local venv as the compiler, these packages @@ -60,62 +71,80 @@ sudo apt-get install python-tk On Windows, the package should be included. ## How To Use + +It is recommended to use PyCharm and load the run configurations +to have a starting point. PyCharm also provided the option +of remote deployment, which allows TMTC testing with remote +setups (e.g. flatsat setup in a cleanroom). + +Some configuration constants might be stored in a JSON file in the +config folder. To reset the configuration, delete the JSON file. + +### Command line mode The script can be used by specifying command line parameters. -Please run this script with the -h flag -or without any command line parameters to display options. -The client can be used to send via Ethernet or serial port. - -GUI is work-in-progress. -It might be necessary to set board or PC IP address if using ethernet communication. -Default values should work normally though. - -## Examples -Example command to test service 17, -assuming no set client IP (set manually to PC IP Address if necessary). IP Address or port numbers have to be -set manually in the global configuration file (IP address can be passed as a command line argument): -```sh -obsw_tmtc_client.py -m 3 -s 17 -``` -Example to run Unit Test: +Please run this script with the -h flag or without any command line parameters to +display options. + +### Import run configurations in PyCharm +The PyCharm IDE can be used to comfortably manage a set of run configuations +(for example tests for different services). These configurations were shared +through the version control system git and should be imported automatically. +If these configurations dont show up, try to open the tmtc folder as +a new PyCharm project in a new window. + +To add new configurations, go to Edit Configurations... +at the top right corner in the drop-down menu. +Specify the new run configurations and set a tick at Share through VCS. + +### Examples +Example command to send a ping command. Specify the communication interface by +adding `-c <number>` to the command. ```sh -obsw_tmtc_client.py -m 5 +tmtc_client_cli.py -m 3 -s 17 ``` -Example to test service 3 with serial communication, printing all housekeeping packets, -COM port needs to be typed in manually (or set with --COM \<COM PORT>): + +Example to run listener mode ```sh -obsw_tmtc_client.py -m 3 -s 3 --hk -c +tmtc_client_cli.py -m 1 ``` -## Modes -There are different modes. Run the client with the `-h` flag -to display the modes. +## Architectural notes + +Some additional information about the structure of this Python program +are provided here. -## Ethernet Communication +### Modes and the TMTC queue +There are different communication modes. Run the client with the `-h` flag +to display the modes. The TMTC commander is able to send multiple telecommands +sequentially by using a provided queue. The queue is filled by the +developer. Some examples can be found in the `tc` folder. The queue +can also be filled with special commands, for example a wait command or +a print command. -Port needs to be specified manually for now, IP adresses can be set in -command line. +This application is also able to listen to telemetry packets in a separate thread. +The specific TM handling is also implemented by the developer. Some +examples can be found in the `tm` folder. -### Issues -If there are problems receiving packets, use the tool Wireshark to track -ethernet communication for UDP echo packets (requests and response). -If the packets appear, there might be a problematic firewall setting. -Please ensure that python.exe UDP packets are not blocked in advanced firewall -settings and create a rule to allow packets from port 2008. +### Communication Interfaces -## Serial Communication -Serial communication was implemented and is tested for Windwos 10 and Ubuntu 20.04. +The communication interfaces decouple the used interface from the communication +logic. This enables to write the telecommand and telemetry specifications +without worrying about the used communication interface. + +#### Serial Communication +Serial communication was implemented and is tested for Windows 10 and Ubuntu 20.04. It requires the PySerial package installed. It should be noted that there are several modes for the serial communication. There is a fixed frame mode and a mode based on a simple DLE transport layer. When using the DLE transport layer, sent packets are encoded with DLE while received packets need to be DLE encoded. -## Module Test -Includes a moduel tester which sends TCs in a queue and automatically -analyzes the replies. This is the best way to test the functionality of the -software right now as a software internal TC injector has not been implemented -yet for the FSFW. -Some more information will follow on how to write Unit Tests. +## Issues + +### Ethernet Communication +If there are issued with the Ethernet communcation, +there might be a problematic firewall setting. +It might be necessary to allow UDP packets on certain ports ## Developers Information Code Style: [PEP8](https://www.python.org/dev/peps/pep-0008/). @@ -126,14 +155,3 @@ Install it with pip and then install and set-up the Pylint plugin in PyCharm. There are a lot of features which would be nice, for example a GUI. The architecture of the program should allow extension like that without too many issues, as the sending and telemetry listening are decoupled. - -## Import run configurations in PyCharm -The PyCharm IDE can be used to comfortably manage a set of run configuations -(for example tests for different services). These configurations were shared -through the version control system git and should be imported automatically. -If these configurations dont show up, try to open the tmtc folder as -a new PyCharm project in a new window. - -To add new configurations, go to Edit Configurations... -at the top right corner in the drop-down menu. -Specify the new run configurations and set a tick at Share through VCS. diff --git a/SOURCEbadge.png b/SOURCEbadge.png new file mode 100644 index 0000000000000000000000000000000000000000..d02b2745e8a8fec61abdf300d4caf0bef2d19453 Binary files /dev/null and b/SOURCEbadge.png differ diff --git a/config/obsw_com_config.py b/config/obsw_com_config.py deleted file mode 100644 index 0d45593ee4d216993bc43b1dc0daf1f79d857681..0000000000000000000000000000000000000000 --- a/config/obsw_com_config.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -Set-up function. Initiates the communication interface. -""" -import sys -from typing import Union - -from tmtc_core.comIF.obsw_com_interface import CommunicationInterface -from tmtc_core.comIF.obsw_dummy_com_if import DummyComIF -from tmtc_core.comIF.obsw_ethernet_com_if import EthernetComIF -from tmtc_core.comIF.obsw_serial_com_if import SerialComIF, SerialCommunicationType -from tmtc_core.comIF.obsw_qemu_com_if import QEMUComIF - -from tmtc_core.utility.obsw_logger import get_logger -from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter - -import config.obsw_config as g - -LOGGER = get_logger() - - -def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[CommunicationInterface, None]: - """ - Return the desired communication interface object - :param tmtc_printer: TmTcPrinter object. - :return: CommunicationInterface object - """ - try: - if g.G_COM_IF == g.ComInterfaces.Ethernet: - communication_interface = EthernetComIF( - tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, - tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_ETHERNET_SEND_ADDRESS, - receive_address=g.G_ETHERNET_RECV_ADDRESS) - elif g.G_COM_IF == g.ComInterfaces.Serial: - serial_baudrate = g.G_SERIAL_BAUDRATE - serial_timeout = g.G_SERIAL_TIMEOUT - communication_interface = SerialComIF( - tmtc_printer=tmtc_printer, com_port=g.G_COM_PORT, baud_rate=serial_baudrate, - serial_timeout=serial_timeout, - ser_com_type=SerialCommunicationType.DLE_ENCODING) - communication_interface.set_dle_settings( - g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout) - elif g.G_COM_IF == g.ComInterfaces.QEMU: - serial_timeout = g.G_SERIAL_TIMEOUT - communication_interface = QEMUComIF( - tmtc_printer=tmtc_printer, - serial_timeout=serial_timeout, - ser_com_type=SerialCommunicationType.DLE_ENCODING) - communication_interface.set_dle_settings( - g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout) - else: - communication_interface = DummyComIF(tmtc_printer=tmtc_printer) - if not communication_interface.valid: - LOGGER.warning("Invalid communication interface!") - sys.exit() - communication_interface.initialize() - return communication_interface - except (IOError, OSError): - LOGGER.error("Error setting up communication interface") - LOGGER.exception("Error") - sys.exit() diff --git a/config/tmtcc_com_config.py b/config/tmtcc_com_config.py new file mode 100644 index 0000000000000000000000000000000000000000..bb4bdffc1855acbe3e2fc2b99818b62ef7cbe047 --- /dev/null +++ b/config/tmtcc_com_config.py @@ -0,0 +1,130 @@ +""" +Set-up function. Initiates the communication interface. +""" +import json +import os +import sys +from typing import Union + +import serial +import serial.tools.list_ports + +from tmtc_core.comIF.obsw_com_interface import CommunicationInterface +from tmtc_core.comIF.obsw_dummy_com_if import DummyComIF +from tmtc_core.comIF.obsw_ethernet_com_if import EthernetComIF +from tmtc_core.comIF.obsw_serial_com_if import SerialComIF, SerialCommunicationType +from tmtc_core.comIF.obsw_qemu_com_if import QEMUComIF + +from tmtc_core.utility.obsw_logger import get_logger +from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter + +import config.tmtcc_config as g + +LOGGER = get_logger() + + +def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[CommunicationInterface, None]: + """ + Return the desired communication interface object + :param tmtc_printer: TmTcPrinter object. + :return: CommunicationInterface object + """ + try: + if g.G_COM_IF == g.ComInterfaces.Ethernet: + communication_interface = EthernetComIF( + tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, + tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, + send_address=g.G_ETHERNET_SEND_ADDRESS, + receive_address=g.G_ETHERNET_RECV_ADDRESS) + elif g.G_COM_IF == g.ComInterfaces.Serial: + serial_baudrate = g.G_SERIAL_BAUDRATE + serial_timeout = g.G_SERIAL_TIMEOUT + # Determine COM port, either extract from JSON file or ask from user. + com_port = determine_com_port() + communication_interface = SerialComIF( + tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=serial_baudrate, + serial_timeout=serial_timeout, + ser_com_type=SerialCommunicationType.DLE_ENCODING) + communication_interface.set_dle_settings( + g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout) + elif g.G_COM_IF == g.ComInterfaces.QEMU: + serial_timeout = g.G_SERIAL_TIMEOUT + communication_interface = QEMUComIF( + tmtc_printer=tmtc_printer, + serial_timeout=serial_timeout, + ser_com_type=SerialCommunicationType.DLE_ENCODING) + communication_interface.set_dle_settings( + g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout) + else: + communication_interface = DummyComIF(tmtc_printer=tmtc_printer) + if not communication_interface.valid: + LOGGER.warning("Invalid communication interface!") + sys.exit() + communication_interface.initialize() + return communication_interface + except (IOError, OSError): + LOGGER.error("Error setting up communication interface") + LOGGER.exception("Error") + sys.exit() + + +def determine_com_port() -> str: + reconfigure_com_port = False + com_port = "" + + if os.path.isfile("config/tmtcc_config.json"): + with open("config/tmtcc_config.json", "r") as write: + load_data = json.load(write) + com_port = load_data["COM_PORT"] + if not check_port_validity(com_port): + reconfigure = input( + "COM port from configuration file not contained within serial" + "port list. Reconfigure serial port? [y/n]: ") + if reconfigure.lower() in ['y', "yes"]: + write.close() + os.remove("config/tmtcc_config.json") + reconfigure_com_port = True + else: + reconfigure_com_port = True + + if reconfigure_com_port: + com_port = prompt_com_port() + save_to_json = input("Do you want to store serial port to " + "configuration? (y/n): ") + if save_to_json.lower() in ['y', "yes"]: + with open("config/tmtcc_config.json", "w") as write: + json.dump(dict(COM_PORT=com_port), write, indent=4) + return com_port + + +def prompt_com_port() -> str: + while True: + com_port = input( + "Configuring serial port. Please enter COM Port" + "(enter h to display list of COM ports): ") + if com_port == 'h': + ports = serial.tools.list_ports.comports() + for port, desc, hwid in sorted(ports): + print("{}: {} [{}]".format(port, desc, hwid)) + else: + if not check_port_validity(com_port): + print("Serial port not in list of available serial ports. Try again? [y/n]") + try_again = input() + if try_again.lower() in ['y', "yes"]: + continue + else: + break + else: + break + return com_port + + +def check_port_validity(com_port_to_check: str) -> bool: + port_list = [] + ports = serial.tools.list_ports.comports() + for port, desc, hwid in sorted(ports): + port_list.append(port) + if com_port_to_check not in port_list: + return False + return True + diff --git a/config/obsw_config.py b/config/tmtcc_config.py similarity index 76% rename from config/obsw_config.py rename to config/tmtcc_config.py index 60b761db14e45ba9a9f40b0534969dfa172b98fe..389ba4c6cf4d489c96978cc20e3016ac81c194ff 100644 --- a/config/obsw_config.py +++ b/config/tmtcc_config.py @@ -1,21 +1,23 @@ """ -@file - obsw_config.py -@date - 01.11.2019 -@brief - Global settings for UDP client +@file tmtcc_config.py +@date 01.11.2019 +@brief Global settings for TMTC commander. """ import struct import pprint import logging -from config.obsw_definitions import ModeList, ComInterfaces +from config.tmtcc_definitions import ModeList +from tmtc_core.tmtc_core_definitions import ComInterfaces """ Mission/Device specific information. """ +# TODO: Automate / Autofill this file with the MIB parser +# TODO: JSON file so we can store variables which are not tracked by version control but still +# remain the same on a machine (e.g. COM port or IP addresses) + # Object IDs # Commands @@ -58,7 +60,6 @@ G_MAX_APP_DATA_LENGTH = G_MAX_BINARY_FRAME_LENGTH - 100 G_COM_IF: ComInterfaces = ComInterfaces.QEMU # COM Port for serial communication -G_COM_PORT = 'COM0' G_SERIAL_TIMEOUT = 0.01 G_SERIAL_BAUDRATE = 230400 G_SERIAL_FRAME_SIZE = 256 @@ -94,10 +95,11 @@ G_TM_LISTENER = None G_COM_INTERFACE = None G_TMTC_PRINTER = None + # noinspection PyUnusedLocal def set_globals(args): global G_ETHERNET_RECV_ADDRESS, G_ETHERNET_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, \ - G_DISPLAY_MODE, G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, \ + G_DISPLAY_MODE, G_COM_IF, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, \ G_TC_SEND_TIMEOUT_FACTOR, G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM, G_PRINT_TM, \ G_OP_CODE, G_RESEND_TC, G_LISTENER_AFTER_OP @@ -149,11 +151,40 @@ def set_globals(args): G_PRINT_TM = args.print_tm G_PRINT_TO_FILE = args.print_log G_PRINT_RAW_TM = args.rawDataPrint - G_COM_PORT = args.com_port G_TM_TIMEOUT = args.tm_timeout G_RESEND_TC = args.resend_tc G_LISTENER_AFTER_OP = args.listener - from config.obsw_user_code import global_setup_hook + from config.tmtcc_user_code import global_setup_hook global_setup_hook() + +def set_glob_apid(new_apid: int): + global G_APID + G_APID = new_apid + + +def get_glob_apid(): + global G_APID + return G_APID + + +def set_glob_com_if(new_com_if: ComInterfaces): + global G_COM_IF + G_COM_IF = new_com_if + + +def get_glob_com_if(): + global G_COM_IF + return G_COM_IF + + +def set_glob_mode(new_mode : ModeList): + global G_MODE_ID + G_MODE_ID = new_mode + + +def get_glob_mode(): + global G_MODE_ID + return G_MODE_ID + diff --git a/config/obsw_definitions.py b/config/tmtcc_definitions.py similarity index 63% rename from config/obsw_definitions.py rename to config/tmtcc_definitions.py index 3aff14867aa39318c7bfa8907fbcf511c7ea4ea8..0b7748ac2dff1f16e3edd69588af0185ab95bd65 100644 --- a/config/obsw_definitions.py +++ b/config/tmtcc_definitions.py @@ -1,7 +1,4 @@ import enum -from typing import Tuple - -ethernetAddressT = Tuple[str, int] # Mode options, set by args parser @@ -17,8 +14,3 @@ class ModeList(enum.Enum): PromptMode = 32 -class ComInterfaces(enum.Enum): - Dummy = 0 - Serial = 1 - QEMU = 2 - Ethernet = 3 diff --git a/config/obsw_user_code.py b/config/tmtcc_user_code.py similarity index 60% rename from config/obsw_user_code.py rename to config/tmtcc_user_code.py index 4cfc48eba6b4266772ca1db8db9f2dbcb8781390..fb5e6fb72b7a0d2c5515ed05a5554e41a3863e3f 100644 --- a/config/obsw_user_code.py +++ b/config/tmtcc_user_code.py @@ -2,11 +2,14 @@ User defined code can be added here. """ from typing import Union, Tuple -from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfo +from tmtc_core.tmtc_core_definitions import ComInterfaces from enum import Enum -# Yeah, I did not have a better idea yet.. +# TODO: We really need that JSON file which will not be tracked by version control. +# In that JSON file, we could set the current developer, so that developers can +# hook into the config. class Developer(Enum): Robin = 0 @@ -22,24 +25,24 @@ def command_preparation_hook() -> Tuple[bytearray, Union[None, PusTcInfo]]: return prepare_robins_commands() +def prepare_robins_commands(): + from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand + command = PusTelecommand(service=17, subservice=1, ssc=20) + return command.pack_command_tuple() + def global_setup_hook(): """ Can be used to alter the global variables in a custom defined way. For example, device specific com ports or ethernet ports can be set here. The global variables in the config.obsw_config file can be edited here by using the handle. - For example: config.obsw_config.G_ETHERNET_SEND_ADDRESS = new_send_address """ if Developer == Developer.Robin: global_setup_hook_robin() -def prepare_robins_commands(): - from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand - command = PusTelecommand(service=17, subservice=1, ssc=20) - return command.pack_command_tuple() - - def global_setup_hook_robin(): - import config.obsw_config - pass + from config.tmtcc_config import get_glob_com_if, set_glob_apid + if get_glob_com_if() == ComInterfaces.Ethernet: + # Configure APID for FSFW example. Set this back to 0x73 for STM32! + set_glob_apid(0xEF) diff --git a/config/tmtcc_version.py b/config/tmtcc_version.py new file mode 100644 index 0000000000000000000000000000000000000000..c8a46e69fd5d5588edc5819913367e2b82a6cdfa --- /dev/null +++ b/config/tmtcc_version.py @@ -0,0 +1,7 @@ +""" +@brief Version file +""" + +SW_NAME = "tmtcc" +SW_VERSION = 1 +SW_SUBVERSION = 1 diff --git a/core/tmtc_backend.py b/core/tmtc_backend.py index 9580eeed34db4de1e1cc0475c87b2bf9b389b457..05eb615a98720e2e4e7d1173b319edbe6dc85f40 100644 --- a/core/tmtc_backend.py +++ b/core/tmtc_backend.py @@ -6,41 +6,41 @@ from multiprocessing import Process from collections import deque from typing import Tuple, Union -from config import obsw_config as g -from config.obsw_definitions import ModeList -from config.obsw_user_code import command_preparation_hook - +from tmtc_core.tmtc_core_definitions import ComInterfaces from tmtc_core.utility.obsw_logger import get_logger -from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfo from tmtc_core.sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver from tmtc_core.sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver from tmtc_core.sendreceive.obsw_tm_listener import TmListener from tmtc_core.comIF.obsw_com_interface import CommunicationInterface from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import ServiceQueuePacker -from tc.obsw_pus_tc_packer import ServiceQueuePacker, create_total_tc_queue +from config.tmtcc_com_config import set_communication_interface +from config.tmtcc_definitions import ModeList +from pus_tc.tmtcc_tc_packer_hook import create_total_tc_queue from test.obsw_pus_service_test import run_selected_pus_tests -from config.obsw_com_config import set_communication_interface -from utility.obsw_binary_uploader import BinaryFileUploader +from utility.tmtcc_binary_uploader import BinaryFileUploader LOGGER = get_logger() - class TmTcHandler: """ This is the primary class which handles TMTC reception. This can be seen as the backend in case a GUI or front-end is implemented. """ - def __init__(self, init_mode: ModeList = ModeList.ListenerMode): + def __init__(self, init_com_if: ComInterfaces = ComInterfaces.Dummy, + init_mode: ModeList = ModeList.ListenerMode): self.mode = init_mode - self.com_if = g.G_COM_IF + self.com_if = init_com_if # This flag could be used later to command the TMTC Client with a front-end self.one_shot_operation = True self.tmtc_printer: Union[None, TmTcPrinter] = None self.communication_interface: Union[None, CommunicationInterface] = None self.tm_listener: Union[None, TmListener] = None + self.exit_on_com_if_init_failure = True self.single_command_package: Tuple[bytearray, Union[None, PusTcInfo]] = bytearray(), None @@ -57,18 +57,27 @@ class TmTcHandler: """ self.mode = mode + def set_com_if(self, com_if: ComInterfaces): + self.com_if = com_if + @staticmethod - def prepare_tmtc_handler_start(init_mode: ModeList = g.ModeList.ListenerMode): - tmtc_handler = TmTcHandler(init_mode) + def prepare_tmtc_handler_start( + init_com_if: ComInterfaces = ComInterfaces.Dummy, + init_mode: ModeList = ModeList.ListenerMode): + tmtc_handler = TmTcHandler(init_com_if, init_mode) tmtc_task = Process(target=TmTcHandler.start_handler, args=(tmtc_handler, )) return tmtc_task @staticmethod def start_handler(executed_handler): + if not isinstance(executed_handler, TmTcHandler): + LOGGER.error("Unexpected argument, should be TmTcHandler!") + sys.exit(1) executed_handler.initialize() - executed_handler.perform_operation() + executed_handler.start() def initialize(self): + from config import tmtcc_config as g """ Perform initialization steps which might be necessary after class construction. This has to be called at some point before using the class! @@ -79,12 +88,19 @@ class TmTcHandler: com_interface=self.communication_interface, tm_timeout=g.G_TM_TIMEOUT, tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR ) - if self.communication_interface.valid: + atexit.register(keyboard_interrupt_handler, com_interface=self.communication_interface) + + def start(self): + try: + self.communication_interface.open() self.tm_listener.start() - else: - LOGGER.info("No communication interface set for now") + except IOError: + LOGGER.error("Communication Interface could not be opened!") LOGGER.info("TM listener will not be started") - atexit.register(keyboard_interrupt_handler, com_interface=self.communication_interface) + if self.exit_on_com_if_init_failure: + LOGGER.error("Closing TMTC commander..") + sys.exit(1) + self.perform_operation() def perform_operation(self): """ @@ -95,7 +111,7 @@ class TmTcHandler: except KeyboardInterrupt: LOGGER.info("Keyboard Interrupt.") sys.exit() - except IOError as e: + except IOError: LOGGER.error("IO Error occured!") sys.exit() @@ -103,34 +119,35 @@ class TmTcHandler: """ Command handling. """ - if self.mode == g.ModeList.PromptMode: + if self.mode == ModeList.PromptMode: self.prompt_mode() - if self.mode == g.ModeList.ListenerMode: + if self.mode == ModeList.ListenerMode: if self.tm_listener.reply_event(): LOGGER.info("TmTcHandler: Packets received.") self.tmtc_printer.print_telemetry_queue(self.tm_listener.retrieve_tm_packet_queue()) self.tm_listener.clear_tm_packet_queue() self.tm_listener.clear_reply_event() - elif self.mode == g.ModeList.SingleCommandMode: + elif self.mode == ModeList.SingleCommandMode: if self.single_command_package is None: pus_packet_tuple = command_preparation() else: - LOGGER.info("send package from gui") + LOGGER.info("Sending single packet from GUI..") pus_packet_tuple = self.single_command_package sender_and_receiver = SingleCommandSenderReceiver( com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, tm_listener=self.tm_listener) - LOGGER.info("Performing single command operation") + LOGGER.info("Performing single command operation..") sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple) - self.mode = g.ModeList.PromptMode + self.mode = ModeList.PromptMode - elif self.mode == g.ModeList.ServiceTestMode: + elif self.mode == ModeList.ServiceTestMode: + from config import tmtcc_config as g service_queue = deque() service_queue_packer = ServiceQueuePacker() op_code = g.G_OP_CODE - service_queue_packer.pack_service_queue( + service_queue_packer.pack_service_queue_core( service=g.G_SERVICE, service_queue=service_queue, op_code=op_code) if not self.communication_interface.valid: return @@ -151,15 +168,16 @@ class TmTcHandler: LOGGER.info("SequentialSenderReceiver: Exporting output to log file.") self.tmtc_printer.print_file_buffer_list_to_file("log/tmtc_log.txt", True) - elif self.mode == g.ModeList.BinaryUploadMode: + elif self.mode == ModeList.BinaryUploadMode: # Upload binary, prompt user for input, in the end prompt for new mode and enter that # mode file_uploader = BinaryFileUploader(self.communication_interface, self.tmtc_printer, self.tm_listener) file_uploader.perform_file_upload() - self.mode = g.ModeList.ListenerMode + self.mode = ModeList.ListenerMode - elif self.mode == g.ModeList.UnitTest: + elif self.mode == ModeList.UnitTest: + from config import tmtcc_config as g # Set up test suite and run it with runner. Verbosity specifies detail level g.G_TM_LISTENER = self.tm_listener g.G_COM_INTERFACE = self.communication_interface @@ -172,15 +190,15 @@ class TmTcHandler: sys.exit() def __core_operation(self, one_shot): - if self.mode == g.ModeList.ListenerMode: + if self.mode == ModeList.ListenerMode: one_shot = False if not one_shot: while True: self.__handle_action() - if self.mode == g.ModeList.Idle: + if self.mode == ModeList.Idle: LOGGER.info("TMTC Client in idle mode") time.sleep(5) - elif self.mode == g.ModeList.ListenerMode: + elif self.mode == ModeList.ListenerMode: time.sleep(1) else: self.__handle_action() @@ -219,4 +237,10 @@ def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]: Prepare command for single command testing :return: """ + try: + from config.tmtcc_user_code import command_preparation_hook + except ImportError as e: + print(e) + LOGGER.error("Hook function for command application not implemented!") + sys.exit(1) return command_preparation_hook() diff --git a/core/tmtc_client_core.py b/core/tmtc_client_core.py index 08b93b1296755014784b019de34c275ae927ead7..4ef4aec1e4c29b5f9bcd5bfb7c4ef317b40fdff7 100755 --- a/core/tmtc_client_core.py +++ b/core/tmtc_client_core.py @@ -1,75 +1,60 @@ #!/usr/bin/python3 """ -@brief This client was developed by KSat for the SOURCE project to test the on-board software. +@brief Core method called by entry point files to initiate the TMTC commander @details -This client features multiple sender/receiver modes and has been designed -to be extensible and easy to use. This clien is based on the PUS standard for the format -of telecommands and telemetry. It can also send TMTC via different interfaces like the -serial interface (USB port) or ethernet interface. - -@license -Copyright 2020 KSat e.V. Stuttgart - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - @manual -Run this file with the -h flag to display options. +@author R. Mueller """ -from multiprocessing import Process +import sys + +from config.tmtcc_version import SW_VERSION, SW_SUBVERSION from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger -from config.obsw_config import set_globals -import config.obsw_config as g +from config.tmtcc_config import set_globals +import config.tmtcc_config as g from core.tmtc_backend import TmTcHandler from core.tmtc_frontend import TmTcFrontend -from utility.obsw_args_parser import parse_input_arguments +from utility.tmtcc_args_parser import parse_input_arguments LOGGER = get_logger() -def run_tmtc_client(use_gui: bool): + +def run_tmtc_client(use_gui: bool, reduced_printout: bool = False): """ Main method, reads input arguments, sets global variables and start TMTC handler. """ + if not reduced_printout: + print("-- Python TMTC Commander --") + if use_gui: + print("-- GUI mode --") + else: + print("-- Command line mode --") + print("-- Software version v" + str(SW_VERSION) + "." + str(SW_SUBVERSION) + " --") set_tmtc_logger() - LOGGER.info("Starting TMTC Client") + LOGGER.info("Starting TMTC Client..") if not use_gui: - LOGGER.info("Parsing input arguments") + LOGGER.info("Parsing input arguments..") args = parse_input_arguments() - LOGGER.info("Setting global variables") + LOGGER.info("Setting global variables..") set_globals(args) - LOGGER.info("Starting TMTC Handler") + LOGGER.info("Starting TMTC Handler..") - tmtc_frontend_task = Process - - # Currently does not work, problems with QEMU / Asyncio if not use_gui: - tmtc_handler = TmTcHandler(g.G_MODE_ID) + # The global variables are set by the argument parser. + tmtc_handler = TmTcHandler(g.get_glob_com_if(), g.get_glob_mode()) tmtc_handler.set_one_shot_or_loop_handling(g.G_LISTENER_AFTER_OP) tmtc_handler.initialize() - tmtc_handler.perform_operation() + tmtc_handler.start() else: + from PyQt5.QtWidgets import QApplication + app = QApplication(["TMTC Commander"]) tmtc_gui = TmTcFrontend() tmtc_gui.start_ui() - # tmtc_handler_task = TmTcHandler.prepare_tmtc_handler_start() - # tmtc_frontend = TmTcFrontend() - # tmtc_frontend_task = tmtc_frontend.prepare_start(tmtc_frontend) - # tmtc_frontend_task.start() - # tmtc_handler_task.start() - # tmtc_handler_task.join() - # tmtc_frontend_task.join() + sys.exit(app.exec_()) + diff --git a/core/tmtc_frontend.py b/core/tmtc_frontend.py index 9a5908a2cc24732126de974caf32f2df8df9480c..3c0c96e911557a9bf1e4c6c03f6d2d48d603b430 100644 --- a/core/tmtc_frontend.py +++ b/core/tmtc_frontend.py @@ -7,15 +7,18 @@ @manual @author R. Mueller, P. Scheurenbrand """ +import threading from multiprocessing import Process from PyQt5.QtWidgets import * +from PyQt5.QtGui import QPixmap, QIcon + from core.tmtc_backend import TmTcHandler -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand +from config import tmtcc_config + +from tmtc_core.tmtc_core_definitions import ComInterfaces +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand from tmtc_core.utility.obsw_logger import get_logger -from config import obsw_config -from config.obsw_definitions import ComInterfaces -import threading LOGGER = get_logger() @@ -23,10 +26,11 @@ LOGGER = get_logger() TODO: Make it look nicer. Add SOURCE or KSat logo. """ -class TmTcFrontend: - # TODO: this list should probably be inside an enum in the obsw_config.py - serviceList = [ 2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1" ] # , "Error"] +class TmTcFrontend(QMainWindow): + + # TODO: this list should probably be inside an enum in the tmtcc_config.py + serviceList = [2, 3, 5, 8, 9, 17, 20, 200, "Dummy", "GPS0", "GPS1"] # , "Error"] service_test_button: QPushButton single_command_button: QPushButton @@ -40,19 +44,20 @@ class TmTcFrontend: is_busy: bool def __init__(self): + super(TmTcFrontend, self).__init__() self.tmtc_handler = TmTcHandler() # TODO: Perform initialization on button press with specified ComIF # Also, when changing ComIF, ensure that old ComIF is closed (e.g. with printout) # Lock all other elements while ComIF is invalid. self.tmtc_handler.initialize() - obsw_config.G_SERVICE = 17 - obsw_config.G_COM_IF = obsw_config.ComInterfaces.QEMU + tmtcc_config.G_SERVICE = 17 + tmtcc_config.G_COM_IF = tmtcc_config.ComInterfaces.QEMU def prepare_start(self, args: any) -> Process: return Process(target=self.start_ui) def service_index_changed(self, index: int): - obsw_config.G_SERVICE = self.serviceList[index] + tmtcc_config.G_SERVICE = self.serviceList[index] LOGGER.info("service_test_mode_selection updated: " + str(self.serviceList[index])) def single_command_set_service(self, value): @@ -66,8 +71,8 @@ class TmTcFrontend: def start_service_test_clicked(self): LOGGER.info("start service test button pressed") - LOGGER.info("start testing service: " + str(obsw_config.G_SERVICE)) - self.tmtc_handler.mode = obsw_config.ModeList.ServiceTestMode + LOGGER.info("start testing service: " + str(tmtcc_config.G_SERVICE)) + self.tmtc_handler.mode = tmtcc_config.ModeList.ServiceTestMode # start the action in a new process p = threading.Thread(target=self.handle_tm_tc_action) p.start() @@ -76,9 +81,9 @@ class TmTcFrontend: LOGGER.info("send single command pressed") # parse the values from the table - #service = int(self.commandTable.item(0, 0).text()) - #subservice = int(self.commandTable.item(0, 1).text()) - #ssc = int(self.commandTable.item(0, 2).text()) + # service = int(self.commandTable.item(0, 0).text()) + # subservice = int(self.commandTable.item(0, 1).text()) + # ssc = int(self.commandTable.item(0, 2).text()) LOGGER.info("service: " + str(self.single_command_service) + ", subservice: " + str(self.single_command_sub_service) + @@ -94,7 +99,7 @@ class TmTcFrontend: ssc=self.single_command_ssc) self.tmtc_handler.single_command_package = command.pack_command_tuple() - self.tmtc_handler.mode = obsw_config.ModeList.SingleCommandMode + self.tmtc_handler.mode = tmtcc_config.ModeList.SingleCommandMode # start the action in a new process p = threading.Thread(target=self.handle_tm_tc_action) p.start() @@ -113,33 +118,40 @@ class TmTcFrontend: self.single_command_button.setEnabled(state) def start_ui(self): - app = QApplication([]) - win = QWidget() + win = QWidget(self) + self.setCentralWidget(win) grid = QGridLayout() + self.setWindowTitle("TMTC Commander") + label = QLabel(self) + pixmap = QPixmap("SOURCEbadge.png") # QPixmap is the class, easy to put pic on screen + label.setGeometry(720, 15, 110, 110) + label.setPixmap(pixmap) + self.setWindowIcon(QIcon("SOURCEbadge.png")) + label.setScaledContents(True) row = 0 grid.addWidget(QLabel("Configuration:"), row, 0, 1, 2) row += 1 checkbox_console = QCheckBox("print output to console") - checkbox_console.setChecked(obsw_config.G_PRINT_TM) + checkbox_console.setChecked(tmtcc_config.G_PRINT_TM) checkbox_console.stateChanged.connect(checkbox_console_print) checkbox_log = QCheckBox("print output to log file") - checkbox_log.setChecked(obsw_config.G_PRINT_TO_FILE) + checkbox_log.setChecked(tmtcc_config.G_PRINT_TO_FILE) checkbox_log.stateChanged.connect(checkbox_log_print) checkbox_raw_tm = QCheckBox("print all raw TM data directly") - checkbox_raw_tm.setChecked(obsw_config.G_PRINT_RAW_TM) + checkbox_raw_tm.setChecked(tmtcc_config.G_PRINT_RAW_TM) checkbox_raw_tm.stateChanged.connect(checkbox_print_raw_data) checkbox_hk = QCheckBox("print hk data") - checkbox_hk.setChecked(obsw_config.G_PRINT_HK_DATA) + checkbox_hk.setChecked(tmtcc_config.G_PRINT_HK_DATA) checkbox_hk.stateChanged.connect(checkbox_print_hk_data) checkbox_short = QCheckBox("short display mode") - checkbox_short.setChecked(obsw_config.G_DISPLAY_MODE == "short") + checkbox_short.setChecked(tmtcc_config.G_DISPLAY_MODE == "short") checkbox_short.stateChanged.connect(checkbox_short_display_mode) grid.addWidget(checkbox_log, row, 0, 1, 1) @@ -160,14 +172,14 @@ class TmTcFrontend: # TODO: set sensible min/max values spin_timeout.setSingleStep(0.1) spin_timeout.setMinimum(0.25) - spin_timeout.setMaximum(60)# + spin_timeout.setMaximum(60) # https://youtrack.jetbrains.com/issue/PY-22908 # Ignore those warnings for now. spin_timeout.valueChanged.connect(number_timeout) grid.addWidget(spin_timeout, row, 0, 1, 1) spin_timeout_factor = QDoubleSpinBox() - spin_timeout_factor.setValue(obsw_config.G_TC_SEND_TIMEOUT_FACTOR) + spin_timeout_factor.setValue(tmtcc_config.G_TC_SEND_TIMEOUT_FACTOR) # TODO: set sensible min/max values spin_timeout_factor.setSingleStep(0.1) spin_timeout_factor.setMinimum(0.25) @@ -197,25 +209,25 @@ class TmTcFrontend: row += 1 # com if configuration grid.addWidget(QLabel("Communication Interface:"), row, 0, 1, 1) - com_if_comboBox = QComboBox() + com_if_combo_box = QComboBox() # add all possible ComIFs to the comboBox - for comIf in obsw_config.ComInterfaces: - com_if_comboBox.addItem(comIf.name) - com_if_comboBox.setCurrentIndex(obsw_config.G_COM_IF.value) - com_if_comboBox.currentIndexChanged.connect(com_if_index_changed) - grid.addWidget(com_if_comboBox, row, 1, 1, 1) + for comIf in tmtcc_config.ComInterfaces: + com_if_combo_box.addItem(comIf.name) + com_if_combo_box.setCurrentIndex(tmtcc_config.G_COM_IF.value) + com_if_combo_box.currentIndexChanged.connect(com_if_index_changed) + grid.addWidget(com_if_combo_box, row, 1, 1, 1) row += 1 # service test mode gui grid.addWidget(QLabel("Service Test Mode:"), row, 0, 1, 2) row += 1 - comboBox = QComboBox() + combo_box = QComboBox() for service in self.serviceList: - comboBox.addItem("Service - " + str(service)) - comboBox.setCurrentIndex(self.serviceList.index(obsw_config.G_SERVICE)) - comboBox.currentIndexChanged.connect(self.service_index_changed) - grid.addWidget(comboBox, row, 0, 1, 1) + combo_box.addItem("Service - " + str(service)) + combo_box.setCurrentIndex(self.serviceList.index(tmtcc_config.G_SERVICE)) + combo_box.currentIndexChanged.connect(self.service_index_changed) + grid.addWidget(combo_box, row, 0, 1, 1) self.service_test_button = QPushButton() self.service_test_button.setText("Start Service Test") @@ -284,14 +296,13 @@ class TmTcFrontend: row += 1 win.setLayout(grid) - win.resize(900, 800) - win.show() + self.resize(900, 800) + self.show() # resize table columns to fill the window width #for i in range(0, 5): # self.commandTable.setColumnWidth(i, int(self.commandTable.width() / 5) - 3) - app.exec_() class SingleCommandTable(QTableWidget): def __init__(self): @@ -308,49 +319,49 @@ class SingleCommandTable(QTableWidget): self.setItem(0, 2, QTableWidgetItem("20")) def com_if_index_changed(index: int): - obsw_config.G_COM_IF = ComInterfaces(index) - LOGGER.info("com if updated: " + str(obsw_config.G_COM_IF)) + tmtcc_config.G_COM_IF = ComInterfaces(index) + LOGGER.info("com if updated: " + str(tmtcc_config.G_COM_IF)) def checkbox_console_print(state: int): LOGGER.info(["enabled", "disabled"][state == 0] + " console print") - obsw_config.G_PRINT_TM = state == 0 + tmtcc_config.G_PRINT_TM = state == 0 def checkbox_log_print(state: int): LOGGER.info(["enabled", "disabled"][state == 0] + " print to log") - obsw_config.G_PRINT_TO_FILE = state == 0 + tmtcc_config.G_PRINT_TO_FILE = state == 0 def checkbox_print_raw_data(state: int): LOGGER.info(["enabled", "disabled"][state == 0] + " printing of raw data") - obsw_config.G_PRINT_RAW_TM = state == 0 + tmtcc_config.G_PRINT_RAW_TM = state == 0 def checkbox_print_hk_data(state: int): LOGGER.info(["enabled", "disabled"][state == 0] + " printing of hk data") - obsw_config.G_PRINT_HK_DATA = state == 0 + tmtcc_config.G_PRINT_HK_DATA = state == 0 def checkbox_short_display_mode(state: int): LOGGER.info(["enabled", "disabled"][state == 0] + " short display mode") - obsw_config.G_DISPLAY_MODE = ["short", "long"][state == 0] + tmtcc_config.G_DISPLAY_MODE = ["short", "long"][state == 0] def number_timeout(value: float): - LOGGER.info("tm timeout changed to: " + str(value)) - obsw_config.G_TM_TIMEOUT = value + LOGGER.info("pus_tm timeout changed to: " + str(value)) + tmtcc_config.G_TM_TIMEOUT = value def number_timeout_factor(value: float): - LOGGER.info("tm timeout factor changed to: " + str(value)) - obsw_config.G_TC_SEND_TIMEOUT_FACTOR = value + LOGGER.info("pus_tm timeout factor changed to: " + str(value)) + tmtcc_config.G_TC_SEND_TIMEOUT_FACTOR = value def ip_change_client(value): LOGGER.info("client ip changed: " + value) - obsw_config.G_REC_ADDRESS = (value, 2008) + tmtcc_config.G_REC_ADDRESS = (value, 2008) def ip_change_board(value): LOGGER.info("board ip changed: " + value) - obsw_config.G_SEND_ADDRESS = (value, 7) \ No newline at end of file + tmtcc_config.G_SEND_ADDRESS = (value, 7) \ No newline at end of file diff --git a/tc/__init__.py b/pus_tc/__init__.py similarity index 100% rename from tc/__init__.py rename to pus_tc/__init__.py diff --git a/tc/obsw_pus_tc_frame_packer.py b/pus_tc/tmtcc_pus_tc_frame_packer.py similarity index 96% rename from tc/obsw_pus_tc_frame_packer.py rename to pus_tc/tmtcc_pus_tc_frame_packer.py index 12447d34e9288af1fcff012337ac936c21d9e0ff..7830b47acb50ffb675fd7d956b9e13a201a431b5 100644 --- a/tc/obsw_pus_tc_frame_packer.py +++ b/pus_tc/tmtcc_pus_tc_frame_packer.py @@ -2,7 +2,7 @@ @brief Helper module to pack telecommand frames. """ from typing import List, Tuple -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, PusTcInfo +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, PusTcInfo from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() diff --git a/tc/obsw_tc_utility.py b/pus_tc/tmtcc_pus_tc_utility.py similarity index 65% rename from tc/obsw_tc_utility.py rename to pus_tc/tmtcc_pus_tc_utility.py index 9f21283dc0bdfdc34194b3e135432540233b7d13..989389fc9c721c11f72f9cb82522c24db25711f3 100644 --- a/tc/obsw_tc_utility.py +++ b/pus_tc/tmtcc_pus_tc_utility.py @@ -1,8 +1,8 @@ from typing import Union -from tmtc_core.tc.obsw_pus_tc_base import TcQueueT -from tc.obsw_tc_service8 import generate_action_command -from config.obsw_config import LED_TASK_ID +from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT +from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import generate_action_command +from config.tmtcc_config import LED_TASK_ID def pack_utility_command(service_queue: TcQueueT, op_code: Union[str, int]): diff --git a/tc/obsw_tc_service5_17.py b/pus_tc/tmtcc_service17_test.py similarity index 51% rename from tc/obsw_tc_service5_17.py rename to pus_tc/tmtcc_service17_test.py index 23dbd25701c81f31e544a658d3a617df6f281a03..17171c6eff7a4ddf3d80866f5e87d79ccc9dd32a 100644 --- a/tc/obsw_tc_service5_17.py +++ b/pus_tc/tmtcc_service17_test.py @@ -1,48 +1,13 @@ -# -*- coding: utf-8 -*- -""" -@file obsw_tc_service5_17.py -@brief PUS Service 5: Event Service - PUS Service 17: Test Service -@author R. Mueller -@date 02.05.2020 -""" +from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand +from tmtc_core.pus_tc.tmtcc_tc_service17_test import pack_service17_ping_command -from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand - -def pack_service5_test_into(tc_queue: TcQueueT) -> TcQueueT: - tc_queue.appendleft(("print", "Testing Service 5")) - # invalid subservice - tc_queue.appendleft(("print", "Testing Service 5: Invalid subservice")) - command = PusTelecommand(service=5, subservice=1, ssc=500) - tc_queue.appendleft(command.pack_command_tuple()) - # disable events - tc_queue.appendleft(("print", "Testing Service 5: Disable event")) - command = PusTelecommand(service=5, subservice=6, ssc=500) - tc_queue.appendleft(command.pack_command_tuple()) - # trigger event - tc_queue.appendleft(("print", "Testing Service 5: Trigger event")) - command = PusTelecommand(service=17, subservice=128, ssc=510) - tc_queue.appendleft(command.pack_command_tuple()) - # enable event - tc_queue.appendleft(("print", "Testing Service 5: Enable event")) - command = PusTelecommand(service=5, subservice=5, ssc=520) - tc_queue.appendleft(command.pack_command_tuple()) - # trigger event - tc_queue.appendleft(("print", "Testing Service 5: Trigger another event")) - command = PusTelecommand(service=17, subservice=128, ssc=530) - tc_queue.appendleft(command.pack_command_tuple()) - tc_queue.appendleft(("export", "log/tmtc_log_service5.txt")) - return tc_queue - - -def pack_service17_test_into(tc_queue: TcQueueT, op_code: int = 0) -> TcQueueT: +def pack_service17_test_into(tc_queue: TcQueueT, op_code: int = 0): if op_code == 0: tc_queue.appendleft(("print", "Testing Service 17")) # ping test tc_queue.appendleft(("print", "Testing Service 17: Ping Test")) - command = PusTelecommand(service=17, subservice=1, ssc=1700) - tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple()) # enable event tc_queue.appendleft(("print", "Testing Service 17: Enable Event")) command = PusTelecommand(service=5, subservice=5, ssc=52) @@ -77,4 +42,4 @@ def pack_enable_periodic_print_packet(tc_queue: TcQueueT, enable: bool, ssc: int def pack_trigger_exception_packet(tc_queue: TcQueueT, ssc: int): tc_queue.appendleft(("print", "Triggering software exception")) command = PusTelecommand(service=17, subservice=150, ssc=ssc) - tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(command.pack_command_tuple()) \ No newline at end of file diff --git a/tc/obsw_tc_core.py b/pus_tc/tmtcc_tc_core.py similarity index 88% rename from tc/obsw_tc_core.py rename to pus_tc/tmtcc_tc_core.py index be4a2fa3910f779bcdd995113cf4b6fff0a6287c..e427ddfe865cf6946c4eb248f110148ed7617735 100644 --- a/tc/obsw_tc_core.py +++ b/pus_tc/tmtcc_tc_core.py @@ -1,8 +1,8 @@ from typing import Deque -from config.obsw_config import CORE_CONTROLLER_ID -from tc.obsw_tc_service8 import make_action_id -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand +from config.tmtcc_config import CORE_CONTROLLER_ID +from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand def pack_core_command(tc_queue: Deque, op_code, ssc: int = 0): diff --git a/tc/obsw_tc_gps.py b/pus_tc/tmtcc_tc_gps.py similarity index 90% rename from tc/obsw_tc_gps.py rename to pus_tc/tmtcc_tc_gps.py index 1a047bf304543f8268967c814f068419fd3bfc96..73a8b1d702a729ac68f853921feb33d1480589b8 100644 --- a/tc/obsw_tc_gps.py +++ b/pus_tc/tmtcc_tc_gps.py @@ -1,15 +1,17 @@ # -*- coding: utf-8 -*- """ -@file obsw_tc_gps.py +@file tmtcc_tc_gps.py @brief GPS Device: GPS device testing @author R. Mueller @date 02.05.2020 """ -from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand -from tc.obsw_tc_service2 import pack_mode_data +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from pus_tc.tmtcc_tc_service2_raw_cmd import pack_mode_data + +import config.tmtcc_config as g -import config.obsw_config as g def pack_gps_test_into(object_id: bytearray, tc_queue: TcQueueT) -> TcQueueT: diff --git a/tc/obsw_image_handler.py b/pus_tc/tmtcc_tc_image_handler.py similarity index 89% rename from tc/obsw_image_handler.py rename to pus_tc/tmtcc_tc_image_handler.py index 6f3216861781687d30e6180c43cdb21eed8379a7..e2dd6cf60f15bf0baeba050e4f233002517f346f 100644 --- a/tc/obsw_image_handler.py +++ b/pus_tc/tmtcc_tc_image_handler.py @@ -1,10 +1,10 @@ from typing import Union -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque -from tc.obsw_tc_service8 import make_action_id -from tc.obsw_tc_service20 import pack_boolean_parameter_setting +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, Deque +from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id +from pus_tc.tmtcc_tc_service20_parameter import pack_boolean_parameter_setting from tmtc_core.utility.obsw_logger import get_logger -from config.obsw_config import SW_IMAGE_HANDLER_ID +from config.tmtcc_config import SW_IMAGE_HANDLER_ID LOGGER = get_logger() diff --git a/pus_tc/tmtcc_tc_packer_hook.py b/pus_tc/tmtcc_tc_packer_hook.py new file mode 100644 index 0000000000000000000000000000000000000000..c78a10584fa95b982a36edc3f76818c0076038f6 --- /dev/null +++ b/pus_tc/tmtcc_tc_packer_hook.py @@ -0,0 +1,113 @@ +import os +from collections import deque +from typing import Union + +import config.tmtcc_config as g +from tmtc_core.utility.obsw_logger import get_logger +from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand + +from pus_tc.tmtcc_tc_service2_raw_cmd import pack_service2_test_into +from pus_tc.tmtcc_tc_service3_housekeeping import pack_service3_test_into +from pus_tc.tmtcc_tc_service8_func_cmd import pack_service8_test_into +from pus_tc.tmtcc_tc_service9_time import pack_service9_test_into +from pus_tc.tmtcc_tc_service23_sdcard import pack_service23_commands_into +from pus_tc.tmtcc_tc_service20_parameter import pack_service20_test_into +from pus_tc.tmtcc_pus_tc_utility import pack_utility_command +from pus_tc.tmtcc_tc_service200_mode import pack_mode_data, pack_service200_test_into +from pus_tc.tmtcc_tc_service5_event import pack_service5_test_into +from pus_tc.tmtcc_service17_test import pack_service17_test_into +from pus_tc.tmtcc_tc_image_handler import generate_img_handler_packet +from pus_tc.tmtcc_tc_gps import pack_gps_test_into +from pus_tc.tmtcc_tc_core import pack_core_command + + +LOGGER = get_logger() + +def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT): + if service == 2: + return pack_service2_test_into(service_queue) + if service == 3: + return pack_service3_test_into(service_queue, op_code) + if service == 5: + return pack_service5_test_into(service_queue) + if service == 8: + return pack_service8_test_into(service_queue) + if service == 9: + return pack_service9_test_into(service_queue) + if service == 17: + return pack_service17_test_into(service_queue, op_code) + if service == 20: + return pack_service20_test_into(service_queue) + if service == 23 or service.lower() == "sd": + return pack_service23_commands_into(service_queue, op_code) + if service == 200: + return pack_service200_test_into(service_queue) + if service.lower() == "dummy": + return pack_dummy_device_test_into(service_queue) + if service.lower() == "img": + return generate_img_handler_packet(service_queue, op_code) + if service.lower() == "core": + return pack_core_command(service_queue, op_code) + if service.lower() == "led": + return pack_utility_command(service_queue, op_code) + if service.lower() == "gps0": + # Object ID: GPS Device + object_id = g.GPS0_DEVICE_ID + return pack_gps_test_into(object_id, service_queue) + if service.lower() == "gps1": + # Object ID: GPS Device + object_id = g.GPS1_DEVICE_ID + return pack_gps_test_into(object_id, service_queue) + if service.lower() == "Error": + return pack_error_testing_into(service_queue) + LOGGER.warning("Invalid Service !") + +# TODO: a way to select certain services would be nice (by passing a dict or array maybe) +def create_total_tc_queue() -> TcQueueT: + if not os.path.exists("log"): + os.mkdir("log") + tc_queue = deque() + tc_queue = pack_service2_test_into(tc_queue) + tc_queue = pack_service3_test_into(tc_queue) + tc_queue = pack_service5_test_into(tc_queue) + tc_queue = pack_service8_test_into(tc_queue) + tc_queue = pack_service9_test_into(tc_queue) + tc_queue = pack_service17_test_into(tc_queue) + tc_queue = pack_service20_test_into(tc_queue) + tc_queue = pack_service200_test_into(tc_queue) + tc_queue = pack_dummy_device_test_into(tc_queue) + object_id = g.GPS0_DEVICE_ID + tc_queue = pack_gps_test_into(object_id, tc_queue) + return tc_queue + + + + + +def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: + tc_queue.appendleft(("print", "Testing Dummy Device")) + # Object ID: Dummy Device + object_id = g.DUMMY_DEVICE_ID + # Set On Mode + tc_queue.appendleft(("print", "Testing Service Dummy: Set On")) + mode_data = pack_mode_data(object_id, 1, 0) + command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data) + tc_queue.appendleft(command.pack_command_tuple()) + # Test Service 2 commands + tc_queue.appendleft(("print", "Testing Service Dummy: Service 2")) + pack_service2_test_into(tc_queue, True) + # Test Service 8 + tc_queue.appendleft(("print", "Testing Service Dummy: Service 8")) + pack_service8_test_into(tc_queue, True) + tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) + return tc_queue + + +def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT: + # a lot of events + command = PusTelecommand(service=17, subservice=129, ssc=2010) + tc_queue.appendleft(command.pack_command_tuple()) + # a lot of ping testing + command = PusTelecommand(service=17, subservice=130, ssc=2020) + tc_queue.appendleft(command.pack_command_tuple()) + return tc_queue \ No newline at end of file diff --git a/tc/obsw_tc_service200.py b/pus_tc/tmtcc_tc_service200_mode.py similarity index 88% rename from tc/obsw_tc_service200.py rename to pus_tc/tmtcc_tc_service200_mode.py index cf7f5a18a88d663955c87b52631c9851180fa65b..0e70d8fb35b72bf84b38238799bd3d07753fec1b 100644 --- a/tc/obsw_tc_service200.py +++ b/pus_tc/tmtcc_tc_service200_mode.py @@ -1,13 +1,13 @@ # -*- coding: utf-8 -*- """ -@file obsw_tc_service200.py +@file tmtcc_tc_service200_mode.py @brief PUS Service 200: PUS custom service 200: Mode commanding @author R. Mueller @date 02.05.2020 """ -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand -from tc.obsw_pus_tc_packer import TcQueueT -import config.obsw_config as g +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +import config.tmtcc_config as g import struct diff --git a/tc/obsw_tc_service20.py b/pus_tc/tmtcc_tc_service20_parameter.py similarity index 83% rename from tc/obsw_tc_service20.py rename to pus_tc/tmtcc_tc_service20_parameter.py index b0dedf027528d041fa0b87c3adb7aa26ed746abd..75b028ad33a9281a4cb27a6f87b2de25e3a85395 100644 --- a/tc/obsw_tc_service20.py +++ b/pus_tc/tmtcc_tc_service20_parameter.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -@file obsw_tc_service20.py +@file tmtcc_tc_service20_parameter.py @brief PUS Service 20: Parameter management. @author J. Gerhards @date 30.06.2020 @@ -8,10 +8,10 @@ import struct from typing import Deque -import config.obsw_config as g -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT +import config.tmtcc_config as g +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, TcQueueT from tmtc_core.utility.obsw_logger import get_logger -from tc.obsw_tc_service200 import pack_mode_data +from pus_tc.tmtcc_tc_service200_mode import pack_mode_data LOGGER = get_logger() @@ -52,9 +52,9 @@ def pack_boolean_parameter_setting(object_id: bytearray, domain_id: int, def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: # parameter IDs - parameterID0 = 0 - parameterID1 = 256 - parameterID2 = 512 + parameter_id0 = 0 + parameter_id1 = 256 + parameter_id2 = 512 if called_externally is False: tc_queue.appendleft(("print", "Testing Service 20")) @@ -68,48 +68,48 @@ def pack_service20_test_into(tc_queue: Deque, called_externally: bool = False) - # test checking Load for uint32_t tc_queue.appendleft(("print", "Testing Service 20: Load uint32_t")) - parameter_id = struct.pack(">I", parameterID0) + parameter_id = struct.pack(">I", parameter_id0) type_and_matrix_data = pack_type_and_matrix_data(3, 14, 1, 1) parameter_data = struct.pack(">I", 42) - payload = object_id + parameter_id+ type_and_matrix_data + parameter_data + payload = object_id + parameter_id + type_and_matrix_data + parameter_data command = PusTelecommand(service=20, subservice=128, ssc=2010, app_data=payload) tc_queue.appendleft(command.pack_command_tuple()) # test checking Dump for uint32_t tc_queue.appendleft(("print", "Testing Service 20: Dump uint32_t")) - parameter_id = struct.pack(">I", parameterID0) + parameter_id = struct.pack(">I", parameter_id0) payload = object_id + parameter_id command = PusTelecommand(service=20, subservice=129, ssc=2020, app_data=payload) tc_queue.appendleft(command.pack_command_tuple()) # test checking Load for int32_t tc_queue.appendleft(("print", "Testing Service 20: Load int32_t")) - parameter_id = struct.pack(">I", parameterID1) + parameter_id = struct.pack(">I", parameter_id1) type_and_matrix_data = pack_type_and_matrix_data(4, 14, 1, 1) parameter_data = struct.pack(">i", -42) - payload = object_id + parameter_id+ type_and_matrix_data + parameter_data + payload = object_id + parameter_id + type_and_matrix_data + parameter_data command = PusTelecommand(service=20, subservice=128, ssc=2030, app_data=payload) tc_queue.appendleft(command.pack_command_tuple()) # test checking Dump for int32_t tc_queue.appendleft(("print", "Testing Service 20: Dump int32_t")) - parameter_id = struct.pack(">I", parameterID1) + parameter_id = struct.pack(">I", parameter_id1) payload = object_id + parameter_id command = PusTelecommand(service=20, subservice=129, ssc=2040, app_data=payload) tc_queue.appendleft(command.pack_command_tuple()) # test checking Load for float tc_queue.appendleft(("print", "Testing Service 20: Load float")) - parameter_id = struct.pack(">I", parameterID2) + parameter_id = struct.pack(">I", parameter_id2) type_and_matrix_data = pack_type_and_matrix_data(5, 1, 1, 1) parameter_data = struct.pack(">f", 4.2) - payload = object_id + parameter_id+ type_and_matrix_data + parameter_data + payload = object_id + parameter_id + type_and_matrix_data + parameter_data command = PusTelecommand(service=20, subservice=128, ssc=2050, app_data=payload) tc_queue.appendleft(command.pack_command_tuple()) # test checking Dump for float tc_queue.appendleft(("print", "Testing Service 20: Dump float")) - parameter_id = struct.pack(">I", parameterID2) + parameter_id = struct.pack(">I", parameter_id2) payload = object_id + parameter_id command = PusTelecommand(service=20, subservice=129, ssc=2060, app_data=payload) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/tc/obsw_tc_service23_sdcard.py b/pus_tc/tmtcc_tc_service23_sdcard.py similarity index 98% rename from tc/obsw_tc_service23_sdcard.py rename to pus_tc/tmtcc_tc_service23_sdcard.py index b89ae1bfb3f10dc9cd95af6f9d92ed503f2e20a0..14814962d0dd1f70f5901a62ee1165639df03402 100644 --- a/tc/obsw_tc_service23_sdcard.py +++ b/pus_tc/tmtcc_tc_service23_sdcard.py @@ -4,11 +4,12 @@ Created: 21.01.2020 07:48 @author: Jakob Meier """ -import config.obsw_config as g +import config.tmtcc_config as g from typing import Deque, Union -from tc.obsw_pus_tc_packer import PusTelecommand, TcQueueT -from tc.obsw_tc_service8 import make_action_id +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from tmtc_core.pus_tc.tmtcc_tc_service8_functional_cmd import make_action_id from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() diff --git a/tc/obsw_tc_service2.py b/pus_tc/tmtcc_tc_service2_raw_cmd.py similarity index 90% rename from tc/obsw_tc_service2.py rename to pus_tc/tmtcc_tc_service2_raw_cmd.py index 22906fb235d867bc2e09f352b4caf9d10e2380ee..d40f21efea7fed91f3a4b7c5de03b499450fa78e 100644 --- a/tc/obsw_tc_service2.py +++ b/pus_tc/tmtcc_tc_service2_raw_cmd.py @@ -1,15 +1,15 @@ # -*- coding: utf-8 -*- """ -@file obsw_tc_service2.py -@brief PUS Service 2: Device Access, Native low-level commanding +@file tmtcc_tc_service2_raw_cmd.py +@brief PUS Service 2: Device Access, native low-level commanding @author R. Mueller @date 01.11.2019 """ import struct -import config.obsw_config as g -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, Deque -from tc.obsw_tc_service200 import pack_mode_data +import config.tmtcc_config as g +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand, Deque +from pus_tc.tmtcc_tc_service200_mode import pack_mode_data def pack_service2_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: diff --git a/tc/obsw_tc_service3.py b/pus_tc/tmtcc_tc_service3_housekeeping.py similarity index 85% rename from tc/obsw_tc_service3.py rename to pus_tc/tmtcc_tc_service3_housekeeping.py index b4b267fa43d8b4e78e9c325f85170c755c1c5f83..31c8cf0ae232dcfccadd5f0799933a52a9ff1152 100644 --- a/tc/obsw_tc_service3.py +++ b/pus_tc/tmtcc_tc_service3_housekeeping.py @@ -1,14 +1,14 @@ # -*- coding: utf-8 -*- """ -@file obsw_tc_service3.py +@file tmtcc_tc_service3_housekeeping.py @brief PUS Service 3: Housekeeping Service. @author R. Mueller @date 02.05.2020 """ import struct -from typing import Deque -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand -import config.obsw_config as g +from typing import Deque, Union +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +import config.tmtcc_config as g def make_sid(object_id: bytearray, set_id: int) -> bytearray: @@ -23,12 +23,15 @@ def make_interval(interval_seconds: float) -> bytearray: # adding custom defintion to hk using test pool variables -def pack_service3_test_into(tc_queue: Deque) -> Deque: - tc_queue.appendleft(("print", "Testing Service 3")) - # Predefined packet testing - pack_test_device_test(tc_queue) - pack_custom_tests(tc_queue) - pack_internal_error_reporter_tests(tc_queue) +def pack_service3_test_into(tc_queue: Deque, op_code: Union[str, int] = 0) -> Deque: + if op_code.lower() == "ie": + tc_queue.appendleft(("print", "Testing Internal Error Reporter")) + pack_internal_error_reporter_tests(tc_queue) + else: + tc_queue.appendleft(("print", "Testing Service 3")) + # Predefined packet testing + pack_test_device_test(tc_queue) + # pack_thermal_sensor_tests(tc_queue) tc_queue.appendleft(("export", "log/tmtc_log_service3.txt")) return tc_queue @@ -37,7 +40,7 @@ def pack_test_device_test(tc_queue: Deque): pass -def pack_custom_tests(tc_queue: Deque): +def pack_thermal_sensor_tests(tc_queue: Deque): tc_queue.appendleft(("print", "Generate one thermal sensor packet: ")) command = PusTelecommand(service=3, subservice=27, ssc=3100, app_data=sid_thermalsensor) tc_queue.appendleft(command.pack_command_tuple()) diff --git a/pus_tc/tmtcc_tc_service5_event.py b/pus_tc/tmtcc_tc_service5_event.py new file mode 100644 index 0000000000000000000000000000000000000000..4d8bb4d1df0152b91cd3f04591e8b85bc6f95e41 --- /dev/null +++ b/pus_tc/tmtcc_tc_service5_event.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +""" +@file tmtcc_tc_service5_event.py +@brief PUS Service 5: Event Service + PUS Service 17: Test Service +@author R. Mueller +@date 02.05.2020 +""" +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from tmtc_core.pus_tc.tmtcc_tc_service17_test import pack_service17_ping_command + + +def pack_service5_test_into(tc_queue: TcQueueT): + tc_queue.appendleft(("print", "Testing Service 5")) + # invalid subservice + tc_queue.appendleft(("print", "Testing Service 5: Invalid subservice")) + command = PusTelecommand(service=5, subservice=1, ssc=500) + tc_queue.appendleft(command.pack_command_tuple()) + # disable events + tc_queue.appendleft(("print", "Testing Service 5: Disable event")) + command = PusTelecommand(service=5, subservice=6, ssc=500) + tc_queue.appendleft(command.pack_command_tuple()) + # trigger event + tc_queue.appendleft(("print", "Testing Service 5: Trigger event")) + command = PusTelecommand(service=17, subservice=128, ssc=510) + tc_queue.appendleft(command.pack_command_tuple()) + # enable event + tc_queue.appendleft(("print", "Testing Service 5: Enable event")) + command = PusTelecommand(service=5, subservice=5, ssc=520) + tc_queue.appendleft(command.pack_command_tuple()) + # trigger event + tc_queue.appendleft(("print", "Testing Service 5: Trigger another event")) + command = PusTelecommand(service=17, subservice=128, ssc=530) + tc_queue.appendleft(command.pack_command_tuple()) + tc_queue.appendleft(("export", "log/tmtc_log_service5.txt")) + + diff --git a/tc/obsw_tc_service8.py b/pus_tc/tmtcc_tc_service8_func_cmd.py similarity index 79% rename from tc/obsw_tc_service8.py rename to pus_tc/tmtcc_tc_service8_func_cmd.py index 6449fcf45f5599368fc0b30061cc7d68ef0ce8ea..dfa52785db0e348f4bb31d7c9f149d1989f3c572 100644 --- a/tc/obsw_tc_service8.py +++ b/pus_tc/tmtcc_tc_service8_func_cmd.py @@ -1,16 +1,15 @@ # -*- coding: utf-8 -*- """ -@file obsw_tc_service8.py +@file tmtcc_tc_service8_func_cmd.py @brief PUS Service 8: High-level functional commanding. @author R. Mueller @date 01.11.2019 """ -import struct from typing import Deque -import config.obsw_config as g -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand -from tc.obsw_tc_service200 import pack_mode_data +import config.tmtcc_config as g +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand +from pus_tc.tmtcc_tc_service200_mode import pack_mode_data def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> Deque: @@ -64,13 +63,3 @@ def pack_service8_test_into(tc_queue: Deque, called_externally: bool = False) -> tc_queue.appendleft(("export", "log/tmtc_log_service8.txt")) return tc_queue - -def generate_action_command(object_id: bytearray, action_id: int, data: bytearray = bytearray([]), - ssc: int = 0): - data_to_pack = bytearray(object_id) - data_to_pack += make_action_id(action_id) + data - return PusTelecommand(service=8, subservice=128, ssc=ssc, app_data=data_to_pack) - - -def make_action_id(action_id: int) -> bytearray: - return bytearray(struct.pack('!I', action_id)) diff --git a/tc/obsw_tc_service9.py b/pus_tc/tmtcc_tc_service9_time.py similarity index 91% rename from tc/obsw_tc_service9.py rename to pus_tc/tmtcc_tc_service9_time.py index 7970b0e5215faba74de4de94156f4a6236f16e29..63456e6071eba534e1c0471ea07436b80e9128ab 100644 --- a/tc/obsw_tc_service9.py +++ b/pus_tc/tmtcc_tc_service9_time.py @@ -1,13 +1,14 @@ # -*- coding: utf-8 -*- """ -@file obsw_tc_service9.py +@file tmtcc_tc_service9_time.py @brief PUS Service 9: Time management. @author R. Mueller @date 01.11.2019 """ from datetime import datetime -from tc.obsw_pus_tc_packer import TcQueueT, PusTelecommand +from tmtc_core.pus_tc.tmtcc_pus_tc_packer import TcQueueT +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTelecommand from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() diff --git a/tm/__init__.py b/pus_tm/__init__.py similarity index 100% rename from tm/__init__.py rename to pus_tm/__init__.py diff --git a/tm/obsw_pus_tm_factory_hook.py b/pus_tm/obsw_pus_tm_factory_hook.py similarity index 92% rename from tm/obsw_pus_tm_factory_hook.py rename to pus_tm/obsw_pus_tm_factory_hook.py index 94359cf02de06f448c37c09b620af0f6d13944a1..8b2c647dca385f18cf9b536e78a4cf4726aaed9c 100644 --- a/tm/obsw_pus_tm_factory_hook.py +++ b/pus_tm/obsw_pus_tm_factory_hook.py @@ -1,13 +1,13 @@ import struct -from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry +from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry from tmtc_core.utility.obsw_logger import get_logger -from tm.obsw_tm_service_1 import Service1TM -from tm.obsw_tm_service_3 import Service3TM -from tm.obsw_tm_service_5 import Service5TM -from tm.obsw_tm_service_20 import Service20TM -from tm.obsw_tm_service_23 import Service23TM +from tmtc_core.pus_tm.tmtcc_tm_service_1 import Service1TM +from pus_tm.obsw_tm_service_3 import Service3TM +from tmtc_core.pus_tm.tmtcc_tm_service_5 import Service5TM +from pus_tm.obsw_tm_service_20 import Service20TM +from pus_tm.obsw_tm_service_23 import Service23TM LOGGER = get_logger() diff --git a/tm/obsw_tm_service_20.py b/pus_tm/obsw_tm_service_20.py similarity index 91% rename from tm/obsw_tm_service_20.py rename to pus_tm/obsw_tm_service_20.py index 48d4d630d126251cbd80976cf87fffcc78401c8f..aa0b661955b306095d9404d81b4707dc312da8d1 100644 --- a/tm/obsw_tm_service_20.py +++ b/pus_tm/obsw_tm_service_20.py @@ -1,6 +1,9 @@ import struct -from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT +from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry, TmDictionaryKeys, PusTmInfoT +from tmtc_core.utility.obsw_logger import get_logger + +LOGGER = get_logger() class Service20TM(PusTelemetry): @@ -22,7 +25,7 @@ class Service20TM(PusTelemetry): if self.type_ptc == 5 and self.type_pfc == 1: self.param = struct.unpack('>f', self._tm_data[12:datasize])[0] else: - logger.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \ + LOGGER.info("Error when receiving Pus Service 20 TM: subservice is != 130, but 130 is \ only known subservice") self.specify_packet_info("Functional Commanding Reply") diff --git a/tm/obsw_tm_service_23.py b/pus_tm/obsw_tm_service_23.py similarity index 98% rename from tm/obsw_tm_service_23.py rename to pus_tm/obsw_tm_service_23.py index b4668c332ff5c583f286129cbed23a6e5a5dafd3..1e92544f0904388fd76c6c1d20c8620a53f070bc 100644 --- a/tm/obsw_tm_service_23.py +++ b/pus_tm/obsw_tm_service_23.py @@ -1,6 +1,6 @@ import struct -from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry +from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() diff --git a/tm/obsw_tm_service_3.py b/pus_tm/obsw_tm_service_3.py similarity index 96% rename from tm/obsw_tm_service_3.py rename to pus_tm/obsw_tm_service_3.py index ddb4376333700671b40b2f244a48a4a14f8b691a..b5270d1b742abcfcb58c52498c2b21b88e34dd13 100644 --- a/tm/obsw_tm_service_3.py +++ b/pus_tm/obsw_tm_service_3.py @@ -6,11 +6,11 @@ Description: Deserialize Housekeeping TM Author: R. Mueller """ -from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry +from tmtc_core.pus_tm.tmtcc_pus_tm_base import PusTelemetry from typing import Type from tmtc_core.utility.obsw_logger import get_logger import struct -import config.obsw_config as g +import config.tmtcc_config as g LOGGER = get_logger() diff --git a/tc/obsw_pus_tc_packer.py b/tc/obsw_pus_tc_packer.py deleted file mode 100644 index c4d281ed5f67cd7d483a8e84c7430df55ea3bb4f..0000000000000000000000000000000000000000 --- a/tc/obsw_pus_tc_packer.py +++ /dev/null @@ -1,124 +0,0 @@ -# -*- coding: utf-8 -*- -""" -@file obsw_tc_packer.py -@brief Packs the TC queue for specific G_SERVICE or device testing -@details -Contains the sevice packet which can pack specific service queues (hardcoded for now) -@author R. Mueller -@date 01.11.2019 -""" -import os - -from tmtc_core.tc.obsw_pus_tc_base import PusTelecommand, TcQueueT -from tc.obsw_tc_service2 import pack_service2_test_into -from tc.obsw_tc_service3 import pack_service3_test_into -from tc.obsw_tc_service8 import pack_service8_test_into -from tc.obsw_tc_service9 import pack_service9_test_into -from tc.obsw_tc_service23_sdcard import pack_service23_commands_into -from tc.obsw_tc_service20 import pack_service20_test_into -from tc.obsw_tc_utility import pack_utility_command -from tc.obsw_tc_service200 import pack_mode_data, pack_service200_test_into -from tc.obsw_tc_service5_17 import pack_service5_test_into, pack_service17_test_into -from tc.obsw_image_handler import generate_img_handler_packet -from tc.obsw_tc_gps import pack_gps_test_into -from tc.obsw_tc_core import pack_core_command - -from tmtc_core.utility.obsw_logger import get_logger -import config.obsw_config as g -from collections import deque -from typing import Union - -LOGGER = get_logger() - - -class ServiceQueuePacker: - def __init__(self): - pass - - @staticmethod - def pack_service_queue(service: Union[int, str], op_code: int, service_queue: TcQueueT): - if service == 2: - return pack_service2_test_into(service_queue) - if service == 3: - return pack_service3_test_into(service_queue) - if service == 5: - return pack_service5_test_into(service_queue) - if service == 8: - return pack_service8_test_into(service_queue) - if service == 9: - return pack_service9_test_into(service_queue) - if service == 17: - return pack_service17_test_into(service_queue, op_code) - if service == 20: - return pack_service20_test_into(service_queue) - if service == 23 or service.lower() == "sd": - return pack_service23_commands_into(service_queue, op_code) - if service == 200: - return pack_service200_test_into(service_queue) - if service.lower() == "dummy": - return pack_dummy_device_test_into(service_queue) - if service.lower() == "img": - return generate_img_handler_packet(service_queue, op_code) - if service.lower() == "core": - return pack_core_command(service_queue, op_code) - if service.lower() == "led": - return pack_utility_command(service_queue, op_code) - if service.lower() == "gps0": - # Object ID: GPS Device - object_id = g.GPS0_DEVICE_ID - return pack_gps_test_into(object_id, service_queue) - if service.lower() == "gps1": - # Object ID: GPS Device - object_id = g.GPS1_DEVICE_ID - return pack_gps_test_into(object_id, service_queue) - if service.lower() == "Error": - return pack_error_testing_into(service_queue) - LOGGER.warning("Invalid Service !") - - -# TODO: a way to select certain services would be nice (by passing a dict or array maybe) -def create_total_tc_queue() -> TcQueueT: - if not os.path.exists("log"): - os.mkdir("log") - tc_queue = deque() - tc_queue = pack_service2_test_into(tc_queue) - tc_queue = pack_service3_test_into(tc_queue) - tc_queue = pack_service5_test_into(tc_queue) - tc_queue = pack_service8_test_into(tc_queue) - tc_queue = pack_service9_test_into(tc_queue) - tc_queue = pack_service17_test_into(tc_queue) - tc_queue = pack_service20_test_into(tc_queue) - tc_queue = pack_service200_test_into(tc_queue) - tc_queue = pack_dummy_device_test_into(tc_queue) - object_id = g.GPS0_DEVICE_ID - tc_queue = pack_gps_test_into(object_id, tc_queue) - return tc_queue - - -def pack_dummy_device_test_into(tc_queue: TcQueueT) -> TcQueueT: - tc_queue.appendleft(("print", "Testing Dummy Device")) - # Object ID: Dummy Device - object_id = g.DUMMY_DEVICE_ID - # Set On Mode - tc_queue.appendleft(("print", "Testing Service Dummy: Set On")) - mode_data = pack_mode_data(object_id, 1, 0) - command = PusTelecommand(service=200, subservice=1, ssc=1, app_data=mode_data) - tc_queue.appendleft(command.pack_command_tuple()) - # Test Service 2 commands - tc_queue.appendleft(("print", "Testing Service Dummy: Service 2")) - pack_service2_test_into(tc_queue, True) - # Test Service 8 - tc_queue.appendleft(("print", "Testing Service Dummy: Service 8")) - pack_service8_test_into(tc_queue, True) - tc_queue.appendleft(("export", "log/tmtc_log_service_dummy.txt")) - return tc_queue - - -def pack_error_testing_into(tc_queue: TcQueueT) -> TcQueueT: - # a lot of events - command = PusTelecommand(service=17, subservice=129, ssc=2010) - tc_queue.appendleft(command.pack_command_tuple()) - # a lot of ping testing - command = PusTelecommand(service=17, subservice=130, ssc=2020) - tc_queue.appendleft(command.pack_command_tuple()) - return tc_queue diff --git a/test/obsw_module_test.py b/test/obsw_module_test.py index fbdd1b609b3beed2f19d34c24108f6909c7bf47c..905074a6492bb3211d02ea1b1ee2e148649aa39e 100644 --- a/test/obsw_module_test.py +++ b/test/obsw_module_test.py @@ -44,12 +44,12 @@ from abc import abstractmethod from collections import deque from typing import Deque -from config import obsw_config as g -from tc.obsw_pus_tc_packer import pack_dummy_device_test_into -from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys -from tm.obsw_tm_service_1 import PusPacketInfoService1T -from tmtc_core.tm.obsw_pus_tm_base import TmDictionaryKeys -from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT +from config import tmtcc_config as g +from pus_tc.tmtcc_tc_packer_hook import pack_dummy_device_test_into +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfoQueueT, TcDictionaryKeys +from tmtc_core.pus_tm.tmtcc_tm_service_1 import PusPacketInfoService1T +from tmtc_core.pus_tm.tmtcc_pus_tm_base import TmDictionaryKeys +from tmtc_core.pus_tm.tmtcc_pus_tm_factory import PusTmInfoQueueT, PusTmInfoT from tmtc_core.sendreceive.obsw_multiple_commands_sender_receiver import MultipleCommandSenderReceiver from tmtc_core.utility.obsw_logger import get_logger @@ -80,12 +80,12 @@ class TestService(unittest.TestCase): :return: """ cls._displayMode = "long" - # wait intervals between tc send bursts. - # Example: [2,4] sends to send 2 tc from queue and wait, then sends another 2 and wait again + # wait intervals between pus_tc send bursts. + # Example: [2,4] sends to send 2 pus_tc from queue and wait, then sends another 2 and wait again cls.wait_intervals = [] cls.wait_time = 5.0 cls.print_tc = True - # default wait time between tc send bursts + # default wait time between pus_tc send bursts cls.test_queue = deque() # Extremely ugly solution so that we can use the Python Unit Test Framework @@ -127,7 +127,7 @@ class TestService(unittest.TestCase): TODO: Maybe we should instantiate this once in the main and then reuse it instead of calling the constructor over and over again. If done so we need a setter for: wait_time, wait_intervals, printTm, - tc_timeout_factor and the tc.queue. Furthermore, changing parameters should + tc_timeout_factor and the pus_tc.queue. Furthermore, changing parameters should only be allowed as long as the commander/receiver is not running by checking a flag :return: """ @@ -199,7 +199,7 @@ class TestService(unittest.TestCase): def scan_for_respective_tc(self, current_tm_info: PusTmInfoT): """ - this function looks whether the tc verification SSC matched + this function looks whether the pus_tc verification SSC matched a source sequence count of the sent TM """ current_subservice = current_tm_info[TmDictionaryKeys.SUBSERVICE] diff --git a/test/obsw_pus_service_test.py b/test/obsw_pus_service_test.py index 16137a669cb53a9f0cfc0076a1c349c2ab771bcf..80c3ba3760039eb04b808957061561b0575fcc75 100644 --- a/test/obsw_pus_service_test.py +++ b/test/obsw_pus_service_test.py @@ -9,10 +9,10 @@ import unittest from typing import Deque from test.obsw_module_test import TestService, PusTmInfoQueueT, TmDictionaryKeys, AssertionDictKeys -from tmtc_core.tc.obsw_pus_tc_base import PusTcInfoQueueT -from tc.obsw_pus_tc_packer import pack_service17_test_into, pack_service5_test_into, \ - pack_service2_test_into, pack_service8_test_into, pack_service200_test_into, pack_service20_test_into -import config.obsw_config as g +from tmtc_core.pus_tc.tmtcc_pus_tc_base import PusTcInfoQueueT +from pus_tc.tmtcc_tc_packer_hook import pack_service17_test_into, pack_service5_test_into, \ + pack_service2_test_into, pack_service8_test_into, pack_service200_test_into +import config.tmtcc_config as g from tmtc_core.utility.obsw_logger import get_logger LOGGER = get_logger() @@ -183,8 +183,8 @@ class TestService17(TestService): def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, tc_info_queue=tc_info_queue) - # add anything elsee other than tc verification counter - # and ssc that is needed for tm analysis + # add anything elsee other than pus_tc verification counter + # and ssc that is needed for pus_tm analysis return assertion_dict def analyse_tc_info(self, tc_info_queue): @@ -256,8 +256,8 @@ class TestService200(TestService): def _analyse_tm_tc_info(self, tm_info_queue: PusTmInfoQueueT, tc_info_queue: PusTcInfoQueueT): assertion_dict = super()._analyse_tm_tc_info(tm_info_queue=tm_info_queue, tc_info_queue=tc_info_queue) - # add anything else other than tc verification counter - # and ssc that is needed for tm analysis + # add anything else other than pus_tc verification counter + # and ssc that is needed for pus_tm analysis return assertion_dict def analyse_tc_info(self, tc_info_queue): diff --git a/tm/obsw_tm_service_1.py b/tm/obsw_tm_service_1.py deleted file mode 100644 index 806070419fc9988f09f26439a8a9c8462409722a..0000000000000000000000000000000000000000 --- a/tm/obsw_tm_service_1.py +++ /dev/null @@ -1,129 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_tm_service_1.py -Date: 30.12.2019 -Description: Deserialize Pus Verification TM -Author: R. Mueller -""" -import struct -from typing import Dict - -from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys -from tmtc_core.tm.obsw_pus_tm_creator import PusTelemetryCreator -from tmtc_core.utility.obsw_logger import get_logger - -LOGGER = get_logger() -PusPacketInfoService1T = Dict[TmDictionaryKeys, any] - - -class Service1TM(PusTelemetry): - """ - Service 1 TM class representation. Can be used to deserialize raw service 1 packets. - """ - def __init__(self, byte_array: bytearray): - super().__init__(byte_array) - self.has_tc_error_code = False - self.is_step_reply = False - # Failure Reports with error code - self.err_code = 0 - self.step_number = 0 - self.error_param1 = 0 - self.error_param2 = 0 - self.tc_packet_id = self._tm_data[0] << 8 | self._tm_data[1] - self.tc_ssc = ((self._tm_data[2] & 0x3F) << 8) | self._tm_data[3] - if self.get_subservice() % 2 == 0: - self.__handle_failure_verification() - else: - self.__handle_success_verification() - - def append_telemetry_content(self, content_list: list): - super().append_telemetry_content(content_list) - content_list.append(str(hex(self.tc_packet_id))) - content_list.append(str(self.tc_ssc)) - if self.has_tc_error_code: - if self.is_step_reply: - content_list.append(str(self.step_number)) - content_list.append(str(hex(self.err_code))) - content_list.append(str(hex(self.error_param1)) + ", " + str(self.error_param1)) - content_list.append(str(hex(self.error_param2)) + ", " + str(self.error_param2)) - elif self.is_step_reply: - content_list.append(str(self.step_number)) - - def append_telemetry_column_headers(self, header_list: list): - super().append_telemetry_column_headers(header_list) - header_list.append("TC Packet ID") - header_list.append("TC SSC") - if self.has_tc_error_code: - if self.is_step_reply: - header_list.append("Step Number") - header_list.append("Return Value") - header_list.append("Error Param 1") - header_list.append("Error Param 2") - elif self.is_step_reply: - header_list.append("Step Number") - - def __handle_failure_verification(self): - self.specify_packet_info("Failure Verficiation") - self.has_tc_error_code = True - if self.get_subservice() == 2: - self.append_packet_info(" : Acceptance failure") - elif self.get_subservice() == 4: - self.append_packet_info(" : Start failure") - elif self.get_subservice() == 6: - self.is_step_reply = True - self.append_packet_info(" : Step Failure") - self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] - self.err_code = struct.unpack('>H', self._tm_data[5:7])[0] - self.error_param1 = struct.unpack('>I', self._tm_data[7:11])[0] - self.error_param2 = struct.unpack('>I', self._tm_data[11:15])[0] - elif self.get_subservice() == 8: - self.err_code = struct.unpack('>H', self._tm_data[4:6])[0] - self.error_param1 = struct.unpack('>I', self._tm_data[6:10])[0] - self.error_param2 = struct.unpack('>I', self._tm_data[10:14])[0] - else: - LOGGER.error("Service1TM: Invalid subservice") - - def __handle_success_verification(self): - self.specify_packet_info("Success Verification") - if self.get_subservice() == 1: - self.append_packet_info(" : Acceptance success") - elif self.get_subservice() == 3: - self.append_packet_info(" : Start success") - elif self.get_subservice() == 5: - self.is_step_reply = True - self.append_packet_info(" : Step Success") - self.step_number = struct.unpack('>B', self._tm_data[4:5])[0] - elif self.get_subservice() == 7: - self.append_packet_info(" : Completion success") - else: - LOGGER.error("Service1TM: Invalid subservice") - - def pack_tm_information(self) -> PusPacketInfoService1T: - tm_information = super().pack_tm_information() - add_information = { - TmDictionaryKeys.TC_PACKET_ID: self.tc_packet_id, - TmDictionaryKeys.TC_SSC: self.tc_ssc, - } - tm_information.update(add_information) - if self.has_tc_error_code: - tm_information.update({TmDictionaryKeys.ERROR_CODE: self.err_code}) - if self.is_step_reply: - tm_information.update({TmDictionaryKeys.STEP_NUMBER: self.step_number}) - return tm_information - - -class Service1TmPacked(PusTelemetryCreator): - """ - Class representation for Service 1 TM creation. - """ - def __init__(self, subservice: int, ssc: int = 0, tc_packet_id: int = 0, tc_ssc: int = 0): - source_data = bytearray() - source_data.append((tc_packet_id & 0xFF00) >> 8) - source_data.append(tc_packet_id & 0xFF) - tc_psc = (tc_ssc & 0x3FFF) | (0b11 << 16) - source_data.append((tc_psc & 0xFF00) >> 8) - source_data.append(tc_psc & 0xFF) - super().__init__(service=1, subservice=subservice, ssc=ssc, source_data=source_data) - - def pack(self) -> bytearray: - return super().pack() diff --git a/tm/obsw_tm_service_5.py b/tm/obsw_tm_service_5.py deleted file mode 100644 index a25c4b203b1ead83dc653251995cebb656ffdf03..0000000000000000000000000000000000000000 --- a/tm/obsw_tm_service_5.py +++ /dev/null @@ -1,54 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Program: obsw_tm_service_5.py -Date: 30.12.2019 -Description: Deserialize PUS Event Report -Author: R. Mueller -""" - -from tmtc_core.tm.obsw_pus_tm_base import PusTelemetry, TmDictionaryKeys -from tmtc_core.tm.obsw_pus_tm_factory import PusTmInfoT -import struct - - -class Service5TM(PusTelemetry): - def __init__(self, byte_array): - super().__init__(byte_array) - self.specify_packet_info("Event") - if self.get_subservice() == 1: - self.append_packet_info(" Info") - elif self.get_subservice() == 2: - self.append_packet_info(" Error Low Severity") - elif self.get_subservice() == 3: - self.append_packet_info(" Error Med Severity") - elif self.get_subservice() == 4: - self.append_packet_info(" Error High Severity") - self.eventId = struct.unpack('>H', self._tm_data[0:2])[0] - self.objectId = struct.unpack('>I', self._tm_data[2:6])[0] - self.param1 = struct.unpack('>I', self._tm_data[6:10])[0] - self.param2 = struct.unpack('>I', self._tm_data[10:14])[0] - - def append_telemetry_content(self, array): - super().append_telemetry_content(array) - array.append(str(self.eventId)) - array.append(hex(self.objectId)) - array.append(str(hex(self.param1)) + ", " + str(self.param1)) - array.append(str(hex(self.param2)) + ", " + str(self.param2)) - - def append_telemetry_column_headers(self, array): - super().append_telemetry_column_headers(array) - array.append("Event ID") - array.append("Reporter ID") - array.append("Parameter 1") - array.append("Parameter 2") - - def pack_tm_information(self) -> PusTmInfoT: - tm_information = super().pack_tm_information() - add_information = { - TmDictionaryKeys.REPORTER_ID: self.objectId, - TmDictionaryKeys.EVENT_ID: self.eventId, - TmDictionaryKeys.EVENT_PARAM_1: self.param1, - TmDictionaryKeys.EVENT_PARAM_2: self.param2 - } - tm_information.update(add_information) - return tm_information diff --git a/tmtc_client_cli.py b/tmtc_client_cli.py index 48c02fbe3b710e6d782bdb345d16760375c0aab8..1f761c98c72158a9d9aae8e63c9c2762eeddc4f2 100644 --- a/tmtc_client_cli.py +++ b/tmtc_client_cli.py @@ -1,11 +1,13 @@ #!/usr/bin/python3 """ -@brief This client was developed by KSat for the SOURCE project to test the on-board software. +@brief TMTC Commander entry point for command line mode. @details -This client features multiple sender/receiver modes and has been designed -to be extensible and easy to use. This clien is based on the PUS standard for the format -of telecommands and telemetry. It can also send TMTC via different interfaces like the -serial interface (USB port) or ethernet interface. +This client was developed by KSat for the SOURCE project to test the on-board software but +has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand) +handling and testing via different communication interfaces. Currently, only the PUS standard is +implemented as a packet standard. + +Run this file with the -h flag to display options. @license Copyright 2020 KSat e.V. Stuttgart @@ -22,13 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -@manual -Run this file with the -h flag to display options. +@author R. Mueller """ from core.tmtc_client_core import run_tmtc_client + def main(): run_tmtc_client(False) + if __name__ == "__main__": main() diff --git a/tmtc_client_gui.py b/tmtc_client_gui.py index 60722026def47208fcd2bef2eb3e07607f2ea440..4caf5c22bdfa1b83dbd5f2e4da45a13908101343 100644 --- a/tmtc_client_gui.py +++ b/tmtc_client_gui.py @@ -1,11 +1,13 @@ #!/usr/bin/python3 """ -@brief This client was developed by KSat for the SOURCE project to test the on-board software. +@brief TMTC Commander entry point for GUI mode. @details -This client features multiple sender/receiver modes and has been designed -to be extensible and easy to use. This clien is based on the PUS standard for the format -of telecommands and telemetry. It can also send TMTC via different interfaces like the -serial interface (USB port) or ethernet interface. +This client was developed by KSat for the SOURCE project to test the on-board software but +has evolved into a more generic tool for satellite developers to perform TMTC (Telemetry and Telecommand) +handling and testing via different communication interfaces. Currently, only the PUS standard is +implemented as a packet standard. + +Run this file with the -h flag to display options. @license Copyright 2020 KSat e.V. Stuttgart @@ -22,12 +24,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -@manual +@author R. Mueller """ from core.tmtc_client_core import run_tmtc_client + def main(): run_tmtc_client(True) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/tmtc_core b/tmtc_core index 1e06be166b0bf47a2a921fd45eed49ea9f5782c3..12ce9917d67b543fd561dc72f9372761419ccbd8 160000 --- a/tmtc_core +++ b/tmtc_core @@ -1 +1 @@ -Subproject commit 1e06be166b0bf47a2a921fd45eed49ea9f5782c3 +Subproject commit 12ce9917d67b543fd561dc72f9372761419ccbd8 diff --git a/utility/obsw_args_parser.py b/utility/tmtcc_args_parser.py similarity index 87% rename from utility/obsw_args_parser.py rename to utility/tmtcc_args_parser.py index 47f2018ae2a2a64961d5f7e4e6b5eadc207e2ce3..069aa42af78da13538660aa0927ce6b22a27a91e 100644 --- a/utility/obsw_args_parser.py +++ b/utility/tmtcc_args_parser.py @@ -1,134 +1,134 @@ -#!/usr/bin/python3.8 -""" -Argument parser module. -""" -import argparse -import sys -from tmtc_core.utility.obsw_logger import get_logger - - -LOGGER = get_logger() - - -def parse_input_arguments(): - """ - Parses all input arguments - :return: Input arguments contained in a special namespace and accessable by args.<variable> - """ - arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") - - arg_parser.add_argument( - '-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), ' - '0: GUI Mode, 1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' - '4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0) - arg_parser.add_argument( - '-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, ' - '2: QEMU, 3: UDP', default=2) - arg_parser.add_argument( - '-o', '--op_code', help='Operation code, which is passed to the TC ' - 'packer functions', default=0) - arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') - arg_parser.add_argument( - '--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1") - arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) - arg_parser.add_argument( - '-l','--listener', help='Determine whether the listener mode will be active after ' - 'performing the operation', - action='store_false') - arg_parser.add_argument( - '-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.' - ' Default: 5 seconds', default=5.0) - arg_parser.add_argument( - '--nl', dest='print_log', help='Supply --nl to suppress print output to log files.', - action='store_false') - arg_parser.add_argument( - '--np', dest='print_tm', help='Supply --np to suppress print output to console.', - action='store_false') - arg_parser.add_argument( - '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' - 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) - arg_parser.add_argument( - '-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly', - action='store_true') - arg_parser.add_argument( - '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') - arg_parser.add_argument( - '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') - arg_parser.add_argument('--COM', dest="com_port", help='COM Port for serial communication') - arg_parser.add_argument( - '--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout', - action='store_true') - - if len(sys.argv) == 1: - print("No Input Arguments specified.") - arg_parser.print_help() - args, unknown = arg_parser.parse_known_args() - # for argument in vars(args): - # LOGGER.debug(argument + ": " + str(getattr(args, argument))) - handle_args(args, unknown) - return args - - -def handle_args(args, unknown: list) -> None: - """ - Handles the parsed arguments. - :param args: Namespace objects - (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) - :param unknown: List of unknown parameters. - :return: None - """ - if len(unknown) > 0: - print("Unknown arguments detected: " + str(unknown)) - if len(sys.argv) > 1: - handle_unspecified_args(args) - if len(sys.argv) == 1: - handle_empty_args(args) - - -def handle_unspecified_args(args) -> None: - """ - If some arguments are unspecified, they are set here with (variable) default values. - :param args: - :return: None - """ - if args.com_if == 1 and args.tm_timeout is None: - args.tm_timeout = 6.0 - if args.mode is None: - print("No mode specified with -m Parameter.") - print("Possible Modes: ") - print("1: Listener Mode") - print("2: Single Command Mode with manual command") - print("3: Service Mode, Commands specified in tc folder") - print("4: Software Mode, runs all command specified in obsw_pus_tc_packer.py") - print("5: Unit Test, runs unit test specified in obsw_module_test.py") - args.mode = input("Please enter Mode: ") - if args.mode == 1 and args.service is None: - args.service = input("No Service specified for Service Mode. " - "Please enter PUS G_SERVICE number: ") - - -def handle_empty_args(args) -> None: - """ - If no args were supplied, request input from user directly. - TODO: This still needs to be extended. - :param args: - :return: - """ - print_hk = input("Print HK packets ? (y/n or yes/no)") - try: - print_hk = print_hk.lower() - except TypeError: - pass - if print_hk in ('y', 'yes', 1): - args.print_hk = True - else: - args.print_hk = False - print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") - try: - print_to_log = print_to_log.lower() - except TypeError: - pass - if print_to_log in ('n', 'no', 0): - args.printFile = False - else: - args.printFile = True +#!/usr/bin/python3.8 +""" +Argument parser module. +""" +import argparse +import sys +from tmtc_core.utility.obsw_logger import get_logger + + +LOGGER = get_logger() + + +def parse_input_arguments(): + """ + Parses all input arguments + :return: Input arguments contained in a special namespace and accessable by args.<variable> + """ + arg_parser = argparse.ArgumentParser(description="TMTC Client Command Line Interface") + + arg_parser.add_argument( + '-m', '--mode', type=int, help='Target Mode. Default is 1 (Listener Mode), ' + '1: Listener Mode, 2: Single Command Mode, 3: Service Test Mode, ' + '4: Software Test Mode, 5: Binary Upload Mode, 6: Unit Test Mode ', default=0) + arg_parser.add_argument( + '-c', '--com_if', type=int, help='Communication Interface. 0: Dummy Interface, 1: Serial, ' + '2: QEMU, 3: UDP', default=2) + arg_parser.add_argument( + '-o', '--op_code', help='Operation code, which is passed to the TC ' + 'packer functions', default=0) + arg_parser.add_argument('--clientIP', help='Client(Computer) IP. Default:\'\'', default='') + arg_parser.add_argument( + '--boardIP', help='Board IP. Default: Localhost 127.0.0.1', default="127.0.0.1") + arg_parser.add_argument('-s', '--service', help='Service to test. Default: 17', default=17) + arg_parser.add_argument( + '-l', '--listener', help='Determine whether the listener mode will be active ' + 'after performing the operation', + action='store_false') + arg_parser.add_argument( + '-t', '--tm_timeout', type=float, help='TM Timeout when listening to verification sequence.' + ' Default: 5 seconds', default=5.0) + arg_parser.add_argument( + '--nl', dest='print_log', help='Supply --nl to suppress print output to log files.', + action='store_false') + arg_parser.add_argument( + '--np', dest='print_tm', help='Supply --np to suppress print output to console.', + action='store_false') + arg_parser.add_argument( + '--tc_timeout_factor', type=float, help='TC Timeout Factor. Multiplied with ' + 'TM Timeout, TC sent again after this time period. Default: 3.5', default=3.5) + arg_parser.add_argument( + '-r', '--rawDataPrint', help='Supply -r to print all raw TM data directly', + action='store_true') + arg_parser.add_argument( + '-d', '--shortDisplayMode', help='Supply -d to print short output', action='store_true') + arg_parser.add_argument( + '--hk', dest='print_hk', help='Supply -k or --hk to print HK data', action='store_true') + arg_parser.add_argument( + '--rs', dest="resend_tc", help='Specify whether TCs are sent again after timeout', + action='store_true') + + if len(sys.argv) == 1: + print("No Input Arguments specified.") + arg_parser.print_help() + args, unknown = arg_parser.parse_known_args() + # for argument in vars(args): + # LOGGER.debug(argument + ": " + str(getattr(args, argument))) + handle_args(args, unknown) + + return args + + +def handle_args(args, unknown: list) -> None: + """ + Handles the parsed arguments. + :param args: Namespace objects + (see https://docs.python.org/dev/library/argparse.html#argparse.Namespace) + :param unknown: List of unknown parameters. + :return: None + """ + if len(unknown) > 0: + print("Unknown arguments detected: " + str(unknown)) + if len(sys.argv) > 1: + handle_unspecified_args(args) + if len(sys.argv) == 1: + handle_empty_args(args) + + +def handle_unspecified_args(args) -> None: + """ + If some arguments are unspecified, they are set here with (variable) default values. + :param args: + :return: None + """ + if args.com_if == 1 and args.tm_timeout is None: + args.tm_timeout = 6.0 + if args.mode is None: + print("No mode specified with -m Parameter.") + print("Possible Modes: ") + print("1: Listener Mode") + print("2: Single Command Mode with manual command") + print("3: Service Mode, Commands specified in pus_tc folder") + print("4: Software Mode, runs all command specified in tmtcc_pus_tc_packer.py") + print("5: Unit Test, runs unit test specified in obsw_module_test.py") + args.mode = input("Please enter Mode: ") + if args.mode == 1 and args.service is None: + args.service = input("No Service specified for Service Mode. " + "Please enter PUS G_SERVICE number: ") + + +def handle_empty_args(args) -> None: + """ + If no args were supplied, request input from user directly. + TODO: This still needs to be extended. + :param args: + :return: + """ + print_hk = input("Print HK packets ? (y/n or yes/no)") + try: + print_hk = print_hk.lower() + except TypeError: + pass + if print_hk in ('y', 'yes', 1): + args.print_hk = True + else: + args.print_hk = False + print_to_log = input("Export G_SERVICE test output to log files ? (y/n or yes/no)") + try: + print_to_log = print_to_log.lower() + except TypeError: + pass + if print_to_log in ('n', 'no', 0): + args.printFile = False + else: + args.printFile = True diff --git a/utility/obsw_binary_uploader.py b/utility/tmtcc_binary_uploader.py similarity index 67% rename from utility/obsw_binary_uploader.py rename to utility/tmtcc_binary_uploader.py index 60c3de66a08d7a357d2ae58b547c3c0d61c3d09b..c1c8417ce03cf87e9a297fddcff0ba792cd69452 100644 --- a/utility/obsw_binary_uploader.py +++ b/utility/tmtcc_binary_uploader.py @@ -1,246 +1,306 @@ -#!/usr/bin/python3.8 -""" -@brief Binary Uploader Module -@details -This module will be used to upload binaries to the OBC via a communication port, given -a supplied binary. The binary will be sent via the specified communication interface. -It will be possible to encode the data (for example using DLE encoding) -""" -import os -import time -import tkinter as tk -from tkinter import filedialog -from collections import deque -from glob import glob -from typing import Deque - -from tmtc_core.comIF.obsw_com_interface import CommunicationInterface -from utility.obsw_file_transfer_helper import FileTransferHelper -import config.obsw_config as g -from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode -from tmtc_core.utility.obsw_logger import get_logger -from tmtc_core.sendreceive.obsw_tm_listener import TmListener - -LOGGER = get_logger() - - -class BinaryFileUploader: - def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter, - tm_listener: TmListener): - """ - Initializes the binary file uploader with the required components. - @param com_if: - @param tmtc_printer: - @param tm_listener: - """ - self.com_if = com_if - self.tmtc_printer = tmtc_printer - self.tm_listener = tm_listener - self.iobc = False - self.send_interval = 1.0 - - def perform_file_upload(self): - gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ") - if gui_cl_prompt == 0: - gui_cl_prompt = True - else: - gui_cl_prompt = False - print("Please select file to upload: ") - file_path = "" - if gui_cl_prompt: - root = tk.Tk() - root.withdraw() - root.wm_attributes('-topmost', 1) - file_path = filedialog.askopenfilename(parent=root) - print("File select: " + str(file_path)) - if file_path == (): - LOGGER.warning("Invalid file path, exiting binary upload mode.") - return - else: - result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))] - print("Files found in _bin folder: ") - for idx, path in enumerate(result): - print("Selection " + str(idx) + ": " + str(path)) - select_valid = False - selection = input("Please enter desired selection [c to cancel]: ") - while not select_valid: - if selection == 'c': - print("Exiting binary upload mode..") - return - if not selection.isdigit(): - selection = input("Invalid input, try again [c to cancel]: ") - if selection.isdigit(): - if int(selection) < len(result): - file_path = result[int(selection)] - select_valid = True - else: - selection = input("Invalid input, try again [c to cancel]: ") - - print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected." - LOGGER.info(print_string) - calc_hamming_code = input("Calculate and send hamming code? [y/n]: ") - if calc_hamming_code in ['y', 'yes', 1]: - calc_hamming_code = True - print("Hamming code will be calculated and sent in tail packet") - else: - calc_hamming_code = False - print("Hamming code will not be calculated") - - iobc_prompt = input("iOBC? [y/n]: ") - if iobc_prompt in ['y', 'yes', 1]: - self.iobc = True - self.send_interval = 0.8 - iobc_prompt = True - else: - self.iobc = False - self.send_interval = 0.6 - iobc_prompt = False - - bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ") - if str(bootloader_prompt) == "0": - bootloader_prompt = True - else: - bootloader_prompt = False - - prompt_lock = input("Lock file with last packet? [y/n]: ") - if prompt_lock in ['n', "no", 0]: - prompt_lock = False - else: - prompt_lock = True - - if bootloader_prompt: - file_name = "bl.bin" - else: - file_name = "obsw_up.bin" - - if iobc_prompt: - if bootloader_prompt: - repository_name = "BIN/IOBC/BL" - else: - repository_name = "BIN/IOBC/OBSW" - else: - if bootloader_prompt: - repository_name = "BIN/AT91/BL" - else: - repository_name = "BIN/AT91/OBSW" - - if calc_hamming_code: - pass - - # Right now, the size of PUS packets is limited to 1500 bytes which also limits the app - # data length. - frame_length = g.G_MAX_APP_DATA_LENGTH - - if calc_hamming_code: - # now we calculate the hamming code - pass - - tc_queue = deque() - - # Delete existing binary file first, otherwise data might be appended to otherwise - # valid file which already exists. - file_transfer_helper = FileTransferHelper( - tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name, - target_filename=file_name) - - init_ssc = 0 - self.tmtc_printer.set_display_mode(DisplayMode.SHORT) - - # Configure file transfer helper - file_transfer_helper.set_data_from_file(file_path) - file_transfer_helper.set_to_delete_old_file() - if prompt_lock: - file_transfer_helper.set_to_lock_file(prompt_lock) - else: - file_transfer_helper.set_to_lock_file(prompt_lock) - - # Generate the packets. - file_transfer_helper.generate_packets(init_ssc) - - self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL) - print_string = "BinaryUploader: Detected file size: " + str( - file_transfer_helper.file_size()) - LOGGER.info(print_string) - total_num_packets = file_transfer_helper.get_number_of_packets_generated() - print_string = "BinaryUploader: " + str(total_num_packets) + \ - " packets generated." - if prompt_lock: - print_string += " File will be locked." - else: - print_string += " File will not be locked." - LOGGER.info(print_string) - - reception_deque = deque() - self.__perform_send_algorithm(tc_queue, total_num_packets, reception_deque) - - print_string = "BinaryUploader: All binary packets were sent!" - LOGGER.info(print_string) - print_string = str(reception_deque.__len__()) + " replies received." - - LOGGER.info(print_string) - time.sleep(15) - reception_deque.extend(self.tm_listener.retrieve_tm_packet_queue()) - for tm_list in reception_deque: - for tm_packet in tm_list: - if tm_packet.get_service() == 23 and tm_packet.get_subservice() == 132: - # tmtc_printer.print_telemetry(tm_packet) - pass - self.tm_listener.clear_tm_packet_queue() - LOGGER.info("Transitioning back to listener mode..") - - def __perform_send_algorithm(self, tc_queue: Deque, number_of_packets: int, reception_deque: - Deque): - last_check = time.time() - last_sent = time.time() - total_time = self.send_interval * number_of_packets - idx = 1 - while tc_queue: - next_send = last_sent + self.send_interval - (tc_packet, tc_info) = tc_queue.pop() - if not isinstance(tc_packet, str): - # print_string = "Sending packet " + str(idx) + ".." - # LOGGER.info(print_string) - idx += 1 - self.com_if.send_telecommand(tc_packet, tc_info) - self.tmtc_printer.print_telecommand(tc_packet, tc_info) - elif tc_packet == "print": - LOGGER.info(tc_info) - remaining_time_string = "Remaining time: " + \ - str(round(total_time - (idx - 2) * self.send_interval, 2)) + \ - " seconds" - print_progress_bar(idx - 2, number_of_packets, print_end="\n", - suffix=remaining_time_string) - # sys.stdout.write("\033[F") # Cursor up one line - packets_received = self.tm_listener.retrieve_tm_packet_queue() - reception_deque.extend(packets_received) - # Every 5 seconds, check whether any reply has been received. If not, cancel operation. - if time.time() - last_check > 5.0 and len(packets_received) == 0: - LOGGER.warning("No replies are being received, cancelling upload operation..") - time_to_sleep = next_send - time.time() - last_sent = next_send - time.sleep(time_to_sleep) - - -# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console -# Thank you Greensticks :-) -def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100, - fill='â–ˆ', print_end="\r"): - """ - Call in a loop to create terminal progress bar - @params: - iteration - Required : current iteration (Int) - total - Required : total iterations (Int) - prefix - Optional : prefix string (Str) - suffix - Optional : suffix string (Str) - decimals - Optional : positive number of decimals in percent complete (Int) - length - Optional : character length of bar (Int) - fill - Optional : bar fill character (Str) - print_end - Optional : end character (e.g. "\r", "\r\n") (Str) - """ - percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) - filled_length = int(length * iteration // total) - bar = fill * filled_length + '-' * (length - filled_length) - print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end) - # Print New Line on Complete - if iteration == total: - print() +#!/usr/bin/python3.8 +""" +@brief Binary Uploader Module +@details +This module will be used to upload binaries to the OBC via a communication port, given +a supplied binary. The binary will be sent via the specified communication interface. +It will be possible to encode the data (for example using DLE encoding) +""" +import os +import time +import tkinter as tk +from tkinter import filedialog +from collections import deque +from glob import glob + +from tmtc_core.pus_tm.tmtcc_tm_service_1 import Service1TM +from tmtc_core.comIF.obsw_com_interface import CommunicationInterface +from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, TcDictionaryKeys +from utility.tmtcc_file_transfer_helper import FileTransferHelper +import config.tmtcc_config as g +from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter, DisplayMode +from tmtc_core.utility.obsw_logger import get_logger +from tmtc_core.sendreceive.obsw_tm_listener import TmListener + +LOGGER = get_logger() + + +class BinaryFileUploader: + def __init__(self, com_if: CommunicationInterface, tmtc_printer: TmTcPrinter, + tm_listener: TmListener): + """ + Initializes the binary file uploader with the required components. + @param com_if: + @param tmtc_printer: + @param tm_listener: + """ + self.com_if = com_if + self.tmtc_printer = tmtc_printer + self.tm_listener = tm_listener + self.iobc = False + # Will be set later depending on board. + self.send_interval = 0 + + def perform_file_upload(self): + gui_cl_prompt = input("GUI(0) or command line version (1)? [0/1]: ") + if gui_cl_prompt == 0: + gui_cl_prompt = True + else: + gui_cl_prompt = False + print("Please select file to upload: ") + file_path = "" + if gui_cl_prompt: + root = tk.Tk() + root.withdraw() + root.wm_attributes('-topmost', 1) + file_path = filedialog.askopenfilename(parent=root) + print("File select: " + str(file_path)) + if file_path == (): + LOGGER.warning("Invalid file path, exiting binary upload mode.") + return + else: + result = [y for x in os.walk("../_bin") for y in glob(os.path.join(x[0], '*.bin'))] + print("Files found in _bin folder: ") + for idx, path in enumerate(result): + print("Selection " + str(idx) + ": " + str(path)) + select_valid = False + selection = input("Please enter desired selection [c to cancel]: ") + while not select_valid: + if selection == 'c': + print("Exiting binary upload mode..") + return + if not selection.isdigit(): + selection = input("Invalid input, try again [c to cancel]: ") + if selection.isdigit(): + if int(selection) < len(result): + file_path = result[int(selection)] + select_valid = True + else: + selection = input("Invalid input, try again [c to cancel]: ") + + print_string = file_path.rsplit(os.path.sep, 1)[-1] + " was selected." + LOGGER.info(print_string) + calc_hamming_code = input("Calculate and send hamming code? [y/n]: ") + if calc_hamming_code in ['y', 'yes', 1]: + calc_hamming_code = True + print("Hamming code will be calculated and sent in tail packet") + else: + calc_hamming_code = False + print("Hamming code will not be calculated") + + iobc_prompt = input("iOBC? [y/n]: ") + if iobc_prompt in ['y', 'yes', 1]: + self.iobc = True + self.send_interval = 1.4 + iobc_prompt = True + else: + self.iobc = False + self.send_interval = 0.6 + iobc_prompt = False + + bootloader_prompt = input("Bootloader (0) or Software Image (1)? [0/1]: ") + if str(bootloader_prompt) == "0": + bootloader_prompt = True + else: + bootloader_prompt = False + + prompt_lock = input("Lock file with last packet? [y/n]: ") + if prompt_lock in ['n', "no", 0]: + prompt_lock = False + else: + prompt_lock = True + + if bootloader_prompt: + file_name = "bl.bin" + else: + file_name = "obsw_up.bin" + + if iobc_prompt: + if bootloader_prompt: + repository_name = "BIN/IOBC/BL" + else: + repository_name = "BIN/IOBC/OBSW" + else: + if bootloader_prompt: + repository_name = "BIN/AT91/BL" + else: + repository_name = "BIN/AT91/OBSW" + + if calc_hamming_code: + pass + + # Right now, the size of PUS packets is limited to 1500 bytes which also limits the app + # data length. + frame_length = g.G_MAX_APP_DATA_LENGTH + + if calc_hamming_code: + # now we calculate the hamming code + pass + + tc_queue = deque() + + # Delete existing binary file first, otherwise data might be appended to otherwise + # valid file which already exists. + file_transfer_helper = FileTransferHelper( + tc_queue=tc_queue, max_size_of_app_data=frame_length, target_repository=repository_name, + target_filename=file_name) + + init_ssc = 0 + self.tmtc_printer.set_display_mode(DisplayMode.SHORT) + + # Configure file transfer helper + file_transfer_helper.set_data_from_file(file_path) + file_transfer_helper.set_to_delete_old_file() + if prompt_lock: + file_transfer_helper.set_to_lock_file(prompt_lock) + else: + file_transfer_helper.set_to_lock_file(prompt_lock) + + # Generate the packets. + file_transfer_helper.generate_packets(init_ssc) + + self.tm_listener.set_listener_mode(TmListener.ListenerModes.MANUAL) + print_string = "BinaryUploader: Detected file size: " + str( + file_transfer_helper.file_size()) + LOGGER.info(print_string) + total_num_packets = file_transfer_helper.get_number_of_packets_generated() + print_string = "BinaryUploader: " + str(total_num_packets) + \ + " packets generated." + if prompt_lock: + print_string += " File will be locked." + else: + print_string += " File will not be locked." + LOGGER.info(print_string) + + packets_received = self.__perform_send_algorithm(tc_queue, total_num_packets) + + LOGGER.info("BinaryUploader: All binary packets were sent!") + print_string = str(packets_received) + " replies received." + + LOGGER.info(print_string) + self.tm_listener.clear_tm_packet_queue() + LOGGER.info("Upload operation finished successfully.") + + def __perform_send_algorithm(self, tc_queue: TcQueueT, number_of_packets: int) -> int: + last_check = time.time() + last_sent = time.time() + reception_dict = dict() + total_time = self.send_interval * number_of_packets + idx = 1 + new_packets_received = 0 + num_packets_received = 0 + while tc_queue: + next_send = last_sent + self.send_interval + tc_packet, tc_info = tc_queue.pop() + if not isinstance(tc_packet, str): + # print_string = "Sending packet " + str(idx) + ".." + # LOGGER.info(print_string) + idx += 1 + self.com_if.send_telecommand(tc_packet, tc_info) + self.tmtc_printer.print_telecommand(tc_packet, tc_info) + # Store a list which tracks whether acceptance messages were received + # The first entry is a simply counter. + reception_dict.update({tc_info[TcDictionaryKeys.SSC]: [0, False, False, False]}) + elif tc_packet == "print": + LOGGER.info(tc_info) + remaining_time_string = "Remaining time: " + \ + str(round(total_time - (idx - 2) * self.send_interval, 2)) + \ + " seconds" + print_progress_bar(idx - 2, number_of_packets, print_end="\n", + suffix=remaining_time_string) + # sys.stdout.write("\033[F") # Cursor up one line + new_packets_received += self.__handle_tm_queue(reception_dict) + + # Every 5 seconds, check whether any reply has been received. If not, cancel operation. + if time.time() - last_check > 5.0 and new_packets_received == 0: + LOGGER.warning("No replies are being received, cancelling upload operation..") + elif time.time() - last_check > 5.0: + num_packets_received += new_packets_received + new_packets_received = 0 + last_check = time.time() + + time_to_sleep = next_send - time.time() + last_sent = next_send + time.sleep(time_to_sleep) + + # handle last telemetry packets coming in. + wait_time = 4 + LOGGER.info("Waiting " + str(wait_time) + " more seconds for TM packets..") + time_till_wake = time.time() + wait_time + while time.time() < time_till_wake: + new_packets_received += self.__handle_tm_queue(reception_dict) + time.sleep(1) + + num_packets_received += new_packets_received + return num_packets_received + + def __handle_tm_queue(self, reception_dict: dict) -> int: + num_packets_received = 0 + packets_received = self.tm_listener.retrieve_tm_packet_queue() + self.tm_listener.clear_tm_packet_queue() + # reception_deque.extend(packets_received) + for packet_list in packets_received: + for packet in packet_list: + num_packets_received += 1 + if packet.get_service() == 1: + packet: Service1TM + tc_ssc = packet.get_tc_ssc() + acceptance_list = reception_dict.get(tc_ssc) + if acceptance_list is None: + LOGGER.warning("Invalid TC SSC with number " + str(tc_ssc) + + " detected!") + continue + if packet.get_subservice() == 1: + acceptance_list[1] = True + if packet.get_subservice() == 3: + acceptance_list[2] = True + if packet.get_subservice() == 7: + acceptance_list[3] = True + reception_dict.update({tc_ssc: acceptance_list}) + elif packet.get_service() == 5: + # TODO: print event + LOGGER.info("Event received!") + else: + print_string = "Other TM packet with service ID " + str(packet.get_service()) + \ + " received!" + LOGGER.info(print_string) + clear_list = [] + for key, reception_list in reception_dict.items(): + if reception_list[1] and reception_list[2] and reception_list[3]: + # All replies received, we can delete the entry and confirm succesfull handling + print_string = "All replies received for upload packet with SSC " + str(key) + "." + LOGGER.info(print_string) + clear_list.append(key) + elif reception_list[0] > 5: + # No replies for telecommand. + LOGGER.warning("No reply received for TC SSC " + str(key) + "!") + # Continue nonetheless + clear_list.append(key) + for key in clear_list: + del reception_dict[key] + return num_packets_received + + +# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console +# Thank you Greensticks :-) +def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100, + fill='â–ˆ', print_end="\r"): + """ + Call in a loop to create terminal progress bar + @params: + iteration - Required : current iteration (Int) + total - Required : total iterations (Int) + prefix - Optional : prefix string (Str) + suffix - Optional : suffix string (Str) + decimals - Optional : positive number of decimals in percent complete (Int) + length - Optional : character length of bar (Int) + fill - Optional : bar fill character (Str) + print_end - Optional : end character (e.g. "\r", "\r\n") (Str) + """ + percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + filled_length = int(length * iteration // total) + bar = fill * filled_length + '-' * (length - filled_length) + print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end) + # Print New Line on Complete + if iteration == total: + print() diff --git a/utility/binary_writer.py b/utility/tmtcc_binary_writer.py similarity index 100% rename from utility/binary_writer.py rename to utility/tmtcc_binary_writer.py diff --git a/utility/obsw_file_transfer_helper.py b/utility/tmtcc_file_transfer_helper.py similarity index 97% rename from utility/obsw_file_transfer_helper.py rename to utility/tmtcc_file_transfer_helper.py index c6efb90e7e15cf98b494e9c895f9137b88107c6f..4a47de3dcfb013c5dcc25d5e38eff292eee4bbbc 100644 --- a/utility/obsw_file_transfer_helper.py +++ b/utility/tmtcc_file_transfer_helper.py @@ -1,9 +1,9 @@ from enum import Enum import math -from config.obsw_config import SD_CARD_HANDLER_ID -from tmtc_core.tc.obsw_pus_tc_base import TcQueueT, PusTelecommand -from tc.obsw_tc_service23_sdcard import \ +from config.tmtcc_config import SD_CARD_HANDLER_ID +from tmtc_core.pus_tc.tmtcc_pus_tc_base import TcQueueT, PusTelecommand +from pus_tc.tmtcc_tc_service23_sdcard import \ calculate_allowed_file_data_size, generate_rm_file_srv23_2_packet, \ generate_create_file_srv23_1_packet, generate_finish_append_to_file_srv23_131_packet, \ generate_lock_file_srv23_5_6_packet @@ -223,6 +223,7 @@ class FileTransferHelper: header += data[number_of_packets * size_of_data_blocks:len(data)] commands = PusTelecommand(service=23, subservice=130, ssc=init_ssc + packet_sequence_number, app_data=header) + self.__current_ssc = init_ssc + packet_sequence_number self.tc_queue.appendleft(commands.pack_command_tuple()) def __handle_finish_and_lock_packet_generation(self): @@ -230,12 +231,14 @@ class FileTransferHelper: last_command = generate_finish_append_to_file_srv23_131_packet( filename=self.target_filename, repository_path=self.target_repository, ssc=self.__current_ssc, lock_file=self.__lock_file) + self.__current_ssc += 1 else: if self.__lock_file: last_command = generate_lock_file_srv23_5_6_packet( filename=self.target_filename, repository_path=self.target_repository, object_id=self.object_id, lock=True, ssc=self.__current_ssc) + self.__current_ssc += 1 else: self.__number_of_finish_packets = 0 return - self.tc_queue.appendleft(last_command.pack_command_tuple()) \ No newline at end of file + self.tc_queue.appendleft(last_command.pack_command_tuple())