Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
Showing
with 288 additions and 215 deletions
......@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s Dummy -c 2 -t 2" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
......
......@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s GPS0 -c 2 -t 2.5 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
......
......@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s GPS1 -c 2 -t 2.5 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
......
......@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/core/tmtc_client_core.py" />
<option name="PARAMETERS" value="-m 2 -c 3 --boardIP=127.0.0.1" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
......
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Single Command Serial " type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication">
<configuration default="false" name="tmtcclient Single Command Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
......@@ -12,8 +12,8 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="PARAMETERS" value="-m 2 -c 1 -t 5" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 2 -c 1 -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
......
......@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/core/tmtc_client_core.py" />
<option name="PARAMETERS" value="-m 4 -c 2 -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
......
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Trigger Exceptions" type="PythonConfigurationType" factoryName="Python" folderName="Serial Utility">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s 17 -o 150 -c 1 -t 2.2" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
......@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/core/tmtc_client_core.py" />
<option name="PARAMETERS" value="-m 5 -c 2 -t 5" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
......
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Unlock File" type="PythonConfigurationType" factoryName="Python" folderName="Serial FileManagement">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s SD -o 6 -c 1 -t 2.5" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Service 200 Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication">
<configuration default="false" name="tmtcclient Service 200 Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
......@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s 200 -c 1 --hk -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
......
......@@ -16,7 +16,6 @@ git clone https://git.ksat-stuttgart.de/source/tmtc.git
Initiate the core components by initiating and updating the submodules
```sh
git submodule init
git submodule sync
git submoduke update
```
......@@ -27,7 +26,7 @@ pip install -r requirements.txt
Now the script can be tested by running
```sh
python3 obsw_tmtc_client.py -h
python obsw_tmtc_client.py -h
```
It is recommended to use and setup PyCharm to also use the preconfigured
......@@ -137,4 +136,4 @@ a new PyCharm project in a new window.
To add new configurations, go to Edit Configurations...
at the top right corner in the drop-down menu.
Specify the new run configurations and set a tick at Share through VCS.
\ No newline at end of file
Specify the new run configurations and set a tick at Share through VCS.
......@@ -6,9 +6,10 @@ import logging
LOGGER = get_logger()
class TmTcBackend(Process):
def __init__(self):
from obsw_tmtc_client import TmTcHandler
from core.tmtc_client_core import TmTcHandler
super(TmTcBackend, self).__init__()
self.address = ('localhost', 6000) # family is deduced to be 'AF_INET'
self.tmtc_backend = TmTcHandler()
......@@ -20,7 +21,7 @@ class TmTcBackend(Process):
def listen(self):
self.conn = self.listener.accept()
LOGGER.info("TmTcBackend: Connection accepted from %s",str(self.listener.last_accepted))
LOGGER.info("TmTcBackend: Connection accepted from %s", str(self.listener.last_accepted))
while True:
msg = self.conn.recv()
# do something with msg
......
# -*- coding: utf-8 -*-
"""
Created on Wed Apr 4 11:44:48 2018
Generic PUS packet class to deserialize raw PUS telemetry.
@author: S. Gaisser
"""
import crcmod
import datetime
class ObswPusPacket:
def __init__(self, byte_array: bytearray):
self.__packet_raw = byte_array
self.PUSHeader = PUSPacketHeader(byte_array)
byte_array = byte_array[6:]
self.dataFieldHeader = OBSWPUSPacketDataFieldHeader(byte_array)
byte_array = byte_array[12:]
self.data = byte_array[:len(byte_array) - 2]
self.crc = byte_array[len(byte_array) - 2] << 8 | byte_array[len(byte_array) - 1]
def get_raw_packet(self) -> bytearray:
return self.__packet_raw
def append_pus_packet_header(self, array):
self.dataFieldHeader.printDataFieldHeader(array)
self.PUSHeader.printPusPacketHeader(array)
def append_pus_packet_header_column_headers(self, array):
self.dataFieldHeader.printDataFieldHeaderColumnHeader(array)
self.PUSHeader.printPusPacketHeaderColumnHeaders(array)
def getPacketSize(self):
# PusHeader Size + data size
size = PUSPacketHeader.headerSize + self.PUSHeader.length + 1
return size
def getService(self):
return self.dataFieldHeader.type
def getSubservice(self):
return self.dataFieldHeader.subtype
def getSSC(self):
return self.PUSHeader.sourceSequenceCount
def printData(self):
print(self.returnDataString())
def returnDataString(self):
strToPrint = "["
for byte in self.data:
strToPrint += str(hex(byte)) + " , "
strToPrint = strToPrint.rstrip(' , ')
strToPrint += ']'
return strToPrint
......@@ -25,12 +25,12 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio
:return: CommunicationInterface object
"""
try:
if g.G_COM_IF == g.ComIF.Ethernet:
if g.G_COM_IF == g.ComInterfaces.Ethernet:
communication_interface = EthernetComIF(
tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT,
tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_ETHERNET_SEND_ADDRESS,
receive_address=g.G_ETHERNET_RECV_ADDRESS)
elif g.G_COM_IF == g.ComIF.Serial:
elif g.G_COM_IF == g.ComInterfaces.Serial:
serial_baudrate = g.G_SERIAL_BAUDRATE
serial_timeout = g.G_SERIAL_TIMEOUT
communication_interface = SerialComIF(
......@@ -39,10 +39,14 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio
ser_com_type=SerialCommunicationType.DLE_ENCODING)
communication_interface.set_dle_settings(
g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout)
elif g.G_COM_IF == g.ComIF.QEMU:
elif g.G_COM_IF == g.ComInterfaces.QEMU:
serial_timeout = g.G_SERIAL_TIMEOUT
communication_interface = QEMUComIF(
tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT,
tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR)
tmtc_printer=tmtc_printer,
serial_timeout=serial_timeout,
ser_com_type=SerialCommunicationType.DLE_ENCODING)
communication_interface.set_dle_settings(
g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout)
else:
communication_interface = DummyComIF(tmtc_printer=tmtc_printer)
if not communication_interface.valid:
......
......@@ -10,8 +10,7 @@
import struct
import pprint
import logging
from socket import INADDR_ANY
from config.obsw_definitions import ModeList, ComIF
from config.obsw_definitions import ModeList, ComInterfaces
"""
Mission/Device specific information.
......@@ -28,6 +27,15 @@ Other global variables
# TMTC Client
G_TMTC_LOGGER_NAME = "TMTC Logger"
G_ERROR_LOG_FILE_NAME = "tmtc_error.log"
G_PP = pprint.PrettyPrinter()
LOGGER = logging.getLogger(G_TMTC_LOGGER_NAME)
# General Settings
G_SCRIPT_MODE = 1
G_MODE_ID: ModeList = ModeList.ListenerMode
G_SERVICE = 17
G_OP_CODE = 0
G_LISTENER_AFTER_OP = False
G_DISPLAY_MODE = "long"
# General TMTC Settings
G_APID = 0x65 # Global APID for EIVE
......@@ -44,6 +52,13 @@ G_TM_TIMEOUT = 6
G_TC_SEND_TIMEOUT_FACTOR = 2.0
# Serial communication
# Binary Upload Settings
G_MAX_BINARY_FRAME_LENGTH = 1500
G_MAX_APP_DATA_LENGTH = G_MAX_BINARY_FRAME_LENGTH - 100
G_COM_IF: ComInterfaces = ComInterfaces.QEMU
# COM Port for serial communication
G_COM_PORT = 'COM0'
G_SERIAL_TIMEOUT = 0.01
G_SERIAL_BAUDRATE = 230400
G_SERIAL_FRAME_SIZE = 256
......@@ -79,14 +94,13 @@ G_TM_LISTENER = None
G_COM_INTERFACE = None
G_TMTC_PRINTER = None
# noinspection PyUnusedLocal
def set_globals(args):
global G_ETHERNET_RECV_ADDRESS, G_ETHERNET_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, G_DISPLAY_MODE,\
G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, G_TC_SEND_TIMEOUT_FACTOR, \
G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM, G_PRINT_TM
if args.mode == 0:
LOGGER.info("GUI mode not implemented yet !")
global G_ETHERNET_RECV_ADDRESS, G_ETHERNET_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, \
G_DISPLAY_MODE, G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, \
G_TC_SEND_TIMEOUT_FACTOR, G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM, G_PRINT_TM, \
G_OP_CODE, G_RESEND_TC, G_LISTENER_AFTER_OP
if args.shortDisplayMode:
G_DISPLAY_MODE = "short"
else:
......@@ -108,20 +122,28 @@ def set_globals(args):
G_MODE_ID = ModeList.UnitTest
else:
G_MODE_ID = ModeList[1]
if args.com_if == ComIF.Ethernet.value:
G_COM_IF = ComIF.Ethernet
elif args.com_if == ComIF.Serial.value:
G_COM_IF = ComIF.Serial
elif args.com_if == ComIF.QEMU.value:
G_COM_IF = ComIF.QEMU
if args.com_if == ComInterfaces.Ethernet.value:
G_COM_IF = ComInterfaces.Ethernet
elif args.com_if == ComInterfaces.Serial.value:
G_COM_IF = ComInterfaces.Serial
elif args.com_if == ComInterfaces.QEMU.value:
G_COM_IF = ComInterfaces.QEMU
else:
G_COM_IF = ComIF.Dummy
G_COM_IF = ComInterfaces.Dummy
G_SERVICE = str(args.service)
if G_SERVICE.isdigit():
G_SERVICE = int(args.service)
else:
G_SERVICE = args.service
G_OP_CODE = str(args.op_code)
if G_OP_CODE.isdigit():
G_OP_CODE = int(G_OP_CODE)
else:
G_OP_CODE = str(G_OP_CODE)
G_MODE_ID = G_MODE_ID
G_PRINT_HK_DATA = args.print_hk
G_PRINT_TM = args.print_tm
......@@ -130,6 +152,8 @@ def set_globals(args):
G_COM_PORT = args.com_port
G_TM_TIMEOUT = args.tm_timeout
G_RESEND_TC = args.resend_tc
from obsw_user_code import global_setup_hook
G_LISTENER_AFTER_OP = args.listener
from config.obsw_user_code import global_setup_hook
global_setup_hook()
......@@ -17,7 +17,7 @@ class ModeList(enum.Enum):
PromptMode = 32
class ComIF(enum.Enum):
class ComInterfaces(enum.Enum):
Dummy = 0
Serial = 1
QEMU = 2
......
File moved
File moved
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
"""
import atexit
import time
import logging
import sys
from multiprocessing import Process
from collections import deque
from typing import Tuple, Union
from config import obsw_config as g
from config.obsw_config import set_globals
from config.obsw_com_config import set_communication_interface
from config.obsw_definitions import ModeList
from config.obsw_user_code import command_preparation_hook
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo
from tmtc_core.sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver
from tmtc_core.sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver
from tmtc_core.sendreceive.obsw_tm_listener import TmListener
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter
from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler
from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger
from tc.obsw_pus_tc_packer import ServiceQueuePacker, create_total_tc_queue
from test.obsw_pus_service_test import run_selected_pus_tests
from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker
from utility.obsw_args_parser import parse_input_arguments
from utility.obsw_binary_uploader import perform_binary_upload
from gui.obsw_tmtc_gui import TmTcGUI
from gui.obsw_backend_test import TmTcBackend
from obsw_user_code import command_preparation_hook
from config.obsw_com_config import set_communication_interface
from utility.obsw_binary_uploader import BinaryFileUploader
LOGGER = get_logger()
def main():
class TmTcHandler:
"""
Main method, reads input arguments, sets global variables and start TMTC handler.
This is the primary class which handles TMTC reception. This can be seen as the backend
in case a GUI or front-end is implemented.
"""
set_tmtc_logger()
LOGGER.info("Starting TMTC Client")
LOGGER.info("Parsing input arguments")
args = parse_input_arguments()
LOGGER.info("Setting global variables")
set_globals(args)
LOGGER.info("Starting TMTC Handler")
if g.G_MODE_ID == g.ModeList.GUIMode:
do_gui_test()
else:
tmtc_handler = TmTcHandler()
tmtc_handler.perform_operation()
def __init__(self, init_mode: ModeList = ModeList.ListenerMode):
self.mode = init_mode
self.com_if = g.G_COM_IF
# This flag could be used later to command the TMTC Client with a front-end
self.one_shot_operation = True
# At some later point, the program will run permanently and be able to take commands.
# For now we put a permanent loop here so the program
# doesn't exit automatically (TM Listener is daemonic)
while True:
pass
self.tmtc_printer: Union[None, TmTcPrinter] = None
self.communication_interface: Union[None, CommunicationInterface] = None
self.tm_listener: Union[None, TmListener] = None
self.single_command_package: Tuple[bytearray, Union[None, PusTcInfo]] = bytearray(), None
def do_gui_test():
# Experimental
backend = TmTcBackend()
backend.start()
gui = TmTcGUI()
gui.start()
backend.join()
gui.join()
LOGGER.info("Both processes have closed")
sys.exit()
def set_one_shot_or_loop_handling(self, enable: bool):
"""
Specify whether the perform_operation() call will only handle one action depending
on the mode or keep listening for replies after handling an operation.
"""
self.one_shot_operation = enable
def set_mode(self, mode: ModeList):
"""
Set the mode which will determine what perform_operation does.
"""
self.mode = mode
def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]:
"""
Prepare command for single command testing
:return:
"""
return command_preparation_hook()
@staticmethod
def prepare_tmtc_handler_start(init_mode: ModeList = g.ModeList.ListenerMode):
tmtc_handler = TmTcHandler(init_mode)
tmtc_task = Process(target=TmTcHandler.start_handler, args=(tmtc_handler, ))
return tmtc_task
@staticmethod
def start_handler(executed_handler):
executed_handler.initialize()
executed_handler.perform_operation()
class TmTcHandler:
"""
This is the primary class which handles TMTC reception. This can be seen as the backend
in case a GUI or front-end is implemented.
"""
def __init__(self):
self.mode = g.G_MODE_ID
self.com_if = g.G_COM_IF
# This flag could be used later to command the TMTC Client with a front-end
self.command_received = True
def initialize(self):
"""
Perform initialization steps which might be necessary after class construction.
This has to be called at some point before using the class!
"""
self.tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True)
self.communication_interface = set_communication_interface(self.tmtc_printer)
self.tm_listener = TmListener(
......@@ -133,25 +90,16 @@ class TmTcHandler:
"""
Periodic operation
"""
while True:
try:
if self.command_received:
self.command_received = False
self.handle_action()
if self.mode == g.ModeList.Idle:
LOGGER.info("TMTC Client in idle mode")
time.sleep(5)
if self.mode == g.ModeList.ListenerMode:
time.sleep(1)
except KeyboardInterrupt:
LOGGER.info("Closing TMTC client.")
sys.exit()
except IOError as e:
LOGGER.exception(e)
LOGGER.info("Closing TMTC client.")
sys.exit()
def handle_action(self):
try:
self.__core_operation(self.one_shot_operation)
except KeyboardInterrupt:
LOGGER.info("Keyboard Interrupt.")
sys.exit()
except IOError as e:
LOGGER.error("IO Error occured!")
sys.exit()
def __handle_action(self):
"""
Command handling.
"""
......@@ -159,27 +107,31 @@ class TmTcHandler:
self.prompt_mode()
if self.mode == g.ModeList.ListenerMode:
if self.tm_listener.event_reply_received.is_set():
if self.tm_listener.reply_event():
LOGGER.info("TmTcHandler: Packets received.")
self.tmtc_printer.print_telemetry_queue(self.tm_listener.retrieve_tm_packet_queue())
self.tm_listener.clear_tm_packet_queue()
self.tm_listener.event_reply_received.clear()
self.command_received = True
self.tm_listener.clear_reply_event()
elif self.mode == g.ModeList.SingleCommandMode:
pus_packet_tuple = command_preparation()
if self.single_command_package is None:
pus_packet_tuple = command_preparation()
else:
LOGGER.info("send package from gui")
pus_packet_tuple = self.single_command_package
sender_and_receiver = SingleCommandSenderReceiver(
com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
tm_listener=self.tm_listener)
LOGGER.info("Performing single command operation")
sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple)
self.command_received = True
self.mode = g.ModeList.PromptMode
elif self.mode == g.ModeList.ServiceTestMode:
service_queue = deque()
service_queue_packer = ServiceQueuePacker()
service_queue_packer.pack_service_queue(g.G_SERVICE, service_queue)
op_code = g.G_OP_CODE
service_queue_packer.pack_service_queue(
service=g.G_SERVICE, service_queue=service_queue, op_code=op_code)
if not self.communication_interface.valid:
return
LOGGER.info("Performing service command operation")
......@@ -187,7 +139,6 @@ class TmTcHandler:
com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
tm_listener=self.tm_listener, tc_queue=service_queue)
sender_and_receiver.send_queue_tc_and_receive_tm_sequentially()
self.command_received = True
self.mode = g.ModeList.ListenerMode
elif self.mode == g.ModeList.SoftwareTestMode:
......@@ -203,8 +154,9 @@ class TmTcHandler:
elif self.mode == g.ModeList.BinaryUploadMode:
# Upload binary, prompt user for input, in the end prompt for new mode and enter that
# mode
perform_binary_upload()
self.command_received = True
file_uploader = BinaryFileUploader(self.communication_interface, self.tmtc_printer,
self.tm_listener)
file_uploader.perform_file_upload()
self.mode = g.ModeList.ListenerMode
elif self.mode == g.ModeList.UnitTest:
......@@ -219,6 +171,19 @@ class TmTcHandler:
logging.error("Unknown Mode, Configuration error !")
sys.exit()
def __core_operation(self, one_shot):
if not one_shot:
while True:
self.__handle_action()
if self.mode == g.ModeList.Idle:
LOGGER.info("TMTC Client in idle mode")
time.sleep(5)
elif self.mode == g.ModeList.ListenerMode:
time.sleep(1)
else:
self.__handle_action()
def prompt_mode(self):
next_mode = input("Please enter next mode (enter h for list of modes): ")
if next_mode == 'h':
......@@ -235,6 +200,22 @@ class TmTcHandler:
else:
self.mode = g.ModeList.ListenerMode
# These two will not be used for now.
@staticmethod
def prepare_tmtc_handler_start_in_process(init_mode: ModeList):
tmtc_handler_task = Process(target=TmTcHandler.start_tmtc_handler, args=(init_mode, ))
return tmtc_handler_task
@staticmethod
def start_tmtc_handler(handler_args: any):
tmtc_handler = TmTcHandler(handler_args)
tmtc_handler.initialize()
tmtc_handler.perform_operation()
if __name__ == "__main__":
main()
def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]:
"""
Prepare command for single command testing
:return:
"""
return command_preparation_hook()
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
"""
from multiprocessing import Process
from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger
from config.obsw_config import set_globals
import config.obsw_config as g
from core.tmtc_backend import TmTcHandler
from core.tmtc_frontend import TmTcFrontend
from utility.obsw_args_parser import parse_input_arguments
LOGGER = get_logger()
def run_tmtc_client(use_gui: bool):
"""
Main method, reads input arguments, sets global variables and start TMTC handler.
"""
set_tmtc_logger()
LOGGER.info("Starting TMTC Client")
if not use_gui:
LOGGER.info("Parsing input arguments")
args = parse_input_arguments()
LOGGER.info("Setting global variables")
set_globals(args)
LOGGER.info("Starting TMTC Handler")
tmtc_frontend_task = Process
# Currently does not work, problems with QEMU / Asyncio
if not use_gui:
tmtc_handler = TmTcHandler(g.G_MODE_ID)
tmtc_handler.set_one_shot_or_loop_handling(g.G_LISTENER_AFTER_OP)
tmtc_handler.initialize()
tmtc_handler.perform_operation()
else:
tmtc_gui = TmTcFrontend()
tmtc_gui.start_ui()
# tmtc_handler_task = TmTcHandler.prepare_tmtc_handler_start()
# tmtc_frontend = TmTcFrontend()
# tmtc_frontend_task = tmtc_frontend.prepare_start(tmtc_frontend)
# tmtc_frontend_task.start()
# tmtc_handler_task.start()
# tmtc_handler_task.join()
# tmtc_frontend_task.join()