Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Robin.Mueller/tmtc
1 result
Show changes
Showing
with 288 additions and 215 deletions
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s Dummy -c 2 -t 2" /> <option name="PARAMETERS" value="-m 3 -s Dummy -c 2 -t 2" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s GPS0 -c 2 -t 2.5 --hk" /> <option name="PARAMETERS" value="-m 3 -s GPS0 -c 2 -t 2.5 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s GPS1 -c 2 -t 2.5 --hk" /> <option name="PARAMETERS" value="-m 3 -s GPS1 -c 2 -t 2.5 --hk" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/core/tmtc_client_core.py" />
<option name="PARAMETERS" value="-m 2 -c 3 --boardIP=127.0.0.1" /> <option name="PARAMETERS" value="-m 2 -c 3 --boardIP=127.0.0.1" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
......
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Single Command Serial " type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> <configuration default="false" name="tmtcclient Single Command Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test">
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
...@@ -12,8 +12,8 @@ ...@@ -12,8 +12,8 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 2 -c 1 -t 5" /> <option name="PARAMETERS" value="-m 2 -c 1 -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" /> <option name="MODULE_MODE" value="false" />
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/core/tmtc_client_core.py" />
<option name="PARAMETERS" value="-m 4 -c 2 -t 3" /> <option name="PARAMETERS" value="-m 4 -c 2 -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
......
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Trigger Exceptions" type="PythonConfigurationType" factoryName="Python" folderName="Serial Utility">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s 17 -o 150 -c 1 -t 2.2" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/core/tmtc_client_core.py" />
<option name="PARAMETERS" value="-m 5 -c 2 -t 5" /> <option name="PARAMETERS" value="-m 5 -c 2 -t 5" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" /> <option name="EMULATE_TERMINAL" value="true" />
......
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Unlock File" type="PythonConfigurationType" factoryName="Python" folderName="Serial FileManagement">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s SD -o 6 -c 1 -t 2.5" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
\ No newline at end of file
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="tmtcclient Service 200 Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Communication"> <configuration default="false" name="tmtcclient Service 200 Serial" type="PythonConfigurationType" factoryName="Python" folderName="Serial Service Test">
<module name="tmtc" /> <module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/obsw_tmtc_client.py" /> <option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtc_client_cli.py" />
<option name="PARAMETERS" value="-m 3 -s 200 -c 1 --hk -t 3" /> <option name="PARAMETERS" value="-m 3 -s 200 -c 1 --hk -t 3" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
......
...@@ -16,7 +16,6 @@ git clone https://git.ksat-stuttgart.de/source/tmtc.git ...@@ -16,7 +16,6 @@ git clone https://git.ksat-stuttgart.de/source/tmtc.git
Initiate the core components by initiating and updating the submodules Initiate the core components by initiating and updating the submodules
```sh ```sh
git submodule init git submodule init
git submodule sync
git submoduke update git submoduke update
``` ```
...@@ -27,7 +26,7 @@ pip install -r requirements.txt ...@@ -27,7 +26,7 @@ pip install -r requirements.txt
Now the script can be tested by running Now the script can be tested by running
```sh ```sh
python3 obsw_tmtc_client.py -h python obsw_tmtc_client.py -h
``` ```
It is recommended to use and setup PyCharm to also use the preconfigured It is recommended to use and setup PyCharm to also use the preconfigured
...@@ -137,4 +136,4 @@ a new PyCharm project in a new window. ...@@ -137,4 +136,4 @@ a new PyCharm project in a new window.
To add new configurations, go to Edit Configurations... To add new configurations, go to Edit Configurations...
at the top right corner in the drop-down menu. at the top right corner in the drop-down menu.
Specify the new run configurations and set a tick at Share through VCS. Specify the new run configurations and set a tick at Share through VCS.
\ No newline at end of file
...@@ -6,9 +6,10 @@ import logging ...@@ -6,9 +6,10 @@ import logging
LOGGER = get_logger() LOGGER = get_logger()
class TmTcBackend(Process): class TmTcBackend(Process):
def __init__(self): def __init__(self):
from obsw_tmtc_client import TmTcHandler from core.tmtc_client_core import TmTcHandler
super(TmTcBackend, self).__init__() super(TmTcBackend, self).__init__()
self.address = ('localhost', 6000) # family is deduced to be 'AF_INET' self.address = ('localhost', 6000) # family is deduced to be 'AF_INET'
self.tmtc_backend = TmTcHandler() self.tmtc_backend = TmTcHandler()
...@@ -20,7 +21,7 @@ class TmTcBackend(Process): ...@@ -20,7 +21,7 @@ class TmTcBackend(Process):
def listen(self): def listen(self):
self.conn = self.listener.accept() self.conn = self.listener.accept()
LOGGER.info("TmTcBackend: Connection accepted from %s",str(self.listener.last_accepted)) LOGGER.info("TmTcBackend: Connection accepted from %s", str(self.listener.last_accepted))
while True: while True:
msg = self.conn.recv() msg = self.conn.recv()
# do something with msg # do something with msg
......
# -*- coding: utf-8 -*-
"""
Created on Wed Apr 4 11:44:48 2018
Generic PUS packet class to deserialize raw PUS telemetry.
@author: S. Gaisser
"""
import crcmod
import datetime
class ObswPusPacket:
def __init__(self, byte_array: bytearray):
self.__packet_raw = byte_array
self.PUSHeader = PUSPacketHeader(byte_array)
byte_array = byte_array[6:]
self.dataFieldHeader = OBSWPUSPacketDataFieldHeader(byte_array)
byte_array = byte_array[12:]
self.data = byte_array[:len(byte_array) - 2]
self.crc = byte_array[len(byte_array) - 2] << 8 | byte_array[len(byte_array) - 1]
def get_raw_packet(self) -> bytearray:
return self.__packet_raw
def append_pus_packet_header(self, array):
self.dataFieldHeader.printDataFieldHeader(array)
self.PUSHeader.printPusPacketHeader(array)
def append_pus_packet_header_column_headers(self, array):
self.dataFieldHeader.printDataFieldHeaderColumnHeader(array)
self.PUSHeader.printPusPacketHeaderColumnHeaders(array)
def getPacketSize(self):
# PusHeader Size + data size
size = PUSPacketHeader.headerSize + self.PUSHeader.length + 1
return size
def getService(self):
return self.dataFieldHeader.type
def getSubservice(self):
return self.dataFieldHeader.subtype
def getSSC(self):
return self.PUSHeader.sourceSequenceCount
def printData(self):
print(self.returnDataString())
def returnDataString(self):
strToPrint = "["
for byte in self.data:
strToPrint += str(hex(byte)) + " , "
strToPrint = strToPrint.rstrip(' , ')
strToPrint += ']'
return strToPrint
...@@ -25,12 +25,12 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio ...@@ -25,12 +25,12 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio
:return: CommunicationInterface object :return: CommunicationInterface object
""" """
try: try:
if g.G_COM_IF == g.ComIF.Ethernet: if g.G_COM_IF == g.ComInterfaces.Ethernet:
communication_interface = EthernetComIF( communication_interface = EthernetComIF(
tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT,
tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_ETHERNET_SEND_ADDRESS, tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR, send_address=g.G_ETHERNET_SEND_ADDRESS,
receive_address=g.G_ETHERNET_RECV_ADDRESS) receive_address=g.G_ETHERNET_RECV_ADDRESS)
elif g.G_COM_IF == g.ComIF.Serial: elif g.G_COM_IF == g.ComInterfaces.Serial:
serial_baudrate = g.G_SERIAL_BAUDRATE serial_baudrate = g.G_SERIAL_BAUDRATE
serial_timeout = g.G_SERIAL_TIMEOUT serial_timeout = g.G_SERIAL_TIMEOUT
communication_interface = SerialComIF( communication_interface = SerialComIF(
...@@ -39,10 +39,14 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio ...@@ -39,10 +39,14 @@ def set_communication_interface(tmtc_printer: TmTcPrinter) -> Union[Communicatio
ser_com_type=SerialCommunicationType.DLE_ENCODING) ser_com_type=SerialCommunicationType.DLE_ENCODING)
communication_interface.set_dle_settings( communication_interface.set_dle_settings(
g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout) g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout)
elif g.G_COM_IF == g.ComIF.QEMU: elif g.G_COM_IF == g.ComInterfaces.QEMU:
serial_timeout = g.G_SERIAL_TIMEOUT
communication_interface = QEMUComIF( communication_interface = QEMUComIF(
tmtc_printer=tmtc_printer, tm_timeout=g.G_TM_TIMEOUT, tmtc_printer=tmtc_printer,
tc_timeout_factor=g.G_TC_SEND_TIMEOUT_FACTOR) serial_timeout=serial_timeout,
ser_com_type=SerialCommunicationType.DLE_ENCODING)
communication_interface.set_dle_settings(
g.G_SERIAL_DLE_MAX_QUEUE_LEN, g.G_SERIAL_DLE_MAX_FRAME_SIZE, serial_timeout)
else: else:
communication_interface = DummyComIF(tmtc_printer=tmtc_printer) communication_interface = DummyComIF(tmtc_printer=tmtc_printer)
if not communication_interface.valid: if not communication_interface.valid:
......
...@@ -10,8 +10,7 @@ ...@@ -10,8 +10,7 @@
import struct import struct
import pprint import pprint
import logging import logging
from socket import INADDR_ANY from config.obsw_definitions import ModeList, ComInterfaces
from config.obsw_definitions import ModeList, ComIF
""" """
Mission/Device specific information. Mission/Device specific information.
...@@ -28,6 +27,15 @@ Other global variables ...@@ -28,6 +27,15 @@ Other global variables
# TMTC Client # TMTC Client
G_TMTC_LOGGER_NAME = "TMTC Logger" G_TMTC_LOGGER_NAME = "TMTC Logger"
G_ERROR_LOG_FILE_NAME = "tmtc_error.log" G_ERROR_LOG_FILE_NAME = "tmtc_error.log"
G_PP = pprint.PrettyPrinter()
LOGGER = logging.getLogger(G_TMTC_LOGGER_NAME)
# General Settings
G_SCRIPT_MODE = 1
G_MODE_ID: ModeList = ModeList.ListenerMode
G_SERVICE = 17
G_OP_CODE = 0
G_LISTENER_AFTER_OP = False
G_DISPLAY_MODE = "long"
# General TMTC Settings # General TMTC Settings
G_APID = 0x65 # Global APID for EIVE G_APID = 0x65 # Global APID for EIVE
...@@ -44,6 +52,13 @@ G_TM_TIMEOUT = 6 ...@@ -44,6 +52,13 @@ G_TM_TIMEOUT = 6
G_TC_SEND_TIMEOUT_FACTOR = 2.0 G_TC_SEND_TIMEOUT_FACTOR = 2.0
# Serial communication # Serial communication
# Binary Upload Settings
G_MAX_BINARY_FRAME_LENGTH = 1500
G_MAX_APP_DATA_LENGTH = G_MAX_BINARY_FRAME_LENGTH - 100
G_COM_IF: ComInterfaces = ComInterfaces.QEMU
# COM Port for serial communication
G_COM_PORT = 'COM0'
G_SERIAL_TIMEOUT = 0.01 G_SERIAL_TIMEOUT = 0.01
G_SERIAL_BAUDRATE = 230400 G_SERIAL_BAUDRATE = 230400
G_SERIAL_FRAME_SIZE = 256 G_SERIAL_FRAME_SIZE = 256
...@@ -79,14 +94,13 @@ G_TM_LISTENER = None ...@@ -79,14 +94,13 @@ G_TM_LISTENER = None
G_COM_INTERFACE = None G_COM_INTERFACE = None
G_TMTC_PRINTER = None G_TMTC_PRINTER = None
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
def set_globals(args): def set_globals(args):
global G_ETHERNET_RECV_ADDRESS, G_ETHERNET_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, G_DISPLAY_MODE,\ global G_ETHERNET_RECV_ADDRESS, G_ETHERNET_SEND_ADDRESS, G_SCRIPT_MODE, G_MODE_ID, G_SERVICE, \
G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, G_TC_SEND_TIMEOUT_FACTOR, \ G_DISPLAY_MODE, G_COM_IF, G_COM_PORT, G_SERIAL_TIMEOUT, G_TM_TIMEOUT, \
G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM, G_PRINT_TM G_TC_SEND_TIMEOUT_FACTOR, G_PRINT_TO_FILE, G_PRINT_HK_DATA, G_PRINT_RAW_TM, G_PRINT_TM, \
if args.mode == 0: G_OP_CODE, G_RESEND_TC, G_LISTENER_AFTER_OP
LOGGER.info("GUI mode not implemented yet !")
if args.shortDisplayMode: if args.shortDisplayMode:
G_DISPLAY_MODE = "short" G_DISPLAY_MODE = "short"
else: else:
...@@ -108,20 +122,28 @@ def set_globals(args): ...@@ -108,20 +122,28 @@ def set_globals(args):
G_MODE_ID = ModeList.UnitTest G_MODE_ID = ModeList.UnitTest
else: else:
G_MODE_ID = ModeList[1] G_MODE_ID = ModeList[1]
if args.com_if == ComIF.Ethernet.value:
G_COM_IF = ComIF.Ethernet if args.com_if == ComInterfaces.Ethernet.value:
elif args.com_if == ComIF.Serial.value: G_COM_IF = ComInterfaces.Ethernet
G_COM_IF = ComIF.Serial elif args.com_if == ComInterfaces.Serial.value:
elif args.com_if == ComIF.QEMU.value: G_COM_IF = ComInterfaces.Serial
G_COM_IF = ComIF.QEMU elif args.com_if == ComInterfaces.QEMU.value:
G_COM_IF = ComInterfaces.QEMU
else: else:
G_COM_IF = ComIF.Dummy G_COM_IF = ComInterfaces.Dummy
G_SERVICE = str(args.service) G_SERVICE = str(args.service)
if G_SERVICE.isdigit(): if G_SERVICE.isdigit():
G_SERVICE = int(args.service) G_SERVICE = int(args.service)
else: else:
G_SERVICE = args.service G_SERVICE = args.service
G_OP_CODE = str(args.op_code)
if G_OP_CODE.isdigit():
G_OP_CODE = int(G_OP_CODE)
else:
G_OP_CODE = str(G_OP_CODE)
G_MODE_ID = G_MODE_ID G_MODE_ID = G_MODE_ID
G_PRINT_HK_DATA = args.print_hk G_PRINT_HK_DATA = args.print_hk
G_PRINT_TM = args.print_tm G_PRINT_TM = args.print_tm
...@@ -130,6 +152,8 @@ def set_globals(args): ...@@ -130,6 +152,8 @@ def set_globals(args):
G_COM_PORT = args.com_port G_COM_PORT = args.com_port
G_TM_TIMEOUT = args.tm_timeout G_TM_TIMEOUT = args.tm_timeout
G_RESEND_TC = args.resend_tc G_RESEND_TC = args.resend_tc
from obsw_user_code import global_setup_hook G_LISTENER_AFTER_OP = args.listener
from config.obsw_user_code import global_setup_hook
global_setup_hook() global_setup_hook()
...@@ -17,7 +17,7 @@ class ModeList(enum.Enum): ...@@ -17,7 +17,7 @@ class ModeList(enum.Enum):
PromptMode = 32 PromptMode = 32
class ComIF(enum.Enum): class ComInterfaces(enum.Enum):
Dummy = 0 Dummy = 0
Serial = 1 Serial = 1
QEMU = 2 QEMU = 2
......
File moved
File moved
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
"""
import atexit import atexit
import time import time
import logging import logging
import sys import sys
from multiprocessing import Process
from collections import deque from collections import deque
from typing import Tuple, Union from typing import Tuple, Union
from config import obsw_config as g from config import obsw_config as g
from config.obsw_config import set_globals from config.obsw_definitions import ModeList
from config.obsw_com_config import set_communication_interface from config.obsw_user_code import command_preparation_hook
from tmtc_core.utility.obsw_logger import get_logger
from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo from tmtc_core.tc.obsw_pus_tc_base import PusTcInfo
from tmtc_core.sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver from tmtc_core.sendreceive.obsw_single_command_sender_receiver import SingleCommandSenderReceiver
from tmtc_core.sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver from tmtc_core.sendreceive.obsw_sequential_sender_receiver import SequentialCommandSenderReceiver
from tmtc_core.sendreceive.obsw_tm_listener import TmListener from tmtc_core.sendreceive.obsw_tm_listener import TmListener
from tmtc_core.comIF.obsw_com_interface import CommunicationInterface
from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter from tmtc_core.utility.obsw_tmtc_printer import TmTcPrinter
from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler from tmtc_core.utility.obsw_exit_handler import keyboard_interrupt_handler
from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger
from tc.obsw_pus_tc_packer import ServiceQueuePacker, create_total_tc_queue
from test.obsw_pus_service_test import run_selected_pus_tests from test.obsw_pus_service_test import run_selected_pus_tests
from tc.obsw_pus_tc_packer import create_total_tc_queue, ServiceQueuePacker from config.obsw_com_config import set_communication_interface
from utility.obsw_args_parser import parse_input_arguments from utility.obsw_binary_uploader import BinaryFileUploader
from utility.obsw_binary_uploader import perform_binary_upload
from gui.obsw_tmtc_gui import TmTcGUI
from gui.obsw_backend_test import TmTcBackend
from obsw_user_code import command_preparation_hook
LOGGER = get_logger() LOGGER = get_logger()
def main(): class TmTcHandler:
""" """
Main method, reads input arguments, sets global variables and start TMTC handler. This is the primary class which handles TMTC reception. This can be seen as the backend
in case a GUI or front-end is implemented.
""" """
set_tmtc_logger() def __init__(self, init_mode: ModeList = ModeList.ListenerMode):
LOGGER.info("Starting TMTC Client") self.mode = init_mode
self.com_if = g.G_COM_IF
LOGGER.info("Parsing input arguments") # This flag could be used later to command the TMTC Client with a front-end
args = parse_input_arguments() self.one_shot_operation = True
LOGGER.info("Setting global variables")
set_globals(args)
LOGGER.info("Starting TMTC Handler")
if g.G_MODE_ID == g.ModeList.GUIMode:
do_gui_test()
else:
tmtc_handler = TmTcHandler()
tmtc_handler.perform_operation()
# At some later point, the program will run permanently and be able to take commands. self.tmtc_printer: Union[None, TmTcPrinter] = None
# For now we put a permanent loop here so the program self.communication_interface: Union[None, CommunicationInterface] = None
# doesn't exit automatically (TM Listener is daemonic) self.tm_listener: Union[None, TmListener] = None
while True:
pass
self.single_command_package: Tuple[bytearray, Union[None, PusTcInfo]] = bytearray(), None
def do_gui_test(): def set_one_shot_or_loop_handling(self, enable: bool):
# Experimental """
backend = TmTcBackend() Specify whether the perform_operation() call will only handle one action depending
backend.start() on the mode or keep listening for replies after handling an operation.
gui = TmTcGUI() """
gui.start() self.one_shot_operation = enable
backend.join()
gui.join()
LOGGER.info("Both processes have closed")
sys.exit()
def set_mode(self, mode: ModeList):
"""
Set the mode which will determine what perform_operation does.
"""
self.mode = mode
def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]: @staticmethod
""" def prepare_tmtc_handler_start(init_mode: ModeList = g.ModeList.ListenerMode):
Prepare command for single command testing tmtc_handler = TmTcHandler(init_mode)
:return: tmtc_task = Process(target=TmTcHandler.start_handler, args=(tmtc_handler, ))
""" return tmtc_task
return command_preparation_hook()
@staticmethod
def start_handler(executed_handler):
executed_handler.initialize()
executed_handler.perform_operation()
class TmTcHandler: def initialize(self):
""" """
This is the primary class which handles TMTC reception. This can be seen as the backend Perform initialization steps which might be necessary after class construction.
in case a GUI or front-end is implemented. This has to be called at some point before using the class!
""" """
def __init__(self):
self.mode = g.G_MODE_ID
self.com_if = g.G_COM_IF
# This flag could be used later to command the TMTC Client with a front-end
self.command_received = True
self.tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True) self.tmtc_printer = TmTcPrinter(g.G_DISPLAY_MODE, g.G_PRINT_TO_FILE, True)
self.communication_interface = set_communication_interface(self.tmtc_printer) self.communication_interface = set_communication_interface(self.tmtc_printer)
self.tm_listener = TmListener( self.tm_listener = TmListener(
...@@ -133,25 +90,16 @@ class TmTcHandler: ...@@ -133,25 +90,16 @@ class TmTcHandler:
""" """
Periodic operation Periodic operation
""" """
while True: try:
try: self.__core_operation(self.one_shot_operation)
if self.command_received: except KeyboardInterrupt:
self.command_received = False LOGGER.info("Keyboard Interrupt.")
self.handle_action() sys.exit()
if self.mode == g.ModeList.Idle: except IOError as e:
LOGGER.info("TMTC Client in idle mode") LOGGER.error("IO Error occured!")
time.sleep(5) sys.exit()
if self.mode == g.ModeList.ListenerMode:
time.sleep(1) def __handle_action(self):
except KeyboardInterrupt:
LOGGER.info("Closing TMTC client.")
sys.exit()
except IOError as e:
LOGGER.exception(e)
LOGGER.info("Closing TMTC client.")
sys.exit()
def handle_action(self):
""" """
Command handling. Command handling.
""" """
...@@ -159,27 +107,31 @@ class TmTcHandler: ...@@ -159,27 +107,31 @@ class TmTcHandler:
self.prompt_mode() self.prompt_mode()
if self.mode == g.ModeList.ListenerMode: if self.mode == g.ModeList.ListenerMode:
if self.tm_listener.event_reply_received.is_set(): if self.tm_listener.reply_event():
LOGGER.info("TmTcHandler: Packets received.") LOGGER.info("TmTcHandler: Packets received.")
self.tmtc_printer.print_telemetry_queue(self.tm_listener.retrieve_tm_packet_queue()) self.tmtc_printer.print_telemetry_queue(self.tm_listener.retrieve_tm_packet_queue())
self.tm_listener.clear_tm_packet_queue() self.tm_listener.clear_tm_packet_queue()
self.tm_listener.event_reply_received.clear() self.tm_listener.clear_reply_event()
self.command_received = True
elif self.mode == g.ModeList.SingleCommandMode: elif self.mode == g.ModeList.SingleCommandMode:
pus_packet_tuple = command_preparation() if self.single_command_package is None:
pus_packet_tuple = command_preparation()
else:
LOGGER.info("send package from gui")
pus_packet_tuple = self.single_command_package
sender_and_receiver = SingleCommandSenderReceiver( sender_and_receiver = SingleCommandSenderReceiver(
com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
tm_listener=self.tm_listener) tm_listener=self.tm_listener)
LOGGER.info("Performing single command operation") LOGGER.info("Performing single command operation")
sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple) sender_and_receiver.send_single_tc_and_receive_tm(pus_packet_tuple=pus_packet_tuple)
self.command_received = True
self.mode = g.ModeList.PromptMode self.mode = g.ModeList.PromptMode
elif self.mode == g.ModeList.ServiceTestMode: elif self.mode == g.ModeList.ServiceTestMode:
service_queue = deque() service_queue = deque()
service_queue_packer = ServiceQueuePacker() service_queue_packer = ServiceQueuePacker()
service_queue_packer.pack_service_queue(g.G_SERVICE, service_queue) op_code = g.G_OP_CODE
service_queue_packer.pack_service_queue(
service=g.G_SERVICE, service_queue=service_queue, op_code=op_code)
if not self.communication_interface.valid: if not self.communication_interface.valid:
return return
LOGGER.info("Performing service command operation") LOGGER.info("Performing service command operation")
...@@ -187,7 +139,6 @@ class TmTcHandler: ...@@ -187,7 +139,6 @@ class TmTcHandler:
com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer, com_interface=self.communication_interface, tmtc_printer=self.tmtc_printer,
tm_listener=self.tm_listener, tc_queue=service_queue) tm_listener=self.tm_listener, tc_queue=service_queue)
sender_and_receiver.send_queue_tc_and_receive_tm_sequentially() sender_and_receiver.send_queue_tc_and_receive_tm_sequentially()
self.command_received = True
self.mode = g.ModeList.ListenerMode self.mode = g.ModeList.ListenerMode
elif self.mode == g.ModeList.SoftwareTestMode: elif self.mode == g.ModeList.SoftwareTestMode:
...@@ -203,8 +154,9 @@ class TmTcHandler: ...@@ -203,8 +154,9 @@ class TmTcHandler:
elif self.mode == g.ModeList.BinaryUploadMode: elif self.mode == g.ModeList.BinaryUploadMode:
# Upload binary, prompt user for input, in the end prompt for new mode and enter that # Upload binary, prompt user for input, in the end prompt for new mode and enter that
# mode # mode
perform_binary_upload() file_uploader = BinaryFileUploader(self.communication_interface, self.tmtc_printer,
self.command_received = True self.tm_listener)
file_uploader.perform_file_upload()
self.mode = g.ModeList.ListenerMode self.mode = g.ModeList.ListenerMode
elif self.mode == g.ModeList.UnitTest: elif self.mode == g.ModeList.UnitTest:
...@@ -219,6 +171,19 @@ class TmTcHandler: ...@@ -219,6 +171,19 @@ class TmTcHandler:
logging.error("Unknown Mode, Configuration error !") logging.error("Unknown Mode, Configuration error !")
sys.exit() sys.exit()
def __core_operation(self, one_shot):
if not one_shot:
while True:
self.__handle_action()
if self.mode == g.ModeList.Idle:
LOGGER.info("TMTC Client in idle mode")
time.sleep(5)
elif self.mode == g.ModeList.ListenerMode:
time.sleep(1)
else:
self.__handle_action()
def prompt_mode(self): def prompt_mode(self):
next_mode = input("Please enter next mode (enter h for list of modes): ") next_mode = input("Please enter next mode (enter h for list of modes): ")
if next_mode == 'h': if next_mode == 'h':
...@@ -235,6 +200,22 @@ class TmTcHandler: ...@@ -235,6 +200,22 @@ class TmTcHandler:
else: else:
self.mode = g.ModeList.ListenerMode self.mode = g.ModeList.ListenerMode
# These two will not be used for now.
@staticmethod
def prepare_tmtc_handler_start_in_process(init_mode: ModeList):
tmtc_handler_task = Process(target=TmTcHandler.start_tmtc_handler, args=(init_mode, ))
return tmtc_handler_task
@staticmethod
def start_tmtc_handler(handler_args: any):
tmtc_handler = TmTcHandler(handler_args)
tmtc_handler.initialize()
tmtc_handler.perform_operation()
if __name__ == "__main__": def command_preparation() -> Tuple[bytearray, Union[None, PusTcInfo]]:
main() """
Prepare command for single command testing
:return:
"""
return command_preparation_hook()
#!/usr/bin/python3
"""
@brief This client was developed by KSat for the SOURCE project to test the on-board software.
@details
This client features multiple sender/receiver modes and has been designed
to be extensible and easy to use. This clien is based on the PUS standard for the format
of telecommands and telemetry. It can also send TMTC via different interfaces like the
serial interface (USB port) or ethernet interface.
@license
Copyright 2020 KSat e.V. Stuttgart
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@manual
Run this file with the -h flag to display options.
"""
from multiprocessing import Process
from tmtc_core.utility.obsw_logger import set_tmtc_logger, get_logger
from config.obsw_config import set_globals
import config.obsw_config as g
from core.tmtc_backend import TmTcHandler
from core.tmtc_frontend import TmTcFrontend
from utility.obsw_args_parser import parse_input_arguments
LOGGER = get_logger()
def run_tmtc_client(use_gui: bool):
"""
Main method, reads input arguments, sets global variables and start TMTC handler.
"""
set_tmtc_logger()
LOGGER.info("Starting TMTC Client")
if not use_gui:
LOGGER.info("Parsing input arguments")
args = parse_input_arguments()
LOGGER.info("Setting global variables")
set_globals(args)
LOGGER.info("Starting TMTC Handler")
tmtc_frontend_task = Process
# Currently does not work, problems with QEMU / Asyncio
if not use_gui:
tmtc_handler = TmTcHandler(g.G_MODE_ID)
tmtc_handler.set_one_shot_or_loop_handling(g.G_LISTENER_AFTER_OP)
tmtc_handler.initialize()
tmtc_handler.perform_operation()
else:
tmtc_gui = TmTcFrontend()
tmtc_gui.start_ui()
# tmtc_handler_task = TmTcHandler.prepare_tmtc_handler_start()
# tmtc_frontend = TmTcFrontend()
# tmtc_frontend_task = tmtc_frontend.prepare_start(tmtc_frontend)
# tmtc_frontend_task.start()
# tmtc_handler_task.start()
# tmtc_handler_task.join()
# tmtc_frontend_task.join()